From 0e88e851e058f4fb7cc690dbbdb19216ab360d1c Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Mon, 17 Sep 2018 22:09:02 -0700 Subject: [PATCH] session/svm: add want_tx_event flag to fifo Have applications use explicit flag to request events from vpp when it transmits from a full fifo. Change-Id: I687c8f050a066bd5ce739d880eaec1f286038d95 Signed-off-by: Florin Coras --- src/svm/svm_fifo.h | 13 +++++++++++++ src/vcl/sock_test_server.c | 4 ++++ src/vcl/vcl_debug.h | 3 ++- src/vcl/vcl_test.h | 9 ++------- src/vcl/vppcom.c | 36 ++++++++++++++---------------------- src/vnet/session/session_node.c | 13 +++++++------ test/test_vcl.py | 32 +++++++++++++++++--------------- 7 files changed, 59 insertions(+), 51 deletions(-) diff --git a/src/svm/svm_fifo.h b/src/svm/svm_fifo.h index ec32fd5f40d..1ac5b6363cd 100644 --- a/src/svm/svm_fifo.h +++ b/src/svm/svm_fifo.h @@ -62,6 +62,7 @@ typedef struct _svm_fifo u32 segment_manager; CLIB_CACHE_LINE_ALIGN_MARK (end_shared); u32 head; + volatile u32 want_tx_evt; /**< producer wants nudge */ CLIB_CACHE_LINE_ALIGN_MARK (end_consumer); /* producer */ @@ -169,6 +170,18 @@ svm_fifo_unset_event (svm_fifo_t * f) __sync_lock_release (&f->has_event); } +static inline void +svm_fifo_set_want_tx_evt (svm_fifo_t * f, u8 want_evt) +{ + f->want_tx_evt = want_evt; +} + +static inline u8 +svm_fifo_want_tx_evt (svm_fifo_t * f) +{ + return f->want_tx_evt; +} + svm_fifo_t *svm_fifo_create (u32 data_size_in_bytes); void svm_fifo_free (svm_fifo_t * f); diff --git a/src/vcl/sock_test_server.c b/src/vcl/sock_test_server.c index 1280429f0f6..896aeb024d8 100644 --- a/src/vcl/sock_test_server.c +++ b/src/vcl/sock_test_server.c @@ -24,6 +24,7 @@ #include #include #include +#include #define SOCK_SERVER_USE_EPOLL 1 #define VPPCOM_SESSION_ATTR_UNIT_TEST 0 @@ -828,6 +829,7 @@ main (int argc, char **argv) if (EPOLLIN & ssm->wait_events[i].events) #endif { + read_again: rx_bytes = sock_test_read (client_fd, conn->buf, conn->buf_size, &conn->stats); if (rx_bytes > 0) @@ -910,6 +912,8 @@ main (int argc, char **argv) (conn->cfg.test == SOCK_TEST_TYPE_BI)) { stream_test_server (conn, rx_bytes); + if (ioctl (conn->fd, FIONREAD)) + goto read_again; continue; } diff --git a/src/vcl/vcl_debug.h b/src/vcl/vcl_debug.h index 52a3d94b63f..7b284166503 100644 --- a/src/vcl/vcl_debug.h +++ b/src/vcl/vcl_debug.h @@ -19,9 +19,10 @@ #include #define VCL_ELOG 0 +#define VCL_DBG_ON 1 #define VDBG(_lvl, _fmt, _args...) 
\ - if (vcm->debug > _lvl) \ + if (VCL_DBG_ON && vcm->debug > _lvl) \ clib_warning ("vcl: " _fmt, __vcl_worker_index, ##_args) #define foreach_vcl_dbg_evt \ diff --git a/src/vcl/vcl_test.h b/src/vcl/vcl_test.h index 83e63e1a4e7..68750af89a1 100644 --- a/src/vcl/vcl_test.h +++ b/src/vcl/vcl_test.h @@ -125,9 +125,7 @@ static inline int vcl_test_write (int fd, uint8_t *buf, uint32_t nbytes, sock_test_stats_t *stats, uint32_t verbose) { - int tx_bytes = 0; - int nbytes_left = nbytes; - int rv, errno_val; + int tx_bytes = 0, nbytes_left = nbytes, rv; do { @@ -163,10 +161,7 @@ vcl_test_write (int fd, uint8_t *buf, uint32_t nbytes, if (tx_bytes < 0) { - errno_val = errno; - perror ("ERROR in sock_test_write()"); - fprintf (stderr, "SOCK_TEST: ERROR: socket write failed " - "(errno = %d)!\n", errno_val); + vterr ("vpcom_session_write", -errno); } else if (stats) stats->tx_bytes += tx_bytes; diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index cf3a770f045..16b467f5a36 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -1511,6 +1511,7 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n) svm_msg_q_msg_t msg; session_event_t *e; svm_msg_q_t *mq; + u8 is_ct; if (PREDICT_FALSE (!buf)) return VPPCOM_EINVAL; @@ -1519,9 +1520,6 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n) if (PREDICT_FALSE (!s)) return VPPCOM_EBADFD; - tx_fifo = s->tx_fifo; - is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK); - if (PREDICT_FALSE (s->is_vep)) { clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: " @@ -1531,18 +1529,20 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n) return VPPCOM_EBADFD; } - if (!(s->session_state & STATE_OPEN)) + if (PREDICT_FALSE (!(s->session_state & STATE_OPEN))) { session_state_t state = s->session_state; rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN); VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! " - "state 0x%x (%s)", - getpid (), s->vpp_handle, session_handle, + "state 0x%x (%s)", getpid (), s->vpp_handle, session_handle, state, vppcom_session_state_str (state)); return rv; } - mq = vcl_session_is_ct (s) ? s->our_evt_q : wrk->app_event_queue; + tx_fifo = s->tx_fifo; + is_ct = vcl_session_is_ct (s); + is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK); + mq = is_ct ? 
s->our_evt_q : wrk->app_event_queue; if (svm_fifo_is_full (tx_fifo)) { if (is_nonblocking) @@ -1551,15 +1551,15 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n) } while (svm_fifo_is_full (tx_fifo)) { + svm_fifo_set_want_tx_evt (tx_fifo, 1); svm_msg_q_lock (mq); - while (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6)) - ; + svm_msg_q_wait (mq); + svm_msg_q_sub_w_lock (mq, &msg); e = svm_msg_q_msg_data (mq, &msg); svm_msg_q_unlock (mq); - if (!vcl_is_tx_evt_for_session (e, s->session_index, - s->our_evt_q != 0)) + if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct)) vcl_handle_mq_event (wrk, e); svm_msg_q_free_msg (mq, &msg); } @@ -1576,17 +1576,9 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n) ASSERT (n_write > 0); - if (VPPCOM_DEBUG > 2) - { - if (n_write <= 0) - clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: " - "FIFO-FULL (%p)", getpid (), s->vpp_handle, - session_handle, tx_fifo); - else - clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: " - "wrote %d bytes tx-fifo: (%p)", getpid (), - s->vpp_handle, session_handle, n_write, tx_fifo); - } + VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (), + s->vpp_handle, session_handle, n_write); + return n_write; } diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index f5aed7490ea..e2a6f4c67c6 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -817,7 +817,7 @@ skip_dequeue: { stream_session_t *s; /* $$$ prefetch 1 ahead maybe */ session_event_t *e; - u8 is_full; + u8 want_tx_evt; e = &fifo_events[i]; switch (e->event_type) @@ -836,18 +836,19 @@ skip_dequeue: clib_warning ("It's dead, Jim!"); continue; } - is_full = svm_fifo_is_full (s->server_tx_fifo); + want_tx_evt = svm_fifo_want_tx_evt (s->server_tx_fifo); /* Spray packets in per session type frames, since they go to * different nodes */ rv = (smm->session_tx_fns[s->session_type]) (vm, node, e, s, &n_tx_packets); if (PREDICT_TRUE (rv == SESSION_TX_OK)) { - /* Notify app there's tx space if not polling */ - if (PREDICT_FALSE (is_full - && !svm_fifo_has_event (s->server_tx_fifo))) - session_dequeue_notify (s); + if (PREDICT_FALSE (want_tx_evt)) + { + svm_fifo_set_want_tx_evt (s->server_tx_fifo, 0); + session_dequeue_notify (s); + } } else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS)) { diff --git a/test/test_vcl.py b/test/test_vcl.py index 8a7faad6f0c..8e8cc401888 100644 --- a/test/test_vcl.py +++ b/test/test_vcl.py @@ -341,28 +341,29 @@ class VCLThruHostStackTestCase(VCLTestCase): # is fixed. 
-class VCLThruHostStackExtendedATestCase(VCLTestCase): - """ VCL Thru Host Stack Extended Tests """ +class VCLThruHostStackNSessionBidirTestCase(VCLTestCase): + """ VCL Thru Host Stack NSession Bidir Tests """ def setUp(self): - super(VCLThruHostStackExtendedATestCase, self).setUp() + super(VCLThruHostStackNSessionBidirTestCase, self).setUp() self.thru_host_stack_setup() if self.vppDebug: self.client_bi_dir_nsock_timeout = 120 - self.client_bi_dir_nsock_test_args = ["-B", "-X", + self.client_bi_dir_nsock_test_args = ["-B", "-X", "-N 10000", self.loop0.local_ip4, self.server_port] else: self.client_bi_dir_nsock_timeout = 90 self.client_bi_dir_nsock_test_args = ["-I", "2", "-B", "-X", + "-N 1000", self.loop0.local_ip4, self.server_port] def tearDown(self): self.thru_host_stack_tear_down() - super(VCLThruHostStackExtendedATestCase, self).tearDown() + super(VCLThruHostStackNSessionBidirTestCase, self).tearDown() @unittest.skipUnless(running_extended_tests(), "part of extended tests") def test_vcl_thru_host_stack_bi_dir_nsock(self): @@ -375,7 +376,7 @@ class VCLThruHostStackExtendedATestCase(VCLTestCase): class VCLThruHostStackExtendedBTestCase(VCLTestCase): - """ VCL Thru Host Stack Extended Tests """ + """ VCL Thru Host Stack Extended B Tests """ def setUp(self): super(VCLThruHostStackExtendedBTestCase, self).setUp() @@ -383,12 +384,13 @@ class VCLThruHostStackExtendedBTestCase(VCLTestCase): self.thru_host_stack_setup() if self.vppDebug: self.client_bi_dir_nsock_timeout = 120 - self.client_bi_dir_nsock_test_args = ["-B", "-X", + self.client_bi_dir_nsock_test_args = ["-B", "-X", "-N 1000", self.loop0.local_ip4, self.server_port] else: self.client_bi_dir_nsock_timeout = 60 self.client_bi_dir_nsock_test_args = ["-I", "2", "-B", "-X", + "-N 1000", self.loop0.local_ip4, self.server_port] @@ -408,7 +410,7 @@ class VCLThruHostStackExtendedBTestCase(VCLTestCase): class VCLThruHostStackExtendedCTestCase(VCLTestCase): - """ VCL Thru Host Stack Extended Tests """ + """ VCL Thru Host Stack Extended C Tests """ def setUp(self): super(VCLThruHostStackExtendedCTestCase, self).setUp() @@ -422,7 +424,7 @@ class VCLThruHostStackExtendedCTestCase(VCLTestCase): self.numSockets = "5" self.client_uni_dir_nsock_test_args = ["-I", self.numSockets, - "-U", "-X", + "-U", "-X", "-N 1000", self.loop0.local_ip4, self.server_port] @@ -442,7 +444,7 @@ class VCLThruHostStackExtendedCTestCase(VCLTestCase): class VCLThruHostStackExtendedDTestCase(VCLTestCase): - """ VCL Thru Host Stack Extended Tests """ + """ VCL Thru Host Stack Extended D Tests """ def setUp(self): super(VCLThruHostStackExtendedDTestCase, self).setUp() @@ -456,7 +458,7 @@ class VCLThruHostStackExtendedDTestCase(VCLTestCase): self.numSockets = "5" self.client_uni_dir_nsock_test_args = ["-I", self.numSockets, - "-U", "-X", + "-U", "-X", "-N 1000", self.loop0.local_ip4, self.server_port] @@ -648,7 +650,7 @@ class VCLIpv6ThruHostStackTestCase(VCLTestCase): class VCLIpv6ThruHostStackExtendedATestCase(VCLTestCase): - """ VCL IPv6 Thru Host Stack Extended Tests """ + """ VCL IPv6 Thru Host Stack Extended A Tests """ def setUp(self): super(VCLIpv6ThruHostStackExtendedATestCase, self).setUp() @@ -682,7 +684,7 @@ class VCLIpv6ThruHostStackExtendedATestCase(VCLTestCase): class VCLIpv6ThruHostStackExtendedBTestCase(VCLTestCase): - """ VCL IPv6 Thru Host Stack Extended Tests """ + """ VCL IPv6 Thru Host Stack Extended B Tests """ def setUp(self): super(VCLIpv6ThruHostStackExtendedBTestCase, self).setUp() @@ -717,7 +719,7 @@ class 
VCLIpv6ThruHostStackExtendedBTestCase(VCLTestCase): class VCLIpv6ThruHostStackExtendedCTestCase(VCLTestCase): - """ VCL IPv6 Thru Host Stack Extended Tests """ + """ VCL IPv6 Thru Host Stack Extended C Tests """ def setUp(self): super(VCLIpv6ThruHostStackExtendedCTestCase, self).setUp() @@ -753,7 +755,7 @@ class VCLIpv6ThruHostStackExtendedCTestCase(VCLTestCase): class VCLIpv6ThruHostStackExtendedDTestCase(VCLTestCase): - """ VCL IPv6 Thru Host Stack Extended Tests """ + """ VCL IPv6 Thru Host Stack Extended D Tests """ def setUp(self): super(VCLIpv6ThruHostStackExtendedDTestCase, self).setUp() -- 2.16.6
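
Note on the mechanism this patch introduces: the writer side (vppcom_session_write) now sets want_tx_evt on the tx fifo only when it finds the fifo full and is about to block on the message queue, and the session node in session_node.c clears the flag and calls session_dequeue_notify() only when the flag is set, instead of notifying whenever it happens to drain a fifo that was full. The sketch below is a minimal, self-contained model of that handshake, using a plain pthread mutex and condition variable in place of VPP's shared-memory fifo and message queue; toy_fifo_t and all helper names here are illustrative assumptions, not part of the VPP/VCL API.

/*
 * Toy model of the want_tx_evt handshake described above (NOT VPP code).
 * Build: gcc -pthread -o want_tx_evt_model want_tx_evt_model.c
 */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define FIFO_MAX   4		/* pretend fifo capacity, in chunks */
#define N_CHUNKS   20		/* total chunks to move */

typedef struct
{
  pthread_mutex_t lock;
  pthread_cond_t tx_evt;	/* stands in for the app's event/message queue */
  int n_enqueued;		/* chunks currently sitting in the fifo */
  int want_tx_evt;		/* producer asks for a nudge when space frees up */
} toy_fifo_t;

static toy_fifo_t f = {
  .lock = PTHREAD_MUTEX_INITIALIZER,
  .tx_evt = PTHREAD_COND_INITIALIZER,
};

/* Application writer: mirrors the blocking path in vppcom_session_write () */
static void *
producer (void *arg)
{
  for (int i = 0; i < N_CHUNKS; i++)
    {
      pthread_mutex_lock (&f.lock);
      while (f.n_enqueued == FIFO_MAX)
	{
	  /* Fifo is full: explicitly request one tx event, then block. */
	  f.want_tx_evt = 1;
	  pthread_cond_wait (&f.tx_evt, &f.lock);
	}
      f.n_enqueued++;		/* "enqueue" one chunk */
      pthread_mutex_unlock (&f.lock);
    }
  return 0;
}

/* vpp side: mirrors the dequeue/notify path in session_node.c */
static void *
consumer (void *arg)
{
  int n_sent = 0;
  while (n_sent < N_CHUNKS)
    {
      usleep (1000);		/* pretend to build and send a frame */
      pthread_mutex_lock (&f.lock);
      if (f.n_enqueued)
	{
	  f.n_enqueued--;	/* "transmit" one chunk */
	  n_sent++;
	}
      if (f.want_tx_evt)
	{
	  /* Notify only because the writer asked; clear before signalling. */
	  f.want_tx_evt = 0;
	  pthread_cond_signal (&f.tx_evt);
	}
      pthread_mutex_unlock (&f.lock);
    }
  return 0;
}

int
main (void)
{
  pthread_t p, c;
  pthread_create (&p, NULL, producer, NULL);
  pthread_create (&c, NULL, consumer, NULL);
  pthread_join (p, NULL);
  pthread_join (c, NULL);
  printf ("done: %d chunk(s) left in fifo\n", f.n_enqueued);
  return 0;
}

As in the patch, the consumer notifies only when the flag is set and clears it before signalling, so a writer that is polling or not blocked never triggers a notification; the writer re-arms the flag each time it finds the fifo full, which is the same pattern the blocking loop in vppcom_session_write follows around svm_msg_q_wait.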