session: move ctrl messages from bapi to mq 45/21445/26
authorFlorin Coras <fcoras@cisco.com>
Wed, 21 Aug 2019 23:20:44 +0000 (16:20 -0700)
committerFlorin Coras <florin.coras@gmail.com>
Tue, 27 Aug 2019 18:21:39 +0000 (18:21 +0000)
Type:refactor

Moves connect, disconnect, bind, unbind and app detach to message
queue from binary api. Simplifies app/vcl interaction with the session
layer since all session control messages are now handled over the mq.

Add/del segment messages require internal C api changes which affect all
builtin applications. They'll be moved in a different patch and might
not be back portable to 19.08.

Change-Id: I93f6d18e551b024effa75d47f5ff25f23ba8aff5
Signed-off-by: Florin Coras <fcoras@cisco.com>
14 files changed:
src/vcl/vcl_bapi.c
src/vcl/vcl_locked.c
src/vcl/vcl_private.c
src/vcl/vcl_private.h
src/vcl/vppcom.c
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/application_interface.h
src/vnet/session/session.api
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_api.c
src/vnet/session/session_node.c
src/vnet/session/session_types.h

index de64809..cdfc286 100644 (file)
@@ -108,10 +108,10 @@ vcl_vpp_worker_segment_handle (u32 wrk_index)
 }
 
 static void
-vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
-                                          mp)
+vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
 {
   vcl_worker_t *wrk = vcl_worker_get (0);
+  svm_msg_q_t *ctrl_mq;
   u64 segment_handle;
   int *fds = 0, i;
   u32 n_fds = 0;
@@ -122,8 +122,11 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
       goto failed;
     }
 
-  wrk->app_event_queue = uword_to_pointer (mp->app_event_queue_address,
-                                          svm_msg_q_t *);
+  wrk->app_event_queue = uword_to_pointer (mp->app_mq, svm_msg_q_t *);
+  ctrl_mq = uword_to_pointer (mp->vpp_ctrl_mq, svm_msg_q_t *);
+  vec_validate (wrk->vpp_event_queues, mp->vpp_ctrl_mq_thread);
+  wrk->vpp_event_queues[mp->vpp_ctrl_mq_thread] = ctrl_mq;
+  wrk->ctrl_mq = ctrl_mq;
   segment_handle = clib_net_to_host_u64 (mp->segment_handle);
   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
     {
@@ -253,17 +256,6 @@ failed:
   vec_free (fds);
 }
 
-static void
-vl_api_application_detach_reply_t_handler (vl_api_application_detach_reply_t *
-                                          mp)
-{
-  if (mp->retval)
-    clib_warning ("VCL<%d>: detach failed: %U", getpid (), format_api_error,
-                 ntohl (mp->retval));
-
-  vcm->app_state = STATE_APP_ENABLED;
-}
-
 static void
 vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
 {
@@ -304,51 +296,13 @@ vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp)
   VDBG (1, "Unmapped segment: %d", segment_handle);
 }
 
-static void
-vl_api_bind_sock_reply_t_handler (vl_api_bind_sock_reply_t * mp)
-{
-  /* Expecting a similar message on mq. So ignore this */
-  VDBG (0, "bapi bind retval: %u!", mp->retval);
-}
-
-static void
-vl_api_unbind_sock_reply_t_handler (vl_api_unbind_sock_reply_t * mp)
-{
-  if (mp->retval)
-    VDBG (0, "ERROR: sid %u: unbind failed: %U", mp->context,
-         format_api_error, ntohl (mp->retval));
-
-  VDBG (1, "sid %u: unbind succeeded!", mp->context);
-
-}
-
-static void
-vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
-                                          mp)
-{
-  if (mp->retval)
-    VDBG (0, "ERROR: sid %u: disconnect failed: %U", mp->context,
-         format_api_error, ntohl (mp->retval));
-}
-
-static void
-vl_api_connect_sock_reply_t_handler (vl_api_connect_sock_reply_t * mp)
-{
-  if (mp->retval)
-    VDBG (0, "ERROR: connect failed: %U", format_api_error,
-         ntohl (mp->retval));
-}
-
 static void
   vl_api_application_tls_cert_add_reply_t_handler
   (vl_api_application_tls_cert_add_reply_t * mp)
 {
   if (mp->retval)
-    {
-      clib_warning ("VCL<%d>: add cert failed: %U", getpid (),
-                   format_api_error, ntohl (mp->retval));
-      return;
-    }
+    VDBG (0, "add cert failed: %U", format_api_error, ntohl (mp->retval));
+  vcm->app_state = STATE_APP_READY;
 }
 
 static void
@@ -356,22 +310,13 @@ static void
   (vl_api_application_tls_key_add_reply_t * mp)
 {
   if (mp->retval)
-    {
-      clib_warning ("VCL<%d>: add key failed: %U", getpid (),
-                   format_api_error, ntohl (mp->retval));
-      return;
-    }
-
+    VDBG (0, "add key failed: %U", format_api_error, ntohl (mp->retval));
+  vcm->app_state = STATE_APP_READY;
 }
 
 #define foreach_sock_msg                                               \
 _(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply)          \
-_(BIND_SOCK_REPLY, bind_sock_reply)                                    \
-_(UNBIND_SOCK_REPLY, unbind_sock_reply)                                \
-_(CONNECT_SOCK_REPLY, connect_sock_reply)                              \
-_(DISCONNECT_SESSION_REPLY, disconnect_session_reply)                  \
-_(APPLICATION_ATTACH_REPLY, application_attach_reply)                  \
-_(APPLICATION_DETACH_REPLY, application_detach_reply)                  \
+_(APP_ATTACH_REPLY, app_attach_reply)                                  \
 _(APPLICATION_TLS_CERT_ADD_REPLY, application_tls_cert_add_reply)      \
 _(APPLICATION_TLS_KEY_ADD_REPLY, application_tls_key_add_reply)        \
 _(MAP_ANOTHER_SEGMENT, map_another_segment)                            \
@@ -414,7 +359,7 @@ void
 vppcom_app_send_attach (void)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
-  vl_api_application_attach_t *bmp;
+  vl_api_app_attach_t *bmp;
   u8 nsid_len = vec_len (vcm->cfg.namespace_id);
   u8 app_is_proxy = (vcm->cfg.app_proxy_transport_tcp ||
                     vcm->cfg.app_proxy_transport_udp);
@@ -422,7 +367,7 @@ vppcom_app_send_attach (void)
   bmp = vl_msg_api_alloc (sizeof (*bmp));
   memset (bmp, 0, sizeof (*bmp));
 
-  bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
+  bmp->_vl_msg_id = ntohs (VL_API_APP_ATTACH);
   bmp->client_index = wrk->my_client_index;
   bmp->context = htonl (0xfeedface);
   bmp->options[APP_OPTIONS_FLAGS] =
@@ -504,80 +449,6 @@ vcl_send_child_worker_del (vcl_worker_t * child_wrk)
   vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & mp);
 }
 
