2 * Copyright (c) 2016 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include <vlib/vlib.h>
19 #include <vnet/vnet.h>
20 #include <svm/svm_fifo_segment.h>
21 #include <vlibmemory/api.h>
22 #include <vpp/api/vpe_msg_enum.h>
24 #include "../vnet/session/application_interface.h"
26 #define vl_typedefs /* define message structures */
27 #include <vpp/api/vpe_all_api_h.h>
30 /* declare message handlers for each api */
32 #define vl_endianfun /* define endian-swap functions */
33 #include <vpp/api/vpe_all_api_h.h>
36 /* instantiate all the print functions we know about */
37 #define vl_print(handle, ...)
39 #include <vpp/api/vpe_all_api_h.h>
42 /* Satisfy external references when not linking with -lvlib */
/* Local definitions of the vlib globals so the test links without -lvlib. */
43 vlib_main_t vlib_global_main;
44 vlib_main_t **vlib_mains;
48 svm_fifo_t * server_rx_fifo;
49 svm_fifo_t * server_tx_fifo;
51 u32 vpp_session_index;
52 u32 vpp_session_thread;
66 unix_shared_memory_queue_t *vl_input_queue;
68 /* API client handle */
71 /* The URI we're playing with */
77 /* Hash table for disconnect processing */
78 uword * session_index_by_vpp_handles;
80 /* intermediate rx buffer */
83 /* URI for slave's connect */
86 u32 connected_session_index;
90 /* drop all packets */
94 unix_shared_memory_queue_t * our_event_queue;
96 /* $$$ single thread only for the moment */
97 unix_shared_memory_queue_t * vpp_event_queue;
101 /* For deadman timers */
102 clib_time_t clib_time;
104 /* State of the connection, shared between msg RX thread and main thread */
105 volatile connection_state_t state;
107 /* Signal variables */
108 volatile int time_to_stop;
109 volatile int time_to_print_stats;
111 u32 configured_segment_size;
113 /* VNET_API_ERROR_FOO -> "Foo" hash table */
114 uword * error_string_by_error_number;
117 svm_fifo_segment_main_t * segment_main;
119 u8 *connect_test_data;
120 } uri_tcp_test_main_t;
/* Single global instance of the test state; the handlers in this file reach it via &uri_tcp_test_main. */
122 uri_tcp_test_main_t uri_tcp_test_main;
/* NOTE(review): NITER is not referenced in the visible part of the file -- confirm it is still needed. */
127 #define NITER 4000000
/*
 * Poll utm->state until it equals `state`, until it becomes STATE_FAILED,
 * or until TIMEOUT seconds (measured with clib_time_now) have elapsed.
 * The return statements are not visible in this excerpt; callers in this
 * file treat a non-zero result as failure.
 */
131 wait_for_state_change (uri_tcp_test_main_t * utm, connection_state_t state)
134 #define TIMEOUT 600.0
136 #define TIMEOUT 600.0
139 f64 timeout = clib_time_now (&utm->clib_time) + TIMEOUT;
141 while (clib_time_now (&utm->clib_time) < timeout)
143 if (utm->state == state)
145 if (utm->state == STATE_FAILED)
/* Report the state that was actually awaited: callers also wait for STATE_START, not only STATE_READY. */
148 clib_warning ("timeout waiting for state %d", (int) state);
/*
 * Build utm->error_string_by_error_number: one entry per item expanded by
 * foreach_vnet_api_error, keyed by the negated value (-v) and mapping to the
 * item's string, plus an extra entry 99 -> "Misc".
 */
