session: async rx event notifications
[vpp.git] / src / vnet / session / application_worker.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application.h>
17 #include <vnet/session/application_interface.h>
18 #include <vnet/session/session.h>
19
20 /**
21  * Pool of workers associated to apps
22  */
23 static app_worker_t *app_workers;
24
25 app_worker_t *
26 app_worker_alloc (application_t * app)
27 {
28   app_worker_t *app_wrk;
29
30   pool_get (app_workers, app_wrk);
31   clib_memset (app_wrk, 0, sizeof (*app_wrk));
32   app_wrk->wrk_index = app_wrk - app_workers;
33   app_wrk->app_index = app->app_index;
34   app_wrk->wrk_map_index = ~0;
35   app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
36   clib_spinlock_init (&app_wrk->detached_seg_managers_lock);
37   vec_validate (app_wrk->wrk_evts, vlib_num_workers ());
38   vec_validate (app_wrk->wrk_mq_congested, vlib_num_workers ());
39   APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index);
40   return app_wrk;
41 }
42
43 app_worker_t *
44 app_worker_get (u32 wrk_index)
45 {
46   return pool_elt_at_index (app_workers, wrk_index);
47 }
48
49 app_worker_t *
50 app_worker_get_if_valid (u32 wrk_index)
51 {
52   if (pool_is_free_index (app_workers, wrk_index))
53     return 0;
54   return pool_elt_at_index (app_workers, wrk_index);
55 }
56
57 void
58 app_worker_free (app_worker_t * app_wrk)
59 {
60   application_t *app = application_get (app_wrk->app_index);
61   vnet_unlisten_args_t _a, *a = &_a;
62   u64 handle, *handles = 0, *sm_indices = 0;
63   segment_manager_t *sm;
64   session_handle_t *sh;
65   session_t *ls;
66   u32 sm_index;
67   int i;
68
69   /*
70    * Cleanup vpp wrk events
71    */
72   app_worker_del_all_events (app_wrk);
73   for (i = 0; i < vec_len (app_wrk->wrk_evts); i++)
74     clib_fifo_free (app_wrk->wrk_evts[i]);
75
76   vec_free (app_wrk->wrk_evts);
77   vec_free (app_wrk->wrk_mq_congested);
78
79   /*
80    *  Listener cleanup
81    */
82
83   hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
84     ls = listen_session_get_from_handle (handle);
85     vec_add1 (handles, app_listen_session_handle (ls));
86     vec_add1 (sm_indices, sm_index);
87     sm = segment_manager_get (sm_index);
88   }));
89
90   for (i = 0; i < vec_len (handles); i++)
91     {
92       /* Cleanup listener */
93       a->app_index = app->app_index;
94       a->wrk_map_index = app_wrk->wrk_map_index;
95       a->handle = handles[i];
96       (void) vnet_unlisten (a);
97
98       sm = segment_manager_get_if_valid (sm_indices[i]);
99       if (sm && !segment_manager_app_detached (sm))
100         {
101           sm->first_is_protected = 0;
102           segment_manager_init_free (sm);
103         }
104     }
105   vec_reset_length (handles);
106   vec_free (sm_indices);
107   hash_free (app_wrk->listeners_table);
108
109   /*
110    * Connects segment manager cleanup
111    */
112
113   if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX)
114     {
115       sm = segment_manager_get (app_wrk->connects_seg_manager);
116       sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
117       sm->first_is_protected = 0;
118       segment_manager_init_free (sm);
119     }
120
121   /*
122    * Half-open cleanup
123    */
124
125   pool_foreach (sh, app_wrk->half_open_table)
126     session_cleanup_half_open (*sh);
127
128   pool_free (app_wrk->half_open_table);
129
130   /*
131    * Detached listener segment managers cleanup
132    */
133   for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
134     {
135       sm = segment_manager_get (app_wrk->detached_seg_managers[i]);
136       segment_manager_init_free (sm);
137     }
138   vec_free (app_wrk->detached_seg_managers);
139   clib_spinlock_free (&app_wrk->detached_seg_managers_lock);
140
141   if (CLIB_DEBUG)
142     clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
143   pool_put (app_workers, app_wrk);
144 }
145
146 application_t *
147 app_worker_get_app (u32 wrk_index)
148 {
149   app_worker_t *app_wrk;
150   app_wrk = app_worker_get_if_valid (wrk_index);
151   if (!app_wrk)
152     return 0;
153   return application_get_if_valid (app_wrk->app_index);
154 }
155
156 static segment_manager_t *
157 app_worker_alloc_segment_manager (app_worker_t * app_wrk)
158 {
159   segment_manager_t *sm;
160
161   sm = segment_manager_alloc ();
162   sm->app_wrk_index = app_wrk->wrk_index;
163   segment_manager_init (sm);
164   return sm;
165 }
166
167 static int
168 app_worker_alloc_session_fifos (segment_manager_t * sm, session_t * s)
169 {
170   svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
171   int rv;
172
173   if ((rv = segment_manager_alloc_session_fifos (sm, s->thread_index,
174                                                  &rx_fifo, &tx_fifo)))
175     return rv;
176
177   rx_fifo->shr->master_session_index = s->session_index;
178   rx_fifo->master_thread_index = s->thread_index;
179
180   tx_fifo->shr->master_session_index = s->session_index;
181   tx_fifo->master_thread_index = s->thread_index;
182
183   s->rx_fifo = rx_fifo;
184   s->tx_fifo = tx_fifo;
185   return 0;
186 }
187
188 int
189 app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
190 {
191   segment_manager_t *sm;
192
193   /* Allocate segment manager. All sessions derived out of a listen session
194    * have fifos allocated by the same segment manager. */
195   if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
196     return SESSION_E_ALLOC;
197
198   /* Once the first segment is mapped, don't remove it until unlisten */
199   sm->first_is_protected = 1;
200
201   /* Keep track of the segment manager for the listener or this worker */
202   hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
203             segment_manager_index (sm));
204
205   if (transport_connection_is_cless (session_get_transport (ls)))
206     {
207       if (ls->rx_fifo)
208         return SESSION_E_NOSUPPORT;
209       return app_worker_alloc_session_fifos (sm, ls);
210     }
211   return 0;
212 }
213
214 session_error_t
215 app_worker_start_listen (app_worker_t *app_wrk, app_listener_t *app_listener)
216 {
217   session_t *ls;
218   int rv;
219
220   if (clib_bitmap_get (app_listener->workers, app_wrk->wrk_map_index))
221     return SESSION_E_ALREADY_LISTENING;
222
223   app_listener->workers = clib_bitmap_set (app_listener->workers,
224                                            app_wrk->wrk_map_index, 1);
225
226   if (app_listener->session_index != SESSION_INVALID_INDEX)
227     {
228       ls = session_get (app_listener->session_index, 0);
229       if ((rv = app_worker_init_listener (app_wrk, ls)))
230         return rv;
231     }
232
233   if (app_listener->local_index != SESSION_INVALID_INDEX)
234     {
235       ls = session_get (app_listener->local_index, 0);
236       if ((rv = app_worker_init_listener (app_wrk, ls)))
237         return rv;
238     }
239
240   return 0;
241 }
242
243 static void
244 app_worker_add_detached_sm (app_worker_t * app_wrk, u32 sm_index)
245 {
246   vec_add1 (app_wrk->detached_seg_managers, sm_index);
247 }
248
249 void
250 app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index)
251 {
252   u32 i;
253
254   clib_spinlock_lock (&app_wrk->detached_seg_managers_lock);
255   for (i = 0; i < vec_len (app_wrk->detached_seg_managers); i++)
256     {
257       if (app_wrk->detached_seg_managers[i] == sm_index)
258         {
259           vec_del1 (app_wrk->detached_seg_managers, i);
260           break;
261         }
262     }
263   clib_spinlock_unlock (&app_wrk->detached_seg_managers_lock);
264 }
265
266 static void
267 app_worker_stop_listen_session (app_worker_t * app_wrk, session_t * ls)
268 {
269   session_handle_t handle;
270   segment_manager_t *sm;
271   uword *sm_indexp;
272   session_state_t *states = 0;
273
274   handle = listen_session_get_handle (ls);
275   sm_indexp = hash_get (app_wrk->listeners_table, handle);
276   if (PREDICT_FALSE (!sm_indexp))
277     return;
278
279   /* Dealloc fifos, if any (dgram listeners) */
280   if (ls->rx_fifo)
281     {
282       segment_manager_dealloc_fifos (ls->rx_fifo, ls->tx_fifo);
283       ls->tx_fifo = ls->rx_fifo = 0;
284     }
285
286   /* Try to cleanup segment manager */
287   sm = segment_manager_get (*sm_indexp);
288   if (sm)
289     {
290       sm->first_is_protected = 0;
291       segment_manager_app_detach (sm);
292       if (!segment_manager_has_fifos (sm))
293         {
294           /* Empty segment manager, cleanup it up */
295           segment_manager_free (sm);
296         }
297       else
298         {
299           /* Delete sessions in CREATED state */
300           vec_add1 (states, SESSION_STATE_CREATED);
301           segment_manager_del_sessions_filter (sm, states);
302           vec_free (states);
303
304           /* Track segment manager in case app detaches and all the
305            * outstanding sessions need to be closed */
306           app_worker_add_detached_sm (app_wrk, *sm_indexp);
307           sm->flags |= SEG_MANAGER_F_DETACHED_LISTENER;
308         }
309     }
310
311   hash_unset (app_wrk->listeners_table, handle);
312 }
313
314 int
315 app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al)
316 {
317   session_t *ls;
318
319   if (!clib_bitmap_get (al->workers, app_wrk->wrk_map_index))
320     return 0;
321
322   if (al->session_index != SESSION_INVALID_INDEX)
323     {
324       ls = listen_session_get (al->session_index);
325       app_worker_stop_listen_session (app_wrk, ls);
326     }
327
328   if (al->local_index != SESSION_INVALID_INDEX)
329     {
330       ls = listen_session_get (al->local_index);
331       app_worker_stop_listen_session (app_wrk, ls);
332     }
333
334   clib_bitmap_set_no_check (al->workers, app_wrk->wrk_map_index, 0);
335   if (clib_bitmap_is_zero (al->workers))
336     app_listener_cleanup (al);
337
338   return 0;
339 }
340
341 int
342 app_worker_init_accepted (session_t * s)
343 {
344   app_worker_t *app_wrk;
345   segment_manager_t *sm;
346   session_t *listener;
347   application_t *app;
348
349   listener = listen_session_get_from_handle (s->listener_handle);
350   app_wrk = application_listener_select_worker (listener);
351   if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
352     return -1;
353
354   s->app_wrk_index = app_wrk->wrk_index;
355   app = application_get (app_wrk->app_index);
356   if (app->cb_fns.fifo_tuning_callback)
357     s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
358
359   sm = app_worker_get_listen_segment_manager (app_wrk, listener);
360   if (app_worker_alloc_session_fifos (sm, s))
361     return -1;
362
363   return 0;
364 }
365
366 int
367 app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
368                             u32 opaque, session_error_t err)
369 {
370   session_event_t evt;
371
372   evt.event_type = SESSION_CTRL_EVT_BOUND;
373   evt.session_handle = alsh;
374   evt.as_u64[1] = (u64) opaque << 32 | err;
375
376   app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
377
378   return 0;
379 }
380
381 int
382 app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
383                            u32 opaque, session_error_t err)
384 {
385   session_event_t evt = {};
386
387   evt.event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY;
388   evt.session_handle = sh;
389   evt.as_u64[1] = (u64) opaque << 32 | (u32) err;
390
391   app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
392   return 0;
393 }
394
395 int
396 app_worker_accept_notify (app_worker_t * app_wrk, session_t * s)
397 {
398   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_ACCEPTED);
399   return 0;
400 }
401
402 int
403 app_worker_init_connected (app_worker_t * app_wrk, session_t * s)
404 {
405   application_t *app = application_get (app_wrk->app_index);
406   segment_manager_t *sm;
407
408   if (app->cb_fns.fifo_tuning_callback)
409     s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
410
411   /* Allocate fifos for session, unless the app is a builtin proxy */
412   if (application_is_builtin_proxy (app))
413     return 0;
414
415   sm = app_worker_get_connect_segment_manager (app_wrk);
416   return app_worker_alloc_session_fifos (sm, s);
417 }
418
419 int
420 app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
421                            session_error_t err, u32 opaque)
422 {
423   session_event_t evt = {};
424   u32 thread_index;
425
426   evt.event_type = SESSION_CTRL_EVT_CONNECTED;
427   evt.session_index = s ? s->session_index : ~0;
428   evt.as_u64[1] = (u64) opaque << 32 | (u32) err;
429   thread_index = s ? s->thread_index : vlib_get_thread_index ();
430
431   app_worker_add_event_custom (app_wrk, thread_index, &evt);
432   return 0;
433 }
434
435 int
436 app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
437 {
438   session_handle_t *shp;
439
440   ASSERT (session_vlib_thread_is_cl_thread ());
441   pool_get (app_wrk->half_open_table, shp);
442   *shp = sh;
443
444   return (shp - app_wrk->half_open_table);
445 }
446
447 int
448 app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
449 {
450   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_HALF_CLEANUP);
451   return 0;
452 }
453
454 int
455 app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
456 {
457   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_DISCONNECTED);
458   return 0;
459 }
460
461 int
462 app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s)
463 {
464   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_TRANSPORT_CLOSED);
465   return 0;
466 }
467
468 int
469 app_worker_reset_notify (app_worker_t * app_wrk, session_t * s)
470 {
471   app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_RESET);
472   return 0;
473 }
474
475 int
476 app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
477                            session_cleanup_ntf_t ntf)
478 {
479   session_event_t evt;
480
481   evt.event_type = SESSION_CTRL_EVT_CLEANUP;
482   evt.as_u64[0] = (u64) ntf << 32 | s->session_index;
483   evt.as_u64[1] = pointer_to_uword (session_cleanup);
484
485   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
486
487   return 0;
488 }
489
490 int
491 app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s,
492                                   session_cleanup_ntf_t ntf,
493                                   void (*cleanup_cb) (session_t *s))
494 {
495   session_event_t evt;
496
497   evt.event_type = SESSION_CTRL_EVT_CLEANUP;
498   evt.as_u64[0] = (u64) ntf << 32 | s->session_index;
499   evt.as_u64[1] = pointer_to_uword (cleanup_cb);
500
501   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
502
503   return 0;
504 }
505
506 int
507 app_worker_rx_notify (app_worker_t *app_wrk, session_t *s)
508 {
509   app_worker_add_event (app_wrk, s, SESSION_IO_EVT_RX);
510   return 0;
511 }
512
513 int
514 app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
515                            session_handle_t new_sh)
516 {
517   session_event_t evt;
518
519   evt.event_type = SESSION_CTRL_EVT_MIGRATED;
520   evt.session_index = s->session_index;
521   evt.as_u64[1] = new_sh;
522
523   app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
524   return 0;
525 }
526
527 int
528 app_worker_own_session (app_worker_t * app_wrk, session_t * s)
529 {
530   segment_manager_t *sm;
531   svm_fifo_t *rxf, *txf;
532   int rv;
533
534   if (s->session_state == SESSION_STATE_LISTENING)
535     return application_change_listener_owner (s, app_wrk);
536
537   s->app_wrk_index = app_wrk->wrk_index;
538
539   rxf = s->rx_fifo;
540   txf = s->tx_fifo;
541
542   if (!rxf || !txf)
543     return 0;
544
545   s->rx_fifo = 0;
546   s->tx_fifo = 0;
547
548   sm = app_worker_get_connect_segment_manager (app_wrk);
549   if ((rv = app_worker_alloc_session_fifos (sm, s)))
550     return rv;
551
552   if (!svm_fifo_is_empty_cons (rxf))
553     svm_fifo_clone (s->rx_fifo, rxf);
554
555   if (!svm_fifo_is_empty_cons (txf))
556     svm_fifo_clone (s->tx_fifo, txf);
557
558   segment_manager_dealloc_fifos (rxf, txf);
559
560   return 0;
561 }
562
563 int
564 app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
565                             session_handle_t *rsh)
566 {
567   if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
568     return SESSION_E_REFUSED;
569
570   sep->app_wrk_index = app_wrk->wrk_index;
571
572   return session_open (sep, rsh);
573 }
574
575 int
576 app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
577                                 svm_fifo_t * f,
578                                 session_ft_action_t act, u32 len)
579 {
580   application_t *app = application_get (app_wrk->app_index);
581   return app->cb_fns.fifo_tuning_callback (s, f, act, len);
582 }
583
584 segment_manager_t *
585 app_worker_get_connect_segment_manager (app_worker_t * app)
586 {
587   ASSERT (app->connects_seg_manager != (u32) ~ 0);
588   return segment_manager_get (app->connects_seg_manager);
589 }
590
591 segment_manager_t *
592 app_worker_get_listen_segment_manager (app_worker_t * app,
593                                        session_t * listener)
594 {
595   uword *smp;
596   smp = hash_get (app->listeners_table, listen_session_get_handle (listener));
597   ALWAYS_ASSERT (smp != 0);
598   return segment_manager_get (*smp);
599 }
600
601 session_t *
602 app_worker_first_listener (app_worker_t * app_wrk, u8 fib_proto,
603                            u8 transport_proto)
604 {
605   session_t *listener;
606   u64 handle;
607   u32 sm_index;
608   u8 sst;
609
610   sst = session_type_from_proto_and_ip (transport_proto,
611                                         fib_proto == FIB_PROTOCOL_IP4);
612
613   /* *INDENT-OFF* */
614    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
615      listener = listen_session_get_from_handle (handle);
616      if (listener->session_type == sst
617          && !(listener->flags & SESSION_F_PROXY))
618        return listener;
619    }));
620   /* *INDENT-ON* */
621
622   return 0;
623 }
624
625 session_t *
626 app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
627                            u8 transport_proto)
628 {
629   session_t *listener;
630   u64 handle;
631   u32 sm_index;
632   u8 sst;
633
634   sst = session_type_from_proto_and_ip (transport_proto,
635                                         fib_proto == FIB_PROTOCOL_IP4);
636
637   /* *INDENT-OFF* */
638    hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
639      listener = listen_session_get_from_handle (handle);
640      if (listener->session_type == sst && (listener->flags & SESSION_F_PROXY))
641        return listener;
642    }));
643   /* *INDENT-ON* */
644
645   return 0;
646 }
647
648 /**
649  * Send an API message to the external app, to map new segment
650  */
651 int
652 app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
653 {
654   session_event_t evt;
655
656   evt.event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT;
657   evt.as_u64[1] = segment_handle;
658
659   app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
660
661   return 0;
662 }
663
664 int
665 app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
666 {
667   session_event_t evt;
668
669   evt.event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT;
670   evt.as_u64[1] = segment_handle;
671
672   app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
673
674   return 0;
675 }
676
677 static int
678 app_wrk_send_fd (app_worker_t *app_wrk, int fd)
679 {
680   if (!appns_sapi_enabled ())
681     {
682       vl_api_registration_t *reg;
683       clib_error_t *error;
684
685       reg =
686         vl_mem_api_client_index_to_registration (app_wrk->api_client_index);
687       if (!reg)
688         {
689           clib_warning ("no api registration for client: %u",
690                         app_wrk->api_client_index);
691           return -1;
692         }
693
694       if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
695         return -1;
696
697       error = vl_api_send_fd_msg (reg, &fd, 1);
698       if (error)
699         {
700           clib_error_report (error);
701           return -1;
702         }
703
704       return 0;
705     }
706
707   app_sapi_msg_t smsg = { 0 };
708   app_namespace_t *app_ns;
709   clib_error_t *error;
710   application_t *app;
711   clib_socket_t *cs;
712   u32 cs_index;
713
714   app = application_get (app_wrk->app_index);
715   app_ns = app_namespace_get (app->ns_index);
716   cs_index = appns_sapi_handle_sock_index (app_wrk->api_client_index);
717   cs = appns_sapi_get_socket (app_ns, cs_index);
718   if (PREDICT_FALSE (!cs))
719     return -1;
720
721   /* There's no payload for the message only the type */
722   smsg.type = APP_SAPI_MSG_TYPE_SEND_FDS;
723   error = clib_socket_sendmsg (cs, &smsg, sizeof (smsg), &fd, 1);
724   if (error)
725     {
726       clib_error_report (error);
727       return -1;
728     }
729
730   return 0;
731 }
732
733 void
734 app_worker_add_event (app_worker_t *app_wrk, session_t *s,
735                       session_evt_type_t evt_type)
736 {
737   session_event_t *evt;
738
739   ASSERT (s->thread_index == vlib_get_thread_index ());
740   clib_fifo_add2 (app_wrk->wrk_evts[s->thread_index], evt);
741   evt->session_index = s->session_index;
742   evt->event_type = evt_type;
743   evt->postponed = 0;
744
745   /* First event for this app_wrk. Schedule it for handling in session input */
746   if (clib_fifo_elts (app_wrk->wrk_evts[s->thread_index]) == 1)
747     {
748       session_worker_t *wrk = session_main_get_worker (s->thread_index);
749       session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
750     }
751 }
752
753 void
754 app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index,
755                              session_event_t *evt)
756 {
757   clib_fifo_add1 (app_wrk->wrk_evts[thread_index], *evt);
758
759   /* First event for this app_wrk. Schedule it for handling in session input */
760   if (clib_fifo_elts (app_wrk->wrk_evts[thread_index]) == 1)
761     {
762       session_worker_t *wrk = session_main_get_worker (thread_index);
763       session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
764     }
765 }
766
767 always_inline void
768 app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
769                               u32 msg_len, int fd)
770 {
771   svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
772   svm_msg_q_t *mq = app_wrk->event_queue;
773   session_event_t *evt;
774
775   ASSERT (!svm_msg_q_or_ring_is_full (mq, SESSION_MQ_CTRL_EVT_RING));
776   *mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_CTRL_EVT_RING);
777
778   evt = svm_msg_q_msg_data (mq, mq_msg);
779   clib_memset (evt, 0, sizeof (*evt));
780   evt->event_type = evt_type;
781   clib_memcpy_fast (evt->data, msg, msg_len);
782
783   if (fd != -1)
784     app_wrk_send_fd (app_wrk, fd);
785
786   svm_msg_q_add_raw (mq, mq_msg);
787 }
788
789 void
790 app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
791                           u32 msg_len, int fd)
792 {
793   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, fd);
794 }
795
796 void
797 app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
798                        u32 msg_len)
799 {
800   app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1);
801 }
802
803 u8
804 app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index)
805 {
806   return app_wrk->wrk_mq_congested[thread_index] > 0;
807 }
808
809 void
810 app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index)
811 {
812   clib_atomic_fetch_add_relax (&app_wrk->mq_congested, 1);
813   ASSERT (thread_index == vlib_get_thread_index ());
814   app_wrk->wrk_mq_congested[thread_index] = 1;
815 }
816
817 void
818 app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk, u32 thread_index)
819 {
820   clib_atomic_fetch_sub_relax (&app_wrk->mq_congested, 1);
821   ASSERT (thread_index == vlib_get_thread_index ());
822   app_wrk->wrk_mq_congested[thread_index] = 0;
823 }
824
825 u8 *
826 format_app_worker_listener (u8 * s, va_list * args)
827 {
828   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
829   u64 handle = va_arg (*args, u64);
830   u32 sm_index = va_arg (*args, u32);
831   int verbose = va_arg (*args, int);
832   session_t *listener;
833   const u8 *app_name;
834   u8 *str;
835
836   if (!app_wrk)
837     {
838       if (verbose)
839         s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s%-15s%-15s%-10s",
840                     "Connection", "App", "Wrk", "API Client", "ListenerID",
841                     "SegManager");
842       else
843         s = format (s, "%-" SESSION_CLI_ID_LEN "s%-25s%-10s", "Connection",
844                     "App", "Wrk");
845
846       return s;
847     }
848
849   app_name = application_name_from_index (app_wrk->app_index);
850   listener = listen_session_get_from_handle (handle);
851   str = format (0, "%U", format_session, listener, verbose);
852
853   if (verbose)
854     {
855       u8 *buf;
856       buf = format (0, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index);
857       s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%-10v%-15u%-15u%-10u", str,
858                   app_name, buf, app_wrk->api_client_index, handle, sm_index);
859       vec_free (buf);
860     }
861   else
862     s = format (s, "%-" SESSION_CLI_ID_LEN "v%-25v%=10u", str, app_name,
863                 app_wrk->wrk_map_index);
864
865   vec_free (str);
866
867   return s;
868 }
869
870 u8 *
871 format_app_worker (u8 * s, va_list * args)
872 {
873   app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
874   u32 indent = 1;
875
876   s = format (s,
877               "%U wrk-index %u app-index %u map-index %u "
878               "api-client-index %d mq-cong %u\n",
879               format_white_space, indent, app_wrk->wrk_index,
880               app_wrk->app_index, app_wrk->wrk_map_index,
881               app_wrk->api_client_index, app_wrk->mq_congested);
882   return s;
883 }
884
885 void
886 app_worker_format_connects (app_worker_t * app_wrk, int verbose)
887 {
888   segment_manager_t *sm;
889
890   /* Header */
891   if (!app_wrk)
892     {
893       segment_manager_format_sessions (0, verbose);
894       return;
895     }
896
897   if (app_wrk->connects_seg_manager == (u32) ~ 0)
898     return;
899
900   sm = segment_manager_get (app_wrk->connects_seg_manager);
901   segment_manager_format_sessions (sm, verbose);
902 }
903
904 /*
905  * fd.io coding-style-patch-verification: ON
906  *
907  * Local Variables:
908  * eval: (c-set-style "gnu")
909  * End:
910  */