ARP: add feature arc
[vpp.git] / src / vnet / session-apps / echo_client.c
index 51a85c0..bde9f48 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * echo_client.c - vpp built-in echo client code
  *
- * Copyright (c) 2017 by Cisco and/or its affiliates.
+ * Copyright (c) 2017-2019 by Cisco and/or its affiliates.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -23,6 +23,9 @@
 echo_client_main_t echo_client_main;
 
 #define ECHO_CLIENT_DBG (0)
+#define DBG(_fmt, _args...)                    \
+    if (ECHO_CLIENT_DBG)                               \
+      clib_warning (_fmt, ##_args)
 
 static void
 signal_evt_to_cli_i (int *code)
@@ -60,10 +63,11 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
       if (ecm->no_copy)
        {
          svm_fifo_t *f = s->data.tx_fifo;
-         rv = clib_min (svm_fifo_max_enqueue (f), bytes_this_chunk);
+         rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk);
          svm_fifo_enqueue_nocopy (f, rv);
-         session_send_io_evt_to_thread_custom (f, s->thread_index,
-                                               FIFO_EVENT_APP_TX);
+         session_send_io_evt_to_thread_custom (&f->master_session_index,
+                                               s->thread_index,
+                                               SESSION_IO_EVT_TX);
        }
       else
        rv = app_send_stream (&s->data, test_data + test_buf_offset,
@@ -76,7 +80,7 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
          session_dgram_hdr_t hdr;
          svm_fifo_t *f = s->data.tx_fifo;
          app_session_transport_t *at = &s->data.transport;
-         u32 max_enqueue = svm_fifo_max_enqueue (f);
+         u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
 
          if (max_enqueue <= sizeof (session_dgram_hdr_t))
            return;
@@ -93,10 +97,11 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
          clib_memcpy_fast (&hdr.lcl_ip, &at->lcl_ip,
                            sizeof (ip46_address_t));
          hdr.lcl_port = at->lcl_port;
-         svm_fifo_enqueue_nowait (f, sizeof (hdr), (u8 *) & hdr);
+         svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
          svm_fifo_enqueue_nocopy (f, rv);
-         session_send_io_evt_to_thread_custom (f, s->thread_index,
-                                               FIFO_EVENT_APP_TX);
+         session_send_io_evt_to_thread_custom (&f->master_session_index,
+                                               s->thread_index,
+                                               SESSION_IO_EVT_TX);
        }
       else
        rv = app_send_dgram (&s->data, test_data + test_buf_offset,
@@ -149,7 +154,7 @@ receive_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
     }
   else
     {
-      n_read = svm_fifo_max_dequeue (rx_fifo);
+      n_read = svm_fifo_max_dequeue_cons (rx_fifo);
       svm_fifo_dequeue_drop (rx_fifo, n_read);
     }
 
@@ -263,7 +268,7 @@ echo_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
        }
       if (PREDICT_FALSE (delete_session == 1))
        {
-         stream_session_t *s;
+         session_t *s;
 
          clib_atomic_fetch_add (&ecm->tx_total, sp->bytes_sent);
          clib_atomic_fetch_add (&ecm->rx_total, sp->bytes_received);
@@ -349,14 +354,128 @@ echo_clients_init (vlib_main_t * vm)
 
   vec_validate (ecm->connection_index_by_thread, vtm->n_vlib_mains);
   vec_validate (ecm->connections_this_batch_by_thread, vtm->n_vlib_mains);
+  vec_validate (ecm->quic_session_index_by_thread, vtm->n_vlib_mains);
   vec_validate (ecm->vpp_event_queue, vtm->n_vlib_mains);
 
   return 0;
 }
 
