quic: server create streams test case 79/19979/10
authorDave Wallace <dwallacelf@gmail.com>
Wed, 5 Jun 2019 14:40:07 +0000 (10:40 -0400)
committerFlorin Coras <florin.coras@gmail.com>
Mon, 24 Jun 2019 23:37:39 +0000 (23:37 +0000)
Type: test

* Refactor quic_echo test app
* Add timinig capabilities
* Add multiple quic tests

Change-Id: I3302c66539b12c1375d1a0c6d46f9ff4c6f2b27c
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
src/plugins/quic/quic.c
src/tests/vnet/session/quic_echo.c
test/test_quic.py

index 0b6c975..4e158bc 100644 (file)
@@ -1936,7 +1936,7 @@ quic_create_quic_session (quic_ctx_t * ctx)
   quic_session->session_type =
     session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC,
                                    ctx->c_quic_ctx_id.udp_is_ip4);
-  quic_session->listener_handle = lctx->c_quic_ctx_id.listener_ctx_id;
+  quic_session->listener_handle = lctx->c_s_index;
 
   /* TODO: don't alloc fifos when we don't transfer data on this session
    * but we still need fifos for the events? */
index 0149448..a62af64 100644 (file)
@@ -38,7 +38,7 @@
 #include <vpp/api/vpe_all_api_h.h>
 #undef vl_printfun
 
-#define QUIC_ECHO_DBG 1
+#define QUIC_ECHO_DBG 0
 #define DBG(_fmt, _args...)                    \
     if (QUIC_ECHO_DBG)                                 \
       clib_warning (_fmt, ##_args)
@@ -68,6 +68,17 @@ typedef enum
   STATE_DETACHED
 } connection_state_t;
 
+typedef enum
+{
+  ECHO_EVT_START,              /* app starts */
+  ECHO_EVT_FIRST_QCONNECT,     /* First connect Quic session sent */
+  ECHO_EVT_LAST_QCONNECTED,    /* All Quic session are connected */
+  ECHO_EVT_FIRST_SCONNECT,     /* First connect Stream session sent */
+  ECHO_EVT_LAST_SCONNECTED,    /* All Stream session are connected */
+  ECHO_EVT_LAST_BYTE,          /* Last byte received */
+  ECHO_EVT_EXIT,               /* app exits */
+} echo_test_evt_t;
+
 enum quic_session_type_t
 {
   QUIC_SESSION_TYPE_QUIC = 0,
@@ -75,6 +86,28 @@ enum quic_session_type_t
   QUIC_SESSION_TYPE_LISTEN = INT32_MAX,
 };
 
+typedef struct _quic_echo_cb_vft
+{
+  void (*quic_connected_cb) (session_connected_msg_t * mp, u32 session_index);
+  void (*client_stream_connected_cb) (session_connected_msg_t * mp,
+                                     u32 session_index);
+  void (*server_stream_connected_cb) (session_connected_msg_t * mp,
+                                     u32 session_index);
+  void (*quic_accepted_cb) (session_accepted_msg_t * mp, u32 session_index);
+  void (*client_stream_accepted_cb) (session_accepted_msg_t * mp,
+                                    u32 session_index);
+  void (*server_stream_accepted_cb) (session_accepted_msg_t * mp,
+                                    u32 session_index);
+} quic_echo_cb_vft_t;
+
+
+typedef enum
+{
+  RETURN_PACKETS_NOTEST,
+  RETURN_PACKETS_LOG_WRONG,
+  RETURN_PACKETS_ASSERT,
+} test_return_packets_t;
+
 typedef struct
 {
   /* vpe input queue */
@@ -91,6 +124,8 @@ typedef struct
 
   /* Hash table for disconnect processing */
   uword *session_index_by_vpp_handles;
+  /* Handle of vpp listener session */
+  u64 listener_handle;
 
   /* Hash table for shared segment_names */
   uword *shared_segment_handles;
@@ -99,11 +134,6 @@ typedef struct
   /* intermediate rx buffer */
   u8 *rx_buf;
 
-  /* URI for slave's connect */
-  u8 *connect_uri;
-
-  u32 connected_session_index;
-
   int i_am_master;
 
   /* drop all packets */
@@ -124,9 +154,6 @@ typedef struct
 
   /* Signal variables */
   volatile int time_to_stop;
-  volatile int time_to_print_stats;
-
-  u32 configured_segment_size;
 
   /* VNET_API_ERROR_FOO -> "Foo" hash table */
   uword *error_string_by_error_number;
@@ -134,27 +161,39 @@ typedef struct
   u8 *connect_test_data;
   pthread_t *client_thread_handles;
   u32 *thread_args;
-  u32 client_bytes_received;
   u8 test_return_packets;
   u64 bytes_to_send;
+  u64 bytes_to_receive;
   u32 fifo_size;
-  u32 quic_streams;
+
   u8 *appns_id;
   u64 appns_flags;
   u64 appns_secret;
 
-  u32 n_clients;
+  u32 n_clients;               /* Target number of QUIC sessions */
+  u32 n_stream_clients;                /* Target Number of STREAM sessions per QUIC session */
+  volatile u32 n_quic_clients_connected;       /* Number of connected QUIC sessions */
+  volatile u32 n_clients_connected;    /* Number of STREAM sessions connected */
+
   u64 tx_total;
   u64 rx_total;
 
-  volatile u32 n_clients_connected;
-  volatile u32 n_active_clients;
+  /* Event based timing : start & end depend on CLI specified events */
+  u8 first_sconnect_sent;      /* Sent the first Stream session connect ? */
+  f64 start_time;
+  f64 end_time;
+  u8 timing_start_event;
+  u8 timing_end_event;
 
+  /* cb vft for QUIC scenarios */
+  quic_echo_cb_vft_t cb_vft;
 
   /** Flag that decides if socket, instead of svm, api is used to connect to
    * vpp. If sock api is used, shm binary api is subsequently bootstrapped
    * and all other messages are exchanged using shm IPC. */
   u8 use_sock_api;
+
+  /* Limit the number of incorrect data messages */
   int max_test_msg;
 
   fifo_segment_main_t segment_main;
@@ -168,6 +207,34 @@ echo_main_t echo_main;
 #define NITER 4000000
 #endif
 
+#if CLIB_DEBUG > 0
+#define TIMEOUT 10.0
+#else
+#define TIMEOUT 10.0
+#endif
+
+u8 *
+format_quic_echo_state (u8 * s, va_list * args)
+{
+  u32 state = va_arg (*args, u32);
+  if (state == STATE_START)
+    return format (s, "STATE_START");
+  if (state == STATE_ATTACHED)
+    return format (s, "STATE_ATTACHED");
+  if (state == STATE_LISTEN)
+    return format (s, "STATE_LISTEN");
+  if (state == STATE_READY)
+    return format (s, "STATE_READY");
+  if (state == STATE_DISCONNECTING)
+    return format (s, "STATE_DISCONNECTING");
+  if (state == STATE_FAILED)
+    return format (s, "STATE_FAILED");
+  if (state == STATE_DETACHED)
+    return format (s, "STATE_DETACHED");
+  else
+    return format (s, "unknown state");
+}
+
 static u8 *
 format_api_error (u8 * s, va_list * args)
 {
@@ -184,6 +251,60 @@ format_api_error (u8 * s, va_list * args)
   return s;
 }
 
+static void
+quic_echo_notify_event (echo_main_t * em, echo_test_evt_t e)
+{
+  if (em->timing_start_event == e)
+    em->start_time = clib_time_now (&em->clib_time);
+  else if (em->timing_end_event == e)
+    em->end_time = clib_time_now (&em->clib_time);
+}
+
+static uword
+echo_unformat_timing_event (unformat_input_t * input, va_list * args)
+{
+  echo_test_evt_t *a = va_arg (*args, echo_test_evt_t *);
+  if (unformat (input, "start"))
+    *a = ECHO_EVT_START;
+  else if (unformat (input, "qconnect"))
+    *a = ECHO_EVT_FIRST_QCONNECT;
+  else if (unformat (input, "qconnected"))
+    *a = ECHO_EVT_LAST_QCONNECTED;
+  else if (unformat (input, "sconnect"))
+    *a = ECHO_EVT_FIRST_SCONNECT;
+  else if (unformat (input, "sconnected"))
+    *a = ECHO_EVT_LAST_SCONNECTED;
+  else if (unformat (input, "lastbyte"))
+    *a = ECHO_EVT_LAST_BYTE;
+  else if (unformat (input, "exit"))
+    *a = ECHO_EVT_EXIT;
+  else
+    return 0;
+  return 1;
+}
+
+u8 *
+echo_format_timing_event (u8 * s, va_list * args)
+{
+  u32 timing_event = va_arg (*args, u32);
+  if (timing_event == ECHO_EVT_START)
+    return format (s, "start");
+  if (timing_event == ECHO_EVT_FIRST_QCONNECT)
+    return format (s, "qconnect");
+  if (timing_event == ECHO_EVT_LAST_QCONNECTED)
+    return format (s, "qconnected");
+  if (timing_event == ECHO_EVT_FIRST_SCONNECT)
+    return format (s, "sconnect");
+  if (timing_event == ECHO_EVT_LAST_SCONNECTED)
+    return format (s, "sconnected");
+  if (timing_event == ECHO_EVT_LAST_BYTE)
+    return format (s, "lastbyte");
+  if (timing_event == ECHO_EVT_EXIT)
+    return format (s, "exit");
+  else
+    return format (s, "unknown timing event");
+}
+
 static void
 init_error_string_table (echo_main_t * em)
 {
@@ -196,13 +317,9 @@ init_error_string_table (echo_main_t * em)
   hash_set (em->error_string_by_error_number, 99, "Misc");
 }
 
-static void handle_mq_event (session_event_t * e);
-
-#if CLIB_DEBUG > 0
-#define TIMEOUT 10.0
-#else
-#define TIMEOUT 10.0
-#endif
+static void handle_mq_event (echo_main_t * em, session_event_t * e,
+                            int handle_rx);
+static void echo_handle_rx (echo_main_t * em, session_event_t * e);
 
 static int
 wait_for_segment_allocation (u64 segment_handle)
@@ -241,14 +358,14 @@ wait_for_disconnected_sessions (echo_main_t * em)
 }
 
 static int
