/*
 * builtin_client.c - vpp built-in tcp client/connect code
 *
 * Copyright (c) 2017 by Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 #include <vnet/vnet.h>
19 #include <vnet/plugin/plugin.h>
20 #include <vnet/tcp/builtin_client.h>
22 #include <vlibapi/api.h>
23 #include <vlibmemory/api.h>
24 #include <vlibsocket/api.h>
25 #include <vpp/app/version.h>
27 /* define message IDs */
28 #include <vpp/api/vpe_msg_enum.h>
30 /* define message structures */
32 #include <vpp/api/vpe_all_api_h.h>
35 /* define generated endian-swappers */
37 #include <vpp/api/vpe_all_api_h.h>
40 /* instantiate all the print functions we know about */
41 #define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__)
43 #include <vpp/api/vpe_all_api_h.h>
46 #define TCP_BUILTIN_CLIENT_DBG (0)
49 signal_evt_to_cli_i (int *code)
51 tclient_main_t *tm = &tclient_main;
52 ASSERT (vlib_get_thread_index () == 0);
53 vlib_process_signal_event (tm->vlib_main, tm->cli_node_index, *code, 0);
57 signal_evt_to_cli (int code)
59 if (vlib_get_thread_index () != 0)
60 vl_api_rpc_call_main_thread (signal_evt_to_cli_i, (u8 *) & code,
63 signal_evt_to_cli_i (&code);
67 send_test_chunk (tclient_main_t * tm, session_t * s)
69 u8 *test_data = tm->connect_test_data;
72 session_fifo_event_t evt;
73 static int serial_number = 0;
77 ASSERT (vec_len (test_data) > 0);
79 test_buf_offset = s->bytes_sent % vec_len (test_data);
80 bytes_this_chunk = vec_len (test_data) - test_buf_offset;
82 bytes_this_chunk = bytes_this_chunk < s->bytes_to_send
83 ? bytes_this_chunk : s->bytes_to_send;
85 txf = s->server_tx_fifo;
86 rv = svm_fifo_enqueue_nowait (txf, bytes_this_chunk,
87 test_data + test_buf_offset);
89 /* If we managed to enqueue data... */
92 /* Account for it... */
93 s->bytes_to_send -= rv;
96 if (TCP_BUILTIN_CLIENT_DBG)
99 ELOG_TYPE_DECLARE (e) =
101 .format = "tx-enq: xfer %d bytes, sent %u remain %u",
102 .format_args = "i4i4i4",
109 ed = ELOG_DATA (&vlib_global_main.elog_main, e);
111 ed->data[1] = s->bytes_sent;
112 ed->data[2] = s->bytes_to_send;
115 /* Poke the session layer */
116 if (svm_fifo_set_event (txf))
118 /* Fabricate TX event, send to vpp */
120 evt.event_type = FIFO_EVENT_APP_TX;
121 evt.event_id = serial_number++;
123 if (unix_shared_memory_queue_add
124 (tm->vpp_event_queue[txf->master_thread_index], (u8 *) & evt,
125 0 /* do wait for mutex */ ))
126 clib_warning ("could not enqueue event");
132 receive_test_chunk (tclient_main_t * tm, session_t * s)
134 svm_fifo_t *rx_fifo = s->server_rx_fifo;
135 int n_read, test_bytes = 0;
136 u32 my_thread_index = vlib_get_thread_index ();
138 /* Allow enqueuing of new event */
139 // svm_fifo_unset_event (rx_fifo);
143 n_read = svm_fifo_dequeue_nowait (rx_fifo,
144 vec_len (tm->rx_buf[my_thread_index]),
145 tm->rx_buf[my_thread_index]);
149 n_read = svm_fifo_max_dequeue (rx_fifo);
150 svm_fifo_dequeue_drop (rx_fifo, n_read);
155 if (TCP_BUILTIN_CLIENT_DBG)
158 ELOG_TYPE_DECLARE (e) =
160 .format = "rx-deq: %d bytes",
168 ed = ELOG_DATA (&vlib_global_main.elog_main, e);
169 ed->data[0] = n_read;
175 for (i = 0; i < n_read; i++)
177 if (tm->rx_buf[my_thread_index][i]
178 != ((s->bytes_received + i) & 0xff))
180 clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
181 n_read, s->bytes_received + i,
182 tm->rx_buf[my_thread_index][i],
183 ((s->bytes_received + i) & 0xff));
187 s->bytes_to_receive -= n_read;
188 s->bytes_received += n_read;
193 builtin_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
194 vlib_frame_t * frame)
196 tclient_main_t *tm = &tclient_main;
197 int my_thread_index = vlib_get_thread_index ();
201 u32 *connection_indices;
202 u32 *connections_this_batch;
203 u32 nconnections_this_batch;
205 connection_indices = tm->connection_index_by_thread[my_thread_index];
206 connections_this_batch =
207 tm->connections_this_batch_by_thread[my_thread_index];
209 if ((tm->run_test == 0) ||
210 ((vec_len (connection_indices) == 0)
211 && vec_len (connections_this_batch) == 0))
214 /* Grab another pile of connections */
215 if (PREDICT_FALSE (vec_len (connections_this_batch) == 0))
217 nconnections_this_batch =
218 clib_min (tm->connections_per_batch, vec_len (connection_indices));
220 ASSERT (nconnections_this_batch > 0);
221 vec_validate (connections_this_batch, nconnections_this_batch - 1);
222 clib_memcpy (connections_this_batch,
223 connection_indices + vec_len (connection_indices)
224 - nconnections_this_batch,
225 nconnections_this_batch * sizeof (u32));
226 _vec_len (connection_indices) -= nconnections_this_batch;
229 if (PREDICT_FALSE (tm->prev_conns != tm->connections_per_batch
230 && tm->prev_conns == vec_len (connections_this_batch)))
233 tm->prev_conns = vec_len (connections_this_batch);
234 if (tm->repeats == 500000)
236 clib_warning ("stuck clients");
241 tm->prev_conns = vec_len (connections_this_batch);
245 for (i = 0; i < vec_len (connections_this_batch); i++)
249 sp = pool_elt_at_index (tm->sessions, connections_this_batch[i]);
251 if (sp->bytes_to_send > 0)
253 send_test_chunk (tm, sp);
256 if (sp->bytes_to_receive > 0)
258 receive_test_chunk (tm, sp);
261 if (PREDICT_FALSE (delete_session == 1))
263 u32 index, thread_index;
266 __sync_fetch_and_add (&tm->tx_total, sp->bytes_sent);
267 __sync_fetch_and_add (&tm->rx_total, sp->bytes_received);
269 stream_session_parse_handle (sp->vpp_session_handle,
270 &index, &thread_index);
271 s = stream_session_get_if_valid (index, thread_index);
275 vnet_disconnect_args_t _a, *a = &_a;
276 a->handle = stream_session_handle (s);
277 a->app_index = tm->app_index;
278 vnet_disconnect_session (a);
280 vec_delete (connections_this_batch, 1, i);
282 __sync_fetch_and_add (&tm->ready_connections, -1);
285 clib_warning ("session AWOL?");
287 /* Kick the debug CLI process */
288 if (tm->ready_connections == 0)
290 signal_evt_to_cli (2);
295 tm->connection_index_by_thread[my_thread_index] = connection_indices;
296 tm->connections_this_batch_by_thread[my_thread_index] =
297 connections_this_batch;
302 VLIB_REGISTER_NODE (builtin_client_node) =
304 .function = builtin_client_node_fn,
305 .name = "builtin-tcp-client",
306 .type = VLIB_NODE_TYPE_INPUT,
307 .state = VLIB_NODE_STATE_DISABLED,
311 /* So we don't get "no handler for... " msgs */
313 vl_api_memclnt_create_reply_t_handler (vl_api_memclnt_create_reply_t * mp)
315 vlib_main_t *vm = vlib_get_main ();
316 tclient_main_t *tm = &tclient_main;
317 tm->my_client_index = mp->index;
318 vlib_process_signal_event (vm, tm->cli_node_index, 1 /* evt */ ,
323 create_api_loopback (tclient_main_t * tm)
325 vlib_main_t *vm = vlib_get_main ();
326 vl_api_memclnt_create_t _m, *mp = &_m;
327 extern void vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t *);
328 api_main_t *am = &api_main;
329 vl_shmem_hdr_t *shmem_hdr;
330 uword *event_data = 0, event_type;
334 * Create a "loopback" API client connection
335 * Don't do things like this unless you know what you're doing...
338 shmem_hdr = am->shmem_hdr;
339 tm->vl_input_queue = shmem_hdr->vl_input_queue;
340 memset (mp, 0, sizeof (*mp));
341 mp->_vl_msg_id = VL_API_MEMCLNT_CREATE;
342 mp->context = 0xFEEDFACE;
343 mp->input_queue = pointer_to_uword (tm->vl_input_queue);
344 strncpy ((char *) mp->name, "tcp_clients_tester", sizeof (mp->name) - 1);
346 vl_api_memclnt_create_t_handler (mp);
349 vlib_process_wait_for_event_or_clock (vm, 1.0);
350 event_type = vlib_process_get_events (vm, &event_data);
360 clib_warning ("unknown event_type %d", event_type);
367 #define foreach_tclient_static_api_msg \
368 _(MEMCLNT_CREATE_REPLY, memclnt_create_reply) \
370 static clib_error_t *
371 tclient_api_hookup (vlib_main_t * vm)
373 vl_msg_api_msg_config_t _c, *c = &_c;
375 /* Hook up client-side static APIs to our handlers */
376 #define _(N,n) do { \
377 c->id = VL_API_##N; \
379 c->handler = vl_api_##n##_t_handler; \
380 c->cleanup = vl_noop_handler; \
381 c->endian = vl_api_##n##_t_endian; \
382 c->print = vl_api_##n##_t_print; \
383 c->size = sizeof(vl_api_##n##_t); \
384 c->traced = 1; /* trace, so these msgs print */ \
385 c->replay = 0; /* don't replay client create/delete msgs */ \
386 c->message_bounce = 0; /* don't bounce this message */ \
387 vl_msg_api_config(c);} while (0);
389 foreach_tclient_static_api_msg;
396 tcp_test_clients_init (vlib_main_t * vm)
398 tclient_main_t *tm = &tclient_main;
399 vlib_thread_main_t *vtm = vlib_get_thread_main ();
403 tclient_api_hookup (vm);
404 if (create_api_loopback (tm))
407 num_threads = 1 /* main thread */ + vtm->n_threads;
409 /* Init test data. Big buffer */
410 vec_validate (tm->connect_test_data, 1024 * 1024 - 1);
411 for (i = 0; i < vec_len (tm->connect_test_data); i++)
412 tm->connect_test_data[i] = i & 0xff;
414 vec_validate (tm->rx_buf, num_threads - 1);
415 for (i = 0; i < num_threads; i++)
416 vec_validate (tm->rx_buf[i], vec_len (tm->connect_test_data) - 1);
420 vec_validate (tm->connection_index_by_thread, vtm->n_vlib_mains);
421 vec_validate (tm->connections_this_batch_by_thread, vtm->n_vlib_mains);
422 vec_validate (tm->vpp_event_queue, vtm->n_vlib_mains);
428 builtin_session_connected_callback (u32 app_index, u32 api_context,
429 stream_session_t * s, u8 is_fail)
431 tclient_main_t *tm = &tclient_main;
434 u8 thread_index = vlib_get_thread_index ();
436 ASSERT (s->thread_index == thread_index);
440 clib_warning ("connection %d failed!", api_context);
441 signal_evt_to_cli (-1);
445 if (!tm->vpp_event_queue[thread_index])
446 tm->vpp_event_queue[thread_index] =
447 session_manager_get_vpp_event_queue (thread_index);
452 clib_spinlock_lock_if_init (&tm->sessions_lock);
453 pool_get (tm->sessions, session);
454 clib_spinlock_unlock_if_init (&tm->sessions_lock);
456 memset (session, 0, sizeof (*session));
457 session_index = session - tm->sessions;
458 session->bytes_to_send = tm->bytes_to_send;
459 session->bytes_to_receive = tm->no_return ? 0ULL : tm->bytes_to_send;
460 session->server_rx_fifo = s->server_rx_fifo;
461 session->server_rx_fifo->client_session_index = session_index;
462 session->server_tx_fifo = s->server_tx_fifo;
463 session->server_tx_fifo->client_session_index = session_index;
464 session->vpp_session_handle = stream_session_handle (s);
466 vec_add1 (tm->connection_index_by_thread[thread_index], session_index);
467 __sync_fetch_and_add (&tm->ready_connections, 1);
468 if (tm->ready_connections == tm->expected_connections)
471 /* Signal the CLI process that the action is starting... */
472 signal_evt_to_cli (1);
479 builtin_session_reset_callback (stream_session_t * s)
481 if (s->session_state == SESSION_STATE_READY)
482 clib_warning ("Reset active connection %U", format_stream_session, s, 2);
483 stream_session_cleanup (s);
488 builtin_session_create_callback (stream_session_t * s)
494 builtin_session_disconnect_callback (stream_session_t * s)
496 tclient_main_t *tm = &tclient_main;
497 vnet_disconnect_args_t _a, *a = &_a;
498 a->handle = stream_session_handle (s);
499 a->app_index = tm->app_index;
500 vnet_disconnect_session (a);
505 builtin_server_rx_callback (stream_session_t * s)
511 static session_cb_vft_t builtin_clients = {
512 .session_reset_callback = builtin_session_reset_callback,
513 .session_connected_callback = builtin_session_connected_callback,
514 .session_accept_callback = builtin_session_create_callback,
515 .session_disconnect_callback = builtin_session_disconnect_callback,
516 .builtin_server_rx_callback = builtin_server_rx_callback
521 attach_builtin_test_clients_app (void)
523 tclient_main_t *tm = &tclient_main;
524 vnet_app_attach_args_t _a, *a = &_a;
525 u8 segment_name[128];
526 u32 segment_name_length, prealloc_fifos;
529 segment_name_length = ARRAY_LEN (segment_name);
531 memset (a, 0, sizeof (*a));
532 memset (options, 0, sizeof (options));
534 a->api_client_index = tm->my_client_index;
535 a->segment_name = segment_name;
536 a->segment_name_length = segment_name_length;
537 a->session_cb_vft = &builtin_clients;
539 prealloc_fifos = tm->prealloc_fifos ? tm->expected_connections : 1;
541 options[SESSION_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
542 options[SESSION_OPTIONS_SEGMENT_SIZE] = (2ULL << 32);
543 options[SESSION_OPTIONS_RX_FIFO_SIZE] = tm->fifo_size;
544 options[SESSION_OPTIONS_TX_FIFO_SIZE] = tm->fifo_size;
545 options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = tm->private_segment_count;
546 options[APP_OPTIONS_PRIVATE_SEGMENT_SIZE] = tm->private_segment_size;
547 options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = prealloc_fifos;
549 options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP;
551 a->options = options;
553 if (vnet_application_attach (a))
556 tm->app_index = a->app_index;
/**
 * Entry point of the optional transmit pthread.
 *
 * NOTE(review): the return type and body were absent from the damaged
 * listing; the pthread start-routine signature and a bare "return 0" are
 * restored -- confirm against the upstream file.
 *
 * @param arg unused
 * @return always 0
 */
static void *
tclient_thread_fn (void *arg)
{
  return 0;
}

566 /** Start a transmit thread */
568 start_tx_pthread (tclient_main_t * tm)
570 if (tm->client_thread_handle == 0)
572 int rv = pthread_create (&tm->client_thread_handle,
574 tclient_thread_fn, 0);
577 tm->client_thread_handle = 0;
585 clients_connect (vlib_main_t * vm, u8 * uri, u32 n_clients)
587 tclient_main_t *tm = &tclient_main;
588 vnet_connect_args_t _a, *a = &_a;
590 for (i = 0; i < n_clients; i++)
592 memset (a, 0, sizeof (*a));
594 a->uri = (char *) uri;
596 a->app_index = tm->app_index;
598 vnet_connect_uri (a);
600 /* Crude pacing for call setups */
602 vlib_process_suspend (vm, 10e-6);
606 static clib_error_t *
607 test_tcp_clients_command_fn (vlib_main_t * vm,
608 unformat_input_t * input,
609 vlib_cli_command_t * cmd)
611 tclient_main_t *tm = &tclient_main;
612 vlib_thread_main_t *thread_main = vlib_get_thread_main ();
613 uword *event_data = 0, event_type;
614 u8 *default_connect_uri = (u8 *) "tcp://6.0.1.1/1234", *uri;
615 u64 tmp, total_bytes;
616 f64 test_timeout = 20.0, syn_timeout = 20.0, delta;
617 f64 time_before_connects;
619 int preallocate_sessions = 0;
623 tm->bytes_to_send = 8192;
625 tm->fifo_size = 64 << 10;
626 tm->connections_per_batch = 1000;
627 tm->private_segment_count = 0;
628 tm->private_segment_size = 0;
630 if (thread_main->n_vlib_mains > 1)
631 clib_spinlock_init (&tm->sessions_lock);
632 vec_free (tm->connect_uri);
634 while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
636 if (unformat (input, "nclients %d", &n_clients))
638 else if (unformat (input, "mbytes %lld", &tmp))
639 tm->bytes_to_send = tmp << 20;
640 else if (unformat (input, "gbytes %lld", &tmp))
641 tm->bytes_to_send = tmp << 30;
642 else if (unformat (input, "bytes %lld", &tm->bytes_to_send))
644 else if (unformat (input, "uri %s", &tm->connect_uri))
646 else if (unformat (input, "test-timeout %f", &test_timeout))
648 else if (unformat (input, "syn-timeout %f", &syn_timeout))
650 else if (unformat (input, "no-return"))
652 else if (unformat (input, "fifo-size %d", &tm->fifo_size))
653 tm->fifo_size <<= 10;
654 else if (unformat (input, "private-segment-count %d",
655 &tm->private_segment_count))
657 else if (unformat (input, "private-segment-size %dm", &tmp))
658 tm->private_segment_size = tmp << 20;
659 else if (unformat (input, "private-segment-size %dg", &tmp))
660 tm->private_segment_size = tmp << 30;
661 else if (unformat (input, "private-segment-size %d", &tmp))
662 tm->private_segment_size = tmp;
663 else if (unformat (input, "preallocate-fifos"))
664 tm->prealloc_fifos = 1;
665 else if (unformat (input, "preallocate-sessions"))
666 preallocate_sessions = 1;
668 if (unformat (input, "client-batch %d", &tm->connections_per_batch))
671 return clib_error_return (0, "unknown input `%U'",
672 format_unformat_error, input);
675 /* Store cli process node index for signalling */
676 tm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index;
678 if (tm->is_init == 0)
680 if (tcp_test_clients_init (vm))
681 return clib_error_return (0, "failed init");
685 tm->ready_connections = 0;
686 tm->expected_connections = n_clients;
690 uri = default_connect_uri;
692 uri = tm->connect_uri;
694 #if TCP_BUILTIN_CLIENT_PTHREAD
698 vlib_worker_thread_barrier_sync (vm);
699 vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
700 vlib_worker_thread_barrier_release (vm);
702 if (tm->test_client_attached == 0)
704 if (attach_builtin_test_clients_app ())
706 return clib_error_return (0, "app attach failed");
709 tm->test_client_attached = 1;
711 /* Turn on the builtin client input nodes */
712 for (i = 0; i < thread_main->n_vlib_mains; i++)
713 vlib_node_set_state (vlib_mains[i], builtin_client_node.index,
714 VLIB_NODE_STATE_POLLING);
716 if (preallocate_sessions)
718 session_t *sp __attribute__ ((unused));
719 for (i = 0; i < n_clients; i++)
720 pool_get (tm->sessions, sp);
721 for (i = 0; i < n_clients; i++)
722 pool_put_index (tm->sessions, i);
725 /* Fire off connect requests */
726 time_before_connects = vlib_time_now (vm);
727 clients_connect (vm, uri, n_clients);
729 /* Park until the sessions come up, or ten seconds elapse... */
730 vlib_process_wait_for_event_or_clock (vm, syn_timeout);
731 event_type = vlib_process_get_events (vm, &event_data);
735 vlib_cli_output (vm, "Timeout with only %d sessions active...",
736 tm->ready_connections);
740 delta = vlib_time_now (vm) - time_before_connects;
745 (vm, "%d three-way handshakes in %.2f seconds, %.2f/sec",
746 n_clients, delta, ((f64) n_clients) / delta);
749 tm->test_start_time = vlib_time_now (tm->vlib_main);
750 vlib_cli_output (vm, "Test started at %.6f", tm->test_start_time);
754 vlib_cli_output (vm, "unexpected event(1): %d", event_type);
758 /* Now wait for the sessions to finish... */
759 vlib_process_wait_for_event_or_clock (vm, test_timeout);
760 event_type = vlib_process_get_events (vm, &event_data);
764 vlib_cli_output (vm, "Timeout with %d sessions still active...",
765 tm->ready_connections);
769 tm->test_end_time = vlib_time_now (vm);
770 vlib_cli_output (vm, "Test finished at %.6f", tm->test_end_time);
774 vlib_cli_output (vm, "unexpected event(2): %d", event_type);
778 delta = tm->test_end_time - tm->test_start_time;
782 total_bytes = (tm->no_return ? tm->tx_total : tm->rx_total);
783 transfer_type = tm->no_return ? "half-duplex" : "full-duplex";
785 "%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds",
786 total_bytes, total_bytes / (1ULL << 20),
787 total_bytes / (1ULL << 30), delta);
788 vlib_cli_output (vm, "%.2f bytes/second %s",
789 ((f64) total_bytes) / (delta), transfer_type);
790 vlib_cli_output (vm, "%.4f gbit/second %s",
791 (((f64) total_bytes * 8.0) / delta / 1e9),
795 vlib_cli_output (vm, "zero delta-t?");
799 for (i = 0; i < vec_len (tm->connection_index_by_thread); i++)
801 vec_reset_length (tm->connection_index_by_thread[i]);
802 vec_reset_length (tm->connections_this_batch_by_thread[i]);
805 pool_free (tm->sessions);
811 VLIB_CLI_COMMAND (test_clients_command, static) =
813 .path = "test tcp clients",
814 .short_help = "test tcp clients [nclients %d]"
815 "[iterations %d] [bytes %d] [uri tcp://6.0.1.1/1234]",
816 .function = test_tcp_clients_command_fn,
822 tcp_test_clients_main_init (vlib_main_t * vm)
824 tclient_main_t *tm = &tclient_main;
829 VLIB_INIT_FUNCTION (tcp_test_clients_main_init);
/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */