b7bc85b82a939e004330207d01aad0252b333673
[vpp.git] / src / plugins / hs_apps / echo_client.c
1 /*
2  * echo_client.c - vpp built-in echo client code
3  *
4  * Copyright (c) 2017-2019 by Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <hs_apps/echo_client.h>
19
20 static ec_main_t ec_main;
21
22 #define EC_DBG (0)
23 #define DBG(_fmt, _args...)                                                   \
24   if (EC_DBG)                                                                 \
25   clib_warning (_fmt, ##_args)
26
27 static void
28 signal_evt_to_cli_i (int *code)
29 {
30   ec_main_t *ecm = &ec_main;
31   ASSERT (vlib_get_thread_index () == 0);
32   vlib_process_signal_event (ecm->vlib_main, ecm->cli_node_index, *code, 0);
33 }
34
35 static void
36 signal_evt_to_cli (int code)
37 {
38   if (vlib_get_thread_index () != 0)
39     vl_api_rpc_call_main_thread (signal_evt_to_cli_i, (u8 *) & code,
40                                  sizeof (code));
41   else
42     signal_evt_to_cli_i (&code);
43 }
44
45 static inline ec_worker_t *
46 ec_worker_get (u32 thread_index)
47 {
48   return vec_elt_at_index (ec_main.wrk, thread_index);
49 }
50
51 static inline ec_session_t *
52 ec_session_alloc (ec_worker_t *wrk)
53 {
54   ec_session_t *ecs;
55
56   pool_get_zero (wrk->sessions, ecs);
57   ecs->data.session_index = ecs - wrk->sessions;
58   ecs->thread_index = wrk->thread_index;
59
60   return ecs;
61 }
62
63 static inline ec_session_t *
64 ec_session_get (ec_worker_t *wrk, u32 ec_index)
65 {
66   return pool_elt_at_index (wrk->sessions, ec_index);
67 }
68
69 static void
70 send_data_chunk (ec_main_t *ecm, ec_session_t *es)
71 {
72   u8 *test_data = ecm->connect_test_data;
73   int test_buf_len, test_buf_offset, rv;
74   u32 bytes_this_chunk;
75
76   test_buf_len = vec_len (test_data);
77   ASSERT (test_buf_len > 0);
78   test_buf_offset = es->bytes_sent % test_buf_len;
79   bytes_this_chunk =
80     clib_min (test_buf_len - test_buf_offset, es->bytes_to_send);
81
82   if (!ecm->is_dgram)
83     {
84       if (ecm->no_copy)
85         {
86           svm_fifo_t *f = es->data.tx_fifo;
87           rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk);
88           svm_fifo_enqueue_nocopy (f, rv);
89           session_send_io_evt_to_thread_custom (
90             &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
91         }
92       else
93         rv = app_send_stream (&es->data, test_data + test_buf_offset,
94                               bytes_this_chunk, 0);
95     }
96   else
97     {
98       svm_fifo_t *f = es->data.tx_fifo;
99       u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
100
101       if (max_enqueue < sizeof (session_dgram_hdr_t))
102         return;
103
104       max_enqueue -= sizeof (session_dgram_hdr_t);
105
106       if (ecm->no_copy)
107         {
108           session_dgram_hdr_t hdr;
109           app_session_transport_t *at = &es->data.transport;
110
111           rv = clib_min (max_enqueue, bytes_this_chunk);
112
113           hdr.data_length = rv;
114           hdr.data_offset = 0;
115           clib_memcpy_fast (&hdr.rmt_ip, &at->rmt_ip,
116                             sizeof (ip46_address_t));
117           hdr.is_ip4 = at->is_ip4;
118           hdr.rmt_port = at->rmt_port;
119           clib_memcpy_fast (&hdr.lcl_ip, &at->lcl_ip,
120                             sizeof (ip46_address_t));
121           hdr.lcl_port = at->lcl_port;
122           svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
123           svm_fifo_enqueue_nocopy (f, rv);
124           session_send_io_evt_to_thread_custom (
125             &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
126         }
127       else
128         {
129           bytes_this_chunk = clib_min (bytes_this_chunk, max_enqueue);
130           rv = app_send_dgram (&es->data, test_data + test_buf_offset,
131                                bytes_this_chunk, 0);
132         }
133     }
134
135   /* If we managed to enqueue data... */
136   if (rv > 0)
137     {
138       /* Account for it... */
139       es->bytes_to_send -= rv;
140       es->bytes_sent += rv;
141
142       if (EC_DBG)
143         {
144           ELOG_TYPE_DECLARE (e) =
145             {
146               .format = "tx-enq: xfer %d bytes, sent %u remain %u",
147               .format_args = "i4i4i4",
148             };
149           struct
150           {
151             u32 data[3];
152           } *ed;
153           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
154           ed->data[0] = rv;
155           ed->data[1] = es->bytes_sent;
156           ed->data[2] = es->bytes_to_send;
157         }
158     }
159 }
160
161 static void
162 receive_data_chunk (ec_worker_t *wrk, ec_session_t *es)
163 {
164   ec_main_t *ecm = &ec_main;
165   svm_fifo_t *rx_fifo = es->data.rx_fifo;
166   int n_read, i;
167
168   if (ecm->test_bytes)
169     {
170       if (!ecm->is_dgram)
171         n_read =
172           app_recv_stream (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf));
173       else
174         n_read =
175           app_recv_dgram (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf));
176     }
177   else
178     {
179       n_read = svm_fifo_max_dequeue_cons (rx_fifo);
180       svm_fifo_dequeue_drop (rx_fifo, n_read);
181     }
182
183   if (n_read > 0)
184     {
185       if (EC_DBG)
186         {
187           ELOG_TYPE_DECLARE (e) =
188             {
189               .format = "rx-deq: %d bytes",
190               .format_args = "i4",
191             };
192           struct
193           {
194             u32 data[1];
195           } *ed;
196           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
197           ed->data[0] = n_read;
198         }
199
200       if (ecm->test_bytes)
201         {
202           for (i = 0; i < n_read; i++)
203             {
204               if (wrk->rx_buf[i] != ((es->bytes_received + i) & 0xff))
205                 {
206                   clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
207                                 n_read, es->bytes_received + i, wrk->rx_buf[i],
208                                 ((es->bytes_received + i) & 0xff));
209                   ecm->test_failed = 1;
210                 }
211             }
212         }
213       ASSERT (n_read <= es->bytes_to_receive);
214       es->bytes_to_receive -= n_read;
215       es->bytes_received += n_read;
216     }
217 }
218
219 static uword
220 ec_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
221 {
222   u32 *conn_indices, *conns_this_batch, nconns_this_batch;
223   int thread_index = vm->thread_index, i, delete_session;
224   ec_main_t *ecm = &ec_main;
225   ec_worker_t *wrk;
226   ec_session_t *es;
227   session_t *s;
228
229   if (ecm->run_test != EC_RUNNING)
230     return 0;
231
232   wrk = ec_worker_get (thread_index);
233   conn_indices = wrk->conn_indices;
234   conns_this_batch = wrk->conns_this_batch;
235
236   if (((vec_len (conn_indices) == 0) && vec_len (conns_this_batch) == 0))
237     return 0;
238
239   /* Grab another pile of connections */
240   if (PREDICT_FALSE (vec_len (conns_this_batch) == 0))
241     {
242       nconns_this_batch =
243         clib_min (ecm->connections_per_batch, vec_len (conn_indices));
244
245       ASSERT (nconns_this_batch > 0);
246       vec_validate (conns_this_batch, nconns_this_batch - 1);
247       clib_memcpy_fast (conns_this_batch,
248                         conn_indices + vec_len (conn_indices) -
249                           nconns_this_batch,
250                         nconns_this_batch * sizeof (u32));
251       vec_dec_len (conn_indices, nconns_this_batch);
252     }
253
254   /*
255    * Track progress
256    */
257   if (PREDICT_FALSE (ecm->prev_conns != ecm->connections_per_batch &&
258                      ecm->prev_conns == vec_len (conns_this_batch)))
259     {
260       ecm->repeats++;
261       ecm->prev_conns = vec_len (conns_this_batch);
262       if (ecm->repeats == 500000)
263         {
264           clib_warning ("stuck clients");
265         }
266     }
267   else
268     {
269       ecm->prev_conns = vec_len (conns_this_batch);
270       ecm->repeats = 0;
271     }
272
273   /*
274    * Handle connections in this batch
275    */
276   for (i = 0; i < vec_len (conns_this_batch); i++)
277     {
278       es = ec_session_get (wrk, conns_this_batch[i]);
279
280       delete_session = 1;
281
282       if (es->bytes_to_send > 0)
283         {
284           send_data_chunk (ecm, es);
285           delete_session = 0;
286         }
287
288       if (es->bytes_to_receive > 0)
289         {
290           delete_session = 0;
291         }
292
293       if (PREDICT_FALSE (delete_session == 1))
294         {
295           clib_atomic_fetch_add (&ecm->tx_total, es->bytes_sent);
296           clib_atomic_fetch_add (&ecm->rx_total, es->bytes_received);
297           s = session_get_from_handle_if_valid (es->vpp_session_handle);
298
299           if (s)
300             {
301               vnet_disconnect_args_t _a, *a = &_a;
302               a->handle = session_handle (s);
303               a->app_index = ecm->app_index;
304               vnet_disconnect_session (a);
305
306               vec_delete (conns_this_batch, 1, i);
307               i--;
308               clib_atomic_fetch_add (&ecm->ready_connections, -1);
309             }
310           else
311             {
312               clib_warning ("session AWOL?");
313               vec_delete (conns_this_batch, 1, i);
314             }
315
316           /* Kick the debug CLI process */
317           if (ecm->ready_connections == 0)
318             {
319               signal_evt_to_cli (EC_CLI_TEST_DONE);
320             }
321         }
322     }
323
324   wrk->conn_indices = conn_indices;
325   wrk->conns_this_batch = conns_this_batch;
326   return 0;
327 }
328
329 VLIB_REGISTER_NODE (echo_clients_node) = {
330   .function = ec_node_fn,
331   .name = "echo-clients",
332   .type = VLIB_NODE_TYPE_INPUT,
333   .state = VLIB_NODE_STATE_DISABLED,
334 };
335
336 static void
337 ec_reset_runtime_config (ec_main_t *ecm)
338 {
339   ecm->n_clients = 1;
340   ecm->quic_streams = 1;
341   ecm->bytes_to_send = 8192;
342   ecm->no_return = 0;
343   ecm->fifo_size = 64 << 10;
344   ecm->connections_per_batch = 1000;
345   ecm->private_segment_count = 0;
346   ecm->private_segment_size = 256 << 20;
347   ecm->no_output = 0;
348   ecm->test_bytes = 0;
349   ecm->test_failed = 0;
350   ecm->tls_engine = CRYPTO_ENGINE_OPENSSL;
351   ecm->no_copy = 0;
352   ecm->run_test = EC_STARTING;
353   ecm->ready_connections = 0;
354   ecm->connect_conn_index = 0;
355   ecm->rx_total = 0;
356   ecm->tx_total = 0;
357   ecm->barrier_acq_needed = 0;
358   ecm->prealloc_sessions = 0;
359   ecm->prealloc_fifos = 0;
360   ecm->appns_id = 0;
361   ecm->appns_secret = 0;
362   ecm->attach_flags = 0;
363   ecm->syn_timeout = 20.0;
364   ecm->test_timeout = 20.0;
365   vec_free (ecm->connect_uri);
366 }
367
368 static int
369 ec_init (vlib_main_t *vm)
370 {
371   ec_main_t *ecm = &ec_main;
372   vlib_thread_main_t *vtm = vlib_get_thread_main ();
373   ec_worker_t *wrk;
374   u32 num_threads;
375   int i;
376
377   ec_reset_runtime_config (ecm);
378
379   /* Store cli process node index for signaling */
380   ecm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index;
381   ecm->vlib_main = vm;
382
383   if (vlib_num_workers ())
384     {
385       /* The request came over the binary api and the inband cli handler
386        * is not mp_safe. Drop the barrier to make sure the workers are not
387        * blocked.
388        */
389       if (vlib_thread_is_main_w_barrier ())
390         {
391           ecm->barrier_acq_needed = 1;
392           vlib_worker_thread_barrier_release (vm);
393         }
394       /*
395        * There's a good chance that both the client and the server echo
396        * apps will be enabled so make sure the session queue node polls on
397        * the main thread as connections will probably be established on it.
398        */
399       vlib_node_set_state (vm, session_queue_node.index,
400                            VLIB_NODE_STATE_POLLING);
401     }
402
403   /* App init done only once */
404   if (ecm->app_is_init)
405     return 0;
406
407
408   /* Init test data. Big buffer */
409   vec_validate (ecm->connect_test_data, 4 * 1024 * 1024 - 1);
410   for (i = 0; i < vec_len (ecm->connect_test_data); i++)
411     ecm->connect_test_data[i] = i & 0xff;
412
413   num_threads = 1 /* main thread */ + vtm->n_threads;
414   vec_validate (ecm->wrk, num_threads);
415   vec_foreach (wrk, ecm->wrk)
416     {
417       vec_validate (wrk->rx_buf, vec_len (ecm->connect_test_data) - 1);
418       wrk->thread_index = wrk - ecm->wrk;
419       wrk->vpp_event_queue =
420         session_main_get_vpp_event_queue (wrk->thread_index);
421     }
422
423   ecm->app_is_init = 1;
424
425   vlib_worker_thread_barrier_sync (vm);
426   vnet_session_enable_disable (vm, 1 /* turn on session and transports */);
427   vlib_worker_thread_barrier_release (vm);
428
429   /* Turn on the builtin client input nodes */
430   for (i = 0; i < vtm->n_vlib_mains; i++)
431     vlib_node_set_state (vlib_get_main_by_index (i), echo_clients_node.index,
432                          VLIB_NODE_STATE_POLLING);
433
434   return 0;
435 }
436
437 static void
438 ec_prealloc_sessions (ec_main_t *ecm)
439 {
440   u32 sessions_per_wrk, n_wrks;
441   ec_worker_t *wrk;
442
443   n_wrks = vlib_num_workers () ? vlib_num_workers () : 1;
444
445   sessions_per_wrk = ecm->n_clients / n_wrks;
446   vec_foreach (wrk, ecm->wrk)
447     pool_init_fixed (wrk->sessions, 1.1 * sessions_per_wrk);
448 }
449
450 static void
451 ec_worker_cleanup (ec_worker_t *wrk)
452 {
453   pool_free (wrk->sessions);
454   vec_free (wrk->conn_indices);
455   vec_free (wrk->conns_this_batch);
456 }
457
458 static void
459 ec_cleanup (ec_main_t *ecm)
460 {
461   ec_worker_t *wrk;
462
463   vec_foreach (wrk, ecm->wrk)
464     ec_worker_cleanup (wrk);
465
466   vec_free (ecm->connect_uri);
467   vec_free (ecm->appns_id);
468
469   if (ecm->barrier_acq_needed)
470     vlib_worker_thread_barrier_sync (ecm->vlib_main);
471 }
472
473 static int
474 quic_ec_qsession_connected_callback (u32 app_index, u32 api_context,
475                                      session_t *s, session_error_t err)
476 {
477   session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
478   ec_main_t *ecm = &ec_main;
479   vnet_connect_args_t *a = 0;
480   session_handle_t handle;
481   u32 stream_n;
482   int rv;
483
484   DBG ("QUIC Connection handle %d", session_handle (s));
485
486   vec_validate (a, 1);
487   a->uri = (char *) ecm->connect_uri;
488   if (parse_uri (a->uri, &sep))
489     return -1;
490   sep.parent_handle = handle = session_handle (s);
491
492   for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
493     {
494       clib_memset (a, 0, sizeof (*a));
495       a->app_index = ecm->app_index;
496       a->api_context = -1 - api_context;
497       clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
498
499       DBG ("QUIC opening stream %d", stream_n);
500       if ((rv = vnet_connect (a)))
501         {
502           clib_error ("Stream session %d opening failed: %d", stream_n, rv);
503           return -1;
504         }
505       DBG ("QUIC stream %d connected", stream_n);
506     }
507   /*
508    * 's' is no longer valid, its underlying pool could have been moved in
509    * vnet_connect()
510    */
511   vec_free (a);
512   return 0;
513 }
514
515 static int
516 quic_ec_session_connected_callback (u32 app_index, u32 api_context,
517                                     session_t *s, session_error_t err)
518 {
519   ec_main_t *ecm = &ec_main;
520   ec_session_t *es;
521   ec_worker_t *wrk;
522   u32 thread_index;
523
524   if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
525     return -1;
526
527   if (err)
528     {
529       clib_warning ("connection %d failed!", api_context);
530       ecm->run_test = EC_EXITING;
531       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
532       return 0;
533     }
534
535   if (s->listener_handle == SESSION_INVALID_HANDLE)
536     return quic_ec_qsession_connected_callback (app_index, api_context, s,
537                                                 err);
538   DBG ("STREAM Connection callback %d", api_context);
539
540   thread_index = s->thread_index;
541   ASSERT (thread_index == vlib_get_thread_index ()
542           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
543
544   wrk = ec_worker_get (thread_index);
545
546   /*
547    * Setup session
548    */
549   es = ec_session_alloc (wrk);
550
551   es->bytes_to_send = ecm->bytes_to_send;
552   es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
553   es->data.rx_fifo = s->rx_fifo;
554   es->data.rx_fifo->shr->client_session_index = es->data.session_index;
555   es->data.tx_fifo = s->tx_fifo;
556   es->data.tx_fifo->shr->client_session_index = es->data.session_index;
557   es->data.vpp_evt_q = wrk->vpp_event_queue;
558   es->vpp_session_handle = session_handle (s);
559   es->vpp_session_index = s->session_index;
560   s->opaque = es->data.session_index;
561
562   if (ecm->is_dgram)
563     {
564       transport_connection_t *tc;
565       tc = session_get_transport (s);
566       clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport));
567       es->data.is_dgram = 1;
568     }
569
570   vec_add1 (wrk->conn_indices, es->data.session_index);
571   clib_atomic_fetch_add (&ecm->ready_connections, 1);
572   if (ecm->ready_connections == ecm->expected_connections)
573     {
574       ecm->run_test = EC_RUNNING;
575       /* Signal the CLI process that the action is starting... */
576       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
577     }
578
579   return 0;
580 }
581
582 static int
583 ec_session_connected_callback (u32 app_index, u32 api_context, session_t *s,
584                                session_error_t err)
585 {
586   ec_main_t *ecm = &ec_main;
587   ec_session_t *es;
588   u32 thread_index;
589   ec_worker_t *wrk;
590
591   if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
592     return -1;
593
594   if (err)
595     {
596       clib_warning ("connection %d failed!", api_context);
597       ecm->run_test = EC_EXITING;
598       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
599       return 0;
600     }
601
602   thread_index = s->thread_index;
603   ASSERT (thread_index == vlib_get_thread_index ()
604           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
605
606   wrk = ec_worker_get (thread_index);
607
608   /*
609    * Setup session
610    */
611   es = ec_session_alloc (wrk);
612
613   es->bytes_to_send = ecm->bytes_to_send;
614   es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
615   es->data.rx_fifo = s->rx_fifo;
616   es->data.rx_fifo->shr->client_session_index = es->data.session_index;
617   es->data.tx_fifo = s->tx_fifo;
618   es->data.tx_fifo->shr->client_session_index = es->data.session_index;
619   es->data.vpp_evt_q = wrk->vpp_event_queue;
620   es->vpp_session_handle = session_handle (s);
621   es->vpp_session_index = s->session_index;
622   s->opaque = es->data.session_index;
623
624   if (ecm->is_dgram)
625     {
626       transport_connection_t *tc;
627       tc = session_get_transport (s);
628       clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport));
629       es->data.is_dgram = 1;
630     }
631
632   vec_add1 (wrk->conn_indices, es->data.session_index);
633   clib_atomic_fetch_add (&ecm->ready_connections, 1);
634   if (ecm->ready_connections == ecm->expected_connections)
635     {
636       ecm->run_test = EC_RUNNING;
637       /* Signal the CLI process that the action is starting... */
638       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
639     }
640
641   return 0;
642 }
643
644 static void
645 ec_session_reset_callback (session_t *s)
646 {
647   ec_main_t *ecm = &ec_main;
648   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
649
650   if (s->session_state == SESSION_STATE_READY)
651     clib_warning ("Reset active connection %U", format_session, s, 2);
652
653   a->handle = session_handle (s);
654   a->app_index = ecm->app_index;
655   vnet_disconnect_session (a);
656   return;
657 }
658
659 static int
660 ec_session_accept_callback (session_t *s)
661 {
662   return 0;
663 }
664
665 static void
666 ec_session_disconnect_callback (session_t *s)
667 {
668   ec_main_t *ecm = &ec_main;
669   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
670   a->handle = session_handle (s);
671   a->app_index = ecm->app_index;
672   vnet_disconnect_session (a);
673   return;
674 }
675
676 void
677 ec_session_disconnect (session_t *s)
678 {
679   ec_main_t *ecm = &ec_main;
680   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
681   a->handle = session_handle (s);
682   a->app_index = ecm->app_index;
683   vnet_disconnect_session (a);
684 }
685
686 static int
687 ec_session_rx_callback (session_t *s)
688 {
689   ec_main_t *ecm = &ec_main;
690   ec_worker_t *wrk;
691   ec_session_t *es;
692
693   if (PREDICT_FALSE (ecm->run_test != EC_RUNNING))
694     {
695       ec_session_disconnect (s);
696       return -1;
697     }
698
699   wrk = ec_worker_get (s->thread_index);
700   es = ec_session_get (wrk, s->opaque);
701
702   receive_data_chunk (wrk, es);
703
704   if (svm_fifo_max_dequeue_cons (s->rx_fifo))
705     {
706       if (svm_fifo_set_event (s->rx_fifo))
707         session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
708     }
709   return 0;
710 }
711
712 static int
713 ec_add_segment_callback (u32 app_index, u64 segment_handle)
714 {
715   /* New segments may be added */
716   return 0;
717 }
718
719 static int
720 ec_del_segment_callback (u32 app_index, u64 segment_handle)
721 {
722   return 0;
723 }
724
725 static session_cb_vft_t ec_cb_vft = {
726   .session_reset_callback = ec_session_reset_callback,
727   .session_connected_callback = ec_session_connected_callback,
728   .session_accept_callback = ec_session_accept_callback,
729   .session_disconnect_callback = ec_session_disconnect_callback,
730   .builtin_app_rx_callback = ec_session_rx_callback,
731   .add_segment_callback = ec_add_segment_callback,
732   .del_segment_callback = ec_del_segment_callback,
733 };
734
735 static clib_error_t *
736 ec_attach ()
737 {
738   vnet_app_add_cert_key_pair_args_t _ck_pair, *ck_pair = &_ck_pair;
739   ec_main_t *ecm = &ec_main;
740   vnet_app_attach_args_t _a, *a = &_a;
741   u32 prealloc_fifos;
742   u64 options[18];
743   int rv;
744
745   clib_memset (a, 0, sizeof (*a));
746   clib_memset (options, 0, sizeof (options));
747
748   a->api_client_index = ~0;
749   a->name = format (0, "echo_client");
750   if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
751     ec_cb_vft.session_connected_callback = quic_ec_session_connected_callback;
752   a->session_cb_vft = &ec_cb_vft;
753
754   prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1;
755
756   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
757   options[APP_OPTIONS_SEGMENT_SIZE] = ecm->private_segment_size;
758   options[APP_OPTIONS_ADD_SEGMENT_SIZE] = ecm->private_segment_size;
759   options[APP_OPTIONS_RX_FIFO_SIZE] = ecm->fifo_size;
760   options[APP_OPTIONS_TX_FIFO_SIZE] = ecm->fifo_size;
761   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = ecm->private_segment_count;
762   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = prealloc_fifos;
763   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
764   options[APP_OPTIONS_TLS_ENGINE] = ecm->tls_engine;
765   options[APP_OPTIONS_PCT_FIRST_ALLOC] = 100;
766   options[APP_OPTIONS_FLAGS] |= ecm->attach_flags;
767   if (ecm->appns_id)
768     {
769       options[APP_OPTIONS_NAMESPACE_SECRET] = ecm->appns_secret;
770       a->namespace_id = ecm->appns_id;
771     }
772   a->options = options;
773
774   if ((rv = vnet_application_attach (a)))
775     return clib_error_return (0, "attach returned %d", rv);
776
777   ecm->app_index = a->app_index;
778   vec_free (a->name);
779
780   clib_memset (ck_pair, 0, sizeof (*ck_pair));
781   ck_pair->cert = (u8 *) test_srv_crt_rsa;
782   ck_pair->key = (u8 *) test_srv_key_rsa;
783   ck_pair->cert_len = test_srv_crt_rsa_len;
784   ck_pair->key_len = test_srv_key_rsa_len;
785   vnet_app_add_cert_key_pair (ck_pair);
786   ecm->ckpair_index = ck_pair->index;
787
788   ecm->test_client_attached = 1;
789
790   return 0;
791 }
792
793 static int
794 ec_detach ()
795 {
796   ec_main_t *ecm = &ec_main;
797   vnet_app_detach_args_t _da, *da = &_da;
798   int rv;
799
800   if (!ecm->test_client_attached)
801     return 0;
802
803   da->app_index = ecm->app_index;
804   da->api_client_index = ~0;
805   rv = vnet_application_detach (da);
806   ecm->test_client_attached = 0;
807   ecm->app_index = ~0;
808   vnet_app_del_cert_key_pair (ecm->ckpair_index);
809
810   return rv;
811 }
812
813 static int
814 ec_transport_needs_crypto (transport_proto_t proto)
815 {
816   return proto == TRANSPORT_PROTO_TLS || proto == TRANSPORT_PROTO_DTLS ||
817          proto == TRANSPORT_PROTO_QUIC;
818 }
819
820 static int
821 ec_connect_rpc (void *args)
822 {
823   ec_main_t *ecm = &ec_main;
824   vnet_connect_args_t _a = {}, *a = &_a;
825   vlib_main_t *vm = vlib_get_main ();
826   int rv, needs_crypto;
827   u32 n_clients, ci;
828
829   n_clients = ecm->n_clients;
830   needs_crypto = ec_transport_needs_crypto (ecm->transport_proto);
831   clib_memcpy (&a->sep_ext, &ecm->connect_sep, sizeof (ecm->connect_sep));
832   a->app_index = ecm->app_index;
833
834   ci = ecm->connect_conn_index;
835
836   vlib_worker_thread_barrier_sync (vm);
837
838   while (ci < n_clients)
839     {
840       /* Crude pacing for call setups  */
841       if (ci - ecm->ready_connections > 128)
842         {
843           ecm->connect_conn_index = ci;
844           break;
845         }
846
847       a->api_context = ci;
848       if (needs_crypto)
849         {
850           session_endpoint_alloc_ext_cfg (&a->sep_ext,
851                                           TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
852           a->sep_ext.ext_cfg->crypto.ckpair_index = ecm->ckpair_index;
853         }
854
855       rv = vnet_connect (a);
856
857       if (needs_crypto)
858         clib_mem_free (a->sep_ext.ext_cfg);
859
860       if (rv)
861         {
862           clib_warning ("connect returned: %U", format_session_error, rv);
863           ecm->run_test = EC_EXITING;
864           signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
865           break;
866         }
867
868       ci += 1;
869     }
870
871   vlib_worker_thread_barrier_release (vm);
872
873   if (ci < ecm->expected_connections)
874     ec_program_connects ();
875
876   return 0;
877 }
878
879 void
880 ec_program_connects (void)
881 {
882   session_send_rpc_evt_to_thread_force (0, ec_connect_rpc, 0);
883 }
884
885 #define ec_cli(_fmt, _args...)                                                \
886   if (!ecm->no_output)                                                        \
887   vlib_cli_output (vm, _fmt, ##_args)
888
889 static clib_error_t *
890 ec_command_fn (vlib_main_t *vm, unformat_input_t *input,
891                vlib_cli_command_t *cmd)
892 {
893   unformat_input_t _line_input, *line_input = &_line_input;
894   char *default_uri = "tcp://6.0.1.1/1234", *transfer_type;
895   ec_main_t *ecm = &ec_main;
896   uword *event_data = 0, event_type;
897   clib_error_t *error = 0;
898   int rv, had_config = 1;
899   u64 tmp, total_bytes;
900   f64 delta;
901
902   if (ecm->test_client_attached)
903     return clib_error_return (0, "failed: already running!");
904
905   if (ec_init (vm))
906     {
907       error = clib_error_return (0, "failed init");
908       goto cleanup;
909     }
910
911   if (!unformat_user (input, unformat_line_input, line_input))
912     {
913       had_config = 0;
914       goto parse_config;
915     }
916
917   while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
918     {
919       if (unformat (line_input, "uri %s", &ecm->connect_uri))
920         ;
921       else if (unformat (line_input, "nclients %d", &ecm->n_clients))
922         ;
923       else if (unformat (line_input, "quic-streams %d", &ecm->quic_streams))
924         ;
925       else if (unformat (line_input, "mbytes %lld", &tmp))
926         ecm->bytes_to_send = tmp << 20;
927       else if (unformat (line_input, "gbytes %lld", &tmp))
928         ecm->bytes_to_send = tmp << 30;
929       else if (unformat (line_input, "bytes %U", unformat_memory_size,
930                          &ecm->bytes_to_send))
931         ;
932       else if (unformat (line_input, "test-timeout %f", &ecm->test_timeout))
933         ;
934       else if (unformat (line_input, "syn-timeout %f", &ecm->syn_timeout))
935         ;
936       else if (unformat (line_input, "no-return"))
937         ecm->no_return = 1;
938       else if (unformat (line_input, "fifo-size %d", &ecm->fifo_size))
939         ecm->fifo_size <<= 10;
940       else if (unformat (line_input, "private-segment-count %d",
941                          &ecm->private_segment_count))
942         ;
943       else if (unformat (line_input, "private-segment-size %U",
944                          unformat_memory_size, &ecm->private_segment_size))
945         ;
946       else if (unformat (line_input, "preallocate-fifos"))
947         ecm->prealloc_fifos = 1;
948       else if (unformat (line_input, "preallocate-sessions"))
949         ecm->prealloc_sessions = 1;
950       else if (unformat (line_input, "client-batch %d",
951                          &ecm->connections_per_batch))
952         ;
953       else if (unformat (line_input, "appns %_%v%_", &ecm->appns_id))
954         ;
955       else if (unformat (line_input, "all-scope"))
956         ecm->attach_flags |= (APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE |
957                               APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE);
958       else if (unformat (line_input, "local-scope"))
959         ecm->attach_flags = APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
960       else if (unformat (line_input, "global-scope"))
961         ecm->attach_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
962       else if (unformat (line_input, "secret %lu", &ecm->appns_secret))
963         ;
964       else if (unformat (line_input, "no-output"))
965         ecm->no_output = 1;
966       else if (unformat (line_input, "test-bytes"))
967         ecm->test_bytes = 1;
968       else if (unformat (line_input, "tls-engine %d", &ecm->tls_engine))
969         ;
970       else
971         {
972           error = clib_error_return (0, "failed: unknown input `%U'",
973                                      format_unformat_error, line_input);
974           goto cleanup;
975         }
976     }
977
978 parse_config:
979
980   ecm->expected_connections = ecm->n_clients * ecm->quic_streams;
981
982   if (!ecm->connect_uri)
983     {
984       clib_warning ("No uri provided. Using default: %s", default_uri);
985       ecm->connect_uri = format (0, "%s%c", default_uri, 0);
986     }
987
988   if ((rv = parse_uri ((char *) ecm->connect_uri, &ecm->connect_sep)))
989     {
990       error = clib_error_return (0, "Uri parse error: %d", rv);
991       goto cleanup;
992     }
993   ecm->transport_proto = ecm->connect_sep.transport_proto;
994   ecm->is_dgram = (ecm->transport_proto == TRANSPORT_PROTO_UDP);
995
996   if (ecm->prealloc_sessions)
997     ec_prealloc_sessions (ecm);
998
999   if ((error = ec_attach ()))
1000     {
1001       clib_error_report (error);
1002       goto cleanup;
1003     }
1004
1005   /*
1006    * Start. Fire off connect requests
1007    */
1008
1009   ecm->syn_start_time = vlib_time_now (vm);
1010   ec_program_connects ();
1011
1012   /*
1013    * Park until the sessions come up, or syn_timeout seconds pass
1014    */
1015
1016   vlib_process_wait_for_event_or_clock (vm, ecm->syn_timeout);
1017   event_type = vlib_process_get_events (vm, &event_data);
1018   switch (event_type)
1019     {
1020     case ~0:
1021       ec_cli ("Timeout with only %d sessions active...",
1022               ecm->ready_connections);
1023       error = clib_error_return (0, "failed: syn timeout with %d sessions",
1024                                  ecm->ready_connections);
1025       goto cleanup;
1026
1027     case EC_CLI_CONNECTS_DONE:
1028       delta = vlib_time_now (vm) - ecm->syn_start_time;
1029       if (delta != 0.0)
1030         ec_cli ("%d three-way handshakes in %.2f seconds %.2f/s",
1031                 ecm->n_clients, delta, ((f64) ecm->n_clients) / delta);
1032       break;
1033
1034     case EC_CLI_CONNECTS_FAILED:
1035       error = clib_error_return (0, "failed: connect returned");
1036       goto cleanup;
1037
1038     default:
1039       ec_cli ("unexpected event(1): %d", event_type);
1040       error = clib_error_return (0, "failed: unexpected event(1): %d",
1041                                  event_type);
1042       goto cleanup;
1043     }
1044
1045   /*
1046    * Wait for the sessions to finish or test_timeout seconds pass
1047    */
1048   ecm->test_start_time = vlib_time_now (ecm->vlib_main);
1049   ec_cli ("Test started at %.6f", ecm->test_start_time);
1050   vlib_process_wait_for_event_or_clock (vm, ecm->test_timeout);
1051   event_type = vlib_process_get_events (vm, &event_data);
1052   switch (event_type)
1053     {
1054     case ~0:
1055       ec_cli ("Timeout with %d sessions still active...",
1056               ecm->ready_connections);
1057       error = clib_error_return (0, "failed: timeout with %d sessions",
1058                                  ecm->ready_connections);
1059       goto cleanup;
1060
1061     case EC_CLI_TEST_DONE:
1062       ecm->test_end_time = vlib_time_now (vm);
1063       ec_cli ("Test finished at %.6f", ecm->test_end_time);
1064       break;
1065
1066     default:
1067       ec_cli ("unexpected event(2): %d", event_type);
1068       error = clib_error_return (0, "failed: unexpected event(2): %d",
1069                                  event_type);
1070       goto cleanup;
1071     }
1072
1073   /*
1074    * Done. Compute stats
1075    */
1076   delta = ecm->test_end_time - ecm->test_start_time;
1077   if (delta == 0.0)
1078     {
1079       ec_cli ("zero delta-t?");
1080       error = clib_error_return (0, "failed: zero delta-t");
1081       goto cleanup;
1082     }
1083
1084   total_bytes = (ecm->no_return ? ecm->tx_total : ecm->rx_total);
1085   transfer_type = ecm->no_return ? "half-duplex" : "full-duplex";
1086   ec_cli ("%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds", total_bytes,
1087           total_bytes / (1ULL << 20), total_bytes / (1ULL << 30), delta);
1088   ec_cli ("%.2f bytes/second %s", ((f64) total_bytes) / (delta),
1089           transfer_type);
1090   ec_cli ("%.4f gbit/second %s", (((f64) total_bytes * 8.0) / delta / 1e9),
1091           transfer_type);
1092
1093   if (ecm->test_bytes && ecm->test_failed)
1094     error = clib_error_return (0, "failed: test bytes");
1095
1096 cleanup:
1097
1098   /*
1099    * Cleanup
1100    */
1101   ecm->run_test = EC_EXITING;
1102   vlib_process_wait_for_event_or_clock (vm, 10e-3);
1103
1104   /* Detach the application, so we can use different fifo sizes next time */
1105   if (ec_detach ())
1106     {
1107       error = clib_error_return (0, "failed: app detach");
1108       ec_cli ("WARNING: app detach failed...");
1109     }
1110
1111   ec_cleanup (ecm);
1112   if (had_config)
1113     unformat_free (line_input);
1114
1115   if (error)
1116     ec_cli ("test failed");
1117
1118   return error;
1119 }
1120
1121 VLIB_CLI_COMMAND (ec_command, static) = {
1122   .path = "test echo clients",
1123   .short_help =
1124     "test echo clients [nclients %d][[m|g]bytes <bytes>]"
1125     "[test-timeout <time>][syn-timeout <time>][no-return][fifo-size <size>]"
1126     "[private-segment-count <count>][private-segment-size <bytes>[m|g]]"
1127     "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]"
1128     "[uri <tcp://ip/port>][test-bytes][no-output]",
1129   .function = ec_command_fn,
1130   .is_mp_safe = 1,
1131 };
1132
1133 clib_error_t *
1134 ec_main_init (vlib_main_t *vm)
1135 {
1136   ec_main_t *ecm = &ec_main;
1137   ecm->app_is_init = 0;
1138   return 0;
1139 }
1140
1141 VLIB_INIT_FUNCTION (ec_main_init);
1142
1143 /*
1144  * fd.io coding-style-patch-verification: ON
1145  *
1146  * Local Variables:
1147  * eval: (c-set-style "gnu")
1148  * End:
1149  */