153 init_error_string_table (uri_tcp_test_main_t * utm)
155 utm->error_string_by_error_number = hash_create (0, sizeof (uword));
157 #define _(n,v,s) hash_set (utm->error_string_by_error_number, -v, s);
158 foreach_vnet_api_error;
161 hash_set (utm->error_string_by_error_number, 99, "Misc");
/* Signal handler: sets time_to_stop on the global test state (checked by handle_event_queue). */
165 stop_signal (int signum)
167 uri_tcp_test_main_t *um = &uri_tcp_test_main;
169 um->time_to_stop = 1;
/* Signal handler: sets time_to_print_stats on the global test state (checked by handle_event_queue). */
173 stats_signal (int signum)
175 uri_tcp_test_main_t *um = &uri_tcp_test_main;
177 um->time_to_print_stats = 1;
/*
 * Install the process signal handlers: SIGINT requests a stats print,
 * SIGQUIT and SIGTERM request a stop.
 * NOTE(review): SIGINT is wired to stats_signal rather than stop_signal, so
 * Ctrl-C does not stop the test -- confirm this is intended.
 * NOTE(review): the results of signal() are not checked in the visible code.
 */
180 static clib_error_t *
181 setup_signal_handlers (void)
183 signal (SIGINT, stats_signal);
184 signal (SIGQUIT, stop_signal);
185 signal (SIGTERM, stop_signal);
/*
 * Local stand-in for vlib_cli_output; the visible body only logs "BUG",
 * i.e. it is not expected to be called in this program.
 */
191 vlib_cli_output (struct vlib_main_t *vm, char *fmt, ...)
193 clib_warning ("BUG");
/*
 * Connect to the "/vpe-api" shared-memory API under client name `name`, then
 * cache the API input queue and this client's index in the test state.
 * A negative result from vl_client_connect_to_vlib is the failure case; the
 * statements executed on failure and the return values are not visible here.
 */
197 connect_to_vpp (char *name)
199 uri_tcp_test_main_t *utm = &uri_tcp_test_main;
200 api_main_t *am = &api_main;
/* NOTE(review): 32 is presumably the client's input queue depth -- confirm against vl_client_connect_to_vlib. */
202 if (vl_client_connect_to_vlib ("/vpe-api", name, 32) < 0)
205 utm->vl_input_queue = am->shmem_hdr->vl_input_queue;
206 utm->my_client_index = am->my_client_index;
/*
 * MAP_ANOTHER_SEGMENT handler: attach to the additional fifo segment named in
 * the message (name and size are taken from mp) and log the outcome.
 */
212 vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t *mp)
214 svm_fifo_segment_create_args_t _a, *a = &_a;
217 a->segment_name = (char *) mp->segment_name;
218 a->segment_size = mp->segment_size;
219 /* Attach to the segment vpp created */
220 rv = svm_fifo_segment_attach (a);
223 clib_warning ("svm_fifo_segment_attach ('%s') failed",
227 clib_warning ("Mapped new segment '%s' size %d", mp->segment_name,
/*
 * DISCONNECT_SESSION handler: look up the local session for the vpp
 * (thread index, session index) pair, release it if present, and send a
 * DISCONNECT_SESSION_REPLY echoing the same indices.
 */
232 vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
234 uri_tcp_test_main_t *utm = &uri_tcp_test_main;
236 vl_api_disconnect_session_reply_t * rmp;
/* Lookup key: thread index in the upper 32 bits, session index in the lower 32 bits. */
241 key = (((u64)mp->session_thread_index) << 32) | (u64)mp->session_index;
243 p = hash_get (utm->session_index_by_vpp_handles, key);
/* Known session: drop the hash entry and return the session to the pool. */
247 session = pool_elt_at_index (utm->sessions, p[0]);
248 hash_unset (utm->session_index_by_vpp_handles, key);
249 pool_put (utm->sessions, session);
253 clib_warning ("couldn't find session key %llx", key);
/* Build and send the reply on the API input queue. */
257 rmp = vl_msg_api_alloc (sizeof (*rmp));
258 memset (rmp, 0, sizeof (*rmp));
260 rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
262 rmp->session_index = mp->session_index;
263 rmp->session_thread_index = mp->session_thread_index;
264 vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *)&rmp);
/*
 * RESET_SESSION handler: look up the local session for the vpp
 * (thread index, session index) pair, release it if present, and send a
 * RESET_SESSION_REPLY echoing the same indices.
 */
