session: send ctrl msg over mq
[vpp.git] / src / tests / vnet / session / tcp_echo.c
index f8b75d9..6c99294 100644 (file)
@@ -62,6 +62,7 @@ typedef enum
 {
   STATE_START,
   STATE_ATTACHED,
+  STATE_LISTEN,
   STATE_READY,
   STATE_DISCONNECTING,
   STATE_FAILED,
@@ -101,9 +102,6 @@ typedef struct
   /* Our event queue */
   svm_msg_q_t *our_event_queue;
 
-  /* $$$ single thread only for the moment */
-  svm_queue_t *vpp_event_queue;
-
   u8 *socket_name;
 
   pid_t my_pid;
@@ -240,16 +238,22 @@ init_error_string_table (echo_main_t * em)
   hash_set (em->error_string_by_error_number, 99, "Misc");
 }
 
-int
+static void handle_mq_event (session_event_t * e);
+
+static int
 wait_for_state_change (echo_main_t * em, connection_state_t state)
 {
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
+  f64 timeout;
+
 #if CLIB_DEBUG > 0
 #define TIMEOUT 600.0
 #else
 #define TIMEOUT 600.0
 #endif
 
-  f64 timeout = clib_time_now (&em->clib_time) + TIMEOUT;
+  timeout = clib_time_now (&em->clib_time) + TIMEOUT;
 
   while (clib_time_now (&em->clib_time) < timeout)
     {
@@ -259,6 +263,14 @@ wait_for_state_change (echo_main_t * em, connection_state_t state)
        return -1;
       if (em->time_to_stop == 1)
        return 0;
+      if (!em->our_event_queue)
+       continue;
+
+      if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_NOWAIT, 0))
+       continue;
+      e = svm_msg_q_msg_data (em->our_event_queue, &msg);
+      handle_mq_event (e);
+      svm_msg_q_free_msg (em->our_event_queue, &msg);
     }
   clib_warning ("timeout waiting for state %d", state);
   return -1;
@@ -277,8 +289,9 @@ application_send_attach (echo_main_t * em)
   bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
   bmp->client_index = em->my_client_index;
   bmp->context = ntohl (0xfeedface);
-  bmp->options[APP_OPTIONS_FLAGS] =
-    APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT;
+  bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ACCEPT_REDIRECT;
+  bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ADD_SEGMENT;
+  bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS;
   bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16;
   bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = em->fifo_size;
   bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = em->fifo_size;
@@ -306,7 +319,7 @@ application_send_attach (echo_main_t * em)
   vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & key_mp);
 }
 
-int
+static int
 application_attach (echo_main_t * em)
 {
   application_send_attach (em);
@@ -547,74 +560,6 @@ session_print_stats (echo_main_t * em, session_t * session)
   fformat (stdout, "%.4f Gbit/second\n", (bytes * 8.0) / deltat / 1e9);
 }
 
-static void
-vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
-{
-  echo_main_t *em = &echo_main;
-  session_t *session = 0;
-  vl_api_disconnect_session_reply_t *rmp;
-  uword *p;
-  int rv = 0;
-
-  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
-
-  if (p)
-    {
-      session = pool_elt_at_index (em->sessions, p[0]);
-      hash_unset (em->session_index_by_vpp_handles, mp->handle);
-      pool_put (em->sessions, session);
-    }
-  else
-    {
-      clib_warning ("couldn't find session key %llx", mp->handle);
-      rv = -11;
-    }
-
-//  em->time_to_stop = 1;
-
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-
-  rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
-  rmp->retval = rv;
-  rmp->handle = mp->handle;
-  rmp->context = mp->context;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
-
-  if (session)
-    session_print_stats (em, session);
-}
-
-static void
-vl_api_reset_session_t_handler (vl_api_reset_session_t * mp)
-{
-  echo_main_t *em = &echo_main;
-  vl_api_reset_session_reply_t *rmp;
-  uword *p;
-  int rv = 0;
-
-  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
-
-  if (p)
-    {
-      clib_warning ("got reset");
-      /* Cleanup later */
-      em->time_to_stop = 1;
-    }
-  else
-    {
-      clib_warning ("couldn't find session key %llx", mp->handle);
-      rv = -11;
-    }
-
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-  rmp->_vl_msg_id = ntohs (VL_API_RESET_SESSION_REPLY);
-  rmp->retval = rv;
-  rmp->handle = mp->handle;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
-}
-
 static void
 test_recv_bytes (session_t * s, u8 * rx_buf, u32 n_read)
 {
@@ -661,7 +606,7 @@ recv_test_chunk (echo_main_t * em, session_t * s, u8 * rx_buf)
 }
 
 void
