Add QUIC multistream support
[vpp.git] / src / tests / vnet / session / quic_echo.c
index 7d0cedd..29b33f5 100644 (file)
 #include <vpp/api/vpe_all_api_h.h>
 #undef vl_printfun
 
-#define TCP_ECHO_DBG 0
-#define DBG(_fmt,_args...)                     \
-    if (TCP_ECHO_DBG)                          \
-      clib_warning (_fmt, _args)
+#define QUIC_ECHO_DBG 0
+#define DBG(_fmt, _args...)                    \
+    if (QUIC_ECHO_DBG)                                 \
+      clib_warning (_fmt, ##_args)
 
 typedef struct
 {
@@ -68,6 +68,13 @@ typedef enum
   STATE_DETACHED
 } connection_state_t;
 
+enum quic_session_type_t
+{
+  QUIC_SESSION_TYPE_QUIC = 0,
+  QUIC_SESSION_TYPE_STREAM = 1,
+  QUIC_SESSION_TYPE_LISTEN = INT32_MAX,
+};
+
 typedef struct
 {
   /* vpe input queue */
@@ -85,6 +92,10 @@ typedef struct
   /* Hash table for disconnect processing */
   uword *session_index_by_vpp_handles;
 
+  /* Hash table for shared segment_names */
+  uword *shared_segment_names;
+  clib_spinlock_t segment_names_lock;
+
   /* intermediate rx buffer */
   u8 *rx_buf;
 
@@ -140,6 +151,7 @@ typedef struct
    * vpp. If sock api is used, shm binary api is subsequently bootstrapped
    * and all other messages are exchanged using shm IPC. */
   u8 use_sock_api;
+  int max_test_msg;
 
   fifo_segment_main_t segment_main;
 } echo_main_t;
@@ -182,19 +194,54 @@ init_error_string_table (echo_main_t * em)
 
 static void handle_mq_event (session_event_t * e);
 
+#if CLIB_DEBUG > 0
+#define TIMEOUT 10.0
+#else
+#define TIMEOUT 10.0
+#endif
+
+static int
+wait_for_segment_allocation (u64 segment_handle)
+{
+  echo_main_t *em = &echo_main;
+  f64 timeout;
+  timeout = clib_time_now (&em->clib_time) + TIMEOUT;
+  uword *segment_present;
+  DBG ("ASKING for %lu", segment_handle);
+  while (clib_time_now (&em->clib_time) < timeout)
+    {
+      clib_spinlock_lock (&em->segment_names_lock);
+      segment_present = hash_get (em->shared_segment_names, segment_handle);
+      clib_spinlock_unlock (&em->segment_names_lock);
+      if (segment_present != 0)
+       return 0;
+      if (em->time_to_stop == 1)
+       return 0;
+    }
+  DBG ("timeout waiting for segment_allocation %lu", segment_handle);
+  return -1;
+}
+
+static int
+wait_for_disconnected_sessions (echo_main_t * em)
+{
+  f64 timeout;
+  timeout = clib_time_now (&em->clib_time) + TIMEOUT;
+  while (clib_time_now (&em->clib_time) < timeout)
+    {
+      if (hash_elts (em->session_index_by_vpp_handles) == 0)
+       return 0;
+    }
+  DBG ("timeout waiting for disconnected_sessions");
+  return -1;
+}
+
 static int
 wait_for_state_change (echo_main_t * em, connection_state_t state)
 {
   svm_msg_q_msg_t msg;
   session_event_t *e;
   f64 timeout;
-
-#if CLIB_DEBUG > 0
-#define TIMEOUT 600.0
-#else
-#define TIMEOUT 600.0
-#endif
-
   timeout = clib_time_now (&em->clib_time) + TIMEOUT;
 
   while (clib_time_now (&em->clib_time) < timeout)
@@ -306,7 +353,6 @@ ssvm_segment_attach (char *name, ssvm_segment_type_t type, int fd)
       clib_warning ("svm_fifo_segment_attach ('%s') failed", name);
       return rv;
     }
-
   vec_reset_length (a->new_segment_indices);
   return 0;
 }
@@ -318,6 +364,9 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
   echo_main_t *em = &echo_main;
   int *fds = 0;
   u32 n_fds = 0;
