svm session vcl: per app rx message queues 64/30864/37
authorFlorin Coras <fcoras@cisco.com>
Fri, 15 Jan 2021 21:49:33 +0000 (13:49 -0800)
committerDave Barach <openvpp@barachs.net>
Mon, 29 Mar 2021 20:20:03 +0000 (20:20 +0000)
Add option to use per app private segments for app to vpp message
queues, as opposed to exposing internal message queues segment.

When so configured, internal message queues are still polled by the
session queue node but external app message queues are handled by a new
input node (appsl-rx-mqs-input) that runs in interrupt state. Signaling
of the node, when mqs receive new messages, is done through eventfds
epolled by worker epoll input nodes.

Type: feature

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Iffe8ce5a9944a56a14e6d0f492a850cb9e392d16

src/svm/message_queue.c
src/vcl/test/test_vcl.py
src/vcl/vcl_sapi.c
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_api.c
src/vnet/session/session_debug.h
src/vnet/session/session_node.c
src/vnet/session/session_types.h

index 5c04b19..e08ba06 100644 (file)
@@ -482,7 +482,7 @@ int
 svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
 {
   int fd;
-  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+  if ((fd = eventfd (0, 0)) < 0)
     return -1;
   svm_msg_q_set_eventfd (mq, fd);
   return 0;
index f5a5beb..80b9f2f 100644 (file)
@@ -281,6 +281,7 @@ class LDPCutThruTestCase(VCLTestCase):
 
     def show_commands_at_teardown(self):
         self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.logger.debug(self.vapi.cli("show app mq"))
 
     @unittest.skipUnless(running_extended_tests, "part of extended tests")
     def test_ldp_cut_thru_echo(self):
@@ -351,6 +352,7 @@ class VCLCutThruTestCase(VCLTestCase):
 
     def show_commands_at_teardown(self):
         self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.logger.debug(self.vapi.cli("show app mq"))
 
     def test_vcl_cut_thru_echo(self):
         """ run VCL cut thru echo test """
@@ -406,6 +408,7 @@ class VCLThruHostStackEcho(VCLTestCase):
     def show_commands_at_teardown(self):
         self.logger.debug(self.vapi.cli("show app server"))
         self.logger.debug(self.vapi.cli("show session verbose"))
+        self.logger.debug(self.vapi.cli("show app mq"))
 
 
 class VCLThruHostStackTLS(VCLTestCase):
@@ -444,6 +447,7 @@ class VCLThruHostStackTLS(VCLTestCase):
     def show_commands_at_teardown(self):
         self.logger.debug(self.vapi.cli("show app server"))
         self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.logger.debug(self.vapi.cli("show app mq"))
 
 
 class VCLThruHostStackBidirNsock(VCLTestCase):
@@ -476,6 +480,7 @@ class VCLThruHostStackBidirNsock(VCLTestCase):
 
     def show_commands_at_teardown(self):
         self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.logger.debug(self.vapi.cli("show app mq"))
 
     def test_vcl_thru_host_stack_bi_dir_nsock(self):
         """ run VCL thru host stack bi-directional (multiple sockets) test """
@@ -517,6 +522,7 @@ class LDPThruHostStackBidirNsock(VCLTestCase):
 
     def show_commands_at_teardown(self):
         self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.logger.debug(self.vapi.cli("show app mq"))
 
     def test_ldp_thru_host_stack_bi_dir_nsock(self):
         """ run LDP thru host stack bi-directional (multiple sockets) test """
@@ -632,6 +638,7 @@ class LDPThruHostStackIperf(VCLTestCase):
 
     def show_commands_at_teardown(self):
         self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.logger.debug(self.vapi.cli("show app mq"))
 
     @unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
     def test_ldp_thru_host_stack_iperf3(self):
@@ -668,6 +675,7 @@ class LDPThruHostStackIperfUdp(VCLTestCase):
 
     def show_commands_at_teardown(self):
         self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.logger.debug(self.vapi.cli("show app mq"))
 
     @unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
     def test_ldp_thru_host_stack_iperf3_udp(self):
@@ -689,6 +697,10 @@ class LDPIpv6CutThruTestCase(VCLTestCase):
     def tearDownClass(cls):
         super(LDPIpv6CutThruTestCase, cls).tearDownClass()
 
+    def show_commands_at_teardown(self):
+        self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.logger.debug(self.vapi.cli("show app mq"))
+
     def setUp(self):
         super(LDPIpv6CutThruTestCase, self).setUp()
 
@@ -765,6 +777,10 @@ class VCLIpv6CutThruTestCase(VCLTestCase):
     def tearDownClass(cls):
         super(VCLIpv6CutThruTestCase, cls).tearDownClass()
 
+    def show_commands_at_teardown(self):
+        self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.logger.debug(self.vapi.cli("show app mq"))
+
     def setUp(self):
         super(VCLIpv6CutThruTestCase, self).setUp()
 
@@ -789,6 +805,10 @@ class VCLIpv6CutThruTestCase(VCLTestCase):
         super(VCLIpv6CutThruTestCase, self).tearDown()
         self.cut_thru_tear_down()
 
+    def show_commands_at_teardown(self):
+        self.logger.debug(self.vapi.cli("show session verbose 2"))
+        self.logger.debug(self.vapi.cli("show app mq"))
+
     def test_vcl_ipv6_cut_thru_echo(self):
         """ run VCL IPv6 cut thru echo test """
 
index 5258722..14401da 100644 (file)
@@ -94,7 +94,6 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
   vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0), mp->vpp_ctrl_mq,
                         mp->vpp_ctrl_mq_thread, &wrk->ctrl_mq);
   vcm->ctrl_mq = wrk->ctrl_mq;
