session: async rx event notifications 58/37858/46
authorFlorin Coras <fcoras@cisco.com>
Thu, 22 Dec 2022 23:03:44 +0000 (15:03 -0800)
committerDave Barach <vpp@barachs.net>
Wed, 9 Aug 2023 18:45:26 +0000 (18:45 +0000)
Move from synchronous flushing of io and ctrl events from transports to
applications to an async model via a new session_input input node that
runs in interrupt mode. Events are coalesced per application worker.

On the one hand, this helps by minimizing message queue locking churn.
And on the other, it opens the possibility for further optimizations of
event message generation, obviates need for rx rescheduling rpcs and is
a first step towards a fully async data/io rx path.

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Id6bebcb65fc9feef8aa02ddf1af6d9ba6f6745ce

23 files changed:
src/plugins/hs_apps/echo_client.c
src/plugins/hs_apps/echo_server.c
src/plugins/http/http.c
src/plugins/quic/quic.c
src/plugins/srtp/srtp.c
src/plugins/unittest/session_test.c
src/svm/message_queue.c
src/svm/message_queue.h
src/vnet/CMakeLists.txt
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/application_local.c
src/vnet/session/application_worker.c
src/vnet/session/segment_manager.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_api.c
src/vnet/session/session_input.c [new file with mode: 0644]
src/vnet/session/session_node.c
src/vnet/session/session_types.h
src/vnet/tcp/tcp_input.c
src/vnet/tls/tls.c
src/vnet/udp/udp_input.c

index a7358a6..d449045 100644 (file)
@@ -706,10 +706,8 @@ ec_session_rx_callback (session_t *s)
   receive_data_chunk (wrk, es);
 
   if (svm_fifo_max_dequeue_cons (s->rx_fifo))
-    {
-      if (svm_fifo_set_event (s->rx_fifo))
-       session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
-    }
+    session_enqueue_notify (s);
+
   return 0;
 }
 
index 6a29116..178e9ee 100644 (file)
@@ -228,6 +228,8 @@ echo_server_rx_callback (session_t * s)
       /* Program self-tap to retry */
       if (svm_fifo_set_event (rx_fifo))
        {
+         /* TODO should be session_enqueue_notify(s) but quic tests seem
+          * to fail if that's the case */
          if (session_send_io_evt_to_thread (rx_fifo,
                                             SESSION_IO_EVT_BUILTIN_RX))
            clib_warning ("failed to enqueue self-tap");
index d25123a..503752c 100644 (file)
@@ -503,7 +503,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
   hc->http_state = HTTP_STATE_WAIT_APP;
 
   app_wrk = app_worker_get_if_valid (as->app_wrk_index);
-  app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+  app_worker_rx_notify (app_wrk, as);
 
   return HTTP_SM_STOP;
 
@@ -777,7 +777,7 @@ state_cln_wait_method (http_conn_t *hc, transport_send_params_t *sp)
     }
 
   app_wrk = app_worker_get_if_valid (as->app_wrk_index);
-  app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+  app_worker_rx_notify (app_wrk, as);
   return HTTP_SM_STOP;
 }
 
@@ -808,7 +808,7 @@ cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as)
   app_wrk = app_worker_get_if_valid (as->app_wrk_index);
   ASSERT (app_wrk);
 
-  app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+  app_worker_rx_notify (app_wrk, as);
   return 1;
 }
 
@@ -864,8 +864,9 @@ maybe_reschedule:
   if (hc->rx_buf_offset < vec_len (hc->rx_buf) ||
       svm_fifo_max_dequeue_cons (ts->rx_fifo))
     {
+      /* TODO is the flag really needed? */
       if (svm_fifo_set_event (ts->rx_fifo))
-       session_send_io_evt_to_thread (ts->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+       session_enqueue_notify (ts);
     }
   return HTTP_SM_CONTINUE;
 }
index c3c4540..61380b8 100644 (file)
@@ -830,7 +830,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
                 size_t len)
 {
   QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off);
-  u32 max_enq, rv;
+  u32 max_enq;
   quic_ctx_t *sctx;
   session_t *stream_session;
   app_worker_t *app_wrk;
@@ -895,10 +895,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
       app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
       if (PREDICT_TRUE (app_wrk != 0))
        {
-         rv = app_worker_lock_and_send_event (app_wrk, stream_session,
-                                              SESSION_IO_EVT_RX);
-         if (rv)
-           QUIC_ERR ("Failed to ping app for RX");
+         app_worker_rx_notify (app_wrk, stream_session);
        }
       quic_ack_rx_data (stream_session);
     }
index aacadce..8b7c5b6 100644 (file)
@@ -309,8 +309,7 @@ done:
 int
 srtp_add_vpp_q_builtin_rx_evt (session_t *s)
 {
-  if (svm_fifo_set_event (s->rx_fifo))
-    session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+  session_enqueue_notify (s);
   return 0;
 }
 
@@ -320,7 +319,7 @@ srtp_notify_app_enqueue (srtp_tc_t *ctx, session_t *app_session)
   app_worker_t *app_wrk;
   app_wrk = app_worker_get_if_valid (app_session->app_wrk_index);
   if (PREDICT_TRUE (app_wrk != 0))
-    app_worker_lock_and_send_event (app_wrk, app_session, SESSION_IO_EVT_RX);
+    app_worker_rx_notify (app_wrk, app_session);
 }
 
 static inline int
index c4e41c3..70b3b32 100644 (file)
@@ -1771,6 +1771,74 @@ wait_for_event (svm_msg_q_t * mq, int fd, int epfd, u8 use_eventfd)
     }
 }
 
+/* Used to be part of application_worker.c prior to adding support for
+ * async rx
+ */
+static int
+test_mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring,
+                               svm_msg_q_msg_t *msg)
+{
+  int rv, n_try = 0;
+
+  while (n_try < 75)
+    {
+      rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg);
+      if (!rv)
+       return 0;
+      /*
+       * Break the loop if mq is full, usually this is because the
+       * app has crashed or is hanging on somewhere.
+       */
+      if (rv != -1)
+       break;
+      n_try += 1;
+      usleep (1);
+    }
+
+  return -1;
+}
+
+/* Used to be part of application_worker.c prior to adding support for
+ * async rx and was used for delivering io events over mq
+ * NB: removed handling of mq congestion
+ */
+static inline int
+test_app_send_io_evt_rx (app_worker_t *app_wrk, session_t *s)
+{
+  svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
+  session_event_t *evt;
+  svm_msg_q_t *mq;
+  u32 app_session;
+  int rv;
+
+  if (app_worker_application_is_builtin (app_wrk))
+    return app_worker_rx_notify (app_wrk, s);
+
+  if (svm_fifo_has_event (s->rx_fifo))
+    return 0;
+
+  app_session = s->rx_fifo->shr->client_session_index;
+  mq = app_wrk->event_queue;
+
+  rv = test_mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
+
+  if (PREDICT_FALSE (rv))
+    {
+      clib_warning ("failed to alloc mq message");
+      return -1;
+    }
+
+  evt = svm_msg_q_msg_data (mq, mq_msg);
+  evt->event_type = SESSION_IO_EVT_RX;
+  evt->session_index = app_session;
+
+  (void) svm_fifo_set_event (s->rx_fifo);
+
+  svm_msg_q_add_and_unlock (mq, mq_msg);
+
+  return 0;
+}
+
 static int
 session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
 {
@@ -1885,7 +1953,7 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
        {
          while (svm_fifo_has_event (rx_fifo))
            ;
-         app_worker_lock_and_send_event (app_wrk, &s, SESSION_IO_EVT_RX);
+         test_app_send_io_evt_rx (app_wrk, &s);
        }
     }
 
index 2880645..ab0d230 100644 (file)
@@ -340,15 +340,15 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
   return (dist1 < dist2);
 }
 
-static void
-svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem)
+void
+svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
 {
   svm_msg_q_shared_queue_t *sq = mq->q.shr;
   i8 *tailp;
   u32 sz;
 
   tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail);
-  clib_memcpy_fast (tailp, elem, sq->elsize);
+  clib_memcpy_fast (tailp, msg, sq->elsize);
 
   sq->tail = (sq->tail + 1) % sq->maxsize;
 
@@ -381,7 +381,7 @@ svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
        svm_msg_q_wait_prod (mq);
     }
 
-  svm_msg_q_add_raw (mq, (u8 *) msg);
+  svm_msg_q_add_raw (mq, msg);
 
   svm_msg_q_unlock (mq);
 
@@ -392,7 +392,7 @@ void
 svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
   ASSERT (svm_msq_q_msg_is_valid (mq, msg));
-  svm_msg_q_add_raw (mq, (u8 *) msg);
+  svm_msg_q_add_raw (mq, msg);
   svm_msg_q_unlock (mq);
 }
 
index 0780cca..4473c44 100644 (file)
@@ -190,6 +190,17 @@ int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
  */
 void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
 
+/**
+ * Producer enqueue one message to queue
+ *
+ * Must be called with mq locked. Prior to calling this, the producer should've
+ * obtained a message buffer from one of the rings.
+ *
+ * @param mq           message queue
+ * @param msg          message to be enqueued
+ */
+void svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg);
+
 /**
  * Producer enqueue one message to queue
  *
index 5aada92..5e913df 100644 (file)
@@ -1018,6 +1018,7 @@ list(APPEND VNET_SOURCES
   session/session_rules_table.c
   session/session_lookup.c
   session/session_node.c
+  session/session_input.c
   session/transport.c
   session/application.c
   session/application_worker.c
index cf86740..fdd5a0a 100644 (file)
@@ -725,6 +725,12 @@ application_get_if_valid (u32 app_index)
   return pool_elt_at_index (app_main.app_pool, app_index);
 }
 
+static int
+_null_app_tx_callback (session_t *s)
+{
+  return 0;
+}
+
 static void
 application_verify_cb_fns (session_cb_vft_t * cb_fns)
 {
@@ -736,6 +742,8 @@ application_verify_cb_fns (session_cb_vft_t * cb_fns)
     clib_warning ("No session disconnect callback function provided");
   if (cb_fns->session_reset_callback == 0)
     clib_warning ("No session reset callback function provided");
+  if (!cb_fns->builtin_app_tx_callback)
+    cb_fns->builtin_app_tx_callback = _null_app_tx_callback;
 }
 
 /**
index c3d6180..5505d91 100644 (file)
@@ -77,17 +77,17 @@ typedef struct app_worker_
   /** Pool of half-open session handles. Tracked in case worker detaches */
   session_handle_t *half_open_table;
 
