quic: echo thread can handle multiple sessions 45/20545/4
authorNathan Skrzypczak <nathan.skrzypczak@gmail.com>
Mon, 8 Jul 2019 16:18:27 +0000 (18:18 +0200)
committerDave Wallace <dwallacelf@gmail.com>
Tue, 9 Jul 2019 17:07:22 +0000 (17:07 +0000)
Type: feature

Change-Id: Ibb60d5b46aafe109a81a8604712a917f6e246eaf
Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
src/plugins/hs_apps/sapi/quic_echo.c
src/plugins/quic/quic.c

index 3cbb663..3be513a 100644 (file)
     clib_warning ("ECHO-ERROR: "_fmt, ##_args);        \
   }
 
-typedef enum echo_session_flag_
-{
-  SESSION_FLAG_NOFLAG = 0,
-  SESSION_FLAG_SHOULD_CLOSE = 1,
-  SESSION_FLAG_SHOULD_FREE = 2,
-} echo_session_flag_t;
-
 typedef struct
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -78,8 +71,8 @@ typedef struct
   volatile u64 bytes_to_receive;
   f64 start;
   u32 listener_index;          /* listener index in echo session pool */
+  u32 idle_cycles;             /* consecutive enq/deq with no data */
   volatile u64 accepted_session_count; /* sessions we accepted */
-  volatile echo_session_flag_t flags;
 } echo_session_t;
 
 typedef enum
@@ -90,6 +83,14 @@ typedef enum
   ECHO_INVALID_DATA_SOURCE
 } data_source_t;
 
+enum echo_close_f_t
+{
+  ECHO_CLOSE_F_INVALID = 0,
+  ECHO_CLOSE_F_PASSIVE,                /* wait for close msg */
+  ECHO_CLOSE_F_ACTIVE,         /* send close msg */
+  ECHO_CLOSE_F_NONE,           /* don't bother sending close msg */
+};
+
 enum quic_session_type_t
 {
   QUIC_SESSION_TYPE_QUIC,
@@ -97,6 +98,14 @@ enum quic_session_type_t
   QUIC_SESSION_TYPE_LISTEN,
 };
 
+enum quic_session_state_t
+{
+  QUIC_SESSION_STATE_INITIAL,
+  QUIC_SESSION_STATE_AWAIT_CLOSING,    /* Data transfer is done, wait for close evt */
+  QUIC_SESSION_STATE_AWAIT_DATA,       /* Peer closed, wait for outstanding data */
+  QUIC_SESSION_STATE_CLOSING,  /* told vpp to close */
+  QUIC_SESSION_STATE_CLOSED,   /* closed in vpp */
+};
 
 typedef enum
 {
@@ -194,7 +203,8 @@ typedef struct
   u32 rx_buf_size;
   u32 tx_buf_size;
   data_source_t data_source;
-  u8 send_disconnects;         /* actively send disconnect */
+  u8 send_quic_disconnects;    /* actively send disconnect */
+  u8 send_stream_disconnects;  /* actively send disconnect */
 
   u8 *appns_id;
   u64 appns_flags;
@@ -206,6 +216,7 @@ typedef struct
   u32 n_stream_clients;                /* Target Number of STREAM sessions per QUIC session */
   volatile u32 n_quic_clients_connected;       /* Number of connected QUIC sessions */
   volatile u32 n_clients_connected;    /* Number of STREAM sessions connected */
+  u32 n_rx_threads;            /* Number of data threads */
 
   u64 tx_total;
   u64 rx_total;
@@ -370,6 +381,21 @@ format_api_error (u8 * s, va_list * args)
   return s;
 }
 
