vcl: support pre/post cb before mq wait 01/41801/31
authorFlorin Coras <[email protected]>
Sat, 2 Nov 2024 20:27:53 +0000 (16:27 -0400)
committerDave Wallace <[email protected]>
Tue, 7 Jan 2025 20:25:22 +0000 (20:25 +0000)
Allow vls to register cb functions with vcl pre/post mq sleep. These can
be used to drop/reacquire locks prior/after waiting on vcl mq events.

This then allows multi-thread, as opposed to multi-worker, applications
to share sessions between threads without deadlocking, e.g., multiple
threads trying to read/write/close non-blocking sessions. Caveat:
connects still need to be improved.

Type: improvement

Change-Id: I589aa9dfd0553b0fad54f02ed16c3cda9761a83d
Signed-off-by: Florin Coras <[email protected]>
Signed-off-by: Dave Wallace <[email protected]>
src/vcl/ldp.c
src/vcl/vcl_locked.c
src/vcl/vcl_locked.h
src/vcl/vcl_private.c
src/vcl/vcl_private.h
src/vcl/vppcom.c
test/asf/test_vcl.py

index 023a0eb..d9f45b2 100644 (file)
@@ -645,6 +645,7 @@ ldp_select_init_maps (fd_set * __restrict original,
                      u32 n_bytes, uword * si_bits, uword * libc_bits)
 {
   uword si_bits_set, libc_bits_set;
+  u32 session_index, wrk_index;
   vls_handle_t vlsh;
   int fd;
 
@@ -660,10 +661,14 @@ ldp_select_init_maps (fd_set * __restrict original,
     vlsh = ldp_fd_to_vlsh (fd);
     if (vlsh == VLS_INVALID_HANDLE)
       clib_bitmap_set_no_check (*libcb, fd, 1);
-    else if (vlsh_to_worker_index (vlsh) != vppcom_worker_index ())
-      clib_warning ("migration currently not supported");
     else
-      *vclb = clib_bitmap_set (*vclb, vlsh_to_session_index (vlsh), 1);
+      {
+         vlsh_to_session_and_worker_index (vlsh, &session_index, &wrk_index);
+         if (wrk_index != vppcom_worker_index ())
+           clib_warning ("migration currently not supported");
+         else
+           *vclb = clib_bitmap_set (*vclb, session_index, 1);
+      }
   }
 
   si_bits_set = clib_bitmap_last_set (*vclb) + 1;
@@ -686,7 +691,8 @@ ldp_select_vcl_map_to_libc (clib_bitmap_t * vclb, fd_set * __restrict libcb)
 
   clib_bitmap_foreach (si, vclb)  {
     vlsh = vls_session_index_to_vlsh (si);
-    ASSERT (vlsh != VLS_INVALID_HANDLE);
+    if (vlsh == VLS_INVALID_HANDLE)
+      continue;
     fd = ldp_vlsh_to_fd (vlsh);
     if (PREDICT_FALSE (fd < 0))
       {
index f8a306c..a9992e9 100644 (file)
@@ -94,6 +94,22 @@ typedef struct vls_shared_data_
   clib_bitmap_t *listeners; /**< bitmap of wrks actively listening */
 } vls_shared_data_t;
 
+#define foreach_vls_flag _ (APP_CLOSED, "app closed")
+
+enum vls_flags_bits_
+{
+#define _(sym, str) VLS_FLAG_BIT_##sym,
+  foreach_vls_flag
+#undef _
+};
+
+typedef enum vls_flags_
+{
+#define _(sym, str) VLS_F_##sym = 1 << VLS_FLAG_BIT_##sym,
+  foreach_vls_flag
+#undef _
+} vls_flags_t;
+
 typedef struct vcl_locked_session_
 {
   clib_spinlock_t lock;           /**< vls lock when in use */
@@ -104,6 +120,7 @@ typedef struct vcl_locked_session_
   u32 owner_vcl_wrk_index; /**< vcl wrk of the vls wrk at alloc */
   uword *vcl_wrk_index_to_session_index; /**< map vcl wrk to session */
   int libc_epfd;                        /**< epoll fd for libc epoll */
+  vls_flags_t flags;                    /**< vls flags */
 } vcl_locked_session_t;
 
 typedef struct vls_worker_
@@ -303,19 +320,22 @@ vls_mt_add (void)
     vcl_set_worker_index (vlsl->vls_wrk_index);
 
   /* Only allow new pthread to be cancled in vls_mt_mq_lock */
-  int old_state;
   if (vlsl->vls_mt_n_threads >= 2)
-    pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &old_state);
+    pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL);
 }
 
 static inline void
 vls_mt_mq_lock (void)
 {
-  /* Allow controlled cancelation of thread before grabbing mutex */
-  pthread_testcancel ();
   pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
 }
 