-void
-vppcom_send_connect_sock (vcl_session_t * session)
-{
-  vcl_worker_t *wrk = vcl_worker_get_current ();
-  vl_api_connect_sock_t *cmp;
-
-  cmp = vl_msg_api_alloc (sizeof (*cmp));
-  memset (cmp, 0, sizeof (*cmp));
-  cmp->_vl_msg_id = ntohs (VL_API_CONNECT_SOCK);
-  cmp->client_index = wrk->my_client_index;
-  cmp->context = session->session_index;
-  cmp->wrk_index = wrk->vpp_wrk_index;
-  cmp->is_ip4 = session->transport.is_ip4;
-  cmp->parent_handle = session->parent_handle;
-  clib_memcpy_fast (cmp->ip, &session->transport.rmt_ip, sizeof (cmp->ip));
-  cmp->port = session->transport.rmt_port;
-  cmp->proto = session->session_type;
-  vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & cmp);
-}
-
-void
-vppcom_send_disconnect_session (u64 vpp_handle)
-{
-  vcl_worker_t *wrk = vcl_worker_get_current ();
-  vl_api_disconnect_session_t *dmp;
-
-  dmp = vl_msg_api_alloc (sizeof (*dmp));
-  memset (dmp, 0, sizeof (*dmp));
-  dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
-  dmp->client_index = wrk->my_client_index;
-  dmp->handle = vpp_handle;
-  vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & dmp);
-}
-
-/* VPP combines bind and listen as one operation. VCL manages the separation
- * of bind and listen locally via vppcom_session_bind() and
- * vppcom_session_listen() */
-void
-vppcom_send_bind_sock (vcl_session_t * session)
-{
-  vcl_worker_t *wrk = vcl_worker_get_current ();
-  vl_api_bind_sock_t *bmp;
-
-  /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
-  bmp = vl_msg_api_alloc (sizeof (*bmp));
-  memset (bmp, 0, sizeof (*bmp));
-
-  bmp->_vl_msg_id = ntohs (VL_API_BIND_SOCK);
-  bmp->client_index = wrk->my_client_index;
-  bmp->context = session->session_index;
-  bmp->wrk_index = wrk->vpp_wrk_index;
-  bmp->is_ip4 = session->transport.is_ip4;
-  clib_memcpy_fast (bmp->ip, &session->transport.lcl_ip, sizeof (bmp->ip));
-  bmp->port = session->transport.lcl_port;
-  bmp->proto = session->session_type;
-  vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & bmp);
-}
-
-void
-vppcom_send_unbind_sock (vcl_worker_t * wrk, u64 vpp_handle)
-{
-  vl_api_unbind_sock_t *ump;
-
-  ump = vl_msg_api_alloc (sizeof (*ump));
-  memset (ump, 0, sizeof (*ump));
-
-  ump->_vl_msg_id = ntohs (VL_API_UNBIND_SOCK);
-  ump->client_index = wrk->my_client_index;
-  ump->wrk_index = wrk->vpp_wrk_index;
-  ump->handle = vpp_handle;
-  ump->context = wrk->wrk_index;
-  vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & ump);
-}
-
 void
 vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert,
                                      u32 cert_len)
@@ -593,7 +464,6 @@ vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert,
   cert_mp->cert_len = clib_host_to_net_u16 (cert_len);
   clib_memcpy_fast (cert_mp->cert, cert, cert_len);
   vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & cert_mp);
-
 }
 
 void
@@ -611,7 +481,6 @@ vppcom_send_application_tls_key_add (vcl_session_t * session, char *key,
   key_mp->key_len = clib_host_to_net_u16 (key_len);
   clib_memcpy_fast (key_mp->key, key, key_len);
   vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & key_mp);
-
 }
 
 u32
index 3b6817c..d69d391 100644 (file)
@@ -311,7 +311,7 @@ vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
   s = vcl_session_get (wrk, vls->session_index);
   if (s->session_state != STATE_LISTEN)
     return;
-  vppcom_send_unbind_sock (wrk, s->vpp_handle);
+  vcl_send_session_unlisten (wrk, s);
   s->session_state = STATE_LISTEN_NO_MQ;
   vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
 }
index 13794ea..f431396 100644 (file)
@@ -276,6 +276,12 @@ vcl_worker_set_bapi (void)
   return -1;
 }
 
+svm_msg_q_t *
+vcl_worker_ctrl_mq (vcl_worker_t * wrk)
+{
+  return wrk->ctrl_mq;
+}
+
 void
 vcl_segment_table_add (u64 segment_handle, u32 svm_segment_index)
 {
index 43c8ec3..cd2544c 100644 (file)
@@ -57,6 +57,7 @@ typedef enum
   STATE_APP_ENABLED,
   STATE_APP_ATTACHED,
   STATE_APP_ADDING_WORKER,
+  STATE_APP_ADDING_TLS_DATA,
   STATE_APP_FAILED,
   STATE_APP_READY
 } app_state_t;
@@ -251,6 +252,9 @@ typedef struct vcl_worker_
   /** VPP binary api input queue */
   svm_queue_t *vl_input_queue;
 
+  /** VPP mq to be used for exchanging control messages */
+  svm_msg_q_t *ctrl_mq;
+
   /** Message queues epoll fd. Initialized only if using mqs with eventfds */
   int mqs_epfd;
 
@@ -547,6 +551,7 @@ vcl_worker_t *vcl_worker_alloc_and_init (void);
 void vcl_worker_cleanup (vcl_worker_t * wrk, u8 notify_vpp);
 int vcl_worker_register_with_vpp (void);
 int vcl_worker_set_bapi (void);
+svm_msg_q_t *vcl_worker_ctrl_mq (vcl_worker_t * wrk);
 
 void vcl_flush_mq_events (void);
 void vcl_cleanup_bapi (void);
@@ -603,10 +608,8 @@ void vppcom_init_error_string_table (void);
 void vppcom_send_session_enable_disable (u8 is_enable);
 void vppcom_app_send_attach (void);
 void vppcom_app_send_detach (void);
-void vppcom_send_connect_sock (vcl_session_t * session);
+void vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s);
 void vppcom_send_disconnect_session (u64 vpp_handle);
-void vppcom_send_bind_sock (vcl_session_t * session);
-void vppcom_send_unbind_sock (vcl_worker_t * wrk, u64 vpp_handle);
 void vppcom_api_hookup (void);
 void vppcom_send_application_tls_cert_add (vcl_session_t * session,
                                           char *cert, u32 cert_len);
index 0060922..f56c02b 100644 (file)
@@ -197,6 +197,98 @@ format_ip46_address (u8 * s, va_list * args)
  * VPPCOM Utility Functions
  */
 
+static void
+vcl_send_session_listen (vcl_worker_t * wrk, vcl_session_t * s)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_listen_msg_t *mp;
+  svm_msg_q_t *mq;
+
+  mq = vcl_worker_ctrl_mq (wrk);
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_LISTEN);
+  mp = (session_listen_msg_t *) app_evt->evt->data;
+  memset (mp, 0, sizeof (*mp));
+  mp->client_index = wrk->my_client_index;
+  mp->context = s->session_index;
+  mp->wrk_index = wrk->vpp_wrk_index;
+  mp->is_ip4 = s->transport.is_ip4;
+  clib_memcpy_fast (&mp->ip, &s->transport.lcl_ip, sizeof (mp->ip));
+  mp->port = s->transport.lcl_port;
+  mp->proto = s->session_type;
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
+static void
+vcl_send_session_connect (vcl_worker_t * wrk, vcl_session_t * s)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_connect_msg_t *mp;
+  svm_msg_q_t *mq;
+
+  mq = vcl_worker_ctrl_mq (wrk);
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_CONNECT);
+  mp = (session_connect_msg_t *) app_evt->evt->data;
+  memset (mp, 0, sizeof (*mp));
+  mp->client_index = wrk->my_client_index;
+  mp->context = s->session_index;
+  mp->wrk_index = wrk->vpp_wrk_index;
+  mp->is_ip4 = s->transport.is_ip4;
+  mp->parent_handle = s->parent_handle;
+  clib_memcpy_fast (&mp->ip, &s->transport.rmt_ip, sizeof (mp->ip));
+  mp->port = s->transport.rmt_port;
+  mp->proto = s->session_type;
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
+void
+vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_unlisten_msg_t *mp;
+  svm_msg_q_t *mq;
+
+  mq = vcl_worker_ctrl_mq (wrk);
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_UNLISTEN);
+  mp = (session_unlisten_msg_t *) app_evt->evt->data;
+  memset (mp, 0, sizeof (*mp));
+  mp->client_index = wrk->my_client_index;
+  mp->wrk_index = wrk->vpp_wrk_index;
+  mp->handle = s->vpp_handle;
+  mp->context = wrk->wrk_index;
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
+static void
+vcl_send_session_disconnect (vcl_worker_t * wrk, vcl_session_t * s)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_disconnect_msg_t *mp;
+  svm_msg_q_t *mq;
+
+  /* Send to thread that owns the session */
+  mq = s->vpp_evt_q;
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_DISCONNECT);
+  mp = (session_disconnect_msg_t *) app_evt->evt->data;
+  memset (mp, 0, sizeof (*mp));
+  mp->client_index = wrk->my_client_index;
+  mp->handle = s->vpp_handle;
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
+static void
+vcl_send_app_detach (vcl_worker_t * wrk)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_app_detach_msg_t *mp;
+  svm_msg_q_t *mq;
+
+  mq = vcl_worker_ctrl_mq (wrk);
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_DETACH);
+  mp = (session_app_detach_msg_t *) app_evt->evt->data;
+  memset (mp, 0, sizeof (*mp));
+  mp->client_index = wrk->my_client_index;
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
 
 static void
 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