+static uword
+unformat_close (unformat_input_t * input, va_list * args)
+{
+  u8 *a = va_arg (*args, u8 *);
+  if (unformat (input, "Y"))
+    *a = ECHO_CLOSE_F_ACTIVE;
+  else if (unformat (input, "N"))
+    *a = ECHO_CLOSE_F_NONE;
+  else if (unformat (input, "W"))
+    *a = ECHO_CLOSE_F_PASSIVE;
+  else
+    return 0;
+  return 1;
+}
+
 static uword
 unformat_data (unformat_input_t * input, va_list * args)
 {
@@ -458,6 +484,18 @@ init_error_string_table (echo_main_t * em)
  *
  */
 
+static echo_session_t *
+echo_session_alloc (echo_main_t * em)
+{
+  echo_session_t *session;
+  pool_get (em->sessions, session);
+  clib_memset (session, 0, sizeof (*session));
+  session->session_index = session - em->sessions;
+  session->listener_index = SESSION_INVALID_INDEX;
+  session->session_state = QUIC_SESSION_STATE_INITIAL;
+  return session;
+}
+
 /*
  *
  *  Session API Calls
@@ -754,7 +792,7 @@ echo_free_sessions (echo_main_t * em)
   /* *INDENT-OFF* */
   pool_foreach (s, em->sessions,
   ({
-    if (s->flags & SESSION_FLAG_SHOULD_FREE)
+    if (s->session_state == QUIC_SESSION_STATE_CLOSED)
       vec_add1 (session_indexes, s->session_index);}
   ));
   /* *INDENT-ON* */
@@ -771,17 +809,19 @@ echo_free_sessions (echo_main_t * em)
 static void
 echo_cleanup_session (echo_main_t * em, echo_session_t * s)
 {
+  u64 c;
   echo_session_t *ls;
-  u64 accepted_session_count;
   if (s->listener_index != SESSION_INVALID_INDEX)
     {
       ls = pool_elt_at_index (em->sessions, s->listener_index);
-      accepted_session_count =
-       clib_atomic_sub_fetch (&ls->accepted_session_count, 1);
-      if (accepted_session_count == 0
-         && ls->session_type != QUIC_SESSION_TYPE_LISTEN
-         && em->send_disconnects)
-       echo_disconnect_session (em, ls);
+      c = clib_atomic_sub_fetch (&ls->accepted_session_count, 1);
+      if (c == 0 && ls->session_type == QUIC_SESSION_TYPE_QUIC)
+       {
+         if (em->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE)
+           echo_disconnect_session (em, ls);
+         else if (em->send_quic_disconnects == ECHO_CLOSE_F_NONE)
+           echo_cleanup_session (em, ls);
+       }
     }
   if (s->session_type == QUIC_SESSION_TYPE_QUIC)
     clib_atomic_sub_fetch (&em->n_quic_clients_connected, 1);
@@ -791,9 +831,21 @@ echo_cleanup_session (echo_main_t * em, echo_session_t * s)
   DBG ("Cleanup sessions (still %uQ %uS)", em->n_quic_clients_connected,
        em->n_clients_connected);
   hash_unset (em->session_index_by_vpp_handles, s->vpp_session_handle);
+  s->session_state = QUIC_SESSION_STATE_CLOSED;
+}
 
-  /* Mark session as to be freed */
-  s->flags |= SESSION_FLAG_SHOULD_FREE;
+static void
+echo_initiate_session_close (echo_main_t * em, echo_session_t * s, u8 active)
+{
+  if (s->session_type == QUIC_SESSION_TYPE_STREAM)
+    {
+      if (!active && s->bytes_to_receive)
+       s->session_state = QUIC_SESSION_STATE_AWAIT_DATA;
+      else
+       s->session_state = QUIC_SESSION_STATE_CLOSING;
+    }
+  else
+    echo_cleanup_session (em, s);      /* We can clean Q/Lsessions right away */
 }
 
 static void
@@ -831,7 +883,6 @@ recv_data_chunk (echo_main_t * em, echo_session_t * s, u8 * rx_buf)
   if (em->test_return_packets)
     test_recv_bytes (em, s, rx_buf, n_read);
 
-  ASSERT (s->bytes_to_receive >= n_read);
   s->bytes_received += n_read;
   s->bytes_to_receive -= n_read;
   return n_read;
@@ -865,68 +916,131 @@ mirror_data_chunk (echo_main_t * em, echo_session_t * s, u8 * tx_buf, u64 len)
 /*
  * Rx/Tx polling thread per connection
  */
+static void
+echo_thread_handle_data (echo_main_t * em, echo_session_t * s, u8 * rx_buf)
+{
+  int n_read, n_sent;
+
+  n_read = recv_data_chunk (em, s, rx_buf);
+  if (em->data_source == ECHO_TEST_DATA_SOURCE)
+    n_sent =
+      send_data_chunk (s, em->connect_test_data,
+                      s->bytes_sent % em->tx_buf_size, em->tx_buf_size);
+  else if (em->data_source == ECHO_RX_DATA_SOURCE)
+    n_sent = mirror_data_chunk (em, s, rx_buf, n_read);
+  else
+    n_sent = 0;
+  if (!s->bytes_to_send && !s->bytes_to_receive)
+    {
+      /* Session is done, need to close */
+      if (s->session_state == QUIC_SESSION_STATE_AWAIT_DATA)
+       s->session_state = QUIC_SESSION_STATE_CLOSING;
+      else
+       {
+         s->session_state = QUIC_SESSION_STATE_AWAIT_CLOSING;
+         if (em->send_stream_disconnects == ECHO_CLOSE_F_ACTIVE)
+           echo_disconnect_session (em, s);
+         else if (em->send_stream_disconnects == ECHO_CLOSE_F_NONE)
+           s->session_state = QUIC_SESSION_STATE_CLOSING;
+       }
+      return;
+    }
+
+  /* check idle clients */
+  if (n_sent || n_read)
+    s->idle_cycles = 0;
+  else if (s->idle_cycles++ == 1e7)
+    {
+      s->idle_cycles = 0;
+      DBG ("Idle client TX:%dB RX:%dB", s->bytes_to_send,
+          s->bytes_to_receive);
+      DBG ("Idle FIFOs  TX:%dB RX:%dB", svm_fifo_max_dequeue (s->tx_fifo),
+          svm_fifo_max_dequeue (s->rx_fifo));
+    }
+}
+
+static void
+echo_thread_handle_closing (echo_main_t * em, echo_session_t * s)
+{
+
+  DBG ("[%lu/%lu] -> S(%x) -> [%lu/%lu]",
+       s->bytes_received, s->bytes_received + s->bytes_to_receive,
+       s->session_index, s->bytes_sent, s->bytes_sent + s->bytes_to_send);
+  clib_atomic_fetch_add (&em->tx_total, s->bytes_sent);
+  clib_atomic_fetch_add (&em->rx_total, s->bytes_received);
+
+  if (PREDICT_FALSE (em->rx_total ==
+                    em->n_clients * em->n_stream_clients *
+                    em->bytes_to_receive))
+    quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE);
+  echo_cleanup_session (em, s);
+}
+
 static void *
 echo_thread_fn (void *arg)
 {
   echo_main_t *em = &echo_main;
+  u32 thread_n_sessions = (u64) arg & 0xFFFFFFFF;
+  u32 session_first_idx = (u64) arg >> 32;
+
+  u32 i = 0;
+  u32 n_closed_sessions = 0;
+  u32 session_index;
   u8 *rx_buf = 0;
-  u32 session_index = *(u32 *) arg;
   echo_session_t *s;
-  int n_read, n_sent, idle_loop = 0;
   vec_validate (rx_buf, em->rx_buf_size);
 
   while (!em->time_to_stop && em->state != STATE_READY)
     ;
 
-  s = pool_elt_at_index (em->sessions, session_index);
-  while (!em->time_to_stop)
+  for (i = 0; !em->time_to_stop; i++)
     {
-      n_read = recv_data_chunk (em, s, rx_buf);
-      if (em->data_source == ECHO_TEST_DATA_SOURCE)
-       n_sent =
-         send_data_chunk (s, em->connect_test_data,
-                          s->bytes_sent % em->tx_buf_size, em->tx_buf_size);
-      else if (em->data_source == ECHO_RX_DATA_SOURCE)
-       n_sent = mirror_data_chunk (em, s, rx_buf, n_read);
-      else
-       n_sent = 0;
-      if (!s->bytes_to_send && !s->bytes_to_receive)
-       break;
-      if (s->flags & SESSION_FLAG_SHOULD_CLOSE)
+      if (i % thread_n_sessions == 0)
+       n_closed_sessions = 0;
+      session_index =
+       em->thread_args[session_first_idx + i % thread_n_sessions];
+      s = pool_elt_at_index (em->sessions, session_index);
+      if (s->session_state == QUIC_SESSION_STATE_INITIAL
+         || s->session_state == QUIC_SESSION_STATE_AWAIT_DATA)
+       echo_thread_handle_data (em, s, rx_buf);
+      else if (s->session_state == QUIC_SESSION_STATE_CLOSING)
+       echo_thread_handle_closing (em, s);
+      else if (s->session_state == QUIC_SESSION_STATE_CLOSED)
+       n_closed_sessions++;
+      if (n_closed_sessions == thread_n_sessions)
        break;
-
-      /* check idle clients */
-      if (!n_sent & !n_read)
-       idle_loop++;
-      else
-       idle_loop = 0;
-      if (idle_loop == 1e7)
-       {
-         DBG ("Idle client TX:%dB RX:%dB", s->bytes_to_send,
-              s->bytes_to_receive);
-         DBG ("Idle FIFOs  TX:%dB RX:%dB", svm_fifo_max_dequeue (s->tx_fifo),
-              svm_fifo_max_dequeue (s->rx_fifo));
-       }
     }
-  DBG ("[%lu/%lu] -> S(%x) -> [%lu/%lu]",
-       s->bytes_received, s->bytes_received + s->bytes_to_receive,
-       session_index, s->bytes_sent, s->bytes_sent + s->bytes_to_send);
-  em->tx_total += s->bytes_sent;
-  em->rx_total += s->bytes_received;
-  clib_warning ("%d/%d", em->rx_total,
-               em->n_clients * em->n_stream_clients * em->bytes_to_receive);
-  if (em->rx_total ==
-      em->n_clients * em->n_stream_clients * em->bytes_to_receive)
-    quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE);
-  if (em->send_disconnects)
-    echo_disconnect_session (em, s);
-  else
+  pthread_exit (0);
+}
+
+static int
+echo_start_rx_thread (u32 session_index)
+{
+  /* Each thread owns n consecutive sessions of the total N */
+
+  echo_main_t *em = &echo_main;
+  u32 N = em->n_clients * em->n_stream_clients;
+  u32 nc, n, first_idx, thread_sessions;
+
+  n = (N + em->n_rx_threads - 1) / em->n_rx_threads;
+  nc = em->n_clients_connected;
+
+  ASSERT (nc + 1 <= N);
+  em->thread_args[nc] = session_index;
+
+  if ((nc + 1) % n == 0 || nc + 1 == N)
     {
-      while (!(s->flags & SESSION_FLAG_SHOULD_CLOSE) & !em->time_to_stop)
-       ;
-      echo_cleanup_session (em, s);
+      first_idx = n * (nc / n);
+      thread_sessions = (nc + 1) % n == 0 ? n : (nc + 1) % n;
+      DBG ("Start thread %u [%u -> %u]", nc / n, first_idx,
+          first_idx + thread_sessions - 1);
+      return pthread_create (&em->client_thread_handles[nc / n],
+                            NULL /*attr */ , echo_thread_fn,
+                            (void *) ((u64) first_idx << 32 | (u64)
+                                      thread_sessions));
     }
-  pthread_exit (0);
+
+  return 0;
 }
 
 static void