-
   vcm->app_index = mp->app_index;
 
   return 0;
@@ -156,7 +155,7 @@ vcl_sapi_attach (void)
   app_sapi_msg_t _rmp, *rmp = &_rmp;
   clib_error_t *err;
   clib_socket_t *cs;
-  int fds[SESSION_N_FD_TYPE];
+  int fds[32];
 
   /*
    * Init client socket and send attach
index eb8a716..b055ab4 100644 (file)
@@ -412,6 +412,262 @@ application_lookup_name (const u8 * name)
   return 0;
 }
 
+void
+appsl_pending_rx_mqs_add_tail (appsl_wrk_t *aw, app_rx_mq_elt_t *elt)
+{
+  app_rx_mq_elt_t *head;
+
+  if (!aw->pending_rx_mqs)
+    {
+      elt->next = elt->prev = elt;
+      aw->pending_rx_mqs = elt;
+      return;
+    }
+
+  head = aw->pending_rx_mqs;
+
+  ASSERT (head != elt);
+
+  elt->prev = head->prev;
+  elt->next = head;
+
+  head->prev->next = elt;
+  head->prev = elt;
+}
+
+void
+appsl_pending_rx_mqs_del (appsl_wrk_t *aw, app_rx_mq_elt_t *elt)
+{
+  if (elt->next == elt)
+    {
+      elt->next = elt->prev = 0;
+      aw->pending_rx_mqs = 0;
+      return;
+    }
+
+  if (elt == aw->pending_rx_mqs)
+    aw->pending_rx_mqs = elt->next;
+
+  elt->next->prev = elt->prev;
+  elt->prev->next = elt->next;
+  elt->next = elt->prev = 0;
+}
+
+vlib_node_registration_t appsl_rx_mqs_input_node;
+
+VLIB_NODE_FN (appsl_rx_mqs_input_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+  u32 thread_index = vm->thread_index, n_msgs = 0;
+  app_rx_mq_elt_t *elt, *next;
+  app_main_t *am = &app_main;
+  session_worker_t *wrk;
+  int __clib_unused rv;
+  appsl_wrk_t *aw;
+  u64 buf;
+
+  aw = &am->wrk[thread_index];
+  elt = aw->pending_rx_mqs;
+  if (!elt)
+    return 0;
+
+  wrk = session_main_get_worker (thread_index);
+
+  do
+    {
+      if (!(elt->flags & APP_RX_MQ_F_POSTPONED))
+       rv = read (svm_msg_q_get_eventfd (elt->mq), &buf, sizeof (buf));
+      n_msgs += session_wrk_handle_mq (wrk, elt->mq);
+
+      next = elt->next;
+      appsl_pending_rx_mqs_del (aw, elt);
+      if (!svm_msg_q_is_empty (elt->mq))
+       {
+         elt->flags |= APP_RX_MQ_F_POSTPONED;
+         appsl_pending_rx_mqs_add_tail (aw, elt);
+       }
+      else
+       {
+         elt->flags = 0;
+       }
+      elt = next;
+    }
+  while (aw->pending_rx_mqs && elt != aw->pending_rx_mqs);
+
+  if (aw->pending_rx_mqs)
+    vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
+
+  return n_msgs;
+}
+
+VLIB_REGISTER_NODE (appsl_rx_mqs_input_node) = {
+  .name = "appsl-rx-mqs-input",
+  .type = VLIB_NODE_TYPE_INPUT,
+  .state = VLIB_NODE_STATE_DISABLED,
+};
+
+static clib_error_t *
+app_rx_mq_fd_read_ready (clib_file_t *cf)
+{
+  app_rx_mq_handle_t *handle = (app_rx_mq_handle_t *) &cf->private_data;
+  vlib_main_t *vm = vlib_get_main ();
+  app_main_t *am = &app_main;
+  app_rx_mq_elt_t *mqe;
+  application_t *app;
+  appsl_wrk_t *aw;
+
+  ASSERT (vlib_get_thread_index () == handle->thread_index);
+  app = application_get_if_valid (handle->app_index);
+  if (!app)
+    return 0;
+
+  mqe = &app->rx_mqs[handle->thread_index];
+  if ((mqe->flags & APP_RX_MQ_F_PENDING) || svm_msg_q_is_empty (mqe->mq))
+    return 0;
+
+  aw = &am->wrk[handle->thread_index];
+  appsl_pending_rx_mqs_add_tail (aw, mqe);
+  mqe->flags |= APP_RX_MQ_F_PENDING;
+
+  vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
+
+  return 0;
+}
+
+static clib_error_t *
+app_rx_mq_fd_write_ready (clib_file_t *cf)
+{
+  clib_warning ("should not be called");
+  return 0;
+}
+
+static void
+app_rx_mqs_epoll_add (application_t *app, app_rx_mq_elt_t *mqe)
+{
+  clib_file_t template = { 0 };
+  app_rx_mq_handle_t handle;
+  u32 thread_index;
+  int fd;
+
+  thread_index = mqe - app->rx_mqs;
+  fd = svm_msg_q_get_eventfd (mqe->mq);
+
+  handle.app_index = app->app_index;
+  handle.thread_index = thread_index;
+
+  template.read_function = app_rx_mq_fd_read_ready;
+  template.write_function = app_rx_mq_fd_write_ready;
+  template.file_descriptor = fd;
+  template.private_data = handle.as_u64;
+  template.polling_thread_index = thread_index;
+  template.description =
+    format (0, "app-%u-rx-mq-%u", app->app_index, thread_index);
+  mqe->file_index = clib_file_add (&file_main, &template);
+}
+
+static void
+app_rx_mqs_epoll_del (application_t *app, app_rx_mq_elt_t *mqe)
+{
+  u32 thread_index = mqe - app->rx_mqs;
+  app_main_t *am = &app_main;
+  appsl_wrk_t *aw;
+
+  aw = &am->wrk[thread_index];
+
+  if (mqe->flags & APP_RX_MQ_F_PENDING)
+    {
+      session_wrk_handle_mq (session_main_get_worker (thread_index), mqe->mq);
+      appsl_pending_rx_mqs_del (aw, mqe);
+    }
+
+  clib_file_del_by_index (&file_main, mqe->file_index);
+}
+
+svm_msg_q_t *
+application_rx_mq_get (application_t *app, u32 mq_index)
+{
+  if (!app->rx_mqs)
+    return 0;
+
+  return app->rx_mqs[mq_index].mq;
+}
+
+static int
+app_rx_mqs_alloc (application_t *app)
+{
+  u32 evt_q_length, evt_size = sizeof (session_event_t);
+  fifo_segment_t *eqs = &app->rx_mqs_segment;
+  u32 n_mqs = vlib_num_workers () + 1;
+  segment_manager_props_t *props;
+  int i;
+
+  props = application_segment_manager_properties (app);
+  evt_q_length = clib_max (props->evt_q_size, 128);
+
+  svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+  svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+    { evt_q_length, evt_size, 0 }, { evt_q_length >> 1, 256, 0 }
+  };
+  cfg->consumer_pid = 0;
+  cfg->n_rings = 2;
+  cfg->q_nitems = evt_q_length;
+  cfg->ring_cfgs = rc;
+
+  eqs->ssvm.ssvm_size = svm_msg_q_size_to_alloc (cfg) * n_mqs + (16 << 10);
+  eqs->ssvm.name = format (0, "%s-rx-mqs-seg%c", app->name, 0);
+
+  if (ssvm_server_init (&eqs->ssvm, SSVM_SEGMENT_MEMFD))
+    {
+      clib_warning ("failed to initialize queue segment");
+      return SESSION_E_SEG_CREATE;
+    }
+
+  fifo_segment_init (eqs);
+
+  /* Fifo segment filled only with mqs */
+  eqs->h->n_mqs = n_mqs;
+  vec_validate (app->rx_mqs, n_mqs - 1);
+
+  for (i = 0; i < n_mqs; i++)
+    {
+      app->rx_mqs[i].mq = fifo_segment_msg_q_alloc (eqs, i, cfg);
+      if (svm_msg_q_alloc_eventfd (app->rx_mqs[i].mq))
+       {
+         clib_warning ("eventfd returned");
+         fifo_segment_cleanup (eqs);
+         ssvm_delete (&eqs->ssvm);
+         return SESSION_E_EVENTFD_ALLOC;
+       }
+      app_rx_mqs_epoll_add (app, &app->rx_mqs[i]);
+      app->rx_mqs[i].app_index = app->app_index;
+    }
+
+  return 0;
+}
+
+u8
+application_use_private_rx_mqs (void)
+{
+  return session_main.use_private_rx_mqs;
+}
+
+fifo_segment_t *
+application_get_rx_mqs_segment (application_t *app)
+{
+  if (application_use_private_rx_mqs ())
+    return &app->rx_mqs_segment;
+  return session_main_get_evt_q_segment ();
+}
+
+void
+application_enable_rx_mqs_nodes (u8 is_en)
+{
+  u8 state = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
+
+  foreach_vlib_main ()
+    vlib_node_set_state (this_vlib_main, appsl_rx_mqs_input_node.index, state);
+}
+
 static application_t *
 application_alloc (void)
 {
@@ -595,6 +851,20 @@ application_free (application_t * app)
   /* *INDENT-ON* */
   pool_free (app->worker_maps);
 
+  /*
+   * Free rx mqs if allocated
+   */
+  if (app->rx_mqs)
+    {
+      int i;
+      for (i = 0; i < vec_len (app->rx_mqs); i++)
+       app_rx_mqs_epoll_del (app, &app->rx_mqs[i]);
+
+      fifo_segment_cleanup (&app->rx_mqs_segment);
+      ssvm_delete (&app->rx_mqs_segment.ssvm);
+      vec_free (app->rx_mqs);
+    }
+
   /*
    * Cleanup remaining state
    */
@@ -875,8 +1145,12 @@ vnet_application_attach (vnet_app_attach_args_t * a)
   a->segment_handle = segment_manager_segment_handle (sm, fs);
 
   segment_manager_segment_reader_unlock (sm);
+
+  if (!application_is_builtin (app) && application_use_private_rx_mqs ())
+    rv = app_rx_mqs_alloc (app);
+
   vec_free (app_name);
-  return 0;
+  return rv;
 }
 
 /**
@@ -1537,6 +1811,8 @@ appliction_format_app_mq (vlib_main_t * vm, application_t * app)
 {
   app_worker_map_t *map;
   app_worker_t *wrk;
+  int i;
+
   /* *INDENT-OFF* */
   pool_foreach (map, app->worker_maps)  {
     wrk = app_worker_get (map->wrk_index);
@@ -1545,6 +1821,10 @@ appliction_format_app_mq (vlib_main_t * vm, application_t * app)
                     wrk->event_queue);
   }
   /* *INDENT-ON* */
