vcl: add read/write udp support 83/13783/8
authorFlorin Coras <fcoras@cisco.com>
Fri, 27 Jul 2018 12:45:06 +0000 (05:45 -0700)
committerDave Barach <openvpp@barachs.net>
Tue, 31 Jul 2018 11:36:54 +0000 (11:36 +0000)
Change-Id: Ie6171c12055cde6915856de340839f5da1b1b1da
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/vcl/vcl_bapi.c
src/vcl/vcl_cfg.c
src/vcl/vcl_private.h
src/vcl/vcl_test_server.c
src/vcl/vppcom.c
src/vnet/session-apps/echo_server.c
src/vnet/session/application.c
src/vnet/session/application_interface.h
src/vnet/session/session.h

index 0201cd8..311df64 100644 (file)
@@ -132,6 +132,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
   svm_fifo_segment_create_args_t *a = &_a;
   int rv;
 
+  vcm->mounting_segment = 1;
   memset (a, 0, sizeof (*a));
   a->segment_name = (char *) mp->segment_name;
   a->segment_size = mp->segment_size;
@@ -147,6 +148,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
 
   VDBG (1, "VCL<%d>: mapped new segment '%s' size %d", getpid (),
        mp->segment_name, mp->segment_size);
+  vcm->mounting_segment = 0;
 }
 
 static void
@@ -356,6 +358,18 @@ done:
   vppcom_session_table_add_listener (mp->handle, session_index);
   session->session_state = STATE_LISTEN;
 
+  if (session->is_dgram)
+    {
+      svm_fifo_t *rx_fifo, *tx_fifo;
+      session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
+      rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
+      rx_fifo->client_session_index = session_index;
+      tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
+      tx_fifo->client_session_index = session_index;
+      session->rx_fifo = rx_fifo;
+      session->tx_fifo = tx_fifo;
+    }
+
   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: bind succeeded!",
        getpid (), mp->handle, mp->context);
 done_unlock:
index 279a975..6e6a0ac 100644 (file)
@@ -21,6 +21,7 @@
  */
 static vppcom_main_t _vppcom_main = {
   .debug = VPPCOM_DEBUG_INIT,
+  .init = 0,
   .my_client_index = ~0
 };
 
index 327a7fc..af58cdc 100644 (file)
@@ -237,6 +237,9 @@ typedef struct vppcom_main_t_
   /** Pool of cut through registrations */
   vcl_cut_through_registration_t *cut_through_registrations;
 
+  /** Flag indicating that a new segment is being mounted */
+  volatile u32 mounting_segment;
+
 #ifdef VCL_ELOG
   /* VPP Event-logger */
   elog_main_t elog_main;
@@ -313,6 +316,12 @@ vcl_session_get_index_from_handle (u64 handle)
   return VCL_INVALID_SESSION_INDEX;
 }
 
+static inline u8
+vcl_session_is_ct (vcl_session_t * s)
+{
+  return (s->our_evt_q != 0);
+}
+
 static inline int
 vppcom_session_at_index (u32 session_index, vcl_session_t * volatile *sess)
 {
index 6a2fda0..b49383e 100644 (file)
@@ -425,14 +425,17 @@ main (int argc, char **argv)
       return -1;
     }
 
-  rv = vppcom_session_listen (ssm->listen_fd, 10);
-  if (rv < 0)
+  if (!ssm->cfg.transport_udp)
     {
-      errno_val = errno = -rv;
-      perror ("ERROR in main()");
-      fprintf (stderr, "SERVER: ERROR: listen failed "
-              "(errno = %d)!\n", errno_val);
-      return -1;
+      rv = vppcom_session_listen (ssm->listen_fd, 10);
+      if (rv < 0)
+       {
+         errno_val = errno = -rv;
+         perror ("ERROR in main()");
+         fprintf (stderr, "SERVER: ERROR: listen failed "
+                  "(errno = %d)!\n", errno_val);
+         return -1;
+       }
     }
 
   ssm->epfd = vppcom_epoll_create ();
