session: improve use of session handles
[vpp.git] / src / vnet / session / application_worker.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application.h>
17 #include <vnet/session/application_interface.h>
18 #include <vnet/session/session.h>
19
20 /**
21  * Pool of workers associated to apps
22  */
23 static app_worker_t *app_workers;
24
25 app_worker_t *
26 app_worker_alloc (application_t * app)
27 {
28   app_worker_t *app_wrk;
29
30   pool_get (app_workers, app_wrk);
31   clib_memset (app_wrk, 0, sizeof (*app_wrk));
32   app_wrk->wrk_index = app_wrk - app_workers;
33   app_wrk->app_index = app->app_index;
34   app_wrk->wrk_map_index = ~0;
35   app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
36   clib_spinlock_init (&app_wrk->detached_seg_managers_lock);
37   vec_validate (app_wrk->wrk_evts, vlib_num_workers ());
38   vec_validate (app_wrk->wrk_mq_congested, vlib_num_workers ());
39   APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index);
40   return app_wrk;
41 }
42
43 app_worker_t *
44 app_worker_get (u32 wrk_index)
45 {
46   return pool_elt_at_index (app_workers, wrk_index);
47 }
48
49 app_worker_t *
50 app_worker_get_if_valid (u32 wrk_index)
51 {
52   if (pool_is_free_index (app_workers, wrk_index))
53     return 0;
54   return pool_elt_at_index (app_workers, wrk_index);
55 }
56
57 void
58 app_worker_free (app_worker_t * app_wrk)
59 {
60   application_t *app = application_get (app_wrk->app_index);
61   session_handle_t handle, *handles = 0, *sh;
62   vnet_unlisten_args_t _a, *a = &_a;
63   segment_manager_t *sm;
64   u64 *sm_indices = 0;
65   session_t *ls;
66   u32 sm_index;
67   int i;
68
69   /*
70    * Cleanup vpp wrk events
71    */
72   app_worker_del_all_events (app_wrk);
73   for (i = 0; i < vec_len (app_wrk->wrk_evts); i++)
74     clib_fifo_free (app_wrk->wrk_evts[i]);
75
76   vec_free (app_wrk->wrk_evts);
77   vec_free (app_wrk->wrk_mq_congested);
78
79   /*
80    *  Listener cleanup
81    */
82
83   hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
84     ls = listen_session_get_from_handle (handle);
85     vec_add1 (handles, app_listen_session_handle (ls));
86     vec_add1 (sm_indices, sm_index);
87     sm = segment_manager_get (sm_index);
88   }));
89
90   for (i = 0; i < vec_len (handles); i++)
91     {
92       /* Cleanup listener */
93       a->app_index = app->app_index;
94       a->wrk_map_index = app_wrk->wrk_map_index;
95       a->handle = handles[i];
96       (void) vnet_unlisten (a);
97
98       sm = segment_manager_get_if_valid (sm_indices[i]);
99       if (sm && !segment_manager_app_detached (sm))
100         {
101           sm->first_is_protected = 0;
102           segment_manager_init_free (sm);
103         }
104     }
105   vec_free (handles);
106   vec_free (sm_indices);
107   hash_free (app_wrk->listeners_table);
108
109   /*
110    * Connects segment manager cleanup
111    */
112
113   if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
114     {
115       sm = segment_manager_get (app_wrk->connects_seg_manager);
116       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
117       sm->first_is_protected = 0;
118       segment_manager_init_free (sm);
119     }
120
121   /*
122    * Half-open cleanup
123    */
124
125   pool_foreach (sh, app_wrk->half_open_table)
126     session_cleanup_half_open (*sh);
127
128   pool_free (app_wrk->half_open_table);
129
130   /*
131    * Detached listener segment managers cleanup
132    */
133   for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
134     {
135       sm = segment_manager_get (app_wrk->detached_seg_managers[i]);
136       segment_manager_init_free (sm);
137     }
138   vec_free (app_wrk->detached_seg_managers);
139   clib_spinlock_free (&app_wrk->detached_seg_managers_lock);
140
141   if (CLIB_DEBUG)
142     clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
143   pool_put (app_workers, app_wrk);
144 }
145
146 application_t *
147 app_worker_get_app (u32 wrk_index)
148 {
149   app_worker_t *app_wrk;
150   app_wrk = app_worker_get_if_valid (wrk_index);
151   if (!app_wrk)
152     return 0;
153   return application_get_if_valid (app_wrk->app_index);
154 }
155
156 static segment_manager_t *
157 app_worker_alloc_segment_manager (app_worker_t * app_wrk)
158 {
159   segment_manager_t *sm;
160
161   sm = segment_manager_alloc ();
162   sm->app_wrk_index = app_wrk->wrk_index;
163   segment_manager_init (sm);
164   return sm;
165 }
166
167 static int
168 app_worker_alloc_session_fifos (segment_manager_t * sm, session_t * s)
169 {
170   svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
171   int rv;
172
173   if ((rv = segment_manager_alloc_session_fifos (sm, s->thread_index,
174                                                  &rx_fifo, &tx_fifo)))
175     return rv;
176
177   rx_fifo->shr->master_session_index = s->session_index;
178   rx_fifo->master_thread_index = s->thread_index;
179
180   tx_fifo->shr->master_session_index = s->session_index;
181   tx_fifo->master_thread_index = s->thread_index;
182
183   s->rx_fifo = rx_fifo;
184   s->tx_fifo = tx_fifo;
185   return 0;
186 }
187
188 int
189 app_worker_alloc_wrk_cl_session (app_worker_t *app_wrk, session_t *ls)
190 {
191   svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
192   segment_manager_t *sm;
193   session_handle_t lsh;
194   app_listener_t *al;
195   session_t *s;
196
197   al = app_listener_get (ls->al_index);
198   sm = app_worker_get_listen_segment_manager (app_wrk, ls);
199   lsh = session_handle (ls);
200
201   s = session_alloc (0 /* listener on main worker */);
202   session_set_state (s, SESSION_STATE_LISTENING);
203   s->flags |= SESSION_F_IS_CLESS;
204   s->app_wrk_index = app_wrk->wrk_index;
205   ls = session_get_from_handle (lsh);
206   s->session_type = ls->session_type;
207   s->connection_index = ls->connection_index;
208
209   segment_manager_alloc_session_fifos (sm, s->thread_index, &rx_fifo,
210                                        &tx_fifo);
211
212   rx_fifo->shr->master_session_index = s->session_index;
213   rx_fifo->master_thread_index = s->thread_index;
214
215   tx_fifo->shr->master_session_index = s->session_index;
216   tx_fifo->master_thread_index = s->thread_index;
217
218   s->rx_fifo = rx_fifo;
219   s->tx_fifo = tx_fifo;
220
221   vec_validate (al->cl_listeners, app_wrk->wrk_map_index);
222   al->cl_listeners[app_wrk->wrk_map_index] = s->session_index;
223
224   return 0;
225 }
226
227 void
228 app_worker_free_wrk_cl_session (app_worker_t *app_wrk, session_t *ls)
229 {
230   app_listener_t *al;
231   session_t *s;
232
233   al = app_listener_get (ls->al_index);
234
235   s = app_listener_get_wrk_cl_session (al, app_wrk->wrk_map_index);
236   segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
237   session_free (s);
238
239   al->cl_listeners[app_wrk->wrk_map_index] = SESSION_INVALID_INDEX;
240 }
241
242 int
243 app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
244 {
245   segment_manager_t *sm;
246
247   /* Allocate segment manager. All sessions derived out of a listen session
248    * have fifos allocated by the same segment manager.
249    * TODO(fcoras): limit memory consumption by cless listeners */
250   if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
251     return SESSION_E_ALLOC;
252
253   /* Once the first segment is mapped, don't remove it until unlisten */
254   sm->first_is_protected = 1;
255
256   /* Keep track of the segment manager for the listener or this worker */
257   hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
258             segment_manager_index (sm));
259
260   if (ls->flags & SESSION_F_IS_CLESS)
261     return app_worker_alloc_wrk_cl_session (app_wrk, ls);
262
263   return 0;
264 }
265
266 session_error_t
267 app_worker_start_listen (app_worker_t *app_wrk, app_listener_t *app_listener)
268 {
269   session_t *ls;
270   int rv;
271
272   if (clib_bitmap_get (app_listener->workers, app_wrk->wrk_map_index))
273     return SESSION_E_ALREADY_LISTENING;
274
275   app_listener->workers = clib_bitmap_set (app_listener->workers,
276                                            app_wrk->wrk_map_index, 1);
277
278   if (app_listener->session_index != SESSION_INVALID_INDEX)
279     {
280       ls = session_get (app_listener->session_index, 0);
281       if ((rv = app_worker_init_listener (app_wrk, ls)))
282         return rv;
283     }
284
285   if (app_listener->local_index != SESSION_INVALID_INDEX)
286     {
287       ls = session_get (app_listener->local_index, 0);
288       if ((rv = app_worker_init_listener (app_wrk, ls)))
289         return rv;
290     }
291
292   return 0;
293 }
294
295 static void
296 app_worker_add_detached_sm (app_worker_t * app_wrk, u32 sm_index)
297 {
298   vec_add1 (app_wrk->detached_seg_managers, sm_index);
299 }
300
301 void
302 app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index)
303 {
304   u32 i;
305
306   clib_spinlock_lock (&app_wrk->detached_seg_managers_lock);
307   for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
308     {
309       if (app_wrk->detached_seg_managers[i] == sm_index)
310         {
311           vec_del1 (app_wrk->detached_seg_managers, i);
312           break;
313         }
314     }
315   clib_spinlock_unlock (&app_wrk->detached_seg_managers_lock);
316 }
317
318 static void
319 app_worker_stop_listen_session (app_worker_t * app_wrk, session_t * ls)
320 {
321   session_handle_t handle;
322   segment_manager_t *sm;
323   uword *sm_indexp;
324   session_state_t *states = 0;
325
326   handle = listen_session_get_handle (ls);
327   sm_indexp = hash_get (app_wrk->listeners_table, handle);
328   if (PREDICT_FALSE (!sm_indexp))
329     return;
330
331   if (ls->flags & SESSION_F_IS_CLESS)
332     app_worker_free_wrk_cl_session (app_wrk, ls);
333
334   /* Try to cleanup segment manager */
335   sm = segment_manager_get (*sm_indexp);
336   if (sm)
337     {
338       sm->first_is_protected = 0;
339       segment_manager_app_detach (sm);
340       if (!segment_manager_has_fifos (sm))
341         {
342           /* Empty segment manager, cleanup it up */
343           segment_manager_free (sm);
344         }
345       else
346         {
347           /* Delete sessions in CREATED state */
348           vec_add1 (states, SESSION_STATE_CREATED);
349           segment_manager_del_sessions_filter (sm, states);
350           vec_free (states);
351
352           /* Track segment manager in case app detaches and all the
353            * outstanding sessions need to be closed */
354           app_worker_add_detached_sm (app_wrk, *sm_indexp);
355           sm->flags |= SEG_MANAGER_F_DETACHED_LISTENER;
356         }
357     }
358
359   hash_unset (app_wrk->listeners_table, handle);
360 }
361
362 int
363 app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al)
364 {
365   session_t *ls;
366
367   if (!clib_bitmap_get (al->workers, app_wrk->wrk_map_index))
368     return 0;
369
370   if (al->session_index != SESSION_INVALID_INDEX)
371     {
372       ls = listen_session_get (al->session_index);
373       app_worker_stop_listen_session (app_wrk, ls);
374     }
375
376   if (al->local_index != SESSION_INVALID_INDEX)
377     {
378       ls = listen_session_get (al->local_index);
379       app_worker_stop_listen_session (app_wrk, ls);
380     }
381
382   clib_bitmap_set_no_check (al->workers, app_wrk->wrk_map_index, 0);
383   if (clib_bitmap_is_zero (al->workers))
384     app_listener_cleanup (al);
385
386   return 0;
387 }
388
389 int
390 app_worker_init_accepted (session_t * s)
391 {
392   app_worker_t *app_wrk;
393   segment_manager_t *sm;
394   session_t *listener;
395   application_t *app;
396
397   listener = listen_session_get_from_handle (s->listener_handle);
398   app_wrk = application_listener_select_worker (listener);
399   if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
400     return -1;
401
402   s->app_wrk_index = app_wrk->wrk_index;
403   app = application_get (app_wrk->app_index);
404   if (app->cb_fns.fifo_tuning_callback)
405     s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
406
407   sm = app_worker_get_listen_segment_manager (app_wrk, listener);
408   if (app_worker_alloc_session_fifos (sm, s))
409     return -1;
410
411   return 0;
412 }
413
414 int
415 app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
416                             u32 opaque, session_error_t err)
417 {
418   session_event_t evt = { .event_type = SESSION_CTRL_EVT_BOUND,
419                           .as_u64[0] = alsh,
420                           .as_u64[1] = (u64) opaque << 32 | (u32) err };
421
422   app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
423
424   return 0;
425 }
426
427 int
428 app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
429                            u32 opaque, session_error_t err)
430 {
431   session_event_t evt = { .event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY,
432                           .as_u64[0] = sh,
433                           .as_u64[1] = (u64) opaque << 32 | (u32) err };
434
435   app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
436   return 0;
437 }
438
439 int
440 app_worker_accept_notify (app_worker_t * app_wrk, session_t * s)
441 {
442   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_ACCEPTED);
443   return 0;
444 }
445
446 int
447 app_worker_init_connected (app_worker_t * app_wrk, session_t * s)
448 {
449   application_t *app = application_get (app_wrk->app_index);
450   segment_manager_t *sm;
451
452   if (app->cb_fns.fifo_tuning_callback)
453     s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
454
455   /* Allocate fifos for session, unless the app is a builtin proxy */
456   if (application_is_builtin_proxy (app))
457     return app->cb_fns.proxy_alloc_session_fifos (s);
458
459   sm = app_worker_get_connect_segment_manager (app_wrk);
460   return app_worker_alloc_session_fifos (sm, s);
461 }
462
463 int
464 app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
465                            session_error_t err, u32 opaque)
466 {
467   session_event_t evt = { .event_type = SESSION_CTRL_EVT_CONNECTED,
468                           .as_u64[0] = s ? s->session_index : ~0,
469                           .as_u64[1] = (u64) opaque << 32 | (u32) err };
470   u32 thread_index = s ? s->thread_index : vlib_get_thread_index ();
471
472   app_worker_add_event_custom (app_wrk, thread_index, &evt);
473   return 0;
474 }
475
476 int
477 app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
478 {
479   session_handle_t *shp;
480
481   ASSERT (session_vlib_thread_is_cl_thread ());
482   pool_get (app_wrk->half_open_table, shp);
483   *shp = sh;
484
485   return (shp - app_wrk->half_open_table);
486 }
487
488 int
489 app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
490 {
491   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_HALF_CLEANUP);
492   return 0;
493 }
494
495 int
496 app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
497 {
498   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_DISCONNECTED);
499   return 0;
500 }
501
502 int
503 app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s)
504 {
505   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_TRANSPORT_CLOSED);
506   return 0;
507 }
508
509 int
510 app_worker_reset_notify (app_worker_t * app_wrk, session_t * s)
511 {
512   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_RESET);
513   return 0;
514 }
515
516 int
517 app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
518                            session_cleanup_ntf_t ntf)
519 {
520   session_event_t evt = { .event_type = SESSION_CTRL_EVT_CLEANUP,
521                           .as_u64[0] = (u64) ntf << 32 | s->session_index,
522                           .as_u64[1] = pointer_to_uword (session_cleanup) };
523
524   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
525
526   return 0;
527 }
528
529 int
530 app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s,
531                                   session_cleanup_ntf_t ntf,
532                                   void (*cleanup_cb) (session_t *s))
533 {
534   session_event_t evt = { .event_type = SESSION_CTRL_EVT_CLEANUP,
535                           .as_u64[0] = (u64) ntf << 32 | s->session_index,
536                           .as_u64[1] = pointer_to_uword (cleanup_cb) };
537
538   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
539
540   return 0;
541 }
542
543 int
544 app_worker_rx_notify (app_worker_t *app_wrk, session_t *s)
545 {
546   app_worker_add_event (app_wrk, s, SESSION_IO_EVT_RX);
547   return 0;
548 }
549
550 int
551 app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
552                            session_handle_t new_sh)
553 {
554   session_event_t evt = { .event_type = SESSION_CTRL_EVT_MIGRATED,
555                           .as_u64[0] = s->session_index,
556                           .as_u64[1] = new_sh };
557
558   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
559   return 0;
560 }
561
562 int
563 app_worker_own_session (app_worker_t * app_wrk, session_t * s)
564 {
565   segment_manager_t *sm;
566   svm_fifo_t *rxf, *txf;
567   int rv;
568
569   if (s->session_state == SESSION_STATE_LISTENING)
570     return application_change_listener_owner (s, app_wrk);
571
572   s->app_wrk_index = app_wrk->wrk_index;
573
574   rxf = s->rx_fifo;
575   txf = s->tx_fifo;
576
577   if (!rxf || !txf)
578     return 0;
579
580   s->rx_fifo = 0;
581   s->tx_fifo = 0;
582
583   sm = app_worker_get_connect_segment_manager (app_wrk);
584   if ((rv = app_worker_alloc_session_fifos (sm, s)))
585     return rv;
586
587   if (!svm_fifo_is_empty_cons (rxf))
588     svm_fifo_clone (s->rx_fifo, rxf);
589
590   if (!svm_fifo_is_empty_cons (txf))
591     svm_fifo_clone (s->tx_fifo, txf);
592
593   segment_manager_dealloc_fifos (rxf, txf);
594
595   return 0;
596 }
597
598 int
599 app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
600                             session_handle_t *rsh)
601 {
602   if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
603     return SESSION_E_REFUSED;
604
605   sep->app_wrk_index = app_wrk->wrk_index;
606
607   return session_open (sep, rsh);
608 }
609
610 int
611 app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
612                                 svm_fifo_t * f,
613                                 session_ft_action_t act, u32 len)
614 {
615   application_t *app = application_get (app_wrk->app_index);
616   return app->cb_fns.fifo_tuning_callback (s, f, act, len);
617 }
618
619 segment_manager_t *
620 app_worker_get_connect_segment_manager (app_worker_t * app)
621 {
622   ASSERT (app->connects_seg_manager != (u32) ~ 0);
623   return segment_manager_get (app->connects_seg_manager);
624 }
625
626 segment_manager_t *
627 app_worker_get_listen_segment_manager (app_worker_t * app,
628                                        session_t * listener)
629 {
630   uword *smp;
631   smp = hash_get (app->listeners_table, listen_session_get_handle (listener));
632   ALWAYS_ASSERT (smp != 0);
633   return segment_manager_get (*smp);
634 }
635
636 session_t *
637 app_worker_first_listener (app_worker_t * app_wrk, u8 fib_proto,
638                            u8 transport_proto)
639 {
640   session_t *listener;
641   u64 handle;
642   u32 sm_index;
643   u8 sst;
644
645   sst = session_type_from_proto_and_ip (transport_proto,
646                                         fib_proto == FIB_PROTOCOL_IP4);
647
648    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
649      listener = listen_session_get_from_handle (handle);
650      if (listener->session_type == sst
651          && !(listener->flags & SESSION_F_PROXY))
652        return listener;
653    }));
654
655   return 0;
656 }
657
658 session_t *
659 app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
660                            u8 transport_proto)
661 {
662   session_t *listener;
663   u64 handle;
664   u32 sm_index;
665   u8 sst;
666
667   sst = session_type_from_proto_and_ip (transport_proto,
668                                         fib_proto == FIB_PROTOCOL_IP4);
669
670    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
671      listener = listen_session_get_from_handle (handle);
672      if (listener->session_type == sst && (listener->flags & SESSION_F_PROXY))
673        return listener;
674    }));
675
676   return 0;
677 }
678
679 /**
680  * Send an API message to the external app, to map new segment
681  */
682 int
683 app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
684 {
685   session_event_t evt = { .event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT,
686                           .as_u64[1] = segment_handle };
687
688   app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
689
690   return 0;
691 }
692
693 int
694 app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
695 {
696   session_event_t evt = { .event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT,
697                           .as_u64[1] = segment_handle };
698
699   app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
700
701   return 0;
702 }
703
704 static int
705 app_wrk_send_fd (app_worker_t *app_wrk, int fd)
706 {
707   if (!appns_sapi_enabled ())
708     {
709       vl_api_registration_t *reg;
710       clib_error_t *error;
711
712       reg =
713         vl_mem_api_client_index_to_registration (app_wrk->api_client_index);
714       if (!reg)
715         {
716           clib_warning ("no api registration for client: %u",
717                         app_wrk->api_client_index);
718           return -1;
719         }
720
721       if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
722         return -1;
723
724       error = vl_api_send_fd_msg (reg, &fd, 1);
725       if (error)
726         {
727           clib_error_report (error);
728           return -1;
729         }
730
731       return 0;
732     }
733
734   app_sapi_msg_t smsg = { 0 };
735   app_namespace_t *app_ns;
736   clib_error_t *error;
737   application_t *app;
738   clib_socket_t *cs;
739   u32 cs_index;
740
741   app = application_get (app_wrk->app_index);
742   app_ns = app_namespace_get (app->ns_index);
743   cs_index = appns_sapi_handle_sock_index (app_wrk->api_client_index);
744   cs = appns_sapi_get_socket (app_ns, cs_index);
745   if (PREDICT_FALSE (!cs))
746     return -1;
747
748   /* There's no payload for the message only the type */
749   smsg.type = APP_SAPI_MSG_TYPE_SEND_FDS;
750   error = clib_socket_sendmsg (cs, &smsg, sizeof (smsg), &fd, 1);
751   if (error)
752     {
753       clib_error_report (error);
754       return -1;
755     }
756
757   return 0;
758 }
759
760 void
761 app_worker_add_event (app_worker_t *app_wrk, session_t *s,
762                       session_evt_type_t evt_type)
763 {
764   session_event_t *evt;
765
766   ASSERT (s->thread_index == vlib_get_thread_index ());
767   clib_fifo_add2 (app_wrk->wrk_evts[s->thread_index], evt);
768   evt->session_index = s->session_index;
769   evt->event_type = evt_type;
770   evt->postponed = 0;
771
772   /* First event for this app_wrk. Schedule it for handling in session input */
773   if (clib_fifo_elts (app_wrk->wrk_evts[s->thread_index]) == 1)
774     {
775       session_worker_t *wrk = session_main_get_worker (s->thread_index);
776       session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
777     }
778 }
779
780 void
781 app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index,
782                              session_event_t *evt)
783 {
784   clib_fifo_add1 (app_wrk->wrk_evts[thread_index], *evt);
785
786   /* First event for this app_wrk. Schedule it for handling in session input */
787   if (clib_fifo_elts (app_wrk->wrk_evts[thread_index]) == 1)
788     {
789       session_worker_t *wrk = session_main_get_worker (thread_index);
790       session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
791     }
792 }
793
794 always_inline void
795 app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
796                               u32 msg_len, int fd)
797 {
798   svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
799   svm_msg_q_t *mq = app_wrk->event_queue;
800   session_event_t *evt;
801
802   ASSERT (!svm_msg_q_or_ring_is_full (mq, SESSION_MQ_CTRL_EVT_RING));
803   *mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_CTRL_EVT_RING);
804
805   evt = svm_msg_q_msg_data (mq, mq_msg);
806   clib_memset (evt, 0, sizeof (*evt));
807   evt->event_type = evt_type;
808   clib_memcpy_fast (evt->data, msg, msg_len);
809
810   if (fd != -1)
811     app_wrk_send_fd (app_wrk, fd);
812
813   svm_msg_q_add_raw (mq, mq_msg);
814 }
815
816 void
817 app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
818                           u32 msg_len, int fd)
819 {
820   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, fd);
821 }
822
823 void
824 app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
825                        u32 msg_len)
826 {
827   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1);
828 }
829
830 u8
831 app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index)
832 {
833   return app_wrk->wrk_mq_congested[thread_index] > 0;
834 }
835
836 void
837 app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index)
838 {
839   clib_atomic_fetch_add_relax (&app_wrk->mq_congested, 1);
840   ASSERT (thread_index == vlib_get_thread_index ());
841   app_wrk->wrk_mq_congested[thread_index] = 1;
842 }
843
844 void
845 app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk, u32 thread_index)
846 {
847   clib_atomic_fetch_sub_relax (&app_wrk->mq_congested, 1);
848   ASSERT (thread_index == vlib_get_thread_index ());
849   app_wrk->wrk_mq_congested[thread_index] = 0;
850 }
851
852 u8 *
853 format_app_worker_listener (u8 * s, va_list * args)
854 {
855   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
856   session_handle_t handle = va_arg (*args, u64);
857   u32 sm_index = va_arg (*args, u32);
858   int verbose = va_arg (*args, int);
859   session_t *listener;
860   const u8 *app_name;
861   u8 *str;
862
863   if (!app_wrk)
864     {
865       if (verbose)
866         s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s%-15s%-15s%-10s",
867                     "Connection", "App", "Wrk", "API Client", "ListenerID",
868                     "SegManager");
869       else
870         s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s", "Connection",
871                     "App", "Wrk");
872
873       return s;
874     }
875
876   app_name = application_name_from_index (app_wrk->app_index);
877   listener = listen_session_get_from_handle (handle);
878   str = format (0, "%U", format_session, listener, verbose);
879
880   if (verbose)
881     {
882       u8 *buf;
883       buf = format (0, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index);
884       s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%-10v%-15u%-15u%-10u", str,
885                   app_name, buf, app_wrk->api_client_index, handle, sm_index);
886       vec_free (buf);
887     }
888   else
889     s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%=10u", str, app_name,
890                 app_wrk->wrk_map_index);
891
892   vec_free (str);
893
894   return s;
895 }
896
897 u8 *
898 format_app_worker (u8 * s, va_list * args)
899 {
900   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
901   u32 indent = 1;
902
903   s = format (s,
904               "%U wrk-index %u app-index %u map-index %u "
905               "api-client-index %d mq-cong %u\n",
906               format_white_space, indent, app_wrk->wrk_index,
907               app_wrk->app_index, app_wrk->wrk_map_index,
908               app_wrk->api_client_index, app_wrk->mq_congested);
909   return s;
910 }
911
912 void
913 app_worker_format_connects (app_worker_t * app_wrk, int verbose)
914 {
915   segment_manager_t *sm;
916
917   /* Header */
918   if (!app_wrk)
919     {
920       segment_manager_format_sessions (0, verbose);
921       return;
922     }
923
924   if (app_wrk->connects_seg_manager == (u32) ~ 0)
925     return;
926
927   sm = segment_manager_get (app_wrk->connects_seg_manager);
928   segment_manager_format_sessions (sm, verbose);
929 }
930
931 /*
932  * fd.io coding-style-patch-verification: ON
933  *
934  * Local Variables:
935  * eval: (c-set-style "gnu")
936  * End:
937  */