vcl: support for eventfd mq signaling
[vpp.git] / src / tests / vnet / session / tcp_echo.c
index e8868ed..140a198 100644 (file)
@@ -48,7 +48,7 @@ typedef struct
   svm_fifo_t *server_rx_fifo;
   svm_fifo_t *server_tx_fifo;
 
-  svm_queue_t *vpp_evt_q;
+  svm_msg_q_t *vpp_evt_q;
 
   u64 vpp_session_handle;
   u64 bytes_sent;
@@ -62,6 +62,7 @@ typedef enum
 {
   STATE_START,
   STATE_ATTACHED,
+  STATE_LISTEN,
   STATE_READY,
   STATE_DISCONNECTING,
   STATE_FAILED,
@@ -99,10 +100,7 @@ typedef struct
   int no_return;
 
   /* Our event queue */
-  svm_queue_t *our_event_queue;
-
-  /* $$$ single thread only for the moment */
-  svm_queue_t *vpp_event_queue;
+  svm_msg_q_t *our_event_queue;
 
   u8 *socket_name;
 
@@ -240,16 +238,22 @@ init_error_string_table (echo_main_t * em)
   hash_set (em->error_string_by_error_number, 99, "Misc");
 }
 
-int
+static void handle_mq_event (session_event_t * e);
+
+static int
 wait_for_state_change (echo_main_t * em, connection_state_t state)
 {
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
+  f64 timeout;
+
 #if CLIB_DEBUG > 0
 #define TIMEOUT 600.0
 #else
 #define TIMEOUT 600.0
 #endif
 
-  f64 timeout = clib_time_now (&em->clib_time) + TIMEOUT;
+  timeout = clib_time_now (&em->clib_time) + TIMEOUT;
 
   while (clib_time_now (&em->clib_time) < timeout)
     {
@@ -259,6 +263,14 @@ wait_for_state_change (echo_main_t * em, connection_state_t state)
        return -1;
       if (em->time_to_stop == 1)
        return 0;
+      if (!em->our_event_queue || em->state < STATE_ATTACHED)
+       continue;
+
+      if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_NOWAIT, 0))
+       continue;
+      e = svm_msg_q_msg_data (em->our_event_queue, &msg);
+      handle_mq_event (e);
+      svm_msg_q_free_msg (em->our_event_queue, &msg);
     }
   clib_warning ("timeout waiting for state %d", state);
   return -1;
@@ -277,8 +289,9 @@ application_send_attach (echo_main_t * em)
   bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
   bmp->client_index = em->my_client_index;
   bmp->context = ntohl (0xfeedface);
-  bmp->options[APP_OPTIONS_FLAGS] =
-    APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT;
+  bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ACCEPT_REDIRECT;
+  bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ADD_SEGMENT;
+  bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS;
   bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16;
   bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = em->fifo_size;
   bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = em->fifo_size;
@@ -306,7 +319,7 @@ application_send_attach (echo_main_t * em)
   vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & key_mp);
 }
 
-int
+static int
 application_attach (echo_main_t * em)
 {
   application_send_attach (em);
@@ -334,44 +347,17 @@ application_detach (echo_main_t * em)
 }
 
 static int
-memfd_segment_attach (void)
-{
-  ssvm_private_t _ssvm = { 0 }, *ssvm = &_ssvm;
-  clib_error_t *error;
-  int rv;
-
-  if ((error = vl_socket_client_recv_fd_msg (&ssvm->fd, 5)))
-    {
-      clib_error_report (error);
-      return -1;
-    }
-
-  if ((rv = ssvm_slave_init_memfd (ssvm)))
-    return rv;
-
-  return 0;
-}
-
-static int
-fifo_segment_attach (char *name, u32 size, ssvm_segment_type_t type)
+ssvm_segment_attach (char *name, ssvm_segment_type_t type, int fd)
 {
   svm_fifo_segment_create_args_t _a, *a = &_a;
-  clib_error_t *error;
   int rv;
 
   memset (a, 0, sizeof (*a));
   a->segment_name = (char *) name;
-  a->segment_size = size;
   a->segment_type = type;
 
   if (type == SSVM_SEGMENT_MEMFD)
-    {
-      if ((error = vl_socket_client_recv_fd_msg (&a->memfd_fd, 5)))
-       {
-         clib_error_report (error);
-         return -1;
-       }
-    }
+    a->memfd_fd = fd;
 
   if ((rv = svm_fifo_segment_attach (a)))
     {
@@ -379,6 +365,7 @@ fifo_segment_attach (char *name, u32 size, ssvm_segment_type_t type)
       return rv;
     }
 
+  vec_reset_length (a->new_segment_indices);
   return 0;
 }
 
