udp: remove connected udp transport proto
[vpp.git] / src / plugins / hs_apps / echo_client.c
1 /*
2  * echo_client.c - vpp built-in echo client code
3  *
4  * Copyright (c) 2017-2019 by Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <vnet/vnet.h>
19 #include <vlibapi/api.h>
20 #include <vlibmemory/api.h>
21 #include <hs_apps/echo_client.h>
22
23 echo_client_main_t echo_client_main;
24
25 #define ECHO_CLIENT_DBG (0)
26 #define DBG(_fmt, _args...)                     \
27     if (ECHO_CLIENT_DBG)                                \
28       clib_warning (_fmt, ##_args)
29
30 static void
31 signal_evt_to_cli_i (int *code)
32 {
33   echo_client_main_t *ecm = &echo_client_main;
34   ASSERT (vlib_get_thread_index () == 0);
35   vlib_process_signal_event (ecm->vlib_main, ecm->cli_node_index, *code, 0);
36 }
37
38 static void
39 signal_evt_to_cli (int code)
40 {
41   if (vlib_get_thread_index () != 0)
42     vl_api_rpc_call_main_thread (signal_evt_to_cli_i, (u8 *) & code,
43                                  sizeof (code));
44   else
45     signal_evt_to_cli_i (&code);
46 }
47
48 static void
49 send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
50 {
51   u8 *test_data = ecm->connect_test_data;
52   int test_buf_len, test_buf_offset, rv;
53   u32 bytes_this_chunk;
54
55   test_buf_len = vec_len (test_data);
56   ASSERT (test_buf_len > 0);
57   test_buf_offset = s->bytes_sent % test_buf_len;
58   bytes_this_chunk = clib_min (test_buf_len - test_buf_offset,
59                                s->bytes_to_send);
60
61   if (!ecm->is_dgram)
62     {
63       if (ecm->no_copy)
64         {
65           svm_fifo_t *f = s->data.tx_fifo;
66           rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk);
67           svm_fifo_enqueue_nocopy (f, rv);
68           session_send_io_evt_to_thread_custom (&f->master_session_index,
69                                                 s->thread_index,
70                                                 SESSION_IO_EVT_TX);
71         }
72       else
73         rv = app_send_stream (&s->data, test_data + test_buf_offset,
74                               bytes_this_chunk, 0);
75     }
76   else
77     {
78       svm_fifo_t *f = s->data.tx_fifo;
79       u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
80
81       if (max_enqueue < sizeof (session_dgram_hdr_t))
82         return;
83
84       max_enqueue -= sizeof (session_dgram_hdr_t);
85
86       if (ecm->no_copy)
87         {
88           session_dgram_hdr_t hdr;
89           app_session_transport_t *at = &s->data.transport;
90
91           rv = clib_min (max_enqueue, bytes_this_chunk);
92
93           hdr.data_length = rv;
94           hdr.data_offset = 0;
95           clib_memcpy_fast (&hdr.rmt_ip, &at->rmt_ip,
96                             sizeof (ip46_address_t));
97           hdr.is_ip4 = at->is_ip4;
98           hdr.rmt_port = at->rmt_port;
99           clib_memcpy_fast (&hdr.lcl_ip, &at->lcl_ip,
100                             sizeof (ip46_address_t));
101           hdr.lcl_port = at->lcl_port;
102           svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
103           svm_fifo_enqueue_nocopy (f, rv);
104           session_send_io_evt_to_thread_custom (&f->master_session_index,
105                                                 s->thread_index,
106                                                 SESSION_IO_EVT_TX);
107         }
108       else
109         {
110           bytes_this_chunk = clib_min (bytes_this_chunk, max_enqueue);
111           rv = app_send_dgram (&s->data, test_data + test_buf_offset,
112                                bytes_this_chunk, 0);
113         }
114     }
115
116   /* If we managed to enqueue data... */
117   if (rv > 0)
118     {
119       /* Account for it... */
120       s->bytes_to_send -= rv;
121       s->bytes_sent += rv;
122
123       if (ECHO_CLIENT_DBG)
124         {
125           /* *INDENT-OFF* */
126           ELOG_TYPE_DECLARE (e) =
127             {
128               .format = "tx-enq: xfer %d bytes, sent %u remain %u",
129               .format_args = "i4i4i4",
130             };
131           /* *INDENT-ON* */
132           struct
133           {
134             u32 data[3];
135           } *ed;
136           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
137           ed->data[0] = rv;
138           ed->data[1] = s->bytes_sent;
139           ed->data[2] = s->bytes_to_send;
140         }
141     }
142 }
143
144 static void
145 receive_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
146 {
147   svm_fifo_t *rx_fifo = s->data.rx_fifo;
148   u32 thread_index = vlib_get_thread_index ();
149   int n_read, i;
150
151   if (ecm->test_bytes)
152     {
153       if (!ecm->is_dgram)
154         n_read = app_recv_stream (&s->data, ecm->rx_buf[thread_index],
155                                   vec_len (ecm->rx_buf[thread_index]));
156       else
157         n_read = app_recv_dgram (&s->data, ecm->rx_buf[thread_index],
158                                  vec_len (ecm->rx_buf[thread_index]));
159     }
160   else
161     {
162       n_read = svm_fifo_max_dequeue_cons (rx_fifo);
163       svm_fifo_dequeue_drop (rx_fifo, n_read);
164     }
165
166   if (n_read > 0)
167     {
168       if (ECHO_CLIENT_DBG)
169         {
170           /* *INDENT-OFF* */
171           ELOG_TYPE_DECLARE (e) =
172             {
173               .format = "rx-deq: %d bytes",
174               .format_args = "i4",
175             };
176           /* *INDENT-ON* */
177           struct
178           {
179             u32 data[1];
180           } *ed;
181           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
182           ed->data[0] = n_read;
183         }
184
185       if (ecm->test_bytes)
186         {
187           for (i = 0; i < n_read; i++)
188             {
189               if (ecm->rx_buf[thread_index][i]
190                   != ((s->bytes_received + i) & 0xff))
191                 {
192                   clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
193                                 n_read, s->bytes_received + i,
194                                 ecm->rx_buf[thread_index][i],
195                                 ((s->bytes_received + i) & 0xff));
196                   ecm->test_failed = 1;
197                 }
198             }
199         }
200       ASSERT (n_read <= s->bytes_to_receive);
201       s->bytes_to_receive -= n_read;
202       s->bytes_received += n_read;
203     }
204 }
205
206 static uword
207 echo_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
208                      vlib_frame_t * frame)
209 {
210   echo_client_main_t *ecm = &echo_client_main;
211   int my_thread_index = vlib_get_thread_index ();
212   eclient_session_t *sp;
213   int i;
214   int delete_session;
215   u32 *connection_indices;
216   u32 *connections_this_batch;
217   u32 nconnections_this_batch;
218
219   connection_indices = ecm->connection_index_by_thread[my_thread_index];
220   connections_this_batch =
221     ecm->connections_this_batch_by_thread[my_thread_index];
222
223   if ((ecm->run_test != ECHO_CLIENTS_RUNNING) ||
224       ((vec_len (connection_indices) == 0)
225        && vec_len (connections_this_batch) == 0))
226     return 0;
227
228   /* Grab another pile of connections */
229   if (PREDICT_FALSE (vec_len (connections_this_batch) == 0))
230     {
231       nconnections_this_batch =
232         clib_min (ecm->connections_per_batch, vec_len (connection_indices));
233
234       ASSERT (nconnections_this_batch > 0);
235       vec_validate (connections_this_batch, nconnections_this_batch - 1);
236       clib_memcpy_fast (connections_this_batch,
237                         connection_indices + vec_len (connection_indices)
238                         - nconnections_this_batch,
239                         nconnections_this_batch * sizeof (u32));
240       _vec_len (connection_indices) -= nconnections_this_batch;
241     }
242
243   if (PREDICT_FALSE (ecm->prev_conns != ecm->connections_per_batch
244                      && ecm->prev_conns == vec_len (connections_this_batch)))
245     {
246       ecm->repeats++;
247       ecm->prev_conns = vec_len (connections_this_batch);
248       if (ecm->repeats == 500000)
249         {
250           clib_warning ("stuck clients");
251         }
252     }
253   else
254     {
255       ecm->prev_conns = vec_len (connections_this_batch);
256       ecm->repeats = 0;
257     }
258
259   for (i = 0; i < vec_len (connections_this_batch); i++)
260     {
261       delete_session = 1;
262
263       sp = pool_elt_at_index (ecm->sessions, connections_this_batch[i]);
264
265       if (sp->bytes_to_send > 0)
266         {
267           send_data_chunk (ecm, sp);
268           delete_session = 0;
269         }
270       if (sp->bytes_to_receive > 0)
271         {
272           delete_session = 0;
273         }
274       if (PREDICT_FALSE (delete_session == 1))
275         {
276           session_t *s;
277
278           clib_atomic_fetch_add (&ecm->tx_total, sp->bytes_sent);
279           clib_atomic_fetch_add (&ecm->rx_total, sp->bytes_received);
280           s = session_get_from_handle_if_valid (sp->vpp_session_handle);
281
282           if (s)
283             {
284               vnet_disconnect_args_t _a, *a = &_a;
285               a->handle = session_handle (s);
286               a->app_index = ecm->app_index;
287               vnet_disconnect_session (a);
288
289               vec_delete (connections_this_batch, 1, i);
290               i--;
291               clib_atomic_fetch_add (&ecm->ready_connections, -1);
292             }
293           else
294             {
295               clib_warning ("session AWOL?");
296               vec_delete (connections_this_batch, 1, i);
297             }
298
299           /* Kick the debug CLI process */
300           if (ecm->ready_connections == 0)
301             {
302               signal_evt_to_cli (2);
303             }
304         }
305     }
306
307   ecm->connection_index_by_thread[my_thread_index] = connection_indices;
308   ecm->connections_this_batch_by_thread[my_thread_index] =
309     connections_this_batch;
310   return 0;
311 }
312
313 /* *INDENT-OFF* */
314 VLIB_REGISTER_NODE (echo_clients_node) =
315 {
316   .function = echo_client_node_fn,
317   .name = "echo-clients",
318   .type = VLIB_NODE_TYPE_INPUT,
319   .state = VLIB_NODE_STATE_DISABLED,
320 };
321 /* *INDENT-ON* */
322
323 static int
324 create_api_loopback (echo_client_main_t * ecm)
325 {
326   api_main_t *am = vlibapi_get_main ();
327   vl_shmem_hdr_t *shmem_hdr;
328
329   shmem_hdr = am->shmem_hdr;
330   ecm->vl_input_queue = shmem_hdr->vl_input_queue;
331   ecm->my_client_index = vl_api_memclnt_create_internal ("echo_client",
332                                                          ecm->vl_input_queue);
333   return 0;
334 }
335
336 static int
337 echo_clients_init (vlib_main_t * vm)
338 {
339   echo_client_main_t *ecm = &echo_client_main;
340   vlib_thread_main_t *vtm = vlib_get_thread_main ();
341   u32 num_threads;
342   int i;
343
344   if (create_api_loopback (ecm))
345     return -1;
346
347   num_threads = 1 /* main thread */  + vtm->n_threads;
348
349   /* Init test data. Big buffer */
350   vec_validate (ecm->connect_test_data, 4 * 1024 * 1024 - 1);
351   for (i = 0; i < vec_len (ecm->connect_test_data); i++)
352     ecm->connect_test_data[i] = i & 0xff;
353
354   vec_validate (ecm->rx_buf, num_threads - 1);
355   for (i = 0; i < num_threads; i++)
356     vec_validate (ecm->rx_buf[i], vec_len (ecm->connect_test_data) - 1);
357
358   ecm->is_init = 1;
359
360   vec_validate (ecm->connection_index_by_thread, vtm->n_vlib_mains);
361   vec_validate (ecm->connections_this_batch_by_thread, vtm->n_vlib_mains);
362   vec_validate (ecm->quic_session_index_by_thread, vtm->n_vlib_mains);
363   vec_validate (ecm->vpp_event_queue, vtm->n_vlib_mains);
364
365   return 0;
366 }
367
368 static int
369 quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context,
370                                                session_t * s,
371                                                session_error_t err)
372 {
373   echo_client_main_t *ecm = &echo_client_main;
374   vnet_connect_args_t *a = 0;
375   int rv;
376   u8 thread_index = vlib_get_thread_index ();
377   session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
378   u32 stream_n;
379   session_handle_t handle;
380
381   DBG ("QUIC Connection handle %d", session_handle (s));
382
383   vec_validate (a, 1);
384   a->uri = (char *) ecm->connect_uri;
385   if (parse_uri (a->uri, &sep))
386     return -1;
387   sep.parent_handle = handle = session_handle (s);
388
389   for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
390     {
391       clib_memset (a, 0, sizeof (*a));
392       a->app_index = ecm->app_index;
393       a->api_context = -1 - api_context;
394       clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
395
396       DBG ("QUIC opening stream %d", stream_n);
397       if ((rv = vnet_connect (a)))
398         {
399           clib_error ("Stream session %d opening failed: %d", stream_n, rv);
400           return -1;
401         }
402       DBG ("QUIC stream %d connected", stream_n);
403     }
404   /*
405    * 's' is no longer valid, its underlying pool could have been moved in
406    * vnet_connect()
407    */
408   vec_add1 (ecm->quic_session_index_by_thread[thread_index], handle);
409   vec_free (a);
410   return 0;
411 }
412
413 static int
414 quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context,
415                                               session_t * s,
416                                               session_error_t err)
417 {
418   echo_client_main_t *ecm = &echo_client_main;
419   eclient_session_t *session;
420   u32 session_index;
421   u8 thread_index;
422
423   if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING))
424     return -1;
425
426   if (err)
427     {
428       clib_warning ("connection %d failed!", api_context);
429       ecm->run_test = ECHO_CLIENTS_EXITING;
430       signal_evt_to_cli (-1);
431       return 0;
432     }
433
434   if (s->listener_handle == SESSION_INVALID_HANDLE)
435     return quic_echo_clients_qsession_connected_callback (app_index,
436                                                           api_context, s,
437                                                           err);
438   DBG ("STREAM Connection callback %d", api_context);
439
440   thread_index = s->thread_index;
441   ASSERT (thread_index == vlib_get_thread_index ()
442           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
443
444   if (!ecm->vpp_event_queue[thread_index])
445     ecm->vpp_event_queue[thread_index] =
446       session_main_get_vpp_event_queue (thread_index);
447
448   /*
449    * Setup session
450    */
451   clib_spinlock_lock_if_init (&ecm->sessions_lock);
452   pool_get (ecm->sessions, session);
453   clib_spinlock_unlock_if_init (&ecm->sessions_lock);
454
455   clib_memset (session, 0, sizeof (*session));
456   session_index = session - ecm->sessions;
457   session->bytes_to_send = ecm->bytes_to_send;
458   session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
459   session->data.rx_fifo = s->rx_fifo;
460   session->data.rx_fifo->client_session_index = session_index;
461   session->data.tx_fifo = s->tx_fifo;
462   session->data.tx_fifo->client_session_index = session_index;
463   session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index];
464   session->vpp_session_handle = session_handle (s);
465
466   if (ecm->is_dgram)
467     {
468       transport_connection_t *tc;
469       tc = session_get_transport (s);
470       clib_memcpy_fast (&session->data.transport, tc,
471                         sizeof (session->data.transport));
472       session->data.is_dgram = 1;
473     }
474
475   vec_add1 (ecm->connection_index_by_thread[thread_index], session_index);
476   clib_atomic_fetch_add (&ecm->ready_connections, 1);
477   if (ecm->ready_connections == ecm->expected_connections)
478     {
479       ecm->run_test = ECHO_CLIENTS_RUNNING;
480       /* Signal the CLI process that the action is starting... */
481       signal_evt_to_cli (1);
482     }
483
484   return 0;
485 }
486
487 static int
488 echo_clients_session_connected_callback (u32 app_index, u32 api_context,
489                                          session_t * s, session_error_t err)
490 {
491   echo_client_main_t *ecm = &echo_client_main;
492   eclient_session_t *session;
493   u32 session_index;
494   u8 thread_index;
495
496   if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING))
497     return -1;
498
499   if (err)
500     {
501       clib_warning ("connection %d failed!", api_context);
502       ecm->run_test = ECHO_CLIENTS_EXITING;
503       signal_evt_to_cli (-1);
504       return 0;
505     }
506
507   thread_index = s->thread_index;
508   ASSERT (thread_index == vlib_get_thread_index ()
509           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
510
511   if (!ecm->vpp_event_queue[thread_index])
512     ecm->vpp_event_queue[thread_index] =
513       session_main_get_vpp_event_queue (thread_index);
514
515   /*
516    * Setup session
517    */
518   clib_spinlock_lock_if_init (&ecm->sessions_lock);
519   pool_get (ecm->sessions, session);
520   clib_spinlock_unlock_if_init (&ecm->sessions_lock);
521
522   clib_memset (session, 0, sizeof (*session));
523   session_index = session - ecm->sessions;
524   session->bytes_to_send = ecm->bytes_to_send;
525   session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
526   session->data.rx_fifo = s->rx_fifo;
527   session->data.rx_fifo->client_session_index = session_index;
528   session->data.tx_fifo = s->tx_fifo;
529   session->data.tx_fifo->client_session_index = session_index;
530   session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index];
531   session->vpp_session_handle = session_handle (s);
532
533   if (ecm->is_dgram)
534     {
535       transport_connection_t *tc;
536       tc = session_get_transport (s);
537       clib_memcpy_fast (&session->data.transport, tc,
538                         sizeof (session->data.transport));
539       session->data.is_dgram = 1;
540     }
541
542   vec_add1 (ecm->connection_index_by_thread[thread_index], session_index);
543   clib_atomic_fetch_add (&ecm->ready_connections, 1);
544   if (ecm->ready_connections == ecm->expected_connections)
545     {
546       ecm->run_test = ECHO_CLIENTS_RUNNING;
547       /* Signal the CLI process that the action is starting... */
548       signal_evt_to_cli (1);
549     }
550
551   return 0;
552 }
553
554 static void
555 echo_clients_session_reset_callback (session_t * s)
556 {
557   echo_client_main_t *ecm = &echo_client_main;
558   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
559
560   if (s->session_state == SESSION_STATE_READY)
561     clib_warning ("Reset active connection %U", format_session, s, 2);
562
563   a->handle = session_handle (s);
564   a->app_index = ecm->app_index;
565   vnet_disconnect_session (a);
566   return;
567 }
568
569 static int
570 echo_clients_session_create_callback (session_t * s)
571 {
572   return 0;
573 }
574
575 static void
576 echo_clients_session_disconnect_callback (session_t * s)
577 {
578   echo_client_main_t *ecm = &echo_client_main;
579   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
580   a->handle = session_handle (s);
581   a->app_index = ecm->app_index;
582   vnet_disconnect_session (a);
583   return;
584 }
585
586 void
587 echo_clients_session_disconnect (session_t * s)
588 {
589   echo_client_main_t *ecm = &echo_client_main;
590   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
591   a->handle = session_handle (s);
592   a->app_index = ecm->app_index;
593   vnet_disconnect_session (a);
594 }
595
596 static int
597 echo_clients_rx_callback (session_t * s)
598 {
599   echo_client_main_t *ecm = &echo_client_main;
600   eclient_session_t *sp;
601
602   if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_RUNNING))
603     {
604       echo_clients_session_disconnect (s);
605       return -1;
606     }
607
608   sp = pool_elt_at_index (ecm->sessions, s->rx_fifo->client_session_index);
609   receive_data_chunk (ecm, sp);
610
611   if (svm_fifo_max_dequeue_cons (s->rx_fifo))
612     {
613       if (svm_fifo_set_event (s->rx_fifo))
614         session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
615     }
616   return 0;
617 }
618
619 int
620 echo_client_add_segment_callback (u32 client_index, u64 segment_handle)
621 {
622   /* New heaps may be added */
623   return 0;
624 }
625
626 /* *INDENT-OFF* */
627 static session_cb_vft_t echo_clients = {
628   .session_reset_callback = echo_clients_session_reset_callback,
629   .session_connected_callback = echo_clients_session_connected_callback,
630   .session_accept_callback = echo_clients_session_create_callback,
631   .session_disconnect_callback = echo_clients_session_disconnect_callback,
632   .builtin_app_rx_callback = echo_clients_rx_callback,
633   .add_segment_callback = echo_client_add_segment_callback
634 };
635 /* *INDENT-ON* */
636
637 static clib_error_t *
638 echo_clients_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret)
639 {
640   vnet_app_add_tls_cert_args_t _a_cert, *a_cert = &_a_cert;
641   vnet_app_add_tls_key_args_t _a_key, *a_key = &_a_key;
642   u32 prealloc_fifos, segment_size = 256 << 20;
643   echo_client_main_t *ecm = &echo_client_main;
644   vnet_app_attach_args_t _a, *a = &_a;
645   u64 options[17];
646   int rv;
647
648   clib_memset (a, 0, sizeof (*a));
649   clib_memset (options, 0, sizeof (options));
650
651   a->api_client_index = ecm->my_client_index;
652   if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
653     echo_clients.session_connected_callback =
654       quic_echo_clients_session_connected_callback;
655   a->session_cb_vft = &echo_clients;
656
657   prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1;
658
659   if (ecm->private_segment_size)
660     segment_size = ecm->private_segment_size;
661
662   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
663   options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
664   options[APP_OPTIONS_ADD_SEGMENT_SIZE] = segment_size;
665   options[APP_OPTIONS_RX_FIFO_SIZE] = ecm->fifo_size;
666   options[APP_OPTIONS_TX_FIFO_SIZE] = ecm->fifo_size;
667   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = ecm->private_segment_count;
668   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = prealloc_fifos;
669   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
670   options[APP_OPTIONS_TLS_ENGINE] = ecm->tls_engine;
671   options[APP_OPTIONS_PCT_FIRST_ALLOC] = 100;
672   if (appns_id)
673     {
674       options[APP_OPTIONS_FLAGS] |= appns_flags;
675       options[APP_OPTIONS_NAMESPACE_SECRET] = appns_secret;
676     }
677   a->options = options;
678   a->namespace_id = appns_id;
679
680   if ((rv = vnet_application_attach (a)))
681     return clib_error_return (0, "attach returned %d", rv);
682
683   ecm->app_index = a->app_index;
684
685   clib_memset (a_cert, 0, sizeof (*a_cert));
686   a_cert->app_index = a->app_index;
687   vec_validate (a_cert->cert, test_srv_crt_rsa_len);
688   clib_memcpy_fast (a_cert->cert, test_srv_crt_rsa, test_srv_crt_rsa_len);
689   vnet_app_add_tls_cert (a_cert);
690
691   clib_memset (a_key, 0, sizeof (*a_key));
692   a_key->app_index = a->app_index;
693   vec_validate (a_key->key, test_srv_key_rsa_len);
694   clib_memcpy_fast (a_key->key, test_srv_key_rsa, test_srv_key_rsa_len);
695   vnet_app_add_tls_key (a_key);
696   return 0;
697 }
698
699 static int
700 echo_clients_detach ()
701 {
702   echo_client_main_t *ecm = &echo_client_main;
703   vnet_app_detach_args_t _da, *da = &_da;
704   int rv;
705
706   da->app_index = ecm->app_index;
707   da->api_client_index = ~0;
708   rv = vnet_application_detach (da);
709   ecm->test_client_attached = 0;
710   ecm->app_index = ~0;
711   return rv;
712 }
713
714 static void *
715 echo_client_thread_fn (void *arg)
716 {
717   return 0;
718 }
719
720 /** Start a transmit thread */
721 int
722 echo_clients_start_tx_pthread (echo_client_main_t * ecm)
723 {
724   if (ecm->client_thread_handle == 0)
725     {
726       int rv = pthread_create (&ecm->client_thread_handle,
727                                NULL /*attr */ ,
728                                echo_client_thread_fn, 0);
729       if (rv)
730         {
731           ecm->client_thread_handle = 0;
732           return -1;
733         }
734     }
735   return 0;
736 }
737
738 clib_error_t *
739 echo_clients_connect (vlib_main_t * vm, u32 n_clients)
740 {
741   echo_client_main_t *ecm = &echo_client_main;
742   vnet_connect_args_t _a, *a = &_a;
743   int i, rv;
744
745   clib_memset (a, 0, sizeof (*a));
746
747   for (i = 0; i < n_clients; i++)
748     {
749       a->uri = (char *) ecm->connect_uri;
750       a->api_context = i;
751       a->app_index = ecm->app_index;
752
753       vlib_worker_thread_barrier_sync (vm);
754       if ((rv = vnet_connect_uri (a)))
755         {
756           vlib_worker_thread_barrier_release (vm);
757           return clib_error_return (0, "connect returned: %d", rv);
758         }
759       vlib_worker_thread_barrier_release (vm);
760
761       /* Crude pacing for call setups  */
762       if ((i % 16) == 0)
763         vlib_process_suspend (vm, 100e-6);
764       ASSERT (i + 1 >= ecm->ready_connections);
765       while (i + 1 - ecm->ready_connections > 128)
766         vlib_process_suspend (vm, 1e-3);
767     }
768   return 0;
769 }
770
771 #define ec_cli_output(_fmt, _args...)                   \
772   if (!ecm->no_output)                                  \
773     vlib_cli_output(vm, _fmt, ##_args)
774
775 static clib_error_t *
776 echo_clients_command_fn (vlib_main_t * vm,
777                          unformat_input_t * input, vlib_cli_command_t * cmd)
778 {
779   echo_client_main_t *ecm = &echo_client_main;
780   vlib_thread_main_t *thread_main = vlib_get_thread_main ();
781   u64 tmp, total_bytes, appns_flags = 0, appns_secret = 0;
782   f64 test_timeout = 20.0, syn_timeout = 20.0, delta;
783   char *default_uri = "tcp://6.0.1.1/1234";
784   uword *event_data = 0, event_type;
785   f64 time_before_connects;
786   u32 n_clients = 1;
787   int preallocate_sessions = 0;
788   char *transfer_type;
789   clib_error_t *error = 0;
790   u8 *appns_id = 0;
791   int i;
792   session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
793   int rv;
794
795   ecm->quic_streams = 1;
796   ecm->bytes_to_send = 8192;
797   ecm->no_return = 0;
798   ecm->fifo_size = 64 << 10;
799   ecm->connections_per_batch = 1000;
800   ecm->private_segment_count = 0;
801   ecm->private_segment_size = 0;
802   ecm->no_output = 0;
803   ecm->test_bytes = 0;
804   ecm->test_failed = 0;
805   ecm->vlib_main = vm;
806   ecm->tls_engine = CRYPTO_ENGINE_OPENSSL;
807   ecm->no_copy = 0;
808   ecm->run_test = ECHO_CLIENTS_STARTING;
809
810   if (thread_main->n_vlib_mains > 1)
811     clib_spinlock_init (&ecm->sessions_lock);
812   vec_free (ecm->connect_uri);
813
814   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
815     {
816       if (unformat (input, "uri %s", &ecm->connect_uri))
817         ;
818       else if (unformat (input, "nclients %d", &n_clients))
819         ;
820       else if (unformat (input, "quic-streams %d", &ecm->quic_streams))
821         ;
822       else if (unformat (input, "mbytes %lld", &tmp))
823         ecm->bytes_to_send = tmp << 20;
824       else if (unformat (input, "gbytes %lld", &tmp))
825         ecm->bytes_to_send = tmp << 30;
826       else if (unformat (input, "bytes %lld", &ecm->bytes_to_send))
827         ;
828       else if (unformat (input, "test-timeout %f", &test_timeout))
829         ;
830       else if (unformat (input, "syn-timeout %f", &syn_timeout))
831         ;
832       else if (unformat (input, "no-return"))
833         ecm->no_return = 1;
834       else if (unformat (input, "fifo-size %d", &ecm->fifo_size))
835         ecm->fifo_size <<= 10;
836       else if (unformat (input, "private-segment-count %d",
837                          &ecm->private_segment_count))
838         ;
839       else if (unformat (input, "private-segment-size %U",
840                          unformat_memory_size, &tmp))
841         {
842           if (tmp >= 0x100000000ULL)
843             return clib_error_return
844               (0, "private segment size %lld (%llu) too large", tmp, tmp);
845           ecm->private_segment_size = tmp;
846         }
847       else if (unformat (input, "preallocate-fifos"))
848         ecm->prealloc_fifos = 1;
849       else if (unformat (input, "preallocate-sessions"))
850         preallocate_sessions = 1;
851       else
852         if (unformat (input, "client-batch %d", &ecm->connections_per_batch))
853         ;
854       else if (unformat (input, "appns %_%v%_", &appns_id))
855         ;
856       else if (unformat (input, "all-scope"))
857         appns_flags |= (APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE
858                         | APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE);
859       else if (unformat (input, "local-scope"))
860         appns_flags = APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
861       else if (unformat (input, "global-scope"))
862         appns_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
863       else if (unformat (input, "secret %lu", &appns_secret))
864         ;
865       else if (unformat (input, "no-output"))
866         ecm->no_output = 1;
867       else if (unformat (input, "test-bytes"))
868         ecm->test_bytes = 1;
869       else if (unformat (input, "tls-engine %d", &ecm->tls_engine))
870         ;
871       else
872         return clib_error_return (0, "failed: unknown input `%U'",
873                                   format_unformat_error, input);
874     }
875
876   /* Store cli process node index for signalling */
877   ecm->cli_node_index =
878     vlib_get_current_process (vm)->node_runtime.node_index;
879
880   if (ecm->is_init == 0)
881     {
882       if (echo_clients_init (vm))
883         return clib_error_return (0, "failed init");
884     }
885
886
887   ecm->ready_connections = 0;
888   ecm->expected_connections = n_clients * ecm->quic_streams;
889   ecm->rx_total = 0;
890   ecm->tx_total = 0;
891
892   if (!ecm->connect_uri)
893     {
894       clib_warning ("No uri provided. Using default: %s", default_uri);
895       ecm->connect_uri = format (0, "%s%c", default_uri, 0);
896     }
897
898   if ((rv = parse_uri ((char *) ecm->connect_uri, &sep)))
899     return clib_error_return (0, "Uri parse error: %d", rv);
900   ecm->transport_proto = sep.transport_proto;
901   ecm->is_dgram = (sep.transport_proto == TRANSPORT_PROTO_UDP);
902
903 #if ECHO_CLIENT_PTHREAD
904   echo_clients_start_tx_pthread ();
905 #endif
906
907   vlib_worker_thread_barrier_sync (vm);
908   vnet_session_enable_disable (vm, 1 /* turn on session and transports */ );
909   vlib_worker_thread_barrier_release (vm);
910
911   if (ecm->test_client_attached == 0)
912     {
913       if ((error = echo_clients_attach (appns_id, appns_flags, appns_secret)))
914         {
915           vec_free (appns_id);
916           clib_error_report (error);
917           return error;
918         }
919       vec_free (appns_id);
920     }
921   ecm->test_client_attached = 1;
922
923   /* Turn on the builtin client input nodes */
924   for (i = 0; i < thread_main->n_vlib_mains; i++)
925     vlib_node_set_state (vlib_mains[i], echo_clients_node.index,
926                          VLIB_NODE_STATE_POLLING);
927
928   if (preallocate_sessions)
929     pool_init_fixed (ecm->sessions, 1.1 * n_clients);
930
931   /* Fire off connect requests */
932   time_before_connects = vlib_time_now (vm);
933   if ((error = echo_clients_connect (vm, n_clients)))
934     {
935       goto cleanup;
936     }
937
938   /* Park until the sessions come up, or ten seconds elapse... */
939   vlib_process_wait_for_event_or_clock (vm, syn_timeout);
940   event_type = vlib_process_get_events (vm, &event_data);
941   switch (event_type)
942     {
943     case ~0:
944       ec_cli_output ("Timeout with only %d sessions active...",
945                      ecm->ready_connections);
946       error = clib_error_return (0, "failed: syn timeout with %d sessions",
947                                  ecm->ready_connections);
948       goto cleanup;
949
950     case 1:
951       delta = vlib_time_now (vm) - time_before_connects;
952       if (delta != 0.0)
953         ec_cli_output ("%d three-way handshakes in %.2f seconds %.2f/s",
954                        n_clients, delta, ((f64) n_clients) / delta);
955
956       ecm->test_start_time = vlib_time_now (ecm->vlib_main);
957       ec_cli_output ("Test started at %.6f", ecm->test_start_time);
958       break;
959
960     default:
961       ec_cli_output ("unexpected event(1): %d", event_type);
962       error = clib_error_return (0, "failed: unexpected event(1): %d",
963                                  event_type);
964       goto cleanup;
965     }
966
967   /* Now wait for the sessions to finish... */
968   vlib_process_wait_for_event_or_clock (vm, test_timeout);
969   event_type = vlib_process_get_events (vm, &event_data);
970   switch (event_type)
971     {
972     case ~0:
973       ec_cli_output ("Timeout with %d sessions still active...",
974                      ecm->ready_connections);
975       error = clib_error_return (0, "failed: timeout with %d sessions",
976                                  ecm->ready_connections);
977       goto cleanup;
978
979     case 2:
980       ecm->test_end_time = vlib_time_now (vm);
981       ec_cli_output ("Test finished at %.6f", ecm->test_end_time);
982       break;
983
984     default:
985       ec_cli_output ("unexpected event(2): %d", event_type);
986       error = clib_error_return (0, "failed: unexpected event(2): %d",
987                                  event_type);
988       goto cleanup;
989     }
990
991   delta = ecm->test_end_time - ecm->test_start_time;
992   if (delta != 0.0)
993     {
994       total_bytes = (ecm->no_return ? ecm->tx_total : ecm->rx_total);
995       transfer_type = ecm->no_return ? "half-duplex" : "full-duplex";
996       ec_cli_output ("%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds",
997                      total_bytes, total_bytes / (1ULL << 20),
998                      total_bytes / (1ULL << 30), delta);
999       ec_cli_output ("%.2f bytes/second %s", ((f64) total_bytes) / (delta),
1000                      transfer_type);
1001       ec_cli_output ("%.4f gbit/second %s",
1002                      (((f64) total_bytes * 8.0) / delta / 1e9),
1003                      transfer_type);
1004     }
1005   else
1006     {
1007       ec_cli_output ("zero delta-t?");
1008       error = clib_error_return (0, "failed: zero delta-t");
1009       goto cleanup;
1010     }
1011
1012   if (ecm->test_bytes && ecm->test_failed)
1013     error = clib_error_return (0, "failed: test bytes");
1014
1015 cleanup:
1016   ecm->run_test = ECHO_CLIENTS_EXITING;
1017   vlib_process_wait_for_event_or_clock (vm, 10e-3);
1018   for (i = 0; i < vec_len (ecm->connection_index_by_thread); i++)
1019     {
1020       vec_reset_length (ecm->connection_index_by_thread[i]);
1021       vec_reset_length (ecm->connections_this_batch_by_thread[i]);
1022       vec_reset_length (ecm->quic_session_index_by_thread[i]);
1023     }
1024
1025   pool_free (ecm->sessions);
1026
1027   /* Detach the application, so we can use different fifo sizes next time */
1028   if (ecm->test_client_attached)
1029     {
1030       if (echo_clients_detach ())
1031         {
1032           error = clib_error_return (0, "failed: app detach");
1033           ec_cli_output ("WARNING: app detach failed...");
1034         }
1035     }
1036   if (error)
1037     ec_cli_output ("test failed");
1038   vec_free (ecm->connect_uri);
1039   return error;
1040 }
1041
1042 /* *INDENT-OFF* */
1043 VLIB_CLI_COMMAND (echo_clients_command, static) =
1044 {
1045   .path = "test echo clients",
1046   .short_help = "test echo clients [nclients %d][[m|g]bytes <bytes>]"
1047       "[test-timeout <time>][syn-timeout <time>][no-return][fifo-size <size>]"
1048       "[private-segment-count <count>][private-segment-size <bytes>[m|g]]"
1049       "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]"
1050       "[uri <tcp://ip/port>][test-bytes][no-output]",
1051   .function = echo_clients_command_fn,
1052   .is_mp_safe = 1,
1053 };
1054 /* *INDENT-ON* */
1055
1056 clib_error_t *
1057 echo_clients_main_init (vlib_main_t * vm)
1058 {
1059   echo_client_main_t *ecm = &echo_client_main;
1060   ecm->is_init = 0;
1061   return 0;
1062 }
1063
1064 VLIB_INIT_FUNCTION (echo_clients_main_init);
1065
1066 /*
1067  * fd.io coding-style-patch-verification: ON
1068  *
1069  * Local Variables:
1070  * eval: (c-set-style "gnu")
1071  * End:
1072  */