+  /* Per vpp worker fifos of events for app worker */
+  session_event_t **wrk_evts;
+
+  /* Vector of vpp workers mq congestion flags */
+  u8 *wrk_mq_congested;
+
   /** Protects detached seg managers */
   clib_spinlock_t detached_seg_managers_lock;
 
   /** Vector of detached listener segment managers */
   u32 *detached_seg_managers;
-
-  /** Fifo of messages postponed because of mq congestion */
-  app_wrk_postponed_msg_t *postponed_mq_msgs;
-
-  /** Lock to add/sub message from ref @postponed_mq_msgs */
-  clib_spinlock_t postponed_mq_msgs_lock;
 } app_worker_t;
 
 typedef struct app_worker_map_
@@ -317,6 +317,12 @@ void application_enable_rx_mqs_nodes (u8 is_en);
  * App worker
  */
 
+always_inline u8
+app_worker_mq_is_congested (app_worker_t *app_wrk)
+{
+  return app_wrk->mq_congested > 0;
+}
+
 app_worker_t *app_worker_alloc (application_t * app);
 int application_alloc_worker_and_init (application_t * app,
                                       app_worker_t ** wrk);
@@ -331,6 +337,10 @@ session_error_t app_worker_start_listen (app_worker_t *app_wrk,
                                         app_listener_t *lstnr);
 int app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al);
 int app_worker_init_accepted (session_t * s);
+int app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
+                               u32 opaque, int err);
+int app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
+                              u32 opaque, session_error_t err);
 int app_worker_accept_notify (app_worker_t * app_wrk, session_t * s);
 int app_worker_init_connected (app_worker_t * app_wrk, session_t * s);
 int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
@@ -343,13 +353,21 @@ int app_worker_transport_closed_notify (app_worker_t * app_wrk,
 int app_worker_reset_notify (app_worker_t * app_wrk, session_t * s);
 int app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
                               session_cleanup_ntf_t ntf);
+int app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s,
+                                     session_cleanup_ntf_t ntf,
+                                     void (*cleanup_cb) (session_t *s));
 int app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
                               session_handle_t new_sh);
-int app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s);
-int app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s);
+int app_worker_rx_notify (app_worker_t *app_wrk, session_t *s);
 int app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
                                    svm_fifo_t * f,
                                    session_ft_action_t act, u32 len);
+void app_worker_add_event (app_worker_t *app_wrk, session_t *s,
+                          session_evt_type_t evt_type);
+void app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index,
+                                 session_event_t *evt);
+int app_wrk_flush_wrk_events (app_worker_t *app_wrk, u32 thread_index);
+void app_worker_del_all_events (app_worker_t *app_wrk);
 segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *,
                                                          session_t *);
 segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *);
@@ -364,9 +382,10 @@ void app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
                               u32 msg_len, int fd);
 void app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
                            u32 msg_len);
-int app_worker_send_event (app_worker_t * app, session_t * s, u8 evt);
-int app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
-                                   u8 evt_type);
+u8 app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index);
+void app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index);
+void app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk,
+                                       u32 thread_index);
 session_t *app_worker_proxy_listener (app_worker_t * app, u8 fib_proto,
                                      u8 transport_proto);
 void app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index);
@@ -395,6 +414,12 @@ void sapi_socket_close_w_handle (u32 api_handle);
 crypto_engine_type_t app_crypto_engine_type_add (void);
 u8 app_crypto_engine_n_types (void);
 
+static inline u8
+app_worker_application_is_builtin (app_worker_t *app_wrk)
+{
+  return app_wrk->app_is_builtin;
+}
+
 #endif /* SRC_VNET_SESSION_APPLICATION_H_ */
 
 /*
index 6ac4da2..192c22b 100644 (file)
@@ -1027,6 +1027,17 @@ ct_close_is_reset (ct_connection_t *ct, session_t *s)
     return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
 }
 
+static void
+ct_session_cleanup_server_session (session_t *s)
+{
+  ct_connection_t *ct;
+
+  ct = (ct_connection_t *) session_get_transport (s);
+  ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
+  session_free (s);
+  ct_connection_free (ct);
+}
+
 static void
 ct_session_postponed_cleanup (ct_connection_t *ct)
 {
@@ -1047,33 +1058,38 @@ ct_session_postponed_cleanup (ct_connection_t *ct)
     }
   session_transport_closed_notify (&ct->connection);
 
+  /* It would be cleaner to call session_transport_delete_notify
+   * but then we can't control session cleanup lower */
+  session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
+  if (app_wrk)
+    app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
+
   if (ct->flags & CT_CONN_F_CLIENT)
     {
-      if (app_wrk)
-       app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
-
       /* Normal free for client session as the fifos are allocated through
        * the connects segment manager in a segment that's not shared with
        * the server */
       ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
-      session_free_w_fifos (s);
+      session_program_cleanup (s);
+      ct_connection_free (ct);
     }
   else
     {
       /* Manual session and fifo segment cleanup to avoid implicit
        * segment manager cleanups and notifications */
-      app_wrk = app_worker_get_if_valid (s->app_wrk_index);
       if (app_wrk)
        {
-         app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
-         app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
+         /* Remove custom cleanup notify infra when/if switching to normal
+          * session cleanup. Note that ct is freed in the cb function */
+         app_worker_cleanup_notify_custom (app_wrk, s,
+                                           SESSION_CLEANUP_SESSION,
+                                           ct_session_cleanup_server_session);
+       }
+      else
+       {
+         ct_connection_free (ct);
        }
-
-      ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
-      session_free (s);
     }
-
-  ct_connection_free (ct);
 }
 
 static void
index c3941d1..127963a 100644 (file)
@@ -26,6 +26,7 @@ app_worker_t *
 app_worker_alloc (application_t * app)
 {
   app_worker_t *app_wrk;
+
   pool_get (app_workers, app_wrk);
   clib_memset (app_wrk, 0, sizeof (*app_wrk));
   app_wrk->wrk_index = app_wrk - app_workers;
@@ -33,7 +34,8 @@ app_worker_alloc (application_t * app)
   app_wrk->wrk_map_index = ~0;
   app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
   clib_spinlock_init (&app_wrk->detached_seg_managers_lock);
-  clib_spinlock_init (&app_wrk->postponed_mq_msgs_lock);
+  vec_validate (app_wrk->wrk_evts, vlib_num_workers ());
+  vec_validate (app_wrk->wrk_mq_congested, vlib_num_workers ());
   APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index);
   return app_wrk;
 }
@@ -64,18 +66,26 @@ app_worker_free (app_worker_t * app_wrk)
   u32 sm_index;
   int i;
 
+  /*
+   * Cleanup vpp wrk events
+   */
+  app_worker_del_all_events (app_wrk);
+  for (i = 0; i < vec_len (app_wrk->wrk_evts); i++)
+    clib_fifo_free (app_wrk->wrk_evts[i]);
+
+  vec_free (app_wrk->wrk_evts);
+  vec_free (app_wrk->wrk_mq_congested);
+
   /*
    *  Listener cleanup
    */
 
-  /* *INDENT-OFF* */
   hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
     ls = listen_session_get_from_handle (handle);
     vec_add1 (handles, app_listen_session_handle (ls));
     vec_add1 (sm_indices, sm_index);
     sm = segment_manager_get (sm_index);
   }));
-  /* *INDENT-ON* */
 
   for (i = 0; i < vec_len (handles); i++)
     {
@@ -127,7 +137,6 @@ app_worker_free (app_worker_t * app_wrk)
     }
   vec_free (app_wrk->detached_seg_managers);
   clib_spinlock_free (&app_wrk->detached_seg_managers_lock);
-  clib_spinlock_free (&app_wrk->postponed_mq_msgs_lock);
 
   if (CLIB_DEBUG)
     clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
@@ -339,7 +348,7 @@ app_worker_init_accepted (session_t * s)
 
   listener = listen_session_get_from_handle (s->listener_handle);
   app_wrk = application_listener_select_worker (listener);
-  if (PREDICT_FALSE (app_wrk->mq_congested))
+  if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
     return -1;
 
   s->app_wrk_index = app_wrk->wrk_index;
@@ -354,11 +363,40 @@ app_worker_init_accepted (session_t * s)
   return 0;
 }
 
+int
+app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
+                           u32 opaque, session_error_t err)
+{
+  session_event_t evt;
+
+  evt.event_type = SESSION_CTRL_EVT_BOUND;
+  evt.session_handle = alsh;
+  evt.as_u64[1] = (u64) opaque << 32 | err;
+
+  app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
+
+  return 0;
+}
+
+int
+app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
+                          u32 opaque, session_error_t err)
+{
+  session_event_t evt = {};
+
+  evt.event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY;
+  evt.session_handle = sh;
+  evt.as_u64[1] = (u64) opaque << 32 | (u32) err;
+
+  app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
+  return 0;
+}
+
 int
 app_worker_accept_notify (app_worker_t * app_wrk, session_t * s)
 {
-  application_t *app = application_get (app_wrk->app_index);
-  return app->cb_fns.session_accept_callback (s);
+  app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_ACCEPTED);
+  return 0;
 }
 
 int
@@ -382,9 +420,16 @@ int
 app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
                           session_error_t err, u32 opaque)
 {
-  application_t *app = application_get (app_wrk->app_index);
-  return app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque,
-                                                s, err);
+  session_event_t evt = {};
+  u32 thread_index;
+
+  evt.event_type = SESSION_CTRL_EVT_CONNECTED;
+  evt.session_index = s ? s->session_index : ~0;
+  evt.as_u64[1] = (u64) opaque << 32 | (u32) err;
+  thread_index = s ? s->thread_index : vlib_get_thread_index ();
+
+  app_worker_add_event_custom (app_wrk, thread_index, &evt);
+  return 0;
 }
 
 int