@@ -827,7 +919,6 @@ vppcom_session_unbind (u32 session_handle)
   session_accepted_msg_t *accepted_msg;
   vcl_session_t *session = 0;
   vcl_session_msg_t *evt;
-  u64 vpp_handle;
 
   session = vcl_session_get_w_handle (wrk, session_handle);
   if (!session)
@@ -845,14 +936,14 @@ vppcom_session_unbind (u32 session_handle)
     }
   clib_fifo_free (session->accept_evts_fifo);
 
-  vpp_handle = session->vpp_handle;
-  session->vpp_handle = ~0;
-  session->session_state = STATE_DISCONNECT;
+  vcl_send_session_unlisten (wrk, session);
 
   VDBG (1, "session %u [0x%llx]: sending unbind!", session->session_index,
-       vpp_handle);
+       session->vpp_handle);
   vcl_evt (VCL_EVT_UNBIND, session);
-  vppcom_send_unbind_sock (wrk, vpp_handle);
+
+  session->vpp_handle = ~0;
+  session->session_state = STATE_DISCONNECT;
 
   return VPPCOM_OK;
 }
@@ -894,7 +985,7 @@ vppcom_session_disconnect (u32 session_handle)
     {
       VDBG (1, "session %u [0x%llx]: sending disconnect...",
            session->session_index, vpp_handle);
-      vppcom_send_disconnect_session (vpp_handle);
+      vcl_send_session_disconnect (wrk, session);
     }
 
   if (session->listener_index != VCL_INVALID_SESSION_INDEX)
@@ -1005,7 +1096,7 @@ vppcom_app_destroy (void)
 
   if (pool_elts (vcm->workers) == 1)
     {
-      vppcom_app_send_detach ();
+      vcl_send_app_detach (vcl_worker_get_current ());
       orig_app_timeout = vcm->cfg.app_timeout;
       vcm->cfg.app_timeout = 2.0;
       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
@@ -1220,7 +1311,7 @@ vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
   /*
    * Send listen request to vpp and wait for reply
    */
-  vppcom_send_bind_sock (listen_session);
+  vcl_send_session_listen (wrk, listen_session);
   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
                                             STATE_LISTEN,
                                             vcm->cfg.session_timeout);
@@ -1256,7 +1347,8 @@ vppcom_session_tls_add_cert (uint32_t session_handle, char *cert,
    * Send listen request to vpp and wait for reply
    */
   vppcom_send_application_tls_cert_add (session, cert, cert_len);
-
+  vcm->app_state = STATE_APP_ADDING_TLS_DATA;
+  vcl_wait_for_app_state_change (STATE_APP_READY);
   return VPPCOM_OK;
 
 }
@@ -1276,14 +1368,10 @@ vppcom_session_tls_add_key (uint32_t session_handle, char *key,
   if (key_len == 0 || key_len == ~0)
     return VPPCOM_EBADFD;
 
-  /*
-   * Send listen request to vpp and wait for reply
-   */
   vppcom_send_application_tls_key_add (session, key, key_len);
-
+  vcm->app_state = STATE_APP_ADDING_TLS_DATA;
+  vcl_wait_for_app_state_change (STATE_APP_READY);
   return VPPCOM_OK;
-
-
 }
 
 static int
@@ -1505,7 +1593,7 @@ vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
   /*
    * Send connect request and wait for reply from vpp
    */
-  vppcom_send_connect_sock (session);
+  vcl_send_session_connect (wrk, session);
   rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
                                             vcm->cfg.session_timeout);
 
@@ -1564,7 +1652,7 @@ vppcom_session_stream_connect (uint32_t session_handle,
   /*
    * Send connect request and wait for reply from vpp
    */
-  vppcom_send_connect_sock (session);
+  vcl_send_session_connect (wrk, session);
   rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
                                             vcm->cfg.session_timeout);
 
index ab67888..d4f3d61 100644 (file)
@@ -951,6 +951,8 @@ vnet_listen (vnet_listen_args_t * a)
   application_t *app;
   int rv;
 
+  ASSERT (vlib_thread_is_main_w_barrier ());
+
   app = application_get_if_valid (a->app_index);
   if (!app)
     return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
@@ -1001,6 +1003,8 @@ vnet_connect (vnet_connect_args_t * a)
   app_worker_t *client_wrk;
   application_t *client;
 
+  ASSERT (vlib_thread_is_main_w_barrier ());
+
   if (session_endpoint_is_zero (&a->sep))
     return VNET_API_ERROR_INVALID_VALUE;
 
@@ -1038,6 +1042,8 @@ vnet_unlisten (vnet_unlisten_args_t * a)
   app_listener_t *al;
   application_t *app;
 
+  ASSERT (vlib_thread_is_main_w_barrier ());
+
   if (!(app = application_get_if_valid (a->app_index)))
     return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
 
index 24c6fc3..9ec1055 100644 (file)
@@ -284,6 +284,15 @@ int vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a);
 
 uword unformat_application_proto (unformat_input_t * input, va_list * args);
 
+
+/* Needed while we support both bapi and mq ctrl messages */
+int mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
+                             session_handle_t handle, int rv);
+int mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
+                                 session_t * s, u8 is_fail);
+void mq_send_unlisten_reply (app_worker_t * app_wrk, session_handle_t sh,
+                            u32 context, int rv);
+
 #endif /* SRC_VNET_SESSION_APPLICATION_H_ */
 
 /*
index 6cc1af4..f5f684e 100644 (file)
@@ -261,6 +261,25 @@ typedef struct
 #undef _
 } app_session_t;
 
+typedef struct session_listen_msg_
+{
+  u32 client_index;
+  u32 context;                 /* Not needed but keeping it for compatibility with bapi */
+  u32 wrk_index;
+  u32 vrf;
+  u16 port;
+  u8 proto;
+  u8 is_ip4;
+  ip46_address_t ip;
+} __clib_packed session_listen_msg_t;
+
+typedef struct session_listen_uri_msg_
+{
+  u32 client_index;
+  u32 context;
+  u8 uri[56];
+} __clib_packed session_listen_uri_msg_t;
+
 typedef struct session_bound_msg_
 {
   u32 context;
@@ -277,6 +296,14 @@ typedef struct session_bound_msg_
   u8 segment_name[128];
 } __clib_packed session_bound_msg_t;
 