@@ -934,8 +1048,6 @@ session_bound_handler (session_bound_msg_t * mp)
 {
   echo_main_t *em = &echo_main;
   echo_session_t *listen_session;
-  u32 session_index;
-
   if (mp->retval)
     {
       ECHO_FAIL ("bind failed: %U", format_api_error,
@@ -948,14 +1060,13 @@ session_bound_handler (session_bound_msg_t * mp)
                clib_net_to_host_u16 (mp->lcl_port));
 
   /* Allocate local session and set it up */
-  pool_get (em->sessions, listen_session);
+  listen_session = echo_session_alloc (em);
   listen_session->session_type = QUIC_SESSION_TYPE_LISTEN;
   listen_session->accepted_session_count = 0;
-  session_index = listen_session - em->sessions;
-  listen_session->session_index = session_index;
-  hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+  hash_set (em->session_index_by_vpp_handles, mp->handle,
+           listen_session->session_index);
   em->state = STATE_LISTEN;
-  em->listen_session_index = session_index;
+  em->listen_session_index = listen_session->session_index;
 }
 
 static void
@@ -966,11 +1077,9 @@ session_accepted_handler (session_accepted_msg_t * mp)
   svm_fifo_t *rx_fifo, *tx_fifo;
   echo_main_t *em = &echo_main;
   echo_session_t *session, *listen_session;
-  u32 session_index;
   uword *p;
   /* Allocate local session and set it up */
-  pool_get (em->sessions, session);
-  session_index = session - em->sessions;
+  session = echo_session_alloc (em);
 
   if (wait_for_segment_allocation (mp->segment_handle))
     {
@@ -979,13 +1088,12 @@ session_accepted_handler (session_accepted_msg_t * mp)
     }
 
   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  rx_fifo->client_session_index = session_index;
+  rx_fifo->client_session_index = session->session_index;
   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  tx_fifo->client_session_index = session_index;
+  tx_fifo->client_session_index = session->session_index;
 
   session->rx_fifo = rx_fifo;
   session->tx_fifo = tx_fifo;
-  session->session_index = session_index;
   session->vpp_session_handle = mp->handle;
   session->start = clib_time_now (&em->clib_time);
   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
@@ -1003,9 +1111,9 @@ session_accepted_handler (session_accepted_msg_t * mp)
   clib_atomic_fetch_add (&listen_session->accepted_session_count, 1);
 
   /* Add it to lookup table */
-  DBG ("Accepted session 0x%lx, Listener 0x%lx idx %lu", mp->handle,
-       mp->listener_handle, session_index);
-  hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+  DBG ("Accepted session 0x%lx -> 0x%lx", mp->handle, mp->listener_handle);
+  hash_set (em->session_index_by_vpp_handles, mp->handle,
+           session->session_index);
 
   app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
                             SESSION_CTRL_EVT_ACCEPTED_REPLY);
