crypto crypto-openssl: support hashing operations
[vpp.git] / src / vnet / session / application_local.c
index decff4e..1220952 100644 (file)
 #include <vnet/session/application_local.h>
 #include <vnet/session/session.h>
 
-static ct_connection_t *connections;
-
-static void
-ct_enable_disable_main_pre_input_node (u8 is_add)
+typedef struct ct_main_
 {
-  u32 n_conns;
-
-  if (!vlib_num_workers ())
-    return;
-
-  n_conns = pool_elts (connections);
-  if (n_conns > 2)
-    return;
-
-  if (n_conns > 0 && is_add)
-    vlib_node_set_state (vlib_get_main (),
-                        session_queue_pre_input_node.index,
-                        VLIB_NODE_STATE_POLLING);
-  else if (n_conns == 0)
-    vlib_node_set_state (vlib_get_main (),
-                        session_queue_pre_input_node.index,
-                        VLIB_NODE_STATE_DISABLED);
-}
+  ct_connection_t **connections;       /**< Per-worker connection pools */
+  u32 n_workers;                       /**< Number of vpp workers */
+  u32 n_sessions;                      /**< Cumulative sessions counter */
+} ct_main_t;
+
+static ct_main_t ct_main;
 
 static ct_connection_t *
-ct_connection_alloc (void)
+ct_connection_alloc (u32 thread_index)
 {
   ct_connection_t *ct;
 
-  pool_get_zero (connections, ct);
-  ct->c_c_index = ct - connections;
-  ct->c_thread_index = 0;
+  pool_get_zero (ct_main.connections[thread_index], ct);
+  ct->c_c_index = ct - ct_main.connections[thread_index];
+  ct->c_thread_index = thread_index;
   ct->client_wrk = ~0;
   ct->server_wrk = ~0;
   return ct;
 }
 
 static ct_connection_t *
-ct_connection_get (u32 ct_index)
+ct_connection_get (u32 ct_index, u32 thread_index)
 {
-  if (pool_is_free_index (connections, ct_index))
+  if (pool_is_free_index (ct_main.connections[thread_index], ct_index))
     return 0;
-  return pool_elt_at_index (connections, ct_index);
+  return pool_elt_at_index (ct_main.connections[thread_index], ct_index);
 }
 
 static void
 ct_connection_free (ct_connection_t * ct)
 {
   if (CLIB_DEBUG)
-    memset (ct, 0xfc, sizeof (*ct));
-  pool_put (connections, ct);
+    {
+      u32 thread_index = ct->c_thread_index;
+      memset (ct, 0xfc, sizeof (*ct));
+      pool_put (ct_main.connections[thread_index], ct);
+      return;
+    }
+  pool_put (ct_main.connections[ct->c_thread_index], ct);
 }
 
 session_t *
 ct_session_get_peer (session_t * s)
 {
   ct_connection_t *ct, *peer_ct;
-  ct = ct_connection_get (s->connection_index);
-  peer_ct = ct_connection_get (ct->peer_index);
-  return session_get (peer_ct->c_s_index, 0);
+  ct = ct_connection_get (s->connection_index, s->thread_index);
+  peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
+  return session_get (peer_ct->c_s_index, s->thread_index);
 }
 
 void
@@ -92,16 +82,17 @@ ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
 int
 ct_session_connect_notify (session_t * ss)
 {
+  u32 ss_index, opaque, thread_index;
   ct_connection_t *sct, *cct;
   app_worker_t *client_wrk;
   segment_manager_t *sm;
-  u32 ss_index, opaque;
   fifo_segment_t *seg;
   u64 segment_handle;
   int err = 0;
   session_t *cs;
 
   ss_index = ss->session_index;
+  thread_index = ss->thread_index;
   sct = (ct_connection_t *) session_get_transport (ss);
   client_wrk = app_worker_get (sct->client_wrk);
   opaque = sct->client_opaque;
@@ -124,12 +115,11 @@ ct_session_connect_notify (session_t * ss)
     }
 
   /* Alloc client session */
-  cct = ct_connection_get (sct->peer_index);
+  cct = ct_connection_get (sct->peer_index, thread_index);
 
-  cs = session_alloc (0);
-  ss = session_get (ss_index, 0);
+  cs = session_alloc (thread_index);
+  ss = session_get (ss_index, thread_index);
   cs->session_type = ss->session_type;
-  cs->connection_index = sct->c_c_index;
   cs->listener_handle = SESSION_INVALID_HANDLE;
   cs->session_state = SESSION_STATE_CONNECTING;
   cs->app_wrk_index = client_wrk->wrk_index;
@@ -162,7 +152,7 @@ ct_session_connect_notify (session_t * ss)
       return -1;
     }
 
