tcp: improve waitclose in closing states
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 3dd80ad..6492ce7 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -66,6 +66,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
       evt->rpc_args.arg = args;
       break;
     case FIFO_EVENT_APP_TX:
+    case SESSION_IO_EVT_TX_FLUSH:
     case FIFO_EVENT_BUILTIN_RX:
       evt->fifo = data;
       break;
@@ -194,7 +195,6 @@ session_alloc_for_connection (transport_connection_t * tc)
 
   s = session_alloc (thread_index);
   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
-  s->session_state = SESSION_STATE_CONNECTING;
   s->enqueue_epoch = (u64) ~ 0;
 
   /* Attach transport to session and vice versa */
@@ -629,6 +629,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
        }
       else
        {
+         new_s->session_state = SESSION_STATE_CONNECTING;
          new_s->app_wrk_index = app_wrk->wrk_index;
          new_si = new_s->session_index;
          new_ti = new_s->thread_index;
@@ -723,7 +724,7 @@ session_dgram_connect_notify (transport_connection_t * tc,
   return 0;
 }
 
-void
+int
 stream_session_accept_notify (transport_connection_t * tc)
 {
   app_worker_t *app_wrk;
@@ -733,9 +734,9 @@ stream_session_accept_notify (transport_connection_t * tc)
   s = session_get (tc->s_index, tc->thread_index);
   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
   if (!app_wrk)
-    return;
+    return -1;
   app = application_get (app_wrk->app_index);
-  app->cb_fns.session_accept_callback (s);
+  return app->cb_fns.session_accept_callback (s);
 }
 
 /**
@@ -805,7 +806,10 @@ stream_session_delete_notify (transport_connection_t * tc)
     case SESSION_STATE_TRANSPORT_CLOSING:
       /* If transport finishes or times out before we get a reply
        * from the app, do the whole disconnect since we might still
-       * have lingering events */
+       * have lingering events. Clean up the session table in advance
+       * because the transport will soon be closed and closed sessions
+       * are assumed to have been removed from the lookup table */
+      session_lookup_del_session (s);
       stream_session_disconnect (s);
       s->session_state = SESSION_STATE_CLOSED;
       break;
@@ -816,15 +820,33 @@ stream_session_delete_notify (transport_connection_t * tc)
       break;
     case SESSION_STATE_CLOSED:
     case SESSION_STATE_ACCEPTING:
+    case SESSION_STATE_CLOSED_WAITING:
       stream_session_delete (s);
       break;
     default:
-      /* Assume connection was not yet added the lookup table */
-      session_free_w_fifos (s);
+      stream_session_delete (s);
       break;
     }
 }
 
+/**
+ * Notification from transport that session can be closed
+ *
+ * Should be called by the transport only if it was closed with a non-empty
+ * tx fifo and once it decides to begin the closing procedure, prior to
+ * issuing a delete notify. This gives the session layer a chance to
+ * clean up any outstanding events.
+ */
+void
+session_stream_close_notify (transport_connection_t * tc)
+{
+  stream_session_t *s;
+
+  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
+    return;
+  s->session_state = SESSION_STATE_CLOSED;
+}
+
 /**
  * Notify application that connection has been reset.
  */
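
For context, a minimal sketch of the call order the comment above prescribes, seen from a transport's point of view. The function below is purely illustrative and not part of this patch; only session_stream_close_notify and stream_session_delete_notify are real symbols from this file:

static void
example_transport_giveup (transport_connection_t * tc)
{
  /* The connection was closed while its tx fifo still held data and the
   * transport has now decided to start tearing it down (e.g., a waitclose
   * style timeout fired). Let the session layer drop outstanding events... */
  session_stream_close_notify (tc);

  /* ...and, once the transport has finished its own cleanup, request the
   * final session cleanup. */
  stream_session_delete_notify (tc);
}
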
@@ -835,7 +857,10 @@ stream_session_reset_notify (transport_connection_t * tc)
   app_worker_t *app_wrk;
   application_t *app;
   s = session_get (tc->s_index, tc->thread_index);
-  s->session_state = SESSION_STATE_CLOSED;
+  svm_fifo_dequeue_drop_all (s->server_tx_fifo);
+  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+    return;
+  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
   app_wrk = app_worker_get (s->app_wrk_index);
   app = application_get (app_wrk->app_index);
   app->cb_fns.session_reset_callback (s);
@@ -869,7 +894,7 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index,
   if (notify)
     {
       application_t *app = application_get (app_wrk->app_index);
-      app->cb_fns.session_accept_callback (s);
+      return app->cb_fns.session_accept_callback (s);
     }
 
   return 0;
@@ -1081,7 +1106,7 @@ stream_session_disconnect (stream_session_t * s)
    * held, just append a new event to pending disconnects vector. */
   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
     {
-      wrk = session_manager_get_worker (thread_index);
+      wrk = session_manager_get_worker (s->thread_index);
       vec_add2 (wrk->pending_disconnects, evt, 1);
       clib_memset (evt, 0, sizeof (*evt));
       evt->session_handle = session_handle (s);
@@ -1107,7 +1132,18 @@ stream_session_disconnect_transport (stream_session_t * s)
       session_free_w_fifos (s);
       return;
     }
-  s->session_state = SESSION_STATE_CLOSED;
+
+  /* If the tx queue wasn't drained, move to closed-waiting on the transport.
+   * This way the transport, if it so wishes, can keep trying to send the
+   * outstanding data (in the closed state it cannot). It MUST, however, at
+   * some point, either after sending everything or after a timeout, call
+   * delete notify. That finally leads to the complete cleanup of the session.
+   */
+  if (svm_fifo_max_dequeue (s->server_tx_fifo))
+    s->session_state = SESSION_STATE_CLOSED_WAITING;
+  else
+    s->session_state = SESSION_STATE_CLOSED;
+
   tp_vfts[session_get_transport_proto (s)].close (s->connection_index,
                                                  s->thread_index);
 }
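
The transport-side counterpart of this contract might look roughly like the following. This is a sketch under the assumptions of the comment above; example_transport_waitclose and its timed_out flag are hypothetical, while session_get, svm_fifo_max_dequeue, SESSION_STATE_CLOSED_WAITING and stream_session_delete_notify are the symbols used elsewhere in this patch:

static void
example_transport_waitclose (transport_connection_t * tc, u8 timed_out)
{
  stream_session_t *s = session_get (tc->s_index, tc->thread_index);

  /* Session layer is still waiting on us and we haven't given up yet:
   * keep (re)transmitting the outstanding data. */
  if (!timed_out && s->session_state == SESSION_STATE_CLOSED_WAITING
      && svm_fifo_max_dequeue (s->server_tx_fifo))
    return;

  /* Everything was sent or the timeout fired: trigger the final cleanup. */
  stream_session_delete_notify (tc);
}
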
@@ -1204,11 +1240,10 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
   for (i = 0; i < vec_len (smm->wrk); i++)
     {
       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
-      u32 notif_q_size = clib_max (16, evt_q_length >> 4);
       svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
        {evt_q_length, evt_size, 0}
        ,
-       {notif_q_size, 256, 0}
+       {evt_q_length << 1, 256, 0}
       };
       cfg->consumer_pid = 0;
       cfg->n_rings = 2;
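
For a sense of scale, assuming an event queue length of 2048 (a value picked here purely for illustration), the second ring, which carries 256-byte control/notification messages, grows as follows:

      /* old: clib_max (16, 2048 >> 4) = 128 slots x 256 B, roughly 32 KB */
      /* new: 2048 << 1 = 4096 slots x 256 B, roughly 1 MB                */

That is, the notification ring is now sized relative to the event queue rather than capped at a small fraction of it, trading more shared memory per worker for much more headroom.
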
@@ -1366,6 +1401,7 @@ session_manager_main_enable (vlib_main_t * vm)
       _vec_len (wrk->postponed_event_vector) = 0;
 
       wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
+      wrk->dispatch_period = 500e-6;
 
       if (num_threads > 1)
        clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);