+
+  for (i = 0; i < vec_len (app->rx_mqs); i++)
+    vlib_cli_output (vm, "[A%d][R%d]%U", app->app_index, i, format_svm_msg_q,
+                    app->rx_mqs[i].mq);
 }
 
 static clib_error_t *
@@ -1731,10 +2011,18 @@ vnet_app_del_cert_key_pair (u32 index)
 clib_error_t *
 application_init (vlib_main_t * vm)
 {
+  app_main_t *am = &app_main;
+  u32 n_workers;
+
+  n_workers = vlib_num_workers ();
+
   /* Index 0 was originally used by legacy apis, maintain as invalid */
   (void) app_cert_key_pair_alloc ();
-  app_main.last_crypto_engine = CRYPTO_ENGINE_LAST;
-  app_main.app_by_name = hash_create_vec (0, sizeof (u8), sizeof (uword));
+  am->last_crypto_engine = CRYPTO_ENGINE_LAST;
+  am->app_by_name = hash_create_vec (0, sizeof (u8), sizeof (uword));
+
+  vec_validate (am->wrk, n_workers);
+
   return 0;
 }
 
index b6f957a..0bfd4d1 100644 (file)
@@ -92,6 +92,22 @@ typedef struct app_listener_
                                     the app listener */
 } app_listener_t;
 
+typedef enum app_rx_mq_flags_
+{
+  APP_RX_MQ_F_PENDING = 1 << 0,
+  APP_RX_MQ_F_POSTPONED = 1 << 1,
+} app_rx_mq_flags_t;
+
+typedef struct app_rx_mq_elt_
+{
+  struct app_rx_mq_elt_ *next;
+  struct app_rx_mq_elt_ *prev;
+  svm_msg_q_t *mq;
+  uword file_index;
+  u32 app_index;
+  u8 flags;
+} app_rx_mq_elt_t;
+
 typedef struct application_
 {
   /** App index in app pool */
@@ -127,8 +143,38 @@ typedef struct application_
   char quic_iv[17];
   u8 quic_iv_set;
 
+  /** Segment where rx mqs were allocated */
+  fifo_segment_t rx_mqs_segment;
+
+  /**
+   * Fixed vector of rx mqs that can be a part of pending_rx_mqs
+   * linked list maintained by the app sublayer for each worker
+   */
+  app_rx_mq_elt_t *rx_mqs;
 } application_t;
 
+typedef struct app_rx_mq_handle_
+{
+  union
+  {
+    struct
+    {
+      u32 app_index;
+      u32 thread_index;
+    };
+    u64 as_u64;
+  };
+} __attribute__ ((aligned (sizeof (u64)))) app_rx_mq_handle_t;
+
+/**
+ * App sublayer per vpp worker state
+ */
+typedef struct asl_wrk_
+{
+  /** Linked list of mqs with pending messages */
+  app_rx_mq_elt_t *pending_rx_mqs;
+} appsl_wrk_t;
+
 typedef struct app_main_
 {
   /**
@@ -155,6 +201,11 @@ typedef struct app_main_
    * Last registered crypto engine type
    */
   crypto_engine_type_t last_crypto_engine;
+
+  /**
+   * App sublayer per-worker state
+   */
+  appsl_wrk_t *wrk;
 } app_main_t;
 
 typedef struct app_init_args_