-wait_for_state_change (echo_main_t * em, connection_state_t state)
+wait_for_state_change (echo_main_t * em, connection_state_t state,
+                      f64 timeout)
 {
   svm_msg_q_msg_t msg;
   session_event_t *e;
-  f64 timeout;
-  timeout = clib_time_now (&em->clib_time) + TIMEOUT;
+  f64 end_time = clib_time_now (&em->clib_time) + timeout;
 
-  while (clib_time_now (&em->clib_time) < timeout)
+  while (!timeout || clib_time_now (&em->clib_time) < end_time)
     {
       if (em->state == state)
        return 0;
@@ -262,13 +379,27 @@ wait_for_state_change (echo_main_t * em, connection_state_t state)
       if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_NOWAIT, 0))
        continue;
       e = svm_msg_q_msg_data (em->our_event_queue, &msg);
-      handle_mq_event (e);
+      handle_mq_event (em, e, 0 /* handle_rx */ );
       svm_msg_q_free_msg (em->our_event_queue, &msg);
     }
-  clib_warning ("timeout waiting for state %d", state);
+  clib_warning ("timeout waiting for state %U", format_quic_echo_state,
+               state);
   return -1;
 }
 
+static void
+notify_rx_data_to_vpp (echo_session_t * s)
+{
+  svm_fifo_t *f = s->tx_fifo;
+  return;                      /* FOR NOW */
+  if (svm_fifo_set_event (f))
+    {
+      DBG ("did send event");
+      app_send_io_evt_to_vpp (s->vpp_evt_q, f->master_session_index,
+                             SESSION_IO_EVT_TX, 0 /* noblock */ );
+    }
+}
+
 void
 application_send_attach (echo_main_t * em)
 {
@@ -323,12 +454,7 @@ static int
 application_attach (echo_main_t * em)
 {
   application_send_attach (em);
-  if (wait_for_state_change (em, STATE_ATTACHED))
-    {
-      clib_warning ("timeout waiting for STATE_ATTACHED");
-      return -1;
-    }
-  return 0;
+  return wait_for_state_change (em, STATE_ATTACHED, TIMEOUT);
 }
 
 void
@@ -450,29 +576,15 @@ stop_signal (int signum)
   um->time_to_stop = 1;
 }
 
-static void
-stats_signal (int signum)
-{
-  echo_main_t *um = &echo_main;
-  um->time_to_print_stats = 1;
-}
-
 static clib_error_t *
 setup_signal_handlers (void)
 {
-  signal (SIGUSR2, stats_signal);
   signal (SIGINT, stop_signal);
   signal (SIGQUIT, stop_signal);
   signal (SIGTERM, stop_signal);
   return 0;
 }
 
-void
-vlib_cli_output (struct vlib_main_t *vm, char *fmt, ...)
-{
-  clib_warning ("BUG");
-}
-
 int
 connect_to_vpp (char *name)
 {
@@ -565,31 +677,55 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
 static void
 session_print_stats (echo_main_t * em, echo_session_t * session)
 {
-  f64 deltat;
-  u64 bytes;
+  f64 deltat = clib_time_now (&em->clib_time) - session->start;
+  fformat (stdout, "Session %x done in %.6fs RX[%.4f] TX[%.4f] Gbit/s\n",
+          session->session_index, deltat,
+          (session->bytes_received * 8.0) / deltat / 1e9,
+          (session->bytes_sent * 8.0) / deltat / 1e9);
+}
 
-  deltat = clib_time_now (&em->clib_time) - session->start;
-  bytes = em->i_am_master ? session->bytes_received : em->bytes_to_send;
-  fformat (stdout, "Finished in %.6f\n", deltat);
-  fformat (stdout, "%.4f Gbit/second\n", (bytes * 8.0) / deltat / 1e9);
+static void
+print_global_stats (echo_main_t * em)
+{
+  f64 deltat = em->end_time - em->start_time;
+  u8 *s = format (0, "%U:%U",
+                 echo_format_timing_event, em->timing_start_event,
+                 echo_format_timing_event, em->timing_end_event);
+  fformat (stdout, "Timinig %s\n", s);
+  fformat (stdout, "-------- TX --------\n");
+  fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds\n",
+          em->tx_total, em->tx_total / (1ULL << 20),
+          em->tx_total / (1ULL << 30), deltat);
+  fformat (stdout, "%.4f Gbit/second\n", (em->tx_total * 8.0) / deltat / 1e9);
+  fformat (stdout, "-------- RX --------\n");
+  fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds\n",
+          em->rx_total, em->rx_total / (1ULL << 20),
+          em->rx_total / (1ULL << 30), deltat);
+  fformat (stdout, "%.4f Gbit/second\n", (em->rx_total * 8.0) / deltat / 1e9);
+  fformat (stdout, "--------------------\n");
 }
 
+
 static void
 test_recv_bytes (echo_main_t * em, echo_session_t * s, u8 * rx_buf,
                 u32 n_read)
 {
   int i;
+  u8 expected;
   for (i = 0; i < n_read; i++)
     {
-      if (rx_buf[i] != ((s->bytes_received + i) & 0xff)
-         && em->max_test_msg > 0)
+      expected = (s->bytes_received + i) & 0xff;
+      if (rx_buf[i] != expected && em->max_test_msg > 0)
        {
-         clib_warning ("error at byte %lld, 0x%x not 0x%x",
-                       s->bytes_received + i, rx_buf[i],
-                       ((s->bytes_received + i) & 0xff));
+         clib_warning
+           ("Session[%lx][0x%lx] byte[%lld], got 0x%x but expected 0x%x",
+            s->session_index, s->vpp_session_handle, s->bytes_received + i,
+            rx_buf[i], expected);
          em->max_test_msg--;
          if (em->max_test_msg == 0)
            clib_warning ("Too many errors, hiding next ones");
+         if (em->test_return_packets == RETURN_PACKETS_ASSERT)
+           ASSERT (0);
        }
     }
 }
@@ -605,22 +741,18 @@ recv_data_chunk (echo_main_t * em, echo_session_t * s, u8 * rx_buf)
 
   do
     {
-      n_read = app_recv_stream ((app_session_t *) s, rx_buf,
-                               vec_len (rx_buf));
-
-      if (n_read > 0)
-       {
-         if (em->test_return_packets)
-           test_recv_bytes (em, s, rx_buf, n_read);
-
-         n_to_read -= n_read;
-
-         s->bytes_received += n_read;
-         ASSERT (s->bytes_to_receive >= n_read);
-         s->bytes_to_receive -= n_read;
-       }
-      else
+      n_read =
+       app_recv_stream ((app_session_t *) s, rx_buf, vec_len (rx_buf));
+      if (n_read <= 0)
        break;
+      notify_rx_data_to_vpp (s);
+      if (em->test_return_packets)
+       test_recv_bytes (em, s, rx_buf, n_read);
+
+      ASSERT (s->bytes_to_receive >= n_read);
+      n_to_read -= n_read;
+      s->bytes_received += n_read;
+      s->bytes_to_receive -= n_read;
     }
   while (n_to_read > 0);
 }
@@ -654,7 +786,7 @@ static void *
 client_thread_fn (void *arg)
 {
   echo_main_t *em = &echo_main;
-  static u8 *rx_buf = 0;
+  u8 *rx_buf = 0;
   u32 session_index = *(u32 *) arg;
   echo_session_t *s;
 
@@ -672,18 +804,21 @@ client_thread_fn (void *arg)
        break;
     }
 
