hsa: fix builtin echo apps with multiple workers
[vpp.git] / src / plugins / hs_apps / echo_client.c
index 1d036b6..9da7bc5 100644 (file)
@@ -65,9 +65,8 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
          svm_fifo_t *f = s->data.tx_fifo;
          rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk);
          svm_fifo_enqueue_nocopy (f, rv);
-         session_send_io_evt_to_thread_custom (&f->master_session_index,
-                                               s->thread_index,
-                                               SESSION_IO_EVT_TX);
+         session_send_io_evt_to_thread_custom (
+           &f->shr->master_session_index, s->thread_index, SESSION_IO_EVT_TX);
        }
       else
        rv = app_send_stream (&s->data, test_data + test_buf_offset,
@@ -75,17 +74,19 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
     }
   else
     {
+      svm_fifo_t *f = s->data.tx_fifo;
+      u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
+
+      if (max_enqueue < sizeof (session_dgram_hdr_t))
+       return;
+
+      max_enqueue -= sizeof (session_dgram_hdr_t);
+
       if (ecm->no_copy)
        {
          session_dgram_hdr_t hdr;
-         svm_fifo_t *f = s->data.tx_fifo;
          app_session_transport_t *at = &s->data.transport;
-         u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
 
-         if (max_enqueue <= sizeof (session_dgram_hdr_t))
-           return;
-
-         max_enqueue -= sizeof (session_dgram_hdr_t);
          rv = clib_min (max_enqueue, bytes_this_chunk);
 
          hdr.data_length = rv;
@@ -99,13 +100,15 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
          hdr.lcl_port = at->lcl_port;
          svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
          svm_fifo_enqueue_nocopy (f, rv);
-         session_send_io_evt_to_thread_custom (&f->master_session_index,
-                                               s->thread_index,
-                                               SESSION_IO_EVT_TX);
+         session_send_io_evt_to_thread_custom (
+           &f->shr->master_session_index, s->thread_index, SESSION_IO_EVT_TX);
        }
       else
-       rv = app_send_dgram (&s->data, test_data + test_buf_offset,
-                            bytes_this_chunk, 0);
+       {
+         bytes_this_chunk = clib_min (bytes_this_chunk, max_enqueue);
+         rv = app_send_dgram (&s->data, test_data + test_buf_offset,
+                              bytes_this_chunk, 0);
+       }
     }
 
   /* If we managed to enqueue data... */
@@ -315,19 +318,6 @@ VLIB_REGISTER_NODE (echo_clients_node) =
 };
 /* *INDENT-ON* */
 
-static int
-create_api_loopback (echo_client_main_t * ecm)
-{
-  api_main_t *am = &api_main;
-  vl_shmem_hdr_t *shmem_hdr;
-
-  shmem_hdr = am->shmem_hdr;
-  ecm->vl_input_queue = shmem_hdr->vl_input_queue;
-  ecm->my_client_index = vl_api_memclnt_create_internal ("echo_client",
-                                                        ecm->vl_input_queue);
-  return 0;
-}
-
 static int
 echo_clients_init (vlib_main_t * vm)
 {
@@ -336,9 +326,6 @@ echo_clients_init (vlib_main_t * vm)
   u32 num_threads;
   int i;
 
-  if (create_api_loopback (ecm))
-    return -1;
-
   num_threads = 1 /* main thread */  + vtm->n_threads;
 
   /* Init test data. Big buffer */
@@ -362,7 +349,8 @@ echo_clients_init (vlib_main_t * vm)
 
 static int
 quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context,
-                                              session_t * s, u8 is_fail)
+                                              session_t * s,
+                                              session_error_t err)
 {
   echo_client_main_t *ecm = &echo_client_main;
   vnet_connect_args_t *a = 0;
@@ -370,13 +358,15 @@ quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context,
   u8 thread_index = vlib_get_thread_index ();
   session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
   u32 stream_n;
+  session_handle_t handle;
 
   DBG ("QUIC Connection handle %d", session_handle (s));
 
   vec_validate (a, 1);
   a->uri = (char *) ecm->connect_uri;
-  parse_uri (a->uri, &sep);
-  sep.parent_handle = session_handle (s);
+  if (parse_uri (a->uri, &sep))
+    return -1;
+  sep.parent_handle = handle = session_handle (s);
 
   for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
     {
@@ -393,15 +383,19 @@ quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context,
        }
       DBG ("QUIC stream %d connected", stream_n);
     }