@@ -239,6 +290,11 @@ segment_manager_props_t *application_get_segment_manager_properties (u32
 segment_manager_props_t
   * application_segment_manager_properties (application_t * app);
 
+svm_msg_q_t *application_rx_mq_get (application_t *app, u32 mq_index);
+u8 application_use_private_rx_mqs (void);
+fifo_segment_t *application_get_rx_mqs_segment (application_t *app);
+void application_enable_rx_mqs_nodes (u8 is_en);
+
 /*
  * App worker
  */
index 2400a19..c447557 100644 (file)
@@ -1776,6 +1776,9 @@ session_node_enable_disable (u8 is_en)
        }
       vlib_node_set_state (this_vlib_main, session_queue_node.index, state);
     }
+
+  if (session_main.use_private_rx_mqs)
+    application_enable_rx_mqs_nodes (is_en);
 }
 
 clib_error_t *
@@ -1808,6 +1811,7 @@ session_main_init (vlib_main_t * vm)
   smm->is_enabled = 0;
   smm->session_enable_asap = 0;
   smm->poll_main = 0;
+  smm->use_private_rx_mqs = 0;
   smm->session_baseva = HIGH_SEGMENT_BASEVA;
 
 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
@@ -1927,6 +1931,8 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input)
        appns_sapi_enable ();
       else if (unformat (input, "poll-main"))
        smm->poll_main = 1;
+      else if (unformat (input, "use-private-rx-mqs"))
+       smm->use_private_rx_mqs = 1;
       else
        return clib_error_return (0, "unknown input `%U'",
                                  format_unformat_error, input);
index e8afcd0..aba8a1c 100644 (file)
@@ -174,6 +174,9 @@ typedef struct session_main_
   /** Poll session node in main thread */
   u8 poll_main;
 
+  /** Allocate private rx mqs for external apps */
+  u8 use_private_rx_mqs;
+
   /** vpp fifo event queue configured length */
   u32 configured_event_queue_length;
 
@@ -296,6 +299,8 @@ session_evt_alloc_old (session_worker_t * wrk)
   return elt;
 }
 
+int session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq);
+
 session_t *session_alloc (u32 thread_index);
 void session_free (session_t * s);
 void session_free_w_fifos (session_t * s);
index 3ca3c2a..5910cd3 100644 (file)
@@ -143,7 +143,7 @@ mq_send_session_accepted_cb (session_t * s)
   m.segment_handle = session_segment_handle (s);
   m.flags = s->flags;
 
-  eq_seg = session_main_get_evt_q_segment ();
+  eq_seg = application_get_rx_mqs_segment (app);
 
   if (session_has_transport (s))
     {
@@ -271,6 +271,9 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
   fifo_segment_t *eq_seg;
   app_worker_t *app_wrk;
   session_event_t *evt;
+  application_t *app;
+
+  app_wrk = app_worker_get (app_wrk_index);
 
   m.context = api_context;
   m.retval = err;
@@ -278,7 +281,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
   if (err)
     goto snd_msg;
 
-  eq_seg = session_main_get_evt_q_segment ();
+  app = application_get (app_wrk->app_index);
+  eq_seg = application_get_rx_mqs_segment (app);
 
   if (session_has_transport (s))
     {
@@ -322,7 +326,6 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
 
 snd_msg:
 
-  app_wrk = app_worker_get (app_wrk_index);
   app_mq = app_wrk->event_queue;
 
   if (mq_try_lock_and_alloc_msg (app_mq, msg))
@@ -348,9 +351,12 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
   fifo_segment_t *eq_seg;
   app_worker_t *app_wrk;
   session_event_t *evt;
+  application_t *app;
   app_listener_t *al;
   session_t *ls = 0;
 
+  app_wrk = app_worker_get (app_wrk_index);
+
   m.context = api_context;
   m.retval = rv;
 
@@ -368,8 +374,8 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
   m.lcl_port = tep.port;
   m.lcl_is_ip4 = tep.is_ip4;
   clib_memcpy_fast (m.lcl_ip, &tep.ip, sizeof (tep.ip));
-
-  eq_seg = session_main_get_evt_q_segment ();
+  app = application_get (app_wrk->app_index);
+  eq_seg = application_get_rx_mqs_segment (app);
   m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, ls->thread_index);
 
   if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL &&
@@ -382,7 +388,6 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
 
 snd_msg:
 
-  app_wrk = app_worker_get (app_wrk_index);
   app_mq = app_wrk->event_queue;
 
   if (mq_try_lock_and_alloc_msg (app_mq, msg))
@@ -429,10 +434,14 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
   app_worker_t *app_wrk;
   session_event_t *evt;
   svm_msg_q_t *app_mq;
+  application_t *app;
   u32 thread_index;
 
   thread_index = session_thread_from_handle (new_sh);
-  eq_seg = session_main_get_evt_q_segment ();
+  app_wrk = app_worker_get (s->app_wrk_index);
+  app_mq = app_wrk->event_queue;
+  app = application_get (app_wrk->app_index);
+  eq_seg = application_get_rx_mqs_segment (app);
 
   m.handle = session_handle (s);
   m.new_handle = new_sh;
@@ -440,8 +449,6 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
   m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, thread_index);
   m.segment_handle = SESSION_INVALID_HANDLE;
 
-  app_wrk = app_worker_get (s->app_wrk_index);
-  app_mq = app_wrk->event_queue;
   if (mq_try_lock_and_alloc_msg (app_mq, msg))
     return;
 
