session: refactor local/cut-through listens
[vpp.git] / src / vnet / session / application_local.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application_local.h>
17 #include <vnet/session/session.h>
18
19 typedef struct ct_connection_
20 {
21   transport_connection_t connection;
22   u32 client_wrk;
23   u32 server_wrk;
24   u32 transport_listener_index;
25   transport_proto_t actual_tp;
26 } ct_connection_t;
27
28 ct_connection_t *connections;
29
30 ct_connection_t *
31 ct_connection_alloc (void)
32 {
33   ct_connection_t *ct;
34
35   pool_get_zero (connections, ct);
36   ct->c_c_index = ct - connections;
37   ct->c_thread_index = 0;
38   ct->client_wrk = ~0;
39   ct->server_wrk = ~0;
40   return ct;
41 }
42
43 ct_connection_t *
44 ct_connection_get (u32 ct_index)
45 {
46   if (pool_is_free_index (connections, ct_index))
47     return 0;
48   return pool_elt_at_index (connections, ct_index);
49 }
50
51 void
52 ct_connection_free (ct_connection_t * ct)
53 {
54   if (CLIB_DEBUG)
55     memset (ct, 0xfc, sizeof (*ct));
56   pool_put (connections, ct);
57 }
58
59 void
60 application_local_listener_session_endpoint (session_t * ll,
61                                              session_endpoint_t * sep)
62 {
63   ct_connection_t *ct;
64   ct = (ct_connection_t *) session_get_transport (ll);
65   sep->transport_proto = ct->actual_tp;
66   sep->port = ct->c_lcl_port;
67   sep->is_ip4 = ct->c_is_ip4;
68 }
69
70 local_session_t *
71 app_worker_local_session_alloc (app_worker_t * app_wrk)
72 {
73   local_session_t *s;
74   pool_get (app_wrk->local_sessions, s);
75   clib_memset (s, 0, sizeof (*s));
76   s->app_wrk_index = app_wrk->wrk_index;
77   s->session_index = s - app_wrk->local_sessions;
78   s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
79   return s;
80 }
81
82 void
83 app_worker_local_session_free (app_worker_t * app_wrk, local_session_t * s)
84 {
85   pool_put (app_wrk->local_sessions, s);
86   if (CLIB_DEBUG)
87     clib_memset (s, 0xfc, sizeof (*s));
88 }
89
90 local_session_t *
91 app_worker_get_local_session (app_worker_t * app_wrk, u32 session_index)
92 {
93   if (pool_is_free_index (app_wrk->local_sessions, session_index))
94     return 0;
95   return pool_elt_at_index (app_wrk->local_sessions, session_index);
96 }
97
98 local_session_t *
99 app_worker_get_local_session_from_handle (session_handle_t handle)
100 {
101   app_worker_t *server_wrk;
102   u32 session_index, server_wrk_index;
103   local_session_parse_handle (handle, &server_wrk_index, &session_index);
104   server_wrk = app_worker_get_if_valid (server_wrk_index);
105   if (!server_wrk)
106     return 0;
107   return app_worker_get_local_session (server_wrk, session_index);
108 }
109
110 static inline u64
111 application_client_local_connect_key (local_session_t * ls)
112 {
113   return (((u64) ls->app_wrk_index) << 32 | (u64) ls->session_index);
114 }
115
116 static inline void
117 application_client_local_connect_key_parse (u64 key, u32 * app_wrk_index,
118                                             u32 * session_index)
119 {
120   *app_wrk_index = key >> 32;
121   *session_index = key & 0xFFFFFFFF;
122 }
123
124 void
125 app_worker_local_sessions_free (app_worker_t * app_wrk)
126 {
127   u32 index, server_wrk_index, session_index;
128   u64 handle, *handles = 0;
129   app_worker_t *server_wrk;
130   segment_manager_t *sm;
131   local_session_t *ls;
132   int i;
133
134   /*
135    * Local sessions
136    */
137   if (app_wrk->local_sessions)
138     {
139       /* *INDENT-OFF* */
140       pool_foreach (ls, app_wrk->local_sessions, ({
141         app_worker_local_session_disconnect (app_wrk->wrk_index, ls);
142       }));
143       /* *INDENT-ON* */
144     }
145
146   /*
147    * Local connects
148    */
149   vec_reset_length (handles);
150   /* *INDENT-OFF* */
151   hash_foreach (handle, index, app_wrk->local_connects, ({
152     vec_add1 (handles, handle);
153   }));
154   /* *INDENT-ON* */
155
156   for (i = 0; i < vec_len (handles); i++)
157     {
158       application_client_local_connect_key_parse (handles[i],
159                                                   &server_wrk_index,
160                                                   &session_index);
161       server_wrk = app_worker_get_if_valid (server_wrk_index);
162       if (server_wrk)
163         {
164           ls = app_worker_get_local_session (server_wrk, session_index);
165           app_worker_local_session_disconnect (app_wrk->wrk_index, ls);
166         }
167     }
168
169   sm = segment_manager_get (app_wrk->local_segment_manager);
170   sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
171   segment_manager_del (sm);
172 }
173
174 int
175 app_worker_local_session_cleanup (app_worker_t * client_wrk,
176                                   app_worker_t * server_wrk,
177                                   local_session_t * ls)
178 {
179   svm_fifo_segment_private_t *seg;
180   session_t *listener;
181   segment_manager_t *sm;
182   u64 client_key;
183   u8 has_transport;
184
185   /* Retrieve listener transport type as it is the one that decides where
186    * the fifos are allocated */
187   has_transport = application_local_session_listener_has_transport (ls);
188   if (!has_transport)
189     sm = app_worker_get_local_segment_manager_w_session (server_wrk, ls);
190   else
191     {
192       listener = listen_session_get (ls->listener_index);
193       sm = app_worker_get_listen_segment_manager (server_wrk, listener);
194     }
195
196   seg = segment_manager_get_segment (sm, ls->svm_segment_index);
197   if (client_wrk)
198     {
199       client_key = application_client_local_connect_key (ls);
200       hash_unset (client_wrk->local_connects, client_key);
201     }
202
203   if (!has_transport)
204     {
205       application_t *server = application_get (server_wrk->app_index);
206       u64 segment_handle = segment_manager_segment_handle (sm, seg);
207       server->cb_fns.del_segment_callback (server_wrk->api_client_index,
208                                            segment_handle);
209       if (client_wrk)
210         {
211           application_t *client = application_get (client_wrk->app_index);
212           client->cb_fns.del_segment_callback (client_wrk->api_client_index,
213                                                segment_handle);
214         }
215       segment_manager_del_segment (sm, seg);
216     }
217
218   app_worker_local_session_free (server_wrk, ls);
219
220   return 0;
221 }
222
223 int
224 app_worker_local_session_connect_notify (local_session_t * ls)
225 {
226   svm_fifo_segment_private_t *seg;
227   app_worker_t *client_wrk, *server_wrk;
228   segment_manager_t *sm;
229   application_t *client;
230   int rv, is_fail = 0;
231   u64 segment_handle;
232   u64 client_key;
233
234   client_wrk = app_worker_get (ls->client_wrk_index);
235   server_wrk = app_worker_get (ls->app_wrk_index);
236   client = application_get (client_wrk->app_index);
237
238   sm = app_worker_get_local_segment_manager_w_session (server_wrk, ls);
239   seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index);
240   segment_handle = segment_manager_segment_handle (sm, seg);
241   if ((rv = client->cb_fns.add_segment_callback (client_wrk->api_client_index,
242                                                  segment_handle)))
243     {
244       clib_warning ("failed to notify client %u of new segment",
245                     ls->client_wrk_index);
246       segment_manager_segment_reader_unlock (sm);
247       app_worker_local_session_disconnect (ls->client_wrk_index, ls);
248       is_fail = 1;
249     }
250   else
251     {
252       segment_manager_segment_reader_unlock (sm);
253     }
254
255   client->cb_fns.session_connected_callback (client_wrk->wrk_index,
256                                              ls->client_opaque,
257                                              (session_t *) ls, is_fail);
258
259   client_key = application_client_local_connect_key (ls);
260   hash_set (client_wrk->local_connects, client_key, client_key);
261   return 0;
262 }
263
264 static void
265 application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq)
266 {
267   int fd;
268
269   /*
270    * segment manager initializes only the producer eventds, since vpp is
271    * typically the producer. But for local sessions, we also pass to the
272    * apps the mqs they listen on for events from peer apps, so they are also
273    * consumer fds.
274    */
275   fd = svm_msg_q_get_producer_eventfd (sq);
276   svm_msg_q_set_consumer_eventfd (sq, fd);
277   fd = svm_msg_q_get_producer_eventfd (cq);
278   svm_msg_q_set_consumer_eventfd (cq, fd);
279 }
280
281 int
282 app_worker_local_session_connect (app_worker_t * client_wrk,
283                                   app_worker_t * server_wrk,
284                                   session_t * ll, u32 opaque)
285 {
286   u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
287   u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index;
288   segment_manager_properties_t *props, *cprops;
289   int rv, has_transport, seg_index;
290   svm_fifo_segment_private_t *seg;
291   application_t *server, *client;
292   segment_manager_t *sm;
293   local_session_t *ls;
294   svm_msg_q_t *sq, *cq;
295   u64 segment_handle;
296
297   ls = app_worker_local_session_alloc (server_wrk);
298   server = application_get (server_wrk->app_index);
299   client = application_get (client_wrk->app_index);
300
301   props = application_segment_manager_properties (server);
302   cprops = application_segment_manager_properties (client);
303   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
304   evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
305   round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
306   round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
307   seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
308
309   has_transport = session_has_transport (ll);
310   if (!has_transport)
311     {
312       /* Local sessions don't have backing transport */
313       transport_connection_t *tc;
314       tc = session_get_transport (ll);
315       ls->port = tc->lcl_port;
316       sm = app_worker_get_local_segment_manager (server_wrk);
317     }
318   else
319     {
320       session_t *sl = (session_t *) ll;
321       transport_connection_t *tc;
322       tc = listen_session_get_transport (sl);
323       ls->port = tc->lcl_port;
324       sm = app_worker_get_listen_segment_manager (server_wrk, sl);
325     }
326
327   seg_index = segment_manager_add_segment (sm, seg_size);
328   if (seg_index < 0)
329     {
330       clib_warning ("failed to add new cut-through segment");
331       return seg_index;
332     }
333   seg = segment_manager_get_segment_w_lock (sm, seg_index);
334   sq = segment_manager_alloc_queue (seg, props);
335   cq = segment_manager_alloc_queue (seg, cprops);
336
337   if (props->use_mq_eventfd)
338     application_local_session_fix_eventds (sq, cq);
339
340   ls->server_evt_q = pointer_to_uword (sq);
341   ls->client_evt_q = pointer_to_uword (cq);
342   rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
343                                         props->tx_fifo_size,
344                                         &ls->rx_fifo, &ls->tx_fifo);
345   if (rv)
346     {
347       clib_warning ("failed to add fifos in cut-through segment");
348       segment_manager_segment_reader_unlock (sm);
349       goto failed;
350     }
351   sm_index = segment_manager_index (sm);
352   ls->rx_fifo->ct_session_index = ls->session_index;
353   ls->tx_fifo->ct_session_index = ls->session_index;
354   ls->rx_fifo->segment_manager = sm_index;
355   ls->tx_fifo->segment_manager = sm_index;
356   ls->rx_fifo->segment_index = seg_index;
357   ls->tx_fifo->segment_index = seg_index;
358   ls->svm_segment_index = seg_index;
359   ls->listener_index = ll->session_index;
360   ls->client_wrk_index = client_wrk->wrk_index;
361   ls->client_opaque = opaque;
362   ls->listener_session_type = ll->session_type;
363   ls->session_state = SESSION_STATE_READY;
364
365   segment_handle = segment_manager_segment_handle (sm, seg);
366   if ((rv = server->cb_fns.add_segment_callback (server_wrk->api_client_index,
367                                                  segment_handle)))
368     {
369       clib_warning ("failed to notify server of new segment");
370       segment_manager_segment_reader_unlock (sm);
371       goto failed;
372     }
373   segment_manager_segment_reader_unlock (sm);
374   if ((rv = server->cb_fns.session_accept_callback ((session_t *) ls)))
375     {
376       clib_warning ("failed to send accept cut-through notify to server");
377       goto failed;
378     }
379   if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN)
380     app_worker_local_session_connect_notify (ls);
381
382   return 0;
383
384 failed:
385   if (!has_transport)
386     segment_manager_del_segment (sm, seg);
387   return rv;
388 }
389
390 int
391 app_worker_local_session_disconnect (u32 app_wrk_index, local_session_t * ls)
392 {
393   app_worker_t *client_wrk, *server_wrk;
394
395   client_wrk = app_worker_get_if_valid (ls->client_wrk_index);
396   server_wrk = app_worker_get (ls->app_wrk_index);
397
398   if (ls->session_state == SESSION_STATE_CLOSED)
399     return app_worker_local_session_cleanup (client_wrk, server_wrk, ls);
400
401   if (app_wrk_index == ls->client_wrk_index)
402     {
403       mq_send_local_session_disconnected_cb (ls->app_wrk_index, ls);
404     }
405   else
406     {
407       if (!client_wrk)
408         {
409           return app_worker_local_session_cleanup (client_wrk, server_wrk,
410                                                    ls);
411         }
412       else if (ls->session_state < SESSION_STATE_READY)
413         {
414           application_t *client = application_get (client_wrk->app_index);
415           client->cb_fns.session_connected_callback (client_wrk->wrk_index,
416                                                      ls->client_opaque,
417                                                      (session_t *) ls,
418                                                      1 /* is_fail */ );
419           ls->session_state = SESSION_STATE_CLOSED;
420           return app_worker_local_session_cleanup (client_wrk, server_wrk,
421                                                    ls);
422         }
423       else
424         {
425           mq_send_local_session_disconnected_cb (client_wrk->wrk_index, ls);
426         }
427     }
428
429   ls->session_state = SESSION_STATE_CLOSED;
430
431   return 0;
432 }
433
434 int
435 app_worker_local_session_disconnect_w_index (u32 app_wrk_index, u32 ls_index)
436 {
437   app_worker_t *app_wrk;
438   local_session_t *ls;
439   app_wrk = app_worker_get (app_wrk_index);
440   ls = app_worker_get_local_session (app_wrk, ls_index);
441   return app_worker_local_session_disconnect (app_wrk_index, ls);
442 }
443
444 void
445 app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose)
446 {
447   vlib_main_t *vm = vlib_get_main ();
448   app_worker_t *client_wrk;
449   local_session_t *ls;
450   transport_proto_t tp;
451   u8 *conn = 0;
452
453   /* Header */
454   if (app_wrk == 0)
455     {
456       vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp",
457                        "ClientApp");
458       return;
459     }
460
461   if (!pool_elts (app_wrk->local_sessions)
462       && !pool_elts (app_wrk->local_connects))
463     return;
464
465   /* *INDENT-OFF* */
466   pool_foreach (ls, app_wrk->local_sessions, ({
467     tp = session_type_transport_proto(ls->listener_session_type);
468     conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
469                    ls->port);
470     client_wrk = app_worker_get (ls->client_wrk_index);
471     vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_index,
472                      client_wrk->app_index);
473     vec_reset_length (conn);
474   }));
475   /* *INDENT-ON* */
476
477   vec_free (conn);
478 }
479
480 void
481 app_worker_format_local_connects (app_worker_t * app, int verbose)
482 {
483   vlib_main_t *vm = vlib_get_main ();
484   u32 app_wrk_index, session_index;
485   app_worker_t *server_wrk;
486   local_session_t *ls;
487   u64 client_key;
488   u64 value;
489
490   /* Header */
491   if (app == 0)
492     {
493       if (verbose)
494         vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App",
495                          "Peer App", "SegManager");
496       else
497         vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App",
498                          "Peer App");
499       return;
500     }
501
502   if (!app->local_connects)
503     return;
504
505   /* *INDENT-OFF* */
506   hash_foreach (client_key, value, app->local_connects, ({
507     application_client_local_connect_key_parse (client_key, &app_wrk_index,
508                                                 &session_index);
509     server_wrk = app_worker_get (app_wrk_index);
510     ls = app_worker_get_local_session (server_wrk, session_index);
511     vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_wrk_index,
512                      ls->client_wrk_index);
513   }));
514   /* *INDENT-ON* */
515 }
516
517 u32
518 ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep)
519 {
520   session_endpoint_cfg_t *sep;
521   ct_connection_t *ct;
522
523   sep = (session_endpoint_cfg_t *) tep;
524   ct = ct_connection_alloc ();
525   ct->server_wrk = sep->app_wrk_index;
526   ct->c_is_ip4 = sep->is_ip4;
527   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
528   ct->c_lcl_port = sep->port;
529   ct->actual_tp = sep->transport_proto;
530   return ct->c_c_index;
531 }
532
533 u32
534 ct_stop_listen (u32 ct_index)
535 {
536   ct_connection_t *ct;
537   ct = ct_connection_get (ct_index);
538   ct_connection_free (ct);
539   return 0;
540 }
541
542 transport_connection_t *
543 ct_listener_get (u32 ct_index)
544 {
545   return (transport_connection_t *) ct_connection_get (ct_index);
546 }
547
548 static u8 *
549 format_ct_connection_id (u8 * s, va_list * args)
550 {
551   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
552   if (!ct)
553     return s;
554   if (ct->c_is_ip4)
555     {
556       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
557                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
558                   format_ip4_address, &ct->c_lcl_ip4,
559                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
560                   &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
561     }
562   else
563     {
564       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
565                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
566                   format_ip6_address, &ct->c_lcl_ip6,
567                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
568                   &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
569     }
570
571   return s;
572 }
573
574 u8 *
575 format_ct_listener (u8 * s, va_list * args)
576 {
577   u32 tc_index = va_arg (*args, u32);
578   u32 __clib_unused verbose = va_arg (*args, u32);
579   ct_connection_t *ct = ct_connection_get (tc_index);
580   s = format (s, "%-50U", format_ct_connection_id, ct);
581   if (verbose)
582     s = format (s, "%-15s", "LISTEN");
583   return s;
584 }
585
586 /* *INDENT-OFF* */
587 const static transport_proto_vft_t cut_thru_proto = {
588   .start_listen = ct_start_listen,
589   .stop_listen = ct_stop_listen,
590   .get_listener = ct_listener_get,
591   .tx_type = TRANSPORT_TX_INTERNAL,
592   .service_type = TRANSPORT_SERVICE_APP,
593   .format_listener = format_ct_listener,
594 };
595 /* *INDENT-ON* */
596
597 static clib_error_t *
598 ct_transport_init (vlib_main_t * vm)
599 {
600   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
601                                FIB_PROTOCOL_IP4, ~0);
602   return 0;
603 }
604
605 VLIB_INIT_FUNCTION (ct_transport_init);
606
607 /*
608  * fd.io coding-style-patch-verification: ON
609  *
610  * Local Variables:
611  * eval: (c-set-style "gnu")
612  * End:
613  */