#include <vppinfra/format.h>
#include <vppinfra/cache.h>
#include <svm/queue.h>
+#include <vppinfra/time.h>
#include <signal.h>
/*
* svm_queue_sub
*/
int
-svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait)
+svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
+ u32 time)
{
i8 *headp;
int need_broadcast = 0;
+ int rc = 0;
- if (nowait)
+ if (cond == SVM_Q_NOWAIT)
{
/* zero on success */
if (pthread_mutex_trylock (&q->mutex))
if (PREDICT_FALSE (q->cursize == 0))
{
- if (nowait)
+ if (cond == SVM_Q_NOWAIT)
{
pthread_mutex_unlock (&q->mutex);
return (-2);
}
- while (q->cursize == 0)
+ else if (cond == SVM_Q_TIMEDWAIT)
{
- (void) pthread_cond_wait (&q->condvar, &q->mutex);
+ struct timespec ts;
+ ts.tv_sec = unix_time_now () + time;
+ ts.tv_nsec = 0;
+ while (q->cursize == 0 && rc == 0)
+ {
+ rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
+ }
+ if (rc == ETIMEDOUT)
+ {
+ pthread_mutex_unlock (&q->mutex);
+ return ETIMEDOUT;
+ }
+ }
+ else
+ {
+ while (q->cursize == 0)
+ {
+ (void) pthread_cond_wait (&q->condvar, &q->mutex);
+ }
}
}
char data[0];
} svm_queue_t;
+typedef enum
+{
+ /**
+ * blocking call
+ */
+ SVM_Q_WAIT = 0,
+
+ /**
+ * non-blocking call
+ */
+ SVM_Q_NOWAIT,
+
+ /**
+ * blocking call, return on signal or time-out
+ */
+ SVM_Q_TIMEDWAIT,
+} svm_q_conditional_wait_t;
+
svm_queue_t *svm_queue_init (int nels,
int elsize,
int consumer_pid,
void svm_queue_free (svm_queue_t * q);
int svm_queue_add (svm_queue_t * q, u8 * elem, int nowait);
int svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait);
-int svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait);
+int svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
+ u32 time);
int svm_queue_sub2 (svm_queue_t * q, u8 * elem);
void svm_queue_lock (svm_queue_t * q);
void svm_queue_unlock (svm_queue_t * q);
{
session_fifo_event_t _e, *e = &_e;;
- svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ );
+ svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
utm->client_bytes_received = 0;
while (1)
{
- svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ );
+ svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
while (1)
{
- svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ );
+ svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
while (1)
{
- svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ );
+ svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
{
uword msg;
- while (!svm_queue_sub (q, (u8 *) & msg, 0))
+ while (!svm_queue_sub (q, (u8 *) & msg, SVM_Q_WAIT, 0))
vl_msg_api_handler ((void *) msg);
}
for (i = 0; i < 1000; i++)
{
qstatus = svm_queue_sub (vl_input_queue, (u8 *) & rp,
- 1 /* nowait */ );
+ SVM_Q_NOWAIT, 0);
if (qstatus == 0)
goto read_one_msg;
ts.tv_sec = 0;
am->shmem_hdr = 0;
break;
}
- if (svm_queue_sub (vl_input_queue, (u8 *) & rp, 1) < 0)
+ if (svm_queue_sub (vl_input_queue, (u8 *) & rp, SVM_Q_NOWAIT, 0) < 0)
continue;
/* drain the queue */
mutex_ok:
am->vlib_rp = vlib_rp;
- while (svm_queue_sub (q, (u8 *) & old_msg, 1 /* nowait */ )
+ while (svm_queue_sub (q, (u8 *) & old_msg, SVM_Q_NOWAIT, 0)
!= -2 /* queue underflow */ )
{
vl_msg_api_free_nolock ((void *) old_msg);
q = am->vl_input_queue;
while (1)
- while (!svm_queue_sub(q, (u8 *)&msg, 0))
+ while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0))
{
u16 id = ntohs(*((u16 *)msg));
switch (id) {
q = am->vl_input_queue;
again:
- rv = svm_queue_sub(q, (u8 *)&msg, 0);
+ rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0);
+
if (rv == 0) {
u16 msg_id = ntohs(*((u16 *)msg));
switch (msg_id) {
}
vapi_error_e
-vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size)
+vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size,
+ svm_q_conditional_wait_t cond, u32 time)
{
if (!ctx || !ctx->connected || !msg || !msg_size)
{
svm_queue_t *q = am->vl_input_queue;
VAPI_DBG ("doing shm queue sub");
- int tmp = svm_queue_sub (q, (u8 *) & data, 0);
+
+ int tmp = svm_queue_sub (q, (u8 *) & data, cond, time);
+
if (tmp == 0)
{
#if VAPI_DEBUG_ALLOC
VAPI_DBG ("vapi_dispatch_one()");
void *msg;
size_t size;
- vapi_error_e rv = vapi_recv (ctx, &msg, &size);
+ vapi_error_e rv = vapi_recv (ctx, &msg, &size, SVM_Q_WAIT, 0);
if (VAPI_OK != rv)
{
VAPI_DBG ("vapi_recv failed with rv=%d", rv);
#include <stdbool.h>
#include <vppinfra/types.h>
#include <vapi/vapi_common.h>
+#include <svm/queue.h>
#ifdef __cplusplus
extern "C"
* @param ctx opaque vapi context
* @param[out] msg pointer to result variable containing message
* @param[out] msg_size pointer to result variable containing message size
+ * @param cond enum type for blocking, non-blocking or timed wait call
+ * @param time in sec for timed wait
*
* @return VAPI_OK on success, other error code on error
*/
- vapi_error_e vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size);
+ vapi_error_e vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size,
+ svm_q_conditional_wait_t cond, u32 time);
/**
* @brief wait for connection to become readable or writable
*
* @return VAPI_OK on success, other error code on error
*/
- vapi_error_e dispatch (const Common_req *limit = nullptr)
+ vapi_error_e dispatch (const Common_req *limit = nullptr, u32 time = 5)
{
std::lock_guard<std::mutex> lock (dispatch_mutex);
vapi_error_e rv = VAPI_OK;
{
void *shm_data;
size_t shm_data_size;
- rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size);
+ rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size, SVM_Q_TIMEDWAIT,
+ time);
if (VAPI_OK != rv)
{
return rv;
ck_assert_int_eq (VAPI_EINVAL, rv);
rv = vapi_send (NULL, NULL);
ck_assert_int_eq (VAPI_EINVAL, rv);
- rv = vapi_recv (NULL, NULL, NULL);
+ rv = vapi_recv (NULL, NULL, NULL, 0, 0);
ck_assert_int_eq (VAPI_EINVAL, rv);
- rv = vapi_recv (ctx, NULL, NULL);
+ rv = vapi_recv (ctx, NULL, NULL, 0, 0);
ck_assert_int_eq (VAPI_EINVAL, rv);
vapi_msg_show_version_reply *reply;
- rv = vapi_recv (ctx, (void **) &reply, NULL);
+ rv = vapi_recv (ctx, (void **) &reply, NULL, 0, 0);
ck_assert_int_eq (VAPI_EINVAL, rv);
rv = vapi_disconnect (ctx);
ck_assert_int_eq (VAPI_OK, rv);
ck_assert_int_eq (VAPI_OK, rv);
vapi_msg_show_version_reply *resp;
size_t size;
- rv = vapi_recv (ctx, (void *) &resp, &size);
+ rv = vapi_recv (ctx, (void *) &resp, &size, 0, 0);
ck_assert_int_eq (VAPI_OK, rv);
int dummy;
show_version_cb (NULL, &dummy, VAPI_OK, true, &resp->payload);
static const int max_outstanding_requests = 32;
static const int response_queue_size = 32;
+#define WAIT_FOR_RESPONSE(param, ret) \
+ do \
+ { \
+ ret = con.wait_for_response (param); \
+ } \
+ while (ret == VAPI_EAGAIN)
+
using namespace vapi;
void verify_show_version_reply (const Show_version_reply &r)
Show_version sv (con);
vapi_error_e rv = sv.execute ();
ck_assert_int_eq (VAPI_OK, rv);
- rv = con.wait_for_response (sv);
+ WAIT_FOR_RESPONSE (sv, rv);
ck_assert_int_eq (VAPI_OK, rv);
auto &r = sv.get_response ();
verify_show_version_reply (r);
memcpy (p.mac_address, mac_addresses[i], sizeof (p.mac_address));
auto e = cl.execute ();
ck_assert_int_eq (VAPI_OK, e);
- vapi_error_e rv = con.wait_for_response (cl);
+ vapi_error_e rv;
+ WAIT_FOR_RESPONSE (cl, rv);
ck_assert_int_eq (VAPI_OK, rv);
auto &rp = cl.get_response ().get_payload ();
ck_assert_int_eq (0, rp.retval);
memset (p.name_filter, 0, sizeof (p.name_filter));
auto rv = d.execute ();
ck_assert_int_eq (VAPI_OK, rv);
- rv = con.wait_for_response (d);
+ WAIT_FOR_RESPONSE (d, rv);
ck_assert_int_eq (VAPI_OK, rv);
auto &rs = d.get_result_set ();
for (auto &r : rs)
dl.get_request ().get_payload ().sw_if_index = sw_if_indexes[i];
auto rv = dl.execute ();
ck_assert_int_eq (VAPI_OK, rv);
- rv = con.wait_for_response (dl);
+ WAIT_FOR_RESPONSE (dl, rv);
ck_assert_int_eq (VAPI_OK, rv);
auto &response = dl.get_response ();
auto rp = response.get_payload ();
memset (p.name_filter, 0, sizeof (p.name_filter));
auto rv = d.execute ();
ck_assert_int_eq (VAPI_OK, rv);
- rv = con.wait_for_response (d);
+ WAIT_FOR_RESPONSE (d, rv);
ck_assert_int_eq (VAPI_OK, rv);
auto &rs = d.get_result_set ();
for (auto &r : rs)
memset (p.name_filter, 0, sizeof (p.name_filter));
auto rv = d.execute ();
ck_assert_int_eq (VAPI_OK, rv);
- rv = con.wait_for_response (d);
+ WAIT_FOR_RESPONSE (d, rv);
ck_assert_int_eq (VAPI_OK, rv);
ck_assert_int_ne (0, swdcb.called);
std::array<Delete_loopback_cb, num_ifs> dcbs;
memset (p.name_filter, 0, sizeof (p.name_filter));
auto rv = d.execute ();
ck_assert_int_eq (VAPI_OK, rv);
- rv = con.wait_for_response (d);
+ WAIT_FOR_RESPONSE (d, rv);
ck_assert_int_eq (VAPI_OK, rv);
auto &rs = d.get_result_set ();
for (auto &r : rs)
auto rv = ws.execute ();
ck_assert_int_eq (VAPI_OK, rv);
Event_registration<Vnet_interface_simple_counters> sc (con);
- rv = con.wait_for_response (sc);
+ WAIT_FOR_RESPONSE (sc, rv);
ck_assert_int_eq (VAPI_OK, rv);
auto &rs = sc.get_result_set ();
int count = 0;
ck_assert_int_eq (VAPI_OK, rv);
Vnet_interface_simple_counters_cb cb;
Event_registration<Vnet_interface_simple_counters> sc (con, std::ref (cb));
- rv = con.wait_for_response (sc);
+ WAIT_FOR_RESPONSE (sc, rv);
ck_assert_int_eq (VAPI_OK, rv);
ck_assert_int_ne (0, cb.called);
}
Event_registration<Vnet_interface_simple_counters> sc (con, std::ref (cb));
for (int i = 0; i < 5; ++i)
{
- rv = con.wait_for_response (sc);
+ WAIT_FOR_RESPONSE (sc, rv);
}
ck_assert_int_eq (VAPI_OK, rv);
ck_assert_int_eq (5, cb.called);
ck_assert_int_eq (VAPI_OK, rv);
Event_registration<Vnet_interface_simple_counters> sc (con);
Event_registration<Vnet_interface_combined_counters> cc (con);
- rv = con.wait_for_response (sc);
+ WAIT_FOR_RESPONSE (sc, rv);
ck_assert_int_eq (VAPI_OK, rv);
- rv = con.wait_for_response (cc);
+ WAIT_FOR_RESPONSE (cc, rv);
ck_assert_int_eq (VAPI_OK, rv);
int count = 0;
for (auto &r : sc.get_result_set ())