session: set fifo's session index before vcl attach
[vpp.git] / src / vnet / session / session_api.c
index 3ca3c2a..f54a46b 100644 (file)
@@ -143,7 +143,7 @@ mq_send_session_accepted_cb (session_t * s)
   m.segment_handle = session_segment_handle (s);
   m.flags = s->flags;
 
-  eq_seg = session_main_get_evt_q_segment ();
+  eq_seg = application_get_rx_mqs_segment (app);
 
   if (session_has_transport (s))
     {
@@ -271,6 +271,9 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
   fifo_segment_t *eq_seg;
   app_worker_t *app_wrk;
   session_event_t *evt;
+  application_t *app;
+
+  app_wrk = app_worker_get (app_wrk_index);
 
   m.context = api_context;
   m.retval = err;
@@ -278,7 +281,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
   if (err)
     goto snd_msg;
 
-  eq_seg = session_main_get_evt_q_segment ();
+  app = application_get (app_wrk->app_index);
+  eq_seg = application_get_rx_mqs_segment (app);
 
   if (session_has_transport (s))
     {
@@ -299,6 +303,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
       m.server_rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
       m.server_tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
       m.segment_handle = session_segment_handle (s);
+      s->rx_fifo->shr->client_session_index = api_context;
+      s->tx_fifo->shr->client_session_index = api_context;
     }
   else
     {
@@ -322,7 +328,6 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
 
 snd_msg:
 
-  app_wrk = app_worker_get (app_wrk_index);
   app_mq = app_wrk->event_queue;
 
   if (mq_try_lock_and_alloc_msg (app_mq, msg))
@@ -348,9 +353,12 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
   fifo_segment_t *eq_seg;
   app_worker_t *app_wrk;
   session_event_t *evt;
+  application_t *app;
   app_listener_t *al;
   session_t *ls = 0;
 
+  app_wrk = app_worker_get (app_wrk_index);
+
   m.context = api_context;
   m.retval = rv;
 
@@ -368,8 +376,8 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
   m.lcl_port = tep.port;
   m.lcl_is_ip4 = tep.is_ip4;
   clib_memcpy_fast (m.lcl_ip, &tep.ip, sizeof (tep.ip));
-
-  eq_seg = session_main_get_evt_q_segment ();
+  app = application_get (app_wrk->app_index);
+  eq_seg = application_get_rx_mqs_segment (app);
   m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, ls->thread_index);
 
   if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL &&
@@ -382,7 +390,6 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
 
 snd_msg:
 
-  app_wrk = app_worker_get (app_wrk_index);
   app_mq = app_wrk->event_queue;
 
   if (mq_try_lock_and_alloc_msg (app_mq, msg))
@@ -429,10 +436,14 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
   app_worker_t *app_wrk;
   session_event_t *evt;
   svm_msg_q_t *app_mq;
+  application_t *app;
   u32 thread_index;
 
   thread_index = session_thread_from_handle (new_sh);
-  eq_seg = session_main_get_evt_q_segment ();
+  app_wrk = app_worker_get (s->app_wrk_index);
+  app_mq = app_wrk->event_queue;
+  app = application_get (app_wrk->app_index);
+  eq_seg = application_get_rx_mqs_segment (app);
 
   m.handle = session_handle (s);
   m.new_handle = new_sh;
@@ -440,8 +451,6 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
   m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, thread_index);
   m.segment_handle = SESSION_INVALID_HANDLE;
 
-  app_wrk = app_worker_get (s->app_wrk_index);
-  app_mq = app_wrk->event_queue;
   if (mq_try_lock_and_alloc_msg (app_mq, msg))
     return;
 
