session: send ctrl msg over mq 58/13458/8
authorFlorin Coras <fcoras@cisco.com>
Thu, 12 Jul 2018 21:48:06 +0000 (14:48 -0700)
committerDamjan Marion <dmarion@me.com>
Tue, 17 Jul 2018 09:02:17 +0000 (09:02 +0000)
Change-Id: I242056bc46ddb671064665916b2687860292dcb2
Signed-off-by: Florin Coras <fcoras@cisco.com>
13 files changed:
src/svm/message_queue.c
src/svm/message_queue.h
src/tests/vnet/session/tcp_echo.c
src/tests/vnet/session/udp_echo.c
src/vnet/session-apps/echo_client.c
src/vnet/session/application.c
src/vnet/session/application_interface.h
src/vnet/session/segment_manager.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_api.c
src/vnet/session/session_cli.c
src/vnet/session/session_node.c

index 8941114..77c15ed 100644 (file)
@@ -200,10 +200,11 @@ svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
 }
 
 void
-svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
+svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
   ASSERT (svm_msq_q_msg_is_valid (mq, msg));
   svm_queue_add_raw (mq->q, (u8 *) msg);
+  svm_msg_q_unlock (mq);
 }
 
 int
index 708a03d..5ff0c4b 100644 (file)
@@ -163,7 +163,7 @@ int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
  * @param msg          message (pointer to ring position) to be enqueued
  * @return             success status
  */
-void svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
 
 /**
  * Consumer dequeue one message from queue
index f8b75d9..6c99294 100644 (file)
@@ -62,6 +62,7 @@ typedef enum
 {
   STATE_START,
   STATE_ATTACHED,
+  STATE_LISTEN,
   STATE_READY,
   STATE_DISCONNECTING,
   STATE_FAILED,
@@ -101,9 +102,6 @@ typedef struct
   /* Our event queue */
   svm_msg_q_t *our_event_queue;
 
-  /* $$$ single thread only for the moment */
-  svm_queue_t *vpp_event_queue;
-
   u8 *socket_name;
 
   pid_t my_pid;
@@ -240,16 +238,22 @@ init_error_string_table (echo_main_t * em)
   hash_set (em->error_string_by_error_number, 99, "Misc");
 }
 
-int
+static void handle_mq_event (session_event_t * e);
+
+static int
 wait_for_state_change (echo_main_t * em, connection_state_t state)
 {
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
+  f64 timeout;
+
 #if CLIB_DEBUG > 0
 #define TIMEOUT 600.0
 #else
 #define TIMEOUT 600.0
 #endif
 
-  f64 timeout = clib_time_now (&em->clib_time) + TIMEOUT;
+  timeout = clib_time_now (&em->clib_time) + TIMEOUT;
 
   while (clib_time_now (&em->clib_time) < timeout)
     {
@@ -259,6 +263,14 @@ wait_for_state_change (echo_main_t * em, connection_state_t state)
        return -1;
       if (em->time_to_stop == 1)
        return 0;
+      if (!em->our_event_queue)
+       continue;
+
+      if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_NOWAIT, 0))
+       continue;
+      e = svm_msg_q_msg_data (em->our_event_queue, &msg);
+      handle_mq_event (e);
+      svm_msg_q_free_msg (em->our_event_queue, &msg);
     }
   clib_warning ("timeout waiting for state %d", state);
   return -1;
@@ -277,8 +289,9 @@ application_send_attach (echo_main_t * em)
   bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
   bmp->client_index = em->my_client_index;
   bmp->context = ntohl (0xfeedface);
-  bmp->options[APP_OPTIONS_FLAGS] =
-    APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT;
+  bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ACCEPT_REDIRECT;
+  bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ADD_SEGMENT;
+  bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS;
   bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16;
   bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = em->fifo_size;
   bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = em->fifo_size;
@@ -306,7 +319,7 @@ application_send_attach (echo_main_t * em)
   vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & key_mp);
 }
 
-int
+static int
 application_attach (echo_main_t * em)
 {
   application_send_attach (em);
@@ -547,74 +560,6 @@ session_print_stats (echo_main_t * em, session_t * session)
   fformat (stdout, "%.4f Gbit/second\n", (bytes * 8.0) / deltat / 1e9);
 }
 
-static void
-vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
-{
-  echo_main_t *em = &echo_main;
-  session_t *session = 0;
-  vl_api_disconnect_session_reply_t *rmp;
-  uword *p;
-  int rv = 0;
-
-  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
-
-  if (p)
-    {
-      session = pool_elt_at_index (em->sessions, p[0]);
-      hash_unset (em->session_index_by_vpp_handles, mp->handle);
-      pool_put (em->sessions, session);
-    }
-  else
-    {
-      clib_warning ("couldn't find session key %llx", mp->handle);
-      rv = -11;
-    }
-
-//  em->time_to_stop = 1;
-
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-
-  rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
-  rmp->retval = rv;
-  rmp->handle = mp->handle;
-  rmp->context = mp->context;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
-
-  if (session)
-    session_print_stats (em, session);
-}
-
-static void
-vl_api_reset_session_t_handler (vl_api_reset_session_t * mp)
-{
-  echo_main_t *em = &echo_main;
-  vl_api_reset_session_reply_t *rmp;
-  uword *p;
-  int rv = 0;
-
-  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
-
-  if (p)
-    {
-      clib_warning ("got reset");
-      /* Cleanup later */
-      em->time_to_stop = 1;
-    }
-  else
-    {
-      clib_warning ("couldn't find session key %llx", mp->handle);
-      rv = -11;
-    }
-
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-  rmp->_vl_msg_id = ntohs (VL_API_RESET_SESSION_REPLY);
-  rmp->retval = rv;
-  rmp->handle = mp->handle;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
-}
-
 static void
 test_recv_bytes (session_t * s, u8 * rx_buf, u32 n_read)
 {
@@ -661,7 +606,7 @@ recv_test_chunk (echo_main_t * em, session_t * s, u8 * rx_buf)
 }
 
 void
-client_handle_fifo_event_rx (echo_main_t * em, session_fifo_event_t * e,
+client_handle_fifo_event_rx (echo_main_t * em, session_event_t * e,
                             u8 * rx_buf)
 {
   session_t *s;
@@ -743,7 +688,7 @@ client_thread_fn (void *arg)
 void *
 client_rx_thread_fn (void *arg)
 {
-  session_fifo_event_t _e, *e = &_e;
+  session_event_t _e, *e = &_e;
   echo_main_t *em = &echo_main;
   static u8 *rx_buf = 0;
   svm_msg_q_msg_t msg;
@@ -771,8 +716,104 @@ client_rx_thread_fn (void *arg)
   pthread_exit (0);
 }
 
+void
+client_send_connect (echo_main_t * em)
+{
+  vl_api_connect_uri_t *cmp;
+  cmp = vl_msg_api_alloc (sizeof (*cmp));
+  memset (cmp, 0, sizeof (*cmp));
+
+  cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
+  cmp->client_index = em->my_client_index;
+  cmp->context = ntohl (0xfeedface);
+  memcpy (cmp->uri, em->connect_uri, vec_len (em->connect_uri));
+  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp);
+}
+
+void
+client_send_disconnect (echo_main_t * em, session_t * s)
+{
+  vl_api_disconnect_session_t *dmp;
+  dmp = vl_msg_api_alloc (sizeof (*dmp));
+  memset (dmp, 0, sizeof (*dmp));
+  dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
+  dmp->client_index = em->my_client_index;
+  dmp->handle = s->vpp_session_handle;
+  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp);
+}
+
+int
+client_disconnect (echo_main_t * em, session_t * s)
+{
+  client_send_disconnect (em, s);
+  pool_put (em->sessions, s);
+  memset (s, 0xfe, sizeof (*s));
+  return 0;
+}
+
+static void
+session_accepted_handler (session_accepted_msg_t * mp)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_accepted_reply_msg_t *rmp;
+  svm_fifo_t *rx_fifo, *tx_fifo;
+  echo_main_t *em = &echo_main;
+  session_t *session;
+  static f64 start_time;
+  u32 session_index;
+  u8 *ip_str;
+
+  if (start_time == 0.0)
+    start_time = clib_time_now (&em->clib_time);
+
+  ip_str = format (0, "%U", format_ip46_address, &mp->ip, mp->is_ip4);
+  clib_warning ("Accepted session from: %s:%d", ip_str,
+               clib_net_to_host_u16 (mp->port));
+
+  /* Allocate local session and set it up */
+  pool_get (em->sessions, session);
+  session_index = session - em->sessions;
+
+  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+  rx_fifo->client_session_index = session_index;
+  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+  tx_fifo->client_session_index = session_index;
+
+  session->server_rx_fifo = rx_fifo;
+  session->server_tx_fifo = tx_fifo;
+  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+                                        svm_msg_q_t *);
+
+  /* Add it to lookup table */
+  hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+
+  em->state = STATE_READY;
+
+  /* Stats printing */
+  if (pool_elts (em->sessions) && (pool_elts (em->sessions) % 20000) == 0)
+    {
+      f64 now = clib_time_now (&em->clib_time);
+      fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
+              pool_elts (em->sessions), now - start_time,
+              (f64) pool_elts (em->sessions) / (now - start_time));
+    }
+
+  /*
+   * Send accept reply to vpp
+   */
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_ACCEPTED_REPLY);
+  rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
+  rmp->handle = mp->handle;
+  rmp->context = mp->context;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+
+  session->bytes_received = 0;
+  session->start = clib_time_now (&em->clib_time);
+}
+
 static void
-vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
+session_connected_handler (session_connected_msg_t * mp)
 {
   echo_main_t *em = &echo_main;
   session_t *session;
@@ -829,39 +870,96 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
                mp->is_ip4, clib_net_to_host_u16 (mp->lcl_port));
 }
 
-void
-client_send_connect (echo_main_t * em)
+static void
+session_disconnected_handler (session_disconnected_msg_t * mp)
 {
-  vl_api_connect_uri_t *cmp;
-  cmp = vl_msg_api_alloc (sizeof (*cmp));
-  memset (cmp, 0, sizeof (*cmp));
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_disconnected_reply_msg_t *rmp;
+  echo_main_t *em = &echo_main;
+  session_t *session = 0;
+  uword *p;
+  int rv = 0;
 
-  cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
-  cmp->client_index = em->my_client_index;
-  cmp->context = ntohl (0xfeedface);
-  memcpy (cmp->uri, em->connect_uri, vec_len (em->connect_uri));
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp);
+  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
+
+  if (p)
+    {
+      clib_warning ("disconnected");
+      session = pool_elt_at_index (em->sessions, p[0]);
+      hash_unset (em->session_index_by_vpp_handles, mp->handle);
+      pool_put (em->sessions, session);
+    }
+  else
+    {
+      clib_warning ("couldn't find session key %llx", mp->handle);
+      rv = -11;
+    }
+
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_DISCONNECTED_REPLY);
+  rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
+  rmp->retval = rv;
+  rmp->handle = mp->handle;
+  rmp->context = mp->context;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+
+  if (session)
+    session_print_stats (em, session);
 }
 
-void
-client_send_disconnect (echo_main_t * em, session_t * s)
+static void
+session_reset_handler (session_reset_msg_t * mp)
 {
-  vl_api_disconnect_session_t *dmp;
-  dmp = vl_msg_api_alloc (sizeof (*dmp));
-  memset (dmp, 0, sizeof (*dmp));
-  dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
-  dmp->client_index = em->my_client_index;
-  dmp->handle = s->vpp_session_handle;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp);
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  echo_main_t *em = &echo_main;
+  session_reset_reply_msg_t *rmp;
+  session_t *session = 0;
+  uword *p;
+  int rv = 0;
+
+  p = hash_get (em->session_index_by_vpp_handles, mp->handle);
+
+  if (p)
+    {
+      session = pool_elt_at_index (em->sessions, p[0]);
+      clib_warning ("got reset");
+      /* Cleanup later */
+      em->time_to_stop = 1;
+    }
+  else
+    {
+      clib_warning ("couldn't find session key %llx", mp->handle);
+      return;
+    }
+
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_RESET_REPLY);
+  rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
+  rmp->retval = rv;
+  rmp->handle = mp->handle;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
 }
 
-int
-client_disconnect (echo_main_t * em, session_t * s)
+static void
+handle_mq_event (session_event_t * e)
 {
-  client_send_disconnect (em, s);
-  pool_put (em->sessions, s);
-  memset (s, 0xfe, sizeof (*s));
-  return 0;
+  switch (e->event_type)
+    {
+    case SESSION_CTRL_EVT_ACCEPTED:
+      session_accepted_handler ((session_accepted_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_CONNECTED:
+      session_connected_handler ((session_connected_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_DISCONNECTED:
+      session_disconnected_handler ((session_disconnected_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_RESET:
+      session_reset_handler ((session_reset_msg_t *) e->data);
+      break;
+    default:
+      clib_warning ("unhandled %u", e->event_type);
+    }
 }
 
 static void
@@ -869,6 +967,7 @@ clients_run (echo_main_t * em)
 {
   f64 start_time, deltat, timeout = 100.0;
   svm_msg_q_msg_t msg;
+  session_event_t *e;
   session_t *s;
   int i;
 
@@ -890,7 +989,13 @@ clients_run (echo_main_t * em)
   while (em->n_clients_connected < em->n_clients
         && (clib_time_now (&em->clib_time) - start_time < timeout)
         && em->state != STATE_FAILED)
-    ;
+
+    {
+      svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0);
+      e = svm_msg_q_msg_data (em->our_event_queue, &msg);
+      handle_mq_event (e);
+      svm_msg_q_free_msg (em->our_event_queue, &msg);
+    }
 
   if (em->n_clients_connected != em->n_clients)
     {
@@ -921,9 +1026,12 @@ clients_run (echo_main_t * em)
        if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0))
          {
            clib_warning ("svm msg q returned");
+           continue;
          }
-       else
-         svm_msg_q_free_msg (em->our_event_queue, &msg);
+       e = svm_msg_q_msg_data (em->our_event_queue, &msg);
+       if (e->event_type != FIFO_EVENT_APP_RX)
+         handle_mq_event (e);
+       svm_msg_q_free_msg (em->our_event_queue, &msg);
       }
 
   for (i = 0; i < em->n_clients; i++)
@@ -1055,72 +1163,10 @@ format_ip46_address (u8 * s, va_list * args)
 }
 
 static void
-vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
-{
-  echo_main_t *em = &echo_main;
-  vl_api_accept_session_reply_t *rmp;
-  svm_fifo_t *rx_fifo, *tx_fifo;
-  session_t *session;
-  static f64 start_time;
-  u32 session_index;
-  u8 *ip_str;
-
-  if (start_time == 0.0)
-    start_time = clib_time_now (&em->clib_time);
-
-  ip_str = format (0, "%U", format_ip46_address, &mp->ip, mp->is_ip4);
-  clib_warning ("Accepted session from: %s:%d", ip_str,
-               clib_net_to_host_u16 (mp->port));
-  em->vpp_event_queue =
-    uword_to_pointer (mp->vpp_event_queue_address, svm_queue_t *);
-
-  /* Allocate local session and set it up */
-  pool_get (em->sessions, session);
-  session_index = session - em->sessions;
-
-  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  rx_fifo->client_session_index = session_index;
-  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  tx_fifo->client_session_index = session_index;
-
-  session->server_rx_fifo = rx_fifo;
-  session->server_tx_fifo = tx_fifo;
-
-  /* Add it to lookup table */
-  hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
-
-  em->state = STATE_READY;
-
-  /* Stats printing */
-  if (pool_elts (em->sessions) && (pool_elts (em->sessions) % 20000) == 0)
-    {
-      f64 now = clib_time_now (&em->clib_time);
-      fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
-              pool_elts (em->sessions), now - start_time,
-              (f64) pool_elts (em->sessions) / (now - start_time));
-    }
-
-  /*
-   * Send accept reply to vpp
-   */
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-  rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
-  rmp->handle = mp->handle;
-  rmp->context = mp->context;
-  vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & rmp);
-
-  session->bytes_received = 0;
-  session->start = clib_time_now (&em->clib_time);
-}
-
-void
-server_handle_fifo_event_rx (echo_main_t * em, session_fifo_event_t * e)
+server_handle_fifo_event_rx (echo_main_t * em, session_event_t * e)
 {
   svm_fifo_t *rx_fifo, *tx_fifo;
   int n_read;
-  session_fifo_event_t evt;
-  svm_queue_t *q;
   session_t *session;
   int rv;
   u32 max_dequeue, offset, max_transfer, rx_buf_len;
@@ -1167,24 +1213,18 @@ server_handle_fifo_event_rx (echo_main_t * em, session_fifo_event_t * e)
 
          /* If event wasn't set, add one */
          if (svm_fifo_set_event (tx_fifo))
-           {
-             /* Fabricate TX event, send to vpp */
-             evt.fifo = tx_fifo;
-             evt.event_type = FIFO_EVENT_APP_TX;
-
-             q = em->vpp_event_queue;
-             svm_queue_add (q, (u8 *) & evt, 1 /* do wait for mutex */ );
-           }
+           app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo,
+                                   FIFO_EVENT_APP_TX, SVM_Q_WAIT);
        }
     }
   while ((n_read < 0 || max_dequeue > 0) && !em->time_to_stop);
 }
 