+static inline int
+vls_mt_mq_trylock (void)
+{
+  return pthread_mutex_trylock (&vlsl->vls_mt_mq_mlock);
+}
+
 static inline void
 vls_mt_mq_unlock (void)
 {
@@ -354,6 +374,14 @@ vls_lock (vcl_locked_session_t * vls)
     clib_spinlock_lock (&vls->lock);
 }
 
+static inline int
+vls_trylock (vcl_locked_session_t *vls)
+{
+  if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
+    return !clib_spinlock_trylock (&vls->lock);
+  return 0;
+}
+
 static inline void
 vls_unlock (vcl_locked_session_t * vls)
 {
@@ -559,28 +587,30 @@ vlsh_to_sh (vls_handle_t vlsh)
   return rv;
 }
 
-vcl_session_handle_t
-vlsh_to_session_index (vls_handle_t vlsh)
-{
-  vcl_session_handle_t sh;
-  sh = vlsh_to_sh (vlsh);
-  return vppcom_session_index (sh);
-}
-
-int
-vlsh_to_worker_index (vls_handle_t vlsh)
+void
+vlsh_to_session_and_worker_index (vls_handle_t vlsh, u32 *session_index,
+                                 u32 *wrk_index)
 {
   vcl_locked_session_t *vls;
-  u32 wrk_index;
 
-  vls = vls_get_w_dlock (vlsh);
+  vls_mt_pool_rlock ();
+
+  /* Do not lock vls because for mt apps that use select this could
+   * deadlock if multiple threads select on the same vlsh */
+  vls = vls_get (vlsh);
+
   if (!vls)
-    wrk_index = INVALID_SESSION_ID;
+    {
+      *session_index = INVALID_SESSION_ID;
+      *wrk_index = INVALID_SESSION_ID;
+    }
   else
-    wrk_index = vls->vcl_wrk_index;
-  vls_dunlock (vls);
+    {
+      *session_index = vls->session_index;
+      *wrk_index = vls->vcl_wrk_index;
+    }
 
-  return wrk_index;
+  vls_mt_pool_runlock ();
 }
 
 vls_handle_t
@@ -977,6 +1007,19 @@ vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
   vls_share_sessions (vls_parent_wrk, vls_wrk);
 }
 