268 vl_api_reset_session_t_handler (vl_api_reset_session_t * mp)
270 uri_tcp_test_main_t *utm = &uri_tcp_test_main;
272 vl_api_reset_session_reply_t * rmp;
/* Lookup key: thread index in the upper 32 bits, session index in the lower 32 bits. */
277 key = (((u64)mp->session_thread_index) << 32) | (u64)mp->session_index;
279 p = hash_get(utm->session_index_by_vpp_handles, key);
/* Known session: drop the hash entry and return the session to the pool. */
283 session = pool_elt_at_index(utm->sessions, p[0]);
284 hash_unset(utm->session_index_by_vpp_handles, key);
285 pool_put(utm->sessions, session);
289 clib_warning("couldn't find session key %llx", key);
293 rmp = vl_msg_api_alloc (sizeof (*rmp));
294 memset (rmp, 0, sizeof (*rmp));
/* The reply is a vl_api_reset_session_reply_t, so it must carry the RESET reply id, not the DISCONNECT one. */
295 rmp->_vl_msg_id = ntohs (VL_API_RESET_SESSION_REPLY);
297 rmp->session_index = mp->session_index;
298 rmp->session_thread_index = mp->session_thread_index;
299 vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *)&rmp);
/*
 * Client-side RX event handler: drain data announced by event `e` from the
 * session's rx fifo into utm->rx_buf. `bytes` starts at e->enqueue_length and
 * the dequeue loop repeats while a read fails (n_read < 0) or bytes remain.
 * The assignment of rx_fifo and the update of `bytes` are not visible in this
 * excerpt. Everything after the loop is disabled (commented-out) code.
 */
303 handle_fifo_event_connect_rx (uri_tcp_test_main_t *utm, session_fifo_event_t * e)
305 svm_fifo_t * rx_fifo;
310 bytes = e->enqueue_length;
313 n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len(utm->rx_buf),
318 while (n_read < 0 || bytes > 0);
/* Disabled legacy read/verify logic follows; it appears to have checked that byte k equals (k & 0xff). */
320 // bytes_to_read = svm_fifo_max_dequeue (rx_fifo);
322 // bytes_to_read = vec_len(utm->rx_buf) > bytes_to_read ?
323 // bytes_to_read : vec_len(utm->rx_buf);
325 // buffer_offset = 0;
326 // while (bytes_to_read > 0)
328 // rv = svm_fifo_dequeue_nowait2 (rx_fifo, mypid,
330 // utm->rx_buf + buffer_offset);
333 // bytes_to_read -= rv;
334 // buffer_offset += rv;
335 // bytes_received += rv;
340 // while (bytes_received < bytes_sent)
342 // rv = svm_fifo_dequeue_nowait2 (rx_fifo, mypid,
343 // vec_len (utm->rx_buf),
349 // for (j = 0; j < rv; j++)
351 // if (utm->rx_buf[j] != ((bytes_received + j) & 0xff))
353 // clib_warning ("error at byte %lld, 0x%x not 0x%x",
354 // bytes_received + j,
356 // ((bytes_received + j )&0xff));
360 // bytes_received += (u64) rv;
/*
 * Client-side event dispatch: pull one event from utm->our_event_queue and
 * dispatch on its type. FIFO_EVENT_SERVER_RX goes to
 * handle_fifo_event_connect_rx; unknown types are logged. The action taken
 * for FIFO_EVENT_SERVER_EXIT is not visible in this excerpt.
 */
366 handle_connect_event_queue (uri_tcp_test_main_t * utm)
368 session_fifo_event_t _e, *e = &_e;;
370 unix_shared_memory_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */);
371 switch (e->event_type)
373 case FIFO_EVENT_SERVER_RX:
374 handle_fifo_event_connect_rx (utm, e);
377 case FIFO_EVENT_SERVER_EXIT:
381 clib_warning("unknown event type %d", e->event_type);
/*
 * Client-side sender: push utm->connect_test_data into the connected
 * session's tx fifo, 10 passes over the data, in chunks of at most 64 KiB,
 * and post FIFO_EVENT_SERVER_TX events to vpp's event queue.
 * The handling of the enqueue result (rv) and the updates of
 * bytes_to_send/buffer_offset are not visible in this excerpt.
 */
