session: per worker state for ct sessions 18/29618/7
authorFlorin Coras <fcoras@cisco.com>
Fri, 23 Oct 2020 23:31:40 +0000 (16:31 -0700)
committerFlorin Coras <florin.coras@gmail.com>
Tue, 27 Oct 2020 00:09:36 +0000 (00:09 +0000)
Type: feature

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Id6f7b2b969eb50eb7611e4e9ca77b7ef0e0519a1

src/vnet/session/application_local.c
src/vnet/session/segment_manager.c
src/vnet/session/segment_manager.h
src/vnet/session/session_api.c

index 25ff780..58d3789 100644 (file)
 #include <vnet/session/application_local.h>
 #include <vnet/session/session.h>
 
-static ct_connection_t *connections;
-
-static void
-ct_enable_disable_main_pre_input_node (u8 is_add)
+typedef struct ct_main_
 {
-  u32 n_conns;
-
-  if (!vlib_num_workers ())
-    return;
-
-  n_conns = pool_elts (connections);
-  if (n_conns > 2)
-    return;
-
-  if (n_conns > 0 && is_add)
-    vlib_node_set_state (vlib_get_main (),
-                        session_queue_pre_input_node.index,
-                        VLIB_NODE_STATE_POLLING);
-  else if (n_conns == 0)
-    vlib_node_set_state (vlib_get_main (),
-                        session_queue_pre_input_node.index,
-                        VLIB_NODE_STATE_DISABLED);
-}
+  ct_connection_t **connections;       /**< Per-worker connection pools */
+  u32 n_workers;                       /**< Number of vpp workers */
+  u32 n_sessions;                      /**< Cumulative sessions counter */
+} ct_main_t;
+
+static ct_main_t ct_main;
 
 static ct_connection_t *
-ct_connection_alloc (void)
+ct_connection_alloc (u32 thread_index)
 {
   ct_connection_t *ct;
 
-  pool_get_zero (connections, ct);
-  ct->c_c_index = ct - connections;
-  ct->c_thread_index = 0;
+  pool_get_zero (ct_main.connections[thread_index], ct);
+  ct->c_c_index = ct - ct_main.connections[thread_index];
+  ct->c_thread_index = thread_index;
   ct->client_wrk = ~0;
   ct->server_wrk = ~0;
   return ct;
 }
 
 static ct_connection_t *
-ct_connection_get (u32 ct_index)
+ct_connection_get (u32 ct_index, u32 thread_index)
 {
-  if (pool_is_free_index (connections, ct_index))
+  if (pool_is_free_index (ct_main.connections[thread_index], ct_index))
     return 0;
-  return pool_elt_at_index (connections, ct_index);
+  return pool_elt_at_index (ct_main.connections[thread_index], ct_index);
 }
 
 static void
 ct_connection_free (ct_connection_t * ct)
 {
   if (CLIB_DEBUG)
-    memset (ct, 0xfc, sizeof (*ct));
-  pool_put (connections, ct);
+    {
+      u32 thread_index = ct->c_thread_index;
+      memset (ct, 0xfc, sizeof (*ct));
+      pool_put (ct_main.connections[thread_index], ct);
+      return;
+    }
+  pool_put (ct_main.connections[ct->c_thread_index], ct);
 }
 
 session_t *
 ct_session_get_peer (session_t * s)
 {
   ct_connection_t *ct, *peer_ct;
-  ct = ct_connection_get (s->connection_index);
-  peer_ct = ct_connection_get (ct->peer_index);
-  return session_get (peer_ct->c_s_index, 0);
+  ct = ct_connection_get (s->connection_index, s->thread_index);
+  peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
+  return session_get (peer_ct->c_s_index, s->thread_index);
 }
 
 void
@@ -92,16 +82,17 @@ ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
 int
 ct_session_connect_notify (session_t * ss)
 {
+  u32 ss_index, opaque, thread_index;
   ct_connection_t *sct, *cct;
   app_worker_t *client_wrk;
   segment_manager_t *sm;
-  u32 ss_index, opaque;
   fifo_segment_t *seg;
   u64 segment_handle;
   int err = 0;
   session_t *cs;
 
   ss_index = ss->session_index;
+  thread_index = ss->thread_index;
   sct = (ct_connection_t *) session_get_transport (ss);
   client_wrk = app_worker_get (sct->client_wrk);
   opaque = sct->client_opaque;
@@ -124,10 +115,10 @@ ct_session_connect_notify (session_t * ss)
     }
 
   /* Alloc client session */
-  cct = ct_connection_get (sct->peer_index);
+  cct = ct_connection_get (sct->peer_index, thread_index);
 
-  cs = session_alloc (0);
-  ss = session_get (ss_index, 0);
+  cs = session_alloc (thread_index);
+  ss = session_get (ss_index, thread_index);
   cs->session_type = ss->session_type;
   cs->connection_index = sct->c_c_index;
   cs->listener_handle = SESSION_INVALID_HANDLE;
