vcl/session: use mq for bind replies
[vpp.git] / src / vnet / session / application.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application.h>
17 #include <vnet/session/application_interface.h>
18 #include <vnet/session/application_namespace.h>
19 #include <vnet/session/session.h>
20
21 static app_main_t app_main;
22
23 static app_worker_map_t *
24 app_worker_map_alloc (application_t * app)
25 {
26   app_worker_map_t *map;
27   pool_get (app->worker_maps, map);
28   memset (map, 0, sizeof (*map));
29   return map;
30 }
31
32 static u32
33 app_worker_map_index (application_t * app, app_worker_map_t * map)
34 {
35   return (map - app->worker_maps);
36 }
37
38 static void
39 app_worker_map_free (application_t * app, app_worker_map_t * map)
40 {
41   pool_put (app->worker_maps, map);
42 }
43
44 static app_worker_map_t *
45 app_worker_map_get (application_t * app, u32 map_index)
46 {
47   return pool_elt_at_index (app->worker_maps, map_index);
48 }
49
50 static u8 *
51 app_get_name_from_reg_index (application_t * app)
52 {
53   u8 *app_name;
54
55   vl_api_registration_t *regp;
56   regp = vl_api_client_index_to_registration (app->api_client_index);
57   if (!regp)
58     app_name = format (0, "builtin-%d%c", app->app_index, 0);
59   else
60     app_name = format (0, "%s%c", regp->name, 0);
61
62   return app_name;
63 }
64
65 static u8 *
66 app_get_name (application_t * app)
67 {
68   if (!app->name)
69     return app_get_name_from_reg_index (app);
70   return app->name;
71 }
72
73 u32
74 application_session_table (application_t * app, u8 fib_proto)
75 {
76   app_namespace_t *app_ns;
77   app_ns = app_namespace_get (app->ns_index);
78   if (!application_has_global_scope (app))
79     return APP_INVALID_INDEX;
80   if (fib_proto == FIB_PROTOCOL_IP4)
81     return session_lookup_get_index_for_fib (fib_proto,
82                                              app_ns->ip4_fib_index);
83   else
84     return session_lookup_get_index_for_fib (fib_proto,
85                                              app_ns->ip6_fib_index);
86 }
87
88 u32
89 application_local_session_table (application_t * app)
90 {
91   app_namespace_t *app_ns;
92   if (!application_has_local_scope (app))
93     return APP_INVALID_INDEX;
94   app_ns = app_namespace_get (app->ns_index);
95   return app_ns->local_table_index;
96 }
97
98 int
99 application_api_queue_is_full (application_t * app)
100 {
101   svm_queue_t *q;
102
103   /* builtin servers are always OK */
104   if (app->api_client_index == ~0)
105     return 0;
106
107   q = vl_api_client_index_to_input_queue (app->api_client_index);
108   if (!q)
109     return 1;
110
111   if (q->cursize == q->maxsize)
112     return 1;
113   return 0;
114 }
115
116 /**
117  * Returns app name
118  *
119  * Since the name is not stored per app, we generate it on the fly. It is
120  * the caller's responsibility to free the vector
121  */
122 u8 *
123 application_name_from_index (u32 app_index)
124 {
125   application_t *app = application_get (app_index);
126   if (!app)
127     return 0;
128   return app_get_name_from_reg_index (app);
129 }
130
131 static void
132 application_table_add (application_t * app)
133 {
134   if (app->api_client_index != APP_INVALID_INDEX)
135     hash_set (app_main.app_by_api_client_index, app->api_client_index,
136               app->app_index);
137   else if (app->name)
138     hash_set_mem (app_main.app_by_name, app->name, app->app_index);
139 }
140
141 static void
142 application_table_del (application_t * app)
143 {
144   if (app->api_client_index != APP_INVALID_INDEX)
145     hash_unset (app_main.app_by_api_client_index, app->api_client_index);
146   else if (app->name)
147     hash_unset_mem (app_main.app_by_name, app->name);
148 }
149
150 application_t *
151 application_lookup (u32 api_client_index)
152 {
153   uword *p;
154   p = hash_get (app_main.app_by_api_client_index, api_client_index);
155   if (p)
156     return application_get (p[0]);
157
158   return 0;
159 }
160
161 application_t *
162 application_lookup_name (const u8 * name)
163 {
164   uword *p;
165   p = hash_get_mem (app_main.app_by_name, name);
166   if (p)
167     return application_get (p[0]);
168
169   return 0;
170 }
171
172 application_t *
173 application_alloc (void)
174 {
175   application_t *app;
176   pool_get (app_main.app_pool, app);
177   memset (app, 0, sizeof (*app));
178   app->app_index = app - app_main.app_pool;
179   return app;
180 }
181
182 application_t *
183 application_get (u32 app_index)
184 {
185   if (app_index == APP_INVALID_INDEX)
186     return 0;
187   return pool_elt_at_index (app_main.app_pool, app_index);
188 }
189
190 application_t *
191 application_get_if_valid (u32 app_index)
192 {
193   if (pool_is_free_index (app_main.app_pool, app_index))
194     return 0;
195
196   return pool_elt_at_index (app_main.app_pool, app_index);
197 }
198
199 u32
200 application_index (application_t * app)
201 {
202   return app - app_main.app_pool;
203 }
204
205 static void
206 application_verify_cb_fns (session_cb_vft_t * cb_fns)
207 {
208   if (cb_fns->session_accept_callback == 0)
209     clib_warning ("No accept callback function provided");
210   if (cb_fns->session_connected_callback == 0)
211     clib_warning ("No session connected callback function provided");
212   if (cb_fns->session_disconnect_callback == 0)
213     clib_warning ("No session disconnect callback function provided");
214   if (cb_fns->session_reset_callback == 0)
215     clib_warning ("No session reset callback function provided");
216 }
217
218 /**
219  * Check app config for given segment type
220  *
221  * Returns 1 on success and 0 otherwise
222  */
223 static u8
224 application_verify_cfg (ssvm_segment_type_t st)
225 {
226   u8 is_valid;
227   if (st == SSVM_SEGMENT_MEMFD)
228     {
229       is_valid = (session_manager_get_evt_q_segment () != 0);
230       if (!is_valid)
231         clib_warning ("memfd seg: vpp's event qs IN binary api svm region");
232       return is_valid;
233     }
234   else if (st == SSVM_SEGMENT_SHM)
235     {
236       is_valid = (session_manager_get_evt_q_segment () == 0);
237       if (!is_valid)
238         clib_warning ("shm seg: vpp's event qs NOT IN binary api svm region");
239       return is_valid;
240     }
241   else
242     return 1;
243 }
244
245 int
246 application_alloc_and_init (app_init_args_t * a)
247 {
248   ssvm_segment_type_t seg_type = SSVM_SEGMENT_MEMFD;
249   segment_manager_properties_t *props;
250   vl_api_registration_t *reg;
251   application_t *app;
252   u64 *options;
253
254   app = application_alloc ();
255   options = a->options;
256   /*
257    * Make sure we support the requested configuration
258    */
259   if (!(options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_IS_BUILTIN))
260     {
261       reg = vl_api_client_index_to_registration (a->api_client_index);
262       if (!reg)
263         return VNET_API_ERROR_APP_UNSUPPORTED_CFG;
264       if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
265         seg_type = SSVM_SEGMENT_SHM;
266     }
267   else
268     {
269       if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
270         {
271           clib_warning ("mq eventfds can only be used if socket transport is "
272                         "used for api");
273           return VNET_API_ERROR_APP_UNSUPPORTED_CFG;
274         }
275       seg_type = SSVM_SEGMENT_PRIVATE;
276     }
277
278   if (!application_verify_cfg (seg_type))
279     return VNET_API_ERROR_APP_UNSUPPORTED_CFG;
280
281   /* Check that the obvious things are properly set up */
282   application_verify_cb_fns (a->session_cb_vft);
283
284   app->api_client_index = a->api_client_index;
285   app->flags = options[APP_OPTIONS_FLAGS];
286   app->cb_fns = *a->session_cb_vft;
287   app->ns_index = options[APP_OPTIONS_NAMESPACE];
288   app->proxied_transports = options[APP_OPTIONS_PROXY_TRANSPORT];
289   app->name = vec_dup (a->name);
290
291   /* If no scope enabled, default to global */
292   if (!application_has_global_scope (app)
293       && !application_has_local_scope (app))
294     app->flags |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
295
296   props = application_segment_manager_properties (app);
297   segment_manager_properties_init (props);
298   props->segment_size = options[APP_OPTIONS_ADD_SEGMENT_SIZE];
299   props->prealloc_fifos = options[APP_OPTIONS_PREALLOC_FIFO_PAIRS];
300   if (options[APP_OPTIONS_ADD_SEGMENT_SIZE])
301     {
302       props->add_segment_size = options[APP_OPTIONS_ADD_SEGMENT_SIZE];
303       props->add_segment = 1;
304     }
305   if (options[APP_OPTIONS_RX_FIFO_SIZE])
306     props->rx_fifo_size = options[APP_OPTIONS_RX_FIFO_SIZE];
307   if (options[APP_OPTIONS_TX_FIFO_SIZE])
308     props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE];
309   if (options[APP_OPTIONS_EVT_QUEUE_SIZE])
310     props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE];
311   if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
312     props->use_mq_eventfd = 1;
313   if (options[APP_OPTIONS_TLS_ENGINE])
314     app->tls_engine = options[APP_OPTIONS_TLS_ENGINE];
315   props->segment_type = seg_type;
316
317   /* Add app to lookup by api_client_index table */
318   application_table_add (app);
319   a->app_index = application_index (app);
320
321   APP_DBG ("New app name: %v api index: %u index %u", app->name,
322            app->api_client_index, app->app_index);
323
324   return 0;
325 }
326
327 void
328 application_free (application_t * app)
329 {
330   app_worker_map_t *wrk_map;
331   app_worker_t *app_wrk;
332
333   /*
334    * The app event queue allocated in first segment is cleared with
335    * the segment manager. No need to explicitly free it.
336    */
337   APP_DBG ("Delete app name %v api index: %d index: %d", app->name,
338            app->api_client_index, app->app_index);
339
340   if (application_is_proxy (app))
341     application_remove_proxy (app);
342
343   /* *INDENT-OFF* */
344   pool_flush (wrk_map, app->worker_maps, ({
345     app_wrk = app_worker_get (wrk_map->wrk_index);
346     app_worker_free (app_wrk);
347   }));
348   /* *INDENT-ON* */
349   pool_free (app->worker_maps);
350
351   application_table_del (app);
352   vec_free (app->name);
353   vec_free (app->tls_cert);
354   vec_free (app->tls_key);
355   pool_put (app_main.app_pool, app);
356 }
357
358 app_worker_t *
359 application_get_worker (application_t * app, u32 wrk_map_index)
360 {
361   app_worker_map_t *map;
362   map = app_worker_map_get (app, wrk_map_index);
363   if (!map)
364     return 0;
365   return app_worker_get (map->wrk_index);
366 }
367
368 app_worker_t *
369 application_get_default_worker (application_t * app)
370 {
371   return application_get_worker (app, 0);
372 }
373
374 app_worker_t *
375 app_worker_alloc (application_t * app)
376 {
377   app_worker_t *app_wrk;
378   pool_get (app_main.workers, app_wrk);
379   memset (app_wrk, 0, sizeof (*app_wrk));
380   app_wrk->wrk_index = app_wrk - app_main.workers;
381   app_wrk->app_index = app->app_index;
382   app_wrk->wrk_map_index = ~0;
383   app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
384   app_wrk->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
385   app_wrk->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
386   APP_DBG ("New app %v worker %u", app_get_name (app), app_wrk->wrk_index);
387   return app_wrk;
388 }
389
390 app_worker_t *
391 app_worker_get (u32 wrk_index)
392 {
393   return pool_elt_at_index (app_main.workers, wrk_index);
394 }
395
396 app_worker_t *
397 app_worker_get_if_valid (u32 wrk_index)
398 {
399   if (pool_is_free_index (app_main.workers, wrk_index))
400     return 0;
401   return pool_elt_at_index (app_main.workers, wrk_index);
402 }
403
404 void
405 app_worker_free (app_worker_t * app_wrk)
406 {
407   application_t *app = application_get (app_wrk->app_index);
408   vnet_unbind_args_t _a, *a = &_a;
409   u64 handle, *handles = 0;
410   segment_manager_t *sm;
411   u32 sm_index;
412   int i;
413
414   /*
415    *  Listener cleanup
416    */
417
418   /* *INDENT-OFF* */
419   hash_foreach (handle, sm_index, app_wrk->listeners_table,
420   ({
421     vec_add1 (handles, handle);
422     sm = segment_manager_get (sm_index);
423     sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
424   }));
425   /* *INDENT-ON* */
426
427   for (i = 0; i < vec_len (handles); i++)
428     {
429       a->app_index = app->app_index;
430       a->app_wrk_index = app_wrk->wrk_map_index;
431       a->handle = handles[i];
432       /* seg manager is removed when unbind completes */
433       vnet_unbind (a);
434     }
435
436   /*
437    * Connects segment manager cleanup
438    */
439
440   if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
441     {
442       sm = segment_manager_get (app_wrk->connects_seg_manager);
443       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
444       segment_manager_init_del (sm);
445     }
446
447   /* If first segment manager is used by a listener */
448   if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX
449       && app_wrk->first_segment_manager != app_wrk->connects_seg_manager)
450     {
451       sm = segment_manager_get (app_wrk->first_segment_manager);
452       /* .. and has no fifos, e.g. it might be used for redirected sessions,
453        * remove it */
454       if (!segment_manager_has_fifos (sm))
455         {
456           sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
457           segment_manager_del (sm);
458         }
459     }
460
461   /*
462    * Local sessions
463    */
464   application_local_sessions_free (app_wrk);
465
466   pool_put (app_main.workers, app_wrk);
467   if (CLIB_DEBUG)
468     memset (app_wrk, 0xfe, sizeof (*app_wrk));
469 }
470
471 int
472 app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk)
473 {
474   app_worker_map_t *wrk_map;
475   app_worker_t *app_wrk;
476   segment_manager_t *sm;
477   int rv;
478
479   app_wrk = app_worker_alloc (app);
480   wrk_map = app_worker_map_alloc (app);
481   wrk_map->wrk_index = app_wrk->wrk_index;
482   app_wrk->wrk_map_index = app_worker_map_index (app, wrk_map);
483
484   /*
485    * Setup first segment manager
486    */
487   sm = segment_manager_new ();
488   sm->app_wrk_index = app_wrk->wrk_index;
489
490   if ((rv = segment_manager_init (sm, app->sm_properties.segment_size,
491                                   app->sm_properties.prealloc_fifos)))
492     {
493       app_worker_free (app_wrk);
494       return rv;
495     }
496   sm->first_is_protected = 1;
497
498   /*
499    * Setup app worker
500    */
501   app_wrk->first_segment_manager = segment_manager_index (sm);
502   app_wrk->listeners_table = hash_create (0, sizeof (u64));
503   app_wrk->event_queue = segment_manager_event_queue (sm);
504   app_wrk->app_is_builtin = application_is_builtin (app);
505
506   /*
507    * Segment manager for local sessions
508    */
509   sm = segment_manager_new ();
510   sm->app_wrk_index = app_wrk->wrk_index;
511   app_wrk->local_segment_manager = segment_manager_index (sm);
512   app_wrk->local_connects = hash_create (0, sizeof (u64));
513
514   *wrk = app_wrk;
515
516   return 0;
517 }
518
519 static segment_manager_t *
520 application_alloc_segment_manager (app_worker_t * app_wrk)
521 {
522   segment_manager_t *sm = 0;
523
524   /* If the first segment manager is not in use, don't allocate a new one */
525   if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX
526       && app_wrk->first_segment_manager_in_use == 0)
527     {
528       sm = segment_manager_get (app_wrk->first_segment_manager);
529       app_wrk->first_segment_manager_in_use = 1;
530       return sm;
531     }
532
533   sm = segment_manager_new ();
534   sm->app_wrk_index = app_wrk->wrk_index;
535
536   return sm;
537 }
538
539 /**
540  * Start listening local transport endpoint for requested transport.
541  *
542  * Creates a 'dummy' stream session with state LISTENING to be used in session
543  * lookups, prior to establishing connection. Requests transport to build
544  * it's own specific listening connection.
545  */
546 int
547 app_worker_start_listen (app_worker_t * app_wrk, session_endpoint_t * sep,
548                          session_handle_t * res)
549 {
550   segment_manager_t *sm;
551   stream_session_t *s;
552   session_handle_t handle;
553   session_type_t sst;
554
555   sst = session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4);
556   s = listen_session_new (0, sst);
557   s->app_wrk_index = app_wrk->wrk_index;
558
559   /* Allocate segment manager. All sessions derived out of a listen session
560    * have fifos allocated by the same segment manager. */
561   if (!(sm = application_alloc_segment_manager (app_wrk)))
562     goto err;
563
564   /* Add to app's listener table. Useful to find all child listeners
565    * when app goes down, although, just for unbinding this is not needed */
566   handle = listen_session_get_handle (s);
567   hash_set (app_wrk->listeners_table, handle, segment_manager_index (sm));
568
569   if (stream_session_listen (s, sep))
570     {
571       segment_manager_del (sm);
572       hash_unset (app_wrk->listeners_table, handle);
573       goto err;
574     }
575
576   *res = handle;
577   return 0;
578
579 err:
580   listen_session_del (s);
581   return -1;
582 }
583
584 /**
585  * Stop listening on session associated to handle
586  *
587  * @param handle        listener handle
588  * @param app_index     index of the app owning the handle. This is used
589  *                      only for validating ownership
590  */
591 int
592 app_worker_stop_listen (session_handle_t handle, u32 app_index)
593 {
594   stream_session_t *listener;
595   segment_manager_t *sm;
596   app_worker_t *app_wrk;
597   uword *indexp;
598
599   listener = listen_session_get_from_handle (handle);
600   app_wrk = app_worker_get (listener->app_wrk_index);
601   if (PREDICT_FALSE (!app_wrk || app_wrk->app_index != app_index))
602     {
603       clib_warning ("app doesn't own handle %llu!", handle);
604       return -1;
605     }
606   if (PREDICT_FALSE (hash_get (app_wrk->listeners_table, handle) == 0))
607     {
608       clib_warning ("listener handle was removed %llu!", handle);
609       return -1;
610     }
611
612   stream_session_stop_listen (listener);
613
614   indexp = hash_get (app_wrk->listeners_table, handle);
615   ASSERT (indexp);
616
617   sm = segment_manager_get (*indexp);
618   if (app_wrk->first_segment_manager == *indexp)
619     {
620       /* Delete sessions but don't remove segment manager */
621       app_wrk->first_segment_manager_in_use = 0;
622       segment_manager_del_sessions (sm);
623     }
624   else
625     {
626       segment_manager_init_del (sm);
627     }
628   hash_unset (app_wrk->listeners_table, handle);
629   listen_session_del (listener);
630
631   return 0;
632 }
633
634 int
635 app_worker_open_session (app_worker_t * app, session_endpoint_t * sep,
636                          u32 api_context)
637 {
638   int rv;
639
640   /* Make sure we have a segment manager for connects */
641   app_worker_alloc_connects_segment_manager (app);
642
643   if ((rv = session_open (app->wrk_index, sep, api_context)))
644     return rv;
645
646   return 0;
647 }
648
649 int
650 app_worker_alloc_connects_segment_manager (app_worker_t * app_wrk)
651 {
652   segment_manager_t *sm;
653
654   if (app_wrk->connects_seg_manager == APP_INVALID_SEGMENT_MANAGER_INDEX)
655     {
656       sm = application_alloc_segment_manager (app_wrk);
657       if (sm == 0)
658         return -1;
659       app_wrk->connects_seg_manager = segment_manager_index (sm);
660     }
661   return 0;
662 }
663
664 segment_manager_t *
665 app_worker_get_connect_segment_manager (app_worker_t * app)
666 {
667   ASSERT (app->connects_seg_manager != (u32) ~ 0);
668   return segment_manager_get (app->connects_seg_manager);
669 }
670
671 segment_manager_t *
672 app_worker_get_listen_segment_manager (app_worker_t * app,
673                                        stream_session_t * s)
674 {
675   uword *smp;
676   smp = hash_get (app->listeners_table, listen_session_get_handle (s));
677   ASSERT (smp != 0);
678   return segment_manager_get (*smp);
679 }
680
681 clib_error_t *
682 vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a)
683 {
684   svm_fifo_segment_private_t *fs;
685   app_worker_map_t *wrk_map;
686   app_worker_t *app_wrk;
687   segment_manager_t *sm;
688   application_t *app;
689   int rv;
690
691   app = application_get (a->app_index);
692   if (!app)
693     return clib_error_return_code (0, VNET_API_ERROR_INVALID_VALUE, 0,
694                                    "App %u does not exist", a->app_index);
695
696   if (a->is_add)
697     {
698       if ((rv = app_worker_alloc_and_init (app, &app_wrk)))
699         return clib_error_return_code (0, rv, 0, "app wrk init: %d", rv);
700       sm = segment_manager_get (app_wrk->first_segment_manager);
701       fs = segment_manager_get_segment_w_lock (sm, 0);
702       a->segment = &fs->ssvm;
703       segment_manager_segment_reader_unlock (sm);
704       a->evt_q = app_wrk->event_queue;
705     }
706   else
707     {
708       wrk_map = app_worker_map_get (app, a->wrk_index);
709       if (!wrk_map)
710         return clib_error_return_code (0, VNET_API_ERROR_INVALID_VALUE, 0,
711                                        "App %u does not have worker %u",
712                                        app->app_index, a->wrk_index);
713       app_wrk = app_worker_get (wrk_map->wrk_index);
714       app_worker_map_free (app, wrk_map);
715       if (!app_wrk)
716         return clib_error_return_code (0, VNET_API_ERROR_INVALID_VALUE, 0,
717                                        "No worker %u", a->wrk_index);
718       app_worker_free (app_wrk);
719     }
720   return 0;
721 }
722
723 segment_manager_t *
724 application_get_local_segment_manager (app_worker_t * app)
725 {
726   return segment_manager_get (app->local_segment_manager);
727 }
728
729 segment_manager_t *
730 application_get_local_segment_manager_w_session (app_worker_t * app,
731                                                  local_session_t * ls)
732 {
733   stream_session_t *listener;
734   if (application_local_session_listener_has_transport (ls))
735     {
736       listener = listen_session_get (ls->listener_index);
737       return app_worker_get_listen_segment_manager (app, listener);
738     }
739   return segment_manager_get (app->local_segment_manager);
740 }
741
742 int
743 application_is_proxy (application_t * app)
744 {
745   return (app->flags & APP_OPTIONS_FLAGS_IS_PROXY);
746 }
747
748 int
749 application_is_builtin (application_t * app)
750 {
751   return (app->flags & APP_OPTIONS_FLAGS_IS_BUILTIN);
752 }
753
754 int
755 application_is_builtin_proxy (application_t * app)
756 {
757   return (application_is_proxy (app) && application_is_builtin (app));
758 }
759
760 u8
761 application_has_local_scope (application_t * app)
762 {
763   return app->flags & APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
764 }
765
766 u8
767 application_has_global_scope (application_t * app)
768 {
769   return app->flags & APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
770 }
771
772 u8
773 application_use_mq_for_ctrl (application_t * app)
774 {
775   return app->flags & APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS;
776 }
777
778 /**
779  * Send an API message to the external app, to map new segment
780  */
781 int
782 app_worker_add_segment_notify (u32 app_wrk_index, ssvm_private_t * fs)
783 {
784   app_worker_t *app_wrk = app_worker_get (app_wrk_index);
785   application_t *app = application_get (app_wrk->app_index);
786   return app->cb_fns.add_segment_callback (app->api_client_index, fs);
787 }
788
789 u32
790 application_n_listeners (app_worker_t * app)
791 {
792   return hash_elts (app->listeners_table);
793 }
794
795 stream_session_t *
796 app_worker_first_listener (app_worker_t * app, u8 fib_proto,
797                            u8 transport_proto)
798 {
799   stream_session_t *listener;
800   u64 handle;
801   u32 sm_index;
802   u8 sst;
803
804   sst = session_type_from_proto_and_ip (transport_proto,
805                                         fib_proto == FIB_PROTOCOL_IP4);
806
807   /* *INDENT-OFF* */
808    hash_foreach (handle, sm_index, app->listeners_table, ({
809      listener = listen_session_get_from_handle (handle);
810      if (listener->session_type == sst
811          && listener->listener_index != SESSION_PROXY_LISTENER_INDEX)
812        return listener;
813    }));
814   /* *INDENT-ON* */
815
816   return 0;
817 }
818
819 u8
820 app_worker_application_is_builtin (app_worker_t * app_wrk)
821 {
822   return app_wrk->app_is_builtin;
823 }
824
825 stream_session_t *
826 application_proxy_listener (app_worker_t * app, u8 fib_proto,
827                             u8 transport_proto)
828 {
829   stream_session_t *listener;
830   u64 handle;
831   u32 sm_index;
832   u8 sst;
833
834   sst = session_type_from_proto_and_ip (transport_proto,
835                                         fib_proto == FIB_PROTOCOL_IP4);
836
837   /* *INDENT-OFF* */
838    hash_foreach (handle, sm_index, app->listeners_table, ({
839      listener = listen_session_get_from_handle (handle);
840      if (listener->session_type == sst
841          && listener->listener_index == SESSION_PROXY_LISTENER_INDEX)
842        return listener;
843    }));
844   /* *INDENT-ON* */
845
846   return 0;
847 }
848
849 static clib_error_t *
850 application_start_stop_proxy_fib_proto (application_t * app, u8 fib_proto,
851                                         u8 transport_proto, u8 is_start)
852 {
853   app_namespace_t *app_ns = app_namespace_get (app->ns_index);
854   u8 is_ip4 = (fib_proto == FIB_PROTOCOL_IP4);
855   session_endpoint_t sep = SESSION_ENDPOINT_NULL;
856   transport_connection_t *tc;
857   app_worker_t *app_wrk;
858   stream_session_t *s;
859   u64 handle;
860
861   /* TODO decide if we want proxy to be enabled for all workers */
862   app_wrk = application_get_default_worker (app);
863   if (is_start)
864     {
865       s = app_worker_first_listener (app_wrk, fib_proto, transport_proto);
866       if (!s)
867         {
868           sep.is_ip4 = is_ip4;
869           sep.fib_index = app_namespace_get_fib_index (app_ns, fib_proto);
870           sep.sw_if_index = app_ns->sw_if_index;
871           sep.transport_proto = transport_proto;
872           app_worker_start_listen (app_wrk, &sep, &handle);
873           s = listen_session_get_from_handle (handle);
874           s->listener_index = SESSION_PROXY_LISTENER_INDEX;
875         }
876     }
877   else
878     {
879       s = application_proxy_listener (app_wrk, fib_proto, transport_proto);
880       ASSERT (s);
881     }
882
883   tc = listen_session_get_transport (s);
884
885   if (!ip_is_zero (&tc->lcl_ip, 1))
886     {
887       u32 sti;
888       sep.is_ip4 = is_ip4;
889       sep.fib_index = app_namespace_get_fib_index (app_ns, fib_proto);
890       sep.transport_proto = transport_proto;
891       sep.port = 0;
892       sti = session_lookup_get_index_for_fib (fib_proto, sep.fib_index);
893       if (is_start)
894         session_lookup_add_session_endpoint (sti, &sep, s->session_index);
895       else
896         session_lookup_del_session_endpoint (sti, &sep);
897     }
898
899   return 0;
900 }
901
902 static void
903 application_start_stop_proxy_local_scope (application_t * app,
904                                           u8 transport_proto, u8 is_start)
905 {
906   session_endpoint_t sep = SESSION_ENDPOINT_NULL;
907   app_namespace_t *app_ns;
908   app_ns = app_namespace_get (app->ns_index);
909   sep.is_ip4 = 1;
910   sep.transport_proto = transport_proto;
911   sep.port = 0;
912
913   if (is_start)
914     {
915       session_lookup_add_session_endpoint (app_ns->local_table_index, &sep,
916                                            app->app_index);
917       sep.is_ip4 = 0;
918       session_lookup_add_session_endpoint (app_ns->local_table_index, &sep,
919                                            app->app_index);
920     }
921   else
922     {
923       session_lookup_del_session_endpoint (app_ns->local_table_index, &sep);
924       sep.is_ip4 = 0;
925       session_lookup_del_session_endpoint (app_ns->local_table_index, &sep);
926     }
927 }
928
929 void
930 application_start_stop_proxy (application_t * app,
931                               transport_proto_t transport_proto, u8 is_start)
932 {
933   if (application_has_local_scope (app))
934     application_start_stop_proxy_local_scope (app, transport_proto, is_start);
935
936   if (application_has_global_scope (app))
937     {
938       application_start_stop_proxy_fib_proto (app, FIB_PROTOCOL_IP4,
939                                               transport_proto, is_start);
940       application_start_stop_proxy_fib_proto (app, FIB_PROTOCOL_IP6,
941                                               transport_proto, is_start);
942     }
943 }
944
945 void
946 application_setup_proxy (application_t * app)
947 {
948   u16 transports = app->proxied_transports;
949   transport_proto_t tp;
950
951   ASSERT (application_is_proxy (app));
952
953   /* *INDENT-OFF* */
954   transport_proto_foreach (tp, ({
955     if (transports & (1 << tp))
956       application_start_stop_proxy (app, tp, 1);
957   }));
958   /* *INDENT-ON* */
959 }
960
961 void
962 application_remove_proxy (application_t * app)
963 {
964   u16 transports = app->proxied_transports;
965   transport_proto_t tp;
966
967   ASSERT (application_is_proxy (app));
968
969   /* *INDENT-OFF* */
970   transport_proto_foreach (tp, ({
971     if (transports & (1 << tp))
972       application_start_stop_proxy (app, tp, 0);
973   }));
974   /* *INDENT-ON* */
975 }
976
977 segment_manager_properties_t *
978 application_segment_manager_properties (application_t * app)
979 {
980   return &app->sm_properties;
981 }
982
983 segment_manager_properties_t *
984 application_get_segment_manager_properties (u32 app_index)
985 {
986   application_t *app = application_get (app_index);
987   return &app->sm_properties;
988 }
989
990 static inline int
991 app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
992 {
993   if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
994     {
995       clib_warning ("evt q full");
996       svm_msg_q_free_msg (mq, msg);
997       if (lock)
998         svm_msg_q_unlock (mq);
999       return -1;
1000     }
1001
1002   if (lock)
1003     {
1004       svm_msg_q_add_and_unlock (mq, msg);
1005       return 0;
1006     }
1007
1008   /* Even when not locking the ring, we must wait for queue mutex */
1009   if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
1010     {
1011       clib_warning ("msg q add returned");
1012       return -1;
1013     }
1014   return 0;
1015 }
1016
1017 static inline int
1018 app_send_io_evt_rx (app_worker_t * app_wrk, stream_session_t * s, u8 lock)
1019 {
1020   session_event_t *evt;
1021   svm_msg_q_msg_t msg;
1022   svm_msg_q_t *mq;
1023
1024   if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY
1025                      && s->session_state != SESSION_STATE_LISTENING))
1026     {
1027       /* Session is closed so app will never clean up. Flush rx fifo */
1028       if (s->session_state == SESSION_STATE_CLOSED)
1029         svm_fifo_dequeue_drop_all (s->server_rx_fifo);
1030       return 0;
1031     }
1032
1033   if (app_worker_application_is_builtin (app_wrk))
1034     {
1035       application_t *app = application_get (app_wrk->app_index);
1036       return app->cb_fns.builtin_app_rx_callback (s);
1037     }
1038
1039   if (svm_fifo_has_event (s->server_rx_fifo)
1040       || svm_fifo_is_empty (s->server_rx_fifo))
1041     return 0;
1042
1043   mq = app_wrk->event_queue;
1044   if (lock)
1045     svm_msg_q_lock (mq);
1046
1047   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
1048     {
1049       clib_warning ("evt q rings full");
1050       if (lock)
1051         svm_msg_q_unlock (mq);
1052       return -1;
1053     }
1054
1055   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
1056   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
1057
1058   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
1059   evt->fifo = s->server_rx_fifo;
1060   evt->event_type = FIFO_EVENT_APP_RX;
1061
1062   if (app_enqueue_evt (mq, &msg, lock))
1063     return -1;
1064   (void) svm_fifo_set_event (s->server_rx_fifo);
1065   return 0;
1066 }
1067
1068 static inline int
1069 app_send_io_evt_tx (app_worker_t * app_wrk, stream_session_t * s, u8 lock)
1070 {
1071   svm_msg_q_t *mq;
1072   session_event_t *evt;
1073   svm_msg_q_msg_t msg;
1074
1075   if (app_worker_application_is_builtin (app_wrk))
1076     return 0;
1077
1078   mq = app_wrk->event_queue;
1079   if (lock)
1080     svm_msg_q_lock (mq);
1081
1082   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
1083     {
1084       clib_warning ("evt q rings full");
1085       if (lock)
1086         svm_msg_q_unlock (mq);
1087       return -1;
1088     }
1089
1090   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
1091   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
1092
1093   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
1094   evt->event_type = FIFO_EVENT_APP_TX;
1095   evt->fifo = s->server_tx_fifo;
1096
1097   return app_enqueue_evt (mq, &msg, lock);
1098 }
1099
1100 /* *INDENT-OFF* */
1101 typedef int (app_send_evt_handler_fn) (app_worker_t *app,
1102                                        stream_session_t *s,
1103                                        u8 lock);
1104 static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = {
1105     app_send_io_evt_rx,
1106     0,
1107     app_send_io_evt_tx,
1108 };
1109 /* *INDENT-ON* */
1110
1111 /**
1112  * Send event to application
1113  *
1114  * Logic from queue perspective is non-blocking. That is, if there's
1115  * not enough space to enqueue a message, we return. However, if the lock
1116  * flag is set, we do wait for queue mutex.
1117  */
1118 int
1119 application_send_event (app_worker_t * app, stream_session_t * s, u8 evt_type)
1120 {
1121   ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
1122   return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
1123 }
1124
1125 int
1126 application_lock_and_send_event (app_worker_t * app, stream_session_t * s,
1127                                  u8 evt_type)
1128 {
1129   return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
1130 }
1131
1132 local_session_t *
1133 application_alloc_local_session (app_worker_t * app)
1134 {
1135   local_session_t *s;
1136   pool_get (app->local_sessions, s);
1137   memset (s, 0, sizeof (*s));
1138   s->app_wrk_index = app->app_index;
1139   s->session_index = s - app->local_sessions;
1140   s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
1141   return s;
1142 }
1143
1144 void
1145 application_free_local_session (app_worker_t * app, local_session_t * s)
1146 {
1147   pool_put (app->local_sessions, s);
1148   if (CLIB_DEBUG)
1149     memset (s, 0xfc, sizeof (*s));
1150 }
1151
1152 local_session_t *
1153 application_get_local_session (app_worker_t * app_wrk, u32 session_index)
1154 {
1155   if (pool_is_free_index (app_wrk->local_sessions, session_index))
1156     return 0;
1157   return pool_elt_at_index (app_wrk->local_sessions, session_index);
1158 }
1159
1160 local_session_t *
1161 application_get_local_session_from_handle (session_handle_t handle)
1162 {
1163   app_worker_t *server_wrk;
1164   u32 session_index, server_wrk_index;
1165   local_session_parse_handle (handle, &server_wrk_index, &session_index);
1166   server_wrk = app_worker_get_if_valid (server_wrk_index);
1167   if (!server_wrk)
1168     return 0;
1169   return application_get_local_session (server_wrk, session_index);
1170 }
1171
1172 local_session_t *
1173 application_get_local_listen_session_from_handle (session_handle_t lh)
1174 {
1175   u32 ll_index, server_wrk_index;
1176   app_worker_t *server_wrk;
1177
1178   local_session_parse_handle (lh, &server_wrk_index, &ll_index);
1179   server_wrk = app_worker_get (server_wrk_index);
1180   return application_get_local_listen_session (server_wrk, ll_index);
1181 }
1182
1183 always_inline void
1184 application_local_listener_session_endpoint (local_session_t * ll,
1185                                              session_endpoint_t * sep)
1186 {
1187   sep->transport_proto =
1188     session_type_transport_proto (ll->listener_session_type);
1189   sep->port = ll->port;
1190   sep->is_ip4 = ll->listener_session_type & 1;
1191 }
1192
1193 int
1194 application_start_local_listen (app_worker_t * app_wrk,
1195                                 session_endpoint_t * sep,
1196                                 session_handle_t * handle)
1197 {
1198   session_handle_t lh;
1199   local_session_t *ll;
1200   application_t *app;
1201   u32 table_index;
1202
1203   app = application_get (app_wrk->app_index);
1204   table_index = application_local_session_table (app);
1205
1206   /* An exact sep match, as opposed to session_lookup_local_listener */
1207   lh = session_lookup_endpoint_listener (table_index, sep, 1);
1208   if (lh != SESSION_INVALID_HANDLE)
1209     return VNET_API_ERROR_ADDRESS_IN_USE;
1210
1211   pool_get (app_wrk->local_listen_sessions, ll);
1212   memset (ll, 0, sizeof (*ll));
1213   ll->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
1214   ll->app_wrk_index = app_wrk->app_index;
1215   ll->session_index = ll - app_wrk->local_listen_sessions;
1216   ll->port = sep->port;
1217   /* Store the original session type for the unbind */
1218   ll->listener_session_type =
1219     session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4);
1220   ll->transport_listener_index = ~0;
1221
1222   *handle = application_local_session_handle (ll);
1223   session_lookup_add_session_endpoint (table_index, sep, *handle);
1224
1225   return 0;
1226 }
1227
1228 /**
1229  * Clean up local session table. If we have a listener session use it to
1230  * find the port and proto. If not, the handle must be a local table handle
1231  * so parse it.
1232  */
1233 int
1234 application_stop_local_listen (session_handle_t lh, u32 app_index)
1235 {
1236   session_endpoint_t sep = SESSION_ENDPOINT_NULL;
1237   u32 table_index, ll_index, server_wrk_index;
1238   app_worker_t *server_wrk;
1239   stream_session_t *sl = 0;
1240   local_session_t *ll, *ls;
1241   application_t *server;
1242
1243   server = application_get (app_index);
1244   table_index = application_local_session_table (server);
1245
1246   /* We have both local and global table binds. Figure from global what
1247    * the sep we should be cleaning up is.
1248    */
1249   if (!session_handle_is_local (lh))
1250     {
1251       sl = listen_session_get_from_handle (lh);
1252       if (!sl || listen_session_get_local_session_endpoint (sl, &sep))
1253         {
1254           clib_warning ("broken listener");
1255           return -1;
1256         }
1257       lh = session_lookup_endpoint_listener (table_index, &sep, 0);
1258       if (lh == SESSION_INVALID_HANDLE)
1259         return -1;
1260     }
1261
1262   local_session_parse_handle (lh, &server_wrk_index, &ll_index);
1263   server_wrk = app_worker_get (server_wrk_index);
1264   if (PREDICT_FALSE (server_wrk->app_index != app_index))
1265     {
1266       clib_warning ("app %u does not own local handle 0x%lx", app_index, lh);
1267     }
1268   ll = application_get_local_listen_session (server_wrk, ll_index);
1269   if (PREDICT_FALSE (!ll))
1270     {
1271       clib_warning ("no local listener");
1272       return -1;
1273     }
1274   application_local_listener_session_endpoint (ll, &sep);
1275   session_lookup_del_session_endpoint (table_index, &sep);
1276
1277   /* *INDENT-OFF* */
1278   pool_foreach (ls, server_wrk->local_sessions, ({
1279     if (ls->listener_index == ll->session_index)
1280       application_local_session_disconnect (server_wrk->app_index, ls);
1281   }));
1282   /* *INDENT-ON* */
1283   pool_put_index (server_wrk->local_listen_sessions, ll->session_index);
1284
1285   return 0;
1286 }
1287
1288 static void
1289 application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq)
1290 {
1291   int fd;
1292
1293   /*
1294    * segment manager initializes only the producer eventds, since vpp is
1295    * typically the producer. But for local sessions, we also pass to the
1296    * apps the mqs they listen on for events from peer apps, so they are also
1297    * consumer fds.
1298    */
1299   fd = svm_msg_q_get_producer_eventfd (sq);
1300   svm_msg_q_set_consumer_eventfd (sq, fd);
1301   fd = svm_msg_q_get_producer_eventfd (cq);
1302   svm_msg_q_set_consumer_eventfd (cq, fd);
1303 }
1304
1305 int
1306 application_local_session_connect (app_worker_t * client_wrk,
1307                                    app_worker_t * server_wrk,
1308                                    local_session_t * ll, u32 opaque)
1309 {
1310   u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
1311   segment_manager_properties_t *props, *cprops;
1312   u32 round_rx_fifo_sz, round_tx_fifo_sz;
1313   int rv, has_transport, seg_index;
1314   svm_fifo_segment_private_t *seg;
1315   application_t *server, *client;
1316   segment_manager_t *sm;
1317   local_session_t *ls;
1318   svm_msg_q_t *sq, *cq;
1319
1320   ls = application_alloc_local_session (server_wrk);
1321
1322   server = application_get (server_wrk->app_index);
1323   client = application_get (client_wrk->app_index);
1324
1325   props = application_segment_manager_properties (server);
1326   cprops = application_segment_manager_properties (client);
1327   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
1328   evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
1329   round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
1330   round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
1331   seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
1332
1333   has_transport = session_has_transport ((stream_session_t *) ll);
1334   if (!has_transport)
1335     {
1336       /* Local sessions don't have backing transport */
1337       ls->port = ll->port;
1338       sm = application_get_local_segment_manager (server_wrk);
1339     }
1340   else
1341     {
1342       stream_session_t *sl = (stream_session_t *) ll;
1343       transport_connection_t *tc;
1344       tc = listen_session_get_transport (sl);
1345       ls->port = tc->lcl_port;
1346       sm = app_worker_get_listen_segment_manager (server_wrk, sl);
1347     }
1348
1349   seg_index = segment_manager_add_segment (sm, seg_size);
1350   if (seg_index < 0)
1351     {
1352       clib_warning ("failed to add new cut-through segment");
1353       return seg_index;
1354     }
1355   seg = segment_manager_get_segment_w_lock (sm, seg_index);
1356   sq = segment_manager_alloc_queue (seg, props);
1357   cq = segment_manager_alloc_queue (seg, cprops);
1358
1359   if (props->use_mq_eventfd)
1360     application_local_session_fix_eventds (sq, cq);
1361
1362   ls->server_evt_q = pointer_to_uword (sq);
1363   ls->client_evt_q = pointer_to_uword (cq);
1364   rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
1365                                         props->tx_fifo_size,
1366                                         &ls->server_rx_fifo,
1367                                         &ls->server_tx_fifo);
1368   if (rv)
1369     {
1370       clib_warning ("failed to add fifos in cut-through segment");
1371       segment_manager_segment_reader_unlock (sm);
1372       goto failed;
1373     }
1374   ls->server_rx_fifo->master_session_index = ls->session_index;
1375   ls->server_tx_fifo->master_session_index = ls->session_index;
1376   ls->server_rx_fifo->master_thread_index = ~0;
1377   ls->server_tx_fifo->master_thread_index = ~0;
1378   ls->svm_segment_index = seg_index;
1379   ls->listener_index = ll->session_index;
1380   ls->client_wrk_index = client_wrk->wrk_index;
1381   ls->client_opaque = opaque;
1382   ls->listener_session_type = ll->session_type;
1383
1384   if ((rv = server->cb_fns.add_segment_callback (server->api_client_index,
1385                                                  &seg->ssvm)))
1386     {
1387       clib_warning ("failed to notify server of new segment");
1388       segment_manager_segment_reader_unlock (sm);
1389       goto failed;
1390     }
1391   segment_manager_segment_reader_unlock (sm);
1392   if ((rv = server->cb_fns.session_accept_callback ((stream_session_t *) ls)))
1393     {
1394       clib_warning ("failed to send accept cut-through notify to server");
1395       goto failed;
1396     }
1397   if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN)
1398     application_local_session_connect_notify (ls);
1399
1400   return 0;
1401
1402 failed:
1403   if (!has_transport)
1404     segment_manager_del_segment (sm, seg);
1405   return rv;
1406 }
1407
1408 static uword
1409 application_client_local_connect_key (local_session_t * ls)
1410 {
1411   return ((uword) ls->app_wrk_index << 32 | (uword) ls->session_index);
1412 }
1413
1414 static void
1415 application_client_local_connect_key_parse (uword key, u32 * app_wrk_index,
1416                                             u32 * session_index)
1417 {
1418   *app_wrk_index = key >> 32;
1419   *session_index = key & 0xFFFFFFFF;
1420 }
1421
1422 int
1423 application_local_session_connect_notify (local_session_t * ls)
1424 {
1425   svm_fifo_segment_private_t *seg;
1426   app_worker_t *client_wrk, *server_wrk;
1427   segment_manager_t *sm;
1428   application_t *client;
1429   int rv, is_fail = 0;
1430   uword client_key;
1431
1432   client_wrk = app_worker_get (ls->client_wrk_index);
1433   server_wrk = app_worker_get (ls->app_wrk_index);
1434   client = application_get (client_wrk->app_index);
1435
1436   sm = application_get_local_segment_manager_w_session (server_wrk, ls);
1437   seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index);
1438   if ((rv = client->cb_fns.add_segment_callback (client->api_client_index,
1439                                                  &seg->ssvm)))
1440     {
1441       clib_warning ("failed to notify client %u of new segment",
1442                     ls->client_wrk_index);
1443       segment_manager_segment_reader_unlock (sm);
1444       application_local_session_disconnect (ls->client_wrk_index, ls);
1445       is_fail = 1;
1446     }
1447   else
1448     {
1449       segment_manager_segment_reader_unlock (sm);
1450     }
1451
1452   client->cb_fns.session_connected_callback (client_wrk->wrk_index,
1453                                              ls->client_opaque,
1454                                              (stream_session_t *) ls,
1455                                              is_fail);
1456
1457   client_key = application_client_local_connect_key (ls);
1458   hash_set (client_wrk->local_connects, client_key, client_key);
1459   return 0;
1460 }
1461
1462 int
1463 application_local_session_cleanup (app_worker_t * client_wrk,
1464                                    app_worker_t * server_wrk,
1465                                    local_session_t * ls)
1466 {
1467   svm_fifo_segment_private_t *seg;
1468   segment_manager_t *sm;
1469   uword client_key;
1470   u8 has_transport;
1471
1472   has_transport = session_has_transport ((stream_session_t *) ls);
1473   client_key = application_client_local_connect_key (ls);
1474   if (!has_transport)
1475     sm = application_get_local_segment_manager_w_session (server_wrk, ls);
1476   else
1477     sm = app_worker_get_listen_segment_manager (server_wrk,
1478                                                 (stream_session_t *) ls);
1479
1480   seg = segment_manager_get_segment (sm, ls->svm_segment_index);
1481   if (client_wrk)
1482     hash_unset (client_wrk->local_connects, client_key);
1483
1484   if (!has_transport)
1485     {
1486       application_t *server = application_get (server_wrk->app_index);
1487       server->cb_fns.del_segment_callback (server->api_client_index,
1488                                            &seg->ssvm);
1489       if (client_wrk)
1490         {
1491           application_t *client = application_get (client_wrk->app_index);
1492           client->cb_fns.del_segment_callback (client->api_client_index,
1493                                                &seg->ssvm);
1494         }
1495       segment_manager_del_segment (sm, seg);
1496     }
1497
1498   application_free_local_session (server_wrk, ls);
1499
1500   return 0;
1501 }
1502
1503 int
1504 application_local_session_disconnect (u32 app_wrk_index, local_session_t * ls)
1505 {
1506   app_worker_t *client_wrk, *server_wrk;
1507
1508   client_wrk = app_worker_get_if_valid (ls->client_wrk_index);
1509   server_wrk = app_worker_get (ls->app_wrk_index);
1510
1511
1512   if (ls->session_state == SESSION_STATE_CLOSED)
1513     return application_local_session_cleanup (client_wrk, server_wrk, ls);
1514
1515   if (app_wrk_index == ls->client_wrk_index)
1516     {
1517       mq_send_local_session_disconnected_cb (ls->app_wrk_index, ls);
1518     }
1519   else
1520     {
1521       if (!client_wrk)
1522         {
1523           return application_local_session_cleanup (client_wrk, server_wrk,
1524                                                     ls);
1525         }
1526       else if (ls->session_state < SESSION_STATE_READY)
1527         {
1528           application_t *client = application_get (client_wrk->app_index);
1529           client->cb_fns.session_connected_callback (client_wrk->wrk_index,
1530                                                      ls->client_opaque,
1531                                                      (stream_session_t *) ls,
1532                                                      1 /* is_fail */ );
1533           ls->session_state = SESSION_STATE_CLOSED;
1534           return application_local_session_cleanup (client_wrk, server_wrk,
1535                                                     ls);
1536         }
1537       else
1538         {
1539           mq_send_local_session_disconnected_cb (client_wrk->wrk_index, ls);
1540         }
1541     }
1542
1543   ls->session_state = SESSION_STATE_CLOSED;
1544
1545   return 0;
1546 }
1547
1548 int
1549 application_local_session_disconnect_w_index (u32 app_wrk_index, u32 ls_index)
1550 {
1551   app_worker_t *app_wrk;
1552   local_session_t *ls;
1553   app_wrk = app_worker_get (app_wrk_index);
1554   ls = application_get_local_session (app_wrk, ls_index);
1555   return application_local_session_disconnect (app_wrk_index, ls);
1556 }
1557
1558 void
1559 application_local_sessions_free (app_worker_t * app_wrk)
1560 {
1561   u32 index, server_wrk_index, session_index, table_index;
1562   segment_manager_t *sm;
1563   u64 handle, *handles = 0;
1564   local_session_t *ls, *ll;
1565   app_worker_t *server_wrk;
1566   session_endpoint_t sep;
1567   application_t *app;
1568   int i;
1569
1570   /*
1571    * Local listens. Don't bother with local sessions, we clean them lower
1572    */
1573   app = application_get (app_wrk->app_index);
1574   table_index = application_local_session_table (app);
1575   /* *INDENT-OFF* */
1576   pool_foreach (ll, app_wrk->local_listen_sessions, ({
1577     application_local_listener_session_endpoint (ll, &sep);
1578     session_lookup_del_session_endpoint (table_index, &sep);
1579   }));
1580   /* *INDENT-ON* */
1581
1582   /*
1583    * Local sessions
1584    */
1585   if (app_wrk->local_sessions)
1586     {
1587       /* *INDENT-OFF* */
1588       pool_foreach (ls, app_wrk->local_sessions, ({
1589         application_local_session_disconnect (app_wrk->wrk_index, ls);
1590       }));
1591       /* *INDENT-ON* */
1592     }
1593
1594   /*
1595    * Local connects
1596    */
1597   vec_reset_length (handles);
1598   /* *INDENT-OFF* */
1599   hash_foreach (handle, index, app_wrk->local_connects, ({
1600     vec_add1 (handles, handle);
1601   }));
1602   /* *INDENT-ON* */
1603
1604   for (i = 0; i < vec_len (handles); i++)
1605     {
1606       application_client_local_connect_key_parse (handles[i],
1607                                                   &server_wrk_index,
1608                                                   &session_index);
1609       server_wrk = app_worker_get_if_valid (server_wrk_index);
1610       if (server_wrk)
1611         {
1612           ls = application_get_local_session (server_wrk, session_index);
1613           application_local_session_disconnect (app_wrk->wrk_index, ls);
1614         }
1615     }
1616
1617   sm = segment_manager_get (app_wrk->local_segment_manager);
1618   sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
1619   segment_manager_del (sm);
1620 }
1621
1622 clib_error_t *
1623 vnet_app_add_tls_cert (vnet_app_add_tls_cert_args_t * a)
1624 {
1625   application_t *app;
1626   app = application_get (a->app_index);
1627   if (!app)
1628     return clib_error_return_code (0, VNET_API_ERROR_APPLICATION_NOT_ATTACHED,
1629                                    0, "app %u doesn't exist", a->app_index);
1630   app->tls_cert = vec_dup (a->cert);
1631   return 0;
1632 }
1633
1634 clib_error_t *
1635 vnet_app_add_tls_key (vnet_app_add_tls_key_args_t * a)
1636 {
1637   application_t *app;
1638   app = application_get (a->app_index);
1639   if (!app)
1640     return clib_error_return_code (0, VNET_API_ERROR_APPLICATION_NOT_ATTACHED,
1641                                    0, "app %u doesn't exist", a->app_index);
1642   app->tls_key = vec_dup (a->key);
1643   return 0;
1644 }
1645
1646 u8 *
1647 format_app_worker_listener (u8 * s, va_list * args)
1648 {
1649   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
1650   u64 handle = va_arg (*args, u64);
1651   u32 sm_index = va_arg (*args, u32);
1652   int verbose = va_arg (*args, int);
1653   stream_session_t *listener;
1654   application_t *app;
1655   u8 *app_name, *str;
1656
1657   if (!app_wrk)
1658     {
1659       if (verbose)
1660         s = format (s, "%-40s%-25s%-15s%-15s%-10s", "Connection", "App",
1661                     "API Client", "ListenerID", "SegManager");
1662       else
1663         s = format (s, "%-40s%-25s", "Connection", "App");
1664
1665       return s;
1666     }
1667
1668   app = application_get (app_wrk->app_index);
1669   app_name = app_get_name_from_reg_index (app);
1670   listener = listen_session_get_from_handle (handle);
1671   str = format (0, "%U", format_stream_session, listener, verbose);
1672
1673   if (verbose)
1674     {
1675       s = format (s, "%-40s%-25s%-15u%-15u%-10u", str, app_name,
1676                   app->api_client_index, handle, sm_index);
1677     }
1678   else
1679     s = format (s, "%-40s%-25s", str, app_name);
1680
1681   vec_free (app_name);
1682   return s;
1683 }
1684
1685 static void
1686 application_format_listeners (application_t * app, int verbose)
1687 {
1688   vlib_main_t *vm = vlib_get_main ();
1689   app_worker_map_t *wrk_map;
1690   app_worker_t *app_wrk;
1691   u32 sm_index;
1692   u64 handle;
1693
1694   if (!app)
1695     {
1696       vlib_cli_output (vm, "%U", format_app_worker_listener, 0 /* header */ ,
1697                        0, 0, verbose);
1698       return;
1699     }
1700
1701   /* *INDENT-OFF* */
1702   pool_foreach (wrk_map, app->worker_maps, ({
1703     app_wrk = app_worker_get (wrk_map->wrk_index);
1704     if (hash_elts (app_wrk->listeners_table) == 0)
1705       continue;
1706     hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
1707       vlib_cli_output (vm, "%U", format_app_worker_listener, app_wrk,
1708                        handle, sm_index, verbose);
1709     }));
1710   }));
1711   /* *INDENT-ON* */
1712 }
1713
1714 static void
1715 app_worker_format_connects (app_worker_t * app_wrk, int verbose)
1716 {
1717   svm_fifo_segment_private_t *fifo_segment;
1718   vlib_main_t *vm = vlib_get_main ();
1719   segment_manager_t *sm;
1720   u8 *app_name, *s = 0;
1721   application_t *app;
1722
1723   /* Header */
1724   if (!app_wrk)
1725     {
1726       if (verbose)
1727         vlib_cli_output (vm, "%-40s%-20s%-15s%-10s", "Connection", "App",
1728                          "API Client", "SegManager");
1729       else
1730         vlib_cli_output (vm, "%-40s%-20s", "Connection", "App");
1731       return;
1732     }
1733
1734   app = application_get (app_wrk->app_index);
1735   if (app_wrk->connects_seg_manager == (u32) ~ 0)
1736     return;
1737
1738   app_name = app_get_name_from_reg_index (app);
1739
1740   /* Across all fifo segments */
1741   sm = segment_manager_get (app_wrk->connects_seg_manager);
1742
1743   /* *INDENT-OFF* */
1744   segment_manager_foreach_segment_w_lock (fifo_segment, sm, ({
1745     svm_fifo_t *fifo;
1746     u8 *str;
1747
1748     fifo = svm_fifo_segment_get_fifo_list (fifo_segment);
1749     while (fifo)
1750         {
1751           u32 session_index, thread_index;
1752           stream_session_t *session;
1753
1754           session_index = fifo->master_session_index;
1755           thread_index = fifo->master_thread_index;
1756
1757           session = session_get (session_index, thread_index);
1758           str = format (0, "%U", format_stream_session, session, verbose);
1759
1760           if (verbose)
1761             s = format (s, "%-40s%-20s%-15u%-10u", str, app_name,
1762                         app->api_client_index, app_wrk->connects_seg_manager);
1763           else
1764             s = format (s, "%-40s%-20s", str, app_name);
1765
1766           vlib_cli_output (vm, "%v", s);
1767           vec_reset_length (s);
1768           vec_free (str);
1769
1770           fifo = fifo->next;
1771         }
1772     vec_free (s);
1773   }));
1774   /* *INDENT-ON* */
1775
1776   vec_free (app_name);
1777 }
1778
1779 static void
1780 application_format_connects (application_t * app, int verbose)
1781 {
1782   app_worker_map_t *wrk_map;
1783   app_worker_t *app_wrk;
1784
1785   if (!app)
1786     {
1787       app_worker_format_connects (0, verbose);
1788       return;
1789     }
1790
1791   /* *INDENT-OFF* */
1792   pool_foreach (wrk_map, app->worker_maps, ({
1793     app_wrk = app_worker_get (wrk_map->wrk_index);
1794     app_worker_format_connects (app_wrk, verbose);
1795   }));
1796   /* *INDENT-ON* */
1797 }
1798
1799 static void
1800 app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose)
1801 {
1802   vlib_main_t *vm = vlib_get_main ();
1803   local_session_t *ls;
1804   transport_proto_t tp;
1805   u8 *conn = 0;
1806
1807   /* Header */
1808   if (app_wrk == 0)
1809     {
1810       vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp",
1811                        "ClientApp");
1812       return;
1813     }
1814
1815   if (!pool_elts (app_wrk->local_sessions)
1816       && !pool_elts (app_wrk->local_connects))
1817     return;
1818
1819   /* *INDENT-OFF* */
1820   pool_foreach (ls, app_wrk->local_listen_sessions, ({
1821     tp = session_type_transport_proto(ls->listener_session_type);
1822     conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
1823                    ls->port);
1824     vlib_cli_output (vm, "%-40v%-15u%-20s", conn, ls->app_wrk_index, "*");
1825     vec_reset_length (conn);
1826   }));
1827   pool_foreach (ls, app_wrk->local_sessions, ({
1828     tp = session_type_transport_proto(ls->listener_session_type);
1829     conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
1830                    ls->port);
1831     vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_wrk_index,
1832                      ls->client_wrk_index);
1833     vec_reset_length (conn);
1834   }));
1835   /* *INDENT-ON* */
1836
1837   vec_free (conn);
1838 }
1839
1840 static void
1841 application_format_local_sessions (application_t * app, int verbose)
1842 {
1843   app_worker_map_t *wrk_map;
1844   app_worker_t *app_wrk;
1845
1846   if (!app)
1847     {
1848       app_worker_format_local_sessions (0, verbose);
1849       return;
1850     }
1851
1852   /* *INDENT-OFF* */
1853   pool_foreach (wrk_map, app->worker_maps, ({
1854     app_wrk = app_worker_get (wrk_map->wrk_index);
1855     app_worker_format_local_sessions (app_wrk, verbose);
1856   }));
1857   /* *INDENT-ON* */
1858 }
1859
1860 static void
1861 app_worker_format_local_connects (app_worker_t * app, int verbose)
1862 {
1863   vlib_main_t *vm = vlib_get_main ();
1864   u32 app_wrk_index, session_index;
1865   app_worker_t *server_wrk;
1866   local_session_t *ls;
1867   uword client_key;
1868   u64 value;
1869
1870   /* Header */
1871   if (app == 0)
1872     {
1873       if (verbose)
1874         vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App",
1875                          "Peer App", "SegManager");
1876       else
1877         vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App",
1878                          "Peer App");
1879       return;
1880     }
1881
1882   if (!app->local_connects)
1883     return;
1884
1885   /* *INDENT-OFF* */
1886   hash_foreach (client_key, value, app->local_connects, ({
1887     application_client_local_connect_key_parse (client_key, &app_wrk_index,
1888                                                 &session_index);
1889     server_wrk = app_worker_get (app_wrk_index);
1890     ls = application_get_local_session (server_wrk, session_index);
1891     vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_wrk_index,
1892                      ls->client_wrk_index);
1893   }));
1894   /* *INDENT-ON* */
1895 }
1896
1897 static void
1898 application_format_local_connects (application_t * app, int verbose)
1899 {
1900   app_worker_map_t *wrk_map;
1901   app_worker_t *app_wrk;
1902
1903   if (!app)
1904     {
1905       app_worker_format_local_connects (0, verbose);
1906       return;
1907     }
1908
1909   /* *INDENT-OFF* */
1910   pool_foreach (wrk_map, app->worker_maps, ({
1911     app_wrk = app_worker_get (wrk_map->wrk_index);
1912     app_worker_format_local_connects (app_wrk, verbose);
1913   }));
1914   /* *INDENT-ON* */
1915 }
1916
1917 u8 *
1918 format_application (u8 * s, va_list * args)
1919 {
1920   application_t *app = va_arg (*args, application_t *);
1921   CLIB_UNUSED (int verbose) = va_arg (*args, int);
1922   segment_manager_properties_t *props;
1923   const u8 *app_ns_name;
1924   u8 *app_name;
1925
1926   if (app == 0)
1927     {
1928       if (verbose)
1929         s = format (s, "%-10s%-20s%-15s%-15s%-15s%-15s%-15s", "Index", "Name",
1930                     "API Client", "Namespace", "Add seg size", "Rx-f size",
1931                     "Tx-f size");
1932       else
1933         s = format (s, "%-10s%-20s%-15s%-40s", "Index", "Name", "API Client",
1934                     "Namespace");
1935       return s;
1936     }
1937
1938   app_name = app_get_name (app);
1939   app_ns_name = app_namespace_id_from_index (app->ns_index);
1940   props = application_segment_manager_properties (app);
1941   if (verbose)
1942     s = format (s, "%-10u%-20s%-15d%-15u%-15U%-15U%-15U", app->app_index,
1943                 app_name, app->api_client_index, app->ns_index,
1944                 format_memory_size, props->add_segment_size,
1945                 format_memory_size, props->rx_fifo_size, format_memory_size,
1946                 props->tx_fifo_size);
1947   else
1948     s = format (s, "%-10u%-20s%-15d%-40s", app->app_index, app_name,
1949                 app->api_client_index, app_ns_name);
1950   return s;
1951 }
1952
1953 void
1954 application_format_all_listeners (vlib_main_t * vm, int do_local, int verbose)
1955 {
1956   application_t *app;
1957
1958   if (!pool_elts (app_main.app_pool))
1959     {
1960       vlib_cli_output (vm, "No active server bindings");
1961       return;
1962     }
1963
1964   if (do_local)
1965     {
1966       application_format_local_sessions (0, verbose);
1967       /* *INDENT-OFF* */
1968       pool_foreach (app, app_main.app_pool, ({
1969         application_format_local_sessions (app, verbose);
1970       }));
1971       /* *INDENT-ON* */
1972     }
1973   else
1974     {
1975       application_format_listeners (0, verbose);
1976
1977       /* *INDENT-OFF* */
1978       pool_foreach (app, app_main.app_pool, ({
1979         application_format_listeners (app, verbose);
1980       }));
1981       /* *INDENT-ON* */
1982     }
1983 }
1984
1985 void
1986 application_format_all_clients (vlib_main_t * vm, int do_local, int verbose)
1987 {
1988   application_t *app;
1989
1990   if (!pool_elts (app_main.app_pool))
1991     {
1992       vlib_cli_output (vm, "No active apps");
1993       return;
1994     }
1995
1996   if (do_local)
1997     {
1998       application_format_local_connects (0, verbose);
1999
2000       /* *INDENT-OFF* */
2001       pool_foreach (app, app_main.app_pool, ({
2002         application_format_local_connects (app, verbose);
2003       }));
2004       /* *INDENT-ON* */
2005     }
2006   else
2007     {
2008       application_format_connects (0, verbose);
2009
2010       /* *INDENT-OFF* */
2011       pool_foreach (app, app_main.app_pool, ({
2012         application_format_connects (app, verbose);
2013       }));
2014       /* *INDENT-ON* */
2015     }
2016 }
2017
2018 static clib_error_t *
2019 show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
2020                      vlib_cli_command_t * cmd)
2021 {
2022   int do_server = 0, do_client = 0, do_local = 0;
2023   application_t *app;
2024   int verbose = 0;
2025
2026   session_cli_return_if_not_enabled ();
2027
2028   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2029     {
2030       if (unformat (input, "server"))
2031         do_server = 1;
2032       else if (unformat (input, "client"))
2033         do_client = 1;
2034       else if (unformat (input, "local"))
2035         do_local = 1;
2036       else if (unformat (input, "verbose"))
2037         verbose = 1;
2038       else
2039         break;
2040     }
2041
2042   if (do_server)
2043     application_format_all_listeners (vm, do_local, verbose);
2044
2045   if (do_client)
2046     application_format_all_clients (vm, do_local, verbose);
2047
2048   /* Print app related info */
2049   if (!do_server && !do_client)
2050     {
2051       vlib_cli_output (vm, "%U", format_application, 0, verbose);
2052       /* *INDENT-OFF* */
2053       pool_foreach (app, app_main.app_pool, ({
2054         vlib_cli_output (vm, "%U", format_application, app, verbose);
2055       }));
2056       /* *INDENT-ON* */
2057     }
2058
2059   return 0;
2060 }
2061
2062 /* *INDENT-OFF* */
2063 VLIB_CLI_COMMAND (show_app_command, static) =
2064 {
2065   .path = "show app",
2066   .short_help = "show app [server|client] [verbose]",
2067   .function = show_app_command_fn,
2068 };
2069 /* *INDENT-ON* */
2070
2071 /*
2072  * fd.io coding-style-patch-verification: ON
2073  *
2074  * Local Variables:
2075  * eval: (c-set-style "gnu")
2076  * End:
2077  */