-void
+static void
 server_handle_event_queue (echo_main_t * em)
 {
-  session_fifo_event_t _e, *e = &_e;
   svm_msg_q_msg_t msg;
+  session_event_t *e;
 
   while (1)
     {
@@ -1195,13 +1235,8 @@ server_handle_event_queue (echo_main_t * em)
        case FIFO_EVENT_APP_RX:
          server_handle_fifo_event_rx (em, e);
          break;
-
-       case FIFO_EVENT_DISCONNECT:
-         svm_msg_q_free_msg (em->our_event_queue, &msg);
-         return;
-
        default:
-         clib_warning ("unknown event type %d", e->event_type);
+         handle_mq_event (e);
          break;
        }
       if (PREDICT_FALSE (em->time_to_stop == 1))
@@ -1346,11 +1381,7 @@ static void
 #define foreach_tcp_echo_msg                                           \
 _(BIND_URI_REPLY, bind_uri_reply)                                      \
 _(UNBIND_URI_REPLY, unbind_uri_reply)                                  \
-_(ACCEPT_SESSION, accept_session)                                      \
-_(CONNECT_SESSION_REPLY, connect_session_reply)                        \
-_(DISCONNECT_SESSION, disconnect_session)                              \
 _(DISCONNECT_SESSION_REPLY, disconnect_session_reply)                  \
-_(RESET_SESSION, reset_session)                                        \
 _(APPLICATION_ATTACH_REPLY, application_attach_reply)                  \
 _(APPLICATION_DETACH_REPLY, application_detach_reply)                  \
 _(MAP_ANOTHER_SEGMENT, map_another_segment)                            \
@@ -1394,8 +1425,6 @@ main (int argc, char **argv)
   /* make the main heap thread-safe */
   h->flags |= MHEAP_FLAG_THREAD_SAFE;
 
-  vec_validate (em->rx_buf, 128 << 10);
-
   memset (em, 0, sizeof (*em));
   em->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
   em->my_pid = getpid ();
@@ -1472,6 +1501,7 @@ main (int argc, char **argv)
   em->test_return_packets = test_return_packets;
   em->bytes_to_send = bytes_to_send;
   em->time_to_stop = 0;
+  vec_validate (em->rx_buf, 128 << 10);
   vec_validate (em->client_thread_handles, em->n_clients - 1);
   vec_validate (em->thread_args, em->n_clients - 1);
 
index d796b6b..2e176ff 100644 (file)
 typedef enum
 {
   STATE_START,
+  STATE_ATTACHED,
   STATE_BOUND,
   STATE_READY,
   STATE_FAILED,
   STATE_DISCONNECTING,
+  STATE_DETACHED
 } connection_state_t;
 
 typedef struct
@@ -298,7 +300,7 @@ unformat_uri (unformat_input_t * input, va_list * args)
   return 0;
 }
 
-void
+static void
 application_send_attach (udp_echo_main_t * utm)
 {
   vl_api_application_attach_t *bmp;
@@ -311,6 +313,7 @@ application_send_attach (udp_echo_main_t * utm)
   bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ADD_SEGMENT;
   bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
   bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
+  bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS;
   bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 2;
   bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = utm->fifo_size;
   bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = utm->fifo_size;
@@ -370,6 +373,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
 
   utm->our_event_queue =
     uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *);
+  utm->state = STATE_ATTACHED;
 }
 
 static void
@@ -378,6 +382,7 @@ vl_api_application_detach_reply_t_handler (vl_api_application_detach_reply_t *
 {
   if (mp->retval)
     clib_warning ("detach returned with err: %d", mp->retval);
+  udp_echo_main.state = STATE_DETACHED;
 }
 
 u8 *
@@ -473,7 +478,199 @@ cut_through_thread_fn (void *arg)
 }
 
 static void
-udp_client_connect (udp_echo_main_t * utm)
+session_accepted_handler (session_accepted_msg_t * mp)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  udp_echo_main_t *utm = &udp_echo_main;
+  session_accepted_reply_msg_t *rmp;
+  svm_fifo_t *rx_fifo, *tx_fifo;
+  app_session_t *session;
+  static f64 start_time;
+  u32 session_index;
+  int rv = 0;
+
+  if (start_time == 0.0)
+    start_time = clib_time_now (&utm->clib_time);
+
+  utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+                                          svm_msg_q_t *);
+  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+
+  pool_get (utm->sessions, session);
+  memset (session, 0, sizeof (*session));
+  session_index = session - utm->sessions;
+
+  /* Cut-through case */
+  if (mp->server_event_queue_address)
+    {
+      clib_warning ("cut-through session");
+      utm->our_event_queue = uword_to_pointer (mp->server_event_queue_address,
+                                              svm_msg_q_t *);
+      rx_fifo->master_session_index = session_index;
+      tx_fifo->master_session_index = session_index;
+      utm->cut_through_session_index = session_index;
+      session->rx_fifo = rx_fifo;
+      session->tx_fifo = tx_fifo;
+
+      rv = pthread_create (&utm->cut_through_thread_handle,
+                          NULL /*attr */ , cut_through_thread_fn, 0);
+      if (rv)
+       {
+         clib_warning ("pthread_create returned %d", rv);
+         rv = VNET_API_ERROR_SYSCALL_ERROR_1;
+       }
+      utm->do_echo = 1;
+    }
+  else
+    {
+      rx_fifo->client_session_index = session_index;
+      tx_fifo->client_session_index = session_index;
+      session->rx_fifo = rx_fifo;
+      session->tx_fifo = tx_fifo;
+      clib_memcpy (&session->transport.rmt_ip, mp->ip,
+                  sizeof (ip46_address_t));
+      session->transport.is_ip4 = mp->is_ip4;
+      session->transport.rmt_port = mp->port;
+    }
+
+  hash_set (utm->session_index_by_vpp_handles, mp->handle, session_index);
+  if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
+    {
+      f64 now = clib_time_now (&utm->clib_time);
+      fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
+              pool_elts (utm->sessions), now - start_time,
+              (f64) pool_elts (utm->sessions) / (now - start_time));
+    }
+
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_ACCEPTED_REPLY);
+  rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
+  rmp->handle = mp->handle;
+  rmp->context = mp->context;
+  rmp->retval = rv;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+
+  CLIB_MEMORY_BARRIER ();
+  utm->state = STATE_READY;
+}
+
+static void
+session_disconnected_handler (session_disconnected_msg_t * mp)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  udp_echo_main_t *utm = &udp_echo_main;
+  session_disconnected_reply_msg_t *rmp;
+  app_session_t *session;
+  uword *p;
+  int rv = 0;
+
+  p = hash_get (utm->session_index_by_vpp_handles, mp->handle);
+
+  if (p)
+    {
+      session = pool_elt_at_index (utm->sessions, p[0]);
+      hash_unset (utm->session_index_by_vpp_handles, mp->handle);
+      pool_put (utm->sessions, session);
+    }
+  else
+    {
+      clib_warning ("couldn't find session key %llx", mp->handle);
+      return;
+    }
+
+  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+                            SESSION_CTRL_EVT_DISCONNECTED_REPLY);
+  rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
+  rmp->retval = rv;
+  rmp->handle = mp->handle;
+  rmp->context = mp->context;
+  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+}
+
+static void
+session_connected_handler (session_connected_msg_t * mp)
+{
+  udp_echo_main_t *utm = &udp_echo_main;
+  unformat_input_t _input, *input = &_input;
+  session_endpoint_extended_t _sep, *sep = &_sep;
+  app_session_t *session;
+
+  ASSERT (utm->i_am_server == 0);
+
+  if (mp->retval)
+    {
+      clib_warning ("failed connect");
+      return;
+    }
+
+  ASSERT (mp->server_rx_fifo && mp->server_tx_fifo);
+
+  pool_get (utm->sessions, session);
+  session->rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+  session->tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+                                        svm_msg_q_t *);
+  /* Cut-through case */
+  if (mp->client_event_queue_address)
+    {
+      clib_warning ("cut-through session");
+      utm->cut_through_session_index = session - utm->sessions;
+      utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+                                              svm_msg_q_t *);
+      utm->our_event_queue = uword_to_pointer (mp->client_event_queue_address,
+                                              svm_msg_q_t *);
+      utm->do_echo = 1;
+    }
+  else
+    {
+      utm->connected_session = session - utm->sessions;
+      utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+                                              svm_msg_q_t *);
+
+      clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
+                  sizeof (ip46_address_t));
+      session->transport.is_ip4 = mp->is_ip4;
+      session->transport.lcl_port = mp->lcl_port;
+
+      unformat_init_vector (input, utm->connect_uri);
+      if (!unformat (input, "%U", unformat_uri, sep))
+       {
+         clib_warning ("can't figure out remote ip and port");
+         utm->state = STATE_FAILED;
+         unformat_free (input);
+         return;
+       }
+      unformat_free (input);
+      clib_memcpy (&session->transport.rmt_ip, &sep->ip,
+                  sizeof (ip46_address_t));
+      session->transport.rmt_port = sep->port;
+      session->is_dgram = !utm->is_connected;
+    }
+  utm->state = STATE_READY;
+}
+
+static void
+handle_mq_event (session_event_t * e)
+{
+  switch (e->event_type)
+    {
+    case SESSION_CTRL_EVT_ACCEPTED:
+      session_accepted_handler ((session_accepted_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_CONNECTED:
+      session_connected_handler ((session_connected_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_DISCONNECTED:
+      session_disconnected_handler ((session_disconnected_msg_t *) e->data);
+      break;
+    default:
+      clib_warning ("unhandled %u", e->event_type);
+    }
+}
+
+static void
+udp_client_send_connect (udp_echo_main_t * utm)
 {
   vl_api_connect_uri_t *cmp;
   cmp = vl_msg_api_alloc (sizeof (*cmp));
@@ -677,18 +874,41 @@ client_send_data (udp_echo_main_t * utm)
                transfer_type);
 }
 
+static int
+application_attach (udp_echo_main_t * utm)
+{
+  application_send_attach (utm);
+  if (wait_for_state_change (utm, STATE_ATTACHED))
+    {
+      clib_warning ("timeout waiting for STATE_ATTACHED");
+      return -1;
+    }
+  return 0;
+}
+
 static void
 client_test (udp_echo_main_t * utm)
 {
+  f64 start_time, timeout = 100.0;
   app_session_t *session;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
 
-  application_send_attach (utm);
-  udp_client_connect (utm);
+  if (application_attach (utm))
+    return;
+
+  udp_client_send_connect (utm);
+
+  start_time = clib_time_now (&utm->clib_time);
+  while (pool_elts (utm->sessions) != 1
+        && clib_time_now (&utm->clib_time) - start_time < timeout
+        && utm->state != STATE_FAILED)
 
-  if (wait_for_state_change (utm, STATE_READY))
     {
-      clib_warning ("timeout waiting for STATE_READY");
-      return;
+      svm_msg_q_sub (utm->our_event_queue, &msg, SVM_Q_WAIT, 0);
+      e = svm_msg_q_msg_data (utm->our_event_queue, &msg);
+      handle_mq_event (e);
+      svm_msg_q_free_msg (utm->our_event_queue, &msg);
     }
 
   if (utm->cut_through_session_index != ~0)
@@ -704,6 +924,7 @@ client_test (udp_echo_main_t * utm)
     }
 
   application_detach (utm);
+  wait_for_state_change (utm, STATE_DETACHED);
 }
 
 static void
@@ -792,84 +1013,6 @@ vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp)
   vec_free (seg_name);
 }
 