@@ -604,17 +613,20 @@ vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
 static void
 vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
 {
-  int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
-  vl_api_app_attach_reply_t *rmp;
-  fifo_segment_t *segp, *evt_q_segment = 0;
+  int rv = 0, *fds = 0, n_fds = 0, n_workers, i;
+  fifo_segment_t *segp, *rx_mqs_seg = 0;
   vnet_app_attach_args_t _a, *a = &_a;
+  vl_api_app_attach_reply_t *rmp;
   u8 fd_flags = 0, ctrl_thread;
   vl_api_registration_t *reg;
+  svm_msg_q_t *rx_mq;
+  application_t *app;
 
   reg = vl_api_client_index_to_registration (mp->client_index);
   if (!reg)
     return;
 
+  n_workers = vlib_num_workers ();
   if (!session_main_is_enabled () || appns_sapi_enabled ())
     {
       rv = VNET_API_ERROR_FEATURE_DISABLED;
@@ -645,13 +657,16 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
     }
   vec_free (a->namespace_id);
 
-  /* Send event queues segment */
-  if ((evt_q_segment = session_main_get_evt_q_segment ()))
-    {
-      fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
-      fds[n_fds] = evt_q_segment->ssvm.fd;
-      n_fds += 1;
-    }
+  vec_validate (fds, 3 /* segs + tx evtfd */ + n_workers);
+
+  /* Send rx mqs segment */
+  app = application_get (a->app_index);
+  rx_mqs_seg = application_get_rx_mqs_segment (app);
+
+  fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+  fds[n_fds] = rx_mqs_seg->ssvm.fd;
+  n_fds += 1;
+
   /* Send fifo segment fd if needed */
   if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
     {
@@ -666,17 +681,27 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
       n_fds += 1;
     }
 
+  if (application_use_private_rx_mqs ())
+    {
+      fd_flags |= SESSION_FD_F_VPP_MQ_EVENTFD;
+      for (i = 0; i < n_workers + 1; i++)
+       {
+         rx_mq = application_rx_mq_get (app, i);
+         fds[n_fds] = svm_msg_q_get_eventfd (rx_mq);
+         n_fds += 1;
+       }
+    }
+
 done:
   /* *INDENT-OFF* */
   REPLY_MACRO2 (VL_API_APP_ATTACH_REPLY, ({
     if (!rv)
       {
-       ctrl_thread = vlib_num_workers () ? 1 : 0;
+       ctrl_thread = n_workers ? 1 : 0;
        segp = (fifo_segment_t *) a->segment;
        rmp->app_index = clib_host_to_net_u32 (a->app_index);
        rmp->app_mq = fifo_segment_msg_q_offset (segp, 0);
-       rmp->vpp_ctrl_mq =
-         fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
+       rmp->vpp_ctrl_mq = fifo_segment_msg_q_offset (rx_mqs_seg, ctrl_thread);
        rmp->vpp_ctrl_mq_thread = ctrl_thread;
        rmp->n_fds = n_fds;
        rmp->fd_flags = fd_flags;
@@ -692,6 +717,7 @@ done:
 
   if (n_fds)
     session_send_fds (reg, fds, n_fds);
+  vec_free (fds);
 }
 
 static void
@@ -1268,15 +1294,16 @@ static void
 session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
                            app_sapi_attach_msg_t * mp)
 {
-  int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
+  int rv = 0, *fds = 0, n_fds = 0, i, n_workers;
   vnet_app_attach_args_t _a, *a = &_a;
   app_sapi_attach_reply_msg_t *rmp;
-  fifo_segment_t *evt_q_segment;
   u8 fd_flags = 0, ctrl_thread;
   app_ns_api_handle_t *handle;
+  fifo_segment_t *rx_mqs_seg;
   app_sapi_msg_t msg = { 0 };
   app_worker_t *app_wrk;
   application_t *app;
+  svm_msg_q_t *rx_mq;
 
   /* Make sure name is null terminated */
   mp->name[63] = 0;
@@ -1295,13 +1322,17 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
       goto done;
     }
 
+  n_workers = vlib_num_workers ();
+  vec_validate (fds, 3 /* segs + tx evtfd */ + n_workers);
+
   /* Send event queues segment */
-  if ((evt_q_segment = session_main_get_evt_q_segment ()))
-    {
-      fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
-      fds[n_fds] = evt_q_segment->ssvm.fd;
-      n_fds += 1;
-    }
+  app = application_get (a->app_index);
+  rx_mqs_seg = application_get_rx_mqs_segment (app);
+
+  fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+  fds[n_fds] = rx_mqs_seg->ssvm.fd;
+  n_fds += 1;
+
   /* Send fifo segment fd if needed */
   if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
     {
@@ -1316,6 +1347,17 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
       n_fds += 1;
     }
 
+  if (application_use_private_rx_mqs ())
+    {
+      fd_flags |= SESSION_FD_F_VPP_MQ_EVENTFD;
+      for (i = 0; i < n_workers + 1; i++)
+       {
+         rx_mq = application_rx_mq_get (app, i);
+         fds[n_fds] = svm_msg_q_get_eventfd (rx_mq);
+         n_fds += 1;
+       }
+    }
+
 done:
 
   msg.type = APP_SAPI_MSG_TYPE_ATTACH_REPLY;
@@ -1323,12 +1365,11 @@ done:
   rmp->retval = rv;
   if (!rv)
     {
-      ctrl_thread = vlib_num_workers ()? 1 : 0;
+      ctrl_thread = n_workers ? 1 : 0;
       rmp->app_index = a->app_index;
       rmp->app_mq =
        fifo_segment_msg_q_offset ((fifo_segment_t *) a->segment, 0);
-      rmp->vpp_ctrl_mq =
-       fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
+      rmp->vpp_ctrl_mq = fifo_segment_msg_q_offset (rx_mqs_seg, ctrl_thread);
       rmp->vpp_ctrl_mq_thread = ctrl_thread;
       rmp->n_fds = n_fds;
       rmp->fd_flags = fd_flags;
@@ -1339,13 +1380,13 @@ done:
 
       /* Update app index for socket */
       handle = (app_ns_api_handle_t *) & cs->private_data;
-      app = application_get (a->app_index);
       app_wrk = application_get_worker (app, 0);
       handle->aah_app_wrk_index = app_wrk->wrk_index;
     }
 
   clib_socket_sendmsg (cs, &msg, sizeof (msg), fds, n_fds);
   vec_free (a->name);
+  vec_free (fds);
 }
 
 static void