@@ -604,17 +611,20 @@ vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
 static void
 vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
 {
-  int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
-  vl_api_app_attach_reply_t *rmp;
-  fifo_segment_t *segp, *evt_q_segment = 0;
+  int rv = 0, *fds = 0, n_fds = 0, n_workers, i;
+  fifo_segment_t *segp, *rx_mqs_seg = 0;
   vnet_app_attach_args_t _a, *a = &_a;
+  vl_api_app_attach_reply_t *rmp;
   u8 fd_flags = 0, ctrl_thread;
   vl_api_registration_t *reg;
+  svm_msg_q_t *rx_mq;
+  application_t *app;
 
   reg = vl_api_client_index_to_registration (mp->client_index);
   if (!reg)
     return;
 
+  n_workers = vlib_num_workers ();
   if (!session_main_is_enabled () || appns_sapi_enabled ())
     {
       rv = VNET_API_ERROR_FEATURE_DISABLED;
@@ -645,13 +655,16 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
     }
   vec_free (a->namespace_id);
 
-  /* Send event queues segment */
-  if ((evt_q_segment = session_main_get_evt_q_segment ()))
-    {
-      fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
-      fds[n_fds] = evt_q_segment->ssvm.fd;
-      n_fds += 1;
-    }
+  vec_validate (fds, 3 /* segs + tx evtfd */ + n_workers);
+
+  /* Send rx mqs segment */
+  app = application_get (a->app_index);
+  rx_mqs_seg = application_get_rx_mqs_segment (app);
+
+  fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+  fds[n_fds] = rx_mqs_seg->ssvm.fd;
+  n_fds += 1;
+
   /* Send fifo segment fd if needed */
   if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
     {
@@ -666,17 +679,27 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
       n_fds += 1;
     }
 
+  if (application_use_private_rx_mqs ())
+    {
+      fd_flags |= SESSION_FD_F_VPP_MQ_EVENTFD;
+      for (i = 0; i < n_workers + 1; i++)
+       {
+         rx_mq = application_rx_mq_get (app, i);
+         fds[n_fds] = svm_msg_q_get_eventfd (rx_mq);
+         n_fds += 1;
+       }
+    }
+
 done:
   /* *INDENT-OFF* */
   REPLY_MACRO2 (VL_API_APP_ATTACH_REPLY, ({
     if (!rv)
       {
-       ctrl_thread = vlib_num_workers () ? 1 : 0;
+       ctrl_thread = n_workers ? 1 : 0;
        segp = (fifo_segment_t *) a->segment;
        rmp->app_index = clib_host_to_net_u32 (a->app_index);
        rmp->app_mq = fifo_segment_msg_q_offset (segp, 0);
-       rmp->vpp_ctrl_mq =
-         fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
+       rmp->vpp_ctrl_mq = fifo_segment_msg_q_offset (rx_mqs_seg, ctrl_thread);
        rmp->vpp_ctrl_mq_thread = ctrl_thread;
        rmp->n_fds = n_fds;
        rmp->fd_flags = fd_flags;
@@ -692,6 +715,7 @@ done:
 
   if (n_fds)
     session_send_fds (reg, fds, n_fds);
+  vec_free (fds);
 }
 
 static void
@@ -1268,15 +1292,16 @@ static void
 session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
                            app_sapi_attach_msg_t * mp)
 {
-  int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
+  int rv = 0, *fds = 0, n_fds = 0, i, n_workers;
   vnet_app_attach_args_t _a, *a = &_a;
   app_sapi_attach_reply_msg_t *rmp;
-  fifo_segment_t *evt_q_segment;
   u8 fd_flags = 0, ctrl_thread;
   app_ns_api_handle_t *handle;
+  fifo_segment_t *rx_mqs_seg;
   app_sapi_msg_t msg = { 0 };
   app_worker_t *app_wrk;
   application_t *app;
+  svm_msg_q_t *rx_mq;
 
   /* Make sure name is null terminated */
   mp->name[63] = 0;
@@ -1295,13 +1320,17 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
       goto done;
     }
 
+  n_workers = vlib_num_workers ();
+  vec_validate (fds, 3 /* segs + tx evtfd */ + n_workers);
+
   /* Send event queues segment */
-  if ((evt_q_segment = session_main_get_evt_q_segment ()))
-    {
-      fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
-      fds[n_fds] = evt_q_segment->ssvm.fd;
-      n_fds += 1;
-    }
+  app = application_get (a->app_index);
+  rx_mqs_seg = application_get_rx_mqs_segment (app);
+
+  fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+  fds[n_fds] = rx_mqs_seg->ssvm.fd;
+  n_fds += 1;
+
   /* Send fifo segment fd if needed */
   if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
     {
@@ -1316,6 +1345,17 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
       n_fds += 1;
     }
 
+  if (application_use_private_rx_mqs ())
+    {
+      fd_flags |= SESSION_FD_F_VPP_MQ_EVENTFD;
+      for (i = 0; i < n_workers + 1; i++)
+       {
+         rx_mq = application_rx_mq_get (app, i);
+         fds[n_fds] = svm_msg_q_get_eventfd (rx_mq);
+         n_fds += 1;
+       }
+    }
+
 done:
 
   msg.type = APP_SAPI_MSG_TYPE_ATTACH_REPLY;
@@ -1323,12 +1363,11 @@ done:
   rmp->retval = rv;
   if (!rv)
     {
-      ctrl_thread = vlib_num_workers ()? 1 : 0;
+      ctrl_thread = n_workers ? 1 : 0;
       rmp->app_index = a->app_index;
       rmp->app_mq =
        fifo_segment_msg_q_offset ((fifo_segment_t *) a->segment, 0);
-      rmp->vpp_ctrl_mq =
-       fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
+      rmp->vpp_ctrl_mq = fifo_segment_msg_q_offset (rx_mqs_seg, ctrl_thread);
       rmp->vpp_ctrl_mq_thread = ctrl_thread;
       rmp->n_fds = n_fds;
       rmp->fd_flags = fd_flags;
@@ -1339,13 +1378,13 @@ done:
 
       /* Update app index for socket */
       handle = (app_ns_api_handle_t *) & cs->private_data;
-      app = application_get (a->app_index);
       app_wrk = application_get_worker (app, 0);
       handle->aah_app_wrk_index = app_wrk->wrk_index;
     }
 
   clib_socket_sendmsg (cs, &msg, sizeof (msg), fds, n_fds);
   vec_free (a->name);
+  vec_free (fds);
 }
 
 static void
index a42d90d..9e49a35 100644 (file)
@@ -237,14 +237,14 @@ extern session_dbg_main_t session_dbg_main;
 
 #if SESSION_CLOCKS_EVT_DBG
 
-#define SESSION_EVT_DSP_CNTRS_UPDATE_TIME_HANDLER(_wrk, _diff, _args...)       \
-  session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index];     \
-  sde->counters[SESS_Q_CLK_UPDATE_TIME].f64 += _diff;                          \
-
-#define SESSION_EVT_DSP_CNTRS_MQ_DEQ_HANDLER(_wrk, _diff, _cnt, _dq, _args...) \
-  session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index];     \
-  sde->counters[SESS_Q_CNT_MQ_EVTS].u64 += _dq * _cnt;                         \
-  sde->counters[SESS_Q_CLK_MQ_DEQ].f64 += _diff;                               \
+#define SESSION_EVT_DSP_CNTRS_UPDATE_TIME_HANDLER(_wrk, _diff, _args...)      \
+  session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index];    \
+  sde->counters[SESS_Q_CLK_UPDATE_TIME].f64 += _diff;
+
+#define SESSION_EVT_DSP_CNTRS_MQ_DEQ_HANDLER(_wrk, _diff, _cnt, _args...)     \
+  session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index];    \
+  sde->counters[SESS_Q_CNT_MQ_EVTS].u64 += _cnt;                              \
+  sde->counters[SESS_Q_CLK_MQ_DEQ].f64 += _diff;
 
 #define SESSION_EVT_DSP_CNTRS_CTRL_EVTS_HANDLER(_wrk, _diff, _args...)         \
   session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index];     \