+  u64 segment_handle;
+  segment_handle = clib_net_to_host_u64 (mp->segment_handle);
+  DBG ("Attached returned app %u", htons (mp->app_index));
 
   if (mp->retval)
     {
@@ -361,6 +410,10 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
                               -1))
        goto failed;
     }
+  DBG ("SETTING for %lu", segment_handle);
+  clib_spinlock_lock (&em->segment_names_lock);
+  hash_set (em->shared_segment_names, segment_handle, 1);
+  clib_spinlock_unlock (&em->segment_names_lock);
 
   em->state = STATE_ATTACHED;
   return;
@@ -456,7 +509,26 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
 {
   fifo_segment_main_t *sm = &echo_main.segment_main;
   fifo_segment_create_args_t _a, *a = &_a;
+  echo_main_t *em = &echo_main;
   int rv;
+  int *fds = 0;
+
+  if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
+    {
+      vec_validate (fds, 1);
+      vl_socket_client_recv_fd_msg (fds, 1, 5);
+      if (ssvm_segment_attach
+         ((char *) mp->segment_name, SSVM_SEGMENT_MEMFD, fds[0]))
+       clib_warning
+         ("svm_fifo_segment_attach ('%s') failed on SSVM_SEGMENT_MEMFD",
+          mp->segment_name);
+      DBG ("SETTING for %lu", mp->segment_name);
+      clib_spinlock_lock (&em->segment_names_lock);
+      hash_set (em->shared_segment_names, mp->segment_name, 1);
+      clib_spinlock_unlock (&em->segment_names_lock);
+      vec_free (fds);
+      return;
+    }
 
   clib_memset (a, 0, sizeof (*a));
   a->segment_name = (char *) mp->segment_name;
@@ -471,6 +543,9 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
     }
   clib_warning ("Mapped new segment '%s' size %d", mp->segment_name,
                mp->segment_size);
+  clib_spinlock_lock (&em->segment_names_lock);
+  hash_set (em->shared_segment_names, mp->segment_name, 1);
+  clib_spinlock_unlock (&em->segment_names_lock);
 }
 
 static void
@@ -486,16 +561,21 @@ session_print_stats (echo_main_t * em, echo_session_t * session)
 }
 
 static void
-test_recv_bytes (echo_session_t * s, u8 * rx_buf, u32 n_read)
+test_recv_bytes (echo_main_t * em, echo_session_t * s, u8 * rx_buf,
+                u32 n_read)
 {
   int i;
   for (i = 0; i < n_read; i++)
     {
-      if (rx_buf[i] != ((s->bytes_received + i) & 0xff))
+      if (rx_buf[i] != ((s->bytes_received + i) & 0xff)
+         && em->max_test_msg > 0)
        {
          clib_warning ("error at byte %lld, 0x%x not 0x%x",
                        s->bytes_received + i, rx_buf[i],
                        ((s->bytes_received + i) & 0xff));
+         em->max_test_msg--;
+         if (em->max_test_msg == 0)
+           clib_warning ("Too many errors, hiding next ones");
        }
     }
 }
@@ -517,12 +597,13 @@ recv_data_chunk (echo_main_t * em, echo_session_t * s, u8 * rx_buf)
       if (n_read > 0)
        {
          if (em->test_return_packets)
-           test_recv_bytes (s, rx_buf, n_read);
+           test_recv_bytes (em, s, rx_buf, n_read);
 
          n_to_read -= n_read;
 
          s->bytes_received += n_read;
          s->bytes_to_receive -= n_read;
+         ASSERT (s->bytes_to_receive >= 0);
        }
       else
        break;
@@ -577,8 +658,9 @@ client_thread_fn (void *arg)
        break;
     }
 
-  clib_warning ("GOT OUT");
-  DBG ("session %d done", session_index);
+  DBG ("session %d done send %lu to do, %lu done || recv %lu to do, %lu done",
+       session_index, s->bytes_to_send, s->bytes_sent, s->bytes_to_receive,
+       s->bytes_received);
   em->tx_total += s->bytes_sent;
   em->rx_total += s->bytes_received;
   em->n_active_clients--;