@@ -402,36 +447,28 @@ app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
 int
 app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
 {
-  application_t *app = application_get (app_wrk->app_index);
-  ASSERT (session_vlib_thread_is_cl_thread ());
-  pool_put_index (app_wrk->half_open_table, s->ho_index);
-  if (app->cb_fns.half_open_cleanup_callback)
-    app->cb_fns.half_open_cleanup_callback (s);
+  app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_HALF_CLEANUP);
   return 0;
 }
 
 int
 app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
 {
-  application_t *app = application_get (app_wrk->app_index);
-  app->cb_fns.session_disconnect_callback (s);
+  app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_DISCONNECTED);
   return 0;
 }
 
 int
 app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s)
 {
-  application_t *app = application_get (app_wrk->app_index);
-  if (app->cb_fns.session_transport_closed_callback)
-    app->cb_fns.session_transport_closed_callback (s);
+  app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_TRANSPORT_CLOSED);
   return 0;
 }
 
 int
 app_worker_reset_notify (app_worker_t * app_wrk, session_t * s)
 {
-  application_t *app = application_get (app_wrk->app_index);
-  app->cb_fns.session_reset_callback (s);
+  app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_RESET);
   return 0;
 }
 
@@ -439,29 +476,37 @@ int
 app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
                           session_cleanup_ntf_t ntf)
 {
-  application_t *app = application_get (app_wrk->app_index);
-  if (app->cb_fns.session_cleanup_callback)
-    app->cb_fns.session_cleanup_callback (s, ntf);
+  session_event_t evt;
+
+  evt.event_type = SESSION_CTRL_EVT_CLEANUP;
+  evt.as_u64[0] = (u64) ntf << 32 | s->session_index;
+  evt.as_u64[1] = pointer_to_uword (session_cleanup);
+
+  app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
+
   return 0;
 }
 
 int
-app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s)
+app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s,
+                                 session_cleanup_ntf_t ntf,
+                                 void (*cleanup_cb) (session_t *s))
 {
-  application_t *app = application_get (app_wrk->app_index);
-  app->cb_fns.builtin_app_rx_callback (s);
+  session_event_t evt;
+
+  evt.event_type = SESSION_CTRL_EVT_CLEANUP;
+  evt.as_u64[0] = (u64) ntf << 32 | s->session_index;
+  evt.as_u64[1] = pointer_to_uword (cleanup_cb);
+
+  app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
+
   return 0;
 }
 
 int
-app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s)
+app_worker_rx_notify (app_worker_t *app_wrk, session_t *s)
 {
-  application_t *app = application_get (app_wrk->app_index);
-
-  if (!app->cb_fns.builtin_app_tx_callback)
-    return 0;
-
-  app->cb_fns.builtin_app_tx_callback (s);
+  app_worker_add_event (app_wrk, s, SESSION_IO_EVT_RX);
   return 0;
 }
 
@@ -469,8 +514,13 @@ int
 app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
                           session_handle_t new_sh)
 {
-  application_t *app = application_get (app_wrk->app_index);
-  app->cb_fns.session_migrate_callback (s, new_sh);
+  session_event_t evt;
+
+  evt.event_type = SESSION_CTRL_EVT_MIGRATED;
+  evt.session_index = s->session_index;
+  evt.as_u64[1] = new_sh;
+
+  app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
   return 0;
 }
 
@@ -514,7 +564,7 @@ int
 app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
                            session_handle_t *rsh)
 {
-  if (PREDICT_FALSE (app_wrk->mq_congested))
+  if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
     return SESSION_E_REFUSED;
 
   sep->app_wrk_index = app_wrk->wrk_index;
@@ -601,24 +651,27 @@ app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
 int
 app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
 {
-  application_t *app = application_get (app_wrk->app_index);
+  session_event_t evt;
+
+  evt.event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT;
+  evt.as_u64[1] = segment_handle;
 
-  return app->cb_fns.add_segment_callback (app_wrk->wrk_index,
-                                          segment_handle);
+  app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
+
+  return 0;
 }
 
 int
 app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
 {
-  application_t *app = application_get (app_wrk->app_index);
-  return app->cb_fns.del_segment_callback (app_wrk->wrk_index,
-                                          segment_handle);
-}
+  session_event_t evt;
 
-static inline u8
-app_worker_application_is_builtin (app_worker_t * app_wrk)
-{
-  return app_wrk->app_is_builtin;
+  evt.event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT;
+  evt.as_u64[1] = segment_handle;
+
+  app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
+
+  return 0;
 }
 
 static int
@@ -677,126 +730,38 @@ app_wrk_send_fd (app_worker_t *app_wrk, int fd)
   return 0;
 }
 
-static int
-mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring,
-                          svm_msg_q_msg_t *msg)
-{
-  int rv, n_try = 0;
-
-  while (n_try < 75)
-    {
-      rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg);
-      if (!rv)
-       return 0;
-      /*
-       * Break the loop if mq is full, usually this is because the
-       * app has crashed or is hanging on somewhere.
-       */
-      if (rv != -1)
-       break;
-      n_try += 1;
-      usleep (1);
-    }
-
-  return -1;
-}
-
-typedef union app_wrk_mq_rpc_args_
-{
-  struct
-  {
-    u32 thread_index;
-    u32 app_wrk_index;
-  };
-  uword as_uword;
-} app_wrk_mq_rpc_ags_t;
-
-static int
-app_wrk_handle_mq_postponed_msgs (void *arg)
+void
+app_worker_add_event (app_worker_t *app_wrk, session_t *s,
+                     session_evt_type_t evt_type)
 {
-  svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
-  app_wrk_postponed_msg_t *pm;
-  app_wrk_mq_rpc_ags_t args;
-  u32 max_msg, n_msg = 0;
-  app_worker_t *app_wrk;
   session_event_t *evt;
-  svm_msg_q_t *mq;
-
-  args.as_uword = pointer_to_uword (arg);
-  app_wrk = app_worker_get_if_valid (args.app_wrk_index);
-  if (!app_wrk)
-    return 0;
-
-  mq = app_wrk->event_queue;
-
-  clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock);
-
-  max_msg = clib_min (32, clib_fifo_elts (app_wrk->postponed_mq_msgs));
-
-  while (n_msg < max_msg)
-    {
-      pm = clib_fifo_head (app_wrk->postponed_mq_msgs);
-      if (mq_try_lock_and_alloc_msg (mq, pm->ring, mq_msg))
-       break;
 
-      evt = svm_msg_q_msg_data (mq, mq_msg);
-      clib_memset (evt, 0, sizeof (*evt));
-      evt->event_type = pm->event_type;
-      clib_memcpy_fast (evt->data, pm->data, pm->len);
-
-      if (pm->fd != -1)
-       app_wrk_send_fd (app_wrk, pm->fd);
-
-      svm_msg_q_add_and_unlock (mq, mq_msg);
-
-      clib_fifo_advance_head (app_wrk->postponed_mq_msgs, 1);
-      n_msg += 1;
-    }
+  ASSERT (s->thread_index == vlib_get_thread_index ());
+  clib_fifo_add2 (app_wrk->wrk_evts[s->thread_index], evt);
+  evt->session_index = s->session_index;
+  evt->event_type = evt_type;
+  evt->postponed = 0;
 
-  if (!clib_fifo_elts (app_wrk->postponed_mq_msgs))
+  /* First event for this app_wrk. Schedule it for handling in session input */
+  if (clib_fifo_elts (app_wrk->wrk_evts[s->thread_index]) == 1)
     {
-      app_wrk->mq_congested = 0;
+      session_worker_t *wrk = session_main_get_worker (s->thread_index);
+      session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
     }
-  else
-    {
-      session_send_rpc_evt_to_thread_force (
-       args.thread_index, app_wrk_handle_mq_postponed_msgs,
-       uword_to_pointer (args.as_uword, void *));
-    }
-
-  clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock);
-
-  return 0;
 }
 
-static void
-app_wrk_add_mq_postponed_msg (app_worker_t *app_wrk, session_mq_rings_e ring,
-                             u8 evt_type, void *msg, u32 msg_len, int fd)
+void
+app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index,
+                            session_event_t *evt)
 {
-  app_wrk_postponed_msg_t *pm;
-
-  clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock);
+  clib_fifo_add1 (app_wrk->wrk_evts[thread_index], *evt);
 
-  app_wrk->mq_congested = 1;
-
-  clib_fifo_add2 (app_wrk->postponed_mq_msgs, pm);
-  clib_memcpy_fast (pm->data, msg, msg_len);
-  pm->event_type = evt_type;
-  pm->ring = ring;
-  pm->len = msg_len;
-  pm->fd = fd;
-
-  if (clib_fifo_elts (app_wrk->postponed_mq_msgs) == 1)
+  /* First event for this app_wrk. Schedule it for handling in session input */
+  if (clib_fifo_elts (app_wrk->wrk_evts[thread_index]) == 1)
     {
-      app_wrk_mq_rpc_ags_t args = { .thread_index = vlib_get_thread_index (),
-                                   .app_wrk_index = app_wrk->wrk_index };
-
-      session_send_rpc_evt_to_thread_force (
-       args.thread_index, app_wrk_handle_mq_postponed_msgs,
-       uword_to_pointer (args.as_uword, void *));
+      session_worker_t *wrk = session_main_get_worker (thread_index);
+      session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
     }
-
-  clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock);
 }
 
 always_inline void
@@ -806,14 +771,9 @@ app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
   svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
   svm_msg_q_t *mq = app_wrk->event_queue;
   session_event_t *evt;
-  int rv;
-
-  if (PREDICT_FALSE (app_wrk->mq_congested))
-    goto handle_congestion;
 
-  rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_CTRL_EVT_RING, mq_msg);
-  if (PREDICT_FALSE (rv))
-    goto handle_congestion;
+  ASSERT (!svm_msg_q_or_ring_is_full (mq, SESSION_MQ_CTRL_EVT_RING));
+  *mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_CTRL_EVT_RING);
 
   evt = svm_msg_q_msg_data (mq, mq_msg);
   clib_memset (evt, 0, sizeof (*evt));
@@ -823,14 +783,7 @@ app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
   if (fd != -1)
     app_wrk_send_fd (app_wrk, fd);
 