+static inline u8
+vcl_session_is_write_nonblk (vcl_session_t *s)
+{
+  int rv = vcl_session_write_ready (s);
+
+  if (!s->is_dgram)
+    return rv != 0;
+
+  /* Probably not common, but without knowing the actual write size, this is
+   * the only way we can guarantee the write won't block */
+  return rv < 0 ? 1 : (rv > (64 << 10));
+}
+
 static void
 vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
 {
@@ -995,23 +1038,25 @@ vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
   switch (op)
     {
     case VLS_MT_OP_READ:
-      if (!is_nonblk)
-       is_nonblk = vcl_session_read_ready (s) != 0;
-      if (!is_nonblk)
+      is_nonblk = is_nonblk ?: vcl_session_read_ready (s) != 0;
+      while (!is_nonblk && vls_mt_mq_trylock ())
        {
-         vls_mt_mq_lock ();
-         *locks_acq |= VLS_MT_LOCK_MQ;
+         /* might get data while waiting for lock */
+         is_nonblk = vcl_session_read_ready (s) != 0;
        }
+      if (!is_nonblk)
+       *locks_acq |= VLS_MT_LOCK_MQ;
       break;
     case VLS_MT_OP_WRITE:
       ASSERT (s);
-      if (!is_nonblk)
-       is_nonblk = vcl_session_write_ready (s) != 0;
-      if (!is_nonblk)
+      is_nonblk = is_nonblk ?: vcl_session_is_write_nonblk (s);
+      while (!is_nonblk && vls_mt_mq_trylock ())
        {
-         vls_mt_mq_lock ();
-         *locks_acq |= VLS_MT_LOCK_MQ;
+         /* might get space while waiting for lock */
+         is_nonblk = vcl_session_is_write_nonblk (s);
        }
+      if (!is_nonblk)
+       *locks_acq |= VLS_MT_LOCK_MQ;
       break;
     case VLS_MT_OP_XPOLL:
       vls_mt_mq_lock ();
@@ -1442,15 +1487,23 @@ vls_close (vls_handle_t vlsh)
   int rv;
 
   vls_mt_detect ();
-  vls_mt_pool_wlock ();
 
-  vls = vls_get_and_lock (vlsh);
+  /* Notify vcl while holding a reader lock. Allows other threads to
+   * regrab vls and unlock it if needed. */
+  vls_mt_pool_rlock ();
+
+  vls = vls_get (vlsh);
   if (!vls)
     {
-      vls_mt_pool_wunlock ();
+      vls_mt_pool_runlock ();
       return VPPCOM_EBADFD;
     }
 
+  /* Notify other threads, if any, that app closed. Do it before
+   * grabbing lock as vls might be already locked */
+  vls->flags |= VLS_F_APP_CLOSED;
+  vls_lock (vls);
+
   vls_mt_guard (vls, VLS_MT_OP_SPOOL);
 
   if (vls_is_shared (vls))
@@ -1461,8 +1514,23 @@ vls_close (vls_handle_t vlsh)
   if (vls_mt_wrk_supported ())
     vls_mt_session_cleanup (vls);
 
-  vls_free (vls);
   vls_mt_unguard ();
+  vls_unlock (vls);
+  vls_mt_pool_runlock ();
+
+  /* Drop mt reader lock on pool and acquire writer lock */
+  vls_mt_pool_wlock ();
+
+  vls = vls_get (vlsh);
+
+  /* Other threads might be still using the session */
+  while (vls_trylock (vls))
+    {
+      vls_mt_pool_wunlock ();
+      vls_mt_pool_wlock ();
+    }
+
+  vls_free (vls);
 
   vls_mt_pool_wunlock ();
 
@@ -2002,6 +2070,51 @@ vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
        dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret);
 }
 