-  DBG ("session %d done send %lu to do, %lu done || recv %lu to do, %lu done",
-       session_index, s->bytes_to_send, s->bytes_sent, s->bytes_to_receive,
-       s->bytes_received);
+  DBG ("[%lu/%lu] -> S(%x) -> [%lu/%lu]",
+       s->bytes_received, s->bytes_received + s->bytes_to_receive,
+       session_index, s->bytes_sent, s->bytes_sent + s->bytes_to_send);
   em->tx_total += s->bytes_sent;
   em->rx_total += s->bytes_received;
-  em->n_active_clients--;
+  em->n_clients_connected--;
+
+  if (em->n_clients_connected == 0)
+    quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE);
 
   pthread_exit (0);
 }
 
-void
-client_send_connect (echo_main_t * em, u8 * uri, u32 opaque)
+static void
+echo_send_connect (echo_main_t * em, u8 * uri, u32 opaque)
 {
   vl_api_connect_uri_t *cmp;
   cmp = vl_msg_api_alloc (sizeof (*cmp));
@@ -696,8 +831,8 @@ client_send_connect (echo_main_t * em, u8 * uri, u32 opaque)
   vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp);
 }
 
-void
-client_send_disconnect (echo_main_t * em, echo_session_t * s)
+static void
+client_disconnect_session (echo_main_t * em, echo_session_t * s)
 {
   vl_api_disconnect_session_t *dmp;
   dmp = vl_msg_api_alloc (sizeof (*dmp));
@@ -707,15 +842,8 @@ client_send_disconnect (echo_main_t * em, echo_session_t * s)
   dmp->handle = s->vpp_session_handle;
   DBG ("Sending Session disonnect handle %lu", dmp->handle);
   vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp);
-}
-
-int
-client_disconnect (echo_main_t * em, echo_session_t * s)
-{
-  client_send_disconnect (em, s);
   pool_put (em->sessions, s);
   clib_memset (s, 0xfe, sizeof (*s));
-  return 0;
 }
 
 static void
@@ -734,16 +862,10 @@ session_bound_handler (session_bound_msg_t * mp)
   clib_warning ("listening on %U:%u", format_ip46_address, mp->lcl_ip,
                mp->lcl_is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
                clib_net_to_host_u16 (mp->lcl_port));
-  em->state = STATE_READY;
-}
-
-static void
-quic_qsession_accepted_handler (session_accepted_msg_t * mp)
-{
-  DBG ("Accept on QSession index %u", mp->handle);
+  em->listener_handle = mp->handle;
+  em->state = STATE_LISTEN;
 }
 
-
 static void
 session_accepted_handler (session_accepted_msg_t * mp)
 {
@@ -752,30 +874,15 @@ session_accepted_handler (session_accepted_msg_t * mp)
   svm_fifo_t *rx_fifo, *tx_fifo;
   echo_main_t *em = &echo_main;
   echo_session_t *session;
-  static f64 start_time;
   u32 session_index;
-  u64 segment_handle;
-  u8 *ip_str;
-
-  segment_handle = mp->segment_handle;
-
-  if (start_time == 0.0)
-    start_time = clib_time_now (&em->clib_time);
-
-  ip_str = format (0, "%U", format_ip46_address, &mp->rmt.ip, mp->rmt.is_ip4);
-  clib_warning ("Accepted session from: %s:%d", ip_str,
-               clib_net_to_host_u16 (mp->rmt.port));
 
   /* Allocate local session and set it up */
   pool_get (em->sessions, session);
   session_index = session - em->sessions;
 
-  if (wait_for_segment_allocation (segment_handle))
-    {
-      clib_warning ("timeout waiting for segment allocation %lu",
-                   segment_handle);
-      return;
-    }
+  if (wait_for_segment_allocation (mp->segment_handle))
+    return;
+
   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
   rx_fifo->client_session_index = session_index;
   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
@@ -783,7 +890,9 @@ session_accepted_handler (session_accepted_msg_t * mp)
 
   session->rx_fifo = rx_fifo;
   session->tx_fifo = tx_fifo;
+  session->session_index = session_index;
   session->vpp_session_handle = mp->handle;
+  session->start = clib_time_now (&em->clib_time);
   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
                                         svm_msg_q_t *);
 
@@ -792,9 +901,6 @@ session_accepted_handler (session_accepted_msg_t * mp)
        mp->listener_handle, session_index);
   hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
 
-  /*
-   * Send accept reply to vpp
-   */
   app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
                             SESSION_CTRL_EVT_ACCEPTED_REPLY);
   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
@@ -802,33 +908,33 @@ session_accepted_handler (session_accepted_msg_t * mp)
   rmp->context = mp->context;
   app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
 
-  /* TODO : this is very ugly */
-  if (mp->rmt.is_ip4 != 255)
-    return quic_qsession_accepted_handler (mp);
   DBG ("SSession handle is %lu", mp->handle);
-
-  em->state = STATE_READY;
-
-  /* Stats printing */
-  if (pool_elts (em->sessions) && (pool_elts (em->sessions) % 20000) == 0)
+  if (mp->listener_handle == em->listener_handle)
     {
-      f64 now = clib_time_now (&em->clib_time);
-      fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
-              pool_elts (em->sessions), now - start_time,
-              (f64) pool_elts (em->sessions) / (now - start_time));
+      if (em->cb_vft.quic_accepted_cb)
+       em->cb_vft.quic_accepted_cb (mp, session_index);
+      em->n_quic_clients_connected++;
+    }
+  else if (em->i_am_master)
+    {
+      if (em->cb_vft.server_stream_accepted_cb)
+       em->cb_vft.server_stream_accepted_cb (mp, session_index);
+      em->n_clients_connected++;
+    }
+  else
+    {
+      if (em->cb_vft.client_stream_accepted_cb)
+       em->cb_vft.client_stream_accepted_cb (mp, session_index);
+      em->n_clients_connected++;
     }
 
-  session->bytes_received = 0;
-  session->start = clib_time_now (&em->clib_time);
-}
-
-static void
-quic_session_connected_handler (session_connected_msg_t * mp)
-{
-  echo_main_t *em = &echo_main;
-  u8 *uri = format (0, "QUIC://session/%lu", mp->handle);
-  DBG ("QSession Connect : %s", uri);
-  client_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM);
+  if (em->n_clients_connected == em->n_clients * em->n_stream_clients)
+    {
+      em->state = STATE_READY;
+      quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
+    }
+  if (em->n_quic_clients_connected == em->n_clients)
+    quic_echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED);
 }
 
 static void
@@ -838,9 +944,6 @@ session_connected_handler (session_connected_msg_t * mp)
   echo_session_t *session;
   u32 session_index;
   svm_fifo_t *rx_fifo, *tx_fifo;
-  int rv;
-  u64 segment_handle;
-  segment_handle = mp->segment_handle;
 
   if (mp->retval)
     {
@@ -850,21 +953,14 @@ session_connected_handler (session_connected_msg_t * mp)
       return;
     }
 
-  /*
-   * Setup session
-   */
-
   pool_get (em->sessions, session);
   clib_memset (session, 0, sizeof (*session));
   session_index = session - em->sessions;
-  DBG ("Setting session_index %lu", session_index);
+  DBG ("CONNECTED session[%lx][0x%lx]", session_index, mp->handle);
+
+  if (wait_for_segment_allocation (mp->segment_handle))
+    return;
 
-  if (wait_for_segment_allocation (segment_handle))
-    {
-      clib_warning ("timeout waiting for segment allocation %lu",
-                   segment_handle);
-      return;
-    }
   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
   rx_fifo->client_session_index = session_index;
   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
@@ -873,17 +969,120 @@ session_connected_handler (session_connected_msg_t * mp)
   session->rx_fifo = rx_fifo;
   session->tx_fifo = tx_fifo;
   session->vpp_session_handle = mp->handle;
+  session->session_index = session_index;
   session->start = clib_time_now (&em->clib_time);
   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
                                         svm_msg_q_t *);
 
-  DBG ("Connected session handle %lx, idx %lu", mp->handle, session_index);
+  DBG ("Connected session handle %lx, idx %lu RX[%lx] TX[%lx]", mp->handle,
+       session_index, rx_fifo, tx_fifo);
   hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
 
   if (mp->context == QUIC_SESSION_TYPE_QUIC)
-    return quic_session_connected_handler (mp);
+    {
+      if (em->cb_vft.quic_connected_cb)
+       em->cb_vft.quic_connected_cb (mp, session_index);
+      em->n_quic_clients_connected++;
+    }
+  else if (em->i_am_master)
+    {
+      if (em->cb_vft.server_stream_connected_cb)
+       em->cb_vft.server_stream_connected_cb (mp, session_index);
+      em->n_clients_connected++;
+    }
+  else
+    {
+      if (em->cb_vft.client_stream_connected_cb)
+       em->cb_vft.client_stream_connected_cb (mp, session_index);
+      em->n_clients_connected++;
+    }
 