-  svm_msg_q_add_and_unlock (mq, mq_msg);
-
-  return;
-
-handle_congestion:
-
-  app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_CTRL_EVT_RING, evt_type,
-                               msg, msg_len, fd);
+  svm_msg_q_add_raw (mq, mq_msg);
 }
 
 void
@@ -847,109 +800,26 @@ app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1);
 }
 
-static inline int
-app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s)
+u8
+app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index)
 {
-  svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
-  session_event_t *evt;
-  svm_msg_q_t *mq;
-  u32 app_session;
-  int rv;
-
-  if (app_worker_application_is_builtin (app_wrk))
-    return app_worker_builtin_rx (app_wrk, s);
-
-  if (svm_fifo_has_event (s->rx_fifo))
-    return 0;
-
-  app_session = s->rx_fifo->shr->client_session_index;
-  mq = app_wrk->event_queue;
-
-  if (PREDICT_FALSE (app_wrk->mq_congested))
-    goto handle_congestion;
-
-  rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
-
-  if (PREDICT_FALSE (rv))
-    goto handle_congestion;
-
-  evt = svm_msg_q_msg_data (mq, mq_msg);
-  evt->event_type = SESSION_IO_EVT_RX;
-  evt->session_index = app_session;
-
-  (void) svm_fifo_set_event (s->rx_fifo);
-
-  svm_msg_q_add_and_unlock (mq, mq_msg);
-
-  return 0;
-
-handle_congestion:
-
-  app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING,
-                               SESSION_IO_EVT_RX, &app_session,
-                               sizeof (app_session), -1);
-  return -1;
+  return app_wrk->wrk_mq_congested[thread_index] > 0;
 }
 
-static inline int
-app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s)
+void
+app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index)
 {
-  svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
-  session_event_t *evt;
-  svm_msg_q_t *mq;
-  u32 app_session;
-  int rv;
-
-  if (app_worker_application_is_builtin (app_wrk))
-    return app_worker_builtin_tx (app_wrk, s);
-
-  app_session = s->tx_fifo->shr->client_session_index;
-  mq = app_wrk->event_queue;
-
-  if (PREDICT_FALSE (app_wrk->mq_congested))
-    goto handle_congestion;
-
-  rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
-
-  if (PREDICT_FALSE (rv))
-    goto handle_congestion;
-
-  evt = svm_msg_q_msg_data (mq, mq_msg);
-  evt->event_type = SESSION_IO_EVT_TX;
-  evt->session_index = app_session;
-
-  svm_msg_q_add_and_unlock (mq, mq_msg);
-
-  return 0;
-
-handle_congestion:
-
-  app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING,
-                               SESSION_IO_EVT_TX, &app_session,
-                               sizeof (app_session), -1);
-  return -1;
+  clib_atomic_fetch_add_relax (&app_wrk->mq_congested, 1);
+  ASSERT (thread_index == vlib_get_thread_index ());
+  app_wrk->wrk_mq_congested[thread_index] = 1;
 }
 
-/* *INDENT-OFF* */
-typedef int (app_send_evt_handler_fn) (app_worker_t *app,
-                                      session_t *s);
-static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
-    app_send_io_evt_rx,
-    app_send_io_evt_tx,
-};
-/* *INDENT-ON* */
-
-/**
- * Send event to application
- *
- * Logic from queue perspective is blocking. However, if queue is full,
- * we return.
- */
-int
-app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
-                               u8 evt_type)
+void
+app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk, u32 thread_index)
 {
-  return app_send_evt_handler_fns[evt_type] (app, s);
+  clib_atomic_fetch_sub_relax (&app_wrk->mq_congested, 1);
+  ASSERT (thread_index == vlib_get_thread_index ());
+  app_wrk->wrk_mq_congested[thread_index] = 0;
 }
 
 u8 *
index ad0ba89..d459b73 100644 (file)
@@ -866,7 +866,7 @@ segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo)
 
   /* Thread that allocated the fifos must be the one to clean them up */
   ASSERT (rx_fifo->master_thread_index == vlib_get_thread_index () ||
-         rx_fifo->refcnt > 1);
+         rx_fifo->refcnt > 1 || vlib_thread_is_main_w_barrier ());
 
   /* It's possible to have no segment manager if the session was removed
    * as result of a detach. */
index 228234c..b494041 100644 (file)
@@ -240,18 +240,26 @@ session_is_valid (u32 si, u8 thread_index)
       || s->session_state <= SESSION_STATE_LISTENING)
     return 1;
 
-  if (s->session_state == SESSION_STATE_CONNECTING &&
+  if ((s->session_state == SESSION_STATE_CONNECTING ||
+       s->session_state == SESSION_STATE_TRANSPORT_CLOSED) &&
       (s->flags & SESSION_F_HALF_OPEN))
     return 1;
 
   tc = session_get_transport (s);
-  if (s->connection_index != tc->c_index
-      || s->thread_index != tc->thread_index || tc->s_index != si)
+  if (s->connection_index != tc->c_index ||
+      s->thread_index != tc->thread_index || tc->s_index != si)
     return 0;
 
   return 1;
 }
 
+void
+session_cleanup (session_t *s)
+{
+  segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
+  session_free (s);
+}
+
 static void
 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
 {
@@ -259,16 +267,21 @@ session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
 
   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
   if (!app_wrk)
-    return;
+    {
+      if (ntf == SESSION_CLEANUP_TRANSPORT)
+       return;
+
+      session_cleanup (s);
+      return;
+    }
   app_worker_cleanup_notify (app_wrk, s, ntf);
 }
 
 void
-session_free_w_fifos (session_t * s)
+session_program_cleanup (session_t *s)
 {
+  ASSERT (s->session_state == SESSION_STATE_TRANSPORT_DELETED);
   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
-  segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
-  session_free (s);
 }
 
 /**
@@ -285,7 +298,7 @@ session_delete (session_t * s)
   if ((rv = session_lookup_del_session (s)))
     clib_warning ("session %u hash delete rv %d", s->session_index, rv);
 
-  session_free_w_fifos (s);
+  session_program_cleanup (s);
 }
 
 void
@@ -307,7 +320,7 @@ session_cleanup_half_open (session_handle_t ho_handle)
       transport_cleanup (session_get_transport_proto (ho),
                         ho->connection_index, ho->app_index /* overloaded */);
     }
-  else
+  else if (ho->session_state != SESSION_STATE_TRANSPORT_DELETED)
     {
       /* Cleanup half-open session lookup table if need be */
       if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
@@ -333,7 +346,8 @@ session_half_open_free (session_t *ho)
   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
   if (app_wrk)
     app_worker_del_half_open (app_wrk, ho);
-  session_free (ho);
+  else
+    session_free (ho);
 }
 
 static void
@@ -354,6 +368,7 @@ session_half_open_delete_notify (transport_connection_t *tc)
       if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
        session_lookup_del_half_open (tc);
     }
+  session_set_state (ho, SESSION_STATE_TRANSPORT_DELETED);
 
   /* Notification from ctrl thread accepted without rpc */
   if (tc->thread_index == transport_cl_thread ())
@@ -558,10 +573,158 @@ session_fifo_tuning (session_t * s, svm_fifo_t * f,
     }
 }
 
+void
+session_wrk_program_app_wrk_evts (session_worker_t *wrk, u32 app_wrk_index)
+{
+  u8 need_interrupt;
+
+  ASSERT ((wrk - session_main.wrk) == vlib_get_thread_index ());
+  need_interrupt = clib_bitmap_is_zero (wrk->app_wrks_pending_ntf);
+  wrk->app_wrks_pending_ntf =
+    clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk_index, 1);
+
+  if (need_interrupt)
+    vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
+}
+
+always_inline void
+session_program_io_event (app_worker_t *app_wrk, session_t *s,
+                         session_evt_type_t et, u8 is_cl)
+{
+  if (is_cl)
+    {
+      /* Special events for connectionless sessions */
+      et += SESSION_IO_EVT_BUILTIN_RX - SESSION_IO_EVT_RX;
+
+      ASSERT (s->thread_index == 0);
+      session_event_t evt = {
+       .event_type = et,
+       .session_handle = session_handle (s),
+      };
+
+      app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
+    }
+  else
+    {
+      app_worker_add_event (app_wrk, s, et);
+    }
+}
+
+static inline int
+session_notify_subscribers (u32 app_index, session_t *s, svm_fifo_t *f,
+                           session_evt_type_t evt_type)
+{
+  app_worker_t *app_wrk;
+  application_t *app;
+  u8 is_cl;
+  int i;
+
+  app = application_get (app_index);
+  if (!app)
+    return -1;
+
+  is_cl = s->thread_index != vlib_get_thread_index ();
+  for (i = 0; i < f->shr->n_subscribers; i++)
+    {
+      app_wrk = application_get_worker (app, f->shr->subscribers[i]);
+      if (!app_wrk)
+       continue;
+      session_program_io_event (app_wrk, s, evt_type, is_cl ? 1 : 0);
+    }
+
+  return 0;
+}
+
+always_inline int
+session_enqueue_notify_inline (session_t *s, u8 is_cl)
+{
+  app_worker_t *app_wrk;
+
+  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+  if (PREDICT_FALSE (!app_wrk))
+    return -1;
+
+  session_program_io_event (app_wrk, s, SESSION_IO_EVT_RX, is_cl);
+
+  if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo)))
+    return session_notify_subscribers (app_wrk->app_index, s, s->rx_fifo,
+                                      SESSION_IO_EVT_RX);
+
+  return 0;
+}
+
+int
+session_enqueue_notify (session_t *s)
+{
+  return session_enqueue_notify_inline (s, 0 /* is_cl */);
+}
+
+int
+session_enqueue_notify_cl (session_t *s)
+{
+  return session_enqueue_notify_inline (s, 1 /* is_cl */);
+}
+
+int
+session_dequeue_notify (session_t *s)
+{
+  app_worker_t *app_wrk;
+  u8 is_cl;
+
+  /* Unset as soon as event is requested */
+  svm_fifo_clear_deq_ntf (s->tx_fifo);
+
+  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+  if (PREDICT_FALSE (!app_wrk))
+    return -1;
+
+  is_cl = s->session_state == SESSION_STATE_LISTENING ||
+         s->session_state == SESSION_STATE_OPENED;
+  session_program_io_event (app_wrk, s, SESSION_IO_EVT_TX, is_cl ? 1 : 0);
+
+  if (PREDICT_FALSE (svm_fifo_n_subscribers (s->tx_fifo)))
+    return session_notify_subscribers (app_wrk->app_index, s, s->tx_fifo,
+                                      SESSION_IO_EVT_TX);
+
+  return 0;
+}
+
+/**
+ * Flushes queue of sessions that are to be notified of new data
+ * enqueued events.
+ *
+ * @param transport_proto transport protocol for which queue to be flushed
+ * @param thread_index Thread index for which the flush is to be performed.
+ * @return 0 on success or a positive number indicating the number of
+ *         failures due to API queue being full.
+ */
+void
+session_main_flush_enqueue_events (transport_proto_t transport_proto,
+                                  u32 thread_index)
+{
+  session_worker_t *wrk = session_main_get_worker (thread_index);
+  session_handle_t *handles;
+  session_t *s;
+  u32 i;
+
+  handles = wrk->session_to_enqueue[transport_proto];
+
+  for (i = 0; i < vec_len (handles); i++)
+    {
+      s = session_get_from_handle (handles[i]);
+      session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
+                          0 /* TODO/not needed */);
+      session_enqueue_notify_inline (s,
+                                    s->thread_index != thread_index ? 1 : 0);
+    }
+
+  vec_reset_length (handles);
+  wrk->session_to_enqueue[transport_proto] = handles;
+}
+
 /*
- * Enqueue data for delivery to session peer. Does not notify peer of enqueue
- * event but on request can queue notification events for later delivery by
- * calling stream_server_flush_enqueue_events().
+ * Enqueue data for delivery to app. If requested, it queues app notification
+ * event for later delivery.
  *
  * @param tc Transport connection which is to be enqueued data
  * @param b Buffer to be enqueued
@@ -610,15 +773,14 @@ session_enqueue_stream_connection (transport_connection_t * tc,
 
   if (queue_event)
     {
-      /* Queue RX event on this fifo. Eventually these will need to be flushed
-       * by calling stream_server_flush_enqueue_events () */
-      session_worker_t *wrk;
-
-      wrk = session_main_get_worker (s->thread_index);
+      /* Queue RX event on this fifo. Eventually these will need to be
+       * flushed by calling @ref session_main_flush_enqueue_events () */
       if (!(s->flags & SESSION_F_RX_EVT))
        {
+         session_worker_t *wrk = session_main_get_worker (s->thread_index);
+         ASSERT (s->thread_index == vlib_get_thread_index ());
          s->flags |= SESSION_F_RX_EVT;
-         vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
+         vec_add1 (wrk->session_to_enqueue[tc->proto], session_handle (s));
        }
 
       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