-  cs = session_get (cct->c_s_index, 0);
+  cs = session_get (cct->c_s_index, cct->c_thread_index);
   cs->session_state = SESSION_STATE_READY;
 
   return 0;
@@ -173,8 +163,9 @@ error:
 }
 
 static int
-ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk,
-                      ct_connection_t * ct, session_t * ls, session_t * ll)
+ct_init_accepted_session (app_worker_t * server_wrk,
+                         ct_connection_t * ct, session_t * ls,
+                         session_t * ll)
 {
   u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index, seg_size;
   segment_manager_props_t *props;
@@ -197,7 +188,7 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk,
   seg_size = 4 * (round_rx_fifo_sz + round_tx_fifo_sz + margin);
 
   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
-  seg_index = segment_manager_add_segment (sm, seg_size);
+  seg_index = segment_manager_add_segment (sm, seg_size, 0);
   if (seg_index < 0)
     {
       clib_warning ("failed to add new cut-through segment");
@@ -217,8 +208,10 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk,
     }
 
   sm_index = segment_manager_index (sm);
-  ls->rx_fifo->master_session_index = ls->session_index;
-  ls->tx_fifo->master_session_index = ls->session_index;
+  ls->rx_fifo->shr->master_session_index = ls->session_index;
+  ls->tx_fifo->shr->master_session_index = ls->session_index;
+  ls->rx_fifo->master_thread_index = ls->thread_index;
+  ls->tx_fifo->master_thread_index = ls->thread_index;
   ls->rx_fifo->segment_manager = sm_index;
   ls->tx_fifo->segment_manager = sm_index;
   ls->rx_fifo->segment_index = seg_index;
@@ -237,50 +230,59 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk,
   return 0;
 
 failed:
-  segment_manager_del_segment (sm, seg);
+  segment_manager_lock_and_del_segment (sm, seg_index);
   return rv;
 }
 
-static int
-ct_connect (app_worker_t * client_wrk, session_t * ll,
-           session_endpoint_cfg_t * sep)
+typedef struct ct_accept_rpc_args
+{
+  u32 ll_s_index;
+  u32 thread_index;
+  ip46_address_t ip;
+  u16 port;
+  u8 is_ip4;
+  u32 opaque;
+  u32 client_wrk_index;
+} ct_accept_rpc_args_t;
+
+static void
+ct_accept_rpc_wrk_handler (void *accept_args)
 {
-  u32 cct_index, ll_index, ll_ct_index;
+  ct_accept_rpc_args_t *args = (ct_accept_rpc_args_t *) accept_args;
   ct_connection_t *sct, *cct, *ll_ct;
   app_worker_t *server_wrk;
-  session_t *ss;
+  session_t *ss, *ll;
+  u32 cct_index;
 
-  ll_index = ll->session_index;
-  ll_ct_index = ll->connection_index;
+  ll = listen_session_get (args->ll_s_index);
 
-  cct = ct_connection_alloc ();
+  cct = ct_connection_alloc (args->thread_index);
   cct_index = cct->c_c_index;
-  sct = ct_connection_alloc ();
-  ll_ct = ct_connection_get (ll_ct_index);
+  sct = ct_connection_alloc (args->thread_index);
+  ll_ct = ct_connection_get (ll->connection_index, 0 /* listener thread */ );
 
   /*
    * Alloc and init client transport
    */
-  cct = ct_connection_get (cct_index);
-  cct->c_thread_index = 0;
-  cct->c_rmt_port = sep->port;
+  cct = ct_connection_get (cct_index, args->thread_index);
+  cct->c_rmt_port = args->port;
   cct->c_lcl_port = 0;
-  cct->c_is_ip4 = sep->is_ip4;
-  clib_memcpy (&cct->c_rmt_ip, &sep->ip, sizeof (sep->ip));
+  cct->c_is_ip4 = args->is_ip4;
+  clib_memcpy (&cct->c_rmt_ip, &args->ip, sizeof (args->ip));
   cct->actual_tp = ll_ct->actual_tp;
   cct->is_client = 1;
+  cct->c_s_index = ~0;
 
   /*
    * Init server transport
    */
-  sct->c_thread_index = 0;
   sct->c_rmt_port = 0;
   sct->c_lcl_port = ll_ct->c_lcl_port;
-  sct->c_is_ip4 = sep->is_ip4;
+  sct->c_is_ip4 = args->is_ip4;
   clib_memcpy (&sct->c_lcl_ip, &ll_ct->c_lcl_ip, sizeof (ll_ct->c_lcl_ip));
-  sct->client_wrk = client_wrk->wrk_index;
+  sct->client_wrk = args->client_wrk_index;
   sct->c_proto = TRANSPORT_PROTO_NONE;
-  sct->client_opaque = sep->opaque;
+  sct->client_opaque = args->opaque;
   sct->actual_tp = ll_ct->actual_tp;
 
   sct->peer_index = cct->c_c_index;
@@ -290,8 +292,8 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
    * Accept server session. Client session is created only after
    * server confirms accept.
    */
-  ss = session_alloc (0);
-  ll = listen_session_get (ll_index);
+  ss = session_alloc (args->thread_index);
+  ll = listen_session_get (args->ll_s_index);
   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
                                                     sct->c_is_ip4);
   ss->connection_index = sct->c_c_index;
