vcl: rx dequeue notifications with epoll
[vpp.git] / src / vcl / vppcom.c
index fd9136a..85d8081 100644 (file)
@@ -351,6 +351,39 @@ vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
   app_send_ctrl_evt_to_vpp (mq, app_evt);
 }
 
+int
+vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_app_wrk_rpc_msg_t *mp;
+  vcl_worker_t *dst_wrk, *wrk;
+  svm_msg_q_t *mq;
+  int ret = -1;
+
+  if (data_len > sizeof (mp->data))
+    goto done;
+
+  clib_spinlock_lock (&vcm->workers_lock);
+
+  dst_wrk = vcl_worker_get_if_valid (dst_wrk_index);
+  if (!dst_wrk)
+    goto done;
+
+  wrk = vcl_worker_get_current ();
+  mq = vcl_worker_ctrl_mq (wrk);
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_WRK_RPC);
+  mp = (session_app_wrk_rpc_msg_t *) app_evt->evt->data;
+  mp->client_index = wrk->my_client_index;
+  mp->wrk_index = dst_wrk->vpp_wrk_index;
+  clib_memcpy (mp->data, data, data_len);
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+  ret = 0;
+
+done:
+  clib_spinlock_unlock (&vcm->workers_lock);
+  return ret;
+}
+
 static u32
 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
                              u32 ls_index)
@@ -866,6 +899,15 @@ vcl_session_app_del_segment_handler (vcl_worker_t * wrk, void *data)
   VDBG (1, "Unmapped segment: %d", msg->segment_handle);
 }
 
+static void
+vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data)
+{
+  if (!vcm->wrk_rpc_fn)
+    return;
+
+  (vcm->wrk_rpc_fn) (((session_app_wrk_rpc_msg_t *) data)->data);
+}
+
 static int
 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
 {
@@ -923,6 +965,9 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
+      vcl_worker_rpc_handler (wrk, e->data);
+      break;
     default:
       clib_warning ("unhandled %u", e->event_type);
     }
@@ -1896,14 +1941,23 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
     n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
 
   if (svm_fifo_is_empty_cons (rx_fifo))
-    svm_fifo_unset_event (s->rx_fifo);
+    {
+      svm_fifo_unset_event (s->rx_fifo);
+      if (!svm_fifo_is_empty_cons (rx_fifo)
+         && svm_fifo_set_event (s->rx_fifo) && is_nonblocking)
+       {
+         session_event_t *e;
+         vec_add2 (wrk->unhandled_evts_vector, e, 1);
+         e->event_type = SESSION_IO_EVT_RX;
+         e->session_index = s->session_index;
+       }
+    }
 
-  /* Cut-through sessions might request tx notifications on rx fifos */
-  if (PREDICT_FALSE (rx_fifo->want_deq_ntf))
+  if (PREDICT_FALSE (svm_fifo_needs_deq_ntf (rx_fifo, n_read)))
     {
+      svm_fifo_clear_deq_ntf (rx_fifo);
       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index,
                              SESSION_IO_EVT_RX, SVM_Q_WAIT);
-      svm_fifo_reset_has_deq_ntf (s->rx_fifo);
     }
 
   VDBG (2, "session %u[0x%llx]: read %d bytes from (%p)", s->session_index,
@@ -2153,7 +2207,7 @@ if (PREDICT_FALSE (svm_fifo_is_empty (_s->rx_fifo)))                      \
       }                                                                        \
     else if (svm_fifo_is_empty (_s->ct_rx_fifo))                       \
       {                                                                        \
-       svm_fifo_unset_event (_s->ct_rx_fifo);                          \
+       svm_fifo_unset_event (_s->rx_fifo); /* rx evts on actual fifo*/ \
        if (svm_fifo_is_empty (_s->ct_rx_fifo))                         \
          break;                                                        \
       }                                                                        \
@@ -2259,6 +2313,9 @@ vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
+      vcl_worker_rpc_handler (wrk, e->data);
+      break;
     default:
       clib_warning ("unhandled: %u", e->event_type);
       break;
@@ -2565,8 +2622,9 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
   vcl_session_t *vep_session;
-  vcl_session_t *session;
   int rv = VPPCOM_OK;