@@ -627,10 +789,11 @@ session_enqueue_stream_connection (transport_connection_t * tc,
   return enqueued;
 }
 
-int
-session_enqueue_dgram_connection (session_t * s,
-                                 session_dgram_hdr_t * hdr,
-                                 vlib_buffer_t * b, u8 proto, u8 queue_event)
+always_inline int
+session_enqueue_dgram_connection_inline (session_t *s,
+                                        session_dgram_hdr_t *hdr,
+                                        vlib_buffer_t *b, u8 proto,
+                                        u8 queue_event, u32 is_cl)
 {
   int rv;
 
@@ -639,12 +802,10 @@ session_enqueue_dgram_connection (session_t * s,
 
   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
     {
-      /* *INDENT-OFF* */
       svm_fifo_seg_t segs[2] = {
          { (u8 *) hdr, sizeof (*hdr) },
          { vlib_buffer_get_current (b), b->current_length }
       };
-      /* *INDENT-ON* */
 
       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
                                      0 /* allow_partial */ );
@@ -676,15 +837,16 @@ session_enqueue_dgram_connection (session_t * s,
 
   if (queue_event && rv > 0)
     {
-      /* Queue RX event on this fifo. Eventually these will need to be flushed
-       * by calling stream_server_flush_enqueue_events () */
-      session_worker_t *wrk;
-
-      wrk = session_main_get_worker (s->thread_index);
+      /* Queue RX event on this fifo. Eventually these will need to be
+       * flushed by calling @ref session_main_flush_enqueue_events () */
       if (!(s->flags & SESSION_F_RX_EVT))
        {
+         u32 thread_index =
+           is_cl ? vlib_get_thread_index () : s->thread_index;
+         session_worker_t *wrk = session_main_get_worker (thread_index);
+         ASSERT (s->thread_index == vlib_get_thread_index () || is_cl);
          s->flags |= SESSION_F_RX_EVT;
-         vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
+         vec_add1 (wrk->session_to_enqueue[proto], session_handle (s));
        }
 
       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
@@ -692,6 +854,23 @@ session_enqueue_dgram_connection (session_t * s,
   return rv > 0 ? rv : 0;
 }
 
+int
+session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr,
+                                 vlib_buffer_t *b, u8 proto, u8 queue_event)
+{
+  return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
+                                                 queue_event, 0 /* is_cl */);
+}
+
+int
+session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr,
+                                    vlib_buffer_t *b, u8 proto,
+                                    u8 queue_event)
+{
+  return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
+                                                 queue_event, 1 /* is_cl */);
+}
+
 int
 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
                            u32 offset, u32 max_bytes)
@@ -715,189 +894,6 @@ session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
   return rv;
 }
 
-static inline int
-session_notify_subscribers (u32 app_index, session_t * s,
-                           svm_fifo_t * f, session_evt_type_t evt_type)
-{
-  app_worker_t *app_wrk;
-  application_t *app;
-  int i;
-
-  app = application_get (app_index);
-  if (!app)
-    return -1;
-
-  for (i = 0; i < f->shr->n_subscribers; i++)
-    {
-      app_wrk = application_get_worker (app, f->shr->subscribers[i]);
-      if (!app_wrk)
-       continue;
-      if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
-       return -1;
-    }
-
-  return 0;
-}
-
-/**
- * Notify session peer that new data has been enqueued.
- *
- * @param s    Stream session for which the event is to be generated.
- * @param lock         Flag to indicate if call should lock message queue.
- *
- * @return 0 on success or negative number if failed to send notification.
- */
-static inline int
-session_enqueue_notify_inline (session_t * s)
-{
-  app_worker_t *app_wrk;
-  u32 session_index;
-  u8 n_subscribers;
-  u32 thread_index;
-
-  session_index = s->session_index;
-  thread_index = s->thread_index;
-  n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
-
-  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
-  if (PREDICT_FALSE (!app_wrk))
-    {
-      SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
-      return 0;
-    }
-
-  SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
-
-  s->flags &= ~SESSION_F_RX_EVT;
-
-  /* Application didn't confirm accept yet */
-  if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
-    return 0;
-
-  if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
-                                                    SESSION_IO_EVT_RX)))
-    return -1;
-
-  if (PREDICT_FALSE (n_subscribers))
-    {
-      s = session_get (session_index, thread_index);
-      return session_notify_subscribers (app_wrk->app_index, s,
-                                        s->rx_fifo, SESSION_IO_EVT_RX);
-    }
-
-  return 0;
-}
-
-int
-session_enqueue_notify (session_t * s)
-{
-  return session_enqueue_notify_inline (s);
-}
-
-static void
-session_enqueue_notify_rpc (void *arg)
-{
-  u32 session_index = pointer_to_uword (arg);
-  session_t *s;
-
-  s = session_get_if_valid (session_index, vlib_get_thread_index ());
-  if (!s)
-    return;
-
-  session_enqueue_notify (s);
-}
-
-/**
- * Like session_enqueue_notify, but can be called from a thread that does not
- * own the session.
- */
-void
-session_enqueue_notify_thread (session_handle_t sh)
-{
-  u32 thread_index = session_thread_from_handle (sh);
-  u32 session_index = session_index_from_handle (sh);
-
-  /*
-   * Pass session index (u32) as opposed to handle (u64) in case pointers
-   * are not 64-bit.
-   */
-  session_send_rpc_evt_to_thread (thread_index,
-                                 session_enqueue_notify_rpc,
-                                 uword_to_pointer (session_index, void *));
-}
-
-int
-session_dequeue_notify (session_t * s)
-{
-  app_worker_t *app_wrk;
-
-  svm_fifo_clear_deq_ntf (s->tx_fifo);
-
-  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
-  if (PREDICT_FALSE (!app_wrk))
-    return -1;
-
-  if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
-                                                    SESSION_IO_EVT_TX)))
-    return -1;
-
-  if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
-    return session_notify_subscribers (app_wrk->app_index, s,
-                                      s->tx_fifo, SESSION_IO_EVT_TX);
-
-  return 0;
-}
-
-/**
- * Flushes queue of sessions that are to be notified of new data
- * enqueued events.
- *
- * @param thread_index Thread index for which the flush is to be performed.
- * @return 0 on success or a positive number indicating the number of
- *         failures due to API queue being full.
- */
-int
-session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
-{
-  session_worker_t *wrk = session_main_get_worker (thread_index);
-  session_t *s;
-  int i, errors = 0;
-  u32 *indices;
-
-  indices = wrk->session_to_enqueue[transport_proto];
-
-  for (i = 0; i < vec_len (indices); i++)
-    {
-      s = session_get_if_valid (indices[i], thread_index);
-      if (PREDICT_FALSE (!s))
-       {
-         errors++;
-         continue;
-       }
-
-      session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
-                          0 /* TODO/not needed */ );
-
-      if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
-       errors++;
-    }
-
-  vec_reset_length (indices);
-  wrk->session_to_enqueue[transport_proto] = indices;
-
-  return errors;
-}
-
-int
-session_main_flush_all_enqueue_events (u8 transport_proto)
-{
-  vlib_thread_main_t *vtm = vlib_get_thread_main ();
-  int i, errors = 0;
-  for (i = 0; i < 1 + vtm->n_threads; i++)
-    errors += session_main_flush_enqueue_events (transport_proto, i);
-  return errors;
-}
-
 int
 session_stream_connect_notify (transport_connection_t * tc,
                               session_error_t err)
@@ -951,43 +947,20 @@ session_stream_connect_notify (transport_connection_t * tc,
   return 0;
 }
 
