volatile int vls_mt_needs_locks; /**< mt single vcl wrk needs locks */
clib_rwlock_t vls_pool_lock; /**< per process/wrk vls pool locks */
pthread_mutex_t vls_mt_mq_mlock; /**< vcl mq lock */
- pthread_mutex_t vls_mt_spool_mlock; /**< vcl select or pool lock */
+ pthread_rwlock_t vls_mt_spool_rwlock; /**< vcl select or pool rwlock */
volatile u8 select_mp_check; /**< flag set if select checks done */
struct sigaction old_sa; /**< old sigaction to restore */
} vls_process_local_t;
static pthread_key_t vls_mt_pthread_stop_key;
-typedef enum
+#define foreach_mt_lock_type \
+ _ (LOCK_MQ, "mq") \
+ _ (RLOCK_SPOOL, "rlock_spool") \
+ _ (WLOCK_SPOOL, "wlock_spool") \
+ _ (RLOCK_POOL, "rlock_pool") \
+ _ (WLOCK_POOL, "wlock_pool")
+
+enum vls_mt_lock_type_bit_
+{
+#define _(sym, str) VLS_MT_BIT_##sym,
+ foreach_mt_lock_type
+#undef _
+};
+
+typedef enum vls_mt_lock_type_
{
- VLS_MT_LOCK_MQ = 1 << 0,
- VLS_MT_LOCK_SPOOL = 1 << 1,
- VLS_MT_RLOCK_POOL = 1 << 2,
- VLS_MT_WLOCK_POOL = 1 << 3
+#define _(sym, str) VLS_MT_##sym = 1 << VLS_MT_BIT_##sym,
+ foreach_mt_lock_type
+#undef _
} vls_mt_lock_type_t;
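+
+/* For reference, a sketch of what the two macro-built enums above expand
+ * to (the string arguments are unused in this hunk, presumably reserved
+ * for formatting/debug):
+ *   bits:  VLS_MT_BIT_LOCK_MQ = 0, VLS_MT_BIT_RLOCK_SPOOL = 1,
+ *          VLS_MT_BIT_WLOCK_SPOOL = 2, VLS_MT_BIT_RLOCK_POOL = 3,
+ *          VLS_MT_BIT_WLOCK_POOL = 4
+ *   flags: VLS_MT_LOCK_MQ = 1 << 0, VLS_MT_RLOCK_SPOOL = 1 << 1,
+ *          VLS_MT_WLOCK_SPOOL = 1 << 2, VLS_MT_RLOCK_POOL = 1 << 3,
+ *          VLS_MT_WLOCK_POOL = 1 << 4
+ * so each lock type maps to a distinct bit in a pthread's locks_acq mask.
+ */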
typedef struct vls_mt_pthread_local_
}
}
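+/* Drop the per process/worker vls pool rwlock, whichever flavor (reader
+ * or writer) this pthread holds, and clear the matching locks_acq bit */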
+static inline void
+vls_mt_pool_unlock (void)
+{
+ if (vlspt->locks_acq & VLS_MT_RLOCK_POOL)
+ {
+ clib_rwlock_reader_unlock (&vlsl->vls_pool_lock);
+ vlspt->locks_acq &= ~VLS_MT_RLOCK_POOL;
+ }
+ else if (vlspt->locks_acq & VLS_MT_WLOCK_POOL)
+ {
+ clib_rwlock_writer_unlock (&vlsl->vls_pool_lock);
+ vlspt->locks_acq &= ~VLS_MT_WLOCK_POOL;
+ }
+}
+
typedef enum
{
VLS_MT_OP_READ,
}
static inline void
-vls_mt_spool_lock (void)
+vls_mt_spool_rlock (void)
{
- pthread_mutex_lock (&vlsl->vls_mt_spool_mlock);
- vlspt->locks_acq |= VLS_MT_LOCK_SPOOL;
+ pthread_rwlock_rdlock (&vlsl->vls_mt_spool_rwlock);
+ vlspt->locks_acq |= VLS_MT_RLOCK_SPOOL;
+}
+
+static inline void
+vls_mt_spool_wlock (void)
+{
+ pthread_rwlock_wrlock (&vlsl->vls_mt_spool_rwlock);
+ vlspt->locks_acq |= VLS_MT_WLOCK_SPOOL;
+}
+
+static inline void
+vls_mt_spool_runlock (void)
+{
+ pthread_rwlock_unlock (&vlsl->vls_mt_spool_rwlock);
+ vlspt->locks_acq &= ~VLS_MT_RLOCK_SPOOL;
+}
+
+static inline void
+vls_mt_spool_wunlock (void)
+{
+ pthread_rwlock_unlock (&vlsl->vls_mt_spool_rwlock);
+ vlspt->locks_acq &= ~VLS_MT_WLOCK_SPOOL;
}
static inline void
vls_mt_spool_unlock (void)
{
- pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock);
- vlspt->locks_acq &= ~VLS_MT_LOCK_SPOOL;
+ if (vlspt->locks_acq & VLS_MT_RLOCK_SPOOL)
+ vls_mt_spool_runlock ();
+ else if (vlspt->locks_acq & VLS_MT_WLOCK_SPOOL)
+ vls_mt_spool_wunlock ();
}
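+
+/* Illustration only, not part of this change: the expected pairing of the
+ * helpers above when touching state guarded by the spool lock, e.g.
+ *
+ *   vls_mt_spool_wlock ();
+ *   ... modify select/pool state ...
+ *   vls_mt_spool_wunlock ();
+ *
+ * The flavor-agnostic vls_mt_spool_unlock () is for paths that must drop
+ * whichever lock happens to be held, as in the lock-release helper below */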
static void
{
if (vlspt->locks_acq & VLS_MT_LOCK_MQ)
vls_mt_mq_unlock ();
- if (vlspt->locks_acq & VLS_MT_LOCK_SPOOL)
+ if (vlspt->locks_acq & (VLS_MT_RLOCK_SPOOL | VLS_MT_WLOCK_SPOOL))
vls_mt_spool_unlock ();
}
vls_mt_locks_init (void)
{
pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL);
- pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
+ pthread_rwlock_init (&vlsl->vls_mt_spool_rwlock, NULL);
}
static void
/* drop locks if any held */
vls_mt_rel_locks ();
- vls_mt_pool_runlock ();
- vls_mt_pool_wunlock ();
+ vls_mt_pool_unlock ();
if (vls_mt_wrk_supported ())
{
/* might get data while waiting for lock */
is_nonblk = vcl_session_read_ready (s) != 0;
}
+ vls_mt_spool_rlock ();
break;
case VLS_MT_OP_WRITE:
ASSERT (s);
/* might get space while waiting for lock */
is_nonblk = vcl_session_is_write_nonblk (s);
}
+ vls_mt_spool_rlock ();
break;
case VLS_MT_OP_XPOLL:
vls_mt_mq_lock ();
+ vls_mt_spool_rlock ();
break;
case VLS_MT_OP_SPOOL:
- vls_mt_spool_lock ();
+ vls_mt_spool_wlock ();
break;
default:
break;
}
}
+ vls_mt_pool_runlock ();
+
vls_mt_mq_unlock ();
+
+  /* The only lock still held at this point should be the spool reader
+   * lock. It is needed because VCL may still be touching a session while
+   * waiting for events */
}
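+
+/* Rough per-op lock matrix, as far as this diff shows: READ/WRITE take
+ * the mq lock (when blocking) plus the spool reader lock, XPOLL takes the
+ * mq lock plus the spool reader lock, and SPOOL takes only the spool
+ * writer lock, with the pool reader lock held around the operation */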
static inline void
if (vls_mt_wrk_supported () || !vlsl->vls_mt_needs_locks)
return;
- vls_mt_mq_lock ();
+ vls_mt_spool_runlock ();
- /* writers can grab lock now */
- vls_mt_pool_runlock ();
+  /* No locks should be held by this pthread at this point, so spool
+   * writers that were blocked until now can proceed */
+
+ vls_mt_mq_lock ();
+ vls_mt_spool_rlock ();
}
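+
+/* Net lock choreography around a blocking mq wait, as far as this diff
+ * shows: before the wait, the pool reader and mq locks are dropped while
+ * the spool reader lock stays held; on re-lock, the spool reader lock is
+ * cycled (dropped and retaken) around retaking the mq lock, which gives
+ * blocked spool writers a window to proceed */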
int
vcl_worker_flush_mq_events (vcl_worker_get_current ());
}
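+/* vcl_worker_wait_mq now returns the session re-looked-up after the wait
+ * (0 for an invalid handle): events may be processed and the session pool
+ * can change while blocked, so callers below refresh their session
+ * pointer from the return value */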
-static inline void
+static inline vcl_session_t *
vcl_worker_wait_mq (vcl_worker_t *wrk, u32 session_handle,
vcl_worker_wait_type_t wait)
{
/* Session might've been closed by another thread if multi-threaded
* as opposed to multi-worker app */
if (s->flags & VCL_SESSION_F_APP_CLOSING)
- return;
+ goto done;
}
/* Short sleeps waiting on mq notifications. Note that we drop mq lock for
if (wrk->post_wait_fn)
wrk->post_wait_fn (session_handle);
+
+done:
+ return session_handle != VCL_INVALID_SESSION_INDEX ?
+ vcl_session_get_w_handle (wrk, session_handle) :
+ 0;
}
static int
svm_fifo_unset_event (s->rx_fifo);
svm_fifo_unset_event (rx_fifo);
- vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX);
+ s = vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX);
vcl_worker_flush_mq_events (wrk);
}
}
svm_fifo_unset_event (s->rx_fifo);
svm_fifo_unset_event (rx_fifo);
- vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX);
+ s = vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_RX);
vcl_worker_flush_mq_events (wrk);
}
}
if (s->flags & VCL_SESSION_F_APP_CLOSING)
return vcl_session_closed_error (s);
- vcl_worker_wait_mq (wrk, vcl_session_handle (s), VCL_WRK_WAIT_IO_TX);
+ s = vcl_worker_wait_mq (wrk, vcl_session_handle (s),
+ VCL_WRK_WAIT_IO_TX);
vcl_worker_flush_mq_events (wrk);
}
}
if (s->flags & VCL_SESSION_F_APP_CLOSING)
return vcl_session_closed_error (s);
- vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_TX);
+ s = vcl_worker_wait_mq (wrk, session_handle, VCL_WRK_WAIT_IO_TX);
vcl_worker_flush_mq_events (wrk);
}
}