-client_handle_fifo_event_rx (echo_main_t * em, session_fifo_event_t * e,
+client_handle_fifo_event_rx (echo_main_t * em, session_event_t * e,
                             u8 * rx_buf)
 {
   session_t *s;
@@ -743,7 +688,7 @@ client_thread_fn (void *arg)
 void *
 client_rx_thread_fn (void *arg)
 {
-  session_fifo_event_t _e, *e = &_e;
+  session_event_t _e, *e = &_e;
   echo_main_t *em = &echo_main;
   static u8 *rx_buf = 0;
   svm_msg_q_msg_t msg;
@@ -771,8 +716,104 @@ client_rx_thread_fn (void *arg)
   pthread_exit (0);
 }
 
+void
+client_send_connect (echo_main_t * em)
+{
+  vl_api_connect_uri_t *cmp;
+  cmp = vl_msg_api_alloc (sizeof (*cmp));
+  memset (cmp, 0, sizeof (*cmp));
+
+  cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
+  cmp->client_index = em->my_client_index;
+  cmp->context = ntohl (0xfeedface);
+  memcpy (cmp->uri, em->connect_uri, vec_len (em->connect_uri));
+  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp);
+}
+
+void
+client_send_disconnect (echo_main_t * em, session_t * s)
+{
+  vl_api_disconnect_session_t *dmp;
+  dmp = vl_msg_api_alloc (sizeof (*dmp));
+  memset (dmp, 0, sizeof (*dmp));
+  dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
+  dmp->client_index = em->my_client_index;
+  dmp->handle = s->vpp_session_handle;
+  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp);
+}
+
+int
+client_disconnect (echo_main_t * em, session_t * s)
+{
+  client_send_disconnect (em, s);
+  pool_put (em->sessions, s);
+  memset (s, 0xfe, sizeof (*s));
+  return 0;
+}
+
+static void
+session_accepted_handler (session_accepted_msg_t * mp)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_accepted_reply_msg_t *rmp;
+  svm_fifo_t *rx_fifo, *tx_fifo;
+  echo_main_t *em = &echo_main;
+  session_t *session;
+  static f64 start_time;
+  u32 session_index;
+  u8 *ip_str;
+
+  if (start_time == 0.0)
+    start_time = clib_time_now (&em->clib_time);
+
+  ip_str = format (0, "%U", format_ip46_address, &mp->ip, mp->is_ip4);
+  clib_warning ("Accepted session from: %s:%d", ip_str,
+               clib_net_to_host_u16 (mp->port));
+
+  /* Allocate local session and set it up */
+  pool_get (em->sessions, session);
+  session_index = session - em->sessions;
+
+  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+  rx_fifo->client_session_index = session_index;
+  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+  tx_fifo->client_session_index = session_index;
+
+  session->server_rx_fifo = rx_fifo;
+  session->server_tx_fifo = tx_fifo;
+  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+                                        svm_msg_q_t *);
+
+  /* Add it to lookup table */
+  hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+
+  em->state = STATE_READY;
+
+  /* Stats printing */
+  if (pool_elts (em->sessions) && (pool_elts (em->sessions) % 20000) == 0)
+    {
+      f64 now = clib_time_now (&em->clib_time);
+      fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
+              pool_elts (em->sessions), now - start_time,
+              (f64) pool_elts (em->sessions) / (now - start_time));
+    }
+
+  /*
+   * Send accept reply to vpp
+   */
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_ACCEPTED_REPLY);
+  rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
+  rmp->handle = mp->handle;
+  rmp->context = mp->context;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+
+  session->bytes_received = 0;
+  session->start = clib_time_now (&em->clib_time);
+}
+
 static void
-vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
+session_connected_handler (session_connected_msg_t * mp)
 {
   echo_main_t *em = &echo_main;
   session_t *session;
@@ -829,39 +870,96 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
                mp->is_ip4, clib_net_to_host_u16 (mp->lcl_port));
 }
 
-void
-client_send_connect (echo_main_t * em)
+static void
+session_disconnected_handler (session_disconnected_msg_t * mp)
 {
-  vl_api_connect_uri_t *cmp;
-  cmp = vl_msg_api_alloc (sizeof (*cmp));
-  memset (cmp, 0, sizeof (*cmp));
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_disconnected_reply_msg_t *rmp;
+  echo_main_t *em = &echo_main;
+  session_t *session = 0;
+  uword *p;
+  int rv = 0;
 
-  cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
-  cmp->client_index = em->my_client_index;
-  cmp->context = ntohl (0xfeedface);
-  memcpy (cmp->uri, em->connect_uri, vec_len (em->connect_uri));
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp);
+  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
+
+  if (p)
+    {
+      clib_warning ("disconnected");
+      session = pool_elt_at_index (em->sessions, p[0]);
+      hash_unset (em->session_index_by_vpp_handles, mp->handle);
+      pool_put (em->sessions, session);
+    }
+  else
+    {
+      clib_warning ("couldn't find session key %llx", mp->handle);
+      rv = -11;
+    }
+
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_DISCONNECTED_REPLY);
+  rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
+  rmp->retval = rv;
+  rmp->handle = mp->handle;
+  rmp->context = mp->context;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+
+  if (session)
+    session_print_stats (em, session);
 }
 