@@ -587,7 +669,7 @@ client_thread_fn (void *arg)
 }
 
 void
-client_send_connect (echo_main_t * em)
+client_send_connect (echo_main_t * em, u8 * uri, u32 opaque)
 {
   vl_api_connect_uri_t *cmp;
   cmp = vl_msg_api_alloc (sizeof (*cmp));
@@ -595,8 +677,8 @@ client_send_connect (echo_main_t * em)
 
   cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
   cmp->client_index = em->my_client_index;
-  cmp->context = ntohl (0xfeedface);
-  memcpy (cmp->uri, em->connect_uri, vec_len (em->connect_uri));
+  cmp->context = ntohl (opaque);
+  memcpy (cmp->uri, uri, vec_len (uri));
   vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp);
 }
 
@@ -640,6 +722,13 @@ session_bound_handler (session_bound_msg_t * mp)
   em->state = STATE_READY;
 }
 
+static void
+quic_qsession_accepted_handler (session_accepted_msg_t * mp)
+{
+  DBG ("Accept on QSession index %u", mp->handle);
+}
+
+
 static void
 session_accepted_handler (session_accepted_msg_t * mp)
 {
@@ -650,8 +739,11 @@ session_accepted_handler (session_accepted_msg_t * mp)
   echo_session_t *session;
   static f64 start_time;
   u32 session_index;
+  u64 segment_handle;
   u8 *ip_str;
 
+  segment_handle = mp->segment_handle;
+
   if (start_time == 0.0)
     start_time = clib_time_now (&em->clib_time);
 
@@ -662,7 +754,14 @@ session_accepted_handler (session_accepted_msg_t * mp)
   /* Allocate local session and set it up */
   pool_get (em->sessions, session);
   session_index = session - em->sessions;
+  DBG ("Setting session_index %lu", session_index);
 
+  if (wait_for_segment_allocation (segment_handle))
+    {
+      clib_warning ("timeout waiting for segment allocation %lu",
+                   segment_handle);
+      return;
+    }
   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
   rx_fifo->client_session_index = session_index;
   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
@@ -670,12 +769,28 @@ session_accepted_handler (session_accepted_msg_t * mp)
 
   session->rx_fifo = rx_fifo;
   session->tx_fifo = tx_fifo;
+  session->vpp_session_handle = mp->handle;
   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
                                         svm_msg_q_t *);
 
   /* Add it to lookup table */
   hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
 
+  /*
+   * Send accept reply to vpp
+   */
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_ACCEPTED_REPLY);
+  rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
+  rmp->handle = mp->handle;
+  rmp->context = mp->context;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+
+  /* TODO : this is very ugly */
+  if (mp->rmt.is_ip4 != 255)
+    return quic_qsession_accepted_handler (mp);
+  DBG ("SSession handle is %lu", mp->handle);
+
   em->state = STATE_READY;
 
   /* Stats printing */
@@ -687,20 +802,19 @@ session_accepted_handler (session_accepted_msg_t * mp)
               (f64) pool_elts (em->sessions) / (now - start_time));
     }
 
-  /*
-   * Send accept reply to vpp
-   */
-  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
-                            SESSION_CTRL_EVT_ACCEPTED_REPLY);
-  rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
-  rmp->handle = mp->handle;
-  rmp->context = mp->context;
-  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
-
   session->bytes_received = 0;
   session->start = clib_time_now (&em->clib_time);
 }
 