+static inline void
+vls_mt_mq_wait_lock (vcl_session_handle_t vcl_sh)
+{
+  vcl_locked_session_t *vls;
+  vls_worker_t *wrk;
+  uword *vlshp;
+
+  /* If mt wrk supported or single threaded just return */
+  if (vls_mt_wrk_supported () || (vlsl->vls_mt_n_threads <= 1))
+    return;
+
+  wrk = vls_worker_get_current ();
+  /* Expect current thread to have dropped lock before calling vcl */
+  vls_mt_pool_rlock ();
+
+  vlshp = vls_sh_to_vlsh_table_get (wrk, vcl_sh);
+  if (vlshp)
+    {
+      vls = vls_get (*vlshp);
+      /* Handle case here other threads might've closed the session */
+      if (vls->flags & VLS_F_APP_CLOSED)
+       {
+         vcl_session_t *s;
+         s = vcl_session_get_w_handle (vcl_worker_get_current (), vcl_sh);
+         s->flags |= VCL_SESSION_F_APP_CLOSING;
+         vls_mt_pool_runlock ();
+         return;
+       }
+    }
+
+  vls_mt_mq_unlock ();
+}
+
+static inline void
+vls_mt_mq_wait_unlock (vcl_session_handle_t vcl_sh)
+{
+  if (vls_mt_wrk_supported () || (vlsl->vls_mt_n_threads <= 1))
+    return;
+
+  vls_mt_mq_lock ();
+
+  /* writers can grab lock now */
+  vls_mt_pool_runlock ();
+}
+
 int
 vls_app_create (char *app_name)
 {
@@ -2025,6 +2138,9 @@ vls_app_create (char *app_name)
   clib_rwlock_init (&vlsl->vls_pool_lock);
   vls_mt_locks_init ();
   vcm->wrk_rpc_fn = vls_rpc_handler;
+  /* For multi threaded apps where sessions are implicitly shared, ask vcl
+   * to use these callbacks prior and after blocking on io operations */
+  vcl_worker_set_wait_mq_fns (vls_mt_mq_wait_lock, vls_mt_mq_wait_unlock);
 
   return VPPCOM_OK;
 }
index 0e747ac..98a1c54 100644 (file)
@@ -50,8 +50,9 @@ int vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
                vcl_si_set * except_map, double wait_for_time);
 int vls_poll (vcl_poll_t *vp, uint32_t n_sids, double wait_for_time);
 vcl_session_handle_t vlsh_to_sh (vls_handle_t vlsh);
-vcl_session_handle_t vlsh_to_session_index (vls_handle_t vlsh);
-int vlsh_to_worker_index (vls_handle_t vlsh);
+void vlsh_to_session_and_worker_index (vls_handle_t vlsh,
+                                      uint32_t *session_index,
+                                      uint32_t *wrk_index);
 vls_handle_t vls_session_index_to_vlsh (uint32_t session_index);
 int vls_app_create (char *app_name);
 unsigned char vls_use_eventfd (void);
index 4af79e9..d3ad233 100644 (file)
@@ -75,6 +75,8 @@ vcl_mq_epoll_add_evfd (vcl_worker_t * wrk, svm_msg_q_t * mq)
   mqc->mq_fd = mq_fd;
   mqc->mq = mq;
 
+  fcntl (mq_fd, F_SETFL, O_NONBLOCK);
+
   e.events = EPOLLIN;
   e.data.u32 = mqc_index;
   if (epoll_ctl (wrk->mqs_epfd, EPOLL_CTL_ADD, mq_fd, &e) < 0)
@@ -228,6 +230,15 @@ vcl_worker_detach_sessions (vcl_worker_t *wrk)
   hash_free (seg_indices_map);
 }
 
+void
+vcl_worker_set_wait_mq_fns (vcl_worker_wait_mq_fn pre_wait,
+                           vcl_worker_wait_mq_fn post_wait)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  wrk->pre_wait_fn = pre_wait;
+  wrk->post_wait_fn = post_wait;
+}
+
 vcl_worker_t *
 vcl_worker_alloc_and_init ()
 {
@@ -336,7 +347,7 @@ vcl_session_read_ready (vcl_session_t * s)
          max_deq = svm_fifo_max_dequeue_cons (s->rx_fifo);
          if (max_deq <= SESSION_CONN_HDR_LEN)
            return 0;
-         if (svm_fifo_peek (s->rx_fifo, 0, sizeof (ph), (u8 *) & ph) < 0)
+         if (svm_fifo_peek (s->rx_fifo, 0, sizeof (ph), (u8 *) &ph) < 0)
            return 0;
          if (ph.data_length + SESSION_CONN_HDR_LEN > max_deq)
            return 0;
@@ -358,6 +369,39 @@ vcl_session_read_ready (vcl_session_t * s)
     }
 }
 