387 uri_tcp_connect_send (uri_tcp_test_main_t *utm)
389 u8 *test_data = utm->connect_test_data;
392 int mypid = getpid();
/* NOTE(review): buffer_offset has no initializer here; its reset per pass is not visible -- confirm it is set before first use. */
395 int buffer_offset, bytes_to_send = 0;
396 session_fifo_event_t evt;
/* Event ids increase monotonically across calls. */
397 static int serial_number = 0;
399 u32 max_chunk = 64 << 10, write;
401 session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
402 tx_fifo = session->server_tx_fifo;
/* Make sure the receive buffer is at least as long as the test data. */
404 vec_validate (utm->rx_buf, vec_len (test_data) - 1);
406 for (i = 0; i < 10; i++)
408 bytes_to_send = vec_len (test_data);
410 while (bytes_to_send > 0)
412 write = bytes_to_send > max_chunk ? max_chunk : bytes_to_send;
413 rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, write,
414 test_data + buffer_offset);
422 /* Fabricate TX event, send to vpp */
424 evt.event_type = FIFO_EVENT_SERVER_TX;
425 /* $$$$ for event logging */
426 evt.enqueue_length = rv;
427 evt.event_id = serial_number++;
429 unix_shared_memory_queue_add (utm->vpp_event_queue, (u8 *) &evt,
430 0 /* do wait for mutex */);
/*
 * Client ("slave") test sequence: send CONNECT_URI for utm->connect_uri,
 * wait for STATE_READY, build a 64 KiB test pattern, send it, then send
 * DISCONNECT_SESSION for the connected session.
 */
437 uri_tcp_client_test (uri_tcp_test_main_t * utm)
439 vl_api_connect_uri_t * cmp;
440 vl_api_disconnect_session_t *dmp;
441 session_t *connected_session;
444 cmp = vl_msg_api_alloc (sizeof (*cmp));
445 memset (cmp, 0, sizeof (*cmp));
447 cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
448 cmp->client_index = utm->my_client_index;
449 cmp->context = ntohl(0xfeedface);
/* NOTE(review): the copy length is vec_len (connect_uri) with no visible check against the size of cmp->uri -- confirm it always fits. */
450 memcpy (cmp->uri, utm->connect_uri, vec_len (utm->connect_uri));
451 vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *)&cmp);
/* Non-zero means the wait did not succeed; the failure handling is not visible in this excerpt. */
453 if (wait_for_state_change (utm, STATE_READY))
/* Test pattern: byte i holds (i & 0xff). */
459 vec_validate (utm->connect_test_data, 64 * 1024 - 1);
460 for (i = 0; i < vec_len (utm->connect_test_data); i++)
461 utm->connect_test_data[i] = i & 0xff;
463 /* Start reader thread */
464 /* handle_connect_event_queue (utm); */
467 uri_tcp_connect_send (utm);
/* Tear down: ask vpp to disconnect the session identified by its vpp-side index/thread. */
470 connected_session = pool_elt_at_index(utm->sessions,
471 utm->connected_session_index);
472 dmp = vl_msg_api_alloc (sizeof (*dmp));
473 memset (dmp, 0, sizeof (*dmp));
474 dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
475 dmp->client_index = utm->my_client_index;
476 dmp->session_index = connected_session->vpp_session_index;
477 dmp->session_thread_index = connected_session->vpp_session_thread;
478 vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *)&dmp);
/*
 * Server-side RX event handler: dequeue received data into utm->rx_buf and,
 * unless utm->drop_packets is set, write it back to the same session's tx
 * fifo and notify vpp with a FIFO_EVENT_SERVER_TX event (echo server).
 * The loop repeats while a read fails (n_read < 0) or bytes remain; the
 * assignment of rx_fifo and the update of `bytes` are not visible here.
 */
