quic: Add support for unidirectional streams
[vpp.git] / src / plugins / hs_apps / sapi / vpp_echo_proto_quic.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <signal.h>
18
19 #include <hs_apps/sapi/vpp_echo_common.h>
20
21 typedef struct _quic_echo_cb_vft
22 {
23   void (*quic_connected_cb) (session_connected_msg_t * mp, u32 session_index);
24   void (*client_stream_connected_cb) (session_connected_msg_t * mp,
25                                       u32 session_index);
26   void (*server_stream_connected_cb) (session_connected_msg_t * mp,
27                                       u32 session_index);
28   void (*quic_accepted_cb) (session_accepted_msg_t * mp, u32 session_index);
29   void (*client_stream_accepted_cb) (session_accepted_msg_t * mp,
30                                      u32 session_index);
31   void (*server_stream_accepted_cb) (session_accepted_msg_t * mp,
32                                      u32 session_index);
33 } quic_echo_cb_vft_t;
34
35 typedef struct
36 {
37   quic_echo_cb_vft_t cb_vft;    /* cb vft for QUIC scenarios */
38   u8 send_quic_disconnects;     /* actively send disconnect */
39   u32 n_stream_clients;         /* Target Number of STREAM sessions per QUIC session */
40   volatile u32 n_quic_clients_connected;        /* Number of connected QUIC sessions */
41 } quic_echo_proto_main_t;
42
43 quic_echo_proto_main_t quic_echo_proto_main;
44
45 /*
46  *
47  *  ECHO Callback definitions
48  *
49  */
50
51 static void
52 quic_echo_on_connected_connect (session_connected_msg_t * mp,
53                                 u32 session_index)
54 {
55   echo_main_t *em = &echo_main;
56   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
57   echo_connect_args_t _a, *a = &_a;
58   u64 i;
59
60   a->parent_session_handle = mp->handle;
61   a->context = session_index;
62   clib_memcpy_fast (&a->lcl_ip, &em->lcl_ip, sizeof (ip46_address_t));
63   clib_memcpy_fast (&a->ip, &em->uri_elts.ip, sizeof (ip46_address_t));
64
65   echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
66   for (i = 0; i < eqm->n_stream_clients; i++)
67     echo_send_rpc (em, echo_send_connect, (echo_rpc_args_t *) a);
68
69   ECHO_LOG (1, "Qsession 0x%llx S[%d] connected to %U:%d",
70             mp->handle, session_index, format_ip46_address, &mp->lcl.ip,
71             mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port));
72 }
73
74 static void
75 quic_echo_on_connected_send (session_connected_msg_t * mp, u32 session_index)
76 {
77   static u32 client_index = 0;
78   echo_main_t *em = &echo_main;
79   echo_session_t *session;
80
81   session = pool_elt_at_index (em->sessions, session_index);
82   session->bytes_to_send = em->bytes_to_send;
83   session->bytes_to_receive = em->bytes_to_receive;
84   session->session_state = ECHO_SESSION_STATE_READY;
85   em->data_thread_args[client_index++] = session->session_index;
86 }
87
88 static void
89 quic_echo_on_connected_error (session_connected_msg_t * mp, u32 session_index)
90 {
91   ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_CONNECT,
92              "Got a wrong connected on session %u [%lx]", session_index,
93              mp->handle);
94 }
95
96 static void
97 quic_echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index)
98 {
99   static u32 client_index = 0;
100   echo_main_t *em = &echo_main;
101   echo_session_t *session;
102
103   session = pool_elt_at_index (em->sessions, session_index);
104   session->bytes_to_send = em->bytes_to_send;
105   session->bytes_to_receive = em->bytes_to_receive;
106   em->data_thread_args[client_index++] = session->session_index;
107   session->session_state = ECHO_SESSION_STATE_READY;
108 }
109
110 static void
111 quic_echo_on_accept_connect (session_accepted_msg_t * mp, u32 session_index)
112 {
113   echo_main_t *em = &echo_main;
114   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
115   ECHO_LOG (2, "Accept on QSession 0x%lx S[%u]", mp->handle, session_index);
116   echo_connect_args_t _a, *a = &_a;
117   u32 i;
118
119   a->parent_session_handle = mp->handle;
120   a->context = session_index;
121   clib_memcpy_fast (&a->lcl_ip, &em->lcl_ip, sizeof (ip46_address_t));
122   clib_memcpy_fast (&a->ip, &em->uri_elts.ip, sizeof (ip46_address_t));
123
124   echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
125   for (i = 0; i < eqm->n_stream_clients; i++)
126     echo_send_rpc (em, echo_send_connect, (echo_rpc_args_t *) a);
127 }
128
129 static void
130 quic_echo_on_accept_error (session_accepted_msg_t * mp, u32 session_index)
131 {
132   ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_ACCEPT,
133              "Got a wrong accept on session 0x%lx S[%u]", mp->handle,
134              session_index);
135 }
136
137 static void
138 quic_echo_on_accept_log_ip (session_accepted_msg_t * mp, u32 session_index)
139 {
140   u8 *ip_str;
141   ip_str = format (0, "%U", format_ip46_address, &mp->rmt.ip, mp->rmt.is_ip4);
142   ECHO_LOG (1, "Accepted session from: %s:%d", ip_str,
143             clib_net_to_host_u16 (mp->rmt.port));
144
145 }
146
147 static const quic_echo_cb_vft_t default_cb_vft = {
148   /* Qsessions */
149   .quic_accepted_cb = quic_echo_on_accept_log_ip,
150   .quic_connected_cb = quic_echo_on_connected_connect,
151   /* client initiated streams */
152   .server_stream_accepted_cb = quic_echo_on_accept_recv,
153   .client_stream_connected_cb = quic_echo_on_connected_send,
154   /* server initiated streams */
155   .client_stream_accepted_cb = quic_echo_on_accept_error,
156   .server_stream_connected_cb = quic_echo_on_connected_error,
157 };
158
159 static const quic_echo_cb_vft_t server_stream_cb_vft = {
160   /* Qsessions */
161   .quic_accepted_cb = quic_echo_on_accept_connect,
162   .quic_connected_cb = NULL,
163   /* client initiated streams */
164   .server_stream_accepted_cb = quic_echo_on_accept_error,
165   .client_stream_connected_cb = quic_echo_on_connected_error,
166   /* server initiated streams */
167   .client_stream_accepted_cb = quic_echo_on_accept_recv,
168   .server_stream_connected_cb = quic_echo_on_connected_send,
169 };
170
171 static void quic_echo_cleanup_cb (echo_session_t * s, u8 parent_died);
172
173 static inline void
174 quic_echo_cleanup_listener (u32 listener_index, echo_main_t * em,
175                             quic_echo_proto_main_t * eqm)
176 {
177   echo_session_t *ls;
178   ls = pool_elt_at_index (em->sessions, listener_index);
179   if (ls->session_type != ECHO_SESSION_TYPE_QUIC)
180     {
181       ECHO_LOG (2, "%U: Invalid listener session type",
182                 echo_format_session, ls);
183       return;
184     }
185   if (!clib_atomic_sub_fetch (&ls->accepted_session_count, 1))
186     {
187       if (eqm->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE)
188         {
189           echo_send_rpc (em, echo_send_disconnect_session,
190                          (echo_rpc_args_t *) & ls->vpp_session_handle);
191           clib_atomic_fetch_add (&em->stats.active_count.q, 1);
192         }
193       else if (eqm->send_quic_disconnects == ECHO_CLOSE_F_NONE)
194         {
195           quic_echo_cleanup_cb (ls, 0 /* parent_died */ );
196           clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
197         }
198     }
199 }
200
201 static void
202 quic_echo_cleanup_cb (echo_session_t * s, u8 parent_died)
203 {
204   echo_main_t *em = &echo_main;
205   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
206   if ((em->state == STATE_DATA_DONE) ||
207       !(s->session_state < ECHO_SESSION_STATE_CLOSED))
208     return;
209   ECHO_LOG (3, "%U cleanup (parent_died %d)", echo_format_session, s,
210             parent_died);
211   s->session_state = ECHO_SESSION_STATE_CLOSED;
212   if (s->session_type == ECHO_SESSION_TYPE_QUIC)
213     {
214       if (parent_died)
215         clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
216       /* Don't cleanup listener as it's handled by main() */
217       clib_atomic_sub_fetch (&eqm->n_quic_clients_connected, 1);
218     }
219   else if (s->session_type == ECHO_SESSION_TYPE_STREAM)
220     {
221       if (parent_died)
222         clib_atomic_fetch_add (&em->stats.clean_count.s, 1);
223       else
224         quic_echo_cleanup_listener (s->listener_index, em, eqm);
225       clib_atomic_sub_fetch (&em->n_clients_connected, 1);
226     }
227   if (!em->n_clients_connected && !eqm->n_quic_clients_connected)
228     em->state = STATE_DATA_DONE;
229   ECHO_LOG (2, "Cleanup sessions (still %uQ %uS): app %U",
230             eqm->n_quic_clients_connected, em->n_clients_connected,
231             echo_format_app_state, em->state);
232 }
233
234 static void
235 quic_echo_initiate_qsession_close_no_stream (echo_main_t * em)
236 {
237   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
238   ECHO_LOG (2, "Closing Qsessions");
239   /* Close Quic session without streams */
240   echo_session_t *s;
241
242   /* *INDENT-OFF* */
243   pool_foreach (s, em->sessions,
244   ({
245     if (s->session_type == ECHO_SESSION_TYPE_QUIC)
246       {
247         if (eqm->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE)
248           {
249             ECHO_LOG (2,"%U: ACTIVE close", echo_format_session, s);
250             echo_send_rpc (em, echo_send_disconnect_session,
251                            (echo_rpc_args_t *) &s->vpp_session_handle);
252             clib_atomic_fetch_add (&em->stats.active_count.q, 1);
253           }
254         else if (eqm->send_quic_disconnects == ECHO_CLOSE_F_NONE)
255           {
256             ECHO_LOG (2,"%U: CLEAN close", echo_format_session, s);
257             quic_echo_cleanup_cb (s, 0 /* parent_died */);
258             clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
259           }
260         else
261           ECHO_LOG (2,"%U: PASSIVE close", echo_format_session, s);
262       }
263   }));
264   /* *INDENT-ON* */
265 }
266
267 static void
268 quic_echo_on_connected (session_connected_msg_t * mp, u32 session_index)
269 {
270   echo_main_t *em = &echo_main;
271   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
272   echo_session_t *listen_session;
273   echo_session_t *session = pool_elt_at_index (em->sessions, session_index);
274
275   if (session->listener_index == SESSION_INVALID_INDEX)
276     {
277       clib_atomic_fetch_add (&em->stats.connected_count.q, 1);
278       session->session_type = ECHO_SESSION_TYPE_QUIC;
279       ECHO_LOG (2, "Connected %U -> URI", echo_format_session, session);
280       session->accepted_session_count = 0;
281       if (eqm->cb_vft.quic_connected_cb)
282         eqm->cb_vft.quic_connected_cb (mp, session->session_index);
283       clib_atomic_fetch_add (&eqm->n_quic_clients_connected, 1);
284
285       if (em->stats.connected_count.q % LOGGING_BATCH == 0)
286         ECHO_LOG (0, "Connected Q %d / %d", em->stats.connected_count.q,
287                   em->n_connects);
288     }
289   else
290     {
291       clib_atomic_fetch_add (&em->stats.connected_count.s, 1);
292       listen_session =
293         pool_elt_at_index (em->sessions, session->listener_index);
294       session->session_type = ECHO_SESSION_TYPE_STREAM;
295       clib_atomic_fetch_add (&listen_session->accepted_session_count, 1);
296       ECHO_LOG (2, "Connected %U -> %U", echo_format_session, session,
297                 echo_format_session, listen_session);
298       if (em->i_am_master && eqm->cb_vft.server_stream_connected_cb)
299         eqm->cb_vft.server_stream_connected_cb (mp, session->session_index);
300       if (!em->i_am_master && eqm->cb_vft.client_stream_connected_cb)
301         eqm->cb_vft.client_stream_connected_cb (mp, session->session_index);
302       clib_atomic_fetch_add (&em->n_clients_connected, 1);
303
304       if (em->stats.connected_count.s % LOGGING_BATCH == 0)
305         ECHO_LOG (0, "Connected S %d / %d", em->stats.connected_count.s,
306                   em->n_clients);
307     }
308
309
310   if (em->n_clients_connected == em->n_clients
311       && em->n_clients_connected != 0)
312     echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
313
314   if (eqm->n_quic_clients_connected == em->n_connects
315       && em->state < STATE_READY)
316     {
317       echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED);
318       em->state = STATE_READY;
319       if (eqm->n_stream_clients == 0)
320         quic_echo_initiate_qsession_close_no_stream (em);
321     }
322 }
323
324 static void
325 quic_echo_connected_cb (session_connected_bundled_msg_t * mp,
326                         u32 session_index, u8 is_failed)
327 {
328   if (is_failed)
329     {
330       ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_CONNECT, "Echo connect failed");
331       return;
332     }
333   return quic_echo_on_connected ((session_connected_msg_t *) mp,
334                                  session_index);
335 }
336
337 static void
338 quic_echo_accepted_cb (session_accepted_msg_t * mp, echo_session_t * session)
339 {
340   echo_main_t *em = &echo_main;
341   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
342   echo_session_t *ls;
343   ls = pool_elt_at_index (em->sessions, session->listener_index);
344   if (ls->session_type == ECHO_SESSION_TYPE_LISTEN)
345     {
346       clib_atomic_fetch_add (&em->stats.accepted_count.q, 1);
347       echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT);
348       session->session_type = ECHO_SESSION_TYPE_QUIC;
349       session->accepted_session_count = 0;
350       if (eqm->cb_vft.quic_accepted_cb)
351         eqm->cb_vft.quic_accepted_cb (mp, session->session_index);
352       clib_atomic_fetch_add (&eqm->n_quic_clients_connected, 1);
353
354       if (em->stats.accepted_count.q % LOGGING_BATCH == 0)
355         ECHO_LOG (0, "Accepted Q %d / %d", em->stats.accepted_count.q,
356                   em->n_connects);
357     }
358   else
359     {
360       clib_atomic_fetch_add (&em->stats.accepted_count.s, 1);
361       session->session_type = ECHO_SESSION_TYPE_STREAM;
362       echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
363       clib_atomic_fetch_add (&ls->accepted_session_count, 1);
364       if (em->i_am_master && eqm->cb_vft.server_stream_accepted_cb)
365         eqm->cb_vft.server_stream_accepted_cb (mp, session->session_index);
366       if (!em->i_am_master && eqm->cb_vft.client_stream_accepted_cb)
367         eqm->cb_vft.client_stream_accepted_cb (mp, session->session_index);
368       clib_atomic_fetch_add (&em->n_clients_connected, 1);
369
370       if (em->stats.accepted_count.s % LOGGING_BATCH == 0)
371         ECHO_LOG (0, "Accepted S %d / %d", em->stats.accepted_count.s,
372                   em->n_clients);
373
374       if (em->connect_flag && !(mp->flags & em->connect_flag))
375         {
376           ECHO_FAIL (ECHO_FAIL_UNIDIRECTIONAL,
377                      "expected unidirectional streams");
378         }
379     }
380
381   if (em->n_clients_connected == em->n_clients
382       && em->n_clients_connected != 0)
383     echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
384
385   if (eqm->n_quic_clients_connected == em->n_connects
386       && em->state < STATE_READY)
387     {
388       echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED);
389       em->state = STATE_READY;
390       if (eqm->n_stream_clients == 0)
391         quic_echo_initiate_qsession_close_no_stream (em);
392     }
393 }
394
395 static void
396 quic_echo_sent_disconnect_cb (echo_session_t * s)
397 {
398   if (s->session_type == ECHO_SESSION_TYPE_STREAM)
399     s->session_state = ECHO_SESSION_STATE_CLOSING;
400   else
401     quic_echo_cleanup_cb (s, 0 /* parent_died */ );     /* We can clean Q/Lsessions right away */
402 }
403
404 static void
405 quic_echo_disconnected_cb (session_disconnected_msg_t * mp,
406                            echo_session_t * s)
407 {
408   echo_main_t *em = &echo_main;
409   if (s->session_type == ECHO_SESSION_TYPE_STREAM)
410     {
411       echo_session_print_stats (em, s);
412       if (s->bytes_to_receive || s->bytes_to_send)
413         s->session_state = ECHO_SESSION_STATE_AWAIT_DATA;
414       else
415         s->session_state = ECHO_SESSION_STATE_CLOSING;
416       clib_atomic_fetch_add (&em->stats.close_count.s, 1);
417     }
418   else
419     {
420       quic_echo_cleanup_cb (s, 0 /* parent_died */ );   /* We can clean Q/Lsessions right away */
421       clib_atomic_fetch_add (&em->stats.close_count.q, 1);
422     }
423 }
424
425 static void
426 quic_echo_reset_cb (session_reset_msg_t * mp, echo_session_t * s)
427 {
428   echo_main_t *em = &echo_main;
429   if (s->session_type == ECHO_SESSION_TYPE_STREAM)
430     {
431       clib_atomic_fetch_add (&em->stats.reset_count.s, 1);
432       s->session_state = ECHO_SESSION_STATE_CLOSING;
433     }
434   else
435     {
436       clib_atomic_fetch_add (&em->stats.reset_count.q, 1);
437       quic_echo_cleanup_cb (s, 0 /* parent_died */ );   /* We can clean Q/Lsessions right away */
438     }
439 }
440
441 static uword
442 quic_echo_unformat_setup_vft (unformat_input_t * input, va_list * args)
443 {
444   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
445   if (unformat (input, "serverstream"))
446     eqm->cb_vft = server_stream_cb_vft;
447   else if (unformat (input, "default"))
448     ;
449   else
450     return 0;
451   return 1;
452 }
453
454 static int
455 quic_echo_process_opts_cb (unformat_input_t * a)
456 {
457   echo_main_t *em = &echo_main;
458   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
459   if (unformat (a, "quic-streams %d", &eqm->n_stream_clients))
460     ;
461   else if (unformat (a, "quic-setup %U", quic_echo_unformat_setup_vft))
462     ;
463   else if (unformat (a, "uni"))
464     em->connect_flag = SESSION_F_UNIDIRECTIONAL;
465   else if (unformat (a, "qclose=%U",
466                      echo_unformat_close, &eqm->send_quic_disconnects))
467     ;
468   else
469     return 0;
470   return 1;
471 }
472
473 static void
474 quic_echo_set_defaults_before_opts_cb ()
475 {
476   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
477   eqm->cb_vft = default_cb_vft;
478   eqm->n_stream_clients = 1;
479 }
480
481 static void
482 quic_echo_set_defaults_after_opts_cb ()
483 {
484   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
485   echo_main_t *em = &echo_main;
486   u8 default_f_active;
487
488   em->n_connects = em->n_clients;
489   em->n_sessions =
490     clib_max (1,
491               eqm->n_stream_clients) * em->n_clients + em->n_clients +
492     em->n_uris;
493   em->n_clients = eqm->n_stream_clients * em->n_clients;
494
495   if (em->i_am_master)
496     default_f_active =
497       em->bytes_to_send == 0 ? ECHO_CLOSE_F_ACTIVE : ECHO_CLOSE_F_PASSIVE;
498   else
499     default_f_active =
500       em->bytes_to_receive == 0 ? ECHO_CLOSE_F_PASSIVE : ECHO_CLOSE_F_ACTIVE;
501   if (eqm->send_quic_disconnects == ECHO_CLOSE_F_INVALID)
502     eqm->send_quic_disconnects = default_f_active;
503 }
504
/* Print the help text for the QUIC specific options on stderr */
static void
quic_echo_print_usage_cb ()
{
  /* The text holds no conversion specifier, so it is written verbatim */
  static const char usage[] =
    "-- QUIC specific options -- \n"
    "  quic-setup OPT      OPT=serverstream : Client open N connections. \n"
    "                       On each one server opens M streams\n"
    "                      OPT=default : Client open N connections.\n"
    "                       On each one client opens M streams\n"
    "  qclose=[Y|N|W]      When connection is done send[Y]|nop[N]|wait[W] for close\n"
    "  uni                 Use unidirectional streams\n"
    "\n"
    "  quic-streams N      Open N QUIC streams (defaults to 1)\n";

  fputs (usage, stderr);
}
519
520 echo_proto_cb_vft_t quic_echo_proto_cb_vft = {
521   .disconnected_cb = quic_echo_disconnected_cb,
522   .connected_cb = quic_echo_connected_cb,
523   .accepted_cb = quic_echo_accepted_cb,
524   .reset_cb = quic_echo_reset_cb,
525   .sent_disconnect_cb = quic_echo_sent_disconnect_cb,
526   .cleanup_cb = quic_echo_cleanup_cb,
527   .process_opts_cb = quic_echo_process_opts_cb,
528   .print_usage_cb = quic_echo_print_usage_cb,
529   .set_defaults_before_opts_cb = quic_echo_set_defaults_before_opts_cb,
530   .set_defaults_after_opts_cb = quic_echo_set_defaults_after_opts_cb,
531 };
532
533 ECHO_REGISTER_PROTO (TRANSPORT_PROTO_QUIC, quic_echo_proto_cb_vft);
534
535 /*
536  * fd.io coding-style-patch-verification: ON
537  *
538  * Local Variables:
539  * eval: (c-set-style "gnu")
540  * End:
541  */