+typedef struct session_unlisten_msg_
+{
+  u32 client_index;
+  u32 context;
+  u32 wrk_index;
+  session_handle_t handle;
+} __clib_packed session_unlisten_msg_t;
+
 typedef struct session_unlisten_reply_msg_
 {
   u32 context;
@@ -303,9 +330,27 @@ typedef struct session_accepted_reply_msg_
   u64 handle;
 } __clib_packed session_accepted_reply_msg_t;
 
-/* Make sure this is not too large, otherwise it won't fit when dequeued in
- * the session queue node */
-STATIC_ASSERT (sizeof (session_accepted_reply_msg_t) <= 16, "accept reply");
+typedef struct session_connect_msg_
+{
+  u32 client_index;
+  u32 context;
+  u32 wrk_index;
+  u32 vrf;
+  u16 port;
+  u8 proto;
+  u8 is_ip4;
+  ip46_address_t ip;
+  u8 hostname_len;
+  u8 hostname[16];
+  u64 parent_handle;
+} __clib_packed session_connect_msg_t;
+
+typedef struct session_connect_uri_msg_
+{
+  u32 client_index;
+  u32 context;
+  u8 uri[56];
+} __clib_packed session_connect_uri_msg_t;
 
 typedef struct session_connected_msg_
 {
@@ -325,6 +370,13 @@ typedef struct session_connected_msg_
   transport_endpoint_t lcl;
 } __clib_packed session_connected_msg_t;
 
+typedef struct session_disconnect_msg_
+{
+  u32 client_index;
+  u32 context;
+  session_handle_t handle;
+} __clib_packed session_disconnect_msg_t;
+
 typedef struct session_disconnected_msg_
 {
   u32 client_index;
@@ -375,6 +427,12 @@ typedef struct session_worker_update_reply_msg_
   u64 segment_handle;
 } __clib_packed session_worker_update_reply_msg_t;
 
+typedef struct session_app_detach_msg_
+{
+  u32 client_index;
+  u32 context;
+} session_app_detach_msg_t;
+
 typedef struct app_session_event_
 {
   svm_msg_q_msg_t msg;
index 533f65e..52e050d 100644 (file)
  * limitations under the License.
  */
 
-option version = "1.6.0";
+option version = "1.7.0";
 
 /** \brief client->vpp, attach application to session layer
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
-    @param initial_segment_size - size of the initial shm segment to be 
+    @param initial_segment_size - size of the initial shm segment to be
                                                          allocated
     @param options - segment size, fifo sizes, etc.
     @param namespace_id_len - length of the namespace id c-string
@@ -32,17 +33,18 @@ option version = "1.6.0";
     u8 namespace_id_len;
     u8 namespace_id [64];
  };
+
  /** \brief Application attach reply
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param context - sender context, to match reply w/ request
     @param retval - return code for the request
-    @param app_event_queue_address - vpp event queue address or 0 if this 
+    @param app_event_queue_address - vpp event queue address or 0 if this
                                         connection shouldn't send events
     @param n_fds - number of fds exchanged
     @param fd_flags - set of flags that indicate which fds are to be expected
-                                 over the socket (set only if socket transport available) 
+                                 over the socket (set only if socket transport available)
     @param segment_size - size of first shm segment
-    @param segment_name_length - length of segment name 
+    @param segment_name_length - length of segment name
     @param segment_name - name of segment client needs to attach to
     @param app_index - index of the newly created app
     @param segment_handle - handle for segment
@@ -60,6 +62,52 @@ define application_attach_reply {
     u64 segment_handle;
 };
 
+/** \brief Application attach to session layer
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+    @param options - segment size, fifo sizes, etc.
+    @param namespace_id_len - length of the namespace id c-string
+    @param namespace_id - 0 terminted c-string
+*/
+ define app_attach {
+    u32 client_index;
+    u32 context;
+    u64 options[16];
+    u8 namespace_id_len;
+    u8 namespace_id[64];
+ };
+
+ /** \brief Application attach reply
+    @param context - sender context, to match reply w/ request
+    @param retval - return code for the request
+    @param app_mq - app message queue
+    @param vpp_ctrl_mq - vpp message queue for control events that should
+                                        be handled in main thread, i.e., bind/connect
+    @param vpp_ctrl_mq_thread_index - thread index of the ctrl mq
+    @param app_index - index of the newly created app
+    @param n_fds - number of fds exchanged
+    @param fd_flags - set of flags that indicate which fds are to be expected
+                                 over the socket (set only if socket transport available)
+    @param segment_size - size of first shm segment
+    @param segment_name_length - length of segment name
+    @param segment_name - name of segment client needs to attach to
+    @param segment_handle - handle for segment
+*/
+define app_attach_reply {
+    u32 context;
+    i32 retval;
+    u64 app_mq;
+    u64 vpp_ctrl_mq;
+    u8 vpp_ctrl_mq_thread;
+    u32 app_index;
+    u8 n_fds;
+    u8 fd_flags;
+    u32 segment_size;
+    u8 segment_name_length;
+    u8 segment_name[128];
+    u64 segment_handle;
+};
+
 /** \brief Application add TLS certificate
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
@@ -89,6 +137,7 @@ autoreply define application_tls_key_add {
 };
 
  /** \brief client->vpp, attach application to session layer
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
 */
@@ -96,12 +145,12 @@ autoreply define application_detach {
     u32 client_index;
     u32 context;
  };
+
 /** \brief vpp->client, please map an additional shared memory segment
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
-       @param fd_flags - set of flags that indicate which, if any, fds are 
-                                         to be expected over the socket. This is set only if 
+       @param fd_flags - set of flags that indicate which, if any, fds are
+                                         to be expected over the socket. This is set only if
                                          socket transport available
     @param segment_size - size of the segment to be mapped
     @param segment_name - name of the segment to be mapped
@@ -120,7 +169,7 @@ autoreply define map_another_segment {
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
     @param segment_name - segment name
-    @param segment_handle - handle of the segment to be unmapped 
+    @param segment_handle - handle of the segment to be unmapped
 */
 autoreply define unmap_segment {
     u32 client_index;
@@ -129,6 +178,7 @@ autoreply define unmap_segment {
 };
 
  /** \brief Bind to a given URI
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
     @param accept_cookie - sender accept cookie, to identify this bind flavor
@@ -144,6 +194,7 @@ autoreply define bind_uri {
 };
 
 /** \brief Unbind a given URI
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
     @param uri - a URI, e.g. "tcp://0.0.0.0/0/80" [ipv4]
@@ -157,12 +208,13 @@ autoreply define unbind_uri {
 };
 
 /** \brief Connect to a given URI
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
-    @param client_queue_address - binary API client queue address. Used by 
+    @param client_queue_address - binary API client queue address. Used by
                                                          local server when connect was redirected.
     @param options - socket options, fifo sizes, etc. passed by vpp to the
-                                server when redirecting connects 
+                                server when redirecting connects
     @param uri - a URI, e.g. "tcp4://0.0.0.0/0/80"
                  "tcp6://::/0/80" [ipv6], etc.
 */
@@ -175,6 +227,7 @@ autoreply define connect_uri {
 };
 
 /** \brief bidirectional disconnect API
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param client_index - opaque cookie to identify the sender
                           client to vpp direction only
     @param context - sender context, to match reply w/ request
@@ -187,6 +240,7 @@ define disconnect_session {
 };
 
 /** \brief bidirectional disconnect reply API
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param client_index - opaque cookie to identify the sender
                           client to vpp direction only
     @param context - sender context, to match reply w/ request
@@ -200,13 +254,14 @@ define disconnect_session_reply {
 };
 
 /** \brief Bind to an ip:port pair for a given transport protocol
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
     @param wrk_index - index of worker requesting the bind
     @param vrf - bind namespace
     @param is_ip4 - flag that is 1 if ip address family is IPv4
     @param ip - ip address
-    @param port - port 
+    @param port - port
     @param proto - protocol 0 - TCP 1 - UDP
     @param options - socket options, fifo sizes, etc.
 */
@@ -222,7 +277,8 @@ autoreply define bind_sock {
   u64 options[16];
 };
 
-/** \brief Unbind 
+/** \brief Unbind
+       ### WILL BE DEPRECATED POST 20.01 ###s
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
     @param wrk_index - index of worker requesting the bind
@@ -236,16 +292,17 @@ autoreply define unbind_sock {
 };
 
 /** \brief Connect to a remote peer
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
     @param wrk_index - worker that requests the connect
-    @param client_queue_address - client's API queue address. Non-zero when 
+    @param client_queue_address - client's API queue address. Non-zero when
                                   used to perform redirects
     @param options - socket options, fifo sizes, etc. when doing redirects
     @param vrf - connection namespace
     @param is_ip4 - flag that is 1 if ip address family is IPv4
     @param ip - ip address
-    @param port - port 
+    @param port - port
     @param proto - protocol 0 - TCP 1 - UDP
     @param hostname-len - length of hostname
     @param hostname - destination's hostname. If present, used by protocols
@@ -269,6 +326,7 @@ autoreply define connect_sock {
 };
 
 /** \brief ask app to add a new cut-through registration
+       ### WILL BE DEPRECATED POST 20.01 ###
     @param client_index - opaque cookie to identify the sender
                           client to vpp direction only
     @param context - sender context, to match reply w/ request
@@ -314,8 +372,8 @@ define app_worker_add_del
     @param app_event_queue_address - vpp event queue address of new worker
     @param n_fds - number of fds exchanged
     @param fd_flags - set of flags that indicate which fds are to be expected
-                                 over the socket (set only if socket transport available) 
-    @param segment_name_length - length of segment name 
+                                 over the socket (set only if socket transport available)
+    @param segment_name_length - length of segment name
     @param segment_name - name of segment client needs to attach to
     @param segment_handle - handle for segment
 */
index 4529245..c6f9d0a 100644 (file)
@@ -92,10 +92,10 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
 int
 session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
 {
-  /* only event supported for now is disconnect */
-  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE);
-  return session_send_evt_to_thread (s, 0, s->thread_index,
-                                    SESSION_CTRL_EVT_CLOSE);
+  /* only events supported are disconnect and reset */
+  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
+         || evt_type == SESSION_CTRL_EVT_RESET);
+  return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
 }
 
 void