482 handle_fifo_event_server_rx (uri_tcp_test_main_t *utm, session_fifo_event_t * e)
484 svm_fifo_t * rx_fifo, * tx_fifo;
487 session_fifo_event_t evt;
488 unix_shared_memory_queue_t *q;
/* The rx fifo records the local session index, which locates the matching tx fifo. */
492 tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
494 bytes = e->enqueue_length;
497 n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len(utm->rx_buf),
500 /* Reflect if a non-drop session */
501 if (!utm->drop_packets && n_read > 0)
505 rv = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, utm->rx_buf);
509 /* Fabricate TX event, send to vpp */
511 evt.event_type = FIFO_EVENT_SERVER_TX;
512 /* $$$$ for event logging */
513 evt.enqueue_length = n_read;
514 evt.event_id = e->event_id;
515 q = utm->vpp_event_queue;
516 unix_shared_memory_queue_add (q, (u8 *) &evt, 0 /* do wait for mutex */);
522 while (n_read < 0 || bytes > 0);
/*
 * Server-side event processing: pull events from utm->our_event_queue and
 * dispatch on type (FIFO_EVENT_SERVER_RX goes to handle_fifo_event_server_rx;
 * unknown types are logged), then honor the signal flags time_to_stop and
 * time_to_print_stats. The enclosing loop and the actions for
 * FIFO_EVENT_SERVER_EXIT / time_to_stop are not visible in this excerpt.
 */
