vcl: add read/write udp support
[vpp.git] / src / vcl / vppcom.c
index d1c4413..a74e55a 100644 (file)
@@ -33,9 +33,15 @@ static void
 vcl_wait_for_memory (void *mem)
 {
   u8 __clib_unused test;
+  if (vcm->mounting_segment)
+    {
+      while (vcm->mounting_segment)
+       ;
+      return;
+    }
   if (1 || vcm->debug)
     {
-      sleep (1);
+      usleep (1e5);
       return;
     }
   if (signal (SIGSEGV, sigsegv_signal))
@@ -359,6 +365,8 @@ vcl_session_accepted_handler (session_accepted_msg_t * mp)
   hash_set (vcm->session_index_by_vpp_handles, mp->handle, session_index);
   session->transport.lcl_port = listen_session->transport.lcl_port;
   session->transport.lcl_ip = listen_session->transport.lcl_ip;
+  session->session_type = listen_session->session_type;
+  session->is_dgram = session->session_type == VPPCOM_PROTO_UDP;
 
   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
        " address %U port %d queue %p!", getpid (), mp->handle, session_index,
@@ -405,13 +413,18 @@ done:
   if (rv)
     goto done_unlock;
 
+  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+  vcl_wait_for_memory (rx_fifo);
+  rx_fifo->client_session_index = session_index;
+  tx_fifo->client_session_index = session_index;
+
   if (mp->client_event_queue_address)
     {
       session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
                                             svm_msg_q_t *);
       session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
                                             svm_msg_q_t *);
-      vcl_wait_for_memory (session->vpp_evt_q);
       session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
                                                          session_index);
     }
@@ -419,11 +432,6 @@ done:
     session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
                                           svm_msg_q_t *);
 
-  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
-  rx_fifo->client_session_index = session_index;
-  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-  tx_fifo->client_session_index = session_index;
-
   session->rx_fifo = rx_fifo;
   session->tx_fifo = tx_fifo;
   session->vpp_handle = mp->handle;
@@ -762,11 +770,10 @@ vppcom_session_create (u8 proto, u8 is_nonblocking)
   session->session_type = proto;
   session->session_state = STATE_START;
   session->vpp_handle = ~0;
+  session->is_dgram = proto == VPPCOM_PROTO_UDP;
 
   if (is_nonblocking)
     VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
-  else
-    VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
 
   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
           is_nonblocking, session_index);
@@ -929,6 +936,10 @@ vppcom_session_bind (uint32_t session_index, vppcom_endpt_t * ep)
        session->session_type ? "UDP" : "TCP");
   vcl_evt (VCL_EVT_BIND, session);
   VCL_SESSION_UNLOCK ();
+
+  if (session->session_type == VPPCOM_PROTO_UDP)
+    vppcom_session_listen (session_index, 10);
+
 done:
   return rv;
 }
@@ -1225,12 +1236,20 @@ vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
     return (e->event_type == SESSION_IO_EVT_CT_TX);
 }
 
+static inline u8
+vcl_session_is_readable (vcl_session_t * s)
+{
+  return ((s->session_state & STATE_OPEN)
+         || (s->session_state == STATE_LISTEN
+             && s->session_type == VPPCOM_PROTO_UDP));
+}
+
 static inline int
 vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
                              u8 peek)
 {
   int n_read = 0, rv, is_nonblocking;
-  vcl_session_t *session = 0;
+  vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
   svm_msg_q_msg_t msg;
   session_event_t *e;
@@ -1239,9 +1258,9 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
 
   ASSERT (buf);
 
-  VCL_SESSION_LOCK_AND_GET (session_index, &session);
+  VCL_SESSION_LOCK_AND_GET (session_index, &s);
 
-  if (PREDICT_FALSE (session->is_vep))
+  if (PREDICT_FALSE (s->is_vep))
     {
       VCL_SESSION_UNLOCK ();
       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
@@ -1250,24 +1269,24 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
       goto done;
     }
 
-  is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
-  rx_fifo = session->rx_fifo;
+  is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+  rx_fifo = s->rx_fifo;
 
-  if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
+  if (PREDICT_FALSE (!vcl_session_is_readable (s)))
     {
-      session_state_t state = session->session_state;
+      session_state_t state = s->session_state;
       VCL_SESSION_UNLOCK ();
       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
 
       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! "
            "state 0x%x (%s), returning %d (%s)",
-           getpid (), session->vpp_handle, session_index, state,
+           getpid (), s->vpp_handle, session_index, state,
            vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
       goto done;
     }
 
   VCL_SESSION_UNLOCK ();
-  mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+  mq = vcl_session_is_ct (s) ? s->our_evt_q : vcm->app_event_queue;
   is_full = svm_fifo_is_full (rx_fifo);
 
   if (svm_fifo_is_empty (rx_fifo))
@@ -1286,7 +1305,7 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
          svm_msg_q_sub_w_lock (mq, &msg);
          e = svm_msg_q_msg_data (mq, &msg);
          if (!vcl_is_rx_evt_for_session (e, session_index,
-                                         session->our_evt_q != 0))
+                                         s->our_evt_q != 0))
            {
              vcl_handle_mq_ctrl_event (e);
              svm_msg_q_free_msg (mq, &msg);
@@ -1303,27 +1322,24 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
        }
     }
 
-  if (peek)
-    n_read = svm_fifo_peek (rx_fifo, 0, n, buf);
+  if (s->is_dgram)
+    n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 1, peek);
   else
-    n_read = svm_fifo_dequeue_nowait (rx_fifo, n, buf);
-  ASSERT (n_read > 0);
-  svm_fifo_unset_event (rx_fifo);
+    n_read = app_recv_stream_raw (rx_fifo, buf, n, 1, peek);
 
-  if (session->our_evt_q && is_full)
-    app_send_io_evt_to_vpp (session->vpp_evt_q, rx_fifo, SESSION_IO_EVT_CT_RX,
+  if (vcl_session_is_ct (s) && is_full)
+    app_send_io_evt_to_vpp (s->vpp_evt_q, rx_fifo, SESSION_IO_EVT_CT_RX,
                            SVM_Q_WAIT);
 
-
   if (VPPCOM_DEBUG > 2)
     {
       if (n_read > 0)
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes "
-                     "from (%p)", getpid (), session->vpp_handle,
+                     "from (%p)", getpid (), s->vpp_handle,
                      session_index, n_read, rx_fifo);
       else
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! "
-                     "returning %d (%s)", getpid (), session->vpp_handle,
+                     "returning %d (%s)", getpid (), s->vpp_handle,
                      session_index, rv, vppcom_retval_str (rv));
     }
   return n_read;
@@ -1389,45 +1405,46 @@ int
 vppcom_session_write (uint32_t session_index, void *buf, size_t n)
 {
   int rv, n_write, is_nonblocking;
-  vcl_session_t *session = 0;
+  vcl_session_t *s = 0;
   svm_fifo_t *tx_fifo = 0;
+  session_evt_type_t et;
   svm_msg_q_msg_t msg;
   session_event_t *e;
   svm_msg_q_t *mq;
 
   ASSERT (buf);
 
-  VCL_SESSION_LOCK_AND_GET (session_index, &session);
+  VCL_SESSION_LOCK_AND_GET (session_index, &s);
 
-  tx_fifo = session->tx_fifo;
-  is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
+  tx_fifo = s->tx_fifo;
+  is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
 
-  if (PREDICT_FALSE (session->is_vep))
+  if (PREDICT_FALSE (s->is_vep))
     {
       VCL_SESSION_UNLOCK ();
       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
                    "cannot write to an epoll session!",
-                   getpid (), session->vpp_handle, session_index);
+                   getpid (), s->vpp_handle, session_index);
 
       rv = VPPCOM_EBADFD;
       goto done;
     }
 