index de44bed..04fdebe 100644 (file)
@@ -62,12 +62,19 @@ typedef struct session_tx_context_
   session_dgram_hdr_t hdr;
 } session_tx_context_t;
 
+#define SESSION_CTRL_MSG_MAX_SIZE 64
+
 typedef struct session_evt_elt
 {
   clib_llist_anchor_t evt_list;
   session_event_t evt;
 } session_evt_elt_t;
 
+typedef struct session_ctrl_evt_data_
+{
+  u8 data[SESSION_CTRL_MSG_MAX_SIZE];
+} session_evt_ctrl_data_t;
+
 typedef struct session_worker_
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -96,6 +103,9 @@ typedef struct session_worker_
   /** Pool of session event list elements */
   session_evt_elt_t *event_elts;
 
+  /** Pool of ctrl events data buffers */
+  session_evt_ctrl_data_t *ctrl_evts_data;
+
   /** Head of control events list */
   clib_llist_index_t ctrl_head;
 
@@ -207,6 +217,14 @@ session_evt_add_old (session_worker_t * wrk, session_evt_elt_t * elt)
                       pool_elt_at_index (wrk->event_elts, wrk->old_head));
 }
 
+static inline u32
+session_evt_ctrl_data_alloc (session_worker_t * wrk)
+{
+  session_evt_ctrl_data_t *data;
+  pool_get (wrk->ctrl_evts_data, data);
+  return (data - wrk->ctrl_evts_data);
+}
+
 static inline session_evt_elt_t *
 session_evt_alloc_ctrl (session_worker_t * wrk)
 {
@@ -217,6 +235,20 @@ session_evt_alloc_ctrl (session_worker_t * wrk)
   return elt;
 }
 
+static inline void *
+session_evt_ctrl_data (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+  return (void *) (pool_elt_at_index (wrk->ctrl_evts_data,
+                                     elt->evt.ctrl_data_index));
+}
+
+static inline void
+session_evt_ctrl_data_free (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+  ASSERT (elt->evt.event_type > SESSION_IO_EVT_BUILTIN_TX);
+  pool_put_index (wrk->ctrl_evts_data, elt->evt.ctrl_data_index);
+}
+
 static inline session_evt_elt_t *
 session_evt_alloc_new (session_worker_t * wrk)
 {
index e3e3bb3..8f9ce3f 100755 (executable)
@@ -43,6 +43,7 @@
 #define foreach_session_api_msg                                         \
 _(MAP_ANOTHER_SEGMENT_REPLY, map_another_segment_reply)                 \
 _(APPLICATION_ATTACH, application_attach)                              \
+_(APP_ATTACH, app_attach)                                              \
 _(APPLICATION_DETACH, application_detach)                              \
 _(BIND_URI, bind_uri)                                                   \
 _(UNBIND_URI, unbind_uri)                                               \
@@ -298,7 +299,7 @@ mq_send_session_reset_cb (session_t * s)
                                 SESSION_CTRL_EVT_RESET);
 }
 
-static int
+int
 mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
                              session_t * s, u8 is_fail)
 {
@@ -378,7 +379,7 @@ done:
   return 0;
 }
 
-static int
+int
 mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
                          session_handle_t handle, int rv)
 {
@@ -438,13 +439,35 @@ done:
   return 0;
 }
 
+void
+mq_send_unlisten_reply (app_worker_t * app_wrk, session_handle_t sh,
+                       u32 context, int rv)
+{
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_unlisten_reply_msg_t *ump;
+  svm_msg_q_t *app_mq;
+  session_event_t *evt;
+
+  app_mq = app_wrk->event_queue;
+  if (mq_try_lock_and_alloc_msg (app_mq, msg))
+    return;
+
+  evt = svm_msg_q_msg_data (app_mq, msg);
+  clib_memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY;
+  ump = (session_unlisten_reply_msg_t *) evt->data;
+  ump->context = context;
+  ump->handle = sh;
+  ump->retval = rv;
+  svm_msg_q_add_and_unlock (app_mq, msg);
+}
+
 static void
 mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
 {
   clib_warning ("not supported");
 }
 
-
 static session_cb_vft_t session_mq_cb_vft = {
   .session_accept_callback = mq_send_session_accepted_cb,
   .session_disconnect_callback = mq_send_session_disconnected_cb,
@@ -466,6 +489,7 @@ vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
   REPLY_MACRO (VL_API_SESSION_ENABLE_DISABLE_REPLY);
 }
 
+/* ### WILL BE DEPRECATED POST 20.01 ### */
 static void
 vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
 {
@@ -563,6 +587,108 @@ done:
     session_send_fds (reg, fds, n_fds);
 }
 
