vcl: always fill buffer or drain rx fifo
[vpp.git] / src / vcl / vppcom.c
index 85d8081..dd8ffb4 100644 (file)
@@ -530,7 +530,13 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
   clib_memcpy_fast (&session->transport.lcl_ip, &mp->lcl.ip,
                    sizeof (session->transport.lcl_ip));
   session->transport.lcl_port = mp->lcl.port;
-  session->session_state = STATE_CONNECT;
+
+  /* Application closed session before connect reply */
+  if (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK)
+      && session->session_state == STATE_CLOSED)
+    vcl_send_session_disconnect (wrk, session);
+  else
+    session->session_state = STATE_CONNECT;
 
   /* Add it to lookup table */
   vcl_session_table_add_vpp_handle (wrk, mp->handle, session_index);
@@ -1172,6 +1178,11 @@ vppcom_session_disconnect (u32 session_handle)
     }
   else
     {
+      /* Session doesn't have an event queue yet. Probably a non-blocking
+       * connect. Wait for the reply */
+      if (PREDICT_FALSE (!session->vpp_evt_q))
+       return VPPCOM_OK;
+
       VDBG (1, "session %u [0x%llx]: sending disconnect...",
            session->session_index, vpp_handle);
       vcl_send_session_disconnect (wrk, session);
@@ -1788,7 +1799,10 @@ vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
   vcl_send_session_connect (wrk, session);
 
   if (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK))
-    return VPPCOM_EINPROGRESS;
+    {
+      session->session_state = STATE_CONNECT;
+      return VPPCOM_EINPROGRESS;
+    }
 
   /*
    * Wait for reply from vpp if blocking
@@ -1878,7 +1892,7 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
                              u8 peek)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
-  int n_read = 0, is_nonblocking;
+  int rv, n_read = 0, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
   svm_msg_q_msg_t msg;
@@ -1935,10 +1949,15 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
        }
     }
 
+read_again:
+
   if (s->is_dgram)
-    n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
+    rv = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
   else
-    n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
+    rv = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
+
+  ASSERT (rv >= 0);
+  n_read += rv;
 
   if (svm_fifo_is_empty_cons (rx_fifo))
     {
@@ -1946,12 +1965,19 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
       if (!svm_fifo_is_empty_cons (rx_fifo)
          && svm_fifo_set_event (s->rx_fifo) && is_nonblocking)
        {
-         session_event_t *e;
          vec_add2 (wrk->unhandled_evts_vector, e, 1);
          e->event_type = SESSION_IO_EVT_RX;
          e->session_index = s->session_index;
        }
     }
+  else if (PREDICT_FALSE (rv < n))
+    {
+      /* More data enqueued while reading. Try to drain it
+       * or fill the buffer */
+      buf += rv;
+      n -= rv;
+      goto read_again;
+    }
 
   if (PREDICT_FALSE (svm_fifo_needs_deq_ntf (rx_fifo, n_read)))
     {
@@ -2158,7 +2184,9 @@ vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
     app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo->master_session_index,
                            et, SVM_Q_WAIT);
 
-  ASSERT (n_write > 0);
+  /* The underlying fifo segment can run out of memory */
+  if (PREDICT_FALSE (n_write < 0))
+    return VPPCOM_EAGAIN;
 
   VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
        s->vpp_handle, n_write);