@@ -304,11 +306,11 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
   sct->c_s_index = ss->session_index;
   sct->server_wrk = ss->app_wrk_index;
 
-  if (ct_init_local_session (client_wrk, server_wrk, sct, ss, ll))
+  if (ct_init_accepted_session (server_wrk, sct, ss, ll))
     {
       ct_connection_free (sct);
       session_free (ss);
-      return -1;
+      return;
     }
 
   ss->session_state = SESSION_STATE_ACCEPTING;
@@ -317,11 +319,40 @@ ct_connect (app_worker_t * client_wrk, session_t * ll,
       ct_connection_free (sct);
       segment_manager_dealloc_fifos (ss->rx_fifo, ss->tx_fifo);
       session_free (ss);
-      return -1;
+      return;
     }
 
   cct->segment_handle = sct->segment_handle;
-  ct_enable_disable_main_pre_input_node (1 /* is_add */ );
+
+  clib_mem_free (args);
+}
+
+static int
+ct_connect (app_worker_t * client_wrk, session_t * ll,
+           session_endpoint_cfg_t * sep)
+{
+  ct_accept_rpc_args_t *args;
+  ct_main_t *cm = &ct_main;
+  u32 thread_index;
+
+  /* Simple round-robin policy for spreading sessions over workers. We skip
+   * thread index 0, i.e., offset the index by 1, when we have workers as it
+   * is the one dedicated to main thread. Note that n_workers does not include
+   * main thread */
+  cm->n_sessions += 1;
+  thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
+
+  args = clib_mem_alloc (sizeof (*args));
+  args->ll_s_index = ll->session_index;
+  args->thread_index = thread_index;
+  clib_memcpy_fast (&args->ip, &sep->ip, sizeof (ip46_address_t));
+  args->port = sep->port;
+  args->is_ip4 = sep->is_ip4;
+  args->opaque = sep->opaque;
+  args->client_wrk_index = client_wrk->wrk_index;
+
+  session_send_rpc_evt_to_thread (thread_index, ct_accept_rpc_wrk_handler,
+                                 args);
   return 0;
 }
 
@@ -332,13 +363,13 @@ ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep)
   ct_connection_t *ct;
 
   sep = (session_endpoint_cfg_t *) tep;
-  ct = ct_connection_alloc ();
+  ct = ct_connection_alloc (0);
   ct->server_wrk = sep->app_wrk_index;
   ct->c_is_ip4 = sep->is_ip4;
   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
   ct->c_lcl_port = sep->port;
+  ct->c_s_index = app_listener_index;
   ct->actual_tp = sep->transport_proto;
-  ct_enable_disable_main_pre_input_node (1 /* is_add */ );
   return ct->c_c_index;
 }
 
@@ -346,16 +377,15 @@ static u32
 ct_stop_listen (u32 ct_index)
 {
   ct_connection_t *ct;
-  ct = ct_connection_get (ct_index);
+  ct = ct_connection_get (ct_index, 0);
   ct_connection_free (ct);
-  ct_enable_disable_main_pre_input_node (0 /* is_add */ );
   return 0;
 }
 
 static transport_connection_t *
 ct_listener_get (u32 ct_index)
 {
-  return (transport_connection_t *) ct_connection_get (ct_index);
+  return (transport_connection_t *) ct_connection_get (ct_index, 0);
 }
 
 static int
@@ -428,15 +458,19 @@ ct_session_close (u32 ct_index, u32 thread_index)
   app_worker_t *app_wrk;
   session_t *s;
 
-  ct = ct_connection_get (ct_index);
-  peer_ct = ct_connection_get (ct->peer_index);
+  ct = ct_connection_get (ct_index, thread_index);
+  peer_ct = ct_connection_get (ct->peer_index, thread_index);
   if (peer_ct)
     {
       peer_ct->peer_index = ~0;
-      session_transport_closing_notify (&peer_ct->connection);
+      /* Make sure session was allocated */
+      if (peer_ct->c_s_index != ~0)
+       session_transport_closing_notify (&peer_ct->connection);
+      else
+       ct_connection_free (peer_ct);
     }
 