-/**
- * Acting as server for redirected connect requests
- */
-static void
-vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
-{
-  u32 segment_index;
-  udp_echo_main_t *utm = &udp_echo_main;
-  svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
-  svm_fifo_segment_create_args_t _a, *a = &_a;
-  svm_fifo_segment_private_t *seg;
-  svm_queue_t *client_q;
-  vl_api_connect_session_reply_t *rmp;
-  app_session_t *session = 0;
-  int rv = 0;
-
-  /* Create the segment */
-  a->segment_name = (char *) format (0, "%d:segment%d%c", utm->my_pid,
-                                    utm->unique_segment_index++, 0);
-  a->segment_size = utm->configured_segment_size;
-
-  rv = svm_fifo_segment_create (a);
-  if (rv)
-    {
-      clib_warning ("sm_fifo_segment_create ('%s') failed", a->segment_name);
-      rv = VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
-      goto send_reply;
-    }
-
-  vec_add2 (utm->seg, seg, 1);
-
-  segment_index = vec_len (sm->segments) - 1;
-  memcpy (seg, sm->segments + segment_index, sizeof (utm->seg[0]));
-
-  pool_get (utm->sessions, session);
-
-  session->rx_fifo = svm_fifo_segment_alloc_fifo
-    (utm->seg, 128 * 1024, FIFO_SEGMENT_RX_FREELIST);
-  ASSERT (session->rx_fifo);
-
-  session->tx_fifo = svm_fifo_segment_alloc_fifo
-    (utm->seg, 128 * 1024, FIFO_SEGMENT_TX_FREELIST);
-  ASSERT (session->tx_fifo);
-
-  session->rx_fifo->master_session_index = session - utm->sessions;
-  session->tx_fifo->master_session_index = session - utm->sessions;
-  utm->cut_through_session_index = session - utm->sessions;
-
-  rv = pthread_create (&utm->cut_through_thread_handle,
-                      NULL /*attr */ , cut_through_thread_fn, 0);
-  if (rv)
-    {
-      clib_warning ("pthread_create returned %d", rv);
-      rv = VNET_API_ERROR_SYSCALL_ERROR_1;
-    }
-
-send_reply:
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-
-  rmp->_vl_msg_id = ntohs (VL_API_CONNECT_SESSION_REPLY);
-  rmp->context = mp->context;
-  rmp->retval = ntohl (rv);
-  rmp->segment_name_length = vec_len (a->segment_name);
-  if (session)
-    {
-      rmp->server_rx_fifo = pointer_to_uword (session->rx_fifo);
-      rmp->server_tx_fifo = pointer_to_uword (session->tx_fifo);
-    }
-
-  memcpy (rmp->segment_name, a->segment_name, vec_len (a->segment_name));
-
-  vec_free (a->segment_name);
-
-  client_q = uword_to_pointer (mp->client_queue_address, svm_queue_t *);
-  vl_msg_api_send_shmem (client_q, (u8 *) & rmp);
-}
-
 static void
 vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp)
 {
@@ -881,184 +1024,9 @@ vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp)
   utm->state = STATE_START;
 }
 
