session: improve disconnect handling 34/12834/3
authorFlorin Coras <fcoras@cisco.com>
Fri, 1 Jun 2018 00:14:10 +0000 (17:14 -0700)
committerDave Barach <openvpp@barachs.net>
Fri, 1 Jun 2018 21:18:45 +0000 (21:18 +0000)
If the caller is the session owning thread or the main thread with a
worker barrier sync (cli/api) add an event to the pending disconnects
vector in the session node and entirely avoid using the event queue.
Useful for bursts of disconnects (like an app detach).

If disconnects come from a processes, be willing to retry enqueueing the
disconnect to the event queue multiple times.

Change-Id: Ieece1f1091b713f94c41c703b6e805bc8498816a
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/svm/svm_fifo.c
src/vnet/session/session.c
src/vnet/session/session_node.c
src/vnet/session/stream_session.h

index dbdb813..10c3192 100644 (file)
@@ -174,7 +174,8 @@ format_svm_fifo (u8 * s, va_list * args)
 
   s = format (s, "cursize %u nitems %u has_event %d\n",
              f->cursize, f->nitems, f->has_event);
-  s = format (s, " head %d tail %d\n", f->head, f->tail);
+  s = format (s, " head %d tail %d segment manager %u\n", f->head, f->tail,
+             f->segment_manager);
 
   if (verbose > 1)
     s = format
index c016325..ee02526 100644 (file)
@@ -31,9 +31,9 @@ static void
 session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
                            u32 thread_index, void *fp, void *rpc_args)
 {
-  u32 tries = 0;
   session_fifo_event_t evt = { {0}, };
   svm_queue_t *q;
+  u32 tries = 0, max_tries;
 
   evt.event_type = evt_type;
   if (evt_type == FIFO_EVENT_RPC)
@@ -47,7 +47,8 @@ session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
   q = session_manager_get_vpp_event_queue (thread_index);
   while (svm_queue_add (q, (u8 *) & evt, 1))
     {
-      if (tries++ == 3)
+      max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
+      if (tries++ == max_tries)
        {
          SESSION_DBG ("failed to enqueue evt");
          break;
@@ -450,7 +451,7 @@ session_enqueue_notify (stream_session_t * s, u8 block)
   session_fifo_event_t evt;
   svm_queue_t *q;
 
-  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+  if (PREDICT_FALSE (s->session_state >= SESSION_STATE_CLOSING))
     {
       /* Session is closed so app will never clean up. Flush rx fifo */
       u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
@@ -1059,11 +1060,28 @@ stream_session_stop_listen (stream_session_t * s)
 void
 stream_session_disconnect (stream_session_t * s)
 {
-  if (!s || s->session_state == SESSION_STATE_CLOSED)
+  u32 thread_index = vlib_get_thread_index ();
+  session_manager_main_t *smm = &session_manager_main;
+  session_fifo_event_t *evt;
+
+  if (!s || s->session_state >= SESSION_STATE_CLOSING)
     return;
-  s->session_state = SESSION_STATE_CLOSED;
-  session_send_session_evt_to_thread (session_handle (s),
-                                     FIFO_EVENT_DISCONNECT, s->thread_index);
+  s->session_state = SESSION_STATE_CLOSING;
+
+  /* If we are in the handler thread, or being called with the worker barrier
+   * held (api/cli), just append a new event pending disconnects vector. */
+  if (thread_index > 0 || !vlib_get_current_process (vlib_get_main ()))
+    {
+      ASSERT (s->thread_index == thread_index || thread_index == 0);
+      vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
+      memset (evt, 0, sizeof (*evt));
+      evt->session_handle = session_handle (s);
+      evt->event_type = FIFO_EVENT_DISCONNECT;
+    }
+  else
+    session_send_session_evt_to_thread (session_handle (s),
+                                       FIFO_EVENT_DISCONNECT,
+                                       s->thread_index);
 }
 
 /**
index e046efb..b6c1b2f 100644 (file)
@@ -692,7 +692,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
-  session_fifo_event_t *my_pending_event_vector, *pending_disconnects, *e;
+  session_fifo_event_t *my_pending_event_vector, *e;
   session_fifo_event_t *my_fifo_events;
   u32 n_to_dequeue, n_events;
   svm_queue_t *q;
@@ -722,10 +722,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   /* min number of events we can dequeue without blocking */
   n_to_dequeue = q->cursize;
   my_pending_event_vector = smm->pending_event_vector[thread_index];
-  pending_disconnects = smm->pending_disconnects[thread_index];
 
   if (!n_to_dequeue && !vec_len (my_pending_event_vector)
-      && !vec_len (pending_disconnects))
+      && !vec_len (smm->pending_disconnects[thread_index]))
     return 0;
 
   SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
@@ -802,12 +801,20 @@ skip_dequeue:
        case FIFO_EVENT_DISCONNECT:
          /* Make sure stream disconnects run after the pending list is drained */
          s0 = session_get_from_handle (e0->session_handle);
-         if (!e0->postponed || svm_fifo_max_dequeue (s0->server_tx_fifo))
+         if (!e0->postponed)
            {
              e0->postponed = 1;
              vec_add1 (smm->pending_disconnects[thread_index], *e0);
              continue;
            }
+         /* If tx queue is still not empty, wait a bit */
+         if (svm_fifo_max_dequeue (s0->server_tx_fifo)
+             && e0->postponed < 200)
+           {
+             e0->postponed += 1;
+             vec_add1 (smm->pending_disconnects[thread_index], *e0);
+             continue;
+           }
 
          stream_session_disconnect_transport (s0);
          break;
index 9e0e4d9..1673356 100644 (file)
@@ -32,6 +32,7 @@ typedef enum
   SESSION_STATE_ACCEPTING,
   SESSION_STATE_READY,
   SESSION_STATE_OPENED,
+  SESSION_STATE_CLOSING,
   SESSION_STATE_CLOSED,
   SESSION_STATE_N_STATES,
 } stream_session_state_t;