+static void
+vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
+{
+  int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
+  vl_api_app_attach_reply_t *rmp;
+  ssvm_private_t *segp, *evt_q_segment;
+  vnet_app_attach_args_t _a, *a = &_a;
+  u8 fd_flags = 0, ctrl_thread;
+  vl_api_registration_t *reg;
+  svm_msg_q_t *ctrl_mq;
+
+  reg = vl_api_client_index_to_registration (mp->client_index);
+  if (!reg)
+    return;
+
+  if (session_main_is_enabled () == 0)
+    {
+      rv = VNET_API_ERROR_FEATURE_DISABLED;
+      goto done;
+    }
+
+  STATIC_ASSERT (sizeof (u64) * APP_OPTIONS_N_OPTIONS <=
+                sizeof (mp->options),
+                "Out of options, fix api message definition");
+
+  clib_memset (a, 0, sizeof (*a));
+  a->api_client_index = mp->client_index;
+  a->options = mp->options;
+  a->session_cb_vft = &session_mq_cb_vft;
+  if (mp->namespace_id_len > 64)
+    {
+      rv = VNET_API_ERROR_INVALID_VALUE;
+      goto done;
+    }
+
+  if (mp->namespace_id_len)
+    {
+      vec_validate (a->namespace_id, mp->namespace_id_len - 1);
+      clib_memcpy_fast (a->namespace_id, mp->namespace_id,
+                       mp->namespace_id_len);
+    }
+
+  if ((rv = vnet_application_attach (a)))
+    {
+      clib_warning ("attach returned: %d", rv);
+      vec_free (a->namespace_id);
+      goto done;
+    }
+  vec_free (a->namespace_id);
+
+  /* Send event queues segment */
+  if ((evt_q_segment = session_main_get_evt_q_segment ()))
+    {
+      fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+      fds[n_fds] = evt_q_segment->fd;
+      n_fds += 1;
+    }
+  /* Send fifo segment fd if needed */
+  if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
+    {
+      fd_flags |= SESSION_FD_F_MEMFD_SEGMENT;
+      fds[n_fds] = a->segment->fd;
+      n_fds += 1;
+    }
+  if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
+    {
+      fd_flags |= SESSION_FD_F_MQ_EVENTFD;
+      fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q);
+      n_fds += 1;
+    }
+
+done:
+
+  ctrl_thread = vlib_num_workers ()? 1 : 0;
+  ctrl_mq = session_main_get_vpp_event_queue (ctrl_thread);
+  /* *INDENT-OFF* */
+  REPLY_MACRO2 (VL_API_APP_ATTACH_REPLY, ({
+    if (!rv)
+      {
+       segp = a->segment;
+       rmp->app_index = clib_host_to_net_u32 (a->app_index);
+       rmp->app_mq = pointer_to_uword (a->app_evt_q);
+       rmp->vpp_ctrl_mq = pointer_to_uword (ctrl_mq);
+       rmp->vpp_ctrl_mq_thread = ctrl_thread;
+       rmp->n_fds = n_fds;
+       rmp->fd_flags = fd_flags;
+       if (vec_len (segp->name))
+         {
+           memcpy (rmp->segment_name, segp->name, vec_len (segp->name));
+           rmp->segment_name_length = vec_len (segp->name);
+         }
+       rmp->segment_size = segp->ssvm_size;
+       rmp->segment_handle = clib_host_to_net_u64 (a->segment_handle);
+      }
+  }));
+  /* *INDENT-ON* */
+
+  if (n_fds)
+    session_send_fds (reg, fds, n_fds);
+}
+
+/* ### WILL BE DEPRECATED POST 20.01 ### */
 static void
 vl_api_application_detach_t_handler (vl_api_application_detach_t * mp)
 {
@@ -589,6 +715,7 @@ done:
   REPLY_MACRO (VL_API_APPLICATION_DETACH_REPLY);
 }
 
+/* ### WILL BE DEPRECATED POST 20.01 ### */
 static void
 vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp)
 {
@@ -629,6 +756,7 @@ done:
     }
 }
 
+/* ### WILL BE DEPRECATED POST 20.01 ### */
 static void
 vl_api_unbind_uri_t_handler (vl_api_unbind_uri_t * mp)
 {
@@ -660,6 +788,7 @@ done:
   REPLY_MACRO (VL_API_UNBIND_URI_REPLY);
 }
 
+/* ### WILL BE DEPRECATED POST 20.01 ### */
 static void
 vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
 {
@@ -701,6 +830,7 @@ done:
   REPLY_MACRO (VL_API_CONNECT_URI_REPLY);
 }
 
+/* ### WILL BE DEPRECATED POST 20.01 ### */
 static void
 vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
 {
@@ -731,6 +861,7 @@ done:
   REPLY_MACRO2 (VL_API_DISCONNECT_SESSION_REPLY, rmp->handle = mp->handle);
 }
 
+/* ### WILL BE DEPRECATED POST 20.01 ### */
 static void
 vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
                                           mp)
@@ -762,6 +893,7 @@ vl_api_map_another_segment_reply_t_handler (vl_api_map_another_segment_reply_t
   clib_warning ("not implemented");
 }
 
+/* ### WILL BE DEPRECATED POST 20.01 ### */
 static void
 vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
 {
@@ -811,6 +943,7 @@ done:
     }
 }
 
+/* ### WILL BE DEPRECATED POST 20.01 ### */
 static void
 vl_api_unbind_sock_t_handler (vl_api_unbind_sock_t * mp)
 {
@@ -839,35 +972,14 @@ vl_api_unbind_sock_t_handler (vl_api_unbind_sock_t * mp)
 done:
   REPLY_MACRO (VL_API_UNBIND_SOCK_REPLY);
 
-  /*
-   * Send reply over msg queue
-   */
-  svm_msg_q_msg_t _msg, *msg = &_msg;
-  session_unlisten_reply_msg_t *ump;
-  svm_msg_q_t *app_mq;
-  session_event_t *evt;
-
-  if (!app)
-    return;
-
   app_wrk = application_get_worker (app, a->wrk_map_index);
   if (!app_wrk)
     return;
 
-  app_mq = app_wrk->event_queue;
-  if (mq_try_lock_and_alloc_msg (app_mq, msg))
-    return;
-
-  evt = svm_msg_q_msg_data (app_mq, msg);
-  clib_memset (evt, 0, sizeof (*evt));
-  evt->event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY;
-  ump = (session_unlisten_reply_msg_t *) evt->data;
-  ump->context = mp->context;
-  ump->handle = mp->handle;
-  ump->retval = rv;
-  svm_msg_q_add_and_unlock (app_mq, msg);
+  mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
 }
 
+/* ### WILL BE DEPRECATED POST 20.01 ### */
 static void
 vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp)
 {
index 1d662a2..ad18637 100644 (file)
 #include <vnet/session/session_debug.h>
 #include <svm/queue.h>
 
-static void session_mq_accepted_reply_handler (void *data);
+#define app_check_thread_and_barrier(_fn, _arg)                                \
+  if (!vlib_thread_is_main_w_barrier ())                               \
+    {                                                                  \
+     vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));      \
+      return;                                                          \
+   }
 
 static void