-typedef union session_switch_pool_reply_args_
-{
-  struct
-  {
-    u32 session_index;
-    u16 thread_index;
-    u8 is_closed;
-  };
-  u64 as_u64;
-} session_switch_pool_reply_args_t;
-
-STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
-              "switch pool reply args size");
-
 static void
-session_switch_pool_reply (void *arg)
+session_switch_pool_closed_rpc (void *arg)
 {
-  session_switch_pool_reply_args_t rargs;
+  session_handle_t sh;
   session_t *s;
 
-  rargs.as_u64 = pointer_to_uword (arg);
-  s = session_get_if_valid (rargs.session_index, rargs.thread_index);
+  sh = pointer_to_uword (arg);
+  s = session_get_from_handle_if_valid (sh);
   if (!s)
     return;
 
-  /* Session closed during migration. Clean everything up */
-  if (rargs.is_closed)
-    {
-      transport_cleanup (session_get_transport_proto (s), s->connection_index,
-                        s->thread_index);
-      segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
-      session_free (s);
-      return;
-    }
-
-  /* Notify app that it has data on the new session */
-  session_enqueue_notify (s);
+  transport_cleanup (session_get_transport_proto (s), s->connection_index,
+                    s->thread_index);
+  session_cleanup (s);
 }
 
 typedef struct _session_switch_pool_args
@@ -1005,8 +978,7 @@ static void
 session_switch_pool (void *cb_args)
 {
   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
-  session_switch_pool_reply_args_t rargs;
-  session_handle_t new_sh;
+  session_handle_t sh, new_sh;
   segment_manager_t *sm;
   app_worker_t *app_wrk;
   session_t *s;
@@ -1014,37 +986,32 @@ session_switch_pool (void *cb_args)
   ASSERT (args->thread_index == vlib_get_thread_index ());
   s = session_get (args->session_index, args->thread_index);
 
-  /* Check if session closed during migration */
-  rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
+  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+  if (!app_wrk)
+    goto app_closed;
 
-  transport_cleanup (session_get_transport_proto (s), s->connection_index,
-                    s->thread_index);
+  /* Cleanup fifo segment slice state for fifos */
+  sm = app_worker_get_connect_segment_manager (app_wrk);
+  segment_manager_detach_fifo (sm, &s->rx_fifo);
+  segment_manager_detach_fifo (sm, &s->tx_fifo);
 
-  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
-  if (app_wrk)
-    {
-      /* Cleanup fifo segment slice state for fifos */
-      sm = app_worker_get_connect_segment_manager (app_wrk);
-      segment_manager_detach_fifo (sm, &s->rx_fifo);
-      segment_manager_detach_fifo (sm, &s->tx_fifo);
+  /* Check if session closed during migration */
+  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+    goto app_closed;
 
-      /* Notify app, using old session, about the migration event */
-      if (!rargs.is_closed)
-       {
-         new_sh = session_make_handle (args->new_session_index,
-                                       args->new_thread_index);
-         app_worker_migrate_notify (app_wrk, s, new_sh);
-       }
-    }
+  new_sh =
+    session_make_handle (args->new_session_index, args->new_thread_index);
+  app_worker_migrate_notify (app_wrk, s, new_sh);
 
-  /* Trigger app read and fifo updates on the new thread */
-  rargs.session_index = args->new_session_index;
-  rargs.thread_index = args->new_thread_index;
-  session_send_rpc_evt_to_thread (args->new_thread_index,
-                                 session_switch_pool_reply,
-                                 uword_to_pointer (rargs.as_u64, void *));
+  clib_mem_free (cb_args);
+  return;
 
-  session_free (s);
+app_closed:
+  /* Session closed during migration. Clean everything up */
+  sh = session_handle (s);
+  session_send_rpc_evt_to_thread (args->new_thread_index,
+                                 session_switch_pool_closed_rpc,
+                                 uword_to_pointer (sh, void *));
   clib_mem_free (cb_args);
 }
 
@@ -1184,6 +1151,7 @@ session_transport_delete_notify (transport_connection_t * tc)
       break;
     case SESSION_STATE_CLOSED:
       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
+      session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
       session_delete (s);
       break;
     default:
@@ -1633,7 +1601,7 @@ session_transport_close (session_t * s)
        session_set_state (s, SESSION_STATE_CLOSED);
       /* If transport is already deleted, just free the session */
       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
-       session_free_w_fifos (s);
+       session_program_cleanup (s);
       return;
     }
 
@@ -1660,7 +1628,7 @@ session_transport_reset (session_t * s)
       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
        session_set_state (s, SESSION_STATE_CLOSED);
       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
-       session_free_w_fifos (s);
+       session_program_cleanup (s);
       return;
     }
 
@@ -2157,6 +2125,7 @@ session_node_enable_disable (u8 is_en)
          if (!sm->poll_main)
            continue;
        }
+      vlib_node_set_state (vm, session_input_node.index, mstate);
       vlib_node_set_state (vm, session_queue_node.index, state);
     }
 
index 6b6d1f6..4de7bb2 100644 (file)
@@ -100,8 +100,8 @@ typedef struct session_worker_
   /** Convenience pointer to this worker's vlib_main */
   vlib_main_t *vm;
 
-  /** Per-proto vector of sessions to enqueue */
-  u32 **session_to_enqueue;
+  /** Per-proto vector of session handles to enqueue */
+  session_handle_t **session_to_enqueue;
 
   /** Timerfd used to periodically signal wrk session queue node */
   int timerfd;
@@ -148,6 +148,9 @@ typedef struct session_worker_
   /** List head for first worker evts pending handling on main */
   clib_llist_index_t evts_pending_main;
 
+  /** Per-app-worker bitmap of pending notifications */
+  uword *app_wrks_pending_ntf;
+
   int config_index;
   u8 dma_enabled;
   session_dma_transfer *dma_trans;
@@ -275,6 +278,7 @@ typedef struct session_main_
 
 extern session_main_t session_main;
 extern vlib_node_registration_t session_queue_node;
+extern vlib_node_registration_t session_input_node;
 extern vlib_node_registration_t session_queue_process_node;
 extern vlib_node_registration_t session_queue_pre_input_node;
 
@@ -358,7 +362,8 @@ int session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq);
 
 session_t *session_alloc (u32 thread_index);
 void session_free (session_t * s);
-void session_free_w_fifos (session_t * s);
+void session_cleanup (session_t *s);
+void session_program_cleanup (session_t *s);
 void session_cleanup_half_open (session_handle_t ho_handle);
 u8 session_is_valid (u32 si, u8 thread_index);
 
@@ -452,8 +457,9 @@ void session_transport_reset (session_t * s);
 void session_transport_cleanup (session_t * s);
 int session_send_io_evt_to_thread (svm_fifo_t * f,
                                   session_evt_type_t evt_type);
-int session_enqueue_notify (session_t * s);
+int session_enqueue_notify (session_t *s);
 int session_dequeue_notify (session_t * s);