@@ -162,7 +153,7 @@ ct_session_connect_notify (session_t * ss)
       return -1;
     }
 
-  cs = session_get (cct->c_s_index, 0);
+  cs = session_get (cct->c_s_index, cct->c_thread_index);
   cs->session_state = SESSION_STATE_READY;
 
   return 0;
@@ -173,8 +164,9 @@ error:
 }
 
 static int
-ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk,
-                      ct_connection_t * ct, session_t * ls, session_t * ll)
+ct_init_accepted_session (app_worker_t * server_wrk,
+                         ct_connection_t * ct, session_t * ls,
+                         session_t * ll)
 {
   u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index, seg_size;
   segment_manager_props_t *props;
@@ -241,50 +233,58 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk,
   return 0;
 
 failed:
-  segment_manager_del_segment (sm, seg);
+  segment_manager_lock_and_del_segment (sm, seg_index);
   return rv;
 }
 
-static int
-ct_connect (app_worker_t * client_wrk, session_t * ll,
-           session_endpoint_cfg_t * sep)
+typedef struct ct_accept_rpc_args
+{
+  u32 ll_s_index;
+  u32 thread_index;
+  ip46_address_t ip;
+  u16 port;
+  u8 is_ip4;
+  u32 opaque;
+  u32 client_wrk_index;
+} ct_accept_rpc_args_t;
+
+static void
+ct_accept_rpc_wrk_handler (void *accept_args)
 {
-  u32 cct_index, ll_index, ll_ct_index;
+  ct_accept_rpc_args_t *args = (ct_accept_rpc_args_t *) accept_args;
   ct_connection_t *sct, *cct, *ll_ct;
   app_worker_t *server_wrk;
-  session_t *ss;
+  session_t *ss, *ll;
+  u32 cct_index;
 
-  ll_index = ll->session_index;
-  ll_ct_index = ll->connection_index;
+  ll = listen_session_get (args->ll_s_index);
 
-  cct = ct_connection_alloc ();
+  cct = ct_connection_alloc (args->thread_index);
   cct_index = cct->c_c_index;
-  sct = ct_connection_alloc ();
-  ll_ct = ct_connection_get (ll_ct_index);
+  sct = ct_connection_alloc (args->thread_index);
+  ll_ct = ct_connection_get (ll->connection_index, 0 /* listener thread */ );
 
   /*
    * Alloc and init client transport
    */
-  cct = ct_connection_get (cct_index);
-  cct->c_thread_index = 0;
-  cct->c_rmt_port = sep->port;
+  cct = ct_connection_get (cct_index, args->thread_index);
+  cct->c_rmt_port = args->port;
   cct->c_lcl_port = 0;
-  cct->c_is_ip4 = sep->is_ip4;
-  clib_memcpy (&cct->c_rmt_ip, &sep->ip, sizeof (sep->ip));
+  cct->c_is_ip4 = args->is_ip4;
+  clib_memcpy (&cct->c_rmt_ip, &args->ip, sizeof (args->ip));
   cct->actual_tp = ll_ct->actual_tp;
   cct->is_client = 1;
 
   /*
    * Init server transport
    */
-  sct->c_thread_index = 0;
   sct->c_rmt_port = 0;
   sct->c_lcl_port = ll_ct->c_lcl_port;
-  sct->c_is_ip4 = sep->is_ip4;
+  sct->c_is_ip4 = args->is_ip4;
   clib_memcpy (&sct->c_lcl_ip, &ll_ct->c_lcl_ip, sizeof (ll_ct->c_lcl_ip));
-  sct->client_wrk = client_wrk->wrk_index;
+  sct->client_wrk = args->client_wrk_index;
   sct->c_proto = TRANSPORT_PROTO_NONE;
-  sct->client_opaque = sep->opaque;
+  sct->client_opaque = args->opaque;
   sct->actual_tp = ll_ct->actual_tp;
 
   sct->peer_index = cct->c_c_index;
@@ -294,8 +294,8 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
    * Accept server session. Client session is created only after
    * server confirms accept.
    */
-  ss = session_alloc (0);
-  ll = listen_session_get (ll_index);
+  ss = session_alloc (args->thread_index);
+  ll = listen_session_get (args->ll_s_index);
   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
                                                     sct->c_is_ip4);
   ss->connection_index = sct->c_c_index;
@@ -308,11 +308,11 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
   sct->c_s_index = ss->session_index;
   sct->server_wrk = ss->app_wrk_index;
 
-  if (ct_init_local_session (client_wrk, server_wrk, sct, ss, ll))
+  if (ct_init_accepted_session (server_wrk, sct, ss, ll))
     {
       ct_connection_free (sct);
       session_free (ss);
-      return -1;
+      return;
     }
 
   ss->session_state = SESSION_STATE_ACCEPTING;