-  if (!(session->session_state & STATE_OPEN))
+  if (!(s->session_state & STATE_OPEN))
     {
-      session_state_t state = session->session_state;
+      session_state_t state = s->session_state;
       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
       VCL_SESSION_UNLOCK ();
       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
            "state 0x%x (%s)",
-           getpid (), session->vpp_handle, session_index,
+           getpid (), s->vpp_handle, session_index,
            state, vppcom_session_state_str (state));
       goto done;
     }
 
   VCL_SESSION_UNLOCK ();
 
-  mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+  mq = vcl_session_is_ct (s) ? s->our_evt_q : vcm->app_event_queue;
   if (svm_fifo_is_full (tx_fifo))
     {
       if (is_nonblocking)
@@ -1448,7 +1465,7 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
          svm_msg_q_sub_w_lock (mq, &msg);
          e = svm_msg_q_msg_data (mq, &msg);
          if (!vcl_is_tx_evt_for_session (e, session_index,
-                                         session->our_evt_q != 0))
+                                         s->our_evt_q != 0))
            {
              vcl_handle_mq_ctrl_event (e);
              svm_msg_q_free_msg (mq, &msg);
@@ -1465,31 +1482,27 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
        }
     }
 
-  n_write = svm_fifo_enqueue_nowait (tx_fifo, n, (void *) buf);
-  ASSERT (n_write > 0);
+  ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
+  et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
+  if (s->is_dgram)
+    n_write = app_send_dgram_raw (tx_fifo, &s->transport,
+                                 s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
+  else
+    n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
+                                  SVM_Q_WAIT);
 
-  if (svm_fifo_set_event (tx_fifo))
-    {
-      session_evt_type_t et;
-      VCL_SESSION_LOCK_AND_GET (session_index, &session);
-      et = session->our_evt_q ? SESSION_IO_EVT_CT_TX : FIFO_EVENT_APP_TX;
-      app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo, et, SVM_Q_WAIT);
-      VCL_SESSION_UNLOCK ();
-      VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: added FIFO_EVENT_APP_TX "
-           "to vpp_event_q %p, n_write %d", getpid (),
-           session->vpp_handle, session_index, session->vpp_evt_q, n_write);
-    }
+  ASSERT (n_write > 0);
 
   if (VPPCOM_DEBUG > 2)
     {
       if (n_write <= 0)
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
-                     "FIFO-FULL (%p)", getpid (), session->vpp_handle,
+                     "FIFO-FULL (%p)", getpid (), s->vpp_handle,
                      session_index, tx_fifo);
       else
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
                      "wrote %d bytes tx-fifo: (%p)", getpid (),
-                     session->vpp_handle, session_index, n_write, tx_fifo);
+                     s->vpp_handle, session_index, n_write, tx_fifo);
     }
   return n_write;
 
@@ -2885,18 +2898,11 @@ vppcom_session_recvfrom (uint32_t session_index, void *buffer,
          VCL_SESSION_UNLOCK ();
          VDBG (0, "VCL<%d>: invalid session, sid (%u) has been closed!",
                getpid (), session_index);
-         rv = VPPCOM_EBADFD;
          VCL_SESSION_UNLOCK ();
-         goto done;
+         return VPPCOM_EBADFD;
        }
       ep->is_ip4 = session->transport.is_ip4;
       ep->port = session->transport.rmt_port;
-      if (session->transport.is_ip4)
-       clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
-                    sizeof (ip4_address_t));
-      else
-       clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
-                    sizeof (ip6_address_t));
       VCL_SESSION_UNLOCK ();
     }
 
@@ -2908,10 +2914,16 @@ vppcom_session_recvfrom (uint32_t session_index, void *buffer,
     {
       clib_warning ("VCL<%d>: Unsupport flags for recvfrom %d",
                    getpid (), flags);
-      rv = VPPCOM_EAFNOSUPPORT;
+      return VPPCOM_EAFNOSUPPORT;
     }
 
-done:
+  if (session->transport.is_ip4)
+    clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
+                sizeof (ip4_address_t));
+  else
+    clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
+                sizeof (ip6_address_t));
+
   return rv;
 }