session: send ctrl msg over mq
[vpp.git] / src / tests / vnet / session / udp_echo.c
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <setjmp.h>
18 #include <signal.h>
19 #include <vppinfra/clib.h>
20 #include <vppinfra/format.h>
21 #include <vppinfra/error.h>
22 #include <vppinfra/time.h>
23 #include <vppinfra/macros.h>
24 #include <vnet/vnet.h>
25 #include <vlib/vlib.h>
26 #include <vlib/unix/unix.h>
27 #include <vlibapi/api.h>
28 #include <vlibmemory/api.h>
29 #include <vpp/api/vpe_msg_enum.h>
30 #include <svm/svm_fifo_segment.h>
31 #include <pthread.h>
32 #include <vnet/session/application_interface.h>
33
34 #define vl_typedefs             /* define message structures */
35 #include <vpp/api/vpe_all_api_h.h>
36 #undef vl_typedefs
37
38 /* declare message handlers for each api */
39
40 #define vl_endianfun            /* define message structures */
41 #include <vpp/api/vpe_all_api_h.h>
42 #undef vl_endianfun
43
44 /* instantiate all the print functions we know about */
45 #define vl_print(handle, ...)
46 #define vl_printfun
47 #include <vpp/api/vpe_all_api_h.h>
48 #undef vl_printfun
49
50 typedef enum
51 {
52   STATE_START,
53   STATE_ATTACHED,
54   STATE_BOUND,
55   STATE_READY,
56   STATE_FAILED,
57   STATE_DISCONNECTING,
58   STATE_DETACHED
59 } connection_state_t;
60
61 typedef struct
62 {
63   /* vpe input queue */
64   svm_queue_t *vl_input_queue;
65
66   /* API client handle */
67   u32 my_client_index;
68
69   /* The URI we're playing with */
70   u8 *listen_uri;
71
72   /* URI for connect */
73   u8 *connect_uri;
74
75   /* Session pool */
76   app_session_t *sessions;
77
78   /* Hash table for disconnect processing */
79   uword *session_index_by_vpp_handles;
80
81   /* fifo segment */
82   svm_fifo_segment_private_t *seg;
83
84   /* intermediate rx buffer */
85   u8 *rx_buf;
86
87   u32 fifo_size;
88   int i_am_server;
89   u8 is_connected;
90
91   /* Our event queue */
92   svm_msg_q_t *our_event_queue;
93
94   /* $$$ single thread only for the moment */
95   svm_msg_q_t *vpp_event_queue;
96
97   /* $$$$ hack: cut-through session index */
98   volatile u32 cut_through_session_index;
99   volatile u32 connected_session;
100
101   /* unique segment name counter */
102   u32 unique_segment_index;
103
104   pid_t my_pid;
105
106   /* pthread handle */
107   pthread_t cut_through_thread_handle;
108
109   /* For deadman timers */
110   clib_time_t clib_time;
111
112   /* State of the connection, shared between msg RX thread and main thread */
113   volatile connection_state_t state;
114
115   volatile int time_to_stop;
116   volatile int time_to_print_stats;
117
118   u32 configured_segment_size;
119
120   /* VNET_API_ERROR_FOO -> "Foo" hash table */
121   uword *error_string_by_error_number;
122
123   /* convenience */
124   svm_fifo_segment_main_t *segment_main;
125
126   u8 *connect_test_data;
127
128   uword *segments_table;
129   u8 do_echo;
130   u8 have_return;
131   u64 bytes_to_send;
132 } udp_echo_main_t;
133
134 #if CLIB_DEBUG > 0
135 #define NITER 10000
136 #else
137 #define NITER 4000000
138 #endif
139
140 udp_echo_main_t udp_echo_main;
141
142 static void
143 stop_signal (int signum)
144 {
145   udp_echo_main_t *um = &udp_echo_main;
146
147   um->time_to_stop = 1;
148 }
149
150 static void
151 stats_signal (int signum)
152 {
153   udp_echo_main_t *um = &udp_echo_main;
154   um->time_to_print_stats = 1;
155 }
156
157 static clib_error_t *
158 setup_signal_handlers (void)
159 {
160   signal (SIGINT, stats_signal);
161   signal (SIGQUIT, stop_signal);
162   signal (SIGTERM, stop_signal);
163
164   return 0;
165 }
166
167 uword
168 unformat_ip4_address (unformat_input_t * input, va_list * args)
169 {
170   u8 *result = va_arg (*args, u8 *);
171   unsigned a[4];
172
173   if (!unformat (input, "%d.%d.%d.%d", &a[0], &a[1], &a[2], &a[3]))
174     return 0;
175
176   if (a[0] >= 256 || a[1] >= 256 || a[2] >= 256 || a[3] >= 256)
177     return 0;
178
179   result[0] = a[0];
180   result[1] = a[1];
181   result[2] = a[2];
182   result[3] = a[3];
183
184   return 1;
185 }
186
187 uword
188 unformat_ip6_address (unformat_input_t * input, va_list * args)
189 {
190   ip6_address_t *result = va_arg (*args, ip6_address_t *);
191   u16 hex_quads[8];
192   uword hex_quad, n_hex_quads, hex_digit, n_hex_digits;
193   uword c, n_colon, double_colon_index;
194
195   n_hex_quads = hex_quad = n_hex_digits = n_colon = 0;
196   double_colon_index = ARRAY_LEN (hex_quads);
197   while ((c = unformat_get_input (input)) != UNFORMAT_END_OF_INPUT)
198     {
199       hex_digit = 16;
200       if (c >= '0' && c <= '9')
201         hex_digit = c - '0';
202       else if (c >= 'a' && c <= 'f')
203         hex_digit = c + 10 - 'a';
204       else if (c >= 'A' && c <= 'F')
205         hex_digit = c + 10 - 'A';
206       else if (c == ':' && n_colon < 2)
207         n_colon++;
208       else
209         {
210           unformat_put_input (input);
211           break;
212         }
213
214       /* Too many hex quads. */
215       if (n_hex_quads >= ARRAY_LEN (hex_quads))
216         return 0;
217
218       if (hex_digit < 16)
219         {
220           hex_quad = (hex_quad << 4) | hex_digit;
221
222           /* Hex quad must fit in 16 bits. */
223           if (n_hex_digits >= 4)
224             return 0;
225
226           n_colon = 0;
227           n_hex_digits++;
228         }
229
230       /* Save position of :: */
231       if (n_colon == 2)
232         {
233           /* More than one :: ? */
234           if (double_colon_index < ARRAY_LEN (hex_quads))
235             return 0;
236           double_colon_index = n_hex_quads;
237         }
238
239       if (n_colon > 0 && n_hex_digits > 0)
240         {
241           hex_quads[n_hex_quads++] = hex_quad;
242           hex_quad = 0;
243           n_hex_digits = 0;
244         }
245     }
246
247   if (n_hex_digits > 0)
248     hex_quads[n_hex_quads++] = hex_quad;
249
250   {
251     word i;
252
253     /* Expand :: to appropriate number of zero hex quads. */
254     if (double_colon_index < ARRAY_LEN (hex_quads))
255       {
256         word n_zero = ARRAY_LEN (hex_quads) - n_hex_quads;
257
258         for (i = n_hex_quads - 1; i >= (signed) double_colon_index; i--)
259           hex_quads[n_zero + i] = hex_quads[i];
260
261         for (i = 0; i < n_zero; i++)
262           hex_quads[double_colon_index + i] = 0;
263
264         n_hex_quads = ARRAY_LEN (hex_quads);
265       }
266
267     /* Too few hex quads given. */
268     if (n_hex_quads < ARRAY_LEN (hex_quads))
269       return 0;
270
271     for (i = 0; i < ARRAY_LEN (hex_quads); i++)
272       result->as_u16[i] = clib_host_to_net_u16 (hex_quads[i]);
273
274     return 1;
275   }
276 }
277
278 uword
279 unformat_uri (unformat_input_t * input, va_list * args)
280 {
281   session_endpoint_extended_t *sep = va_arg (*args,
282                                              session_endpoint_extended_t *);
283   u32 port;
284   char *tmp;
285
286   if (unformat (input, "%s://%U/%d", &tmp, unformat_ip4_address, &sep->ip.ip4,
287                 &port))
288     {
289       sep->port = clib_host_to_net_u16 (port);
290       sep->is_ip4 = 1;
291       return 1;
292     }
293   else if (unformat (input, "%s://%U/%d", &tmp, unformat_ip6_address,
294                      &sep->ip.ip6, &port))
295     {
296       sep->port = clib_host_to_net_u16 (port);
297       sep->is_ip4 = 0;
298       return 1;
299     }
300   return 0;
301 }
302
303 static void
304 application_send_attach (udp_echo_main_t * utm)
305 {
306   vl_api_application_attach_t *bmp;
307   bmp = vl_msg_api_alloc (sizeof (*bmp));
308   memset (bmp, 0, sizeof (*bmp));
309
310   bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
311   bmp->client_index = utm->my_client_index;
312   bmp->context = ntohl (0xfeedface);
313   bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ADD_SEGMENT;
314   bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
315   bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
316   bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS;
317   bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 2;
318   bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = utm->fifo_size;
319   bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = utm->fifo_size;
320   bmp->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
321   bmp->options[APP_OPTIONS_SEGMENT_SIZE] = 256 << 20;
322   bmp->options[APP_OPTIONS_EVT_QUEUE_SIZE] = 16768;
323   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
324 }
325
326 void
327 application_detach (udp_echo_main_t * utm)
328 {
329   vl_api_application_detach_t *bmp;
330   bmp = vl_msg_api_alloc (sizeof (*bmp));
331   memset (bmp, 0, sizeof (*bmp));
332
333   bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_DETACH);
334   bmp->client_index = utm->my_client_index;
335   bmp->context = ntohl (0xfeedface);
336   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
337 }
338
339 static void
340 vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
341                                            mp)
342 {
343   udp_echo_main_t *utm = &udp_echo_main;
344   svm_fifo_segment_create_args_t _a = { 0 }, *a = &_a;
345   int rv;
346
347   if (mp->retval)
348     {
349       clib_warning ("attach failed: %d", mp->retval);
350       utm->state = STATE_FAILED;
351       return;
352     }
353
354   if (mp->segment_name_length == 0)
355     {
356       clib_warning ("segment_name_length zero");
357       return;
358     }
359
360   a->segment_name = (char *) mp->segment_name;
361   a->segment_size = mp->segment_size;
362
363   ASSERT (mp->app_event_queue_address);
364
365   /* Attach to the segment vpp created */
366   rv = svm_fifo_segment_attach (a);
367   if (rv)
368     {
369       clib_warning ("svm_fifo_segment_attach ('%s') failed",
370                     mp->segment_name);
371       return;
372     }
373
374   utm->our_event_queue =
375     uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *);
376   utm->state = STATE_ATTACHED;
377 }
378
379 static void
380 vl_api_application_detach_reply_t_handler (vl_api_application_detach_reply_t *
381                                            mp)
382 {
383   if (mp->retval)
384     clib_warning ("detach returned with err: %d", mp->retval);
385   udp_echo_main.state = STATE_DETACHED;
386 }
387
388 u8 *
389 format_api_error (u8 * s, va_list * args)
390 {
391   udp_echo_main_t *utm = va_arg (*args, udp_echo_main_t *);
392   i32 error = va_arg (*args, u32);
393   uword *p;
394
395   p = hash_get (utm->error_string_by_error_number, -error);
396
397   if (p)
398     s = format (s, "%s", p[0]);
399   else
400     s = format (s, "%d", error);
401   return s;
402 }
403
404 int
405 wait_for_state_change (udp_echo_main_t * utm, connection_state_t state)
406 {
407 #if CLIB_DEBUG > 0
408 #define TIMEOUT 600.0
409 #else
410 #define TIMEOUT 600.0
411 #endif
412
413   f64 timeout = clib_time_now (&utm->clib_time) + TIMEOUT;
414
415   while (clib_time_now (&utm->clib_time) < timeout)
416     {
417       if (utm->state == state)
418         return 0;
419     }
420   return -1;
421 }
422
423 u64 server_bytes_received, server_bytes_sent;
424
425 static void *
426 cut_through_thread_fn (void *arg)
427 {
428   app_session_t *s;
429   svm_fifo_t *rx_fifo;
430   svm_fifo_t *tx_fifo;
431   u8 *my_copy_buffer = 0;
432   udp_echo_main_t *utm = &udp_echo_main;
433   i32 actual_transfer;
434   int rv;
435   u32 buffer_offset;
436
437   while (utm->cut_through_session_index == ~0)
438     ;
439
440   s = pool_elt_at_index (utm->sessions, utm->cut_through_session_index);
441
442   rx_fifo = s->rx_fifo;
443   tx_fifo = s->tx_fifo;
444
445   vec_validate (my_copy_buffer, 64 * 1024 - 1);
446
447   while (true)
448     {
449       /* We read from the tx fifo and write to the rx fifo */
450       do
451         {
452           actual_transfer = svm_fifo_dequeue_nowait (rx_fifo,
453                                                      vec_len (my_copy_buffer),
454                                                      my_copy_buffer);
455         }
456       while (actual_transfer <= 0);
457
458       server_bytes_received += actual_transfer;
459
460       buffer_offset = 0;
461       while (actual_transfer > 0)
462         {
463           rv = svm_fifo_enqueue_nowait (tx_fifo, actual_transfer,
464                                         my_copy_buffer + buffer_offset);
465           if (rv > 0)
466             {
467               actual_transfer -= rv;
468               buffer_offset += rv;
469               server_bytes_sent += rv;
470             }
471
472         }
473       if (PREDICT_FALSE (utm->time_to_stop))
474         break;
475     }
476
477   pthread_exit (0);
478 }
479
480 static void
481 session_accepted_handler (session_accepted_msg_t * mp)
482 {
483   app_session_evt_t _app_evt, *app_evt = &_app_evt;
484   udp_echo_main_t *utm = &udp_echo_main;
485   session_accepted_reply_msg_t *rmp;
486   svm_fifo_t *rx_fifo, *tx_fifo;
487   app_session_t *session;
488   static f64 start_time;
489   u32 session_index;
490   int rv = 0;
491
492   if (start_time == 0.0)
493     start_time = clib_time_now (&utm->clib_time);
494
495   utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
496                                            svm_msg_q_t *);
497   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
498   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
499
500   pool_get (utm->sessions, session);
501   memset (session, 0, sizeof (*session));
502   session_index = session - utm->sessions;
503
504   /* Cut-through case */
505   if (mp->server_event_queue_address)
506     {
507       clib_warning ("cut-through session");
508       utm->our_event_queue = uword_to_pointer (mp->server_event_queue_address,
509                                                svm_msg_q_t *);
510       rx_fifo->master_session_index = session_index;
511       tx_fifo->master_session_index = session_index;
512       utm->cut_through_session_index = session_index;
513       session->rx_fifo = rx_fifo;
514       session->tx_fifo = tx_fifo;
515
516       rv = pthread_create (&utm->cut_through_thread_handle,
517                            NULL /*attr */ , cut_through_thread_fn, 0);
518       if (rv)
519         {
520           clib_warning ("pthread_create returned %d", rv);
521           rv = VNET_API_ERROR_SYSCALL_ERROR_1;
522         }
523       utm->do_echo = 1;
524     }
525   else
526     {
527       rx_fifo->client_session_index = session_index;
528       tx_fifo->client_session_index = session_index;
529       session->rx_fifo = rx_fifo;
530       session->tx_fifo = tx_fifo;
531       clib_memcpy (&session->transport.rmt_ip, mp->ip,
532                    sizeof (ip46_address_t));
533       session->transport.is_ip4 = mp->is_ip4;
534       session->transport.rmt_port = mp->port;
535     }
536
537   hash_set (utm->session_index_by_vpp_handles, mp->handle, session_index);
538   if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
539     {
540       f64 now = clib_time_now (&utm->clib_time);
541       fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
542                pool_elts (utm->sessions), now - start_time,
543                (f64) pool_elts (utm->sessions) / (now - start_time));
544     }
545
546   app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
547                              SESSION_CTRL_EVT_ACCEPTED_REPLY);
548   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
549   rmp->handle = mp->handle;
550   rmp->context = mp->context;
551   rmp->retval = rv;
552   app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
553
554   CLIB_MEMORY_BARRIER ();
555   utm->state = STATE_READY;
556 }
557
558 static void
559 session_disconnected_handler (session_disconnected_msg_t * mp)
560 {
561   app_session_evt_t _app_evt, *app_evt = &_app_evt;
562   udp_echo_main_t *utm = &udp_echo_main;
563   session_disconnected_reply_msg_t *rmp;
564   app_session_t *session;
565   uword *p;
566   int rv = 0;
567
568   p = hash_get (utm->session_index_by_vpp_handles, mp->handle);
569
570   if (p)
571     {
572       session = pool_elt_at_index (utm->sessions, p[0]);
573       hash_unset (utm->session_index_by_vpp_handles, mp->handle);
574       pool_put (utm->sessions, session);
575     }
576   else
577     {
578       clib_warning ("couldn't find session key %llx", mp->handle);
579       return;
580     }
581
582   app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
583                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
584   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
585   rmp->retval = rv;
586   rmp->handle = mp->handle;
587   rmp->context = mp->context;
588   app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
589 }
590
591 static void
592 session_connected_handler (session_connected_msg_t * mp)
593 {
594   udp_echo_main_t *utm = &udp_echo_main;
595   unformat_input_t _input, *input = &_input;
596   session_endpoint_extended_t _sep, *sep = &_sep;
597   app_session_t *session;
598
599   ASSERT (utm->i_am_server == 0);
600
601   if (mp->retval)
602     {
603       clib_warning ("failed connect");
604       return;
605     }
606
607   ASSERT (mp->server_rx_fifo && mp->server_tx_fifo);
608
609   pool_get (utm->sessions, session);
610   session->rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
611   session->tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
612   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
613                                          svm_msg_q_t *);
614   /* Cut-through case */
615   if (mp->client_event_queue_address)
616     {
617       clib_warning ("cut-through session");
618       utm->cut_through_session_index = session - utm->sessions;
619       utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
620                                                svm_msg_q_t *);
621       utm->our_event_queue = uword_to_pointer (mp->client_event_queue_address,
622                                                svm_msg_q_t *);
623       utm->do_echo = 1;
624     }
625   else
626     {
627       utm->connected_session = session - utm->sessions;
628       utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
629                                                svm_msg_q_t *);
630
631       clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
632                    sizeof (ip46_address_t));
633       session->transport.is_ip4 = mp->is_ip4;
634       session->transport.lcl_port = mp->lcl_port;
635
636       unformat_init_vector (input, utm->connect_uri);
637       if (!unformat (input, "%U", unformat_uri, sep))
638         {
639           clib_warning ("can't figure out remote ip and port");
640           utm->state = STATE_FAILED;
641           unformat_free (input);
642           return;
643         }
644       unformat_free (input);
645       clib_memcpy (&session->transport.rmt_ip, &sep->ip,
646                    sizeof (ip46_address_t));
647       session->transport.rmt_port = sep->port;
648       session->is_dgram = !utm->is_connected;
649     }
650   utm->state = STATE_READY;
651 }
652
653 static void
654 handle_mq_event (session_event_t * e)
655 {
656   switch (e->event_type)
657     {
658     case SESSION_CTRL_EVT_ACCEPTED:
659       session_accepted_handler ((session_accepted_msg_t *) e->data);
660       break;
661     case SESSION_CTRL_EVT_CONNECTED:
662       session_connected_handler ((session_connected_msg_t *) e->data);
663       break;
664     case SESSION_CTRL_EVT_DISCONNECTED:
665       session_disconnected_handler ((session_disconnected_msg_t *) e->data);
666       break;
667     default:
668       clib_warning ("unhandled %u", e->event_type);
669     }
670 }
671
672 static void
673 udp_client_send_connect (udp_echo_main_t * utm)
674 {
675   vl_api_connect_uri_t *cmp;
676   cmp = vl_msg_api_alloc (sizeof (*cmp));
677   memset (cmp, 0, sizeof (*cmp));
678
679   cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
680   cmp->client_index = utm->my_client_index;
681   cmp->context = ntohl (0xfeedface);
682   memcpy (cmp->uri, utm->connect_uri, vec_len (utm->connect_uri));
683   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & cmp);
684 }
685
686 static void
687 client_send_cut_through (udp_echo_main_t * utm, app_session_t * session)
688 {
689   int i;
690   u8 *test_data = 0;
691   u64 bytes_received = 0, bytes_sent = 0;
692   i32 bytes_to_read;
693   int rv;
694   f64 before, after, delta, bytes_per_second;
695   svm_fifo_t *rx_fifo, *tx_fifo;
696   int buffer_offset, bytes_to_send = 0;
697
698   /*
699    * Prepare test data
700    */
701   vec_validate (test_data, 64 * 1024 - 1);
702   for (i = 0; i < vec_len (test_data); i++)
703     test_data[i] = i & 0xff;
704
705   rx_fifo = session->rx_fifo;
706   tx_fifo = session->tx_fifo;
707
708   before = clib_time_now (&utm->clib_time);
709
710   vec_validate (utm->rx_buf, vec_len (test_data) - 1);
711
712   for (i = 0; i < NITER; i++)
713     {
714       bytes_to_send = vec_len (test_data);
715       buffer_offset = 0;
716       while (bytes_to_send > 0)
717         {
718           rv = svm_fifo_enqueue_nowait (tx_fifo, bytes_to_send,
719                                         test_data + buffer_offset);
720
721           if (rv > 0)
722             {
723               bytes_to_send -= rv;
724               buffer_offset += rv;
725               bytes_sent += rv;
726             }
727         }
728
729       bytes_to_read = svm_fifo_max_dequeue (rx_fifo);
730       bytes_to_read = vec_len (utm->rx_buf) > bytes_to_read ?
731         bytes_to_read : vec_len (utm->rx_buf);
732
733       buffer_offset = 0;
734       while (bytes_to_read > 0)
735         {
736           rv = svm_fifo_dequeue_nowait (rx_fifo,
737                                         bytes_to_read,
738                                         utm->rx_buf + buffer_offset);
739           if (rv > 0)
740             {
741               bytes_to_read -= rv;
742               buffer_offset += rv;
743               bytes_received += rv;
744             }
745         }
746     }
747   while (bytes_received < bytes_sent)
748     {
749       rv =
750         svm_fifo_dequeue_nowait (rx_fifo, vec_len (utm->rx_buf), utm->rx_buf);
751       if (rv > 0)
752         {
753 #if CLIB_DEBUG > 0
754           int j;
755           for (j = 0; j < rv; j++)
756             {
757               if (utm->rx_buf[j] != ((bytes_received + j) & 0xff))
758                 {
759                   clib_warning ("error at byte %lld, 0x%x not 0x%x",
760                                 bytes_received + j,
761                                 utm->rx_buf[j],
762                                 ((bytes_received + j) & 0xff));
763                 }
764             }
765 #endif
766           bytes_received += (u64) rv;
767         }
768     }
769
770   after = clib_time_now (&utm->clib_time);
771   delta = after - before;
772   bytes_per_second = 0.0;
773
774   if (delta > 0.0)
775     bytes_per_second = (f64) bytes_received / delta;
776
777   fformat (stdout,
778            "Done: %lld recv bytes in %.2f seconds, %.2f bytes/sec...\n\n",
779            bytes_received, delta, bytes_per_second);
780   fformat (stdout,
781            "Done: %lld sent bytes in %.2f seconds, %.2f bytes/sec...\n\n",
782            bytes_sent, delta, bytes_per_second);
783   fformat (stdout,
784            "client -> server -> client round trip: %.2f Gbit/sec \n\n",
785            (bytes_per_second * 8.0) / 1e9);
786 }
787
788 static void
789 send_test_chunk (udp_echo_main_t * utm, app_session_t * s, u32 bytes)
790 {
791   u8 *test_data = utm->connect_test_data;
792   int test_buf_offset = 0;
793   u64 bytes_sent = 0;
794   u32 bytes_to_snd, enq_space, min_chunk;
795   int rv;
796
797   min_chunk = clib_min (65536, s->tx_fifo->nitems);
798   bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
799   if (bytes_to_snd > vec_len (test_data))
800     bytes_to_snd = vec_len (test_data);
801
802   while (bytes_to_snd > 0 && !utm->time_to_stop)
803     {
804       enq_space = svm_fifo_max_enqueue (s->tx_fifo);
805       if (enq_space < clib_min (bytes_to_snd, min_chunk))
806         continue;
807
808       rv = app_send (s, test_data + test_buf_offset, bytes_to_snd, 0);
809       if (rv > 0)
810         {
811           bytes_to_snd -= rv;
812           test_buf_offset += rv;
813           bytes_sent += rv;
814         }
815     }
816 }
817
818 static void
819 recv_test_chunk (udp_echo_main_t * utm, app_session_t * s)
820 {
821   app_recv (s, utm->rx_buf, vec_len (utm->rx_buf));
822 }
823
824 void
825 client_send_data (udp_echo_main_t * utm)
826 {
827   f64 start_time, end_time, delta;
828   app_session_t *session;
829   char *transfer_type;
830   u32 n_iterations;
831   u8 *test_data;
832   int i;
833
834   vec_validate (utm->connect_test_data, 1024 * 1024 - 1);
835   for (i = 0; i < vec_len (utm->connect_test_data); i++)
836     utm->connect_test_data[i] = i & 0xff;
837
838   test_data = utm->connect_test_data;
839   session = pool_elt_at_index (utm->sessions, utm->connected_session);
840   ASSERT (vec_len (test_data) > 0);
841
842   vec_validate (utm->rx_buf, vec_len (test_data) - 1);
843   n_iterations = utm->bytes_to_send / vec_len (test_data);
844   if (!n_iterations)
845     n_iterations = 1;
846
847   start_time = clib_time_now (&utm->clib_time);
848   for (i = 0; i < n_iterations; i++)
849     {
850       send_test_chunk (utm, session, 0);
851       if (utm->have_return)
852         recv_test_chunk (utm, session);
853       if (utm->time_to_stop)
854         break;
855     }
856
857   if (utm->have_return)
858     {
859       f64 timeout = clib_time_now (&utm->clib_time) + 5;
860       while (clib_time_now (&utm->clib_time) < timeout)
861         recv_test_chunk (utm, session);
862     }
863
864   end_time = clib_time_now (&utm->clib_time);
865   delta = end_time - start_time;
866   transfer_type = utm->have_return ? "full-duplex" : "half-duplex";
867   clib_warning ("%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds",
868                 utm->bytes_to_send, utm->bytes_to_send / (1ULL << 20),
869                 utm->bytes_to_send / (1ULL << 30), delta);
870   clib_warning ("%.2f bytes/second %s", ((f64) utm->bytes_to_send) / (delta),
871                 transfer_type);
872   clib_warning ("%.4f gbit/second %s",
873                 (((f64) utm->bytes_to_send * 8.0) / delta / 1e9),
874                 transfer_type);
875 }
876
877 static int
878 application_attach (udp_echo_main_t * utm)
879 {
880   application_send_attach (utm);
881   if (wait_for_state_change (utm, STATE_ATTACHED))
882     {
883       clib_warning ("timeout waiting for STATE_ATTACHED");
884       return -1;
885     }
886   return 0;
887 }
888
889 static void
890 client_test (udp_echo_main_t * utm)
891 {
892   f64 start_time, timeout = 100.0;
893   app_session_t *session;
894   svm_msg_q_msg_t msg;
895   session_event_t *e;
896
897   if (application_attach (utm))
898     return;
899
900   udp_client_send_connect (utm);
901
902   start_time = clib_time_now (&utm->clib_time);
903   while (pool_elts (utm->sessions) != 1
904          && clib_time_now (&utm->clib_time) - start_time < timeout
905          && utm->state != STATE_FAILED)
906
907     {
908       svm_msg_q_sub (utm->our_event_queue, &msg, SVM_Q_WAIT, 0);
909       e = svm_msg_q_msg_data (utm->our_event_queue, &msg);
910       handle_mq_event (e);
911       svm_msg_q_free_msg (utm->our_event_queue, &msg);
912     }
913
914   if (utm->cut_through_session_index != ~0)
915     {
916       session = pool_elt_at_index (utm->sessions,
917                                    utm->cut_through_session_index);
918       client_send_cut_through (utm, session);
919     }
920   else
921     {
922       session = pool_elt_at_index (utm->sessions, utm->connected_session);
923       client_send_data (utm);
924     }
925
926   application_detach (utm);
927   wait_for_state_change (utm, STATE_DETACHED);
928 }
929
930 static void
931 vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
932 {
933   udp_echo_main_t *utm = &udp_echo_main;
934   svm_fifo_t *rx_fifo, *tx_fifo;
935   app_session_t *session;
936   u32 session_index;
937
938   if (mp->retval)
939     {
940       clib_warning ("bind failed: %d", mp->retval);
941       utm->state = STATE_FAILED;
942       return;
943     }
944
945   rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
946   tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
947
948   pool_get (utm->sessions, session);
949   memset (session, 0, sizeof (*session));
950   session_index = session - utm->sessions;
951
952   rx_fifo->client_session_index = session_index;
953   tx_fifo->client_session_index = session_index;
954   session->rx_fifo = rx_fifo;
955   session->tx_fifo = tx_fifo;
956   clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
957                sizeof (ip46_address_t));
958   session->transport.is_ip4 = mp->lcl_is_ip4;
959   session->transport.lcl_port = mp->lcl_port;
960   session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
961
962   utm->state = utm->is_connected ? STATE_BOUND : STATE_READY;
963 }
964
965 static void
966 vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
967 {
968   udp_echo_main_t *utm = &udp_echo_main;
969   svm_fifo_segment_create_args_t _a, *a = &_a;
970   svm_fifo_segment_private_t *seg;
971   u8 *seg_name;
972   int rv;
973
974   memset (a, 0, sizeof (*a));
975   a->segment_name = (char *) mp->segment_name;
976   a->segment_size = mp->segment_size;
977   /* Attach to the segment vpp created */
978   rv = svm_fifo_segment_attach (a);
979   if (rv)
980     {
981       clib_warning ("svm_fifo_segment_attach ('%s') failed",
982                     mp->segment_name);
983       return;
984     }
985   seg = svm_fifo_segment_get_segment (a->new_segment_indices[0]);
986   clib_warning ("Mapped new segment '%s' size %d", seg->ssvm.name,
987                 seg->ssvm.ssvm_size);
988   seg_name = format (0, "%s", (char *) mp->segment_name);
989   hash_set_mem (utm->segments_table, seg_name, a->new_segment_indices[0]);
990   vec_free (seg_name);
991 }
992
993 static void
994 vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp)
995 {
996   udp_echo_main_t *utm = &udp_echo_main;
997   svm_fifo_segment_private_t *seg;
998   u64 *seg_indexp;
999   u8 *seg_name;
1000
1001
1002   seg_name = format (0, "%s", mp->segment_name);
1003   seg_indexp = hash_get_mem (utm->segments_table, seg_name);
1004   if (!seg_indexp)
1005     {
1006       clib_warning ("segment not mapped: %s", seg_name);
1007       return;
1008     }
1009   hash_unset_mem (utm->segments_table, seg_name);
1010   seg = svm_fifo_segment_get_segment ((u32) seg_indexp[0]);
1011   svm_fifo_segment_delete (seg);
1012   clib_warning ("Unmapped segment '%s'", seg_name);
1013   vec_free (seg_name);
1014 }
1015
1016 static void
1017 vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp)
1018 {
1019   udp_echo_main_t *utm = &udp_echo_main;
1020
1021   if (mp->retval != 0)
1022     clib_warning ("returned %d", ntohl (mp->retval));
1023
1024   utm->state = STATE_START;
1025 }
1026
1027 #define foreach_tcp_echo_msg                            \
1028 _(BIND_URI_REPLY, bind_uri_reply)                       \
1029 _(UNBIND_URI_REPLY, unbind_uri_reply)                   \
1030 _(MAP_ANOTHER_SEGMENT, map_another_segment)             \
1031 _(UNMAP_SEGMENT, unmap_segment)                         \
1032 _(APPLICATION_ATTACH_REPLY, application_attach_reply)   \
1033 _(APPLICATION_DETACH_REPLY, application_detach_reply)   \
1034
1035 void
1036 tcp_echo_api_hookup (udp_echo_main_t * utm)
1037 {
1038 #define _(N,n)                                                  \
1039     vl_msg_api_set_handlers(VL_API_##N, #n,                     \
1040                            vl_api_##n##_t_handler,              \
1041                            vl_noop_handler,                     \
1042                            vl_api_##n##_t_endian,               \
1043                            vl_api_##n##_t_print,                \
1044                            sizeof(vl_api_##n##_t), 1);
1045   foreach_tcp_echo_msg;
1046 #undef _
1047
1048 }
1049
1050 int
1051 connect_to_vpp (char *name)
1052 {
1053   udp_echo_main_t *utm = &udp_echo_main;
1054   api_main_t *am = &api_main;
1055
1056   if (vl_client_connect_to_vlib ("/vpe-api", name, 32) < 0)
1057     return -1;
1058
1059   utm->vl_input_queue = am->shmem_hdr->vl_input_queue;
1060   utm->my_client_index = am->my_client_index;
1061
1062   return 0;
1063 }
1064
1065 void
1066 vlib_cli_output (struct vlib_main_t *vm, char *fmt, ...)
1067 {
1068   clib_warning ("BUG");
1069 }
1070
1071 static void
1072 init_error_string_table (udp_echo_main_t * utm)
1073 {
1074   utm->error_string_by_error_number = hash_create (0, sizeof (uword));
1075
1076 #define _(n,v,s) hash_set (utm->error_string_by_error_number, -v, s);
1077   foreach_vnet_api_error;
1078 #undef _
1079
1080   hash_set (utm->error_string_by_error_number, 99, "Misc");
1081 }
1082
1083 void
1084 server_handle_fifo_event_rx (udp_echo_main_t * utm, session_event_t * e)
1085 {
1086   app_session_t *s;
1087   int rv;
1088
1089   s = pool_elt_at_index (utm->sessions, e->fifo->client_session_index);
1090   app_recv (s, utm->rx_buf, vec_len (utm->rx_buf));
1091
1092   if (utm->do_echo)
1093     {
1094       do
1095         {
1096           rv = app_send_stream (s, utm->rx_buf, vec_len (utm->rx_buf), 0);
1097         }
1098       while (rv == SVM_FIFO_FULL);
1099     }
1100 }
1101
1102 void
1103 server_handle_event_queue (udp_echo_main_t * utm)
1104 {
1105   session_event_t *e;
1106   svm_msg_q_msg_t msg;
1107
1108   while (utm->state != STATE_READY)
1109     sleep (5);
1110
1111   while (1)
1112     {
1113       if (svm_msg_q_sub (utm->our_event_queue, &msg, SVM_Q_WAIT, 0))
1114         {
1115           clib_warning ("svm msg q returned");
1116           continue;
1117         }
1118       e = svm_msg_q_msg_data (utm->our_event_queue, &msg);
1119       switch (e->event_type)
1120         {
1121         case FIFO_EVENT_APP_RX:
1122           server_handle_fifo_event_rx (utm, e);
1123           break;
1124
1125         default:
1126           handle_mq_event (e);
1127           break;
1128         }
1129       svm_msg_q_free_msg (utm->our_event_queue, &msg);
1130       if (PREDICT_FALSE (utm->time_to_stop == 1))
1131         return;
1132       if (PREDICT_FALSE (utm->time_to_print_stats == 1))
1133         {
1134           utm->time_to_print_stats = 0;
1135           fformat (stdout, "%d connections\n", pool_elts (utm->sessions));
1136         }
1137     }
1138 }
1139
1140 static void
1141 server_unbind (udp_echo_main_t * utm)
1142 {
1143   vl_api_unbind_uri_t *ump;
1144
1145   ump = vl_msg_api_alloc (sizeof (*ump));
1146   memset (ump, 0, sizeof (*ump));
1147
1148   ump->_vl_msg_id = ntohs (VL_API_UNBIND_URI);
1149   ump->client_index = utm->my_client_index;
1150   memcpy (ump->uri, utm->listen_uri, vec_len (utm->listen_uri));
1151   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & ump);
1152 }
1153
1154 static void
1155 server_bind (udp_echo_main_t * utm)
1156 {
1157   vl_api_bind_uri_t *bmp;
1158
1159   bmp = vl_msg_api_alloc (sizeof (*bmp));
1160   memset (bmp, 0, sizeof (*bmp));
1161
1162   bmp->_vl_msg_id = ntohs (VL_API_BIND_URI);
1163   bmp->client_index = utm->my_client_index;
1164   bmp->context = ntohl (0xfeedface);
1165   memcpy (bmp->uri, utm->listen_uri, vec_len (utm->listen_uri));
1166   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
1167 }
1168
1169 void
1170 udp_server_test (udp_echo_main_t * utm)
1171 {
1172   u8 wait_for_state = utm->is_connected ? STATE_BOUND : STATE_READY;
1173   application_send_attach (utm);
1174
1175   /* Bind to uri */
1176   server_bind (utm);
1177
1178   if (wait_for_state_change (utm, wait_for_state))
1179     {
1180       clib_warning ("timeout waiting for state change");
1181       return;
1182     }
1183
1184   server_handle_event_queue (utm);
1185
1186   /* Cleanup */
1187   server_unbind (utm);
1188
1189   if (wait_for_state_change (utm, STATE_START))
1190     {
1191       clib_warning ("timeout waiting for STATE_START");
1192       return;
1193     }
1194
1195   application_detach (utm);
1196
1197   fformat (stdout, "Test complete...\n");
1198 }
1199
1200 int
1201 main (int argc, char **argv)
1202 {
1203   udp_echo_main_t *utm = &udp_echo_main;
1204   u8 *uri = (u8 *) "udp://6.0.1.1/1234";
1205   unformat_input_t _argv, *a = &_argv;
1206   int i_am_server = 1;
1207   app_session_t *session;
1208   u8 *chroot_prefix;
1209   char *app_name;
1210   mheap_t *h;
1211   u8 *heap;
1212   u32 tmp;
1213   int i;
1214
1215   clib_mem_init (0, 256 << 20);
1216   heap = clib_mem_get_per_cpu_heap ();
1217   h = mheap_header (heap);
1218   /* make the main heap thread-safe */
1219   h->flags |= MHEAP_FLAG_THREAD_SAFE;
1220   svm_fifo_segment_main_init (0x200000000ULL, 20);
1221
1222   vec_validate (utm->rx_buf, 8192);
1223   utm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
1224   utm->my_pid = getpid ();
1225   utm->configured_segment_size = 1 << 20;
1226   utm->segments_table = hash_create_vec (0, sizeof (u8), sizeof (u64));
1227   utm->have_return = 1;
1228   utm->bytes_to_send = 1024;
1229   utm->fifo_size = 128 << 10;
1230   utm->segment_main = &svm_fifo_segment_main;
1231   utm->cut_through_session_index = ~0;
1232   clib_time_init (&utm->clib_time);
1233
1234   init_error_string_table (utm);
1235   unformat_init_command_line (a, argv);
1236
1237   while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT)
1238     {
1239       if (unformat (a, "chroot prefix %s", &chroot_prefix))
1240         {
1241           vl_set_memory_root_path ((char *) chroot_prefix);
1242         }
1243       else if (unformat (a, "uri %s", &uri))
1244         ;
1245       else if (unformat (a, "segment-size %dM", &tmp))
1246         utm->configured_segment_size = tmp << 20;
1247       else if (unformat (a, "segment-size %dG", &tmp))
1248         utm->configured_segment_size = tmp << 30;
1249       else if (unformat (a, "server"))
1250         i_am_server = 1;
1251       else if (unformat (a, "client"))
1252         i_am_server = 0;
1253       else if (unformat (a, "no-return"))
1254         utm->have_return = 0;
1255       else if (unformat (a, "mbytes %d", &tmp))
1256         utm->bytes_to_send = (u64) tmp << 20;
1257       else if (unformat (a, "fifo-size %d", &tmp))
1258         utm->fifo_size = tmp << 10;
1259       else
1260         {
1261           fformat (stderr, "%s: usage [server|client]\n");
1262           exit (1);
1263         }
1264     }
1265
1266   utm->i_am_server = i_am_server;
1267
1268   setup_signal_handlers ();
1269   tcp_echo_api_hookup (utm);
1270
1271   if (i_am_server)
1272     {
1273       utm->listen_uri = format (0, "%s%c", uri, 0);
1274       utm->is_connected = (utm->listen_uri[4] == 'c');
1275       app_name = "udp_echo_server";
1276     }
1277   else
1278     {
1279       app_name = "udp_echo_client";
1280       utm->connect_uri = format (0, "%s%c", uri, 0);
1281       utm->is_connected = (utm->connect_uri[4] == 'c');
1282     }
1283   if (connect_to_vpp (app_name) < 0)
1284     {
1285       svm_region_exit ();
1286       fformat (stderr, "Couldn't connect to vpe, exiting...\n");
1287       exit (1);
1288     }
1289
1290   if (i_am_server == 0)
1291     {
1292       client_test (utm);
1293       goto done;
1294     }
1295
1296   /* $$$$ hack preallocation */
1297   for (i = 0; i < 200000; i++)
1298     {
1299       pool_get (utm->sessions, session);
1300       memset (session, 0, sizeof (*session));
1301     }
1302   for (i = 0; i < 200000; i++)
1303     pool_put_index (utm->sessions, i);
1304
1305   udp_server_test (utm);
1306
1307 done:
1308   vl_client_disconnect_from_vlib ();
1309   exit (0);
1310 }
1311
1312 #undef vl_api_version
1313 #define vl_api_version(n,v) static u32 vpe_api_version = v;
1314 #include <vpp/api/vpe.api.h>
1315 #undef vl_api_version
1316
1317 void
1318 vl_client_add_api_signatures (vl_api_memclnt_create_t * mp)
1319 {
1320   /*
1321    * Send the main API signature in slot 0. This bit of code must
1322    * match the checks in ../vpe/api/api.c: vl_msg_api_version_check().
1323    */
1324   mp->api_versions[0] = clib_host_to_net_u32 (vpe_api_version);
1325 }
1326
1327 u32
1328 vl (void *p)
1329 {
1330   return vec_len (p);
1331 }
1332
1333 /*
1334  * fd.io coding-style-patch-verification: ON
1335  *
1336  * Local Variables:
1337  * eval: (c-set-style "gnu")
1338  * End:
1339  */