-  DBG ("SSession Connected");
+  if (em->n_clients_connected == em->n_clients * em->n_stream_clients)
+    {
+      em->state = STATE_READY;
+      quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
+    }
+  if (em->n_quic_clients_connected == em->n_clients)
+    quic_echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED);
+}
+
+/*
+ *
+ *  ECHO Callback definitions
+ *
+ */
+
+
+static void
+echo_on_connected_connect (session_connected_msg_t * mp, u32 session_index)
+{
+  echo_main_t *em = &echo_main;
+  u8 *uri = format (0, "QUIC://session/%lu", mp->handle);
+  int i;
+
+  if (!em->first_sconnect_sent)
+    {
+      em->first_sconnect_sent = 1;
+      quic_echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
+    }
+  for (i = 0; i < em->n_stream_clients; i++)
+    {
+      DBG ("CONNECT : new QUIC stream #%d: %s", i, uri);
+      echo_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM);
+    }
+
+  clib_warning ("session %u (0x%llx) connected with local ip %U port %d",
+               session_index, mp->handle, format_ip46_address, &mp->lcl.ip,
+               mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port));
+}
+
+static void
+echo_on_connected_send (session_connected_msg_t * mp, u32 session_index)
+{
+  echo_main_t *em = &echo_main;
+  int rv;
+  echo_session_t *session;
+
+  DBG ("Stream Session Connected");
+
+  session = pool_elt_at_index (em->sessions, session_index);
+  session->bytes_to_send = em->bytes_to_send;
+  session->bytes_to_receive = em->bytes_to_receive;
+
+  /*
+   * Start RX thread
+   */
+  em->thread_args[em->n_clients_connected] = session_index;
+  rv = pthread_create (&em->client_thread_handles[em->n_clients_connected],
+                      NULL /*attr */ , client_thread_fn,
+                      (void *) &em->thread_args[em->n_clients_connected]);
+  if (rv)
+    {
+      clib_warning ("pthread_create returned %d", rv);
+      return;
+    }
+}
+
+static void
+echo_on_connected_error (session_connected_msg_t * mp, u32 session_index)
+{
+  clib_warning ("Got a wrong connected on session %u [%lx]", session_index,
+               mp->handle);
+}
+
+static void
+echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index)
+{
+  echo_main_t *em = &echo_main;
+  int rv;
+  echo_session_t *session;
+
+  session = pool_elt_at_index (em->sessions, session_index);
+  session->bytes_to_send = em->bytes_to_send;
+  session->bytes_to_receive = em->bytes_to_receive;
+
+  DBG ("Stream session accepted 0x%lx, expecting %lu bytes",
+       session->vpp_session_handle, session->bytes_to_receive);
 
   /*
    * Start RX thread
@@ -898,12 +1097,116 @@ session_connected_handler (session_connected_msg_t * mp)
       return;
     }
 
-  em->n_clients_connected += 1;
-  clib_warning ("session %u (0x%llx) connected with local ip %U port %d",
-               session_index, mp->handle, format_ip46_address, &mp->lcl.ip,
-               mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port));
 }
 
+static void
+echo_on_accept_connect (session_accepted_msg_t * mp, u32 session_index)
+{
+  echo_main_t *em = &echo_main;
+  DBG ("Accept on QSession index %u", mp->handle);
+  u8 *uri = format (0, "QUIC://session/%lu", mp->handle);
+  u32 i;
+
+  if (!em->first_sconnect_sent)
+    {
+      em->first_sconnect_sent = 1;
+      quic_echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
+    }
+  for (i = 0; i < em->n_stream_clients; i++)
+    {
+      DBG ("ACCEPT : new QUIC stream #%d: %s", i, uri);
+      echo_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM);
+    }
+}
+
+static void
+echo_on_accept_error (session_accepted_msg_t * mp, u32 session_index)
+{
+  clib_warning ("Got a wrong accept on session %u [%lx]", session_index,
+               mp->handle);
+}
+
+static void
+echo_on_accept_log_ip (session_accepted_msg_t * mp, u32 session_index)
+{
+  u8 *ip_str;
+  ip_str = format (0, "%U", format_ip46_address, &mp->rmt.ip, mp->rmt.is_ip4);
+  clib_warning ("Accepted session from: %s:%d", ip_str,
+               clib_net_to_host_u16 (mp->rmt.port));
+
+}
+
+static const quic_echo_cb_vft_t default_cb_vft = {
+  /* Qsessions */
+  .quic_accepted_cb = &echo_on_accept_log_ip,
+  .quic_connected_cb = &echo_on_connected_connect,
+  /* client initiated streams */
+  .server_stream_accepted_cb = NULL,
+  .client_stream_connected_cb = &echo_on_connected_send,
+  /* server initiated streams */
+  .client_stream_accepted_cb = &echo_on_accept_error,
+  .server_stream_connected_cb = &echo_on_connected_error,
+};
+
+static const quic_echo_cb_vft_t server_stream_cb_vft = {
+  /* Qsessions */
+  .quic_accepted_cb = &echo_on_accept_connect,
+  .quic_connected_cb = NULL,
+  /* client initiated streams */
+  .server_stream_accepted_cb = &echo_on_accept_error,
+  .client_stream_connected_cb = &echo_on_connected_error,
+  /* server initiated streams */
+  .client_stream_accepted_cb = &echo_on_accept_recv,
+  .server_stream_connected_cb = &echo_on_connected_send,
+};
+
+static uword
+echo_unformat_quic_setup_vft (unformat_input_t * input, va_list * args)
+{
+  echo_main_t *em = &echo_main;
+  if (unformat (input, "serverstream"))
+    {
+      clib_warning ("Using QUIC server initiated streams");
+      em->no_return = 1;
+      em->cb_vft = server_stream_cb_vft;
+      return 1;
+    }
+  else if (unformat (input, "default"))
+    return 1;
+  return 0;
+}
+
+static uword
+echo_unformat_data (unformat_input_t * input, va_list * args)
+{
+  u64 _a;
+  u64 *a = va_arg (*args, u64 *);
+  if (unformat (input, "%lluGb", &_a))
+    {
+      *a = _a << 30;
+      return 1;
+    }
+  else if (unformat (input, "%lluMb", &_a))
+    {
+      *a = _a << 20;
+      return 1;
+    }
+  else if (unformat (input, "%lluKb", &_a))
+    {
+      *a = _a << 10;
+      return 1;
+    }
+  else if (unformat (input, "%llu", a))
+    return 1;
+  return 0;
+}
+
+/*
+ *
+ *  End of ECHO callback definitions
+ *
+ */
+
 static void
 session_disconnected_handler (session_disconnected_msg_t * mp)
 {
@@ -972,7 +1275,7 @@ session_reset_handler (session_reset_msg_t * mp)
 }
 
 static void
-handle_mq_event (session_event_t * e)
+handle_mq_event (echo_main_t * em, session_event_t * e, int handle_rx)
 {
   switch (e->event_type)
     {
@@ -996,120 +1299,70 @@ handle_mq_event (session_event_t * e)
       DBG ("SESSION_CTRL_EVT_RESET");
       session_reset_handler ((session_reset_msg_t *) e->data);
       break;
+    case SESSION_IO_EVT_RX:
+      DBG ("SESSION_IO_EVT_RX");
+      if (handle_rx)
+       echo_handle_rx (em, e);
+      break;
     default:
-      clib_warning ("unhandled %u", e->event_type);
+      clib_warning ("unhandled event %u", e->event_type);
     }
 }
 
-static void
+static int
 clients_run (echo_main_t * em)
 {
-  f64 start_time, deltat, timeout = 100.0;
   svm_msg_q_msg_t msg;
   session_event_t *e;
   echo_session_t *s;
   hash_pair_t *p;
   int i;
 
-  /* Init test data */
-  vec_validate (em->connect_test_data, 1024 * 1024 - 1);
-  for (i = 0; i < vec_len (em->connect_test_data); i++)
-    em->connect_test_data[i] = i & 0xff;
-
   /*
    * Attach and connect the clients
    */
   if (application_attach (em))
-    return;
+    return -1;
 
+  quic_echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT);
   for (i = 0; i < em->n_clients; i++)
-    client_send_connect (em, em->connect_uri, QUIC_SESSION_TYPE_QUIC);
+    echo_send_connect (em, em->uri, QUIC_SESSION_TYPE_QUIC);
 
-  start_time = clib_time_now (&em->clib_time);
-  while (em->n_clients_connected < em->n_clients
-        && (clib_time_now (&em->clib_time) - start_time < timeout)
-        && em->state != STATE_FAILED && em->time_to_stop != 1)
+  wait_for_state_change (em, STATE_READY, TIMEOUT);
 