-  vec_add1 (ecm->quic_session_index_by_thread[thread_index],
-           session_handle (s));
+  /*
+   * 's' is no longer valid, its underlying pool could have been moved in
+   * vnet_connect()
+   */
+  vec_add1 (ecm->quic_session_index_by_thread[thread_index], handle);
   vec_free (a);
   return 0;
 }
 
 static int
 quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context,
-                                             session_t * s, u8 is_fail)
+                                             session_t * s,
+                                             session_error_t err)
 {
   echo_client_main_t *ecm = &echo_client_main;
   eclient_session_t *session;
@@ -411,7 +405,7 @@ quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context,
   if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING))
     return -1;
 
-  if (is_fail)
+  if (err)
     {
       clib_warning ("connection %d failed!", api_context);
       ecm->run_test = ECHO_CLIENTS_EXITING;
@@ -422,7 +416,7 @@ quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context,
   if (s->listener_handle == SESSION_INVALID_HANDLE)
     return quic_echo_clients_qsession_connected_callback (app_index,
                                                          api_context, s,
-                                                         is_fail);
+                                                         err);
   DBG ("STREAM Connection callback %d", api_context);
 
   thread_index = s->thread_index;
@@ -445,9 +439,9 @@ quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context,
   session->bytes_to_send = ecm->bytes_to_send;
   session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
   session->data.rx_fifo = s->rx_fifo;
-  session->data.rx_fifo->client_session_index = session_index;
+  session->data.rx_fifo->shr->client_session_index = session_index;
   session->data.tx_fifo = s->tx_fifo;
-  session->data.tx_fifo->client_session_index = session_index;
+  session->data.tx_fifo->shr->client_session_index = session_index;
   session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index];
   session->vpp_session_handle = session_handle (s);
 
@@ -474,7 +468,7 @@ quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context,
 
 static int
 echo_clients_session_connected_callback (u32 app_index, u32 api_context,
-                                        session_t * s, u8 is_fail)
+                                        session_t * s, session_error_t err)
 {
   echo_client_main_t *ecm = &echo_client_main;
   eclient_session_t *session;
@@ -484,7 +478,7 @@ echo_clients_session_connected_callback (u32 app_index, u32 api_context,
   if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING))
     return -1;
 
-  if (is_fail)
+  if (err)
     {
       clib_warning ("connection %d failed!", api_context);
       ecm->run_test = ECHO_CLIENTS_EXITING;
@@ -512,9 +506,9 @@ echo_clients_session_connected_callback (u32 app_index, u32 api_context,
   session->bytes_to_send = ecm->bytes_to_send;
   session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
   session->data.rx_fifo = s->rx_fifo;
-  session->data.rx_fifo->client_session_index = session_index;
+  session->data.rx_fifo->shr->client_session_index = session_index;
   session->data.tx_fifo = s->tx_fifo;
-  session->data.tx_fifo->client_session_index = session_index;
+  session->data.tx_fifo->shr->client_session_index = session_index;
   session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index];
   session->vpp_session_handle = session_handle (s);
 
@@ -593,7 +587,8 @@ echo_clients_rx_callback (session_t * s)
       return -1;
     }
 
-  sp = pool_elt_at_index (ecm->sessions, s->rx_fifo->client_session_index);
+  sp =
+    pool_elt_at_index (ecm->sessions, s->rx_fifo->shr->client_session_index);
   receive_data_chunk (ecm, sp);
 
   if (svm_fifo_max_dequeue_cons (s->rx_fifo))
@@ -625,16 +620,18 @@ static session_cb_vft_t echo_clients = {
 static clib_error_t *
 echo_clients_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret)
 {
+  vnet_app_add_cert_key_pair_args_t _ck_pair, *ck_pair = &_ck_pair;
   u32 prealloc_fifos, segment_size = 256 << 20;
   echo_client_main_t *ecm = &echo_client_main;
   vnet_app_attach_args_t _a, *a = &_a;
-  u64 options[16];
+  u64 options[18];
   int rv;
 
   clib_memset (a, 0, sizeof (*a));
   clib_memset (options, 0, sizeof (options));
 
-  a->api_client_index = ecm->my_client_index;
+  a->api_client_index = ~0;
+  a->name = format (0, "echo_client");
   if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
     echo_clients.session_connected_callback =
       quic_echo_clients_session_connected_callback;
@@ -654,6 +651,7 @@ echo_clients_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret)
   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = prealloc_fifos;
   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
   options[APP_OPTIONS_TLS_ENGINE] = ecm->tls_engine;
+  options[APP_OPTIONS_PCT_FIRST_ALLOC] = 100;
   if (appns_id)
     {
       options[APP_OPTIONS_FLAGS] |= appns_flags;
@@ -666,6 +664,16 @@ echo_clients_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret)
     return clib_error_return (0, "attach returned %d", rv);
 
   ecm->app_index = a->app_index;
+  vec_free (a->name);
+
+  clib_memset (ck_pair, 0, sizeof (*ck_pair));
+  ck_pair->cert = (u8 *) test_srv_crt_rsa;
+  ck_pair->key = (u8 *) test_srv_key_rsa;
+  ck_pair->cert_len = test_srv_crt_rsa_len;
+  ck_pair->key_len = test_srv_key_rsa_len;
+  vnet_app_add_cert_key_pair (ck_pair);
+  ecm->ckpair_index = ck_pair->index;
+
   return 0;
 }
 
@@ -681,6 +689,8 @@ echo_clients_detach ()
   rv = vnet_application_detach (da);
   ecm->test_client_attached = 0;
   ecm->app_index = ~0;
+  vnet_app_del_cert_key_pair (ecm->ckpair_index);
+
   return rv;
 }
 
