vcl: add apis that expos fifo as buffer
[vpp.git] / src / vcl / vppcom.c
index 1562201..df8f4ca 100644 (file)
@@ -285,6 +285,8 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
       vcl_wait_for_memory (session->vpp_evt_q);
       rx_fifo->master_session_index = session->session_index;
       tx_fifo->master_session_index = session->session_index;
+      rx_fifo->master_thread_index = vcl_get_worker_index ();
+      tx_fifo->master_thread_index = vcl_get_worker_index ();
       vec_validate (wrk->vpp_event_queues, 0);
       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
       wrk->vpp_event_queues[0] = evt_q;
@@ -295,7 +297,8 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
                                             svm_msg_q_t *);
       rx_fifo->client_session_index = session->session_index;
       tx_fifo->client_session_index = session->session_index;
-
+      rx_fifo->client_thread_index = vcl_get_worker_index ();
+      tx_fifo->client_thread_index = vcl_get_worker_index ();
       vpp_wrk_index = tx_fifo->master_thread_index;
       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
@@ -349,8 +352,7 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
   if (mp->retval)
     {
       clib_warning ("VCL<%d>: ERROR: sid %u: connect failed! %U", getpid (),
-                   mp->handle, session_index, format_api_error,
-                   ntohl (mp->retval));
+                   session_index, format_api_error, ntohl (mp->retval));
       session->session_state = STATE_FAILED;
       session->vpp_handle = mp->handle;
       return session_index;
@@ -361,6 +363,8 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
   vcl_wait_for_memory (rx_fifo);
   rx_fifo->client_session_index = session_index;
   tx_fifo->client_session_index = session_index;
+  rx_fifo->client_thread_index = vcl_get_worker_index ();
+  tx_fifo->client_thread_index = vcl_get_worker_index ();
 
   if (mp->client_event_queue_address)
     {
@@ -648,6 +652,9 @@ vppcom_session_disconnect (u32 session_handle)
   u64 vpp_handle;
 
   session = vcl_session_get_w_handle (wrk, session_handle);
+  if (!session)
+    return VPPCOM_EBADFD;
+
   vpp_handle = session->vpp_handle;
   state = session->session_state;
 
@@ -700,6 +707,7 @@ vppcom_app_create (char *app_name)
       svm_fifo_segment_main_init (vcl_cfg->segment_baseva,
                                  20 /* timeout in secs */ );
       pool_init_fixed (vcm->workers, vcl_cfg->max_workers);
+      clib_spinlock_init (&vcm->workers_lock);
       vcl_worker_alloc_and_init ();
     }
 
@@ -1260,25 +1268,15 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   svm_msg_q_msg_t msg;
   session_event_t *e;
   svm_msg_q_t *mq;
-  u8 is_full;
+  u8 is_ct;
 
   if (PREDICT_FALSE (!buf))
     return VPPCOM_EINVAL;
 
   s = vcl_session_get_w_handle (wrk, session_handle);
-  if (PREDICT_FALSE (!s))
+  if (PREDICT_FALSE (!s || s->is_vep))
     return VPPCOM_EBADFD;
 
-  if (PREDICT_FALSE (s->is_vep))
-    {
-      clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
-                   "read from an epoll session!", getpid (), session_handle);
-      return VPPCOM_EBADFD;
-    }
-
-  is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
-  rx_fifo = s->rx_fifo;
-
   if (PREDICT_FALSE (!vcl_session_is_readable (s)))
     {
       session_state_t state = s->session_state;
@@ -1291,18 +1289,21 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
       return rv;
     }
 
-  mq = vcl_session_is_ct (s) ? s->our_evt_q : wrk->app_event_queue;
-  svm_fifo_unset_event (rx_fifo);
-  is_full = svm_fifo_is_full (rx_fifo);
+  is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+  is_ct = vcl_session_is_ct (s);
+  mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
+  rx_fifo = s->rx_fifo;
 
   if (svm_fifo_is_empty (rx_fifo))
     {
       if (is_nonblocking)
        {
+         svm_fifo_unset_event (rx_fifo);
          return VPPCOM_OK;
        }
-      while (1)
+      while (svm_fifo_is_empty (rx_fifo))
        {
+         svm_fifo_unset_event (rx_fifo);
          svm_msg_q_lock (mq);
          if (svm_msg_q_is_empty (mq))
            svm_msg_q_wait (mq);
@@ -1310,20 +1311,16 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
          svm_msg_q_sub_w_lock (mq, &msg);
          e = svm_msg_q_msg_data (mq, &msg);
          svm_msg_q_unlock (mq);
-         if (!vcl_is_rx_evt_for_session (e, s->session_index,
-                                         s->our_evt_q != 0))
+         if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
            {
              vcl_handle_mq_ctrl_event (wrk, e);
              svm_msg_q_free_msg (mq, &msg);
              continue;
            }
-         svm_fifo_unset_event (rx_fifo);
          svm_msg_q_free_msg (mq, &msg);
+
          if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
            return 0;
-         if (svm_fifo_is_empty (rx_fifo))
-           continue;
-         break;
        }
     }
 