@@ -1018,26 +1126,27 @@ session_accepted_handler (session_accepted_msg_t * mp)
     {
       session->session_type = QUIC_SESSION_TYPE_QUIC;
       if (em->cb_vft.quic_accepted_cb)
-       em->cb_vft.quic_accepted_cb (mp, session_index);
+       em->cb_vft.quic_accepted_cb (mp, session->session_index);
       clib_atomic_fetch_add (&em->n_quic_clients_connected, 1);
     }
   else if (em->i_am_master)
     {
       session->session_type = QUIC_SESSION_TYPE_STREAM;
       if (em->cb_vft.server_stream_accepted_cb)
-       em->cb_vft.server_stream_accepted_cb (mp, session_index);
+       em->cb_vft.server_stream_accepted_cb (mp, session->session_index);
       clib_atomic_fetch_add (&em->n_clients_connected, 1);
     }
   else
     {
       session->session_type = QUIC_SESSION_TYPE_STREAM;
       if (em->cb_vft.client_stream_accepted_cb)
-       em->cb_vft.client_stream_accepted_cb (mp, session_index);
+       em->cb_vft.client_stream_accepted_cb (mp, session->session_index);
       clib_atomic_fetch_add (&em->n_clients_connected, 1);
     }
 
   if (em->n_clients_connected == em->n_clients * em->n_stream_clients)
     {
+      DBG ("App is ready");
       em->state = STATE_READY;
       quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
     }