-accepted_notify_cb (void *data, u32 data_len)
+session_mq_listen_handler (void *data)
 {
-  session_mq_accepted_reply_handler (data);
+  session_listen_msg_t *mp = (session_listen_msg_t *) data;
+  vnet_listen_args_t _a, *a = &_a;
+  app_worker_t *app_wrk;
+  application_t *app;
+  int rv;
+
+  app_check_thread_and_barrier (session_mq_listen_handler, mp);
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  clib_memset (a, 0, sizeof (*a));
+  a->sep.is_ip4 = mp->is_ip4;
+  clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
+  a->sep.port = mp->port;
+  a->sep.fib_index = mp->vrf;
+  a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
+  a->sep.transport_proto = mp->proto;
+  a->app_index = app->app_index;
+  a->wrk_map_index = mp->wrk_index;
+
+  if ((rv = vnet_listen (a)))
+    clib_warning ("listen returned: %d", rv);
+
+  app_wrk = application_get_worker (app, mp->wrk_index);
+  mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+  return;
+}
+
+static void
+session_mq_listen_uri_handler (void *data)
+{
+  session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
+  vnet_listen_args_t _a, *a = &_a;
+  app_worker_t *app_wrk;
+  application_t *app;
+  int rv;
+
+  app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  clib_memset (a, 0, sizeof (*a));
+  a->uri = (char *) mp->uri;
+  a->app_index = app->app_index;
+  rv = vnet_bind_uri (a);
+
+  app_wrk = application_get_worker (app, 0);
+  mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+}
+
+static void
+session_mq_connect_handler (void *data)
+{
+  session_connect_msg_t *mp = (session_connect_msg_t *) data;
+  vnet_connect_args_t _a, *a = &_a;
+  app_worker_t *app_wrk;
+  application_t *app;
+  int rv;
+
+  app_check_thread_and_barrier (session_mq_connect_handler, mp);
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  clib_memset (a, 0, sizeof (*a));
+  a->sep.is_ip4 = mp->is_ip4;
+  clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
+  a->sep.port = mp->port;
+  a->sep.transport_proto = mp->proto;
+  a->sep.peer.fib_index = mp->vrf;
+  a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
+  a->sep_ext.parent_handle = mp->parent_handle;
+  if (mp->hostname_len)
+    {
+      vec_validate (a->sep_ext.hostname, mp->hostname_len - 1);
+      clib_memcpy_fast (a->sep_ext.hostname, mp->hostname, mp->hostname_len);
+    }
+  a->api_context = mp->context;
+  a->app_index = app->app_index;
+  a->wrk_map_index = mp->wrk_index;
+
+  if ((rv = vnet_connect (a)))
+    {
+      clib_warning ("connect returned: %U", format_vnet_api_errno, rv);
+      app_wrk = application_get_worker (app, mp->wrk_index);
+      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0,
+                                   /* is_fail */ 1);
+    }
+
+  vec_free (a->sep_ext.hostname);
+}
+
+static void
+session_mq_connect_uri_handler (void *data)
+{
+  session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
+  vnet_connect_args_t _a, *a = &_a;
+  app_worker_t *app_wrk;
+  application_t *app;
+  int rv;
+
+  app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  clib_memset (a, 0, sizeof (*a));
+  a->uri = (char *) mp->uri;
+  a->api_context = mp->context;
+  a->app_index = app->app_index;
+  if ((rv = vnet_connect_uri (a)))
+    {
+      clib_warning ("connect_uri returned: %d", rv);
+      app_wrk = application_get_worker (app, 0 /* default wrk only */ );
+      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0,
+                                   /* is_fail */ 1);
+    }
+}
+
+static void
+session_mq_disconnect_handler (void *data)
+{
+  session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
+  vnet_disconnect_args_t _a, *a = &_a;
+  application_t *app;
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  a->app_index = app->app_index;
+  a->handle = mp->handle;
+  vnet_disconnect_session (a);
+}
+
+static void
+app_mq_detach_handler (void *data)
+{
+  session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
+  vnet_app_detach_args_t _a, *a = &_a;
+  application_t *app;
+
+  app_check_thread_and_barrier (app_mq_detach_handler, mp);
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  a->app_index = app->app_index;
+  a->api_client_index = mp->client_index;
+  vnet_application_detach (a);
+}
+
+static void
+session_mq_unlisten_handler (void *data)
+{
+  session_unlisten_msg_t *mp = (session_unlisten_msg_t *) data;
+  vnet_unlisten_args_t _a, *a = &_a;
+  app_worker_t *app_wrk;
+  application_t *app;
+  int rv;
+
+  app_check_thread_and_barrier (session_mq_unlisten_handler, mp);
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  clib_memset (a, 0, sizeof (*a));
+  a->app_index = app->app_index;
+  a->handle = mp->handle;
+  a->wrk_map_index = mp->wrk_index;
+  if ((rv = vnet_unlisten (a)))
+    clib_warning ("unlisten returned: %d", rv);
+
+  app_wrk = application_get_worker (app, a->wrk_map_index);
+  if (!app_wrk)
+    return;
+
+  mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
 }
 
 static void
@@ -56,8 +246,8 @@ session_mq_accepted_reply_handler (void *data)
   if (vlib_num_workers () && vlib_get_thread_index () != 0
       && session_thread_from_handle (mp->handle) == 0)
     {
-      vl_api_rpc_call_main_thread (accepted_notify_cb, data,
-                                  sizeof (session_accepted_reply_msg_t));
+      vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
+                                (u8 *) mp, sizeof (*mp));
       return;
     }
 
@@ -859,14 +1049,93 @@ session_event_get_session (session_event_t * e, u8 thread_index)
 }
 
 always_inline void
-session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
-                       session_evt_elt_t * elt, u32 thread_index,
-                       int *n_tx_packets)
+session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+  clib_llist_index_t ei;
+  void (*fp) (void *);
+  session_event_t *e;
+  session_t *s;
+
+  ei = clib_llist_entry_index (wrk->event_elts, elt);
+  e = &elt->evt;
+
+  switch (e->event_type)
+    {
+    case SESSION_CTRL_EVT_RPC:
+      fp = e->rpc_args.fp;
+      (*fp) (e->rpc_args.arg);
+      break;
+    case SESSION_CTRL_EVT_CLOSE:
+      s = session_get_from_handle_if_valid (e->session_handle);
+      if (PREDICT_FALSE (!s))
+       break;
+      session_transport_close (s);
+      break;
+    case SESSION_CTRL_EVT_RESET:
+      s = session_get_from_handle_if_valid (e->session_handle);
+      if (PREDICT_FALSE (!s))
+       break;
+      session_transport_reset (s);
+      break;
+    case SESSION_CTRL_EVT_LISTEN:
+      session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    case SESSION_CTRL_EVT_LISTEN_URI:
+      session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    case SESSION_CTRL_EVT_UNLISTEN:
+      session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    case SESSION_CTRL_EVT_CONNECT:
+      session_mq_connect_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    case SESSION_CTRL_EVT_CONNECT_URI:
+      session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    case SESSION_CTRL_EVT_DISCONNECT:
+      session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    case SESSION_CTRL_EVT_DISCONNECTED:
+      session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    case SESSION_CTRL_EVT_ACCEPTED_REPLY:
+      session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
+      session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
+                                                                   elt));
+      break;
+    case SESSION_CTRL_EVT_RESET_REPLY:
+      session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    case SESSION_CTRL_EVT_WORKER_UPDATE:
+      session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    case SESSION_CTRL_EVT_APP_DETACH:
+      app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
+      break;
+    default:
+      clib_warning ("unhandled event type %d", e->event_type);
+    }
+
+  /* Regrab elements in case pool moved */
+  elt = pool_elt_at_index (wrk->event_elts, ei);
+  if (!clib_llist_elt_is_linked (elt, evt_list))
+    {
+      if (e->event_type >= SESSION_CTRL_EVT_BOUND)
+       session_evt_ctrl_data_free (wrk, elt);
+      session_evt_elt_free (wrk, elt);
+    }
+}
+
+always_inline void
+session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
+                          session_evt_elt_t * elt, u32 thread_index,
+                          int *n_tx_packets)
 {
   session_main_t *smm = &session_main;
   app_worker_t *app_wrk;
   clib_llist_index_t ei;
-  void (*fp) (void *);
   session_event_t *e;
   session_t *s;
 
@@ -896,18 +1165,6 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
       transport_app_rx_evt (session_get_transport_proto (s),
                            s->connection_index, s->thread_index);
       break;
-    case SESSION_CTRL_EVT_CLOSE:
-      s = session_get_from_handle_if_valid (e->session_handle);
-      if (PREDICT_FALSE (!s))
-       break;
-      session_transport_close (s);
-      break;
-    case SESSION_CTRL_EVT_RESET:
-      s = session_get_from_handle_if_valid (e->session_handle);
-      if (PREDICT_FALSE (!s))
-       break;
-      session_transport_reset (s);
-      break;
     case SESSION_IO_EVT_BUILTIN_RX:
       s = session_event_get_session (e, thread_index);
       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
@@ -922,27 +1179,6 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
       if (PREDICT_TRUE (s != 0))
        session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
       break;
-    case SESSION_CTRL_EVT_RPC:
-      fp = e->rpc_args.fp;
-      (*fp) (e->rpc_args.arg);
-      break;
-    case SESSION_CTRL_EVT_DISCONNECTED:
-      session_mq_disconnected_handler (e->data);
-      break;
-    case SESSION_CTRL_EVT_ACCEPTED_REPLY:
-      session_mq_accepted_reply_handler (e->data);
-      break;
-    case SESSION_CTRL_EVT_CONNECTED_REPLY:
-      break;
-    case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
-      session_mq_disconnected_reply_handler (e->data);
-      break;
-    case SESSION_CTRL_EVT_RESET_REPLY:
-      session_mq_reset_reply_handler (e->data);
-      break;
-    case SESSION_CTRL_EVT_WORKER_UPDATE:
-      session_mq_worker_update_handler (e->data);
-      break;
     default:
       clib_warning ("unhandled event type %d", e->event_type);
     }