@@ -387,47 +374,57 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
                                           mp)
 {
   echo_main_t *em = &echo_main;
-  ssvm_segment_type_t seg_type;
+  int *fds = 0;
+  u32 n_fds = 0;
 
   if (mp->retval)
     {
       clib_warning ("attach failed: %U", format_api_error,
                    clib_net_to_host_u32 (mp->retval));
-      em->state = STATE_FAILED;
-      return;
+      goto failed;
     }
 
   if (mp->segment_name_length == 0)
     {
       clib_warning ("segment_name_length zero");
-      return;
+      goto failed;
     }
 
-  seg_type = em->use_sock_api ? SSVM_SEGMENT_MEMFD : SSVM_SEGMENT_SHM;
+  ASSERT (mp->app_event_queue_address);
+  em->our_event_queue = uword_to_pointer (mp->app_event_queue_address,
+                                         svm_msg_q_t *);
 
-  /* Attach to fifo segment */
-  if (fifo_segment_attach ((char *) mp->segment_name, mp->segment_size,
-                          seg_type))
+  if (mp->n_fds)
     {
-      em->state = STATE_FAILED;
-      return;
-    }
+      vec_validate (fds, mp->n_fds);
+      vl_socket_client_recv_fd_msg (fds, mp->n_fds, 5);
+
+      if (mp->fd_flags & SESSION_FD_F_VPP_MQ_SEGMENT)
+       if (ssvm_segment_attach (0, SSVM_SEGMENT_MEMFD, fds[n_fds++]))
+         goto failed;
+
+      if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
+       if (ssvm_segment_attach ((char *) mp->segment_name,
+                                SSVM_SEGMENT_MEMFD, fds[n_fds++]))
+         goto failed;
 
-  /* If we're using memfd segments, read and attach to event qs segment */
-  if (seg_type == SSVM_SEGMENT_MEMFD)
+      if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
+       svm_msg_q_set_consumer_eventfd (em->our_event_queue, fds[n_fds++]);
+
+      vec_free (fds);
+    }
+  else
     {
-      if (memfd_segment_attach ())
-       {
-         clib_warning ("failed to attach to evt q segment");
-         em->state = STATE_FAILED;
-         return;
-       }
+      if (ssvm_segment_attach ((char *) mp->segment_name, SSVM_SEGMENT_SHM,
+                              -1))
+       goto failed;
     }
 
-  ASSERT (mp->app_event_queue_address);
-  em->our_event_queue = uword_to_pointer (mp->app_event_queue_address,
-                                         svm_queue_t *);
   em->state = STATE_ATTACHED;
+  return;
+failed:
+  em->state = STATE_FAILED;
+  return;
 }
 
 static void
@@ -547,74 +544,6 @@ session_print_stats (echo_main_t * em, session_t * session)
   fformat (stdout, "%.4f Gbit/second\n", (bytes * 8.0) / deltat / 1e9);
 }
 
-static void
-vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
-{
-  echo_main_t *em = &echo_main;
-  session_t *session = 0;
-  vl_api_disconnect_session_reply_t *rmp;
-  uword *p;
-  int rv = 0;
-
-  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
-
-  if (p)
-    {
-      session = pool_elt_at_index (em->sessions, p[0]);
-      hash_unset (em->session_index_by_vpp_handles, mp->handle);
-      pool_put (em->sessions, session);
-    }
-  else
-    {
-      clib_warning ("couldn't find session key %llx", mp->handle);
-      rv = -11;
-    }
-
-//  em->time_to_stop = 1;
-
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-
-  rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
-  rmp->retval = rv;
-  rmp->handle = mp->handle;
-  rmp->context = mp->context;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
-
-  if (session)
-    session_print_stats (em, session);
-}
-
-static void
-vl_api_reset_session_t_handler (vl_api_reset_session_t * mp)
-{
-  echo_main_t *em = &echo_main;
-  vl_api_reset_session_reply_t *rmp;
-  uword *p;
-  int rv = 0;
-
-  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
-
-  if (p)
-    {
-      clib_warning ("got reset");
-      /* Cleanup later */
-      em->time_to_stop = 1;
-    }
-  else
-    {
-      clib_warning ("couldn't find session key %llx", mp->handle);
-      rv = -11;
-    }
-
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-  rmp->_vl_msg_id = ntohs (VL_API_RESET_SESSION_REPLY);
-  rmp->retval = rv;
-  rmp->handle = mp->handle;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
-}
-
 static void
 test_recv_bytes (session_t * s, u8 * rx_buf, u32 n_read)
 {
@@ -661,7 +590,7 @@ recv_test_chunk (echo_main_t * em, session_t * s, u8 * rx_buf)
 }
 
 void
-client_handle_fifo_event_rx (echo_main_t * em, session_fifo_event_t * e,
+client_handle_fifo_event_rx (echo_main_t * em, session_event_t * e,
                             u8 * rx_buf)
 {
   session_t *s;
@@ -676,8 +605,7 @@ send_test_chunk (echo_main_t * em, session_t * s)
   u64 test_buf_len, bytes_this_chunk, test_buf_offset;
   svm_fifo_t *tx_fifo = s->server_tx_fifo;
   u8 *test_data = em->connect_test_data;
-  u32 enq_space, min_chunk = 16 << 10;
-  session_fifo_event_t evt;
+  u32 enq_space = 16 << 10;
   int written;
 
   test_buf_len = vec_len (test_data);
@@ -685,8 +613,6 @@ send_test_chunk (echo_main_t * em, session_t * s)
   bytes_this_chunk = clib_min (test_buf_len - test_buf_offset,
                               s->bytes_to_send);
   enq_space = svm_fifo_max_enqueue (tx_fifo);
-  if (enq_space < clib_min (bytes_this_chunk, min_chunk))
-    return;
 
   bytes_this_chunk = clib_min (bytes_this_chunk, enq_space);
   written = svm_fifo_enqueue_nowait (tx_fifo, bytes_this_chunk,
@@ -698,12 +624,8 @@ send_test_chunk (echo_main_t * em, session_t * s)
       s->bytes_sent += written;
 
       if (svm_fifo_set_event (tx_fifo))
-       {
-         /* Fabricate TX event, send to vpp */
-         evt.fifo = tx_fifo;
-         evt.event_type = FIFO_EVENT_APP_TX;
-         svm_queue_add (s->vpp_evt_q, (u8 *) & evt, 0 /* wait for mutex */ );
-       }
+       app_send_io_evt_to_vpp (s->vpp_evt_q, tx_fifo, FIFO_EVENT_APP_TX,
+                               0 /* do wait for mutex */ );
     }
 }
 
@@ -748,9 +670,10 @@ client_thread_fn (void *arg)
 void *
 client_rx_thread_fn (void *arg)
 {
-  session_fifo_event_t _e, *e = &_e;
+  session_event_t _e, *e = &_e;
   echo_main_t *em = &echo_main;
   static u8 *rx_buf = 0;
+  svm_msg_q_msg_t msg;
 
   vec_validate (rx_buf, 1 << 20);
 
@@ -759,7 +682,8 @@ client_rx_thread_fn (void *arg)
 
   while (!em->time_to_stop)
     {
-      svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
+      svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0);
+      e = svm_msg_q_msg_data (em->our_event_queue, &msg);
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
@@ -769,12 +693,109 @@ client_rx_thread_fn (void *arg)
          clib_warning ("unknown event type %d", e->event_type);
          break;
        }
+      svm_msg_q_free_msg (em->our_event_queue, &msg);
     }
   pthread_exit (0);
 }
 
+void
+client_send_connect (echo_main_t * em)
+{
+  vl_api_connect_uri_t *cmp;
+  cmp = vl_msg_api_alloc (sizeof (*cmp));
+  memset (cmp, 0, sizeof (*cmp));
+
+  cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
+  cmp->client_index = em->my_client_index;
+  cmp->context = ntohl (0xfeedface);
+  memcpy (cmp->uri, em->connect_uri, vec_len (em->connect_uri));
+  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp);
+}
+
+void
+client_send_disconnect (echo_main_t * em, session_t * s)
+{
+  vl_api_disconnect_session_t *dmp;
+  dmp = vl_msg_api_alloc (sizeof (*dmp));
+  memset (dmp, 0, sizeof (*dmp));
+  dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
+  dmp->client_index = em->my_client_index;
+  dmp->handle = s->vpp_session_handle;
+  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp);
+}
+
+int
+client_disconnect (echo_main_t * em, session_t * s)
+{
+  client_send_disconnect (em, s);
+  pool_put (em->sessions, s);
+  memset (s, 0xfe, sizeof (*s));
+  return 0;
+}
+
 static void
-vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
+session_accepted_handler (session_accepted_msg_t * mp)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_accepted_reply_msg_t *rmp;
+  svm_fifo_t *rx_fifo, *tx_fifo;
+  echo_main_t *em = &echo_main;
+  session_t *session;
+  static f64 start_time;
+  u32 session_index;
+  u8 *ip_str;
+
+  if (start_time == 0.0)
+    start_time = clib_time_now (&em->clib_time);
+
+  ip_str = format (0, "%U", format_ip46_address, &mp->ip, mp->is_ip4);
+  clib_warning ("Accepted session from: %s:%d", ip_str,
+               clib_net_to_host_u16 (mp->port));
+
+  /* Allocate local session and set it up */
+  pool_get (em->sessions, session);
+  session_index = session - em->sessions;
+
+  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+  rx_fifo->client_session_index = session_index;
+  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+  tx_fifo->client_session_index = session_index;
+
+  session->server_rx_fifo = rx_fifo;
+  session->server_tx_fifo = tx_fifo;
+  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+                                        svm_msg_q_t *);
+
+  /* Add it to lookup table */
+  hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+
+  em->state = STATE_READY;
+
+  /* Stats printing */
+  if (pool_elts (em->sessions) && (pool_elts (em->sessions) % 20000) == 0)
+    {
+      f64 now = clib_time_now (&em->clib_time);
+      fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
+              pool_elts (em->sessions), now - start_time,
+              (f64) pool_elts (em->sessions) / (now - start_time));
+    }
+
+  /*
+   * Send accept reply to vpp
+   */
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_ACCEPTED_REPLY);
+  rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
+  rmp->handle = mp->handle;
+  rmp->context = mp->context;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+
+  session->bytes_received = 0;
+  session->start = clib_time_now (&em->clib_time);
+}
+
+static void
+session_connected_handler (session_connected_msg_t * mp)
 {
   echo_main_t *em = &echo_main;
   session_t *session;
@@ -808,7 +829,7 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
   session->vpp_session_handle = mp->handle;
   session->start = clib_time_now (&em->clib_time);
   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_queue_t *);
+                                        svm_msg_q_t *);
 
   hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
 
@@ -831,46 +852,99 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
                mp->is_ip4, clib_net_to_host_u16 (mp->lcl_port));
 }
 
-void
-client_send_connect (echo_main_t * em)
+static void
+session_disconnected_handler (session_disconnected_msg_t * mp)
 {
-  vl_api_connect_uri_t *cmp;
-  cmp = vl_msg_api_alloc (sizeof (*cmp));
-  memset (cmp, 0, sizeof (*cmp));
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_disconnected_reply_msg_t *rmp;
+  echo_main_t *em = &echo_main;
+  session_t *session = 0;
+  uword *p;
+  int rv = 0;
 
-  cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
-  cmp->client_index = em->my_client_index;
-  cmp->context = ntohl (0xfeedface);
-  memcpy (cmp->uri, em->connect_uri, vec_len (em->connect_uri));
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp);
+  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
+  if (!p)
+    {
+      clib_warning ("couldn't find session key %llx", mp->handle);
+      return;
+    }
+
+  session = pool_elt_at_index (em->sessions, p[0]);
+  hash_unset (em->session_index_by_vpp_handles, mp->handle);
+  pool_put (em->sessions, session);
+
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_DISCONNECTED_REPLY);
+  rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
+  rmp->retval = rv;
+  rmp->handle = mp->handle;
+  rmp->context = mp->context;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+
+  session_print_stats (em, session);
 }
 
-void
-client_send_disconnect (echo_main_t * em, session_t * s)
+static void
+session_reset_handler (session_reset_msg_t * mp)
 {
-  vl_api_disconnect_session_t *dmp;
-  dmp = vl_msg_api_alloc (sizeof (*dmp));
-  memset (dmp, 0, sizeof (*dmp));
-  dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
-  dmp->client_index = em->my_client_index;
-  dmp->handle = s->vpp_session_handle;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp);
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  echo_main_t *em = &echo_main;
+  session_reset_reply_msg_t *rmp;
+  session_t *session = 0;
+  uword *p;
+  int rv = 0;
+
+  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
+
+  if (p)
+    {
+      session = pool_elt_at_index (em->sessions, p[0]);
+      clib_warning ("got reset");
+      /* Cleanup later */
+      em->time_to_stop = 1;
+    }
+  else
+    {
+      clib_warning ("couldn't find session key %llx", mp->handle);
+      return;
+    }
+
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_RESET_REPLY);
+  rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
+  rmp->retval = rv;
+  rmp->handle = mp->handle;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
 }
 
-int
-client_disconnect (echo_main_t * em, session_t * s)
+static void
+handle_mq_event (session_event_t * e)
 {
-  client_send_disconnect (em, s);
-  pool_put (em->sessions, s);
-  memset (s, 0xfe, sizeof (*s));
-  return 0;
+  switch (e->event_type)
+    {
+    case SESSION_CTRL_EVT_ACCEPTED:
+      session_accepted_handler ((session_accepted_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_CONNECTED:
+      session_connected_handler ((session_connected_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_DISCONNECTED:
+      session_disconnected_handler ((session_disconnected_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_RESET:
+      session_reset_handler ((session_reset_msg_t *) e->data);
+      break;
+    default:
+      clib_warning ("unhandled %u", e->event_type);
+    }
 }
 
 static void
 clients_run (echo_main_t * em)
 {
-  session_fifo_event_t _e, *e = &_e;
   f64 start_time, deltat, timeout = 100.0;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
   session_t *s;
   int i;
 
@@ -892,7 +966,13 @@ clients_run (echo_main_t * em)
   while (em->n_clients_connected < em->n_clients
         && (clib_time_now (&em->clib_time) - start_time < timeout)
         && em->state != STATE_FAILED)
-    ;
+
+    {
+      svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0);
+      e = svm_msg_q_msg_data (em->our_event_queue, &msg);
+      handle_mq_event (e);
+      svm_msg_q_free_msg (em->our_event_queue, &msg);
+    }
 
   if (em->n_clients_connected != em->n_clients)
     {
@@ -918,8 +998,18 @@ clients_run (echo_main_t * em)
   start_time = clib_time_now (&em->clib_time);
   em->state = STATE_READY;
   while (em->n_active_clients)
-    svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_NOWAIT, 0);
-
+    if (!svm_msg_q_is_empty (em->our_event_queue))
+      {
+       if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0))
+         {
+           clib_warning ("svm msg q returned");
+           continue;
+         }
+       e = svm_msg_q_msg_data (em->our_event_queue, &msg);
+       if (e->event_type != FIFO_EVENT_APP_RX)
+         handle_mq_event (e);
+       svm_msg_q_free_msg (em->our_event_queue, &msg);
+      }
 
   for (i = 0; i < em->n_clients; i++)
     {
@@ -1050,72 +1140,10 @@ format_ip46_address (u8 * s, va_list * args)
 }
 
 static void
-vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
-{
-  echo_main_t *em = &echo_main;
-  vl_api_accept_session_reply_t *rmp;
-  svm_fifo_t *rx_fifo, *tx_fifo;
-  session_t *session;
-  static f64 start_time;
-  u32 session_index;
-  u8 *ip_str;
-
-  if (start_time == 0.0)
-    start_time = clib_time_now (&em->clib_time);
-
-  ip_str = format (0, "%U", format_ip46_address, &mp->ip, mp->is_ip4);
-  clib_warning ("Accepted session from: %s:%d", ip_str,
-               clib_net_to_host_u16 (mp->port));
-  em->vpp_event_queue =
-    uword_to_pointer (mp->vpp_event_queue_address, svm_queue_t *);
-
-  /* Allocate local session and set it up */
-  pool_get (em->sessions, session);
-  session_index = session - em->sessions;
-
-  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  rx_fifo->client_session_index = session_index;
-  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  tx_fifo->client_session_index = session_index;
-
-  session->server_rx_fifo = rx_fifo;
-  session->server_tx_fifo = tx_fifo;
-
-  /* Add it to lookup table */
-  hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
-
-  em->state = STATE_READY;
-
-  /* Stats printing */
-  if (pool_elts (em->sessions) && (pool_elts (em->sessions) % 20000) == 0)
-    {
-      f64 now = clib_time_now (&em->clib_time);
-      fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
-              pool_elts (em->sessions), now - start_time,
-              (f64) pool_elts (em->sessions) / (now - start_time));
-    }
-
-  /*
-   * Send accept reply to vpp
-   */
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-  rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
-  rmp->handle = mp->handle;
-  rmp->context = mp->context;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
-
-  session->bytes_received = 0;
-  session->start = clib_time_now (&em->clib_time);
-}
-
-void
-server_handle_fifo_event_rx (echo_main_t * em, session_fifo_event_t * e)
+server_handle_fifo_event_rx (echo_main_t * em, session_event_t * e)
 {
   svm_fifo_t *rx_fifo, *tx_fifo;
   int n_read;
-  session_fifo_event_t evt;
-  svm_queue_t *q;
   session_t *session;
   int rv;
   u32 max_dequeue, offset, max_transfer, rx_buf_len;
@@ -1162,38 +1190,30 @@ server_handle_fifo_event_rx (echo_main_t * em, session_fifo_event_t * e)
 
          /* If event wasn't set, add one */
          if (svm_fifo_set_event (tx_fifo))
-           {
-             /* Fabricate TX event, send to vpp */
-             evt.fifo = tx_fifo;
-             evt.event_type = FIFO_EVENT_APP_TX;
-
-             q = em->vpp_event_queue;
-             svm_queue_add (q, (u8 *) & evt, 1 /* do wait for mutex */ );
-           }
+           app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo,
+                                   FIFO_EVENT_APP_TX, SVM_Q_WAIT);
        }
     }
   while ((n_read < 0 || max_dequeue > 0) && !em->time_to_stop);
 }
 
