session: move connects to first worker 13/35713/69
authorFlorin Coras <fcoras@cisco.com>
Fri, 18 Mar 2022 15:33:08 +0000 (08:33 -0700)
committerDave Barach <vpp@barachs.net>
Fri, 2 Dec 2022 22:59:13 +0000 (22:59 +0000)
Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I035e3fdbb52eca010ad7b2c20ca2930cb1645978

12 files changed:
src/plugins/hs_apps/echo_client.c
src/plugins/unittest/session_test.c
src/vnet/session/application.c
src/vnet/session/application_worker.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_node.c
src/vnet/session/transport.c
src/vnet/tcp/tcp.c
src/vnet/tcp/tcp_inlines.h
src/vnet/tcp/tcp_timer.h
src/vnet/tls/tls.c

index c6c7ede..14d47be 100644 (file)
@@ -826,7 +826,6 @@ ec_connect_rpc (void *args)
 {
   ec_main_t *ecm = &ec_main;
   vnet_connect_args_t _a = {}, *a = &_a;
-  vlib_main_t *vm = vlib_get_main ();
   int rv, needs_crypto;
   u32 n_clients, ci;
 
@@ -838,8 +837,6 @@ ec_connect_rpc (void *args)
 
   ci = ecm->connect_conn_index;
 
-  vlib_worker_thread_barrier_sync (vm);
-
   while (ci < n_clients)
     {
       /* Crude pacing for call setups  */
@@ -873,8 +870,6 @@ ec_connect_rpc (void *args)
       ci += 1;
     }
 
-  vlib_worker_thread_barrier_release (vm);
-
   if (ci < ecm->expected_connections && ecm->run_test != EC_EXITING)
     ec_program_connects ();
 
@@ -884,7 +879,8 @@ ec_connect_rpc (void *args)
 void
 ec_program_connects (void)
 {
-  session_send_rpc_evt_to_thread_force (0, ec_connect_rpc, 0);
+  session_send_rpc_evt_to_thread_force (transport_cl_thread (), ec_connect_rpc,
+                                       0);
 }
 
 #define ec_cli(_fmt, _args...)                                                \
index 6a292c7..70d9fd0 100644 (file)
@@ -404,14 +404,6 @@ session_test_endpoint_cfg (vlib_main_t * vm, unformat_input_t * input)
   SESSION_TEST ((tc->lcl_port == placeholder_client_port),
                "ports should be equal");
 
-  /* These sessions, because of the way they're established are pinned to
-   * main thread, even when we have workers and we avoid polling main thread,
-   * i.e., we can't cleanup pending disconnects, so force cleanup for both
-   */
-  session_transport_cleanup (s);
-  s = session_get (accepted_session_index, accepted_session_thread);
-  session_transport_cleanup (s);
-
   vnet_app_detach_args_t detach_args = {
     .app_index = server_index,
     .api_client_index = ~0,
index 3b2c7cd..ad4d447 100644 (file)
@@ -1357,7 +1357,7 @@ vnet_connect (vnet_connect_args_t * a)
   app_worker_t *client_wrk;
   application_t *client;
 
-  ASSERT (vlib_thread_is_main_w_barrier ());
+  ASSERT (session_vlib_thread_is_cl_thread ());
 
   if (session_endpoint_is_zero (&a->sep))
     return SESSION_E_INVALID_RMT_IP;
index 844e78f..0cb1791 100644 (file)
@@ -393,7 +393,7 @@ app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
 {
   session_handle_t *shp;
 
-  ASSERT (vlib_get_thread_index () == 0);
+  ASSERT (session_vlib_thread_is_cl_thread ());
   pool_get (app_wrk->half_open_table, shp);
   *shp = sh;
 
@@ -404,7 +404,7 @@ int
 app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
 {
   application_t *app = application_get (app_wrk->app_index);
-  ASSERT (vlib_get_thread_index () <= 1);
+  ASSERT (session_vlib_thread_is_cl_thread ());
   pool_put_index (app_wrk->half_open_table, s->ho_index);
   if (app->cb_fns.half_open_cleanup_callback)
     app->cb_fns.half_open_cleanup_callback (s);
index 91e9ed5..eaba80f 100644 (file)
@@ -334,15 +334,15 @@ void
 session_half_open_delete_notify (transport_connection_t *tc)
 {
   /* Notification from ctrl thread accepted without rpc */
-  if (!tc->thread_index)
+  if (tc->thread_index == transport_cl_thread ())
     {
       session_half_open_free (ho_session_get (tc->s_index));
     }
   else
     {
       void *args = uword_to_pointer ((uword) tc->s_index, void *);
-      session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
-                                           args);
+      session_send_rpc_evt_to_thread_force (transport_cl_thread (),
+                                           session_half_open_free_rpc, args);
     }
 }
 
index 16000e6..b8cc1c3 100644 (file)
@@ -157,9 +157,6 @@ typedef struct session_worker_
   /** Flag that is set if main thread signaled to handle connects */
   u32 n_pending_connects;
 
-  /** Main thread loops in poll mode without a connect */
-  u32 no_connect_loops;
-
   /** List head for first worker evts pending handling on main */
   clib_llist_index_t evts_pending_main;
 
@@ -212,7 +209,9 @@ typedef struct session_main_
    * Trade memory for speed, for now */
   u32 *session_type_to_next;
 
-  /** Thread for cl and ho that rely on cl allocs */
+  /** Thread used for allocating active open connections, i.e., half-opens
+   * for transports like tcp, and sessions that will be migrated for cl
+   * transports like udp. If vpp has workers, this will be first worker. */
   u32 transport_cl_thread;
 
   transport_proto_t last_transport_proto_type;
@@ -616,6 +615,13 @@ transport_cl_thread (void)
   return session_main.transport_cl_thread;
 }
 
+always_inline u32
+session_vlib_thread_is_cl_thread (void)
+{
+  return (vlib_get_thread_index () == transport_cl_thread () ||
+         vlib_thread_is_main_w_barrier ());
+}
+
 /*
  * Listen sessions
  */
@@ -668,29 +674,17 @@ always_inline session_t *
 ho_session_alloc (void)
 {
   session_t *s;
-  ASSERT (vlib_get_thread_index () == 0);
-  s = session_alloc (0);
+  ASSERT (session_vlib_thread_is_cl_thread ());
+  s = session_alloc (transport_cl_thread ());
   s->session_state = SESSION_STATE_CONNECTING;
   s->flags |= SESSION_F_HALF_OPEN;
-  /* Not ideal. Half-opens are only allocated from main with worker barrier
-   * but can be cleaned up, i.e., session_half_open_free, from main without
-   * a barrier. In debug images, the free_bitmap can grow while workers peek
-   * the sessions pool, e.g., session_half_open_migrate_notify, and as a
-   * result crash while validating the session. To avoid this, grow the bitmap
-   * now. */
-  if (CLIB_DEBUG)
-    {
-      session_t *sp = session_main.wrk[0].sessions;
-      clib_bitmap_validate (pool_header (sp)->free_bitmap,
-                           s->session_index + 1);
-    }
   return s;
 }
 
 always_inline session_t *
 ho_session_get (u32 ho_index)
 {
-  return session_get (ho_index, 0 /* half-open thread */);
+  return session_get (ho_index, transport_cl_thread ());
 }
 
 always_inline void
index 8f6503d..be00925 100644 (file)
@@ -224,23 +224,20 @@ static void
 session_mq_handle_connects_rpc (void *arg)
 {
   u32 max_connects = 32, n_connects = 0;
-  vlib_main_t *vm = vlib_get_main ();
   session_evt_elt_t *he, *elt, *next;
-  session_worker_t *fwrk, *wrk;
+  session_worker_t *fwrk;
 
-  ASSERT (vlib_get_thread_index () == 0);
+  ASSERT (session_vlib_thread_is_cl_thread ());
 
   /* Pending connects on linked list pertaining to first worker */
-  fwrk = session_main_get_worker (1);
+  fwrk = session_main_get_worker (transport_cl_thread ());
   if (!fwrk->n_pending_connects)
-    goto update_state;
-
-  vlib_worker_thread_barrier_sync (vm);
+    return;
 
   he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
   elt = clib_llist_next (fwrk->event_elts, evt_list, he);
 
-  /* Avoid holding the barrier for too long */
+  /* Avoid holding the worker for too long */
   while (n_connects < max_connects && elt != he)
     {
       next = clib_llist_next (fwrk->event_elts, evt_list, elt);
@@ -254,45 +251,10 @@ session_mq_handle_connects_rpc (void *arg)
 
   /* Decrement with worker barrier */
   fwrk->n_pending_connects -= n_connects;
-
-  vlib_worker_thread_barrier_release (vm);
-
-update_state:
-
-  /* Switch worker to poll mode if it was in interrupt mode and had work or
-   * back to interrupt if threshold of loops without a connect is passed.
-   * While in poll mode, reprogram connects rpc */
-  wrk = session_main_get_worker (0);
-  if (wrk->state != SESSION_WRK_POLLING)
-    {
-      if (n_connects)
-       {
-         session_wrk_set_state (wrk, SESSION_WRK_POLLING);
-         vlib_node_set_state (vm, session_queue_node.index,
-                              VLIB_NODE_STATE_POLLING);
-         wrk->no_connect_loops = 0;
-       }
-    }
-  else
-    {
-      if (!n_connects)
-       {
-         if (++wrk->no_connect_loops > 1e5)
-           {
-             session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
-             vlib_node_set_state (vm, session_queue_node.index,
-                                  VLIB_NODE_STATE_INTERRUPT);
-           }
-       }
-      else
-       wrk->no_connect_loops = 0;
-    }
-
-  if (wrk->state == SESSION_WRK_POLLING)
+  if (fwrk->n_pending_connects > 0)
     {
-      elt = session_evt_alloc_ctrl (wrk);
-      elt->evt.event_type = SESSION_CTRL_EVT_RPC;
-      elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
+      session_send_rpc_evt_to_thread_force (fwrk->vm->thread_index,
+                                           session_mq_handle_connects_rpc, 0);
     }
 }
 
@@ -302,20 +264,28 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
   u32 thread_index = wrk - session_main.wrk;
   session_evt_elt_t *he;
 
-  /* No workers, so just deal with the connect now */
-  if (PREDICT_FALSE (!thread_index))
+  if (PREDICT_FALSE (thread_index > transport_cl_thread ()))
     {
-      session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
+      clib_warning ("Connect on wrong thread. Dropping");
       return;
     }
 
-  if (PREDICT_FALSE (thread_index != 1))
+  /* If on worker, check if main has any pending messages. Avoids reordering
+   * with other control messages that need to be handled by main
+   */
+  if (thread_index)
     {
-      clib_warning ("Connect on wrong thread. Dropping");
-      return;
+      he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
+
+      /* Events pending on main, postpone to avoid reordering */
+      if (!clib_llist_is_empty (wrk->event_elts, evt_list, he))
+       {
+         clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
+         return;
+       }
     }
 
-  /* Add to pending list to be handled by main thread */
+  /* Add to pending list to be handled by first worker */
   he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
 
@@ -323,9 +293,8 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
   wrk->n_pending_connects += 1;
   if (wrk->n_pending_connects == 1)
     {
-      vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
-                                      session_queue_node.index);
-      session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
+      session_send_rpc_evt_to_thread_force (thread_index,
+                                           session_mq_handle_connects_rpc, 0);
     }
 }
 
@@ -812,6 +781,9 @@ session_wrk_handle_evts_main_rpc (void *args)
        case SESSION_CTRL_EVT_ACCEPTED_REPLY:
          session_mq_accepted_reply_handler (fwrk, elt);
          break;
+       case SESSION_CTRL_EVT_CONNECT:
+         session_mq_connect_handler (fwrk, elt);
+         break;
        default:
          clib_warning ("unhandled %u", elt->evt.event_type);
          ALWAYS_ASSERT (0);
@@ -820,8 +792,11 @@ session_wrk_handle_evts_main_rpc (void *args)
 
       /* Regrab element in case pool moved */
       elt = clib_llist_elt (fwrk->event_elts, ei);
-      session_evt_ctrl_data_free (fwrk, elt);
-      clib_llist_put (fwrk->event_elts, elt);
+      if (!clib_llist_elt_is_linked (elt, evt_list))
+       {
+         session_evt_ctrl_data_free (fwrk, elt);
+         clib_llist_put (fwrk->event_elts, elt);
+       }
       ei = next_ei;
     }
 