526 handle_event_queue (uri_tcp_test_main_t * utm)
528 session_fifo_event_t _e, *e = &_e;;
532 unix_shared_memory_queue_sub (utm->our_event_queue, (u8 *)e,
534 switch (e->event_type)
536 case FIFO_EVENT_SERVER_RX:
537 handle_fifo_event_server_rx (utm, e);
540 case FIFO_EVENT_SERVER_EXIT:
544 clib_warning ("unknown event type %d", e->event_type);
547 if (PREDICT_FALSE(utm->time_to_stop == 1))
/* Stats request: clear the flag, then print the number of sessions currently in the pool. */
549 if (PREDICT_FALSE(utm->time_to_print_stats == 1))
551 utm->time_to_print_stats = 0;
552 fformat(stdout, "%d connections\n", pool_elts (utm->sessions));
/*
 * BIND_URI_REPLY handler: on success attach to the fifo segment vpp created,
 * remember the server event queue address, and move to STATE_READY.
 * NOTE(review): unlike the connect reply handler, no visible line sets
 * STATE_FAILED on the error paths -- confirm, since a failed bind would then
 * only be noticed via the wait timeout.
 */
558 vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
560 uri_tcp_test_main_t *utm = &uri_tcp_test_main;
561 svm_fifo_segment_create_args_t _a, *a = &_a;
566 clib_warning("bind failed: %d", mp->retval);
570 if (mp->segment_name_length == 0)
572 clib_warning("segment_name_length zero");
576 a->segment_name = (char *) mp->segment_name;
577 a->segment_size = mp->segment_size;
579 ASSERT(mp->server_event_queue_address);
581 /* Attach to the segment vpp created */
582 rv = svm_fifo_segment_attach (a);
585 clib_warning("svm_fifo_segment_attach ('%s') failed", mp->segment_name);
/* The message carries the event queue address as an integer; it is used directly as a pointer. */
589 utm->our_event_queue =
590 (unix_shared_memory_queue_t *) mp->server_event_queue_address;
592 utm->state = STATE_READY;
/*
 * CONNECT_URI_REPLY handler: on failure log and set STATE_FAILED; on success
 * attach to the fifo segment, record both event queues, allocate a local
 * session bound to the rx/tx fifos from the reply, and move to STATE_READY.
 */
596 vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
598 uri_tcp_test_main_t *utm = &uri_tcp_test_main;
599 svm_fifo_segment_create_args_t _a, *a = &_a;
602 svm_fifo_t *rx_fifo, *tx_fifo;
607 clib_warning ("connection failed with code: %d", mp->retval);
608 utm->state = STATE_FAILED;
615 if (mp->segment_name_length == 0)
617 clib_warning ("segment_name_length zero");
618 utm->state = STATE_FAILED;
622 a->segment_name = (char *) mp->segment_name;
623 a->segment_size = mp->segment_size;
625 ASSERT(mp->client_event_queue_address);
627 /* Attach to the segment vpp created */
628 rv = svm_fifo_segment_attach (a);
631 clib_warning ("svm_fifo_segment_attach ('%s') failed",
/* Queue addresses arrive as integers in the message and are used directly as pointers. */
640 utm->our_event_queue = (unix_shared_memory_queue_t *)
641 mp->client_event_queue_address;
643 utm->vpp_event_queue = (unix_shared_memory_queue_t *)
644 mp->vpp_event_queue_address;
/* Allocate the local session and cross-link it with the fifos via client_session_index. */
650 pool_get (utm->sessions, session);
651 session_index = session - utm->sessions;
653 rx_fifo = (svm_fifo_t *)mp->server_rx_fifo;
654 rx_fifo->client_session_index = session_index;
655 tx_fifo = (svm_fifo_t *)mp->server_tx_fifo;
656 tx_fifo->client_session_index = session_index;
658 session->server_rx_fifo = rx_fifo;
659 session->server_tx_fifo = tx_fifo;
/* Keep the vpp-side identifiers; uri_tcp_client_test uses them to disconnect. */
660 session->vpp_session_index = mp->session_index;
661 session->vpp_session_thread = mp->session_thread_index;
664 utm->connected_session_index = session_index;
666 utm->state = STATE_READY;
/*
 * Send BIND_URI for utm->uri, requesting fifo-based sessions with 3 MiB
 * rx/tx fifos, a 256 MiB initial segment and 128 MiB additional segments.
 */
670 uri_tcp_bind (uri_tcp_test_main_t *utm)
672 vl_api_bind_uri_t * bmp;
673 u32 fifo_size = 3 << 20;
674 bmp = vl_msg_api_alloc (sizeof (*bmp));
675 memset (bmp, 0, sizeof (*bmp));
677 bmp->_vl_msg_id = ntohs (VL_API_BIND_URI);
678 bmp->client_index = utm->my_client_index;
679 bmp->context = ntohl(0xfeedface);
680 bmp->initial_segment_size = 256<<20; /* size of initial segment */
681 bmp->options[SESSION_OPTIONS_FLAGS] =
682 SESSION_OPTIONS_FLAGS_USE_FIFO | SESSION_OPTIONS_FLAGS_ADD_SEGMENT;
683 bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size;
684 bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size;
685 bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128<<20;
/* NOTE(review): utm->uri can come from the command line and is copied with no visible check against the size of bmp->uri -- confirm it always fits. */
686 memcpy (bmp->uri, utm->uri, vec_len (utm->uri));
687 vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *)&bmp);
/*
 * UNBIND_URI_REPLY handler: logs the byte-swapped retval and returns the
 * test state to STATE_START (which uri_tcp_server_test waits for). The
 * condition guarding the warning is not visible in this excerpt.
 */
691 vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t *mp)
693 uri_tcp_test_main_t *utm = &uri_tcp_test_main;
696 clib_warning ("returned %d", ntohl(mp->retval));
698 utm->state = STATE_START;
/* Send UNBIND_URI for utm->uri on the API input queue. */
702 uri_tcp_unbind (uri_tcp_test_main_t *utm)
704 vl_api_unbind_uri_t * ump;
706 ump = vl_msg_api_alloc (sizeof (*ump));
707 memset (ump, 0, sizeof (*ump));
709 ump->_vl_msg_id = ntohs (VL_API_UNBIND_URI);
710 ump->client_index = utm->my_client_index;
/* NOTE(review): no visible check of vec_len (utm->uri) against the size of ump->uri -- confirm it always fits. */
711 memcpy (ump->uri, utm->uri, vec_len (utm->uri));
712 vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *)&ump);
/*
 * ACCEPT_SESSION handler (server side): allocate a local session for the
 * new connection, bind it to the rx/tx fifos from the message, index it by
 * the vpp (thread, session) handle, set STATE_READY, and reply with
 * ACCEPT_SESSION_REPLY. Prints a rate line at every 20000 pooled sessions.
 */