+  /*
+   * Wait for client threads to send the data
+   */
+  DBG ("Waiting for data on %u clients", em->n_clients_connected);
+  while (em->n_clients_connected)
     {
-      int rc = svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1);
-      if (rc == ETIMEDOUT && em->time_to_stop)
-       break;
-      if (rc == ETIMEDOUT)
+      if (svm_msg_q_is_empty (em->our_event_queue))
+       continue;
+      if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1))
        continue;
       e = svm_msg_q_msg_data (em->our_event_queue, &msg);
-      handle_mq_event (e);
+      handle_mq_event (em, e, 0 /* handle_rx */ );
       svm_msg_q_free_msg (em->our_event_queue, &msg);
     }
 
-  if (em->n_clients_connected != em->n_clients)
-    {
-      clib_warning ("failed to initialize all connections");
-      return;
-    }
-
-  /*
-   * Initialize connections
-   */
-  DBG ("Initialize connections on %u clients", em->n_clients);
-
-  /* *INDENT-OFF* */
-  hash_foreach_pair (p, em->session_index_by_vpp_handles,
-                ({
-      s = pool_elt_at_index (em->sessions, p->value[0]);
-      s->bytes_to_send = em->bytes_to_send;
-      if (!em->no_return)
-       s->bytes_to_receive = em->bytes_to_send;
-                }));
-  /* *INDENT-ON* */
-  em->n_active_clients = em->n_clients_connected;
-
-  /*
-   * Wait for client threads to send the data
-   */
-  DBG ("Waiting for data on %u clients", em->n_active_clients);
-  start_time = clib_time_now (&em->clib_time);
-  em->state = STATE_READY;
-  while (em->n_active_clients)
-    if (!svm_msg_q_is_empty (em->our_event_queue))
-      {
-       if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 0))
-         {
-           clib_warning ("svm msg q returned");
-           continue;
-         }
-       e = svm_msg_q_msg_data (em->our_event_queue, &msg);
-       if (e->event_type != FIFO_EVENT_APP_RX)
-         handle_mq_event (e);
-       svm_msg_q_free_msg (em->our_event_queue, &msg);
-      }
-
   /* *INDENT-OFF* */
   hash_foreach_pair (p, em->session_index_by_vpp_handles,
-                ({
+    ({
       s = pool_elt_at_index (em->sessions, p->value[0]);
       DBG ("Sending disconnect on session %lu", p->key);
-      client_disconnect (em, s);
-                }));
+      client_disconnect_session (em, s);
+    }));
   /* *INDENT-ON* */
 
-  /*
-   * Stats and detach
-   */
-  deltat = clib_time_now (&em->clib_time) - start_time;
-  fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds\n",
-          em->tx_total, em->tx_total / (1ULL << 20),
-          em->tx_total / (1ULL << 30), deltat);
-  fformat (stdout, "%.4f Gbit/second\n", (em->tx_total * 8.0) / deltat / 1e9);
-
   wait_for_disconnected_sessions (em);
   application_detach (em);
+  return 0;
 }
 
 static void
 vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
 {
   echo_main_t *em = &echo_main;
-
   if (mp->retval)
     {
       clib_warning ("bind failed: %U", format_api_error,
@@ -1118,7 +1371,7 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
       return;
     }
 
-  em->state = STATE_READY;
+  em->state = STATE_LISTEN;
 }
 
 static void
@@ -1216,7 +1469,7 @@ format_ip46_address (u8 * s, va_list * args)
 }
 
 static void
-server_handle_rx (echo_main_t * em, session_event_t * e)
+echo_handle_rx (echo_main_t * em, session_event_t * e)
 {
   int n_read, max_dequeue, n_sent;
   u32 offset, to_dequeue;
@@ -1227,7 +1480,6 @@ server_handle_rx (echo_main_t * em, session_event_t * e)
    * app_recv_stream, we may end up with a lot of unhandled rx events on the
    * message queue */
   svm_fifo_unset_event (s->rx_fifo);
-
   max_dequeue = svm_fifo_max_dequeue (s->rx_fifo);
   if (PREDICT_FALSE (!max_dequeue))
     return;
@@ -1240,33 +1492,34 @@ server_handle_rx (echo_main_t * em, session_event_t * e)
       n_read = app_recv_stream_raw (s->rx_fifo, em->rx_buf, to_dequeue,
                                    0 /* clear evt */ , 0 /* peek */ );
 
-      if (n_read > 0)
-       {
-         if (em->test_return_packets)
-           test_recv_bytes (em, s, em->rx_buf, n_read);
-
-         max_dequeue -= n_read;
-         s->bytes_received += n_read;
-       }
-      else
+      if (n_read <= 0)
        break;
+      DBG ("Notify cause %u bytes", n_read);
+      notify_rx_data_to_vpp (s);
+      if (em->test_return_packets)
+       test_recv_bytes (em, s, em->rx_buf, n_read);
+
+      max_dequeue -= n_read;
+      s->bytes_received += n_read;
+      s->bytes_to_receive -= n_read;
 
       /* Reflect if a non-drop session */
-      if (!em->no_return && n_read > 0)
+      if (!em->no_return)
        {
          offset = 0;
          do
            {
              n_sent = app_send_stream ((app_session_t *) s,
-                                       &em->rx_buf[offset],
+                                       em->rx_buf + offset,
                                        n_read, SVM_Q_WAIT);
-             if (n_sent > 0)
-               {
-                 n_read -= n_sent;
-                 offset += n_sent;
-               }
+             if (n_sent <= 0)
+               continue;
+             n_read -= n_sent;
+             s->bytes_to_send -= n_sent;
+             s->bytes_sent += n_sent;
+             offset += n_sent;
            }
-         while ((n_sent <= 0 || n_read > 0) && !em->time_to_stop);
+         while (n_read > 0);
        }
     }
   while (max_dequeue > 0 && !em->time_to_stop);
@@ -1283,29 +1536,15 @@ server_handle_mq (echo_main_t * em)
       int rc = svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1);
       if (PREDICT_FALSE (rc == ETIMEDOUT && em->time_to_stop))
        break;
-      if (PREDICT_FALSE (em->time_to_print_stats == 1))
-       {
-         em->time_to_print_stats = 0;
-         fformat (stdout, "%d connections\n", pool_elts (em->sessions));
-       }
       if (rc == ETIMEDOUT)
        continue;
       e = svm_msg_q_msg_data (em->our_event_queue, &msg);
-      switch (e->event_type)
-       {
-       case SESSION_IO_EVT_RX:
-         DBG ("SESSION_IO_EVT_RX");
-         server_handle_rx (em, e);
-         break;
-       default:
-         handle_mq_event (e);
-         break;
-       }
+      handle_mq_event (em, e, em->state == STATE_READY /* handle_rx */ );
       svm_msg_q_free_msg (em->our_event_queue, &msg);
     }
 }
 
-void
+static void
 server_send_listen (echo_main_t * em)
 {
   vl_api_bind_uri_t *bmp;
@@ -1319,19 +1558,7 @@ server_send_listen (echo_main_t * em)
   vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & bmp);
 }
 
-int
-server_listen (echo_main_t * em)
-{
-  server_send_listen (em);
-  if (wait_for_state_change (em, STATE_READY))
-    {
-      clib_warning ("timeout waiting for STATE_READY");
-      return -1;
-    }
-  return 0;
-}
-
-void
+static void
 server_send_unbind (echo_main_t * em)
 {
   vl_api_unbind_uri_t *ump;
@@ -1345,7 +1572,7 @@ server_send_unbind (echo_main_t * em)
   vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & ump);
 }
 
-void
+static int
 server_run (echo_main_t * em)
 {
   echo_session_t *session;
@@ -1361,21 +1588,21 @@ server_run (echo_main_t * em)
     pool_put_index (em->sessions, i);
 
   if (application_attach (em))
-    return;
+    return -1;
 
   /* Bind to uri */
-  if (server_listen (em))
-    return;
+  server_send_listen (em);
+  if (wait_for_state_change (em, STATE_READY, 0))
+    return -2;
 
   /* Enter handle event loop */
   server_handle_mq (em);
 
   /* Cleanup */
   server_send_unbind (em);
-
   application_detach (em);
-
   fformat (stdout, "Test complete...\n");
+  return 0;
 }
 
 static void
@@ -1385,25 +1612,16 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
   echo_main_t *em = &echo_main;
   uword *p;
   DBG ("Got disonnected reply for session handle %lu", mp->handle);
-
-  if (mp->retval)
-    {
-      clib_warning ("vpp complained about disconnect: %d",
-                   ntohl (mp->retval));
-      return;
-    }
-
   em->state = STATE_START;
 
   p = hash_get (em->session_index_by_vpp_handles, mp->handle);
   if (p)
-    {
-      hash_unset (em->session_index_by_vpp_handles, mp->handle);
-    }
+    hash_unset (em->session_index_by_vpp_handles, mp->handle);
   else