index ccf93cb..f3713d0 100644 (file)
@@ -1399,19 +1399,35 @@ session_flush_pending_tx_buffers (session_worker_t * wrk,
   vec_reset_length (wrk->pending_tx_nexts);
 }
 
+int
+session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
+{
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  u32 i, n_to_dequeue = 0;
+  session_event_t *evt;
+
+  n_to_dequeue = svm_msg_q_size (mq);
+  for (i = 0; i < n_to_dequeue; i++)
+    {
+      svm_msg_q_sub_raw (mq, msg);
+      evt = svm_msg_q_msg_data (mq, msg);
+      session_evt_add_to_list (wrk, evt);
+      svm_msg_q_free_msg (mq, msg);
+    }
+
+  return n_to_dequeue;
+}
+
 static uword
 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
 {
+  u32 thread_index = vm->thread_index, __clib_unused n_evts;
+  session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
   session_main_t *smm = vnet_get_session_main ();
-  u32 thread_index = vm->thread_index, n_to_dequeue;
   session_worker_t *wrk = &smm->wrk[thread_index];
-  session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
   clib_llist_index_t ei, next_ei, old_ti;
-  svm_msg_q_msg_t _msg, *msg = &_msg;
-  int i = 0, n_tx_packets;
-  session_event_t *evt;
-  svm_msg_q_t *mq;
+  int n_tx_packets;
 
   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
 
@@ -1426,25 +1442,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
 
   /*
-   *  Dequeue and handle new events
+   *  Dequeue new internal mq events
    */
 
-  /* Try to dequeue what is available. Don't wait for lock.
-   * XXX: we may need priorities here */
-  mq = wrk->vpp_event_queue;
-  n_to_dequeue = svm_msg_q_size (mq);
-  if (n_to_dequeue)
-    {
-      for (i = 0; i < n_to_dequeue; i++)
-       {
-         svm_msg_q_sub_raw (mq, msg);
-         evt = svm_msg_q_msg_data (mq, msg);
-         session_evt_add_to_list (wrk, evt);
-         svm_msg_q_free_msg (mq, msg);
-       }
-    }
-
-  SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_to_dequeue, !i);
+  n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
+  SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
 
   /*
    * Handle control events
@@ -1452,12 +1454,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 
   ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
 
-  /* *INDENT-OFF* */
   clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
     clib_llist_remove (wrk->event_elts, evt_list, elt);
     session_event_dispatch_ctrl (wrk, elt);
   }));