@@ -321,11 +321,40 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
       ct_connection_free (sct);
       segment_manager_dealloc_fifos (ss->rx_fifo, ss->tx_fifo);
       session_free (ss);
-      return -1;
+      return;
     }
 
   cct->segment_handle = sct->segment_handle;
-  ct_enable_disable_main_pre_input_node (1 /* is_add */ );
+
+  clib_mem_free (args);
+}
+
+static int
+ct_connect (app_worker_t * client_wrk, session_t * ll,
+           session_endpoint_cfg_t * sep)
+{
+  ct_accept_rpc_args_t *args;
+  ct_main_t *cm = &ct_main;
+  u32 thread_index;
+
+  /* Simple round-robin policy for spreading sessions over workers. We skip
+   * thread index 0, i.e., offset the index by 1, when we have workers as it
+   * is the one dedicated to main thread. Note that n_workers does not include
+   * main thread */
+  cm->n_sessions += 1;
+  thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
+
+  args = clib_mem_alloc (sizeof (*args));
+  args->ll_s_index = ll->session_index;
+  args->thread_index = thread_index;
+  clib_memcpy_fast (&args->ip, &sep->ip, sizeof (ip46_address_t));
+  args->port = sep->port;
+  args->is_ip4 = sep->is_ip4;
+  args->opaque = sep->opaque;
+  args->client_wrk_index = client_wrk->wrk_index;
+
+  session_send_rpc_evt_to_thread (thread_index, ct_accept_rpc_wrk_handler,
+                                 args);
   return 0;
 }
 
@@ -336,14 +365,13 @@ ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep)
   ct_connection_t *ct;
 
   sep = (session_endpoint_cfg_t *) tep;
-  ct = ct_connection_alloc ();
+  ct = ct_connection_alloc (0);
   ct->server_wrk = sep->app_wrk_index;
   ct->c_is_ip4 = sep->is_ip4;
   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
   ct->c_lcl_port = sep->port;
   ct->c_s_index = app_listener_index;
   ct->actual_tp = sep->transport_proto;
-  ct_enable_disable_main_pre_input_node (1 /* is_add */ );
   return ct->c_c_index;
 }
 
@@ -351,16 +379,15 @@ static u32
 ct_stop_listen (u32 ct_index)
 {
   ct_connection_t *ct;
-  ct = ct_connection_get (ct_index);
+  ct = ct_connection_get (ct_index, 0);
   ct_connection_free (ct);
-  ct_enable_disable_main_pre_input_node (0 /* is_add */ );
   return 0;
 }
 
 static transport_connection_t *
 ct_listener_get (u32 ct_index)
 {
-  return (transport_connection_t *) ct_connection_get (ct_index);
+  return (transport_connection_t *) ct_connection_get (ct_index, 0);
 }
 
 static int
@@ -433,15 +460,15 @@ ct_session_close (u32 ct_index, u32 thread_index)
   app_worker_t *app_wrk;
   session_t *s;
 
-  ct = ct_connection_get (ct_index);
-  peer_ct = ct_connection_get (ct->peer_index);
+  ct = ct_connection_get (ct_index, thread_index);
+  peer_ct = ct_connection_get (ct->peer_index, thread_index);
   if (peer_ct)
     {
       peer_ct->peer_index = ~0;
       session_transport_closing_notify (&peer_ct->connection);
     }
 
-  s = session_get (ct->c_s_index, 0);
+  s = session_get (ct->c_s_index, ct->c_thread_index);
   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
   if (app_wrk)
     app_worker_del_segment_notify (app_wrk, ct->segment_handle);
@@ -450,13 +477,13 @@ ct_session_close (u32 ct_index, u32 thread_index)
     segment_manager_dealloc_fifos (ct->client_rx_fifo, ct->client_tx_fifo);
 
   ct_connection_free (ct);
-  ct_enable_disable_main_pre_input_node (0 /* is_add */ );
 }
 
 static transport_connection_t *
 ct_session_get (u32 ct_index, u32 thread_index)
 {
-  return (transport_connection_t *) ct_connection_get (ct_index);
+  return (transport_connection_t *) ct_connection_get (ct_index,
+                                                      thread_index);
 }
 
 static u8 *
@@ -503,7 +530,7 @@ ct_app_rx_evt (transport_connection_t * tc)
   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
   session_t *ps;
 
-  peer_ct = ct_connection_get (ct->peer_index);
+  peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
   if (!peer_ct)
     return -1;
   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
@@ -516,7 +543,7 @@ format_ct_listener (u8 * s, va_list * args)
   u32 tc_index = va_arg (*args, u32);
   u32 __clib_unused thread_index = va_arg (*args, u32);
   u32 __clib_unused verbose = va_arg (*args, u32);
