e6c239c1013ade2680e3abbd704ae628988e9595
[vpp.git] / src / uri / uri_udp_test.c
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <setjmp.h>
18 #include <signal.h>
19 #include <vppinfra/clib.h>
20 #include <vppinfra/format.h>
21 #include <vppinfra/error.h>
22 #include <vppinfra/time.h>
23 #include <vppinfra/macros.h>
24 #include <vnet/vnet.h>
25 #include <vlib/vlib.h>
26 #include <vlib/unix/unix.h>
27 #include <vlibapi/api.h>
28 #include <vlibmemory/api.h>
29 #include <vpp/api/vpe_msg_enum.h>
30 #include <svm/svm_fifo_segment.h>
31 #include <pthread.h>
32 #include <vnet/session/application_interface.h>
33
34 #define vl_typedefs             /* define message structures */
35 #include <vpp/api/vpe_all_api_h.h>
36 #undef vl_typedefs
37
38 /* declare message handlers for each api */
39
40 #define vl_endianfun            /* define message structures */
41 #include <vpp/api/vpe_all_api_h.h>
42 #undef vl_endianfun
43
44 /* instantiate all the print functions we know about */
45 #define vl_print(handle, ...)
46 #define vl_printfun
47 #include <vpp/api/vpe_all_api_h.h>
48 #undef vl_printfun
49
50 /* Satisfy external references when not linking with -lvlib */
51 vlib_main_t vlib_global_main;
52 vlib_main_t **vlib_mains;
53
54 typedef enum
55 {
56   STATE_START,
57   STATE_READY,
58   STATE_DISCONNECTING,
59 } connection_state_t;
60
61 typedef struct
62 {
63   svm_fifo_t *server_rx_fifo;
64   svm_fifo_t *server_tx_fifo;
65 } session_t;
66
67 typedef struct
68 {
69   /* vpe input queue */
70   unix_shared_memory_queue_t *vl_input_queue;
71
72   /* API client handle */
73   u32 my_client_index;
74
75   /* The URI we're playing with */
76   u8 *uri;
77
78   /* Session pool */
79   session_t *sessions;
80
81   /* Hash table for disconnect processing */
82   uword *session_index_by_vpp_handles;
83
84   /* fifo segment */
85   svm_fifo_segment_private_t *seg;
86
87   /* intermediate rx buffer */
88   u8 *rx_buf;
89
90   /* URI for connect */
91   u8 *connect_uri;
92
93   int i_am_master;
94
95   /* Our event queue */
96   unix_shared_memory_queue_t *our_event_queue;
97
98   /* $$$ single thread only for the moment */
99   unix_shared_memory_queue_t *vpp_event_queue;
100
101   /* $$$$ hack: cut-through session index */
102   volatile u32 cut_through_session_index;
103
104   /* unique segment name counter */
105   u32 unique_segment_index;
106
107   pid_t my_pid;
108
109   /* pthread handle */
110   pthread_t cut_through_thread_handle;
111
112   /* For deadman timers */
113   clib_time_t clib_time;
114
115   /* State of the connection, shared between msg RX thread and main thread */
116   volatile connection_state_t state;
117
118   volatile int time_to_stop;
119   volatile int time_to_print_stats;
120
121   u32 configured_segment_size;
122
123   /* VNET_API_ERROR_FOO -> "Foo" hash table */
124   uword *error_string_by_error_number;
125
126   /* convenience */
127   svm_fifo_segment_main_t *segment_main;
128
129 } uri_udp_test_main_t;
130
131 #if CLIB_DEBUG > 0
132 #define NITER 10000
133 #else
134 #define NITER 4000000
135 #endif
136
137 uri_udp_test_main_t uri_udp_test_main;
138
139 static void
140 stop_signal (int signum)
141 {
142   uri_udp_test_main_t *um = &uri_udp_test_main;
143
144   um->time_to_stop = 1;
145 }
146
147 static void
148 stats_signal (int signum)
149 {
150   uri_udp_test_main_t *um = &uri_udp_test_main;
151
152   um->time_to_print_stats = 1;
153 }
154
155 static clib_error_t *
156 setup_signal_handlers (void)
157 {
158   signal (SIGINT, stats_signal);
159   signal (SIGQUIT, stop_signal);
160   signal (SIGTERM, stop_signal);
161
162   return 0;
163 }
164
165 u8 *
166 format_api_error (u8 * s, va_list * args)
167 {
168   uri_udp_test_main_t *utm = va_arg (*args, uri_udp_test_main_t *);
169   i32 error = va_arg (*args, u32);
170   uword *p;
171
172   p = hash_get (utm->error_string_by_error_number, -error);
173
174   if (p)
175     s = format (s, "%s", p[0]);
176   else
177     s = format (s, "%d", error);
178   return s;
179 }
180
181 int
182 wait_for_state_change (uri_udp_test_main_t * utm, connection_state_t state)
183 {
184 #if CLIB_DEBUG > 0
185 #define TIMEOUT 600.0
186 #else
187 #define TIMEOUT 600.0
188 #endif
189
190   f64 timeout = clib_time_now (&utm->clib_time) + TIMEOUT;
191
192   while (clib_time_now (&utm->clib_time) < timeout)
193     {
194       if (utm->state == state)
195         return 0;
196     }
197   return -1;
198 }
199
200 u64 server_bytes_received, server_bytes_sent;
201
202 static void *
203 cut_through_thread_fn (void *arg)
204 {
205   session_t *s;
206   svm_fifo_t *rx_fifo;
207   svm_fifo_t *tx_fifo;
208   u8 *my_copy_buffer = 0;
209   uri_udp_test_main_t *utm = &uri_udp_test_main;
210   i32 actual_transfer;
211   int rv;
212   u32 buffer_offset;
213
214   while (utm->cut_through_session_index == ~0)
215     ;
216
217   s = pool_elt_at_index (utm->sessions, utm->cut_through_session_index);
218
219   rx_fifo = s->server_rx_fifo;
220   tx_fifo = s->server_tx_fifo;
221
222   vec_validate (my_copy_buffer, 64 * 1024 - 1);
223
224   while (true)
225     {
226       /* We read from the tx fifo and write to the rx fifo */
227       do
228         {
229           actual_transfer = svm_fifo_dequeue_nowait (tx_fifo, 0,
230                                                      vec_len (my_copy_buffer),
231                                                      my_copy_buffer);
232         }
233       while (actual_transfer <= 0);
234
235       server_bytes_received += actual_transfer;
236
237       buffer_offset = 0;
238       while (actual_transfer > 0)
239         {
240           rv = svm_fifo_enqueue_nowait (rx_fifo, 0, actual_transfer,
241                                         my_copy_buffer + buffer_offset);
242           if (rv > 0)
243             {
244               actual_transfer -= rv;
245               buffer_offset += rv;
246               server_bytes_sent += rv;
247             }
248
249         }
250       if (PREDICT_FALSE (utm->time_to_stop))
251         break;
252     }
253
254   pthread_exit (0);
255 }
256
257 static void
258 uri_udp_slave_test (uri_udp_test_main_t * utm)
259 {
260   vl_api_connect_uri_t *cmp;
261   int i;
262   u8 *test_data = 0;
263   u64 bytes_received = 0, bytes_sent = 0;
264   i32 bytes_to_read;
265   int rv;
266   int mypid = getpid ();
267   f64 before, after, delta, bytes_per_second;
268   session_t *session;
269   svm_fifo_t *rx_fifo, *tx_fifo;
270   int buffer_offset, bytes_to_send = 0;
271
272   vec_validate (test_data, 64 * 1024 - 1);
273   for (i = 0; i < vec_len (test_data); i++)
274     test_data[i] = i & 0xff;
275
276   cmp = vl_msg_api_alloc (sizeof (*cmp));
277   memset (cmp, 0, sizeof (*cmp));
278
279   cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
280   cmp->client_index = utm->my_client_index;
281   cmp->context = ntohl (0xfeedface);
282   memcpy (cmp->uri, utm->connect_uri, vec_len (utm->connect_uri));
283   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & cmp);
284
285   if (wait_for_state_change (utm, STATE_READY))
286     {
287       clib_warning ("timeout waiting for STATE_READY");
288       return;
289     }
290
291   session = pool_elt_at_index (utm->sessions, utm->cut_through_session_index);
292   rx_fifo = session->server_rx_fifo;
293   tx_fifo = session->server_tx_fifo;
294
295   before = clib_time_now (&utm->clib_time);
296
297   vec_validate (utm->rx_buf, vec_len (test_data) - 1);
298
299   for (i = 0; i < NITER; i++)
300     {
301       bytes_to_send = vec_len (test_data);
302       buffer_offset = 0;
303       while (bytes_to_send > 0)
304         {
305           rv = svm_fifo_enqueue_nowait (tx_fifo, mypid,
306                                         bytes_to_send,
307                                         test_data + buffer_offset);
308
309           if (rv > 0)
310             {
311               bytes_to_send -= rv;
312               buffer_offset += rv;
313               bytes_sent += rv;
314             }
315         }
316
317       bytes_to_read = svm_fifo_max_dequeue (rx_fifo);
318
319       bytes_to_read = vec_len (utm->rx_buf) > bytes_to_read ?
320         bytes_to_read : vec_len (utm->rx_buf);
321
322       buffer_offset = 0;
323       while (bytes_to_read > 0)
324         {
325           rv = svm_fifo_dequeue_nowait (rx_fifo, mypid,
326                                         bytes_to_read,
327                                         utm->rx_buf + buffer_offset);
328           if (rv > 0)
329             {
330               bytes_to_read -= rv;
331               buffer_offset += rv;
332               bytes_received += rv;
333             }
334         }
335     }
336   while (bytes_received < bytes_sent)
337     {
338       rv = svm_fifo_dequeue_nowait (rx_fifo, mypid,
339                                     vec_len (utm->rx_buf), utm->rx_buf);
340       if (rv > 0)
341         {
342 #if CLIB_DEBUG > 0
343           int j;
344           for (j = 0; j < rv; j++)
345             {
346               if (utm->rx_buf[j] != ((bytes_received + j) & 0xff))
347                 {
348                   clib_warning ("error at byte %lld, 0x%x not 0x%x",
349                                 bytes_received + j,
350                                 utm->rx_buf[j],
351                                 ((bytes_received + j) & 0xff));
352                 }
353             }
354 #endif
355           bytes_received += (u64) rv;
356         }
357     }
358
359   after = clib_time_now (&utm->clib_time);
360   delta = after - before;
361   bytes_per_second = 0.0;
362
363   if (delta > 0.0)
364     bytes_per_second = (f64) bytes_received / delta;
365
366   fformat (stdout,
367            "Done: %lld recv bytes in %.2f seconds, %.2f bytes/sec...\n\n",
368            bytes_received, delta, bytes_per_second);
369   fformat (stdout,
370            "Done: %lld sent bytes in %.2f seconds, %.2f bytes/sec...\n\n",
371            bytes_sent, delta, bytes_per_second);
372   fformat (stdout,
373            "client -> server -> client round trip: %.2f Gbit/sec \n\n",
374            (bytes_per_second * 8.0) / 1e9);
375 }
376
377 static void
378 vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
379 {
380   uri_udp_test_main_t *utm = &uri_udp_test_main;
381   svm_fifo_segment_create_args_t _a, *a = &_a;
382   int rv;
383
384   if (mp->segment_name_length == 0)
385     {
386       clib_warning ("segment_name_length zero");
387       return;
388     }
389
390   a->segment_name = (char *) mp->segment_name;
391   a->segment_size = mp->segment_size;
392
393   ASSERT (mp->server_event_queue_address);
394
395   /* Attach to the segment vpp created */
396   rv = svm_fifo_segment_attach (a);
397   if (rv)
398     {
399       clib_warning ("svm_fifo_segment_attach ('%s') failed",
400                     mp->segment_name);
401       return;
402     }
403
404   utm->our_event_queue = (unix_shared_memory_queue_t *)
405     mp->server_event_queue_address;
406
407   utm->state = STATE_READY;
408 }
409
410 static void
411 vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
412 {
413   svm_fifo_segment_create_args_t _a, *a = &_a;
414   int rv;
415
416   a->segment_name = (char *) mp->segment_name;
417   a->segment_size = mp->segment_size;
418   /* Attach to the segment vpp created */
419   rv = svm_fifo_segment_attach (a);
420   if (rv)
421     {
422       clib_warning ("svm_fifo_segment_attach ('%s') failed",
423                     mp->segment_name);
424       return;
425     }
426   clib_warning ("Mapped new segment '%s' size %d", mp->segment_name,
427                 mp->segment_size);
428 }
429
430 static void
431 vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
432 {
433   u32 segment_index;
434   uri_udp_test_main_t *utm = &uri_udp_test_main;
435   svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
436   svm_fifo_segment_create_args_t _a, *a = &_a;
437   svm_fifo_segment_private_t *seg;
438   unix_shared_memory_queue_t *client_q;
439   vl_api_connect_uri_reply_t *rmp;
440   session_t *session;
441   int rv = 0;
442
443   /* Create the segment */
444   a->segment_name = (char *) format (0, "%d:segment%d%c", utm->my_pid,
445                                      utm->unique_segment_index++, 0);
446   a->segment_size = utm->configured_segment_size;
447
448   rv = svm_fifo_segment_create (a);
449   if (rv)
450     {
451       clib_warning ("sm_fifo_segment_create ('%s') failed", a->segment_name);
452       rv = VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
453       goto send_reply;
454     }
455
456   vec_add2 (utm->seg, seg, 1);
457
458   segment_index = vec_len (sm->segments) - 1;
459
460   memcpy (seg, sm->segments + segment_index, sizeof (utm->seg[0]));
461
462   pool_get (utm->sessions, session);
463
464   /*
465    * By construction the master's idea of the rx fifo ends up in
466    * fsh->fifos[0], and the master's idea of the tx fifo ends up in
467    * fsh->fifos[1].
468    */
469   session->server_rx_fifo = svm_fifo_segment_alloc_fifo (utm->seg,
470                                                          128 * 1024);
471   ASSERT (session->server_rx_fifo);
472
473   session->server_tx_fifo = svm_fifo_segment_alloc_fifo (utm->seg,
474                                                          128 * 1024);
475   ASSERT (session->server_tx_fifo);
476
477   session->server_rx_fifo->server_session_index = session - utm->sessions;
478   session->server_tx_fifo->server_session_index = session - utm->sessions;
479   utm->cut_through_session_index = session - utm->sessions;
480
481   rv = pthread_create (&utm->cut_through_thread_handle,
482                        NULL /*attr */ , cut_through_thread_fn, 0);
483   if (rv)
484     {
485       clib_warning ("pthread_create returned %d", rv);
486       rv = VNET_API_ERROR_SYSCALL_ERROR_1;
487     }
488
489 send_reply:
490   rmp = vl_msg_api_alloc (sizeof (*rmp));
491   memset (rmp, 0, sizeof (*rmp));
492
493   rmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI_REPLY);
494   rmp->context = mp->context;
495   rmp->retval = ntohl (rv);
496   rmp->segment_name_length = vec_len (a->segment_name);
497   memcpy (rmp->segment_name, a->segment_name, vec_len (a->segment_name));
498
499   vec_free (a->segment_name);
500
501   client_q = (unix_shared_memory_queue_t *) mp->client_queue_address;
502   vl_msg_api_send_shmem (client_q, (u8 *) & rmp);
503 }
504
505 static void
506 vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp)
507 {
508   uri_udp_test_main_t *utm = &uri_udp_test_main;
509
510   if (mp->retval != 0)
511     clib_warning ("returned %d", ntohl (mp->retval));
512
513   utm->state = STATE_START;
514 }
515
516 static void
517 vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
518 {
519   uri_udp_test_main_t *utm = &uri_udp_test_main;
520   vl_api_accept_session_reply_t *rmp;
521   svm_fifo_t *rx_fifo, *tx_fifo;
522   session_t *session;
523   static f64 start_time;
524   u64 key;
525
526   if (start_time == 0.0)
527     start_time = clib_time_now (&utm->clib_time);
528
529   utm->vpp_event_queue = (unix_shared_memory_queue_t *)
530     mp->vpp_event_queue_address;
531
532   pool_get (utm->sessions, session);
533
534   rx_fifo = (svm_fifo_t *) mp->server_rx_fifo;
535   rx_fifo->client_session_index = session - utm->sessions;
536   tx_fifo = (svm_fifo_t *) mp->server_tx_fifo;
537   tx_fifo->client_session_index = session - utm->sessions;
538
539   session->server_rx_fifo = rx_fifo;
540   session->server_tx_fifo = tx_fifo;
541
542   key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
543
544   hash_set (utm->session_index_by_vpp_handles, key, session - utm->sessions);
545
546   utm->state = STATE_READY;
547
548   if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
549     {
550       f64 now = clib_time_now (&utm->clib_time);
551       fformat (stdout, "%d active sessions in %.2f seconds, %.2f/sec...\n",
552                pool_elts (utm->sessions), now - start_time,
553                (f64) pool_elts (utm->sessions) / (now - start_time));
554     }
555
556   rmp = vl_msg_api_alloc (sizeof (*rmp));
557   memset (rmp, 0, sizeof (*rmp));
558   rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
559   rmp->session_type = mp->session_type;
560   rmp->session_index = mp->session_index;
561   rmp->session_thread_index = mp->session_thread_index;
562   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
563 }
564
565 static void
566 vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
567 {
568   uri_udp_test_main_t *utm = &uri_udp_test_main;
569   session_t *session;
570   vl_api_disconnect_session_reply_t *rmp;
571   uword *p;
572   int rv = 0;
573   u64 key;
574
575   key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
576
577   p = hash_get (utm->session_index_by_vpp_handles, key);
578
579   if (p)
580     {
581       session = pool_elt_at_index (utm->sessions, p[0]);
582       hash_unset (utm->session_index_by_vpp_handles, key);
583       pool_put (utm->sessions, session);
584     }
585   else
586     {
587       clib_warning ("couldn't find session key %llx", key);
588       rv = -11;
589     }
590
591   rmp = vl_msg_api_alloc (sizeof (*rmp));
592   memset (rmp, 0, sizeof (*rmp));
593   rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
594   rmp->retval = rv;
595   rmp->session_index = mp->session_index;
596   rmp->session_thread_index = mp->session_thread_index;
597   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
598 }
599
600 static void
601 vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
602 {
603   svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
604   uri_udp_test_main_t *utm = &uri_udp_test_main;
605   svm_fifo_segment_create_args_t _a, *a = &_a;
606   ssvm_shared_header_t *sh;
607   svm_fifo_segment_private_t *seg;
608   svm_fifo_segment_header_t *fsh;
609   session_t *session;
610   u32 segment_index;
611   int rv;
612
613   ASSERT (utm->i_am_master == 0);
614
615   if (mp->segment_name_length == 0)
616     {
617       clib_warning ("segment_name_length zero");
618       return;
619     }
620
621   memset (a, 0, sizeof (*a));
622
623   a->segment_name = (char *) mp->segment_name;
624
625   sleep (1);
626
627   rv = svm_fifo_segment_attach (a);
628   if (rv)
629     {
630       clib_warning ("sm_fifo_segment_create ('%v') failed", mp->segment_name);
631       return;
632     }
633
634   segment_index = vec_len (sm->segments) - 1;
635
636   vec_add2 (utm->seg, seg, 1);
637
638   memcpy (seg, sm->segments + segment_index, sizeof (*seg));
639   sh = seg->ssvm.sh;
640   fsh = (svm_fifo_segment_header_t *) sh->opaque[0];
641
642   while (vec_len (fsh->fifos) < 2)
643     sleep (1);
644
645   pool_get (utm->sessions, session);
646   utm->cut_through_session_index = session - utm->sessions;
647
648   session->server_rx_fifo = (svm_fifo_t *) fsh->fifos[0];
649   ASSERT (session->server_rx_fifo);
650   session->server_tx_fifo = (svm_fifo_t *) fsh->fifos[1];
651   ASSERT (session->server_tx_fifo);
652
653   /* security: could unlink /dev/shm/<mp->segment_name> here, maybe */
654
655   utm->state = STATE_READY;
656 }
657
658 #define foreach_uri_msg                         \
659 _(BIND_URI_REPLY, bind_uri_reply)               \
660 _(CONNECT_URI, connect_uri)                     \
661 _(CONNECT_URI_REPLY, connect_uri_reply)         \
662 _(UNBIND_URI_REPLY, unbind_uri_reply)           \
663 _(ACCEPT_SESSION, accept_session)               \
664 _(DISCONNECT_SESSION, disconnect_session)       \
665 _(MAP_ANOTHER_SEGMENT, map_another_segment)
666
667 void
668 uri_api_hookup (uri_udp_test_main_t * utm)
669 {
670 #define _(N,n)                                                  \
671     vl_msg_api_set_handlers(VL_API_##N, #n,                     \
672                            vl_api_##n##_t_handler,              \
673                            vl_noop_handler,                     \
674                            vl_api_##n##_t_endian,               \
675                            vl_api_##n##_t_print,                \
676                            sizeof(vl_api_##n##_t), 1);
677   foreach_uri_msg;
678 #undef _
679
680 }
681
682
683 int
684 connect_to_vpp (char *name)
685 {
686   uri_udp_test_main_t *utm = &uri_udp_test_main;
687   api_main_t *am = &api_main;
688
689   if (vl_client_connect_to_vlib ("/vpe-api", name, 32) < 0)
690     return -1;
691
692   utm->vl_input_queue = am->shmem_hdr->vl_input_queue;
693   utm->my_client_index = am->my_client_index;
694
695   return 0;
696 }
697
698 void
699 vlib_cli_output (struct vlib_main_t *vm, char *fmt, ...)
700 {
701   clib_warning ("BUG");
702 }
703
704 static void
705 init_error_string_table (uri_udp_test_main_t * utm)
706 {
707   utm->error_string_by_error_number = hash_create (0, sizeof (uword));
708
709 #define _(n,v,s) hash_set (utm->error_string_by_error_number, -v, s);
710   foreach_vnet_api_error;
711 #undef _
712
713   hash_set (utm->error_string_by_error_number, 99, "Misc");
714 }
715
716 void
717 server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
718                              session_fifo_event_t * e)
719 {
720   svm_fifo_t *rx_fifo, *tx_fifo;
721   int nbytes;
722
723   session_fifo_event_t evt;
724   unix_shared_memory_queue_t *q;
725   int rv;
726
727   rx_fifo = e->fifo;
728   tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
729
730   do
731     {
732       nbytes = svm_fifo_dequeue_nowait (rx_fifo, 0,
733                                         vec_len (utm->rx_buf), utm->rx_buf);
734     }
735   while (nbytes <= 0);
736   do
737     {
738       rv = svm_fifo_enqueue_nowait (tx_fifo, 0, nbytes, utm->rx_buf);
739     }
740   while (rv == -2);
741
742   /* Fabricate TX event, send to vpp */
743   evt.fifo = tx_fifo;
744   evt.event_type = FIFO_EVENT_SERVER_TX;
745   evt.event_id = e->event_id;
746
747   if (svm_fifo_set_event (tx_fifo))
748     {
749       q = utm->vpp_event_queue;
750       unix_shared_memory_queue_add (q, (u8 *) & evt,
751                                     0 /* do wait for mutex */ );
752     }
753 }
754
755 void
756 server_handle_event_queue (uri_udp_test_main_t * utm)
757 {
758   session_fifo_event_t _e, *e = &_e;
759
760   while (1)
761     {
762       unix_shared_memory_queue_sub (utm->our_event_queue, (u8 *) e,
763                                     0 /* nowait */ );
764       switch (e->event_type)
765         {
766         case FIFO_EVENT_SERVER_RX:
767           server_handle_fifo_event_rx (utm, e);
768           break;
769
770         case FIFO_EVENT_SERVER_EXIT:
771           return;
772
773         default:
774           clib_warning ("unknown event type %d", e->event_type);
775           break;
776         }
777       if (PREDICT_FALSE (utm->time_to_stop == 1))
778         break;
779       if (PREDICT_FALSE (utm->time_to_print_stats == 1))
780         {
781           utm->time_to_print_stats = 0;
782           fformat (stdout, "%d connections\n", pool_elts (utm->sessions));
783         }
784     }
785 }
786
787 void
788 uri_udp_test (uri_udp_test_main_t * utm)
789 {
790   vl_api_bind_uri_t *bmp;
791   vl_api_unbind_uri_t *ump;
792
793   bmp = vl_msg_api_alloc (sizeof (*bmp));
794   memset (bmp, 0, sizeof (*bmp));
795
796   bmp->_vl_msg_id = ntohs (VL_API_BIND_URI);
797   bmp->client_index = utm->my_client_index;
798   bmp->context = ntohl (0xfeedface);
799   bmp->initial_segment_size = 256 << 20;        /* size of initial segment */
800   bmp->options[SESSION_OPTIONS_FLAGS] =
801     SESSION_OPTIONS_FLAGS_USE_FIFO | SESSION_OPTIONS_FLAGS_ADD_SEGMENT;
802   bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = 16 << 10;
803   bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = 16 << 10;
804   bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
805   memcpy (bmp->uri, utm->uri, vec_len (utm->uri));
806   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
807
808   if (wait_for_state_change (utm, STATE_READY))
809     {
810       clib_warning ("timeout waiting for STATE_READY");
811       return;
812     }
813
814   server_handle_event_queue (utm);
815
816   ump = vl_msg_api_alloc (sizeof (*ump));
817   memset (ump, 0, sizeof (*ump));
818
819   ump->_vl_msg_id = ntohs (VL_API_UNBIND_URI);
820   ump->client_index = utm->my_client_index;
821   memcpy (ump->uri, utm->uri, vec_len (utm->uri));
822   vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & ump);
823
824   if (wait_for_state_change (utm, STATE_START))
825     {
826       clib_warning ("timeout waiting for STATE_START");
827       return;
828     }
829
830   fformat (stdout, "Test complete...\n");
831 }
832
833 int
834 main (int argc, char **argv)
835 {
836   uri_udp_test_main_t *utm = &uri_udp_test_main;
837   unformat_input_t _argv, *a = &_argv;
838   u8 *chroot_prefix;
839   u8 *heap;
840   u8 *bind_name = (u8 *) "udp://0.0.0.0/1234";
841   u32 tmp;
842   mheap_t *h;
843   session_t *session;
844   int i;
845   int i_am_master = 1;
846
847   clib_mem_init (0, 256 << 20);
848
849   heap = clib_mem_get_per_cpu_heap ();
850   h = mheap_header (heap);
851
852   /* make the main heap thread-safe */
853   h->flags |= MHEAP_FLAG_THREAD_SAFE;
854
855   vec_validate (utm->rx_buf, 8192);
856
857   utm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
858
859   utm->my_pid = getpid ();
860   utm->configured_segment_size = 1 << 20;
861
862   clib_time_init (&utm->clib_time);
863   init_error_string_table (utm);
864   svm_fifo_segment_init (0x200000000ULL, 20);
865   unformat_init_command_line (a, argv);
866
867   while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT)
868     {
869       if (unformat (a, "chroot prefix %s", &chroot_prefix))
870         {
871           vl_set_memory_root_path ((char *) chroot_prefix);
872         }
873       else if (unformat (a, "uri %s", &bind_name))
874         ;
875       else if (unformat (a, "segment-size %dM", &tmp))
876         utm->configured_segment_size = tmp << 20;
877       else if (unformat (a, "segment-size %dG", &tmp))
878         utm->configured_segment_size = tmp << 30;
879       else if (unformat (a, "master"))
880         i_am_master = 1;
881       else if (unformat (a, "slave"))
882         i_am_master = 0;
883       else
884         {
885           fformat (stderr, "%s: usage [master|slave]\n");
886           exit (1);
887         }
888     }
889
890   utm->cut_through_session_index = ~0;
891   utm->uri = format (0, "%s%c", bind_name, 0);
892   utm->i_am_master = i_am_master;
893   utm->segment_main = &svm_fifo_segment_main;
894
895   utm->connect_uri = format (0, "udp://10.0.0.1/1234%c", 0);
896
897   setup_signal_handlers ();
898
899   uri_api_hookup (utm);
900
901   if (connect_to_vpp (i_am_master ? "uri_udp_master" : "uri_udp_slave") < 0)
902     {
903       svm_region_exit ();
904       fformat (stderr, "Couldn't connect to vpe, exiting...\n");
905       exit (1);
906     }
907
908   if (i_am_master == 0)
909     {
910       uri_udp_slave_test (utm);
911       exit (0);
912     }
913
914   /* $$$$ hack preallocation */
915   for (i = 0; i < 200000; i++)
916     {
917       pool_get (utm->sessions, session);
918       memset (session, 0, sizeof (*session));
919     }
920   for (i = 0; i < 200000; i++)
921     pool_put_index (utm->sessions, i);
922
923   uri_udp_test (utm);
924
925   vl_client_disconnect_from_vlib ();
926   exit (0);
927 }
928
929 #undef vl_api_version
930 #define vl_api_version(n,v) static u32 vpe_api_version = v;
931 #include <vpp/api/vpe.api.h>
932 #undef vl_api_version
933
934 void
935 vl_client_add_api_signatures (vl_api_memclnt_create_t * mp)
936 {
937   /*
938    * Send the main API signature in slot 0. This bit of code must
939    * match the checks in ../vpe/api/api.c: vl_msg_api_version_check().
940    */
941   mp->api_versions[0] = clib_host_to_net_u32 (vpe_api_version);
942 }
943
944 u32
945 vl (void *p)
946 {
947   return vec_len (p);
948 }
949
950 /*
951  * fd.io coding-style-patch-verification: ON
952  *
953  * Local Variables:
954  * eval: (c-set-style "gnu")
955  * End:
956  */