-static void
-vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
-{
-  udp_echo_main_t *utm = &udp_echo_main;
-  vl_api_accept_session_reply_t *rmp;
-  svm_fifo_t *rx_fifo, *tx_fifo;
-  app_session_t *session;
-  static f64 start_time;
-  u32 session_index;
-  int rv = 0;
-
-  if (start_time == 0.0)
-    start_time = clib_time_now (&utm->clib_time);
-
-  utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
-                                          svm_msg_q_t *);
-  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-
-  pool_get (utm->sessions, session);
-  memset (session, 0, sizeof (*session));
-  session_index = session - utm->sessions;
-
-  /* Cut-through case */
-  if (mp->server_event_queue_address)
-    {
-      clib_warning ("cut-through session");
-      utm->our_event_queue = uword_to_pointer (mp->server_event_queue_address,
-                                              svm_msg_q_t *);
-      rx_fifo->master_session_index = session_index;
-      tx_fifo->master_session_index = session_index;
-      utm->cut_through_session_index = session_index;
-      session->rx_fifo = rx_fifo;
-      session->tx_fifo = tx_fifo;
-
-      rv = pthread_create (&utm->cut_through_thread_handle,
-                          NULL /*attr */ , cut_through_thread_fn, 0);
-      if (rv)
-       {
-         clib_warning ("pthread_create returned %d", rv);
-         rv = VNET_API_ERROR_SYSCALL_ERROR_1;
-       }
-      utm->do_echo = 1;
-    }
-  else
-    {
-      rx_fifo->client_session_index = session_index;
-      tx_fifo->client_session_index = session_index;
-      session->rx_fifo = rx_fifo;
-      session->tx_fifo = tx_fifo;
-      clib_memcpy (&session->transport.rmt_ip, mp->ip,
-                  sizeof (ip46_address_t));
-      session->transport.is_ip4 = mp->is_ip4;
-      session->transport.rmt_port = mp->port;
-    }
-
-  hash_set (utm->session_index_by_vpp_handles, mp->handle, session_index);
-  if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
-    {
-      f64 now = clib_time_now (&utm->clib_time);
-      fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
-              pool_elts (utm->sessions), now - start_time,
-              (f64) pool_elts (utm->sessions) / (now - start_time));
-    }
-
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-  rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
-  rmp->handle = mp->handle;
-  rmp->context = mp->context;
-  rmp->retval = rv;
-  vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
-
-  CLIB_MEMORY_BARRIER ();
-  utm->state = STATE_READY;
-}
-
-static void
-vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
-{
-  udp_echo_main_t *utm = &udp_echo_main;
-  app_session_t *session;
-  vl_api_disconnect_session_reply_t *rmp;
-  uword *p;
-  int rv = 0;
-
-  p = hash_get (utm->session_index_by_vpp_handles, mp->handle);
-
-  if (p)
-    {
-      session = pool_elt_at_index (utm->sessions, p[0]);
-      hash_unset (utm->session_index_by_vpp_handles, mp->handle);
-      pool_put (utm->sessions, session);
-    }
-  else
-    {
-      clib_warning ("couldn't find session key %llx", mp->handle);
-      rv = -11;
-    }
-
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-  rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
-  rmp->retval = rv;
-  rmp->handle = mp->handle;
-  rmp->context = mp->context;
-  vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
-}
-
-static void
-vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
-{
-  udp_echo_main_t *utm = &udp_echo_main;
-  unformat_input_t _input, *input = &_input;
-  session_endpoint_extended_t _sep, *sep = &_sep;
-  app_session_t *session;
-
-  ASSERT (utm->i_am_server == 0);
-
-  if (mp->retval)
-    {
-      clib_warning ("failed connect");
-      return;
-    }
-
-  ASSERT (mp->server_rx_fifo && mp->server_tx_fifo);
-
-  pool_get (utm->sessions, session);
-  session->rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  session->tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_msg_q_t *);
-  /* Cut-through case */
-  if (mp->client_event_queue_address)
-    {
-      clib_warning ("cut-through session");
-      utm->cut_through_session_index = session - utm->sessions;
-      utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
-                                              svm_msg_q_t *);
-      utm->our_event_queue = uword_to_pointer (mp->client_event_queue_address,
-                                              svm_msg_q_t *);
-      utm->do_echo = 1;
-    }
-  else
-    {
-      utm->connected_session = session - utm->sessions;
-      utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
-                                              svm_msg_q_t *);
-
-      clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
-                  sizeof (ip46_address_t));
-      session->transport.is_ip4 = mp->is_ip4;
-      session->transport.lcl_port = mp->lcl_port;
-
-      unformat_init_vector (input, utm->connect_uri);
-      if (!unformat (input, "%U", unformat_uri, sep))
-       {
-         clib_warning ("can't figure out remote ip and port");
-         utm->state = STATE_FAILED;
-         unformat_free (input);
-         return;
-       }
-      unformat_free (input);
-      clib_memcpy (&session->transport.rmt_ip, &sep->ip,
-                  sizeof (ip46_address_t));
-      session->transport.rmt_port = sep->port;
-      session->is_dgram = !utm->is_connected;
-    }
-  utm->state = STATE_READY;
-}
-
 #define foreach_tcp_echo_msg                           \
 _(BIND_URI_REPLY, bind_uri_reply)                      \
-_(CONNECT_URI, connect_uri)                            \
-_(CONNECT_SESSION_REPLY, connect_session_reply)        \
 _(UNBIND_URI_REPLY, unbind_uri_reply)                  \
-_(ACCEPT_SESSION, accept_session)                      \
-_(DISCONNECT_SESSION, disconnect_session)              \
 _(MAP_ANOTHER_SEGMENT, map_another_segment)            \
 _(UNMAP_SEGMENT, unmap_segment)                                \
 _(APPLICATION_ATTACH_REPLY, application_attach_reply)  \
@@ -1113,7 +1081,7 @@ init_error_string_table (udp_echo_main_t * utm)
 }
 
 void
