crypto-native: add AES-CTR
[vpp.git] / src / vnet / session / application_worker.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application.h>
17 #include <vnet/session/application_interface.h>
18 #include <vnet/session/session.h>
19
20 /**
21  * Pool of workers associated to apps
22  */
23 static app_worker_t *app_workers;
24
25 app_worker_t *
26 app_worker_alloc (application_t * app)
27 {
28   app_worker_t *app_wrk;
29
30   pool_get (app_workers, app_wrk);
31   clib_memset (app_wrk, 0, sizeof (*app_wrk));
32   app_wrk->wrk_index = app_wrk - app_workers;
33   app_wrk->app_index = app->app_index;
34   app_wrk->wrk_map_index = ~0;
35   app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
36   clib_spinlock_init (&app_wrk->detached_seg_managers_lock);
37   vec_validate (app_wrk->wrk_evts, vlib_num_workers ());
38   vec_validate (app_wrk->wrk_mq_congested, vlib_num_workers ());
39   APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index);
40   return app_wrk;
41 }
42
43 app_worker_t *
44 app_worker_get (u32 wrk_index)
45 {
46   return pool_elt_at_index (app_workers, wrk_index);
47 }
48
49 app_worker_t *
50 app_worker_get_if_valid (u32 wrk_index)
51 {
52   if (pool_is_free_index (app_workers, wrk_index))
53     return 0;
54   return pool_elt_at_index (app_workers, wrk_index);
55 }
56
57 void
58 app_worker_free (app_worker_t * app_wrk)
59 {
60   application_t *app = application_get (app_wrk->app_index);
61   vnet_unlisten_args_t _a, *a = &_a;
62   u64 handle, *handles = 0, *sm_indices = 0;
63   segment_manager_t *sm;
64   session_handle_t *sh;
65   session_t *ls;
66   u32 sm_index;
67   int i;
68
69   /*
70    * Cleanup vpp wrk events
71    */
72   app_worker_del_all_events (app_wrk);
73   for (i = 0; i < vec_len (app_wrk->wrk_evts); i++)
74     clib_fifo_free (app_wrk->wrk_evts[i]);
75
76   vec_free (app_wrk->wrk_evts);
77   vec_free (app_wrk->wrk_mq_congested);
78
79   /*
80    *  Listener cleanup
81    */
82
83   hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
84     ls = listen_session_get_from_handle (handle);
85     vec_add1 (handles, app_listen_session_handle (ls));
86     vec_add1 (sm_indices, sm_index);
87     sm = segment_manager_get (sm_index);
88   }));
89
90   for (i = 0; i < vec_len (handles); i++)
91     {
92       /* Cleanup listener */
93       a->app_index = app->app_index;
94       a->wrk_map_index = app_wrk->wrk_map_index;
95       a->handle = handles[i];
96       (void) vnet_unlisten (a);
97
98       sm = segment_manager_get_if_valid (sm_indices[i]);
99       if (sm && !segment_manager_app_detached (sm))
100         {
101           sm->first_is_protected = 0;
102           segment_manager_init_free (sm);
103         }
104     }
105   vec_reset_length (handles);
106   vec_free (sm_indices);
107   hash_free (app_wrk->listeners_table);
108
109   /*
110    * Connects segment manager cleanup
111    */
112
113   if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
114     {
115       sm = segment_manager_get (app_wrk->connects_seg_manager);
116       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
117       sm->first_is_protected = 0;
118       segment_manager_init_free (sm);
119     }
120
121   /*
122    * Half-open cleanup
123    */
124
125   pool_foreach (sh, app_wrk->half_open_table)
126     session_cleanup_half_open (*sh);
127
128   pool_free (app_wrk->half_open_table);
129
130   /*
131    * Detached listener segment managers cleanup
132    */
133   for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
134     {
135       sm = segment_manager_get (app_wrk->detached_seg_managers[i]);
136       segment_manager_init_free (sm);
137     }
138   vec_free (app_wrk->detached_seg_managers);
139   clib_spinlock_free (&app_wrk->detached_seg_managers_lock);
140
141   if (CLIB_DEBUG)
142     clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
143   pool_put (app_workers, app_wrk);
144 }
145
146 application_t *
147 app_worker_get_app (u32 wrk_index)
148 {
149   app_worker_t *app_wrk;
150   app_wrk = app_worker_get_if_valid (wrk_index);
151   if (!app_wrk)
152     return 0;
153   return application_get_if_valid (app_wrk->app_index);
154 }
155
156 static segment_manager_t *
157 app_worker_alloc_segment_manager (app_worker_t * app_wrk)
158 {
159   segment_manager_t *sm;
160
161   sm = segment_manager_alloc ();
162   sm->app_wrk_index = app_wrk->wrk_index;
163   segment_manager_init (sm);
164   return sm;
165 }
166
167 static int
168 app_worker_alloc_session_fifos (segment_manager_t * sm, session_t * s)
169 {
170   svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
171   int rv;
172
173   if ((rv = segment_manager_alloc_session_fifos (sm, s->thread_index,
174                                                  &rx_fifo, &tx_fifo)))
175     return rv;
176
177   rx_fifo->shr->master_session_index = s->session_index;
178   rx_fifo->master_thread_index = s->thread_index;
179
180   tx_fifo->shr->master_session_index = s->session_index;
181   tx_fifo->master_thread_index = s->thread_index;
182
183   s->rx_fifo = rx_fifo;
184   s->tx_fifo = tx_fifo;
185   return 0;
186 }
187
188 int
189 app_worker_alloc_wrk_cl_session (app_worker_t *app_wrk, session_t *ls)
190 {
191   svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
192   segment_manager_t *sm;
193   session_handle_t lsh;
194   app_listener_t *al;
195   session_t *s;
196
197   al = app_listener_get (ls->al_index);
198   sm = app_worker_get_listen_segment_manager (app_wrk, ls);
199   lsh = session_handle (ls);
200
201   s = session_alloc (0 /* listener on main worker */);
202   session_set_state (s, SESSION_STATE_LISTENING);
203   s->flags |= SESSION_F_IS_CLESS;
204   s->app_wrk_index = app_wrk->wrk_index;
205   ls = session_get_from_handle (lsh);
206   s->session_type = ls->session_type;
207   s->connection_index = ls->connection_index;
208
209   segment_manager_alloc_session_fifos (sm, s->thread_index, &rx_fifo,
210                                        &tx_fifo);
211
212   rx_fifo->shr->master_session_index = s->session_index;
213   rx_fifo->master_thread_index = s->thread_index;
214
215   tx_fifo->shr->master_session_index = s->session_index;
216   tx_fifo->master_thread_index = s->thread_index;
217
218   s->rx_fifo = rx_fifo;
219   s->tx_fifo = tx_fifo;
220
221   vec_validate (al->cl_listeners, app_wrk->wrk_map_index);
222   al->cl_listeners[app_wrk->wrk_map_index] = s->session_index;
223
224   return 0;
225 }
226
227 void
228 app_worker_free_wrk_cl_session (app_worker_t *app_wrk, session_t *ls)
229 {
230   app_listener_t *al;
231   session_t *s;
232
233   al = app_listener_get (ls->al_index);
234
235   s = app_listener_get_wrk_cl_session (al, app_wrk->wrk_map_index);
236   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
237   session_free (s);
238
239   al->cl_listeners[app_wrk->wrk_map_index] = SESSION_INVALID_INDEX;
240 }
241
242 int
243 app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
244 {
245   segment_manager_t *sm;
246
247   /* Allocate segment manager. All sessions derived out of a listen session
248    * have fifos allocated by the same segment manager.
249    * TODO(fcoras): limit memory consumption by cless listeners */
250   if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
251     return SESSION_E_ALLOC;
252
253   /* Once the first segment is mapped, don't remove it until unlisten */
254   sm->first_is_protected = 1;
255
256   /* Keep track of the segment manager for the listener or this worker */
257   hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
258             segment_manager_index (sm));
259
260   if (ls->flags & SESSION_F_IS_CLESS)
261     return app_worker_alloc_wrk_cl_session (app_wrk, ls);
262
263   return 0;
264 }
265
266 session_error_t
267 app_worker_start_listen (app_worker_t *app_wrk, app_listener_t *app_listener)
268 {
269   session_t *ls;
270   int rv;
271
272   if (clib_bitmap_get (app_listener->workers, app_wrk->wrk_map_index))
273     return SESSION_E_ALREADY_LISTENING;
274
275   app_listener->workers = clib_bitmap_set (app_listener->workers,
276                                            app_wrk->wrk_map_index, 1);
277
278   if (app_listener->session_index != SESSION_INVALID_INDEX)
279     {
280       ls = session_get (app_listener->session_index, 0);
281       if ((rv = app_worker_init_listener (app_wrk, ls)))
282         return rv;
283     }
284
285   if (app_listener->local_index != SESSION_INVALID_INDEX)
286     {
287       ls = session_get (app_listener->local_index, 0);
288       if ((rv = app_worker_init_listener (app_wrk, ls)))
289         return rv;
290     }
291
292   return 0;
293 }
294
295 static void
296 app_worker_add_detached_sm (app_worker_t * app_wrk, u32 sm_index)
297 {
298   vec_add1 (app_wrk->detached_seg_managers, sm_index);
299 }
300
301 void
302 app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index)
303 {
304   u32 i;
305
306   clib_spinlock_lock (&app_wrk->detached_seg_managers_lock);
307   for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
308     {
309       if (app_wrk->detached_seg_managers[i] == sm_index)
310         {
311           vec_del1 (app_wrk->detached_seg_managers, i);
312           break;
313         }
314     }
315   clib_spinlock_unlock (&app_wrk->detached_seg_managers_lock);
316 }
317
318 static void
319 app_worker_stop_listen_session (app_worker_t * app_wrk, session_t * ls)
320 {
321   session_handle_t handle;
322   segment_manager_t *sm;
323   uword *sm_indexp;
324   session_state_t *states = 0;
325
326   handle = listen_session_get_handle (ls);
327   sm_indexp = hash_get (app_wrk->listeners_table, handle);
328   if (PREDICT_FALSE (!sm_indexp))
329     return;
330
331   if (ls->flags & SESSION_F_IS_CLESS)
332     app_worker_free_wrk_cl_session (app_wrk, ls);
333
334   /* Try to cleanup segment manager */
335   sm = segment_manager_get (*sm_indexp);
336   if (sm)
337     {
338       sm->first_is_protected = 0;
339       segment_manager_app_detach (sm);
340       if (!segment_manager_has_fifos (sm))
341         {
342           /* Empty segment manager, cleanup it up */
343           segment_manager_free (sm);
344         }
345       else
346         {
347           /* Delete sessions in CREATED state */
348           vec_add1 (states, SESSION_STATE_CREATED);
349           segment_manager_del_sessions_filter (sm, states);
350           vec_free (states);
351
352           /* Track segment manager in case app detaches and all the
353            * outstanding sessions need to be closed */
354           app_worker_add_detached_sm (app_wrk, *sm_indexp);
355           sm->flags |= SEG_MANAGER_F_DETACHED_LISTENER;
356         }
357     }
358
359   hash_unset (app_wrk->listeners_table, handle);
360 }
361
362 int
363 app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al)
364 {
365   session_t *ls;
366
367   if (!clib_bitmap_get (al->workers, app_wrk->wrk_map_index))
368     return 0;
369
370   if (al->session_index != SESSION_INVALID_INDEX)
371     {
372       ls = listen_session_get (al->session_index);
373       app_worker_stop_listen_session (app_wrk, ls);
374     }
375
376   if (al->local_index != SESSION_INVALID_INDEX)
377     {
378       ls = listen_session_get (al->local_index);
379       app_worker_stop_listen_session (app_wrk, ls);
380     }
381
382   clib_bitmap_set_no_check (al->workers, app_wrk->wrk_map_index, 0);
383   if (clib_bitmap_is_zero (al->workers))
384     app_listener_cleanup (al);
385
386   return 0;
387 }
388
389 int
390 app_worker_init_accepted (session_t * s)
391 {
392   app_worker_t *app_wrk;
393   segment_manager_t *sm;
394   session_t *listener;
395   application_t *app;
396
397   listener = listen_session_get_from_handle (s->listener_handle);
398   app_wrk = application_listener_select_worker (listener);
399   if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
400     return -1;
401
402   s->app_wrk_index = app_wrk->wrk_index;
403   app = application_get (app_wrk->app_index);
404   if (app->cb_fns.fifo_tuning_callback)
405     s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
406
407   sm = app_worker_get_listen_segment_manager (app_wrk, listener);
408   if (app_worker_alloc_session_fifos (sm, s))
409     return -1;
410
411   return 0;
412 }
413
414 int
415 app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
416                             u32 opaque, session_error_t err)
417 {
418   session_event_t evt = { .event_type = SESSION_CTRL_EVT_BOUND,
419                           .as_u64[0] = alsh,
420                           .as_u64[1] = (u64) opaque << 32 | (u32) err };
421
422   app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
423
424   return 0;
425 }
426
427 int
428 app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
429                            u32 opaque, session_error_t err)
430 {
431   session_event_t evt = { .event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY,
432                           .as_u64[0] = sh,
433                           .as_u64[1] = (u64) opaque << 32 | (u32) err };
434
435   app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
436   return 0;
437 }
438
439 int
440 app_worker_accept_notify (app_worker_t * app_wrk, session_t * s)
441 {
442   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_ACCEPTED);
443   return 0;
444 }
445
446 int
447 app_worker_init_connected (app_worker_t * app_wrk, session_t * s)
448 {
449   application_t *app = application_get (app_wrk->app_index);
450   segment_manager_t *sm;
451
452   if (app->cb_fns.fifo_tuning_callback)
453     s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
454
455   /* Allocate fifos for session, unless the app is a builtin proxy */
456   if (application_is_builtin_proxy (app))
457     return app->cb_fns.proxy_alloc_session_fifos (s);
458
459   sm = app_worker_get_connect_segment_manager (app_wrk);
460   return app_worker_alloc_session_fifos (sm, s);
461 }
462
463 int
464 app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
465                            session_error_t err, u32 opaque)
466 {
467   session_event_t evt = { .event_type = SESSION_CTRL_EVT_CONNECTED,
468                           .as_u64[0] = s ? s->session_index : ~0,
469                           .as_u64[1] = (u64) opaque << 32 | (u32) err };
470   u32 thread_index = s ? s->thread_index : vlib_get_thread_index ();
471
472   app_worker_add_event_custom (app_wrk, thread_index, &evt);
473   return 0;
474 }
475
476 int
477 app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
478 {
479   session_handle_t *shp;
480
481   ASSERT (session_vlib_thread_is_cl_thread ());
482   pool_get (app_wrk->half_open_table, shp);
483   *shp = sh;
484
485   return (shp - app_wrk->half_open_table);
486 }
487
488 int
489 app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
490 {
491   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_HALF_CLEANUP);
492   return 0;
493 }
494
495 int
496 app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
497 {
498   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_DISCONNECTED);
499   return 0;
500 }
501
502 int
503 app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s)
504 {
505   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_TRANSPORT_CLOSED);
506   return 0;
507 }
508
509 int
510 app_worker_reset_notify (app_worker_t * app_wrk, session_t * s)
511 {
512   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_RESET);
513   return 0;
514 }
515
516 int
517 app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
518                            session_cleanup_ntf_t ntf)
519 {
520   session_event_t evt = { .event_type = SESSION_CTRL_EVT_CLEANUP,
521                           .as_u64[0] = (u64) ntf << 32 | s->session_index,
522                           .as_u64[1] = pointer_to_uword (session_cleanup) };
523
524   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
525
526   return 0;
527 }
528
529 int
530 app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s,
531                                   session_cleanup_ntf_t ntf,
532                                   void (*cleanup_cb) (session_t *s))
533 {
534   session_event_t evt = { .event_type = SESSION_CTRL_EVT_CLEANUP,
535                           .as_u64[0] = (u64) ntf << 32 | s->session_index,
536                           .as_u64[1] = pointer_to_uword (cleanup_cb) };
537
538   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
539
540   return 0;
541 }
542
543 int
544 app_worker_rx_notify (app_worker_t *app_wrk, session_t *s)
545 {
546   app_worker_add_event (app_wrk, s, SESSION_IO_EVT_RX);
547   return 0;
548 }
549
550 int
551 app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
552                            session_handle_t new_sh)
553 {
554   session_event_t evt = { .event_type = SESSION_CTRL_EVT_MIGRATED,
555                           .as_u64[0] = s->session_index,
556                           .as_u64[1] = new_sh };
557
558   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
559   return 0;
560 }
561
562 int
563 app_worker_own_session (app_worker_t * app_wrk, session_t * s)
564 {
565   segment_manager_t *sm;
566   svm_fifo_t *rxf, *txf;
567   int rv;
568
569   if (s->session_state == SESSION_STATE_LISTENING)
570     return application_change_listener_owner (s, app_wrk);
571
572   s->app_wrk_index = app_wrk->wrk_index;
573
574   rxf = s->rx_fifo;
575   txf = s->tx_fifo;
576
577   if (!rxf || !txf)
578     return 0;
579
580   s->rx_fifo = 0;
581   s->tx_fifo = 0;
582
583   sm = app_worker_get_connect_segment_manager (app_wrk);
584   if ((rv = app_worker_alloc_session_fifos (sm, s)))
585     return rv;
586
587   if (!svm_fifo_is_empty_cons (rxf))
588     svm_fifo_clone (s->rx_fifo, rxf);
589
590   if (!svm_fifo_is_empty_cons (txf))
591     svm_fifo_clone (s->tx_fifo, txf);
592
593   segment_manager_dealloc_fifos (rxf, txf);
594
595   return 0;
596 }
597
598 int
599 app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
600                             session_handle_t *rsh)
601 {
602   if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
603     return SESSION_E_REFUSED;
604
605   sep->app_wrk_index = app_wrk->wrk_index;
606
607   return session_open (sep, rsh);
608 }
609
610 int
611 app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
612                                 svm_fifo_t * f,
613                                 session_ft_action_t act, u32 len)
614 {
615   application_t *app = application_get (app_wrk->app_index);
616   return app->cb_fns.fifo_tuning_callback (s, f, act, len);
617 }
618
619 segment_manager_t *
620 app_worker_get_connect_segment_manager (app_worker_t * app)
621 {
622   ASSERT (app->connects_seg_manager != (u32) ~ 0);
623   return segment_manager_get (app->connects_seg_manager);
624 }
625
626 segment_manager_t *
627 app_worker_get_listen_segment_manager (app_worker_t * app,
628                                        session_t * listener)
629 {
630   uword *smp;
631   smp = hash_get (app->listeners_table, listen_session_get_handle (listener));
632   ALWAYS_ASSERT (smp != 0);
633   return segment_manager_get (*smp);
634 }
635
636 session_t *
637 app_worker_first_listener (app_worker_t * app_wrk, u8 fib_proto,
638                            u8 transport_proto)
639 {
640   session_t *listener;
641   u64 handle;
642   u32 sm_index;
643   u8 sst;
644
645   sst = session_type_from_proto_and_ip (transport_proto,
646                                         fib_proto == FIB_PROTOCOL_IP4);
647
648   /* *INDENT-OFF* */
649    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
650      listener = listen_session_get_from_handle (handle);
651      if (listener->session_type == sst
652          && !(listener->flags & SESSION_F_PROXY))
653        return listener;
654    }));
655   /* *INDENT-ON* */
656
657   return 0;
658 }
659
660 session_t *
661 app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
662                            u8 transport_proto)
663 {
664   session_t *listener;
665   u64 handle;
666   u32 sm_index;
667   u8 sst;
668
669   sst = session_type_from_proto_and_ip (transport_proto,
670                                         fib_proto == FIB_PROTOCOL_IP4);
671
672   /* *INDENT-OFF* */
673    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
674      listener = listen_session_get_from_handle (handle);
675      if (listener->session_type == sst && (listener->flags & SESSION_F_PROXY))
676        return listener;
677    }));
678   /* *INDENT-ON* */
679
680   return 0;
681 }
682
683 /**
684  * Send an API message to the external app, to map new segment
685  */
686 int
687 app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
688 {
689   session_event_t evt = { .event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT,
690                           .as_u64[1] = segment_handle };
691
692   app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
693
694   return 0;
695 }
696
697 int
698 app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
699 {
700   session_event_t evt = { .event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT,
701                           .as_u64[1] = segment_handle };
702
703   app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
704
705   return 0;
706 }
707
708 static int
709 app_wrk_send_fd (app_worker_t *app_wrk, int fd)
710 {
711   if (!appns_sapi_enabled ())
712     {
713       vl_api_registration_t *reg;
714       clib_error_t *error;
715
716       reg =
717         vl_mem_api_client_index_to_registration (app_wrk->api_client_index);
718       if (!reg)
719         {
720           clib_warning ("no api registration for client: %u",
721                         app_wrk->api_client_index);
722           return -1;
723         }
724
725       if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
726         return -1;
727
728       error = vl_api_send_fd_msg (reg, &fd, 1);
729       if (error)
730         {
731           clib_error_report (error);
732           return -1;
733         }
734
735       return 0;
736     }
737
738   app_sapi_msg_t smsg = { 0 };
739   app_namespace_t *app_ns;
740   clib_error_t *error;
741   application_t *app;
742   clib_socket_t *cs;
743   u32 cs_index;
744
745   app = application_get (app_wrk->app_index);
746   app_ns = app_namespace_get (app->ns_index);
747   cs_index = appns_sapi_handle_sock_index (app_wrk->api_client_index);
748   cs = appns_sapi_get_socket (app_ns, cs_index);
749   if (PREDICT_FALSE (!cs))
750     return -1;
751
752   /* There's no payload for the message only the type */
753   smsg.type = APP_SAPI_MSG_TYPE_SEND_FDS;
754   error = clib_socket_sendmsg (cs, &smsg, sizeof (smsg), &fd, 1);
755   if (error)
756     {
757       clib_error_report (error);
758       return -1;
759     }
760
761   return 0;
762 }
763
764 void
765 app_worker_add_event (app_worker_t *app_wrk, session_t *s,
766                       session_evt_type_t evt_type)
767 {
768   session_event_t *evt;
769
770   ASSERT (s->thread_index == vlib_get_thread_index ());
771   clib_fifo_add2 (app_wrk->wrk_evts[s->thread_index], evt);
772   evt->session_index = s->session_index;
773   evt->event_type = evt_type;
774   evt->postponed = 0;
775
776   /* First event for this app_wrk. Schedule it for handling in session input */
777   if (clib_fifo_elts (app_wrk->wrk_evts[s->thread_index]) == 1)
778     {
779       session_worker_t *wrk = session_main_get_worker (s->thread_index);
780       session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
781     }
782 }
783
784 void
785 app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index,
786                              session_event_t *evt)
787 {
788   clib_fifo_add1 (app_wrk->wrk_evts[thread_index], *evt);
789
790   /* First event for this app_wrk. Schedule it for handling in session input */
791   if (clib_fifo_elts (app_wrk->wrk_evts[thread_index]) == 1)
792     {
793       session_worker_t *wrk = session_main_get_worker (thread_index);
794       session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
795     }
796 }
797
798 always_inline void
799 app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
800                               u32 msg_len, int fd)
801 {
802   svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
803   svm_msg_q_t *mq = app_wrk->event_queue;
804   session_event_t *evt;
805
806   ASSERT (!svm_msg_q_or_ring_is_full (mq, SESSION_MQ_CTRL_EVT_RING));
807   *mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_CTRL_EVT_RING);
808
809   evt = svm_msg_q_msg_data (mq, mq_msg);
810   clib_memset (evt, 0, sizeof (*evt));
811   evt->event_type = evt_type;
812   clib_memcpy_fast (evt->data, msg, msg_len);
813
814   if (fd != -1)
815     app_wrk_send_fd (app_wrk, fd);
816
817   svm_msg_q_add_raw (mq, mq_msg);
818 }
819
820 void
821 app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
822                           u32 msg_len, int fd)
823 {
824   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, fd);
825 }
826
827 void
828 app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
829                        u32 msg_len)
830 {
831   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1);
832 }
833
834 u8
835 app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index)
836 {
837   return app_wrk->wrk_mq_congested[thread_index] > 0;
838 }
839
840 void
841 app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index)
842 {
843   clib_atomic_fetch_add_relax (&app_wrk->mq_congested, 1);
844   ASSERT (thread_index == vlib_get_thread_index ());
845   app_wrk->wrk_mq_congested[thread_index] = 1;
846 }
847
848 void
849 app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk, u32 thread_index)
850 {
851   clib_atomic_fetch_sub_relax (&app_wrk->mq_congested, 1);
852   ASSERT (thread_index == vlib_get_thread_index ());
853   app_wrk->wrk_mq_congested[thread_index] = 0;
854 }
855
856 u8 *
857 format_app_worker_listener (u8 * s, va_list * args)
858 {
859   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
860   u64 handle = va_arg (*args, u64);
861   u32 sm_index = va_arg (*args, u32);
862   int verbose = va_arg (*args, int);
863   session_t *listener;
864   const u8 *app_name;
865   u8 *str;
866
867   if (!app_wrk)
868     {
869       if (verbose)
870         s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s%-15s%-15s%-10s",
871                     "Connection", "App", "Wrk", "API Client", "ListenerID",
872                     "SegManager");
873       else
874         s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s", "Connection",
875                     "App", "Wrk");
876
877       return s;
878     }
879
880   app_name = application_name_from_index (app_wrk->app_index);
881   listener = listen_session_get_from_handle (handle);
882   str = format (0, "%U", format_session, listener, verbose);
883
884   if (verbose)
885     {
886       u8 *buf;
887       buf = format (0, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index);
888       s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%-10v%-15u%-15u%-10u", str,
889                   app_name, buf, app_wrk->api_client_index, handle, sm_index);
890       vec_free (buf);
891     }
892   else
893     s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%=10u", str, app_name,
894                 app_wrk->wrk_map_index);
895
896   vec_free (str);
897
898   return s;
899 }
900
901 u8 *
902 format_app_worker (u8 * s, va_list * args)
903 {
904   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
905   u32 indent = 1;
906
907   s = format (s,
908               "%U wrk-index %u app-index %u map-index %u "
909               "api-client-index %d mq-cong %u\n",
910               format_white_space, indent, app_wrk->wrk_index,
911               app_wrk->app_index, app_wrk->wrk_map_index,
912               app_wrk->api_client_index, app_wrk->mq_congested);
913   return s;
914 }
915
916 void
917 app_worker_format_connects (app_worker_t * app_wrk, int verbose)
918 {
919   segment_manager_t *sm;
920
921   /* Header */
922   if (!app_wrk)
923     {
924       segment_manager_format_sessions (0, verbose);
925       return;
926     }
927
928   if (app_wrk->connects_seg_manager == (u32) ~ 0)
929     return;
930
931   sm = segment_manager_get (app_wrk->connects_seg_manager);
932   segment_manager_format_sessions (sm, verbose);
933 }
934
935 /*
936  * fd.io coding-style-patch-verification: ON
937  *
938  * Local Variables:
939  * eval: (c-set-style "gnu")
940  * End:
941  */