-    {
-      clib_warning ("couldn't find session key %llx", mp->handle);
-    }
+    clib_warning ("couldn't find session key %llx", mp->handle);
+
+  if (mp->retval)
+    clib_warning ("vpp complained about disconnect: %d", ntohl (mp->retval));
 }
 
 static void
@@ -1446,40 +1664,44 @@ quic_echo_api_hookup (echo_main_t * em)
 #undef _
 }
 
-int
-main (int argc, char **argv)
+static void
+print_usage_and_exit (void)
+{
+  fprintf (stderr,
+          "quic_echo [socket-name SOCKET] [client|server] [uri URI] [OPTIONS]\n"
+          "\n"
+          "  socket-name PATH    Specify the binary socket path to connect to VPP\n"
+          "  use-svm-api         Use SVM API to connect to VPP\n"
+          "  test-bytes[:assert] Check data correctness when receiving (assert fails on first error)\n"
+          "  fifo-size N         Use N Kb fifos\n"
+          "  appns NAMESPACE     Use the namespace NAMESPACE\n"
+          "  all-scope           all-scope option\n"
+          "  local-scope         local-scope option\n"
+          "  global-scope        global-scope option\n"
+          "  secret SECRET       set namespace secret\n"
+          "  chroot prefix PATH  Use PATH as memory root path\n"
+          "  quic-setup OPT      OPT=serverstream : Client open N connections. On each one server opens M streams\n"
+          "                            by default : Client open N connections. On each one client opens M streams\n"
+          "\n"
+          "  no-return            Drop the data when received, dont reply\n"
+          "  nclients N[/M]       Open N QUIC connections, each one with M streams (M defaults to 1)\n"
+          "  send N[Kb|Mb|GB]     Send N [K|M|G]bytes\n"
+          "  recv N[Kb|Mb|GB]     Expect N [K|M|G]bytes\n"
+          "  nclients N[/M]       Open N QUIC connections, each one with M streams (M defaults to 1)\n");
+  exit (1);
+}
+
+
+void
+quic_echo_process_opts (int argc, char **argv)
 {
-  int i_am_server = 1, test_return_packets = 0;
   echo_main_t *em = &echo_main;
-  fifo_segment_main_t *sm = &em->segment_main;
   unformat_input_t _argv, *a = &_argv;
+  u32 tmp;
   u8 *chroot_prefix;
   u8 *uri = 0;
-  u8 *bind_uri = (u8 *) "quic://0.0.0.0/1234";
-  u8 *connect_uri = (u8 *) "quic://6.0.1.1/1234";
-  u64 bytes_to_send = 64 << 10, mbytes;
-  char *app_name;
-  u32 tmp;
-
-  clib_mem_init_thread_safe (0, 256 << 20);
-
-  clib_memset (em, 0, sizeof (*em));
-  em->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
-  em->shared_segment_handles = hash_create (0, sizeof (uword));
-  clib_spinlock_init (&em->segment_handles_lock);
-  em->my_pid = getpid ();
-  em->socket_name = 0;
-  em->use_sock_api = 1;
-  em->fifo_size = 64 << 10;
-  em->n_clients = 1;
-  em->max_test_msg = 50;
-  em->quic_streams = 1;
 
-  clib_time_init (&em->clib_time);
-  init_error_string_table (em);
-  fifo_segment_main_init (sm, HIGH_SEGMENT_BASEVA, 20);
   unformat_init_command_line (a, argv);
-
   while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT)
     {
       if (unformat (a, "chroot prefix %s", &chroot_prefix))
@@ -1487,33 +1709,27 @@ main (int argc, char **argv)
          vl_set_memory_root_path ((char *) chroot_prefix);
        }
       else if (unformat (a, "uri %s", &uri))
-       ;
+       em->uri = format (0, "%s%c", uri, 0);
       else if (unformat (a, "server"))
-       i_am_server = 1;
+       em->i_am_master = 1;
       else if (unformat (a, "client"))
-       i_am_server = 0;
+       em->i_am_master = 0;
       else if (unformat (a, "no-return"))
        em->no_return = 1;
+      else if (unformat (a, "test-bytes:assert"))
+       em->test_return_packets = RETURN_PACKETS_ASSERT;
       else if (unformat (a, "test-bytes"))
-       test_return_packets = 1;
-      else if (unformat (a, "bytes %lld", &mbytes))
-       {
-         bytes_to_send = mbytes;
-       }
-      else if (unformat (a, "mbytes %lld", &mbytes))
-       {
-         bytes_to_send = mbytes << 20;
-       }
-      else if (unformat (a, "gbytes %lld", &mbytes))
-       {
-         bytes_to_send = mbytes << 30;
-       }
+       em->test_return_packets = RETURN_PACKETS_LOG_WRONG;
       else if (unformat (a, "socket-name %s", &em->socket_name))
        ;
       else if (unformat (a, "use-svm-api"))
        em->use_sock_api = 0;
       else if (unformat (a, "fifo-size %d", &tmp))
        em->fifo_size = tmp << 10;
+      else
+       if (unformat
+           (a, "nclients %d/%d", &em->n_clients, &em->n_stream_clients))
+       ;
       else if (unformat (a, "nclients %d", &em->n_clients))
        ;
       else if (unformat (a, "appns %_%v%_", &em->appns_id))
@@ -1527,41 +1743,71 @@ main (int argc, char **argv)
        em->appns_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
       else if (unformat (a, "secret %lu", &em->appns_secret))
        ;
-      else if (unformat (a, "quic-streams %d", &em->quic_streams))
+      else if (unformat (a, "quic-setup %U", echo_unformat_quic_setup_vft))
        ;
       else
-       {
-         fformat (stderr, "%s: usage [master|slave]\n", argv[0]);
-         exit (1);
-       }
+       if (unformat (a, "send %U", echo_unformat_data, &em->bytes_to_send))
+       ;
+      else
+       if (unformat
+           (a, "recv %U", echo_unformat_data, &em->bytes_to_receive))
+       ;
+      else if (unformat (a, "time %U:%U",
+                        echo_unformat_timing_event, &em->timing_start_event,
+                        echo_unformat_timing_event, &em->timing_end_event))
+       ;
+      else
+       print_usage_and_exit ();
     }
+}
 
-  if (!em->socket_name)
-    em->socket_name = format (0, "%s%c", API_SOCKET_FILE, 0);
-
-  if (uri)
-    {
-      em->uri = format (0, "%s%c", uri, 0);
-      em->connect_uri = format (0, "%s%c", uri, 0);
-    }
-  else
-    {
-      em->uri = format (0, "%s%c", bind_uri, 0);
-      em->connect_uri = format (0, "%s%c", connect_uri, 0);
-    }
+int
+main (int argc, char **argv)
+{
+  echo_main_t *em = &echo_main;
+  fifo_segment_main_t *sm = &em->segment_main;
+  char *app_name;
+  int i, rv;
+  u32 n_clients;
 
-  em->i_am_master = i_am_server;
-  em->test_return_packets = test_return_packets;
-  em->bytes_to_send = bytes_to_send;
+  clib_mem_init_thread_safe (0, 256 << 20);
+  clib_memset (em, 0, sizeof (*em));
+  em->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
+  em->shared_segment_handles = hash_create (0, sizeof (uword));
+  em->my_pid = getpid ();
+  em->socket_name = format (0, "%s%c", API_SOCKET_FILE, 0);
+  em->use_sock_api = 1;
+  em->fifo_size = 64 << 10;
+  em->n_clients = 1;
+  em->n_stream_clients = 1;
+  em->max_test_msg = 50;
   em->time_to_stop = 0;
+  em->i_am_master = 1;
+  em->test_return_packets = RETURN_PACKETS_NOTEST;
+  em->timing_start_event = ECHO_EVT_FIRST_QCONNECT;
+  em->timing_end_event = ECHO_EVT_LAST_BYTE;
+  em->bytes_to_receive = 64 << 10;
+  em->bytes_to_send = 64 << 10;
+  em->uri = format (0, "%s%c", "quic://0.0.0.0/1234", 0);
+  em->cb_vft = default_cb_vft;
+  quic_echo_process_opts (argc, argv);
+
+  n_clients = em->n_clients * em->n_stream_clients;
+  vec_validate (em->client_thread_handles, n_clients - 1);
+  vec_validate (em->thread_args, n_clients - 1);
+  clib_time_init (&em->clib_time);
+  init_error_string_table (em);
+  fifo_segment_main_init (sm, HIGH_SEGMENT_BASEVA, 20);
+  clib_spinlock_init (&em->segment_handles_lock);
   vec_validate (em->rx_buf, 4 << 20);
-  vec_validate (em->client_thread_handles, em->n_clients - 1);
-  vec_validate (em->thread_args, em->n_clients - 1);
+  vec_validate (em->connect_test_data, 1024 * 1024 - 1);
+  for (i = 0; i < vec_len (em->connect_test_data); i++)
+    em->connect_test_data[i] = i & 0xff;
 
   setup_signal_handlers ();
   quic_echo_api_hookup (em);
 
-  app_name = i_am_server ? "quic_echo_server" : "quic_echo_client";
+  app_name = em->i_am_master ? "quic_echo_server" : "quic_echo_client";
   if (connect_to_vpp (app_name) < 0)
     {
       svm_region_exit ();
@@ -1569,14 +1815,19 @@ main (int argc, char **argv)
       exit (1);
     }
 
-  if (i_am_server == 0)
-    clients_run (em);
+  quic_echo_notify_event (em, ECHO_EVT_START);
+  if (em->i_am_master)
+    rv = server_run (em);
   else
-    server_run (em);
+    rv = clients_run (em);
+  if (rv)
+    exit (rv);
+  quic_echo_notify_event (em, ECHO_EVT_EXIT);
+  print_global_stats (em);
 
   /* Make sure detach finishes */
-  wait_for_state_change (em, STATE_DETACHED);
-
+  if (wait_for_state_change (em, STATE_DETACHED, TIMEOUT))
+    exit (-1);
   disconnect_from_vpp (em);
   exit (0);
 }
index 21f2fd7..2bcbcf3 100644 (file)
@@ -8,51 +8,54 @@ import signal
 from framework import VppTestCase, VppTestRunner, running_extended_tests, \
     Worker
 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from threading import Event
 
 
 class QUICAppWorker(Worker):
     """ QUIC Test Application Worker """
+    process = None
 
-    def __init__(self, build_dir, appname, args, logger, env={}):
+    def __init__(self, build_dir, appname, args, logger, env={}, event=None):
         app = "%s/vpp/bin/%s" % (build_dir, appname)
         self.args = [app] + args
+        self.event = event
         super(QUICAppWorker, self).__init__(self.args, logger, env)
 
+    def run(self):
+        super(QUICAppWorker, self).run()
+        if self.event:
+            self.event.set()
 
-class QUICTestCase(VppTestCase):
-    """ QUIC Test Case """
+    def teardown(self, logger, timeout):
+        if self.process is None:
+           return False
+        try:
+            logger.debug("Killing worker process (pid %d)" % self.process.pid)
+            os.killpg(os.getpgid(self.process.pid), signal.SIGKILL)
+            self.join(timeout)
+        except OSError as e:
+            logger.debug("Couldn't kill worker process")
+            return True
+        return False
 
-    @classmethod
-    def setUpClass(cls):
-        super(QUICTestCase, cls).setUpClass()
 
-    @classmethod
-    def tearDownClass(cls):
-        super(QUICTestCase, cls).tearDownClass()
+class QUICTestCase(VppTestCase):
+    """ QUIC Test Case """
 
     def setUp(self):
+        super(QUICTestCase, self).setUp()
         var = "VPP_BUILD_DIR"
         self.build_dir = os.getenv(var, None)
         if self.build_dir is None:
             raise Exception("Environment variable `%s' not set" % var)
         self.vppDebug = 'vpp_debug' in self.build_dir
         self.timeout = 20
-        self.pre_test_sleep = 0.3
-        self.post_test_sleep = 0.3
         self.vapi.session_enable_disable(is_enabled=1)
-
-    def tearDown(self):
-        self.vapi.session_enable_disable(is_enabled=0)
-
-    def thru_host_stack_ipv4_setup(self):
-        super(QUICTestCase, self).setUp()
+        self.pre_test_sleep = 0.3
+        self.post_test_sleep = 0.2
 
         self.create_loopback_interfaces(2)
         self.uri = "quic://%s/1234" % self.loop0.local_ip4
-        common_args = ["uri", self.uri, "fifo-size", "64"]
-        self.server_echo_test_args = common_args + ["appns", "server"]
-        self.client_echo_test_args = common_args + ["appns", "client",
-                                                    "test-bytes"]
         table_id = 1
         for i in self.lo_interfaces:
             i.admin_up()
@@ -84,7 +87,8 @@ class QUICTestCase(VppTestCase):
         self.ip_t10.add_vpp_config()
         self.logger.debug(self.vapi.cli("show ip fib"))
 
-    def thru_host_stack_ipv4_tear_down(self):
+    def tearDown(self):
+        self.vapi.session_enable_disable(is_enabled=0)
         # Delete inter-table routes
         self.ip_t01.remove_vpp_config()
         self.ip_t10.remove_vpp_config()
@@ -93,212 +97,123 @@ class QUICTestCase(VppTestCase):
             i.unconfig_ip4()
             i.set_table_ip4(0)
             i.admin_down()
+        super(QUICTestCase, self).tearDown()
 
-    def start_internal_echo_server(self, args):
-        error = self.vapi.cli("test echo server %s" % ' '.join(args))
+
+class QUICEchoInternalTestCase(QUICTestCase):
+    """QUIC Echo Internal Test Case"""
+    def setUp(self):
+        super(QUICEchoInternalTestCase, self).setUp()
+        self.client_args = "uri %s fifo-size 64 test-bytes appns client" % self.uri
+        self.server_args = "uri %s fifo-size 64 appns server" % self.uri
+
+    def server(self, *args):
+        error = self.vapi.cli("test echo server %s %s" % (self.server_args, ' '.join(args)))
         if error:
             self.logger.critical(error)
             self.assertNotIn("failed", error)
 
-    def start_internal_echo_client(self, args):
-        error = self.vapi.cli("test echo client %s" % ' '.join(args))
+    def client(self, *args):
+        error = self.vapi.cli("test echo client %s %s" % (self.client_args, ' '.join(args)))
         if error:
             self.logger.critical(error)
             self.assertNotIn("failed", error)
 
-    def internal_ipv4_transfer_test(self, server_args, client_args):
-        self.start_internal_echo_server(server_args)
-        self.start_internal_echo_client(client_args)
-
-    def start_external_echo_server(self, args):
-        self.worker_server = QUICAppWorker(self.build_dir, "quic_echo",
-                                           args, self.logger)
-        self.worker_server.start()
-
-    def start_external_echo_client(self, args):
-        self.client_echo_test_args += "use-svm-api"
-        self.worker_client = QUICAppWorker(self.build_dir, "quic_echo",
-                                           args, self.logger)
-        self.worker_client.start()
-        self.worker_client.join(self.timeout)
-        try:
-            self.validateExternalTestResults()
-        except Exception as error:
-            self.fail("Failed with %s" % error)
-
-    def external_ipv4_transfer_test(self, server_args, client_args):
-        self.start_external_echo_server(server_args)
-        self.sleep(self.pre_test_sleep)
-        self.start_external_echo_client(client_args)
-        self.sleep(self.post_test_sleep)
-
-    def validateExternalTestResults(self):
-        if os.path.isdir('/proc/{}'.format(self.worker_server.process.pid)):
-            self.logger.info("Killing server worker process (pid %d)" %
-                             self.worker_server.process.pid)
-            os.killpg(os.getpgid(self.worker_server.process.pid),
-                      signal.SIGTERM)
-            self.worker_server.join()
-        self.logger.info("Client worker result is `%s'" %
-                         self.worker_client.result)
-        error = False
-        if self.worker_client.result is None:
-            try:
-                error = True
-                self.logger.error(
-                    "Timeout: %ss! Killing client worker process (pid %d)" %
-                    (self.timeout, self.worker_client.process.pid))
-                os.killpg(os.getpgid(self.worker_client.process.pid),
-                          signal.SIGKILL)
-                self.worker_client.join()
-            except OSError:
-                self.logger.debug(
-                    "Couldn't kill client worker process")
-                raise
-        if error:
-            raise Exception(
-                "Timeout! Client worker did not finish in %ss" % self.timeout)
-        self.assert_equal(self.worker_client.result, 0,
-                          "Binary test return code")
-
-
-class QUICInternalEchoIPv4TestCase(QUICTestCase):
-    """ QUIC Internal Echo IPv4 Transfer Test Cases """
-
-    @classmethod
-    def setUpClass(cls):
-        super(QUICInternalEchoIPv4TestCase, cls).setUpClass()
-
-    @classmethod
-    def tearDownClass(cls):
-        super(QUICInternalEchoIPv4TestCase, cls).tearDownClass()
-
-    def setUp(self):
-        super(QUICInternalEchoIPv4TestCase, self).setUp()
-        self.thru_host_stack_ipv4_setup()
-
-    def tearDown(self):
-        super(QUICInternalEchoIPv4TestCase, self).tearDown()
-        self.thru_host_stack_ipv4_tear_down()
-
-    def show_commands_at_teardown(self):
-        self.logger.debug(self.vapi.cli("show session verbose 2"))
-
+class QUICEchoInternalTransferTestCase(QUICEchoInternalTestCase):
+    """QUIC Echo Internal Transfer Test Case"""
     @unittest.skipUnless(running_extended_tests, "part of extended tests")
     def test_quic_internal_transfer(self):
