session: fix allocation of proxy fifos
[vpp.git] / src / vnet / session / application_worker.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application.h>
17 #include <vnet/session/application_interface.h>
18 #include <vnet/session/session.h>
19
20 /**
21  * Pool of workers associated to apps
22  */
23 static app_worker_t *app_workers;
24
25 app_worker_t *
26 app_worker_alloc (application_t * app)
27 {
28   app_worker_t *app_wrk;
29
30   pool_get (app_workers, app_wrk);
31   clib_memset (app_wrk, 0, sizeof (*app_wrk));
32   app_wrk->wrk_index = app_wrk - app_workers;
33   app_wrk->app_index = app->app_index;
34   app_wrk->wrk_map_index = ~0;
35   app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
36   clib_spinlock_init (&app_wrk->detached_seg_managers_lock);
37   vec_validate (app_wrk->wrk_evts, vlib_num_workers ());
38   vec_validate (app_wrk->wrk_mq_congested, vlib_num_workers ());
39   APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index);
40   return app_wrk;
41 }
42
43 app_worker_t *
44 app_worker_get (u32 wrk_index)
45 {
46   return pool_elt_at_index (app_workers, wrk_index);
47 }
48
49 app_worker_t *
50 app_worker_get_if_valid (u32 wrk_index)
51 {
52   if (pool_is_free_index (app_workers, wrk_index))
53     return 0;
54   return pool_elt_at_index (app_workers, wrk_index);
55 }
56
57 void
58 app_worker_free (app_worker_t * app_wrk)
59 {
60   application_t *app = application_get (app_wrk->app_index);
61   vnet_unlisten_args_t _a, *a = &_a;
62   u64 handle, *handles = 0, *sm_indices = 0;
63   segment_manager_t *sm;
64   session_handle_t *sh;
65   session_t *ls;
66   u32 sm_index;
67   int i;
68
69   /*
70    * Cleanup vpp wrk events
71    */
72   app_worker_del_all_events (app_wrk);
73   for (i = 0; i < vec_len (app_wrk->wrk_evts); i++)
74     clib_fifo_free (app_wrk->wrk_evts[i]);
75
76   vec_free (app_wrk->wrk_evts);
77   vec_free (app_wrk->wrk_mq_congested);
78
79   /*
80    *  Listener cleanup
81    */
82
83   hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
84     ls = listen_session_get_from_handle (handle);
85     vec_add1 (handles, app_listen_session_handle (ls));
86     vec_add1 (sm_indices, sm_index);
87     sm = segment_manager_get (sm_index);
88   }));
89
90   for (i = 0; i < vec_len (handles); i++)
91     {
92       /* Cleanup listener */
93       a->app_index = app->app_index;
94       a->wrk_map_index = app_wrk->wrk_map_index;
95       a->handle = handles[i];
96       (void) vnet_unlisten (a);
97
98       sm = segment_manager_get_if_valid (sm_indices[i]);
99       if (sm && !segment_manager_app_detached (sm))
100         {
101           sm->first_is_protected = 0;
102           segment_manager_init_free (sm);
103         }
104     }
105   vec_reset_length (handles);
106   vec_free (sm_indices);
107   hash_free (app_wrk->listeners_table);
108
109   /*
110    * Connects segment manager cleanup
111    */
112
113   if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
114     {
115       sm = segment_manager_get (app_wrk->connects_seg_manager);
116       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
117       sm->first_is_protected = 0;
118       segment_manager_init_free (sm);
119     }
120
121   /*
122    * Half-open cleanup
123    */
124
125   pool_foreach (sh, app_wrk->half_open_table)
126     session_cleanup_half_open (*sh);
127
128   pool_free (app_wrk->half_open_table);
129
130   /*
131    * Detached listener segment managers cleanup
132    */
133   for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
134     {
135       sm = segment_manager_get (app_wrk->detached_seg_managers[i]);
136       segment_manager_init_free (sm);
137     }
138   vec_free (app_wrk->detached_seg_managers);
139   clib_spinlock_free (&app_wrk->detached_seg_managers_lock);
140
141   if (CLIB_DEBUG)
142     clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
143   pool_put (app_workers, app_wrk);
144 }
145
146 application_t *
147 app_worker_get_app (u32 wrk_index)
148 {
149   app_worker_t *app_wrk;
150   app_wrk = app_worker_get_if_valid (wrk_index);
151   if (!app_wrk)
152     return 0;
153   return application_get_if_valid (app_wrk->app_index);
154 }
155
156 static segment_manager_t *
157 app_worker_alloc_segment_manager (app_worker_t * app_wrk)
158 {
159   segment_manager_t *sm;
160
161   sm = segment_manager_alloc ();
162   sm->app_wrk_index = app_wrk->wrk_index;
163   segment_manager_init (sm);
164   return sm;
165 }
166
167 static int
168 app_worker_alloc_session_fifos (segment_manager_t * sm, session_t * s)
169 {
170   svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
171   int rv;
172
173   if ((rv = segment_manager_alloc_session_fifos (sm, s->thread_index,
174                                                  &rx_fifo, &tx_fifo)))
175     return rv;
176
177   rx_fifo->shr->master_session_index = s->session_index;
178   rx_fifo->master_thread_index = s->thread_index;
179
180   tx_fifo->shr->master_session_index = s->session_index;
181   tx_fifo->master_thread_index = s->thread_index;
182
183   s->rx_fifo = rx_fifo;
184   s->tx_fifo = tx_fifo;
185   return 0;
186 }
187
188 int
189 app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
190 {
191   segment_manager_t *sm;
192
193   /* Allocate segment manager. All sessions derived out of a listen session
194    * have fifos allocated by the same segment manager. */
195   if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
196     return SESSION_E_ALLOC;
197
198   /* Once the first segment is mapped, don't remove it until unlisten */
199   sm->first_is_protected = 1;
200
201   /* Keep track of the segment manager for the listener or this worker */
202   hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
203             segment_manager_index (sm));
204
205   if (transport_connection_is_cless (session_get_transport (ls)))
206     {
207       if (ls->rx_fifo)
208         return SESSION_E_NOSUPPORT;
209       return app_worker_alloc_session_fifos (sm, ls);
210     }
211   return 0;
212 }
213
214 session_error_t
215 app_worker_start_listen (app_worker_t *app_wrk, app_listener_t *app_listener)
216 {
217   session_t *ls;
218   int rv;
219
220   if (clib_bitmap_get (app_listener->workers, app_wrk->wrk_map_index))
221     return SESSION_E_ALREADY_LISTENING;
222
223   app_listener->workers = clib_bitmap_set (app_listener->workers,
224                                            app_wrk->wrk_map_index, 1);
225
226   if (app_listener->session_index != SESSION_INVALID_INDEX)
227     {
228       ls = session_get (app_listener->session_index, 0);
229       if ((rv = app_worker_init_listener (app_wrk, ls)))
230         return rv;
231     }
232
233   if (app_listener->local_index != SESSION_INVALID_INDEX)
234     {
235       ls = session_get (app_listener->local_index, 0);
236       if ((rv = app_worker_init_listener (app_wrk, ls)))
237         return rv;
238     }
239
240   return 0;
241 }
242
243 static void
244 app_worker_add_detached_sm (app_worker_t * app_wrk, u32 sm_index)
245 {
246   vec_add1 (app_wrk->detached_seg_managers, sm_index);
247 }
248
249 void
250 app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index)
251 {
252   u32 i;
253
254   clib_spinlock_lock (&app_wrk->detached_seg_managers_lock);
255   for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
256     {
257       if (app_wrk->detached_seg_managers[i] == sm_index)
258         {
259           vec_del1 (app_wrk->detached_seg_managers, i);
260           break;
261         }
262     }
263   clib_spinlock_unlock (&app_wrk->detached_seg_managers_lock);
264 }
265
266 static void
267 app_worker_stop_listen_session (app_worker_t * app_wrk, session_t * ls)
268 {
269   session_handle_t handle;
270   segment_manager_t *sm;
271   uword *sm_indexp;
272   session_state_t *states = 0;
273
274   handle = listen_session_get_handle (ls);
275   sm_indexp = hash_get (app_wrk->listeners_table, handle);
276   if (PREDICT_FALSE (!sm_indexp))
277     return;
278
279   /* Dealloc fifos, if any (dgram listeners) */
280   if (ls->rx_fifo)
281     {
282       segment_manager_dealloc_fifos (ls->rx_fifo, ls->tx_fifo);
283       ls->tx_fifo = ls->rx_fifo = 0;
284     }
285
286   /* Try to cleanup segment manager */
287   sm = segment_manager_get (*sm_indexp);
288   if (sm)
289     {
290       sm->first_is_protected = 0;
291       segment_manager_app_detach (sm);
292       if (!segment_manager_has_fifos (sm))
293         {
294           /* Empty segment manager, cleanup it up */
295           segment_manager_free (sm);
296         }
297       else
298         {
299           /* Delete sessions in CREATED state */
300           vec_add1 (states, SESSION_STATE_CREATED);
301           segment_manager_del_sessions_filter (sm, states);
302           vec_free (states);
303
304           /* Track segment manager in case app detaches and all the
305            * outstanding sessions need to be closed */
306           app_worker_add_detached_sm (app_wrk, *sm_indexp);
307           sm->flags |= SEG_MANAGER_F_DETACHED_LISTENER;
308         }
309     }
310
311   hash_unset (app_wrk->listeners_table, handle);
312 }
313
314 int
315 app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al)
316 {
317   session_t *ls;
318
319   if (!clib_bitmap_get (al->workers, app_wrk->wrk_map_index))
320     return 0;
321
322   if (al->session_index != SESSION_INVALID_INDEX)
323     {
324       ls = listen_session_get (al->session_index);
325       app_worker_stop_listen_session (app_wrk, ls);
326     }
327
328   if (al->local_index != SESSION_INVALID_INDEX)
329     {
330       ls = listen_session_get (al->local_index);
331       app_worker_stop_listen_session (app_wrk, ls);
332     }
333
334   clib_bitmap_set_no_check (al->workers, app_wrk->wrk_map_index, 0);
335   if (clib_bitmap_is_zero (al->workers))
336     app_listener_cleanup (al);
337
338   return 0;
339 }
340
341 int
342 app_worker_init_accepted (session_t * s)
343 {
344   app_worker_t *app_wrk;
345   segment_manager_t *sm;
346   session_t *listener;
347   application_t *app;
348
349   listener = listen_session_get_from_handle (s->listener_handle);
350   app_wrk = application_listener_select_worker (listener);
351   if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
352     return -1;
353
354   s->app_wrk_index = app_wrk->wrk_index;
355   app = application_get (app_wrk->app_index);
356   if (app->cb_fns.fifo_tuning_callback)
357     s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
358
359   sm = app_worker_get_listen_segment_manager (app_wrk, listener);
360   if (app_worker_alloc_session_fifos (sm, s))
361     return -1;
362
363   return 0;
364 }
365
366 int
367 app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
368                             u32 opaque, session_error_t err)
369 {
370   session_event_t evt = { .event_type = SESSION_CTRL_EVT_BOUND,
371                           .as_u64[0] = alsh,
372                           .as_u64[1] = (u64) opaque << 32 | err };
373
374   app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
375
376   return 0;
377 }
378
379 int
380 app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
381                            u32 opaque, session_error_t err)
382 {
383   session_event_t evt = { .event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY,
384                           .as_u64[0] = sh,
385                           .as_u64[1] = (u64) opaque << 32 | (u32) err };
386
387   app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
388   return 0;
389 }
390
391 int
392 app_worker_accept_notify (app_worker_t * app_wrk, session_t * s)
393 {
394   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_ACCEPTED);
395   return 0;
396 }
397
398 int
399 app_worker_init_connected (app_worker_t * app_wrk, session_t * s)
400 {
401   application_t *app = application_get (app_wrk->app_index);
402   segment_manager_t *sm;
403
404   if (app->cb_fns.fifo_tuning_callback)
405     s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
406
407   /* Allocate fifos for session, unless the app is a builtin proxy */
408   if (application_is_builtin_proxy (app))
409     return app->cb_fns.proxy_alloc_session_fifos (s);
410
411   sm = app_worker_get_connect_segment_manager (app_wrk);
412   return app_worker_alloc_session_fifos (sm, s);
413 }
414
415 int
416 app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
417                            session_error_t err, u32 opaque)
418 {
419   session_event_t evt = { .event_type = SESSION_CTRL_EVT_CONNECTED,
420                           .as_u64[0] = s ? s->session_index : ~0,
421                           .as_u64[1] = (u64) opaque << 32 | (u32) err };
422   u32 thread_index = s ? s->thread_index : vlib_get_thread_index ();
423
424   app_worker_add_event_custom (app_wrk, thread_index, &evt);
425   return 0;
426 }
427
428 int
429 app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
430 {
431   session_handle_t *shp;
432
433   ASSERT (session_vlib_thread_is_cl_thread ());
434   pool_get (app_wrk->half_open_table, shp);
435   *shp = sh;
436
437   return (shp - app_wrk->half_open_table);
438 }
439
440 int
441 app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
442 {
443   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_HALF_CLEANUP);
444   return 0;
445 }
446
447 int
448 app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
449 {
450   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_DISCONNECTED);
451   return 0;
452 }
453
454 int
455 app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s)
456 {
457   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_TRANSPORT_CLOSED);
458   return 0;
459 }
460
461 int
462 app_worker_reset_notify (app_worker_t * app_wrk, session_t * s)
463 {
464   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_RESET);
465   return 0;
466 }
467
468 int
469 app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
470                            session_cleanup_ntf_t ntf)
471 {
472   session_event_t evt = { .event_type = SESSION_CTRL_EVT_CLEANUP,
473                           .as_u64[0] = (u64) ntf << 32 | s->session_index,
474                           .as_u64[1] = pointer_to_uword (session_cleanup) };
475
476   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
477
478   return 0;
479 }
480
481 int
482 app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s,
483                                   session_cleanup_ntf_t ntf,
484                                   void (*cleanup_cb) (session_t *s))
485 {
486   session_event_t evt = { .event_type = SESSION_CTRL_EVT_CLEANUP,
487                           .as_u64[0] = (u64) ntf << 32 | s->session_index,
488                           .as_u64[1] = pointer_to_uword (cleanup_cb) };
489
490   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
491
492   return 0;
493 }
494
495 int
496 app_worker_rx_notify (app_worker_t *app_wrk, session_t *s)
497 {
498   app_worker_add_event (app_wrk, s, SESSION_IO_EVT_RX);
499   return 0;
500 }
501
502 int
503 app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
504                            session_handle_t new_sh)
505 {
506   session_event_t evt = { .event_type = SESSION_CTRL_EVT_MIGRATED,
507                           .as_u64[0] = s->session_index,
508                           .as_u64[1] = new_sh };
509
510   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
511   return 0;
512 }
513
514 int
515 app_worker_own_session (app_worker_t * app_wrk, session_t * s)
516 {
517   segment_manager_t *sm;
518   svm_fifo_t *rxf, *txf;
519   int rv;
520
521   if (s->session_state == SESSION_STATE_LISTENING)
522     return application_change_listener_owner (s, app_wrk);
523
524   s->app_wrk_index = app_wrk->wrk_index;
525
526   rxf = s->rx_fifo;
527   txf = s->tx_fifo;
528
529   if (!rxf || !txf)
530     return 0;
531
532   s->rx_fifo = 0;
533   s->tx_fifo = 0;
534
535   sm = app_worker_get_connect_segment_manager (app_wrk);
536   if ((rv = app_worker_alloc_session_fifos (sm, s)))
537     return rv;
538
539   if (!svm_fifo_is_empty_cons (rxf))
540     svm_fifo_clone (s->rx_fifo, rxf);
541
542   if (!svm_fifo_is_empty_cons (txf))
543     svm_fifo_clone (s->tx_fifo, txf);
544
545   segment_manager_dealloc_fifos (rxf, txf);
546
547   return 0;
548 }
549
550 int
551 app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
552                             session_handle_t *rsh)
553 {
554   if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
555     return SESSION_E_REFUSED;
556
557   sep->app_wrk_index = app_wrk->wrk_index;
558
559   return session_open (sep, rsh);
560 }
561
562 int
563 app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
564                                 svm_fifo_t * f,
565                                 session_ft_action_t act, u32 len)
566 {
567   application_t *app = application_get (app_wrk->app_index);
568   return app->cb_fns.fifo_tuning_callback (s, f, act, len);
569 }
570
571 segment_manager_t *
572 app_worker_get_connect_segment_manager (app_worker_t * app)
573 {
574   ASSERT (app->connects_seg_manager != (u32) ~ 0);
575   return segment_manager_get (app->connects_seg_manager);
576 }
577
578 segment_manager_t *
579 app_worker_get_listen_segment_manager (app_worker_t * app,
580                                        session_t * listener)
581 {
582   uword *smp;
583   smp = hash_get (app->listeners_table, listen_session_get_handle (listener));
584   ALWAYS_ASSERT (smp != 0);
585   return segment_manager_get (*smp);
586 }
587
588 session_t *
589 app_worker_first_listener (app_worker_t * app_wrk, u8 fib_proto,
590                            u8 transport_proto)
591 {
592   session_t *listener;
593   u64 handle;
594   u32 sm_index;
595   u8 sst;
596
597   sst = session_type_from_proto_and_ip (transport_proto,
598                                         fib_proto == FIB_PROTOCOL_IP4);
599
600   /* *INDENT-OFF* */
601    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
602      listener = listen_session_get_from_handle (handle);
603      if (listener->session_type == sst
604          && !(listener->flags & SESSION_F_PROXY))
605        return listener;
606    }));
607   /* *INDENT-ON* */
608
609   return 0;
610 }
611
612 session_t *
613 app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
614                            u8 transport_proto)
615 {
616   session_t *listener;
617   u64 handle;
618   u32 sm_index;
619   u8 sst;
620
621   sst = session_type_from_proto_and_ip (transport_proto,
622                                         fib_proto == FIB_PROTOCOL_IP4);
623
624   /* *INDENT-OFF* */
625    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
626      listener = listen_session_get_from_handle (handle);
627      if (listener->session_type == sst && (listener->flags & SESSION_F_PROXY))
628        return listener;
629    }));
630   /* *INDENT-ON* */
631
632   return 0;
633 }
634
635 /**
636  * Send an API message to the external app, to map new segment
637  */
638 int
639 app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
640 {
641   session_event_t evt = { .event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT,
642                           .as_u64[1] = segment_handle };
643
644   app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
645
646   return 0;
647 }
648
649 int
650 app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
651 {
652   session_event_t evt = { .event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT,
653                           .as_u64[1] = segment_handle };
654
655   app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
656
657   return 0;
658 }
659
660 static int
661 app_wrk_send_fd (app_worker_t *app_wrk, int fd)
662 {
663   if (!appns_sapi_enabled ())
664     {
665       vl_api_registration_t *reg;
666       clib_error_t *error;
667
668       reg =
669         vl_mem_api_client_index_to_registration (app_wrk->api_client_index);
670       if (!reg)
671         {
672           clib_warning ("no api registration for client: %u",
673                         app_wrk->api_client_index);
674           return -1;
675         }
676
677       if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
678         return -1;
679
680       error = vl_api_send_fd_msg (reg, &fd, 1);
681       if (error)
682         {
683           clib_error_report (error);
684           return -1;
685         }
686
687       return 0;
688     }
689
690   app_sapi_msg_t smsg = { 0 };
691   app_namespace_t *app_ns;
692   clib_error_t *error;
693   application_t *app;
694   clib_socket_t *cs;
695   u32 cs_index;
696
697   app = application_get (app_wrk->app_index);
698   app_ns = app_namespace_get (app->ns_index);
699   cs_index = appns_sapi_handle_sock_index (app_wrk->api_client_index);
700   cs = appns_sapi_get_socket (app_ns, cs_index);
701   if (PREDICT_FALSE (!cs))
702     return -1;
703
704   /* There's no payload for the message only the type */
705   smsg.type = APP_SAPI_MSG_TYPE_SEND_FDS;
706   error = clib_socket_sendmsg (cs, &smsg, sizeof (smsg), &fd, 1);
707   if (error)
708     {
709       clib_error_report (error);
710       return -1;
711     }
712
713   return 0;
714 }
715
716 void
717 app_worker_add_event (app_worker_t *app_wrk, session_t *s,
718                       session_evt_type_t evt_type)
719 {
720   session_event_t *evt;
721
722   ASSERT (s->thread_index == vlib_get_thread_index ());
723   clib_fifo_add2 (app_wrk->wrk_evts[s->thread_index], evt);
724   evt->session_index = s->session_index;
725   evt->event_type = evt_type;
726   evt->postponed = 0;
727
728   /* First event for this app_wrk. Schedule it for handling in session input */
729   if (clib_fifo_elts (app_wrk->wrk_evts[s->thread_index]) == 1)
730     {
731       session_worker_t *wrk = session_main_get_worker (s->thread_index);
732       session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
733     }
734 }
735
736 void
737 app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index,
738                              session_event_t *evt)
739 {
740   clib_fifo_add1 (app_wrk->wrk_evts[thread_index], *evt);
741
742   /* First event for this app_wrk. Schedule it for handling in session input */
743   if (clib_fifo_elts (app_wrk->wrk_evts[thread_index]) == 1)
744     {
745       session_worker_t *wrk = session_main_get_worker (thread_index);
746       session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
747     }
748 }
749
750 always_inline void
751 app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
752                               u32 msg_len, int fd)
753 {
754   svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
755   svm_msg_q_t *mq = app_wrk->event_queue;
756   session_event_t *evt;
757
758   ASSERT (!svm_msg_q_or_ring_is_full (mq, SESSION_MQ_CTRL_EVT_RING));
759   *mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_CTRL_EVT_RING);
760
761   evt = svm_msg_q_msg_data (mq, mq_msg);
762   clib_memset (evt, 0, sizeof (*evt));
763   evt->event_type = evt_type;
764   clib_memcpy_fast (evt->data, msg, msg_len);
765
766   if (fd != -1)
767     app_wrk_send_fd (app_wrk, fd);
768
769   svm_msg_q_add_raw (mq, mq_msg);
770 }
771
772 void
773 app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
774                           u32 msg_len, int fd)
775 {
776   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, fd);
777 }
778
779 void
780 app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
781                        u32 msg_len)
782 {
783   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1);
784 }
785
786 u8
787 app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index)
788 {
789   return app_wrk->wrk_mq_congested[thread_index] > 0;
790 }
791
792 void
793 app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index)
794 {
795   clib_atomic_fetch_add_relax (&app_wrk->mq_congested, 1);
796   ASSERT (thread_index == vlib_get_thread_index ());
797   app_wrk->wrk_mq_congested[thread_index] = 1;
798 }
799
800 void
801 app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk, u32 thread_index)
802 {
803   clib_atomic_fetch_sub_relax (&app_wrk->mq_congested, 1);
804   ASSERT (thread_index == vlib_get_thread_index ());
805   app_wrk->wrk_mq_congested[thread_index] = 0;
806 }
807
808 u8 *
809 format_app_worker_listener (u8 * s, va_list * args)
810 {
811   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
812   u64 handle = va_arg (*args, u64);
813   u32 sm_index = va_arg (*args, u32);
814   int verbose = va_arg (*args, int);
815   session_t *listener;
816   const u8 *app_name;
817   u8 *str;
818
819   if (!app_wrk)
820     {
821       if (verbose)
822         s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s%-15s%-15s%-10s",
823                     "Connection", "App", "Wrk", "API Client", "ListenerID",
824                     "SegManager");
825       else
826         s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s", "Connection",
827                     "App", "Wrk");
828
829       return s;
830     }
831
832   app_name = application_name_from_index (app_wrk->app_index);
833   listener = listen_session_get_from_handle (handle);
834   str = format (0, "%U", format_session, listener, verbose);
835
836   if (verbose)
837     {
838       u8 *buf;
839       buf = format (0, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index);
840       s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%-10v%-15u%-15u%-10u", str,
841                   app_name, buf, app_wrk->api_client_index, handle, sm_index);
842       vec_free (buf);
843     }
844   else
845     s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%=10u", str, app_name,
846                 app_wrk->wrk_map_index);
847
848   vec_free (str);
849
850   return s;
851 }
852
853 u8 *
854 format_app_worker (u8 * s, va_list * args)
855 {
856   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
857   u32 indent = 1;
858
859   s = format (s,
860               "%U wrk-index %u app-index %u map-index %u "
861               "api-client-index %d mq-cong %u\n",
862               format_white_space, indent, app_wrk->wrk_index,
863               app_wrk->app_index, app_wrk->wrk_map_index,
864               app_wrk->api_client_index, app_wrk->mq_congested);
865   return s;
866 }
867
868 void
869 app_worker_format_connects (app_worker_t * app_wrk, int verbose)
870 {
871   segment_manager_t *sm;
872
873   /* Header */
874   if (!app_wrk)
875     {
876       segment_manager_format_sessions (0, verbose);
877       return;
878     }
879
880   if (app_wrk->connects_seg_manager == (u32) ~ 0)
881     return;
882
883   sm = segment_manager_get (app_wrk->connects_seg_manager);
884   segment_manager_format_sessions (sm, verbose);
885 }
886
887 /*
888  * fd.io coding-style-patch-verification: ON
889  *
890  * Local Variables:
891  * eval: (c-set-style "gnu")
892  * End:
893  */