d449045cf6ae98f4deff17ad1b628495948b81e9
[vpp.git] / src / plugins / hs_apps / echo_client.c
1 /*
2  * echo_client.c - vpp built-in echo client code
3  *
4  * Copyright (c) 2017-2019 by Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <hs_apps/echo_client.h>
19
20 static ec_main_t ec_main;
21
22 #define EC_DBG (0)
23 #define DBG(_fmt, _args...)                                                   \
24   if (EC_DBG)                                                                 \
25   clib_warning (_fmt, ##_args)
26
27 static void
28 signal_evt_to_cli_i (void *codep)
29 {
30   ec_main_t *ecm = &ec_main;
31   int code;
32
33   ASSERT (vlib_get_thread_index () == 0);
34   code = pointer_to_uword (codep);
35   vlib_process_signal_event (ecm->vlib_main, ecm->cli_node_index, code, 0);
36 }
37
38 static void
39 signal_evt_to_cli (int code)
40 {
41   if (vlib_get_thread_index () != 0)
42     session_send_rpc_evt_to_thread_force (
43       0, signal_evt_to_cli_i, uword_to_pointer ((uword) code, void *));
44   else
45     signal_evt_to_cli_i (uword_to_pointer ((uword) code, void *));
46 }
47
48 static inline ec_worker_t *
49 ec_worker_get (u32 thread_index)
50 {
51   return vec_elt_at_index (ec_main.wrk, thread_index);
52 }
53
54 static inline ec_session_t *
55 ec_session_alloc (ec_worker_t *wrk)
56 {
57   ec_session_t *ecs;
58
59   pool_get_zero (wrk->sessions, ecs);
60   ecs->data.session_index = ecs - wrk->sessions;
61   ecs->thread_index = wrk->thread_index;
62
63   return ecs;
64 }
65
66 static inline ec_session_t *
67 ec_session_get (ec_worker_t *wrk, u32 ec_index)
68 {
69   return pool_elt_at_index (wrk->sessions, ec_index);
70 }
71
72 static void
73 send_data_chunk (ec_main_t *ecm, ec_session_t *es)
74 {
75   u8 *test_data = ecm->connect_test_data;
76   int test_buf_len, test_buf_offset, rv;
77   u32 bytes_this_chunk;
78
79   test_buf_len = vec_len (test_data);
80   ASSERT (test_buf_len > 0);
81   test_buf_offset = es->bytes_sent % test_buf_len;
82   bytes_this_chunk =
83     clib_min (test_buf_len - test_buf_offset, es->bytes_to_send);
84
85   if (!ecm->is_dgram)
86     {
87       if (ecm->no_copy)
88         {
89           svm_fifo_t *f = es->data.tx_fifo;
90           rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk);
91           svm_fifo_enqueue_nocopy (f, rv);
92           session_send_io_evt_to_thread_custom (
93             &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
94         }
95       else
96         rv = app_send_stream (&es->data, test_data + test_buf_offset,
97                               bytes_this_chunk, 0);
98     }
99   else
100     {
101       svm_fifo_t *f = es->data.tx_fifo;
102       u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
103
104       if (max_enqueue < sizeof (session_dgram_hdr_t))
105         return;
106
107       max_enqueue -= sizeof (session_dgram_hdr_t);
108
109       if (ecm->no_copy)
110         {
111           session_dgram_hdr_t hdr;
112           app_session_transport_t *at = &es->data.transport;
113
114           rv = clib_min (max_enqueue, bytes_this_chunk);
115
116           hdr.data_length = rv;
117           hdr.data_offset = 0;
118           clib_memcpy_fast (&hdr.rmt_ip, &at->rmt_ip,
119                             sizeof (ip46_address_t));
120           hdr.is_ip4 = at->is_ip4;
121           hdr.rmt_port = at->rmt_port;
122           clib_memcpy_fast (&hdr.lcl_ip, &at->lcl_ip,
123                             sizeof (ip46_address_t));
124           hdr.lcl_port = at->lcl_port;
125           svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
126           svm_fifo_enqueue_nocopy (f, rv);
127           session_send_io_evt_to_thread_custom (
128             &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
129         }
130       else
131         {
132           bytes_this_chunk = clib_min (bytes_this_chunk, max_enqueue);
133           bytes_this_chunk = clib_min (bytes_this_chunk, 1460);
134           rv = app_send_dgram (&es->data, test_data + test_buf_offset,
135                                bytes_this_chunk, 0);
136         }
137     }
138
139   /* If we managed to enqueue data... */
140   if (rv > 0)
141     {
142       /* Account for it... */
143       es->bytes_to_send -= rv;
144       es->bytes_sent += rv;
145
146       if (EC_DBG)
147         {
148           ELOG_TYPE_DECLARE (e) =
149             {
150               .format = "tx-enq: xfer %d bytes, sent %u remain %u",
151               .format_args = "i4i4i4",
152             };
153           struct
154           {
155             u32 data[3];
156           } *ed;
157           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
158           ed->data[0] = rv;
159           ed->data[1] = es->bytes_sent;
160           ed->data[2] = es->bytes_to_send;
161         }
162     }
163 }
164
165 static void
166 receive_data_chunk (ec_worker_t *wrk, ec_session_t *es)
167 {
168   ec_main_t *ecm = &ec_main;
169   svm_fifo_t *rx_fifo = es->data.rx_fifo;
170   int n_read, i;
171
172   if (ecm->test_bytes)
173     {
174       if (!ecm->is_dgram)
175         n_read =
176           app_recv_stream (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf));
177       else
178         n_read =
179           app_recv_dgram (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf));
180     }
181   else
182     {
183       n_read = svm_fifo_max_dequeue_cons (rx_fifo);
184       svm_fifo_dequeue_drop (rx_fifo, n_read);
185     }
186
187   if (n_read > 0)
188     {
189       if (EC_DBG)
190         {
191           ELOG_TYPE_DECLARE (e) =
192             {
193               .format = "rx-deq: %d bytes",
194               .format_args = "i4",
195             };
196           struct
197           {
198             u32 data[1];
199           } *ed;
200           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
201           ed->data[0] = n_read;
202         }
203
204       if (ecm->test_bytes)
205         {
206           for (i = 0; i < n_read; i++)
207             {
208               if (wrk->rx_buf[i] != ((es->bytes_received + i) & 0xff))
209                 {
210                   clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
211                                 n_read, es->bytes_received + i, wrk->rx_buf[i],
212                                 ((es->bytes_received + i) & 0xff));
213                   ecm->test_failed = 1;
214                 }
215             }
216         }
217       ASSERT (n_read <= es->bytes_to_receive);
218       es->bytes_to_receive -= n_read;
219       es->bytes_received += n_read;
220     }
221 }
222
223 static uword
224 ec_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
225 {
226   u32 *conn_indices, *conns_this_batch, nconns_this_batch;
227   int thread_index = vm->thread_index, i, delete_session;
228   ec_main_t *ecm = &ec_main;
229   ec_worker_t *wrk;
230   ec_session_t *es;
231   session_t *s;
232
233   if (ecm->run_test != EC_RUNNING)
234     return 0;
235
236   wrk = ec_worker_get (thread_index);
237   conn_indices = wrk->conn_indices;
238   conns_this_batch = wrk->conns_this_batch;
239
240   if (((vec_len (conn_indices) == 0) && vec_len (conns_this_batch) == 0))
241     return 0;
242
243   /* Grab another pile of connections */
244   if (PREDICT_FALSE (vec_len (conns_this_batch) == 0))
245     {
246       nconns_this_batch =
247         clib_min (ecm->connections_per_batch, vec_len (conn_indices));
248
249       ASSERT (nconns_this_batch > 0);
250       vec_validate (conns_this_batch, nconns_this_batch - 1);
251       clib_memcpy_fast (conns_this_batch,
252                         conn_indices + vec_len (conn_indices) -
253                           nconns_this_batch,
254                         nconns_this_batch * sizeof (u32));
255       vec_dec_len (conn_indices, nconns_this_batch);
256     }
257
258   /*
259    * Track progress
260    */
261   if (PREDICT_FALSE (ecm->prev_conns != ecm->connections_per_batch &&
262                      ecm->prev_conns == vec_len (conns_this_batch)))
263     {
264       ecm->repeats++;
265       ecm->prev_conns = vec_len (conns_this_batch);
266       if (ecm->repeats == 500000)
267         {
268           clib_warning ("stuck clients");
269         }
270     }
271   else
272     {
273       ecm->prev_conns = vec_len (conns_this_batch);
274       ecm->repeats = 0;
275     }
276
277   /*
278    * Handle connections in this batch
279    */
280   for (i = 0; i < vec_len (conns_this_batch); i++)
281     {
282       es = ec_session_get (wrk, conns_this_batch[i]);
283
284       delete_session = 1;
285
286       if (es->bytes_to_send > 0)
287         {
288           send_data_chunk (ecm, es);
289           delete_session = 0;
290         }
291
292       if (es->bytes_to_receive > 0)
293         {
294           delete_session = 0;
295         }
296
297       if (PREDICT_FALSE (delete_session == 1))
298         {
299           clib_atomic_fetch_add (&ecm->tx_total, es->bytes_sent);
300           clib_atomic_fetch_add (&ecm->rx_total, es->bytes_received);
301           s = session_get_from_handle_if_valid (es->vpp_session_handle);
302
303           if (s)
304             {
305               vnet_disconnect_args_t _a, *a = &_a;
306               a->handle = session_handle (s);
307               a->app_index = ecm->app_index;
308               vnet_disconnect_session (a);
309
310               vec_delete (conns_this_batch, 1, i);
311               i--;
312               clib_atomic_fetch_add (&ecm->ready_connections, -1);
313             }
314           else
315             {
316               clib_warning ("session AWOL?");
317               vec_delete (conns_this_batch, 1, i);
318             }
319
320           /* Kick the debug CLI process */
321           if (ecm->ready_connections == 0)
322             {
323               signal_evt_to_cli (EC_CLI_TEST_DONE);
324             }
325         }
326     }
327
328   wrk->conn_indices = conn_indices;
329   wrk->conns_this_batch = conns_this_batch;
330   return 0;
331 }
332
333 VLIB_REGISTER_NODE (echo_clients_node) = {
334   .function = ec_node_fn,
335   .name = "echo-clients",
336   .type = VLIB_NODE_TYPE_INPUT,
337   .state = VLIB_NODE_STATE_DISABLED,
338 };
339
340 static void
341 ec_reset_runtime_config (ec_main_t *ecm)
342 {
343   ecm->n_clients = 1;
344   ecm->quic_streams = 1;
345   ecm->bytes_to_send = 8192;
346   ecm->no_return = 0;
347   ecm->fifo_size = 64 << 10;
348   ecm->connections_per_batch = 1000;
349   ecm->private_segment_count = 0;
350   ecm->private_segment_size = 256 << 20;
351   ecm->no_output = 0;
352   ecm->test_bytes = 0;
353   ecm->test_failed = 0;
354   ecm->tls_engine = CRYPTO_ENGINE_OPENSSL;
355   ecm->no_copy = 0;
356   ecm->run_test = EC_STARTING;
357   ecm->ready_connections = 0;
358   ecm->connect_conn_index = 0;
359   ecm->rx_total = 0;
360   ecm->tx_total = 0;
361   ecm->barrier_acq_needed = 0;
362   ecm->prealloc_sessions = 0;
363   ecm->prealloc_fifos = 0;
364   ecm->appns_id = 0;
365   ecm->appns_secret = 0;
366   ecm->attach_flags = 0;
367   ecm->syn_timeout = 20.0;
368   ecm->test_timeout = 20.0;
369   vec_free (ecm->connect_uri);
370 }
371
372 static int
373 ec_init (vlib_main_t *vm)
374 {
375   ec_main_t *ecm = &ec_main;
376   ec_worker_t *wrk;
377   u32 num_threads;
378   int i;
379
380   ec_reset_runtime_config (ecm);
381
382   /* Store cli process node index for signaling */
383   ecm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index;
384   ecm->vlib_main = vm;
385
386   if (vlib_num_workers ())
387     {
388       /* The request came over the binary api and the inband cli handler
389        * is not mp_safe. Drop the barrier to make sure the workers are not
390        * blocked.
391        */
392       if (vlib_thread_is_main_w_barrier ())
393         {
394           ecm->barrier_acq_needed = 1;
395           vlib_worker_thread_barrier_release (vm);
396         }
397       /*
398        * There's a good chance that both the client and the server echo
399        * apps will be enabled so make sure the session queue node polls on
400        * the main thread as connections will probably be established on it.
401        */
402       vlib_node_set_state (vm, session_queue_node.index,
403                            VLIB_NODE_STATE_POLLING);
404     }
405
406   /* App init done only once */
407   if (ecm->app_is_init)
408     return 0;
409
410
411   /* Init test data. Big buffer */
412   vec_validate (ecm->connect_test_data, 4 * 1024 * 1024 - 1);
413   for (i = 0; i < vec_len (ecm->connect_test_data); i++)
414     ecm->connect_test_data[i] = i & 0xff;
415
416   num_threads = 1 /* main thread */ + vlib_num_workers ();
417   vec_validate (ecm->wrk, num_threads - 1);
418   vec_foreach (wrk, ecm->wrk)
419     {
420       vec_validate (wrk->rx_buf, vec_len (ecm->connect_test_data) - 1);
421       wrk->thread_index = wrk - ecm->wrk;
422       wrk->vpp_event_queue =
423         session_main_get_vpp_event_queue (wrk->thread_index);
424     }
425
426   ecm->app_is_init = 1;
427
428   vlib_worker_thread_barrier_sync (vm);
429   vnet_session_enable_disable (vm, 1 /* turn on session and transports */);
430
431   /* Turn on the builtin client input nodes */
432   foreach_vlib_main ()
433     vlib_node_set_state (this_vlib_main, echo_clients_node.index,
434                          VLIB_NODE_STATE_POLLING);
435
436   vlib_worker_thread_barrier_release (vm);
437
438   return 0;
439 }
440
441 static void
442 ec_prealloc_sessions (ec_main_t *ecm)
443 {
444   u32 sessions_per_wrk, n_wrks;
445   ec_worker_t *wrk;
446
447   n_wrks = vlib_num_workers () ? vlib_num_workers () : 1;
448
449   sessions_per_wrk = ecm->n_clients / n_wrks;
450   vec_foreach (wrk, ecm->wrk)
451     pool_init_fixed (wrk->sessions, 1.1 * sessions_per_wrk);
452 }
453
454 static void
455 ec_worker_cleanup (ec_worker_t *wrk)
456 {
457   pool_free (wrk->sessions);
458   vec_free (wrk->conn_indices);
459   vec_free (wrk->conns_this_batch);
460 }
461
462 static void
463 ec_cleanup (ec_main_t *ecm)
464 {
465   ec_worker_t *wrk;
466
467   vec_foreach (wrk, ecm->wrk)
468     ec_worker_cleanup (wrk);
469
470   vec_free (ecm->connect_uri);
471   vec_free (ecm->appns_id);
472
473   if (ecm->barrier_acq_needed)
474     vlib_worker_thread_barrier_sync (ecm->vlib_main);
475 }
476
477 static int
478 quic_ec_qsession_connected_callback (u32 app_index, u32 api_context,
479                                      session_t *s, session_error_t err)
480 {
481   session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
482   ec_main_t *ecm = &ec_main;
483   vnet_connect_args_t *a = 0;
484   session_handle_t handle;
485   u32 stream_n;
486   int rv;
487
488   DBG ("QUIC Connection handle %d", session_handle (s));
489
490   vec_validate (a, 1);
491   a->uri = (char *) ecm->connect_uri;
492   if (parse_uri (a->uri, &sep))
493     return -1;
494   sep.parent_handle = handle = session_handle (s);
495
496   for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
497     {
498       clib_memset (a, 0, sizeof (*a));
499       a->app_index = ecm->app_index;
500       a->api_context = -1 - api_context;
501       clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
502
503       DBG ("QUIC opening stream %d", stream_n);
504       if ((rv = vnet_connect (a)))
505         {
506           clib_error ("Stream session %d opening failed: %d", stream_n, rv);
507           return -1;
508         }
509       DBG ("QUIC stream %d connected", stream_n);
510     }
511   /*
512    * 's' is no longer valid, its underlying pool could have been moved in
513    * vnet_connect()
514    */
515   vec_free (a);
516   return 0;
517 }
518
519 static int
520 quic_ec_session_connected_callback (u32 app_index, u32 api_context,
521                                     session_t *s, session_error_t err)
522 {
523   ec_main_t *ecm = &ec_main;
524   ec_session_t *es;
525   ec_worker_t *wrk;
526   u32 thread_index;
527
528   if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
529     return -1;
530
531   if (err)
532     {
533       clib_warning ("connection %d failed!", api_context);
534       ecm->run_test = EC_EXITING;
535       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
536       return 0;
537     }
538
539   if (s->listener_handle == SESSION_INVALID_HANDLE)
540     return quic_ec_qsession_connected_callback (app_index, api_context, s,
541                                                 err);
542   DBG ("STREAM Connection callback %d", api_context);
543
544   thread_index = s->thread_index;
545   ASSERT (thread_index == vlib_get_thread_index ()
546           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
547
548   wrk = ec_worker_get (thread_index);
549
550   /*
551    * Setup session
552    */
553   es = ec_session_alloc (wrk);
554
555   es->bytes_to_send = ecm->bytes_to_send;
556   es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
557   es->data.rx_fifo = s->rx_fifo;
558   es->data.rx_fifo->shr->client_session_index = es->data.session_index;
559   es->data.tx_fifo = s->tx_fifo;
560   es->data.tx_fifo->shr->client_session_index = es->data.session_index;
561   es->data.vpp_evt_q = wrk->vpp_event_queue;
562   es->vpp_session_handle = session_handle (s);
563   es->vpp_session_index = s->session_index;
564   s->opaque = es->data.session_index;
565
566   if (ecm->is_dgram)
567     {
568       transport_connection_t *tc;
569       tc = session_get_transport (s);
570       clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport));
571       es->data.is_dgram = 1;
572     }
573
574   vec_add1 (wrk->conn_indices, es->data.session_index);
575   clib_atomic_fetch_add (&ecm->ready_connections, 1);
576   if (ecm->ready_connections == ecm->expected_connections)
577     {
578       ecm->run_test = EC_RUNNING;
579       /* Signal the CLI process that the action is starting... */
580       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
581     }
582
583   return 0;
584 }
585
586 static int
587 ec_session_connected_callback (u32 app_index, u32 api_context, session_t *s,
588                                session_error_t err)
589 {
590   ec_main_t *ecm = &ec_main;
591   ec_session_t *es;
592   u32 thread_index;
593   ec_worker_t *wrk;
594
595   if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
596     return -1;
597
598   if (err)
599     {
600       clib_warning ("connection %d failed!", api_context);
601       ecm->run_test = EC_EXITING;
602       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
603       return 0;
604     }
605
606   thread_index = s->thread_index;
607   ASSERT (thread_index == vlib_get_thread_index ()
608           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
609
610   wrk = ec_worker_get (thread_index);
611
612   /*
613    * Setup session
614    */
615   es = ec_session_alloc (wrk);
616
617   es->bytes_to_send = ecm->bytes_to_send;
618   es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
619   es->data.rx_fifo = s->rx_fifo;
620   es->data.rx_fifo->shr->client_session_index = es->data.session_index;
621   es->data.tx_fifo = s->tx_fifo;
622   es->data.tx_fifo->shr->client_session_index = es->data.session_index;
623   es->data.vpp_evt_q = wrk->vpp_event_queue;
624   es->vpp_session_handle = session_handle (s);
625   es->vpp_session_index = s->session_index;
626   s->opaque = es->data.session_index;
627
628   if (ecm->is_dgram)
629     {
630       transport_connection_t *tc;
631       tc = session_get_transport (s);
632       clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport));
633       es->data.is_dgram = 1;
634     }
635
636   vec_add1 (wrk->conn_indices, es->data.session_index);
637   clib_atomic_fetch_add (&ecm->ready_connections, 1);
638   if (ecm->ready_connections == ecm->expected_connections)
639     {
640       ecm->run_test = EC_RUNNING;
641       /* Signal the CLI process that the action is starting... */
642       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
643     }
644
645   return 0;
646 }
647
648 static void
649 ec_session_reset_callback (session_t *s)
650 {
651   ec_main_t *ecm = &ec_main;
652   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
653
654   if (s->session_state == SESSION_STATE_READY)
655     clib_warning ("Reset active connection %U", format_session, s, 2);
656
657   a->handle = session_handle (s);
658   a->app_index = ecm->app_index;
659   vnet_disconnect_session (a);
660   return;
661 }
662
663 static int
664 ec_session_accept_callback (session_t *s)
665 {
666   return 0;
667 }
668
669 static void
670 ec_session_disconnect_callback (session_t *s)
671 {
672   ec_main_t *ecm = &ec_main;
673   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
674   a->handle = session_handle (s);
675   a->app_index = ecm->app_index;
676   vnet_disconnect_session (a);
677   return;
678 }
679
680 void
681 ec_session_disconnect (session_t *s)
682 {
683   ec_main_t *ecm = &ec_main;
684   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
685   a->handle = session_handle (s);
686   a->app_index = ecm->app_index;
687   vnet_disconnect_session (a);
688 }
689
690 static int
691 ec_session_rx_callback (session_t *s)
692 {
693   ec_main_t *ecm = &ec_main;
694   ec_worker_t *wrk;
695   ec_session_t *es;
696
697   if (PREDICT_FALSE (ecm->run_test != EC_RUNNING))
698     {
699       ec_session_disconnect (s);
700       return -1;
701     }
702
703   wrk = ec_worker_get (s->thread_index);
704   es = ec_session_get (wrk, s->opaque);
705
706   receive_data_chunk (wrk, es);
707
708   if (svm_fifo_max_dequeue_cons (s->rx_fifo))
709     session_enqueue_notify (s);
710
711   return 0;
712 }
713
714 static int
715 ec_add_segment_callback (u32 app_index, u64 segment_handle)
716 {
717   /* New segments may be added */
718   return 0;
719 }
720
721 static int
722 ec_del_segment_callback (u32 app_index, u64 segment_handle)
723 {
724   return 0;
725 }
726
727 static session_cb_vft_t ec_cb_vft = {
728   .session_reset_callback = ec_session_reset_callback,
729   .session_connected_callback = ec_session_connected_callback,
730   .session_accept_callback = ec_session_accept_callback,
731   .session_disconnect_callback = ec_session_disconnect_callback,
732   .builtin_app_rx_callback = ec_session_rx_callback,
733   .add_segment_callback = ec_add_segment_callback,
734   .del_segment_callback = ec_del_segment_callback,
735 };
736
737 static clib_error_t *
738 ec_attach ()
739 {
740   vnet_app_add_cert_key_pair_args_t _ck_pair, *ck_pair = &_ck_pair;
741   ec_main_t *ecm = &ec_main;
742   vnet_app_attach_args_t _a, *a = &_a;
743   u32 prealloc_fifos;
744   u64 options[18];
745   int rv;
746
747   clib_memset (a, 0, sizeof (*a));
748   clib_memset (options, 0, sizeof (options));
749
750   a->api_client_index = ~0;
751   a->name = format (0, "echo_client");
752   if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
753     ec_cb_vft.session_connected_callback = quic_ec_session_connected_callback;
754   a->session_cb_vft = &ec_cb_vft;
755
756   prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1;
757
758   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
759   options[APP_OPTIONS_SEGMENT_SIZE] = ecm->private_segment_size;
760   options[APP_OPTIONS_ADD_SEGMENT_SIZE] = ecm->private_segment_size;
761   options[APP_OPTIONS_RX_FIFO_SIZE] = ecm->fifo_size;
762   options[APP_OPTIONS_TX_FIFO_SIZE] = ecm->fifo_size;
763   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = ecm->private_segment_count;
764   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = prealloc_fifos;
765   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
766   options[APP_OPTIONS_TLS_ENGINE] = ecm->tls_engine;
767   options[APP_OPTIONS_PCT_FIRST_ALLOC] = 100;
768   options[APP_OPTIONS_FLAGS] |= ecm->attach_flags;
769   if (ecm->appns_id)
770     {
771       options[APP_OPTIONS_NAMESPACE_SECRET] = ecm->appns_secret;
772       a->namespace_id = ecm->appns_id;
773     }
774   a->options = options;
775
776   if ((rv = vnet_application_attach (a)))
777     return clib_error_return (0, "attach returned %d", rv);
778
779   ecm->app_index = a->app_index;
780   vec_free (a->name);
781
782   clib_memset (ck_pair, 0, sizeof (*ck_pair));
783   ck_pair->cert = (u8 *) test_srv_crt_rsa;
784   ck_pair->key = (u8 *) test_srv_key_rsa;
785   ck_pair->cert_len = test_srv_crt_rsa_len;
786   ck_pair->key_len = test_srv_key_rsa_len;
787   vnet_app_add_cert_key_pair (ck_pair);
788   ecm->ckpair_index = ck_pair->index;
789
790   ecm->test_client_attached = 1;
791
792   return 0;
793 }
794
795 static int
796 ec_detach ()
797 {
798   ec_main_t *ecm = &ec_main;
799   vnet_app_detach_args_t _da, *da = &_da;
800   int rv;
801
802   if (!ecm->test_client_attached)
803     return 0;
804
805   da->app_index = ecm->app_index;
806   da->api_client_index = ~0;
807   rv = vnet_application_detach (da);
808   ecm->test_client_attached = 0;
809   ecm->app_index = ~0;
810   vnet_app_del_cert_key_pair (ecm->ckpair_index);
811
812   return rv;
813 }
814
815 static int
816 ec_transport_needs_crypto (transport_proto_t proto)
817 {
818   return proto == TRANSPORT_PROTO_TLS || proto == TRANSPORT_PROTO_DTLS ||
819          proto == TRANSPORT_PROTO_QUIC;
820 }
821
822 static int
823 ec_connect_rpc (void *args)
824 {
825   ec_main_t *ecm = &ec_main;
826   vnet_connect_args_t _a = {}, *a = &_a;
827   int rv, needs_crypto;
828   u32 n_clients, ci;
829
830   n_clients = ecm->n_clients;
831   needs_crypto = ec_transport_needs_crypto (ecm->transport_proto);
832   clib_memcpy (&a->sep_ext, &ecm->connect_sep, sizeof (ecm->connect_sep));
833   a->sep_ext.transport_flags |= TRANSPORT_CFG_F_CONNECTED;
834   a->app_index = ecm->app_index;
835
836   ci = ecm->connect_conn_index;
837
838   while (ci < n_clients)
839     {
840       /* Crude pacing for call setups  */
841       if (ci - ecm->ready_connections > 128)
842         {
843           ecm->connect_conn_index = ci;
844           break;
845         }
846
847       a->api_context = ci;
848       if (needs_crypto)
849         {
850           session_endpoint_alloc_ext_cfg (&a->sep_ext,
851                                           TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
852           a->sep_ext.ext_cfg->crypto.ckpair_index = ecm->ckpair_index;
853         }
854
855       rv = vnet_connect (a);
856
857       if (needs_crypto)
858         clib_mem_free (a->sep_ext.ext_cfg);
859
860       if (rv)
861         {
862           clib_warning ("connect returned: %U", format_session_error, rv);
863           ecm->run_test = EC_EXITING;
864           signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
865           break;
866         }
867
868       ci += 1;
869     }
870
871   if (ci < ecm->expected_connections && ecm->run_test != EC_EXITING)
872     ec_program_connects ();
873
874   return 0;
875 }
876
877 void
878 ec_program_connects (void)
879 {
880   session_send_rpc_evt_to_thread_force (transport_cl_thread (), ec_connect_rpc,
881                                         0);
882 }
883
884 #define ec_cli(_fmt, _args...)                                                \
885   if (!ecm->no_output)                                                        \
886   vlib_cli_output (vm, _fmt, ##_args)
887
888 static clib_error_t *
889 ec_command_fn (vlib_main_t *vm, unformat_input_t *input,
890                vlib_cli_command_t *cmd)
891 {
892   unformat_input_t _line_input, *line_input = &_line_input;
893   char *default_uri = "tcp://6.0.1.1/1234", *transfer_type;
894   ec_main_t *ecm = &ec_main;
895   uword *event_data = 0, event_type;
896   clib_error_t *error = 0;
897   int rv, had_config = 1;
898   u64 tmp, total_bytes;
899   f64 delta;
900
901   if (ecm->test_client_attached)
902     return clib_error_return (0, "failed: already running!");
903
904   if (ec_init (vm))
905     {
906       error = clib_error_return (0, "failed init");
907       goto cleanup;
908     }
909
910   if (!unformat_user (input, unformat_line_input, line_input))
911     {
912       had_config = 0;
913       goto parse_config;
914     }
915
916   while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
917     {
918       if (unformat (line_input, "uri %s", &ecm->connect_uri))
919         ;
920       else if (unformat (line_input, "nclients %d", &ecm->n_clients))
921         ;
922       else if (unformat (line_input, "quic-streams %d", &ecm->quic_streams))
923         ;
924       else if (unformat (line_input, "mbytes %lld", &tmp))
925         ecm->bytes_to_send = tmp << 20;
926       else if (unformat (line_input, "gbytes %lld", &tmp))
927         ecm->bytes_to_send = tmp << 30;
928       else if (unformat (line_input, "bytes %U", unformat_memory_size,
929                          &ecm->bytes_to_send))
930         ;
931       else if (unformat (line_input, "test-timeout %f", &ecm->test_timeout))
932         ;
933       else if (unformat (line_input, "syn-timeout %f", &ecm->syn_timeout))
934         ;
935       else if (unformat (line_input, "no-return"))
936         ecm->no_return = 1;
937       else if (unformat (line_input, "fifo-size %d", &ecm->fifo_size))
938         ecm->fifo_size <<= 10;
939       else if (unformat (line_input, "private-segment-count %d",
940                          &ecm->private_segment_count))
941         ;
942       else if (unformat (line_input, "private-segment-size %U",
943                          unformat_memory_size, &ecm->private_segment_size))
944         ;
945       else if (unformat (line_input, "preallocate-fifos"))
946         ecm->prealloc_fifos = 1;
947       else if (unformat (line_input, "preallocate-sessions"))
948         ecm->prealloc_sessions = 1;
949       else if (unformat (line_input, "client-batch %d",
950                          &ecm->connections_per_batch))
951         ;
952       else if (unformat (line_input, "appns %_%v%_", &ecm->appns_id))
953         ;
954       else if (unformat (line_input, "all-scope"))
955         ecm->attach_flags |= (APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE |
956                               APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE);
957       else if (unformat (line_input, "local-scope"))
958         ecm->attach_flags = APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
959       else if (unformat (line_input, "global-scope"))
960         ecm->attach_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
961       else if (unformat (line_input, "secret %lu", &ecm->appns_secret))
962         ;
963       else if (unformat (line_input, "no-output"))
964         ecm->no_output = 1;
965       else if (unformat (line_input, "test-bytes"))
966         ecm->test_bytes = 1;
967       else if (unformat (line_input, "tls-engine %d", &ecm->tls_engine))
968         ;
969       else
970         {
971           error = clib_error_return (0, "failed: unknown input `%U'",
972                                      format_unformat_error, line_input);
973           goto cleanup;
974         }
975     }
976
977 parse_config:
978
979   ecm->expected_connections = ecm->n_clients * ecm->quic_streams;
980
981   if (!ecm->connect_uri)
982     {
983       clib_warning ("No uri provided. Using default: %s", default_uri);
984       ecm->connect_uri = format (0, "%s%c", default_uri, 0);
985     }
986
987   if ((rv = parse_uri ((char *) ecm->connect_uri, &ecm->connect_sep)))
988     {
989       error = clib_error_return (0, "Uri parse error: %d", rv);
990       goto cleanup;
991     }
992   ecm->transport_proto = ecm->connect_sep.transport_proto;
993   ecm->is_dgram = (ecm->transport_proto == TRANSPORT_PROTO_UDP);
994
995   if (ecm->prealloc_sessions)
996     ec_prealloc_sessions (ecm);
997
998   if ((error = ec_attach ()))
999     {
1000       clib_error_report (error);
1001       goto cleanup;
1002     }
1003
1004   /*
1005    * Start. Fire off connect requests
1006    */
1007
1008   ecm->syn_start_time = vlib_time_now (vm);
1009   ec_program_connects ();
1010
1011   /*
1012    * Park until the sessions come up, or syn_timeout seconds pass
1013    */
1014
1015   vlib_process_wait_for_event_or_clock (vm, ecm->syn_timeout);
1016   event_type = vlib_process_get_events (vm, &event_data);
1017   switch (event_type)
1018     {
1019     case ~0:
1020       ec_cli ("Timeout with only %d sessions active...",
1021               ecm->ready_connections);
1022       error = clib_error_return (0, "failed: syn timeout with %d sessions",
1023                                  ecm->ready_connections);
1024       goto cleanup;
1025
1026     case EC_CLI_CONNECTS_DONE:
1027       delta = vlib_time_now (vm) - ecm->syn_start_time;
1028       if (delta != 0.0)
1029         ec_cli ("%d three-way handshakes in %.2f seconds %.2f/s",
1030                 ecm->n_clients, delta, ((f64) ecm->n_clients) / delta);
1031       break;
1032
1033     case EC_CLI_CONNECTS_FAILED:
1034       error = clib_error_return (0, "failed: connect returned");
1035       goto cleanup;
1036
1037     default:
1038       ec_cli ("unexpected event(1): %d", event_type);
1039       error = clib_error_return (0, "failed: unexpected event(1): %d",
1040                                  event_type);
1041       goto cleanup;
1042     }
1043
1044   /*
1045    * Wait for the sessions to finish or test_timeout seconds pass
1046    */
1047   ecm->test_start_time = vlib_time_now (ecm->vlib_main);
1048   ec_cli ("Test started at %.6f", ecm->test_start_time);
1049   vlib_process_wait_for_event_or_clock (vm, ecm->test_timeout);
1050   event_type = vlib_process_get_events (vm, &event_data);
1051   switch (event_type)
1052     {
1053     case ~0:
1054       ec_cli ("Timeout at %.6f with %d sessions still active...",
1055               vlib_time_now (ecm->vlib_main), ecm->ready_connections);
1056       error = clib_error_return (0, "failed: timeout with %d sessions",
1057                                  ecm->ready_connections);
1058       goto cleanup;
1059
1060     case EC_CLI_TEST_DONE:
1061       ecm->test_end_time = vlib_time_now (vm);
1062       ec_cli ("Test finished at %.6f", ecm->test_end_time);
1063       break;
1064
1065     default:
1066       ec_cli ("unexpected event(2): %d", event_type);
1067       error = clib_error_return (0, "failed: unexpected event(2): %d",
1068                                  event_type);
1069       goto cleanup;
1070     }
1071
1072   /*
1073    * Done. Compute stats
1074    */
1075   delta = ecm->test_end_time - ecm->test_start_time;
1076   if (delta == 0.0)
1077     {
1078       ec_cli ("zero delta-t?");
1079       error = clib_error_return (0, "failed: zero delta-t");
1080       goto cleanup;
1081     }
1082
1083   total_bytes = (ecm->no_return ? ecm->tx_total : ecm->rx_total);
1084   transfer_type = ecm->no_return ? "half-duplex" : "full-duplex";
1085   ec_cli ("%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds", total_bytes,
1086           total_bytes / (1ULL << 20), total_bytes / (1ULL << 30), delta);
1087   ec_cli ("%.2f bytes/second %s", ((f64) total_bytes) / (delta),
1088           transfer_type);
1089   ec_cli ("%.4f gbit/second %s", (((f64) total_bytes * 8.0) / delta / 1e9),
1090           transfer_type);
1091
1092   if (ecm->test_bytes && ecm->test_failed)
1093     error = clib_error_return (0, "failed: test bytes");
1094
1095 cleanup:
1096
1097   /*
1098    * Cleanup
1099    */
1100   ecm->run_test = EC_EXITING;
1101   vlib_process_wait_for_event_or_clock (vm, 10e-3);
1102
1103   /* Detach the application, so we can use different fifo sizes next time */
1104   if (ec_detach ())
1105     {
1106       error = clib_error_return (0, "failed: app detach");
1107       ec_cli ("WARNING: app detach failed...");
1108     }
1109
1110   ec_cleanup (ecm);
1111   if (had_config)
1112     unformat_free (line_input);
1113
1114   if (error)
1115     ec_cli ("test failed");
1116
1117   return error;
1118 }
1119
1120 VLIB_CLI_COMMAND (ec_command, static) = {
1121   .path = "test echo clients",
1122   .short_help =
1123     "test echo clients [nclients %d][[m|g]bytes <bytes>]"
1124     "[test-timeout <time>][syn-timeout <time>][no-return][fifo-size <size>]"
1125     "[private-segment-count <count>][private-segment-size <bytes>[m|g]]"
1126     "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]"
1127     "[uri <tcp://ip/port>][test-bytes][no-output]",
1128   .function = ec_command_fn,
1129   .is_mp_safe = 1,
1130 };
1131
1132 clib_error_t *
1133 ec_main_init (vlib_main_t *vm)
1134 {
1135   ec_main_t *ecm = &ec_main;
1136   ecm->app_is_init = 0;
1137   return 0;
1138 }
1139
1140 VLIB_INIT_FUNCTION (ec_main_init);
1141
1142 /*
1143  * fd.io coding-style-patch-verification: ON
1144  *
1145  * Local Variables:
1146  * eval: (c-set-style "gnu")
1147  * End:
1148  */