d1443e75e80163285da0c4c8bdf9e88dcb9b205d
[vpp.git] / src / plugins / hs_apps / echo_client.c
1 /*
2  * echo_client.c - vpp built-in echo client code
3  *
4  * Copyright (c) 2017-2019 by Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <hs_apps/echo_client.h>
19
20 static ec_main_t ec_main;
21
22 #define ec_err(_fmt, _args...) clib_warning (_fmt, ##_args);
23
24 #define ec_dbg(_fmt, _args...)                                                \
25   do                                                                          \
26     {                                                                         \
27       if (ec_main.cfg.verbose)                                                \
28         ec_err (_fmt, ##_args);                                               \
29     }                                                                         \
30   while (0)
31
32 #define ec_cli(_fmt, _args...) vlib_cli_output (vm, _fmt, ##_args)
33
34 static void
35 signal_evt_to_cli_i (void *codep)
36 {
37   ec_main_t *ecm = &ec_main;
38   int code;
39
40   ASSERT (vlib_get_thread_index () == 0);
41   code = pointer_to_uword (codep);
42   vlib_process_signal_event (ecm->vlib_main, ecm->cli_node_index, code, 0);
43 }
44
45 static void
46 signal_evt_to_cli (int code)
47 {
48   if (vlib_get_thread_index () != 0)
49     session_send_rpc_evt_to_thread_force (
50       0, signal_evt_to_cli_i, uword_to_pointer ((uword) code, void *));
51   else
52     signal_evt_to_cli_i (uword_to_pointer ((uword) code, void *));
53 }
54
55 static inline ec_worker_t *
56 ec_worker_get (u32 thread_index)
57 {
58   return vec_elt_at_index (ec_main.wrk, thread_index);
59 }
60
61 static inline ec_session_t *
62 ec_session_alloc (ec_worker_t *wrk)
63 {
64   ec_session_t *ecs;
65
66   pool_get_zero (wrk->sessions, ecs);
67   ecs->session_index = ecs - wrk->sessions;
68   ecs->thread_index = wrk->thread_index;
69
70   return ecs;
71 }
72
73 static inline ec_session_t *
74 ec_session_get (ec_worker_t *wrk, u32 ec_index)
75 {
76   return pool_elt_at_index (wrk->sessions, ec_index);
77 }
78
79 static void
80 send_data_chunk (ec_main_t *ecm, ec_session_t *es)
81 {
82   u8 *test_data = ecm->connect_test_data;
83   int test_buf_len, test_buf_offset, rv;
84   u32 bytes_this_chunk;
85
86   test_buf_len = vec_len (test_data);
87   ASSERT (test_buf_len > 0);
88   test_buf_offset = es->bytes_sent % test_buf_len;
89   bytes_this_chunk =
90     clib_min (test_buf_len - test_buf_offset, es->bytes_to_send);
91
92   if (!es->is_dgram)
93     {
94       if (ecm->no_copy)
95         {
96           svm_fifo_t *f = es->tx_fifo;
97           rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk);
98           svm_fifo_enqueue_nocopy (f, rv);
99           session_send_io_evt_to_thread_custom (
100             &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
101         }
102       else
103         rv =
104           app_send_stream ((app_session_t *) es, test_data + test_buf_offset,
105                            bytes_this_chunk, 0);
106     }
107   else
108     {
109       svm_fifo_t *f = es->tx_fifo;
110       u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
111
112       if (max_enqueue < sizeof (session_dgram_hdr_t))
113         return;
114
115       max_enqueue -= sizeof (session_dgram_hdr_t);
116
117       if (ecm->no_copy)
118         {
119           session_dgram_hdr_t hdr;
120           app_session_transport_t *at = &es->transport;
121
122           rv = clib_min (max_enqueue, bytes_this_chunk);
123
124           hdr.data_length = rv;
125           hdr.data_offset = 0;
126           clib_memcpy_fast (&hdr.rmt_ip, &at->rmt_ip,
127                             sizeof (ip46_address_t));
128           hdr.is_ip4 = at->is_ip4;
129           hdr.rmt_port = at->rmt_port;
130           clib_memcpy_fast (&hdr.lcl_ip, &at->lcl_ip,
131                             sizeof (ip46_address_t));
132           hdr.lcl_port = at->lcl_port;
133           svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
134           svm_fifo_enqueue_nocopy (f, rv);
135           session_send_io_evt_to_thread_custom (
136             &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
137         }
138       else
139         {
140           bytes_this_chunk = clib_min (bytes_this_chunk, max_enqueue);
141           bytes_this_chunk = clib_min (bytes_this_chunk, 1460);
142           rv =
143             app_send_dgram ((app_session_t *) es, test_data + test_buf_offset,
144                             bytes_this_chunk, 0);
145         }
146     }
147
148   /* If we managed to enqueue data... */
149   if (rv > 0)
150     {
151       /* Account for it... */
152       es->bytes_to_send -= rv;
153       es->bytes_sent += rv;
154
155       if (ecm->cfg.verbose)
156         {
157           ELOG_TYPE_DECLARE (e) =
158             {
159               .format = "tx-enq: xfer %d bytes, sent %u remain %u",
160               .format_args = "i4i4i4",
161             };
162           struct
163           {
164             u32 data[3];
165           } *ed;
166           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
167           ed->data[0] = rv;
168           ed->data[1] = es->bytes_sent;
169           ed->data[2] = es->bytes_to_send;
170         }
171     }
172 }
173
174 static void
175 receive_data_chunk (ec_worker_t *wrk, ec_session_t *es)
176 {
177   ec_main_t *ecm = &ec_main;
178   svm_fifo_t *rx_fifo = es->rx_fifo;
179   int n_read, i;
180
181   if (ecm->cfg.test_bytes)
182     {
183       n_read =
184         app_recv ((app_session_t *) es, wrk->rx_buf, vec_len (wrk->rx_buf));
185     }
186   else
187     {
188       n_read = svm_fifo_max_dequeue_cons (rx_fifo);
189       svm_fifo_dequeue_drop (rx_fifo, n_read);
190     }
191
192   if (n_read > 0)
193     {
194       if (ecm->cfg.verbose)
195         {
196           ELOG_TYPE_DECLARE (e) =
197             {
198               .format = "rx-deq: %d bytes",
199               .format_args = "i4",
200             };
201           struct
202           {
203             u32 data[1];
204           } *ed;
205           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
206           ed->data[0] = n_read;
207         }
208
209       if (ecm->cfg.test_bytes)
210         {
211           for (i = 0; i < n_read; i++)
212             {
213               if (wrk->rx_buf[i] != ((es->bytes_received + i) & 0xff))
214                 {
215                   ec_err ("read %d error at byte %lld, 0x%x not 0x%x", n_read,
216                           es->bytes_received + i, wrk->rx_buf[i],
217                           ((es->bytes_received + i) & 0xff));
218                   ecm->test_failed = 1;
219                 }
220             }
221         }
222       ASSERT (n_read <= es->bytes_to_receive);
223       es->bytes_to_receive -= n_read;
224       es->bytes_received += n_read;
225     }
226 }
227
228 static uword
229 ec_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
230 {
231   u32 *conn_indices, *conns_this_batch, nconns_this_batch;
232   int thread_index = vm->thread_index, i, delete_session;
233   ec_main_t *ecm = &ec_main;
234   ec_worker_t *wrk;
235   ec_session_t *es;
236   session_t *s;
237
238   if (ecm->run_test != EC_RUNNING)
239     return 0;
240
241   wrk = ec_worker_get (thread_index);
242   conn_indices = wrk->conn_indices;
243   conns_this_batch = wrk->conns_this_batch;
244
245   if (((vec_len (conn_indices) == 0) && vec_len (conns_this_batch) == 0))
246     return 0;
247
248   /* Grab another pile of connections */
249   if (PREDICT_FALSE (vec_len (conns_this_batch) == 0))
250     {
251       nconns_this_batch =
252         clib_min (ecm->connections_per_batch, vec_len (conn_indices));
253
254       ASSERT (nconns_this_batch > 0);
255       vec_validate (conns_this_batch, nconns_this_batch - 1);
256       clib_memcpy_fast (conns_this_batch,
257                         conn_indices + vec_len (conn_indices) -
258                           nconns_this_batch,
259                         nconns_this_batch * sizeof (u32));
260       vec_dec_len (conn_indices, nconns_this_batch);
261     }
262
263   /*
264    * Track progress
265    */
266   if (PREDICT_FALSE (ecm->prev_conns != ecm->connections_per_batch &&
267                      ecm->prev_conns == vec_len (conns_this_batch)))
268     {
269       ecm->repeats++;
270       ecm->prev_conns = vec_len (conns_this_batch);
271       if (ecm->repeats == 500000)
272         {
273           ec_err ("stuck clients");
274         }
275     }
276   else
277     {
278       ecm->prev_conns = vec_len (conns_this_batch);
279       ecm->repeats = 0;
280     }
281
282   /*
283    * Handle connections in this batch
284    */
285   for (i = 0; i < vec_len (conns_this_batch); i++)
286     {
287       es = ec_session_get (wrk, conns_this_batch[i]);
288
289       delete_session = 1;
290
291       if (es->bytes_to_send > 0)
292         {
293           send_data_chunk (ecm, es);
294           delete_session = 0;
295         }
296
297       if (es->bytes_to_receive > 0)
298         {
299           delete_session = 0;
300         }
301
302       if (PREDICT_FALSE (delete_session == 1))
303         {
304           clib_atomic_fetch_add (&ecm->tx_total, es->bytes_sent);
305           clib_atomic_fetch_add (&ecm->rx_total, es->bytes_received);
306           s = session_get_from_handle_if_valid (es->vpp_session_handle);
307
308           if (s)
309             {
310               vnet_disconnect_args_t _a, *a = &_a;
311               a->handle = session_handle (s);
312               a->app_index = ecm->app_index;
313               vnet_disconnect_session (a);
314
315               vec_delete (conns_this_batch, 1, i);
316               i--;
317               clib_atomic_fetch_add (&ecm->ready_connections, -1);
318             }
319           else
320             {
321               ec_err ("session AWOL?");
322               vec_delete (conns_this_batch, 1, i);
323             }
324
325           /* Kick the debug CLI process */
326           if (ecm->ready_connections == 0)
327             {
328               signal_evt_to_cli (EC_CLI_TEST_DONE);
329             }
330         }
331     }
332
333   wrk->conn_indices = conn_indices;
334   wrk->conns_this_batch = conns_this_batch;
335   return 0;
336 }
337
338 VLIB_REGISTER_NODE (echo_clients_node) = {
339   .function = ec_node_fn,
340   .name = "echo-clients",
341   .type = VLIB_NODE_TYPE_INPUT,
342   .state = VLIB_NODE_STATE_DISABLED,
343 };
344
345 static void
346 ec_reset_runtime_config (ec_main_t *ecm)
347 {
348   hs_test_cfg_init (&ecm->cfg);
349   ecm->n_clients = 1;
350   ecm->quic_streams = 1;
351   ecm->bytes_to_send = 8192;
352   ecm->echo_bytes = 0;
353   ecm->fifo_size = 64 << 10;
354   ecm->connections_per_batch = 1000;
355   ecm->private_segment_count = 0;
356   ecm->private_segment_size = 256 << 20;
357   ecm->test_failed = 0;
358   ecm->tls_engine = CRYPTO_ENGINE_OPENSSL;
359   ecm->no_copy = 0;
360   ecm->run_test = EC_STARTING;
361   ecm->ready_connections = 0;
362   ecm->connect_conn_index = 0;
363   ecm->rx_total = 0;
364   ecm->tx_total = 0;
365   ecm->barrier_acq_needed = 0;
366   ecm->prealloc_sessions = 0;
367   ecm->prealloc_fifos = 0;
368   ecm->appns_id = 0;
369   ecm->appns_secret = 0;
370   ecm->attach_flags = 0;
371   ecm->syn_timeout = 20.0;
372   ecm->test_timeout = 20.0;
373   vec_free (ecm->connect_uri);
374 }
375
376 static int
377 ec_init (vlib_main_t *vm)
378 {
379   ec_main_t *ecm = &ec_main;
380   ec_worker_t *wrk;
381   u32 num_threads;
382   int i;
383
384   ec_reset_runtime_config (ecm);
385
386   /* Store cli process node index for signaling */
387   ecm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index;
388   ecm->vlib_main = vm;
389
390   if (vlib_num_workers ())
391     {
392       /* The request came over the binary api and the inband cli handler
393        * is not mp_safe. Drop the barrier to make sure the workers are not
394        * blocked.
395        */
396       if (vlib_thread_is_main_w_barrier ())
397         {
398           ecm->barrier_acq_needed = 1;
399           vlib_worker_thread_barrier_release (vm);
400         }
401       /*
402        * There's a good chance that both the client and the server echo
403        * apps will be enabled so make sure the session queue node polls on
404        * the main thread as connections will probably be established on it.
405        */
406       vlib_node_set_state (vm, session_queue_node.index,
407                            VLIB_NODE_STATE_POLLING);
408     }
409
410   /* App init done only once */
411   if (ecm->app_is_init)
412     return 0;
413
414
415   /* Init test data. Big buffer */
416   vec_validate (ecm->connect_test_data, 4 * 1024 * 1024 - 1);
417   for (i = 0; i < vec_len (ecm->connect_test_data); i++)
418     ecm->connect_test_data[i] = i & 0xff;
419
420   num_threads = 1 /* main thread */ + vlib_num_workers ();
421   vec_validate (ecm->wrk, num_threads - 1);
422   vec_foreach (wrk, ecm->wrk)
423     {
424       vec_validate (wrk->rx_buf, vec_len (ecm->connect_test_data) - 1);
425       wrk->thread_index = wrk - ecm->wrk;
426       wrk->vpp_event_queue =
427         session_main_get_vpp_event_queue (wrk->thread_index);
428     }
429
430   ecm->app_is_init = 1;
431
432   vlib_worker_thread_barrier_sync (vm);
433   vnet_session_enable_disable (vm, 1 /* turn on session and transports */);
434
435   /* Turn on the builtin client input nodes */
436   foreach_vlib_main ()
437     vlib_node_set_state (this_vlib_main, echo_clients_node.index,
438                          VLIB_NODE_STATE_POLLING);
439
440   vlib_worker_thread_barrier_release (vm);
441
442   return 0;
443 }
444
445 static void
446 ec_prealloc_sessions (ec_main_t *ecm)
447 {
448   u32 sessions_per_wrk, n_wrks;
449   ec_worker_t *wrk;
450
451   n_wrks = vlib_num_workers () ? vlib_num_workers () : 1;
452
453   sessions_per_wrk = ecm->n_clients / n_wrks;
454   vec_foreach (wrk, ecm->wrk)
455     pool_init_fixed (wrk->sessions, 1.1 * sessions_per_wrk);
456 }
457
458 static void
459 ec_worker_cleanup (ec_worker_t *wrk)
460 {
461   pool_free (wrk->sessions);
462   vec_free (wrk->conn_indices);
463   vec_free (wrk->conns_this_batch);
464 }
465
466 static void
467 ec_cleanup (ec_main_t *ecm)
468 {
469   ec_worker_t *wrk;
470
471   vec_foreach (wrk, ecm->wrk)
472     ec_worker_cleanup (wrk);
473
474   vec_free (ecm->connect_uri);
475   vec_free (ecm->appns_id);
476
477   if (ecm->barrier_acq_needed)
478     vlib_worker_thread_barrier_sync (ecm->vlib_main);
479 }
480
481 static int
482 quic_ec_qsession_connected_callback (u32 app_index, u32 api_context,
483                                      session_t *s, session_error_t err)
484 {
485   session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
486   ec_main_t *ecm = &ec_main;
487   vnet_connect_args_t _a, *a = &_a;
488   u32 stream_n;
489   int rv;
490
491   ec_dbg ("QUIC Connection handle %d", session_handle (s));
492
493   a->uri = (char *) ecm->connect_uri;
494   if (parse_uri (a->uri, &sep))
495     return -1;
496   sep.parent_handle = session_handle (s);
497
498   for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
499     {
500       clib_memset (a, 0, sizeof (*a));
501       a->app_index = ecm->app_index;
502       a->api_context = -2 - api_context;
503       clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
504
505       ec_dbg ("QUIC opening stream %d", stream_n);
506       if ((rv = vnet_connect (a)))
507         {
508           clib_error ("Stream session %d opening failed: %d", stream_n, rv);
509           return -1;
510         }
511       ec_dbg ("QUIC stream %d connected", stream_n);
512     }
513   return 0;
514 }
515
516 static int
517 ec_ctrl_send (hs_test_cmd_t cmd)
518 {
519   ec_main_t *ecm = &ec_main;
520   session_t *s;
521   int rv;
522
523   ecm->cfg.cmd = cmd;
524   if (ecm->ctrl_session_handle == SESSION_INVALID_HANDLE)
525     {
526       ec_dbg ("ctrl session went away");
527       return -1;
528     }
529
530   s = session_get_from_handle_if_valid (ecm->ctrl_session_handle);
531   if (!s)
532     {
533       ec_err ("ctrl session not found");
534       return -1;
535     }
536
537   ec_dbg ("sending test paramters to the server..");
538   if (ecm->cfg.verbose)
539     hs_test_cfg_dump (&ecm->cfg, 1);
540
541   rv = svm_fifo_enqueue (s->tx_fifo, sizeof (ecm->cfg), (u8 *) &ecm->cfg);
542   ASSERT (rv == sizeof (ecm->cfg));
543   session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
544   return 0;
545 }
546
547 static int
548 ec_ctrl_session_connected_callback (session_t *s)
549 {
550   ec_main_t *ecm = &ec_main;
551
552   s->opaque = HS_CTRL_HANDLE;
553   ecm->ctrl_session_handle = session_handle (s);
554
555   /* send test parameters to the server */
556   ec_ctrl_send (HS_TEST_CMD_SYNC);
557   return 0;
558 }
559
560 static int
561 quic_ec_session_connected_callback (u32 app_index, u32 api_context,
562                                     session_t *s, session_error_t err)
563 {
564   ec_main_t *ecm = &ec_main;
565   ec_session_t *es;
566   ec_worker_t *wrk;
567   u32 thread_index;
568
569   if (PREDICT_FALSE (api_context == HS_CTRL_HANDLE))
570     return ec_ctrl_session_connected_callback (s);
571
572   if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
573     return -1;
574
575   if (err)
576     {
577       ec_err ("connection %d failed!", api_context);
578       ecm->run_test = EC_EXITING;
579       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
580       return 0;
581     }
582
583   if (s->listener_handle == SESSION_INVALID_HANDLE)
584     return quic_ec_qsession_connected_callback (app_index, api_context, s,
585                                                 err);
586   ec_dbg ("STREAM Connection callback %d", api_context);
587
588   thread_index = s->thread_index;
589   ASSERT (thread_index == vlib_get_thread_index ()
590           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
591
592   wrk = ec_worker_get (thread_index);
593
594   /*
595    * Setup session
596    */
597   es = ec_session_alloc (wrk);
598   hs_test_app_session_init (es, s);
599
600   es->bytes_to_send = ecm->bytes_to_send;
601   es->bytes_to_receive = ecm->echo_bytes ? ecm->bytes_to_send : 0ULL;
602   es->vpp_session_handle = session_handle (s);
603   es->vpp_session_index = s->session_index;
604   s->opaque = es->session_index;
605
606   vec_add1 (wrk->conn_indices, es->session_index);
607   clib_atomic_fetch_add (&ecm->ready_connections, 1);
608   if (ecm->ready_connections == ecm->expected_connections)
609     {
610       ecm->run_test = EC_RUNNING;
611       /* Signal the CLI process that the action is starting... */
612       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
613     }
614
615   return 0;
616 }
617
618 static int
619 ec_session_connected_callback (u32 app_index, u32 api_context, session_t *s,
620                                session_error_t err)
621 {
622   ec_main_t *ecm = &ec_main;
623   ec_session_t *es;
624   u32 thread_index;
625   ec_worker_t *wrk;
626
627   if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
628     return -1;
629
630   if (err)
631     {
632       ec_err ("connection %d failed! %U", api_context, format_session_error,
633               err);
634       ecm->run_test = EC_EXITING;
635       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
636       return 0;
637     }
638
639   thread_index = s->thread_index;
640   ASSERT (thread_index == vlib_get_thread_index ()
641           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
642
643   if (PREDICT_FALSE (api_context == HS_CTRL_HANDLE))
644     return ec_ctrl_session_connected_callback (s);
645
646   wrk = ec_worker_get (thread_index);
647
648   /*
649    * Setup session
650    */
651   es = ec_session_alloc (wrk);
652   hs_test_app_session_init (es, s);
653
654   es->bytes_to_send = ecm->bytes_to_send;
655   es->bytes_to_receive = ecm->echo_bytes ? ecm->bytes_to_send : 0ULL;
656   es->vpp_session_handle = session_handle (s);
657   es->vpp_session_index = s->session_index;
658   s->opaque = es->session_index;
659
660   vec_add1 (wrk->conn_indices, es->session_index);
661   clib_atomic_fetch_add (&ecm->ready_connections, 1);
662   if (ecm->ready_connections == ecm->expected_connections)
663     {
664       ecm->run_test = EC_RUNNING;
665       /* Signal the CLI process that the action is starting... */
666       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
667     }
668
669   return 0;
670 }
671
672 static void
673 ec_session_reset_callback (session_t *s)
674 {
675   ec_main_t *ecm = &ec_main;
676   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
677
678   if (s->session_state == SESSION_STATE_READY)
679     ec_err ("Reset active connection %U", format_session, s, 2);
680
681   a->handle = session_handle (s);
682   a->app_index = ecm->app_index;
683   vnet_disconnect_session (a);
684   return;
685 }
686
687 static int
688 ec_session_accept_callback (session_t *s)
689 {
690   return 0;
691 }
692
693 static void
694 ec_session_disconnect_callback (session_t *s)
695 {
696   ec_main_t *ecm = &ec_main;
697   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
698
699   if (session_handle (s) == ecm->ctrl_session_handle)
700     {
701       ec_dbg ("ctrl session disconnect");
702       ecm->ctrl_session_handle = SESSION_INVALID_HANDLE;
703     }
704
705   a->handle = session_handle (s);
706   a->app_index = ecm->app_index;
707   vnet_disconnect_session (a);
708   return;
709 }
710
711 void
712 ec_session_disconnect (session_t *s)
713 {
714   ec_main_t *ecm = &ec_main;
715   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
716   a->handle = session_handle (s);
717   a->app_index = ecm->app_index;
718   vnet_disconnect_session (a);
719 }
720
721 static int
722 ec_ctrl_session_rx_callback (session_t *s)
723 {
724   ec_main_t *ecm = &ec_main;
725   int rx_bytes;
726   hs_test_cfg_t cfg = { 0 };
727
728   rx_bytes = svm_fifo_dequeue (s->rx_fifo, sizeof (cfg), (u8 *) &cfg);
729   if (rx_bytes != sizeof (cfg))
730     {
731       ec_err ("invalid cfg length %d (expected %d)", rx_bytes, sizeof (cfg));
732       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
733       return -1;
734     }
735
736   ec_dbg ("control message received:");
737   if (ecm->cfg.verbose)
738     hs_test_cfg_dump (&cfg, 1);
739
740   switch (cfg.cmd)
741     {
742     case HS_TEST_CMD_SYNC:
743       switch (ecm->run_test)
744         {
745         case EC_STARTING:
746           if (!hs_test_cfg_verify (&cfg, &ecm->cfg))
747             {
748               ec_err ("invalid config received from server!");
749               signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
750               return -1;
751             }
752           signal_evt_to_cli (EC_CLI_CFG_SYNC);
753           break;
754
755         case EC_RUNNING:
756           ec_dbg ("test running..");
757           break;
758
759         case EC_EXITING:
760           /* post test sync */
761           signal_evt_to_cli (EC_CLI_CFG_SYNC);
762           break;
763
764         default:
765           ec_err ("unexpected test state! %d", ecm->run_test);
766           break;
767         }
768       break;
769     case HS_TEST_CMD_START:
770       signal_evt_to_cli (EC_CLI_START);
771       break;
772     case HS_TEST_CMD_STOP:
773       signal_evt_to_cli (EC_CLI_STOP);
774       break;
775     default:
776       ec_err ("unexpected cmd! %d", cfg.cmd);
777       break;
778     }
779
780   return 0;
781 }
782
783 static int
784 ec_session_rx_callback (session_t *s)
785 {
786   ec_main_t *ecm = &ec_main;
787   ec_worker_t *wrk;
788   ec_session_t *es;
789
790   if (PREDICT_FALSE (s->opaque == HS_CTRL_HANDLE))
791     return ec_ctrl_session_rx_callback (s);
792
793   if (PREDICT_FALSE (ecm->run_test != EC_RUNNING))
794     {
795       ec_session_disconnect (s);
796       return -1;
797     }
798
799   wrk = ec_worker_get (s->thread_index);
800   es = ec_session_get (wrk, s->opaque);
801
802   receive_data_chunk (wrk, es);
803
804   if (svm_fifo_max_dequeue_cons (s->rx_fifo))
805     session_enqueue_notify (s);
806
807   return 0;
808 }
809
810 static int
811 ec_add_segment_callback (u32 app_index, u64 segment_handle)
812 {
813   /* New segments may be added */
814   return 0;
815 }
816
817 static int
818 ec_del_segment_callback (u32 app_index, u64 segment_handle)
819 {
820   return 0;
821 }
822
823 static session_cb_vft_t ec_cb_vft = {
824   .session_reset_callback = ec_session_reset_callback,
825   .session_connected_callback = ec_session_connected_callback,
826   .session_accept_callback = ec_session_accept_callback,
827   .session_disconnect_callback = ec_session_disconnect_callback,
828   .builtin_app_rx_callback = ec_session_rx_callback,
829   .add_segment_callback = ec_add_segment_callback,
830   .del_segment_callback = ec_del_segment_callback,
831 };
832
833 static clib_error_t *
834 ec_attach ()
835 {
836   vnet_app_add_cert_key_pair_args_t _ck_pair, *ck_pair = &_ck_pair;
837   ec_main_t *ecm = &ec_main;
838   vnet_app_attach_args_t _a, *a = &_a;
839   u32 prealloc_fifos;
840   u64 options[18];
841   int rv;
842
843   clib_memset (a, 0, sizeof (*a));
844   clib_memset (options, 0, sizeof (options));
845
846   a->api_client_index = ~0;
847   a->name = format (0, "echo_client");
848   if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
849     ec_cb_vft.session_connected_callback = quic_ec_session_connected_callback;
850   a->session_cb_vft = &ec_cb_vft;
851
852   prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1;
853
854   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
855   options[APP_OPTIONS_SEGMENT_SIZE] = ecm->private_segment_size;
856   options[APP_OPTIONS_ADD_SEGMENT_SIZE] = ecm->private_segment_size;
857   options[APP_OPTIONS_RX_FIFO_SIZE] = ecm->fifo_size;
858   options[APP_OPTIONS_TX_FIFO_SIZE] = ecm->fifo_size;
859   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = ecm->private_segment_count;
860   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = prealloc_fifos;
861   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
862   options[APP_OPTIONS_TLS_ENGINE] = ecm->tls_engine;
863   options[APP_OPTIONS_PCT_FIRST_ALLOC] = 100;
864   options[APP_OPTIONS_FLAGS] |= ecm->attach_flags;
865   if (ecm->appns_id)
866     {
867       options[APP_OPTIONS_NAMESPACE_SECRET] = ecm->appns_secret;
868       a->namespace_id = ecm->appns_id;
869     }
870   a->options = options;
871
872   if ((rv = vnet_application_attach (a)))
873     return clib_error_return (0, "attach returned %d", rv);
874
875   ecm->app_index = a->app_index;
876   vec_free (a->name);
877
878   clib_memset (ck_pair, 0, sizeof (*ck_pair));
879   ck_pair->cert = (u8 *) test_srv_crt_rsa;
880   ck_pair->key = (u8 *) test_srv_key_rsa;
881   ck_pair->cert_len = test_srv_crt_rsa_len;
882   ck_pair->key_len = test_srv_key_rsa_len;
883   vnet_app_add_cert_key_pair (ck_pair);
884   ecm->ckpair_index = ck_pair->index;
885
886   ecm->test_client_attached = 1;
887
888   return 0;
889 }
890
891 static int
892 ec_detach ()
893 {
894   ec_main_t *ecm = &ec_main;
895   vnet_app_detach_args_t _da, *da = &_da;
896   int rv;
897
898   if (!ecm->test_client_attached)
899     return 0;
900
901   da->app_index = ecm->app_index;
902   da->api_client_index = ~0;
903   rv = vnet_application_detach (da);
904   ecm->test_client_attached = 0;
905   ecm->app_index = ~0;
906   vnet_app_del_cert_key_pair (ecm->ckpair_index);
907
908   return rv;
909 }
910
911 static int
912 ec_transport_needs_crypto (transport_proto_t proto)
913 {
914   return proto == TRANSPORT_PROTO_TLS || proto == TRANSPORT_PROTO_DTLS ||
915          proto == TRANSPORT_PROTO_QUIC;
916 }
917
918 static int
919 ec_connect_rpc (void *args)
920 {
921   ec_main_t *ecm = &ec_main;
922   vnet_connect_args_t _a = {}, *a = &_a;
923   int rv, needs_crypto;
924   u32 n_clients, ci;
925
926   n_clients = ecm->n_clients;
927   needs_crypto = ec_transport_needs_crypto (ecm->transport_proto);
928   clib_memcpy (&a->sep_ext, &ecm->connect_sep, sizeof (ecm->connect_sep));
929   a->sep_ext.transport_flags |= TRANSPORT_CFG_F_CONNECTED;
930   a->app_index = ecm->app_index;
931
932   ci = ecm->connect_conn_index;
933
934   while (ci < n_clients)
935     {
936       /* Crude pacing for call setups  */
937       if (ci - ecm->ready_connections > 128)
938         {
939           ecm->connect_conn_index = ci;
940           break;
941         }
942
943       a->api_context = ci;
944       if (needs_crypto)
945         {
946           session_endpoint_alloc_ext_cfg (&a->sep_ext,
947                                           TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
948           a->sep_ext.ext_cfg->crypto.ckpair_index = ecm->ckpair_index;
949         }
950
951       rv = vnet_connect (a);
952
953       if (needs_crypto)
954         clib_mem_free (a->sep_ext.ext_cfg);
955
956       if (rv)
957         {
958           ec_err ("connect returned: %U", format_session_error, rv);
959           ecm->run_test = EC_EXITING;
960           signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
961           break;
962         }
963
964       ci += 1;
965     }
966
967   if (ci < ecm->expected_connections && ecm->run_test != EC_EXITING)
968     ec_program_connects ();
969
970   return 0;
971 }
972
973 void
974 ec_program_connects (void)
975 {
976   session_send_rpc_evt_to_thread_force (transport_cl_thread (), ec_connect_rpc,
977                                         0);
978 }
979
980 static clib_error_t *
981 ec_ctrl_connect_rpc ()
982 {
983   session_error_t rv;
984   ec_main_t *ecm = &ec_main;
985   vnet_connect_args_t _a = {}, *a = &_a;
986
987   a->api_context = HS_CTRL_HANDLE;
988   ecm->cfg.cmd = HS_TEST_CMD_SYNC;
989   clib_memcpy (&a->sep_ext, &ecm->connect_sep, sizeof (ecm->connect_sep));
990   a->sep_ext.transport_proto = TRANSPORT_PROTO_TCP;
991   a->app_index = ecm->app_index;
992
993   rv = vnet_connect (a);
994   if (rv)
995     {
996       ec_err ("ctrl connect returned: %U", format_session_error, rv);
997       ecm->run_test = EC_EXITING;
998       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
999     }
1000   return 0;
1001 }
1002
1003 static void
1004 ec_ctrl_connect (void)
1005 {
1006   session_send_rpc_evt_to_thread_force (transport_cl_thread (),
1007                                         ec_ctrl_connect_rpc, 0);
1008 }
1009
1010 static void
1011 ec_ctrl_session_disconnect ()
1012 {
1013   ec_main_t *ecm = &ec_main;
1014   vnet_disconnect_args_t _a, *a = &_a;
1015   session_error_t err;
1016
1017   a->handle = ecm->ctrl_session_handle;
1018   a->app_index = ecm->app_index;
1019   err = vnet_disconnect_session (a);
1020   if (err)
1021     ec_err ("vnet_disconnect_session: %U", format_session_error, err);
1022 }
1023
1024 static int
1025 ec_ctrl_test_sync ()
1026 {
1027   ec_main_t *ecm = &ec_main;
1028   ecm->cfg.test = HS_TEST_TYPE_ECHO;
1029   return ec_ctrl_send (HS_TEST_CMD_SYNC);
1030 }
1031
1032 static int
1033 ec_ctrl_test_start ()
1034 {
1035   return ec_ctrl_send (HS_TEST_CMD_START);
1036 }
1037
1038 static int
1039 ec_ctrl_test_stop ()
1040 {
1041   return ec_ctrl_send (HS_TEST_CMD_STOP);
1042 }
1043
1044 #define ec_wait_for_signal(_sig)                                              \
1045   vlib_process_wait_for_event_or_clock (vm, ecm->syn_timeout);                \
1046   event_type = vlib_process_get_events (vm, &event_data);                     \
1047   switch (event_type)                                                         \
1048     {                                                                         \
1049     case ~0:                                                                  \
1050       ec_cli ("Timeout while waiting for " #_sig);                            \
1051       error =                                                                 \
1052         clib_error_return (0, "failed: timeout while waiting for " #_sig);    \
1053       goto cleanup;                                                           \
1054     case _sig:                                                                \
1055       break;                                                                  \
1056     default:                                                                  \
1057       ec_cli ("unexpected event while waiting for " #_sig ": %d",             \
1058               event_type);                                                    \
1059       error =                                                                 \
1060         clib_error_return (0, "failed: unexpected event: %d", event_type);    \
1061       goto cleanup;                                                           \
1062     }
1063
1064 static clib_error_t *
1065 ec_command_fn (vlib_main_t *vm, unformat_input_t *input,
1066                vlib_cli_command_t *cmd)
1067 {
1068   unformat_input_t _line_input, *line_input = &_line_input;
1069   char *default_uri = "tcp://6.0.1.1/1234", *transfer_type;
1070   ec_main_t *ecm = &ec_main;
1071   uword *event_data = 0, event_type;
1072   clib_error_t *error = 0;
1073   int rv, had_config = 1;
1074   u64 tmp, total_bytes;
1075   f64 delta;
1076
1077   if (ecm->test_client_attached)
1078     return clib_error_return (0, "failed: already running!");
1079
1080   if (ec_init (vm))
1081     {
1082       error = clib_error_return (0, "failed init");
1083       goto cleanup;
1084     }
1085
1086   if (!unformat_user (input, unformat_line_input, line_input))
1087     {
1088       had_config = 0;
1089       goto parse_config;
1090     }
1091
1092   while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
1093     {
1094       if (unformat (line_input, "uri %s", &ecm->connect_uri))
1095         ;
1096       else if (unformat (line_input, "nclients %d", &ecm->n_clients))
1097         ;
1098       else if (unformat (line_input, "quic-streams %d", &ecm->quic_streams))
1099         ;
1100       else if (unformat (line_input, "mbytes %lld", &tmp))
1101         ecm->bytes_to_send = tmp << 20;
1102       else if (unformat (line_input, "gbytes %lld", &tmp))
1103         ecm->bytes_to_send = tmp << 30;
1104       else if (unformat (line_input, "bytes %U", unformat_memory_size,
1105                          &ecm->bytes_to_send))
1106         ;
1107       else if (unformat (line_input, "test-timeout %f", &ecm->test_timeout))
1108         ;
1109       else if (unformat (line_input, "syn-timeout %f", &ecm->syn_timeout))
1110         ;
1111       else if (unformat (line_input, "echo-bytes"))
1112         ecm->echo_bytes = 1;
1113       else if (unformat (line_input, "fifo-size %U", unformat_memory_size,
1114                          &ecm->fifo_size))
1115         ;
1116       else if (unformat (line_input, "private-segment-count %d",
1117                          &ecm->private_segment_count))
1118         ;
1119       else if (unformat (line_input, "private-segment-size %U",
1120                          unformat_memory_size, &ecm->private_segment_size))
1121         ;
1122       else if (unformat (line_input, "preallocate-fifos"))
1123         ecm->prealloc_fifos = 1;
1124       else if (unformat (line_input, "preallocate-sessions"))
1125         ecm->prealloc_sessions = 1;
1126       else if (unformat (line_input, "client-batch %d",
1127                          &ecm->connections_per_batch))
1128         ;
1129       else if (unformat (line_input, "appns %_%v%_", &ecm->appns_id))
1130         ;
1131       else if (unformat (line_input, "all-scope"))
1132         ecm->attach_flags |= (APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE |
1133                               APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE);
1134       else if (unformat (line_input, "local-scope"))
1135         ecm->attach_flags = APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
1136       else if (unformat (line_input, "global-scope"))
1137         ecm->attach_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
1138       else if (unformat (line_input, "secret %lu", &ecm->appns_secret))
1139         ;
1140       else if (unformat (line_input, "verbose"))
1141         ecm->cfg.verbose = 1;
1142       else if (unformat (line_input, "test-bytes"))
1143         ecm->cfg.test_bytes = 1;
1144       else if (unformat (line_input, "tls-engine %d", &ecm->tls_engine))
1145         ;
1146       else
1147         {
1148           error = clib_error_return (0, "failed: unknown input `%U'",
1149                                      format_unformat_error, line_input);
1150           goto cleanup;
1151         }
1152     }
1153
1154 parse_config:
1155
1156   ecm->cfg.num_test_sessions = ecm->expected_connections =
1157     ecm->n_clients * ecm->quic_streams;
1158
1159   if (!ecm->connect_uri)
1160     {
1161       ec_cli ("No uri provided. Using default: %s", default_uri);
1162       ecm->connect_uri = format (0, "%s%c", default_uri, 0);
1163     }
1164
1165   if ((rv = parse_uri ((char *) ecm->connect_uri, &ecm->connect_sep)))
1166     {
1167       error = clib_error_return (0, "Uri parse error: %d", rv);
1168       goto cleanup;
1169     }
1170   ecm->transport_proto = ecm->connect_sep.transport_proto;
1171
1172   if (ecm->prealloc_sessions)
1173     ec_prealloc_sessions (ecm);
1174
1175   if ((error = ec_attach ()))
1176     {
1177       clib_error_report (error);
1178       goto cleanup;
1179     }
1180
1181   if (ecm->echo_bytes)
1182     ecm->cfg.test = HS_TEST_TYPE_BI;
1183   else
1184     ecm->cfg.test = HS_TEST_TYPE_UNI;
1185
1186   ec_ctrl_connect ();
1187   ec_wait_for_signal (EC_CLI_CFG_SYNC);
1188
1189   if (ec_ctrl_test_start () < 0)
1190     {
1191       ec_cli ("failed to send start command");
1192       goto cleanup;
1193     }
1194   ec_wait_for_signal (EC_CLI_START);
1195
1196   /*
1197    * Start. Fire off connect requests
1198    */
1199
1200   /* update data port */
1201   ecm->connect_sep.port = hs_make_data_port (ecm->connect_sep.port);
1202
1203   ecm->syn_start_time = vlib_time_now (vm);
1204   ec_program_connects ();
1205
1206   /*
1207    * Park until the sessions come up, or syn_timeout seconds pass
1208    */
1209
1210   vlib_process_wait_for_event_or_clock (vm, ecm->syn_timeout);
1211   event_type = vlib_process_get_events (vm, &event_data);
1212   switch (event_type)
1213     {
1214     case ~0:
1215       ec_cli ("Timeout with only %d sessions active...",
1216               ecm->ready_connections);
1217       error = clib_error_return (0, "failed: syn timeout with %d sessions",
1218                                  ecm->ready_connections);
1219       goto stop_test;
1220
1221     case EC_CLI_CONNECTS_DONE:
1222       delta = vlib_time_now (vm) - ecm->syn_start_time;
1223       if (delta != 0.0)
1224         ec_cli ("%d three-way handshakes in %.2f seconds %.2f/s",
1225                 ecm->n_clients, delta, ((f64) ecm->n_clients) / delta);
1226       break;
1227
1228     case EC_CLI_CONNECTS_FAILED:
1229       error = clib_error_return (0, "failed: connect returned");
1230       goto stop_test;
1231
1232     default:
1233       ec_cli ("unexpected event(2): %d", event_type);
1234       error =
1235         clib_error_return (0, "failed: unexpected event(2): %d", event_type);
1236       goto stop_test;
1237     }
1238
1239   /*
1240    * Wait for the sessions to finish or test_timeout seconds pass
1241    */
1242   ecm->test_start_time = vlib_time_now (ecm->vlib_main);
1243   ec_cli ("Test started at %.6f", ecm->test_start_time);
1244   vlib_process_wait_for_event_or_clock (vm, ecm->test_timeout);
1245   event_type = vlib_process_get_events (vm, &event_data);
1246   switch (event_type)
1247     {
1248     case ~0:
1249       ec_cli ("Timeout at %.6f with %d sessions still active...",
1250               vlib_time_now (ecm->vlib_main), ecm->ready_connections);
1251       error = clib_error_return (0, "failed: timeout with %d sessions",
1252                                  ecm->ready_connections);
1253       goto stop_test;
1254
1255     case EC_CLI_TEST_DONE:
1256       ecm->test_end_time = vlib_time_now (vm);
1257       ec_cli ("Test finished at %.6f", ecm->test_end_time);
1258       break;
1259
1260     default:
1261       ec_cli ("unexpected event(3): %d", event_type);
1262       error =
1263         clib_error_return (0, "failed: unexpected event(3): %d", event_type);
1264       goto stop_test;
1265     }
1266
1267   /*
1268    * Done. Compute stats
1269    */
1270   delta = ecm->test_end_time - ecm->test_start_time;
1271   if (delta == 0.0)
1272     {
1273       ec_cli ("zero delta-t?");
1274       error = clib_error_return (0, "failed: zero delta-t");
1275       goto stop_test;
1276     }
1277
1278   total_bytes = (ecm->echo_bytes ? ecm->rx_total : ecm->tx_total);
1279   transfer_type = ecm->echo_bytes ? "full-duplex" : "half-duplex";
1280   ec_cli ("%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds", total_bytes,
1281           total_bytes / (1ULL << 20), total_bytes / (1ULL << 30), delta);
1282   ec_cli ("%.2f bytes/second %s", ((f64) total_bytes) / (delta),
1283           transfer_type);
1284   ec_cli ("%.4f gbit/second %s", (((f64) total_bytes * 8.0) / delta / 1e9),
1285           transfer_type);
1286
1287   if (ecm->cfg.test_bytes && ecm->test_failed)
1288     error = clib_error_return (0, "failed: test bytes");
1289
1290 stop_test:
1291   ecm->run_test = EC_EXITING;
1292
1293   /* send stop test command to the server */
1294   if (ec_ctrl_test_stop () < 0)
1295     {
1296       ec_cli ("failed to send stop command");
1297       goto cleanup;
1298     }
1299   ec_wait_for_signal (EC_CLI_STOP);
1300
1301   /* post test sync */
1302   if (ec_ctrl_test_sync () < 0)
1303     {
1304       ec_cli ("failed to send post sync command");
1305       goto cleanup;
1306     }
1307   ec_wait_for_signal (EC_CLI_CFG_SYNC);
1308
1309   /* disconnect control session */
1310   ec_ctrl_session_disconnect ();
1311
1312 cleanup:
1313
1314   ecm->run_test = EC_EXITING;
1315   vlib_process_wait_for_event_or_clock (vm, 10e-3);
1316
1317   /* Detach the application, so we can use different fifo sizes next time */
1318   if (ec_detach ())
1319     {
1320       error = clib_error_return (0, "failed: app detach");
1321       ec_cli ("WARNING: app detach failed...");
1322     }
1323
1324   ec_cleanup (ecm);
1325   if (had_config)
1326     unformat_free (line_input);
1327
1328   if (error)
1329     ec_cli ("test failed");
1330
1331   return error;
1332 }
1333
1334 VLIB_CLI_COMMAND (ec_command, static) = {
1335   .path = "test echo clients",
1336   .short_help =
1337     "test echo clients [nclients %d][[m|g]bytes <bytes>]"
1338     "[test-timeout <time>][syn-timeout <time>][echo-bytes][fifo-size <size>]"
1339     "[private-segment-count <count>][private-segment-size <bytes>[m|g]]"
1340     "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]"
1341     "[uri <tcp://ip/port>][test-bytes][verbose]",
1342   .function = ec_command_fn,
1343   .is_mp_safe = 1,
1344 };
1345
1346 clib_error_t *
1347 ec_main_init (vlib_main_t *vm)
1348 {
1349   ec_main_t *ecm = &ec_main;
1350   ecm->app_is_init = 0;
1351   return 0;
1352 }
1353
1354 VLIB_INIT_FUNCTION (ec_main_init);
1355
1356 /*
1357  * fd.io coding-style-patch-verification: ON
1358  *
1359  * Local Variables:
1360  * eval: (c-set-style "gnu")
1361  * End:
1362  */