@@ -711,19 +721,30 @@ echo_clients_start_tx_pthread (echo_client_main_t * ecm)
 clib_error_t *
 echo_clients_connect (vlib_main_t * vm, u32 n_clients)
 {
+  session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
   echo_client_main_t *ecm = &echo_client_main;
   vnet_connect_args_t _a, *a = &_a;
   int i, rv;
 
   clib_memset (a, 0, sizeof (*a));
 
+  if (parse_uri ((char *) ecm->connect_uri, &sep))
+    return clib_error_return (0, "invalid uri");
+
   for (i = 0; i < n_clients; i++)
     {
-      a->uri = (char *) ecm->connect_uri;
+      clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
       a->api_context = i;
       a->app_index = ecm->app_index;
-      if ((rv = vnet_connect_uri (a)))
-       return clib_error_return (0, "connect returned: %d", rv);
+      a->sep_ext.ckpair_index = ecm->ckpair_index;
+
+      vlib_worker_thread_barrier_sync (vm);
+      if ((rv = vnet_connect (a)))
+       {
+         vlib_worker_thread_barrier_release (vm);
+         return clib_error_return (0, "connect returned: %d", rv);
+       }
+      vlib_worker_thread_barrier_release (vm);
 
       /* Crude pacing for call setups  */
       if ((i % 16) == 0)
@@ -746,18 +767,16 @@ echo_clients_command_fn (vlib_main_t * vm,
   echo_client_main_t *ecm = &echo_client_main;
   vlib_thread_main_t *thread_main = vlib_get_thread_main ();
   u64 tmp, total_bytes, appns_flags = 0, appns_secret = 0;
+  session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
   f64 test_timeout = 20.0, syn_timeout = 20.0, delta;
   char *default_uri = "tcp://6.0.1.1/1234";
+  u8 *appns_id = 0, barrier_acq_needed = 0;
+  int preallocate_sessions = 0, i, rv;
   uword *event_data = 0, event_type;
   f64 time_before_connects;
   u32 n_clients = 1;
-  int preallocate_sessions = 0;
   char *transfer_type;
   clib_error_t *error = 0;
-  u8 *appns_id = 0;
-  int i;
-  session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
-  int rv;
 
   ecm->quic_streams = 1;
   ecm->bytes_to_send = 8192;
@@ -770,10 +789,30 @@ echo_clients_command_fn (vlib_main_t * vm,
   ecm->test_bytes = 0;
   ecm->test_failed = 0;
   ecm->vlib_main = vm;
-  ecm->tls_engine = TLS_ENGINE_OPENSSL;
+  ecm->tls_engine = CRYPTO_ENGINE_OPENSSL;
   ecm->no_copy = 0;
   ecm->run_test = ECHO_CLIENTS_STARTING;
 
+  if (vlib_num_workers ())
+    {
+      /* The request came over the binary api and the inband cli handler
+       * is not mp_safe. Drop the barrier to make sure the workers are not
+       * blocked.
+       */
+      if (vlib_thread_is_main_w_barrier ())
+       {
+         barrier_acq_needed = 1;
+         vlib_worker_thread_barrier_release (vm);
+       }
+      /*
+       * There's a good chance that both the client and the server echo
+       * apps will be enabled so make sure the session queue node polls on
+       * the main thread as connections will probably be established on it.
+       */
+      vlib_node_set_state (vm, session_queue_node.index,
+                          VLIB_NODE_STATE_POLLING);
+    }
+
   if (thread_main->n_vlib_mains > 1)
     clib_spinlock_init (&ecm->sessions_lock);
   vec_free (ecm->connect_uri);
@@ -807,8 +846,11 @@ echo_clients_command_fn (vlib_main_t * vm,
                         unformat_memory_size, &tmp))
        {
          if (tmp >= 0x100000000ULL)
-           return clib_error_return
-             (0, "private segment size %lld (%llu) too large", tmp, tmp);
+           {
+             error = clib_error_return (
+               0, "private segment size %lld (%llu) too large", tmp, tmp);
+             goto cleanup;
+           }
          ecm->private_segment_size = tmp;
        }
       else if (unformat (input, "preallocate-fifos"))
@@ -836,8 +878,11 @@ echo_clients_command_fn (vlib_main_t * vm,
       else if (unformat (input, "tls-engine %d", &ecm->tls_engine))
        ;
       else
-       return clib_error_return (0, "failed: unknown input `%U'",
-                                 format_unformat_error, input);
+       {
+         error = clib_error_return (0, "failed: unknown input `%U'",
+                                    format_unformat_error, input);
+         goto cleanup;
+       }
     }
 
   /* Store cli process node index for signalling */
@@ -847,7 +892,10 @@ echo_clients_command_fn (vlib_main_t * vm,
   if (ecm->is_init == 0)
     {
       if (echo_clients_init (vm))
-       return clib_error_return (0, "failed init");
+       {
+         error = clib_error_return (0, "failed init");
+         goto cleanup;
+       }
     }
 
 
@@ -863,7 +911,10 @@ echo_clients_command_fn (vlib_main_t * vm,
     }
 
   if ((rv = parse_uri ((char *) ecm->connect_uri, &sep)))
-    return clib_error_return (0, "Uri parse error: %d", rv);
+    {
+      error = clib_error_return (0, "Uri parse error: %d", rv);
+      goto cleanup;
+    }
   ecm->transport_proto = sep.transport_proto;
   ecm->is_dgram = (sep.transport_proto == TRANSPORT_PROTO_UDP);
 
@@ -881,7 +932,7 @@ echo_clients_command_fn (vlib_main_t * vm,
        {
          vec_free (appns_id);
          clib_error_report (error);
-         return error;
+         goto cleanup;
        }
       vec_free (appns_id);
     }
@@ -898,7 +949,9 @@ echo_clients_command_fn (vlib_main_t * vm,
   /* Fire off connect requests */
   time_before_connects = vlib_time_now (vm);
   if ((error = echo_clients_connect (vm, n_clients)))
-    goto cleanup;
+    {
+      goto cleanup;
+    }
 
   /* Park until the sessions come up, or ten seconds elapse... */
   vlib_process_wait_for_event_or_clock (vm, syn_timeout);
@@ -1001,6 +1054,11 @@ cleanup:
   if (error)
     ec_cli_output ("test failed");
   vec_free (ecm->connect_uri);
+  clib_spinlock_free (&ecm->sessions_lock);
+
+  if (barrier_acq_needed)
+    vlib_worker_thread_barrier_sync (vm);
+
   return error;
 }