index d1c4413..a74e55a 100644 (file)
@@ -33,9 +33,15 @@ static void
 vcl_wait_for_memory (void *mem)
 {
   u8 __clib_unused test;
+  if (vcm->mounting_segment)
+    {
+      while (vcm->mounting_segment)
+       ;
+      return;
+    }
   if (1 || vcm->debug)
     {
-      sleep (1);
+      usleep (1e5);
       return;
     }
   if (signal (SIGSEGV, sigsegv_signal))
@@ -359,6 +365,8 @@ vcl_session_accepted_handler (session_accepted_msg_t * mp)
   hash_set (vcm->session_index_by_vpp_handles, mp->handle, session_index);
   session->transport.lcl_port = listen_session->transport.lcl_port;
   session->transport.lcl_ip = listen_session->transport.lcl_ip;
+  session->session_type = listen_session->session_type;
+  session->is_dgram = session->session_type == VPPCOM_PROTO_UDP;
 
   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
        " address %U port %d queue %p!", getpid (), mp->handle, session_index,
@@ -405,13 +413,18 @@ done:
   if (rv)
     goto done_unlock;
 
+  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+  vcl_wait_for_memory (rx_fifo);
+  rx_fifo->client_session_index = session_index;
+  tx_fifo->client_session_index = session_index;
+
   if (mp->client_event_queue_address)
     {
       session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
                                             svm_msg_q_t *);
       session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
                                             svm_msg_q_t *);
-      vcl_wait_for_memory (session->vpp_evt_q);
       session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
                                                          session_index);
     }
@@ -419,11 +432,6 @@ done:
     session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
                                           svm_msg_q_t *);
 
-  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  rx_fifo->client_session_index = session_index;
-  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  tx_fifo->client_session_index = session_index;
-
   session->rx_fifo = rx_fifo;
   session->tx_fifo = tx_fifo;
   session->vpp_handle = mp->handle;
@@ -762,11 +770,10 @@ vppcom_session_create (u8 proto, u8 is_nonblocking)
   session->session_type = proto;
   session->session_state = STATE_START;
   session->vpp_handle = ~0;
+  session->is_dgram = proto == VPPCOM_PROTO_UDP;
 
   if (is_nonblocking)
     VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
-  else
-    VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
 
   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
           is_nonblocking, session_index);
@@ -929,6 +936,10 @@ vppcom_session_bind (uint32_t session_index, vppcom_endpt_t * ep)
        session->session_type ? "UDP" : "TCP");
   vcl_evt (VCL_EVT_BIND, session);
   VCL_SESSION_UNLOCK ();
+
+  if (session->session_type == VPPCOM_PROTO_UDP)
+    vppcom_session_listen (session_index, 10);
+
 done:
   return rv;
 }
@@ -1225,12 +1236,20 @@ vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
     return (e->event_type == SESSION_IO_EVT_CT_TX);
 }
 