-server_handle_fifo_event_rx (udp_echo_main_t * utm, session_fifo_event_t * e)
+server_handle_fifo_event_rx (udp_echo_main_t * utm, session_event_t * e)
 {
   app_session_t *s;
   int rv;
@@ -1134,7 +1102,7 @@ server_handle_fifo_event_rx (udp_echo_main_t * utm, session_fifo_event_t * e)
 void
 server_handle_event_queue (udp_echo_main_t * utm)
 {
-  session_fifo_event_t *e;
+  session_event_t *e;
   svm_msg_q_msg_t msg;
 
   while (utm->state != STATE_READY)
@@ -1154,12 +1122,8 @@ server_handle_event_queue (udp_echo_main_t * utm)
          server_handle_fifo_event_rx (utm, e);
          break;
 
-       case FIFO_EVENT_DISCONNECT:
-         utm->time_to_stop = 1;
-         break;
-
        default:
-         clib_warning ("unknown event type %d", e->event_type);
+         handle_mq_event (e);
          break;
        }
       svm_msg_q_free_msg (utm->our_event_queue, &msg);
index 3d1af67..e67de95 100644 (file)
@@ -359,7 +359,7 @@ echo_clients_session_connected_callback (u32 app_index, u32 api_context,
   echo_client_main_t *ecm = &echo_client_main;
   eclient_session_t *session;
   u32 session_index;
-  u8 thread_index = s->thread_index;
+  u8 thread_index;
 
   if (is_fail)
     {
@@ -368,6 +368,7 @@ echo_clients_session_connected_callback (u32 app_index, u32 api_context,
       return 0;
     }
 
+  thread_index = s->thread_index;
   ASSERT (thread_index == vlib_get_thread_index ()
          || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
 
index 6041b49..1dc04f0 100644 (file)
@@ -810,22 +810,7 @@ application_get_segment_manager_properties (u32 app_index)
 static inline int
 app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
 {
-  if (PREDICT_TRUE (!svm_msg_q_is_full (mq)))
-    {
-      if (lock)
-       {
-         svm_msg_q_add_w_lock (mq, msg);
-         svm_msg_q_unlock (mq);
-       }
-      else if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
-       {
-         clib_warning ("msg q add returned");
-         if (lock)
-           svm_msg_q_unlock (mq);
-         return -1;
-       }
-    }
-  else
+  if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
     {
       clib_warning ("evt q full");
       svm_msg_q_free_msg (mq, msg);
@@ -833,13 +818,26 @@ app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
        svm_msg_q_unlock (mq);
       return -1;
     }
+
+  if (lock)
+    {
+      svm_msg_q_add_and_unlock (mq, msg);
+      return 0;
+    }
+
+  /* Even when not locking the ring, we must wait for queue mutex */
+  if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
+    {
+      clib_warning ("msg q add returned");
+      return -1;
+    }
   return 0;
 }
 
 static inline int
 app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
 {
-  session_fifo_event_t *evt;
+  session_event_t *evt;
   svm_msg_q_msg_t msg;
   svm_msg_q_t *mq;
 
@@ -873,7 +871,7 @@ app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
 
-  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
   evt->fifo = s->server_rx_fifo;
   evt->event_type = FIFO_EVENT_APP_RX;
 
@@ -884,7 +882,7 @@ static inline int
 app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
 {
   svm_msg_q_t *mq;
-  session_fifo_event_t *evt;
+  session_event_t *evt;
   svm_msg_q_msg_t msg;
 
   if (application_is_builtin (app))
@@ -905,7 +903,7 @@ app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
 
-  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
   evt->event_type = FIFO_EVENT_APP_TX;
   evt->fifo = s->server_tx_fifo;
 
index 50c0434..0aabd38 100644 (file)
@@ -137,6 +137,7 @@ typedef enum
   _(IS_PROXY, "Application is proxying")                       \
   _(USE_GLOBAL_SCOPE, "App can use global session scope")      \
   _(USE_LOCAL_SCOPE, "App can use local session scope")                \
+  _(USE_MQ_FOR_CTRL_MSGS, "Use message queue for ctr msgs")    \
 
 typedef enum _app_options
 {
@@ -197,6 +198,102 @@ typedef struct
 #undef _
 } app_session_t;
 
+typedef struct session_accepted_msg_
+{
+  u32 context;
+  u64 listener_handle;
+  u64 handle;
+  u64 server_rx_fifo;
+  u64 server_tx_fifo;
+  u64 vpp_event_queue_address;
+  u64 server_event_queue_address;
+  u16 port;
+  u8 is_ip4;
+  u8 ip[16];
+} session_accepted_msg_t;
+
+typedef struct session_accepted_reply_msg_
+{
+  u32 context;
+  i32 retval;
+  u64 handle;
+} session_accepted_reply_msg_t;
+
+/* Make sure this is not too large, otherwise it won't fit when dequeued in
+ * the session queue node */
+STATIC_ASSERT (sizeof (session_accepted_reply_msg_t) <= 16, "accept reply");
+
+typedef struct session_connected_msg_
+{
+  u32 context;
+  i32 retval;
+  u64 handle;
+  u64 server_rx_fifo;
+  u64 server_tx_fifo;
+  u64 vpp_event_queue_address;
+  u64 client_event_queue_address;
+  u32 segment_size;
+  u8 segment_name_length;
+  u8 segment_name[64];
+  u8 lcl_ip[16];
+  u8 is_ip4;
+  u16 lcl_port;
+} session_connected_msg_t;
+
+typedef struct session_disconnected_msg_
+{
+  u32 client_index;
+  u32 context;
+  u64 handle;
+} session_disconnected_msg_t;
+
+typedef struct session_disconnected_reply_msg_
+{
+  u32 context;
+  i32 retval;
+  u64 handle;
+} session_disconnected_reply_msg_t;
+
+typedef struct session_reset_msg_
+{
+  u32 client_index;
+  u32 context;
+  u64 handle;
+} session_reset_msg_t;
+
+typedef struct session_reset_reply_msg_
+{
+  u32 client_index;
+  u32 context;
+  i32 retval;
+  u64 handle;
+} session_reset_reply_msg_t;
+
+typedef struct app_session_event_
+{
+  svm_msg_q_msg_t msg;
+  session_event_t *evt;
+} app_session_evt_t;
+
+static inline void
+app_alloc_ctrl_evt_to_vpp (svm_msg_q_t * mq, app_session_evt_t * app_evt,
+                          u8 evt_type)
+{
+  svm_msg_q_lock_and_alloc_msg_w_ring (mq,
+                                      SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, &app_evt->msg);
+  svm_msg_q_unlock (mq);
+  app_evt->evt = svm_msg_q_msg_data (mq, &app_evt->msg);
+  memset (app_evt->evt, 0, sizeof (*app_evt->evt));
+  app_evt->evt->event_type = evt_type;
+}
+
+static inline void
+app_send_ctrl_evt_to_vpp (svm_msg_q_t * mq, app_session_evt_t * app_evt)
+{
+  svm_msg_q_add (mq, &app_evt->msg, SVM_Q_WAIT);
+}
+
 /**
  * Send fifo io event to vpp worker thread
  *
@@ -213,7 +310,7 @@ static inline int
 app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
                        u8 noblock)
 {
-  session_fifo_event_t *evt;
+  session_event_t *evt;
   svm_msg_q_msg_t msg;
 
   if (noblock)
@@ -231,11 +328,10 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
          svm_msg_q_unlock (mq);
          return -2;
        }
-      evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+      evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
       evt->fifo = f;
       evt->event_type = evt_type;
-      svm_msg_q_add_w_lock (mq, &msg);
-      svm_msg_q_unlock (mq);
+      svm_msg_q_add_and_unlock (mq, &msg);
       return 0;
     }
   else
@@ -247,13 +343,12 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
          svm_msg_q_wait (mq);
          msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
        }
-      evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+      evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
       evt->fifo = f;
       evt->event_type = evt_type;
       if (svm_msg_q_is_full (mq))
        svm_msg_q_wait (mq);
-      svm_msg_q_add_w_lock (mq, &msg);
-      svm_msg_q_unlock (mq);
+      svm_msg_q_add_and_unlock (mq, &msg);
       return 0;
     }
 }
index b00bcd5..c7d2482 100644 (file)
@@ -605,7 +605,7 @@ segment_manager_evt_q_expected_size (u32 q_len)
   u32 fifo_evt_size, notif_q_size, q_hdrs;
   u32 msg_q_sz, fifo_evt_ring_sz, session_ntf_ring_sz;
 
-  fifo_evt_size = 1 << max_log2 (sizeof (session_fifo_event_t));
+  fifo_evt_size = 1 << max_log2 (sizeof (session_event_t));
   notif_q_size = clib_max (16, q_len >> 4);
 
   msg_q_sz = q_len * sizeof (svm_msg_q_msg_t);
@@ -630,7 +630,7 @@ segment_manager_alloc_queue (svm_fifo_segment_private_t * segment,
   svm_msg_q_t *q;
   void *oldheap;
 
-  fifo_evt_size = sizeof (session_fifo_event_t);
+  fifo_evt_size = sizeof (session_event_t);
   notif_q_size = clib_max (16, queue_size >> 4);
   /* *INDENT-OFF* */
   svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
index 38a0521..897cb1a 100644 (file)
@@ -31,7 +31,7 @@ static inline int
 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
                            session_evt_type_t evt_type)
 {
-  session_fifo_event_t *evt;
+  session_event_t *evt;
   svm_msg_q_msg_t msg;
   svm_msg_q_t *mq;
   u32 tries = 0, max_tries;
@@ -57,7 +57,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
       svm_msg_q_unlock (mq);
       return -2;
     }
-  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
   evt->event_type = evt_type;
   switch (evt_type)
     {
@@ -78,8 +78,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
       return -1;
     }
 
-  svm_msg_q_add_w_lock (mq, &msg);
-  svm_msg_q_unlock (mq);
+  svm_msg_q_add_and_unlock (mq, &msg);
   return 0;
 }
 
@@ -1095,7 +1094,7 @@ stream_session_disconnect (stream_session_t * s)
 {
   u32 thread_index = vlib_get_thread_index ();
   session_manager_main_t *smm = &session_manager_main;
-  session_fifo_event_t *evt;
+  session_event_t *evt;
 
   if (!s)
     return;
@@ -1197,7 +1196,7 @@ session_tx_is_dgram (stream_session_t * s)
 void
 session_vpp_event_queues_allocate (session_manager_main_t * smm)
 {
-  u32 evt_q_length = 2048, evt_size = sizeof (session_fifo_event_t);
+  u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
   ssvm_private_t *eqs = &smm->evt_qs_segment;
   api_main_t *am = &api_main;
   u64 eqs_size = 64 << 20;
index 879b382..1917616 100644 (file)
@@ -34,10 +34,17 @@ typedef enum
 {
   FIFO_EVENT_APP_RX,
   FIFO_EVENT_APP_TX,
-  FIFO_EVENT_TIMEOUT,
   FIFO_EVENT_DISCONNECT,
   FIFO_EVENT_BUILTIN_RX,
   FIFO_EVENT_RPC,
+  SESSION_CTRL_EVT_ACCEPTED,
+  SESSION_CTRL_EVT_ACCEPTED_REPLY,
+  SESSION_CTRL_EVT_CONNECTED,
+  SESSION_CTRL_EVT_CONNECTED_REPLY,
+  SESSION_CTRL_EVT_DISCONNECTED,
+  SESSION_CTRL_EVT_DISCONNECTED_REPLY,
+  SESSION_CTRL_EVT_RESET,
+  SESSION_CTRL_EVT_RESET_REPLY
 } session_evt_type_t;
 
 static inline const char *
@@ -49,8 +56,6 @@ fifo_event_type_str (session_evt_type_t et)
       return "FIFO_EVENT_APP_RX";
     case FIFO_EVENT_APP_TX:
       return "FIFO_EVENT_APP_TX";
-    case FIFO_EVENT_TIMEOUT:
-      return "FIFO_EVENT_TIMEOUT";
     case FIFO_EVENT_DISCONNECT:
       return "FIFO_EVENT_DISCONNECT";
     case FIFO_EVENT_BUILTIN_RX:
@@ -112,7 +117,7 @@ typedef struct
       u8 data[0];
     };
   };
-} __clib_packed session_fifo_event_t;
+} __clib_packed session_event_t;
 /* *INDENT-ON* */
 
 #define SESSION_MSG_NULL { }
@@ -168,14 +173,14 @@ typedef struct _session_manager_main session_manager_main_t;
 
 typedef int
   (session_fifo_rx_fn) (vlib_main_t * vm, vlib_node_runtime_t * node,
-                       session_fifo_event_t * e0, stream_session_t * s0,
+                       session_event_t * e0, stream_session_t * s0,
                        int *n_tx_pkts);
 
 extern session_fifo_rx_fn session_tx_fifo_peek_and_snd;
 extern session_fifo_rx_fn session_tx_fifo_dequeue_and_snd;
 extern session_fifo_rx_fn session_tx_fifo_dequeue_internal;
 
-u8 session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e);
+u8 session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e);
 
 struct _session_manager_main
 {
@@ -195,13 +200,13 @@ struct _session_manager_main
   u32 **tx_buffers;
 
   /** Per worker-thread vector of partially read events */
-  session_fifo_event_t **free_event_vector;
+  session_event_t **free_event_vector;
 
   /** per-worker active event vectors */
-  session_fifo_event_t **pending_event_vector;
+  session_event_t **pending_event_vector;
 
   /** per-worker postponed disconnects */
-  session_fifo_event_t **pending_disconnects;
+  session_event_t **pending_disconnects;
 
   /** per-worker session context */
   session_tx_context_t *ctx;
index f9fddea..724aff1 100755 (executable)
@@ -363,6 +363,207 @@ static session_cb_vft_t session_cb_vft = {
   .del_segment_callback = send_del_segment_callback,
 };
 
+static int
+mq_send_session_accepted_cb (stream_session_t * s)
+{
+  application_t *app = application_get (s->app_index);
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  svm_msg_q_t *vpp_queue, *app_mq;
+  transport_proto_vft_t *tp_vft;
+  transport_connection_t *tc;
+  stream_session_t *listener;
+  session_accepted_msg_t *mp;
+  session_event_t *evt;
+
+  app_mq = app->event_queue;
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_mq);
+
+  evt = svm_msg_q_msg_data (app_mq, msg);
+  memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_ACCEPTED;
+  mp = (session_accepted_msg_t *) evt->data;
+  mp->context = app->index;
+  mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
+  mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
+
+  if (session_has_transport (s))
+    {
+      listener = listen_session_get (s->listener_index);
+      mp->listener_handle = listen_session_get_handle (listener);
+      if (application_is_proxy (app))
+       {
+         listener =
+           application_first_listener (app, session_get_fib_proto (s),
+                                       session_get_transport_proto (s));
+         if (listener)
+           mp->listener_handle = listen_session_get_handle (listener);
+       }
+      vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
+      mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+      mp->handle = session_handle (s);
+      tp_vft = transport_protocol_get_vft (session_get_transport_proto (s));
+      tc = tp_vft->get_connection (s->connection_index, s->thread_index);
+      mp->port = tc->rmt_port;
+      mp->is_ip4 = tc->is_ip4;
+      clib_memcpy (&mp->ip, &tc->rmt_ip, sizeof (tc->rmt_ip));
+    }
+  else
+    {
+      local_session_t *ls = (local_session_t *) s;
+      local_session_t *ll;
+      if (application_local_session_listener_has_transport (ls))
+       {
+         listener = listen_session_get (ls->listener_index);
+         mp->listener_handle = listen_session_get_handle (listener);
+         mp->is_ip4 = session_type_is_ip4 (listener->session_type);
+       }
+      else
+       {
+         ll = application_get_local_listen_session (app, ls->listener_index);
+         if (ll->transport_listener_index != ~0)
+           {
+             listener = listen_session_get (ll->transport_listener_index);
+             mp->listener_handle = listen_session_get_handle (listener);
+           }
+         else
+           {
+             mp->listener_handle = application_local_session_handle (ll);
+           }
+         mp->is_ip4 = session_type_is_ip4 (ll->listener_session_type);
+       }
+      mp->handle = application_local_session_handle (ls);
+      mp->port = ls->port;
+      mp->vpp_event_queue_address = ls->client_evt_q;
+      mp->server_event_queue_address = ls->server_evt_q;
+    }
+  svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
+
+  return 0;
+}
+
+static void
+mq_send_session_disconnected_cb (stream_session_t * s)
+{
+  application_t *app = application_get (s->app_index);
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_disconnected_msg_t *mp;
+  svm_msg_q_t *app_mq;
+  session_event_t *evt;
+
+  app_mq = app->event_queue;
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_mq);
+  evt = svm_msg_q_msg_data (app_mq, msg);
+  memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+  mp = (session_disconnected_msg_t *) evt->data;
+  mp->handle = session_handle (s);
+  mp->context = app->api_client_index;
+  svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
+}
+
+static void
+mq_send_session_reset_cb (stream_session_t * s)
+{
+  application_t *app = application_get (s->app_index);
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_reset_msg_t *mp;
+  svm_msg_q_t *app_mq;
+  session_event_t *evt;
+
+  app_mq = app->event_queue;
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_mq);
+  evt = svm_msg_q_msg_data (app_mq, msg);
+  memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_RESET;
+  mp = (session_reset_msg_t *) evt->data;
+  mp->handle = session_handle (s);
+  svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
+}
+
+static int
+mq_send_session_connected_cb (u32 app_index, u32 api_context,
+                             stream_session_t * s, u8 is_fail)
+{
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_connected_msg_t *mp;
+  svm_msg_q_t *vpp_mq, *app_mq;
+  transport_connection_t *tc;
+  session_event_t *evt;
+  application_t *app;
+
+  app = application_get (app_index);
+  app_mq = app->event_queue;
+  if (!app_mq)
+    {
+      clib_warning ("app %u with api index: %u not attached", app->index,
+                   app->api_client_index);
+      return -1;
+    }
+
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_mq);
+  evt = svm_msg_q_msg_data (app_mq, msg);
+  memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_CONNECTED;
+  mp = (session_connected_msg_t *) evt->data;
+  mp->context = api_context;
+
+  if (is_fail)
+    goto done;
+
+  if (session_has_transport (s))
+    {
+      tc = session_get_transport (s);
+      if (!tc)
+       {
+         is_fail = 1;
+         goto done;
+       }
+
+      vpp_mq = session_manager_get_vpp_event_queue (s->thread_index);
+      mp->handle = session_handle (s);
+      mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
+      clib_memcpy (mp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
+      mp->is_ip4 = tc->is_ip4;
+      mp->lcl_port = tc->lcl_port;
+      mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
+    }
+  else
+    {
+      local_session_t *ls = (local_session_t *) s;
+      mp->handle = application_local_session_handle (ls);
+      mp->lcl_port = ls->port;
+      mp->vpp_event_queue_address = ls->server_evt_q;
+      mp->client_event_queue_address = ls->client_evt_q;
+      mp->server_rx_fifo = pointer_to_uword (s->server_tx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->server_rx_fifo);
+    }
+
+done:
+  mp->retval = is_fail ?
+    clib_host_to_net_u32 (VNET_API_ERROR_SESSION_CONNECT) : 0;
+
+  svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
+  return 0;
+}
+
+static session_cb_vft_t session_mq_cb_vft = {
+  .session_accept_callback = mq_send_session_accepted_cb,
+  .session_disconnect_callback = mq_send_session_disconnected_cb,
+  .session_connected_callback = mq_send_session_connected_cb,
+  .session_reset_callback = mq_send_session_reset_cb,
+  .add_segment_callback = send_add_segment_callback,
+  .del_segment_callback = send_del_segment_callback,
+};
+
 static void
 vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
 {
@@ -401,7 +602,11 @@ vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
   memset (a, 0, sizeof (*a));
   a->api_client_index = mp->client_index;
   a->options = mp->options;
-  a->session_cb_vft = &session_cb_vft;
+
+  if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS)
+    a->session_cb_vft = &session_mq_cb_vft;
+  else
+    a->session_cb_vft = &session_cb_vft;
 
   if (mp->namespace_id_len > 64)
     {
index 201f6f1..06d98ae 100755 (executable)
@@ -20,7 +20,7 @@ format_stream_session_fifos (u8 * s, va_list * args)
 {
   stream_session_t *ss = va_arg (*args, stream_session_t *);
   int verbose = va_arg (*args, int);
-  session_fifo_event_t _e, *e = &_e;
+  session_event_t _e, *e = &_e;
   u8 found;
 
   if (!ss->server_rx_fifo || !ss->server_tx_fifo)
index 350282b..bf0c395 100644 (file)
 #include <vnet/session/transport.h>
 #include <vnet/session/session.h>
 #include <vnet/session/application.h>
+#include <vnet/session/application_interface.h>
 #include <vnet/session/session_debug.h>
 #include <svm/queue.h>
 
+static void
+session_mq_accepted_reply_handler (void *data)
+{
+  session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
+  vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+  local_session_t *ls;
+  stream_session_t *s;
+
+  /* Server isn't interested, kill the session */
+  if (mp->retval)
+    {
+      a->app_index = mp->context;
+      a->handle = mp->handle;
+      vnet_disconnect_session (a);
+      return;
+    }
+
+  if (session_handle_is_local (mp->handle))
+    {
+      ls = application_get_local_session_from_handle (mp->handle);
+      if (!ls || ls->app_index != mp->context)
+       {
+         clib_warning ("server %u doesn't own local handle %llu",
+                       mp->context, mp->handle);
+         return;
+       }
+      if (application_local_session_connect_notify (ls))
+       return;
+      ls->session_state = SESSION_STATE_READY;
+    }
+  else
+    {
+      s = session_get_from_handle_if_valid (mp->handle);
+      if (!s)
+       {
+         clib_warning ("session doesn't exist");
+         return;
+       }
+      if (s->app_index != mp->context)
+       {
+         clib_warning ("app doesn't own session");
+         return;
+       }
+      s->session_state = SESSION_STATE_READY;
+    }
+}
+
+static void
+session_mq_reset_reply_handler (void *data)
+{
+  session_reset_reply_msg_t *mp;
+  application_t *app;
+  stream_session_t *s;
+  u32 index, thread_index;
+
+  mp = (session_reset_reply_msg_t *) data;
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  session_parse_handle (mp->handle, &index, &thread_index);
+  s = session_get_if_valid (index, thread_index);
+  if (s == 0 || app->index != s->app_index)
+    {
+      clib_warning ("Invalid session!");
+      return;
+    }
+
+  /* Client objected to resetting the session, log and continue */
+  if (mp->retval)
+    {
+      clib_warning ("client retval %d", mp->retval);
+      return;
+    }
+
+  /* This comes as a response to a reset, transport only waiting for
+   * confirmation to remove connection state, no need to disconnect */
+  stream_session_cleanup (s);
+}
+
+static void
+session_mq_disconnected_handler (void *data)
+{
+  session_disconnected_reply_msg_t *rmp;
+  vnet_disconnect_args_t _a, *a = &_a;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_disconnected_msg_t *mp;
+  session_event_t *evt;
+  application_t *app;
+  int rv = 0;
+
+  mp = (session_disconnected_msg_t *) data;
+  app = application_lookup (mp->client_index);
+  if (app)
+    {
+      a->handle = mp->handle;
+      a->app_index = app->index;
+      rv = vnet_disconnect_session (a);
+    }
+  else
+    {
+      rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
+    }
+
+  svm_msg_q_lock_and_alloc_msg_w_ring (app->event_queue,
+                                      SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app->event_queue);
+  evt = svm_msg_q_msg_data (app->event_queue, msg);
+  memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+  rmp = (session_disconnected_reply_msg_t *) evt->data;
+  rmp->handle = mp->handle;
+  rmp->context = mp->context;
+  rmp->retval = rv;
+  svm_msg_q_add (app->event_queue, msg, SVM_Q_WAIT);
+}
+
+static void
+session_mq_disconnected_reply_handler (void *data)
+{
+  session_disconnected_reply_msg_t *mp;
+  vnet_disconnect_args_t _a, *a = &_a;
+  application_t *app;
+
+  mp = (session_disconnected_reply_msg_t *) data;
+
+  /* Client objected to disconnecting the session, log and continue */
+  if (mp->retval)
+    {
+      clib_warning ("client retval %d", mp->retval);
+      return;
+    }
+
+  /* Disconnect has been confirmed. Confirm close to transport */
+  app = application_lookup (mp->context);
+  if (app)
+    {
+      a->handle = mp->handle;
+      a->app_index = app->index;
+      vnet_disconnect_session (a);
+    }
+}
+
 vlib_node_registration_t session_queue_node;
 
 typedef struct
@@ -44,8 +189,6 @@ format_session_queue_trace (u8 * s, va_list * args)
   return s;
 }
 
-vlib_node_registration_t session_queue_node;
-
 #define foreach_session_queue_error            \
 _(TX, "Packets transmitted")                   \
 _(TIMER, "Timer events")                       \
@@ -72,7 +215,6 @@ enum
   SESSION_TX_OK
 };
 