@@ -1050,7 +1159,6 @@ session_connected_handler (session_connected_msg_t * mp)
 {
   echo_main_t *em = &echo_main;
   echo_session_t *session, *listen_session;
-  u32 session_index;
   u32 listener_index = htonl (mp->context);
   svm_fifo_t *rx_fifo, *tx_fifo;
 
@@ -1061,10 +1169,7 @@ session_connected_handler (session_connected_msg_t * mp)
       return;
     }
 
-  pool_get (em->sessions, session);
-  clib_memset (session, 0, sizeof (*session));
-  session_index = session - em->sessions;
-
+  session = echo_session_alloc (em);
   if (wait_for_segment_allocation (mp->segment_handle))
     {
       ECHO_FAIL ("wait_for_segment_allocation errored");
@@ -1072,53 +1177,57 @@ session_connected_handler (session_connected_msg_t * mp)
     }
 
   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  rx_fifo->client_session_index = session_index;
+  rx_fifo->client_session_index = session->session_index;
   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  tx_fifo->client_session_index = session_index;
+  tx_fifo->client_session_index = session->session_index;
 
   session->rx_fifo = rx_fifo;
   session->tx_fifo = tx_fifo;
   session->vpp_session_handle = mp->handle;
-  session->session_index = session_index;
   session->start = clib_time_now (&em->clib_time);
   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
                                         svm_msg_q_t *);
 
