From 2cba8533cc4444c4615903add9a8f20c7c87079c Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Tue, 11 Sep 2018 16:33:36 -0700 Subject: [PATCH] vcl: add apis that expos fifo as buffer Change-Id: I4bd9c9f73499711e04b38d53daa5c917a4285bf5 Signed-off-by: Florin Coras --- src/svm/svm_fifo.c | 47 +++++++++++++++++ src/svm/svm_fifo.h | 12 ++++- src/vcl/vcl_private.h | 1 + src/vcl/vcl_test.h | 42 +++++++++++++-- src/vcl/vcl_test_client.c | 19 ++++--- src/vcl/vcl_test_server.c | 111 ++++++++++++++++++++++++++++++++-------- src/vcl/vppcom.c | 128 ++++++++++++++++++++++++++++++++++++++-------- src/vcl/vppcom.h | 15 ++++++ 8 files changed, 320 insertions(+), 55 deletions(-) diff --git a/src/svm/svm_fifo.c b/src/svm/svm_fifo.c index 018827e0343..0c258b4259c 100644 --- a/src/svm/svm_fifo.c +++ b/src/svm/svm_fifo.c @@ -835,6 +835,53 @@ svm_fifo_dequeue_drop_all (svm_fifo_t * f) __sync_fetch_and_sub (&f->cursize, f->cursize); } +int +svm_fifo_segments (svm_fifo_t * f, svm_fifo_segment_t * fs) +{ + u32 cursize, nitems; + + /* read cursize, which can only increase while we're working */ + cursize = svm_fifo_max_dequeue (f); + if (PREDICT_FALSE (cursize == 0)) + return -2; + + nitems = f->nitems; + + fs[0].len = ((nitems - f->head) < cursize) ? (nitems - f->head) : cursize; + fs[0].data = f->data + f->head; + + if (fs[0].len < cursize) + { + fs[1].len = cursize - fs[0].len; + fs[1].data = f->data; + } + else + { + fs[1].len = 0; + fs[1].data = 0; + } + return cursize; +} + +void +svm_fifo_segments_free (svm_fifo_t * f, svm_fifo_segment_t * fs) +{ + u32 total_drop_bytes; + + ASSERT (fs[0].data == f->data + f->head); + if (fs[1].len) + { + f->head = fs[1].len; + total_drop_bytes = fs[0].len + fs[1].len; + } + else + { + f->head = (f->head + fs[0].len) % f->nitems; + total_drop_bytes = fs[0].len; + } + __sync_fetch_and_sub (&f->cursize, total_drop_bytes); +} + u32 svm_fifo_number_ooo_segments (svm_fifo_t * f) { diff --git a/src/svm/svm_fifo.h b/src/svm/svm_fifo.h index a8aea00996e..ec32fd5f40d 100644 --- a/src/svm/svm_fifo.h +++ b/src/svm/svm_fifo.h @@ -85,6 +85,12 @@ typedef enum SVM_FIFO_FULL = -2, } svm_fifo_err_t; +typedef struct svm_fifo_segment_ +{ + u8 *data; + u32 len; +} svm_fifo_segment_t; + #if SVM_FIFO_TRACE #define svm_fifo_trace_add(_f, _s, _l, _t) \ { \ @@ -147,8 +153,8 @@ svm_fifo_has_ooo_data (svm_fifo_t * f) always_inline u8 svm_fifo_set_event (svm_fifo_t * f) { -// return __sync_lock_test_and_set (&f->has_event, 1) == 0; -// return __sync_bool_compare_and_swap (&f->has_event, 0, 1); + /* return __sync_lock_test_and_set (&f->has_event, 1) == 0; + return __sync_bool_compare_and_swap (&f->has_event, 0, 1); */ return !__atomic_exchange_n (&f->has_event, 1, __ATOMIC_RELEASE); } @@ -175,6 +181,8 @@ int svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 max_bytes, u8 * copy_here); int svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 max_bytes, u8 * copy_here); int svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes); void svm_fifo_dequeue_drop_all (svm_fifo_t * f); +int svm_fifo_segments (svm_fifo_t * f, svm_fifo_segment_t * fs); +void svm_fifo_segments_free (svm_fifo_t * f, svm_fifo_segment_t * fs); u32 svm_fifo_number_ooo_segments (svm_fifo_t * f); ooo_segment_t *svm_fifo_first_ooo_segment (svm_fifo_t * f); void svm_fifo_init_pointers (svm_fifo_t * f, u32 pointer); diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 11d957a1317..5f3c1ecd63f 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -139,6 +139,7 @@ do { \ typedef struct { + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); #define _(type, name) type name; foreach_app_session_field #undef _ diff --git a/src/vcl/vcl_test.h b/src/vcl/vcl_test.h index 8808c0f5c5e..83e63e1a4e7 100644 --- a/src/vcl/vcl_test.h +++ b/src/vcl/vcl_test.h @@ -77,11 +77,43 @@ vcl_test_read (int fd, uint8_t *buf, uint32_t nbytes, if (rx_bytes < 0) { - errno_val = errno; - perror ("ERROR in sock_test_read()"); - fprintf (stderr, "SOCK_TEST: ERROR: socket read " - "failed (errno = %d)!\n", errno_val); - errno = errno_val; + vterr ("vppcom_session_read()", -errno); + } + else if (stats) + stats->rx_bytes += rx_bytes; + + return (rx_bytes); +} + +static inline int +vcl_test_read_ds (int fd, vppcom_data_segments_t ds, sock_test_stats_t *stats) +{ + int rx_bytes, errno_val; + + do + { + if (stats) + stats->rx_xacts++; + rx_bytes = vppcom_session_read_segments (fd, ds); + + if (rx_bytes < 0) + { + errno = -rx_bytes; + rx_bytes = -1; + } + if (stats) + { + if ((rx_bytes == 0) || + ((rx_bytes < 0) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))) + stats->rx_eagain++; + } + } + while ((rx_bytes == 0) || + ((rx_bytes < 0) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))); + + if (rx_bytes < 0) + { + vterr ("vppcom_session_read()", -errno); } else if (stats) stats->rx_bytes += rx_bytes; diff --git a/src/vcl/vcl_test_client.c b/src/vcl/vcl_test_client.c index 899d729c47d..c92f0cb754d 100644 --- a/src/vcl/vcl_test_client.c +++ b/src/vcl/vcl_test_client.c @@ -314,6 +314,7 @@ vtc_worker_sessions_exit (vcl_test_client_worker_t * wrk) (void) vcl_test_write (tsock->fd, (uint8_t *) & tsock->cfg, sizeof (tsock->cfg), &tsock->stats, verbose); } + wrk->n_sessions = 0; } static void * @@ -322,10 +323,10 @@ vtc_worker_loop (void *arg) vcl_test_client_main_t *vcm = &vcl_client_main; sock_test_socket_t *ctrl = &vcm->ctrl_socket; vcl_test_client_worker_t *wrk = arg; + uint32_t n_active_sessions, n_bytes; fd_set _wfdset, *wfdset = &_wfdset; fd_set _rfdset, *rfdset = &_rfdset; sock_test_socket_t *tsock; - uint32_t n_active_sessions; int i, rv, check_rx = 0; rv = vtc_worker_init (wrk); @@ -374,8 +375,11 @@ vtc_worker_loop (void *arg) if (FD_ISSET (vppcom_session_index (tsock->fd), wfdset) && tsock->stats.tx_bytes < tsock->cfg.total_bytes) { + n_bytes = tsock->cfg.txbuf_size; + if (tsock->cfg.test == SOCK_TEST_TYPE_ECHO) + n_bytes = strlen (ctrl->txbuf) + 1; rv = vcl_test_write (tsock->fd, (uint8_t *) tsock->txbuf, - tsock->cfg.txbuf_size, &tsock->stats, + n_bytes, &tsock->stats, tsock->cfg.verbose); if (rv < 0) { @@ -396,7 +400,8 @@ vtc_worker_loop (void *arg) } exit: vtinf ("Worker %d done ...", wrk->wrk_index); - vtc_accumulate_stats (wrk, ctrl); + if (tsock->cfg.test != SOCK_TEST_TYPE_ECHO) + vtc_accumulate_stats (wrk, ctrl); sleep (1); vtc_worker_sessions_exit (wrk); if (wrk->wrk_index) @@ -462,9 +467,11 @@ vtc_echo_client (vcl_test_client_main_t * vcm) vtc_worker_loop (wrk); - clock_gettime (CLOCK_REALTIME, &ctrl->stats.stop); - vtc_accumulate_stats (wrk, ctrl); - vtc_print_stats (ctrl); + /* Not relevant for echo test + clock_gettime (CLOCK_REALTIME, &ctrl->stats.stop); + vtc_accumulate_stats (wrk, ctrl); + vtc_print_stats (ctrl); + */ } static void diff --git a/src/vcl/vcl_test_server.c b/src/vcl/vcl_test_server.c index 7c1bef62d6d..a184d995372 100644 --- a/src/vcl/vcl_test_server.c +++ b/src/vcl/vcl_test_server.c @@ -38,6 +38,7 @@ typedef struct sock_test_stats_t stats; vppcom_endpt_t endpt; uint8_t ip[16]; + vppcom_data_segments_t ds; } vcl_test_server_conn_t; typedef struct @@ -72,6 +73,7 @@ typedef struct struct sockaddr_storage servaddr; volatile int worker_fails; volatile int active_workers; + u8 use_ds; } vcl_test_server_main_t; static __thread int __wrk_index = 0; @@ -196,6 +198,7 @@ vts_server_start_stop (vcl_test_server_worker_t * wrk, sync_config_and_reply (conn, rx_cfg); vtinf ("(fd %d): %s-directional Stream Test Complete!\n" SOCK_TEST_BANNER_STRING "\n", conn->fd, is_bi ? "Bi" : "Uni"); + memset (&conn->stats, 0, sizeof (conn->stats)); } else { @@ -217,12 +220,27 @@ vts_server_start_stop (vcl_test_server_worker_t * wrk, static inline void vts_server_rx (vcl_test_server_conn_t * conn, int rx_bytes) { + vcl_test_server_main_t *vts = &sock_server_main; int client_fd = conn->fd; - sock_test_t test = conn->cfg.test; - if (test == SOCK_TEST_TYPE_BI) - (void) vcl_test_write (client_fd, conn->buf, rx_bytes, &conn->stats, - conn->cfg.verbose); + if (conn->cfg.test == SOCK_TEST_TYPE_BI) + { + if (vts->use_ds) + { + (void) vcl_test_write (client_fd, conn->ds[0].data, conn->ds[0].len, + &conn->stats, conn->cfg.verbose); + if (conn->ds[1].len) + (void) vcl_test_write (client_fd, conn->ds[1].data, + conn->ds[1].len, &conn->stats, + conn->cfg.verbose); + } + else + (void) vcl_test_write (client_fd, conn->buf, rx_bytes, &conn->stats, + conn->cfg.verbose); + } + + if (vts->use_ds) + vppcom_session_free_segments (conn->fd, conn->ds); if (conn->stats.rx_bytes >= conn->cfg.total_bytes) clock_gettime (CLOCK_REALTIME, &conn->stats.stop); @@ -231,11 +249,13 @@ vts_server_rx (vcl_test_server_conn_t * conn, int rx_bytes) static void vts_server_echo (vcl_test_server_conn_t * conn, int rx_bytes) { + vcl_test_server_main_t *vts = &sock_server_main; int tx_bytes, nbytes, pos; - /* If it looks vaguely like a string, - * make sure it's terminated - */ + if (vts->use_ds) + vppcom_data_segment_copy (conn->buf, conn->ds, rx_bytes); + + /* If it looks vaguely like a string, make sure it's terminated */ pos = rx_bytes < conn->buf_size ? rx_bytes : conn->buf_size - 1; ((char *) conn->buf)[pos] = 0; vtinf ("(fd %d): RX (%d bytes) - '%s'", conn->fd, rx_bytes, conn->buf); @@ -349,7 +369,7 @@ vcl_test_server_process_opts (vcl_test_server_main_t * ssm, int argc, ssm->cfg.proto = VPPCOM_PROTO_TCP; opterr = 0; - while ((c = getopt (argc, argv, "6Dw:")) != -1) + while ((c = getopt (argc, argv, "6Dsw:")) != -1) switch (c) { case '6': @@ -367,7 +387,9 @@ vcl_test_server_process_opts (vcl_test_server_main_t * ssm, int argc, else vtwrn ("Invalid number of workers %d", v); break; - + case 's': + ssm->use_ds = 1; + break; case '?': switch (optopt) { @@ -501,6 +523,53 @@ vts_worker_init (vcl_test_server_worker_t * wrk) vtinf ("Waiting for a client to connect on port %d ...", ssm->cfg.port); } +static int +vts_conn_expect_config (vcl_test_server_conn_t * conn) +{ + if (conn->cfg.test == SOCK_TEST_TYPE_ECHO) + return 1; + + return (conn->stats.rx_bytes < 128 + || conn->stats.rx_bytes > conn->cfg.total_bytes); +} + +static sock_test_cfg_t * +vts_conn_read_config (vcl_test_server_conn_t * conn) +{ + vcl_test_server_main_t *vts = &sock_server_main; + + if (vts->use_ds) + { + /* We could avoid the copy if the first segment is big enough but this + * just simplifies things */ + vppcom_data_segment_copy (conn->buf, conn->ds, + sizeof (sock_test_cfg_t)); + vppcom_session_free_segments (conn->fd, conn->ds); + } + return (sock_test_cfg_t *) conn->buf; +} + +static inline int +vts_conn_read (vcl_test_server_conn_t * conn) +{ + vcl_test_server_main_t *vts = &sock_server_main; + if (vts->use_ds) + return vcl_test_read_ds (conn->fd, conn->ds, &conn->stats); + else + return vcl_test_read (conn->fd, conn->buf, conn->buf_size, &conn->stats); +} + +static inline int +vts_conn_has_ascii (vcl_test_server_conn_t * conn) +{ + vcl_test_server_main_t *vts = &sock_server_main; + + if (vts->use_ds) + return isascii (conn->ds[0].data[0]); + else + return isascii (conn->buf[0]); +} + static void * vts_worker_loop (void *arg) { @@ -550,8 +619,7 @@ vts_worker_loop (void *arg) if (EPOLLIN & wrk->wait_events[i].events) { read_again: - rx_bytes = vcl_test_read (conn->fd, conn->buf, - conn->buf_size, &conn->stats); + rx_bytes = vts_conn_read (conn); if (rx_bytes <= 0) { @@ -564,16 +632,19 @@ vts_worker_loop (void *arg) continue; } - rx_cfg = (sock_test_cfg_t *) conn->buf; - if (rx_cfg->magic == SOCK_TEST_CFG_CTRL_MAGIC) + if (vts_conn_expect_config (conn)) { - vts_handle_cfg (wrk, rx_cfg, conn, rx_bytes); - if (!wrk->nfds) + rx_cfg = vts_conn_read_config (conn); + if (rx_cfg->magic == SOCK_TEST_CFG_CTRL_MAGIC) { - vtinf ("All client connections closed\n"); - goto done; + vts_handle_cfg (wrk, rx_cfg, conn, rx_bytes); + if (!wrk->nfds) + { + vtinf ("All client connections closed\n"); + goto done; + } + continue; } - continue; } if ((conn->cfg.test == SOCK_TEST_TYPE_UNI) || (conn->cfg.test == SOCK_TEST_TYPE_BI)) @@ -584,7 +655,7 @@ vts_worker_loop (void *arg) goto read_again; continue; } - if (isascii (conn->buf[0])) + if (vts_conn_has_ascii (conn)) { vts_server_echo (conn, rx_bytes); } @@ -621,7 +692,7 @@ main (int argc, char **argv) clib_mem_init_thread_safe (0, 64 << 20); vsm->cfg.port = SOCK_TEST_SERVER_PORT; vsm->cfg.workers = 1; - vsm->active_workers = 1; + vsm->active_workers = 0; vcl_test_server_process_opts (vsm, argc, argv); rv = vppcom_app_create ("vcl_test_server"); diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 6d6e7d08797..df8f4cae964 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -1274,19 +1274,9 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, return VPPCOM_EINVAL; s = vcl_session_get_w_handle (wrk, session_handle); - if (PREDICT_FALSE (!s)) + if (PREDICT_FALSE (!s || s->is_vep)) return VPPCOM_EBADFD; - if (PREDICT_FALSE (s->is_vep)) - { - clib_warning ("VCL<%d>: ERROR: sid %u: cannot " - "read from an epoll session!", getpid (), session_handle); - return VPPCOM_EBADFD; - } - - is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK); - rx_fifo = s->rx_fifo; - if (PREDICT_FALSE (!vcl_session_is_readable (s))) { session_state_t state = s->session_state; @@ -1299,8 +1289,10 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, return rv; } + is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK); is_ct = vcl_session_is_ct (s); mq = is_ct ? s->our_evt_q : wrk->app_event_queue; + rx_fifo = s->rx_fifo; if (svm_fifo_is_empty (rx_fifo)) { @@ -1348,17 +1340,9 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n, SESSION_IO_EVT_CT_RX, SVM_Q_WAIT); } - if (VPPCOM_DEBUG > 2) - { - if (n_read > 0) - clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes " - "from (%p)", getpid (), s->vpp_handle, - session_handle, n_read, rx_fifo); - else - clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! " - "returning %d (%s)", getpid (), s->vpp_handle, - session_handle, n_read, vppcom_retval_str (n_read)); - } + VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)", + getpid (), s->vpp_handle, session_handle, n_read, rx_fifo); + return n_read; } @@ -1374,6 +1358,93 @@ vppcom_session_peek (uint32_t session_handle, void *buf, int n) return (vppcom_session_read_internal (session_handle, buf, n, 1)); } +int +vppcom_session_read_segments (uint32_t session_handle, + vppcom_data_segments_t ds) +{ + vcl_worker_t *wrk = vcl_worker_get_current (); + int n_read = 0, rv, is_nonblocking; + vcl_session_t *s = 0; + svm_fifo_t *rx_fifo; + svm_msg_q_msg_t msg; + session_event_t *e; + svm_msg_q_t *mq; + u8 is_ct; + + s = vcl_session_get_w_handle (wrk, session_handle); + if (PREDICT_FALSE (!s || s->is_vep)) + return VPPCOM_EBADFD; + + if (PREDICT_FALSE (!vcl_session_is_readable (s))) + { + session_state_t state = s->session_state; + rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN); + return rv; + } + + is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK); + is_ct = vcl_session_is_ct (s); + mq = is_ct ? s->our_evt_q : wrk->app_event_queue; + rx_fifo = s->rx_fifo; + + if (svm_fifo_is_empty (rx_fifo)) + { + if (is_nonblocking) + { + svm_fifo_unset_event (rx_fifo); + return VPPCOM_OK; + } + while (svm_fifo_is_empty (rx_fifo)) + { + svm_fifo_unset_event (rx_fifo); + svm_msg_q_lock (mq); + if (svm_msg_q_is_empty (mq)) + svm_msg_q_wait (mq); + + svm_msg_q_sub_w_lock (mq, &msg); + e = svm_msg_q_msg_data (mq, &msg); + svm_msg_q_unlock (mq); + if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct)) + { + vcl_handle_mq_ctrl_event (wrk, e); + svm_msg_q_free_msg (mq, &msg); + continue; + } + svm_msg_q_free_msg (mq, &msg); + + if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY)) + return 0; + } + } + + n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds); + svm_fifo_unset_event (rx_fifo); + + if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems) + { + /* If the peer is not polling send notification */ + if (!svm_fifo_has_event (s->rx_fifo)) + app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, + SESSION_IO_EVT_CT_RX, SVM_Q_WAIT); + } + + return n_read; +} + +void +vppcom_session_free_segments (uint32_t session_handle, + vppcom_data_segments_t ds) +{ + vcl_worker_t *wrk = vcl_worker_get_current (); + vcl_session_t *s; + + s = vcl_session_get_w_handle (wrk, session_handle); + if (PREDICT_FALSE (!s || s->is_vep)) + return; + + svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds); +} + static inline int vppcom_session_read_ready (vcl_session_t * session) { @@ -1405,6 +1476,19 @@ vppcom_session_read_ready (vcl_session_t * session) return svm_fifo_max_dequeue (session->rx_fifo); } +int +vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes) +{ + u32 first_copy = clib_min (ds[0].len, max_bytes); + clib_memcpy (buf, ds[0].data, first_copy); + if (first_copy < max_bytes) + { + clib_memcpy (buf + first_copy, ds[1].data, + clib_min (ds[1].len, max_bytes - first_copy)); + } + return 0; +} + static u8 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct) { diff --git a/src/vcl/vppcom.h b/src/vcl/vppcom.h index b5e753292ec..63466844a3f 100644 --- a/src/vcl/vppcom.h +++ b/src/vcl/vppcom.h @@ -147,6 +147,14 @@ typedef struct _vcl_poll short *revents; } vcl_poll_t; +typedef struct vppcom_data_segment_ +{ + unsigned char *data; + uint32_t len; +} vppcom_data_segment_t; + +typedef vppcom_data_segment_t vppcom_data_segments_t[2]; + /* * VPPCOM Public API Functions */ @@ -255,6 +263,13 @@ extern int vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, extern int vppcom_mq_epoll_fd (void); extern int vppcom_session_index (uint32_t session_handle); +extern int vppcom_session_read_segments (uint32_t session_handle, + vppcom_data_segments_t ds); +extern void vppcom_session_free_segments (uint32_t session_handle, + vppcom_data_segments_t ds); +extern int vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, + uint32_t max_bytes); + /** * Request from application to register a new worker * -- 2.16.6