session: add session process node 09/12609/6
authorFlorin Coras <fcoras@cisco.com>
Thu, 17 May 2018 02:28:24 +0000 (19:28 -0700)
committerDamjan Marion <dmarion.lists@gmail.com>
Fri, 18 May 2018 07:49:59 +0000 (07:49 +0000)
Add a session process node that handles main thread tx and retransmit in
order to avoid having a polling input node.

Change-Id: I3357e987c023a84b533b32793e37ab4204420f64
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/vlib/main.c
src/vnet/sctp/sctp_output.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_node.c
src/vnet/tcp/tcp_output.c

index d1f7592..7da5192 100644 (file)
@@ -1276,6 +1276,7 @@ dispatch_process (vlib_main_t * vm,
   vlib_node_main_t *nm = &vm->node_main;
   vlib_node_runtime_t *node_runtime = &p->node_runtime;
   vlib_node_t *node = vlib_get_node (vm, node_runtime->node_index);
+  u32 old_process_index;
   u64 t;
   uword n_vectors, is_suspend;
 
@@ -1291,11 +1292,12 @@ dispatch_process (vlib_main_t * vm,
                             f ? f->n_vectors : 0, /* is_after */ 0);
 
   /* Save away current process for suspend. */
+  old_process_index = nm->current_process_index;
   nm->current_process_index = node->runtime_index;
 
   n_vectors = vlib_process_startup (vm, p, f);
 
-  nm->current_process_index = ~0;
+  nm->current_process_index = old_process_index;
 
   ASSERT (n_vectors != VLIB_PROCESS_RETURN_LONGJMP_RETURN);
   is_suspend = n_vectors == VLIB_PROCESS_RETURN_LONGJMP_SUSPEND;
index a65f166..3c83f68 100644 (file)
@@ -406,6 +406,8 @@ sctp_enqueue_to_ip_lookup (vlib_main_t * vm, vlib_buffer_t * b, u32 bi,
                           u8 is_ip4, u32 fib_index)
 {
   sctp_enqueue_to_ip_lookup_i (vm, b, bi, is_ip4, fib_index, 0);
+  if (vm->thread_index == 0 && vlib_num_workers ())
+    session_flush_frames_main_thread (vm);
 }
 
 /**
index 66cad2a..2697c26 100644 (file)
@@ -1272,6 +1272,14 @@ listen_session_get_local_session_endpoint (stream_session_t * listener,
   return 0;
 }
 
+void
+session_flush_frames_main_thread (vlib_main_t * vm)
+{
+  ASSERT (vlib_get_thread_index () == 0);
+  vlib_process_signal_event_mt (vm, session_queue_process_node.index,
+                               SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
+}
+
 static clib_error_t *
 session_manager_main_enable (vlib_main_t * vm)
 {
@@ -1366,8 +1374,30 @@ void
 session_node_enable_disable (u8 is_en)
 {
   u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
+  vlib_thread_main_t *vtm = vlib_get_thread_main ();
+  u8 have_workers = vtm->n_threads != 0;
+
   /* *INDENT-OFF* */
   foreach_vlib_main (({
+    if (have_workers && ii == 0)
+      {
+       vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
+                            state);
+       if (is_en)
+         {
+           vlib_node_t *n = vlib_get_node (this_vlib_main,
+                                           session_queue_process_node.index);
+           vlib_start_process (this_vlib_main, n->runtime_index);
+         }
+       else
+         {
+           vlib_process_signal_event_mt (this_vlib_main,
+                                         session_queue_process_node.index,
+                                         SESSION_Q_PROCESS_STOP, 0);
+         }
+
+       continue;
+      }
     vlib_node_set_state (this_vlib_main, session_queue_node.index,
                          state);
   }));
index d6cc2cb..b24e229 100644 (file)
@@ -254,6 +254,10 @@ struct _session_manager_main
 
 extern session_manager_main_t session_manager_main;
 extern vlib_node_registration_t session_queue_node;
+extern vlib_node_registration_t session_queue_process_node;
+
+#define SESSION_Q_PROCESS_FLUSH_FRAMES 1
+#define SESSION_Q_PROCESS_STOP         2
 
 /*
  * Session manager function
@@ -594,6 +598,8 @@ int
 listen_session_get_local_session_endpoint (stream_session_t * listener,
                                           session_endpoint_t * sep);
 
+void session_flush_frames_main_thread (vlib_main_t * vm);
+
 always_inline u8
 session_manager_is_enabled ()
 {
index 07cca6d..46fc4dc 100644 (file)
@@ -869,6 +869,51 @@ session_queue_exit (vlib_main_t * vm)
 
 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
 
+static uword
+session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
+                      vlib_frame_t * f)
+{
+  f64 now, timeout = 1.0;
+  uword *event_data = 0;
+  uword event_type;
+
+  while (1)
+    {
+      vlib_process_wait_for_event_or_clock (vm, timeout);
+      now = vlib_time_now (vm);
+      event_type = vlib_process_get_events (vm, (uword **) & event_data);
+
+      switch (event_type)
+       {
+       case SESSION_Q_PROCESS_FLUSH_FRAMES:
+         /* Flush the frames by updating all transports times */
+         transport_update_time (now, 0);
+         break;
+       case SESSION_Q_PROCESS_STOP:
+         timeout = 100000.0;
+         break;
+       case ~0:
+         /* Timed out. Update time for all transports to trigger all
+          * outstanding retransmits. */
+         transport_update_time (now, 0);
+         break;
+       }
+      vec_reset_length (event_data);
+    }
+  return 0;
+}
+
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (session_queue_process_node) =
+{
+  .function = session_queue_process,
+  .type = VLIB_NODE_TYPE_PROCESS,
+  .name = "session-queue-process",
+  .state = VLIB_NODE_STATE_DISABLED,
+};
+/* *INDENT-ON* */
+
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
index fe6d09c..0d938ca 100644 (file)
@@ -675,6 +675,8 @@ tcp_enqueue_to_ip_lookup (vlib_main_t * vm, vlib_buffer_t * b, u32 bi,
                          u8 is_ip4, u32 fib_index)
 {
   tcp_enqueue_to_ip_lookup_i (vm, b, bi, is_ip4, fib_index, 0);
+  if (vm->thread_index == 0 && vlib_num_workers ())
+    session_flush_frames_main_thread (vm);
 }
 
 always_inline void