+/**
+ * Used as alternative to vcl_session_read_ready to avoid peeking udp sessions.
+ * Multi-threaded applications could select the same session from multiple
+ * threads */
+int
+vcl_session_read_ready2 (vcl_session_t *s)
+{
+  if (vcl_session_is_open (s))
+    {
+      if (vcl_session_is_ct (s))
+       return svm_fifo_max_dequeue_cons (s->ct_rx_fifo);
+
+      if (s->is_dgram)
+       {
+         if (svm_fifo_max_dequeue_cons (s->rx_fifo) <= SESSION_CONN_HDR_LEN)
+           return 0;
+
+         /* Return 1 even if not yet sure if a full datagram was received */
+         return 1;
+       }
+
+      return svm_fifo_max_dequeue_cons (s->rx_fifo);
+    }
+  else if (s->session_state == VCL_STATE_LISTEN)
+    {
+      return clib_fifo_elts (s->accept_evts_fifo);
+    }
+  else
+    {
+      return 1;
+    }
+}
+
 int
 vcl_session_write_ready (vcl_session_t * s)
 {
index 7e72b29..c98e1cd 100644 (file)
@@ -143,8 +143,16 @@ typedef enum vcl_session_flags_
   VCL_SESSION_F_PENDING_DISCONNECT = 1 << 6,
   VCL_SESSION_F_PENDING_FREE = 1 << 7,
   VCL_SESSION_F_PENDING_LISTEN = 1 << 8,
+  VCL_SESSION_F_APP_CLOSING = 1 << 9,
 } __clib_packed vcl_session_flags_t;
 
+typedef enum vcl_worker_wait_
+{
+  VCL_WRK_WAIT_CTRL,
+  VCL_WRK_WAIT_IO_RX,
+  VCL_WRK_WAIT_IO_TX,
+} vcl_worker_wait_type_t;
+
 typedef struct vcl_session_
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -229,6 +237,7 @@ typedef struct vcl_mq_evt_conn_
   int mq_fd;
 } vcl_mq_evt_conn_t;
 
+typedef void (*vcl_worker_wait_mq_fn) (u32 vcl_sh);
 typedef struct vcl_worker_
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -312,6 +321,10 @@ typedef struct vcl_worker_
   /** vcl needs next epoll_create to go to libc_epoll */
   u8 vcl_needs_real_epoll;
   volatile int rpc_done;
+
+  /* functions to be called pre/post wait if vcl managed by vls */
+  vcl_worker_wait_mq_fn pre_wait_fn;
+  vcl_worker_wait_mq_fn post_wait_fn;
 } vcl_worker_t;
 
 STATIC_ASSERT (sizeof (session_disconnected_msg_t) <= 16,
@@ -724,6 +737,7 @@ u32 vcl_segment_table_lookup (u64 segment_handle);
 void vcl_segment_table_del (u64 segment_handle);
 
 int vcl_session_read_ready (vcl_session_t * session);
+int vcl_session_read_ready2 (vcl_session_t *s);
 int vcl_session_write_ready (vcl_session_t * session);
 int vcl_session_alloc_ext_cfg (vcl_session_t *s,
                               transport_endpt_ext_cfg_type_t type, u32 len);
@@ -783,6 +797,8 @@ svm_fifo_chunk_t *vcl_segment_alloc_chunk (uword segment_handle,
 int vcl_session_share_fifos (vcl_session_t *s, svm_fifo_t *rxf,
                             svm_fifo_t *txf);
 void vcl_worker_detach_sessions (vcl_worker_t *wrk);
+void vcl_worker_set_wait_mq_fns (vcl_worker_wait_mq_fn pre_wait,
+                                vcl_worker_wait_mq_fn post_wait);
 
 /*
  * VCL Binary API
index d084c09..0b21416 100644 (file)
@@ -1263,6 +1263,51 @@ vcl_flush_mq_events (void)
   vcl_worker_flush_mq_events (vcl_worker_get_current ());
 }
 
+static inline void
+vcl_worker_wait_mq (vcl_worker_t *wrk, u32 session_handle,
+                   vcl_worker_wait_type_t wait)
+{
+  vcl_session_t *s = 0;
+  u32 sleeps = 0;
+
+  if (wrk->pre_wait_fn)
+    wrk->pre_wait_fn (session_handle);
+
+  if (session_handle != VCL_INVALID_SESSION_INDEX)
+    {
+      s = vcl_session_get_w_handle (wrk, session_handle);
+      /* Session might've been closed by another thread if multi-threaded
+       * as opposed to multi-worker app */
+      if (s->flags & VCL_SESSION_F_APP_CLOSING)
+       return;
+    }
+
+  /* Short sleeps waiting on mq notifications. Note that we drop mq lock for
+   * multi-thread apps so we may be getting spurious notifications. Not ideal,
+   * as we'd like to only be woken up on events, but since multiple threads may
+   * be waiting on the same mq, events may be missed. If waiting on session io
+   * events, return if session is ready. Otherwise return if mq has event */
+  while (svm_msg_q_timedwait (wrk->app_event_queue, 1e-3))
+    {
+      if (s)
+       {
+         if ((wait == VCL_WRK_WAIT_IO_RX && vcl_session_read_ready (s)) ||
+             (wait == VCL_WRK_WAIT_IO_TX && vcl_session_write_ready (s)))
+           break;
+       }
+      else
+       {
+         if (!svm_msg_q_is_empty (wrk->app_event_queue))
+           break;
+       }
+      if (++sleeps > 200)
+       break;
+    }
+
+  if (wrk->post_wait_fn)
+    wrk->post_wait_fn (session_handle);
+}
+
 static int
 vppcom_session_unbind (u32 session_handle)
 {
@@ -1831,7 +1876,7 @@ again:
       if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
        return VPPCOM_EAGAIN;
 
-      svm_msg_q_wait (wrk->app_event_queue, SVM_MQ_WAIT_EMPTY);
+      vcl_worker_wait_mq (wrk, -1, VCL_WRK_WAIT_CTRL);
       vcl_worker_flush_mq_events (wrk);
       goto again;
     }
@@ -2047,7 +2092,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
   session_event_t *e;
-  svm_msg_q_t *mq;
   u8 is_ct;
 
   if (PREDICT_FALSE (!buf))
@@ -2079,7 +2123,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
 
   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
   is_ct = vcl_session_is_ct (s);
-  mq = wrk->app_event_queue;
   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
   s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
 
@@ -2098,12 +2141,14 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
        {
          if (vcl_session_is_closing (s))
            return vcl_session_closing_error (s);
+         if (s->flags & VCL_SESSION_F_APP_CLOSING)
+           return vcl_session_closed_error (s);
 
          if (is_ct)
            svm_fifo_unset_event (s->rx_fifo);
          svm_fifo_unset_event (rx_fifo);
 
-         svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+         vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX);
          vcl_worker_flush_mq_events (wrk);
        }
     }
