session: move connects to first worker
[vpp.git] / src / plugins / hs_apps / echo_client.c
1 /*
2  * echo_client.c - vpp built-in echo client code
3  *
4  * Copyright (c) 2017-2019 by Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <hs_apps/echo_client.h>
19
20 static ec_main_t ec_main;
21
22 #define EC_DBG (0)
23 #define DBG(_fmt, _args...)                                                   \
24   if (EC_DBG)                                                                 \
25   clib_warning (_fmt, ##_args)
26
27 static void
28 signal_evt_to_cli_i (void *codep)
29 {
30   ec_main_t *ecm = &ec_main;
31   int code;
32
33   ASSERT (vlib_get_thread_index () == 0);
34   code = pointer_to_uword (codep);
35   vlib_process_signal_event (ecm->vlib_main, ecm->cli_node_index, code, 0);
36 }
37
38 static void
39 signal_evt_to_cli (int code)
40 {
41   if (vlib_get_thread_index () != 0)
42     session_send_rpc_evt_to_thread_force (
43       0, signal_evt_to_cli_i, uword_to_pointer ((uword) code, void *));
44   else
45     signal_evt_to_cli_i (uword_to_pointer ((uword) code, void *));
46 }
47
48 static inline ec_worker_t *
49 ec_worker_get (u32 thread_index)
50 {
51   return vec_elt_at_index (ec_main.wrk, thread_index);
52 }
53
54 static inline ec_session_t *
55 ec_session_alloc (ec_worker_t *wrk)
56 {
57   ec_session_t *ecs;
58
59   pool_get_zero (wrk->sessions, ecs);
60   ecs->data.session_index = ecs - wrk->sessions;
61   ecs->thread_index = wrk->thread_index;
62
63   return ecs;
64 }
65
66 static inline ec_session_t *
67 ec_session_get (ec_worker_t *wrk, u32 ec_index)
68 {
69   return pool_elt_at_index (wrk->sessions, ec_index);
70 }
71
72 static void
73 send_data_chunk (ec_main_t *ecm, ec_session_t *es)
74 {
75   u8 *test_data = ecm->connect_test_data;
76   int test_buf_len, test_buf_offset, rv;
77   u32 bytes_this_chunk;
78
79   test_buf_len = vec_len (test_data);
80   ASSERT (test_buf_len > 0);
81   test_buf_offset = es->bytes_sent % test_buf_len;
82   bytes_this_chunk =
83     clib_min (test_buf_len - test_buf_offset, es->bytes_to_send);
84
85   if (!ecm->is_dgram)
86     {
87       if (ecm->no_copy)
88         {
89           svm_fifo_t *f = es->data.tx_fifo;
90           rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk);
91           svm_fifo_enqueue_nocopy (f, rv);
92           session_send_io_evt_to_thread_custom (
93             &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
94         }
95       else
96         rv = app_send_stream (&es->data, test_data + test_buf_offset,
97                               bytes_this_chunk, 0);
98     }
99   else
100     {
101       svm_fifo_t *f = es->data.tx_fifo;
102       u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
103
104       if (max_enqueue < sizeof (session_dgram_hdr_t))
105         return;
106
107       max_enqueue -= sizeof (session_dgram_hdr_t);
108
109       if (ecm->no_copy)
110         {
111           session_dgram_hdr_t hdr;
112           app_session_transport_t *at = &es->data.transport;
113
114           rv = clib_min (max_enqueue, bytes_this_chunk);
115
116           hdr.data_length = rv;
117           hdr.data_offset = 0;
118           clib_memcpy_fast (&hdr.rmt_ip, &at->rmt_ip,
119                             sizeof (ip46_address_t));
120           hdr.is_ip4 = at->is_ip4;
121           hdr.rmt_port = at->rmt_port;
122           clib_memcpy_fast (&hdr.lcl_ip, &at->lcl_ip,
123                             sizeof (ip46_address_t));
124           hdr.lcl_port = at->lcl_port;
125           svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
126           svm_fifo_enqueue_nocopy (f, rv);
127           session_send_io_evt_to_thread_custom (
128             &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
129         }
130       else
131         {
132           bytes_this_chunk = clib_min (bytes_this_chunk, max_enqueue);
133           bytes_this_chunk = clib_min (bytes_this_chunk, 1460);
134           rv = app_send_dgram (&es->data, test_data + test_buf_offset,
135                                bytes_this_chunk, 0);
136         }
137     }
138
139   /* If we managed to enqueue data... */
140   if (rv > 0)
141     {
142       /* Account for it... */
143       es->bytes_to_send -= rv;
144       es->bytes_sent += rv;
145
146       if (EC_DBG)
147         {
148           ELOG_TYPE_DECLARE (e) =
149             {
150               .format = "tx-enq: xfer %d bytes, sent %u remain %u",
151               .format_args = "i4i4i4",
152             };
153           struct
154           {
155             u32 data[3];
156           } *ed;
157           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
158           ed->data[0] = rv;
159           ed->data[1] = es->bytes_sent;
160           ed->data[2] = es->bytes_to_send;
161         }
162     }
163 }
164
165 static void
166 receive_data_chunk (ec_worker_t *wrk, ec_session_t *es)
167 {
168   ec_main_t *ecm = &ec_main;
169   svm_fifo_t *rx_fifo = es->data.rx_fifo;
170   int n_read, i;
171
172   if (ecm->test_bytes)
173     {
174       if (!ecm->is_dgram)
175         n_read =
176           app_recv_stream (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf));
177       else
178         n_read =
179           app_recv_dgram (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf));
180     }
181   else
182     {
183       n_read = svm_fifo_max_dequeue_cons (rx_fifo);
184       svm_fifo_dequeue_drop (rx_fifo, n_read);
185     }
186
187   if (n_read > 0)
188     {
189       if (EC_DBG)
190         {
191           ELOG_TYPE_DECLARE (e) =
192             {
193               .format = "rx-deq: %d bytes",
194               .format_args = "i4",
195             };
196           struct
197           {
198             u32 data[1];
199           } *ed;
200           ed = ELOG_DATA (&vlib_global_main.elog_main, e);
201           ed->data[0] = n_read;
202         }
203
204       if (ecm->test_bytes)
205         {
206           for (i = 0; i < n_read; i++)
207             {
208               if (wrk->rx_buf[i] != ((es->bytes_received + i) & 0xff))
209                 {
210                   clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
211                                 n_read, es->bytes_received + i, wrk->rx_buf[i],
212                                 ((es->bytes_received + i) & 0xff));
213                   ecm->test_failed = 1;
214                 }
215             }
216         }
217       ASSERT (n_read <= es->bytes_to_receive);
218       es->bytes_to_receive -= n_read;
219       es->bytes_received += n_read;
220     }
221 }
222
223 static uword
224 ec_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
225 {
226   u32 *conn_indices, *conns_this_batch, nconns_this_batch;
227   int thread_index = vm->thread_index, i, delete_session;
228   ec_main_t *ecm = &ec_main;
229   ec_worker_t *wrk;
230   ec_session_t *es;
231   session_t *s;
232
233   if (ecm->run_test != EC_RUNNING)
234     return 0;
235
236   wrk = ec_worker_get (thread_index);
237   conn_indices = wrk->conn_indices;
238   conns_this_batch = wrk->conns_this_batch;
239
240   if (((vec_len (conn_indices) == 0) && vec_len (conns_this_batch) == 0))
241     return 0;
242
243   /* Grab another pile of connections */
244   if (PREDICT_FALSE (vec_len (conns_this_batch) == 0))
245     {
246       nconns_this_batch =
247         clib_min (ecm->connections_per_batch, vec_len (conn_indices));
248
249       ASSERT (nconns_this_batch > 0);
250       vec_validate (conns_this_batch, nconns_this_batch - 1);
251       clib_memcpy_fast (conns_this_batch,
252                         conn_indices + vec_len (conn_indices) -
253                           nconns_this_batch,
254                         nconns_this_batch * sizeof (u32));
255       vec_dec_len (conn_indices, nconns_this_batch);
256     }
257
258   /*
259    * Track progress
260    */
261   if (PREDICT_FALSE (ecm->prev_conns != ecm->connections_per_batch &&
262                      ecm->prev_conns == vec_len (conns_this_batch)))
263     {
264       ecm->repeats++;
265       ecm->prev_conns = vec_len (conns_this_batch);
266       if (ecm->repeats == 500000)
267         {
268           clib_warning ("stuck clients");
269         }
270     }
271   else
272     {
273       ecm->prev_conns = vec_len (conns_this_batch);
274       ecm->repeats = 0;
275     }
276
277   /*
278    * Handle connections in this batch
279    */
280   for (i = 0; i < vec_len (conns_this_batch); i++)
281     {
282       es = ec_session_get (wrk, conns_this_batch[i]);
283
284       delete_session = 1;
285
286       if (es->bytes_to_send > 0)
287         {
288           send_data_chunk (ecm, es);
289           delete_session = 0;
290         }
291
292       if (es->bytes_to_receive > 0)
293         {
294           delete_session = 0;
295         }
296
297       if (PREDICT_FALSE (delete_session == 1))
298         {
299           clib_atomic_fetch_add (&ecm->tx_total, es->bytes_sent);
300           clib_atomic_fetch_add (&ecm->rx_total, es->bytes_received);
301           s = session_get_from_handle_if_valid (es->vpp_session_handle);
302
303           if (s)
304             {
305               vnet_disconnect_args_t _a, *a = &_a;
306               a->handle = session_handle (s);
307               a->app_index = ecm->app_index;
308               vnet_disconnect_session (a);
309
310               vec_delete (conns_this_batch, 1, i);
311               i--;
312               clib_atomic_fetch_add (&ecm->ready_connections, -1);
313             }
314           else
315             {
316               clib_warning ("session AWOL?");
317               vec_delete (conns_this_batch, 1, i);
318             }
319
320           /* Kick the debug CLI process */
321           if (ecm->ready_connections == 0)
322             {
323               signal_evt_to_cli (EC_CLI_TEST_DONE);
324             }
325         }
326     }
327
328   wrk->conn_indices = conn_indices;
329   wrk->conns_this_batch = conns_this_batch;
330   return 0;
331 }
332
333 VLIB_REGISTER_NODE (echo_clients_node) = {
334   .function = ec_node_fn,
335   .name = "echo-clients",
336   .type = VLIB_NODE_TYPE_INPUT,
337   .state = VLIB_NODE_STATE_DISABLED,
338 };
339
340 static void
341 ec_reset_runtime_config (ec_main_t *ecm)
342 {
343   ecm->n_clients = 1;
344   ecm->quic_streams = 1;
345   ecm->bytes_to_send = 8192;
346   ecm->no_return = 0;
347   ecm->fifo_size = 64 << 10;
348   ecm->connections_per_batch = 1000;
349   ecm->private_segment_count = 0;
350   ecm->private_segment_size = 256 << 20;
351   ecm->no_output = 0;
352   ecm->test_bytes = 0;
353   ecm->test_failed = 0;
354   ecm->tls_engine = CRYPTO_ENGINE_OPENSSL;
355   ecm->no_copy = 0;
356   ecm->run_test = EC_STARTING;
357   ecm->ready_connections = 0;
358   ecm->connect_conn_index = 0;
359   ecm->rx_total = 0;
360   ecm->tx_total = 0;
361   ecm->barrier_acq_needed = 0;
362   ecm->prealloc_sessions = 0;
363   ecm->prealloc_fifos = 0;
364   ecm->appns_id = 0;
365   ecm->appns_secret = 0;
366   ecm->attach_flags = 0;
367   ecm->syn_timeout = 20.0;
368   ecm->test_timeout = 20.0;
369   vec_free (ecm->connect_uri);
370 }
371
372 static int
373 ec_init (vlib_main_t *vm)
374 {
375   ec_main_t *ecm = &ec_main;
376   vlib_thread_main_t *vtm = vlib_get_thread_main ();
377   ec_worker_t *wrk;
378   u32 num_threads;
379   int i;
380
381   ec_reset_runtime_config (ecm);
382
383   /* Store cli process node index for signaling */
384   ecm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index;
385   ecm->vlib_main = vm;
386
387   if (vlib_num_workers ())
388     {
389       /* The request came over the binary api and the inband cli handler
390        * is not mp_safe. Drop the barrier to make sure the workers are not
391        * blocked.
392        */
393       if (vlib_thread_is_main_w_barrier ())
394         {
395           ecm->barrier_acq_needed = 1;
396           vlib_worker_thread_barrier_release (vm);
397         }
398       /*
399        * There's a good chance that both the client and the server echo
400        * apps will be enabled so make sure the session queue node polls on
401        * the main thread as connections will probably be established on it.
402        */
403       vlib_node_set_state (vm, session_queue_node.index,
404                            VLIB_NODE_STATE_POLLING);
405     }
406
407   /* App init done only once */
408   if (ecm->app_is_init)
409     return 0;
410
411
412   /* Init test data. Big buffer */
413   vec_validate (ecm->connect_test_data, 4 * 1024 * 1024 - 1);
414   for (i = 0; i < vec_len (ecm->connect_test_data); i++)
415     ecm->connect_test_data[i] = i & 0xff;
416
417   num_threads = 1 /* main thread */ + vtm->n_threads;
418   vec_validate (ecm->wrk, num_threads);
419   vec_foreach (wrk, ecm->wrk)
420     {
421       vec_validate (wrk->rx_buf, vec_len (ecm->connect_test_data) - 1);
422       wrk->thread_index = wrk - ecm->wrk;
423       wrk->vpp_event_queue =
424         session_main_get_vpp_event_queue (wrk->thread_index);
425     }
426
427   ecm->app_is_init = 1;
428
429   vlib_worker_thread_barrier_sync (vm);
430   vnet_session_enable_disable (vm, 1 /* turn on session and transports */);
431   vlib_worker_thread_barrier_release (vm);
432
433   /* Turn on the builtin client input nodes */
434   for (i = 0; i < vtm->n_vlib_mains; i++)
435     vlib_node_set_state (vlib_get_main_by_index (i), echo_clients_node.index,
436                          VLIB_NODE_STATE_POLLING);
437
438   return 0;
439 }
440
441 static void
442 ec_prealloc_sessions (ec_main_t *ecm)
443 {
444   u32 sessions_per_wrk, n_wrks;
445   ec_worker_t *wrk;
446
447   n_wrks = vlib_num_workers () ? vlib_num_workers () : 1;
448
449   sessions_per_wrk = ecm->n_clients / n_wrks;
450   vec_foreach (wrk, ecm->wrk)
451     pool_init_fixed (wrk->sessions, 1.1 * sessions_per_wrk);
452 }
453
454 static void
455 ec_worker_cleanup (ec_worker_t *wrk)
456 {
457   pool_free (wrk->sessions);
458   vec_free (wrk->conn_indices);
459   vec_free (wrk->conns_this_batch);
460 }
461
462 static void
463 ec_cleanup (ec_main_t *ecm)
464 {
465   ec_worker_t *wrk;
466
467   vec_foreach (wrk, ecm->wrk)
468     ec_worker_cleanup (wrk);
469
470   vec_free (ecm->connect_uri);
471   vec_free (ecm->appns_id);
472
473   if (ecm->barrier_acq_needed)
474     vlib_worker_thread_barrier_sync (ecm->vlib_main);
475 }
476
477 static int
478 quic_ec_qsession_connected_callback (u32 app_index, u32 api_context,
479                                      session_t *s, session_error_t err)
480 {
481   session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
482   ec_main_t *ecm = &ec_main;
483   vnet_connect_args_t *a = 0;
484   session_handle_t handle;
485   u32 stream_n;
486   int rv;
487
488   DBG ("QUIC Connection handle %d", session_handle (s));
489
490   vec_validate (a, 1);
491   a->uri = (char *) ecm->connect_uri;
492   if (parse_uri (a->uri, &sep))
493     return -1;
494   sep.parent_handle = handle = session_handle (s);
495
496   for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
497     {
498       clib_memset (a, 0, sizeof (*a));
499       a->app_index = ecm->app_index;
500       a->api_context = -1 - api_context;
501       clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
502
503       DBG ("QUIC opening stream %d", stream_n);
504       if ((rv = vnet_connect (a)))
505         {
506           clib_error ("Stream session %d opening failed: %d", stream_n, rv);
507           return -1;
508         }
509       DBG ("QUIC stream %d connected", stream_n);
510     }
511   /*
512    * 's' is no longer valid, its underlying pool could have been moved in
513    * vnet_connect()
514    */
515   vec_free (a);
516   return 0;
517 }
518
519 static int
520 quic_ec_session_connected_callback (u32 app_index, u32 api_context,
521                                     session_t *s, session_error_t err)
522 {
523   ec_main_t *ecm = &ec_main;
524   ec_session_t *es;
525   ec_worker_t *wrk;
526   u32 thread_index;
527
528   if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
529     return -1;
530
531   if (err)
532     {
533       clib_warning ("connection %d failed!", api_context);
534       ecm->run_test = EC_EXITING;
535       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
536       return 0;
537     }
538
539   if (s->listener_handle == SESSION_INVALID_HANDLE)
540     return quic_ec_qsession_connected_callback (app_index, api_context, s,
541                                                 err);
542   DBG ("STREAM Connection callback %d", api_context);
543
544   thread_index = s->thread_index;
545   ASSERT (thread_index == vlib_get_thread_index ()
546           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
547
548   wrk = ec_worker_get (thread_index);
549
550   /*
551    * Setup session
552    */
553   es = ec_session_alloc (wrk);
554
555   es->bytes_to_send = ecm->bytes_to_send;
556   es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
557   es->data.rx_fifo = s->rx_fifo;
558   es->data.rx_fifo->shr->client_session_index = es->data.session_index;
559   es->data.tx_fifo = s->tx_fifo;
560   es->data.tx_fifo->shr->client_session_index = es->data.session_index;
561   es->data.vpp_evt_q = wrk->vpp_event_queue;
562   es->vpp_session_handle = session_handle (s);
563   es->vpp_session_index = s->session_index;
564   s->opaque = es->data.session_index;
565
566   if (ecm->is_dgram)
567     {
568       transport_connection_t *tc;
569       tc = session_get_transport (s);
570       clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport));
571       es->data.is_dgram = 1;
572     }
573
574   vec_add1 (wrk->conn_indices, es->data.session_index);
575   clib_atomic_fetch_add (&ecm->ready_connections, 1);
576   if (ecm->ready_connections == ecm->expected_connections)
577     {
578       ecm->run_test = EC_RUNNING;
579       /* Signal the CLI process that the action is starting... */
580       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
581     }
582
583   return 0;
584 }
585
586 static int
587 ec_session_connected_callback (u32 app_index, u32 api_context, session_t *s,
588                                session_error_t err)
589 {
590   ec_main_t *ecm = &ec_main;
591   ec_session_t *es;
592   u32 thread_index;
593   ec_worker_t *wrk;
594
595   if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
596     return -1;
597
598   if (err)
599     {
600       clib_warning ("connection %d failed!", api_context);
601       ecm->run_test = EC_EXITING;
602       signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
603       return 0;
604     }
605
606   thread_index = s->thread_index;
607   ASSERT (thread_index == vlib_get_thread_index ()
608           || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
609
610   wrk = ec_worker_get (thread_index);
611
612   /*
613    * Setup session
614    */
615   es = ec_session_alloc (wrk);
616
617   es->bytes_to_send = ecm->bytes_to_send;
618   es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
619   es->data.rx_fifo = s->rx_fifo;
620   es->data.rx_fifo->shr->client_session_index = es->data.session_index;
621   es->data.tx_fifo = s->tx_fifo;
622   es->data.tx_fifo->shr->client_session_index = es->data.session_index;
623   es->data.vpp_evt_q = wrk->vpp_event_queue;
624   es->vpp_session_handle = session_handle (s);
625   es->vpp_session_index = s->session_index;
626   s->opaque = es->data.session_index;
627
628   if (ecm->is_dgram)
629     {
630       transport_connection_t *tc;
631       tc = session_get_transport (s);
632       clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport));
633       es->data.is_dgram = 1;
634     }
635
636   vec_add1 (wrk->conn_indices, es->data.session_index);
637   clib_atomic_fetch_add (&ecm->ready_connections, 1);
638   if (ecm->ready_connections == ecm->expected_connections)
639     {
640       ecm->run_test = EC_RUNNING;
641       /* Signal the CLI process that the action is starting... */
642       signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
643     }
644
645   return 0;
646 }
647
648 static void
649 ec_session_reset_callback (session_t *s)
650 {
651   ec_main_t *ecm = &ec_main;
652   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
653
654   if (s->session_state == SESSION_STATE_READY)
655     clib_warning ("Reset active connection %U", format_session, s, 2);
656
657   a->handle = session_handle (s);
658   a->app_index = ecm->app_index;
659   vnet_disconnect_session (a);
660   return;
661 }
662
663 static int
664 ec_session_accept_callback (session_t *s)
665 {
666   return 0;
667 }
668
669 static void
670 ec_session_disconnect_callback (session_t *s)
671 {
672   ec_main_t *ecm = &ec_main;
673   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
674   a->handle = session_handle (s);
675   a->app_index = ecm->app_index;
676   vnet_disconnect_session (a);
677   return;
678 }
679
680 void
681 ec_session_disconnect (session_t *s)
682 {
683   ec_main_t *ecm = &ec_main;
684   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
685   a->handle = session_handle (s);
686   a->app_index = ecm->app_index;
687   vnet_disconnect_session (a);
688 }
689
690 static int
691 ec_session_rx_callback (session_t *s)
692 {
693   ec_main_t *ecm = &ec_main;
694   ec_worker_t *wrk;
695   ec_session_t *es;
696
697   if (PREDICT_FALSE (ecm->run_test != EC_RUNNING))
698     {
699       ec_session_disconnect (s);
700       return -1;
701     }
702
703   wrk = ec_worker_get (s->thread_index);
704   es = ec_session_get (wrk, s->opaque);
705
706   receive_data_chunk (wrk, es);
707
708   if (svm_fifo_max_dequeue_cons (s->rx_fifo))
709     {
710       if (svm_fifo_set_event (s->rx_fifo))
711         session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
712     }
713   return 0;
714 }
715
716 static int
717 ec_add_segment_callback (u32 app_index, u64 segment_handle)
718 {
719   /* New segments may be added */
720   return 0;
721 }
722
723 static int
724 ec_del_segment_callback (u32 app_index, u64 segment_handle)
725 {
726   return 0;
727 }
728
729 static session_cb_vft_t ec_cb_vft = {
730   .session_reset_callback = ec_session_reset_callback,
731   .session_connected_callback = ec_session_connected_callback,
732   .session_accept_callback = ec_session_accept_callback,
733   .session_disconnect_callback = ec_session_disconnect_callback,
734   .builtin_app_rx_callback = ec_session_rx_callback,
735   .add_segment_callback = ec_add_segment_callback,
736   .del_segment_callback = ec_del_segment_callback,
737 };
738
739 static clib_error_t *
740 ec_attach ()
741 {
742   vnet_app_add_cert_key_pair_args_t _ck_pair, *ck_pair = &_ck_pair;
743   ec_main_t *ecm = &ec_main;
744   vnet_app_attach_args_t _a, *a = &_a;
745   u32 prealloc_fifos;
746   u64 options[18];
747   int rv;
748
749   clib_memset (a, 0, sizeof (*a));
750   clib_memset (options, 0, sizeof (options));
751
752   a->api_client_index = ~0;
753   a->name = format (0, "echo_client");
754   if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
755     ec_cb_vft.session_connected_callback = quic_ec_session_connected_callback;
756   a->session_cb_vft = &ec_cb_vft;
757
758   prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1;
759
760   options[APP_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
761   options[APP_OPTIONS_SEGMENT_SIZE] = ecm->private_segment_size;
762   options[APP_OPTIONS_ADD_SEGMENT_SIZE] = ecm->private_segment_size;
763   options[APP_OPTIONS_RX_FIFO_SIZE] = ecm->fifo_size;
764   options[APP_OPTIONS_TX_FIFO_SIZE] = ecm->fifo_size;
765   options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = ecm->private_segment_count;
766   options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = prealloc_fifos;
767   options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
768   options[APP_OPTIONS_TLS_ENGINE] = ecm->tls_engine;
769   options[APP_OPTIONS_PCT_FIRST_ALLOC] = 100;
770   options[APP_OPTIONS_FLAGS] |= ecm->attach_flags;
771   if (ecm->appns_id)
772     {
773       options[APP_OPTIONS_NAMESPACE_SECRET] = ecm->appns_secret;
774       a->namespace_id = ecm->appns_id;
775     }
776   a->options = options;
777
778   if ((rv = vnet_application_attach (a)))
779     return clib_error_return (0, "attach returned %d", rv);
780
781   ecm->app_index = a->app_index;
782   vec_free (a->name);
783
784   clib_memset (ck_pair, 0, sizeof (*ck_pair));
785   ck_pair->cert = (u8 *) test_srv_crt_rsa;
786   ck_pair->key = (u8 *) test_srv_key_rsa;
787   ck_pair->cert_len = test_srv_crt_rsa_len;
788   ck_pair->key_len = test_srv_key_rsa_len;
789   vnet_app_add_cert_key_pair (ck_pair);
790   ecm->ckpair_index = ck_pair->index;
791
792   ecm->test_client_attached = 1;
793
794   return 0;
795 }
796
797 static int
798 ec_detach ()
799 {
800   ec_main_t *ecm = &ec_main;
801   vnet_app_detach_args_t _da, *da = &_da;
802   int rv;
803
804   if (!ecm->test_client_attached)
805     return 0;
806
807   da->app_index = ecm->app_index;
808   da->api_client_index = ~0;
809   rv = vnet_application_detach (da);
810   ecm->test_client_attached = 0;
811   ecm->app_index = ~0;
812   vnet_app_del_cert_key_pair (ecm->ckpair_index);
813
814   return rv;
815 }
816
817 static int
818 ec_transport_needs_crypto (transport_proto_t proto)
819 {
820   return proto == TRANSPORT_PROTO_TLS || proto == TRANSPORT_PROTO_DTLS ||
821          proto == TRANSPORT_PROTO_QUIC;
822 }
823
824 static int
825 ec_connect_rpc (void *args)
826 {
827   ec_main_t *ecm = &ec_main;
828   vnet_connect_args_t _a = {}, *a = &_a;
829   int rv, needs_crypto;
830   u32 n_clients, ci;
831
832   n_clients = ecm->n_clients;
833   needs_crypto = ec_transport_needs_crypto (ecm->transport_proto);
834   clib_memcpy (&a->sep_ext, &ecm->connect_sep, sizeof (ecm->connect_sep));
835   a->sep_ext.transport_flags |= TRANSPORT_CFG_F_CONNECTED;
836   a->app_index = ecm->app_index;
837
838   ci = ecm->connect_conn_index;
839
840   while (ci < n_clients)
841     {
842       /* Crude pacing for call setups  */
843       if (ci - ecm->ready_connections > 128)
844         {
845           ecm->connect_conn_index = ci;
846           break;
847         }
848
849       a->api_context = ci;
850       if (needs_crypto)
851         {
852           session_endpoint_alloc_ext_cfg (&a->sep_ext,
853                                           TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
854           a->sep_ext.ext_cfg->crypto.ckpair_index = ecm->ckpair_index;
855         }
856
857       rv = vnet_connect (a);
858
859       if (needs_crypto)
860         clib_mem_free (a->sep_ext.ext_cfg);
861
862       if (rv)
863         {
864           clib_warning ("connect returned: %U", format_session_error, rv);
865           ecm->run_test = EC_EXITING;
866           signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
867           break;
868         }
869
870       ci += 1;
871     }
872
873   if (ci < ecm->expected_connections && ecm->run_test != EC_EXITING)
874     ec_program_connects ();
875
876   return 0;
877 }
878
879 void
880 ec_program_connects (void)
881 {
882   session_send_rpc_evt_to_thread_force (transport_cl_thread (), ec_connect_rpc,
883                                         0);
884 }
885
886 #define ec_cli(_fmt, _args...)                                                \
887   if (!ecm->no_output)                                                        \
888   vlib_cli_output (vm, _fmt, ##_args)
889
890 static clib_error_t *
891 ec_command_fn (vlib_main_t *vm, unformat_input_t *input,
892                vlib_cli_command_t *cmd)
893 {
894   unformat_input_t _line_input, *line_input = &_line_input;
895   char *default_uri = "tcp://6.0.1.1/1234", *transfer_type;
896   ec_main_t *ecm = &ec_main;
897   uword *event_data = 0, event_type;
898   clib_error_t *error = 0;
899   int rv, had_config = 1;
900   u64 tmp, total_bytes;
901   f64 delta;
902
903   if (ecm->test_client_attached)
904     return clib_error_return (0, "failed: already running!");
905
906   if (ec_init (vm))
907     {
908       error = clib_error_return (0, "failed init");
909       goto cleanup;
910     }
911
912   if (!unformat_user (input, unformat_line_input, line_input))
913     {
914       had_config = 0;
915       goto parse_config;
916     }
917
918   while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
919     {
920       if (unformat (line_input, "uri %s", &ecm->connect_uri))
921         ;
922       else if (unformat (line_input, "nclients %d", &ecm->n_clients))
923         ;
924       else if (unformat (line_input, "quic-streams %d", &ecm->quic_streams))
925         ;
926       else if (unformat (line_input, "mbytes %lld", &tmp))
927         ecm->bytes_to_send = tmp << 20;
928       else if (unformat (line_input, "gbytes %lld", &tmp))
929         ecm->bytes_to_send = tmp << 30;
930       else if (unformat (line_input, "bytes %U", unformat_memory_size,
931                          &ecm->bytes_to_send))
932         ;
933       else if (unformat (line_input, "test-timeout %f", &ecm->test_timeout))
934         ;
935       else if (unformat (line_input, "syn-timeout %f", &ecm->syn_timeout))
936         ;
937       else if (unformat (line_input, "no-return"))
938         ecm->no_return = 1;
939       else if (unformat (line_input, "fifo-size %d", &ecm->fifo_size))
940         ecm->fifo_size <<= 10;
941       else if (unformat (line_input, "private-segment-count %d",
942                          &ecm->private_segment_count))
943         ;
944       else if (unformat (line_input, "private-segment-size %U",
945                          unformat_memory_size, &ecm->private_segment_size))
946         ;
947       else if (unformat (line_input, "preallocate-fifos"))
948         ecm->prealloc_fifos = 1;
949       else if (unformat (line_input, "preallocate-sessions"))
950         ecm->prealloc_sessions = 1;
951       else if (unformat (line_input, "client-batch %d",
952                          &ecm->connections_per_batch))
953         ;
954       else if (unformat (line_input, "appns %_%v%_", &ecm->appns_id))
955         ;
956       else if (unformat (line_input, "all-scope"))
957         ecm->attach_flags |= (APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE |
958                               APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE);
959       else if (unformat (line_input, "local-scope"))
960         ecm->attach_flags = APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
961       else if (unformat (line_input, "global-scope"))
962         ecm->attach_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
963       else if (unformat (line_input, "secret %lu", &ecm->appns_secret))
964         ;
965       else if (unformat (line_input, "no-output"))
966         ecm->no_output = 1;
967       else if (unformat (line_input, "test-bytes"))
968         ecm->test_bytes = 1;
969       else if (unformat (line_input, "tls-engine %d", &ecm->tls_engine))
970         ;
971       else
972         {
973           error = clib_error_return (0, "failed: unknown input `%U'",
974                                      format_unformat_error, line_input);
975           goto cleanup;
976         }
977     }
978
979 parse_config:
980
981   ecm->expected_connections = ecm->n_clients * ecm->quic_streams;
982
983   if (!ecm->connect_uri)
984     {
985       clib_warning ("No uri provided. Using default: %s", default_uri);
986       ecm->connect_uri = format (0, "%s%c", default_uri, 0);
987     }
988
989   if ((rv = parse_uri ((char *) ecm->connect_uri, &ecm->connect_sep)))
990     {
991       error = clib_error_return (0, "Uri parse error: %d", rv);
992       goto cleanup;
993     }
994   ecm->transport_proto = ecm->connect_sep.transport_proto;
995   ecm->is_dgram = (ecm->transport_proto == TRANSPORT_PROTO_UDP);
996
997   if (ecm->prealloc_sessions)
998     ec_prealloc_sessions (ecm);
999
1000   if ((error = ec_attach ()))
1001     {
1002       clib_error_report (error);
1003       goto cleanup;
1004     }
1005
1006   /*
1007    * Start. Fire off connect requests
1008    */
1009
1010   ecm->syn_start_time = vlib_time_now (vm);
1011   ec_program_connects ();
1012
1013   /*
1014    * Park until the sessions come up, or syn_timeout seconds pass
1015    */
1016
1017   vlib_process_wait_for_event_or_clock (vm, ecm->syn_timeout);
1018   event_type = vlib_process_get_events (vm, &event_data);
1019   switch (event_type)
1020     {
1021     case ~0:
1022       ec_cli ("Timeout with only %d sessions active...",
1023               ecm->ready_connections);
1024       error = clib_error_return (0, "failed: syn timeout with %d sessions",
1025                                  ecm->ready_connections);
1026       goto cleanup;
1027
1028     case EC_CLI_CONNECTS_DONE:
1029       delta = vlib_time_now (vm) - ecm->syn_start_time;
1030       if (delta != 0.0)
1031         ec_cli ("%d three-way handshakes in %.2f seconds %.2f/s",
1032                 ecm->n_clients, delta, ((f64) ecm->n_clients) / delta);
1033       break;
1034
1035     case EC_CLI_CONNECTS_FAILED:
1036       error = clib_error_return (0, "failed: connect returned");
1037       goto cleanup;
1038
1039     default:
1040       ec_cli ("unexpected event(1): %d", event_type);
1041       error = clib_error_return (0, "failed: unexpected event(1): %d",
1042                                  event_type);
1043       goto cleanup;
1044     }
1045
1046   /*
1047    * Wait for the sessions to finish or test_timeout seconds pass
1048    */
1049   ecm->test_start_time = vlib_time_now (ecm->vlib_main);
1050   ec_cli ("Test started at %.6f", ecm->test_start_time);
1051   vlib_process_wait_for_event_or_clock (vm, ecm->test_timeout);
1052   event_type = vlib_process_get_events (vm, &event_data);
1053   switch (event_type)
1054     {
1055     case ~0:
1056       ec_cli ("Timeout at %.6f with %d sessions still active...",
1057               vlib_time_now (ecm->vlib_main), ecm->ready_connections);
1058       error = clib_error_return (0, "failed: timeout with %d sessions",
1059                                  ecm->ready_connections);
1060       goto cleanup;
1061
1062     case EC_CLI_TEST_DONE:
1063       ecm->test_end_time = vlib_time_now (vm);
1064       ec_cli ("Test finished at %.6f", ecm->test_end_time);
1065       break;
1066
1067     default:
1068       ec_cli ("unexpected event(2): %d", event_type);
1069       error = clib_error_return (0, "failed: unexpected event(2): %d",
1070                                  event_type);
1071       goto cleanup;
1072     }
1073
1074   /*
1075    * Done. Compute stats
1076    */
1077   delta = ecm->test_end_time - ecm->test_start_time;
1078   if (delta == 0.0)
1079     {
1080       ec_cli ("zero delta-t?");
1081       error = clib_error_return (0, "failed: zero delta-t");
1082       goto cleanup;
1083     }
1084
1085   total_bytes = (ecm->no_return ? ecm->tx_total : ecm->rx_total);
1086   transfer_type = ecm->no_return ? "half-duplex" : "full-duplex";
1087   ec_cli ("%lld bytes (%lld mbytes, %lld gbytes) in %.2f seconds", total_bytes,
1088           total_bytes / (1ULL << 20), total_bytes / (1ULL << 30), delta);
1089   ec_cli ("%.2f bytes/second %s", ((f64) total_bytes) / (delta),
1090           transfer_type);
1091   ec_cli ("%.4f gbit/second %s", (((f64) total_bytes * 8.0) / delta / 1e9),
1092           transfer_type);
1093
1094   if (ecm->test_bytes && ecm->test_failed)
1095     error = clib_error_return (0, "failed: test bytes");
1096
1097 cleanup:
1098
1099   /*
1100    * Cleanup
1101    */
1102   ecm->run_test = EC_EXITING;
1103   vlib_process_wait_for_event_or_clock (vm, 10e-3);
1104
1105   /* Detach the application, so we can use different fifo sizes next time */
1106   if (ec_detach ())
1107     {
1108       error = clib_error_return (0, "failed: app detach");
1109       ec_cli ("WARNING: app detach failed...");
1110     }
1111
1112   ec_cleanup (ecm);
1113   if (had_config)
1114     unformat_free (line_input);
1115
1116   if (error)
1117     ec_cli ("test failed");
1118
1119   return error;
1120 }
1121
1122 VLIB_CLI_COMMAND (ec_command, static) = {
1123   .path = "test echo clients",
1124   .short_help =
1125     "test echo clients [nclients %d][[m|g]bytes <bytes>]"
1126     "[test-timeout <time>][syn-timeout <time>][no-return][fifo-size <size>]"
1127     "[private-segment-count <count>][private-segment-size <bytes>[m|g]]"
1128     "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]"
1129     "[uri <tcp://ip/port>][test-bytes][no-output]",
1130   .function = ec_command_fn,
1131   .is_mp_safe = 1,
1132 };
1133
1134 clib_error_t *
1135 ec_main_init (vlib_main_t *vm)
1136 {
1137   ec_main_t *ecm = &ec_main;
1138   ecm->app_is_init = 0;
1139   return 0;
1140 }
1141
1142 VLIB_INIT_FUNCTION (ec_main_init);
1143
1144 /*
1145  * fd.io coding-style-patch-verification: ON
1146  *
1147  * Local Variables:
1148  * eval: (c-set-style "gnu")
1149  * End:
1150  */