-
 static void
 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
                        u32 next_index, u32 * to_next, u16 n_segs,
@@ -369,7 +511,7 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
 
 always_inline int
 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
-                               session_fifo_event_t * e,
+                               session_event_t * e,
                                stream_session_t * s, int *n_tx_packets,
                                u8 peek_data)
 {
@@ -538,7 +680,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
 
 int
 session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
-                             session_fifo_event_t * e,
+                             session_event_t * e,
                              stream_session_t * s, int *n_tx_pkts)
 {
   return session_tx_fifo_read_and_snd_i (vm, node, e, s, n_tx_pkts, 1);
@@ -546,7 +688,7 @@ session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
 
 int
 session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
-                                session_fifo_event_t * e,
+                                session_event_t * e,
                                 stream_session_t * s, int *n_tx_pkts)
 {
   return session_tx_fifo_read_and_snd_i (vm, node, e, s, n_tx_pkts, 0);
@@ -555,7 +697,7 @@ session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
 int
 session_tx_fifo_dequeue_internal (vlib_main_t * vm,
                                  vlib_node_runtime_t * node,
-                                 session_fifo_event_t * e,
+                                 session_event_t * e,
                                  stream_session_t * s, int *n_tx_pkts)
 {
   application_t *app;
@@ -565,7 +707,7 @@ session_tx_fifo_dequeue_internal (vlib_main_t * vm,
 }
 
 always_inline stream_session_t *
-session_event_get_session (session_fifo_event_t * e, u8 thread_index)
+session_event_get_session (session_event_t * e, u8 thread_index)
 {
   return session_get_if_valid (e->fifo->master_session_index, thread_index);
 }
@@ -576,8 +718,8 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
   u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
-  session_fifo_event_t *pending_events, *e;
-  session_fifo_event_t *fifo_events;
+  session_event_t *pending_events, *e;
+  session_event_t *fifo_events;
   svm_msg_q_msg_t _msg, *msg = &_msg;
   f64 now = vlib_time_now (vm);
   int n_tx_packets = 0, i, rv;
@@ -644,7 +786,7 @@ skip_dequeue:
   for (i = 0; i < n_events; i++)
     {
       stream_session_t *s;     /* $$$ prefetch 1 ahead maybe */
-      session_fifo_event_t *e;
+      session_event_t *e;
       u32 to_dequeue;
 
       e = &fifo_events[i];
@@ -715,7 +857,20 @@ skip_dequeue:
          fp = e->rpc_args.fp;
          (*fp) (e->rpc_args.arg);
          break;
-
+       case SESSION_CTRL_EVT_DISCONNECTED:
+         session_mq_disconnected_handler (e->data);
+         break;
+       case SESSION_CTRL_EVT_ACCEPTED_REPLY:
+         session_mq_accepted_reply_handler (e->data);
+         break;
+       case SESSION_CTRL_EVT_CONNECTED_REPLY:
+         break;
+       case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
+         session_mq_disconnected_reply_handler (e->data);
+         break;
+       case SESSION_CTRL_EVT_RESET_REPLY:
+         session_mq_reset_reply_handler (e->data);
+         break;
        default:
          clib_warning ("unhandled event type %d", e->event_type);
        }
@@ -751,7 +906,7 @@ dump_thread_0_event_queue (void)
   session_manager_main_t *smm = vnet_get_session_manager_main ();
   vlib_main_t *vm = &vlib_global_main;
   u32 my_thread_index = vm->thread_index;
-  session_fifo_event_t _e, *e = &_e;
+  session_event_t _e, *e = &_e;
   svm_msg_q_ring_t *ring;
   stream_session_t *s0;
   svm_msg_q_msg_t *msg;
@@ -804,7 +959,7 @@ dump_thread_0_event_queue (void)
 }
 
 static u8
-session_node_cmp_event (session_fifo_event_t * e, svm_fifo_t * f)
+session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
 {
   stream_session_t *s;
   switch (e->event_type)
@@ -834,11 +989,11 @@ session_node_cmp_event (session_fifo_event_t * e, svm_fifo_t * f)
 }
 
 u8
-session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
+session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
   svm_msg_q_t *mq;
-  session_fifo_event_t *pending_event_vector, *evt;
+  session_event_t *pending_event_vector, *evt;
   int i, index, found = 0;
   svm_msg_q_msg_t *msg;
   svm_msg_q_ring_t *ring;