+int session_enqueue_notify_cl (session_t *s);
 int session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
                                          session_evt_type_t evt_type);
 void session_send_rpc_evt_to_thread (u32 thread_index, void *fp,
@@ -485,6 +491,10 @@ int session_enqueue_dgram_connection (session_t * s,
                                      session_dgram_hdr_t * hdr,
                                      vlib_buffer_t * b, u8 proto,
                                      u8 queue_event);
+int session_enqueue_dgram_connection_cl (session_t *s,
+                                        session_dgram_hdr_t *hdr,
+                                        vlib_buffer_t *b, u8 proto,
+                                        u8 queue_event);
 int session_stream_connect_notify (transport_connection_t * tc,
                                   session_error_t err);
 int session_dgram_connect_notify (transport_connection_t * tc,
@@ -502,6 +512,7 @@ int session_stream_accept (transport_connection_t * tc, u32 listener_index,
                           u32 thread_index, u8 notify);
 int session_dgram_accept (transport_connection_t * tc, u32 listener_index,
                          u32 thread_index);
+
 /**
  * Initialize session layer for given transport proto and ip version
  *
@@ -765,8 +776,8 @@ do {                                                                        \
       return clib_error_return (0, "session layer is not enabled");    \
 } while (0)
 
-int session_main_flush_enqueue_events (u8 proto, u32 thread_index);
-int session_main_flush_all_enqueue_events (u8 transport_proto);
+void session_main_flush_enqueue_events (transport_proto_t transport_proto,
+                                       u32 thread_index);
 void session_queue_run_on_main_thread (vlib_main_t * vm);
 
 /**
@@ -799,6 +810,8 @@ fifo_segment_t *session_main_get_wrk_mqs_segment (void);
 void session_node_enable_disable (u8 is_en);
 clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
 void session_wrk_handle_evts_main_rpc (void *);
+void session_wrk_program_app_wrk_evts (session_worker_t *wrk,
+                                      u32 app_wrk_index);
 
 session_t *session_alloc_for_connection (transport_connection_t * tc);
 session_t *session_alloc_for_half_open (transport_connection_t *tc);
index ff11bcb..55fc72e 100644 (file)
@@ -460,6 +460,52 @@ mq_send_session_cleanup_cb (session_t * s, session_cleanup_ntf_t ntf)
   app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_CLEANUP, &m, sizeof (m));
 }
 
+static int
+mq_send_io_rx_event (session_t *s)
+{
+  session_event_t *mq_evt;
+  svm_msg_q_msg_t mq_msg;
+  app_worker_t *app_wrk;
+  svm_msg_q_t *mq;
+
+  if (svm_fifo_has_event (s->rx_fifo))
+    return 0;
+
+  app_wrk = app_worker_get (s->app_wrk_index);
+  mq = app_wrk->event_queue;
+
+  mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+  mq_evt = svm_msg_q_msg_data (mq, &mq_msg);
+
+  mq_evt->event_type = SESSION_IO_EVT_RX;
+  mq_evt->session_index = s->rx_fifo->shr->client_session_index;
+
+  (void) svm_fifo_set_event (s->rx_fifo);
+
+  svm_msg_q_add_raw (mq, &mq_msg);
+
+  return 0;
+}
+
+static int
+mq_send_io_tx_event (session_t *s)
+{
+  app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
+  svm_msg_q_t *mq = app_wrk->event_queue;
+  session_event_t *mq_evt;
+  svm_msg_q_msg_t mq_msg;
+
+  mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+  mq_evt = svm_msg_q_msg_data (mq, &mq_msg);
+
+  mq_evt->event_type = SESSION_IO_EVT_TX;
+  mq_evt->session_index = s->tx_fifo->shr->client_session_index;
+
+  svm_msg_q_add_raw (mq, &mq_msg);
+
+  return 0;
+}
+
 static session_cb_vft_t session_mq_cb_vft = {
   .session_accept_callback = mq_send_session_accepted_cb,
   .session_disconnect_callback = mq_send_session_disconnected_cb,
@@ -469,6 +515,8 @@ static session_cb_vft_t session_mq_cb_vft = {
   .session_cleanup_callback = mq_send_session_cleanup_cb,
   .add_segment_callback = mq_send_add_segment_cb,
   .del_segment_callback = mq_send_del_segment_cb,
+  .builtin_app_rx_callback = mq_send_io_rx_event,
+  .builtin_app_tx_callback = mq_send_io_tx_event,
 };
 
 static void
@@ -1246,6 +1294,8 @@ static session_cb_vft_t session_mq_sapi_cb_vft = {
   .session_cleanup_callback = mq_send_session_cleanup_cb,
   .add_segment_callback = mq_send_add_segment_sapi_cb,
   .del_segment_callback = mq_send_del_segment_sapi_cb,
+  .builtin_app_rx_callback = mq_send_io_rx_event,
+  .builtin_app_tx_callback = mq_send_io_tx_event,
 };
 
 static void
diff --git a/src/vnet/session/session_input.c b/src/vnet/session/session_input.c
new file mode 100644 (file)
index 0000000..8c1f11c
--- /dev/null
@@ -0,0 +1,296 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2023 Cisco Systems, Inc.
+ */
+
+#include <vnet/session/session.h>
+#include <vnet/session/application.h>
+
+static inline int
+mq_try_lock (svm_msg_q_t *mq)
+{
+  int rv, n_try = 0;
+
+  while (n_try < 100)
+    {
+      rv = svm_msg_q_try_lock (mq);
+      if (!rv)
+       return 0;
+      n_try += 1;
+      usleep (1);
+    }
+
+  return -1;
+}
+
+always_inline u8
+mq_event_ring_index (session_evt_type_t et)
+{
+  return (et >= SESSION_CTRL_EVT_RPC ? SESSION_MQ_CTRL_EVT_RING :
+                                            SESSION_MQ_IO_EVT_RING);
+}
+
+void
+app_worker_del_all_events (app_worker_t *app_wrk)
+{
+  session_worker_t *wrk;
+  session_event_t *evt;
+  u32 thread_index;
+  session_t *s;
+
+  for (thread_index = 0; thread_index < vec_len (app_wrk->wrk_evts);
+       thread_index++)
+    {
+      while (clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
+       {
+         clib_fifo_sub2 (app_wrk->wrk_evts[thread_index], evt);
+         switch (evt->event_type)
+           {
+           case SESSION_CTRL_EVT_MIGRATED:
+             s = session_get (evt->session_index, thread_index);
+             transport_cleanup (session_get_transport_proto (s),
+                                s->connection_index, s->thread_index);
+             session_free (s);
+             break;
+           case SESSION_CTRL_EVT_CLEANUP:
+             s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
+             if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
+               break;
+             uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
+             break;
+           case SESSION_CTRL_EVT_HALF_CLEANUP:
+             s = ho_session_get (evt->session_index);
+             pool_put_index (app_wrk->half_open_table, s->ho_index);
+             session_free (s);
+             break;
+           default:
+             break;
+           }
+       }
+      wrk = session_main_get_worker (thread_index);
+      clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
+    }
+}
+
+always_inline int
+app_worker_flush_events_inline (app_worker_t *app_wrk, u32 thread_index,
+                               u8 is_builtin)
+{
+  application_t *app = application_get (app_wrk->app_index);
+  svm_msg_q_t *mq = app_wrk->event_queue;
+  session_event_t *evt;
+  u32 n_evts = 128, i;
+  u8 ring_index, mq_is_cong;
+  session_t *s;
+
+  n_evts = clib_min (n_evts, clib_fifo_elts (app_wrk->wrk_evts[thread_index]));
+
+  if (!is_builtin)
+    {
+      mq_is_cong = app_worker_mq_is_congested (app_wrk);
+      if (mq_try_lock (mq))
+       {
+         app_worker_set_mq_wrk_congested (app_wrk, thread_index);
+         return 0;
+       }
+    }
+
+  for (i = 0; i < n_evts; i++)
+    {
+      evt = clib_fifo_head (app_wrk->wrk_evts[thread_index]);
+      if (!is_builtin)
+       {
+         ring_index = mq_event_ring_index (evt->event_type);
+         if (svm_msg_q_or_ring_is_full (mq, ring_index))
+           {
+             app_worker_set_mq_wrk_congested (app_wrk, thread_index);
+             break;
+           }
+       }
+
+      switch (evt->event_type)
+       {
+       case SESSION_IO_EVT_RX:
+         s = session_get (evt->session_index, thread_index);
+         s->flags &= ~SESSION_F_RX_EVT;
+         /* Application didn't confirm accept yet */
+         if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
+           break;
+         app->cb_fns.builtin_app_rx_callback (s);
+         break;
+       /* Handle sessions that might not be on current thread */
+       case SESSION_IO_EVT_BUILTIN_RX:
+         s = session_get_from_handle_if_valid (evt->session_handle);
+         if (!s || s->session_state == SESSION_STATE_ACCEPTING)
+           break;
+         app->cb_fns.builtin_app_rx_callback (s);
+         break;
+       case SESSION_IO_EVT_TX:
+         s = session_get (evt->session_index, thread_index);
+         app->cb_fns.builtin_app_tx_callback (s);
+         break;
+       case SESSION_IO_EVT_TX_MAIN:
+         s = session_get_from_handle_if_valid (evt->session_handle);
+         if (!s)
+           break;
+         app->cb_fns.builtin_app_tx_callback (s);
+         break;
+       case SESSION_CTRL_EVT_BOUND:
+         /* No app cb function currently */
+         if (is_builtin)
+           break;
+         mq_send_session_bound_cb (app_wrk->wrk_index, evt->as_u64[1] >> 32,
+                                   evt->session_handle,
+                                   evt->as_u64[1] & 0xffffffff);
+         break;
+       case SESSION_CTRL_EVT_ACCEPTED:
+         s = session_get (evt->session_index, thread_index);
+         app->cb_fns.session_accept_callback (s);
+         break;
+       case SESSION_CTRL_EVT_CONNECTED:
+         if (!(evt->as_u64[1] & 0xffffffff))
+           s = session_get (evt->session_index, thread_index);
+         else
+           s = 0;
+         app->cb_fns.session_connected_callback (app_wrk->wrk_index,
+                                                 evt->as_u64[1] >> 32, s,
+                                                 evt->as_u64[1] & 0xffffffff);
+         break;
+       case SESSION_CTRL_EVT_DISCONNECTED:
+         s = session_get (evt->session_index, thread_index);
+         app->cb_fns.session_disconnect_callback (s);
+         break;
+       case SESSION_CTRL_EVT_RESET:
+         s = session_get (evt->session_index, thread_index);
+         app->cb_fns.session_reset_callback (s);
+         break;
+       case SESSION_CTRL_EVT_UNLISTEN_REPLY:
+         if (is_builtin)
+           break;
+         mq_send_unlisten_reply (app_wrk, evt->session_handle,
+                                 evt->as_u64[1] >> 32,
+                                 evt->as_u64[1] & 0xffffffff);
+         break;
+       case SESSION_CTRL_EVT_MIGRATED:
+         s = session_get (evt->session_index, thread_index);
+         app->cb_fns.session_migrate_callback (s, evt->as_u64[1]);
+         transport_cleanup (session_get_transport_proto (s),
+                            s->connection_index, s->thread_index);
+         session_free (s);
+         /* Notify app that it has data on the new session */
+         s = session_get_from_handle (evt->as_u64[1]);
+         session_send_io_evt_to_thread (s->rx_fifo,
+                                        SESSION_IO_EVT_BUILTIN_RX);
+         break;
+       case SESSION_CTRL_EVT_TRANSPORT_CLOSED:
+         s = session_get (evt->session_index, thread_index);
+         if (app->cb_fns.session_transport_closed_callback)
+           app->cb_fns.session_transport_closed_callback (s);
+         break;
+       case SESSION_CTRL_EVT_CLEANUP:
+         s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
+         if (app->cb_fns.session_cleanup_callback)
+           app->cb_fns.session_cleanup_callback (s, evt->as_u64[0] >> 32);
+         if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
+           break;
+         uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
+         break;
+       case SESSION_CTRL_EVT_HALF_CLEANUP:
+         s = ho_session_get (evt->session_index);
+         ASSERT (session_vlib_thread_is_cl_thread ());
+         if (app->cb_fns.half_open_cleanup_callback)
+           app->cb_fns.half_open_cleanup_callback (s);
+         pool_put_index (app_wrk->half_open_table, s->ho_index);
+         session_free (s);
+         break;
+       case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
+         app->cb_fns.add_segment_callback (app_wrk->wrk_index,
+                                           evt->as_u64[1]);
+         break;
+       case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
+         app->cb_fns.del_segment_callback (app_wrk->wrk_index,
+                                           evt->as_u64[1]);
+         break;
+       default:
+         clib_warning ("unexpected event: %u", evt->event_type);
+         ASSERT (0);
+         break;
+       }
+      clib_fifo_advance_head (app_wrk->wrk_evts[thread_index], 1);
+    }
+
+  if (!is_builtin)
+    {
+      svm_msg_q_unlock (mq);
+      if (mq_is_cong && i == n_evts)
+       app_worker_unset_wrk_mq_congested (app_wrk, thread_index);
+    }
+
+  return 0;
+}
+
+int
+app_wrk_flush_wrk_events (app_worker_t *app_wrk, u32 thread_index)
+{
+  if (app_worker_application_is_builtin (app_wrk))
+    return app_worker_flush_events_inline (app_wrk, thread_index,
+                                          1 /* is_builtin */);
+  else
+    return app_worker_flush_events_inline (app_wrk, thread_index,
+                                          0 /* is_builtin */);
+}
+
+static inline int
+session_wrk_flush_events (session_worker_t *wrk)
+{
+  app_worker_t *app_wrk;
+  uword app_wrk_index;
+  u32 thread_index;
+
+  thread_index = wrk->vm->thread_index;
+  app_wrk_index = clib_bitmap_first_set (wrk->app_wrks_pending_ntf);
+
+  while (app_wrk_index != ~0)
+    {
+      app_wrk = app_worker_get_if_valid (app_wrk_index);
+      /* app_wrk events are flushed on free, so should be valid here */
+      ASSERT (app_wrk != 0);
+      app_wrk_flush_wrk_events (app_wrk, thread_index);
+
+      if (!clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
+       clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
+
+      app_wrk_index =
+       clib_bitmap_next_set (wrk->app_wrks_pending_ntf, app_wrk_index + 1);
+    }
+
+  if (!clib_bitmap_is_zero (wrk->app_wrks_pending_ntf))
+    vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
+
+  return 0;
+}
+
+VLIB_NODE_FN (session_input_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+  u32 thread_index = vm->thread_index;
+  session_worker_t *wrk;
+
+  wrk = session_main_get_worker (thread_index);
+  session_wrk_flush_events (wrk);
+
+  return 0;
+}
+
+VLIB_REGISTER_NODE (session_input_node) = {
+  .name = "session-input",
+  .type = VLIB_NODE_TYPE_INPUT,
+  .state = VLIB_NODE_STATE_DISABLED,
+};
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
\ No newline at end of file
index e15625e..4f2cae4 100644 (file)
@@ -142,10 +142,14 @@ session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     session_worker_stat_error_inc (wrk, rv, 1);
 
   app_wrk = application_get_worker (app, mp->wrk_index);
-  mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+  app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
 
   if (mp->ext_config)
     session_mq_free_ext_config (app, mp->ext_config);
+
+  /* Make sure events are flushed before releasing barrier, to avoid
+   * potential race with accept. */
+  app_wrk_flush_wrk_events (app_wrk, 0);
 }
 
 static void
@@ -170,7 +174,8 @@ session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
   rv = vnet_bind_uri (a);
 
   app_wrk = application_get_worker (app, 0);
-  mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+  app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
+  app_wrk_flush_wrk_events (app_wrk, 0);
 }
 
 static void
@@ -215,7 +220,7 @@ session_mq_connect_one (session_connect_msg_t *mp)
       wrk = session_main_get_worker (vlib_get_thread_index ());
       session_worker_stat_error_inc (wrk, rv, 1);
       app_wrk = application_get_worker (app, mp->wrk_index);
-      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
+      app_worker_connect_notify (app_wrk, 0, rv, mp->context);
     }
 
   if (mp->ext_config)
@@ -324,7 +329,7 @@ session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     {
       session_worker_stat_error_inc (wrk, rv, 1);
       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
-      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
+      app_worker_connect_notify (app_wrk, 0, rv, mp->context);
     }
 }
 