+static int
+quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context,
+                                              session_t * s, u8 is_fail)
+{
+  echo_client_main_t *ecm = &echo_client_main;
+  vnet_connect_args_t *a = 0;
+  int rv;
+  u8 thread_index = vlib_get_thread_index ();
+  session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
+  u32 stream_n;
+
+  DBG ("QUIC Connection handle %d", session_handle (s));
+
+  vec_validate (a, 1);
+  a->uri = (char *) ecm->connect_uri;
+  parse_uri (a->uri, &sep);
+  sep.transport_opts = session_handle (s);
+  sep.port = 0;                        /* QUIC: create a stream flag */
+
+  for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
+    {
+      clib_memset (a, 0, sizeof (*a));
+      a->app_index = ecm->app_index;
+      a->api_context = -1 - api_context;
+      clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
+
+      DBG ("QUIC opening stream %d", stream_n);
+      if ((rv = vnet_connect (a)))
+       {
+         clib_error ("Stream session %d opening failed: %d", stream_n, rv);
+         return -1;
+       }
+      DBG ("QUIC stream %d connected", stream_n);
+    }
+  vec_add1 (ecm->quic_session_index_by_thread[thread_index],
+           session_handle (s));
+  vec_free (a);
+  return 0;
+}
+
+static int
+quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context,
+                                             session_t * s, u8 is_fail)
+{
+  echo_client_main_t *ecm = &echo_client_main;
+  eclient_session_t *session;
+  u32 session_index;
+  u8 thread_index;
+
+  if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING))
+    return -1;
+
+  if (is_fail)
+    {
+      clib_warning ("connection %d failed!", api_context);
+      ecm->run_test = ECHO_CLIENTS_EXITING;
+      signal_evt_to_cli (-1);
+      return 0;
+    }
+
+  if (!(s->flags & SESSION_F_QUIC_STREAM))
+    return quic_echo_clients_qsession_connected_callback (app_index,
+                                                         api_context, s,
+                                                         is_fail);
+  DBG ("STREAM Connection callback %d", api_context);
+
+  thread_index = s->thread_index;
+  ASSERT (thread_index == vlib_get_thread_index ()
+         || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
+
+  if (!ecm->vpp_event_queue[thread_index])
+    ecm->vpp_event_queue[thread_index] =
+      session_main_get_vpp_event_queue (thread_index);
+
+  /*
+   * Setup session
+   */
+  clib_spinlock_lock_if_init (&ecm->sessions_lock);
+  pool_get (ecm->sessions, session);
+  clib_spinlock_unlock_if_init (&ecm->sessions_lock);
+
+  clib_memset (session, 0, sizeof (*session));
+  session_index = session - ecm->sessions;
+  session->bytes_to_send = ecm->bytes_to_send;
+  session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
+  session->data.rx_fifo = s->rx_fifo;
+  session->data.rx_fifo->client_session_index = session_index;
+  session->data.tx_fifo = s->tx_fifo;
+  session->data.tx_fifo->client_session_index = session_index;
+  session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index];
+  session->vpp_session_handle = session_handle (s);
+
+  if (ecm->is_dgram)
+    {
+      transport_connection_t *tc;
+      tc = session_get_transport (s);
+      clib_memcpy_fast (&session->data.transport, tc,
+                       sizeof (session->data.transport));
+      session->data.is_dgram = 1;
+    }
+
+  vec_add1 (ecm->connection_index_by_thread[thread_index], session_index);
+  clib_atomic_fetch_add (&ecm->ready_connections, 1);
+  if (ecm->ready_connections == ecm->expected_connections)
+    {
+      ecm->run_test = ECHO_CLIENTS_RUNNING;
+      /* Signal the CLI process that the action is starting... */
+      signal_evt_to_cli (1);
+    }
+
+  return 0;
+}
+
 static int
 echo_clients_session_connected_callback (u32 app_index, u32 api_context,
-                                        stream_session_t * s, u8 is_fail)
+                                        session_t * s, u8 is_fail)
 {
   echo_client_main_t *ecm = &echo_client_main;
   eclient_session_t *session;
@@ -380,7 +499,7 @@ echo_clients_session_connected_callback (u32 app_index, u32 api_context,
 
   if (!ecm->vpp_event_queue[thread_index])
     ecm->vpp_event_queue[thread_index] =
-      session_manager_get_vpp_event_queue (thread_index);
+      session_main_get_vpp_event_queue (thread_index);
 
   /*
    * Setup session
@@ -393,9 +512,9 @@ echo_clients_session_connected_callback (u32 app_index, u32 api_context,
   session_index = session - ecm->sessions;
   session->bytes_to_send = ecm->bytes_to_send;
   session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
-  session->data.rx_fifo = s->server_rx_fifo;
+  session->data.rx_fifo = s->rx_fifo;
   session->data.rx_fifo->client_session_index = session_index;
-  session->data.tx_fifo = s->server_tx_fifo;
+  session->data.tx_fifo = s->tx_fifo;
   session->data.tx_fifo->client_session_index = session_index;
   session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index];
   session->vpp_session_handle = session_handle (s);
@@ -422,25 +541,31 @@ echo_clients_session_connected_callback (u32 app_index, u32 api_context,
 }
 
 static void
-echo_clients_session_reset_callback (stream_session_t * s)
+echo_clients_session_reset_callback (session_t * s)
 {
+  echo_client_main_t *ecm = &echo_client_main;
+  vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+
   if (s->session_state == SESSION_STATE_READY)
-    clib_warning ("Reset active connection %U", format_stream_session, s, 2);
-  stream_session_cleanup (s);
+    clib_warning ("Reset active connection %U", format_session, s, 2);
+
+  a->handle = session_handle (s);
+  a->app_index = ecm->app_index;
+  vnet_disconnect_session (a);
   return;
 }
 
 static int
-echo_clients_session_create_callback (stream_session_t * s)
+echo_clients_session_create_callback (session_t * s)
 {
   return 0;
 }
 
 static void
-echo_clients_session_disconnect_callback (stream_session_t * s)
+echo_clients_session_disconnect_callback (session_t * s)
 {
   echo_client_main_t *ecm = &echo_client_main;
-  vnet_disconnect_args_t _a, *a = &_a;
+  vnet_disconnect_args_t _a = { 0 }, *a = &_a;
   a->handle = session_handle (s);
   a->app_index = ecm->app_index;
   vnet_disconnect_session (a);
@@ -448,17 +573,17 @@ echo_clients_session_disconnect_callback (stream_session_t * s)
 }
 
 void
-echo_clients_session_disconnect (stream_session_t * s)
+echo_clients_session_disconnect (session_t * s)
 {
   echo_client_main_t *ecm = &echo_client_main;
-  vnet_disconnect_args_t _a, *a = &_a;
+  vnet_disconnect_args_t _a = { 0 }, *a = &_a;
   a->handle = session_handle (s);
   a->app_index = ecm->app_index;
   vnet_disconnect_session (a);
 }
 
 static int
-echo_clients_rx_callback (stream_session_t * s)
+echo_clients_rx_callback (session_t * s)
 {
   echo_client_main_t *ecm = &echo_client_main;
   eclient_session_t *sp;
@@ -469,15 +594,13 @@ echo_clients_rx_callback (stream_session_t * s)
       return -1;
     }
 
-  sp = pool_elt_at_index (ecm->sessions,
-                         s->server_rx_fifo->client_session_index);
+  sp = pool_elt_at_index (ecm->sessions, s->rx_fifo->client_session_index);
   receive_data_chunk (ecm, sp);
 
-  if (svm_fifo_max_dequeue (s->server_rx_fifo))
+  if (svm_fifo_max_dequeue_cons (s->rx_fifo))
     {
-      if (svm_fifo_set_event (s->server_rx_fifo))
-       session_send_io_evt_to_thread (s->server_rx_fifo,
-                                      FIFO_EVENT_BUILTIN_RX);
+      if (svm_fifo_set_event (s->rx_fifo))
+       session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
     }
   return 0;
 }
@@ -507,12 +630,15 @@ echo_clients_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret)
   echo_client_main_t *ecm = &echo_client_main;
   vnet_app_attach_args_t _a, *a = &_a;
   u64 options[16];
-  clib_error_t *error = 0;
+  int rv;
 
   clib_memset (a, 0, sizeof (*a));
   clib_memset (options, 0, sizeof (options));
 
   a->api_client_index = ecm->my_client_index;
+  if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
+    echo_clients.session_connected_callback =
+      quic_echo_clients_session_connected_callback;
   a->session_cb_vft = &echo_clients;
 
   prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1;
@@ -537,8 +663,8 @@ echo_clients_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret)
   a->options = options;
   a->namespace_id = appns_id;
 
-  if ((error = vnet_application_attach (a)))
-    return error;
+  if ((rv = vnet_application_attach (a)))
+    return clib_error_return (0, "attach returned %d", rv);
 
   ecm->app_index = a->app_index;
   return 0;
@@ -552,6 +678,7 @@ echo_clients_detach ()
   int rv;
 
   da->app_index = ecm->app_index;
+  da->api_client_index = ~0;
   rv = vnet_application_detach (da);
   ecm->test_client_attached = 0;
   ecm->app_index = ~0;
@@ -587,27 +714,24 @@ echo_clients_connect (vlib_main_t * vm, u32 n_clients)
 {
   echo_client_main_t *ecm = &echo_client_main;
   vnet_connect_args_t _a, *a = &_a;
-  clib_error_t *error = 0;
-  int i;
+  int i, rv;
 
   clib_memset (a, 0, sizeof (*a));
+
   for (i = 0; i < n_clients; i++)
     {
       a->uri = (char *) ecm->connect_uri;
       a->api_context = i;
       a->app_index = ecm->app_index;
-
-      if ((error = vnet_connect_uri (a)))
-       return error;
+      if ((rv = vnet_connect_uri (a)))
+       return clib_error_return (0, "connect returned: %d", rv);
 
       /* Crude pacing for call setups  */
-      if ((i % 4) == 0)
-       vlib_process_suspend (vm, 10e-6);
+      if ((i % 16) == 0)
+       vlib_process_suspend (vm, 100e-6);
       ASSERT (i + 1 >= ecm->ready_connections);
-      while (i + 1 - ecm->ready_connections > 1000)
-       {
-         vlib_process_suspend (vm, 100e-6);
-       }
+      while (i + 1 - ecm->ready_connections > 128)
+       vlib_process_suspend (vm, 1e-3);
     }
   return 0;
 }