-void
-client_send_disconnect (echo_main_t * em, session_t * s)
+static void
+session_reset_handler (session_reset_msg_t * mp)
 {
-  vl_api_disconnect_session_t *dmp;
-  dmp = vl_msg_api_alloc (sizeof (*dmp));
-  memset (dmp, 0, sizeof (*dmp));
-  dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
-  dmp->client_index = em->my_client_index;
-  dmp->handle = s->vpp_session_handle;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp);
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  echo_main_t *em = &echo_main;
+  session_reset_reply_msg_t *rmp;
+  session_t *session = 0;
+  uword *p;
+  int rv = 0;
+
+  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
+
+  if (p)
+    {
+      session = pool_elt_at_index (em->sessions, p[0]);
+      clib_warning ("got reset");
+      /* Cleanup later */
+      em->time_to_stop = 1;
+    }
+  else
+    {
+      clib_warning ("couldn't find session key %llx", mp->handle);
+      return;
+    }
+
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_RESET_REPLY);
+  rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
+  rmp->retval = rv;
+  rmp->handle = mp->handle;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
 }
 
-int
-client_disconnect (echo_main_t * em, session_t * s)
+static void
+handle_mq_event (session_event_t * e)
 {
-  client_send_disconnect (em, s);
-  pool_put (em->sessions, s);
-  memset (s, 0xfe, sizeof (*s));
-  return 0;
+  switch (e->event_type)
+    {
+    case SESSION_CTRL_EVT_ACCEPTED:
+      session_accepted_handler ((session_accepted_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_CONNECTED:
+      session_connected_handler ((session_connected_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_DISCONNECTED:
+      session_disconnected_handler ((session_disconnected_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_RESET:
+      session_reset_handler ((session_reset_msg_t *) e->data);
+      break;
+    default:
+      clib_warning ("unhandled %u", e->event_type);
+    }
 }
 
 static void
@@ -869,6 +967,7 @@ clients_run (echo_main_t * em)
 {
   f64 start_time, deltat, timeout = 100.0;
   svm_msg_q_msg_t msg;
+  session_event_t *e;
   session_t *s;
   int i;
 
@@ -890,7 +989,13 @@ clients_run (echo_main_t * em)
   while (em->n_clients_connected < em->n_clients
         && (clib_time_now (&em->clib_time) - start_time < timeout)
         && em->state != STATE_FAILED)
-    ;
+
+    {
+      svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0);
+      e = svm_msg_q_msg_data (em->our_event_queue, &msg);
+      handle_mq_event (e);
+      svm_msg_q_free_msg (em->our_event_queue, &msg);
+    }
 
   if (em->n_clients_connected != em->n_clients)
     {
@@ -921,9 +1026,12 @@ clients_run (echo_main_t * em)
        if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0))
          {
            clib_warning ("svm msg q returned");
+           continue;
          }
-       else
-         svm_msg_q_free_msg (em->our_event_queue, &msg);
+       e = svm_msg_q_msg_data (em->our_event_queue, &msg);
+       if (e->event_type != FIFO_EVENT_APP_RX)
+         handle_mq_event (e);
+       svm_msg_q_free_msg (em->our_event_queue, &msg);
       }
 
   for (i = 0; i < em->n_clients; i++)
@@ -1055,72 +1163,10 @@ format_ip46_address (u8 * s, va_list * args)
 }
 
 static void
-vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
-{
-  echo_main_t *em = &echo_main;
-  vl_api_accept_session_reply_t *rmp;
-  svm_fifo_t *rx_fifo, *tx_fifo;
-  session_t *session;
-  static f64 start_time;
-  u32 session_index;
-  u8 *ip_str;
-
-  if (start_time == 0.0)
-    start_time = clib_time_now (&em->clib_time);
-
-  ip_str = format (0, "%U", format_ip46_address, &mp->ip, mp->is_ip4);
-  clib_warning ("Accepted session from: %s:%d", ip_str,
-               clib_net_to_host_u16 (mp->port));
-  em->vpp_event_queue =
-    uword_to_pointer (mp->vpp_event_queue_address, svm_queue_t *);
-
-  /* Allocate local session and set it up */
-  pool_get (em->sessions, session);
-  session_index = session - em->sessions;
-
-  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  rx_fifo->client_session_index = session_index;
-  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  tx_fifo->client_session_index = session_index;
-
-  session->server_rx_fifo = rx_fifo;
-  session->server_tx_fifo = tx_fifo;
-
-  /* Add it to lookup table */
-  hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
-
-  em->state = STATE_READY;
-
-  /* Stats printing */
-  if (pool_elts (em->sessions) && (pool_elts (em->sessions) % 20000) == 0)
-    {
-      f64 now = clib_time_now (&em->clib_time);
-      fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
-              pool_elts (em->sessions), now - start_time,
-              (f64) pool_elts (em->sessions) / (now - start_time));
-    }
-
-  /*
-   * Send accept reply to vpp
-   */
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-  rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
-  rmp->handle = mp->handle;
-  rmp->context = mp->context;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
-
-  session->bytes_received = 0;
-  session->start = clib_time_now (&em->clib_time);
-}
-
-void
-server_handle_fifo_event_rx (echo_main_t * em, session_fifo_event_t * e)
+server_handle_fifo_event_rx (echo_main_t * em, session_event_t * e)
 {
   svm_fifo_t *rx_fifo, *tx_fifo;
   int n_read;
-  session_fifo_event_t evt;
-  svm_queue_t *q;
   session_t *session;
   int rv;
   u32 max_dequeue, offset, max_transfer, rx_buf_len;
@@ -1167,24 +1213,18 @@ server_handle_fifo_event_rx (echo_main_t * em, session_fifo_event_t * e)
 
          /* If event wasn't set, add one */
          if (svm_fifo_set_event (tx_fifo))
-           {
-             /* Fabricate TX event, send to vpp */
-             evt.fifo = tx_fifo;
-             evt.event_type = FIFO_EVENT_APP_TX;
-
-             q = em->vpp_event_queue;
-             svm_queue_add (q, (u8 *) & evt, 1 /* do wait for mutex */ );
-           }
+           app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo,
+                                   FIFO_EVENT_APP_TX, SVM_Q_WAIT);
        }
     }
   while ((n_read < 0 || max_dequeue > 0) && !em->time_to_stop);
 }
 
