From: Florin Coras Date: Fri, 23 Apr 2021 21:01:01 +0000 (-0700) Subject: hsa: use tcp for vcl test control channel X-Git-Tag: v21.10-rc0~190 X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;ds=sidebyside;h=7992bdcecea6935b630203b5a73c4df641d46b5d;p=vpp.git hsa: use tcp for vcl test control channel Also, only exchange config over control session. Type: improvement Signed-off-by: Florin Coras Change-Id: I001df635896762bc5330cebb7d5744e3e754482d --- diff --git a/src/plugins/hs_apps/vcl/vcl_test.h b/src/plugins/hs_apps/vcl/vcl_test.h index 6bb3446cf48..5b3ae653860 100644 --- a/src/plugins/hs_apps/vcl/vcl_test.h +++ b/src/plugins/hs_apps/vcl/vcl_test.h @@ -67,6 +67,8 @@ #define VCL_TEST_CFG_MAX_TEST_SESS 32 #define VCL_TEST_CFG_MAX_EPOLL_EVENTS 16 +#define VCL_TEST_CTRL_LISTENER (~0 - 1) +#define VCL_TEST_DATA_LISTENER (~0) #define VCL_TEST_DELAY_DISCONNECT 1 #define VCL_TEST_SEPARATOR_STRING \ " -----------------------------\n" @@ -79,11 +81,19 @@ typedef enum VCL_TEST_TYPE_EXIT, } vcl_test_t; +typedef enum +{ + VCL_TEST_CMD_SYNC, + VCL_TEST_CMD_START, + VCL_TEST_CMD_STOP, +} vcl_test_cmd_t; + typedef struct __attribute__ ((packed)) { uint32_t magic; uint32_t seq_num; uint32_t test; + uint32_t cmd; uint32_t ctrl_handle; uint32_t num_test_sessions; uint32_t num_test_sessions_perq; diff --git a/src/plugins/hs_apps/vcl/vcl_test_client.c b/src/plugins/hs_apps/vcl/vcl_test_client.c index 35f1ac11818..6b0953f0f4d 100644 --- a/src/plugins/hs_apps/vcl/vcl_test_client.c +++ b/src/plugins/hs_apps/vcl/vcl_test_client.c @@ -266,7 +266,7 @@ vtc_connect_test_sessions (vcl_test_client_worker_t * wrk) return ts->fd; } - if (vcm->proto == VPPCOM_PROTO_TLS) + if (vcm->proto == VPPCOM_PROTO_TLS || vcm->proto == VPPCOM_PROTO_DTLS) { uint32_t ckp_len = sizeof (vcm->ckpair_index); vppcom_session_attr (ts->fd, VPPCOM_ATTR_SET_CKPAIR, @@ -365,13 +365,9 @@ vtc_worker_init (vcl_test_client_worker_t * wrk) if (vtc_worker_test_setup (wrk)) return -1; - vtinf ("Sending config to server on all sessions ..."); - for (n = 0; n < cfg->num_test_sessions; n++) { ts = &wrk->sessions[n]; - if (vtc_cfg_sync (ts)) - return -1; memset (&ts->stats, 0, sizeof (ts->stats)); } @@ -418,23 +414,13 @@ vtc_accumulate_stats (vcl_test_client_worker_t * wrk, static void vtc_worker_sessions_exit (vcl_test_client_worker_t * wrk) { - vcl_test_client_main_t *vcm = &vcl_client_main; - vcl_test_session_t *ctrl = &vcm->ctrl_session; vcl_test_session_t *ts; - int i, verbose = ctrl->cfg.verbose; + int i; for (i = 0; i < wrk->cfg.num_test_sessions; i++) { ts = &wrk->sessions[i]; - ts->cfg.test = VCL_TEST_TYPE_EXIT; - - if (verbose) - { - vtinf ("(fd %d): Sending exit cfg to server...", ts->fd); - vcl_test_cfg_dump (&ts->cfg, 1 /* is_client */ ); - } - (void) vcl_test_write (ts->fd, (uint8_t *) & ts->cfg, - sizeof (ts->cfg), &ts->stats, verbose); + vppcom_session_close (ts->fd); } wrk->n_sessions = 0; @@ -570,25 +556,23 @@ vtc_print_stats (vcl_test_session_t * ctrl) static void vtc_echo_client (vcl_test_client_main_t * vcm) { - vcl_test_client_worker_t *wrk; vcl_test_session_t *ctrl = &vcm->ctrl_session; vcl_test_cfg_t *cfg = &ctrl->cfg; + int rv; cfg->total_bytes = strlen (ctrl->txbuf) + 1; memset (&ctrl->stats, 0, sizeof (ctrl->stats)); - /* Echo works with only one worker */ - wrk = vcm->workers; - wrk->wrk_index = 0; - wrk->cfg = *cfg; - - vtc_worker_loop (wrk); + rv = vcl_test_write (ctrl->fd, (uint8_t *) ctrl->txbuf, cfg->total_bytes, + &ctrl->stats, ctrl->cfg.verbose); + if (rv < 0) + { + vtwrn ("vppcom_test_write (%d) failed ", ctrl->fd); + return; + } - /* Not relevant for echo test - clock_gettime (CLOCK_REALTIME, &ctrl->stats.stop); - vtc_accumulate_stats (wrk, ctrl); - vtc_print_stats (ctrl); - */ + (void) vcl_test_read (ctrl->fd, (uint8_t *) ctrl->rxbuf, ctrl->rxbuf_size, + &ctrl->stats); } static void @@ -603,14 +587,7 @@ vtc_stream_client (vcl_test_client_main_t * vcm) ctrl->cfg.test == VCL_TEST_TYPE_BI ? "Bi" : "Uni"); cfg->total_bytes = cfg->num_writes * cfg->txbuf_size; - cfg->ctrl_handle = ~0; - if (vtc_cfg_sync (ctrl)) - { - vtwrn ("test cfg sync failed -- aborting!"); - return; - } - cfg->ctrl_handle = ((vcl_test_cfg_t *) ctrl->rxbuf)->ctrl_handle; - memset (&ctrl->stats, 0, sizeof (ctrl->stats)); + cfg->ctrl_handle = ctrl->fd; n_conn = cfg->num_test_sessions; n_conn_per_wrk = n_conn / vcm->n_workers; @@ -623,6 +600,13 @@ vtc_stream_client (vcl_test_client_main_t * vcm) n_conn -= wrk->cfg.num_test_sessions; } + ctrl->cfg.cmd = VCL_TEST_CMD_START; + if (vtc_cfg_sync (ctrl)) + { + vtwrn ("test cfg sync failed -- aborting!"); + return; + } + for (i = 1; i < vcm->n_workers; i++) { wrk = &vcm->workers[i]; @@ -635,6 +619,7 @@ vtc_stream_client (vcl_test_client_main_t * vcm) ; vtinf ("Sending config on ctrl session (fd %d) for stats...", ctrl->fd); + ctrl->cfg.cmd = VCL_TEST_CMD_STOP; if (vtc_cfg_sync (ctrl)) { vtwrn ("test cfg sync failed -- aborting!"); @@ -643,6 +628,7 @@ vtc_stream_client (vcl_test_client_main_t * vcm) vtc_print_stats (ctrl); + ctrl->cfg.cmd = VCL_TEST_CMD_SYNC; ctrl->cfg.test = VCL_TEST_TYPE_ECHO; ctrl->cfg.total_bytes = 0; if (vtc_cfg_sync (ctrl)) @@ -1073,11 +1059,9 @@ vtc_ctrl_session_exit (void) int verbose = ctrl->cfg.verbose; ctrl->cfg.test = VCL_TEST_TYPE_EXIT; + vtinf ("(fd %d): Sending exit cfg to server...", ctrl->fd); if (verbose) - { - vtinf ("(fd %d): Sending exit cfg to server...", ctrl->fd); - vcl_test_cfg_dump (&ctrl->cfg, 1 /* is_client */ ); - } + vcl_test_cfg_dump (&ctrl->cfg, 1 /* is_client */); (void) vcl_test_write (ctrl->fd, (uint8_t *) & ctrl->cfg, sizeof (ctrl->cfg), &ctrl->stats, verbose); sleep (1); @@ -1101,51 +1085,17 @@ main (int argc, char **argv) if (rv < 0) vtfail ("vppcom_app_create()", rv); - ctrl->fd = vppcom_session_create (vcm->proto, 0 /* is_nonblocking */ ); + ctrl->fd = vppcom_session_create (VPPCOM_PROTO_TCP, 0 /* is_nonblocking */); if (ctrl->fd < 0) vtfail ("vppcom_session_create()", ctrl->fd); - if (vcm->proto == VPPCOM_PROTO_TLS || vcm->proto == VPPCOM_PROTO_QUIC || - vcm->proto == VPPCOM_PROTO_DTLS) - { - vppcom_cert_key_pair_t ckpair; - uint32_t ckp_len; - int ckp_index; - - vtinf ("Adding tls certs ..."); - ckpair.cert = vcl_test_crt_rsa; - ckpair.key = vcl_test_key_rsa; - ckpair.cert_len = vcl_test_crt_rsa_len; - ckpair.key_len = vcl_test_key_rsa_len; - ckp_index = vppcom_add_cert_key_pair (&ckpair); - if (ckp_index < 0) - vtfail ("vppcom_add_cert_key_pair()", ckp_index); - - vcm->ckpair_index = ckp_index; - ckp_len = sizeof (ckp_index); - vppcom_session_attr (ctrl->fd, VPPCOM_ATTR_SET_CKPAIR, &ckp_index, - &ckp_len); - } - vtinf ("Connecting to server..."); - if (vcm->proto == VPPCOM_PROTO_QUIC) - { - quic_session->fd = vppcom_session_create (vcm->proto, - 0 /* is_nonblocking */ ); - if (quic_session->fd < 0) - vtfail ("vppcom_session_create()", quic_session->fd); - rv = vppcom_session_connect (quic_session->fd, &vcm->server_endpt); - if (rv) - vtfail ("vppcom_session_connect()", rv); - vtinf ("Connecting to stream..."); - rv = vppcom_session_stream_connect (ctrl->fd, quic_session->fd); - } - else - rv = vppcom_session_connect (ctrl->fd, &vcm->server_endpt); + rv = vppcom_session_connect (ctrl->fd, &vcm->server_endpt); if (rv) vtfail ("vppcom_session_connect()", rv); vtinf ("Control session (fd %d) connected.", ctrl->fd); + ctrl->cfg.cmd = VCL_TEST_CMD_SYNC; rv = vtc_cfg_sync (ctrl); if (rv) vtfail ("vtc_cfg_sync()", rv); @@ -1153,6 +1103,9 @@ main (int argc, char **argv) ctrl->cfg.ctrl_handle = ((vcl_test_cfg_t *) ctrl->rxbuf)->ctrl_handle; memset (&ctrl->stats, 0, sizeof (ctrl->stats)); + /* Update ctrl port to data port */ + vcm->server_endpt.port += 1; + while (ctrl->cfg.test != VCL_TEST_TYPE_EXIT) { if (vcm->dump_cfg) diff --git a/src/plugins/hs_apps/vcl/vcl_test_server.c b/src/plugins/hs_apps/vcl/vcl_test_server.c index b4966bf168b..5b02fc08e8e 100644 --- a/src/plugins/hs_apps/vcl/vcl_test_server.c +++ b/src/plugins/hs_apps/vcl/vcl_test_server.c @@ -31,6 +31,7 @@ typedef struct { uint8_t is_alloc; + uint8_t is_open; int fd; uint8_t *buf; uint32_t buf_size; @@ -67,6 +68,8 @@ typedef struct vcl_test_server_cfg_t cfg; vcl_test_server_worker_t *workers; + vcl_test_server_conn_t *ctrl; + int ctrl_listen_fd; struct sockaddr_storage servaddr; volatile int worker_fails; volatile int active_workers; @@ -157,33 +160,81 @@ sync_config_and_reply (vcl_test_server_conn_t * conn, vcl_test_cfg_t * rx_cfg) } static void -vts_server_start_stop (vcl_test_server_worker_t * wrk, - vcl_test_server_conn_t * conn, vcl_test_cfg_t * rx_cfg) +vts_session_close (vcl_test_server_conn_t *conn) +{ + if (!conn->is_open) + return; + vppcom_session_close (conn->fd); + conn->is_open = 0; +} + +static void +vts_session_cleanup (vcl_test_server_conn_t *conn) +{ + vts_session_close (conn); + conn_pool_free (conn); +} + +static void +vts_wrk_cleanup_all (vcl_test_server_worker_t *wrk) +{ + vcl_test_server_conn_t *conn; + int i; + + for (i = 0; i < wrk->conn_pool_size; i++) + { + conn = &wrk->conn_pool[i]; + vts_session_cleanup (conn); + } + + wrk->nfds = 0; +} + +static void +vts_test_cmd (vcl_test_server_worker_t *wrk, vcl_test_server_conn_t *conn, + vcl_test_cfg_t *rx_cfg) { u8 is_bi = rx_cfg->test == VCL_TEST_TYPE_BI; vcl_test_server_conn_t *tc; char buf[64]; int i; - if (rx_cfg->ctrl_handle == conn->fd) + if (rx_cfg->cmd == VCL_TEST_CMD_STOP) { + struct timespec stop; + clock_gettime (CLOCK_REALTIME, &stop); + + /* Test session are not closed, e.g., connection-less or errors */ + if (wrk->nfds > 1) + { + vtinf ("%u sessions are still open", wrk->nfds - 1); + stop.tv_sec -= VCL_TEST_DELAY_DISCONNECT; + conn->stats.stop = stop; + } + + /* Accumulate stats over all of the worker's sessions */ for (i = 0; i < wrk->conn_pool_size; i++) { tc = &wrk->conn_pool[i]; - if (tc->cfg.ctrl_handle != conn->fd) + if (tc == conn) continue; vcl_test_stats_accumulate (&conn->stats, &tc->stats); - if (vcl_comp_tspec (&conn->stats.stop, &tc->stats.stop) < 0) - conn->stats.stop = tc->stats.stop; - /* Client delays sending of disconnect */ - conn->stats.stop.tv_sec -= VCL_TEST_DELAY_DISCONNECT; - if (conn->cfg.verbose) + if (tc->is_open) { - snprintf (buf, sizeof (buf), "SERVER (fd %d) RESULTS", tc->fd); - vcl_test_stats_dump (buf, &tc->stats, 1 /* show_rx */ , - is_bi /* show tx */ , conn->cfg.verbose); + vts_session_close (tc); + continue; } + /* Only relevant if all connections previously closed */ + if (vcl_comp_tspec (&conn->stats.stop, &tc->stats.stop) < 0) + conn->stats.stop = tc->stats.stop; + } + + if (conn->cfg.verbose) + { + snprintf (buf, sizeof (buf), "SERVER (fd %d) RESULTS", conn->fd); + vcl_test_stats_dump (buf, &conn->stats, 1 /* show_rx */, + is_bi /* show tx */, conn->cfg.verbose); } vcl_test_stats_dump ("SERVER RESULTS", &conn->stats, 1 /* show_rx */ , @@ -202,19 +253,17 @@ vts_server_start_stop (vcl_test_server_worker_t * wrk, sync_config_and_reply (conn, rx_cfg); memset (&conn->stats, 0, sizeof (conn->stats)); } - else + else if (rx_cfg->cmd == VCL_TEST_CMD_SYNC) { - if (rx_cfg->ctrl_handle == ~0) - { - rx_cfg->ctrl_handle = conn->fd; - vtinf ("Set control fd %d for test!", conn->fd); - } - else - { - vtinf ("Starting %s-directional Stream Test (fd %d)!", - is_bi ? "Bi" : "Uni", conn->fd); - } - + rx_cfg->ctrl_handle = conn->fd; + vtinf ("Set control fd %d for test!", conn->fd); + sync_config_and_reply (conn, rx_cfg); + } + else if (rx_cfg->cmd == VCL_TEST_CMD_START) + { + vtinf ("Starting %s-directional Stream Test (fd %d)!", + is_bi ? "Bi" : "Uni", conn->fd); + rx_cfg->ctrl_handle = conn->fd; sync_config_and_reply (conn, rx_cfg); /* read the 1st chunk, record start time */ @@ -224,7 +273,7 @@ vts_server_start_stop (vcl_test_server_worker_t * wrk, } static inline void -vts_server_rx (vcl_test_server_conn_t * conn, int rx_bytes) +vts_server_process_rx (vcl_test_server_conn_t *conn, int rx_bytes) { vcl_test_server_main_t *vsm = &vcl_server_main; int client_fd = conn->fd; @@ -290,8 +339,8 @@ vts_server_echo (vcl_test_server_conn_t * conn, int rx_bytes) vtinf ("(fd %d): TX (%d bytes) - '%s'", conn->fd, tx_bytes, conn->buf); } -static void -vts_new_client (vcl_test_server_worker_t * wrk, int listen_fd) +static vcl_test_server_conn_t * +vts_accept_client (vcl_test_server_worker_t *wrk, int listen_fd) { vcl_test_server_conn_t *conn; struct epoll_event ev; @@ -301,16 +350,17 @@ vts_new_client (vcl_test_server_worker_t * wrk, int listen_fd) if (!conn) { vtwrn ("No free connections!"); - return; + return 0; } client_fd = vppcom_session_accept (listen_fd, &conn->endpt, 0); if (client_fd < 0) { vterr ("vppcom_session_accept()", client_fd); - return; + return 0; } conn->fd = client_fd; + conn->is_open = 1; vtinf ("Got a connection -- fd = %d (0x%08x) on listener fd = %d (0x%08x)", client_fd, client_fd, listen_fd, listen_fd); @@ -321,9 +371,11 @@ vts_new_client (vcl_test_server_worker_t * wrk, int listen_fd) if (rv < 0) { vterr ("vppcom_epoll_ctl()", rv); - return; + return 0; } wrk->nfds++; + + return conn; } static void @@ -467,8 +519,8 @@ vts_clean_connected_listeners (vcl_test_server_worker_t * wrk, } int -vts_handle_cfg (vcl_test_server_worker_t * wrk, vcl_test_cfg_t * rx_cfg, - vcl_test_server_conn_t * conn, int rx_bytes) +vts_handle_ctrl_cfg (vcl_test_server_worker_t *wrk, vcl_test_cfg_t *rx_cfg, + vcl_test_server_conn_t *conn, int rx_bytes) { int listener_fd; if (rx_cfg->verbose) @@ -502,17 +554,17 @@ vts_handle_cfg (vcl_test_server_worker_t * wrk, vcl_test_cfg_t * rx_cfg, case VCL_TEST_TYPE_BI: case VCL_TEST_TYPE_UNI: - vts_server_start_stop (wrk, conn, rx_cfg); + vts_test_cmd (wrk, conn, rx_cfg); break; case VCL_TEST_TYPE_EXIT: - vtinf ("Session fd %d closing!", conn->fd); - clock_gettime (CLOCK_REALTIME, &conn->stats.stop); + vtinf ("Ctrl session fd %d closing!", conn->fd); listener_fd = vppcom_session_listener (conn->fd); - vppcom_session_close (conn->fd); - conn_pool_free (conn); - wrk->nfds--; vts_clean_connected_listeners (wrk, listener_fd); + vts_session_cleanup (conn); + wrk->nfds--; + if (wrk->nfds) + vts_wrk_cleanup_all (wrk); break; default: @@ -583,12 +635,16 @@ vts_worker_init (vcl_test_server_worker_t * wrk) vtfail ("vppcom_session_listen()", rv); } - wrk->epfd = vppcom_epoll_create (); - if (wrk->epfd < 0) - vtfail ("vppcom_epoll_create()", wrk->epfd); + /* First worker already has epoll fd */ + if (wrk->wrk_index) + { + wrk->epfd = vppcom_epoll_create (); + if (wrk->epfd < 0) + vtfail ("vppcom_epoll_create()", wrk->epfd); + } listen_ev.events = EPOLLIN; - listen_ev.data.u32 = ~0; + listen_ev.data.u32 = VCL_TEST_DATA_LISTENER; rv = vppcom_epoll_ctl (wrk->epfd, EPOLL_CTL_ADD, wrk->listen_fd, &listen_ev); if (rv < 0) @@ -598,30 +654,6 @@ vts_worker_init (vcl_test_server_worker_t * wrk) vtinf ("Waiting for a client to connect on port %d ...", vsm->cfg.port); } -static int -vts_conn_expect_config (vcl_test_server_conn_t * conn) -{ - if (conn->cfg.test == VCL_TEST_TYPE_ECHO) - return 1; - - return (conn->stats.rx_bytes < 128 - || conn->stats.rx_bytes > conn->cfg.total_bytes); -} - -static vcl_test_cfg_t * -vts_conn_read_config (vcl_test_server_conn_t * conn) -{ - vcl_test_server_main_t *vsm = &vcl_server_main; - - if (vsm->use_ds) - { - /* We could avoid the copy if the first segment is big enough but this - * just simplifies things */ - vts_copy_ds (conn->buf, conn->ds, sizeof (vcl_test_cfg_t)); - } - return (vcl_test_cfg_t *) conn->buf; -} - static inline int vts_conn_read (vcl_test_server_conn_t * conn) { @@ -632,17 +664,6 @@ vts_conn_read (vcl_test_server_conn_t * conn) return vcl_test_read (conn->fd, conn->buf, conn->buf_size, &conn->stats); } -static inline int -vts_conn_has_ascii (vcl_test_server_conn_t * conn) -{ - vcl_test_server_main_t *vsm = &vcl_server_main; - - if (vsm->use_ds) - return isascii (conn->ds[0].data[0]); - else - return isascii (conn->buf[0]); -} - static void * vts_worker_loop (void *arg) { @@ -672,13 +693,15 @@ vts_worker_loop (void *arg) for (i = 0; i < num_ev; i++) { conn = &wrk->conn_pool[wrk->wait_events[i].data.u32]; + /* + * Check for close events + */ if (wrk->wait_events[i].events & (EPOLLHUP | EPOLLRDHUP)) { - vtinf ("Closing session %d on HUP", conn->fd); listener_fd = vppcom_session_listener (conn->fd); - vppcom_session_close (conn->fd); - wrk->nfds--; vts_clean_connected_listeners (wrk, listener_fd); + vts_session_close (conn); + wrk->nfds--; if (!wrk->nfds) { vtinf ("All client connections closed\n"); @@ -686,17 +709,66 @@ vts_worker_loop (void *arg) } continue; } - if (wrk->wait_events[i].data.u32 == ~0) + + /* + * Check if new session needs to be accepted + */ + + if (wrk->wait_events[i].data.u32 == VCL_TEST_CTRL_LISTENER) + { + if (vsm->ctrl) + { + vtwrn ("ctrl already exists"); + continue; + } + vsm->ctrl = vts_accept_client (wrk, vsm->ctrl_listen_fd); + continue; + } + if (wrk->wait_events[i].data.u32 == VCL_TEST_DATA_LISTENER) { - vts_new_client (wrk, wrk->listen_fd); + conn = vts_accept_client (wrk, wrk->listen_fd); + conn->cfg = vsm->ctrl->cfg; continue; } else if (vppcom_session_is_connectable_listener (conn->fd)) { - vts_new_client (wrk, conn->fd); + vts_accept_client (wrk, conn->fd); + continue; + } + + /* + * Message on control session + */ + + if (!wrk->wrk_index && conn->fd == vsm->ctrl->fd) + { + rx_bytes = vcl_test_read (conn->fd, conn->buf, conn->buf_size, + &conn->stats); + rx_cfg = (vcl_test_cfg_t *) conn->buf; + if (rx_cfg->magic == VCL_TEST_CFG_CTRL_MAGIC) + { + vts_handle_ctrl_cfg (wrk, rx_cfg, conn, rx_bytes); + if (!wrk->nfds) + { + vtinf ("All client connections closed\n"); + goto done; + } + } + else if (isascii (conn->buf[0])) + { + vts_server_echo (conn, rx_bytes); + } + else + { + vtwrn ("FIFO not drained! extra bytes %d", rx_bytes); + } continue; } + /* + * Read perf test data + */ + if (EPOLLIN & wrk->wait_events[i].events) { read_again: @@ -712,40 +784,11 @@ vts_worker_loop (void *arg) else continue; } - - if (vts_conn_expect_config (conn)) - { - rx_cfg = vts_conn_read_config (conn); - if (rx_cfg->magic == VCL_TEST_CFG_CTRL_MAGIC) - { - if (vsm->use_ds) - vppcom_session_free_segments (conn->fd, rx_bytes); - vts_handle_cfg (wrk, rx_cfg, conn, rx_bytes); - if (!wrk->nfds) - { - vtinf ("All client connections closed\n"); - goto done; - } - continue; - } - } - if ((conn->cfg.test == VCL_TEST_TYPE_UNI) - || (conn->cfg.test == VCL_TEST_TYPE_BI)) - { - vts_server_rx (conn, rx_bytes); - if (vppcom_session_attr (conn->fd, VPPCOM_ATTR_GET_NREAD, 0, - 0) > 0) - goto read_again; - continue; - } - if (vts_conn_has_ascii (conn)) - { - vts_server_echo (conn, rx_bytes); - } - else - { - vtwrn ("FIFO not drained! extra bytes %d", rx_bytes); - } + vts_server_process_rx (conn, rx_bytes); + if (vppcom_session_attr (conn->fd, VPPCOM_ATTR_GET_NREAD, 0, 0) > + 0) + goto read_again; + continue; } else { @@ -766,6 +809,42 @@ done: return 0; } +static void +vts_ctrl_session_init (vcl_test_server_worker_t *wrk) +{ + vcl_test_server_main_t *vsm = &vcl_server_main; + struct epoll_event listen_ev; + int rv; + + vtinf ("Initializing main ctrl session ..."); + + vsm->ctrl_listen_fd = + vppcom_session_create (VPPCOM_PROTO_TCP, 0 /* is_nonblocking */); + if (vsm->ctrl_listen_fd < 0) + vtfail ("vppcom_session_create()", vsm->ctrl_listen_fd); + + rv = vppcom_session_bind (vsm->ctrl_listen_fd, &vsm->cfg.endpt); + if (rv < 0) + vtfail ("vppcom_session_bind()", rv); + + rv = vppcom_session_listen (vsm->ctrl_listen_fd, 10); + if (rv < 0) + vtfail ("vppcom_session_listen()", rv); + + wrk->epfd = vppcom_epoll_create (); + if (wrk->epfd < 0) + vtfail ("vppcom_epoll_create()", wrk->epfd); + + listen_ev.events = EPOLLIN; + listen_ev.data.u32 = VCL_TEST_CTRL_LISTENER; + rv = vppcom_epoll_ctl (wrk->epfd, EPOLL_CTL_ADD, vsm->ctrl_listen_fd, + &listen_ev); + if (rv < 0) + vtfail ("vppcom_epoll_ctl", rv); + + vtinf ("Waiting for a client to connect on port %d ...", vsm->cfg.port); +} + int main (int argc, char **argv) { @@ -783,6 +862,10 @@ main (int argc, char **argv) vtfail ("vppcom_app_create()", rv); vsm->workers = calloc (vsm->cfg.workers, sizeof (*vsm->workers)); + vts_ctrl_session_init (&vsm->workers[0]); + + /* Update ctrl port to data port */ + vsm->cfg.endpt.port += 1; vts_worker_init (&vsm->workers[0]); for (i = 1; i < vsm->cfg.workers; i++) { @@ -790,6 +873,7 @@ main (int argc, char **argv) rv = pthread_create (&vsm->workers[i].thread_handle, NULL, vts_worker_loop, (void *) &vsm->workers[i]); } + vts_worker_loop (&vsm->workers[0]); while (vsm->active_workers > 0)