-        """ QUIC internal echo client/server transfer """
-
-        self.internal_ipv4_transfer_test(self.server_echo_test_args,
-                                         self.client_echo_test_args +
-                                         ["no-output", "mbytes", "10"])
-
-
-class QUICInternalSerialEchoIPv4TestCase(QUICTestCase):
-    """ QUIC Internal Serial Echo IPv4 Transfer Test Cases """
-
-    @classmethod
-    def setUpClass(cls):
-        super(QUICInternalSerialEchoIPv4TestCase, cls).setUpClass()
-
-    @classmethod
-    def tearDownClass(cls):
-        super(QUICInternalSerialEchoIPv4TestCase, cls).tearDownClass()
-
-    def setUp(self):
-        super(QUICInternalSerialEchoIPv4TestCase, self).setUp()
-        self.thru_host_stack_ipv4_setup()
-
-    def tearDown(self):
-        super(QUICInternalSerialEchoIPv4TestCase, self).tearDown()
-        self.thru_host_stack_ipv4_tear_down()
-
-    def show_commands_at_teardown(self):
-        self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.server()
+        self.client("no-output", "mbytes", "10")
 
+class QUICEchoInternalSerialTestCase(QUICEchoInternalTestCase):
+    """QUIC Echo Internal Serial Transfer Test Case"""
     @unittest.skipUnless(running_extended_tests, "part of extended tests")
     def test_quic_serial_internal_transfer(self):
-        """ QUIC serial internal echo client/server transfer """
-
-        client_args = (self.client_echo_test_args +
-                       ["no-output", "mbytes", "10"])
-        self.internal_ipv4_transfer_test(self.server_echo_test_args,
-                                         client_args)
-        self.start_internal_echo_client(client_args)
-        self.start_internal_echo_client(client_args)
-        self.start_internal_echo_client(client_args)
-        self.start_internal_echo_client(client_args)
-
-
-class QUICInternalEchoIPv4MultiStreamTestCase(QUICTestCase):
-    """ QUIC Internal Echo IPv4 Transfer Test Cases """
-
-    @classmethod
-    def setUpClass(cls):
-        super(QUICInternalEchoIPv4MultiStreamTestCase, cls).setUpClass()
-
-    @classmethod
-    def tearDownClass(cls):
-        super(QUICInternalEchoIPv4MultiStreamTestCase, cls).tearDownClass()
-
-    def setUp(self):
-        super(QUICInternalEchoIPv4MultiStreamTestCase, self).setUp()
-        self.thru_host_stack_ipv4_setup()
-
-    def tearDown(self):
-        super(QUICInternalEchoIPv4MultiStreamTestCase, self).tearDown()
-        self.thru_host_stack_ipv4_tear_down()
-
-    def show_commands_at_teardown(self):
-        self.logger.debug(self.vapi.cli("show session verbose 2"))
-
+        self.server()
+        self.client("no-output", "mbytes", "10")
+        self.client("no-output", "mbytes", "10")
+        self.client("no-output", "mbytes", "10")
+        self.client("no-output", "mbytes", "10")
+        self.client("no-output", "mbytes", "10")
+
+class QUICEchoInternalMStreamTestCase(QUICEchoInternalTestCase):
+    """QUIC Echo Internal MultiStream Test Case"""
     @unittest.skipUnless(running_extended_tests, "part of extended tests")
     def test_quic_internal_multistream_transfer(self):
-        """ QUIC internal echo client/server multi-stream transfer """
+        self.server()
+        self.client("nclients", "10", "mbytes", "1", "no-output")
 
-        self.internal_ipv4_transfer_test(self.server_echo_test_args,
-                                         self.client_echo_test_args +
-                                         ["quic-streams", "10",
-                                          "mbytes", "1",
-                                          "no-output"])
 
+class QUICEchoExternalTestCase(QUICTestCase):
+    extra_vpp_punt_config = ["session", "{", "evt_qs_memfd_seg", "}"]
+    quic_setup = "default"
 
-class QUICExternalEchoIPv4TestCase(QUICTestCase):
-    """ QUIC External Echo IPv4 Transfer Test Cases """
+    def setUp(self):
+        super(QUICEchoExternalTestCase, self).setUp()
+        common_args = ["uri", self.uri, "fifo-size", "64", "test-bytes:assert", "socket-name", self.api_sock]
+        self.server_echo_test_args = common_args + ["server", "appns", "server", "quic-setup", self.quic_setup]
+        self.client_echo_test_args = common_args + ["client", "appns", "client", "quic-setup", self.quic_setup]
+        self.event = Event()
+
+    def server(self, *args):
+        _args = self.server_echo_test_args + list(args)
+        self.worker_server = QUICAppWorker(self.build_dir, "quic_echo",
+                                           _args, self.logger, event=self.event)
+        self.worker_server.start()
+        self.sleep(self.pre_test_sleep)
 
-    @classmethod
-    def setUpConstants(cls):
-        super(QUICExternalEchoIPv4TestCase, cls).setUpConstants()
-        cls.vpp_cmdline.extend(["session", "{", "evt_qs_memfd_seg", "}"])
+    def client(self, *args):
+        _args = self.client_echo_test_args + list(args)
+        # self.client_echo_test_args += "use-svm-api"
+        self.worker_client = QUICAppWorker(self.build_dir, "quic_echo",
+                                           _args, self.logger, event=self.event)
+        self.worker_client.start()
+        self.event.wait(self.timeout)
+        self.sleep(self.post_test_sleep)
 
-    @classmethod
-    def setUpClass(cls):
-        super(QUICExternalEchoIPv4TestCase, cls).setUpClass()
+    def validate_external_test_results(self):
+        self.logger.info("Client worker result is `%s'" % self.worker_client.result)
+        server_result = self.worker_server.result
+        client_result = self.worker_client.result
+        server_kill_error = False
+        if self.worker_server.result is None:
+            server_kill_error = self.worker_server.teardown(self.logger, self.timeout)
+        if self.worker_client.result is None:
+            self.worker_client.teardown(self.logger, self.timeout)
+        self.assertIsNone(server_result, "Wrong server worker return code")
+        self.assertIsNotNone(client_result, "Timeout! Client worker did not finish in %ss" % self.timeout)
+        self.assertEqual(client_result, 0, "Wrong client worker return code")
+        self.assertFalse(server_kill_error, "Server kill errored")
 
-    @classmethod
-    def tearDownClass(cls):
-        super(QUICExternalEchoIPv4TestCase, cls).tearDownClass()
 
-    def setUp(self):
-        super(QUICExternalEchoIPv4TestCase, self).setUp()
-        self.thru_host_stack_ipv4_setup()
+class QUICEchoExternalTransferTestCase(QUICEchoExternalTestCase):
+    """QUIC Echo External Transfer Test Case"""
+    @unittest.skipUnless(running_extended_tests, "part of extended tests")
+    def test_quic_external_transfer(self):
+        self.server()
+        self.client()
+        self.validate_external_test_results()
 
-    def tearDown(self):
-        super(QUICExternalEchoIPv4TestCase, self).tearDown()
-        self.thru_host_stack_ipv4_tear_down()
+class QUICEchoExternalServerStreamTestCase(QUICEchoExternalTestCase):
+    """QUIC Echo External Transfer Server Stream Test Case"""
+    quic_setup = "serverstream"
+
+    @unittest.skipUnless(running_extended_tests, "part of extended tests")
+    def test_quic_external_transfer_server_stream(self):
+        self.server("nclients", "1/1", "send", "1Kb", "recv", "0")
+        self.client("nclients" ,"1/1", "send", "0", "recv", "1Kb")
+        self.validate_external_test_results()
 
-    def show_commands_at_teardown(self):
-        self.logger.debug(self.vapi.cli("show session verbose 2"))
+class QUICEchoExternalServerStreamWorkersTestCase(QUICEchoExternalTestCase):
+    """QUIC Echo External Transfer Server Stream MultiWorker Test Case"""
+    quic_setup = "serverstream"
 
     @unittest.skipUnless(running_extended_tests, "part of extended tests")
-    def test_quic_external_transfer(self):
-        """ QUIC external echo client/server transfer """
-
-        self.external_ipv4_transfer_test(self.server_echo_test_args +
-                                         ["socket-name", self.api_sock,
-                                          "server"],
-                                         self.client_echo_test_args +
-                                         ["socket-name", self.api_sock,
-                                          "client", "mbytes", "10"])
+    def test_quic_external_transfer_server_stream_multi_workers(self):
+        self.server("nclients", "4/4", "send", "1Kb", "recv", "0")
+        self.client("nclients", "4/4", "send", "0", "recv", "1Kb")
+        self.validate_external_test_results()
 
 
 if __name__ == '__main__':