@@ -410,7 +415,7 @@ session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
   if (!app_wrk)
     return;
 
-  mq_send_unlisten_reply (app_wrk, sh, mp->context, rv);
+  app_worker_unlisten_reply (app_wrk, sh, mp->context, rv);
 }
 
 static void
@@ -466,7 +471,7 @@ session_mq_accepted_reply_handler (session_worker_t *wrk,
   session_set_state (s, SESSION_STATE_READY);
 
   if (!svm_fifo_is_empty_prod (s->rx_fifo))
-    app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
+    app_worker_rx_notify (app_wrk, s);
 
   /* Closed while waiting for app to reply. Resend disconnect */
   if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
@@ -669,7 +674,7 @@ session_mq_worker_update_handler (void *data)
     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
 
   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
-    app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
+    app_worker_rx_notify (app_wrk, s);
 
   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
     app_worker_close_notify (app_wrk, s);
@@ -1790,7 +1795,7 @@ session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
        break;
       svm_fifo_unset_event (s->rx_fifo);
       app_wrk = app_worker_get (s->app_wrk_index);
-      app_worker_builtin_rx (app_wrk, s);
+      app_worker_rx_notify (app_wrk, s);
       break;
     case SESSION_IO_EVT_TX_MAIN:
       s = session_get_if_valid (e->session_index, 0 /* main thread */);
index 8755a14..4fe0c7c 100644 (file)
@@ -379,6 +379,8 @@ typedef enum
   SESSION_CTRL_EVT_APP_WRK_RPC,
   SESSION_CTRL_EVT_TRANSPORT_ATTR,
   SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY,
+  SESSION_CTRL_EVT_TRANSPORT_CLOSED,
+  SESSION_CTRL_EVT_HALF_CLEANUP,
 } session_evt_type_t;
 
 #define foreach_session_ctrl_evt                                              \
@@ -437,6 +439,7 @@ typedef struct
     session_handle_t session_handle;
     session_rpc_args_t rpc_args;
     u32 ctrl_data_index;
+    u64 as_u64[2];
     struct
     {
       u8 data[0];
index f0d039f..3d8afaa 100644 (file)
@@ -1394,11 +1394,10 @@ always_inline uword
 tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
                          vlib_frame_t * frame, int is_ip4)
 {
-  u32 thread_index = vm->thread_index, errors = 0;
+  u32 thread_index = vm->thread_index, n_left_from, *from;
   tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
   u16 err_counters[TCP_N_ERROR] = { 0 };
-  u32 n_left_from, *from;
 
   if (node->flags & VLIB_NODE_FLAG_TRACE)
     tcp_established_trace_frame (vm, node, frame, is_ip4);
@@ -1462,9 +1461,7 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
       b += 1;
     }
 
-  errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP,
-                                             thread_index);
-  err_counters[TCP_ERROR_MSG_QUEUE_FULL] = errors;
+  session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
   tcp_store_err_counters (established, err_counters);
   tcp_handle_postponed_dequeues (wrk);
   tcp_handle_disconnects (wrk);
@@ -1746,7 +1743,7 @@ always_inline uword
 tcp46_syn_sent_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
                       vlib_frame_t *frame, int is_ip4)
 {
-  u32 n_left_from, *from, thread_index = vm->thread_index, errors = 0;
+  u32 n_left_from, *from, thread_index = vm->thread_index;
   tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
 
@@ -1981,9 +1978,7 @@ tcp46_syn_sent_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
       tcp_inc_counter (syn_sent, error, 1);
     }
 
-  errors =
-    session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
-  tcp_inc_counter (syn_sent, TCP_ERROR_MSG_QUEUE_FULL, errors);
+  session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
   vlib_buffer_free (vm, from, frame->n_vectors);
   tcp_handle_disconnects (wrk);
 
@@ -2058,7 +2053,7 @@ always_inline uword
 tcp46_rcv_process_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
                          vlib_frame_t *frame, int is_ip4)
 {
-  u32 thread_index = vm->thread_index, errors, n_left_from, *from, max_deq;
+  u32 thread_index = vm->thread_index, n_left_from, *from, max_deq;
   tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
 
@@ -2431,9 +2426,7 @@ tcp46_rcv_process_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
       tcp_inc_counter (rcv_process, error, 1);
     }
 
-  errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP,
-                                             thread_index);
-  tcp_inc_counter (rcv_process, TCP_ERROR_MSG_QUEUE_FULL, errors);
+  session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
   tcp_handle_postponed_dequeues (wrk);
   tcp_handle_disconnects (wrk);
   vlib_buffer_free (vm, from, frame->n_vectors);
index fb625c8..8175e22 100644 (file)
@@ -61,8 +61,7 @@ tls_add_vpp_q_rx_evt (session_t * s)
 int
 tls_add_vpp_q_builtin_rx_evt (session_t * s)
 {
-  if (svm_fifo_set_event (s->rx_fifo))
-    session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+  session_enqueue_notify (s);
   return 0;
 }
 
@@ -75,9 +74,10 @@ tls_add_vpp_q_tx_evt (session_t * s)
 }
 
 static inline int
-tls_add_app_q_evt (app_worker_t * app, session_t * app_session)
+tls_add_app_q_evt (app_worker_t *app_wrk, session_t *app_session)
 {
-  return app_worker_lock_and_send_event (app, app_session, SESSION_IO_EVT_RX);
+  app_worker_add_event (app_wrk, app_session, SESSION_IO_EVT_RX);
+  return 0;
 }
 
 u32
index 6e5ed15..33ee2cd 100644 (file)
@@ -149,11 +149,9 @@ udp_connection_enqueue (udp_connection_t * uc0, session_t * s0,
    * enqueue event now while we still have the peeker lock */
   if (s0->thread_index != thread_index)
     {
-      wrote0 = session_enqueue_dgram_connection (s0, hdr0, b,
-                                                TRANSPORT_PROTO_UDP,
-                                                /* queue event */ 0);
-      if (queue_event && !svm_fifo_has_event (s0->rx_fifo))
-       session_enqueue_notify (s0);
+      wrote0 = session_enqueue_dgram_connection_cl (
+       s0, hdr0, b, TRANSPORT_PROTO_UDP,
+       /* queue event */ queue_event && !svm_fifo_has_event (s0->rx_fifo));
     }
   else
     {
@@ -232,10 +230,9 @@ always_inline uword
 udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
                    vlib_frame_t * frame, u8 is_ip4)
 {
-  u32 n_left_from, *from, errors, *first_buffer;
+  u32 thread_index = vm->thread_index, n_left_from, *from, *first_buffer;
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
   u16 err_counters[UDP_N_ERROR] = { 0 };
-  u32 thread_index = vm->thread_index;
 
   from = first_buffer = vlib_frame_vector_args (frame);
   n_left_from = frame->n_vectors;
@@ -327,9 +324,7 @@ udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
     }
 
   vlib_buffer_free (vm, first_buffer, frame->n_vectors);
-  errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_UDP,
-                                             thread_index);
-  err_counters[UDP_ERROR_MQ_FULL] = errors;
+  session_main_flush_enqueue_events (TRANSPORT_PROTO_UDP, thread_index);
   udp_store_err_counters (vm, is_ip4, err_counters);
   return frame->n_vectors;
 }