hsa: Improve for mq-intensive
[vpp.git] / src / plugins / hs_apps / sapi / vpp_echo_proto_quic.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <signal.h>
18
19 #include <hs_apps/sapi/vpp_echo_common.h>
20
21 typedef struct _quic_echo_cb_vft
22 {
23   void (*quic_connected_cb) (session_connected_msg_t * mp, u32 session_index);
24   void (*client_stream_connected_cb) (session_connected_msg_t * mp,
25                                       u32 session_index);
26   void (*server_stream_connected_cb) (session_connected_msg_t * mp,
27                                       u32 session_index);
28   void (*quic_accepted_cb) (session_accepted_msg_t * mp, u32 session_index);
29   void (*client_stream_accepted_cb) (session_accepted_msg_t * mp,
30                                      u32 session_index);
31   void (*server_stream_accepted_cb) (session_accepted_msg_t * mp,
32                                      u32 session_index);
33 } quic_echo_cb_vft_t;
34
35 typedef struct
36 {
37   quic_echo_cb_vft_t cb_vft;    /* cb vft for QUIC scenarios */
38   u8 send_quic_disconnects;     /* actively send disconnect */
39   u32 n_stream_clients;         /* Target Number of STREAM sessions per QUIC session */
40   volatile u32 n_quic_clients_connected;        /* Number of connected QUIC sessions */
41 } quic_echo_proto_main_t;
42
43 quic_echo_proto_main_t quic_echo_proto_main;
44
45 /*
46  *
47  *  ECHO Callback definitions
48  *
49  */
50
51 static void
52 quic_echo_on_connected_connect (session_connected_msg_t * mp,
53                                 u32 session_index)
54 {
55   echo_main_t *em = &echo_main;
56   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
57   u64 i;
58
59   echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
60   for (i = 0; i < eqm->n_stream_clients; i++)
61     echo_send_rpc (em, echo_send_connect, (void *) mp->handle, session_index);
62
63   ECHO_LOG (0, "Qsession 0x%llx S[%d] connected to %U:%d",
64             mp->handle, session_index, format_ip46_address, &mp->lcl.ip,
65             mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port));
66 }
67
68 static void
69 quic_echo_on_connected_send (session_connected_msg_t * mp, u32 session_index)
70 {
71   static u32 client_index = 0;
72   echo_main_t *em = &echo_main;
73   echo_session_t *session;
74
75   session = pool_elt_at_index (em->sessions, session_index);
76   session->bytes_to_send = em->bytes_to_send;
77   session->bytes_to_receive = em->bytes_to_receive;
78   session->session_state = ECHO_SESSION_STATE_READY;
79   em->data_thread_args[client_index++] = session->session_index;
80 }
81
82 static void
83 quic_echo_on_connected_error (session_connected_msg_t * mp, u32 session_index)
84 {
85   ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_CONNECT,
86              "Got a wrong connected on session %u [%lx]", session_index,
87              mp->handle);
88 }
89
90 static void
91 quic_echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index)
92 {
93   static u32 client_index = 0;
94   echo_main_t *em = &echo_main;
95   echo_session_t *session;
96
97   session = pool_elt_at_index (em->sessions, session_index);
98   session->bytes_to_send = em->bytes_to_send;
99   session->bytes_to_receive = em->bytes_to_receive;
100   em->data_thread_args[client_index++] = session->session_index;
101   session->session_state = ECHO_SESSION_STATE_READY;
102 }
103
104 static void
105 quic_echo_on_accept_connect (session_accepted_msg_t * mp, u32 session_index)
106 {
107   echo_main_t *em = &echo_main;
108   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
109   ECHO_LOG (1, "Accept on QSession 0x%lx S[%u]", mp->handle, session_index);
110   u32 i;
111
112   echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
113   for (i = 0; i < eqm->n_stream_clients; i++)
114     echo_send_rpc (em, echo_send_connect, (void *) mp->handle, session_index);
115 }
116
117 static void
118 quic_echo_on_accept_error (session_accepted_msg_t * mp, u32 session_index)
119 {
120   ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_ACCEPT,
121              "Got a wrong accept on session 0x%lx S[%u]", mp->handle,
122              session_index);
123 }
124
125 static void
126 quic_echo_on_accept_log_ip (session_accepted_msg_t * mp, u32 session_index)
127 {
128   u8 *ip_str;
129   ip_str = format (0, "%U", format_ip46_address, &mp->rmt.ip, mp->rmt.is_ip4);
130   ECHO_LOG (0, "Accepted session from: %s:%d", ip_str,
131             clib_net_to_host_u16 (mp->rmt.port));
132
133 }
134
135 static const quic_echo_cb_vft_t default_cb_vft = {
136   /* Qsessions */
137   .quic_accepted_cb = quic_echo_on_accept_log_ip,
138   .quic_connected_cb = quic_echo_on_connected_connect,
139   /* client initiated streams */
140   .server_stream_accepted_cb = quic_echo_on_accept_recv,
141   .client_stream_connected_cb = quic_echo_on_connected_send,
142   /* server initiated streams */
143   .client_stream_accepted_cb = quic_echo_on_accept_error,
144   .server_stream_connected_cb = quic_echo_on_connected_error,
145 };
146
147 static const quic_echo_cb_vft_t server_stream_cb_vft = {
148   /* Qsessions */
149   .quic_accepted_cb = quic_echo_on_accept_connect,
150   .quic_connected_cb = NULL,
151   /* client initiated streams */
152   .server_stream_accepted_cb = quic_echo_on_accept_error,
153   .client_stream_connected_cb = quic_echo_on_connected_error,
154   /* server initiated streams */
155   .client_stream_accepted_cb = quic_echo_on_accept_recv,
156   .server_stream_connected_cb = quic_echo_on_connected_send,
157 };
158
159 static void quic_echo_cleanup_cb (echo_session_t * s, u8 parent_died);
160
161 static inline void
162 quic_echo_cleanup_listener (u32 listener_index, echo_main_t * em,
163                             quic_echo_proto_main_t * eqm)
164 {
165   echo_session_t *ls;
166   ls = pool_elt_at_index (em->sessions, listener_index);
167   if (ls->session_type != ECHO_SESSION_TYPE_QUIC)
168     {
169       ECHO_LOG (1, "%U: Invalid listener session type",
170                 echo_format_session, ls);
171       return;
172     }
173   if (!clib_atomic_sub_fetch (&ls->accepted_session_count, 1))
174     {
175       if (eqm->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE)
176         {
177           echo_send_rpc (em, echo_send_disconnect_session,
178                          (void *) ls->vpp_session_handle, 0);
179           clib_atomic_fetch_add (&em->stats.active_count.q, 1);
180         }
181       else if (eqm->send_quic_disconnects == ECHO_CLOSE_F_NONE)
182         {
183           quic_echo_cleanup_cb (ls, 0 /* parent_died */ );
184           clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
185         }
186     }
187 }
188
189 static void
190 quic_echo_cleanup_cb (echo_session_t * s, u8 parent_died)
191 {
192   echo_main_t *em = &echo_main;
193   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
194   if ((em->state == STATE_DATA_DONE) ||
195       !(s->session_state < ECHO_SESSION_STATE_CLOSED))
196     return;
197   ECHO_LOG (2, "%U cleanup (parent_died %d)", echo_format_session, s,
198             parent_died);
199   s->session_state = ECHO_SESSION_STATE_CLOSED;
200   if (s->session_type == ECHO_SESSION_TYPE_QUIC)
201     {
202       if (parent_died)
203         clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
204       /* Don't cleanup listener as it's handled by main() */
205       clib_atomic_sub_fetch (&eqm->n_quic_clients_connected, 1);
206     }
207   else if (s->session_type == ECHO_SESSION_TYPE_STREAM)
208     {
209       if (parent_died)
210         clib_atomic_fetch_add (&em->stats.clean_count.s, 1);
211       else
212         quic_echo_cleanup_listener (s->listener_index, em, eqm);
213       clib_atomic_sub_fetch (&em->n_clients_connected, 1);
214     }
215   if (!em->n_clients_connected && !eqm->n_quic_clients_connected)
216     em->state = STATE_DATA_DONE;
217   ECHO_LOG (1, "Cleanup sessions (still %uQ %uS): app %U",
218             eqm->n_quic_clients_connected, em->n_clients_connected,
219             echo_format_app_state, em->state);
220 }
221
222 static void
223 quic_echo_initiate_qsession_close_no_stream (echo_main_t * em)
224 {
225   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
226   ECHO_LOG (1, "Closing Qsessions");
227   /* Close Quic session without streams */
228   echo_session_t *s;
229
230   /* *INDENT-OFF* */
231   pool_foreach (s, em->sessions,
232   ({
233     if (s->session_type == ECHO_SESSION_TYPE_QUIC)
234       {
235         if (eqm->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE)
236           {
237             ECHO_LOG (1,"%U: ACTIVE close", echo_format_session, s);
238             echo_send_rpc (em, echo_send_disconnect_session,
239                            (void *) s->vpp_session_handle, 0);
240             clib_atomic_fetch_add (&em->stats.active_count.q, 1);
241           }
242         else if (eqm->send_quic_disconnects == ECHO_CLOSE_F_NONE)
243           {
244             ECHO_LOG (1,"%U: CLEAN close", echo_format_session, s);
245             quic_echo_cleanup_cb (s, 0 /* parent_died */);
246             clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
247           }
248         else
249           ECHO_LOG (1,"%U: PASSIVE close", echo_format_session, s);
250       }
251   }));
252   /* *INDENT-ON* */
253 }
254
255 static void
256 quic_echo_on_connected (session_connected_msg_t * mp, u32 session_index)
257 {
258   echo_main_t *em = &echo_main;
259   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
260   echo_session_t *listen_session;
261   echo_session_t *session = pool_elt_at_index (em->sessions, session_index);
262
263   if (session->listener_index == SESSION_INVALID_INDEX)
264     {
265       session->session_type = ECHO_SESSION_TYPE_QUIC;
266       ECHO_LOG (1, "Connected %U -> URI", echo_format_session, session);
267       session->accepted_session_count = 0;
268       if (eqm->cb_vft.quic_connected_cb)
269         eqm->cb_vft.quic_connected_cb (mp, session->session_index);
270       clib_atomic_fetch_add (&eqm->n_quic_clients_connected, 1);
271     }
272   else
273     {
274       listen_session =
275         pool_elt_at_index (em->sessions, session->listener_index);
276       session->session_type = ECHO_SESSION_TYPE_STREAM;
277       clib_atomic_fetch_add (&listen_session->accepted_session_count, 1);
278       ECHO_LOG (1, "Connected %U -> %U", echo_format_session, session,
279                 echo_format_session, listen_session);
280       if (em->i_am_master && eqm->cb_vft.server_stream_connected_cb)
281         eqm->cb_vft.server_stream_connected_cb (mp, session->session_index);
282       if (!em->i_am_master && eqm->cb_vft.client_stream_connected_cb)
283         eqm->cb_vft.client_stream_connected_cb (mp, session->session_index);
284       clib_atomic_fetch_add (&em->n_clients_connected, 1);
285     }
286
287   if (em->n_clients_connected == em->n_clients
288       && em->n_clients_connected != 0)
289     echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
290
291   if (eqm->n_quic_clients_connected == em->n_connects
292       && em->state < STATE_READY)
293     {
294       echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED);
295       em->state = STATE_READY;
296       if (eqm->n_stream_clients == 0)
297         quic_echo_initiate_qsession_close_no_stream (em);
298     }
299 }
300
301 static void
302 quic_echo_connected_cb (session_connected_bundled_msg_t * mp,
303                         u32 session_index, u8 is_failed)
304 {
305   if (is_failed)
306     {
307       ECHO_FAIL (ECHO_FAIL_QUIC_WRONG_CONNECT, "Echo connect failed");
308       return;
309     }
310   return quic_echo_on_connected ((session_connected_msg_t *) mp,
311                                  session_index);
312 }
313
314 static void
315 quic_echo_accepted_cb (session_accepted_msg_t * mp, echo_session_t * session)
316 {
317   echo_main_t *em = &echo_main;
318   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
319   echo_session_t *ls;
320   ls = pool_elt_at_index (em->sessions, session->listener_index);
321   if (ls->session_type == ECHO_SESSION_TYPE_LISTEN)
322     {
323       echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT);
324       session->session_type = ECHO_SESSION_TYPE_QUIC;
325       session->accepted_session_count = 0;
326       if (eqm->cb_vft.quic_accepted_cb)
327         eqm->cb_vft.quic_accepted_cb (mp, session->session_index);
328       clib_atomic_fetch_add (&eqm->n_quic_clients_connected, 1);
329     }
330   else
331     {
332       session->session_type = ECHO_SESSION_TYPE_STREAM;
333       echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
334       clib_atomic_fetch_add (&ls->accepted_session_count, 1);
335       if (em->i_am_master && eqm->cb_vft.server_stream_accepted_cb)
336         eqm->cb_vft.server_stream_accepted_cb (mp, session->session_index);
337       if (!em->i_am_master && eqm->cb_vft.client_stream_accepted_cb)
338         eqm->cb_vft.client_stream_accepted_cb (mp, session->session_index);
339       clib_atomic_fetch_add (&em->n_clients_connected, 1);
340     }
341
342   if (em->n_clients_connected == em->n_clients
343       && em->n_clients_connected != 0)
344     echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
345
346   if (eqm->n_quic_clients_connected == em->n_connects
347       && em->state < STATE_READY)
348     {
349       echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED);
350       em->state = STATE_READY;
351       if (eqm->n_stream_clients == 0)
352         quic_echo_initiate_qsession_close_no_stream (em);
353     }
354 }
355
356 static void
357 quic_echo_sent_disconnect_cb (echo_session_t * s)
358 {
359   if (s->session_type == ECHO_SESSION_TYPE_STREAM)
360     s->session_state = ECHO_SESSION_STATE_CLOSING;
361   else
362     quic_echo_cleanup_cb (s, 0 /* parent_died */ );     /* We can clean Q/Lsessions right away */
363 }
364
365 static void
366 quic_echo_disconnected_cb (session_disconnected_msg_t * mp,
367                            echo_session_t * s)
368 {
369   echo_main_t *em = &echo_main;
370   if (s->session_type == ECHO_SESSION_TYPE_STREAM)
371     {
372       echo_session_print_stats (em, s);
373       if (s->bytes_to_receive || s->bytes_to_send)
374         s->session_state = ECHO_SESSION_STATE_AWAIT_DATA;
375       else
376         s->session_state = ECHO_SESSION_STATE_CLOSING;
377       clib_atomic_fetch_add (&em->stats.close_count.s, 1);
378     }
379   else
380     {
381       quic_echo_cleanup_cb (s, 0 /* parent_died */ );   /* We can clean Q/Lsessions right away */
382       clib_atomic_fetch_add (&em->stats.close_count.q, 1);
383     }
384 }
385
386 static void
387 quic_echo_reset_cb (session_reset_msg_t * mp, echo_session_t * s)
388 {
389   echo_main_t *em = &echo_main;
390   if (s->session_type == ECHO_SESSION_TYPE_STREAM)
391     {
392       clib_atomic_fetch_add (&em->stats.reset_count.s, 1);
393       s->session_state = ECHO_SESSION_STATE_CLOSING;
394     }
395   else
396     {
397       clib_atomic_fetch_add (&em->stats.reset_count.q, 1);
398       quic_echo_cleanup_cb (s, 0 /* parent_died */ );   /* We can clean Q/Lsessions right away */
399     }
400 }
401
402 static uword
403 quic_echo_unformat_setup_vft (unformat_input_t * input, va_list * args)
404 {
405   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
406   if (unformat (input, "serverstream"))
407     eqm->cb_vft = server_stream_cb_vft;
408   else if (unformat (input, "default"))
409     ;
410   else
411     return 0;
412   return 1;
413 }
414
415 static int
416 quic_echo_process_opts_cb (unformat_input_t * a)
417 {
418   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
419   if (unformat (a, "quic-streams %d", &eqm->n_stream_clients))
420     ;
421   else if (unformat (a, "quic-setup %U", quic_echo_unformat_setup_vft))
422     ;
423   else if (unformat (a, "qclose=%U",
424                      echo_unformat_close, &eqm->send_quic_disconnects))
425     ;
426   else
427     return 0;
428   return 1;
429 }
430
431 static void
432 quic_echo_set_defaults_before_opts_cb ()
433 {
434   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
435   eqm->cb_vft = default_cb_vft;
436   eqm->n_stream_clients = 1;
437 }
438
439 static void
440 quic_echo_set_defaults_after_opts_cb ()
441 {
442   quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
443   echo_main_t *em = &echo_main;
444   u8 default_f_active;
445
446   em->n_connects = em->n_clients;
447   em->n_sessions =
448     clib_max (1, eqm->n_stream_clients) * em->n_clients + em->n_clients + 1;
449   em->n_clients = eqm->n_stream_clients * em->n_clients;
450
451   if (em->i_am_master)
452     default_f_active =
453       em->bytes_to_send == 0 ? ECHO_CLOSE_F_ACTIVE : ECHO_CLOSE_F_PASSIVE;
454   else
455     default_f_active =
456       em->bytes_to_receive == 0 ? ECHO_CLOSE_F_PASSIVE : ECHO_CLOSE_F_ACTIVE;
457   if (eqm->send_quic_disconnects == ECHO_CLOSE_F_INVALID)
458     eqm->send_quic_disconnects = default_f_active;
459 }
460
461 static void
462 quic_echo_print_usage_cb ()
463 {
464   fprintf (stderr,
465            "-- QUIC specific options -- \n"
466            "  quic-setup OPT      OPT=serverstream : Client open N connections. \n"
467            "                       On each one server opens M streams\n"
468            "                      OPT=default : Client open N connections.\n"
469            "                       On each one client opens M streams\n"
470            "  qclose=[Y|N|W]      When connection is done send[Y]|nop[N]|wait[W] for close\n"
471            "\n"
472            "  quic-streams N      Open N QUIC streams (defaults to 1)\n");
473 }
474
475 echo_proto_cb_vft_t quic_echo_proto_cb_vft = {
476   .disconnected_cb = quic_echo_disconnected_cb,
477   .connected_cb = quic_echo_connected_cb,
478   .accepted_cb = quic_echo_accepted_cb,
479   .reset_cb = quic_echo_reset_cb,
480   .sent_disconnect_cb = quic_echo_sent_disconnect_cb,
481   .cleanup_cb = quic_echo_cleanup_cb,
482   .process_opts_cb = quic_echo_process_opts_cb,
483   .print_usage_cb = quic_echo_print_usage_cb,
484   .set_defaults_before_opts_cb = quic_echo_set_defaults_before_opts_cb,
485   .set_defaults_after_opts_cb = quic_echo_set_defaults_after_opts_cb,
486 };
487
488 ECHO_REGISTER_PROTO (TRANSPORT_PROTO_QUIC, quic_echo_proto_cb_vft);
489
490 /*
491  * fd.io coding-style-patch-verification: ON
492  *
493  * Local Variables:
494  * eval: (c-set-style "gnu")
495  * End:
496  */