From 47e77304edf72fda629f054b393013ff795d5637 Mon Sep 17 00:00:00 2001 From: Dave Wallace Date: Wed, 5 Jun 2019 10:40:07 -0400 Subject: [PATCH] quic: server create streams test case Type: test * Refactor quic_echo test app * Add timinig capabilities * Add multiple quic tests Change-Id: I3302c66539b12c1375d1a0c6d46f9ff4c6f2b27c Signed-off-by: Dave Wallace --- src/plugins/quic/quic.c | 2 +- src/tests/vnet/session/quic_echo.c | 1007 ++++++++++++++++++++++-------------- test/test_quic.py | 317 +++++------- 3 files changed, 746 insertions(+), 580 deletions(-) diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c index 0b6c975935e..4e158bcb2e3 100644 --- a/src/plugins/quic/quic.c +++ b/src/plugins/quic/quic.c @@ -1936,7 +1936,7 @@ quic_create_quic_session (quic_ctx_t * ctx) quic_session->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, ctx->c_quic_ctx_id.udp_is_ip4); - quic_session->listener_handle = lctx->c_quic_ctx_id.listener_ctx_id; + quic_session->listener_handle = lctx->c_s_index; /* TODO: don't alloc fifos when we don't transfer data on this session * but we still need fifos for the events? */ diff --git a/src/tests/vnet/session/quic_echo.c b/src/tests/vnet/session/quic_echo.c index 0149448c916..a62af642b2f 100644 --- a/src/tests/vnet/session/quic_echo.c +++ b/src/tests/vnet/session/quic_echo.c @@ -38,7 +38,7 @@ #include #undef vl_printfun -#define QUIC_ECHO_DBG 1 +#define QUIC_ECHO_DBG 0 #define DBG(_fmt, _args...) \ if (QUIC_ECHO_DBG) \ clib_warning (_fmt, ##_args) @@ -68,6 +68,17 @@ typedef enum STATE_DETACHED } connection_state_t; +typedef enum +{ + ECHO_EVT_START, /* app starts */ + ECHO_EVT_FIRST_QCONNECT, /* First connect Quic session sent */ + ECHO_EVT_LAST_QCONNECTED, /* All Quic session are connected */ + ECHO_EVT_FIRST_SCONNECT, /* First connect Stream session sent */ + ECHO_EVT_LAST_SCONNECTED, /* All Stream session are connected */ + ECHO_EVT_LAST_BYTE, /* Last byte received */ + ECHO_EVT_EXIT, /* app exits */ +} echo_test_evt_t; + enum quic_session_type_t { QUIC_SESSION_TYPE_QUIC = 0, @@ -75,6 +86,28 @@ enum quic_session_type_t QUIC_SESSION_TYPE_LISTEN = INT32_MAX, }; +typedef struct _quic_echo_cb_vft +{ + void (*quic_connected_cb) (session_connected_msg_t * mp, u32 session_index); + void (*client_stream_connected_cb) (session_connected_msg_t * mp, + u32 session_index); + void (*server_stream_connected_cb) (session_connected_msg_t * mp, + u32 session_index); + void (*quic_accepted_cb) (session_accepted_msg_t * mp, u32 session_index); + void (*client_stream_accepted_cb) (session_accepted_msg_t * mp, + u32 session_index); + void (*server_stream_accepted_cb) (session_accepted_msg_t * mp, + u32 session_index); +} quic_echo_cb_vft_t; + + +typedef enum +{ + RETURN_PACKETS_NOTEST, + RETURN_PACKETS_LOG_WRONG, + RETURN_PACKETS_ASSERT, +} test_return_packets_t; + typedef struct { /* vpe input queue */ @@ -91,6 +124,8 @@ typedef struct /* Hash table for disconnect processing */ uword *session_index_by_vpp_handles; + /* Handle of vpp listener session */ + u64 listener_handle; /* Hash table for shared segment_names */ uword *shared_segment_handles; @@ -99,11 +134,6 @@ typedef struct /* intermediate rx buffer */ u8 *rx_buf; - /* URI for slave's connect */ - u8 *connect_uri; - - u32 connected_session_index; - int i_am_master; /* drop all packets */ @@ -124,9 +154,6 @@ typedef struct /* Signal variables */ volatile int time_to_stop; - volatile int time_to_print_stats; - - u32 configured_segment_size; /* VNET_API_ERROR_FOO -> "Foo" hash table */ uword *error_string_by_error_number; @@ -134,27 +161,39 @@ typedef struct u8 *connect_test_data; pthread_t *client_thread_handles; u32 *thread_args; - u32 client_bytes_received; u8 test_return_packets; u64 bytes_to_send; + u64 bytes_to_receive; u32 fifo_size; - u32 quic_streams; + u8 *appns_id; u64 appns_flags; u64 appns_secret; - u32 n_clients; + u32 n_clients; /* Target number of QUIC sessions */ + u32 n_stream_clients; /* Target Number of STREAM sessions per QUIC session */ + volatile u32 n_quic_clients_connected; /* Number of connected QUIC sessions */ + volatile u32 n_clients_connected; /* Number of STREAM sessions connected */ + u64 tx_total; u64 rx_total; - volatile u32 n_clients_connected; - volatile u32 n_active_clients; + /* Event based timing : start & end depend on CLI specified events */ + u8 first_sconnect_sent; /* Sent the first Stream session connect ? */ + f64 start_time; + f64 end_time; + u8 timing_start_event; + u8 timing_end_event; + /* cb vft for QUIC scenarios */ + quic_echo_cb_vft_t cb_vft; /** Flag that decides if socket, instead of svm, api is used to connect to * vpp. If sock api is used, shm binary api is subsequently bootstrapped * and all other messages are exchanged using shm IPC. */ u8 use_sock_api; + + /* Limit the number of incorrect data messages */ int max_test_msg; fifo_segment_main_t segment_main; @@ -168,6 +207,34 @@ echo_main_t echo_main; #define NITER 4000000 #endif +#if CLIB_DEBUG > 0 +#define TIMEOUT 10.0 +#else +#define TIMEOUT 10.0 +#endif + +u8 * +format_quic_echo_state (u8 * s, va_list * args) +{ + u32 state = va_arg (*args, u32); + if (state == STATE_START) + return format (s, "STATE_START"); + if (state == STATE_ATTACHED) + return format (s, "STATE_ATTACHED"); + if (state == STATE_LISTEN) + return format (s, "STATE_LISTEN"); + if (state == STATE_READY) + return format (s, "STATE_READY"); + if (state == STATE_DISCONNECTING) + return format (s, "STATE_DISCONNECTING"); + if (state == STATE_FAILED) + return format (s, "STATE_FAILED"); + if (state == STATE_DETACHED) + return format (s, "STATE_DETACHED"); + else + return format (s, "unknown state"); +} + static u8 * format_api_error (u8 * s, va_list * args) { @@ -184,6 +251,60 @@ format_api_error (u8 * s, va_list * args) return s; } +static void +quic_echo_notify_event (echo_main_t * em, echo_test_evt_t e) +{ + if (em->timing_start_event == e) + em->start_time = clib_time_now (&em->clib_time); + else if (em->timing_end_event == e) + em->end_time = clib_time_now (&em->clib_time); +} + +static uword +echo_unformat_timing_event (unformat_input_t * input, va_list * args) +{ + echo_test_evt_t *a = va_arg (*args, echo_test_evt_t *); + if (unformat (input, "start")) + *a = ECHO_EVT_START; + else if (unformat (input, "qconnect")) + *a = ECHO_EVT_FIRST_QCONNECT; + else if (unformat (input, "qconnected")) + *a = ECHO_EVT_LAST_QCONNECTED; + else if (unformat (input, "sconnect")) + *a = ECHO_EVT_FIRST_SCONNECT; + else if (unformat (input, "sconnected")) + *a = ECHO_EVT_LAST_SCONNECTED; + else if (unformat (input, "lastbyte")) + *a = ECHO_EVT_LAST_BYTE; + else if (unformat (input, "exit")) + *a = ECHO_EVT_EXIT; + else + return 0; + return 1; +} + +u8 * +echo_format_timing_event (u8 * s, va_list * args) +{ + u32 timing_event = va_arg (*args, u32); + if (timing_event == ECHO_EVT_START) + return format (s, "start"); + if (timing_event == ECHO_EVT_FIRST_QCONNECT) + return format (s, "qconnect"); + if (timing_event == ECHO_EVT_LAST_QCONNECTED) + return format (s, "qconnected"); + if (timing_event == ECHO_EVT_FIRST_SCONNECT) + return format (s, "sconnect"); + if (timing_event == ECHO_EVT_LAST_SCONNECTED) + return format (s, "sconnected"); + if (timing_event == ECHO_EVT_LAST_BYTE) + return format (s, "lastbyte"); + if (timing_event == ECHO_EVT_EXIT) + return format (s, "exit"); + else + return format (s, "unknown timing event"); +} + static void init_error_string_table (echo_main_t * em) { @@ -196,13 +317,9 @@ init_error_string_table (echo_main_t * em) hash_set (em->error_string_by_error_number, 99, "Misc"); } -static void handle_mq_event (session_event_t * e); - -#if CLIB_DEBUG > 0 -#define TIMEOUT 10.0 -#else -#define TIMEOUT 10.0 -#endif +static void handle_mq_event (echo_main_t * em, session_event_t * e, + int handle_rx); +static void echo_handle_rx (echo_main_t * em, session_event_t * e); static int wait_for_segment_allocation (u64 segment_handle) @@ -241,14 +358,14 @@ wait_for_disconnected_sessions (echo_main_t * em) } static int -wait_for_state_change (echo_main_t * em, connection_state_t state) +wait_for_state_change (echo_main_t * em, connection_state_t state, + f64 timeout) { svm_msg_q_msg_t msg; session_event_t *e; - f64 timeout; - timeout = clib_time_now (&em->clib_time) + TIMEOUT; + f64 end_time = clib_time_now (&em->clib_time) + timeout; - while (clib_time_now (&em->clib_time) < timeout) + while (!timeout || clib_time_now (&em->clib_time) < end_time) { if (em->state == state) return 0; @@ -262,13 +379,27 @@ wait_for_state_change (echo_main_t * em, connection_state_t state) if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_NOWAIT, 0)) continue; e = svm_msg_q_msg_data (em->our_event_queue, &msg); - handle_mq_event (e); + handle_mq_event (em, e, 0 /* handle_rx */ ); svm_msg_q_free_msg (em->our_event_queue, &msg); } - clib_warning ("timeout waiting for state %d", state); + clib_warning ("timeout waiting for state %U", format_quic_echo_state, + state); return -1; } +static void +notify_rx_data_to_vpp (echo_session_t * s) +{ + svm_fifo_t *f = s->tx_fifo; + return; /* FOR NOW */ + if (svm_fifo_set_event (f)) + { + DBG ("did send event"); + app_send_io_evt_to_vpp (s->vpp_evt_q, f->master_session_index, + SESSION_IO_EVT_TX, 0 /* noblock */ ); + } +} + void application_send_attach (echo_main_t * em) { @@ -323,12 +454,7 @@ static int application_attach (echo_main_t * em) { application_send_attach (em); - if (wait_for_state_change (em, STATE_ATTACHED)) - { - clib_warning ("timeout waiting for STATE_ATTACHED"); - return -1; - } - return 0; + return wait_for_state_change (em, STATE_ATTACHED, TIMEOUT); } void @@ -450,29 +576,15 @@ stop_signal (int signum) um->time_to_stop = 1; } -static void -stats_signal (int signum) -{ - echo_main_t *um = &echo_main; - um->time_to_print_stats = 1; -} - static clib_error_t * setup_signal_handlers (void) { - signal (SIGUSR2, stats_signal); signal (SIGINT, stop_signal); signal (SIGQUIT, stop_signal); signal (SIGTERM, stop_signal); return 0; } -void -vlib_cli_output (struct vlib_main_t *vm, char *fmt, ...) -{ - clib_warning ("BUG"); -} - int connect_to_vpp (char *name) { @@ -565,31 +677,55 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) static void session_print_stats (echo_main_t * em, echo_session_t * session) { - f64 deltat; - u64 bytes; + f64 deltat = clib_time_now (&em->clib_time) - session->start; + fformat (stdout, "Session %x done in %.6fs RX[%.4f] TX[%.4f] Gbit/s\n", + session->session_index, deltat, + (session->bytes_received * 8.0) / deltat / 1e9, + (session->bytes_sent * 8.0) / deltat / 1e9); +} - deltat = clib_time_now (&em->clib_time) - session->start; - bytes = em->i_am_master ? session->bytes_received : em->bytes_to_send; - fformat (stdout, "Finished in %.6f\n", deltat); - fformat (stdout, "%.4f Gbit/second\n", (bytes * 8.0) / deltat / 1e9); +static void +print_global_stats (echo_main_t * em) +{ + f64 deltat = em->end_time - em->start_time; + u8 *s = format (0, "%U:%U", + echo_format_timing_event, em->timing_start_event, + echo_format_timing_event, em->timing_end_event); + fformat (stdout, "Timinig %s\n", s); + fformat (stdout, "-------- TX --------\n"); + fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds\n", + em->tx_total, em->tx_total / (1ULL << 20), + em->tx_total / (1ULL << 30), deltat); + fformat (stdout, "%.4f Gbit/second\n", (em->tx_total * 8.0) / deltat / 1e9); + fformat (stdout, "-------- RX --------\n"); + fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds\n", + em->rx_total, em->rx_total / (1ULL << 20), + em->rx_total / (1ULL << 30), deltat); + fformat (stdout, "%.4f Gbit/second\n", (em->rx_total * 8.0) / deltat / 1e9); + fformat (stdout, "--------------------\n"); } + static void test_recv_bytes (echo_main_t * em, echo_session_t * s, u8 * rx_buf, u32 n_read) { int i; + u8 expected; for (i = 0; i < n_read; i++) { - if (rx_buf[i] != ((s->bytes_received + i) & 0xff) - && em->max_test_msg > 0) + expected = (s->bytes_received + i) & 0xff; + if (rx_buf[i] != expected && em->max_test_msg > 0) { - clib_warning ("error at byte %lld, 0x%x not 0x%x", - s->bytes_received + i, rx_buf[i], - ((s->bytes_received + i) & 0xff)); + clib_warning + ("Session[%lx][0x%lx] byte[%lld], got 0x%x but expected 0x%x", + s->session_index, s->vpp_session_handle, s->bytes_received + i, + rx_buf[i], expected); em->max_test_msg--; if (em->max_test_msg == 0) clib_warning ("Too many errors, hiding next ones"); + if (em->test_return_packets == RETURN_PACKETS_ASSERT) + ASSERT (0); } } } @@ -605,22 +741,18 @@ recv_data_chunk (echo_main_t * em, echo_session_t * s, u8 * rx_buf) do { - n_read = app_recv_stream ((app_session_t *) s, rx_buf, - vec_len (rx_buf)); - - if (n_read > 0) - { - if (em->test_return_packets) - test_recv_bytes (em, s, rx_buf, n_read); - - n_to_read -= n_read; - - s->bytes_received += n_read; - ASSERT (s->bytes_to_receive >= n_read); - s->bytes_to_receive -= n_read; - } - else + n_read = + app_recv_stream ((app_session_t *) s, rx_buf, vec_len (rx_buf)); + if (n_read <= 0) break; + notify_rx_data_to_vpp (s); + if (em->test_return_packets) + test_recv_bytes (em, s, rx_buf, n_read); + + ASSERT (s->bytes_to_receive >= n_read); + n_to_read -= n_read; + s->bytes_received += n_read; + s->bytes_to_receive -= n_read; } while (n_to_read > 0); } @@ -654,7 +786,7 @@ static void * client_thread_fn (void *arg) { echo_main_t *em = &echo_main; - static u8 *rx_buf = 0; + u8 *rx_buf = 0; u32 session_index = *(u32 *) arg; echo_session_t *s; @@ -672,18 +804,21 @@ client_thread_fn (void *arg) break; } - DBG ("session %d done send %lu to do, %lu done || recv %lu to do, %lu done", - session_index, s->bytes_to_send, s->bytes_sent, s->bytes_to_receive, - s->bytes_received); + DBG ("[%lu/%lu] -> S(%x) -> [%lu/%lu]", + s->bytes_received, s->bytes_received + s->bytes_to_receive, + session_index, s->bytes_sent, s->bytes_sent + s->bytes_to_send); em->tx_total += s->bytes_sent; em->rx_total += s->bytes_received; - em->n_active_clients--; + em->n_clients_connected--; + + if (em->n_clients_connected == 0) + quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE); pthread_exit (0); } -void -client_send_connect (echo_main_t * em, u8 * uri, u32 opaque) +static void +echo_send_connect (echo_main_t * em, u8 * uri, u32 opaque) { vl_api_connect_uri_t *cmp; cmp = vl_msg_api_alloc (sizeof (*cmp)); @@ -696,8 +831,8 @@ client_send_connect (echo_main_t * em, u8 * uri, u32 opaque) vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp); } -void -client_send_disconnect (echo_main_t * em, echo_session_t * s) +static void +client_disconnect_session (echo_main_t * em, echo_session_t * s) { vl_api_disconnect_session_t *dmp; dmp = vl_msg_api_alloc (sizeof (*dmp)); @@ -707,15 +842,8 @@ client_send_disconnect (echo_main_t * em, echo_session_t * s) dmp->handle = s->vpp_session_handle; DBG ("Sending Session disonnect handle %lu", dmp->handle); vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp); -} - -int -client_disconnect (echo_main_t * em, echo_session_t * s) -{ - client_send_disconnect (em, s); pool_put (em->sessions, s); clib_memset (s, 0xfe, sizeof (*s)); - return 0; } static void @@ -734,16 +862,10 @@ session_bound_handler (session_bound_msg_t * mp) clib_warning ("listening on %U:%u", format_ip46_address, mp->lcl_ip, mp->lcl_is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6, clib_net_to_host_u16 (mp->lcl_port)); - em->state = STATE_READY; -} - -static void -quic_qsession_accepted_handler (session_accepted_msg_t * mp) -{ - DBG ("Accept on QSession index %u", mp->handle); + em->listener_handle = mp->handle; + em->state = STATE_LISTEN; } - static void session_accepted_handler (session_accepted_msg_t * mp) { @@ -752,30 +874,15 @@ session_accepted_handler (session_accepted_msg_t * mp) svm_fifo_t *rx_fifo, *tx_fifo; echo_main_t *em = &echo_main; echo_session_t *session; - static f64 start_time; u32 session_index; - u64 segment_handle; - u8 *ip_str; - - segment_handle = mp->segment_handle; - - if (start_time == 0.0) - start_time = clib_time_now (&em->clib_time); - - ip_str = format (0, "%U", format_ip46_address, &mp->rmt.ip, mp->rmt.is_ip4); - clib_warning ("Accepted session from: %s:%d", ip_str, - clib_net_to_host_u16 (mp->rmt.port)); /* Allocate local session and set it up */ pool_get (em->sessions, session); session_index = session - em->sessions; - if (wait_for_segment_allocation (segment_handle)) - { - clib_warning ("timeout waiting for segment allocation %lu", - segment_handle); - return; - } + if (wait_for_segment_allocation (mp->segment_handle)) + return; + rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); rx_fifo->client_session_index = session_index; tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); @@ -783,7 +890,9 @@ session_accepted_handler (session_accepted_msg_t * mp) session->rx_fifo = rx_fifo; session->tx_fifo = tx_fifo; + session->session_index = session_index; session->vpp_session_handle = mp->handle; + session->start = clib_time_now (&em->clib_time); session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); @@ -792,9 +901,6 @@ session_accepted_handler (session_accepted_msg_t * mp) mp->listener_handle, session_index); hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); - /* - * Send accept reply to vpp - */ app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY); rmp = (session_accepted_reply_msg_t *) app_evt->evt->data; @@ -802,33 +908,33 @@ session_accepted_handler (session_accepted_msg_t * mp) rmp->context = mp->context; app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); - /* TODO : this is very ugly */ - if (mp->rmt.is_ip4 != 255) - return quic_qsession_accepted_handler (mp); DBG ("SSession handle is %lu", mp->handle); - - em->state = STATE_READY; - - /* Stats printing */ - if (pool_elts (em->sessions) && (pool_elts (em->sessions) % 20000) == 0) + if (mp->listener_handle == em->listener_handle) { - f64 now = clib_time_now (&em->clib_time); - fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n", - pool_elts (em->sessions), now - start_time, - (f64) pool_elts (em->sessions) / (now - start_time)); + if (em->cb_vft.quic_accepted_cb) + em->cb_vft.quic_accepted_cb (mp, session_index); + em->n_quic_clients_connected++; + } + else if (em->i_am_master) + { + if (em->cb_vft.server_stream_accepted_cb) + em->cb_vft.server_stream_accepted_cb (mp, session_index); + em->n_clients_connected++; + } + else + { + if (em->cb_vft.client_stream_accepted_cb) + em->cb_vft.client_stream_accepted_cb (mp, session_index); + em->n_clients_connected++; } - session->bytes_received = 0; - session->start = clib_time_now (&em->clib_time); -} - -static void -quic_session_connected_handler (session_connected_msg_t * mp) -{ - echo_main_t *em = &echo_main; - u8 *uri = format (0, "QUIC://session/%lu", mp->handle); - DBG ("QSession Connect : %s", uri); - client_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM); + if (em->n_clients_connected == em->n_clients * em->n_stream_clients) + { + em->state = STATE_READY; + quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED); + } + if (em->n_quic_clients_connected == em->n_clients) + quic_echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED); } static void @@ -838,9 +944,6 @@ session_connected_handler (session_connected_msg_t * mp) echo_session_t *session; u32 session_index; svm_fifo_t *rx_fifo, *tx_fifo; - int rv; - u64 segment_handle; - segment_handle = mp->segment_handle; if (mp->retval) { @@ -850,21 +953,14 @@ session_connected_handler (session_connected_msg_t * mp) return; } - /* - * Setup session - */ - pool_get (em->sessions, session); clib_memset (session, 0, sizeof (*session)); session_index = session - em->sessions; - DBG ("Setting session_index %lu", session_index); + DBG ("CONNECTED session[%lx][0x%lx]", session_index, mp->handle); + + if (wait_for_segment_allocation (mp->segment_handle)) + return; - if (wait_for_segment_allocation (segment_handle)) - { - clib_warning ("timeout waiting for segment allocation %lu", - segment_handle); - return; - } rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); rx_fifo->client_session_index = session_index; tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); @@ -873,17 +969,120 @@ session_connected_handler (session_connected_msg_t * mp) session->rx_fifo = rx_fifo; session->tx_fifo = tx_fifo; session->vpp_session_handle = mp->handle; + session->session_index = session_index; session->start = clib_time_now (&em->clib_time); session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); - DBG ("Connected session handle %lx, idx %lu", mp->handle, session_index); + DBG ("Connected session handle %lx, idx %lu RX[%lx] TX[%lx]", mp->handle, + session_index, rx_fifo, tx_fifo); hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); if (mp->context == QUIC_SESSION_TYPE_QUIC) - return quic_session_connected_handler (mp); + { + if (em->cb_vft.quic_connected_cb) + em->cb_vft.quic_connected_cb (mp, session_index); + em->n_quic_clients_connected++; + } + else if (em->i_am_master) + { + if (em->cb_vft.server_stream_connected_cb) + em->cb_vft.server_stream_connected_cb (mp, session_index); + em->n_clients_connected++; + } + else + { + if (em->cb_vft.client_stream_connected_cb) + em->cb_vft.client_stream_connected_cb (mp, session_index); + em->n_clients_connected++; + } - DBG ("SSession Connected"); + if (em->n_clients_connected == em->n_clients * em->n_stream_clients) + { + em->state = STATE_READY; + quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED); + } + if (em->n_quic_clients_connected == em->n_clients) + quic_echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED); +} + +/* + * + * ECHO Callback definitions + * + */ + + +static void +echo_on_connected_connect (session_connected_msg_t * mp, u32 session_index) +{ + echo_main_t *em = &echo_main; + u8 *uri = format (0, "QUIC://session/%lu", mp->handle); + int i; + + if (!em->first_sconnect_sent) + { + em->first_sconnect_sent = 1; + quic_echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); + } + for (i = 0; i < em->n_stream_clients; i++) + { + DBG ("CONNECT : new QUIC stream #%d: %s", i, uri); + echo_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM); + } + + clib_warning ("session %u (0x%llx) connected with local ip %U port %d", + session_index, mp->handle, format_ip46_address, &mp->lcl.ip, + mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port)); +} + +static void +echo_on_connected_send (session_connected_msg_t * mp, u32 session_index) +{ + echo_main_t *em = &echo_main; + int rv; + echo_session_t *session; + + DBG ("Stream Session Connected"); + + session = pool_elt_at_index (em->sessions, session_index); + session->bytes_to_send = em->bytes_to_send; + session->bytes_to_receive = em->bytes_to_receive; + + /* + * Start RX thread + */ + em->thread_args[em->n_clients_connected] = session_index; + rv = pthread_create (&em->client_thread_handles[em->n_clients_connected], + NULL /*attr */ , client_thread_fn, + (void *) &em->thread_args[em->n_clients_connected]); + if (rv) + { + clib_warning ("pthread_create returned %d", rv); + return; + } +} + +static void +echo_on_connected_error (session_connected_msg_t * mp, u32 session_index) +{ + clib_warning ("Got a wrong connected on session %u [%lx]", session_index, + mp->handle); +} + +static void +echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index) +{ + echo_main_t *em = &echo_main; + int rv; + echo_session_t *session; + + session = pool_elt_at_index (em->sessions, session_index); + session->bytes_to_send = em->bytes_to_send; + session->bytes_to_receive = em->bytes_to_receive; + + DBG ("Stream session accepted 0x%lx, expecting %lu bytes", + session->vpp_session_handle, session->bytes_to_receive); /* * Start RX thread @@ -898,12 +1097,116 @@ session_connected_handler (session_connected_msg_t * mp) return; } - em->n_clients_connected += 1; - clib_warning ("session %u (0x%llx) connected with local ip %U port %d", - session_index, mp->handle, format_ip46_address, &mp->lcl.ip, - mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port)); } +static void +echo_on_accept_connect (session_accepted_msg_t * mp, u32 session_index) +{ + echo_main_t *em = &echo_main; + DBG ("Accept on QSession index %u", mp->handle); + u8 *uri = format (0, "QUIC://session/%lu", mp->handle); + u32 i; + + if (!em->first_sconnect_sent) + { + em->first_sconnect_sent = 1; + quic_echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); + } + for (i = 0; i < em->n_stream_clients; i++) + { + DBG ("ACCEPT : new QUIC stream #%d: %s", i, uri); + echo_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM); + } +} + +static void +echo_on_accept_error (session_accepted_msg_t * mp, u32 session_index) +{ + clib_warning ("Got a wrong accept on session %u [%lx]", session_index, + mp->handle); +} + +static void +echo_on_accept_log_ip (session_accepted_msg_t * mp, u32 session_index) +{ + u8 *ip_str; + ip_str = format (0, "%U", format_ip46_address, &mp->rmt.ip, mp->rmt.is_ip4); + clib_warning ("Accepted session from: %s:%d", ip_str, + clib_net_to_host_u16 (mp->rmt.port)); + +} + +static const quic_echo_cb_vft_t default_cb_vft = { + /* Qsessions */ + .quic_accepted_cb = &echo_on_accept_log_ip, + .quic_connected_cb = &echo_on_connected_connect, + /* client initiated streams */ + .server_stream_accepted_cb = NULL, + .client_stream_connected_cb = &echo_on_connected_send, + /* server initiated streams */ + .client_stream_accepted_cb = &echo_on_accept_error, + .server_stream_connected_cb = &echo_on_connected_error, +}; + +static const quic_echo_cb_vft_t server_stream_cb_vft = { + /* Qsessions */ + .quic_accepted_cb = &echo_on_accept_connect, + .quic_connected_cb = NULL, + /* client initiated streams */ + .server_stream_accepted_cb = &echo_on_accept_error, + .client_stream_connected_cb = &echo_on_connected_error, + /* server initiated streams */ + .client_stream_accepted_cb = &echo_on_accept_recv, + .server_stream_connected_cb = &echo_on_connected_send, +}; + +static uword +echo_unformat_quic_setup_vft (unformat_input_t * input, va_list * args) +{ + echo_main_t *em = &echo_main; + if (unformat (input, "serverstream")) + { + clib_warning ("Using QUIC server initiated streams"); + em->no_return = 1; + em->cb_vft = server_stream_cb_vft; + return 1; + } + else if (unformat (input, "default")) + return 1; + return 0; +} + +static uword +echo_unformat_data (unformat_input_t * input, va_list * args) +{ + u64 _a; + u64 *a = va_arg (*args, u64 *); + if (unformat (input, "%lluGb", &_a)) + { + *a = _a << 30; + return 1; + } + else if (unformat (input, "%lluMb", &_a)) + { + *a = _a << 20; + return 1; + } + else if (unformat (input, "%lluKb", &_a)) + { + *a = _a << 10; + return 1; + } + else if (unformat (input, "%llu", a)) + return 1; + return 0; +} + +/* + * + * End of ECHO callback definitions + * + */ + static void session_disconnected_handler (session_disconnected_msg_t * mp) { @@ -972,7 +1275,7 @@ session_reset_handler (session_reset_msg_t * mp) } static void -handle_mq_event (session_event_t * e) +handle_mq_event (echo_main_t * em, session_event_t * e, int handle_rx) { switch (e->event_type) { @@ -996,120 +1299,70 @@ handle_mq_event (session_event_t * e) DBG ("SESSION_CTRL_EVT_RESET"); session_reset_handler ((session_reset_msg_t *) e->data); break; + case SESSION_IO_EVT_RX: + DBG ("SESSION_IO_EVT_RX"); + if (handle_rx) + echo_handle_rx (em, e); + break; default: - clib_warning ("unhandled %u", e->event_type); + clib_warning ("unhandled event %u", e->event_type); } } -static void +static int clients_run (echo_main_t * em) { - f64 start_time, deltat, timeout = 100.0; svm_msg_q_msg_t msg; session_event_t *e; echo_session_t *s; hash_pair_t *p; int i; - /* Init test data */ - vec_validate (em->connect_test_data, 1024 * 1024 - 1); - for (i = 0; i < vec_len (em->connect_test_data); i++) - em->connect_test_data[i] = i & 0xff; - /* * Attach and connect the clients */ if (application_attach (em)) - return; + return -1; + quic_echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT); for (i = 0; i < em->n_clients; i++) - client_send_connect (em, em->connect_uri, QUIC_SESSION_TYPE_QUIC); + echo_send_connect (em, em->uri, QUIC_SESSION_TYPE_QUIC); - start_time = clib_time_now (&em->clib_time); - while (em->n_clients_connected < em->n_clients - && (clib_time_now (&em->clib_time) - start_time < timeout) - && em->state != STATE_FAILED && em->time_to_stop != 1) + wait_for_state_change (em, STATE_READY, TIMEOUT); + /* + * Wait for client threads to send the data + */ + DBG ("Waiting for data on %u clients", em->n_clients_connected); + while (em->n_clients_connected) { - int rc = svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1); - if (rc == ETIMEDOUT && em->time_to_stop) - break; - if (rc == ETIMEDOUT) + if (svm_msg_q_is_empty (em->our_event_queue)) + continue; + if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1)) continue; e = svm_msg_q_msg_data (em->our_event_queue, &msg); - handle_mq_event (e); + handle_mq_event (em, e, 0 /* handle_rx */ ); svm_msg_q_free_msg (em->our_event_queue, &msg); } - if (em->n_clients_connected != em->n_clients) - { - clib_warning ("failed to initialize all connections"); - return; - } - - /* - * Initialize connections - */ - DBG ("Initialize connections on %u clients", em->n_clients); - - /* *INDENT-OFF* */ - hash_foreach_pair (p, em->session_index_by_vpp_handles, - ({ - s = pool_elt_at_index (em->sessions, p->value[0]); - s->bytes_to_send = em->bytes_to_send; - if (!em->no_return) - s->bytes_to_receive = em->bytes_to_send; - })); - /* *INDENT-ON* */ - em->n_active_clients = em->n_clients_connected; - - /* - * Wait for client threads to send the data - */ - DBG ("Waiting for data on %u clients", em->n_active_clients); - start_time = clib_time_now (&em->clib_time); - em->state = STATE_READY; - while (em->n_active_clients) - if (!svm_msg_q_is_empty (em->our_event_queue)) - { - if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 0)) - { - clib_warning ("svm msg q returned"); - continue; - } - e = svm_msg_q_msg_data (em->our_event_queue, &msg); - if (e->event_type != FIFO_EVENT_APP_RX) - handle_mq_event (e); - svm_msg_q_free_msg (em->our_event_queue, &msg); - } - /* *INDENT-OFF* */ hash_foreach_pair (p, em->session_index_by_vpp_handles, - ({ + ({ s = pool_elt_at_index (em->sessions, p->value[0]); DBG ("Sending disconnect on session %lu", p->key); - client_disconnect (em, s); - })); + client_disconnect_session (em, s); + })); /* *INDENT-ON* */ - /* - * Stats and detach - */ - deltat = clib_time_now (&em->clib_time) - start_time; - fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds\n", - em->tx_total, em->tx_total / (1ULL << 20), - em->tx_total / (1ULL << 30), deltat); - fformat (stdout, "%.4f Gbit/second\n", (em->tx_total * 8.0) / deltat / 1e9); - wait_for_disconnected_sessions (em); application_detach (em); + return 0; } static void vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp) { echo_main_t *em = &echo_main; - if (mp->retval) { clib_warning ("bind failed: %U", format_api_error, @@ -1118,7 +1371,7 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp) return; } - em->state = STATE_READY; + em->state = STATE_LISTEN; } static void @@ -1216,7 +1469,7 @@ format_ip46_address (u8 * s, va_list * args) } static void -server_handle_rx (echo_main_t * em, session_event_t * e) +echo_handle_rx (echo_main_t * em, session_event_t * e) { int n_read, max_dequeue, n_sent; u32 offset, to_dequeue; @@ -1227,7 +1480,6 @@ server_handle_rx (echo_main_t * em, session_event_t * e) * app_recv_stream, we may end up with a lot of unhandled rx events on the * message queue */ svm_fifo_unset_event (s->rx_fifo); - max_dequeue = svm_fifo_max_dequeue (s->rx_fifo); if (PREDICT_FALSE (!max_dequeue)) return; @@ -1240,33 +1492,34 @@ server_handle_rx (echo_main_t * em, session_event_t * e) n_read = app_recv_stream_raw (s->rx_fifo, em->rx_buf, to_dequeue, 0 /* clear evt */ , 0 /* peek */ ); - if (n_read > 0) - { - if (em->test_return_packets) - test_recv_bytes (em, s, em->rx_buf, n_read); - - max_dequeue -= n_read; - s->bytes_received += n_read; - } - else + if (n_read <= 0) break; + DBG ("Notify cause %u bytes", n_read); + notify_rx_data_to_vpp (s); + if (em->test_return_packets) + test_recv_bytes (em, s, em->rx_buf, n_read); + + max_dequeue -= n_read; + s->bytes_received += n_read; + s->bytes_to_receive -= n_read; /* Reflect if a non-drop session */ - if (!em->no_return && n_read > 0) + if (!em->no_return) { offset = 0; do { n_sent = app_send_stream ((app_session_t *) s, - &em->rx_buf[offset], + em->rx_buf + offset, n_read, SVM_Q_WAIT); - if (n_sent > 0) - { - n_read -= n_sent; - offset += n_sent; - } + if (n_sent <= 0) + continue; + n_read -= n_sent; + s->bytes_to_send -= n_sent; + s->bytes_sent += n_sent; + offset += n_sent; } - while ((n_sent <= 0 || n_read > 0) && !em->time_to_stop); + while (n_read > 0); } } while (max_dequeue > 0 && !em->time_to_stop); @@ -1283,29 +1536,15 @@ server_handle_mq (echo_main_t * em) int rc = svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1); if (PREDICT_FALSE (rc == ETIMEDOUT && em->time_to_stop)) break; - if (PREDICT_FALSE (em->time_to_print_stats == 1)) - { - em->time_to_print_stats = 0; - fformat (stdout, "%d connections\n", pool_elts (em->sessions)); - } if (rc == ETIMEDOUT) continue; e = svm_msg_q_msg_data (em->our_event_queue, &msg); - switch (e->event_type) - { - case SESSION_IO_EVT_RX: - DBG ("SESSION_IO_EVT_RX"); - server_handle_rx (em, e); - break; - default: - handle_mq_event (e); - break; - } + handle_mq_event (em, e, em->state == STATE_READY /* handle_rx */ ); svm_msg_q_free_msg (em->our_event_queue, &msg); } } -void +static void server_send_listen (echo_main_t * em) { vl_api_bind_uri_t *bmp; @@ -1319,19 +1558,7 @@ server_send_listen (echo_main_t * em) vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & bmp); } -int -server_listen (echo_main_t * em) -{ - server_send_listen (em); - if (wait_for_state_change (em, STATE_READY)) - { - clib_warning ("timeout waiting for STATE_READY"); - return -1; - } - return 0; -} - -void +static void server_send_unbind (echo_main_t * em) { vl_api_unbind_uri_t *ump; @@ -1345,7 +1572,7 @@ server_send_unbind (echo_main_t * em) vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & ump); } -void +static int server_run (echo_main_t * em) { echo_session_t *session; @@ -1361,21 +1588,21 @@ server_run (echo_main_t * em) pool_put_index (em->sessions, i); if (application_attach (em)) - return; + return -1; /* Bind to uri */ - if (server_listen (em)) - return; + server_send_listen (em); + if (wait_for_state_change (em, STATE_READY, 0)) + return -2; /* Enter handle event loop */ server_handle_mq (em); /* Cleanup */ server_send_unbind (em); - application_detach (em); - fformat (stdout, "Test complete...\n"); + return 0; } static void @@ -1385,25 +1612,16 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * echo_main_t *em = &echo_main; uword *p; DBG ("Got disonnected reply for session handle %lu", mp->handle); - - if (mp->retval) - { - clib_warning ("vpp complained about disconnect: %d", - ntohl (mp->retval)); - return; - } - em->state = STATE_START; p = hash_get (em->session_index_by_vpp_handles, mp->handle); if (p) - { - hash_unset (em->session_index_by_vpp_handles, mp->handle); - } + hash_unset (em->session_index_by_vpp_handles, mp->handle); else - { - clib_warning ("couldn't find session key %llx", mp->handle); - } + clib_warning ("couldn't find session key %llx", mp->handle); + + if (mp->retval) + clib_warning ("vpp complained about disconnect: %d", ntohl (mp->retval)); } static void @@ -1446,40 +1664,44 @@ quic_echo_api_hookup (echo_main_t * em) #undef _ } -int -main (int argc, char **argv) +static void +print_usage_and_exit (void) +{ + fprintf (stderr, + "quic_echo [socket-name SOCKET] [client|server] [uri URI] [OPTIONS]\n" + "\n" + " socket-name PATH Specify the binary socket path to connect to VPP\n" + " use-svm-api Use SVM API to connect to VPP\n" + " test-bytes[:assert] Check data correctness when receiving (assert fails on first error)\n" + " fifo-size N Use N Kb fifos\n" + " appns NAMESPACE Use the namespace NAMESPACE\n" + " all-scope all-scope option\n" + " local-scope local-scope option\n" + " global-scope global-scope option\n" + " secret SECRET set namespace secret\n" + " chroot prefix PATH Use PATH as memory root path\n" + " quic-setup OPT OPT=serverstream : Client open N connections. On each one server opens M streams\n" + " by default : Client open N connections. On each one client opens M streams\n" + "\n" + " no-return Drop the data when received, dont reply\n" + " nclients N[/M] Open N QUIC connections, each one with M streams (M defaults to 1)\n" + " send N[Kb|Mb|GB] Send N [K|M|G]bytes\n" + " recv N[Kb|Mb|GB] Expect N [K|M|G]bytes\n" + " nclients N[/M] Open N QUIC connections, each one with M streams (M defaults to 1)\n"); + exit (1); +} + + +void +quic_echo_process_opts (int argc, char **argv) { - int i_am_server = 1, test_return_packets = 0; echo_main_t *em = &echo_main; - fifo_segment_main_t *sm = &em->segment_main; unformat_input_t _argv, *a = &_argv; + u32 tmp; u8 *chroot_prefix; u8 *uri = 0; - u8 *bind_uri = (u8 *) "quic://0.0.0.0/1234"; - u8 *connect_uri = (u8 *) "quic://6.0.1.1/1234"; - u64 bytes_to_send = 64 << 10, mbytes; - char *app_name; - u32 tmp; - - clib_mem_init_thread_safe (0, 256 << 20); - - clib_memset (em, 0, sizeof (*em)); - em->session_index_by_vpp_handles = hash_create (0, sizeof (uword)); - em->shared_segment_handles = hash_create (0, sizeof (uword)); - clib_spinlock_init (&em->segment_handles_lock); - em->my_pid = getpid (); - em->socket_name = 0; - em->use_sock_api = 1; - em->fifo_size = 64 << 10; - em->n_clients = 1; - em->max_test_msg = 50; - em->quic_streams = 1; - clib_time_init (&em->clib_time); - init_error_string_table (em); - fifo_segment_main_init (sm, HIGH_SEGMENT_BASEVA, 20); unformat_init_command_line (a, argv); - while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT) { if (unformat (a, "chroot prefix %s", &chroot_prefix)) @@ -1487,33 +1709,27 @@ main (int argc, char **argv) vl_set_memory_root_path ((char *) chroot_prefix); } else if (unformat (a, "uri %s", &uri)) - ; + em->uri = format (0, "%s%c", uri, 0); else if (unformat (a, "server")) - i_am_server = 1; + em->i_am_master = 1; else if (unformat (a, "client")) - i_am_server = 0; + em->i_am_master = 0; else if (unformat (a, "no-return")) em->no_return = 1; + else if (unformat (a, "test-bytes:assert")) + em->test_return_packets = RETURN_PACKETS_ASSERT; else if (unformat (a, "test-bytes")) - test_return_packets = 1; - else if (unformat (a, "bytes %lld", &mbytes)) - { - bytes_to_send = mbytes; - } - else if (unformat (a, "mbytes %lld", &mbytes)) - { - bytes_to_send = mbytes << 20; - } - else if (unformat (a, "gbytes %lld", &mbytes)) - { - bytes_to_send = mbytes << 30; - } + em->test_return_packets = RETURN_PACKETS_LOG_WRONG; else if (unformat (a, "socket-name %s", &em->socket_name)) ; else if (unformat (a, "use-svm-api")) em->use_sock_api = 0; else if (unformat (a, "fifo-size %d", &tmp)) em->fifo_size = tmp << 10; + else + if (unformat + (a, "nclients %d/%d", &em->n_clients, &em->n_stream_clients)) + ; else if (unformat (a, "nclients %d", &em->n_clients)) ; else if (unformat (a, "appns %_%v%_", &em->appns_id)) @@ -1527,41 +1743,71 @@ main (int argc, char **argv) em->appns_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE; else if (unformat (a, "secret %lu", &em->appns_secret)) ; - else if (unformat (a, "quic-streams %d", &em->quic_streams)) + else if (unformat (a, "quic-setup %U", echo_unformat_quic_setup_vft)) ; else - { - fformat (stderr, "%s: usage [master|slave]\n", argv[0]); - exit (1); - } + if (unformat (a, "send %U", echo_unformat_data, &em->bytes_to_send)) + ; + else + if (unformat + (a, "recv %U", echo_unformat_data, &em->bytes_to_receive)) + ; + else if (unformat (a, "time %U:%U", + echo_unformat_timing_event, &em->timing_start_event, + echo_unformat_timing_event, &em->timing_end_event)) + ; + else + print_usage_and_exit (); } +} - if (!em->socket_name) - em->socket_name = format (0, "%s%c", API_SOCKET_FILE, 0); - - if (uri) - { - em->uri = format (0, "%s%c", uri, 0); - em->connect_uri = format (0, "%s%c", uri, 0); - } - else - { - em->uri = format (0, "%s%c", bind_uri, 0); - em->connect_uri = format (0, "%s%c", connect_uri, 0); - } +int +main (int argc, char **argv) +{ + echo_main_t *em = &echo_main; + fifo_segment_main_t *sm = &em->segment_main; + char *app_name; + int i, rv; + u32 n_clients; - em->i_am_master = i_am_server; - em->test_return_packets = test_return_packets; - em->bytes_to_send = bytes_to_send; + clib_mem_init_thread_safe (0, 256 << 20); + clib_memset (em, 0, sizeof (*em)); + em->session_index_by_vpp_handles = hash_create (0, sizeof (uword)); + em->shared_segment_handles = hash_create (0, sizeof (uword)); + em->my_pid = getpid (); + em->socket_name = format (0, "%s%c", API_SOCKET_FILE, 0); + em->use_sock_api = 1; + em->fifo_size = 64 << 10; + em->n_clients = 1; + em->n_stream_clients = 1; + em->max_test_msg = 50; em->time_to_stop = 0; + em->i_am_master = 1; + em->test_return_packets = RETURN_PACKETS_NOTEST; + em->timing_start_event = ECHO_EVT_FIRST_QCONNECT; + em->timing_end_event = ECHO_EVT_LAST_BYTE; + em->bytes_to_receive = 64 << 10; + em->bytes_to_send = 64 << 10; + em->uri = format (0, "%s%c", "quic://0.0.0.0/1234", 0); + em->cb_vft = default_cb_vft; + quic_echo_process_opts (argc, argv); + + n_clients = em->n_clients * em->n_stream_clients; + vec_validate (em->client_thread_handles, n_clients - 1); + vec_validate (em->thread_args, n_clients - 1); + clib_time_init (&em->clib_time); + init_error_string_table (em); + fifo_segment_main_init (sm, HIGH_SEGMENT_BASEVA, 20); + clib_spinlock_init (&em->segment_handles_lock); vec_validate (em->rx_buf, 4 << 20); - vec_validate (em->client_thread_handles, em->n_clients - 1); - vec_validate (em->thread_args, em->n_clients - 1); + vec_validate (em->connect_test_data, 1024 * 1024 - 1); + for (i = 0; i < vec_len (em->connect_test_data); i++) + em->connect_test_data[i] = i & 0xff; setup_signal_handlers (); quic_echo_api_hookup (em); - app_name = i_am_server ? "quic_echo_server" : "quic_echo_client"; + app_name = em->i_am_master ? "quic_echo_server" : "quic_echo_client"; if (connect_to_vpp (app_name) < 0) { svm_region_exit (); @@ -1569,14 +1815,19 @@ main (int argc, char **argv) exit (1); } - if (i_am_server == 0) - clients_run (em); + quic_echo_notify_event (em, ECHO_EVT_START); + if (em->i_am_master) + rv = server_run (em); else - server_run (em); + rv = clients_run (em); + if (rv) + exit (rv); + quic_echo_notify_event (em, ECHO_EVT_EXIT); + print_global_stats (em); /* Make sure detach finishes */ - wait_for_state_change (em, STATE_DETACHED); - + if (wait_for_state_change (em, STATE_DETACHED, TIMEOUT)) + exit (-1); disconnect_from_vpp (em); exit (0); } diff --git a/test/test_quic.py b/test/test_quic.py index 21f2fd73091..2bcbcf3387e 100644 --- a/test/test_quic.py +++ b/test/test_quic.py @@ -8,51 +8,54 @@ import signal from framework import VppTestCase, VppTestRunner, running_extended_tests, \ Worker from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath +from threading import Event class QUICAppWorker(Worker): """ QUIC Test Application Worker """ + process = None - def __init__(self, build_dir, appname, args, logger, env={}): + def __init__(self, build_dir, appname, args, logger, env={}, event=None): app = "%s/vpp/bin/%s" % (build_dir, appname) self.args = [app] + args + self.event = event super(QUICAppWorker, self).__init__(self.args, logger, env) + def run(self): + super(QUICAppWorker, self).run() + if self.event: + self.event.set() -class QUICTestCase(VppTestCase): - """ QUIC Test Case """ + def teardown(self, logger, timeout): + if self.process is None: + return False + try: + logger.debug("Killing worker process (pid %d)" % self.process.pid) + os.killpg(os.getpgid(self.process.pid), signal.SIGKILL) + self.join(timeout) + except OSError as e: + logger.debug("Couldn't kill worker process") + return True + return False - @classmethod - def setUpClass(cls): - super(QUICTestCase, cls).setUpClass() - @classmethod - def tearDownClass(cls): - super(QUICTestCase, cls).tearDownClass() +class QUICTestCase(VppTestCase): + """ QUIC Test Case """ def setUp(self): + super(QUICTestCase, self).setUp() var = "VPP_BUILD_DIR" self.build_dir = os.getenv(var, None) if self.build_dir is None: raise Exception("Environment variable `%s' not set" % var) self.vppDebug = 'vpp_debug' in self.build_dir self.timeout = 20 - self.pre_test_sleep = 0.3 - self.post_test_sleep = 0.3 self.vapi.session_enable_disable(is_enabled=1) - - def tearDown(self): - self.vapi.session_enable_disable(is_enabled=0) - - def thru_host_stack_ipv4_setup(self): - super(QUICTestCase, self).setUp() + self.pre_test_sleep = 0.3 + self.post_test_sleep = 0.2 self.create_loopback_interfaces(2) self.uri = "quic://%s/1234" % self.loop0.local_ip4 - common_args = ["uri", self.uri, "fifo-size", "64"] - self.server_echo_test_args = common_args + ["appns", "server"] - self.client_echo_test_args = common_args + ["appns", "client", - "test-bytes"] table_id = 1 for i in self.lo_interfaces: i.admin_up() @@ -84,7 +87,8 @@ class QUICTestCase(VppTestCase): self.ip_t10.add_vpp_config() self.logger.debug(self.vapi.cli("show ip fib")) - def thru_host_stack_ipv4_tear_down(self): + def tearDown(self): + self.vapi.session_enable_disable(is_enabled=0) # Delete inter-table routes self.ip_t01.remove_vpp_config() self.ip_t10.remove_vpp_config() @@ -93,212 +97,123 @@ class QUICTestCase(VppTestCase): i.unconfig_ip4() i.set_table_ip4(0) i.admin_down() + super(QUICTestCase, self).tearDown() - def start_internal_echo_server(self, args): - error = self.vapi.cli("test echo server %s" % ' '.join(args)) + +class QUICEchoInternalTestCase(QUICTestCase): + """QUIC Echo Internal Test Case""" + def setUp(self): + super(QUICEchoInternalTestCase, self).setUp() + self.client_args = "uri %s fifo-size 64 test-bytes appns client" % self.uri + self.server_args = "uri %s fifo-size 64 appns server" % self.uri + + def server(self, *args): + error = self.vapi.cli("test echo server %s %s" % (self.server_args, ' '.join(args))) if error: self.logger.critical(error) self.assertNotIn("failed", error) - def start_internal_echo_client(self, args): - error = self.vapi.cli("test echo client %s" % ' '.join(args)) + def client(self, *args): + error = self.vapi.cli("test echo client %s %s" % (self.client_args, ' '.join(args))) if error: self.logger.critical(error) self.assertNotIn("failed", error) - def internal_ipv4_transfer_test(self, server_args, client_args): - self.start_internal_echo_server(server_args) - self.start_internal_echo_client(client_args) - - def start_external_echo_server(self, args): - self.worker_server = QUICAppWorker(self.build_dir, "quic_echo", - args, self.logger) - self.worker_server.start() - - def start_external_echo_client(self, args): - self.client_echo_test_args += "use-svm-api" - self.worker_client = QUICAppWorker(self.build_dir, "quic_echo", - args, self.logger) - self.worker_client.start() - self.worker_client.join(self.timeout) - try: - self.validateExternalTestResults() - except Exception as error: - self.fail("Failed with %s" % error) - - def external_ipv4_transfer_test(self, server_args, client_args): - self.start_external_echo_server(server_args) - self.sleep(self.pre_test_sleep) - self.start_external_echo_client(client_args) - self.sleep(self.post_test_sleep) - - def validateExternalTestResults(self): - if os.path.isdir('/proc/{}'.format(self.worker_server.process.pid)): - self.logger.info("Killing server worker process (pid %d)" % - self.worker_server.process.pid) - os.killpg(os.getpgid(self.worker_server.process.pid), - signal.SIGTERM) - self.worker_server.join() - self.logger.info("Client worker result is `%s'" % - self.worker_client.result) - error = False - if self.worker_client.result is None: - try: - error = True - self.logger.error( - "Timeout: %ss! Killing client worker process (pid %d)" % - (self.timeout, self.worker_client.process.pid)) - os.killpg(os.getpgid(self.worker_client.process.pid), - signal.SIGKILL) - self.worker_client.join() - except OSError: - self.logger.debug( - "Couldn't kill client worker process") - raise - if error: - raise Exception( - "Timeout! Client worker did not finish in %ss" % self.timeout) - self.assert_equal(self.worker_client.result, 0, - "Binary test return code") - - -class QUICInternalEchoIPv4TestCase(QUICTestCase): - """ QUIC Internal Echo IPv4 Transfer Test Cases """ - - @classmethod - def setUpClass(cls): - super(QUICInternalEchoIPv4TestCase, cls).setUpClass() - - @classmethod - def tearDownClass(cls): - super(QUICInternalEchoIPv4TestCase, cls).tearDownClass() - - def setUp(self): - super(QUICInternalEchoIPv4TestCase, self).setUp() - self.thru_host_stack_ipv4_setup() - - def tearDown(self): - super(QUICInternalEchoIPv4TestCase, self).tearDown() - self.thru_host_stack_ipv4_tear_down() - - def show_commands_at_teardown(self): - self.logger.debug(self.vapi.cli("show session verbose 2")) - +class QUICEchoInternalTransferTestCase(QUICEchoInternalTestCase): + """QUIC Echo Internal Transfer Test Case""" @unittest.skipUnless(running_extended_tests, "part of extended tests") def test_quic_internal_transfer(self): - """ QUIC internal echo client/server transfer """ - - self.internal_ipv4_transfer_test(self.server_echo_test_args, - self.client_echo_test_args + - ["no-output", "mbytes", "10"]) - - -class QUICInternalSerialEchoIPv4TestCase(QUICTestCase): - """ QUIC Internal Serial Echo IPv4 Transfer Test Cases """ - - @classmethod - def setUpClass(cls): - super(QUICInternalSerialEchoIPv4TestCase, cls).setUpClass() - - @classmethod - def tearDownClass(cls): - super(QUICInternalSerialEchoIPv4TestCase, cls).tearDownClass() - - def setUp(self): - super(QUICInternalSerialEchoIPv4TestCase, self).setUp() - self.thru_host_stack_ipv4_setup() - - def tearDown(self): - super(QUICInternalSerialEchoIPv4TestCase, self).tearDown() - self.thru_host_stack_ipv4_tear_down() - - def show_commands_at_teardown(self): - self.logger.debug(self.vapi.cli("show session verbose 2")) + self.server() + self.client("no-output", "mbytes", "10") +class QUICEchoInternalSerialTestCase(QUICEchoInternalTestCase): + """QUIC Echo Internal Serial Transfer Test Case""" @unittest.skipUnless(running_extended_tests, "part of extended tests") def test_quic_serial_internal_transfer(self): - """ QUIC serial internal echo client/server transfer """ - - client_args = (self.client_echo_test_args + - ["no-output", "mbytes", "10"]) - self.internal_ipv4_transfer_test(self.server_echo_test_args, - client_args) - self.start_internal_echo_client(client_args) - self.start_internal_echo_client(client_args) - self.start_internal_echo_client(client_args) - self.start_internal_echo_client(client_args) - - -class QUICInternalEchoIPv4MultiStreamTestCase(QUICTestCase): - """ QUIC Internal Echo IPv4 Transfer Test Cases """ - - @classmethod - def setUpClass(cls): - super(QUICInternalEchoIPv4MultiStreamTestCase, cls).setUpClass() - - @classmethod - def tearDownClass(cls): - super(QUICInternalEchoIPv4MultiStreamTestCase, cls).tearDownClass() - - def setUp(self): - super(QUICInternalEchoIPv4MultiStreamTestCase, self).setUp() - self.thru_host_stack_ipv4_setup() - - def tearDown(self): - super(QUICInternalEchoIPv4MultiStreamTestCase, self).tearDown() - self.thru_host_stack_ipv4_tear_down() - - def show_commands_at_teardown(self): - self.logger.debug(self.vapi.cli("show session verbose 2")) - + self.server() + self.client("no-output", "mbytes", "10") + self.client("no-output", "mbytes", "10") + self.client("no-output", "mbytes", "10") + self.client("no-output", "mbytes", "10") + self.client("no-output", "mbytes", "10") + +class QUICEchoInternalMStreamTestCase(QUICEchoInternalTestCase): + """QUIC Echo Internal MultiStream Test Case""" @unittest.skipUnless(running_extended_tests, "part of extended tests") def test_quic_internal_multistream_transfer(self): - """ QUIC internal echo client/server multi-stream transfer """ + self.server() + self.client("nclients", "10", "mbytes", "1", "no-output") - self.internal_ipv4_transfer_test(self.server_echo_test_args, - self.client_echo_test_args + - ["quic-streams", "10", - "mbytes", "1", - "no-output"]) +class QUICEchoExternalTestCase(QUICTestCase): + extra_vpp_punt_config = ["session", "{", "evt_qs_memfd_seg", "}"] + quic_setup = "default" -class QUICExternalEchoIPv4TestCase(QUICTestCase): - """ QUIC External Echo IPv4 Transfer Test Cases """ + def setUp(self): + super(QUICEchoExternalTestCase, self).setUp() + common_args = ["uri", self.uri, "fifo-size", "64", "test-bytes:assert", "socket-name", self.api_sock] + self.server_echo_test_args = common_args + ["server", "appns", "server", "quic-setup", self.quic_setup] + self.client_echo_test_args = common_args + ["client", "appns", "client", "quic-setup", self.quic_setup] + self.event = Event() + + def server(self, *args): + _args = self.server_echo_test_args + list(args) + self.worker_server = QUICAppWorker(self.build_dir, "quic_echo", + _args, self.logger, event=self.event) + self.worker_server.start() + self.sleep(self.pre_test_sleep) - @classmethod - def setUpConstants(cls): - super(QUICExternalEchoIPv4TestCase, cls).setUpConstants() - cls.vpp_cmdline.extend(["session", "{", "evt_qs_memfd_seg", "}"]) + def client(self, *args): + _args = self.client_echo_test_args + list(args) + # self.client_echo_test_args += "use-svm-api" + self.worker_client = QUICAppWorker(self.build_dir, "quic_echo", + _args, self.logger, event=self.event) + self.worker_client.start() + self.event.wait(self.timeout) + self.sleep(self.post_test_sleep) - @classmethod - def setUpClass(cls): - super(QUICExternalEchoIPv4TestCase, cls).setUpClass() + def validate_external_test_results(self): + self.logger.info("Client worker result is `%s'" % self.worker_client.result) + server_result = self.worker_server.result + client_result = self.worker_client.result + server_kill_error = False + if self.worker_server.result is None: + server_kill_error = self.worker_server.teardown(self.logger, self.timeout) + if self.worker_client.result is None: + self.worker_client.teardown(self.logger, self.timeout) + self.assertIsNone(server_result, "Wrong server worker return code") + self.assertIsNotNone(client_result, "Timeout! Client worker did not finish in %ss" % self.timeout) + self.assertEqual(client_result, 0, "Wrong client worker return code") + self.assertFalse(server_kill_error, "Server kill errored") - @classmethod - def tearDownClass(cls): - super(QUICExternalEchoIPv4TestCase, cls).tearDownClass() - def setUp(self): - super(QUICExternalEchoIPv4TestCase, self).setUp() - self.thru_host_stack_ipv4_setup() +class QUICEchoExternalTransferTestCase(QUICEchoExternalTestCase): + """QUIC Echo External Transfer Test Case""" + @unittest.skipUnless(running_extended_tests, "part of extended tests") + def test_quic_external_transfer(self): + self.server() + self.client() + self.validate_external_test_results() - def tearDown(self): - super(QUICExternalEchoIPv4TestCase, self).tearDown() - self.thru_host_stack_ipv4_tear_down() +class QUICEchoExternalServerStreamTestCase(QUICEchoExternalTestCase): + """QUIC Echo External Transfer Server Stream Test Case""" + quic_setup = "serverstream" + + @unittest.skipUnless(running_extended_tests, "part of extended tests") + def test_quic_external_transfer_server_stream(self): + self.server("nclients", "1/1", "send", "1Kb", "recv", "0") + self.client("nclients" ,"1/1", "send", "0", "recv", "1Kb") + self.validate_external_test_results() - def show_commands_at_teardown(self): - self.logger.debug(self.vapi.cli("show session verbose 2")) +class QUICEchoExternalServerStreamWorkersTestCase(QUICEchoExternalTestCase): + """QUIC Echo External Transfer Server Stream MultiWorker Test Case""" + quic_setup = "serverstream" @unittest.skipUnless(running_extended_tests, "part of extended tests") - def test_quic_external_transfer(self): - """ QUIC external echo client/server transfer """ - - self.external_ipv4_transfer_test(self.server_echo_test_args + - ["socket-name", self.api_sock, - "server"], - self.client_echo_test_args + - ["socket-name", self.api_sock, - "client", "mbytes", "10"]) + def test_quic_external_transfer_server_stream_multi_workers(self): + self.server("nclients", "4/4", "send", "1Kb", "recv", "0") + self.client("nclients", "4/4", "send", "0", "recv", "1Kb") + self.validate_external_test_results() if __name__ == '__main__': -- 2.16.6