@@ -2188,7 +2233,6 @@ vppcom_session_read_segments (uint32_t session_handle,
   int n_read = 0, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
-  svm_msg_q_t *mq;
   u8 is_ct;
 
   s = vcl_session_get_w_handle (wrk, session_handle);
@@ -2200,7 +2244,6 @@ vppcom_session_read_segments (uint32_t session_handle,
 
   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
   is_ct = vcl_session_is_ct (s);
-  mq = wrk->app_event_queue;
   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
   s->flags &= ~VCL_SESSION_F_HAS_RX_EVT;
 
@@ -2219,12 +2262,14 @@ vppcom_session_read_segments (uint32_t session_handle,
        {
          if (vcl_session_is_closing (s))
            return vcl_session_closing_error (s);
+         if (s->flags & VCL_SESSION_F_APP_CLOSING)
+           return vcl_session_closed_error (s);
 
          if (is_ct)
            svm_fifo_unset_event (s->rx_fifo);
          svm_fifo_unset_event (rx_fifo);
 
-         svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+         vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX);
          vcl_worker_flush_mq_events (wrk);
        }
     }
@@ -2289,7 +2334,6 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf,
   int n_write, is_nonblocking;
   session_evt_type_t et;
   svm_fifo_t *tx_fifo;
-  svm_msg_q_t *mq;
   u8 is_ct;
 
   /* Accept zero length writes but just return */
@@ -2326,7 +2370,6 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf,
   tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
 
-  mq = wrk->app_event_queue;
   if (!vcl_fifo_is_writeable (tx_fifo, n, is_dgram))
     {
       if (is_nonblocking)
@@ -2338,8 +2381,10 @@ vppcom_session_write_inline (vcl_worker_t *wrk, vcl_session_t *s, void *buf,
          svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
          if (vcl_session_is_closing (s))
            return vcl_session_closing_error (s);
+         if (s->flags & VCL_SESSION_F_APP_CLOSING)
+           return vcl_session_closed_error (s);
 
-         svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+         vcl_worker_wait_mq (wrk, vcl_session_handle (s), VCL_WRK_WAIT_IO_TX);
          vcl_worker_flush_mq_events (wrk);
        }
     }
@@ -2383,7 +2428,6 @@ vppcom_session_write_segments (uint32_t session_handle,
   int n_write = 0, n_bytes = 0, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *tx_fifo;
-  svm_msg_q_t *mq;
   u8 is_ct;
   u32 i;
 
@@ -2406,7 +2450,6 @@ vppcom_session_write_segments (uint32_t session_handle,
 
   is_nonblocking = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
   is_ct = vcl_session_is_ct (s);
-  mq = wrk->app_event_queue;
   tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
 
   for (i = 0; i < n_segments; i++)
@@ -2423,8 +2466,10 @@ vppcom_session_write_segments (uint32_t session_handle,
          svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
          if (vcl_session_is_closing (s))
            return vcl_session_closing_error (s);
+         if (s->flags & VCL_SESSION_F_APP_CLOSING)
+           return vcl_session_closed_error (s);
 
-         svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+         vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_TX);
          vcl_worker_flush_mq_events (wrk);
        }
     }
@@ -2843,7 +2888,7 @@ check_rd:
          continue;
        }
 
-      if (vcl_session_read_ready (s))
+      if (vcl_session_read_ready2 (s))
        {
          clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
          bits_set++;
index 124ea14..143b46c 100644 (file)
@@ -7,7 +7,7 @@ import subprocess
 import signal
 import glob
 from config import config
-from asfframework import VppAsfTestCase, VppTestRunner, Worker, tag_fixme_ubuntu2404
+from asfframework import VppAsfTestCase, VppTestRunner, Worker
 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
 
 iperf3 = "/usr/bin/iperf3"
@@ -311,7 +311,6 @@ class VCLTestCase(VppAsfTestCase):
         self.assert_equal(worker_client.result, 0, "Binary test return code")
 
 
-@tag_fixme_ubuntu2404
 class LDPCutThruTestCase(VCLTestCase):
     """LDP Cut Thru Tests"""
 
@@ -1024,7 +1023,6 @@ class VCLThruHostStackNsock(VCLTestCase):
         )
 
 
-@tag_fixme_ubuntu2404
 class LDPThruHostStackIperf(VCLTestCase):
     """LDP Thru Host Stack Iperf"""
 
@@ -1072,7 +1070,6 @@ class LDPThruHostStackIperf(VCLTestCase):
         )
 
 
-@tag_fixme_ubuntu2404
 class LDPThruHostStackIperfUdp(VCLTestCase):
     """LDP Thru Host Stack Iperf UDP"""
 
@@ -1118,7 +1115,6 @@ class LDPThruHostStackIperfUdp(VCLTestCase):
         )
 
 
-@tag_fixme_ubuntu2404
 class LDPIpv6CutThruTestCase(VCLTestCase):
     """LDP IPv6 Cut Thru Tests"""