session: send ctrl msg over mq
[vpp.git] / src / vnet / session / session_api.c
index f9fddea..724aff1 100755 (executable)
@@ -363,6 +363,207 @@ static session_cb_vft_t session_cb_vft = {
   .del_segment_callback = send_del_segment_callback,
 };
 
+static int
+mq_send_session_accepted_cb (stream_session_t * s)
+{
+  application_t *app = application_get (s->app_index);
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  svm_msg_q_t *vpp_queue, *app_mq;
+  transport_proto_vft_t *tp_vft;
+  transport_connection_t *tc;
+  stream_session_t *listener;
+  session_accepted_msg_t *mp;
+  session_event_t *evt;
+
+  app_mq = app->event_queue;
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_mq);
+
+  evt = svm_msg_q_msg_data (app_mq, msg);
+  memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_ACCEPTED;
+  mp = (session_accepted_msg_t *) evt->data;
+  mp->context = app->index;
+  mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
+  mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
+
+  if (session_has_transport (s))
+    {
+      listener = listen_session_get (s->listener_index);
+      mp->listener_handle = listen_session_get_handle (listener);
+      if (application_is_proxy (app))
+       {
+         listener =
+           application_first_listener (app, session_get_fib_proto (s),
+                                       session_get_transport_proto (s));
+         if (listener)
+           mp->listener_handle = listen_session_get_handle (listener);
+       }
+      vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
+      mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+      mp->handle = session_handle (s);
+      tp_vft = transport_protocol_get_vft (session_get_transport_proto (s));
+      tc = tp_vft->get_connection (s->connection_index, s->thread_index);
+      mp->port = tc->rmt_port;
+      mp->is_ip4 = tc->is_ip4;
+      clib_memcpy (&mp->ip, &tc->rmt_ip, sizeof (tc->rmt_ip));
+    }
+  else
+    {
+      local_session_t *ls = (local_session_t *) s;
+      local_session_t *ll;
+      if (application_local_session_listener_has_transport (ls))
+       {
+         listener = listen_session_get (ls->listener_index);
+         mp->listener_handle = listen_session_get_handle (listener);
+         mp->is_ip4 = session_type_is_ip4 (listener->session_type);
+       }
+      else
+       {
+         ll = application_get_local_listen_session (app, ls->listener_index);
+         if (ll->transport_listener_index != ~0)
+           {
+             listener = listen_session_get (ll->transport_listener_index);
+             mp->listener_handle = listen_session_get_handle (listener);
+           }
+         else
+           {
+             mp->listener_handle = application_local_session_handle (ll);
+           }
+         mp->is_ip4 = session_type_is_ip4 (ll->listener_session_type);
+       }
+      mp->handle = application_local_session_handle (ls);
+      mp->port = ls->port;
+      mp->vpp_event_queue_address = ls->client_evt_q;
+      mp->server_event_queue_address = ls->server_evt_q;
+    }
+  svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
+
+  return 0;
+}
+
+static void
+mq_send_session_disconnected_cb (stream_session_t * s)
+{
+  application_t *app = application_get (s->app_index);
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_disconnected_msg_t *mp;
+  svm_msg_q_t *app_mq;
+  session_event_t *evt;
+
+  app_mq = app->event_queue;
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_mq);
+  evt = svm_msg_q_msg_data (app_mq, msg);
+  memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+  mp = (session_disconnected_msg_t *) evt->data;
+  mp->handle = session_handle (s);
+  mp->context = app->api_client_index;
+  svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
+}
+
+static void
+mq_send_session_reset_cb (stream_session_t * s)
+{
+  application_t *app = application_get (s->app_index);
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_reset_msg_t *mp;
+  svm_msg_q_t *app_mq;
+  session_event_t *evt;
+
+  app_mq = app->event_queue;
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_mq);
+  evt = svm_msg_q_msg_data (app_mq, msg);
+  memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_RESET;
+  mp = (session_reset_msg_t *) evt->data;
+  mp->handle = session_handle (s);
+  svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
+}
+
+static int
+mq_send_session_connected_cb (u32 app_index, u32 api_context,
+                             stream_session_t * s, u8 is_fail)
+{
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_connected_msg_t *mp;
+  svm_msg_q_t *vpp_mq, *app_mq;
+  transport_connection_t *tc;
+  session_event_t *evt;
+  application_t *app;
+
+  app = application_get (app_index);
+  app_mq = app->event_queue;
+  if (!app_mq)
+    {
+      clib_warning ("app %u with api index: %u not attached", app->index,
+                   app->api_client_index);
+      return -1;
+    }
+
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_mq);
+  evt = svm_msg_q_msg_data (app_mq, msg);
+  memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_CONNECTED;
+  mp = (session_connected_msg_t *) evt->data;
+  mp->context = api_context;
+
+  if (is_fail)
+    goto done;
+
+  if (session_has_transport (s))
+    {
+      tc = session_get_transport (s);
+      if (!tc)
+       {
+         is_fail = 1;
+         goto done;
+       }
+
+      vpp_mq = session_manager_get_vpp_event_queue (s->thread_index);
+      mp->handle = session_handle (s);
+      mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
+      clib_memcpy (mp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip));
+      mp->is_ip4 = tc->is_ip4;
+      mp->lcl_port = tc->lcl_port;
+      mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
+    }
+  else
+    {
+      local_session_t *ls = (local_session_t *) s;
+      mp->handle = application_local_session_handle (ls);
+      mp->lcl_port = ls->port;
+      mp->vpp_event_queue_address = ls->server_evt_q;
+      mp->client_event_queue_address = ls->client_evt_q;
+      mp->server_rx_fifo = pointer_to_uword (s->server_tx_fifo);
+      mp->server_tx_fifo = pointer_to_uword (s->server_rx_fifo);
+    }
+
+done:
+  mp->retval = is_fail ?
+    clib_host_to_net_u32 (VNET_API_ERROR_SESSION_CONNECT) : 0;
+
+  svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
+  return 0;
+}
+
+static session_cb_vft_t session_mq_cb_vft = {
+  .session_accept_callback = mq_send_session_accepted_cb,
+  .session_disconnect_callback = mq_send_session_disconnected_cb,
+  .session_connected_callback = mq_send_session_connected_cb,
+  .session_reset_callback = mq_send_session_reset_cb,
+  .add_segment_callback = send_add_segment_callback,
+  .del_segment_callback = send_del_segment_callback,
+};
+
 static void
 vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
 {
@@ -401,7 +602,11 @@ vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
   memset (a, 0, sizeof (*a));
   a->api_client_index = mp->client_index;
   a->options = mp->options;
-  a->session_cb_vft = &session_cb_vft;
+
+  if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS)
+    a->session_cb_vft = &session_mq_cb_vft;
+  else
+    a->session_cb_vft = &session_cb_vft;
 
   if (mp->namespace_id_len > 64)
     {