VPP-846: tcp perf / scale / hardening 31/6731/2
authorDave Barach <dbarach@cisco.com>
Tue, 16 May 2017 21:41:34 +0000 (17:41 -0400)
committerFlorin Coras <florin.coras@gmail.com>
Wed, 17 May 2017 04:52:46 +0000 (04:52 +0000)
Fix builtin server event queue handling

Change-Id: I21b49c37188746cadb2fd9423291c5dc1335798c
Signed-off-by: Dave Barach <dbarach@cisco.com>
src/svm/svm_fifo.h
src/vnet/session/node.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/tcp/builtin_client.c
src/vnet/tcp/builtin_server.c
src/vnet/unix/gdb_funcs.c

index 36158dc..6936916 100644 (file)
@@ -44,7 +44,7 @@ typedef struct
   u32 nitems;
     CLIB_CACHE_LINE_ALIGN_MARK (end_cursize);
 
-  volatile u8 has_event;       /**< non-zero if deq event exists */
+  volatile u32 has_event;      /**< non-zero if deq event exists */
 
   /* Backpointers */
   u32 master_session_index;
@@ -103,7 +103,7 @@ always_inline void
 svm_fifo_unset_event (svm_fifo_t * f)
 {
   /* Probably doesn't need to be atomic. Still, better avoid surprises */
-  __sync_lock_test_and_set (&f->has_event, 0);
+  __sync_lock_release (&f->has_event);
 }
 
 svm_fifo_t *svm_fifo_create (u32 data_size_in_bytes);
index fffc8eb..3053ccc 100644 (file)
@@ -154,7 +154,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   /* Can't make any progress */
   if (snd_space0 == 0 || snd_mss0 == 0)
     {
-      vec_add1 (smm->evts_partially_read[thread_index], *e0);
+      vec_add1 (smm->pending_event_vector[thread_index], *e0);
       return 0;
     }
 
@@ -216,7 +216,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
            {
              if (svm_fifo_set_event (s0->server_tx_fifo))
                {
-                 vec_add1 (smm->evts_partially_read[thread_index], *e0);
+                 vec_add1 (smm->pending_event_vector[thread_index], *e0);
                }
              return -1;
            }
@@ -324,7 +324,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
       /* If we don't already have new event */
       if (svm_fifo_set_event (s0->server_tx_fifo))
        {
-         vec_add1 (smm->evts_partially_read[thread_index], *e0);
+         vec_add1 (smm->pending_event_vector[thread_index], *e0);
        }
     }
   return 0;
@@ -338,7 +338,7 @@ dequeue_fail:
 
   if (svm_fifo_set_event (s0->server_tx_fifo))
     {
-      vec_add1 (smm->evts_partially_read[thread_index], *e0);
+      vec_add1 (smm->pending_event_vector[thread_index], *e0);
     }
   vlib_put_next_frame (vm, node, next_index, n_left_to_next + 1);
   _vec_len (smm->tx_buffers[thread_index]) += 1;
@@ -388,12 +388,70 @@ session_event_get_session (session_fifo_event_t * e0, u8 thread_index)
   return s0;
 }
 
+void
+dump_thread_0_event_queue (void)
+{
+  session_manager_main_t *smm = vnet_get_session_manager_main ();
+  vlib_main_t *vm = &vlib_global_main;
+  u32 my_thread_index = vm->thread_index;
+  session_fifo_event_t _e, *e = &_e;
+  stream_session_t *s0;
+  int i, index;
+  i8 *headp;
+
+  unix_shared_memory_queue_t *q;
+  q = smm->vpp_event_queues[my_thread_index];
+
+  index = q->head;
+
+  for (i = 0; i < q->cursize; i++)
+    {
+      headp = (i8 *) (&q->data[0] + q->elsize * index);
+      clib_memcpy (e, headp, q->elsize);
+
+      switch (e->event_type)
+       {
+       case FIFO_EVENT_APP_TX:
+         s0 = session_event_get_session (e, my_thread_index);
+         fformat (stdout, "[%04d] TX session %d\n", i, s0->session_index);
+         break;
+
+       case FIFO_EVENT_DISCONNECT:
+         s0 = stream_session_get_from_handle (e->session_handle);
+         fformat (stdout, "[%04d] disconnect session %d\n", i,
+                  s0->session_index);
+         break;
+
+       case FIFO_EVENT_BUILTIN_RX:
+         s0 = session_event_get_session (e, my_thread_index);
+         fformat (stdout, "[%04d] builtin_rx %d\n", i, s0->session_index);
+         break;
+
+       case FIFO_EVENT_RPC:
+         fformat (stdout, "[%04d] RPC call %llx with %llx\n",
+                  i, (u64) (e->rpc_args.fp), (u64) (e->rpc_args.arg));
+         break;
+
+       default:
+         fformat (stdout, "[%04d] unhandled event type %d\n",
+                  i, e->event_type);
+         break;
+       }
+
+      index++;
+
+      if (index == q->maxsize)
+       index = 0;
+    }
+}
+
 static uword
 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
-  session_fifo_event_t *my_fifo_events, *e;
+  session_fifo_event_t *my_pending_event_vector, *e;
+  session_fifo_event_t *my_fifo_events;
   u32 n_to_dequeue, n_events;
   unix_shared_memory_queue_t *q;
   application_t *app;
@@ -417,11 +475,13 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   if (PREDICT_FALSE (q == 0))
     return 0;
 
+  my_fifo_events = smm->free_event_vector[my_thread_index];
+
   /* min number of events we can dequeue without blocking */
   n_to_dequeue = q->cursize;
-  my_fifo_events = smm->fifo_events[my_thread_index];
+  my_pending_event_vector = smm->pending_event_vector[my_thread_index];
 
-  if (n_to_dequeue == 0 && vec_len (my_fifo_events) == 0)
+  if (n_to_dequeue == 0 && vec_len (my_pending_event_vector) == 0)
     return 0;
 
   SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
@@ -431,7 +491,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
    * over them again without dequeuing new ones.
    */
   /* XXX: Block senders to sessions that can't keep up */
-  if (vec_len (my_fifo_events) >= 100)
+  if (0 && vec_len (my_pending_event_vector) >= 100)
     {
       clib_warning ("too many fifo events unsolved");
       goto skip_dequeue;
@@ -452,7 +512,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
     (void) pthread_cond_broadcast (&q->condvar);
   pthread_mutex_unlock (&q->mutex);
 
-  smm->fifo_events[my_thread_index] = my_fifo_events;
+  vec_append (my_fifo_events, my_pending_event_vector);
+
+  _vec_len (my_pending_event_vector) = 0;
+  smm->pending_event_vector[my_thread_index] = my_pending_event_vector;
 
 skip_dequeue:
   n_events = vec_len (my_fifo_events);
@@ -483,8 +546,10 @@ skip_dequeue:
                                                        &n_tx_packets);
          /* Out of buffers */
          if (rv < 0)
-           goto done;
-
+           {
+             vec_add1 (smm->pending_event_vector[my_thread_index], *e0);
+             continue;
+           }
          break;
        case FIFO_EVENT_DISCONNECT:
          s0 = stream_session_get_from_handle (e0->session_handle);
@@ -507,25 +572,8 @@ skip_dequeue:
        }
     }
 
-done:
-
-  /* Couldn't process all events. Probably out of buffers */
-  if (PREDICT_FALSE (i < n_events))
-    {
-      session_fifo_event_t *partially_read =
-       smm->evts_partially_read[my_thread_index];
-      vec_add (partially_read, &my_fifo_events[i], n_events - i);
-      vec_free (my_fifo_events);
-      smm->fifo_events[my_thread_index] = partially_read;
-      smm->evts_partially_read[my_thread_index] = 0;
-    }
-  else
-    {
-      vec_free (smm->fifo_events[my_thread_index]);
-      smm->fifo_events[my_thread_index] =
-       smm->evts_partially_read[my_thread_index];
-      smm->evts_partially_read[my_thread_index] = 0;
-    }
+  _vec_len (my_fifo_events) = 0;
+  smm->free_event_vector[my_thread_index] = my_fifo_events;
 
   vlib_node_increment_counter (vm, session_queue_node.index,
                               SESSION_QUEUE_ERROR_TX, n_tx_packets);
index d0792fa..c5aaf2e 100644 (file)
@@ -732,10 +732,6 @@ stream_session_connect_notify (transport_connection_t * tc, u8 sst,
 
   /* Cleanup session lookup */
   stream_session_half_open_table_del (smm, sst, tc);
-
-  /* Add to established lookup table */
-  handle = (((u64) tc->thread_index) << 32) | (u64) new_s->session_index;
-  stream_session_table_add_for_tc (tc, handle);
 }
 
 void