-  s = session_get (ct->c_s_index, 0);
+  s = session_get (ct->c_s_index, ct->c_thread_index);
   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
   if (app_wrk)
     app_worker_del_segment_notify (app_wrk, ct->segment_handle);
@@ -445,13 +479,13 @@ ct_session_close (u32 ct_index, u32 thread_index)
     segment_manager_dealloc_fifos (ct->client_rx_fifo, ct->client_tx_fifo);
 
   ct_connection_free (ct);
-  ct_enable_disable_main_pre_input_node (0 /* is_add */ );
 }
 
 static transport_connection_t *
 ct_session_get (u32 ct_index, u32 thread_index)
 {
-  return (transport_connection_t *) ct_connection_get (ct_index);
+  return (transport_connection_t *) ct_connection_get (ct_index,
+                                                      thread_index);
 }
 
 static u8 *
@@ -486,10 +520,17 @@ ct_custom_tx (void *session, transport_send_params_t * sp)
   session_t *s = (session_t *) session;
   if (session_has_transport (s))
     return 0;
-  /* As all other sessions, cut-throughs are scheduled by vpp for tx so let
-   * the scheduler's custom tx logic decide when to deschedule, i.e., after
-   * fifo is emptied. */
-  return ct_session_tx (s);
+  /* If event enqueued towards peer, remove from scheduler and remove
+   * session tx flag, i.e., accept new tx events. Unset fifo flag now to
+   * avoid missing events if peer did not clear fifo flag yet, which is
+   * interpreted as successful notification and session is descheduled. */
+  svm_fifo_unset_event (s->tx_fifo);
+  if (!ct_session_tx (s))
+    sp->flags = TRANSPORT_SND_F_DESCHED;
+
+  /* The scheduler uses packet count as a means of upper bounding the amount
+   * of work done per dispatch. So make it look like we have sent something */
+  return 1;
 }
 
 static int
@@ -498,7 +539,7 @@ ct_app_rx_evt (transport_connection_t * tc)
   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
   session_t *ps;
 
-  peer_ct = ct_connection_get (ct->peer_index);
+  peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
   if (!peer_ct)
     return -1;
   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
@@ -511,10 +552,10 @@ format_ct_listener (u8 * s, va_list * args)
   u32 tc_index = va_arg (*args, u32);
   u32 __clib_unused thread_index = va_arg (*args, u32);
   u32 __clib_unused verbose = va_arg (*args, u32);
-  ct_connection_t *ct = ct_connection_get (tc_index);
-  s = format (s, "%-50U", format_ct_connection_id, ct);
+  ct_connection_t *ct = ct_connection_get (tc_index, 0);
+  s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
   if (verbose)
-    s = format (s, "%-15s", "LISTEN");
+    s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
   return s;
 }
 
@@ -526,10 +567,10 @@ format_ct_connection (u8 * s, va_list * args)
 
   if (!ct)
     return s;
-  s = format (s, "%-50U", format_ct_connection_id, ct);
+  s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
   if (verbose)
     {
-      s = format (s, "%-15s", "ESTABLISHED");
+      s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
       if (verbose > 1)
        {
          s = format (s, "\n");
@@ -542,11 +583,11 @@ static u8 *
 format_ct_session (u8 * s, va_list * args)
 {
   u32 ct_index = va_arg (*args, u32);
-  u32 __clib_unused thread_index = va_arg (*args, u32);
+  u32 thread_index = va_arg (*args, u32);
   u32 verbose = va_arg (*args, u32);
   ct_connection_t *ct;
 
-  ct = ct_connection_get (ct_index);
+  ct = ct_connection_get (ct_index, thread_index);
   if (!ct)
     {
       s = format (s, "empty\n");
@@ -557,8 +598,17 @@ format_ct_session (u8 * s, va_list * args)
   return s;
 }
 
+clib_error_t *
+ct_enable_disable (vlib_main_t * vm, u8 is_en)
+{
+  ct_main.n_workers = vlib_num_workers ();
+  vec_validate (ct_main.connections, ct_main.n_workers);
+  return 0;
+}
+
 /* *INDENT-OFF* */
 static const transport_proto_vft_t cut_thru_proto = {
+  .enable = ct_enable_disable,
   .start_listen = ct_start_listen,
   .stop_listen = ct_stop_listen,
   .get_listener = ct_listener_get,
@@ -585,10 +635,10 @@ ct_session_tx (session_t * s)
   session_t *peer_s;
 
   ct = (ct_connection_t *) session_get_transport (s);
-  peer_ct = ct_connection_get (ct->peer_index);
+  peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
   if (!peer_ct)
-    return -1;
-  peer_s = session_get (peer_ct->c_s_index, 0);
+    return 0;
+  peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
     return 0;
   return session_enqueue_notify (peer_s);