svm: queue sub: Add conditional timed wait 60/9960/6
authorMohsin Kazmi <sykazmi@cisco.com>
Thu, 4 Jan 2018 17:57:26 +0000 (18:57 +0100)
committerDave Barach <openvpp@barachs.net>
Mon, 22 Jan 2018 18:17:23 +0000 (18:17 +0000)
On reviece side svm queue only permits blocking and
non-blocking calls. This patch adds timed wait blocking
functionality which returns either on signal/event or
on given time out.

It also preserves the original behavior, so it will not
hurt client applications which are using svm queue.

Change-Id: Ic10632170330a80afb8bc781d4ccddfe4da2c69a
Signed-off-by: Mohsin Kazmi <sykazmi@cisco.com>
13 files changed:
src/svm/queue.c
src/svm/queue.h
src/tests/vnet/session/tcp_echo.c
src/tests/vnet/session/udp_echo.c
src/vlibapi/api_shared.c
src/vlibmemory/memory_client.c
src/vlibmemory/memory_shared.c
src/vpp-api/client/client.c
src/vpp-api/vapi/vapi.c
src/vpp-api/vapi/vapi.h
src/vpp-api/vapi/vapi.hpp
test/ext/vapi_c_test.c
test/ext/vapi_cpp_test.cpp

index aef4092..96e40fc 100644 (file)
@@ -26,6 +26,7 @@
 #include <vppinfra/format.h>
 #include <vppinfra/cache.h>
 #include <svm/queue.h>
+#include <vppinfra/time.h>
 #include <signal.h>
 
 /*
@@ -299,12 +300,14 @@ svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
  * svm_queue_sub
  */
 int
-svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait)
+svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
+              u32 time)
 {
   i8 *headp;
   int need_broadcast = 0;
+  int rc = 0;
 
-  if (nowait)
+  if (cond == SVM_Q_NOWAIT)
     {
       /* zero on success */
       if (pthread_mutex_trylock (&q->mutex))
@@ -317,14 +320,32 @@ svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait)
 
   if (PREDICT_FALSE (q->cursize == 0))
     {
-      if (nowait)
+      if (cond == SVM_Q_NOWAIT)
        {
          pthread_mutex_unlock (&q->mutex);
          return (-2);
        }
-      while (q->cursize == 0)
+      else if (cond == SVM_Q_TIMEDWAIT)
        {
-         (void) pthread_cond_wait (&q->condvar, &q->mutex);
+         struct timespec ts;
+         ts.tv_sec = unix_time_now () + time;
+         ts.tv_nsec = 0;
+         while (q->cursize == 0 && rc == 0)
+           {
+             rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
+           }
+         if (rc == ETIMEDOUT)
+           {
+             pthread_mutex_unlock (&q->mutex);
+             return ETIMEDOUT;
+           }
+       }
+      else
+       {
+         while (q->cursize == 0)
+           {
+             (void) pthread_cond_wait (&q->condvar, &q->mutex);
+           }
        }
     }
 
index dc1fc36..856c172 100644 (file)
@@ -36,6 +36,24 @@ typedef struct _svm_queue
   char data[0];
 } svm_queue_t;
 
+typedef enum
+{
+  /**
+   * blocking call
+   */
+  SVM_Q_WAIT = 0,
+
+  /**
+   * non-blocking call
+   */
+  SVM_Q_NOWAIT,
+
+  /**
+   * blocking call, return on signal or time-out
+   */
+  SVM_Q_TIMEDWAIT,
+} svm_q_conditional_wait_t;
+
 svm_queue_t *svm_queue_init (int nels,
                             int elsize,
                             int consumer_pid,
@@ -43,7 +61,8 @@ svm_queue_t *svm_queue_init (int nels,
 void svm_queue_free (svm_queue_t * q);
 int svm_queue_add (svm_queue_t * q, u8 * elem, int nowait);
 int svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait);
-int svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait);
+int svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
+                  u32 time);
 int svm_queue_sub2 (svm_queue_t * q, u8 * elem);
 void svm_queue_lock (svm_queue_t * q);
 void svm_queue_unlock (svm_queue_t * q);