-void
+static void
 server_handle_event_queue (echo_main_t * em)
 {
-  session_fifo_event_t _e, *e = &_e;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
 
   while (1)
     {
-      svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
+      svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0);
+      e = svm_msg_q_msg_data (em->our_event_queue, &msg);
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
          server_handle_fifo_event_rx (em, e);
          break;
-
-       case FIFO_EVENT_DISCONNECT:
-         return;
-
        default:
-         clib_warning ("unknown event type %d", e->event_type);
+         handle_mq_event (e);
          break;
        }
       if (PREDICT_FALSE (em->time_to_stop == 1))
@@ -1203,6 +1223,7 @@ server_handle_event_queue (echo_main_t * em)
          em->time_to_print_stats = 0;
          fformat (stdout, "%d connections\n", pool_elts (em->sessions));
        }
+      svm_msg_q_free_msg (em->our_event_queue, &msg);
     }
 }
 
@@ -1337,11 +1358,7 @@ static void
 #define foreach_tcp_echo_msg                                           \
 _(BIND_URI_REPLY, bind_uri_reply)                                      \
 _(UNBIND_URI_REPLY, unbind_uri_reply)                                  \
-_(ACCEPT_SESSION, accept_session)                                      \
-_(CONNECT_SESSION_REPLY, connect_session_reply)                        \
-_(DISCONNECT_SESSION, disconnect_session)                              \
 _(DISCONNECT_SESSION_REPLY, disconnect_session_reply)                  \
-_(RESET_SESSION, reset_session)                                        \
 _(APPLICATION_ATTACH_REPLY, application_attach_reply)                  \
 _(APPLICATION_DETACH_REPLY, application_detach_reply)                  \
 _(MAP_ANOTHER_SEGMENT, map_another_segment)                            \
@@ -1369,23 +1386,14 @@ main (int argc, char **argv)
   echo_main_t *em = &echo_main;
   unformat_input_t _argv, *a = &_argv;
   u8 *chroot_prefix;
-  u8 *heap, *uri = 0;
+  u8 *uri = 0;
   u8 *bind_uri = (u8 *) "tcp://0.0.0.0/1234";
   u8 *connect_uri = (u8 *) "tcp://6.0.1.1/1234";
   u64 bytes_to_send = 64 << 10, mbytes;
   char *app_name;
   u32 tmp;
-  mheap_t *h;
 
-  clib_mem_init (0, 256 << 20);
-
-  heap = clib_mem_get_per_cpu_heap ();
-  h = mheap_header (heap);
-
-  /* make the main heap thread-safe */
-  h->flags |= MHEAP_FLAG_THREAD_SAFE;
-
-  vec_validate (em->rx_buf, 128 << 10);
+  clib_mem_init_thread_safe (0, 256 << 20);
 
   memset (em, 0, sizeof (*em));
   em->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
@@ -1439,7 +1447,7 @@ main (int argc, char **argv)
        ;
       else
        {
-         fformat (stderr, "%s: usage [master|slave]\n");
+         fformat (stderr, "%s: usage [master|slave]\n", argv[0]);
          exit (1);
        }
     }
@@ -1463,6 +1471,7 @@ main (int argc, char **argv)
   em->test_return_packets = test_return_packets;
   em->bytes_to_send = bytes_to_send;
   em->time_to_stop = 0;
+  vec_validate (em->rx_buf, 128 << 10);
   vec_validate (em->client_thread_handles, em->n_clients - 1);
   vec_validate (em->thread_args, em->n_clients - 1);