session: rpc for connects to main 24/32224/13
authorFlorin Coras <fcoras@cisco.com>
Wed, 5 May 2021 16:54:00 +0000 (09:54 -0700)
committerFlorin Coras <fcoras@cisco.com>
Sun, 16 May 2021 16:40:11 +0000 (09:40 -0700)
Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ifa47e1500e5cfb3c717f87b1d21131b9531c9005

src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_node.c

index f6b61ab..f38db77 100644 (file)
@@ -1818,6 +1818,7 @@ session_manager_main_enable (vlib_main_t * vm)
       wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
+      wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->vm = vlib_get_main_by_index (i);
       wrk->last_vlib_time = vlib_time_now (vm);
       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
index bf32681..245ec25 100644 (file)
@@ -145,6 +145,12 @@ typedef struct session_worker_
   /** Clib file for timerfd. Used only if adaptive mode is on */
   uword timerfd_file;
 
+  /** List of pending connects for first worker */
+  clib_llist_index_t pending_connects;
+
+  /** Flag that is set if main thread signaled to handle connects */
+  u32 pending_connects_ntf;
+
 #if SESSION_DEBUG
   /** last event poll time by thread */
   clib_time_type_t last_event_poll;
index b68ff53..bd60bd7 100644 (file)
@@ -119,16 +119,13 @@ session_mq_listen_uri_handler (void *data)
 }
 
 static void
-session_mq_connect_handler (void *data)
+session_mq_connect_one (session_connect_msg_t *mp)
 {
-  session_connect_msg_t *mp = (session_connect_msg_t *) data;
   vnet_connect_args_t _a, *a = &_a;
   app_worker_t *app_wrk;
   application_t *app;
   int rv;
 
-  app_check_thread_and_barrier (session_mq_connect_handler, mp);
-
   app = application_lookup (mp->client_index);
   if (!app)
     return;
@@ -167,6 +164,85 @@ session_mq_connect_handler (void *data)
     session_mq_free_ext_config (app, mp->ext_config);
 }
 
+static void
+session_mq_handle_connects_rpc (void *arg)
+{
+  u32 max_connects = 32, n_connects = 0;
+  vlib_main_t *vm = vlib_get_main ();
+  session_evt_elt_t *he, *elt, *next;
+  session_worker_t *fwrk;
+  u8 need_reschedule = 1;
+
+  ASSERT (vlib_get_thread_index () == 0);
+
+  /* Pending connects on linked list pertaining to first worker */
+  fwrk = session_main_get_worker (1);
+
+  vlib_worker_thread_barrier_sync (vm);
+
+  he = pool_elt_at_index (fwrk->event_elts, fwrk->pending_connects);
+  elt = clib_llist_next (fwrk->event_elts, evt_list, he);
+
+  /* Avoid holding the barrier for too long */
+  while (n_connects < max_connects && elt != he)
+    {
+      next = clib_llist_next (fwrk->event_elts, evt_list, elt);
+      clib_llist_remove (fwrk->event_elts, evt_list, elt);
+      session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
+      session_evt_elt_free (fwrk, elt);
+      elt = next;
+      n_connects += 1;
+    }
+
+  if (clib_llist_is_empty (fwrk->event_elts, evt_list, he))
+    {
+      fwrk->pending_connects_ntf = 0;
+      need_reschedule = 0;
+    }
+
+  vlib_worker_thread_barrier_release (vm);
+
+  if (need_reschedule)
+    {
+      vlib_node_set_interrupt_pending (vm, session_queue_node.index);
+      elt = session_evt_alloc_ctrl (session_main_get_worker (0));
+      elt->evt.event_type = SESSION_CTRL_EVT_RPC;
+      elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
+    }
+}
+
+static void
+session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
+{
+  u32 thread_index = wrk - session_main.wrk;
+  session_evt_elt_t *he;
+
+  /* No workers, so just deal with the connect now */
+  if (PREDICT_FALSE (!thread_index))
+    {
+      session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
+      return;
+    }
+
+  if (PREDICT_FALSE (thread_index != 1))
+    {
+      clib_warning ("Connect on wrong thread. Dropping");
+      return;
+    }
+
+  /* Add to pending list to be handled by main thread */
+  he = pool_elt_at_index (wrk->event_elts, wrk->pending_connects);
+  clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
+
+  if (!wrk->pending_connects_ntf)
+    {
+      vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
+                                      session_queue_node.index);
+      session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
+      wrk->pending_connects_ntf = 1;
+    }
+}
+
 static void
 session_mq_connect_uri_handler (void *data)
 {
@@ -1331,7 +1407,7 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
       session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
       break;
     case SESSION_CTRL_EVT_CONNECT:
-      session_mq_connect_handler (session_evt_ctrl_data (wrk, elt));
+      session_mq_connect_handler (wrk, elt);
       break;
     case SESSION_CTRL_EVT_CONNECT_URI:
       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));