@@ -633,7 +757,10 @@ echo_clients_command_fn (vlib_main_t * vm,
   clib_error_t *error = 0;
   u8 *appns_id = 0;
   int i;
+  session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
+  int rv;
 
+  ecm->quic_streams = 1;
   ecm->bytes_to_send = 8192;
   ecm->no_return = 0;
   ecm->fifo_size = 64 << 10;
@@ -658,6 +785,8 @@ echo_clients_command_fn (vlib_main_t * vm,
        ;
       else if (unformat (input, "nclients %d", &n_clients))
        ;
+      else if (unformat (input, "quic-streams %d", &ecm->quic_streams))
+       ;
       else if (unformat (input, "mbytes %lld", &tmp))
        ecm->bytes_to_send = tmp << 20;
       else if (unformat (input, "gbytes %lld", &tmp))
@@ -724,7 +853,7 @@ echo_clients_command_fn (vlib_main_t * vm,
 
 
   ecm->ready_connections = 0;
-  ecm->expected_connections = n_clients;
+  ecm->expected_connections = n_clients * ecm->quic_streams;
   ecm->rx_total = 0;
   ecm->tx_total = 0;
 
@@ -734,8 +863,10 @@ echo_clients_command_fn (vlib_main_t * vm,
       ecm->connect_uri = format (0, "%s%c", default_uri, 0);
     }
 
-  if (ecm->connect_uri[0] == 'u' && ecm->connect_uri[3] != 'c')
-    ecm->is_dgram = 1;
+  if ((rv = parse_uri ((char *) ecm->connect_uri, &sep)))
+    return clib_error_return (0, "Uri parse error: %d", rv);
+  ecm->transport_proto = sep.transport_proto;
+  ecm->is_dgram = (sep.transport_proto == TRANSPORT_PROTO_UDP);
 
 #if ECHO_CLIENT_PTHREAD
   echo_clients_start_tx_pthread ();
@@ -854,6 +985,7 @@ cleanup:
     {
       vec_reset_length (ecm->connection_index_by_thread[i]);
       vec_reset_length (ecm->connections_this_batch_by_thread[i]);
+      vec_reset_length (ecm->quic_session_index_by_thread[i]);
     }
 
   pool_free (ecm->sessions);