session: improve main thread connects rpc
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/application_local.h>
25 #include <vnet/session/session_debug.h>
26 #include <svm/queue.h>
27 #include <sys/timerfd.h>
28
29 #define app_check_thread_and_barrier(_fn, _arg)                         \
30   if (!vlib_thread_is_main_w_barrier ())                                \
31     {                                                                   \
32      vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));       \
33       return;                                                           \
34    }
35
36 static void
37 session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
38 {
39   struct itimerspec its;
40
41   its.it_value.tv_sec = 0;
42   its.it_value.tv_nsec = time_ns;
43   its.it_interval.tv_sec = 0;
44   its.it_interval.tv_nsec = its.it_value.tv_nsec;
45
46   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
47     clib_warning ("timerfd_settime");
48 }
49
50 always_inline u64
51 session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
52 {
53   if (state == SESSION_WRK_INTERRUPT)
54     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
55   else if (state == SESSION_WRK_IDLE)
56     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
57   else
58     return 0;
59 }
60
61 static inline void
62 session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
63 {
64   u64 time_ns;
65
66   wrk->state = state;
67   if (wrk->timerfd == -1)
68     return;
69   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
70   session_wrk_timerfd_update (wrk, time_ns);
71 }
72
73 static transport_endpt_ext_cfg_t *
74 session_mq_get_ext_config (application_t *app, uword offset)
75 {
76   svm_fifo_chunk_t *c;
77   fifo_segment_t *fs;
78
79   fs = application_get_rx_mqs_segment (app);
80   c = fs_chunk_ptr (fs->h, offset);
81   return (transport_endpt_ext_cfg_t *) c->data;
82 }
83
84 static void
85 session_mq_free_ext_config (application_t *app, uword offset)
86 {
87   svm_fifo_chunk_t *c;
88   fifo_segment_t *fs;
89
90   fs = application_get_rx_mqs_segment (app);
91   c = fs_chunk_ptr (fs->h, offset);
92   fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
93 }
94
95 static void
96 session_mq_listen_handler (void *data)
97 {
98   session_listen_msg_t *mp = (session_listen_msg_t *) data;
99   vnet_listen_args_t _a, *a = &_a;
100   app_worker_t *app_wrk;
101   application_t *app;
102   int rv;
103
104   app_check_thread_and_barrier (session_mq_listen_handler, mp);
105
106   app = application_lookup (mp->client_index);
107   if (!app)
108     return;
109
110   clib_memset (a, 0, sizeof (*a));
111   a->sep.is_ip4 = mp->is_ip4;
112   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
113   a->sep.port = mp->port;
114   a->sep.fib_index = mp->vrf;
115   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
116   a->sep.transport_proto = mp->proto;
117   a->app_index = app->app_index;
118   a->wrk_map_index = mp->wrk_index;
119   a->sep_ext.transport_flags = mp->flags;
120
121   if (mp->ext_config)
122     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
123
124   if ((rv = vnet_listen (a)))
125     clib_warning ("listen returned: %U", format_session_error, rv);
126
127   app_wrk = application_get_worker (app, mp->wrk_index);
128   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
129
130   if (mp->ext_config)
131     session_mq_free_ext_config (app, mp->ext_config);
132 }
133
134 static void
135 session_mq_listen_uri_handler (void *data)
136 {
137   session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
138   vnet_listen_args_t _a, *a = &_a;
139   app_worker_t *app_wrk;
140   application_t *app;
141   int rv;
142
143   app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
144
145   app = application_lookup (mp->client_index);
146   if (!app)
147     return;
148
149   clib_memset (a, 0, sizeof (*a));
150   a->uri = (char *) mp->uri;
151   a->app_index = app->app_index;
152   rv = vnet_bind_uri (a);
153
154   app_wrk = application_get_worker (app, 0);
155   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
156 }
157
158 static void
159 session_mq_connect_one (session_connect_msg_t *mp)
160 {
161   vnet_connect_args_t _a, *a = &_a;
162   app_worker_t *app_wrk;
163   application_t *app;
164   int rv;
165
166   app = application_lookup (mp->client_index);
167   if (!app)
168     return;
169
170   clib_memset (a, 0, sizeof (*a));
171   a->sep.is_ip4 = mp->is_ip4;
172   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
173   a->sep.port = mp->port;
174   a->sep.transport_proto = mp->proto;
175   a->sep.peer.fib_index = mp->vrf;
176   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
177   if (mp->is_ip4)
178     {
179       ip46_address_mask_ip4 (&a->sep.ip);
180       ip46_address_mask_ip4 (&a->sep.peer.ip);
181     }
182   a->sep.peer.port = mp->lcl_port;
183   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
184   a->sep_ext.parent_handle = mp->parent_handle;
185   a->sep_ext.transport_flags = mp->flags;
186   a->api_context = mp->context;
187   a->app_index = app->app_index;
188   a->wrk_map_index = mp->wrk_index;
189
190   if (mp->ext_config)
191     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
192
193   if ((rv = vnet_connect (a)))
194     {
195       clib_warning ("connect returned: %U", format_session_error, rv);
196       app_wrk = application_get_worker (app, mp->wrk_index);
197       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
198     }
199
200   if (mp->ext_config)
201     session_mq_free_ext_config (app, mp->ext_config);
202 }
203
204 static void
205 session_mq_handle_connects_rpc (void *arg)
206 {
207   u32 max_connects = 32, n_connects = 0;
208   vlib_main_t *vm = vlib_get_main ();
209   session_evt_elt_t *he, *elt, *next;
210   session_worker_t *fwrk, *wrk;
211
212   ASSERT (vlib_get_thread_index () == 0);
213
214   /* Pending connects on linked list pertaining to first worker */
215   fwrk = session_main_get_worker (1);
216   if (!fwrk->n_pending_connects)
217     goto update_state;
218
219   vlib_worker_thread_barrier_sync (vm);
220
221   he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
222   elt = clib_llist_next (fwrk->event_elts, evt_list, he);
223
224   /* Avoid holding the barrier for too long */
225   while (n_connects < max_connects && elt != he)
226     {
227       next = clib_llist_next (fwrk->event_elts, evt_list, elt);
228       clib_llist_remove (fwrk->event_elts, evt_list, elt);
229       session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
230       clib_llist_put (fwrk->event_elts, elt);
231       elt = next;
232       n_connects += 1;
233     }
234
235   /* Decrement with worker barrier */
236   fwrk->n_pending_connects -= n_connects;
237
238   vlib_worker_thread_barrier_release (vm);
239
240 update_state:
241
242   /* Switch worker to poll mode if it was in interrupt mode and had work or
243    * back to interrupt if threshold of loops without a connect is passed.
244    * While in poll mode, reprogram connects rpc */
245   wrk = session_main_get_worker (0);
246   if (wrk->state != SESSION_WRK_POLLING)
247     {
248       if (n_connects)
249         {
250           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
251           vlib_node_set_state (vm, session_queue_node.index,
252                                VLIB_NODE_STATE_POLLING);
253           wrk->no_connect_loops = 0;
254         }
255     }
256   else
257     {
258       if (!n_connects)
259         {
260           if (++wrk->no_connect_loops > 1e5)
261             {
262               session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
263               vlib_node_set_state (vm, session_queue_node.index,
264                                    VLIB_NODE_STATE_INTERRUPT);
265             }
266         }
267       else
268         wrk->no_connect_loops = 0;
269     }
270
271   if (wrk->state == SESSION_WRK_POLLING)
272     {
273       elt = session_evt_alloc_ctrl (wrk);
274       elt->evt.event_type = SESSION_CTRL_EVT_RPC;
275       elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
276     }
277 }
278
279 static void
280 session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
281 {
282   u32 thread_index = wrk - session_main.wrk;
283   session_evt_elt_t *he;
284
285   /* No workers, so just deal with the connect now */
286   if (PREDICT_FALSE (!thread_index))
287     {
288       session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
289       return;
290     }
291
292   if (PREDICT_FALSE (thread_index != 1))
293     {
294       clib_warning ("Connect on wrong thread. Dropping");
295       return;
296     }
297
298   /* Add to pending list to be handled by main thread */
299   he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
300   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
301
302   /* Decremented with worker barrier */
303   wrk->n_pending_connects += 1;
304   if (wrk->n_pending_connects == 1)
305     {
306       vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
307                                        session_queue_node.index);
308       session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
309     }
310 }
311
312 static void
313 session_mq_connect_uri_handler (void *data)
314 {
315   session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
316   vnet_connect_args_t _a, *a = &_a;
317   app_worker_t *app_wrk;
318   application_t *app;
319   int rv;
320
321   app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
322
323   app = application_lookup (mp->client_index);
324   if (!app)
325     return;
326
327   clib_memset (a, 0, sizeof (*a));
328   a->uri = (char *) mp->uri;
329   a->api_context = mp->context;
330   a->app_index = app->app_index;
331   if ((rv = vnet_connect_uri (a)))
332     {
333       clib_warning ("connect_uri returned: %d", rv);
334       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
335       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
336     }
337 }
338
339 static void
340 session_mq_shutdown_handler (void *data)
341 {
342   session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
343   vnet_shutdown_args_t _a, *a = &_a;
344   application_t *app;
345
346   app = application_lookup (mp->client_index);
347   if (!app)
348     return;
349
350   a->app_index = app->app_index;
351   a->handle = mp->handle;
352   vnet_shutdown_session (a);
353 }
354
355 static void
356 session_mq_disconnect_handler (void *data)
357 {
358   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
359   vnet_disconnect_args_t _a, *a = &_a;
360   application_t *app;
361
362   app = application_lookup (mp->client_index);
363   if (!app)
364     return;
365
366   a->app_index = app->app_index;
367   a->handle = mp->handle;
368   vnet_disconnect_session (a);
369 }
370
371 static void
372 app_mq_detach_handler (void *data)
373 {
374   session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
375   vnet_app_detach_args_t _a, *a = &_a;
376   application_t *app;
377
378   app_check_thread_and_barrier (app_mq_detach_handler, mp);
379
380   app = application_lookup (mp->client_index);
381   if (!app)
382     return;
383
384   a->app_index = app->app_index;
385   a->api_client_index = mp->client_index;
386   vnet_application_detach (a);
387 }
388
389 static void
390 session_mq_unlisten_handler (void *data)
391 {
392   session_unlisten_msg_t *mp = (session_unlisten_msg_t *) data;
393   vnet_unlisten_args_t _a, *a = &_a;
394   app_worker_t *app_wrk;
395   application_t *app;
396   int rv;
397
398   app_check_thread_and_barrier (session_mq_unlisten_handler, mp);
399
400   app = application_lookup (mp->client_index);
401   if (!app)
402     return;
403
404   clib_memset (a, 0, sizeof (*a));
405   a->app_index = app->app_index;
406   a->handle = mp->handle;
407   a->wrk_map_index = mp->wrk_index;
408   if ((rv = vnet_unlisten (a)))
409     clib_warning ("unlisten returned: %d", rv);
410
411   app_wrk = application_get_worker (app, a->wrk_map_index);
412   if (!app_wrk)
413     return;
414
415   mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
416 }
417
418 static void
419 session_mq_accepted_reply_handler (void *data)
420 {
421   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
422   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
423   session_state_t old_state;
424   app_worker_t *app_wrk;
425   session_t *s;
426
427   /* Server isn't interested, kill the session */
428   if (mp->retval)
429     {
430       a->app_index = mp->context;
431       a->handle = mp->handle;
432       vnet_disconnect_session (a);
433       return;
434     }
435
436   /* Mail this back from the main thread. We're not polling in main
437    * thread so we're using other workers for notifications. */
438   if (vlib_num_workers () && vlib_get_thread_index () != 0
439       && session_thread_from_handle (mp->handle) == 0)
440     {
441       vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
442                                  (u8 *) mp, sizeof (*mp));
443       return;
444     }
445
446   s = session_get_from_handle_if_valid (mp->handle);
447   if (!s)
448     return;
449
450   app_wrk = app_worker_get (s->app_wrk_index);
451   if (app_wrk->app_index != mp->context)
452     {
453       clib_warning ("app doesn't own session");
454       return;
455     }
456
457   if (!session_has_transport (s))
458     {
459       s->session_state = SESSION_STATE_READY;
460       if (ct_session_connect_notify (s))
461         return;
462     }
463   else
464     {
465       old_state = s->session_state;
466       s->session_state = SESSION_STATE_READY;
467
468       if (!svm_fifo_is_empty_prod (s->rx_fifo))
469         app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
470
471       /* Closed while waiting for app to reply. Resend disconnect */
472       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
473         {
474           app_worker_close_notify (app_wrk, s);
475           s->session_state = old_state;
476           return;
477         }
478     }
479 }
480
481 static void
482 session_mq_reset_reply_handler (void *data)
483 {
484   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
485   session_reset_reply_msg_t *mp;
486   app_worker_t *app_wrk;
487   session_t *s;
488   application_t *app;
489   u32 index, thread_index;
490
491   mp = (session_reset_reply_msg_t *) data;
492   app = application_lookup (mp->context);
493   if (!app)
494     return;
495
496   session_parse_handle (mp->handle, &index, &thread_index);
497   s = session_get_if_valid (index, thread_index);
498
499   /* No session or not the right session */
500   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
501     return;
502
503   app_wrk = app_worker_get (s->app_wrk_index);
504   if (!app_wrk || app_wrk->app_index != app->app_index)
505     {
506       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
507                     mp->handle);
508       return;
509     }
510
511   /* Client objected to resetting the session, log and continue */
512   if (mp->retval)
513     {
514       clib_warning ("client retval %d", mp->retval);
515       return;
516     }
517
518   /* This comes as a response to a reset, transport only waiting for
519    * confirmation to remove connection state, no need to disconnect */
520   a->handle = mp->handle;
521   a->app_index = app->app_index;
522   vnet_disconnect_session (a);
523 }
524
525 static void
526 session_mq_disconnected_handler (void *data)
527 {
528   session_disconnected_reply_msg_t *rmp;
529   vnet_disconnect_args_t _a, *a = &_a;
530   svm_msg_q_msg_t _msg, *msg = &_msg;
531   session_disconnected_msg_t *mp;
532   app_worker_t *app_wrk;
533   session_event_t *evt;
534   session_t *s;
535   application_t *app;
536   int rv = 0;
537
538   mp = (session_disconnected_msg_t *) data;
539   if (!(s = session_get_from_handle_if_valid (mp->handle)))
540     {
541       clib_warning ("could not disconnect handle %llu", mp->handle);
542       return;
543     }
544   app_wrk = app_worker_get (s->app_wrk_index);
545   app = application_lookup (mp->client_index);
546   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
547     {
548       clib_warning ("could not disconnect session: %llu app: %u",
549                     mp->handle, mp->client_index);
550       return;
551     }
552
553   a->handle = mp->handle;
554   a->app_index = app_wrk->wrk_index;
555   rv = vnet_disconnect_session (a);
556
557   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
558                                        SESSION_MQ_CTRL_EVT_RING,
559                                        SVM_Q_WAIT, msg);
560   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
561   clib_memset (evt, 0, sizeof (*evt));
562   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
563   rmp = (session_disconnected_reply_msg_t *) evt->data;
564   rmp->handle = mp->handle;
565   rmp->context = mp->context;
566   rmp->retval = rv;
567   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
568 }
569
570 static void
571 session_mq_disconnected_reply_handler (void *data)
572 {
573   session_disconnected_reply_msg_t *mp;
574   vnet_disconnect_args_t _a, *a = &_a;
575   application_t *app;
576
577   mp = (session_disconnected_reply_msg_t *) data;
578
579   /* Client objected to disconnecting the session, log and continue */
580   if (mp->retval)
581     {
582       clib_warning ("client retval %d", mp->retval);
583       return;
584     }
585
586   /* Disconnect has been confirmed. Confirm close to transport */
587   app = application_lookup (mp->context);
588   if (app)
589     {
590       a->handle = mp->handle;
591       a->app_index = app->app_index;
592       vnet_disconnect_session (a);
593     }
594 }
595
596 static void
597 session_mq_worker_update_handler (void *data)
598 {
599   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
600   session_worker_update_reply_msg_t *rmp;
601   svm_msg_q_msg_t _msg, *msg = &_msg;
602   app_worker_t *app_wrk;
603   u32 owner_app_wrk_map;
604   session_event_t *evt;
605   session_t *s;
606   application_t *app;
607
608   app = application_lookup (mp->client_index);
609   if (!app)
610     return;
611   if (!(s = session_get_from_handle_if_valid (mp->handle)))
612     {
613       clib_warning ("invalid handle %llu", mp->handle);
614       return;
615     }
616   app_wrk = app_worker_get (s->app_wrk_index);
617   if (app_wrk->app_index != app->app_index)
618     {
619       clib_warning ("app %u does not own session %llu", app->app_index,
620                     mp->handle);
621       return;
622     }
623   owner_app_wrk_map = app_wrk->wrk_map_index;
624   app_wrk = application_get_worker (app, mp->wrk_index);
625
626   /* This needs to come from the new owner */
627   if (mp->req_wrk_index == owner_app_wrk_map)
628     {
629       session_req_worker_update_msg_t *wump;
630
631       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
632                                            SESSION_MQ_CTRL_EVT_RING,
633                                            SVM_Q_WAIT, msg);
634       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
635       clib_memset (evt, 0, sizeof (*evt));
636       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
637       wump = (session_req_worker_update_msg_t *) evt->data;
638       wump->session_handle = mp->handle;
639       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
640       return;
641     }
642
643   app_worker_own_session (app_wrk, s);
644
645   /*
646    * Send reply
647    */
648   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
649                                        SESSION_MQ_CTRL_EVT_RING,
650                                        SVM_Q_WAIT, msg);
651   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
652   clib_memset (evt, 0, sizeof (*evt));
653   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
654   rmp = (session_worker_update_reply_msg_t *) evt->data;
655   rmp->handle = mp->handle;
656   if (s->rx_fifo)
657     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
658   if (s->tx_fifo)
659     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
660   rmp->segment_handle = session_segment_handle (s);
661   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
662
663   /*
664    * Retransmit messages that may have been lost
665    */
666   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
667     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
668
669   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
670     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
671
672   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
673     app_worker_close_notify (app_wrk, s);
674 }
675
676 static void
677 session_mq_app_wrk_rpc_handler (void *data)
678 {
679   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
680   svm_msg_q_msg_t _msg, *msg = &_msg;
681   session_app_wrk_rpc_msg_t *rmp;
682   app_worker_t *app_wrk;
683   session_event_t *evt;
684   application_t *app;
685
686   app = application_lookup (mp->client_index);
687   if (!app)
688     return;
689
690   app_wrk = application_get_worker (app, mp->wrk_index);
691
692   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
693                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
694                                        msg);
695   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
696   clib_memset (evt, 0, sizeof (*evt));
697   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
698   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
699   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
700   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
701 }
702
703 static void
704 session_mq_transport_attr_handler (void *data)
705 {
706   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
707   session_transport_attr_reply_msg_t *rmp;
708   svm_msg_q_msg_t _msg, *msg = &_msg;
709   app_worker_t *app_wrk;
710   session_event_t *evt;
711   application_t *app;
712   session_t *s;
713   int rv;
714
715   app = application_lookup (mp->client_index);
716   if (!app)
717     return;
718
719   if (!(s = session_get_from_handle_if_valid (mp->handle)))
720     {
721       clib_warning ("invalid handle %llu", mp->handle);
722       return;
723     }
724   app_wrk = app_worker_get (s->app_wrk_index);
725   if (app_wrk->app_index != app->app_index)
726     {
727       clib_warning ("app %u does not own session %llu", app->app_index,
728                     mp->handle);
729       return;
730     }
731
732   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
733
734   svm_msg_q_lock_and_alloc_msg_w_ring (
735     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
736   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
737   clib_memset (evt, 0, sizeof (*evt));
738   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
739   rmp = (session_transport_attr_reply_msg_t *) evt->data;
740   rmp->handle = mp->handle;
741   rmp->retval = rv;
742   rmp->is_get = mp->is_get;
743   if (!rv && mp->is_get)
744     rmp->attr = mp->attr;
745   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
746 }
747
748 vlib_node_registration_t session_queue_node;
749
750 typedef struct
751 {
752   u32 session_index;
753   u32 server_thread_index;
754 } session_queue_trace_t;
755
756 /* packet trace format function */
757 static u8 *
758 format_session_queue_trace (u8 * s, va_list * args)
759 {
760   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
761   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
762   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
763
764   s = format (s, "session index %d thread index %d",
765               t->session_index, t->server_thread_index);
766   return s;
767 }
768
769 #define foreach_session_queue_error             \
770 _(TX, "Packets transmitted")                    \
771 _(TIMER, "Timer events")                        \
772 _(NO_BUFFER, "Out of buffers")
773
774 typedef enum
775 {
776 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
777   foreach_session_queue_error
778 #undef _
779     SESSION_QUEUE_N_ERROR,
780 } session_queue_error_t;
781
782 static char *session_queue_error_strings[] = {
783 #define _(sym,string) string,
784   foreach_session_queue_error
785 #undef _
786 };
787
788 enum
789 {
790   SESSION_TX_NO_BUFFERS = -2,
791   SESSION_TX_NO_DATA,
792   SESSION_TX_OK
793 };
794
795 static void
796 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
797                         u32 next_index, u32 * to_next, u16 n_segs,
798                         session_t * s, u32 n_trace)
799 {
800   while (n_trace && n_segs)
801     {
802       vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
803       if (PREDICT_TRUE
804           (vlib_trace_buffer
805            (vm, node, next_index, b, 1 /* follow_chain */ )))
806         {
807           session_queue_trace_t *t =
808             vlib_add_trace (vm, node, b, sizeof (*t));
809           t->session_index = s->session_index;
810           t->server_thread_index = s->thread_index;
811           n_trace--;
812         }
813       to_next++;
814       n_segs--;
815     }
816   vlib_set_trace_count (vm, node, n_trace);
817 }
818
819 always_inline void
820 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
821                             vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
822 {
823   vlib_buffer_t *chain_b, *prev_b;
824   u32 chain_bi0, to_deq, left_from_seg;
825   u16 len_to_deq, n_bytes_read;
826   u8 *data, j;
827
828   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
829   b->total_length_not_including_first_buffer = 0;
830
831   chain_b = b;
832   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
833                             ctx->left_to_snd);
834   to_deq = left_from_seg;
835   for (j = 1; j < ctx->n_bufs_per_seg; j++)
836     {
837       prev_b = chain_b;
838       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
839
840       *n_bufs -= 1;
841       chain_bi0 = ctx->tx_buffers[*n_bufs];
842       chain_b = vlib_get_buffer (vm, chain_bi0);
843       chain_b->current_data = 0;
844       data = vlib_buffer_get_current (chain_b);
845       if (peek_data)
846         {
847           n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
848                                         ctx->sp.tx_offset, len_to_deq, data);
849           ctx->sp.tx_offset += n_bytes_read;
850         }
851       else
852         {
853           if (ctx->transport_vft->transport_options.tx_type ==
854               TRANSPORT_TX_DGRAM)
855             {
856               svm_fifo_t *f = ctx->s->tx_fifo;
857               session_dgram_hdr_t *hdr = &ctx->hdr;
858               u16 deq_now;
859               u32 offset;
860
861               deq_now = clib_min (hdr->data_length - hdr->data_offset,
862                                   len_to_deq);
863               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
864               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
865               ASSERT (n_bytes_read > 0);
866
867               hdr->data_offset += n_bytes_read;
868               if (hdr->data_offset == hdr->data_length)
869                 {
870                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
871                   svm_fifo_dequeue_drop (f, offset);
872                   if (ctx->left_to_snd > n_bytes_read)
873                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
874                                    (u8 *) & ctx->hdr);
875                 }
876               else if (ctx->left_to_snd == n_bytes_read)
877                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
878                                          sizeof (session_dgram_pre_hdr_t));
879             }
880           else
881             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
882                                              len_to_deq, data);
883         }
884       ASSERT (n_bytes_read == len_to_deq);
885       chain_b->current_length = n_bytes_read;
886       b->total_length_not_including_first_buffer += chain_b->current_length;
887
888       /* update previous buffer */
889       prev_b->next_buffer = chain_bi0;
890       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
891
892       /* update current buffer */
893       chain_b->next_buffer = 0;
894
895       to_deq -= n_bytes_read;
896       if (to_deq == 0)
897         break;
898     }
899   ASSERT (to_deq == 0
900           && b->total_length_not_including_first_buffer == left_from_seg);
901   ctx->left_to_snd -= left_from_seg;
902 }
903
904 always_inline void
905 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
906                         vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
907 {
908   u32 len_to_deq;
909   u8 *data0;
910   int n_bytes_read;
911
912   /*
913    * Start with the first buffer in chain
914    */
915   b->error = 0;
916   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
917   b->current_data = 0;
918
919   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
920   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
921
922   if (peek_data)
923     {
924       n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
925                                     len_to_deq, data0);
926       ASSERT (n_bytes_read > 0);
927       /* Keep track of progress locally, transport is also supposed to
928        * increment it independently when pushing the header */
929       ctx->sp.tx_offset += n_bytes_read;
930     }
931   else
932     {
933       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
934         {
935           session_dgram_hdr_t *hdr = &ctx->hdr;
936           svm_fifo_t *f = ctx->s->tx_fifo;
937           u16 deq_now;
938           u32 offset;
939
940           ASSERT (hdr->data_length > hdr->data_offset);
941           deq_now = clib_min (hdr->data_length - hdr->data_offset,
942                               len_to_deq);
943           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
944           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
945           ASSERT (n_bytes_read > 0);
946
947           if (ctx->s->session_state == SESSION_STATE_LISTENING)
948             {
949               ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
950               ctx->tc->rmt_port = hdr->rmt_port;
951             }
952           hdr->data_offset += n_bytes_read;
953           if (hdr->data_offset == hdr->data_length)
954             {
955               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
956               svm_fifo_dequeue_drop (f, offset);
957               if (ctx->left_to_snd > n_bytes_read)
958                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
959                                (u8 *) & ctx->hdr);
960             }
961           else if (ctx->left_to_snd == n_bytes_read)
962             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
963                                      sizeof (session_dgram_pre_hdr_t));
964         }
965       else
966         {
967           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
968                                            len_to_deq, data0);
969           ASSERT (n_bytes_read > 0);
970         }
971     }
972   b->current_length = n_bytes_read;
973   ctx->left_to_snd -= n_bytes_read;
974
975   /*
976    * Fill in the remaining buffers in the chain, if any
977    */
978   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
979     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
980 }
981
982 always_inline u8
983 session_tx_not_ready (session_t * s, u8 peek_data)
984 {
985   if (peek_data)
986     {
987       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
988         return 0;
989       /* Can retransmit for closed sessions but can't send new data if
990        * session is not ready or closed */
991       else if (s->session_state < SESSION_STATE_READY)
992         return 1;
993       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
994         {
995           /* Allow closed transports to still send custom packets.
996            * For instance, tcp may want to send acks in time-wait. */
997           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
998               && (s->flags & SESSION_F_CUSTOM_TX))
999             return 0;
1000           return 2;
1001         }
1002     }
1003   return 0;
1004 }
1005
1006 always_inline transport_connection_t *
1007 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
1008 {
1009   if (peek_data)
1010     {
1011       return ctx->transport_vft->get_connection (ctx->s->connection_index,
1012                                                  ctx->s->thread_index);
1013     }
1014   else
1015     {
1016       if (ctx->s->session_state == SESSION_STATE_LISTENING)
1017         return ctx->transport_vft->get_listener (ctx->s->connection_index);
1018       else
1019         {
1020           return ctx->transport_vft->get_connection (ctx->s->connection_index,
1021                                                      ctx->s->thread_index);
1022         }
1023     }
1024 }
1025
1026 always_inline void
1027 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
1028                                u32 max_segs, u8 peek_data)
1029 {
1030   u32 n_bytes_per_buf, n_bytes_per_seg;
1031
1032   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
1033   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
1034
1035   if (peek_data)
1036     {
1037       /* Offset in rx fifo from where to peek data */
1038       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
1039         {
1040           ctx->max_len_to_snd = 0;
1041           return;
1042         }
1043       ctx->max_dequeue -= ctx->sp.tx_offset;
1044     }
1045   else
1046     {
1047       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1048         {
1049           u32 len, chain_limit;
1050
1051           if (ctx->max_dequeue <= sizeof (ctx->hdr))
1052             {
1053               ctx->max_len_to_snd = 0;
1054               return;
1055             }
1056
1057           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
1058                          (u8 *) & ctx->hdr);
1059           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
1060           len = ctx->hdr.data_length - ctx->hdr.data_offset;
1061
1062           /* Process multiple dgrams if smaller than min (buf_space, mss).
1063            * This avoids handling multiple dgrams if they require buffer
1064            * chains */
1065           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
1066                                   ctx->sp.snd_mss);
1067           if (ctx->hdr.data_length <= chain_limit)
1068             {
1069               u32 first_dgram_len, dgram_len, offset, max_offset;
1070               session_dgram_hdr_t hdr;
1071
1072               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
1073               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
1074               first_dgram_len = len;
1075               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
1076
1077               while (offset < max_offset)
1078                 {
1079                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
1080                                  (u8 *) & hdr);
1081                   ASSERT (hdr.data_length > hdr.data_offset);
1082                   dgram_len = hdr.data_length - hdr.data_offset;
1083                   if (len + dgram_len > ctx->max_dequeue
1084                       || first_dgram_len != dgram_len)
1085                     break;
1086                   len += dgram_len;
1087                   offset += sizeof (hdr) + hdr.data_length;
1088                 }
1089             }
1090
1091           ctx->max_dequeue = len;
1092         }
1093     }
1094   ASSERT (ctx->max_dequeue > 0);
1095
1096   /* Ensure we're not writing more than transport window allows */
1097   if (ctx->max_dequeue < ctx->sp.snd_space)
1098     {
1099       /* Constrained by tx queue. Try to send only fully formed segments */
1100       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
1101         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
1102         ctx->max_dequeue;
1103       /* TODO Nagle ? */
1104     }
1105   else
1106     {
1107       /* Expectation is that snd_space0 is already a multiple of snd_mss */
1108       ctx->max_len_to_snd = ctx->sp.snd_space;
1109     }
1110
1111   /* Check if we're tx constrained by the node */
1112   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
1113   if (ctx->n_segs_per_evt > max_segs)
1114     {
1115       ctx->n_segs_per_evt = max_segs;
1116       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
1117     }
1118
1119   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
1120   if (ctx->n_segs_per_evt > 1)
1121     {
1122       u32 n_bytes_last_seg, n_bufs_last_seg;
1123
1124       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
1125       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
1126         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
1127       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1128       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
1129       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
1130         + n_bufs_last_seg;
1131     }
1132   else
1133     {
1134       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
1135       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1136       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
1137     }
1138
1139   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
1140   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
1141                                      n_bytes_per_buf -
1142                                      TRANSPORT_MAX_HDRS_LEN);
1143 }
1144
1145 always_inline void
1146 session_tx_maybe_reschedule (session_worker_t * wrk,
1147                              session_tx_context_t * ctx,
1148                              session_evt_elt_t * elt)
1149 {
1150   session_t *s = ctx->s;
1151
1152   svm_fifo_unset_event (s->tx_fifo);
1153   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
1154     if (svm_fifo_set_event (s->tx_fifo))
1155       session_evt_add_head_old (wrk, elt);
1156 }
1157
1158 always_inline int
1159 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
1160                                 vlib_node_runtime_t * node,
1161                                 session_evt_elt_t * elt,
1162                                 int *n_tx_packets, u8 peek_data)
1163 {
1164   u32 n_trace, n_left, pbi, next_index, max_burst;
1165   session_tx_context_t *ctx = &wrk->ctx;
1166   session_main_t *smm = &session_main;
1167   session_event_t *e = &elt->evt;
1168   vlib_main_t *vm = wrk->vm;
1169   transport_proto_t tp;
1170   vlib_buffer_t *pb;
1171   u16 n_bufs, rv;
1172
1173   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
1174     {
1175       if (rv < 2)
1176         session_evt_add_old (wrk, elt);
1177       return SESSION_TX_NO_DATA;
1178     }
1179
1180   next_index = smm->session_type_to_next[ctx->s->session_type];
1181   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
1182
1183   tp = session_get_transport_proto (ctx->s);
1184   ctx->transport_vft = transport_protocol_get_vft (tp);
1185   ctx->tc = session_tx_get_transport (ctx, peek_data);
1186
1187   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
1188     {
1189       if (ctx->transport_vft->flush_data)
1190         ctx->transport_vft->flush_data (ctx->tc);
1191       e->event_type = SESSION_IO_EVT_TX;
1192     }
1193
1194   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
1195     {
1196       u32 n_custom_tx;
1197       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
1198       ctx->sp.max_burst_size = max_burst;
1199       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
1200       *n_tx_packets += n_custom_tx;
1201       if (PREDICT_FALSE
1202           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1203         return SESSION_TX_OK;
1204       max_burst -= n_custom_tx;
1205       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
1206         {
1207           session_evt_add_old (wrk, elt);
1208           return SESSION_TX_OK;
1209         }
1210     }
1211
1212   transport_connection_snd_params (ctx->tc, &ctx->sp);
1213
1214   if (!ctx->sp.snd_space)
1215     {
1216       /* If the deschedule flag was set, remove session from scheduler.
1217        * Transport is responsible for rescheduling this session. */
1218       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
1219         transport_connection_deschedule (ctx->tc);
1220       /* Request to postpone the session, e.g., zero-wnd and transport
1221        * is not currently probing */
1222       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
1223         session_evt_add_old (wrk, elt);
1224       /* This flow queue is "empty" so it should be re-evaluated before
1225        * the ones that have data to send. */
1226       else
1227         session_evt_add_head_old (wrk, elt);
1228
1229       return SESSION_TX_NO_DATA;
1230     }
1231
1232   if (transport_connection_is_tx_paced (ctx->tc))
1233     {
1234       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
1235       if (snd_space < TRANSPORT_PACER_MIN_BURST)
1236         {
1237           session_evt_add_head_old (wrk, elt);
1238           return SESSION_TX_NO_DATA;
1239         }
1240       snd_space = clib_min (ctx->sp.snd_space, snd_space);
1241       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
1242         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
1243     }
1244
1245   /* Check how much we can pull. */
1246   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
1247
1248   if (PREDICT_FALSE (!ctx->max_len_to_snd))
1249     {
1250       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
1251       session_tx_maybe_reschedule (wrk, ctx, elt);
1252       return SESSION_TX_NO_DATA;
1253     }
1254
1255   vec_validate_aligned (ctx->tx_buffers, ctx->n_bufs_needed - 1,
1256                         CLIB_CACHE_LINE_BYTES);
1257   n_bufs = vlib_buffer_alloc (vm, ctx->tx_buffers, ctx->n_bufs_needed);
1258   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
1259     {
1260       if (n_bufs)
1261         vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
1262       session_evt_add_head_old (wrk, elt);
1263       vlib_node_increment_counter (wrk->vm, node->node_index,
1264                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
1265       return SESSION_TX_NO_BUFFERS;
1266     }
1267
1268   if (transport_connection_is_tx_paced (ctx->tc))
1269     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
1270
1271   ctx->left_to_snd = ctx->max_len_to_snd;
1272   n_left = ctx->n_segs_per_evt;
1273
1274   while (n_left >= 4)
1275     {
1276       vlib_buffer_t *b0, *b1;
1277       u32 bi0, bi1;
1278
1279       pbi = ctx->tx_buffers[n_bufs - 3];
1280       pb = vlib_get_buffer (vm, pbi);
1281       vlib_prefetch_buffer_header (pb, STORE);
1282       pbi = ctx->tx_buffers[n_bufs - 4];
1283       pb = vlib_get_buffer (vm, pbi);
1284       vlib_prefetch_buffer_header (pb, STORE);
1285
1286       bi0 = ctx->tx_buffers[--n_bufs];
1287       bi1 = ctx->tx_buffers[--n_bufs];
1288
1289       b0 = vlib_get_buffer (vm, bi0);
1290       b1 = vlib_get_buffer (vm, bi1);
1291
1292       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1293       session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
1294
1295       ctx->transport_vft->push_header (ctx->tc, b0);
1296       ctx->transport_vft->push_header (ctx->tc, b1);
1297
1298       n_left -= 2;
1299
1300       vec_add1 (wrk->pending_tx_buffers, bi0);
1301       vec_add1 (wrk->pending_tx_buffers, bi1);
1302       vec_add1 (wrk->pending_tx_nexts, next_index);
1303       vec_add1 (wrk->pending_tx_nexts, next_index);
1304     }
1305   while (n_left)
1306     {
1307       vlib_buffer_t *b0;
1308       u32 bi0;
1309
1310       if (n_left > 1)
1311         {
1312           pbi = ctx->tx_buffers[n_bufs - 2];
1313           pb = vlib_get_buffer (vm, pbi);
1314           vlib_prefetch_buffer_header (pb, STORE);
1315         }
1316
1317       bi0 = ctx->tx_buffers[--n_bufs];
1318       b0 = vlib_get_buffer (vm, bi0);
1319       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1320
1321       /* Ask transport to push header after current_length and
1322        * total_length_not_including_first_buffer are updated */
1323       ctx->transport_vft->push_header (ctx->tc, b0);
1324
1325       n_left -= 1;
1326
1327       vec_add1 (wrk->pending_tx_buffers, bi0);
1328       vec_add1 (wrk->pending_tx_nexts, next_index);
1329     }
1330
1331   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
1332     session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers,
1333                             ctx->n_segs_per_evt, ctx->s, n_trace);
1334
1335   if (PREDICT_FALSE (n_bufs))
1336     vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
1337
1338   *n_tx_packets += ctx->n_segs_per_evt;
1339
1340   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
1341                ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
1342
1343   ASSERT (ctx->left_to_snd == 0);
1344
1345   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
1346    * check if application enqueued more data and reschedule accordingly */
1347   if (ctx->max_len_to_snd < ctx->max_dequeue)
1348     session_evt_add_old (wrk, elt);
1349   else
1350     session_tx_maybe_reschedule (wrk, ctx, elt);
1351
1352   if (!peek_data)
1353     {
1354       u32 n_dequeued = ctx->max_len_to_snd;
1355       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1356         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
1357       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
1358         session_dequeue_notify (ctx->s);
1359     }
1360   return SESSION_TX_OK;
1361 }
1362
1363 int
1364 session_tx_fifo_peek_and_snd (session_worker_t * wrk,
1365                               vlib_node_runtime_t * node,
1366                               session_evt_elt_t * e, int *n_tx_packets)
1367 {
1368   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
1369 }
1370
1371 int
1372 session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
1373                                  vlib_node_runtime_t * node,
1374                                  session_evt_elt_t * e, int *n_tx_packets)
1375 {
1376   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
1377 }
1378
1379 int
1380 session_tx_fifo_dequeue_internal (session_worker_t * wrk,
1381                                   vlib_node_runtime_t * node,
1382                                   session_evt_elt_t * elt, int *n_tx_packets)
1383 {
1384   transport_send_params_t *sp = &wrk->ctx.sp;
1385   session_t *s = wrk->ctx.s;
1386   u32 n_packets;
1387
1388   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1389     return 0;
1390
1391   /* Clear custom-tx flag used to request reschedule for tx */
1392   s->flags &= ~SESSION_F_CUSTOM_TX;
1393
1394   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
1395                                  TRANSPORT_PACER_MAX_BURST_PKTS);
1396
1397   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
1398   *n_tx_packets += n_packets;
1399
1400   if (s->flags & SESSION_F_CUSTOM_TX)
1401     {
1402       session_evt_add_old (wrk, elt);
1403     }
1404   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
1405     {
1406       svm_fifo_unset_event (s->tx_fifo);
1407       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1408         if (svm_fifo_set_event (s->tx_fifo))
1409           session_evt_add_head_old (wrk, elt);
1410     }
1411
1412   if (sp->max_burst_size &&
1413       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
1414     session_dequeue_notify (s);
1415
1416   return n_packets;
1417 }
1418
1419 always_inline session_t *
1420 session_event_get_session (session_worker_t * wrk, session_event_t * e)
1421 {
1422   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
1423     return 0;
1424
1425   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
1426   return pool_elt_at_index (wrk->sessions, e->session_index);
1427 }
1428
1429 always_inline void
1430 session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
1431 {
1432   clib_llist_index_t ei;
1433   void (*fp) (void *);
1434   session_event_t *e;
1435   session_t *s;
1436
1437   ei = clib_llist_entry_index (wrk->event_elts, elt);
1438   e = &elt->evt;
1439
1440   switch (e->event_type)
1441     {
1442     case SESSION_CTRL_EVT_RPC:
1443       fp = e->rpc_args.fp;
1444       (*fp) (e->rpc_args.arg);
1445       break;
1446     case SESSION_CTRL_EVT_HALF_CLOSE:
1447       s = session_get_from_handle_if_valid (e->session_handle);
1448       if (PREDICT_FALSE (!s))
1449         break;
1450       session_transport_half_close (s);
1451       break;
1452     case SESSION_CTRL_EVT_CLOSE:
1453       s = session_get_from_handle_if_valid (e->session_handle);
1454       if (PREDICT_FALSE (!s))
1455         break;
1456       session_transport_close (s);
1457       break;
1458     case SESSION_CTRL_EVT_RESET:
1459       s = session_get_from_handle_if_valid (e->session_handle);
1460       if (PREDICT_FALSE (!s))
1461         break;
1462       session_transport_reset (s);
1463       break;
1464     case SESSION_CTRL_EVT_LISTEN:
1465       session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
1466       break;
1467     case SESSION_CTRL_EVT_LISTEN_URI:
1468       session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
1469       break;
1470     case SESSION_CTRL_EVT_UNLISTEN:
1471       session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
1472       break;
1473     case SESSION_CTRL_EVT_CONNECT:
1474       session_mq_connect_handler (wrk, elt);
1475       break;
1476     case SESSION_CTRL_EVT_CONNECT_URI:
1477       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
1478       break;
1479     case SESSION_CTRL_EVT_SHUTDOWN:
1480       session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
1481       break;
1482     case SESSION_CTRL_EVT_DISCONNECT:
1483       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
1484       break;
1485     case SESSION_CTRL_EVT_DISCONNECTED:
1486       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
1487       break;
1488     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
1489       session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
1490       break;
1491     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
1492       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
1493                                                                     elt));
1494       break;
1495     case SESSION_CTRL_EVT_RESET_REPLY:
1496       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
1497       break;
1498     case SESSION_CTRL_EVT_WORKER_UPDATE:
1499       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
1500       break;
1501     case SESSION_CTRL_EVT_APP_DETACH:
1502       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
1503       break;
1504     case SESSION_CTRL_EVT_APP_WRK_RPC:
1505       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
1506       break;
1507     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
1508       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
1509       break;
1510     default:
1511       clib_warning ("unhandled event type %d", e->event_type);
1512     }
1513
1514   /* Regrab elements in case pool moved */
1515   elt = clib_llist_elt (wrk->event_elts, ei);
1516   if (!clib_llist_elt_is_linked (elt, evt_list))
1517     {
1518       e = &elt->evt;
1519       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
1520         session_evt_ctrl_data_free (wrk, elt);
1521       clib_llist_put (wrk->event_elts, elt);
1522     }
1523   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
1524 }
1525
1526 always_inline void
1527 session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
1528                            session_evt_elt_t * elt, int *n_tx_packets)
1529 {
1530   session_main_t *smm = &session_main;
1531   app_worker_t *app_wrk;
1532   clib_llist_index_t ei;
1533   session_event_t *e;
1534   session_t *s;
1535
1536   ei = clib_llist_entry_index (wrk->event_elts, elt);
1537   e = &elt->evt;
1538
1539   switch (e->event_type)
1540     {
1541     case SESSION_IO_EVT_TX_FLUSH:
1542     case SESSION_IO_EVT_TX:
1543       s = session_event_get_session (wrk, e);
1544       if (PREDICT_FALSE (!s))
1545         break;
1546       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
1547       wrk->ctx.s = s;
1548       /* Spray packets in per session type frames, since they go to
1549        * different nodes */
1550       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
1551       break;
1552     case SESSION_IO_EVT_RX:
1553       s = session_event_get_session (wrk, e);
1554       if (!s)
1555         break;
1556       transport_app_rx_evt (session_get_transport_proto (s),
1557                             s->connection_index, s->thread_index);
1558       break;
1559     case SESSION_IO_EVT_BUILTIN_RX:
1560       s = session_event_get_session (wrk, e);
1561       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
1562         break;
1563       svm_fifo_unset_event (s->rx_fifo);
1564       app_wrk = app_worker_get (s->app_wrk_index);
1565       app_worker_builtin_rx (app_wrk, s);
1566       break;
1567     case SESSION_IO_EVT_BUILTIN_TX:
1568       s = session_get_from_handle_if_valid (e->session_handle);
1569       wrk->ctx.s = s;
1570       if (PREDICT_TRUE (s != 0))
1571         session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
1572       break;
1573     default:
1574       clib_warning ("unhandled event type %d", e->event_type);
1575     }
1576
1577   SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
1578
1579   /* Regrab elements in case pool moved */
1580   elt = clib_llist_elt (wrk->event_elts, ei);
1581   if (!clib_llist_elt_is_linked (elt, evt_list))
1582     clib_llist_put (wrk->event_elts, elt);
1583 }
1584
1585 /* *INDENT-OFF* */
1586 static const u32 session_evt_msg_sizes[] = {
1587 #define _(symc, sym)                                                    \
1588   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
1589   foreach_session_ctrl_evt
1590 #undef _
1591 };
1592 /* *INDENT-ON* */
1593
1594 always_inline void
1595 session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
1596 {
1597   session_evt_elt_t *elt;
1598
1599   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
1600     {
1601       elt = session_evt_alloc_ctrl (wrk);
1602       if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
1603         {
1604           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
1605           elt->evt.event_type = evt->event_type;
1606           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
1607                             session_evt_msg_sizes[evt->event_type]);
1608         }
1609       else
1610         {
1611           /* Internal control events fit into io events footprint */
1612           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1613         }
1614     }
1615   else
1616     {
1617       elt = session_evt_alloc_new (wrk);
1618       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1619     }
1620 }
1621
1622 static void
1623 session_flush_pending_tx_buffers (session_worker_t * wrk,
1624                                   vlib_node_runtime_t * node)
1625 {
1626   vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers,
1627                                wrk->pending_tx_nexts,
1628                                vec_len (wrk->pending_tx_nexts));
1629   vec_reset_length (wrk->pending_tx_buffers);
1630   vec_reset_length (wrk->pending_tx_nexts);
1631 }
1632
1633 int
1634 session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
1635 {
1636   svm_msg_q_msg_t _msg, *msg = &_msg;
1637   u32 i, n_to_dequeue = 0;
1638   session_event_t *evt;
1639
1640   n_to_dequeue = svm_msg_q_size (mq);
1641   for (i = 0; i < n_to_dequeue; i++)
1642     {
1643       svm_msg_q_sub_raw (mq, msg);
1644       evt = svm_msg_q_msg_data (mq, msg);
1645       session_evt_add_to_list (wrk, evt);
1646       svm_msg_q_free_msg (mq, msg);
1647     }
1648
1649   return n_to_dequeue;
1650 }
1651
1652 static void
1653 session_wrk_update_state (session_worker_t *wrk)
1654 {
1655   vlib_main_t *vm = wrk->vm;
1656
1657   if (wrk->state == SESSION_WRK_POLLING)
1658     {
1659       if (clib_llist_elts (wrk->event_elts) == 4 &&
1660           vlib_last_vectors_per_main_loop (vm) < 1)
1661         {
1662           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1663           vlib_node_set_state (vm, session_queue_node.index,
1664                                VLIB_NODE_STATE_INTERRUPT);
1665         }
1666     }
1667   else if (wrk->state == SESSION_WRK_INTERRUPT)
1668     {
1669       if (clib_llist_elts (wrk->event_elts) > 4 ||
1670           vlib_last_vectors_per_main_loop (vm) > 1)
1671         {
1672           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
1673           vlib_node_set_state (vm, session_queue_node.index,
1674                                VLIB_NODE_STATE_POLLING);
1675         }
1676       else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
1677         {
1678           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
1679         }
1680     }
1681   else
1682     {
1683       if (clib_llist_elts (wrk->event_elts))
1684         {
1685           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1686         }
1687     }
1688 }
1689
1690 static uword
1691 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
1692                        vlib_frame_t * frame)
1693 {
1694   u32 thread_index = vm->thread_index, __clib_unused n_evts;
1695   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
1696   session_main_t *smm = vnet_get_session_main ();
1697   session_worker_t *wrk = &smm->wrk[thread_index];
1698   clib_llist_index_t ei, next_ei, old_ti;
1699   int n_tx_packets;
1700
1701   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
1702
1703   session_wrk_update_time (wrk, vlib_time_now (vm));
1704
1705   /*
1706    *  Update transport time
1707    */
1708   transport_update_time (wrk->last_vlib_time, thread_index);
1709   n_tx_packets = vec_len (wrk->pending_tx_buffers);
1710   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
1711
1712   /*
1713    *  Dequeue new internal mq events
1714    */
1715
1716   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
1717   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
1718
1719   /*
1720    * Handle control events
1721    */
1722
1723   ei = wrk->ctrl_head;
1724   ctrl_he = clib_llist_elt (wrk->event_elts, ei);
1725   next_ei = clib_llist_next_index (ctrl_he, evt_list);
1726   old_ti = clib_llist_prev_index (ctrl_he, evt_list);
1727   while (ei != old_ti)
1728     {
1729       ei = next_ei;
1730       elt = clib_llist_elt (wrk->event_elts, next_ei);
1731       next_ei = clib_llist_next_index (elt, evt_list);
1732       clib_llist_remove (wrk->event_elts, evt_list, elt);
1733       session_event_dispatch_ctrl (wrk, elt);
1734     }
1735
1736   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
1737
1738   /*
1739    * Handle the new io events.
1740    */
1741
1742   new_he = clib_llist_elt (wrk->event_elts, wrk->new_head);
1743   old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
1744   old_ti = clib_llist_prev_index (old_he, evt_list);
1745
1746   ei = clib_llist_next_index (new_he, evt_list);
1747   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
1748     {
1749       elt = clib_llist_elt (wrk->event_elts, ei);
1750       ei = clib_llist_next_index (elt, evt_list);
1751       clib_llist_remove (wrk->event_elts, evt_list, elt);
1752       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1753     }
1754
1755   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
1756
1757   /*
1758    * Handle the old io events, if we had any prior to processing the new ones
1759    */
1760
1761   if (old_ti != wrk->old_head)
1762     {
1763       old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
1764       ei = clib_llist_next_index (old_he, evt_list);
1765
1766       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
1767         {
1768           elt = clib_llist_elt (wrk->event_elts, ei);
1769           next_ei = clib_llist_next_index (elt, evt_list);
1770           clib_llist_remove (wrk->event_elts, evt_list, elt);
1771
1772           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1773
1774           if (ei == old_ti)
1775             break;
1776
1777           ei = next_ei;
1778         };
1779     }
1780
1781   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
1782
1783   if (vec_len (wrk->pending_tx_buffers))
1784     session_flush_pending_tx_buffers (wrk, node);
1785
1786   vlib_node_increment_counter (vm, session_queue_node.index,
1787                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
1788
1789   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
1790
1791   if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
1792     session_wrk_update_state (wrk);
1793
1794   return n_tx_packets;
1795 }
1796
1797 /* *INDENT-OFF* */
1798 VLIB_REGISTER_NODE (session_queue_node) =
1799 {
1800   .function = session_queue_node_fn,
1801   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
1802   .name = "session-queue",
1803   .format_trace = format_session_queue_trace,
1804   .type = VLIB_NODE_TYPE_INPUT,
1805   .n_errors = ARRAY_LEN (session_queue_error_strings),
1806   .error_strings = session_queue_error_strings,
1807   .state = VLIB_NODE_STATE_DISABLED,
1808 };
1809 /* *INDENT-ON* */
1810
1811 static clib_error_t *
1812 session_wrk_tfd_read_ready (clib_file_t *cf)
1813 {
1814   session_worker_t *wrk = session_main_get_worker (cf->private_data);
1815   u64 buf;
1816   int rv;
1817
1818   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
1819   rv = read (wrk->timerfd, &buf, sizeof (buf));
1820   if (rv < 0 && errno != EAGAIN)
1821     clib_unix_warning ("failed");
1822   return 0;
1823 }
1824
1825 static clib_error_t *
1826 session_wrk_tfd_write_ready (clib_file_t *cf)
1827 {
1828   return 0;
1829 }
1830
1831 void
1832 session_wrk_enable_adaptive_mode (session_worker_t *wrk)
1833 {
1834   u32 thread_index = wrk->vm->thread_index;
1835   clib_file_t template = { 0 };
1836
1837   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
1838     clib_warning ("timerfd_create");
1839
1840   template.read_function = session_wrk_tfd_read_ready;
1841   template.write_function = session_wrk_tfd_write_ready;
1842   template.file_descriptor = wrk->timerfd;
1843   template.private_data = thread_index;
1844   template.polling_thread_index = thread_index;
1845   template.description = format (0, "session-wrk-tfd-%u", thread_index);
1846
1847   wrk->timerfd_file = clib_file_add (&file_main, &template);
1848   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
1849 }
1850
1851 static clib_error_t *
1852 session_queue_exit (vlib_main_t * vm)
1853 {
1854   if (vlib_get_n_threads () < 2)
1855     return 0;
1856
1857   /*
1858    * Shut off (especially) worker-thread session nodes.
1859    * Otherwise, vpp can crash as the main thread unmaps the
1860    * API segment.
1861    */
1862   vlib_worker_thread_barrier_sync (vm);
1863   session_node_enable_disable (0 /* is_enable */ );
1864   vlib_worker_thread_barrier_release (vm);
1865   return 0;
1866 }
1867
1868 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1869
1870 static uword
1871 session_queue_run_on_main (vlib_main_t * vm)
1872 {
1873   vlib_node_runtime_t *node;
1874
1875   node = vlib_node_get_runtime (vm, session_queue_node.index);
1876   return session_queue_node_fn (vm, node, 0);
1877 }
1878
1879 static uword
1880 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1881                        vlib_frame_t * f)
1882 {
1883   uword *event_data = 0;
1884   f64 timeout = 1.0;
1885   uword event_type;
1886
1887   while (1)
1888     {
1889       vlib_process_wait_for_event_or_clock (vm, timeout);
1890       event_type = vlib_process_get_events (vm, (uword **) & event_data);
1891
1892       switch (event_type)
1893         {
1894         case SESSION_Q_PROCESS_RUN_ON_MAIN:
1895           /* Run session queue node on main thread */
1896           session_queue_run_on_main (vm);
1897           break;
1898         case SESSION_Q_PROCESS_STOP:
1899           vlib_node_set_state (vm, session_queue_process_node.index,
1900                                VLIB_NODE_STATE_DISABLED);
1901           timeout = 100000.0;
1902           break;
1903         case ~0:
1904           /* Timed out. Run on main to ensure all events are handled */
1905           session_queue_run_on_main (vm);
1906           break;
1907         }
1908       vec_reset_length (event_data);
1909     }
1910   return 0;
1911 }
1912
1913 /* *INDENT-OFF* */
1914 VLIB_REGISTER_NODE (session_queue_process_node) =
1915 {
1916   .function = session_queue_process,
1917   .type = VLIB_NODE_TYPE_PROCESS,
1918   .name = "session-queue-process",
1919   .state = VLIB_NODE_STATE_DISABLED,
1920 };
1921 /* *INDENT-ON* */
1922
1923 static_always_inline uword
1924 session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
1925                                 vlib_frame_t * frame)
1926 {
1927   session_main_t *sm = &session_main;
1928   if (!sm->wrk[0].vpp_event_queue)
1929     return 0;
1930   node = vlib_node_get_runtime (vm, session_queue_node.index);
1931   return session_queue_node_fn (vm, node, frame);
1932 }
1933
1934 /* *INDENT-OFF* */
1935 VLIB_REGISTER_NODE (session_queue_pre_input_node) =
1936 {
1937   .function = session_queue_pre_input_inline,
1938   .type = VLIB_NODE_TYPE_PRE_INPUT,
1939   .name = "session-queue-main",
1940   .state = VLIB_NODE_STATE_DISABLED,
1941 };
1942 /* *INDENT-ON* */
1943
1944 /*
1945  * fd.io coding-style-patch-verification: ON
1946  *
1947  * Local Variables:
1948  * eval: (c-set-style "gnu")
1949  * End:
1950  */