-  session->listener_index = listener_index;
   session->accepted_session_count = 0;
   if (listener_index != SESSION_INVALID_INDEX)
     {
       listen_session = pool_elt_at_index (em->sessions, listener_index);
       clib_atomic_fetch_add (&listen_session->accepted_session_count, 1);
+      session->listener_index = listen_session->session_index;
     }
 
-  DBG ("Connected session 0x%lx", mp->handle, session_index);
-  hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+  DBG ("Connected session 0x%lx -> 0x%lx", mp->handle,
+       listener_index !=
+       SESSION_INVALID_INDEX ? listen_session->vpp_session_handle : (u64) ~
+       0);
+  hash_set (em->session_index_by_vpp_handles, mp->handle,
+           session->session_index);
 
   if (listener_index == SESSION_INVALID_INDEX)
     {
       session->session_type = QUIC_SESSION_TYPE_QUIC;
       if (em->cb_vft.quic_connected_cb)
-       em->cb_vft.quic_connected_cb (mp, session_index);
+       em->cb_vft.quic_connected_cb (mp, session->session_index);
       clib_atomic_fetch_add (&em->n_quic_clients_connected, 1);
     }
   else if (em->i_am_master)
     {
       session->session_type = QUIC_SESSION_TYPE_STREAM;
       if (em->cb_vft.server_stream_connected_cb)
-       em->cb_vft.server_stream_connected_cb (mp, session_index);
+       em->cb_vft.server_stream_connected_cb (mp, session->session_index);
       clib_atomic_fetch_add (&em->n_clients_connected, 1);
     }
   else
     {
       session->session_type = QUIC_SESSION_TYPE_STREAM;
       if (em->cb_vft.client_stream_connected_cb)
-       em->cb_vft.client_stream_connected_cb (mp, session_index);
+       em->cb_vft.client_stream_connected_cb (mp, session->session_index);
       clib_atomic_fetch_add (&em->n_clients_connected, 1);
     }
 
   if (em->n_clients_connected == em->n_clients * em->n_stream_clients)
     {
+      DBG ("App is ready");
       em->state = STATE_READY;
       quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
     }
@@ -1164,16 +1273,7 @@ echo_on_connected_send (session_connected_msg_t * mp, u32 session_index)
   session->bytes_to_send = em->bytes_to_send;
   session->bytes_to_receive = em->bytes_to_receive;
 
-  DBG ("Stream session 0x%lx connected", session->vpp_session_handle);
-
-  /*
-   * Start RX thread
-   */
-  em->thread_args[em->n_clients_connected] = session_index;
-  rv = pthread_create (&em->client_thread_handles[em->n_clients_connected],
-                      NULL /*attr */ , echo_thread_fn,
-                      (void *) &em->thread_args[em->n_clients_connected]);
-  if (rv)
+  if ((rv = echo_start_rx_thread (session_index)))
     {
       ECHO_FAIL ("pthread_create returned %d", rv);
       return;
@@ -1198,16 +1298,7 @@ echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index)
   session->bytes_to_send = em->bytes_to_send;
   session->bytes_to_receive = em->bytes_to_receive;
 
-  DBG ("Stream session 0x%lx accepted", mp->handle);
-
-  /*
-   * Start RX thread
-   */
-  em->thread_args[em->n_clients_connected] = session_index;
-  rv = pthread_create (&em->client_thread_handles[em->n_clients_connected],
-                      NULL /*attr */ , echo_thread_fn,
-                      (void *) &em->thread_args[em->n_clients_connected]);
-  if (rv)
+  if ((rv = echo_start_rx_thread (session_index)))
     {
       ECHO_FAIL ("pthread_create returned %d", rv);
       return;
@@ -1317,11 +1408,7 @@ session_disconnected_handler (session_disconnected_msg_t * mp)
   rmp->context = mp->context;
   app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
 
-  if (s->session_type == QUIC_SESSION_TYPE_STREAM)
-    s->flags |= SESSION_FLAG_SHOULD_CLOSE;     /* tell thread to close session */
-  else
-    echo_cleanup_session (em, s);      /* We can clean Qsessions right away */
-
+  echo_initiate_session_close (em, s, 0 /* active */ );
   if (s->session_type == QUIC_SESSION_TYPE_STREAM)
     session_print_stats (em, s);
 }
