hsa: make vpp_echo use mq instead of bapi
[vpp.git] / src / plugins / hs_apps / sapi / vpp_echo_proto_quic.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <signal.h>
18
19 #include <hs_apps/sapi/vpp_echo_common.h>
20
21 typedef struct _quic_echo_cb_vft
22 {
23   void (*quic_connected_cb) (session_connected_msg_t * mp, u32 session_index);
24   void (*client_stream_connected_cb) (session_connected_msg_t * mp,
25                                       u32 session_index);
26   void (*server_stream_connected_cb) (session_connected_msg_t * mp,
27                                       u32 session_index);
28   void (*quic_accepted_cb) (session_accepted_msg_t * mp, u32 session_index);
29   void (*client_stream_accepted_cb) (session_accepted_msg_t * mp,
30                                      u32 session_index);
31   void (*server_stream_accepted_cb) (session_accepted_msg_t * mp,
32                                      u32 session_index);
33 } quic_echo_cb_vft_t;
34
35 typedef struct
36 {
37   quic_echo_cb_vft_t cb_vft;    /* cb vft for QUIC scenarios */
38   u8 send_quic_disconnects;     /* actively send disconnect */
39   u32 n_stream_clients;         /* Target Number of STREAM sessions per QUIC session */
40   volatile u32 n_quic_clients_connected;        /* Number of connected QUIC sessions */
41 } quic_echo_proto_main_t;
42
43 quic_echo_proto_main_t quic_echo_proto_main;
44
45 /*
46  *
47  *  ECHO Callback definitions
48  *
49  */
50
51 static void
52 quic_echo_on_connected_connect (session_connected_msg_t * mp,
53                                 u32 session_index)
54 {
55   echo_main_t *em = &echo_main;
56   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
57   u64 i;
58
59   echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
60   for (i = 0; i < eqm->n_stream_clients; i++)
61     echo_send_rpc (em, echo_send_connect, (void *) mp->handle, session_index);
62
63   ECHO_LOG (0, "Qsession 0x%llx connected to %U:%d",
64             mp->handle, format_ip46_address, &mp->lcl.ip,
65             mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port));
66 }
67
68 static void
69 quic_echo_on_connected_send (session_connected_msg_t * mp, u32 session_index)
70 {
71   static u32 client_index = 0;
72   echo_main_t *em = &echo_main;
73   echo_session_t *session;
74
75   session = pool_elt_at_index (em->sessions, session_index);
76   session->bytes_to_send = em->bytes_to_send;
77   session->bytes_to_receive = em->bytes_to_receive;
78   session->session_state = ECHO_SESSION_STATE_READY;
79   em->data_thread_args[client_index++] = session->session_index;
80 }
81
82 static void
83 quic_echo_on_connected_error (session_connected_msg_t * mp, u32 session_index)
84 {
85   ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_CONNECT,
86              "Got a wrong connected on session %u [%lx]", session_index,
87              mp->handle);
88 }
89
90 static void
91 quic_echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index)
92 {
93   static u32 client_index = 0;
94   echo_main_t *em = &echo_main;
95   echo_session_t *session;
96
97   session = pool_elt_at_index (em->sessions, session_index);
98   session->bytes_to_send = em->bytes_to_send;
99   session->bytes_to_receive = em->bytes_to_receive;
100   em->data_thread_args[client_index++] = session->session_index;
101   session->session_state = ECHO_SESSION_STATE_READY;
102 }
103
104 static void
105 quic_echo_on_accept_connect (session_accepted_msg_t * mp, u32 session_index)
106 {
107   echo_main_t *em = &echo_main;
108   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
109   ECHO_LOG (1, "Accept on QSession 0x%lx %u", mp->handle);
110   u32 i;
111
112   echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
113   for (i = 0; i < eqm->n_stream_clients; i++)
114     echo_send_rpc (em, echo_send_connect, (void *) mp->handle, session_index);
115 }
116
117 static void
118 quic_echo_on_accept_error (session_accepted_msg_t * mp, u32 session_index)
119 {
120   ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_ACCEPT,
121              "Got a wrong accept on session %u [%lx]", session_index,
122              mp->handle);
123 }
124
125 static void
126 quic_echo_on_accept_log_ip (session_accepted_msg_t * mp, u32 session_index)
127 {
128   u8 *ip_str;
129   ip_str = format (0, "%U", format_ip46_address, &mp->rmt.ip, mp->rmt.is_ip4);
130   ECHO_LOG (0, "Accepted session from: %s:%d", ip_str,
131             clib_net_to_host_u16 (mp->rmt.port));
132
133 }
134
135 static const quic_echo_cb_vft_t default_cb_vft = {
136   /* Qsessions */
137   .quic_accepted_cb = quic_echo_on_accept_log_ip,
138   .quic_connected_cb = quic_echo_on_connected_connect,
139   /* client initiated streams */
140   .server_stream_accepted_cb = quic_echo_on_accept_recv,
141   .client_stream_connected_cb = quic_echo_on_connected_send,
142   /* server initiated streams */
143   .client_stream_accepted_cb = quic_echo_on_accept_error,
144   .server_stream_connected_cb = quic_echo_on_connected_error,
145 };
146
147 static const quic_echo_cb_vft_t server_stream_cb_vft = {
148   /* Qsessions */
149   .quic_accepted_cb = quic_echo_on_accept_connect,
150   .quic_connected_cb = NULL,
151   /* client initiated streams */
152   .server_stream_accepted_cb = quic_echo_on_accept_error,
153   .client_stream_connected_cb = quic_echo_on_connected_error,
154   /* server initiated streams */
155   .client_stream_accepted_cb = quic_echo_on_accept_recv,
156   .server_stream_connected_cb = quic_echo_on_connected_send,
157 };
158
159 static void quic_echo_cleanup_cb (echo_session_t * s, u8 parent_died);
160
161 static inline void
162 quic_echo_cleanup_listener (u32 listener_index, echo_main_t * em,
163                             quic_echo_proto_main_t * eqm)
164 {
165   echo_session_t *ls;
166   ls = pool_elt_at_index (em->sessions, listener_index);
167   ASSERT (ls->session_type == ECHO_SESSION_TYPE_QUIC);
168   if (!clib_atomic_sub_fetch (&ls->accepted_session_count, 1))
169     {
170       if (eqm->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE)
171         {
172           echo_send_rpc (em, echo_send_disconnect_session,
173                          (void *) ls->vpp_session_handle, 0);
174           clib_atomic_fetch_add (&em->stats.active_count.q, 1);
175         }
176       else if (eqm->send_quic_disconnects == ECHO_CLOSE_F_NONE)
177         {
178           quic_echo_cleanup_cb (ls, 0 /* parent_died */ );
179           clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
180         }
181     }
182 }
183
184 static void
185 quic_echo_cleanup_cb (echo_session_t * s, u8 parent_died)
186 {
187   echo_main_t *em = &echo_main;
188   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
189   ASSERT (s->session_state < ECHO_SESSION_STATE_CLOSED);
190   if (s->session_type == ECHO_SESSION_TYPE_QUIC)
191     {
192       if (parent_died)
193         clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
194       /* Don't cleanup listener as it's handled by main() */
195       clib_atomic_sub_fetch (&eqm->n_quic_clients_connected, 1);
196     }
197   else if (s->session_type == ECHO_SESSION_TYPE_STREAM)
198     {
199       if (parent_died)
200         clib_atomic_fetch_add (&em->stats.clean_count.s, 1);
201       else
202         quic_echo_cleanup_listener (s->listener_index, em, eqm);
203       clib_atomic_sub_fetch (&em->n_clients_connected, 1);
204     }
205
206   ECHO_LOG (1, "Cleanup sessions (still %uQ %uS)",
207             eqm->n_quic_clients_connected, em->n_clients_connected);
208   s->session_state = ECHO_SESSION_STATE_CLOSED;
209   if (!em->n_clients_connected && !eqm->n_quic_clients_connected)
210     em->state = STATE_DATA_DONE;
211 }
212
213 static void
214 quic_echo_initiate_qsession_close_no_stream (echo_main_t * em)
215 {
216   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
217   ECHO_LOG (1, "Closing Qsessions");
218   /* Close Quic session without streams */
219   echo_session_t *s;
220
221   /* *INDENT-OFF* */
222   pool_foreach (s, em->sessions,
223   ({
224     if (s->session_type == ECHO_SESSION_TYPE_QUIC)
225       {
226         if (eqm->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE)
227           {
228             ECHO_LOG (1,"ACTIVE close 0x%lx", s->vpp_session_handle);
229             echo_send_rpc (em, echo_send_disconnect_session, (void *) s->vpp_session_handle, 0);
230             clib_atomic_fetch_add (&em->stats.active_count.q, 1);
231           }
232         else if (eqm->send_quic_disconnects == ECHO_CLOSE_F_NONE)
233           {
234             ECHO_LOG (1,"Discard close 0x%lx", s->vpp_session_handle);
235             quic_echo_cleanup_cb (s, 0 /* parent_died */);
236             clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
237           }
238         else
239           ECHO_LOG (1,"Passive close 0x%lx", s->vpp_session_handle);
240       }
241   }));
242   /* *INDENT-ON* */
243 }
244
245 static void
246 quic_echo_on_connected (session_connected_msg_t * mp, u32 session_index)
247 {
248   echo_main_t *em = &echo_main;
249   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
250   echo_session_t *listen_session;
251   echo_session_t *session = pool_elt_at_index (em->sessions, session_index);
252   if (session->listener_index == SESSION_INVALID_INDEX)
253     {
254       ECHO_LOG (1, "Connected session 0x%lx -> URI", mp->handle);
255       session->session_type = ECHO_SESSION_TYPE_QUIC;
256       session->accepted_session_count = 0;
257       if (eqm->cb_vft.quic_connected_cb)
258         eqm->cb_vft.quic_connected_cb (mp, session->session_index);
259       clib_atomic_fetch_add (&eqm->n_quic_clients_connected, 1);
260     }
261   else
262     {
263       listen_session =
264         pool_elt_at_index (em->sessions, session->listener_index);
265       ECHO_LOG (1, "Connected session 0x%lx -> 0x%lx", mp->handle,
266                 listen_session->vpp_session_handle);
267       session->session_type = ECHO_SESSION_TYPE_STREAM;
268       clib_atomic_fetch_add (&listen_session->accepted_session_count, 1);
269       if (em->i_am_master && eqm->cb_vft.server_stream_connected_cb)
270         eqm->cb_vft.server_stream_connected_cb (mp, session->session_index);
271       if (!em->i_am_master && eqm->cb_vft.client_stream_connected_cb)
272         eqm->cb_vft.client_stream_connected_cb (mp, session->session_index);
273       clib_atomic_fetch_add (&em->n_clients_connected, 1);
274     }
275
276   if (em->n_clients_connected == em->n_clients
277       && em->n_clients_connected != 0)
278     echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
279
280   if (eqm->n_quic_clients_connected == em->n_connects
281       && em->state < STATE_READY)
282     {
283       echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED);
284       em->state = STATE_READY;
285       if (eqm->n_stream_clients == 0)
286         quic_echo_initiate_qsession_close_no_stream (em);
287     }
288 }
289
290 static void
291 quic_echo_retry_connect (u32 session_index)
292 {
293   /* retry connect */
294   echo_session_t *session;
295   echo_main_t *em = &echo_main;
296   if (session_index == SESSION_INVALID_INDEX)
297     {
298       ECHO_LOG (1, "Retrying connect %s", em->uri);
299       echo_send_rpc (em, echo_send_connect, (void *) SESSION_INVALID_HANDLE,
300                      SESSION_INVALID_INDEX);
301     }
302   else
303     {
304       session = pool_elt_at_index (em->sessions, session_index);
305       ECHO_LOG (1, "Retrying connect 0x%lx", session->vpp_session_handle);
306       echo_send_rpc (em, echo_send_connect,
307                      (void *) session->vpp_session_handle, session_index);
308     }
309 }
310
311 static void
312 quic_echo_connected_cb (session_connected_bundled_msg_t * mp,
313                         u32 session_index, u8 is_failed)
314 {
315   if (is_failed)
316     return quic_echo_retry_connect (session_index);
317   return quic_echo_on_connected ((session_connected_msg_t *) mp,
318                                  session_index);
319 }
320
321 static void
322 quic_echo_accepted_cb (session_accepted_msg_t * mp, echo_session_t * session)
323 {
324   echo_main_t *em = &echo_main;
325   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
326   echo_session_t *ls;
327   ls = pool_elt_at_index (em->sessions, session->listener_index);
328   if (ls->session_type == ECHO_SESSION_TYPE_LISTEN)
329     {
330       echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT);
331       session->session_type = ECHO_SESSION_TYPE_QUIC;
332       session->accepted_session_count = 0;
333       if (eqm->cb_vft.quic_accepted_cb)
334         eqm->cb_vft.quic_accepted_cb (mp, session->session_index);
335       clib_atomic_fetch_add (&eqm->n_quic_clients_connected, 1);
336     }
337   else
338     {
339       session->session_type = ECHO_SESSION_TYPE_STREAM;
340       echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
341       clib_atomic_fetch_add (&ls->accepted_session_count, 1);
342       if (em->i_am_master && eqm->cb_vft.server_stream_accepted_cb)
343         eqm->cb_vft.server_stream_accepted_cb (mp, session->session_index);
344       if (!em->i_am_master && eqm->cb_vft.client_stream_accepted_cb)
345         eqm->cb_vft.client_stream_accepted_cb (mp, session->session_index);
346       clib_atomic_fetch_add (&em->n_clients_connected, 1);
347     }
348
349   if (em->n_clients_connected == em->n_clients
350       && em->n_clients_connected != 0)
351     echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
352
353   if (eqm->n_quic_clients_connected == em->n_connects
354       && em->state < STATE_READY)
355     {
356       echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED);
357       em->state = STATE_READY;
358       if (eqm->n_stream_clients == 0)
359         quic_echo_initiate_qsession_close_no_stream (em);
360     }
361 }
362
363 static void
364 quic_echo_sent_disconnect_cb (echo_session_t * s)
365 {
366   if (s->session_type == ECHO_SESSION_TYPE_STREAM)
367     s->session_state = ECHO_SESSION_STATE_CLOSING;
368   else
369     quic_echo_cleanup_cb (s, 0 /* parent_died */ );     /* We can clean Q/Lsessions right away */
370 }
371
372 static void
373 quic_echo_disconnected_cb (session_disconnected_msg_t * mp,
374                            echo_session_t * s)
375 {
376   echo_main_t *em = &echo_main;
377   if (s->session_type == ECHO_SESSION_TYPE_STREAM)
378     {
379       echo_session_print_stats (em, s);
380       if (s->bytes_to_receive || s->bytes_to_send)
381         s->session_state = ECHO_SESSION_STATE_AWAIT_DATA;
382       else
383         s->session_state = ECHO_SESSION_STATE_CLOSING;
384       clib_atomic_fetch_add (&em->stats.close_count.s, 1);
385     }
386   else
387     {
388       quic_echo_cleanup_cb (s, 0 /* parent_died */ );   /* We can clean Q/Lsessions right away */
389       clib_atomic_fetch_add (&em->stats.close_count.q, 1);
390     }
391 }
392
393 static void
394 quic_echo_reset_cb (session_reset_msg_t * mp, echo_session_t * s)
395 {
396   echo_main_t *em = &echo_main;
397   if (s->session_type == ECHO_SESSION_TYPE_STREAM)
398     {
399       clib_atomic_fetch_add (&em->stats.reset_count.s, 1);
400       s->session_state = ECHO_SESSION_STATE_CLOSING;
401     }
402   else
403     {
404       clib_atomic_fetch_add (&em->stats.reset_count.q, 1);
405       quic_echo_cleanup_cb (s, 0 /* parent_died */ );   /* We can clean Q/Lsessions right away */
406     }
407 }
408
409 static uword
410 quic_echo_unformat_setup_vft (unformat_input_t * input, va_list * args)
411 {
412   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
413   if (unformat (input, "serverstream"))
414     eqm->cb_vft = server_stream_cb_vft;
415   else if (unformat (input, "default"))
416     ;
417   else
418     return 0;
419   return 1;
420 }
421
422 static int
423 quic_echo_process_opts_cb (unformat_input_t * a)
424 {
425   echo_main_t *em = &echo_main;
426   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
427   if (unformat (a, "nclients %d/%d", &em->n_clients, &eqm->n_stream_clients))
428     ;
429   else if (unformat (a, "quic-setup %U", quic_echo_unformat_setup_vft))
430     ;
431   else if (unformat (a, "qclose=%U",
432                      echo_unformat_close, &eqm->send_quic_disconnects))
433     ;
434   else
435     return 0;
436   return 1;
437 }
438
439 static void
440 quic_echo_set_defaults_before_opts_cb ()
441 {
442   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
443   eqm->cb_vft = default_cb_vft;
444   eqm->n_stream_clients = 1;
445 }
446
447 static void
448 quic_echo_set_defaults_after_opts_cb ()
449 {
450   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
451   echo_main_t *em = &echo_main;
452   u8 default_f_active;
453
454   if (em->crypto_ctx_engine == TLS_ENGINE_NONE)
455     em->crypto_ctx_engine = CRYPTO_ENGINE_PICOTLS;
456   em->n_connects = em->n_clients;
457   em->n_sessions =
458     clib_max (1, eqm->n_stream_clients) * em->n_clients + em->n_clients + 1;
459   em->n_clients = eqm->n_stream_clients * em->n_clients;
460
461   if (em->i_am_master)
462     default_f_active =
463       em->bytes_to_send == 0 ? ECHO_CLOSE_F_ACTIVE : ECHO_CLOSE_F_PASSIVE;
464   else
465     default_f_active =
466       em->bytes_to_receive == 0 ? ECHO_CLOSE_F_PASSIVE : ECHO_CLOSE_F_ACTIVE;
467   if (eqm->send_quic_disconnects == ECHO_CLOSE_F_INVALID)
468     eqm->send_quic_disconnects = default_f_active;
469 }
470
/*
 * print_usage_cb entry of the QUIC proto vft: write the QUIC specific
 * option help to stderr.
 */
