wireguard: add async mode for encryption packets
[vpp.git] / src / svm / message_queue.c
index b423826..a6af796 100644 (file)
@@ -166,11 +166,20 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
   clib_spinlock_init (&mq->q.lock);
 }
 
+void
+svm_msg_q_cleanup (svm_msg_q_t *mq)
+{
+  vec_free (mq->rings);
+  clib_spinlock_free (&mq->q.lock);
+  if (mq->q.evtfd != -1)
+    close (mq->q.evtfd);
+}
+
 void
 svm_msg_q_free (svm_msg_q_t * mq)
 {
+  svm_msg_q_cleanup (mq);
   clib_mem_free (mq->q.shr);
-  clib_spinlock_free (&mq->q.lock);
   clib_mem_free (mq);
 }
 
@@ -247,7 +256,7 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
       svm_msg_q_lock (mq);
       while (svm_msg_q_is_full (mq)
             || svm_msg_q_ring_is_full (mq, ring_index))
-       svm_msg_q_wait (mq, SVM_MQ_WAIT_FULL);
+       svm_msg_q_wait_prod (mq);
       *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
     }
   return 0;
@@ -371,7 +380,7 @@ svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
       if (nowait)
        return (-2);
       while (svm_msg_q_is_full (mq))
-       svm_msg_q_wait (mq, SVM_MQ_WAIT_FULL);
+       svm_msg_q_wait_prod (mq);
     }
 
   svm_msg_q_add_raw (mq, (u8 *) msg);
@@ -482,7 +491,7 @@ int
 svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
 {
   int fd;
-  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+  if ((fd = eventfd (0, 0)) < 0)
     return -1;
   svm_msg_q_set_eventfd (mq, fd);
   return 0;
@@ -530,6 +539,35 @@ svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type)
   return 0;
 }
 
+int
+svm_msg_q_wait_prod (svm_msg_q_t *mq)
+{
+  if (mq->q.evtfd == -1)
+    {
+      while (svm_msg_q_is_full (mq))
+       pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+    }
+  else
+    {
+      u64 buf;
+      int rv;
+
+      while (svm_msg_q_is_full (mq))
+       {
+         while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+           {
+             if (errno != EAGAIN)
+               {
+                 clib_unix_warning ("read error");
+                 return rv;
+               }
+           }
+       }
+    }
+
+  return 0;
+}
+
 int
 svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
 {
@@ -570,8 +608,13 @@ svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
 
       tv.tv_sec = (u64) timeout;
       tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9;
-      setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv,
-                 sizeof tv);
+      rv = setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO,
+                      (const char *) &tv, sizeof tv);
+      if (rv < 0)
+       {
+         clib_unix_warning ("setsockopt");
+         return -1;
+       }
 
       rv = read (mq->q.evtfd, &buf, sizeof (buf));
       if (rv < 0)