/*
* echo_client.c - vpp built-in echo client code
*
- * Copyright (c) 2017 by Cisco and/or its affiliates.
+ * Copyright (c) 2017-2019 by Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
echo_client_main_t echo_client_main;
#define ECHO_CLIENT_DBG (0)
+#define DBG(_fmt, _args...) \
+ if (ECHO_CLIENT_DBG) \
+ clib_warning (_fmt, ##_args)
static void
signal_evt_to_cli_i (int *code)
if (ecm->no_copy)
{
svm_fifo_t *f = s->data.tx_fifo;
- rv = clib_min (svm_fifo_max_enqueue (f), bytes_this_chunk);
+ rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk);
svm_fifo_enqueue_nocopy (f, rv);
- session_send_io_evt_to_thread_custom (f, s->thread_index,
- FIFO_EVENT_APP_TX);
+ session_send_io_evt_to_thread_custom (&f->master_session_index,
+ s->thread_index,
+ SESSION_IO_EVT_TX);
}
else
rv = app_send_stream (&s->data, test_data + test_buf_offset,
session_dgram_hdr_t hdr;
svm_fifo_t *f = s->data.tx_fifo;
app_session_transport_t *at = &s->data.transport;
- u32 max_enqueue = svm_fifo_max_enqueue (f);
+ u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
if (max_enqueue <= sizeof (session_dgram_hdr_t))
return;
clib_memcpy_fast (&hdr.lcl_ip, &at->lcl_ip,
sizeof (ip46_address_t));
hdr.lcl_port = at->lcl_port;
- svm_fifo_enqueue_nowait (f, sizeof (hdr), (u8 *) & hdr);
+ svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
svm_fifo_enqueue_nocopy (f, rv);
- session_send_io_evt_to_thread_custom (f, s->thread_index,
- FIFO_EVENT_APP_TX);
+ session_send_io_evt_to_thread_custom (&f->master_session_index,
+ s->thread_index,
+ SESSION_IO_EVT_TX);
}
else
rv = app_send_dgram (&s->data, test_data + test_buf_offset,
}
else
{
- n_read = svm_fifo_max_dequeue (rx_fifo);
+ n_read = svm_fifo_max_dequeue_cons (rx_fifo);
svm_fifo_dequeue_drop (rx_fifo, n_read);
}
vec_validate (ecm->connection_index_by_thread, vtm->n_vlib_mains);
vec_validate (ecm->connections_this_batch_by_thread, vtm->n_vlib_mains);
+ vec_validate (ecm->quic_session_index_by_thread, vtm->n_vlib_mains);
vec_validate (ecm->vpp_event_queue, vtm->n_vlib_mains);
return 0;
}
+static int
+quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context,
+ session_t * s, u8 is_fail)
+{
+ echo_client_main_t *ecm = &echo_client_main;
+ vnet_connect_args_t *a = 0;
+ int rv;
+ u8 thread_index = vlib_get_thread_index ();
+ session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
+ u32 stream_n;
+
+ DBG ("QUIC Connection handle %d", session_handle (s));
+
+ vec_validate (a, 1);
+ a->uri = (char *) ecm->connect_uri;
+ parse_uri (a->uri, &sep);
+ sep.transport_opts = session_handle (s);
+ sep.port = 0; /* QUIC: create a stream flag */
+
+ for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
+ {
+ clib_memset (a, 0, sizeof (a));
+ a->app_index = ecm->app_index;
+ a->api_context = -1 - api_context;
+ clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
+
+ DBG ("QUIC opening stream %d", stream_n);
+ if ((rv = vnet_connect (a)))
+ {
+ clib_error ("Stream session %d opening failed: %d", stream_n, rv);
+ return -1;
+ }
+ DBG ("QUIC stream %d connected", stream_n);
+ }
+ vec_add1 (ecm->quic_session_index_by_thread[thread_index],
+ session_handle (s));
+ vec_free (a);
+ return 0;
+}
+
+static int
+quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context,
+ session_t * s, u8 is_fail)
+{
+ echo_client_main_t *ecm = &echo_client_main;
+ eclient_session_t *session;
+ u32 session_index;
+ u8 thread_index;
+
+ if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING))
+ return -1;
+
+ if (is_fail)
+ {
+ clib_warning ("connection %d failed!", api_context);
+ ecm->run_test = ECHO_CLIENTS_EXITING;
+ signal_evt_to_cli (-1);
+ return 0;
+ }
+
+ if (!(s->flags & SESSION_F_QUIC_STREAM))
+ return quic_echo_clients_qsession_connected_callback (app_index,
+ api_context, s,
+ is_fail);
+ DBG ("STREAM Connection callback %d", api_context);
+
+ thread_index = s->thread_index;
+ ASSERT (thread_index == vlib_get_thread_index ()
+ || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
+
+ if (!ecm->vpp_event_queue[thread_index])
+ ecm->vpp_event_queue[thread_index] =
+ session_main_get_vpp_event_queue (thread_index);
+
+ /*
+ * Setup session
+ */
+ clib_spinlock_lock_if_init (&ecm->sessions_lock);
+ pool_get (ecm->sessions, session);
+ clib_spinlock_unlock_if_init (&ecm->sessions_lock);
+
+ clib_memset (session, 0, sizeof (*session));
+ session_index = session - ecm->sessions;
+ session->bytes_to_send = ecm->bytes_to_send;
+ session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
+ session->data.rx_fifo = s->rx_fifo;
+ session->data.rx_fifo->client_session_index = session_index;
+ session->data.tx_fifo = s->tx_fifo;
+ session->data.tx_fifo->client_session_index = session_index;
+ session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index];
+ session->vpp_session_handle = session_handle (s);
+
+ if (ecm->is_dgram)
+ {
+ transport_connection_t *tc;
+ tc = session_get_transport (s);
+ clib_memcpy_fast (&session->data.transport, tc,
+ sizeof (session->data.transport));
+ session->data.is_dgram = 1;
+ }
+
+ vec_add1 (ecm->connection_index_by_thread[thread_index], session_index);
+ clib_atomic_fetch_add (&ecm->ready_connections, 1);
+ if (ecm->ready_connections == ecm->expected_connections)
+ {
+ ecm->run_test = ECHO_CLIENTS_RUNNING;
+ /* Signal the CLI process that the action is starting... */
+ signal_evt_to_cli (1);
+ }
+
+ return 0;
+}
+
static int
echo_clients_session_connected_callback (u32 app_index, u32 api_context,
session_t * s, u8 is_fail)
if (!ecm->vpp_event_queue[thread_index])
ecm->vpp_event_queue[thread_index] =
- session_manager_get_vpp_event_queue (thread_index);
+ session_main_get_vpp_event_queue (thread_index);
/*
* Setup session
vnet_disconnect_args_t _a = { 0 }, *a = &_a;
if (s->session_state == SESSION_STATE_READY)
- clib_warning ("Reset active connection %U", format_stream_session, s, 2);
+ clib_warning ("Reset active connection %U", format_session, s, 2);
a->handle = session_handle (s);
a->app_index = ecm->app_index;
sp = pool_elt_at_index (ecm->sessions, s->rx_fifo->client_session_index);
receive_data_chunk (ecm, sp);
- if (svm_fifo_max_dequeue (s->rx_fifo))
+ if (svm_fifo_max_dequeue_cons (s->rx_fifo))
{
if (svm_fifo_set_event (s->rx_fifo))
- session_send_io_evt_to_thread (s->rx_fifo, FIFO_EVENT_BUILTIN_RX);
+ session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
}
return 0;
}
clib_memset (options, 0, sizeof (options));
a->api_client_index = ecm->my_client_index;
+ if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
+ echo_clients.session_connected_callback =
+ quic_echo_clients_session_connected_callback;
a->session_cb_vft = &echo_clients;
prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1;
int i, rv;
clib_memset (a, 0, sizeof (*a));
+
for (i = 0; i < n_clients; i++)
{
a->uri = (char *) ecm->connect_uri;
a->api_context = i;
a->app_index = ecm->app_index;
-
if ((rv = vnet_connect_uri (a)))
return clib_error_return (0, "connect returned: %d", rv);
clib_error_t *error = 0;
u8 *appns_id = 0;
int i;
+ session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
+ int rv;
+ ecm->quic_streams = 1;
ecm->bytes_to_send = 8192;
ecm->no_return = 0;
ecm->fifo_size = 64 << 10;
;
else if (unformat (input, "nclients %d", &n_clients))
;
+ else if (unformat (input, "quic-streams %d", &ecm->quic_streams))
+ ;
else if (unformat (input, "mbytes %lld", &tmp))
ecm->bytes_to_send = tmp << 20;
else if (unformat (input, "gbytes %lld", &tmp))
ecm->ready_connections = 0;
- ecm->expected_connections = n_clients;
+ ecm->expected_connections = n_clients * ecm->quic_streams;
ecm->rx_total = 0;
ecm->tx_total = 0;
ecm->connect_uri = format (0, "%s%c", default_uri, 0);
}
- if (ecm->connect_uri[0] == 'u' && ecm->connect_uri[3] != 'c')
- ecm->is_dgram = 1;
+ if ((rv = parse_uri ((char *) ecm->connect_uri, &sep)))
+ return clib_error_return (0, "Uri parse error: %d", rv);
+ ecm->transport_proto = sep.transport_proto;
+ ecm->is_dgram = (sep.transport_proto == TRANSPORT_PROTO_UDP);
#if ECHO_CLIENT_PTHREAD
echo_clients_start_tx_pthread ();
{
vec_reset_length (ecm->connection_index_by_thread[i]);
vec_reset_length (ecm->connections_this_batch_by_thread[i]);
+ vec_reset_length (ecm->quic_session_index_by_thread[i]);
}
pool_free (ecm->sessions);