@@ -1636,7 +1723,7 @@ vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp)
     }
   em->state = STATE_DISCONNECTED;
   listen_session = pool_elt_at_index (em->sessions, em->listen_session_index);
-  echo_cleanup_session (em, listen_session);
+  echo_initiate_session_close (em, listen_session, 1 /* active */ );
 }
 
 static void
@@ -1662,7 +1749,7 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
     }
 
   s = pool_elt_at_index (em->sessions, p[0]);
-  echo_cleanup_session (em, s);
+  echo_initiate_session_close (em, s, 1 /* active */ );
 }
 
 static void
@@ -1681,6 +1768,29 @@ static void
     ECHO_FAIL ("failed to add tls key");
 }
 
+static void
+vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
+{
+  echo_session_t *session;
+  echo_main_t *em = &echo_main;
+  u8 *uri;
+  if (!mp->retval)
+    return;
+  /* retry connect */
+  if (mp->context == SESSION_INVALID_INDEX)
+    {
+      DBG ("Retrying connect %s", em->uri);
+      echo_send_connect (em, em->uri, SESSION_INVALID_INDEX);
+    }
+  else
+    {
+      session = pool_elt_at_index (em->sessions, mp->context);
+      uri = format (0, "QUIC://session/%lu", session->vpp_session_handle);
+      DBG ("Retrying connect %s", uri);
+      echo_send_connect (em, uri, mp->context);
+    }
+}
+
 #define foreach_quic_echo_msg                                          \
 _(BIND_URI_REPLY, bind_uri_reply)                                      \
 _(UNBIND_URI_REPLY, unbind_uri_reply)                                  \
@@ -1691,6 +1801,7 @@ _(MAP_ANOTHER_SEGMENT, map_another_segment)                               \
 _(UNMAP_SEGMENT, unmap_segment)                                                \
 _(APPLICATION_TLS_CERT_ADD_REPLY, application_tls_cert_add_reply)      \
 _(APPLICATION_TLS_KEY_ADD_REPLY, application_tls_key_add_reply)                \
+_(CONNECT_URI_REPLY, connect_uri_reply)                \
 
 void
 quic_echo_api_hookup (echo_main_t * em)
@@ -1732,8 +1843,11 @@ print_usage_and_exit (void)
           "  chroot prefix PATH  Use PATH as memory root path\n"
           "  quic-setup OPT      OPT=serverstream : Client open N connections. On each one server opens M streams\n"
           "                            by default : Client open N connections. On each one client opens M streams\n"
+          "  sclose=[Y|N|W]      When a stream is done (RX & TX),     send close[Y] wait for close[W] or pass[N]\n"
+          "  qclose=[Y|N|W]      When a connection is done (RX & TX), send close[Y] wait for close[W] or pass[N]\n"
           "\n"
           "  nclients N[/M]       Open N QUIC connections, each one with M streams (M defaults to 1)\n"
+          "  nthreads N           Use N busy loop threads for data [in addition to main & msg queue]\n"
           "  TX=1337[Kb|Mb|GB]    Send 1337 [K|M|G]bytes, use TX=RX to reflect the data\n"
           "  RX=1337[Kb|Mb|GB]    Expect 1337 [K|M|G]bytes\n"
           "\n"
@@ -1752,6 +1866,7 @@ quic_echo_process_opts (int argc, char **argv)
   u32 tmp;
   u8 *chroot_prefix;
   u8 *uri = 0;
+  u8 default_f_active;
 
   unformat_init_command_line (a, argv);
   while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT)
@@ -1786,6 +1901,8 @@ quic_echo_process_opts (int argc, char **argv)
        ;
       else if (unformat (a, "nclients %d", &em->n_clients))
        ;
+      else if (unformat (a, "nthreads %d", &em->n_rx_threads))
+       ;
       else if (unformat (a, "appns %_%v%_", &em->appns_id))
        ;
       else if (unformat (a, "all-scope"))