-  /* *INDENT-ON* */
 
   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
 
index 7a9f687..32cb953 100644 (file)
@@ -449,37 +449,38 @@ typedef struct session_dgram_header_
 STATIC_ASSERT (sizeof (session_dgram_hdr_t) == (SESSION_CONN_ID_LEN + 8),
               "session conn id wrong length");
 
-#define foreach_session_error                                          \
-  _(NONE, "no error")                                                  \
-  _(UNKNOWN, "generic/unknown error")                                  \
-  _(REFUSED, "refused")                                                        \
-  _(TIMEDOUT, "timedout")                                              \
-  _(ALLOC, "obj/memory allocation error")                              \
-  _(OWNER, "object not owned by application")                          \
-  _(NOROUTE, "no route")                                               \
-  _(NOINTF, "no resolving interface")                                  \
-  _(NOIP, "no ip for lcl interface")                                   \
-  _(NOPORT, "no lcl port")                                             \
-  _(NOSUPPORT, "not supported")                                                \
-  _(NOLISTEN, "not listening")                                         \
-  _(NOSESSION, "session does not exist")                               \
-  _(NOAPP, "app not attached")                                         \
-  _(PORTINUSE, "lcl port in use")                                      \
-  _(IPINUSE, "ip in use")                                              \
-  _(ALREADY_LISTENING, "ip port pair already listened on")             \
-  _(INVALID_RMT_IP, "invalid remote ip")                               \
-  _(INVALID_APPWRK, "invalid app worker")                              \
-  _(INVALID_NS, "invalid namespace")                                   \
-  _(SEG_NO_SPACE, "Couldn't allocate a fifo pair")                     \
-  _(SEG_NO_SPACE2, "Created segment, couldn't allocate a fifo pair")   \
-  _(SEG_CREATE, "Couldn't create a new segment")                       \
-  _(FILTERED, "session filtered")                                      \
-  _(SCOPE, "scope not supported")                                      \
-  _(BAPI_NO_FD, "bapi doesn't have a socket fd")                       \
-  _(BAPI_SEND_FD, "couldn't send fd over bapi socket fd")              \
-  _(BAPI_NO_REG, "app bapi registration not found")                    \
-  _(MQ_MSG_ALLOC, "failed to alloc mq msg")                            \
-  _(TLS_HANDSHAKE, "failed tls handshake")                             \
+#define foreach_session_error                                                 \
+  _ (NONE, "no error")                                                        \
+  _ (UNKNOWN, "generic/unknown error")                                        \
+  _ (REFUSED, "refused")                                                      \
+  _ (TIMEDOUT, "timedout")                                                    \
+  _ (ALLOC, "obj/memory allocation error")                                    \
+  _ (OWNER, "object not owned by application")                                \
+  _ (NOROUTE, "no route")                                                     \
+  _ (NOINTF, "no resolving interface")                                        \
+  _ (NOIP, "no ip for lcl interface")                                         \
+  _ (NOPORT, "no lcl port")                                                   \
+  _ (NOSUPPORT, "not supported")                                              \
+  _ (NOLISTEN, "not listening")                                               \
+  _ (NOSESSION, "session does not exist")                                     \
+  _ (NOAPP, "app not attached")                                               \
+  _ (PORTINUSE, "lcl port in use")                                            \
+  _ (IPINUSE, "ip in use")                                                    \
+  _ (ALREADY_LISTENING, "ip port pair already listened on")                   \
+  _ (INVALID_RMT_IP, "invalid remote ip")                                     \
+  _ (INVALID_APPWRK, "invalid app worker")                                    \
+  _ (INVALID_NS, "invalid namespace")                                         \
+  _ (SEG_NO_SPACE, "Couldn't allocate a fifo pair")                           \
+  _ (SEG_NO_SPACE2, "Created segment, couldn't allocate a fifo pair")         \
+  _ (SEG_CREATE, "Couldn't create a new segment")                             \
+  _ (FILTERED, "session filtered")                                            \
+  _ (SCOPE, "scope not supported")                                            \
+  _ (BAPI_NO_FD, "bapi doesn't have a socket fd")                             \
+  _ (BAPI_SEND_FD, "couldn't send fd over bapi socket fd")                    \
+  _ (BAPI_NO_REG, "app bapi registration not found")                          \
+  _ (MQ_MSG_ALLOC, "failed to alloc mq msg")                                  \
+  _ (TLS_HANDSHAKE, "failed tls handshake")                                   \
+  _ (EVENTFD_ALLOC, "failed to alloc eventfd")
 
 typedef enum session_error_p_
 {