@@ -953,6 +1189,43 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
     session_evt_elt_free (wrk, elt);
 }
 
+/* *INDENT-OFF* */
+static const u32 session_evt_msg_sizes[] = {
+#define _(symc, sym)                                                   \
+  [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
+  foreach_session_ctrl_evt
+#undef _
+};
+/* *INDENT-ON* */
+
+always_inline void
+session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
+{
+  session_evt_elt_t *elt;
+
+  if (evt->event_type >= SESSION_CTRL_EVT_RPC)
+    {
+      elt = session_evt_alloc_ctrl (wrk);
+      if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
+       {
+         elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
+         elt->evt.event_type = evt->event_type;
+         clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
+                           session_evt_msg_sizes[evt->event_type]);
+       }
+      else
+       {
+         /* Internal control events fit into io events footprint */
+         clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+       }
+    }
+  else
+    {
+      elt = session_evt_alloc_new (wrk);
+      clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+    }
+}
+
 static uword
 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
@@ -990,14 +1263,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
        {
          svm_msg_q_sub_w_lock (mq, msg);
          evt = svm_msg_q_msg_data (mq, msg);
-         if (evt->event_type > SESSION_IO_EVT_BUILTIN_TX)
-           elt = session_evt_alloc_ctrl (wrk);
-         else
-           elt = session_evt_alloc_new (wrk);
-         /* Works because reply messages are smaller than a session evt.
-          * If we ever need to support bigger messages this needs to be
-          * fixed */
-         clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+         session_evt_add_to_list (wrk, evt);
          svm_msg_q_free_msg (mq, msg);
        }
       svm_msg_q_unlock (mq);
@@ -1012,7 +1278,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   /* *INDENT-OFF* */
   clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
     clib_llist_remove (wrk->event_elts, evt_list, elt);
-    session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+    session_event_dispatch_ctrl (wrk, elt);
   }));
   /* *INDENT-ON* */
 
@@ -1037,7 +1303,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
        continue;
       }
 
-    session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+    session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets);
   }));
   /* *INDENT-ON* */
 
@@ -1054,7 +1320,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
       clib_llist_pop_first (wrk->event_elts, evt_list, elt, old_he);
       ei = clib_llist_entry_index (wrk->event_elts, elt);
-      session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+      session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets);
 
       old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
       if (n_tx_packets >= VLIB_FRAME_SIZE || ei == old_ti)
index f9472ba..52a79e3 100644 (file)
@@ -290,21 +290,48 @@ typedef enum
   SESSION_IO_EVT_BUILTIN_TX,
   SESSION_CTRL_EVT_RPC,
   SESSION_CTRL_EVT_CLOSE,
+  SESSION_CTRL_EVT_RESET,
   SESSION_CTRL_EVT_BOUND,
   SESSION_CTRL_EVT_UNLISTEN_REPLY,
   SESSION_CTRL_EVT_ACCEPTED,
   SESSION_CTRL_EVT_ACCEPTED_REPLY,
   SESSION_CTRL_EVT_CONNECTED,
-  SESSION_CTRL_EVT_CONNECTED_REPLY,
   SESSION_CTRL_EVT_DISCONNECTED,
   SESSION_CTRL_EVT_DISCONNECTED_REPLY,
-  SESSION_CTRL_EVT_RESET,
   SESSION_CTRL_EVT_RESET_REPLY,
   SESSION_CTRL_EVT_REQ_WORKER_UPDATE,
   SESSION_CTRL_EVT_WORKER_UPDATE,
   SESSION_CTRL_EVT_WORKER_UPDATE_REPLY,
+  SESSION_CTRL_EVT_DISCONNECT,
+  SESSION_CTRL_EVT_CONNECT,
+  SESSION_CTRL_EVT_CONNECT_URI,
+  SESSION_CTRL_EVT_LISTEN,
+  SESSION_CTRL_EVT_LISTEN_URI,
+  SESSION_CTRL_EVT_UNLISTEN,
+  SESSION_CTRL_EVT_APP_DETACH,
 } session_evt_type_t;
 
+#define foreach_session_ctrl_evt                               \
+  _(LISTEN, listen)                                            \
+  _(LISTEN_URI, listen_uri)                                    \
+  _(BOUND, bound)                                              \
+  _(UNLISTEN, unlisten)                                                \
+  _(UNLISTEN_REPLY, unlisten_reply)                            \
+  _(ACCEPTED, accepted)                                                \
+  _(ACCEPTED_REPLY, accepted_reply)                            \
+  _(CONNECT, connect)                                          \
+  _(CONNECT_URI, connect_uri)                                  \
+  _(CONNECTED, connected)                                      \
+  _(DISCONNECT, disconnect)                                    \
+  _(DISCONNECTED, disconnected)                                        \
+  _(DISCONNECTED_REPLY, disconnected_reply)                    \
+  _(RESET_REPLY, reset_reply)                                  \
+  _(REQ_WORKER_UPDATE, req_worker_update)                      \
+  _(WORKER_UPDATE, worker_update)                              \
+  _(WORKER_UPDATE_REPLY, worker_update_reply)                  \
+  _(APP_DETACH, app_detach)                                    \
+
+
 /* Deprecated and will be removed. Use types above */
 #define FIFO_EVENT_APP_RX SESSION_IO_EVT_RX
 #define FIFO_EVENT_APP_TX SESSION_IO_EVT_TX
@@ -334,6 +361,7 @@ typedef struct
     u32 session_index;
     session_handle_t session_handle;
     session_rpc_args_t rpc_args;
+    u32 ctrl_data_index;
     struct
     {
       u8 data[0];