+static inline u8
+vcl_session_is_readable (vcl_session_t * s)
+{
+  return ((s->session_state & STATE_OPEN)
+         || (s->session_state == STATE_LISTEN
+             && s->session_type == VPPCOM_PROTO_UDP));
+}
+
 static inline int
 vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
                              u8 peek)
 {
   int n_read = 0, rv, is_nonblocking;
-  vcl_session_t *session = 0;
+  vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
   svm_msg_q_msg_t msg;
   session_event_t *e;
@@ -1239,9 +1258,9 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
 
   ASSERT (buf);
 
-  VCL_SESSION_LOCK_AND_GET (session_index, &session);
+  VCL_SESSION_LOCK_AND_GET (session_index, &s);
 
-  if (PREDICT_FALSE (session->is_vep))
+  if (PREDICT_FALSE (s->is_vep))
     {
       VCL_SESSION_UNLOCK ();
       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
@@ -1250,24 +1269,24 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
       goto done;
     }
 
-  is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
-  rx_fifo = session->rx_fifo;
+  is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+  rx_fifo = s->rx_fifo;
 
-  if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
+  if (PREDICT_FALSE (!vcl_session_is_readable (s)))
     {
-      session_state_t state = session->session_state;
+      session_state_t state = s->session_state;
       VCL_SESSION_UNLOCK ();
       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
 
       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! "
            "state 0x%x (%s), returning %d (%s)",
-           getpid (), session->vpp_handle, session_index, state,
+           getpid (), s->vpp_handle, session_index, state,
            vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
       goto done;
     }
 
   VCL_SESSION_UNLOCK ();
-  mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+  mq = vcl_session_is_ct (s) ? s->our_evt_q : vcm->app_event_queue;
   is_full = svm_fifo_is_full (rx_fifo);
 
   if (svm_fifo_is_empty (rx_fifo))
@@ -1286,7 +1305,7 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
          svm_msg_q_sub_w_lock (mq, &msg);
          e = svm_msg_q_msg_data (mq, &msg);
          if (!vcl_is_rx_evt_for_session (e, session_index,
-                                         session->our_evt_q != 0))
+                                         s->our_evt_q != 0))
            {
              vcl_handle_mq_ctrl_event (e);
              svm_msg_q_free_msg (mq, &msg);
@@ -1303,27 +1322,24 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
        }
     }
 
-  if (peek)
-    n_read = svm_fifo_peek (rx_fifo, 0, n, buf);
+  if (s->is_dgram)
+    n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 1, peek);
   else
-    n_read = svm_fifo_dequeue_nowait (rx_fifo, n, buf);
-  ASSERT (n_read > 0);
-  svm_fifo_unset_event (rx_fifo);
+    n_read = app_recv_stream_raw (rx_fifo, buf, n, 1, peek);
 
-  if (session->our_evt_q && is_full)
-    app_send_io_evt_to_vpp (session->vpp_evt_q, rx_fifo, SESSION_IO_EVT_CT_RX,
+  if (vcl_session_is_ct (s) && is_full)
+    app_send_io_evt_to_vpp (s->vpp_evt_q, rx_fifo, SESSION_IO_EVT_CT_RX,
                            SVM_Q_WAIT);
 
-
   if (VPPCOM_DEBUG > 2)
     {
       if (n_read > 0)
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes "
-                     "from (%p)", getpid (), session->vpp_handle,
+                     "from (%p)", getpid (), s->vpp_handle,
                      session_index, n_read, rx_fifo);
       else
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! "
-                     "returning %d (%s)", getpid (), session->vpp_handle,
+                     "returning %d (%s)", getpid (), s->vpp_handle,
                      session_index, rv, vppcom_retval_str (rv));
     }
   return n_read;
@@ -1389,45 +1405,46 @@ int
 vppcom_session_write (uint32_t session_index, void *buf, size_t n)
 {
   int rv, n_write, is_nonblocking;
-  vcl_session_t *session = 0;
+  vcl_session_t *s = 0;
   svm_fifo_t *tx_fifo = 0;
+  session_evt_type_t et;
   svm_msg_q_msg_t msg;
   session_event_t *e;
   svm_msg_q_t *mq;
 
   ASSERT (buf);
 
-  VCL_SESSION_LOCK_AND_GET (session_index, &session);
+  VCL_SESSION_LOCK_AND_GET (session_index, &s);
 
-  tx_fifo = session->tx_fifo;
-  is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
+  tx_fifo = s->tx_fifo;
+  is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
 
-  if (PREDICT_FALSE (session->is_vep))
+  if (PREDICT_FALSE (s->is_vep))
     {
       VCL_SESSION_UNLOCK ();
       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
                    "cannot write to an epoll session!",
-                   getpid (), session->vpp_handle, session_index);
+                   getpid (), s->vpp_handle, session_index);
 
       rv = VPPCOM_EBADFD;
       goto done;
     }
 
