X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession-apps%2Fecho_client.c;h=edc6a2a3990695aa5464ea22436e1416244e2661;hb=63b19bf414530a95bce4d4148a375fff60d411d8;hp=cb93e81054cb0e16db2b3e2ad91547e255befe05;hpb=222e1f4160a5828bb2b5bf62716cd76664f6100b;p=vpp.git diff --git a/src/vnet/session-apps/echo_client.c b/src/vnet/session-apps/echo_client.c index cb93e81054c..edc6a2a3990 100644 --- a/src/vnet/session-apps/echo_client.c +++ b/src/vnet/session-apps/echo_client.c @@ -1,7 +1,7 @@ /* * echo_client.c - vpp built-in echo client code * - * Copyright (c) 2017 by Cisco and/or its affiliates. + * Copyright (c) 2017-2019 by Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -23,6 +23,9 @@ echo_client_main_t echo_client_main; #define ECHO_CLIENT_DBG (0) +#define DBG(_fmt, _args...) \ + if (ECHO_CLIENT_DBG) \ + clib_warning (_fmt, ##_args) static void signal_evt_to_cli_i (int *code) @@ -60,10 +63,11 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) if (ecm->no_copy) { svm_fifo_t *f = s->data.tx_fifo; - rv = clib_min (svm_fifo_max_enqueue (f), bytes_this_chunk); + rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk); svm_fifo_enqueue_nocopy (f, rv); - session_send_io_evt_to_thread_custom (f, s->thread_index, - FIFO_EVENT_APP_TX); + session_send_io_evt_to_thread_custom (&f->master_session_index, + s->thread_index, + SESSION_IO_EVT_TX); } else rv = app_send_stream (&s->data, test_data + test_buf_offset, @@ -76,7 +80,7 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) session_dgram_hdr_t hdr; svm_fifo_t *f = s->data.tx_fifo; app_session_transport_t *at = &s->data.transport; - u32 max_enqueue = svm_fifo_max_enqueue (f); + u32 max_enqueue = svm_fifo_max_enqueue_prod (f); if (max_enqueue <= sizeof (session_dgram_hdr_t)) return; @@ -93,10 +97,11 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) clib_memcpy_fast (&hdr.lcl_ip, &at->lcl_ip, sizeof (ip46_address_t)); hdr.lcl_port = at->lcl_port; - svm_fifo_enqueue_nowait (f, sizeof (hdr), (u8 *) & hdr); + svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr); svm_fifo_enqueue_nocopy (f, rv); - session_send_io_evt_to_thread_custom (f, s->thread_index, - FIFO_EVENT_APP_TX); + session_send_io_evt_to_thread_custom (&f->master_session_index, + s->thread_index, + SESSION_IO_EVT_TX); } else rv = app_send_dgram (&s->data, test_data + test_buf_offset, @@ -149,7 +154,7 @@ receive_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) } else { - n_read = svm_fifo_max_dequeue (rx_fifo); + n_read = svm_fifo_max_dequeue_cons (rx_fifo); svm_fifo_dequeue_drop (rx_fifo, n_read); } @@ -349,11 +354,125 @@ echo_clients_init (vlib_main_t * vm) vec_validate (ecm->connection_index_by_thread, vtm->n_vlib_mains); vec_validate (ecm->connections_this_batch_by_thread, vtm->n_vlib_mains); + vec_validate (ecm->quic_session_index_by_thread, vtm->n_vlib_mains); vec_validate (ecm->vpp_event_queue, vtm->n_vlib_mains); return 0; } +static int +quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context, + session_t * s, u8 is_fail) +{ + echo_client_main_t *ecm = &echo_client_main; + vnet_connect_args_t *a = 0; + int rv; + u8 thread_index = vlib_get_thread_index (); + session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL; + u32 stream_n; + + DBG ("QUIC Connection handle %d", session_handle (s)); + + vec_validate (a, 1); + a->uri = (char *) ecm->connect_uri; + parse_uri (a->uri, &sep); + sep.transport_opts = session_handle (s); + sep.port = 0; /* QUIC: create a stream flag */ + + for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++) + { + clib_memset (a, 0, sizeof (a)); + a->app_index = ecm->app_index; + a->api_context = -1 - api_context; + clib_memcpy (&a->sep_ext, &sep, sizeof (sep)); + + DBG ("QUIC opening stream %d", stream_n); + if ((rv = vnet_connect (a))) + { + clib_error ("Stream session %d opening failed: %d", stream_n, rv); + return -1; + } + DBG ("QUIC stream %d connected", stream_n); + } + vec_add1 (ecm->quic_session_index_by_thread[thread_index], + session_handle (s)); + vec_free (a); + return 0; +} + +static int +quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context, + session_t * s, u8 is_fail) +{ + echo_client_main_t *ecm = &echo_client_main; + eclient_session_t *session; + u32 session_index; + u8 thread_index; + + if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING)) + return -1; + + if (is_fail) + { + clib_warning ("connection %d failed!", api_context); + ecm->run_test = ECHO_CLIENTS_EXITING; + signal_evt_to_cli (-1); + return 0; + } + + if (!(s->flags & SESSION_F_QUIC_STREAM)) + return quic_echo_clients_qsession_connected_callback (app_index, + api_context, s, + is_fail); + DBG ("STREAM Connection callback %d", api_context); + + thread_index = s->thread_index; + ASSERT (thread_index == vlib_get_thread_index () + || session_transport_service_type (s) == TRANSPORT_SERVICE_CL); + + if (!ecm->vpp_event_queue[thread_index]) + ecm->vpp_event_queue[thread_index] = + session_main_get_vpp_event_queue (thread_index); + + /* + * Setup session + */ + clib_spinlock_lock_if_init (&ecm->sessions_lock); + pool_get (ecm->sessions, session); + clib_spinlock_unlock_if_init (&ecm->sessions_lock); + + clib_memset (session, 0, sizeof (*session)); + session_index = session - ecm->sessions; + session->bytes_to_send = ecm->bytes_to_send; + session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send; + session->data.rx_fifo = s->rx_fifo; + session->data.rx_fifo->client_session_index = session_index; + session->data.tx_fifo = s->tx_fifo; + session->data.tx_fifo->client_session_index = session_index; + session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index]; + session->vpp_session_handle = session_handle (s); + + if (ecm->is_dgram) + { + transport_connection_t *tc; + tc = session_get_transport (s); + clib_memcpy_fast (&session->data.transport, tc, + sizeof (session->data.transport)); + session->data.is_dgram = 1; + } + + vec_add1 (ecm->connection_index_by_thread[thread_index], session_index); + clib_atomic_fetch_add (&ecm->ready_connections, 1); + if (ecm->ready_connections == ecm->expected_connections) + { + ecm->run_test = ECHO_CLIENTS_RUNNING; + /* Signal the CLI process that the action is starting... */ + signal_evt_to_cli (1); + } + + return 0; +} + static int echo_clients_session_connected_callback (u32 app_index, u32 api_context, session_t * s, u8 is_fail) @@ -380,7 +499,7 @@ echo_clients_session_connected_callback (u32 app_index, u32 api_context, if (!ecm->vpp_event_queue[thread_index]) ecm->vpp_event_queue[thread_index] = - session_manager_get_vpp_event_queue (thread_index); + session_main_get_vpp_event_queue (thread_index); /* * Setup session @@ -428,7 +547,7 @@ echo_clients_session_reset_callback (session_t * s) vnet_disconnect_args_t _a = { 0 }, *a = &_a; if (s->session_state == SESSION_STATE_READY) - clib_warning ("Reset active connection %U", format_stream_session, s, 2); + clib_warning ("Reset active connection %U", format_session, s, 2); a->handle = session_handle (s); a->app_index = ecm->app_index; @@ -478,10 +597,10 @@ echo_clients_rx_callback (session_t * s) sp = pool_elt_at_index (ecm->sessions, s->rx_fifo->client_session_index); receive_data_chunk (ecm, sp); - if (svm_fifo_max_dequeue (s->rx_fifo)) + if (svm_fifo_max_dequeue_cons (s->rx_fifo)) { if (svm_fifo_set_event (s->rx_fifo)) - session_send_io_evt_to_thread (s->rx_fifo, FIFO_EVENT_BUILTIN_RX); + session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX); } return 0; } @@ -517,6 +636,9 @@ echo_clients_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret) clib_memset (options, 0, sizeof (options)); a->api_client_index = ecm->my_client_index; + if (ecm->transport_proto == TRANSPORT_PROTO_QUIC) + echo_clients.session_connected_callback = + quic_echo_clients_session_connected_callback; a->session_cb_vft = &echo_clients; prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1; @@ -595,12 +717,12 @@ echo_clients_connect (vlib_main_t * vm, u32 n_clients) int i, rv; clib_memset (a, 0, sizeof (*a)); + for (i = 0; i < n_clients; i++) { a->uri = (char *) ecm->connect_uri; a->api_context = i; a->app_index = ecm->app_index; - if ((rv = vnet_connect_uri (a))) return clib_error_return (0, "connect returned: %d", rv); @@ -635,7 +757,10 @@ echo_clients_command_fn (vlib_main_t * vm, clib_error_t *error = 0; u8 *appns_id = 0; int i; + session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL; + int rv; + ecm->quic_streams = 1; ecm->bytes_to_send = 8192; ecm->no_return = 0; ecm->fifo_size = 64 << 10; @@ -660,6 +785,8 @@ echo_clients_command_fn (vlib_main_t * vm, ; else if (unformat (input, "nclients %d", &n_clients)) ; + else if (unformat (input, "quic-streams %d", &ecm->quic_streams)) + ; else if (unformat (input, "mbytes %lld", &tmp)) ecm->bytes_to_send = tmp << 20; else if (unformat (input, "gbytes %lld", &tmp)) @@ -726,7 +853,7 @@ echo_clients_command_fn (vlib_main_t * vm, ecm->ready_connections = 0; - ecm->expected_connections = n_clients; + ecm->expected_connections = n_clients * ecm->quic_streams; ecm->rx_total = 0; ecm->tx_total = 0; @@ -736,8 +863,10 @@ echo_clients_command_fn (vlib_main_t * vm, ecm->connect_uri = format (0, "%s%c", default_uri, 0); } - if (ecm->connect_uri[0] == 'u' && ecm->connect_uri[3] != 'c') - ecm->is_dgram = 1; + if ((rv = parse_uri ((char *) ecm->connect_uri, &sep))) + return clib_error_return (0, "Uri parse error: %d", rv); + ecm->transport_proto = sep.transport_proto; + ecm->is_dgram = (sep.transport_proto == TRANSPORT_PROTO_UDP); #if ECHO_CLIENT_PTHREAD echo_clients_start_tx_pthread (); @@ -856,6 +985,7 @@ cleanup: { vec_reset_length (ecm->connection_index_by_thread[i]); vec_reset_length (ecm->connections_this_batch_by_thread[i]); + vec_reset_length (ecm->quic_session_index_by_thread[i]); } pool_free (ecm->sessions);