svm_msg_q_msg_t msg;
session_event_t *e;
svm_msg_q_t *mq;
- u8 is_full;
+ u8 is_ct;
if (PREDICT_FALSE (!buf))
return VPPCOM_EINVAL;
s = vcl_session_get_w_handle (wrk, session_handle);
- if (PREDICT_FALSE (!s))
+ if (PREDICT_FALSE (!s || s->is_vep))
return VPPCOM_EBADFD;
- if (PREDICT_FALSE (s->is_vep))
- {
- clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
- "read from an epoll session!", getpid (), session_handle);
- return VPPCOM_EBADFD;
- }
-
- is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
- rx_fifo = s->rx_fifo;
-
if (PREDICT_FALSE (!vcl_session_is_readable (s)))
{
session_state_t state = s->session_state;
return rv;
}
- mq = vcl_session_is_ct (s) ? s->our_evt_q : wrk->app_event_queue;
- svm_fifo_unset_event (rx_fifo);
- is_full = svm_fifo_is_full (rx_fifo);
+ is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+ is_ct = vcl_session_is_ct (s);
+ mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
+ rx_fifo = s->rx_fifo;
if (svm_fifo_is_empty (rx_fifo))
{
if (is_nonblocking)
{
+ svm_fifo_unset_event (rx_fifo);
return VPPCOM_OK;
}
- while (1)
+ while (svm_fifo_is_empty (rx_fifo))
{
+ svm_fifo_unset_event (rx_fifo);
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
svm_msg_q_wait (mq);
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
svm_msg_q_unlock (mq);
- if (!vcl_is_rx_evt_for_session (e, s->session_index,
- s->our_evt_q != 0))
+ if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
{
vcl_handle_mq_ctrl_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
continue;
}
- svm_fifo_unset_event (rx_fifo);
svm_msg_q_free_msg (mq, &msg);
+
if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
return 0;
- if (svm_fifo_is_empty (rx_fifo))
- continue;
- break;
}
}
else
n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
- if (vcl_session_is_ct (s) && is_full)
+ if (svm_fifo_is_empty (rx_fifo))
+ svm_fifo_unset_event (rx_fifo);
+
+ if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
{
/* If the peer is not polling send notification */
if (!svm_fifo_has_event (s->rx_fifo))
SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
}
- if (VPPCOM_DEBUG > 2)
- {
- if (n_read > 0)
- clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes "
- "from (%p)", getpid (), s->vpp_handle,
- session_handle, n_read, rx_fifo);
- else
- clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! "
- "returning %d (%s)", getpid (), s->vpp_handle,
- session_handle, n_read, vppcom_retval_str (n_read));
- }
+ VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
+ getpid (), s->vpp_handle, session_handle, n_read, rx_fifo);
+
return n_read;
}
return (vppcom_session_read_internal (session_handle, buf, n, 1));
}
+int
+vppcom_session_read_segments (uint32_t session_handle,
+ vppcom_data_segments_t ds)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ int n_read = 0, rv, is_nonblocking;
+ vcl_session_t *s = 0;
+ svm_fifo_t *rx_fifo;
+ svm_msg_q_msg_t msg;
+ session_event_t *e;
+ svm_msg_q_t *mq;
+ u8 is_ct;
+
+ s = vcl_session_get_w_handle (wrk, session_handle);
+ if (PREDICT_FALSE (!s || s->is_vep))
+ return VPPCOM_EBADFD;
+
+ if (PREDICT_FALSE (!vcl_session_is_readable (s)))
+ {
+ session_state_t state = s->session_state;
+ rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
+ return rv;
+ }
+
+ is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+ is_ct = vcl_session_is_ct (s);
+ mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
+ rx_fifo = s->rx_fifo;
+
+ if (svm_fifo_is_empty (rx_fifo))
+ {
+ if (is_nonblocking)
+ {
+ svm_fifo_unset_event (rx_fifo);
+ return VPPCOM_OK;
+ }
+ while (svm_fifo_is_empty (rx_fifo))
+ {
+ svm_fifo_unset_event (rx_fifo);
+ svm_msg_q_lock (mq);
+ if (svm_msg_q_is_empty (mq))
+ svm_msg_q_wait (mq);
+
+ svm_msg_q_sub_w_lock (mq, &msg);
+ e = svm_msg_q_msg_data (mq, &msg);
+ svm_msg_q_unlock (mq);
+ if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
+ {
+ vcl_handle_mq_ctrl_event (wrk, e);
+ svm_msg_q_free_msg (mq, &msg);
+ continue;
+ }
+ svm_msg_q_free_msg (mq, &msg);
+
+ if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
+ return 0;
+ }
+ }
+
+ n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
+ svm_fifo_unset_event (rx_fifo);
+
+ if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
+ {
+ /* If the peer is not polling send notification */
+ if (!svm_fifo_has_event (s->rx_fifo))
+ app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
+ SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
+ }
+
+ return n_read;
+}
+
+void
+vppcom_session_free_segments (uint32_t session_handle,
+ vppcom_data_segments_t ds)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ vcl_session_t *s;
+
+ s = vcl_session_get_w_handle (wrk, session_handle);
+ if (PREDICT_FALSE (!s || s->is_vep))
+ return;
+
+ svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
+}
+
static inline int
vppcom_session_read_ready (vcl_session_t * session)
{
return svm_fifo_max_dequeue (session->rx_fifo);
}
+int
+vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
+{
+ u32 first_copy = clib_min (ds[0].len, max_bytes);
+ clib_memcpy (buf, ds[0].data, first_copy);
+ if (first_copy < max_bytes)
+ {
+ clib_memcpy (buf + first_copy, ds[1].data,
+ clib_min (ds[1].len, max_bytes - first_copy));
+ }
+ return 0;
+}
+
static u8
vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
{
{ \
svm_fifo_unset_event (_fifo); \
if (svm_fifo_is_empty (_fifo)) \
- break; \
+ break; \
} \
static int