-  if (!(session->session_state & STATE_OPEN))
+  if (!(s->session_state & STATE_OPEN))
     {
-      session_state_t state = session->session_state;
+      session_state_t state = s->session_state;
       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
       VCL_SESSION_UNLOCK ();
       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
            "state 0x%x (%s)",
-           getpid (), session->vpp_handle, session_index,
+           getpid (), s->vpp_handle, session_index,
            state, vppcom_session_state_str (state));
       goto done;
     }
 
   VCL_SESSION_UNLOCK ();
 
-  mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+  mq = vcl_session_is_ct (s) ? s->our_evt_q : vcm->app_event_queue;
   if (svm_fifo_is_full (tx_fifo))
     {
       if (is_nonblocking)
@@ -1448,7 +1465,7 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
          svm_msg_q_sub_w_lock (mq, &msg);
          e = svm_msg_q_msg_data (mq, &msg);
          if (!vcl_is_tx_evt_for_session (e, session_index,
-                                         session->our_evt_q != 0))
+                                         s->our_evt_q != 0))
            {
              vcl_handle_mq_ctrl_event (e);
              svm_msg_q_free_msg (mq, &msg);
@@ -1465,31 +1482,27 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
        }
     }
 
-  n_write = svm_fifo_enqueue_nowait (tx_fifo, n, (void *) buf);
-  ASSERT (n_write > 0);
+  ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
+  et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
+  if (s->is_dgram)
+    n_write = app_send_dgram_raw (tx_fifo, &s->transport,
+                                 s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
+  else
+    n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
+                                  SVM_Q_WAIT);
 
-  if (svm_fifo_set_event (tx_fifo))
-    {
-      session_evt_type_t et;
-      VCL_SESSION_LOCK_AND_GET (session_index, &session);
-      et = session->our_evt_q ? SESSION_IO_EVT_CT_TX : FIFO_EVENT_APP_TX;
-      app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo, et, SVM_Q_WAIT);
-      VCL_SESSION_UNLOCK ();
-      VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: added FIFO_EVENT_APP_TX "
-           "to vpp_event_q %p, n_write %d", getpid (),
-           session->vpp_handle, session_index, session->vpp_evt_q, n_write);
-    }
+  ASSERT (n_write > 0);
 
   if (VPPCOM_DEBUG > 2)
     {
       if (n_write <= 0)
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
-                     "FIFO-FULL (%p)", getpid (), session->vpp_handle,
+                     "FIFO-FULL (%p)", getpid (), s->vpp_handle,
                      session_index, tx_fifo);
       else
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
                      "wrote %d bytes tx-fifo: (%p)", getpid (),
-                     session->vpp_handle, session_index, n_write, tx_fifo);
+                     s->vpp_handle, session_index, n_write, tx_fifo);
     }
   return n_write;
 
@@ -2885,18 +2898,11 @@ vppcom_session_recvfrom (uint32_t session_index, void *buffer,
          VCL_SESSION_UNLOCK ();
          VDBG (0, "VCL<%d>: invalid session, sid (%u) has been closed!",
                getpid (), session_index);
-         rv = VPPCOM_EBADFD;
          VCL_SESSION_UNLOCK ();
-         goto done;
+         return VPPCOM_EBADFD;
        }
       ep->is_ip4 = session->transport.is_ip4;
       ep->port = session->transport.rmt_port;
-      if (session->transport.is_ip4)
-       clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
-                    sizeof (ip4_address_t));
-      else
-       clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
-                    sizeof (ip6_address_t));
       VCL_SESSION_UNLOCK ();
     }
 
@@ -2908,10 +2914,16 @@ vppcom_session_recvfrom (uint32_t session_index, void *buffer,
     {
       clib_warning ("VCL<%d>: Unsupport flags for recvfrom %d",
                    getpid (), flags);
-      rv = VPPCOM_EAFNOSUPPORT;
+      return VPPCOM_EAFNOSUPPORT;
     }
 
-done:
+  if (session->transport.is_ip4)
+    clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
+                sizeof (ip4_address_t));
+  else
+    clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
+                sizeof (ip6_address_t));
+
   return rv;
 }
 