index b13370b..8554c73 100644 (file)
@@ -486,7 +486,8 @@ transport_program_endpoint_cleanup (u32 lepi)
   clib_spinlock_unlock (&tm->local_endpoints_lock);
 
   if (flush_fl)
-    session_send_rpc_evt_to_thread_force (0, transport_cleanup_freelist, 0);
+    session_send_rpc_evt_to_thread_force (transport_cl_thread (),
+                                         transport_cleanup_freelist, 0);
 }
 
 int
index bdf1751..c0445b9 100644 (file)
@@ -188,8 +188,7 @@ tcp_session_get_listener (u32 listener_index)
 static tcp_connection_t *
 tcp_half_open_connection_alloc (void)
 {
-  ASSERT (vlib_get_thread_index () == 0);
-  return tcp_connection_alloc (0);
+  return tcp_connection_alloc (transport_cl_thread ());
 }
 
 /**
@@ -199,7 +198,8 @@ tcp_half_open_connection_alloc (void)
 static void
 tcp_half_open_connection_free (tcp_connection_t * tc)
 {
-  ASSERT (vlib_get_thread_index () == 0);
+  ASSERT (vlib_get_thread_index () == tc->c_thread_index ||
+         vlib_thread_is_main_w_barrier ());
   return tcp_connection_free (tc);
 }
 
index 69f8ce7..91c5757 100644 (file)
@@ -66,7 +66,7 @@ tcp_listener_get (u32 tli)
 always_inline tcp_connection_t *
 tcp_half_open_connection_get (u32 conn_index)
 {
-  return tcp_connection_get (conn_index, 0);
+  return tcp_connection_get (conn_index, transport_cl_thread ());
 }
 
 /**
index 7f7dbf1..c0907ca 100644 (file)
 
 #include <vnet/tcp/tcp_types.h>
 
+static inline u8
+tcp_timer_thread_is_valid (tcp_connection_t *tc)
+{
+  return ((tc->c_thread_index == vlib_get_thread_index ()) ||
+         vlib_thread_is_main_w_barrier ());
+}
+
 always_inline void
-tcp_timer_set (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id,
+tcp_timer_set (tcp_timer_wheel_t *tw, tcp_connection_t *tc, u8 timer_id,
               u32 interval)
 {
-  ASSERT (tc->c_thread_index == vlib_get_thread_index ());
+  ASSERT (tcp_timer_thread_is_valid (tc));
   ASSERT (tc->timers[timer_id] == TCP_TIMER_HANDLE_INVALID);
   tc->timers[timer_id] = tw_timer_start_tcp_twsl (tw, tc->c_c_index,
                                                  timer_id, interval);
@@ -30,7 +37,7 @@ tcp_timer_set (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id,
 always_inline void
 tcp_timer_reset (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id)
 {
-  ASSERT (tc->c_thread_index == vlib_get_thread_index ());
+  ASSERT (tcp_timer_thread_is_valid (tc));
   tc->pending_timers &= ~(1 << timer_id);
   if (tc->timers[timer_id] == TCP_TIMER_HANDLE_INVALID)
     return;
@@ -43,7 +50,7 @@ always_inline void
 tcp_timer_update (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id,
                  u32 interval)
 {
-  ASSERT (tc->c_thread_index == vlib_get_thread_index ());
+  ASSERT (tcp_timer_thread_is_valid (tc));
   if (tc->timers[timer_id] != TCP_TIMER_HANDLE_INVALID)
     tw_timer_update_tcp_twsl (tw, tc->timers[timer_id], interval);
   else
index 85ac7f8..b082467 100644 (file)
@@ -121,6 +121,7 @@ tls_ctx_half_open_alloc (void)
 
   clib_memset (ctx, 0, sizeof (*ctx));
   ctx->c_c_index = ctx - tm->half_open_ctx_pool;
+  ctx->c_thread_index = transport_cl_thread ();
 
   return ctx->c_c_index;
 }