-void
+static void
 server_handle_event_queue (echo_main_t * em)
 {
-  session_fifo_event_t _e, *e = &_e;
   svm_msg_q_msg_t msg;
+  session_event_t *e;
 
   while (1)
     {
@@ -1195,13 +1235,8 @@ server_handle_event_queue (echo_main_t * em)
        case FIFO_EVENT_APP_RX:
          server_handle_fifo_event_rx (em, e);
          break;
-
-       case FIFO_EVENT_DISCONNECT:
-         svm_msg_q_free_msg (em->our_event_queue, &msg);
-         return;
-
        default:
-         clib_warning ("unknown event type %d", e->event_type);
+         handle_mq_event (e);
          break;
        }
       if (PREDICT_FALSE (em->time_to_stop == 1))
@@ -1346,11 +1381,7 @@ static void
 #define foreach_tcp_echo_msg                                           \
 _(BIND_URI_REPLY, bind_uri_reply)                                      \
 _(UNBIND_URI_REPLY, unbind_uri_reply)                                  \
-_(ACCEPT_SESSION, accept_session)                                      \
-_(CONNECT_SESSION_REPLY, connect_session_reply)                        \
-_(DISCONNECT_SESSION, disconnect_session)                              \
 _(DISCONNECT_SESSION_REPLY, disconnect_session_reply)                  \
-_(RESET_SESSION, reset_session)                                        \
 _(APPLICATION_ATTACH_REPLY, application_attach_reply)                  \
 _(APPLICATION_DETACH_REPLY, application_detach_reply)                  \
 _(MAP_ANOTHER_SEGMENT, map_another_segment)                            \
@@ -1394,8 +1425,6 @@ main (int argc, char **argv)
   /* make the main heap thread-safe */
   h->flags |= MHEAP_FLAG_THREAD_SAFE;
 
-  vec_validate (em->rx_buf, 128 << 10);
-
   memset (em, 0, sizeof (*em));
   em->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
   em->my_pid = getpid ();
@@ -1472,6 +1501,7 @@ main (int argc, char **argv)
   em->test_return_packets = test_return_packets;
   em->bytes_to_send = bytes_to_send;
   em->time_to_stop = 0;
+  vec_validate (em->rx_buf, 128 << 10);
   vec_validate (em->client_thread_handles, em->n_clients - 1);
   vec_validate (em->thread_args, em->n_clients - 1);