#include <signal.h>
#include <vnet/session/application_interface.h>
-#include <svm/svm_fifo_segment.h>
#include <vlibmemory/api.h>
#include <vpp/api/vpe_msg_enum.h>
+#include <svm/fifo_segment.h>
#define vl_typedefs /* define message structures */
#include <vpp/api/vpe_all_api_h.h>
#include <vpp/api/vpe_all_api_h.h>
#undef vl_printfun
-#define TCP_ECHO_DBG 0
-#define DBG(_fmt,_args...) \
- if (TCP_ECHO_DBG) \
- clib_warning (_fmt, _args)
+#define QUIC_ECHO_DBG 0
+#define DBG(_fmt, _args...) \
+ if (QUIC_ECHO_DBG) \
+ clib_warning (_fmt, ##_args)
typedef struct
{
STATE_DETACHED
} connection_state_t;
+enum quic_session_type_t
+{
+ QUIC_SESSION_TYPE_QUIC = 0,
+ QUIC_SESSION_TYPE_STREAM = 1,
+ QUIC_SESSION_TYPE_LISTEN = INT32_MAX,
+};
+
typedef struct
{
/* vpe input queue */
/* Hash table for disconnect processing */
uword *session_index_by_vpp_handles;
+ /* Hash table for shared segment_names */
+ uword *shared_segment_names;
+ clib_spinlock_t segment_names_lock;
+
/* intermediate rx buffer */
u8 *rx_buf;
u8 test_return_packets;
u64 bytes_to_send;
u32 fifo_size;
+ u32 quic_streams;
+ u8 *appns_id;
+ u64 appns_flags;
+ u64 appns_secret;
u32 n_clients;
u64 tx_total;
* vpp. If sock api is used, shm binary api is subsequently bootstrapped
* and all other messages are exchanged using shm IPC. */
u8 use_sock_api;
+ int max_test_msg;
- svm_fifo_segment_main_t segment_main;
+ fifo_segment_main_t segment_main;
} echo_main_t;
echo_main_t echo_main;
static void handle_mq_event (session_event_t * e);
+#if CLIB_DEBUG > 0
+#define TIMEOUT 10.0
+#else
+#define TIMEOUT 10.0
+#endif
+
+static int
+wait_for_segment_allocation (u64 segment_handle)
+{
+ echo_main_t *em = &echo_main;
+ f64 timeout;
+ timeout = clib_time_now (&em->clib_time) + TIMEOUT;
+ uword *segment_present;
+ DBG ("ASKING for %lu", segment_handle);
+ while (clib_time_now (&em->clib_time) < timeout)
+ {
+ clib_spinlock_lock (&em->segment_names_lock);
+ segment_present = hash_get (em->shared_segment_names, segment_handle);
+ clib_spinlock_unlock (&em->segment_names_lock);
+ if (segment_present != 0)
+ return 0;
+ if (em->time_to_stop == 1)
+ return 0;
+ }
+ DBG ("timeout waiting for segment_allocation %lu", segment_handle);
+ return -1;
+}
+
+static int
+wait_for_disconnected_sessions (echo_main_t * em)
+{
+ f64 timeout;
+ timeout = clib_time_now (&em->clib_time) + TIMEOUT;
+ while (clib_time_now (&em->clib_time) < timeout)
+ {
+ if (hash_elts (em->session_index_by_vpp_handles) == 0)
+ return 0;
+ }
+ DBG ("timeout waiting for disconnected_sessions");
+ return -1;
+}
+
static int
wait_for_state_change (echo_main_t * em, connection_state_t state)
{
svm_msg_q_msg_t msg;
session_event_t *e;
f64 timeout;
-
-#if CLIB_DEBUG > 0
-#define TIMEOUT 600.0
-#else
-#define TIMEOUT 600.0
-#endif
-
timeout = clib_time_now (&em->clib_time) + TIMEOUT;
while (clib_time_now (&em->clib_time) < timeout)
bmp->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
bmp->options[APP_OPTIONS_SEGMENT_SIZE] = 256 << 20;
bmp->options[APP_OPTIONS_EVT_QUEUE_SIZE] = 256;
+ if (em->appns_id)
+ {
+ bmp->namespace_id_len = vec_len (em->appns_id);
+ clib_memcpy_fast (bmp->namespace_id, em->appns_id,
+ bmp->namespace_id_len);
+ bmp->options[APP_OPTIONS_FLAGS] |= em->appns_flags;
+ bmp->options[APP_OPTIONS_NAMESPACE_SECRET] = em->appns_secret;
+ }
vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & bmp);
cert_mp = vl_msg_api_alloc (sizeof (*cert_mp) + test_srv_crt_rsa_len);
static int
ssvm_segment_attach (char *name, ssvm_segment_type_t type, int fd)
{
- svm_fifo_segment_create_args_t _a, *a = &_a;
- svm_fifo_segment_main_t *sm = &echo_main.segment_main;
+ fifo_segment_create_args_t _a, *a = &_a;
+ fifo_segment_main_t *sm = &echo_main.segment_main;
int rv;
clib_memset (a, 0, sizeof (*a));
if (type == SSVM_SEGMENT_MEMFD)
a->memfd_fd = fd;
- if ((rv = svm_fifo_segment_attach (sm, a)))
+ if ((rv = fifo_segment_attach (sm, a)))
{
clib_warning ("svm_fifo_segment_attach ('%s') failed", name);
return rv;
}
-
vec_reset_length (a->new_segment_indices);
return 0;
}
echo_main_t *em = &echo_main;
int *fds = 0;
u32 n_fds = 0;
+ u64 segment_handle;
+ segment_handle = clib_net_to_host_u64 (mp->segment_handle);
+ DBG ("Attached returned app %u", htons (mp->app_index));
if (mp->retval)
{
-1))
goto failed;
}
+ DBG ("SETTING for %lu", segment_handle);
+ clib_spinlock_lock (&em->segment_names_lock);
+ hash_set (em->shared_segment_names, segment_handle, 1);
+ clib_spinlock_unlock (&em->segment_names_lock);
em->state = STATE_ATTACHED;
return;
static void
vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
{
- svm_fifo_segment_main_t *sm = &echo_main.segment_main;
- svm_fifo_segment_create_args_t _a, *a = &_a;
+ fifo_segment_main_t *sm = &echo_main.segment_main;
+ fifo_segment_create_args_t _a, *a = &_a;
+ echo_main_t *em = &echo_main;
int rv;
+ int *fds = 0;
+
+ if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
+ {
+ vec_validate (fds, 1);
+ vl_socket_client_recv_fd_msg (fds, 1, 5);
+ if (ssvm_segment_attach
+ ((char *) mp->segment_name, SSVM_SEGMENT_MEMFD, fds[0]))
+ clib_warning
+ ("svm_fifo_segment_attach ('%s') failed on SSVM_SEGMENT_MEMFD",
+ mp->segment_name);
+ DBG ("SETTING for %lu", mp->segment_name);
+ clib_spinlock_lock (&em->segment_names_lock);
+ hash_set (em->shared_segment_names, mp->segment_name, 1);
+ clib_spinlock_unlock (&em->segment_names_lock);
+ vec_free (fds);
+ return;
+ }
clib_memset (a, 0, sizeof (*a));
a->segment_name = (char *) mp->segment_name;
a->segment_size = mp->segment_size;
/* Attach to the segment vpp created */
- rv = svm_fifo_segment_attach (sm, a);
+ rv = fifo_segment_attach (sm, a);
if (rv)
{
clib_warning ("svm_fifo_segment_attach ('%s') failed",
}
clib_warning ("Mapped new segment '%s' size %d", mp->segment_name,
mp->segment_size);
+ clib_spinlock_lock (&em->segment_names_lock);
+ hash_set (em->shared_segment_names, mp->segment_name, 1);
+ clib_spinlock_unlock (&em->segment_names_lock);
}
static void
}
static void
-test_recv_bytes (echo_session_t * s, u8 * rx_buf, u32 n_read)
+test_recv_bytes (echo_main_t * em, echo_session_t * s, u8 * rx_buf,
+ u32 n_read)
{
int i;
for (i = 0; i < n_read; i++)
{
- if (rx_buf[i] != ((s->bytes_received + i) & 0xff))
+ if (rx_buf[i] != ((s->bytes_received + i) & 0xff)
+ && em->max_test_msg > 0)
{
clib_warning ("error at byte %lld, 0x%x not 0x%x",
s->bytes_received + i, rx_buf[i],
((s->bytes_received + i) & 0xff));
+ em->max_test_msg--;
+ if (em->max_test_msg == 0)
+ clib_warning ("Too many errors, hiding next ones");
}
}
}
if (n_read > 0)
{
if (em->test_return_packets)
- test_recv_bytes (s, rx_buf, n_read);
+ test_recv_bytes (em, s, rx_buf, n_read);
n_to_read -= n_read;
s->bytes_received += n_read;
s->bytes_to_receive -= n_read;
+ ASSERT (s->bytes_to_receive >= 0);
}
else
break;
break;
}
- clib_warning ("GOT OUT");
- DBG ("session %d done", session_index);
+ DBG ("session %d done send %lu to do, %lu done || recv %lu to do, %lu done",
+ session_index, s->bytes_to_send, s->bytes_sent, s->bytes_to_receive,
+ s->bytes_received);
em->tx_total += s->bytes_sent;
em->rx_total += s->bytes_received;
em->n_active_clients--;
}
void
-client_send_connect (echo_main_t * em)
+client_send_connect (echo_main_t * em, u8 * uri, u32 opaque)
{
vl_api_connect_uri_t *cmp;
cmp = vl_msg_api_alloc (sizeof (*cmp));
cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
cmp->client_index = em->my_client_index;
- cmp->context = ntohl (0xfeedface);
- memcpy (cmp->uri, em->connect_uri, vec_len (em->connect_uri));
+ cmp->context = ntohl (opaque);
+ memcpy (cmp->uri, uri, vec_len (uri));
vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp);
}
}
clib_warning ("listening on %U:%u", format_ip46_address, mp->lcl_ip,
- mp->lcl_is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6, mp->lcl_port);
+ mp->lcl_is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
+ clib_net_to_host_u16 (mp->lcl_port));
em->state = STATE_READY;
}
+static void
+quic_qsession_accepted_handler (session_accepted_msg_t * mp)
+{
+ DBG ("Accept on QSession index %u", mp->handle);
+}
+
+
static void
session_accepted_handler (session_accepted_msg_t * mp)
{
echo_session_t *session;
static f64 start_time;
u32 session_index;
+ u64 segment_handle;
u8 *ip_str;
+ segment_handle = mp->segment_handle;
+
if (start_time == 0.0)
start_time = clib_time_now (&em->clib_time);
- ip_str = format (0, "%U", format_ip46_address, &mp->ip, mp->is_ip4);
+ ip_str = format (0, "%U", format_ip46_address, &mp->rmt.ip, mp->rmt.is_ip4);
clib_warning ("Accepted session from: %s:%d", ip_str,
- clib_net_to_host_u16 (mp->port));
+ clib_net_to_host_u16 (mp->rmt.port));
/* Allocate local session and set it up */
pool_get (em->sessions, session);
session_index = session - em->sessions;
+ DBG ("Setting session_index %lu", session_index);
+ if (wait_for_segment_allocation (segment_handle))
+ {
+ clib_warning ("timeout waiting for segment allocation %lu",
+ segment_handle);
+ return;
+ }
rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
rx_fifo->client_session_index = session_index;
tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
session->rx_fifo = rx_fifo;
session->tx_fifo = tx_fifo;
+ session->vpp_session_handle = mp->handle;
session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
svm_msg_q_t *);
/* Add it to lookup table */
hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+ /*
+ * Send accept reply to vpp
+ */
+ app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
+ SESSION_CTRL_EVT_ACCEPTED_REPLY);
+ rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
+ rmp->handle = mp->handle;
+ rmp->context = mp->context;
+ app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
+
+ /* TODO : this is very ugly */
+ if (mp->rmt.is_ip4 != 255)
+ return quic_qsession_accepted_handler (mp);
+ DBG ("SSession handle is %lu", mp->handle);
+
em->state = STATE_READY;
/* Stats printing */
(f64) pool_elts (em->sessions) / (now - start_time));
}
- /*
- * Send accept reply to vpp
- */
- app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
- SESSION_CTRL_EVT_ACCEPTED_REPLY);
- rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
- rmp->handle = mp->handle;
- rmp->context = mp->context;
- app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
-
session->bytes_received = 0;
session->start = clib_time_now (&em->clib_time);
}
+static void
+quic_session_connected_handler (session_connected_msg_t * mp)
+{
+ echo_main_t *em = &echo_main;
+ u8 *uri = format (0, "QUIC://session/%lu", mp->handle);
+ DBG ("QSession Connect : %s", uri);
+ client_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM);
+}
+
static void
session_connected_handler (session_connected_msg_t * mp)
{
u32 session_index;
svm_fifo_t *rx_fifo, *tx_fifo;
int rv;
+ u64 segment_handle;
+ segment_handle = mp->segment_handle;
if (mp->retval)
{
pool_get (em->sessions, session);
clib_memset (session, 0, sizeof (*session));
session_index = session - em->sessions;
+ DBG ("Setting session_index %lu", session_index);
+ if (wait_for_segment_allocation (segment_handle))
+ {
+ clib_warning ("timeout waiting for segment allocation %lu",
+ segment_handle);
+ return;
+ }
rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
rx_fifo->client_session_index = session_index;
tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+ if (mp->context == QUIC_SESSION_TYPE_QUIC)
+ return quic_session_connected_handler (mp);
+
+ DBG ("SSession Connected");
+
/*
* Start RX thread
*/
em->n_clients_connected += 1;
clib_warning ("session %u (0x%llx) connected with local ip %U port %d",
- session_index, mp->handle, format_ip46_address, mp->lcl_ip,
- mp->is_ip4, clib_net_to_host_u16 (mp->lcl_port));
+ session_index, mp->handle, format_ip46_address, &mp->lcl.ip,
+ mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port));
}
static void
echo_session_t *session = 0;
uword *p;
int rv = 0;
+ DBG ("Got a SESSION_CTRL_EVT_DISCONNECTED for session %lu", mp->handle);
p = hash_get (em->session_index_by_vpp_handles, mp->handle);
if (!p)
session = pool_elt_at_index (em->sessions, p[0]);
hash_unset (em->session_index_by_vpp_handles, mp->handle);
+
pool_put (em->sessions, session);
app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
switch (e->event_type)
{
case SESSION_CTRL_EVT_BOUND:
+ DBG ("SESSION_CTRL_EVT_BOUND");
session_bound_handler ((session_bound_msg_t *) e->data);
break;
case SESSION_CTRL_EVT_ACCEPTED:
+ DBG ("SESSION_CTRL_EVT_ACCEPTED");
session_accepted_handler ((session_accepted_msg_t *) e->data);
break;
case SESSION_CTRL_EVT_CONNECTED:
+ DBG ("SESSION_CTRL_EVT_CONNECTED");
session_connected_handler ((session_connected_msg_t *) e->data);
break;
case SESSION_CTRL_EVT_DISCONNECTED:
+ DBG ("SESSION_CTRL_EVT_DISCONNECTED");
session_disconnected_handler ((session_disconnected_msg_t *) e->data);
break;
case SESSION_CTRL_EVT_RESET:
+ DBG ("SESSION_CTRL_EVT_RESET");
session_reset_handler ((session_reset_msg_t *) e->data);
break;
default:
svm_msg_q_msg_t msg;
session_event_t *e;
echo_session_t *s;
+ hash_pair_t *p;
int i;
/* Init test data */
return;
for (i = 0; i < em->n_clients; i++)
- client_send_connect (em);
+ client_send_connect (em, em->connect_uri, QUIC_SESSION_TYPE_QUIC);
start_time = clib_time_now (&em->clib_time);
while (em->n_clients_connected < em->n_clients
/*
* Initialize connections
*/
- for (i = 0; i < em->n_clients; i++)
- {
- s = pool_elt_at_index (em->sessions, i);
+ DBG ("Initialize connections on %u clients", em->n_clients);
+
+ /* *INDENT-OFF* */
+ hash_foreach_pair (p, em->session_index_by_vpp_handles,
+ ({
+ s = pool_elt_at_index (em->sessions, p->value[0]);
s->bytes_to_send = em->bytes_to_send;
if (!em->no_return)
s->bytes_to_receive = em->bytes_to_send;
- }
+ }));
+ /* *INDENT-ON* */
em->n_active_clients = em->n_clients_connected;
/*
* Wait for client threads to send the data
*/
+ DBG ("Waiting for data on %u clients", em->n_active_clients);
start_time = clib_time_now (&em->clib_time);
em->state = STATE_READY;
while (em->n_active_clients)
svm_msg_q_free_msg (em->our_event_queue, &msg);
}
- for (i = 0; i < em->n_clients; i++)
- {
- s = pool_elt_at_index (em->sessions, i);
+ /* *INDENT-OFF* */
+ hash_foreach_pair (p, em->session_index_by_vpp_handles,
+ ({
+ s = pool_elt_at_index (em->sessions, p->value[0]);
+ DBG ("Sending disconnect on session %lu", p->key);
client_disconnect (em, s);
- }
+ }));
+ /* *INDENT-ON* */
/*
* Stats and detach
em->tx_total / (1ULL << 30), deltat);
fformat (stdout, "%.4f Gbit/second\n", (em->tx_total * 8.0) / deltat / 1e9);
+ wait_for_disconnected_sessions (em);
application_detach (em);
}
int n_read, max_dequeue, n_sent;
u32 offset, to_dequeue;
echo_session_t *s;
-
s = pool_elt_at_index (em->sessions, e->session_index);
/* Clear event only once. Otherwise, if we do it in the loop by calling
max_dequeue = svm_fifo_max_dequeue (s->rx_fifo);
if (PREDICT_FALSE (!max_dequeue))
return;
-
do
{
/* The options here are to limit ourselves to max_dequeue or read
to_dequeue = clib_min (max_dequeue, vec_len (em->rx_buf));
n_read = app_recv_stream_raw (s->rx_fifo, em->rx_buf, to_dequeue,
0 /* clear evt */ , 0 /* peek */ );
+
if (n_read > 0)
{
+ if (em->test_return_packets)
+ test_recv_bytes (em, s, em->rx_buf, n_read);
+
max_dequeue -= n_read;
s->bytes_received += n_read;
}
if (rc == ETIMEDOUT)
continue;
e = svm_msg_q_msg_data (em->our_event_queue, &msg);
- clib_warning ("Event %d", e->event_type);
switch (e->event_type)
{
- case FIFO_EVENT_APP_RX:
+ case SESSION_IO_EVT_RX:
+ DBG ("SESSION_IO_EVT_RX");
server_handle_rx (em, e);
break;
default:
{
echo_main_t *em = &echo_main;
uword *p;
+ DBG ("Got disonnected reply for session %lu", mp->handle);
if (mp->retval)
{
{
int i_am_server = 1, test_return_packets = 0;
echo_main_t *em = &echo_main;
- svm_fifo_segment_main_t *sm = &em->segment_main;
+ fifo_segment_main_t *sm = &em->segment_main;
unformat_input_t _argv, *a = &_argv;
u8 *chroot_prefix;
u8 *uri = 0;
clib_memset (em, 0, sizeof (*em));
em->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
+ em->shared_segment_names = hash_create (0, sizeof (uword));
+ clib_spinlock_init (&em->segment_names_lock);
em->my_pid = getpid ();
- em->configured_segment_size = 1 << 20;
em->socket_name = 0;
em->use_sock_api = 1;
em->fifo_size = 64 << 10;
em->n_clients = 1;
+ em->max_test_msg = 50;
+ em->quic_streams = 1;
clib_time_init (&em->clib_time);
init_error_string_table (em);
- svm_fifo_segment_main_init (sm, HIGH_SEGMENT_BASEVA, 20);
+ fifo_segment_main_init (sm, HIGH_SEGMENT_BASEVA, 20);
unformat_init_command_line (a, argv);
while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT)
}
else if (unformat (a, "uri %s", &uri))
;
- else if (unformat (a, "segment-size %dM", &tmp))
- em->configured_segment_size = tmp << 20;
- else if (unformat (a, "segment-size %dG", &tmp))
- em->configured_segment_size = tmp << 30;
else if (unformat (a, "server"))
i_am_server = 1;
else if (unformat (a, "client"))
i_am_server = 0;
else if (unformat (a, "no-return"))
em->no_return = 1;
- else if (unformat (a, "test"))
+ else if (unformat (a, "test-bytes"))
test_return_packets = 1;
else if (unformat (a, "bytes %lld", &mbytes))
{
em->fifo_size = tmp << 10;
else if (unformat (a, "nclients %d", &em->n_clients))
;
+ else if (unformat (a, "appns %_%v%_", &em->appns_id))
+ ;
+ else if (unformat (a, "all-scope"))
+ em->appns_flags |= (APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE
+ | APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE);
+ else if (unformat (a, "local-scope"))
+ em->appns_flags = APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
+ else if (unformat (a, "global-scope"))
+ em->appns_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
+ else if (unformat (a, "secret %lu", &em->appns_secret))
+ ;
+ else if (unformat (a, "quic-streams %d", &em->quic_streams))
+ ;
else
{
fformat (stderr, "%s: usage [master|slave]\n", argv[0]);