@@ -1332,7 +1329,10 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   else
     n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
 
-  if (vcl_session_is_ct (s) && is_full)
+  if (svm_fifo_is_empty (rx_fifo))
+    svm_fifo_unset_event (rx_fifo);
+
+  if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
     {
       /* If the peer is not polling send notification */
       if (!svm_fifo_has_event (s->rx_fifo))
@@ -1340,17 +1340,9 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
                                SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
     }
 
-  if (VPPCOM_DEBUG > 2)
-    {
-      if (n_read > 0)
-       clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes "
-                     "from (%p)", getpid (), s->vpp_handle,
-                     session_handle, n_read, rx_fifo);
-      else
-       clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! "
-                     "returning %d (%s)", getpid (), s->vpp_handle,
-                     session_handle, n_read, vppcom_retval_str (n_read));
-    }
+  VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
+       getpid (), s->vpp_handle, session_handle, n_read, rx_fifo);
+
   return n_read;
 }
 
@@ -1366,6 +1358,93 @@ vppcom_session_peek (uint32_t session_handle, void *buf, int n)
   return (vppcom_session_read_internal (session_handle, buf, n, 1));
 }
 
+int
+vppcom_session_read_segments (uint32_t session_handle,
+                             vppcom_data_segments_t ds)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  int n_read = 0, rv, is_nonblocking;
+  vcl_session_t *s = 0;
+  svm_fifo_t *rx_fifo;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
+  svm_msg_q_t *mq;
+  u8 is_ct;
+
+  s = vcl_session_get_w_handle (wrk, session_handle);
+  if (PREDICT_FALSE (!s || s->is_vep))
+    return VPPCOM_EBADFD;
+
+  if (PREDICT_FALSE (!vcl_session_is_readable (s)))
+    {
+      session_state_t state = s->session_state;
+      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
+      return rv;
+    }
+
+  is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+  is_ct = vcl_session_is_ct (s);
+  mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
+  rx_fifo = s->rx_fifo;
+
+  if (svm_fifo_is_empty (rx_fifo))
+    {
+      if (is_nonblocking)
+       {
+         svm_fifo_unset_event (rx_fifo);
+         return VPPCOM_OK;
+       }
+      while (svm_fifo_is_empty (rx_fifo))
+       {
+         svm_fifo_unset_event (rx_fifo);
+         svm_msg_q_lock (mq);
+         if (svm_msg_q_is_empty (mq))
+           svm_msg_q_wait (mq);
+
+         svm_msg_q_sub_w_lock (mq, &msg);
+         e = svm_msg_q_msg_data (mq, &msg);
+         svm_msg_q_unlock (mq);
+         if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
+           {
+             vcl_handle_mq_ctrl_event (wrk, e);
+             svm_msg_q_free_msg (mq, &msg);
+             continue;
+           }
+         svm_msg_q_free_msg (mq, &msg);
+
+         if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
+           return 0;
+       }
+    }
+
+  n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
+  svm_fifo_unset_event (rx_fifo);
+
+  if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
+    {
+      /* If the peer is not polling send notification */
+      if (!svm_fifo_has_event (s->rx_fifo))
+       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
+                               SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
+    }
+
+  return n_read;
+}
+
+void
+vppcom_session_free_segments (uint32_t session_handle,
+                             vppcom_data_segments_t ds)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  vcl_session_t *s;
+
+  s = vcl_session_get_w_handle (wrk, session_handle);
+  if (PREDICT_FALSE (!s || s->is_vep))
+    return;
+
+  svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
+}
+
 static inline int
 vppcom_session_read_ready (vcl_session_t * session)
 {
@@ -1397,6 +1476,19 @@ vppcom_session_read_ready (vcl_session_t * session)
   return svm_fifo_max_dequeue (session->rx_fifo);
 }
 
+int
+vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
+{
+  u32 first_copy = clib_min (ds[0].len, max_bytes);
+  clib_memcpy (buf, ds[0].data, first_copy);
+  if (first_copy < max_bytes)
+    {
+      clib_memcpy (buf + first_copy, ds[1].data,
+                  clib_min (ds[1].len, max_bytes - first_copy));
+    }
+  return 0;
+}
+
 static u8
 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
 {
@@ -1585,7 +1677,7 @@ if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                    \
   {                                                            \
     svm_fifo_unset_event (_fifo);                              \
     if (svm_fifo_is_empty (_fifo))                             \
-       break;                                                  \
+      break;                                                   \
   }                                                            \
 
 static int
@@ -2232,6 +2324,7 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
+         ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
          vcl_fifo_rx_evt_valid_or_break (e->fifo);
          sid = e->fifo->client_session_index;
          session = vcl_session_get (wrk, sid);