From 502785b65c40351f62e510a245ccee56084a07f4 Mon Sep 17 00:00:00 2001 From: Aloys Augustin Date: Tue, 9 Apr 2019 11:40:57 +0200 Subject: [PATCH] QUIC: Add multi-stream support to internal test apps Change-Id: Iab07697ef482529e62c11433cffa1f8f894e5bb7 Signed-off-by: Aloys Augustin Signed-off-by: Nathan Skrzypczak --- src/vnet/session-apps/echo_client.c | 122 ++++++++++++++++++++++++++++++- src/vnet/session-apps/echo_client.h | 2 + src/vnet/session-apps/echo_server.c | 44 ++++++++++- src/vnet/session/application_interface.c | 8 ++ src/vnet/session/application_interface.h | 1 + src/vnet/session/session_types.h | 1 + 6 files changed, 171 insertions(+), 7 deletions(-) diff --git a/src/vnet/session-apps/echo_client.c b/src/vnet/session-apps/echo_client.c index 39f464d23ce..8366c56aa6b 100644 --- a/src/vnet/session-apps/echo_client.c +++ b/src/vnet/session-apps/echo_client.c @@ -23,6 +23,9 @@ echo_client_main_t echo_client_main; #define ECHO_CLIENT_DBG (0) +#define DBG(_fmt, _args...) \ + if (ECHO_CLIENT_DBG) \ + clib_warning (_fmt, ##_args) static void signal_evt_to_cli_i (int *code) @@ -351,11 +354,116 @@ echo_clients_init (vlib_main_t * vm) vec_validate (ecm->connection_index_by_thread, vtm->n_vlib_mains); vec_validate (ecm->connections_this_batch_by_thread, vtm->n_vlib_mains); + vec_validate (ecm->quic_session_index_by_thread, vtm->n_vlib_mains); vec_validate (ecm->vpp_event_queue, vtm->n_vlib_mains); return 0; } +static int +quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context, + session_t * s, u8 is_fail) +{ + echo_client_main_t *ecm = &echo_client_main; + vnet_connect_args_t a; + int rv; + u8 thread_index = vlib_get_thread_index (); + session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL; + + DBG ("QUIC Connection handle %d", session_handle (s)); + + a.uri = (char *) ecm->connect_uri; + parse_uri (a.uri, &sep); + sep.transport_opts = session_handle (s); + sep.port = 0; + clib_memset (&a, 0, sizeof (a)); + a.app_index = ecm->app_index; + a.api_context = -1 - api_context; + clib_memcpy (&a.sep_ext, &sep, sizeof (sep)); + + if ((rv = vnet_connect (&a))) + { + clib_error ("Session opening failed: %d", rv); + return -1; + } + vec_add1 (ecm->quic_session_index_by_thread[thread_index], + session_handle (s)); + return 0; +} + +static int +quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context, + session_t * s, u8 is_fail) +{ + echo_client_main_t *ecm = &echo_client_main; + eclient_session_t *session; + u32 session_index; + u8 thread_index; + + if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING)) + return -1; + + if (is_fail) + { + clib_warning ("connection %d failed!", api_context); + ecm->run_test = ECHO_CLIENTS_EXITING; + signal_evt_to_cli (-1); + return 0; + } + + if (!(s->flags & SESSION_F_QUIC_STREAM)) + return quic_echo_clients_qsession_connected_callback (app_index, + api_context, s, + is_fail); + DBG ("STREAM Connection callback %d", api_context); + + thread_index = s->thread_index; + ASSERT (thread_index == vlib_get_thread_index () + || session_transport_service_type (s) == TRANSPORT_SERVICE_CL); + + if (!ecm->vpp_event_queue[thread_index]) + ecm->vpp_event_queue[thread_index] = + session_main_get_vpp_event_queue (thread_index); + + /* + * Setup session + */ + clib_spinlock_lock_if_init (&ecm->sessions_lock); + pool_get (ecm->sessions, session); + clib_spinlock_unlock_if_init (&ecm->sessions_lock); + + clib_memset (session, 0, sizeof (*session)); + session_index = session - ecm->sessions; + session->bytes_to_send = ecm->bytes_to_send; + session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send; + session->data.rx_fifo = s->rx_fifo; + session->data.rx_fifo->client_session_index = session_index; + session->data.tx_fifo = s->tx_fifo; + session->data.tx_fifo->client_session_index = session_index; + session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index]; + session->vpp_session_handle = session_handle (s); + + if (ecm->is_dgram) + { + transport_connection_t *tc; + tc = session_get_transport (s); + clib_memcpy_fast (&session->data.transport, tc, + sizeof (session->data.transport)); + session->data.is_dgram = 1; + } + + vec_add1 (ecm->connection_index_by_thread[thread_index], session_index); + clib_atomic_fetch_add (&ecm->ready_connections, 1); + if (ecm->ready_connections == ecm->expected_connections) + { + ecm->run_test = ECHO_CLIENTS_RUNNING; + /* Signal the CLI process that the action is starting... */ + signal_evt_to_cli (1); + } + + return 0; +} + static int echo_clients_session_connected_callback (u32 app_index, u32 api_context, session_t * s, u8 is_fail) @@ -519,6 +627,9 @@ echo_clients_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret) clib_memset (options, 0, sizeof (options)); a->api_client_index = ecm->my_client_index; + if (ecm->transport_proto == TRANSPORT_PROTO_QUIC) + echo_clients.session_connected_callback = + quic_echo_clients_session_connected_callback; a->session_cb_vft = &echo_clients; prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1; @@ -597,12 +708,12 @@ echo_clients_connect (vlib_main_t * vm, u32 n_clients) int i, rv; clib_memset (a, 0, sizeof (*a)); + for (i = 0; i < n_clients; i++) { a->uri = (char *) ecm->connect_uri; a->api_context = i; a->app_index = ecm->app_index; - if ((rv = vnet_connect_uri (a))) return clib_error_return (0, "connect returned: %d", rv); @@ -637,6 +748,8 @@ echo_clients_command_fn (vlib_main_t * vm, clib_error_t *error = 0; u8 *appns_id = 0; int i; + session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL; + int rv; ecm->bytes_to_send = 8192; ecm->no_return = 0; @@ -738,8 +851,10 @@ echo_clients_command_fn (vlib_main_t * vm, ecm->connect_uri = format (0, "%s%c", default_uri, 0); } - if (ecm->connect_uri[0] == 'u' && ecm->connect_uri[3] != 'c') - ecm->is_dgram = 1; + if ((rv = parse_uri ((char *) ecm->connect_uri, &sep))) + return clib_error_return (0, "Uri parse error: %d", rv); + ecm->transport_proto = sep.transport_proto; + ecm->is_dgram = (sep.transport_proto == TRANSPORT_PROTO_UDP); #if ECHO_CLIENT_PTHREAD echo_clients_start_tx_pthread (); @@ -858,6 +973,7 @@ cleanup: { vec_reset_length (ecm->connection_index_by_thread[i]); vec_reset_length (ecm->connections_this_batch_by_thread[i]); + vec_reset_length (ecm->quic_session_index_by_thread[i]); } pool_free (ecm->sessions); diff --git a/src/vnet/session-apps/echo_client.h b/src/vnet/session-apps/echo_client.h index b183ed7f2c7..81ffcae77dd 100644 --- a/src/vnet/session-apps/echo_client.h +++ b/src/vnet/session-apps/echo_client.h @@ -74,6 +74,7 @@ typedef struct clib_spinlock_t sessions_lock; u8 **rx_buf; /**< intermediate rx buffers */ u8 *connect_test_data; /**< Pre-computed test data */ + u32 **quic_session_index_by_thread; u32 **connection_index_by_thread; u32 **connections_this_batch_by_thread; /**< active connection batch */ pthread_t client_thread_handle; @@ -101,6 +102,7 @@ typedef struct u8 no_output; u8 test_bytes; u8 test_failed; + u8 transport_proto; vlib_main_t *vlib_main; } echo_client_main_t; diff --git a/src/vnet/session-apps/echo_server.c b/src/vnet/session-apps/echo_server.c index 4249ed83292..7459d03202f 100644 --- a/src/vnet/session-apps/echo_server.c +++ b/src/vnet/session-apps/echo_server.c @@ -19,6 +19,11 @@ #include #include +#define ECHO_SERVER_DBG (0) +#define DBG(_fmt, _args...) \ + if (ECHO_SERVER_DBG) \ + clib_warning (_fmt, ##_args) + typedef struct { /* @@ -49,6 +54,7 @@ typedef struct u8 **rx_buf; /**< Per-thread RX buffer */ u64 byte_index; u32 **rx_retries; + u8 transport_proto; vlib_main_t *vlib_main; } echo_server_main_t; @@ -56,10 +62,34 @@ typedef struct echo_server_main_t echo_server_main; int -echo_server_session_accept_callback (session_t * s) +quic_echo_server_qsession_accept_callback (session_t * s) +{ + DBG ("QSession %u accept w/opaque %d", s->session_index, s->opaque); + return 0; +} + +int +quic_echo_server_session_accept_callback (session_t * s) { echo_server_main_t *esm = &echo_server_main; + if (!(s->flags & SESSION_F_QUIC_STREAM)) + return quic_echo_server_qsession_accept_callback (s); + DBG ("SSESSION %u accept w/opaque %d", s->session_index, s->opaque); + + esm->vpp_queue[s->thread_index] = + session_main_get_vpp_event_queue (s->thread_index); + s->session_state = SESSION_STATE_READY; + esm->byte_index = 0; + ASSERT (vec_len (esm->rx_retries) > s->thread_index); + vec_validate (esm->rx_retries[s->thread_index], s->session_index); + esm->rx_retries[s->thread_index][s->session_index] = 0; + return 0; +} +int +echo_server_session_accept_callback (session_t * s) +{ + echo_server_main_t *esm = &echo_server_main; esm->vpp_queue[s->thread_index] = session_main_get_vpp_event_queue (s->thread_index); s->session_state = SESSION_STATE_READY; @@ -304,6 +334,9 @@ echo_server_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret) else echo_server_session_cb_vft.builtin_app_rx_callback = echo_server_rx_callback; + if (esm->transport_proto == TRANSPORT_PROTO_QUIC) + echo_server_session_cb_vft.session_accept_callback = + quic_echo_server_session_accept_callback; if (esm->private_segment_size) segment_size = esm->private_segment_size; @@ -426,6 +459,7 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, u64 tmp, appns_flags = 0, appns_secret = 0; char *default_uri = "tcp://0.0.0.0/1234"; int rv, is_stop = 0; + session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL; esm->no_echo = 0; esm->fifo_size = 64 << 10; @@ -434,7 +468,6 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, esm->private_segment_count = 0; esm->private_segment_size = 0; esm->tls_engine = TLS_ENGINE_OPENSSL; - esm->is_dgram = 0; vec_free (esm->server_uri); while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) @@ -503,8 +536,11 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, clib_warning ("No uri provided! Using default: %s", default_uri); esm->server_uri = (char *) format (0, "%s%c", default_uri, 0); } - if (esm->server_uri[0] == 'u' && esm->server_uri[3] != 'c') - esm->is_dgram = 1; + + if ((rv = parse_uri ((char *) esm->server_uri, &sep))) + return clib_error_return (0, "Uri parse error: %d", rv); + esm->transport_proto = sep.transport_proto; + esm->is_dgram = (sep.transport_proto == TRANSPORT_PROTO_UDP); rv = echo_server_create (vm, appns_id, appns_flags, appns_secret); vec_free (appns_id); diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c index ae00292d918..2bd3ceb2785 100644 --- a/src/vnet/session/application_interface.c +++ b/src/vnet/session/application_interface.c @@ -78,6 +78,14 @@ unformat_vnet_uri (unformat_input_t * input, va_list * args) sep->is_ip4 = 0; return 1; } + else if (unformat (input, "%U://session/%u", unformat_transport_proto, + &transport_proto, &sep->transport_opts)) + { + sep->transport_proto = transport_proto; + sep->is_ip4 = 1; + sep->ip.ip4.as_u32 = 1; /* ip need to be non zero in vnet */ + return 1; + } return 0; } diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index b49744c0b29..f6091c51698 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -211,6 +211,7 @@ typedef enum session_fd_flag_ #undef _ } session_fd_flag_t; +int parse_uri (char *uri, session_endpoint_cfg_t * sep); int vnet_bind_uri (vnet_listen_args_t *); int vnet_unbind_uri (vnet_unlisten_args_t * a); int vnet_connect_uri (vnet_connect_args_t * a); diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index 32a13cf94f9..b3924398977 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -133,6 +133,7 @@ typedef enum session_flags_ { SESSION_F_RX_EVT = 1, SESSION_F_PROXY = (1 << 1), + SESSION_F_QUIC_STREAM = (1 << 2), } session_flags_t; typedef struct session_ -- 2.16.6