hsa: fix echo client workers initialization
[vpp.git] / src / plugins / hs_apps / echo_client.c
1 /*
2  * echo_client.c - vpp built-in echo client code
3  *
4  * Copyright (c) 2017-2019 by Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <hs_apps/echo_client.h>
19
20 static ec_main_t ec_main;
21
22 #define EC_DBG (0)
23 #define DBG(_fmt, _args...)                                                   \
24   if (EC_DBG)                                                                 \
25   clib_warning (_fmt, ##_args)
26
27 static void
28 signal_evt_to_cli_i (void *codep)
29 {
30   ec_main_t *ecm = &ec_main;
31   int code;
32
33   ASSERT (vlib_get_thread_index () == 0);
34   code = pointer_to_uword (codep);
35   vlib_process_signal_event (ecm->vlib_main, ecm->cli_node_index, code, 0);
36 }
37
38 static void
39 signal_evt_to_cli (int code)
40 {
41   if (vlib_get_thread_index () != 0)
42     session_send_rpc_evt_to_thread_force (
43       0, signal_evt_to_cli_i, uword_to_pointer ((uword) code, void *));
44   else
45     signal_evt_to_cli_i (uword_to_pointer ((uword) code, void *));
46 }
47
48 static inline ec_worker_t *
49 ec_worker_get (u32 thread_index)
50 {
51   return vec_elt_at_index (ec_main.wrk, thread_index);
52 }
53
54 static inline ec_session_t *
55 ec_session_alloc (ec_worker_t *wrk)
56 {
57   ec_session_t *ecs;
58
59   pool_get_zero (wrk->sessions, ecs);
60   ecs->data.session_index = ecs - wrk->sessions;
61   ecs->thread_index = wrk->thread_index;
62
63   return ecs;
64 }
65
66 static inline ec_session_t *
67 ec_session_get (ec_worker_t *wrk, u32 ec_index)
68 {
69   return pool_elt_at_index (wrk->sessions, ec_index);
70 }
71
72 static void
73 send_data_chunk (ec_main_t *ecm, ec_session_t *es)
74 {
75   u8 *test_data = ecm->connect_test_data;
76   int test_buf_len, test_buf_offset, rv;
77   u32 bytes_this_chunk;
78
79   test_buf_len = vec_len (test_data);
80   ASSERT (test_buf_len > 0);
81   test_buf_offset = es->bytes_sent % test_buf_len;
82   bytes_this_chunk =
83     clib_min (test_buf_len - test_buf_offset, es->bytes_to_send);
84
85   if (!ecm->is_dgram)
86     {
87       if (ecm->no_copy)
88         {
89           svm_fifo_t *f = es->data.tx_fifo;
90           rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk);
91           svm_fifo_enqueue_nocopy (f, rv);
92           session_send_io_evt_to_thread_custom (
93             &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
94         }
95       else
96         rv = app_send_stream (&es->data, test_data + test_buf_offset,
97                               bytes_this_chunk, 0);
98     }
99   else
100     {
101       svm_fifo_t *f = es->data.tx_fifo;
102       u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
103
104       if (max_enqueue < sizeof (session_dgram_hdr_t))
105         return;
106
107       max_enqueue -= sizeof (session_dgram_hdr_t);
108
109       if (ecm->no_copy)
110         {
111           session_dgram_hdr_t hdr;
112           app_session_transport_t *at = &es->data.transport;
113
114           rv = clib_min (max_enqueue, bytes_this_chunk);
115
116           hdr.data_length = rv;
117           hdr.data_offset = 0;
118           clib_memcpy_fast (&hdr.rmt_ip, &at->rmt_ip,
119                             sizeof (ip46_address_t));
120           hdr.is_ip4 = at->is_ip4;
121           hdr.rmt_port = at->rmt_port;
122           clib_memcpy_fast (&hdr.lcl_ip, &at->lcl_ip,
123                             sizeof (ip46_address_t));
124           hdr.lcl_port = at->lcl_port;
125           svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
126           svm_fifo_enqueue_nocopy (f, rv);
127           session_send_io_evt_to_thread_custom (
128             &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
129         }
130       else
131         {
132           bytes_this_chunk = clib_min (bytes_this_chunk, max_enqueue);
133           bytes_this_chunk = clib_min (bytes_this_chunk, 1460);
134           rv = app_send_dgram (&es->data, test_data + test_buf_offset,
135                                bytes_this_chunk, 0);
136         }
137     }
138
139   /* If we managed to enqueue data... */
140   if (rv > 0)
141     {
142       /* Account for it... */
143       es->bytes_to_send -= rv;
144       es->bytes_sent += rv;
145
146       if (EC_DBG)
147         {
148           ELOG_TYPE_DECLARE (e) =
149             {
150               .format = "tx-enq: xfer %d bytes, sent %u remain %u",
151               .format_args = "i4i4i4",
152             };
153           struct
154           {
155             u32 data[3];
156           } *ed;
157           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
158           ed->data[0] = rv;
159           ed->data[1] = es->bytes_sent;
160           ed->data[2] = es->bytes_to_send;
161         }
162     }
163 }
164
165 static void
166 receive_data_chunk (ec_worker_t *wrk, ec_session_t *es)
167 {
168   ec_main_t *ecm = &ec_main;
169   svm_fifo_t *rx_fifo = es->data.rx_fifo;
170   int n_read, i;
171
172   if (ecm->test_bytes)
173     {
174       if (!ecm->is_dgram)
175         n_read =
176           app_recv_stream (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf));
177       else
178         n_read =
179           app_recv_dgram (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf));
180     }
181   else
182     {
183       n_read = svm_fifo_max_dequeue_cons (rx_fifo);
184       svm_fifo_dequeue_drop (rx_fifo, n_read);
185     }
186
187   if (n_read > 0)
188     {
189       if (EC_DBG)
190         {
191           ELOG_TYPE_DECLARE (e) =
192             {
193               .format = "rx-deq: %d bytes",
194               .format_args = "i4",
195             };
196           struct
197           {
198             u32 data[1];
199           } *ed;
200           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
201           ed->data[0] = n_read;
202         }
203
204       if (ecm->test_bytes)
205         {
206           for (i = 0; i < n_read; i++)
207             {
208               if (wrk->rx_buf[i] != ((es->bytes_received + i) & 0xff))
209                 {
210                   clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
211                                 n_read, es->bytes_received + i, wrk->rx_buf[i],
212                                 ((es->bytes_received + i) & 0xff));
213                   ecm->test_failed = 1;
214                 }
215             }
216         }
217       ASSERT (n_read <= es->bytes_to_receive);
218       es->bytes_to_receive -= n_read;
219       es->bytes_received += n_read;
220     }
221 }
222
223 static uword
224 ec_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
225 {
226   u32 *conn_indices, *conns_this_batch, nconns_this_batch;
227   int thread_index = vm->thread_index, i, delete_session;
228   ec_main_t *ecm = &ec_main;
229   ec_worker_t *wrk;
230   ec_session_t *es;
231   session_t *s;
232
233   if (ecm->run_test != EC_RUNNING)
234     return 0;
235
236   wrk = ec_worker_get (thread_index);
237   conn_indices = wrk->conn_indices;
238   conns_this_batch = wrk->conns_this_batch;
239
240   if (((vec_len (conn_indices) == 0) && vec_len (conns_this_batch) == 0))
241     return 0;
242
243   /* Grab another pile of connections */
244   if (PREDICT_FALSE (vec_len (conns_this_batch) == 0))
245     {
246       nconns_this_batch =
247         clib_min (ecm->connections_per_batch, vec_len (conn_indices));
248
249       ASSERT (nconns_this_batch > 0);
250       vec_validate (conns_this_batch, nconns_this_batch - 1);
251       clib_memcpy_fast (conns_this_batch,
252                         conn_indices + vec_len (conn_indices) -
253                           nconns_this_batch,
254                         nconns_this_batch * sizeof (u32));
255       vec_dec_len (conn_indices, nconns_this_batch);
256     }
257
258   /*
259    * Track progress
260    */
261   if (PREDICT_FALSE (ecm->prev_conns != ecm->connections_per_batch &&
262                      ecm->prev_conns == vec_len (conns_this_batch)))
263     {
264       ecm->repeats++;
265       ecm->prev_conns = vec_len (conns_this_batch);
266       if (ecm->repeats == 500000)
267         {
268           clib_warning ("stuck clients");
269         }
270     }
271   else
272     {
273       ecm->prev_conns = vec_len (conns_this_batch);
274       ecm->repeats = 0;
275     }
276
277   /*
278    * Handle connections in this batch
279    */
280   for (i = 0; i < vec_len (conns_this_batch); i++)
281     {
282       es = ec_session_get (wrk, conns_this_batch[i]);
283
284       delete_session = 1;
285
286       if (es->bytes_to_send > 0)
287         {
288           send_data_chunk (ecm, es);
289           delete_session = 0;
290         }
291
292       if (es->bytes_to_receive > 0)
293         {
294           delete_session = 0;
295         }
296
297       if (PREDICT_FALSE (delete_session == 1))
298         {
299           clib_atomic_fetch_add (&ecm->tx_total, es->bytes_sent);
300           clib_atomic_fetch_add (&ecm->rx_total, es->bytes_received);
301           s = session_get_from_handle_if_valid (es->vpp_session_handle);
302
303           if (s)
304             {
305               vnet_disconnect_args_t _a, *a = &_a;
306               a->handle = session_handle (s);
307               a->app_index = ecm->app_index;
308               vnet_disconnect_session (a);
309
310               vec_delete (conns_this_batch, 1, i);
311               i--;
312               clib_atomic_fetch_add (&ecm->ready_connections, -1);
313             }
314           else
315             {
316               clib_warning ("session AWOL?");
317               vec_delete (conns_this_batch, 1, i);
318             }
319
320           /* Kick the debug CLI process */
321           if (ecm->ready_connections == 0)
322             {
323               signal_evt_to_cli (EC_CLI_TEST_DONE);
324             }
325         }
326     }
327
328   wrk->conn_indices = conn_indices;
329   wrk->conns_this_batch = conns_this_batch;
330   return 0;
331 }
332
333 VLIB_REGISTER_NODE (echo_clients_node) = {
334   .function = ec_node_fn,
335   .name = "echo-clients",
336   .type = VLIB_NODE_TYPE_INPUT,
337   .state = VLIB_NODE_STATE_DISABLED,
338 };
339
340 static void
341 ec_reset_runtime_config (ec_main_t *ecm)
342 {
343   ecm->n_clients = 1;
344   ecm->quic_streams = 1;
345   ecm->bytes_to_send = 8192;
346   ecm->no_return = 0;
347   ecm->fifo_size = 64 << 10;
348   ecm->connections_per_batch = 1000;
349   ecm->private_segment_count = 0;
350   ecm->private_segment_size = 256 << 20;
351   ecm->no_output = 0;
352   ecm->test_bytes = 0;
353   ecm->test_failed = 0;
354   ecm->tls_engine = CRYPTO_ENGINE_OPENSSL;
355   ecm->no_copy = 0;
356   ecm->run_test = EC_STARTING;
357   ecm->ready_connections = 0;
358   ecm->connect_conn_index = 0;
359   ecm->rx_total = 0;
360   ecm->tx_total = 0;
361   ecm->barrier_acq_needed = 0;
362   ecm->prealloc_sessions = 0;
363   ecm->prealloc_fifos = 0;
364   ecm->appns_id = 0;
365   ecm->appns_secret = 0;
366   ecm->attach_flags = 0;
367   ecm->syn_timeout = 20.0;
368   ecm->test_timeout = 20.0;
369   vec_free (ecm->connect_uri);
370 }
371
372 static int
373 ec_init (vlib_main_t *vm)
374 {
375   ec_main_t *ecm = &ec_main;
376   ec_worker_t *wrk;
377   u32 num_threads;
378   int i;
379
380   ec_reset_runtime_config (ecm);
381
382   /* Store cli process node index for signaling */
383   ecm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index;
384   ecm->vlib_main = vm;
385
386   if (vlib_num_workers ())
387     {
388       /* The request came over the binary api and the inband cli handler
389        * is not mp_safe. Drop the barrier to make sure the workers are not
390        * blocked.
391        */
392       if (vlib_thread_is_main_w_barrier ())
393         {
394           ecm->barrier_acq_needed = 1;
395           vlib_worker_thread_barrier_release (vm);
396         }
397       /*
398        * There's a good chance that both the client and the server echo
399        * apps will be enabled so make sure the session queue node polls on
400        * the main thread as connections will probably be established on it.
401        */
402       vlib_node_set_state (vm, session_queue_node.index,
403                            VLIB_NODE_STATE_POLLING);
404     }
405
406   /* App init done only once */
407   if (ecm->app_is_init)
408     return 0;
409
410
411   /* Init test data. Big buffer */
412   vec_validate (ecm->connect_test_data, 4 * 1024 * 1024 - 1);
413   for (i = 0; i < vec_len (ecm->connect_test_data); i++)
414     ecm->connect_test_data[i] = i & 0xff;
415
416   num_threads = 1 /* main thread */ + vlib_num_workers ();
417   vec_validate (ecm->wrk, num_threads - 1);
418   vec_foreach (wrk, ecm->wrk)
419     {
420       vec_validate (wrk->rx_buf, vec_len (ecm->connect_test_data) - 1);
421       wrk->thread_index = wrk - ecm->wrk;
422       wrk->vpp_event_queue =
423         session_main_get_vpp_event_queue (wrk->thread_index);
424     }
425
426   ecm->app_is_init = 1;
427
428   vlib_worker_thread_barrier_sync (vm);
429   vnet_session_enable_disable (vm, 1 /* turn on session and transports */);
430   vlib_worker_thread_barrier_release (vm);
431
432   /* Turn on the builtin client input nodes */
433   foreach_vlib_main ()
434     vlib_node_set_state (this_vlib_main, echo_clients_node.index,
435                          VLIB_NODE_STATE_POLLING);
436
437   return 0;
438 }
439
440 static void
441 ec_prealloc_sessions (ec_main_t *ecm)
442 {
443   u32 sessions_per_wrk, n_wrks;
444   ec_worker_t *wrk;
445
446   n_wrks = vlib_num_workers () ? vlib_num_workers () : 1;
447
448   sessions_per_wrk = ecm->n_clients / n_wrks;
449   vec_foreach (wrk, ecm->wrk)
450     pool_init_fixed (wrk->sessions, 1.1 * sessions_per_wrk);
451 }
452
453 static void
454 ec_worker_cleanup (ec_worker_t *wrk)
455 {
456   pool_free (wrk->sessions);
457   vec_free (wrk->conn_indices);
458   vec_free (wrk->conns_this_batch);
459 }
460
461 static void
462 ec_cleanup (ec_main_t *ecm)
463 {
464   ec_worker_t *wrk;
465
466   vec_foreach (wrk, ecm->wrk)
467     ec_worker_cleanup (wrk);
468
469   vec_free (ecm->connect_uri);
470   vec_free (ecm->appns_id);
471
472   if (ecm->barrier_acq_needed)
473     vlib_worker_thread_barrier_sync (ecm->vlib_main);
474 }
475
476 static int
477 quic_ec_qsession_connected_callback (u32 app_index, u32 api_context,
478                                      session_t *s, session_error_t err)
479 {
480   session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
481   ec_main_t *ecm = &ec_main;
482   vnet_connect_args_t *a = 0;
483   session_handle_t handle;
484   u32 stream_n;
485   int rv;
486
487   DBG ("QUIC Connection handle %d", session_handle (s));
488
489   vec_validate (a, 1);
490   a->uri = (char *) ecm->connect_uri;
491   if (parse_uri (a->uri, &sep))
492     return -1;
493   sep.parent_handle = handle = session_handle (s);
494
495   for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
496     {
497       clib_memset (a, 0, sizeof (*a));
498       a->app_index = ecm->app_index;
499       a->api_context = -1 - api_context;
500       clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
501
502       DBG ("QUIC opening stream %d", stream_n);
503       if ((rv = vnet_connect (a)))
504         {
505           clib_error ("Stream session %d opening failed: %d", stream_n, rv);
506           return -1;
507         }
508       DBG ("QUIC stream %d connected", stream_n);
509     }
510   /*
511    * 's' is no longer valid, its underlying pool could have been moved in
512    * vnet_connect()
513    */
514   vec_free (a);
515   return 0;
516 }
517
518 static int
519 quic_ec_session_connected_callback (u32 app_index, u32 api_context,
520                                     session_t *s, session_error_t err)
521 {
522   ec_main_t *ecm = &ec_main;
523   ec_session_t *es;
524   ec_worker_t *wrk;
525   u32 thread_index;
526
527   if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
528     return -1;
529
530   if (err)
531     {
532       clib_warning ("connection %d failed!", api_context);
533       ecm->run_test = EC_EXITING;
534       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
535       return 0;
536     }
537
538   if (s->listener_handle == SESSION_INVALID_HANDLE)
539     return quic_ec_qsession_connected_callback (app_index, api_context, s,
540                                                 err);
541   DBG ("STREAM Connection callback %d", api_context);
542
543   thread_index = s->thread_index;
544   ASSERT (thread_index == vlib_get_thread_index ()
545           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
546
547   wrk = ec_worker_get (thread_index);
548
549   /*
550    * Setup session
551    */
552   es = ec_session_alloc (wrk);
553
554   es->bytes_to_send = ecm->bytes_to_send;
555   es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
556   es->data.rx_fifo = s->rx_fifo;
557   es->data.rx_fifo->shr->client_session_index = es->data.session_index;
558   es->data.tx_fifo = s->tx_fifo;
559   es->data.tx_fifo->shr->client_session_index = es->data.session_index;
560   es->data.vpp_evt_q = wrk->vpp_event_queue;
561   es->vpp_session_handle = session_handle (s);
562   es->vpp_session_index = s->session_index;
563   s->opaque = es->data.session_index;
564
565   if (ecm->is_dgram)
566     {
567       transport_connection_t *tc;
568       tc = session_get_transport (s);
569       clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport));
570       es->data.is_dgram = 1;
571     }
572
573   vec_add1 (wrk->conn_indices, es->data.session_index);
574   clib_atomic_fetch_add (&ecm->ready_connections, 1);
575   if (ecm->ready_connections == ecm->expected_connections)
576     {
577       ecm->run_test = EC_RUNNING;
578       /* Signal the CLI process that the action is starting... */
579       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
580     }
581
582   return 0;
583 }
584
585 static int
586 ec_session_connected_callback (u32 app_index, u32 api_context, session_t *s,
587                                session_error_t err)
588 {
589   ec_main_t *ecm = &ec_main;
590   ec_session_t *es;
591   u32 thread_index;
592   ec_worker_t *wrk;
593
594   if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
595     return -1;
596
597   if (err)
598     {
599       clib_warning ("connection %d failed!", api_context);
600       ecm->run_test = EC_EXITING;
601       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
602       return 0;
603     }
604
605   thread_index = s->thread_index;
606   ASSERT (thread_index == vlib_get_thread_index ()
607           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
608
609   wrk = ec_worker_get (thread_index);
610
611   /*
612    * Setup session
613    */
614   es = ec_session_alloc (wrk);
615
616   es->bytes_to_send = ecm->bytes_to_send;
617   es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
618   es->data.rx_fifo = s->rx_fifo;
619   es->data.rx_fifo->shr->client_session_index = es->data.session_index;
620   es->data.tx_fifo = s->tx_fifo;
621   es->data.tx_fifo->shr->client_session_index = es->data.session_index;
622   es->data.vpp_evt_q = wrk->vpp_event_queue;
623   es->vpp_session_handle = session_handle (s);
624   es->vpp_session_index = s->session_index;
625   s->opaque = es->data.session_index;
626
627   if (ecm->is_dgram)
628     {
629       transport_connection_t *tc;
630       tc = session_get_transport (s);
631       clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport));
632       es->data.is_dgram = 1;
633     }
634
635   vec_add1 (wrk->conn_indices, es->data.session_index);
636   clib_atomic_fetch_add (&ecm->ready_connections, 1);
637   if (ecm->ready_connections == ecm->expected_connections)
638     {
639       ecm->run_test = EC_RUNNING;
640       /* Signal the CLI process that the action is starting... */
641       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
642     }
643
644   return 0;
645 }
646
647 static void
648 ec_session_reset_callback (session_t *s)
649 {
650   ec_main_t *ecm = &ec_main;
651   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
652
653   if (s->session_state == SESSION_STATE_READY)
654     clib_warning ("Reset active connection %U", format_session, s, 2);
655
656   a->handle = session_handle (s);
657   a->app_index = ecm->app_index;
658   vnet_disconnect_session (a);
659   return;
660 }
661
662 static int
663 ec_session_accept_callback (session_t *s)
664 {
665   return 0;
666 }
667
668 static void
669 ec_session_disconnect_callback (session_t *s)
670 {
671   ec_main_t *ecm = &ec_main;
672   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
673   a->handle = session_handle (s);
674   a->app_index = ecm->app_index;
675   vnet_disconnect_session (a);
676   return;
677 }
678
679 void
680 ec_session_disconnect (session_t *s)
681 {
682   ec_main_t *ecm = &ec_main;
683   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
684   a->handle = session_handle (s);
685   a->app_index = ecm->app_index;
686   vnet_disconnect_session (a);
687 }
688
689 static int
690 ec_session_rx_callback (session_t *s)
691 {
692   ec_main_t *ecm = &ec_main;
693   ec_worker_t *wrk;
694   ec_session_t *es;
695
696   if (PREDICT_FALSE (ecm->run_test != EC_RUNNING))
697     {
698       ec_session_disconnect (s);
699       return -1;
700     }
701
702   wrk = ec_worker_get (s->thread_index);
703   es = ec_session_get (wrk, s->opaque);
704
705   receive_data_chunk (wrk, es);
706
707   if (svm_fifo_max_dequeue_cons (s->rx_fifo))
708     {
709       if (svm_fifo_set_event (s->rx_fifo))
710         session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
711     }
712   return 0;
713 }
714
715 static int
716 ec_add_segment_callback (u32 app_index, u64 segment_handle)
717 {
718   /* New segments may be added */
719   return 0;
720 }
721
722 static int
723 ec_del_segment_callback (u32 app_index, u64 segment_handle)
724 {
725   return 0;
726 }
727
728 static session_cb_vft_t ec_cb_vft = {
729   .session_reset_callback = ec_session_reset_callback,
730   .session_connected_callback = ec_session_connected_callback,
731   .session_accept_callback = ec_session_accept_callback,
732   .session_disconnect_callback = ec_session_disconnect_callback,
733   .builtin_app_rx_callback = ec_session_rx_callback,
734   .add_segment_callback = ec_add_segment_callback,
735   .del_segment_callback = ec_del_segment_callback,
736 };
737
738 static clib_error_t *
739 ec_attach ()
740 {
741   vnet_app_add_cert_key_pair_args_t _ck_pair, *ck_pair = &_ck_pair;
742   ec_main_t *ecm = &ec_main;
743   vnet_app_attach_args_t _a, *a = &_a;
744   u32 prealloc_fifos;
745   u64 options[18];
746   int rv;
747
748   clib_memset (a, 0, sizeof (*a));
749   clib_memset (options, 0, sizeof (options));
750
751   a->api_client_index = ~0;
752   a->name = format (0, "echo_client");
753   if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
754     ec_cb_vft.session_connected_callback = quic_ec_session_connected_callback;
755   a->session_cb_vft = &ec_cb_vft;
756
757   prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1;
758
759   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
760   options[APP_OPTIONS_SEGMENT_SIZE] = ecm->private_segment_size;
761   options[APP_OPTIONS_ADD_SEGMENT_SIZE] = ecm->private_segment_size;
762   options[APP_OPTIONS_RX_FIFO_SIZE] = ecm->fifo_size;
763   options[APP_OPTIONS_TX_FIFO_SIZE] = ecm->fifo_size;
764   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = ecm->private_segment_count;
765   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = prealloc_fifos;
766   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
767   options[APP_OPTIONS_TLS_ENGINE] = ecm->tls_engine;
768   options[APP_OPTIONS_PCT_FIRST_ALLOC] = 100;
769   options[APP_OPTIONS_FLAGS] |= ecm->attach_flags;
770   if (ecm->appns_id)
771     {
772       options[APP_OPTIONS_NAMESPACE_SECRET] = ecm->appns_secret;
773       a->namespace_id = ecm->appns_id;
774     }
775   a->options = options;
776
777   if ((rv = vnet_application_attach (a)))
778     return clib_error_return (0, "attach returned %d", rv);
779
780   ecm->app_index = a->app_index;
781   vec_free (a->name);
782
783   clib_memset (ck_pair, 0, sizeof (*ck_pair));
784   ck_pair->cert = (u8 *) test_srv_crt_rsa;
785   ck_pair->key = (u8 *) test_srv_key_rsa;
786   ck_pair->cert_len = test_srv_crt_rsa_len;
787   ck_pair->key_len = test_srv_key_rsa_len;
788   vnet_app_add_cert_key_pair (ck_pair);
789   ecm->ckpair_index = ck_pair->index;
790
791   ecm->test_client_attached = 1;
792
793   return 0;
794 }
795
796 static int
797 ec_detach ()
798 {
799   ec_main_t *ecm = &ec_main;
800   vnet_app_detach_args_t _da, *da = &_da;
801   int rv;
802
803   if (!ecm->test_client_attached)
804     return 0;
805
806   da->app_index = ecm->app_index;
807   da->api_client_index = ~0;
808   rv = vnet_application_detach (da);
809   ecm->test_client_attached = 0;
810   ecm->app_index = ~0;
811   vnet_app_del_cert_key_pair (ecm->ckpair_index);
812
813   return rv;
814 }
815
816 static int
817 ec_transport_needs_crypto (transport_proto_t proto)
818 {
819   return proto == TRANSPORT_PROTO_TLS || proto == TRANSPORT_PROTO_DTLS ||
820          proto == TRANSPORT_PROTO_QUIC;
821 }
822
823 static int
824 ec_connect_rpc (void *args)
825 {
826   ec_main_t *ecm = &ec_main;
827   vnet_connect_args_t _a = {}, *a = &_a;
828   int rv, needs_crypto;
829   u32 n_clients, ci;
830
831   n_clients = ecm->n_clients;
832   needs_crypto = ec_transport_needs_crypto (ecm->transport_proto);
833   clib_memcpy (&a->sep_ext, &ecm->connect_sep, sizeof (ecm->connect_sep));
834   a->sep_ext.transport_flags |= TRANSPORT_CFG_F_CONNECTED;
835   a->app_index = ecm->app_index;
836
837   ci = ecm->connect_conn_index;
838
839   while (ci < n_clients)
840     {
841       /* Crude pacing for call setups  */
842       if (ci - ecm->ready_connections > 128)
843         {
844           ecm->connect_conn_index = ci;
845           break;
846         }
847
848       a->api_context = ci;
849       if (needs_crypto)
850         {
851           session_endpoint_alloc_ext_cfg (&a->sep_ext,
852                                           TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
853           a->sep_ext.ext_cfg->crypto.ckpair_index = ecm->ckpair_index;
854         }
855
856       rv = vnet_connect (a);
857
858       if (needs_crypto)
859         clib_mem_free (a->sep_ext.ext_cfg);
860
861       if (rv)
862         {
863           clib_warning ("connect returned: %U", format_session_error, rv);
864           ecm->run_test = EC_EXITING;
865           signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
866           break;
867         }
868
869       ci += 1;
870     }
871
872   if (ci < ecm->expected_connections && ecm->run_test != EC_EXITING)
873     ec_program_connects ();
874
875   return 0;
876 }
877
878 void
879 ec_program_connects (void)
880 {
881   session_send_rpc_evt_to_thread_force (transport_cl_thread (), ec_connect_rpc,
882                                         0);
883 }
884
885 #define ec_cli(_fmt, _args...)                                                \
886   if (!ecm->no_output)                                                        \
887   vlib_cli_output (vm, _fmt, ##_args)
888
889 static clib_error_t *
890 ec_command_fn (vlib_main_t *vm, unformat_input_t *input,
891                vlib_cli_command_t *cmd)
892 {
893   unformat_input_t _line_input, *line_input = &_line_input;
894   char *default_uri = "tcp://6.0.1.1/1234", *transfer_type;
895   ec_main_t *ecm = &ec_main;
896   uword *event_data = 0, event_type;
897   clib_error_t *error = 0;
898   int rv, had_config = 1;
899   u64 tmp, total_bytes;
900   f64 delta;
901
902   if (ecm->test_client_attached)
903     return clib_error_return (0, "failed: already running!");
904
905   if (ec_init (vm))
906     {
907       error = clib_error_return (0, "failed init");
908       goto cleanup;
909     }
910
911   if (!unformat_user (input, unformat_line_input, line_input))
912     {
913       had_config = 0;
914       goto parse_config;
915     }
916
917   while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
918     {
919       if (unformat (line_input, "uri %s", &ecm->connect_uri))
920         ;
921       else if (unformat (line_input, "nclients %d", &ecm->n_clients))
922         ;
923       else if (unformat (line_input, "quic-streams %d", &ecm->quic_streams))
924         ;
925       else if (unformat (line_input, "mbytes %lld", &tmp))
926         ecm->bytes_to_send = tmp << 20;
927       else if (unformat (line_input, "gbytes %lld", &tmp))
928         ecm->bytes_to_send = tmp << 30;
929       else if (unformat (line_input, "bytes %U", unformat_memory_size,
930                          &ecm->bytes_to_send))
931         ;
932       else if (unformat (line_input, "test-timeout %f", &ecm->test_timeout))
933         ;
934       else if (unformat (line_input, "syn-timeout %f", &ecm->syn_timeout))
935         ;
936       else if (unformat (line_input, "no-return"))
937         ecm->no_return = 1;
938       else if (unformat (line_input, "fifo-size %d", &ecm->fifo_size))
939         ecm->fifo_size <<= 10;
940       else if (unformat (line_input, "private-segment-count %d",
941                          &ecm->private_segment_count))
942         ;
943       else if (unformat (line_input, "private-segment-size %U",
944                          unformat_memory_size, &ecm->private_segment_size))
945         ;
946       else if (unformat (line_input, "preallocate-fifos"))
947         ecm->prealloc_fifos = 1;
948       else if (unformat (line_input, "preallocate-sessions"))
949         ecm->prealloc_sessions = 1;
950       else if (unformat (line_input, "client-batch %d",
951                          &ecm->connections_per_batch))
952         ;
953       else if (unformat (line_input, "appns %_%v%_", &ecm->appns_id))
954         ;
955       else if (unformat (line_input, "all-scope"))
956         ecm->attach_flags |= (APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE |
957                               APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE);
958       else if (unformat (line_input, "local-scope"))
959         ecm->attach_flags = APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
960       else if (unformat (line_input, "global-scope"))
961         ecm->attach_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
962       else if (unformat (line_input, "secret %lu", &ecm->appns_secret))
963         ;
964       else if (unformat (line_input, "no-output"))
965         ecm->no_output = 1;
966       else if (unformat (line_input, "test-bytes"))
967         ecm->test_bytes = 1;
968       else if (unformat (line_input, "tls-engine %d", &ecm->tls_engine))
969         ;
970       else
971         {
972           error = clib_error_return (0, "failed: unknown input `%U'",
973                                      format_unformat_error, line_input);
974           goto cleanup;
975         }
976     }
977
978 parse_config:
979
980   ecm->expected_connections = ecm->n_clients * ecm->quic_streams;
981
982   if (!ecm->connect_uri)
983     {
984       clib_warning ("No uri provided. Using default: %s", default_uri);
985       ecm->connect_uri = format (0, "%s%c", default_uri, 0);
986     }
987
988   if ((rv = parse_uri ((char *) ecm->connect_uri, &ecm->connect_sep)))
989     {
990       error = clib_error_return (0, "Uri parse error: %d", rv);
991       goto cleanup;
992     }
993   ecm->transport_proto = ecm->connect_sep.transport_proto;
994   ecm->is_dgram = (ecm->transport_proto == TRANSPORT_PROTO_UDP);
995
996   if (ecm->prealloc_sessions)
997     ec_prealloc_sessions (ecm);
998
999   if ((error = ec_attach ()))
1000     {
1001       clib_error_report (error);
1002       goto cleanup;
1003     }
1004
1005   /*
1006    * Start. Fire off connect requests
1007    */
1008
1009   ecm->syn_start_time = vlib_time_now (vm);
1010   ec_program_connects ();
1011
1012   /*
1013    * Park until the sessions come up, or syn_timeout seconds pass
1014    */
1015
1016   vlib_process_wait_for_event_or_clock (vm, ecm->syn_timeout);
1017   event_type = vlib_process_get_events (vm, &event_data);
1018   switch (event_type)
1019     {
1020     case ~0:
1021       ec_cli ("Timeout with only %d sessions active...",
1022               ecm->ready_connections);
1023       error = clib_error_return (0, "failed: syn timeout with %d sessions",
1024                                  ecm->ready_connections);
1025       goto cleanup;
1026
1027     case EC_CLI_CONNECTS_DONE:
1028       delta = vlib_time_now (vm) - ecm->syn_start_time;
1029       if (delta != 0.0)
1030         ec_cli ("%d three-way handshakes in %.2f seconds %.2f/s",
1031                 ecm->n_clients, delta, ((f64) ecm->n_clients) / delta);
1032       break;
1033
1034     case EC_CLI_CONNECTS_FAILED:
1035       error = clib_error_return (0, "failed: connect returned");
1036       goto cleanup;
1037
1038     default:
1039       ec_cli ("unexpected event(1): %d", event_type);
1040       error = clib_error_return (0, "failed: unexpected event(1): %d",
1041                                  event_type);
1042       goto cleanup;
1043     }
1044
1045   /*
1046    * Wait for the sessions to finish or test_timeout seconds pass
1047    */
1048   ecm->test_start_time = vlib_time_now (ecm->vlib_main);
1049   ec_cli ("Test started at %.6f", ecm->test_start_time);
1050   vlib_process_wait_for_event_or_clock (vm, ecm->test_timeout);
1051   event_type = vlib_process_get_events (vm, &event_data);
1052   switch (event_type)
1053     {
1054     case ~0:
1055       ec_cli ("Timeout at %.6f with %d sessions still active...",
1056               vlib_time_now (ecm->vlib_main), ecm->ready_connections);
1057       error = clib_error_return (0, "failed: timeout with %d sessions",
1058                                  ecm->ready_connections);
1059       goto cleanup;
1060
1061     case EC_CLI_TEST_DONE:
1062       ecm->test_end_time = vlib_time_now (vm);
1063       ec_cli ("Test finished at %.6f", ecm->test_end_time);
1064       break;
1065
1066     default:
1067       ec_cli ("unexpected event(2): %d", event_type);
1068       error = clib_error_return (0, "failed: unexpected event(2): %d",
1069                                  event_type);
1070       goto cleanup;
1071     }
1072
1073   /*
1074    * Done. Compute stats
1075    */
1076   delta = ecm->test_end_time - ecm->test_start_time;
1077   if (delta == 0.0)
1078     {
1079       ec_cli ("zero delta-t?");
1080       error = clib_error_return (0, "failed: zero delta-t");
1081       goto cleanup;
1082     }
1083
1084   total_bytes = (ecm->no_return ? ecm->tx_total : ecm->rx_total);
1085   transfer_type = ecm->no_return ? "half-duplex" : "full-duplex";
1086   ec_cli ("%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds", total_bytes,
1087           total_bytes / (1ULL << 20), total_bytes / (1ULL << 30), delta);
1088   ec_cli ("%.2f bytes/second %s", ((f64) total_bytes) / (delta),
1089           transfer_type);
1090   ec_cli ("%.4f gbit/second %s", (((f64) total_bytes * 8.0) / delta / 1e9),
1091           transfer_type);
1092
1093   if (ecm->test_bytes && ecm->test_failed)
1094     error = clib_error_return (0, "failed: test bytes");
1095
1096 cleanup:
1097
1098   /*
1099    * Cleanup
1100    */
1101   ecm->run_test = EC_EXITING;
1102   vlib_process_wait_for_event_or_clock (vm, 10e-3);
1103
1104   /* Detach the application, so we can use different fifo sizes next time */
1105   if (ec_detach ())
1106     {
1107       error = clib_error_return (0, "failed: app detach");
1108       ec_cli ("WARNING: app detach failed...");
1109     }
1110
1111   ec_cleanup (ecm);
1112   if (had_config)
1113     unformat_free (line_input);
1114
1115   if (error)
1116     ec_cli ("test failed");
1117
1118   return error;
1119 }
1120
1121 VLIB_CLI_COMMAND (ec_command, static) = {
1122   .path = "test echo clients",
1123   .short_help =
1124     "test echo clients [nclients %d][[m|g]bytes <bytes>]"
1125     "[test-timeout <time>][syn-timeout <time>][no-return][fifo-size <size>]"
1126     "[private-segment-count <count>][private-segment-size <bytes>[m|g]]"
1127     "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]"
1128     "[uri <tcp://ip/port>][test-bytes][no-output]",
1129   .function = ec_command_fn,
1130   .is_mp_safe = 1,
1131 };
1132
1133 clib_error_t *
1134 ec_main_init (vlib_main_t *vm)
1135 {
1136   ec_main_t *ecm = &ec_main;
1137   ecm->app_is_init = 0;
1138   return 0;
1139 }
1140
1141 VLIB_INIT_FUNCTION (ec_main_init);
1142
1143 /*
1144  * fd.io coding-style-patch-verification: ON
1145  *
1146  * Local Variables:
1147  * eval: (c-set-style "gnu")
1148  * End:
1149  */