session: add support for multiple app workers
[vpp.git] / src / vnet / session / application.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application.h>
17 #include <vnet/session/application_interface.h>
18 #include <vnet/session/application_namespace.h>
19 #include <vnet/session/session.h>
20
/* File-local application layer state: the application pool, the worker pool
 * and the app lookup tables used throughout this file hang off it. */
static app_main_t app_main;
22
23 static app_worker_map_t *
24 app_worker_map_alloc (application_t * app)
25 {
26   app_worker_map_t *map;
27   pool_get (app->worker_maps, map);
28   memset (map, 0, sizeof (*map));
29   return map;
30 }
31
32 static u32
33 app_worker_map_index (application_t * app, app_worker_map_t * map)
34 {
35   return (map - app->worker_maps);
36 }
37
38 static void
39 app_worker_map_free (application_t * app, app_worker_map_t * map)
40 {
41   pool_put (app->worker_maps, map);
42 }
43
44 static app_worker_map_t *
45 app_worker_map_get (application_t * app, u32 map_index)
46 {
47   return pool_elt_at_index (app->worker_maps, map_index);
48 }
49
50 static u8 *
51 app_get_name_from_reg_index (application_t * app)
52 {
53   u8 *app_name;
54
55   vl_api_registration_t *regp;
56   regp = vl_api_client_index_to_registration (app->api_client_index);
57   if (!regp)
58     app_name = format (0, "builtin-%d%c", app->app_index, 0);
59   else
60     app_name = format (0, "%s%c", regp->name, 0);
61
62   return app_name;
63 }
64
65 static u8 *
66 app_get_name (application_t * app)
67 {
68   if (!app->name)
69     return app_get_name_from_reg_index (app);
70   return app->name;
71 }
72
73 u32
74 application_session_table (application_t * app, u8 fib_proto)
75 {
76   app_namespace_t *app_ns;
77   app_ns = app_namespace_get (app->ns_index);
78   if (!application_has_global_scope (app))
79     return APP_INVALID_INDEX;
80   if (fib_proto == FIB_PROTOCOL_IP4)
81     return session_lookup_get_index_for_fib (fib_proto,
82                                              app_ns->ip4_fib_index);
83   else
84     return session_lookup_get_index_for_fib (fib_proto,
85                                              app_ns->ip6_fib_index);
86 }
87
88 u32
89 application_local_session_table (application_t * app)
90 {
91   app_namespace_t *app_ns;
92   if (!application_has_local_scope (app))
93     return APP_INVALID_INDEX;
94   app_ns = app_namespace_get (app->ns_index);
95   return app_ns->local_table_index;
96 }
97
98 int
99 application_api_queue_is_full (application_t * app)
100 {
101   svm_queue_t *q;
102
103   /* builtin servers are always OK */
104   if (app->api_client_index == ~0)
105     return 0;
106
107   q = vl_api_client_index_to_input_queue (app->api_client_index);
108   if (!q)
109     return 1;
110
111   if (q->cursize == q->maxsize)
112     return 1;
113   return 0;
114 }
115
116 /**
117  * Returns app name
118  *
119  * Since the name is not stored per app, we generate it on the fly. It is
120  * the caller's responsibility to free the vector
121  */
122 u8 *
123 application_name_from_index (u32 app_index)
124 {
125   application_t *app = application_get (app_index);
126   if (!app)
127     return 0;
128   return app_get_name_from_reg_index (app);
129 }
130
131 static void
132 application_table_add (application_t * app)
133 {
134   if (app->api_client_index != APP_INVALID_INDEX)
135     hash_set (app_main.app_by_api_client_index, app->api_client_index,
136               app->app_index);
137   else if (app->name)
138     hash_set_mem (app_main.app_by_name, app->name, app->app_index);
139 }
140
141 static void
142 application_table_del (application_t * app)
143 {
144   if (app->api_client_index != APP_INVALID_INDEX)
145     hash_unset (app_main.app_by_api_client_index, app->api_client_index);
146   else if (app->name)
147     hash_unset_mem (app_main.app_by_name, app->name);
148 }
149
150 application_t *
151 application_lookup (u32 api_client_index)
152 {
153   uword *p;
154   p = hash_get (app_main.app_by_api_client_index, api_client_index);
155   if (p)
156     return application_get (p[0]);
157
158   return 0;
159 }
160
161 application_t *
162 application_lookup_name (const u8 * name)
163 {
164   uword *p;
165   p = hash_get_mem (app_main.app_by_name, name);
166   if (p)
167     return application_get (p[0]);
168
169   return 0;
170 }
171
172 application_t *
173 application_alloc (void)
174 {
175   application_t *app;
176   pool_get (app_main.app_pool, app);
177   memset (app, 0, sizeof (*app));
178   app->app_index = app - app_main.app_pool;
179   return app;
180 }
181
182 application_t *
183 application_get (u32 app_index)
184 {
185   if (app_index == APP_INVALID_INDEX)
186     return 0;
187   return pool_elt_at_index (app_main.app_pool, app_index);
188 }
189
190 application_t *
191 application_get_if_valid (u32 app_index)
192 {
193   if (pool_is_free_index (app_main.app_pool, app_index))
194     return 0;
195
196   return pool_elt_at_index (app_main.app_pool, app_index);
197 }
198
199 u32
200 application_index (application_t * app)
201 {
202   return app - app_main.app_pool;
203 }
204
205 static void
206 application_verify_cb_fns (session_cb_vft_t * cb_fns)
207 {
208   if (cb_fns->session_accept_callback == 0)
209     clib_warning ("No accept callback function provided");
210   if (cb_fns->session_connected_callback == 0)
211     clib_warning ("No session connected callback function provided");
212   if (cb_fns->session_disconnect_callback == 0)
213     clib_warning ("No session disconnect callback function provided");
214   if (cb_fns->session_reset_callback == 0)
215     clib_warning ("No session reset callback function provided");
216 }
217
218 /**
219  * Check app config for given segment type
220  *
221  * Returns 1 on success and 0 otherwise
222  */
223 static u8
224 application_verify_cfg (ssvm_segment_type_t st)
225 {
226   u8 is_valid;
227   if (st == SSVM_SEGMENT_MEMFD)
228     {
229       is_valid = (session_manager_get_evt_q_segment () != 0);
230       if (!is_valid)
231         clib_warning ("memfd seg: vpp's event qs IN binary api svm region");
232       return is_valid;
233     }
234   else if (st == SSVM_SEGMENT_SHM)
235     {
236       is_valid = (session_manager_get_evt_q_segment () == 0);
237       if (!is_valid)
238         clib_warning ("shm seg: vpp's event qs NOT IN binary api svm region");
239       return is_valid;
240     }
241   else
242     return 1;
243 }
244
245 int
246 application_alloc_and_init (app_init_args_t * a)
247 {
248   ssvm_segment_type_t seg_type = SSVM_SEGMENT_MEMFD;
249   segment_manager_properties_t *props;
250   vl_api_registration_t *reg;
251   application_t *app;
252   u64 *options;
253
254   app = application_alloc ();
255   options = a->options;
256   /*
257    * Make sure we support the requested configuration
258    */
259   if (!(options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_IS_BUILTIN))
260     {
261       reg = vl_api_client_index_to_registration (a->api_client_index);
262       if (!reg)
263         return VNET_API_ERROR_APP_UNSUPPORTED_CFG;
264       if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
265         seg_type = SSVM_SEGMENT_SHM;
266     }
267   else
268     {
269       if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
270         {
271           clib_warning ("mq eventfds can only be used if socket transport is "
272                         "used for api");
273           return VNET_API_ERROR_APP_UNSUPPORTED_CFG;
274         }
275       seg_type = SSVM_SEGMENT_PRIVATE;
276     }
277
278   if (!application_verify_cfg (seg_type))
279     return VNET_API_ERROR_APP_UNSUPPORTED_CFG;
280
281   /* Check that the obvious things are properly set up */
282   application_verify_cb_fns (a->session_cb_vft);
283
284   app->api_client_index = a->api_client_index;
285   app->flags = options[APP_OPTIONS_FLAGS];
286   app->cb_fns = *a->session_cb_vft;
287   app->ns_index = options[APP_OPTIONS_NAMESPACE];
288   app->proxied_transports = options[APP_OPTIONS_PROXY_TRANSPORT];
289   app->name = vec_dup (a->name);
290
291   /* If no scope enabled, default to global */
292   if (!application_has_global_scope (app)
293       && !application_has_local_scope (app))
294     app->flags |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
295
296   props = application_segment_manager_properties (app);
297   segment_manager_properties_init (props);
298   props->segment_size = options[APP_OPTIONS_ADD_SEGMENT_SIZE];
299   props->prealloc_fifos = options[APP_OPTIONS_PREALLOC_FIFO_PAIRS];
300   if (options[APP_OPTIONS_ADD_SEGMENT_SIZE])
301     {
302       props->add_segment_size = options[APP_OPTIONS_ADD_SEGMENT_SIZE];
303       props->add_segment = 1;
304     }
305   if (options[APP_OPTIONS_RX_FIFO_SIZE])
306     props->rx_fifo_size = options[APP_OPTIONS_RX_FIFO_SIZE];
307   if (options[APP_OPTIONS_TX_FIFO_SIZE])
308     props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE];
309   if (options[APP_OPTIONS_EVT_QUEUE_SIZE])
310     props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE];
311   if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
312     props->use_mq_eventfd = 1;
313   if (options[APP_OPTIONS_TLS_ENGINE])
314     app->tls_engine = options[APP_OPTIONS_TLS_ENGINE];
315   props->segment_type = seg_type;
316
317   /* Add app to lookup by api_client_index table */
318   application_table_add (app);
319   a->app_index = application_index (app);
320
321   APP_DBG ("New app name: %v api index: %u index %u", app->name,
322            app->api_client_index, app->app_index);
323
324   return 0;
325 }
326
327 void
328 application_free (application_t * app)
329 {
330   app_worker_map_t *wrk_map;
331   app_worker_t *app_wrk;
332
333   /*
334    * The app event queue allocated in first segment is cleared with
335    * the segment manager. No need to explicitly free it.
336    */
337   APP_DBG ("Delete app name %v api index: %d index: %d", app->name,
338            app->api_client_index, app->app_index);
339
340   if (application_is_proxy (app))
341     application_remove_proxy (app);
342
343   /* *INDENT-OFF* */
344   pool_flush (wrk_map, app->worker_maps, ({
345     app_wrk = app_worker_get (wrk_map->wrk_index);
346     app_worker_free (app_wrk);
347   }));
348   /* *INDENT-ON* */
349   pool_free (app->worker_maps);
350
351   application_table_del (app);
352   vec_free (app->name);
353   vec_free (app->tls_cert);
354   vec_free (app->tls_key);
355   pool_put (app_main.app_pool, app);
356 }
357
358 app_worker_t *
359 application_get_worker (application_t * app, u32 wrk_map_index)
360 {
361   app_worker_map_t *map;
362   map = app_worker_map_get (app, wrk_map_index);
363   if (!map)
364     return 0;
365   return app_worker_get (map->wrk_index);
366 }
367
368 app_worker_t *
369 application_get_default_worker (application_t * app)
370 {
371   return application_get_worker (app, 0);
372 }
373
374 app_worker_t *
375 app_worker_alloc (application_t * app)
376 {
377   app_worker_t *app_wrk;
378   pool_get (app_main.workers, app_wrk);
379   memset (app_wrk, 0, sizeof (*app_wrk));
380   app_wrk->wrk_index = app_wrk - app_main.workers;
381   app_wrk->app_index = app->app_index;
382   app_wrk->wrk_map_index = ~0;
383   app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
384   app_wrk->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
385   app_wrk->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
386   APP_DBG ("New app %v worker %u", app_get_name (app), app_wrk->wrk_index);
387   return app_wrk;
388 }
389
390 app_worker_t *
391 app_worker_get (u32 wrk_index)
392 {
393   return pool_elt_at_index (app_main.workers, wrk_index);
394 }
395
396 app_worker_t *
397 app_worker_get_if_valid (u32 wrk_index)
398 {
399   if (pool_is_free_index (app_main.workers, wrk_index))
400     return 0;
401   return pool_elt_at_index (app_main.workers, wrk_index);
402 }
403
404 void
405 app_worker_free (app_worker_t * app_wrk)
406 {
407   application_t *app = application_get (app_wrk->app_index);
408   vnet_unbind_args_t _a, *a = &_a;
409   u64 handle, *handles = 0;
410   segment_manager_t *sm;
411   u32 sm_index;
412   int i;
413
414   /*
415    *  Listener cleanup
416    */
417
418   /* *INDENT-OFF* */
419   hash_foreach (handle, sm_index, app_wrk->listeners_table,
420   ({
421     vec_add1 (handles, handle);
422     sm = segment_manager_get (sm_index);
423     sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
424   }));
425   /* *INDENT-ON* */
426
427   for (i = 0; i < vec_len (handles); i++)
428     {
429       a->app_index = app->app_index;
430       a->app_wrk_index = app_wrk->wrk_map_index;
431       a->handle = handles[i];
432       /* seg manager is removed when unbind completes */
433       vnet_unbind (a);
434     }
435
436   /*
437    * Connects segment manager cleanup
438    */
439
440   if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
441     {
442       sm = segment_manager_get (app_wrk->connects_seg_manager);
443       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
444       segment_manager_init_del (sm);
445     }
446
447   /* If first segment manager is used by a listener */
448   if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX
449       && app_wrk->first_segment_manager != app_wrk->connects_seg_manager)
450     {
451       sm = segment_manager_get (app_wrk->first_segment_manager);
452       /* .. and has no fifos, e.g. it might be used for redirected sessions,
453        * remove it */
454       if (!segment_manager_has_fifos (sm))
455         {
456           sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
457           segment_manager_del (sm);
458         }
459     }
460
461   /*
462    * Local sessions
463    */
464   application_local_sessions_free (app_wrk);
465
466   pool_put (app_main.workers, app_wrk);
467   if (CLIB_DEBUG)
468     memset (app_wrk, 0xfe, sizeof (*app_wrk));
469 }
470
471 int
472 app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk)
473 {
474   app_worker_map_t *wrk_map;
475   app_worker_t *app_wrk;
476   segment_manager_t *sm;
477   int rv;
478
479   app_wrk = app_worker_alloc (app);
480   wrk_map = app_worker_map_alloc (app);
481   wrk_map->wrk_index = app_wrk->wrk_index;
482   app_wrk->wrk_map_index = app_worker_map_index (app, wrk_map);
483
484   /*
485    * Setup first segment manager
486    */
487   sm = segment_manager_new ();
488   sm->app_wrk_index = app_wrk->wrk_index;
489
490   if ((rv = segment_manager_init (sm, app->sm_properties.segment_size,
491                                   app->sm_properties.prealloc_fifos)))
492     {
493       app_worker_free (app_wrk);
494       return rv;
495     }
496   sm->first_is_protected = 1;
497
498   /*
499    * Setup app worker
500    */
501   app_wrk->first_segment_manager = segment_manager_index (sm);
502   app_wrk->listeners_table = hash_create (0, sizeof (u64));
503   app_wrk->event_queue = segment_manager_event_queue (sm);
504   app_wrk->app_is_builtin = application_is_builtin (app);
505
506   /*
507    * Segment manager for local sessions
508    */
509   sm = segment_manager_new ();
510   sm->app_wrk_index = app_wrk->wrk_index;
511   app_wrk->local_segment_manager = segment_manager_index (sm);
512   app_wrk->local_connects = hash_create (0, sizeof (u64));
513
514   *wrk = app_wrk;
515
516   return 0;
517 }
518
519 static segment_manager_t *
520 application_alloc_segment_manager (app_worker_t * app_wrk)
521 {
522   segment_manager_t *sm = 0;
523
524   /* If the first segment manager is not in use, don't allocate a new one */
525   if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX
526       && app_wrk->first_segment_manager_in_use == 0)
527     {
528       sm = segment_manager_get (app_wrk->first_segment_manager);
529       app_wrk->first_segment_manager_in_use = 1;
530       return sm;
531     }
532
533   sm = segment_manager_new ();
534   sm->app_wrk_index = app_wrk->wrk_index;
535
536   return sm;
537 }
538
539 /**
540  * Start listening local transport endpoint for requested transport.
541  *
542  * Creates a 'dummy' stream session with state LISTENING to be used in session
543  * lookups, prior to establishing connection. Requests transport to build
544  * it's own specific listening connection.
545  */
546 int
547 app_worker_start_listen (app_worker_t * app_wrk, session_endpoint_t * sep,
548                          session_handle_t * res)
549 {
550   segment_manager_t *sm;
551   stream_session_t *s;
552   session_handle_t handle;
553   session_type_t sst;
554
555   sst = session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4);
556   s = listen_session_new (0, sst);
557   s->app_wrk_index = app_wrk->wrk_index;
558
559   /* Allocate segment manager. All sessions derived out of a listen session
560    * have fifos allocated by the same segment manager. */
561   if (!(sm = application_alloc_segment_manager (app_wrk)))
562     goto err;
563
564   /* Add to app's listener table. Useful to find all child listeners
565    * when app goes down, although, just for unbinding this is not needed */
566   handle = listen_session_get_handle (s);
567   hash_set (app_wrk->listeners_table, handle, segment_manager_index (sm));
568
569   if (stream_session_listen (s, sep))
570     {
571       segment_manager_del (sm);
572       hash_unset (app_wrk->listeners_table, handle);
573       goto err;
574     }
575
576   *res = handle;
577   return 0;
578
579 err:
580   listen_session_del (s);
581   return -1;
582 }
583
584 /**
585  * Stop listening on session associated to handle
586  *
587  * @param handle        listener handle
588  * @param app_index     index of the app owning the handle. This is used
589  *                      only for validating ownership
590  */
591 int
592 app_worker_stop_listen (session_handle_t handle, u32 app_index)
593 {
594   stream_session_t *listener;
595   segment_manager_t *sm;
596   app_worker_t *app_wrk;
597   uword *indexp;
598
599   listener = listen_session_get_from_handle (handle);
600   app_wrk = app_worker_get (listener->app_wrk_index);
601   if (PREDICT_FALSE (!app_wrk || app_wrk->app_index != app_index))
602     {
603       clib_warning ("app doesn't own handle %llu!", handle);
604       return -1;
605     }
606   if (PREDICT_FALSE (hash_get (app_wrk->listeners_table, handle) == 0))
607     {
608       clib_warning ("listener handle was removed %llu!", handle);
609       return -1;
610     }
611
612   stream_session_stop_listen (listener);
613
614   indexp = hash_get (app_wrk->listeners_table, handle);
615   ASSERT (indexp);
616
617   sm = segment_manager_get (*indexp);
618   if (app_wrk->first_segment_manager == *indexp)
619     {
620       /* Delete sessions but don't remove segment manager */
621       app_wrk->first_segment_manager_in_use = 0;
622       segment_manager_del_sessions (sm);
623     }
624   else
625     {
626       segment_manager_init_del (sm);
627     }
628   hash_unset (app_wrk->listeners_table, handle);
629   listen_session_del (listener);
630
631   return 0;
632 }
633
634 int
635 app_worker_open_session (app_worker_t * app, session_endpoint_t * sep,
636                          u32 api_context)
637 {
638   int rv;
639
640   /* Make sure we have a segment manager for connects */
641   app_worker_alloc_connects_segment_manager (app);
642
643   if ((rv = session_open (app->wrk_index, sep, api_context)))
644     return rv;
645
646   return 0;
647 }
648
649 int
650 app_worker_alloc_connects_segment_manager (app_worker_t * app_wrk)
651 {
652   segment_manager_t *sm;
653
654   if (app_wrk->connects_seg_manager == APP_INVALID_SEGMENT_MANAGER_INDEX)
655     {
656       sm = application_alloc_segment_manager (app_wrk);
657       if (sm == 0)
658         return -1;
659       app_wrk->connects_seg_manager = segment_manager_index (sm);
660     }
661   return 0;
662 }
663
664 segment_manager_t *
665 app_worker_get_connect_segment_manager (app_worker_t * app)
666 {
667   ASSERT (app->connects_seg_manager != (u32) ~ 0);
668   return segment_manager_get (app->connects_seg_manager);
669 }
670
671 segment_manager_t *
672 app_worker_get_listen_segment_manager (app_worker_t * app,
673                                        stream_session_t * s)
674 {
675   uword *smp;
676   smp = hash_get (app->listeners_table, listen_session_get_handle (s));
677   ASSERT (smp != 0);
678   return segment_manager_get (*smp);
679 }
680
681 clib_error_t *
682 vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a)
683 {
684   svm_fifo_segment_private_t *fs;
685   app_worker_map_t *wrk_map;
686   app_worker_t *app_wrk;
687   segment_manager_t *sm;
688   application_t *app;
689   int rv;
690
691   app = application_get (a->app_index);
692   if (!app)
693     return clib_error_return_code (0, VNET_API_ERROR_INVALID_VALUE, 0,
694                                    "App %u does not exist", a->app_index);
695
696   if (a->is_add)
697     {
698       if ((rv = app_worker_alloc_and_init (app, &app_wrk)))
699         return clib_error_return_code (0, rv, 0, "app wrk init: %d", rv);
700       sm = segment_manager_get (app_wrk->first_segment_manager);
701       fs = segment_manager_get_segment_w_lock (sm, 0);
702       a->segment = &fs->ssvm;
703       segment_manager_segment_reader_unlock (sm);
704       a->evt_q = app_wrk->event_queue;
705     }
706   else
707     {
708       wrk_map = app_worker_map_get (app, a->wrk_index);
709       if (!wrk_map)
710         return clib_error_return_code (0, VNET_API_ERROR_INVALID_VALUE, 0,
711                                        "App %u does not have worker %u",
712                                        app->app_index, a->wrk_index);
713       app_wrk = app_worker_get (wrk_map->wrk_index);
714       app_worker_map_free (app, wrk_map);
715       if (!app_wrk)
716         return clib_error_return_code (0, VNET_API_ERROR_INVALID_VALUE, 0,
717                                        "No worker %u", a->wrk_index);
718       app_worker_free (app_wrk);
719     }
720   return 0;
721 }
722
723 segment_manager_t *
724 application_get_local_segment_manager (app_worker_t * app)
725 {
726   return segment_manager_get (app->local_segment_manager);
727 }
728
729 segment_manager_t *
730 application_get_local_segment_manager_w_session (app_worker_t * app,
731                                                  local_session_t * ls)
732 {
733   stream_session_t *listener;
734   if (application_local_session_listener_has_transport (ls))
735     {
736       listener = listen_session_get (ls->listener_index);
737       return app_worker_get_listen_segment_manager (app, listener);
738     }
739   return segment_manager_get (app->local_segment_manager);
740 }
741
742 int
743 application_is_proxy (application_t * app)
744 {
745   return (app->flags & APP_OPTIONS_FLAGS_IS_PROXY);
746 }
747
748 int
749 application_is_builtin (application_t * app)
750 {
751   return (app->flags & APP_OPTIONS_FLAGS_IS_BUILTIN);
752 }
753
754 int
755 application_is_builtin_proxy (application_t * app)
756 {
757   return (application_is_proxy (app) && application_is_builtin (app));
758 }
759
760 u8
761 application_has_local_scope (application_t * app)
762 {
763   return app->flags & APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
764 }
765
766 u8
767 application_has_global_scope (application_t * app)
768 {
769   return app->flags & APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
770 }
771
772 /**
773  * Send an API message to the external app, to map new segment
774  */
775 int
776 app_worker_add_segment_notify (u32 app_wrk_index, ssvm_private_t * fs)
777 {
778   app_worker_t *app_wrk = app_worker_get (app_wrk_index);
779   application_t *app = application_get (app_wrk->app_index);
780   return app->cb_fns.add_segment_callback (app->api_client_index, fs);
781 }
782
783 u32
784 application_n_listeners (app_worker_t * app)
785 {
786   return hash_elts (app->listeners_table);
787 }
788
789 stream_session_t *
790 app_worker_first_listener (app_worker_t * app, u8 fib_proto,
791                            u8 transport_proto)
792 {
793   stream_session_t *listener;
794   u64 handle;
795   u32 sm_index;
796   u8 sst;
797
798   sst = session_type_from_proto_and_ip (transport_proto,
799                                         fib_proto == FIB_PROTOCOL_IP4);
800
801   /* *INDENT-OFF* */
802    hash_foreach (handle, sm_index, app->listeners_table, ({
803      listener = listen_session_get_from_handle (handle);
804      if (listener->session_type == sst
805          && listener->listener_index != SESSION_PROXY_LISTENER_INDEX)
806        return listener;
807    }));
808   /* *INDENT-ON* */
809
810   return 0;
811 }
812
813 u8
814 app_worker_application_is_builtin (app_worker_t * app_wrk)
815 {
816   return app_wrk->app_is_builtin;
817 }
818
819 stream_session_t *
820 application_proxy_listener (app_worker_t * app, u8 fib_proto,
821                             u8 transport_proto)
822 {
823   stream_session_t *listener;
824   u64 handle;
825   u32 sm_index;
826   u8 sst;
827
828   sst = session_type_from_proto_and_ip (transport_proto,
829                                         fib_proto == FIB_PROTOCOL_IP4);
830
831   /* *INDENT-OFF* */
832    hash_foreach (handle, sm_index, app->listeners_table, ({
833      listener = listen_session_get_from_handle (handle);
834      if (listener->session_type == sst
835          && listener->listener_index == SESSION_PROXY_LISTENER_INDEX)
836        return listener;
837    }));
838   /* *INDENT-ON* */
839
840   return 0;
841 }
842
843 static clib_error_t *
844 application_start_stop_proxy_fib_proto (application_t * app, u8 fib_proto,
845                                         u8 transport_proto, u8 is_start)
846 {
847   app_namespace_t *app_ns = app_namespace_get (app->ns_index);
848   u8 is_ip4 = (fib_proto == FIB_PROTOCOL_IP4);
849   session_endpoint_t sep = SESSION_ENDPOINT_NULL;
850   transport_connection_t *tc;
851   app_worker_t *app_wrk;
852   stream_session_t *s;
853   u64 handle;
854
855   /* TODO decide if we want proxy to be enabled for all workers */
856   app_wrk = application_get_default_worker (app);
857   if (is_start)
858     {
859       s = app_worker_first_listener (app_wrk, fib_proto, transport_proto);
860       if (!s)
861         {
862           sep.is_ip4 = is_ip4;
863           sep.fib_index = app_namespace_get_fib_index (app_ns, fib_proto);
864           sep.sw_if_index = app_ns->sw_if_index;
865           sep.transport_proto = transport_proto;
866           app_worker_start_listen (app_wrk, &sep, &handle);
867           s = listen_session_get_from_handle (handle);
868           s->listener_index = SESSION_PROXY_LISTENER_INDEX;
869         }
870     }
871   else
872     {
873       s = application_proxy_listener (app_wrk, fib_proto, transport_proto);
874       ASSERT (s);
875     }
876
877   tc = listen_session_get_transport (s);
878
879   if (!ip_is_zero (&tc->lcl_ip, 1))
880     {
881       u32 sti;
882       sep.is_ip4 = is_ip4;
883       sep.fib_index = app_namespace_get_fib_index (app_ns, fib_proto);
884       sep.transport_proto = transport_proto;
885       sep.port = 0;
886       sti = session_lookup_get_index_for_fib (fib_proto, sep.fib_index);
887       if (is_start)
888         session_lookup_add_session_endpoint (sti, &sep, s->session_index);
889       else
890         session_lookup_del_session_endpoint (sti, &sep);
891     }
892
893   return 0;
894 }
895
896 static void
897 application_start_stop_proxy_local_scope (application_t * app,
898                                           u8 transport_proto, u8 is_start)
899 {
900   session_endpoint_t sep = SESSION_ENDPOINT_NULL;
901   app_namespace_t *app_ns;
902   app_ns = app_namespace_get (app->ns_index);
903   sep.is_ip4 = 1;
904   sep.transport_proto = transport_proto;
905   sep.port = 0;
906
907   if (is_start)
908     {
909       session_lookup_add_session_endpoint (app_ns->local_table_index, &sep,
910                                            app->app_index);
911       sep.is_ip4 = 0;
912       session_lookup_add_session_endpoint (app_ns->local_table_index, &sep,
913                                            app->app_index);
914     }
915   else
916     {
917       session_lookup_del_session_endpoint (app_ns->local_table_index, &sep);
918       sep.is_ip4 = 0;
919       session_lookup_del_session_endpoint (app_ns->local_table_index, &sep);
920     }
921 }
922
923 void
924 application_start_stop_proxy (application_t * app,
925                               transport_proto_t transport_proto, u8 is_start)
926 {
927   if (application_has_local_scope (app))
928     application_start_stop_proxy_local_scope (app, transport_proto, is_start);
929
930   if (application_has_global_scope (app))
931     {
932       application_start_stop_proxy_fib_proto (app, FIB_PROTOCOL_IP4,
933                                               transport_proto, is_start);
934       application_start_stop_proxy_fib_proto (app, FIB_PROTOCOL_IP6,
935                                               transport_proto, is_start);
936     }
937 }
938
939 void
940 application_setup_proxy (application_t * app)
941 {
942   u16 transports = app->proxied_transports;
943   transport_proto_t tp;
944
945   ASSERT (application_is_proxy (app));
946
947   /* *INDENT-OFF* */
948   transport_proto_foreach (tp, ({
949     if (transports & (1 << tp))
950       application_start_stop_proxy (app, tp, 1);
951   }));
952   /* *INDENT-ON* */
953 }
954
955 void
956 application_remove_proxy (application_t * app)
957 {
958   u16 transports = app->proxied_transports;
959   transport_proto_t tp;
960
961   ASSERT (application_is_proxy (app));
962
963   /* *INDENT-OFF* */
964   transport_proto_foreach (tp, ({
965     if (transports & (1 << tp))
966       application_start_stop_proxy (app, tp, 0);
967   }));
968   /* *INDENT-ON* */
969 }
970
971 segment_manager_properties_t *
972 application_segment_manager_properties (application_t * app)
973 {
974   return &app->sm_properties;
975 }
976
977 segment_manager_properties_t *
978 application_get_segment_manager_properties (u32 app_index)
979 {
980   application_t *app = application_get (app_index);
981   return &app->sm_properties;
982 }
983
984 static inline int
985 app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
986 {
987   if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
988     {
989       clib_warning ("evt q full");
990       svm_msg_q_free_msg (mq, msg);
991       if (lock)
992         svm_msg_q_unlock (mq);
993       return -1;
994     }
995
996   if (lock)
997     {
998       svm_msg_q_add_and_unlock (mq, msg);
999       return 0;
1000     }
1001
1002   /* Even when not locking the ring, we must wait for queue mutex */
1003   if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
1004     {
1005       clib_warning ("msg q add returned");
1006       return -1;
1007     }
1008   return 0;
1009 }
1010
1011 static inline int
1012 app_send_io_evt_rx (app_worker_t * app_wrk, stream_session_t * s, u8 lock)
1013 {
1014   session_event_t *evt;
1015   svm_msg_q_msg_t msg;
1016   svm_msg_q_t *mq;
1017
1018   if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY
1019                      && s->session_state != SESSION_STATE_LISTENING))
1020     {
1021       /* Session is closed so app will never clean up. Flush rx fifo */
1022       if (s->session_state == SESSION_STATE_CLOSED)
1023         svm_fifo_dequeue_drop_all (s->server_rx_fifo);
1024       return 0;
1025     }
1026
1027   if (app_worker_application_is_builtin (app_wrk))
1028     {
1029       application_t *app = application_get (app_wrk->app_index);
1030       return app->cb_fns.builtin_app_rx_callback (s);
1031     }
1032
1033   if (svm_fifo_has_event (s->server_rx_fifo)
1034       || svm_fifo_is_empty (s->server_rx_fifo))
1035     return 0;
1036
1037   mq = app_wrk->event_queue;
1038   if (lock)
1039     svm_msg_q_lock (mq);
1040
1041   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
1042     {
1043       clib_warning ("evt q rings full");
1044       if (lock)
1045         svm_msg_q_unlock (mq);
1046       return -1;
1047     }
1048
1049   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
1050   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
1051
1052   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
1053   evt->fifo = s->server_rx_fifo;
1054   evt->event_type = FIFO_EVENT_APP_RX;
1055
1056   if (app_enqueue_evt (mq, &msg, lock))
1057     return -1;
1058   (void) svm_fifo_set_event (s->server_rx_fifo);
1059   return 0;
1060 }
1061
1062 static inline int
1063 app_send_io_evt_tx (app_worker_t * app_wrk, stream_session_t * s, u8 lock)
1064 {
1065   svm_msg_q_t *mq;
1066   session_event_t *evt;
1067   svm_msg_q_msg_t msg;
1068
1069   if (app_worker_application_is_builtin (app_wrk))
1070     return 0;
1071
1072   mq = app_wrk->event_queue;
1073   if (lock)
1074     svm_msg_q_lock (mq);
1075
1076   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
1077     {
1078       clib_warning ("evt q rings full");
1079       if (lock)
1080         svm_msg_q_unlock (mq);
1081       return -1;
1082     }
1083
1084   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
1085   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
1086
1087   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
1088   evt->event_type = FIFO_EVENT_APP_TX;
1089   evt->fifo = s->server_tx_fifo;
1090
1091   return app_enqueue_evt (mq, &msg, lock);
1092 }
1093
1094 /* *INDENT-OFF* */
1095 typedef int (app_send_evt_handler_fn) (app_worker_t *app,
1096                                        stream_session_t *s,
1097                                        u8 lock);
1098 static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = {
1099     app_send_io_evt_rx,
1100     0,
1101     app_send_io_evt_tx,
1102 };
1103 /* *INDENT-ON* */
1104
1105 /**
1106  * Send event to application
1107  *
1108  * Logic from queue perspective is non-blocking. That is, if there's
1109  * not enough space to enqueue a message, we return. However, if the lock
1110  * flag is set, we do wait for queue mutex.
1111  */
1112 int
1113 application_send_event (app_worker_t * app, stream_session_t * s, u8 evt_type)
1114 {
1115   ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
1116   return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
1117 }
1118
1119 int
1120 application_lock_and_send_event (app_worker_t * app, stream_session_t * s,
1121                                  u8 evt_type)
1122 {
1123   return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
1124 }
1125
1126 local_session_t *
1127 application_alloc_local_session (app_worker_t * app)
1128 {
1129   local_session_t *s;
1130   pool_get (app->local_sessions, s);
1131   memset (s, 0, sizeof (*s));
1132   s->app_wrk_index = app->app_index;
1133   s->session_index = s - app->local_sessions;
1134   s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
1135   return s;
1136 }
1137
1138 void
1139 application_free_local_session (app_worker_t * app, local_session_t * s)
1140 {
1141   pool_put (app->local_sessions, s);
1142   if (CLIB_DEBUG)
1143     memset (s, 0xfc, sizeof (*s));
1144 }
1145
1146 local_session_t *
1147 application_get_local_session (app_worker_t * app_wrk, u32 session_index)
1148 {
1149   if (pool_is_free_index (app_wrk->local_sessions, session_index))
1150     return 0;
1151   return pool_elt_at_index (app_wrk->local_sessions, session_index);
1152 }
1153
1154 local_session_t *
1155 application_get_local_session_from_handle (session_handle_t handle)
1156 {
1157   app_worker_t *server_wrk;
1158   u32 session_index, server_wrk_index;
1159   local_session_parse_handle (handle, &server_wrk_index, &session_index);
1160   server_wrk = app_worker_get_if_valid (server_wrk_index);
1161   if (!server_wrk)
1162     return 0;
1163   return application_get_local_session (server_wrk, session_index);
1164 }
1165
1166 always_inline void
1167 application_local_listener_session_endpoint (local_session_t * ll,
1168                                              session_endpoint_t * sep)
1169 {
1170   sep->transport_proto =
1171     session_type_transport_proto (ll->listener_session_type);
1172   sep->port = ll->port;
1173   sep->is_ip4 = ll->listener_session_type & 1;
1174 }
1175
1176 int
1177 application_start_local_listen (app_worker_t * app_wrk,
1178                                 session_endpoint_t * sep,
1179                                 session_handle_t * handle)
1180 {
1181   session_handle_t lh;
1182   local_session_t *ll;
1183   application_t *app;
1184   u32 table_index;
1185
1186   app = application_get (app_wrk->app_index);
1187   table_index = application_local_session_table (app);
1188
1189   /* An exact sep match, as opposed to session_lookup_local_listener */
1190   lh = session_lookup_endpoint_listener (table_index, sep, 1);
1191   if (lh != SESSION_INVALID_HANDLE)
1192     return VNET_API_ERROR_ADDRESS_IN_USE;
1193
1194   pool_get (app_wrk->local_listen_sessions, ll);
1195   memset (ll, 0, sizeof (*ll));
1196   ll->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
1197   ll->app_wrk_index = app_wrk->app_index;
1198   ll->session_index = ll - app_wrk->local_listen_sessions;
1199   ll->port = sep->port;
1200   /* Store the original session type for the unbind */
1201   ll->listener_session_type =
1202     session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4);
1203   ll->transport_listener_index = ~0;
1204
1205   *handle = application_local_session_handle (ll);
1206   session_lookup_add_session_endpoint (table_index, sep, *handle);
1207
1208   return 0;
1209 }
1210
1211 /**
1212  * Clean up local session table. If we have a listener session use it to
1213  * find the port and proto. If not, the handle must be a local table handle
1214  * so parse it.
1215  */
1216 int
1217 application_stop_local_listen (session_handle_t lh, u32 app_index)
1218 {
1219   session_endpoint_t sep = SESSION_ENDPOINT_NULL;
1220   u32 table_index, ll_index, server_wrk_index;
1221   app_worker_t *server_wrk;
1222   stream_session_t *sl = 0;
1223   local_session_t *ll, *ls;
1224   application_t *server;
1225
1226   server = application_get (app_index);
1227   table_index = application_local_session_table (server);
1228
1229   /* We have both local and global table binds. Figure from global what
1230    * the sep we should be cleaning up is.
1231    */
1232   if (!session_handle_is_local (lh))
1233     {
1234       sl = listen_session_get_from_handle (lh);
1235       if (!sl || listen_session_get_local_session_endpoint (sl, &sep))
1236         {
1237           clib_warning ("broken listener");
1238           return -1;
1239         }
1240       lh = session_lookup_endpoint_listener (table_index, &sep, 0);
1241       if (lh == SESSION_INVALID_HANDLE)
1242         return -1;
1243     }
1244
1245   local_session_parse_handle (lh, &server_wrk_index, &ll_index);
1246   server_wrk = app_worker_get (server_wrk_index);
1247   if (PREDICT_FALSE (server_wrk->app_index != app_index))
1248     {
1249       clib_warning ("app %u does not own local handle 0x%lx", app_index, lh);
1250     }
1251   ll = application_get_local_listen_session (server_wrk, ll_index);
1252   if (PREDICT_FALSE (!ll))
1253     {
1254       clib_warning ("no local listener");
1255       return -1;
1256     }
1257   application_local_listener_session_endpoint (ll, &sep);
1258   session_lookup_del_session_endpoint (table_index, &sep);
1259
1260   /* *INDENT-OFF* */
1261   pool_foreach (ls, server_wrk->local_sessions, ({
1262     if (ls->listener_index == ll->session_index)
1263       application_local_session_disconnect (server_wrk->app_index, ls);
1264   }));
1265   /* *INDENT-ON* */
1266   pool_put_index (server_wrk->local_listen_sessions, ll->session_index);
1267
1268   return 0;
1269 }
1270
1271 static void
1272 application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq)
1273 {
1274   int fd;
1275
1276   /*
1277    * segment manager initializes only the producer eventds, since vpp is
1278    * typically the producer. But for local sessions, we also pass to the
1279    * apps the mqs they listen on for events from peer apps, so they are also
1280    * consumer fds.
1281    */
1282   fd = svm_msg_q_get_producer_eventfd (sq);
1283   svm_msg_q_set_consumer_eventfd (sq, fd);
1284   fd = svm_msg_q_get_producer_eventfd (cq);
1285   svm_msg_q_set_consumer_eventfd (cq, fd);
1286 }
1287
1288 int
1289 application_local_session_connect (app_worker_t * client_wrk,
1290                                    app_worker_t * server_wrk,
1291                                    local_session_t * ll, u32 opaque)
1292 {
1293   u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
1294   segment_manager_properties_t *props, *cprops;
1295   u32 round_rx_fifo_sz, round_tx_fifo_sz;
1296   int rv, has_transport, seg_index;
1297   svm_fifo_segment_private_t *seg;
1298   application_t *server, *client;
1299   segment_manager_t *sm;
1300   local_session_t *ls;
1301   svm_msg_q_t *sq, *cq;
1302
1303   ls = application_alloc_local_session (server_wrk);
1304
1305   server = application_get (server_wrk->app_index);
1306   client = application_get (client_wrk->app_index);
1307
1308   props = application_segment_manager_properties (server);
1309   cprops = application_segment_manager_properties (client);
1310   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
1311   evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
1312   round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
1313   round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
1314   seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
1315
1316   has_transport = session_has_transport ((stream_session_t *) ll);
1317   if (!has_transport)
1318     {
1319       /* Local sessions don't have backing transport */
1320       ls->port = ll->port;
1321       sm = application_get_local_segment_manager (server_wrk);
1322     }
1323   else
1324     {
1325       stream_session_t *sl = (stream_session_t *) ll;
1326       transport_connection_t *tc;
1327       tc = listen_session_get_transport (sl);
1328       ls->port = tc->lcl_port;
1329       sm = app_worker_get_listen_segment_manager (server_wrk, sl);
1330     }
1331
1332   seg_index = segment_manager_add_segment (sm, seg_size);
1333   if (seg_index < 0)
1334     {
1335       clib_warning ("failed to add new cut-through segment");
1336       return seg_index;
1337     }
1338   seg = segment_manager_get_segment_w_lock (sm, seg_index);
1339   sq = segment_manager_alloc_queue (seg, props);
1340   cq = segment_manager_alloc_queue (seg, cprops);
1341
1342   if (props->use_mq_eventfd)
1343     application_local_session_fix_eventds (sq, cq);
1344
1345   ls->server_evt_q = pointer_to_uword (sq);
1346   ls->client_evt_q = pointer_to_uword (cq);
1347   rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
1348                                         props->tx_fifo_size,
1349                                         &ls->server_rx_fifo,
1350                                         &ls->server_tx_fifo);
1351   if (rv)
1352     {
1353       clib_warning ("failed to add fifos in cut-through segment");
1354       segment_manager_segment_reader_unlock (sm);
1355       goto failed;
1356     }
1357   ls->server_rx_fifo->master_session_index = ls->session_index;
1358   ls->server_tx_fifo->master_session_index = ls->session_index;
1359   ls->server_rx_fifo->master_thread_index = ~0;
1360   ls->server_tx_fifo->master_thread_index = ~0;
1361   ls->svm_segment_index = seg_index;
1362   ls->listener_index = ll->session_index;
1363   ls->client_wrk_index = client_wrk->wrk_index;
1364   ls->client_opaque = opaque;
1365   ls->listener_session_type = ll->session_type;
1366
1367   if ((rv = server->cb_fns.add_segment_callback (server->api_client_index,
1368                                                  &seg->ssvm)))
1369     {
1370       clib_warning ("failed to notify server of new segment");
1371       segment_manager_segment_reader_unlock (sm);
1372       goto failed;
1373     }
1374   segment_manager_segment_reader_unlock (sm);
1375   if ((rv = server->cb_fns.session_accept_callback ((stream_session_t *) ls)))
1376     {
1377       clib_warning ("failed to send accept cut-through notify to server");
1378       goto failed;
1379     }
1380   if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN)
1381     application_local_session_connect_notify (ls);
1382
1383   return 0;
1384
1385 failed:
1386   if (!has_transport)
1387     segment_manager_del_segment (sm, seg);
1388   return rv;
1389 }
1390
1391 static uword
1392 application_client_local_connect_key (local_session_t * ls)
1393 {
1394   return ((uword) ls->app_wrk_index << 32 | (uword) ls->session_index);
1395 }
1396
1397 static void
1398 application_client_local_connect_key_parse (uword key, u32 * app_wrk_index,
1399                                             u32 * session_index)
1400 {
1401   *app_wrk_index = key >> 32;
1402   *session_index = key & 0xFFFFFFFF;
1403 }
1404
1405 int
1406 application_local_session_connect_notify (local_session_t * ls)
1407 {
1408   svm_fifo_segment_private_t *seg;
1409   app_worker_t *client_wrk, *server_wrk;
1410   segment_manager_t *sm;
1411   application_t *client;
1412   int rv, is_fail = 0;
1413   uword client_key;
1414
1415   client_wrk = app_worker_get (ls->client_wrk_index);
1416   server_wrk = app_worker_get (ls->app_wrk_index);
1417   client = application_get (client_wrk->app_index);
1418
1419   sm = application_get_local_segment_manager_w_session (server_wrk, ls);
1420   seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index);
1421   if ((rv = client->cb_fns.add_segment_callback (client->api_client_index,
1422                                                  &seg->ssvm)))
1423     {
1424       clib_warning ("failed to notify client %u of new segment",
1425                     ls->client_wrk_index);
1426       segment_manager_segment_reader_unlock (sm);
1427       application_local_session_disconnect (ls->client_wrk_index, ls);
1428       is_fail = 1;
1429     }
1430   else
1431     {
1432       segment_manager_segment_reader_unlock (sm);
1433     }
1434
1435   client->cb_fns.session_connected_callback (client_wrk->wrk_index,
1436                                              ls->client_opaque,
1437                                              (stream_session_t *) ls,
1438                                              is_fail);
1439
1440   client_key = application_client_local_connect_key (ls);
1441   hash_set (client_wrk->local_connects, client_key, client_key);
1442   return 0;
1443 }
1444
1445 int
1446 application_local_session_cleanup (app_worker_t * client_wrk,
1447                                    app_worker_t * server_wrk,
1448                                    local_session_t * ls)
1449 {
1450   svm_fifo_segment_private_t *seg;
1451   segment_manager_t *sm;
1452   uword client_key;
1453   u8 has_transport;
1454
1455   has_transport = session_has_transport ((stream_session_t *) ls);
1456   client_key = application_client_local_connect_key (ls);
1457   if (!has_transport)
1458     sm = application_get_local_segment_manager_w_session (server_wrk, ls);
1459   else
1460     sm = app_worker_get_listen_segment_manager (server_wrk,
1461                                                 (stream_session_t *) ls);
1462
1463   seg = segment_manager_get_segment (sm, ls->svm_segment_index);
1464   if (client_wrk)
1465     hash_unset (client_wrk->local_connects, client_key);
1466
1467   if (!has_transport)
1468     {
1469       application_t *server = application_get (server_wrk->app_index);
1470       server->cb_fns.del_segment_callback (server->api_client_index,
1471                                            &seg->ssvm);
1472       if (client_wrk)
1473         {
1474           application_t *client = application_get (client_wrk->app_index);
1475           client->cb_fns.del_segment_callback (client->api_client_index,
1476                                                &seg->ssvm);
1477         }
1478       segment_manager_del_segment (sm, seg);
1479     }
1480
1481   application_free_local_session (server_wrk, ls);
1482
1483   return 0;
1484 }
1485
1486 int
1487 application_local_session_disconnect (u32 app_wrk_index, local_session_t * ls)
1488 {
1489   app_worker_t *client_wrk, *server_wrk;
1490
1491   client_wrk = app_worker_get_if_valid (ls->client_wrk_index);
1492   server_wrk = app_worker_get (ls->app_wrk_index);
1493
1494
1495   if (ls->session_state == SESSION_STATE_CLOSED)
1496     return application_local_session_cleanup (client_wrk, server_wrk, ls);
1497
1498   if (app_wrk_index == ls->client_wrk_index)
1499     {
1500       mq_send_local_session_disconnected_cb (ls->app_wrk_index, ls);
1501     }
1502   else
1503     {
1504       if (!client_wrk)
1505         {
1506           return application_local_session_cleanup (client_wrk, server_wrk,
1507                                                     ls);
1508         }
1509       else if (ls->session_state < SESSION_STATE_READY)
1510         {
1511           application_t *client = application_get (client_wrk->app_index);
1512           client->cb_fns.session_connected_callback (client_wrk->wrk_index,
1513                                                      ls->client_opaque,
1514                                                      (stream_session_t *) ls,
1515                                                      1 /* is_fail */ );
1516           ls->session_state = SESSION_STATE_CLOSED;
1517           return application_local_session_cleanup (client_wrk, server_wrk,
1518                                                     ls);
1519         }
1520       else
1521         {
1522           mq_send_local_session_disconnected_cb (client_wrk->wrk_index, ls);
1523         }
1524     }
1525
1526   ls->session_state = SESSION_STATE_CLOSED;
1527
1528   return 0;
1529 }
1530
1531 int
1532 application_local_session_disconnect_w_index (u32 app_wrk_index, u32 ls_index)
1533 {
1534   app_worker_t *app_wrk;
1535   local_session_t *ls;
1536   app_wrk = app_worker_get (app_wrk_index);
1537   ls = application_get_local_session (app_wrk, ls_index);
1538   return application_local_session_disconnect (app_wrk_index, ls);
1539 }
1540
1541 void
1542 application_local_sessions_free (app_worker_t * app_wrk)
1543 {
1544   u32 index, server_wrk_index, session_index, table_index;
1545   segment_manager_t *sm;
1546   u64 handle, *handles = 0;
1547   local_session_t *ls, *ll;
1548   app_worker_t *server_wrk;
1549   session_endpoint_t sep;
1550   application_t *app;
1551   int i;
1552
1553   /*
1554    * Local listens. Don't bother with local sessions, we clean them lower
1555    */
1556   app = application_get (app_wrk->app_index);
1557   table_index = application_local_session_table (app);
1558   /* *INDENT-OFF* */
1559   pool_foreach (ll, app_wrk->local_listen_sessions, ({
1560     application_local_listener_session_endpoint (ll, &sep);
1561     session_lookup_del_session_endpoint (table_index, &sep);
1562   }));
1563   /* *INDENT-ON* */
1564
1565   /*
1566    * Local sessions
1567    */
1568   if (app_wrk->local_sessions)
1569     {
1570       /* *INDENT-OFF* */
1571       pool_foreach (ls, app_wrk->local_sessions, ({
1572         application_local_session_disconnect (app_wrk->wrk_index, ls);
1573       }));
1574       /* *INDENT-ON* */
1575     }
1576
1577   /*
1578    * Local connects
1579    */
1580   vec_reset_length (handles);
1581   /* *INDENT-OFF* */
1582   hash_foreach (handle, index, app_wrk->local_connects, ({
1583     vec_add1 (handles, handle);
1584   }));
1585   /* *INDENT-ON* */
1586
1587   for (i = 0; i < vec_len (handles); i++)
1588     {
1589       application_client_local_connect_key_parse (handles[i],
1590                                                   &server_wrk_index,
1591                                                   &session_index);
1592       server_wrk = app_worker_get_if_valid (server_wrk_index);
1593       if (server_wrk)
1594         {
1595           ls = application_get_local_session (server_wrk, session_index);
1596           application_local_session_disconnect (app_wrk->wrk_index, ls);
1597         }
1598     }
1599
1600   sm = segment_manager_get (app_wrk->local_segment_manager);
1601   sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
1602   segment_manager_del (sm);
1603 }
1604
1605 clib_error_t *
1606 vnet_app_add_tls_cert (vnet_app_add_tls_cert_args_t * a)
1607 {
1608   application_t *app;
1609   app = application_get (a->app_index);
1610   if (!app)
1611     return clib_error_return_code (0, VNET_API_ERROR_APPLICATION_NOT_ATTACHED,
1612                                    0, "app %u doesn't exist", a->app_index);
1613   app->tls_cert = vec_dup (a->cert);
1614   return 0;
1615 }
1616
1617 clib_error_t *
1618 vnet_app_add_tls_key (vnet_app_add_tls_key_args_t * a)
1619 {
1620   application_t *app;
1621   app = application_get (a->app_index);
1622   if (!app)
1623     return clib_error_return_code (0, VNET_API_ERROR_APPLICATION_NOT_ATTACHED,
1624                                    0, "app %u doesn't exist", a->app_index);
1625   app->tls_key = vec_dup (a->key);
1626   return 0;
1627 }
1628
1629 u8 *
1630 format_app_worker_listener (u8 * s, va_list * args)
1631 {
1632   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
1633   u64 handle = va_arg (*args, u64);
1634   u32 sm_index = va_arg (*args, u32);
1635   int verbose = va_arg (*args, int);
1636   stream_session_t *listener;
1637   application_t *app;
1638   u8 *app_name, *str;
1639
1640   if (!app_wrk)
1641     {
1642       if (verbose)
1643         s = format (s, "%-40s%-25s%-15s%-15s%-10s", "Connection", "App",
1644                     "API Client", "ListenerID", "SegManager");
1645       else
1646         s = format (s, "%-40s%-25s", "Connection", "App");
1647
1648       return s;
1649     }
1650
1651   app = application_get (app_wrk->app_index);
1652   app_name = app_get_name_from_reg_index (app);
1653   listener = listen_session_get_from_handle (handle);
1654   str = format (0, "%U", format_stream_session, listener, verbose);
1655
1656   if (verbose)
1657     {
1658       s = format (s, "%-40s%-25s%-15u%-15u%-10u", str, app_name,
1659                   app->api_client_index, handle, sm_index);
1660     }
1661   else
1662     s = format (s, "%-40s%-25s", str, app_name);
1663
1664   vec_free (app_name);
1665   return s;
1666 }
1667
1668 static void
1669 application_format_listeners (application_t * app, int verbose)
1670 {
1671   vlib_main_t *vm = vlib_get_main ();
1672   app_worker_map_t *wrk_map;
1673   app_worker_t *app_wrk;
1674   u32 sm_index;
1675   u64 handle;
1676
1677   if (!app)
1678     {
1679       vlib_cli_output (vm, "%U", format_app_worker_listener, 0 /* header */ ,
1680                        0, 0, verbose);
1681       return;
1682     }
1683
1684   /* *INDENT-OFF* */
1685   pool_foreach (wrk_map, app->worker_maps, ({
1686     app_wrk = app_worker_get (wrk_map->wrk_index);
1687     if (hash_elts (app_wrk->listeners_table) == 0)
1688       continue;
1689     hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
1690       vlib_cli_output (vm, "%U", format_app_worker_listener, app_wrk,
1691                        handle, sm_index, verbose);
1692     }));
1693   }));
1694   /* *INDENT-ON* */
1695 }
1696
1697 static void
1698 app_worker_format_connects (app_worker_t * app_wrk, int verbose)
1699 {
1700   svm_fifo_segment_private_t *fifo_segment;
1701   vlib_main_t *vm = vlib_get_main ();
1702   segment_manager_t *sm;
1703   u8 *app_name, *s = 0;
1704   application_t *app;
1705
1706   /* Header */
1707   if (!app_wrk)
1708     {
1709       if (verbose)
1710         vlib_cli_output (vm, "%-40s%-20s%-15s%-10s", "Connection", "App",
1711                          "API Client", "SegManager");
1712       else
1713         vlib_cli_output (vm, "%-40s%-20s", "Connection", "App");
1714       return;
1715     }
1716
1717   app = application_get (app_wrk->app_index);
1718   if (app_wrk->connects_seg_manager == (u32) ~ 0)
1719     return;
1720
1721   app_name = app_get_name_from_reg_index (app);
1722
1723   /* Across all fifo segments */
1724   sm = segment_manager_get (app_wrk->connects_seg_manager);
1725
1726   /* *INDENT-OFF* */
1727   segment_manager_foreach_segment_w_lock (fifo_segment, sm, ({
1728     svm_fifo_t *fifo;
1729     u8 *str;
1730
1731     fifo = svm_fifo_segment_get_fifo_list (fifo_segment);
1732     while (fifo)
1733         {
1734           u32 session_index, thread_index;
1735           stream_session_t *session;
1736
1737           session_index = fifo->master_session_index;
1738           thread_index = fifo->master_thread_index;
1739
1740           session = session_get (session_index, thread_index);
1741           str = format (0, "%U", format_stream_session, session, verbose);
1742
1743           if (verbose)
1744             s = format (s, "%-40s%-20s%-15u%-10u", str, app_name,
1745                         app->api_client_index, app_wrk->connects_seg_manager);
1746           else
1747             s = format (s, "%-40s%-20s", str, app_name);
1748
1749           vlib_cli_output (vm, "%v", s);
1750           vec_reset_length (s);
1751           vec_free (str);
1752
1753           fifo = fifo->next;
1754         }
1755     vec_free (s);
1756   }));
1757   /* *INDENT-ON* */
1758
1759   vec_free (app_name);
1760 }
1761
1762 static void
1763 application_format_connects (application_t * app, int verbose)
1764 {
1765   app_worker_map_t *wrk_map;
1766   app_worker_t *app_wrk;
1767
1768   if (!app)
1769     {
1770       app_worker_format_connects (0, verbose);
1771       return;
1772     }
1773
1774   /* *INDENT-OFF* */
1775   pool_foreach (wrk_map, app->worker_maps, ({
1776     app_wrk = app_worker_get (wrk_map->wrk_index);
1777     app_worker_format_connects (app_wrk, verbose);
1778   }));
1779   /* *INDENT-ON* */
1780 }
1781
1782 static void
1783 app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose)
1784 {
1785   vlib_main_t *vm = vlib_get_main ();
1786   local_session_t *ls;
1787   transport_proto_t tp;
1788   u8 *conn = 0;
1789
1790   /* Header */
1791   if (app_wrk == 0)
1792     {
1793       vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp",
1794                        "ClientApp");
1795       return;
1796     }
1797
1798   if (!pool_elts (app_wrk->local_sessions)
1799       && !pool_elts (app_wrk->local_connects))
1800     return;
1801
1802   /* *INDENT-OFF* */
1803   pool_foreach (ls, app_wrk->local_listen_sessions, ({
1804     tp = session_type_transport_proto(ls->listener_session_type);
1805     conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
1806                    ls->port);
1807     vlib_cli_output (vm, "%-40v%-15u%-20s", conn, ls->app_wrk_index, "*");
1808     vec_reset_length (conn);
1809   }));
1810   pool_foreach (ls, app_wrk->local_sessions, ({
1811     tp = session_type_transport_proto(ls->listener_session_type);
1812     conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp,
1813                    ls->port);
1814     vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_wrk_index,
1815                      ls->client_wrk_index);
1816     vec_reset_length (conn);
1817   }));
1818   /* *INDENT-ON* */
1819
1820   vec_free (conn);
1821 }
1822
1823 static void
1824 application_format_local_sessions (application_t * app, int verbose)
1825 {
1826   app_worker_map_t *wrk_map;
1827   app_worker_t *app_wrk;
1828
1829   if (!app)
1830     {
1831       app_worker_format_local_sessions (0, verbose);
1832       return;
1833     }
1834
1835   /* *INDENT-OFF* */
1836   pool_foreach (wrk_map, app->worker_maps, ({
1837     app_wrk = app_worker_get (wrk_map->wrk_index);
1838     app_worker_format_local_sessions (app_wrk, verbose);
1839   }));
1840   /* *INDENT-ON* */
1841 }
1842
1843 static void
1844 app_worker_format_local_connects (app_worker_t * app, int verbose)
1845 {
1846   vlib_main_t *vm = vlib_get_main ();
1847   u32 app_wrk_index, session_index;
1848   app_worker_t *server_wrk;
1849   local_session_t *ls;
1850   uword client_key;
1851   u64 value;
1852
1853   /* Header */
1854   if (app == 0)
1855     {
1856       if (verbose)
1857         vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App",
1858                          "Peer App", "SegManager");
1859       else
1860         vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App",
1861                          "Peer App");
1862       return;
1863     }
1864
1865   if (!app->local_connects)
1866     return;
1867
1868   /* *INDENT-OFF* */
1869   hash_foreach (client_key, value, app->local_connects, ({
1870     application_client_local_connect_key_parse (client_key, &app_wrk_index,
1871                                                 &session_index);
1872     server_wrk = app_worker_get (app_wrk_index);
1873     ls = application_get_local_session (server_wrk, session_index);
1874     vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_wrk_index,
1875                      ls->client_wrk_index);
1876   }));
1877   /* *INDENT-ON* */
1878 }
1879
1880 static void
1881 application_format_local_connects (application_t * app, int verbose)
1882 {
1883   app_worker_map_t *wrk_map;
1884   app_worker_t *app_wrk;
1885
1886   if (!app)
1887     {
1888       app_worker_format_local_connects (0, verbose);
1889       return;
1890     }
1891
1892   /* *INDENT-OFF* */
1893   pool_foreach (wrk_map, app->worker_maps, ({
1894     app_wrk = app_worker_get (wrk_map->wrk_index);
1895     app_worker_format_local_connects (app_wrk, verbose);
1896   }));
1897   /* *INDENT-ON* */
1898 }
1899
1900 u8 *
1901 format_application (u8 * s, va_list * args)
1902 {
1903   application_t *app = va_arg (*args, application_t *);
1904   CLIB_UNUSED (int verbose) = va_arg (*args, int);
1905   segment_manager_properties_t *props;
1906   const u8 *app_ns_name;
1907   u8 *app_name;
1908
1909   if (app == 0)
1910     {
1911       if (verbose)
1912         s = format (s, "%-10s%-20s%-15s%-15s%-15s%-15s%-15s", "Index", "Name",
1913                     "API Client", "Namespace", "Add seg size", "Rx-f size",
1914                     "Tx-f size");
1915       else
1916         s = format (s, "%-10s%-20s%-15s%-40s", "Index", "Name", "API Client",
1917                     "Namespace");
1918       return s;
1919     }
1920
1921   app_name = app_get_name (app);
1922   app_ns_name = app_namespace_id_from_index (app->ns_index);
1923   props = application_segment_manager_properties (app);
1924   if (verbose)
1925     s = format (s, "%-10u%-20s%-15d%-15u%-15U%-15U%-15U", app->app_index,
1926                 app_name, app->api_client_index, app->ns_index,
1927                 format_memory_size, props->add_segment_size,
1928                 format_memory_size, props->rx_fifo_size, format_memory_size,
1929                 props->tx_fifo_size);
1930   else
1931     s = format (s, "%-10u%-20s%-15d%-40s", app->app_index, app_name,
1932                 app->api_client_index, app_ns_name);
1933   return s;
1934 }
1935
1936 void
1937 application_format_all_listeners (vlib_main_t * vm, int do_local, int verbose)
1938 {
1939   application_t *app;
1940
1941   if (!pool_elts (app_main.app_pool))
1942     {
1943       vlib_cli_output (vm, "No active server bindings");
1944       return;
1945     }
1946
1947   if (do_local)
1948     {
1949       application_format_local_sessions (0, verbose);
1950       /* *INDENT-OFF* */
1951       pool_foreach (app, app_main.app_pool, ({
1952         application_format_local_sessions (app, verbose);
1953       }));
1954       /* *INDENT-ON* */
1955     }
1956   else
1957     {
1958       application_format_listeners (0, verbose);
1959
1960       /* *INDENT-OFF* */
1961       pool_foreach (app, app_main.app_pool, ({
1962         application_format_listeners (app, verbose);
1963       }));
1964       /* *INDENT-ON* */
1965     }
1966 }
1967
1968 void
1969 application_format_all_clients (vlib_main_t * vm, int do_local, int verbose)
1970 {
1971   application_t *app;
1972
1973   if (!pool_elts (app_main.app_pool))
1974     {
1975       vlib_cli_output (vm, "No active apps");
1976       return;
1977     }
1978
1979   if (do_local)
1980     {
1981       application_format_local_connects (0, verbose);
1982
1983       /* *INDENT-OFF* */
1984       pool_foreach (app, app_main.app_pool, ({
1985         application_format_local_connects (app, verbose);
1986       }));
1987       /* *INDENT-ON* */
1988     }
1989   else
1990     {
1991       application_format_connects (0, verbose);
1992
1993       /* *INDENT-OFF* */
1994       pool_foreach (app, app_main.app_pool, ({
1995         application_format_connects (app, verbose);
1996       }));
1997       /* *INDENT-ON* */
1998     }
1999 }
2000
2001 static clib_error_t *
2002 show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
2003                      vlib_cli_command_t * cmd)
2004 {
2005   int do_server = 0, do_client = 0, do_local = 0;
2006   application_t *app;
2007   int verbose = 0;
2008
2009   session_cli_return_if_not_enabled ();
2010
2011   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
2012     {
2013       if (unformat (input, "server"))
2014         do_server = 1;
2015       else if (unformat (input, "client"))
2016         do_client = 1;
2017       else if (unformat (input, "local"))
2018         do_local = 1;
2019       else if (unformat (input, "verbose"))
2020         verbose = 1;
2021       else
2022         break;
2023     }
2024
2025   if (do_server)
2026     application_format_all_listeners (vm, do_local, verbose);
2027
2028   if (do_client)
2029     application_format_all_clients (vm, do_local, verbose);
2030
2031   /* Print app related info */
2032   if (!do_server && !do_client)
2033     {
2034       vlib_cli_output (vm, "%U", format_application, 0, verbose);
2035       /* *INDENT-OFF* */
2036       pool_foreach (app, app_main.app_pool, ({
2037         vlib_cli_output (vm, "%U", format_application, app, verbose);
2038       }));
2039       /* *INDENT-ON* */
2040     }
2041
2042   return 0;
2043 }
2044
2045 /* *INDENT-OFF* */
2046 VLIB_CLI_COMMAND (show_app_command, static) =
2047 {
2048   .path = "show app",
2049   .short_help = "show app [server|client] [verbose]",
2050   .function = show_app_command_fn,
2051 };
2052 /* *INDENT-ON* */
2053
2054 /*
2055  * fd.io coding-style-patch-verification: ON
2056  *
2057  * Local Variables:
2058  * eval: (c-set-style "gnu")
2059  * End:
2060  */