@@ -1091,11 +1087,19 @@ session_manager_main_enable (vlib_main_t * vm)
   vec_validate (smm->sessions, num_threads - 1);
   vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
   vec_validate (smm->tx_buffers, num_threads - 1);
-  vec_validate (smm->fifo_events, num_threads - 1);
-  vec_validate (smm->evts_partially_read, num_threads - 1);
+  vec_validate (smm->pending_event_vector, num_threads - 1);
+  vec_validate (smm->free_event_vector, num_threads - 1);
   vec_validate (smm->current_enqueue_epoch, num_threads - 1);
   vec_validate (smm->vpp_event_queues, num_threads - 1);
 
+  for (i = 0; i < num_threads; i++)
+    {
+      vec_validate (smm->free_event_vector[i], 0);
+      _vec_len (smm->free_event_vector[i]) = 0;
+      vec_validate (smm->pending_event_vector[i], 0);
+      _vec_len (smm->pending_event_vector[i]) = 0;
+    }
+
 #if SESSION_DBG
   vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
 #endif
index a08fa23..d60cca2 100644 (file)
@@ -197,10 +197,10 @@ struct _session_manager_main
   u32 **tx_buffers;
 
   /** Per worker-thread vector of partially read events */
-  session_fifo_event_t **evts_partially_read;
+  session_fifo_event_t **free_event_vector;
 
   /** per-worker active event vectors */
-  session_fifo_event_t **fifo_events;
+  session_fifo_event_t **pending_event_vector;
 
   /** vpp fifo event queue */
   unix_shared_memory_queue_t **vpp_event_queues;
index a0e61f4..d13fd44 100644 (file)
@@ -44,7 +44,7 @@
 #undef vl_printfun
 
 #define TCP_BUILTIN_CLIENT_DBG (1)
-#define TCP_BUILTIN_CLIENT_VPP_THREAD (1)
+#define TCP_BUILTIN_CLIENT_VPP_THREAD (0)
 #define TCP_BUILTIN_CLIENT_PTHREAD (!TCP_BUILTIN_CLIENT_VPP_THREAD)
 
 static void
index 621ce02..64fc4a7 100644 (file)
@@ -141,16 +141,14 @@ builtin_server_rx_callback (stream_session_t * s)
   session_fifo_event_t evt;
   static int serial_number = 0;
 
+  tx_fifo = s->server_tx_fifo;
+  rx_fifo = s->server_rx_fifo;
+
   max_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
   max_enqueue = svm_fifo_max_enqueue (s->server_tx_fifo);
 
   if (PREDICT_FALSE (max_dequeue == 0))
-    {
-      return 0;
-    }
-
-  tx_fifo = s->server_tx_fifo;
-  rx_fifo = s->server_rx_fifo;
+    return 0;
 
   /* Number of bytes we're going to copy */
   max_transfer = (max_dequeue < max_enqueue) ? max_dequeue : max_enqueue;
@@ -175,8 +173,6 @@ builtin_server_rx_callback (stream_session_t * s)
       return 0;
     }
 
-  svm_fifo_unset_event (rx_fifo);
-
   vec_validate (bsm->rx_buf, max_transfer - 1);
   _vec_len (bsm->rx_buf) = max_transfer;
 
index cfb4b24..cca2e42 100644 (file)
@@ -20,7 +20,7 @@
 #include <vlib/vlib.h>
 
 #include <vlib/threads.h>
-
+#include <vnet/vnet.h>
 
 
 /**
@@ -164,6 +164,16 @@ VLIB_CLI_COMMAND (show_gdb_funcs_command, static) = {
   .function = show_gdb_command_fn,
 };
 
+vnet_buffer_opaque_t *vb (void *vb_arg)
+{
+    vlib_buffer_t *b = (vlib_buffer_t *)vb_arg;
+    vnet_buffer_opaque_t *rv;
+    
+    rv = vnet_buffer (b);
+
+    return rv;
+}
+
 /* Cafeteria plan, maybe you don't want these functions */
 clib_error_t * 
 gdb_func_init (vlib_main_t * vm) { return 0; }