+static void
+quic_session_connected_handler (session_connected_msg_t * mp)
+{
+  echo_main_t *em = &echo_main;
+  u8 *uri = format (0, "QUIC://session/%lu", mp->handle);
+  DBG ("QSession Connect : %s", uri);
+  client_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM);
+}
+
 static void
 session_connected_handler (session_connected_msg_t * mp)
 {
@@ -709,6 +823,8 @@ session_connected_handler (session_connected_msg_t * mp)
   u32 session_index;
   svm_fifo_t *rx_fifo, *tx_fifo;
   int rv;
+  u64 segment_handle;
+  segment_handle = mp->segment_handle;
 
   if (mp->retval)
     {
@@ -725,7 +841,14 @@ session_connected_handler (session_connected_msg_t * mp)
   pool_get (em->sessions, session);
   clib_memset (session, 0, sizeof (*session));
   session_index = session - em->sessions;
+  DBG ("Setting session_index %lu", session_index);
 
+  if (wait_for_segment_allocation (segment_handle))
+    {
+      clib_warning ("timeout waiting for segment allocation %lu",
+                   segment_handle);
+      return;
+    }
   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
   rx_fifo->client_session_index = session_index;
   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
@@ -740,6 +863,11 @@ session_connected_handler (session_connected_msg_t * mp)
 
   hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
 
+  if (mp->context == QUIC_SESSION_TYPE_QUIC)
+    return quic_session_connected_handler (mp);
+
+  DBG ("SSession Connected");
+
   /*
    * Start RX thread
    */
@@ -768,6 +896,7 @@ session_disconnected_handler (session_disconnected_msg_t * mp)
   echo_session_t *session = 0;
   uword *p;
   int rv = 0;
+  DBG ("Got a SESSION_CTRL_EVT_DISCONNECTED for session %lu", mp->handle);
 
   p = hash_get (em->session_index_by_vpp_handles, mp->handle);
   if (!p)
@@ -778,6 +907,7 @@ session_disconnected_handler (session_disconnected_msg_t * mp)
 
   session = pool_elt_at_index (em->sessions, p[0]);
   hash_unset (em->session_index_by_vpp_handles, mp->handle);
+
   pool_put (em->sessions, session);
 
   app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
@@ -830,18 +960,23 @@ handle_mq_event (session_event_t * e)
   switch (e->event_type)
     {
     case SESSION_CTRL_EVT_BOUND:
+      DBG ("SESSION_CTRL_EVT_BOUND");
       session_bound_handler ((session_bound_msg_t *) e->data);
       break;
     case SESSION_CTRL_EVT_ACCEPTED:
+      DBG ("SESSION_CTRL_EVT_ACCEPTED");
       session_accepted_handler ((session_accepted_msg_t *) e->data);
       break;
     case SESSION_CTRL_EVT_CONNECTED:
+      DBG ("SESSION_CTRL_EVT_CONNECTED");
       session_connected_handler ((session_connected_msg_t *) e->data);
       break;
     case SESSION_CTRL_EVT_DISCONNECTED:
+      DBG ("SESSION_CTRL_EVT_DISCONNECTED");
       session_disconnected_handler ((session_disconnected_msg_t *) e->data);
       break;
     case SESSION_CTRL_EVT_RESET:
+      DBG ("SESSION_CTRL_EVT_RESET");
       session_reset_handler ((session_reset_msg_t *) e->data);
       break;
     default:
@@ -856,6 +991,7 @@ clients_run (echo_main_t * em)
   svm_msg_q_msg_t msg;
   session_event_t *e;
   echo_session_t *s;
+  hash_pair_t *p;
   int i;
 
   /* Init test data */
@@ -870,7 +1006,7 @@ clients_run (echo_main_t * em)
     return;
 
   for (i = 0; i < em->n_clients; i++)
-    client_send_connect (em);
+    client_send_connect (em, em->connect_uri, QUIC_SESSION_TYPE_QUIC);
 
   start_time = clib_time_now (&em->clib_time);
   while (em->n_clients_connected < em->n_clients
@@ -897,18 +1033,23 @@ clients_run (echo_main_t * em)
   /*
    * Initialize connections
    */
-  for (i = 0; i < em->n_clients; i++)
-    {
-      s = pool_elt_at_index (em->sessions, i);
+  DBG ("Initialize connections on %u clients", em->n_clients);
+
+  /* *INDENT-OFF* */
+  hash_foreach_pair (p, em->session_index_by_vpp_handles,
+                ({
+      s = pool_elt_at_index (em->sessions, p->value[0]);
       s->bytes_to_send = em->bytes_to_send;
       if (!em->no_return)
        s->bytes_to_receive = em->bytes_to_send;
-    }
+                }));
+  /* *INDENT-ON* */
   em->n_active_clients = em->n_clients_connected;
 
   /*
    * Wait for client threads to send the data
    */
+  DBG ("Waiting for data on %u clients", em->n_active_clients);
   start_time = clib_time_now (&em->clib_time);
   em->state = STATE_READY;
   while (em->n_active_clients)
@@ -925,11 +1066,14 @@ clients_run (echo_main_t * em)
        svm_msg_q_free_msg (em->our_event_queue, &msg);
       }
 
-  for (i = 0; i < em->n_clients; i++)
-    {
-      s = pool_elt_at_index (em->sessions, i);
+  /* *INDENT-OFF* */
+  hash_foreach_pair (p, em->session_index_by_vpp_handles,
+                ({
+      s = pool_elt_at_index (em->sessions, p->value[0]);
+      DBG ("Sending disconnect on session %lu", p->key);
       client_disconnect (em, s);
-    }
+                }));
+  /* *INDENT-ON* */
 
   /*
    * Stats and detach
@@ -940,6 +1084,7 @@ clients_run (echo_main_t * em)
           em->tx_total / (1ULL << 30), deltat);
   fformat (stdout, "%.4f Gbit/second\n", (em->tx_total * 8.0) / deltat / 1e9);
 
+  wait_for_disconnected_sessions (em);
   application_detach (em);
 }
 
@@ -1059,7 +1204,6 @@ server_handle_rx (echo_main_t * em, session_event_t * e)
   int n_read, max_dequeue, n_sent;
   u32 offset, to_dequeue;
   echo_session_t *s;
-
   s = pool_elt_at_index (em->sessions, e->session_index);
 
   /* Clear event only once. Otherwise, if we do it in the loop by calling
@@ -1070,7 +1214,6 @@ server_handle_rx (echo_main_t * em, session_event_t * e)
   max_dequeue = svm_fifo_max_dequeue (s->rx_fifo);
   if (PREDICT_FALSE (!max_dequeue))
     return;
-
   do
     {
       /* The options here are to limit ourselves to max_dequeue or read
@@ -1079,8 +1222,12 @@ server_handle_rx (echo_main_t * em, session_event_t * e)
       to_dequeue = clib_min (max_dequeue, vec_len (em->rx_buf));
       n_read = app_recv_stream_raw (s->rx_fifo, em->rx_buf, to_dequeue,
                                    0 /* clear evt */ , 0 /* peek */ );
+
       if (n_read > 0)
        {
+         if (em->test_return_packets)
+           test_recv_bytes (em, s, em->rx_buf, n_read);
+
          max_dequeue -= n_read;
          s->bytes_received += n_read;
        }
@@ -1127,10 +1274,10 @@ server_handle_mq (echo_main_t * em)
       if (rc == ETIMEDOUT)
        continue;
       e = svm_msg_q_msg_data (em->our_event_queue, &msg);
-      clib_warning ("Event %d", e->event_type);
       switch (e->event_type)
        {
-       case FIFO_EVENT_APP_RX:
+       case SESSION_IO_EVT_RX:
+         DBG ("SESSION_IO_EVT_RX");
          server_handle_rx (em, e);
          break;
        default:
@@ -1220,6 +1367,7 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
 {
   echo_main_t *em = &echo_main;
   uword *p;
+  DBG ("Got disonnected reply for session %lu", mp->handle);
 
   if (mp->retval)
     {
@@ -1300,12 +1448,14 @@ main (int argc, char **argv)
 
   clib_memset (em, 0, sizeof (*em));
   em->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
+  em->shared_segment_names = hash_create (0, sizeof (uword));
+  clib_spinlock_init (&em->segment_names_lock);
   em->my_pid = getpid ();
-  em->configured_segment_size = 1 << 20;
   em->socket_name = 0;
   em->use_sock_api = 1;
   em->fifo_size = 64 << 10;
   em->n_clients = 1;
+  em->max_test_msg = 50;
 
   clib_time_init (&em->clib_time);
   init_error_string_table (em);
@@ -1320,10 +1470,6 @@ main (int argc, char **argv)
        }
       else if (unformat (a, "uri %s", &uri))
        ;
-      else if (unformat (a, "segment-size %dM", &tmp))
-       em->configured_segment_size = tmp << 20;
-      else if (unformat (a, "segment-size %dG", &tmp))
-       em->configured_segment_size = tmp << 30;
       else if (unformat (a, "server"))
        i_am_server = 1;
       else if (unformat (a, "client"))