-  ct_connection_t *ct = ct_connection_get (tc_index);
+  ct_connection_t *ct = ct_connection_get (tc_index, 0);
   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
   if (verbose)
     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
@@ -547,11 +574,11 @@ static u8 *
 format_ct_session (u8 * s, va_list * args)
 {
   u32 ct_index = va_arg (*args, u32);
-  u32 __clib_unused thread_index = va_arg (*args, u32);
+  u32 thread_index = va_arg (*args, u32);
   u32 verbose = va_arg (*args, u32);
   ct_connection_t *ct;
 
-  ct = ct_connection_get (ct_index);
+  ct = ct_connection_get (ct_index, thread_index);
   if (!ct)
     {
       s = format (s, "empty\n");
@@ -562,8 +589,17 @@ format_ct_session (u8 * s, va_list * args)
   return s;
 }
 
+clib_error_t *
+ct_enable_disable (vlib_main_t * vm, u8 is_en)
+{
+  ct_main.n_workers = vlib_num_workers ();
+  vec_validate (ct_main.connections, ct_main.n_workers);
+  return 0;
+}
+
 /* *INDENT-OFF* */
 static const transport_proto_vft_t cut_thru_proto = {
+  .enable = ct_enable_disable,
   .start_listen = ct_start_listen,
   .stop_listen = ct_stop_listen,
   .get_listener = ct_listener_get,
@@ -590,10 +626,10 @@ ct_session_tx (session_t * s)
   session_t *peer_s;
 
   ct = (ct_connection_t *) session_get_transport (s);
-  peer_ct = ct_connection_get (ct->peer_index);
+  peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
   if (!peer_ct)
     return 0;
-  peer_s = session_get (peer_ct->c_s_index, 0);
+  peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
     return 0;
   session_enqueue_notify (peer_s);
index e00a761..1d648f9 100644 (file)
@@ -231,7 +231,7 @@ segment_manager_get_segment_if_valid (segment_manager_t * sm,
  * Removes segment after acquiring writer lock
  */
 static inline void
-segment_manager_lock_and_del_segment (segment_manager_t * sm, u32 fs_index)
+sm_lock_and_del_segment_inline (segment_manager_t * sm, u32 fs_index)
 {
   fifo_segment_t *fs;
   u8 is_prealloc;
@@ -252,6 +252,12 @@ done:
   clib_rwlock_writer_unlock (&sm->segments_rwlock);
 }
 
+void
+segment_manager_lock_and_del_segment (segment_manager_t * sm, u32 fs_index)
+{
+  sm_lock_and_del_segment_inline (sm, fs_index);
+}
+
 /**
  * Reads a segment from the segment manager's pool without lock
  */
@@ -774,7 +780,7 @@ segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo)
 
       /* Remove segment if it holds no fifos or first but not protected */
       if (segment_index != 0 || !sm->first_is_protected)
-       segment_manager_lock_and_del_segment (sm, segment_index);
+       sm_lock_and_del_segment_inline (sm, segment_index);
 
       /* Remove segment manager if no sessions and detached from app */
       if (segment_manager_app_detached (sm)
index f4dacdd..1036ca2 100644 (file)
@@ -110,6 +110,8 @@ u32 segment_manager_index (segment_manager_t * sm);
 int segment_manager_add_segment (segment_manager_t * sm, uword segment_size);
 void segment_manager_del_segment (segment_manager_t * sm,
                                  fifo_segment_t * fs);
+void segment_manager_lock_and_del_segment (segment_manager_t * sm,
+                                          u32 fs_index);
 fifo_segment_t *segment_manager_get_segment (segment_manager_t * sm,
                                             u32 segment_index);
 fifo_segment_t *segment_manager_get_segment_w_handle (u64 sh);
index 12aea49..6a609eb 100644 (file)
@@ -180,7 +180,7 @@ mq_send_session_accepted_cb (session_t * s)
       mp->rmt.is_ip4 = session_type_is_ip4 (listener->session_type);
       mp->rmt.port = ct->c_rmt_port;
       mp->handle = session_handle (s);
-      vpp_queue = session_main_get_vpp_event_queue (0);
+      vpp_queue = session_main_get_vpp_event_queue (s->thread_index);
       mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
     }
   svm_msg_q_add_and_unlock (app_mq, msg);
@@ -318,7 +318,7 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
       mp->handle = session_handle (s);
       mp->lcl.port = cct->c_lcl_port;
       mp->lcl.is_ip4 = cct->c_is_ip4;
-      vpp_mq = session_main_get_vpp_event_queue (0);
+      vpp_mq = session_main_get_vpp_event_queue (s->thread_index);
       mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
       mp->server_rx_fifo = pointer_to_uword (s->rx_fifo);
       mp->server_tx_fifo = pointer_to_uword (s->tx_fifo);