+  vcl_session_t *s;
+  svm_fifo_t *txf;
 
   if (vep_handle == session_handle)
     {
@@ -2589,13 +2647,13 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
   ASSERT (vep_session->vep.vep_sh == ~0);
   ASSERT (vep_session->vep.prev_sh == ~0);
 
-  session = vcl_session_get_w_handle (wrk, session_handle);
-  if (PREDICT_FALSE (!session))
+  s = vcl_session_get_w_handle (wrk, session_handle);
+  if (PREDICT_FALSE (!s))
     {
       VDBG (0, "Invalid session_handle (%u)!", session_handle);
       return VPPCOM_EBADFD;
     }
-  if (PREDICT_FALSE (session->is_vep))
+  if (PREDICT_FALSE (s->is_vep))
     {
       VDBG (0, "session_handle (%u) is a vep!", vep_handle);
       return VPPCOM_EINVAL;
@@ -2623,39 +2681,38 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
          ASSERT (next_session->vep.prev_sh == vep_handle);
          next_session->vep.prev_sh = session_handle;
        }
-      session->vep.next_sh = vep_session->vep.next_sh;
-      session->vep.prev_sh = vep_handle;
-      session->vep.vep_sh = vep_handle;
-      session->vep.et_mask = VEP_DEFAULT_ET_MASK;
-      session->vep.ev = *event;
-      session->is_vep = 0;
-      session->is_vep_session = 1;
+      s->vep.next_sh = vep_session->vep.next_sh;
+      s->vep.prev_sh = vep_handle;
+      s->vep.vep_sh = vep_handle;
+      s->vep.et_mask = VEP_DEFAULT_ET_MASK;
+      s->vep.ev = *event;
+      s->is_vep = 0;
+      s->is_vep_session = 1;
       vep_session->vep.next_sh = session_handle;
 
-      if (session->tx_fifo)
-       svm_fifo_add_want_deq_ntf (session->tx_fifo,
-                                  SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
+      txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
+      if (txf && (event->events & EPOLLOUT))
+       svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
 
       /* Generate EPOLLOUT if tx fifo not full */
-      if ((event->events & EPOLLOUT) &&
-         (vcl_session_write_ready (session) > 0))
+      if ((event->events & EPOLLOUT) && (vcl_session_write_ready (s) > 0))
        {
          session_event_t e = { 0 };
          e.event_type = SESSION_IO_EVT_TX;
-         e.session_index = session->session_index;
+         e.session_index = s->session_index;
          vec_add1 (wrk->unhandled_evts_vector, e);
        }
       /* Generate EPOLLIN if rx fifo has data */
-      if ((event->events & EPOLLIN) && (vcl_session_read_ready (session) > 0))
+      if ((event->events & EPOLLIN) && (vcl_session_read_ready (s) > 0))
        {
          session_event_t e = { 0 };
          e.event_type = SESSION_IO_EVT_RX;
-         e.session_index = session->session_index;
+         e.session_index = s->session_index;
          vec_add1 (wrk->unhandled_evts_vector, e);
        }
       VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
            vep_handle, session_handle, event->events, event->data.u64);
-      vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
+      vcl_evt (VCL_EVT_EPOLL_CTLADD, s, event->events, event->data.u64);
       break;
 
     case EPOLL_CTL_MOD:
@@ -2665,92 +2722,97 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
          rv = VPPCOM_EINVAL;
          goto done;
        }
-      else if (PREDICT_FALSE (!session->is_vep_session))
+      else if (PREDICT_FALSE (!s->is_vep_session))
        {
          VDBG (0, "sh %u EPOLL_CTL_MOD: not a vep session!", session_handle);
          rv = VPPCOM_EINVAL;
          goto done;
        }
-      else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
+      else if (PREDICT_FALSE (s->vep.vep_sh != vep_handle))
        {
          VDBG (0, "EPOLL_CTL_MOD: sh %u vep_sh (%u) != vep_sh (%u)!",
-               session_handle, session->vep.vep_sh, vep_handle);
+               session_handle, s->vep.vep_sh, vep_handle);
          rv = VPPCOM_EINVAL;
          goto done;
        }
 
       /* Generate EPOLLOUT when tx_fifo/ct_tx_fifo not full */
       if ((event->events & EPOLLOUT) &&
-         !(session->vep.ev.events & EPOLLOUT) &&
-         (vcl_session_write_ready (session) > 0))
+         !(s->vep.ev.events & EPOLLOUT) && (vcl_session_write_ready (s) > 0))
        {
          session_event_t e = { 0 };
          e.event_type = SESSION_IO_EVT_TX;
-         e.session_index = session->session_index;
+         e.session_index = s->session_index;
          vec_add1 (wrk->unhandled_evts_vector, e);
        }
-      session->vep.et_mask = VEP_DEFAULT_ET_MASK;
-      session->vep.ev = *event;
+      s->vep.et_mask = VEP_DEFAULT_ET_MASK;
+      s->vep.ev = *event;
+      txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
+      if (event->events & EPOLLOUT)
+       svm_fifo_add_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
+      else
+       svm_fifo_del_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
       VDBG (1, "EPOLL_CTL_MOD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
            vep_handle, session_handle, event->events, event->data.u64);
       break;
 
     case EPOLL_CTL_DEL:
-      if (PREDICT_FALSE (!session->is_vep_session))
+      if (PREDICT_FALSE (!s->is_vep_session))
        {
          VDBG (0, "EPOLL_CTL_DEL: %u not a vep session!", session_handle);
          rv = VPPCOM_EINVAL;
          goto done;
        }
-      else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
+      else if (PREDICT_FALSE (s->vep.vep_sh != vep_handle))
        {
          VDBG (0, "EPOLL_CTL_DEL: sh %u vep_sh (%u) != vep_sh (%u)!",
-               session_handle, session->vep.vep_sh, vep_handle);
+               session_handle, s->vep.vep_sh, vep_handle);
          rv = VPPCOM_EINVAL;
          goto done;
        }
 
-      if (session->vep.prev_sh == vep_handle)
-       vep_session->vep.next_sh = session->vep.next_sh;
+      if (s->vep.prev_sh == vep_handle)
+       vep_session->vep.next_sh = s->vep.next_sh;
       else
        {
          vcl_session_t *prev_session;
-         prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh);
+         prev_session = vcl_session_get_w_handle (wrk, s->vep.prev_sh);
          if (PREDICT_FALSE (!prev_session))
            {
              VDBG (0, "EPOLL_CTL_DEL: Invalid prev_sh (%u) on sh (%u)!",
-                   session->vep.prev_sh, session_handle);
+                   s->vep.prev_sh, session_handle);
              return VPPCOM_EBADFD;
            }
          ASSERT (prev_session->vep.next_sh == session_handle);
-         prev_session->vep.next_sh = session->vep.next_sh;
+         prev_session->vep.next_sh = s->vep.next_sh;
        }
-      if (session->vep.next_sh != ~0)
+      if (s->vep.next_sh != ~0)
        {
          vcl_session_t *next_session;
-         next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh);
+         next_session = vcl_session_get_w_handle (wrk, s->vep.next_sh);
          if (PREDICT_FALSE (!next_session))
            {
              VDBG (0, "EPOLL_CTL_DEL: Invalid next_sh (%u) on sh (%u)!",
-                   session->vep.next_sh, session_handle);
+                   s->vep.next_sh, session_handle);
              return VPPCOM_EBADFD;
            }
          ASSERT (next_session->vep.prev_sh == session_handle);
-         next_session->vep.prev_sh = session->vep.prev_sh;
+         next_session->vep.prev_sh = s->vep.prev_sh;
        }
 
-      memset (&session->vep, 0, sizeof (session->vep));
-      session->vep.next_sh = ~0;
-      session->vep.prev_sh = ~0;
-      session->vep.vep_sh = ~0;
-      session->is_vep_session = 0;
+      memset (&s->vep, 0, sizeof (s->vep));
+      s->vep.next_sh = ~0;
+      s->vep.prev_sh = ~0;
+      s->vep.vep_sh = ~0;
+      s->is_vep_session = 0;
 
-      if (session->tx_fifo)
-       svm_fifo_del_want_deq_ntf (session->tx_fifo, SVM_FIFO_NO_DEQ_NOTIF);
+      txf = vcl_session_is_ct (s) ? s->ct_tx_fifo : s->tx_fifo;
+      if (txf)
+       svm_fifo_del_want_deq_ntf (txf, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
 
       VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sh %u!", vep_handle,
            session_handle);
-      vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
+      vcl_evt (VCL_EVT_EPOLL_CTLDEL, s, vep_sh);
       break;
 
     default:
@@ -2802,7 +2864,8 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       add_event = 1;
       events[*num_ev].events |= EPOLLOUT;
       session_evt_data = session->vep.ev.data.u64;
-      svm_fifo_reset_has_deq_ntf (session->tx_fifo);
+      svm_fifo_reset_has_deq_ntf (vcl_session_is_ct (session) ?
+                                 session->ct_tx_fifo : session->tx_fifo);
       break;
     case SESSION_CTRL_EVT_ACCEPTED:
       session = vcl_session_accepted (wrk,
@@ -2875,6 +2938,9 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
+      vcl_worker_rpc_handler (wrk, e->data);
+      break;
     default:
       VDBG (0, "unhandled: %u", e->event_type);
       break;
@@ -3081,8 +3147,10 @@ vppcom_session_attr (uint32_t session_handle, uint32_t op,
     case VPPCOM_ATTR_GET_FLAGS:
       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
        {
-         *flags = O_RDWR | (VCL_SESS_ATTR_TEST (session->attr,
-                                                VCL_SESS_ATTR_NONBLOCK));
+         *flags =
+           O_RDWR |
+           (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK) ?
+            O_NONBLOCK : 0);
          *buflen = sizeof (*flags);
          VDBG (2, "VPPCOM_ATTR_GET_FLAGS: sh %u, flags = 0x%08x, "
                "is_nonblocking = %u", session_handle, *flags,