index 770f4ba..14ab36d 100644 (file)
@@ -211,14 +211,16 @@ echo_server_rx_callback (stream_session_t * s)
       actual_transfer = app_recv_stream_raw (rx_fifo,
                                             esm->rx_buf[thread_index],
                                             max_transfer,
-                                            0 /* don't clear event */ );
+                                            0 /* don't clear event */ ,
+                                            0 /* peek */ );
     }
   else
     {
       actual_transfer = app_recv_dgram_raw (rx_fifo,
                                            esm->rx_buf[thread_index],
                                            max_transfer, &at,
-                                           0 /* don't clear event */ );
+                                           0 /* don't clear event */ ,
+                                           0 /* peek */ );
     }
   ASSERT (actual_transfer == max_transfer);
   /* test_bytes (esm, actual_transfer); */
@@ -232,14 +234,14 @@ echo_server_rx_callback (stream_session_t * s)
       n_written = app_send_stream_raw (tx_fifo,
                                       esm->vpp_queue[thread_index],
                                       esm->rx_buf[thread_index],
-                                      actual_transfer, 0);
+                                      actual_transfer, FIFO_EVENT_APP_TX, 0);
     }
   else
     {
       n_written = app_send_dgram_raw (tx_fifo, &at,
                                      esm->vpp_queue[s->thread_index],
                                      esm->rx_buf[thread_index],
-                                     actual_transfer, 0);
+                                     actual_transfer, FIFO_EVENT_APP_TX, 0);
     }
 
   if (n_written != max_transfer)
index 99f0dff..757e12e 100644 (file)
@@ -841,7 +841,8 @@ app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
   svm_msg_q_msg_t msg;
   svm_msg_q_t *mq;
 
-  if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
+  if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY
+                    && s->session_state != SESSION_STATE_LISTENING))
     {
       /* Session is closed so app will never clean up. Flush rx fifo */
       if (s->session_state == SESSION_STATE_CLOSED)
@@ -917,8 +918,9 @@ app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
 typedef int (app_send_evt_handler_fn) (application_t *app,
                                       stream_session_t *s,
                                       u8 lock);
-static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
+static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = {
     app_send_io_evt_rx,
+    0,
     app_send_io_evt_tx,
 };
 /* *INDENT-ON* */
index ffe2a64..daba169 100644 (file)
@@ -354,7 +354,8 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
 
 always_inline int
 app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
-                   svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 noblock)
+                   svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 evt_type,
+                   u8 noblock)
 {
   u32 max_enqueue, actual_write;
   session_dgram_hdr_t hdr;
@@ -379,7 +380,7 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
   if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0)
     {
       if (svm_fifo_set_event (f))
-       app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
+       app_send_io_evt_to_vpp (vpp_evt_q, f, evt_type, noblock);
     }
   ASSERT (rv);
   return rv;
@@ -389,19 +390,19 @@ always_inline int
 app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock)
 {
   return app_send_dgram_raw (s->tx_fifo, &s->transport, s->vpp_evt_q, data,
-                            len, noblock);
+                            len, FIFO_EVENT_APP_TX, noblock);
 }
 
 always_inline int
 app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data,
-                    u32 len, u8 noblock)
+                    u32 len, u8 evt_type, u8 noblock)
 {
   int rv;
 
   if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0)
     {
       if (svm_fifo_set_event (f))
-       app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
+       app_send_io_evt_to_vpp (vpp_evt_q, f, evt_type, noblock);
     }
   return rv;
 }