716 vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
718 uri_tcp_test_main_t *utm = &uri_tcp_test_main;
719 vl_api_accept_session_reply_t *rmp;
720 svm_fifo_t * rx_fifo, * tx_fifo;
/* Time of the first accept; the baseline for the sessions/sec figure below. */
722 static f64 start_time;
726 if (start_time == 0.0)
727 start_time = clib_time_now (&utm->clib_time);
729 utm->vpp_event_queue = (unix_shared_memory_queue_t *)
730 mp->vpp_event_queue_address;
732 /* Allocate local session and set it up */
733 pool_get (utm->sessions, session);
734 session_index = session - utm->sessions;
736 rx_fifo = (svm_fifo_t *)mp->server_rx_fifo;
737 rx_fifo->client_session_index = session_index;
738 tx_fifo = (svm_fifo_t *)mp->server_tx_fifo;
739 tx_fifo->client_session_index = session_index;
741 session->server_rx_fifo = rx_fifo;
742 session->server_tx_fifo = tx_fifo;
744 /* Add it to lookup table */
/* Same key layout the disconnect/reset handlers use: thread index high, session index low. */
745 key = (((u64)mp->session_thread_index) << 32) | (u64)mp->session_index;
746 hash_set (utm->session_index_by_vpp_handles, key, session_index);
748 utm->state = STATE_READY;
751 if (pool_elts (utm->sessions) && (pool_elts(utm->sessions) % 20000) == 0)
753 f64 now = clib_time_now (&utm->clib_time);
754 fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
755 pool_elts(utm->sessions), now - start_time,
756 (f64)pool_elts(utm->sessions) / (now - start_time));
759 /* Send accept reply to vpp */
760 rmp = vl_msg_api_alloc (sizeof (*rmp));
761 memset (rmp, 0, sizeof (*rmp));
762 rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
763 rmp->session_type = mp->session_type;
764 rmp->session_index = mp->session_index;
765 rmp->session_thread_index = mp->session_thread_index;
766 vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *)&rmp);
/*
 * Server ("master") test sequence as visible here: wait for STATE_READY,
 * run the event loop, unbind the URI, wait for STATE_START, then report
 * completion. The statements before the first wait are not visible in this
 * excerpt (presumably the bind request -- confirm).
 */
770 uri_tcp_server_test (uri_tcp_test_main_t * utm)
776 if (wait_for_state_change (utm, STATE_READY))
778 clib_warning ("timeout waiting for STATE_READY");
782 /* Enter handle event loop */
783 handle_event_queue (utm);
786 uri_tcp_unbind (utm);
/* The unbind reply handler sets STATE_START. */
788 if (wait_for_state_change (utm, STATE_START))
790 clib_warning ("timeout waiting for STATE_START");
794 fformat (stdout, "Test complete...\n");
/*
 * X-macro list of the API messages this program handles, as
 * (MESSAGE_ID_SUFFIX, handler_name_stem) pairs; consumed by uri_api_hookup.
 */
797 #define foreach_uri_msg \
798 _(BIND_URI_REPLY, bind_uri_reply) \
799 _(UNBIND_URI_REPLY, unbind_uri_reply) \
800 _(ACCEPT_SESSION, accept_session) \
801 _(CONNECT_URI_REPLY, connect_uri_reply) \
802 _(DISCONNECT_SESSION, disconnect_session) \
803 _(RESET_SESSION, reset_session) \
804 _(MAP_ANOTHER_SEGMENT, map_another_segment)
/*
 * Register the message handlers with vl_msg_api_set_handlers, passing for
 * each message its id (VL_API_<N>), name string, and the
 * vl_api_<n>_t_handler / _endian / _print functions plus the message size.
 * The continuation lines below are the body of a per-message macro whose
 * #define line and expansion (presumably over foreach_uri_msg) are not
 * visible in this excerpt.
 */