index ed9e2f7..3fef65a 100644 (file)
@@ -505,7 +505,7 @@ client_handle_event_queue (uri_tcp_test_main_t * utm)
 {
   session_fifo_event_t _e, *e = &_e;;
 
-  svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ );
+  svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
   switch (e->event_type)
     {
     case FIFO_EVENT_APP_RX:
@@ -530,7 +530,7 @@ client_rx_thread_fn (void *arg)
   utm->client_bytes_received = 0;
   while (1)
     {
-      svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ );
+      svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
@@ -1027,7 +1027,7 @@ server_handle_event_queue (uri_tcp_test_main_t * utm)
 
   while (1)
     {
-      svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ );
+      svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
index 4e17a17..07e7237 100644 (file)
@@ -956,7 +956,7 @@ server_handle_event_queue (uri_udp_test_main_t * utm)
 
   while (1)
     {
-      svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ );
+      svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
index d419626..3b9c0f4 100644 (file)
@@ -758,7 +758,7 @@ vl_msg_api_queue_handler (svm_queue_t * q)
 {
   uword msg;
 
-  while (!svm_queue_sub (q, (u8 *) & msg, 0))
+  while (!svm_queue_sub (q, (u8 *) & msg, SVM_Q_WAIT, 0))
     vl_msg_api_handler ((void *) msg);
 }
 
index a0b5cc9..deb913b 100644 (file)
@@ -218,7 +218,7 @@ vl_client_connect (const char *name, int ctx_quota, int input_queue_size)
       for (i = 0; i < 1000; i++)
        {
          qstatus = svm_queue_sub (vl_input_queue, (u8 *) & rp,
-                                  1 /* nowait */ );
+                                  SVM_Q_NOWAIT, 0);
          if (qstatus == 0)
            goto read_one_msg;
          ts.tv_sec = 0;
@@ -305,7 +305,7 @@ vl_client_disconnect (void)
          am->shmem_hdr = 0;
          break;
        }
-      if (svm_queue_sub (vl_input_queue, (u8 *) & rp, 1) < 0)
+      if (svm_queue_sub (vl_input_queue, (u8 *) & rp, SVM_Q_NOWAIT, 0) < 0)
        continue;
 
       /* drain the queue */
index c9ace1b..5b7d735 100644 (file)
@@ -577,7 +577,7 @@ vl_map_shmem (const char *region_name, int is_vlib)
 
        mutex_ok:
          am->vlib_rp = vlib_rp;
-         while (svm_queue_sub (q, (u8 *) & old_msg, 1 /* nowait */ )
+         while (svm_queue_sub (q, (u8 *) & old_msg, SVM_Q_NOWAIT, 0)
                 != -2 /* queue underflow */ )
            {
              vl_msg_api_free_nolock ((void *) old_msg);
index f137223..fd2c417 100644 (file)
@@ -143,7 +143,7 @@ vac_rx_thread_fn (void *arg)
   q = am->vl_input_queue;
 
   while (1)
-    while (!svm_queue_sub(q, (u8 *)&msg, 0))
+    while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0))
       {
        u16 id = ntohs(*((u16 *)msg));
        switch (id) {
@@ -404,7 +404,8 @@ vac_read (char **p, int *l, u16 timeout)
   q = am->vl_input_queue;
 
  again:
-  rv = svm_queue_sub(q, (u8 *)&msg, 0);
+  rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0);
+
   if (rv == 0) {
     u16 msg_id = ntohs(*((u16 *)msg));
     switch (msg_id) {
index 7aa346b..754c89c 100644 (file)
@@ -502,7 +502,8 @@ out:
 }
 
 vapi_error_e
-vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size)
+vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size,
+          svm_q_conditional_wait_t cond, u32 time)
 {
   if (!ctx || !ctx->connected || !msg || !msg_size)
     {
@@ -519,7 +520,9 @@ vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size)
 
   svm_queue_t *q = am->vl_input_queue;
   VAPI_DBG ("doing shm queue sub");
-  int tmp = svm_queue_sub (q, (u8 *) & data, 0);
+
+  int tmp = svm_queue_sub (q, (u8 *) & data, cond, time);
+
   if (tmp == 0)
     {
 #if VAPI_DEBUG_ALLOC
@@ -700,7 +703,7 @@ vapi_dispatch_one (vapi_ctx_t ctx)
   VAPI_DBG ("vapi_dispatch_one()");
   void *msg;
   size_t size;
-  vapi_error_e rv = vapi_recv (ctx, &msg, &size);
+  vapi_error_e rv = vapi_recv (ctx, &msg, &size, SVM_Q_WAIT, 0);
   if (VAPI_OK != rv)
     {
       VAPI_DBG ("vapi_recv failed with rv=%d", rv);
index 245bf65..fc48e7d 100644 (file)
@@ -22,6 +22,7 @@
 #include <stdbool.h>
 #include <vppinfra/types.h>
 #include <vapi/vapi_common.h>
+#include <svm/queue.h>
 
 #ifdef __cplusplus
 extern "C"
@@ -162,10 +163,13 @@ extern "C"
  * @param ctx opaque vapi context
  * @param[out] msg pointer to result variable containing message
  * @param[out] msg_size pointer to result variable containing message size
+ * @param cond enum type for blocking, non-blocking or timed wait call
+ * @param time in sec for timed wait
  *
  * @return VAPI_OK on success, other error code on error
  */
-  vapi_error_e vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size);
+  vapi_error_e vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size,
+                         svm_q_conditional_wait_t cond, u32 time);
 
 /**
  * @brief wait for connection to become readable or writable
index 893851a..28357db 100644 (file)
@@ -245,7 +245,7 @@ public:
    *
    * @return VAPI_OK on success, other error code on error
    */
-  vapi_error_e dispatch (const Common_req *limit = nullptr)
+  vapi_error_e dispatch (const Common_req *limit = nullptr, u32 time = 5)
   {
     std::lock_guard<std::mutex> lock (dispatch_mutex);
     vapi_error_e rv = VAPI_OK;
@@ -254,7 +254,8 @@ public:
       {
         void *shm_data;
         size_t shm_data_size;
-        rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size);
+        rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size, SVM_Q_TIMEDWAIT,
+                        time);
         if (VAPI_OK != rv)
           {
             return rv;
index ad75ad6..52a939f 100644 (file)
@@ -66,12 +66,12 @@ START_TEST (test_invalid_values)
   ck_assert_int_eq (VAPI_EINVAL, rv);
   rv = vapi_send (NULL, NULL);
   ck_assert_int_eq (VAPI_EINVAL, rv);
-  rv = vapi_recv (NULL, NULL, NULL);
+  rv = vapi_recv (NULL, NULL, NULL, 0, 0);
   ck_assert_int_eq (VAPI_EINVAL, rv);
-  rv = vapi_recv (ctx, NULL, NULL);
+  rv = vapi_recv (ctx, NULL, NULL, 0, 0);
   ck_assert_int_eq (VAPI_EINVAL, rv);
   vapi_msg_show_version_reply *reply;
-  rv = vapi_recv (ctx, (void **) &reply, NULL);
+  rv = vapi_recv (ctx, (void **) &reply, NULL, 0, 0);
   ck_assert_int_eq (VAPI_EINVAL, rv);
   rv = vapi_disconnect (ctx);
   ck_assert_int_eq (VAPI_OK, rv);
@@ -531,7 +531,7 @@ START_TEST (test_show_version_1)
   ck_assert_int_eq (VAPI_OK, rv);
   vapi_msg_show_version_reply *resp;
   size_t size;
-  rv = vapi_recv (ctx, (void *) &resp, &size);
+  rv = vapi_recv (ctx, (void *) &resp, &size, 0, 0);
   ck_assert_int_eq (VAPI_OK, rv);
   int dummy;
   show_version_cb (NULL, &dummy, VAPI_OK, true, &resp->payload);
index 14c35d5..25ea9cc 100644 (file)
@@ -37,6 +37,13 @@ static char *api_prefix = nullptr;
 static const int max_outstanding_requests = 32;
 static const int response_queue_size = 32;
 
+#define WAIT_FOR_RESPONSE(param, ret)      \
+  do                                       \
+    {                                      \
+      ret = con.wait_for_response (param); \
+    }                                      \
+  while (ret == VAPI_EAGAIN)
+
 using namespace vapi;
 
 void verify_show_version_reply (const Show_version_reply &r)
@@ -68,7 +75,7 @@ START_TEST (test_show_version_1)
   Show_version sv (con);
   vapi_error_e rv = sv.execute ();
   ck_assert_int_eq (VAPI_OK, rv);
-  rv = con.wait_for_response (sv);
+  WAIT_FOR_RESPONSE (sv, rv);
   ck_assert_int_eq (VAPI_OK, rv);
   auto &r = sv.get_response ();
   verify_show_version_reply (r);
@@ -122,7 +129,8 @@ START_TEST (test_loopbacks_1)
       memcpy (p.mac_address, mac_addresses[i], sizeof (p.mac_address));
       auto e = cl.execute ();
       ck_assert_int_eq (VAPI_OK, e);
-      vapi_error_e rv = con.wait_for_response (cl);
+      vapi_error_e rv;
+      WAIT_FOR_RESPONSE (cl, rv);
       ck_assert_int_eq (VAPI_OK, rv);
       auto &rp = cl.get_response ().get_payload ();
       ck_assert_int_eq (0, rp.retval);
@@ -145,7 +153,7 @@ START_TEST (test_loopbacks_1)
     memset (p.name_filter, 0, sizeof (p.name_filter));
     auto rv = d.execute ();
     ck_assert_int_eq (VAPI_OK, rv);
-    rv = con.wait_for_response (d);
+    WAIT_FOR_RESPONSE (d, rv);
     ck_assert_int_eq (VAPI_OK, rv);
     auto &rs = d.get_result_set ();
     for (auto &r : rs)
@@ -172,7 +180,7 @@ START_TEST (test_loopbacks_1)
       dl.get_request ().get_payload ().sw_if_index = sw_if_indexes[i];
       auto rv = dl.execute ();
       ck_assert_int_eq (VAPI_OK, rv);
-      rv = con.wait_for_response (dl);
+      WAIT_FOR_RESPONSE (dl, rv);
       ck_assert_int_eq (VAPI_OK, rv);
       auto &response = dl.get_response ();
       auto rp = response.get_payload ();
@@ -187,7 +195,7 @@ START_TEST (test_loopbacks_1)
     memset (p.name_filter, 0, sizeof (p.name_filter));
     auto rv = d.execute ();
     ck_assert_int_eq (VAPI_OK, rv);
-    rv = con.wait_for_response (d);
+    WAIT_FOR_RESPONSE (d, rv);
     ck_assert_int_eq (VAPI_OK, rv);
     auto &rs = d.get_result_set ();
     for (auto &r : rs)
@@ -305,7 +313,7 @@ START_TEST (test_loopbacks_2)
   memset (p.name_filter, 0, sizeof (p.name_filter));
   auto rv = d.execute ();
   ck_assert_int_eq (VAPI_OK, rv);
-  rv = con.wait_for_response (d);
+  WAIT_FOR_RESPONSE (d, rv);
   ck_assert_int_eq (VAPI_OK, rv);
   ck_assert_int_ne (0, swdcb.called);
   std::array<Delete_loopback_cb, num_ifs> dcbs;
@@ -334,7 +342,7 @@ START_TEST (test_loopbacks_2)
     memset (p.name_filter, 0, sizeof (p.name_filter));
     auto rv = d.execute ();
     ck_assert_int_eq (VAPI_OK, rv);
-    rv = con.wait_for_response (d);
+    WAIT_FOR_RESPONSE (d, rv);
     ck_assert_int_eq (VAPI_OK, rv);
     auto &rs = d.get_result_set ();
     for (auto &r : rs)
@@ -360,7 +368,7 @@ START_TEST (test_stats_1)
   auto rv = ws.execute ();
   ck_assert_int_eq (VAPI_OK, rv);
   Event_registration<Vnet_interface_simple_counters> sc (con);
-  rv = con.wait_for_response (sc);
+  WAIT_FOR_RESPONSE (sc, rv);
   ck_assert_int_eq (VAPI_OK, rv);
   auto &rs = sc.get_result_set ();
   int count = 0;
@@ -407,7 +415,7 @@ START_TEST (test_stats_2)
   ck_assert_int_eq (VAPI_OK, rv);
   Vnet_interface_simple_counters_cb cb;
   Event_registration<Vnet_interface_simple_counters> sc (con, std::ref (cb));
-  rv = con.wait_for_response (sc);
+  WAIT_FOR_RESPONSE (sc, rv);
   ck_assert_int_eq (VAPI_OK, rv);
   ck_assert_int_ne (0, cb.called);
 }
@@ -452,7 +460,7 @@ START_TEST (test_stats_3)
   Event_registration<Vnet_interface_simple_counters> sc (con, std::ref (cb));
   for (int i = 0; i < 5; ++i)
     {
-      rv = con.wait_for_response (sc);
+      WAIT_FOR_RESPONSE (sc, rv);
     }
   ck_assert_int_eq (VAPI_OK, rv);
   ck_assert_int_eq (5, cb.called);
@@ -472,9 +480,9 @@ START_TEST (test_stats_4)
   ck_assert_int_eq (VAPI_OK, rv);
   Event_registration<Vnet_interface_simple_counters> sc (con);
   Event_registration<Vnet_interface_combined_counters> cc (con);
-  rv = con.wait_for_response (sc);
+  WAIT_FOR_RESPONSE (sc, rv);
   ck_assert_int_eq (VAPI_OK, rv);
-  rv = con.wait_for_response (cc);
+  WAIT_FOR_RESPONSE (cc, rv);
   ck_assert_int_eq (VAPI_OK, rv);
   int count = 0;
   for (auto &r : sc.get_result_set ())