@@ -409,7 +410,8 @@ app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data,
 always_inline int
 app_send_stream (app_session_t * s, u8 * data, u32 len, u8 noblock)
 {
-  return app_send_stream_raw (s->tx_fifo, s->vpp_evt_q, data, len, noblock);
+  return app_send_stream_raw (s->tx_fifo, s->vpp_evt_q, data, len,
+                             FIFO_EVENT_APP_TX, noblock);
 }
 
 always_inline int
@@ -422,17 +424,22 @@ app_send (app_session_t * s, u8 * data, u32 len, u8 noblock)
 
 always_inline int
 app_recv_dgram_raw (svm_fifo_t * f, u8 * buf, u32 len,
-                   app_session_transport_t * at, u8 clear_evt)
+                   app_session_transport_t * at, u8 clear_evt, u8 peek)
 {
   session_dgram_pre_hdr_t ph;
   u32 max_deq;
   int rv;
 
-  if (clear_evt)
-    svm_fifo_unset_event (f);
   max_deq = svm_fifo_max_dequeue (f);
   if (max_deq < sizeof (session_dgram_hdr_t))
-    return 0;
+    {
+      if (clear_evt)
+       svm_fifo_unset_event (f);
+      return 0;
+    }
+
+  if (clear_evt)
+    svm_fifo_unset_event (f);
 
   svm_fifo_peek (f, 0, sizeof (ph), (u8 *) & ph);
   ASSERT (ph.data_length >= ph.data_offset);
@@ -440,6 +447,8 @@ app_recv_dgram_raw (svm_fifo_t * f, u8 * buf, u32 len,
     svm_fifo_peek (f, sizeof (ph), sizeof (*at), (u8 *) at);
   len = clib_min (len, ph.data_length - ph.data_offset);
   rv = svm_fifo_peek (f, ph.data_offset + SESSION_CONN_HDR_LEN, len, buf);
+  if (peek)
+    return rv;
   ph.data_offset += rv;
   if (ph.data_offset == ph.data_length)
     svm_fifo_dequeue_drop (f, ph.data_length + SESSION_CONN_HDR_LEN);
@@ -451,21 +460,25 @@ app_recv_dgram_raw (svm_fifo_t * f, u8 * buf, u32 len,
 always_inline int
 app_recv_dgram (app_session_t * s, u8 * buf, u32 len)
 {
-  return app_recv_dgram_raw (s->rx_fifo, buf, len, &s->transport, 1);
+  return app_recv_dgram_raw (s->rx_fifo, buf, len, &s->transport, 1, 0);
 }
 
 always_inline int
-app_recv_stream_raw (svm_fifo_t * f, u8 * buf, u32 len, u8 clear_evt)
+app_recv_stream_raw (svm_fifo_t * f, u8 * buf, u32 len, u8 clear_evt, u8 peek)
 {
   if (clear_evt)
     svm_fifo_unset_event (f);
+
+  if (peek)
+    return svm_fifo_peek (f, 0, len, buf);
+
   return svm_fifo_dequeue_nowait (f, len, buf);
 }
 
 always_inline int
 app_recv_stream (app_session_t * s, u8 * buf, u32 len)
 {
-  return app_recv_stream_raw (s->rx_fifo, buf, len, 1);
+  return app_recv_stream_raw (s->rx_fifo, buf, len, 1, 0);
 }
 
 always_inline int
index 99546cb..5e94c41 100644 (file)
 typedef enum
 {
   FIFO_EVENT_APP_RX,
+  SESSION_IO_EVT_CT_RX,
   FIFO_EVENT_APP_TX,
+  SESSION_IO_EVT_CT_TX,
   FIFO_EVENT_DISCONNECT,
   FIFO_EVENT_BUILTIN_RX,
   FIFO_EVENT_RPC,
-  SESSION_IO_EVT_CT_TX,
-  SESSION_IO_EVT_CT_RX,
   SESSION_CTRL_EVT_ACCEPTED,
   SESSION_CTRL_EVT_ACCEPTED_REPLY,
   SESSION_CTRL_EVT_CONNECTED,