807 uri_api_hookup (uri_tcp_test_main_t * utm)
810 vl_msg_api_set_handlers(VL_API_##N, #n, \
811 vl_api_##n##_t_handler, \
813 vl_api_##n##_t_endian, \
814 vl_api_##n##_t_print, \
815 sizeof(vl_api_##n##_t), 1);
/*
 * Program entry point: set up the heap, buffers and lookup tables, parse
 * the command line, connect to vpp's binary API, then run either the client
 * test (slave) or the server test (master, the default).
 *
 * Options recognized in the visible code: "chroot prefix <path>",
 * "uri <uri>", "segment-size <n>M", "segment-size <n>G", "master", "slave",
 * "drop". The actions for uri/master/slave/drop are not visible in this
 * excerpt.
 */
821 main (int argc, char **argv)
823 uri_tcp_test_main_t *utm = &uri_tcp_test_main;
824 unformat_input_t _argv, *a = &_argv;
827 u8 * bind_name = (u8 *) "tcp://0.0.0.0/1234";
832 int i_am_master = 1, drop_packets = 0;
834 clib_mem_init (0, 256 << 20);
836 heap = clib_mem_get_per_cpu_heap ();
837 h = mheap_header (heap);
839 /* make the main heap thread-safe */
840 h->flags |= MHEAP_FLAG_THREAD_SAFE;
842 vec_validate (utm->rx_buf, 65536);
844 utm->session_index_by_vpp_handles =
845 hash_create (0, sizeof(uword));
847 utm->my_pid = getpid();
848 utm->configured_segment_size = 1<<20;
850 clib_time_init (&utm->clib_time);
851 init_error_string_table (utm);
852 svm_fifo_segment_init(0x200000000ULL, 20);
853 unformat_init_command_line (a, argv);
855 while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT)
857 if (unformat (a, "chroot prefix %s", &chroot_prefix))
859 vl_set_memory_root_path ((char *) chroot_prefix);
861 else if (unformat (a, "uri %s", &bind_name))
863 else if (unformat (a, "segment-size %dM", &tmp))
864 utm->configured_segment_size = tmp<<20;
865 else if (unformat (a, "segment-size %dG", &tmp))
/* NOTE(review): configured_segment_size is u32, so tmp << 30 cannot represent 4G or more -- confirm the intended range. */
866 utm->configured_segment_size = tmp<<30;
867 else if (unformat (a, "master"))
869 else if (unformat (a, "slave"))
871 else if (unformat (a, "drop"))
/* The format has a %s conversion, so the program name must be supplied as its argument. */
875 fformat (stderr, "%s: usage [master|slave]\n", argv[0]);
/* Keep a NUL-terminated vector copy of the bind URI. */
880 utm->uri = format (0, "%s%c", bind_name, 0);
881 utm->i_am_master = i_am_master;
882 utm->segment_main = &svm_fifo_segment_main;
883 utm->drop_packets = drop_packets;
/* The client's target URI is hard-coded. */
885 utm->connect_uri = format (0, "tcp://6.0.1.2/1234%c", 0);
887 setup_signal_handlers();
888 uri_api_hookup (utm);
890 if (connect_to_vpp (i_am_master? "uri_tcp_server":"uri_tcp_client") < 0)
893 fformat (stderr, "Couldn't connect to vpe, exiting...\n");
897 if (i_am_master == 0)
899 uri_tcp_client_test (utm);
903 /* $$$$ hack preallocation */
/* Grow the session pool to 200000 zeroed entries, then free them all so later pool_get calls reuse that storage. */
904 for (i = 0; i < 200000; i++)
906 pool_get (utm->sessions, session);
907 memset (session, 0, sizeof (*session));
909 for (i = 0; i < 200000; i++)
910 pool_put_index (utm->sessions, i);
912 uri_tcp_server_test (utm);
914 vl_client_disconnect_from_vlib ();