static void
quic_echo_print_usage_cb ()
{
  /* Contains no conversion specification, so it is emitted verbatim */
  static const char quic_usage[] =
    "-- QUIC specific options -- \n"
    "  quic-setup OPT      OPT=serverstream : Client open N connections. \n"
    "                       On each one server opens M streams\n"
    "                      OPT=default : Client open N connections.\n"
    "                       On each one client opens M streams\n"
    "  qclose=[Y|N|W]      When a connection is done pass[N] send[Y] or wait[W] for close\n"
    "\n"
    "  nclients N[/M]      Open N QUIC connections, each one with M streams (M defaults to 1)\n";

  fputs (quic_usage, stderr);
}
484
485 echo_proto_cb_vft_t quic_echo_proto_cb_vft = {
486   .disconnected_cb = quic_echo_disconnected_cb,
487   .connected_cb = quic_echo_connected_cb,
488   .accepted_cb = quic_echo_accepted_cb,
489   .reset_cb = quic_echo_reset_cb,
490   .sent_disconnect_cb = quic_echo_sent_disconnect_cb,
491   .cleanup_cb = quic_echo_cleanup_cb,
492   .process_opts_cb = quic_echo_process_opts_cb,
493   .print_usage_cb = quic_echo_print_usage_cb,
494   .set_defaults_before_opts_cb = quic_echo_set_defaults_before_opts_cb,
495   .set_defaults_after_opts_cb = quic_echo_set_defaults_after_opts_cb,
496 };
497
498 ECHO_REGISTER_PROTO (TRANSPORT_PROTO_QUIC, quic_echo_proto_cb_vft);
499
500 /*
501  * fd.io coding-style-patch-verification: ON
502  *
503  * Local Variables:
504  * eval: (c-set-style "gnu")
505  * End:
506  */