@@ -1805,6 +1922,14 @@ quic_echo_process_opts (int argc, char **argv)
        ;
       else if (unformat (a, "RX=%U", unformat_data, &em->bytes_to_receive))
        ;
+      else
+       if (unformat
+           (a, "sclose=%U", unformat_close, &em->send_stream_disconnects))
+       ;
+      else
+       if (unformat
+           (a, "qclose=%U", unformat_close, &em->send_quic_disconnects))
+       ;
       else if (unformat (a, "time %U:%U",
                         echo_unformat_timing_event, &em->timing_start_event,
                         echo_unformat_timing_event, &em->timing_end_event))
@@ -1813,6 +1938,9 @@ quic_echo_process_opts (int argc, char **argv)
        print_usage_and_exit ();
     }
 
+  /* setting default for unset values
+   *
+   * bytes_to_send / bytes_to_receive & data_source  */
   if (em->bytes_to_receive == (u64) ~ 0)
     em->bytes_to_receive = 64 << 10;   /* default */
   if (em->bytes_to_send == (u64) ~ 0)
@@ -1828,7 +1956,17 @@ quic_echo_process_opts (int argc, char **argv)
   if (em->data_source == ECHO_RX_DATA_SOURCE)
     em->bytes_to_send = em->bytes_to_receive;
 
-  em->send_disconnects = !em->i_am_master;
+  /* disconnect flags  */
+  if (em->i_am_master)
+    default_f_active =
+      em->bytes_to_send == 0 ? ECHO_CLOSE_F_ACTIVE : ECHO_CLOSE_F_PASSIVE;
+  else
+    default_f_active =
+      em->bytes_to_receive == 0 ? ECHO_CLOSE_F_PASSIVE : ECHO_CLOSE_F_ACTIVE;
+  if (em->send_stream_disconnects == ECHO_CLOSE_F_INVALID)
+    em->send_stream_disconnects = default_f_active;
+  if (em->send_quic_disconnects == ECHO_CLOSE_F_INVALID)
+    em->send_quic_disconnects = default_f_active;
 }
 
 int
@@ -1852,6 +1990,7 @@ main (int argc, char **argv)
   em->max_test_msg = 50;
   em->time_to_stop = 0;
   em->i_am_master = 1;
+  em->n_rx_threads = 4;
   em->test_return_packets = RETURN_PACKETS_NOTEST;
   em->timing_start_event = ECHO_EVT_FIRST_QCONNECT;
   em->timing_end_event = ECHO_EVT_LAST_BYTE;
@@ -1865,7 +2004,7 @@ main (int argc, char **argv)
   quic_echo_process_opts (argc, argv);
 
   n_clients = em->n_clients * em->n_stream_clients;
-  vec_validate (em->client_thread_handles, n_clients - 1);
+  vec_validate (em->client_thread_handles, em->n_rx_threads - 1);
   vec_validate (em->thread_args, n_clients - 1);
   clib_time_init (&em->clib_time);
   init_error_string_table (em);
index 8732b7d..f38a7ca 100644 (file)
@@ -306,6 +306,14 @@ quic_ctx_get (u32 ctx_index, u32 thread_index)
   return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index);
 }
 
+static quic_ctx_t *
+quic_ctx_get_if_valid (u32 ctx_index, u32 thread_index)
+{
+  if (pool_is_free_index (quic_main.ctx_pool[thread_index], ctx_index))
+    return 0;
+  return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index);
+}
+
 static quic_ctx_t *
 quic_get_conn_ctx (quicly_conn_t * conn)
 {
@@ -1494,7 +1502,9 @@ quic_connect (transport_endpoint_cfg_t * tep)
 static void
 quic_proto_on_close (u32 ctx_index, u32 thread_index)
 {
-  quic_ctx_t *ctx = quic_ctx_get (ctx_index, thread_index);
+  quic_ctx_t *ctx = quic_ctx_get_if_valid (ctx_index, thread_index);
+  if (!ctx)
+    return;
 #if QUIC_DEBUG >= 2
   session_t *stream_session =
     session_get (ctx->c_s_index, ctx->c_thread_index);