session: free ctrl event data on connect rpc
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/application_local.h>
25 #include <vnet/session/session_debug.h>
26 #include <svm/queue.h>
27 #include <sys/timerfd.h>
28
29 #define app_check_thread_and_barrier(_fn, _arg)                         \
30   if (!vlib_thread_is_main_w_barrier ())                                \
31     {                                                                   \
32      vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));       \
33       return;                                                           \
34    }
35
36 static void
37 session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
38 {
39   struct itimerspec its;
40
41   its.it_value.tv_sec = 0;
42   its.it_value.tv_nsec = time_ns;
43   its.it_interval.tv_sec = 0;
44   its.it_interval.tv_nsec = its.it_value.tv_nsec;
45
46   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
47     clib_warning ("timerfd_settime");
48 }
49
50 always_inline u64
51 session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
52 {
53   if (state == SESSION_WRK_INTERRUPT)
54     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
55   else if (state == SESSION_WRK_IDLE)
56     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
57   else
58     return 0;
59 }
60
61 static inline void
62 session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
63 {
64   u64 time_ns;
65
66   wrk->state = state;
67   if (wrk->timerfd == -1)
68     return;
69   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
70   session_wrk_timerfd_update (wrk, time_ns);
71 }
72
73 static transport_endpt_ext_cfg_t *
74 session_mq_get_ext_config (application_t *app, uword offset)
75 {
76   svm_fifo_chunk_t *c;
77   fifo_segment_t *fs;
78
79   fs = application_get_rx_mqs_segment (app);
80   c = fs_chunk_ptr (fs->h, offset);
81   return (transport_endpt_ext_cfg_t *) c->data;
82 }
83
84 static void
85 session_mq_free_ext_config (application_t *app, uword offset)
86 {
87   svm_fifo_chunk_t *c;
88   fifo_segment_t *fs;
89
90   fs = application_get_rx_mqs_segment (app);
91   c = fs_chunk_ptr (fs->h, offset);
92   fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
93 }
94
95 static void
96 session_mq_listen_handler (void *data)
97 {
98   session_listen_msg_t *mp = (session_listen_msg_t *) data;
99   vnet_listen_args_t _a, *a = &_a;
100   app_worker_t *app_wrk;
101   application_t *app;
102   int rv;
103
104   app_check_thread_and_barrier (session_mq_listen_handler, mp);
105
106   app = application_lookup (mp->client_index);
107   if (!app)
108     return;
109
110   clib_memset (a, 0, sizeof (*a));
111   a->sep.is_ip4 = mp->is_ip4;
112   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
113   a->sep.port = mp->port;
114   a->sep.fib_index = mp->vrf;
115   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
116   a->sep.transport_proto = mp->proto;
117   a->app_index = app->app_index;
118   a->wrk_map_index = mp->wrk_index;
119   a->sep_ext.transport_flags = mp->flags;
120
121   if (mp->ext_config)
122     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
123
124   if ((rv = vnet_listen (a)))
125     clib_warning ("listen returned: %U", format_session_error, rv);
126
127   app_wrk = application_get_worker (app, mp->wrk_index);
128   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
129
130   if (mp->ext_config)
131     session_mq_free_ext_config (app, mp->ext_config);
132 }
133
134 static void
135 session_mq_listen_uri_handler (void *data)
136 {
137   session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
138   vnet_listen_args_t _a, *a = &_a;
139   app_worker_t *app_wrk;
140   application_t *app;
141   int rv;
142
143   app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
144
145   app = application_lookup (mp->client_index);
146   if (!app)
147     return;
148
149   clib_memset (a, 0, sizeof (*a));
150   a->uri = (char *) mp->uri;
151   a->app_index = app->app_index;
152   rv = vnet_bind_uri (a);
153
154   app_wrk = application_get_worker (app, 0);
155   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
156 }
157
158 static void
159 session_mq_connect_one (session_connect_msg_t *mp)
160 {
161   vnet_connect_args_t _a, *a = &_a;
162   app_worker_t *app_wrk;
163   application_t *app;
164   int rv;
165
166   app = application_lookup (mp->client_index);
167   if (!app)
168     return;
169
170   clib_memset (a, 0, sizeof (*a));
171   a->sep.is_ip4 = mp->is_ip4;
172   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
173   a->sep.port = mp->port;
174   a->sep.transport_proto = mp->proto;
175   a->sep.peer.fib_index = mp->vrf;
176   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
177   if (mp->is_ip4)
178     {
179       ip46_address_mask_ip4 (&a->sep.ip);
180       ip46_address_mask_ip4 (&a->sep.peer.ip);
181     }
182   a->sep.peer.port = mp->lcl_port;
183   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
184   a->sep_ext.parent_handle = mp->parent_handle;
185   a->sep_ext.transport_flags = mp->flags;
186   a->api_context = mp->context;
187   a->app_index = app->app_index;
188   a->wrk_map_index = mp->wrk_index;
189
190   if (mp->ext_config)
191     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
192
193   if ((rv = vnet_connect (a)))
194     {
195       clib_warning ("connect returned: %U", format_session_error, rv);
196       app_wrk = application_get_worker (app, mp->wrk_index);
197       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
198     }
199
200   if (mp->ext_config)
201     session_mq_free_ext_config (app, mp->ext_config);
202 }
203
204 static void
205 session_mq_handle_connects_rpc (void *arg)
206 {
207   u32 max_connects = 32, n_connects = 0;
208   vlib_main_t *vm = vlib_get_main ();
209   session_evt_elt_t *he, *elt, *next;
210   session_worker_t *fwrk, *wrk;
211
212   ASSERT (vlib_get_thread_index () == 0);
213
214   /* Pending connects on linked list pertaining to first worker */
215   fwrk = session_main_get_worker (1);
216   if (!fwrk->n_pending_connects)
217     goto update_state;
218
219   vlib_worker_thread_barrier_sync (vm);
220
221   he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
222   elt = clib_llist_next (fwrk->event_elts, evt_list, he);
223
224   /* Avoid holding the barrier for too long */
225   while (n_connects < max_connects && elt != he)
226     {
227       next = clib_llist_next (fwrk->event_elts, evt_list, elt);
228       clib_llist_remove (fwrk->event_elts, evt_list, elt);
229       session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
230       session_evt_ctrl_data_free (fwrk, elt);
231       clib_llist_put (fwrk->event_elts, elt);
232       elt = next;
233       n_connects += 1;
234     }
235
236   /* Decrement with worker barrier */
237   fwrk->n_pending_connects -= n_connects;
238
239   vlib_worker_thread_barrier_release (vm);
240
241 update_state:
242
243   /* Switch worker to poll mode if it was in interrupt mode and had work or
244    * back to interrupt if threshold of loops without a connect is passed.
245    * While in poll mode, reprogram connects rpc */
246   wrk = session_main_get_worker (0);
247   if (wrk->state != SESSION_WRK_POLLING)
248     {
249       if (n_connects)
250         {
251           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
252           vlib_node_set_state (vm, session_queue_node.index,
253                                VLIB_NODE_STATE_POLLING);
254           wrk->no_connect_loops = 0;
255         }
256     }
257   else
258     {
259       if (!n_connects)
260         {
261           if (++wrk->no_connect_loops > 1e5)
262             {
263               session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
264               vlib_node_set_state (vm, session_queue_node.index,
265                                    VLIB_NODE_STATE_INTERRUPT);
266             }
267         }
268       else
269         wrk->no_connect_loops = 0;
270     }
271
272   if (wrk->state == SESSION_WRK_POLLING)
273     {
274       elt = session_evt_alloc_ctrl (wrk);
275       elt->evt.event_type = SESSION_CTRL_EVT_RPC;
276       elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
277     }
278 }
279
280 static void
281 session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
282 {
283   u32 thread_index = wrk - session_main.wrk;
284   session_evt_elt_t *he;
285
286   /* No workers, so just deal with the connect now */
287   if (PREDICT_FALSE (!thread_index))
288     {
289       session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
290       return;
291     }
292
293   if (PREDICT_FALSE (thread_index != 1))
294     {
295       clib_warning ("Connect on wrong thread. Dropping");
296       return;
297     }
298
299   /* Add to pending list to be handled by main thread */
300   he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
301   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
302
303   /* Decremented with worker barrier */
304   wrk->n_pending_connects += 1;
305   if (wrk->n_pending_connects == 1)
306     {
307       vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
308                                        session_queue_node.index);
309       session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
310     }
311 }
312
313 static void
314 session_mq_connect_uri_handler (void *data)
315 {
316   session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
317   vnet_connect_args_t _a, *a = &_a;
318   app_worker_t *app_wrk;
319   application_t *app;
320   int rv;
321
322   app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
323
324   app = application_lookup (mp->client_index);
325   if (!app)
326     return;
327
328   clib_memset (a, 0, sizeof (*a));
329   a->uri = (char *) mp->uri;
330   a->api_context = mp->context;
331   a->app_index = app->app_index;
332   if ((rv = vnet_connect_uri (a)))
333     {
334       clib_warning ("connect_uri returned: %d", rv);
335       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
336       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
337     }
338 }
339
340 static void
341 session_mq_shutdown_handler (void *data)
342 {
343   session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
344   vnet_shutdown_args_t _a, *a = &_a;
345   application_t *app;
346
347   app = application_lookup (mp->client_index);
348   if (!app)
349     return;
350
351   a->app_index = app->app_index;
352   a->handle = mp->handle;
353   vnet_shutdown_session (a);
354 }
355
356 static void
357 session_mq_disconnect_handler (void *data)
358 {
359   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
360   vnet_disconnect_args_t _a, *a = &_a;
361   application_t *app;
362
363   app = application_lookup (mp->client_index);
364   if (!app)
365     return;
366
367   a->app_index = app->app_index;
368   a->handle = mp->handle;
369   vnet_disconnect_session (a);
370 }
371
372 static void
373 app_mq_detach_handler (void *data)
374 {
375   session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
376   vnet_app_detach_args_t _a, *a = &_a;
377   application_t *app;
378
379   app_check_thread_and_barrier (app_mq_detach_handler, mp);
380
381   app = application_lookup (mp->client_index);
382   if (!app)
383     return;
384
385   a->app_index = app->app_index;
386   a->api_client_index = mp->client_index;
387   vnet_application_detach (a);
388 }
389
390 static void
391 session_mq_unlisten_rpc (session_unlisten_msg_t *mp)
392 {
393   vlib_main_t *vm = vlib_get_main ();
394   vnet_unlisten_args_t _a, *a = &_a;
395   app_worker_t *app_wrk;
396   session_handle_t sh;
397   application_t *app;
398   u32 context;
399   int rv;
400
401   sh = mp->handle;
402   context = mp->context;
403
404   vlib_worker_thread_barrier_sync (vm);
405
406   app = application_lookup (mp->client_index);
407   if (!app)
408     return;
409
410   clib_memset (a, 0, sizeof (*a));
411   a->app_index = app->app_index;
412   a->handle = sh;
413   a->wrk_map_index = mp->wrk_index;
414   if ((rv = vnet_unlisten (a)))
415     clib_warning ("unlisten returned: %d", rv);
416
417   app_wrk = application_get_worker (app, a->wrk_map_index);
418   if (!app_wrk)
419     return;
420
421   vlib_worker_thread_barrier_release (vm);
422
423   mq_send_unlisten_reply (app_wrk, sh, context, rv);
424   clib_mem_free (mp);
425 }
426
427 static void
428 session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
429 {
430   u32 thread_index = wrk - session_main.wrk;
431   session_unlisten_msg_t *mp, *arg;
432
433   mp = session_evt_ctrl_data (wrk, elt);
434   arg = clib_mem_alloc (sizeof (session_unlisten_msg_t));
435   clib_memcpy_fast (arg, mp, sizeof (*arg));
436
437   if (PREDICT_FALSE (!thread_index))
438     {
439       session_mq_unlisten_rpc (arg);
440       return;
441     }
442
443   session_send_rpc_evt_to_thread_force (0, session_mq_unlisten_rpc, arg);
444 }
445
446 static void
447 session_mq_accepted_reply_handler (void *data)
448 {
449   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
450   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
451   session_state_t old_state;
452   app_worker_t *app_wrk;
453   session_t *s;
454
455   /* Server isn't interested, kill the session */
456   if (mp->retval)
457     {
458       a->app_index = mp->context;
459       a->handle = mp->handle;
460       vnet_disconnect_session (a);
461       return;
462     }
463
464   /* Mail this back from the main thread. We're not polling in main
465    * thread so we're using other workers for notifications. */
466   if (vlib_num_workers () && vlib_get_thread_index () != 0
467       && session_thread_from_handle (mp->handle) == 0)
468     {
469       vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
470                                  (u8 *) mp, sizeof (*mp));
471       return;
472     }
473
474   s = session_get_from_handle_if_valid (mp->handle);
475   if (!s)
476     return;
477
478   app_wrk = app_worker_get (s->app_wrk_index);
479   if (app_wrk->app_index != mp->context)
480     {
481       clib_warning ("app doesn't own session");
482       return;
483     }
484
485   if (!session_has_transport (s))
486     {
487       s->session_state = SESSION_STATE_READY;
488       if (ct_session_connect_notify (s, SESSION_E_NONE))
489         return;
490     }
491   else
492     {
493       old_state = s->session_state;
494       s->session_state = SESSION_STATE_READY;
495
496       if (!svm_fifo_is_empty_prod (s->rx_fifo))
497         app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
498
499       /* Closed while waiting for app to reply. Resend disconnect */
500       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
501         {
502           app_worker_close_notify (app_wrk, s);
503           s->session_state = old_state;
504           return;
505         }
506     }
507 }
508
509 static void
510 session_mq_reset_reply_handler (void *data)
511 {
512   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
513   session_reset_reply_msg_t *mp;
514   app_worker_t *app_wrk;
515   session_t *s;
516   application_t *app;
517   u32 index, thread_index;
518
519   mp = (session_reset_reply_msg_t *) data;
520   app = application_lookup (mp->context);
521   if (!app)
522     return;
523
524   session_parse_handle (mp->handle, &index, &thread_index);
525   s = session_get_if_valid (index, thread_index);
526
527   /* No session or not the right session */
528   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
529     return;
530
531   app_wrk = app_worker_get (s->app_wrk_index);
532   if (!app_wrk || app_wrk->app_index != app->app_index)
533     {
534       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
535                     mp->handle);
536       return;
537     }
538
539   /* Client objected to resetting the session, log and continue */
540   if (mp->retval)
541     {
542       clib_warning ("client retval %d", mp->retval);
543       return;
544     }
545
546   /* This comes as a response to a reset, transport only waiting for
547    * confirmation to remove connection state, no need to disconnect */
548   a->handle = mp->handle;
549   a->app_index = app->app_index;
550   vnet_disconnect_session (a);
551 }
552
553 static void
554 session_mq_disconnected_handler (void *data)
555 {
556   session_disconnected_reply_msg_t *rmp;
557   vnet_disconnect_args_t _a, *a = &_a;
558   svm_msg_q_msg_t _msg, *msg = &_msg;
559   session_disconnected_msg_t *mp;
560   app_worker_t *app_wrk;
561   session_event_t *evt;
562   session_t *s;
563   application_t *app;
564   int rv = 0;
565
566   mp = (session_disconnected_msg_t *) data;
567   if (!(s = session_get_from_handle_if_valid (mp->handle)))
568     {
569       clib_warning ("could not disconnect handle %llu", mp->handle);
570       return;
571     }
572   app_wrk = app_worker_get (s->app_wrk_index);
573   app = application_lookup (mp->client_index);
574   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
575     {
576       clib_warning ("could not disconnect session: %llu app: %u",
577                     mp->handle, mp->client_index);
578       return;
579     }
580
581   a->handle = mp->handle;
582   a->app_index = app_wrk->wrk_index;
583   rv = vnet_disconnect_session (a);
584
585   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
586                                        SESSION_MQ_CTRL_EVT_RING,
587                                        SVM_Q_WAIT, msg);
588   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
589   clib_memset (evt, 0, sizeof (*evt));
590   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
591   rmp = (session_disconnected_reply_msg_t *) evt->data;
592   rmp->handle = mp->handle;
593   rmp->context = mp->context;
594   rmp->retval = rv;
595   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
596 }
597
598 static void
599 session_mq_disconnected_reply_handler (void *data)
600 {
601   session_disconnected_reply_msg_t *mp;
602   vnet_disconnect_args_t _a, *a = &_a;
603   application_t *app;
604
605   mp = (session_disconnected_reply_msg_t *) data;
606
607   /* Client objected to disconnecting the session, log and continue */
608   if (mp->retval)
609     {
610       clib_warning ("client retval %d", mp->retval);
611       return;
612     }
613
614   /* Disconnect has been confirmed. Confirm close to transport */
615   app = application_lookup (mp->context);
616   if (app)
617     {
618       a->handle = mp->handle;
619       a->app_index = app->app_index;
620       vnet_disconnect_session (a);
621     }
622 }
623
624 static void
625 session_mq_worker_update_handler (void *data)
626 {
627   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
628   session_worker_update_reply_msg_t *rmp;
629   svm_msg_q_msg_t _msg, *msg = &_msg;
630   app_worker_t *app_wrk;
631   u32 owner_app_wrk_map;
632   session_event_t *evt;
633   session_t *s;
634   application_t *app;
635
636   app = application_lookup (mp->client_index);
637   if (!app)
638     return;
639   if (!(s = session_get_from_handle_if_valid (mp->handle)))
640     {
641       clib_warning ("invalid handle %llu", mp->handle);
642       return;
643     }
644   app_wrk = app_worker_get (s->app_wrk_index);
645   if (app_wrk->app_index != app->app_index)
646     {
647       clib_warning ("app %u does not own session %llu", app->app_index,
648                     mp->handle);
649       return;
650     }
651   owner_app_wrk_map = app_wrk->wrk_map_index;
652   app_wrk = application_get_worker (app, mp->wrk_index);
653
654   /* This needs to come from the new owner */
655   if (mp->req_wrk_index == owner_app_wrk_map)
656     {
657       session_req_worker_update_msg_t *wump;
658
659       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
660                                            SESSION_MQ_CTRL_EVT_RING,
661                                            SVM_Q_WAIT, msg);
662       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
663       clib_memset (evt, 0, sizeof (*evt));
664       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
665       wump = (session_req_worker_update_msg_t *) evt->data;
666       wump->session_handle = mp->handle;
667       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
668       return;
669     }
670
671   app_worker_own_session (app_wrk, s);
672
673   /*
674    * Send reply
675    */
676   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
677                                        SESSION_MQ_CTRL_EVT_RING,
678                                        SVM_Q_WAIT, msg);
679   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
680   clib_memset (evt, 0, sizeof (*evt));
681   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
682   rmp = (session_worker_update_reply_msg_t *) evt->data;
683   rmp->handle = mp->handle;
684   if (s->rx_fifo)
685     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
686   if (s->tx_fifo)
687     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
688   rmp->segment_handle = session_segment_handle (s);
689   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
690
691   /*
692    * Retransmit messages that may have been lost
693    */
694   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
695     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
696
697   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
698     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
699
700   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
701     app_worker_close_notify (app_wrk, s);
702 }
703
704 static void
705 session_mq_app_wrk_rpc_handler (void *data)
706 {
707   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
708   svm_msg_q_msg_t _msg, *msg = &_msg;
709   session_app_wrk_rpc_msg_t *rmp;
710   app_worker_t *app_wrk;
711   session_event_t *evt;
712   application_t *app;
713
714   app = application_lookup (mp->client_index);
715   if (!app)
716     return;
717
718   app_wrk = application_get_worker (app, mp->wrk_index);
719
720   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
721                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
722                                        msg);
723   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
724   clib_memset (evt, 0, sizeof (*evt));
725   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
726   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
727   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
728   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
729 }
730
731 static void
732 session_mq_transport_attr_handler (void *data)
733 {
734   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
735   session_transport_attr_reply_msg_t *rmp;
736   svm_msg_q_msg_t _msg, *msg = &_msg;
737   app_worker_t *app_wrk;
738   session_event_t *evt;
739   application_t *app;
740   session_t *s;
741   int rv;
742
743   app = application_lookup (mp->client_index);
744   if (!app)
745     return;
746
747   if (!(s = session_get_from_handle_if_valid (mp->handle)))
748     {
749       clib_warning ("invalid handle %llu", mp->handle);
750       return;
751     }
752   app_wrk = app_worker_get (s->app_wrk_index);
753   if (app_wrk->app_index != app->app_index)
754     {
755       clib_warning ("app %u does not own session %llu", app->app_index,
756                     mp->handle);
757       return;
758     }
759
760   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
761
762   svm_msg_q_lock_and_alloc_msg_w_ring (
763     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
764   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
765   clib_memset (evt, 0, sizeof (*evt));
766   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
767   rmp = (session_transport_attr_reply_msg_t *) evt->data;
768   rmp->handle = mp->handle;
769   rmp->retval = rv;
770   rmp->is_get = mp->is_get;
771   if (!rv && mp->is_get)
772     rmp->attr = mp->attr;
773   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
774 }
775
776 vlib_node_registration_t session_queue_node;
777
778 typedef struct
779 {
780   u32 session_index;
781   u32 server_thread_index;
782 } session_queue_trace_t;
783
784 /* packet trace format function */
785 static u8 *
786 format_session_queue_trace (u8 * s, va_list * args)
787 {
788   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
789   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
790   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
791
792   s = format (s, "session index %d thread index %d",
793               t->session_index, t->server_thread_index);
794   return s;
795 }
796
797 #define foreach_session_queue_error             \
798 _(TX, "Packets transmitted")                    \
799 _(TIMER, "Timer events")                        \
800 _(NO_BUFFER, "Out of buffers")
801
802 typedef enum
803 {
804 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
805   foreach_session_queue_error
806 #undef _
807     SESSION_QUEUE_N_ERROR,
808 } session_queue_error_t;
809
810 static char *session_queue_error_strings[] = {
811 #define _(sym,string) string,
812   foreach_session_queue_error
813 #undef _
814 };
815
816 enum
817 {
818   SESSION_TX_NO_BUFFERS = -2,
819   SESSION_TX_NO_DATA,
820   SESSION_TX_OK
821 };
822
823 static void
824 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
825                         u32 next_index, u32 * to_next, u16 n_segs,
826                         session_t * s, u32 n_trace)
827 {
828   while (n_trace && n_segs)
829     {
830       vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
831       if (PREDICT_TRUE
832           (vlib_trace_buffer
833            (vm, node, next_index, b, 1 /* follow_chain */ )))
834         {
835           session_queue_trace_t *t =
836             vlib_add_trace (vm, node, b, sizeof (*t));
837           t->session_index = s->session_index;
838           t->server_thread_index = s->thread_index;
839           n_trace--;
840         }
841       to_next++;
842       n_segs--;
843     }
844   vlib_set_trace_count (vm, node, n_trace);
845 }
846
847 always_inline void
848 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
849                             vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
850 {
851   vlib_buffer_t *chain_b, *prev_b;
852   u32 chain_bi0, to_deq, left_from_seg;
853   u16 len_to_deq, n_bytes_read;
854   u8 *data, j;
855
856   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
857   b->total_length_not_including_first_buffer = 0;
858
859   chain_b = b;
860   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
861                             ctx->left_to_snd);
862   to_deq = left_from_seg;
863   for (j = 1; j < ctx->n_bufs_per_seg; j++)
864     {
865       prev_b = chain_b;
866       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
867
868       *n_bufs -= 1;
869       chain_bi0 = ctx->tx_buffers[*n_bufs];
870       chain_b = vlib_get_buffer (vm, chain_bi0);
871       chain_b->current_data = 0;
872       data = vlib_buffer_get_current (chain_b);
873       if (peek_data)
874         {
875           n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
876                                         ctx->sp.tx_offset, len_to_deq, data);
877           ctx->sp.tx_offset += n_bytes_read;
878         }
879       else
880         {
881           if (ctx->transport_vft->transport_options.tx_type ==
882               TRANSPORT_TX_DGRAM)
883             {
884               svm_fifo_t *f = ctx->s->tx_fifo;
885               session_dgram_hdr_t *hdr = &ctx->hdr;
886               u16 deq_now;
887               u32 offset;
888
889               deq_now = clib_min (hdr->data_length - hdr->data_offset,
890                                   len_to_deq);
891               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
892               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
893               ASSERT (n_bytes_read > 0);
894
895               hdr->data_offset += n_bytes_read;
896               if (hdr->data_offset == hdr->data_length)
897                 {
898                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
899                   svm_fifo_dequeue_drop (f, offset);
900                   if (ctx->left_to_snd > n_bytes_read)
901                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
902                                    (u8 *) & ctx->hdr);
903                 }
904               else if (ctx->left_to_snd == n_bytes_read)
905                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
906                                          sizeof (session_dgram_pre_hdr_t));
907             }
908           else
909             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
910                                              len_to_deq, data);
911         }
912       ASSERT (n_bytes_read == len_to_deq);
913       chain_b->current_length = n_bytes_read;
914       b->total_length_not_including_first_buffer += chain_b->current_length;
915
916       /* update previous buffer */
917       prev_b->next_buffer = chain_bi0;
918       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
919
920       /* update current buffer */
921       chain_b->next_buffer = 0;
922
923       to_deq -= n_bytes_read;
924       if (to_deq == 0)
925         break;
926     }
927   ASSERT (to_deq == 0
928           && b->total_length_not_including_first_buffer == left_from_seg);
929   ctx->left_to_snd -= left_from_seg;
930 }
931
932 always_inline void
933 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
934                         vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
935 {
936   u32 len_to_deq;
937   u8 *data0;
938   int n_bytes_read;
939
940   /*
941    * Start with the first buffer in chain
942    */
943   b->error = 0;
944   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
945   b->current_data = 0;
946
947   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
948   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
949
950   if (peek_data)
951     {
952       n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
953                                     len_to_deq, data0);
954       ASSERT (n_bytes_read > 0);
955       /* Keep track of progress locally, transport is also supposed to
956        * increment it independently when pushing the header */
957       ctx->sp.tx_offset += n_bytes_read;
958     }
959   else
960     {
961       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
962         {
963           session_dgram_hdr_t *hdr = &ctx->hdr;
964           svm_fifo_t *f = ctx->s->tx_fifo;
965           u16 deq_now;
966           u32 offset;
967
968           ASSERT (hdr->data_length > hdr->data_offset);
969           deq_now = clib_min (hdr->data_length - hdr->data_offset,
970                               len_to_deq);
971           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
972           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
973           ASSERT (n_bytes_read > 0);
974
975           if (ctx->s->session_state == SESSION_STATE_LISTENING)
976             {
977               ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
978               ctx->tc->rmt_port = hdr->rmt_port;
979             }
980           hdr->data_offset += n_bytes_read;
981           if (hdr->data_offset == hdr->data_length)
982             {
983               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
984               svm_fifo_dequeue_drop (f, offset);
985               if (ctx->left_to_snd > n_bytes_read)
986                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
987                                (u8 *) & ctx->hdr);
988             }
989           else if (ctx->left_to_snd == n_bytes_read)
990             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
991                                      sizeof (session_dgram_pre_hdr_t));
992         }
993       else
994         {
995           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
996                                            len_to_deq, data0);
997           ASSERT (n_bytes_read > 0);
998         }
999     }
1000   b->current_length = n_bytes_read;
1001   ctx->left_to_snd -= n_bytes_read;
1002
1003   /*
1004    * Fill in the remaining buffers in the chain, if any
1005    */
1006   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
1007     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
1008 }
1009
1010 always_inline u8
1011 session_tx_not_ready (session_t * s, u8 peek_data)
1012 {
1013   if (peek_data)
1014     {
1015       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
1016         return 0;
1017       /* Can retransmit for closed sessions but can't send new data if
1018        * session is not ready or closed */
1019       else if (s->session_state < SESSION_STATE_READY)
1020         return 1;
1021       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1022         {
1023           /* Allow closed transports to still send custom packets.
1024            * For instance, tcp may want to send acks in time-wait. */
1025           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
1026               && (s->flags & SESSION_F_CUSTOM_TX))
1027             return 0;
1028           return 2;
1029         }
1030     }
1031   return 0;
1032 }
1033
1034 always_inline transport_connection_t *
1035 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
1036 {
1037   if (peek_data)
1038     {
1039       return ctx->transport_vft->get_connection (ctx->s->connection_index,
1040                                                  ctx->s->thread_index);
1041     }
1042   else
1043     {
1044       if (ctx->s->session_state == SESSION_STATE_LISTENING)
1045         return ctx->transport_vft->get_listener (ctx->s->connection_index);
1046       else
1047         {
1048           return ctx->transport_vft->get_connection (ctx->s->connection_index,
1049                                                      ctx->s->thread_index);
1050         }
1051     }
1052 }
1053
1054 always_inline void
1055 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
1056                                u32 max_segs, u8 peek_data)
1057 {
1058   u32 n_bytes_per_buf, n_bytes_per_seg;
1059
1060   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
1061   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
1062
1063   if (peek_data)
1064     {
1065       /* Offset in rx fifo from where to peek data */
1066       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
1067         {
1068           ctx->max_len_to_snd = 0;
1069           return;
1070         }
1071       ctx->max_dequeue -= ctx->sp.tx_offset;
1072     }
1073   else
1074     {
1075       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1076         {
1077           u32 len, chain_limit;
1078
1079           if (ctx->max_dequeue <= sizeof (ctx->hdr))
1080             {
1081               ctx->max_len_to_snd = 0;
1082               return;
1083             }
1084
1085           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
1086                          (u8 *) & ctx->hdr);
1087           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
1088           len = ctx->hdr.data_length - ctx->hdr.data_offset;
1089
1090           /* Process multiple dgrams if smaller than min (buf_space, mss).
1091            * This avoids handling multiple dgrams if they require buffer
1092            * chains */
1093           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
1094                                   ctx->sp.snd_mss);
1095           if (ctx->hdr.data_length <= chain_limit)
1096             {
1097               u32 first_dgram_len, dgram_len, offset, max_offset;
1098               session_dgram_hdr_t hdr;
1099
1100               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
1101               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
1102               first_dgram_len = len;
1103               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
1104
1105               while (offset < max_offset)
1106                 {
1107                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
1108                                  (u8 *) & hdr);
1109                   ASSERT (hdr.data_length > hdr.data_offset);
1110                   dgram_len = hdr.data_length - hdr.data_offset;
1111                   if (len + dgram_len > ctx->max_dequeue
1112                       || first_dgram_len != dgram_len)
1113                     break;
1114                   len += dgram_len;
1115                   offset += sizeof (hdr) + hdr.data_length;
1116                 }
1117             }
1118
1119           ctx->max_dequeue = len;
1120         }
1121     }
1122   ASSERT (ctx->max_dequeue > 0);
1123
1124   /* Ensure we're not writing more than transport window allows */
1125   if (ctx->max_dequeue < ctx->sp.snd_space)
1126     {
1127       /* Constrained by tx queue. Try to send only fully formed segments */
1128       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
1129         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
1130         ctx->max_dequeue;
1131       /* TODO Nagle ? */
1132     }
1133   else
1134     {
1135       /* Expectation is that snd_space0 is already a multiple of snd_mss */
1136       ctx->max_len_to_snd = ctx->sp.snd_space;
1137     }
1138
1139   /* Check if we're tx constrained by the node */
1140   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
1141   if (ctx->n_segs_per_evt > max_segs)
1142     {
1143       ctx->n_segs_per_evt = max_segs;
1144       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
1145     }
1146
1147   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
1148   if (ctx->n_segs_per_evt > 1)
1149     {
1150       u32 n_bytes_last_seg, n_bufs_last_seg;
1151
1152       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
1153       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
1154         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
1155       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1156       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
1157       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
1158         + n_bufs_last_seg;
1159     }
1160   else
1161     {
1162       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
1163       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1164       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
1165     }
1166
1167   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
1168   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
1169                                      n_bytes_per_buf -
1170                                      TRANSPORT_MAX_HDRS_LEN);
1171 }
1172
1173 always_inline void
1174 session_tx_maybe_reschedule (session_worker_t * wrk,
1175                              session_tx_context_t * ctx,
1176                              session_evt_elt_t * elt)
1177 {
1178   session_t *s = ctx->s;
1179
1180   svm_fifo_unset_event (s->tx_fifo);
1181   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
1182     if (svm_fifo_set_event (s->tx_fifo))
1183       session_evt_add_head_old (wrk, elt);
1184 }
1185
1186 always_inline int
1187 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
1188                                 vlib_node_runtime_t * node,
1189                                 session_evt_elt_t * elt,
1190                                 int *n_tx_packets, u8 peek_data)
1191 {
1192   u32 n_trace, n_left, pbi, next_index, max_burst;
1193   session_tx_context_t *ctx = &wrk->ctx;
1194   session_main_t *smm = &session_main;
1195   session_event_t *e = &elt->evt;
1196   vlib_main_t *vm = wrk->vm;
1197   transport_proto_t tp;
1198   vlib_buffer_t *pb;
1199   u16 n_bufs, rv;
1200
1201   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
1202     {
1203       if (rv < 2)
1204         session_evt_add_old (wrk, elt);
1205       return SESSION_TX_NO_DATA;
1206     }
1207
1208   next_index = smm->session_type_to_next[ctx->s->session_type];
1209   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
1210
1211   tp = session_get_transport_proto (ctx->s);
1212   ctx->transport_vft = transport_protocol_get_vft (tp);
1213   ctx->tc = session_tx_get_transport (ctx, peek_data);
1214
1215   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
1216     {
1217       if (ctx->transport_vft->flush_data)
1218         ctx->transport_vft->flush_data (ctx->tc);
1219       e->event_type = SESSION_IO_EVT_TX;
1220     }
1221
1222   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
1223     {
1224       u32 n_custom_tx;
1225       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
1226       ctx->sp.max_burst_size = max_burst;
1227       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
1228       *n_tx_packets += n_custom_tx;
1229       if (PREDICT_FALSE
1230           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1231         return SESSION_TX_OK;
1232       max_burst -= n_custom_tx;
1233       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
1234         {
1235           session_evt_add_old (wrk, elt);
1236           return SESSION_TX_OK;
1237         }
1238     }
1239
1240   transport_connection_snd_params (ctx->tc, &ctx->sp);
1241
1242   if (!ctx->sp.snd_space)
1243     {
1244       /* If the deschedule flag was set, remove session from scheduler.
1245        * Transport is responsible for rescheduling this session. */
1246       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
1247         transport_connection_deschedule (ctx->tc);
1248       /* Request to postpone the session, e.g., zero-wnd and transport
1249        * is not currently probing */
1250       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
1251         session_evt_add_old (wrk, elt);
1252       /* This flow queue is "empty" so it should be re-evaluated before
1253        * the ones that have data to send. */
1254       else
1255         session_evt_add_head_old (wrk, elt);
1256
1257       return SESSION_TX_NO_DATA;
1258     }
1259
1260   if (transport_connection_is_tx_paced (ctx->tc))
1261     {
1262       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
1263       if (snd_space < TRANSPORT_PACER_MIN_BURST)
1264         {
1265           session_evt_add_head_old (wrk, elt);
1266           return SESSION_TX_NO_DATA;
1267         }
1268       snd_space = clib_min (ctx->sp.snd_space, snd_space);
1269       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
1270         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
1271     }
1272
1273   /* Check how much we can pull. */
1274   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
1275
1276   if (PREDICT_FALSE (!ctx->max_len_to_snd))
1277     {
1278       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
1279       session_tx_maybe_reschedule (wrk, ctx, elt);
1280       return SESSION_TX_NO_DATA;
1281     }
1282
1283   vec_validate_aligned (ctx->tx_buffers, ctx->n_bufs_needed - 1,
1284                         CLIB_CACHE_LINE_BYTES);
1285   n_bufs = vlib_buffer_alloc (vm, ctx->tx_buffers, ctx->n_bufs_needed);
1286   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
1287     {
1288       if (n_bufs)
1289         vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
1290       session_evt_add_head_old (wrk, elt);
1291       vlib_node_increment_counter (wrk->vm, node->node_index,
1292                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
1293       return SESSION_TX_NO_BUFFERS;
1294     }
1295
1296   if (transport_connection_is_tx_paced (ctx->tc))
1297     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
1298
1299   ctx->left_to_snd = ctx->max_len_to_snd;
1300   n_left = ctx->n_segs_per_evt;
1301
1302   while (n_left >= 4)
1303     {
1304       vlib_buffer_t *b0, *b1;
1305       u32 bi0, bi1;
1306
1307       pbi = ctx->tx_buffers[n_bufs - 3];
1308       pb = vlib_get_buffer (vm, pbi);
1309       vlib_prefetch_buffer_header (pb, STORE);
1310       pbi = ctx->tx_buffers[n_bufs - 4];
1311       pb = vlib_get_buffer (vm, pbi);
1312       vlib_prefetch_buffer_header (pb, STORE);
1313
1314       bi0 = ctx->tx_buffers[--n_bufs];
1315       bi1 = ctx->tx_buffers[--n_bufs];
1316
1317       b0 = vlib_get_buffer (vm, bi0);
1318       b1 = vlib_get_buffer (vm, bi1);
1319
1320       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1321       session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
1322
1323       ctx->transport_vft->push_header (ctx->tc, b0);
1324       ctx->transport_vft->push_header (ctx->tc, b1);
1325
1326       n_left -= 2;
1327
1328       vec_add1 (wrk->pending_tx_buffers, bi0);
1329       vec_add1 (wrk->pending_tx_buffers, bi1);
1330       vec_add1 (wrk->pending_tx_nexts, next_index);
1331       vec_add1 (wrk->pending_tx_nexts, next_index);
1332     }
1333   while (n_left)
1334     {
1335       vlib_buffer_t *b0;
1336       u32 bi0;
1337
1338       if (n_left > 1)
1339         {
1340           pbi = ctx->tx_buffers[n_bufs - 2];
1341           pb = vlib_get_buffer (vm, pbi);
1342           vlib_prefetch_buffer_header (pb, STORE);
1343         }
1344
1345       bi0 = ctx->tx_buffers[--n_bufs];
1346       b0 = vlib_get_buffer (vm, bi0);
1347       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1348
1349       /* Ask transport to push header after current_length and
1350        * total_length_not_including_first_buffer are updated */
1351       ctx->transport_vft->push_header (ctx->tc, b0);
1352
1353       n_left -= 1;
1354
1355       vec_add1 (wrk->pending_tx_buffers, bi0);
1356       vec_add1 (wrk->pending_tx_nexts, next_index);
1357     }
1358
1359   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
1360     session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers,
1361                             ctx->n_segs_per_evt, ctx->s, n_trace);
1362
1363   if (PREDICT_FALSE (n_bufs))
1364     vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
1365
1366   *n_tx_packets += ctx->n_segs_per_evt;
1367
1368   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
1369                ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
1370
1371   ASSERT (ctx->left_to_snd == 0);
1372
1373   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
1374    * check if application enqueued more data and reschedule accordingly */
1375   if (ctx->max_len_to_snd < ctx->max_dequeue)
1376     session_evt_add_old (wrk, elt);
1377   else
1378     session_tx_maybe_reschedule (wrk, ctx, elt);
1379
1380   if (!peek_data)
1381     {
1382       u32 n_dequeued = ctx->max_len_to_snd;
1383       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1384         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
1385       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
1386         session_dequeue_notify (ctx->s);
1387     }
1388   return SESSION_TX_OK;
1389 }
1390
1391 int
1392 session_tx_fifo_peek_and_snd (session_worker_t * wrk,
1393                               vlib_node_runtime_t * node,
1394                               session_evt_elt_t * e, int *n_tx_packets)
1395 {
1396   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
1397 }
1398
1399 int
1400 session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
1401                                  vlib_node_runtime_t * node,
1402                                  session_evt_elt_t * e, int *n_tx_packets)
1403 {
1404   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
1405 }
1406
1407 int
1408 session_tx_fifo_dequeue_internal (session_worker_t * wrk,
1409                                   vlib_node_runtime_t * node,
1410                                   session_evt_elt_t * elt, int *n_tx_packets)
1411 {
1412   transport_send_params_t *sp = &wrk->ctx.sp;
1413   session_t *s = wrk->ctx.s;
1414   u32 n_packets;
1415
1416   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1417     return 0;
1418
1419   /* Clear custom-tx flag used to request reschedule for tx */
1420   s->flags &= ~SESSION_F_CUSTOM_TX;
1421
1422   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
1423                                  TRANSPORT_PACER_MAX_BURST_PKTS);
1424
1425   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
1426   *n_tx_packets += n_packets;
1427
1428   if (s->flags & SESSION_F_CUSTOM_TX)
1429     {
1430       session_evt_add_old (wrk, elt);
1431     }
1432   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
1433     {
1434       svm_fifo_unset_event (s->tx_fifo);
1435       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1436         if (svm_fifo_set_event (s->tx_fifo))
1437           session_evt_add_head_old (wrk, elt);
1438     }
1439
1440   if (sp->max_burst_size &&
1441       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
1442     session_dequeue_notify (s);
1443
1444   return n_packets;
1445 }
1446
1447 always_inline session_t *
1448 session_event_get_session (session_worker_t * wrk, session_event_t * e)
1449 {
1450   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
1451     return 0;
1452
1453   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
1454   return pool_elt_at_index (wrk->sessions, e->session_index);
1455 }
1456
1457 always_inline void
1458 session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
1459 {
1460   clib_llist_index_t ei;
1461   void (*fp) (void *);
1462   session_event_t *e;
1463   session_t *s;
1464
1465   ei = clib_llist_entry_index (wrk->event_elts, elt);
1466   e = &elt->evt;
1467
1468   switch (e->event_type)
1469     {
1470     case SESSION_CTRL_EVT_RPC:
1471       fp = e->rpc_args.fp;
1472       (*fp) (e->rpc_args.arg);
1473       break;
1474     case SESSION_CTRL_EVT_HALF_CLOSE:
1475       s = session_get_from_handle_if_valid (e->session_handle);
1476       if (PREDICT_FALSE (!s))
1477         break;
1478       session_transport_half_close (s);
1479       break;
1480     case SESSION_CTRL_EVT_CLOSE:
1481       s = session_get_from_handle_if_valid (e->session_handle);
1482       if (PREDICT_FALSE (!s))
1483         break;
1484       session_transport_close (s);
1485       break;
1486     case SESSION_CTRL_EVT_RESET:
1487       s = session_get_from_handle_if_valid (e->session_handle);
1488       if (PREDICT_FALSE (!s))
1489         break;
1490       session_transport_reset (s);
1491       break;
1492     case SESSION_CTRL_EVT_LISTEN:
1493       session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
1494       break;
1495     case SESSION_CTRL_EVT_LISTEN_URI:
1496       session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
1497       break;
1498     case SESSION_CTRL_EVT_UNLISTEN:
1499       session_mq_unlisten_handler (wrk, elt);
1500       break;
1501     case SESSION_CTRL_EVT_CONNECT:
1502       session_mq_connect_handler (wrk, elt);
1503       break;
1504     case SESSION_CTRL_EVT_CONNECT_URI:
1505       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
1506       break;
1507     case SESSION_CTRL_EVT_SHUTDOWN:
1508       session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
1509       break;
1510     case SESSION_CTRL_EVT_DISCONNECT:
1511       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
1512       break;
1513     case SESSION_CTRL_EVT_DISCONNECTED:
1514       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
1515       break;
1516     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
1517       session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
1518       break;
1519     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
1520       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
1521                                                                     elt));
1522       break;
1523     case SESSION_CTRL_EVT_RESET_REPLY:
1524       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
1525       break;
1526     case SESSION_CTRL_EVT_WORKER_UPDATE:
1527       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
1528       break;
1529     case SESSION_CTRL_EVT_APP_DETACH:
1530       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
1531       break;
1532     case SESSION_CTRL_EVT_APP_WRK_RPC:
1533       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
1534       break;
1535     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
1536       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
1537       break;
1538     default:
1539       clib_warning ("unhandled event type %d", e->event_type);
1540     }
1541
1542   /* Regrab elements in case pool moved */
1543   elt = clib_llist_elt (wrk->event_elts, ei);
1544   if (!clib_llist_elt_is_linked (elt, evt_list))
1545     {
1546       e = &elt->evt;
1547       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
1548         session_evt_ctrl_data_free (wrk, elt);
1549       clib_llist_put (wrk->event_elts, elt);
1550     }
1551   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
1552 }
1553
1554 always_inline void
1555 session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
1556                            session_evt_elt_t * elt, int *n_tx_packets)
1557 {
1558   session_main_t *smm = &session_main;
1559   app_worker_t *app_wrk;
1560   clib_llist_index_t ei;
1561   session_event_t *e;
1562   session_t *s;
1563
1564   ei = clib_llist_entry_index (wrk->event_elts, elt);
1565   e = &elt->evt;
1566
1567   switch (e->event_type)
1568     {
1569     case SESSION_IO_EVT_TX_FLUSH:
1570     case SESSION_IO_EVT_TX:
1571       s = session_event_get_session (wrk, e);
1572       if (PREDICT_FALSE (!s))
1573         break;
1574       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
1575       wrk->ctx.s = s;
1576       /* Spray packets in per session type frames, since they go to
1577        * different nodes */
1578       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
1579       break;
1580     case SESSION_IO_EVT_RX:
1581       s = session_event_get_session (wrk, e);
1582       if (!s)
1583         break;
1584       transport_app_rx_evt (session_get_transport_proto (s),
1585                             s->connection_index, s->thread_index);
1586       break;
1587     case SESSION_IO_EVT_BUILTIN_RX:
1588       s = session_event_get_session (wrk, e);
1589       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
1590         break;
1591       svm_fifo_unset_event (s->rx_fifo);
1592       app_wrk = app_worker_get (s->app_wrk_index);
1593       app_worker_builtin_rx (app_wrk, s);
1594       break;
1595     case SESSION_IO_EVT_BUILTIN_TX:
1596       s = session_get_from_handle_if_valid (e->session_handle);
1597       wrk->ctx.s = s;
1598       if (PREDICT_TRUE (s != 0))
1599         session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
1600       break;
1601     default:
1602       clib_warning ("unhandled event type %d", e->event_type);
1603     }
1604
1605   SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
1606
1607   /* Regrab elements in case pool moved */
1608   elt = clib_llist_elt (wrk->event_elts, ei);
1609   if (!clib_llist_elt_is_linked (elt, evt_list))
1610     clib_llist_put (wrk->event_elts, elt);
1611 }
1612
1613 /* *INDENT-OFF* */
1614 static const u32 session_evt_msg_sizes[] = {
1615 #define _(symc, sym)                                                    \
1616   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
1617   foreach_session_ctrl_evt
1618 #undef _
1619 };
1620 /* *INDENT-ON* */
1621
1622 always_inline void
1623 session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
1624 {
1625   session_evt_elt_t *elt;
1626
1627   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
1628     {
1629       elt = session_evt_alloc_ctrl (wrk);
1630       if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
1631         {
1632           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
1633           elt->evt.event_type = evt->event_type;
1634           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
1635                             session_evt_msg_sizes[evt->event_type]);
1636         }
1637       else
1638         {
1639           /* Internal control events fit into io events footprint */
1640           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1641         }
1642     }
1643   else
1644     {
1645       elt = session_evt_alloc_new (wrk);
1646       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1647     }
1648 }
1649
1650 static void
1651 session_flush_pending_tx_buffers (session_worker_t * wrk,
1652                                   vlib_node_runtime_t * node)
1653 {
1654   vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers,
1655                                wrk->pending_tx_nexts,
1656                                vec_len (wrk->pending_tx_nexts));
1657   vec_reset_length (wrk->pending_tx_buffers);
1658   vec_reset_length (wrk->pending_tx_nexts);
1659 }
1660
1661 int
1662 session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
1663 {
1664   svm_msg_q_msg_t _msg, *msg = &_msg;
1665   u32 i, n_to_dequeue = 0;
1666   session_event_t *evt;
1667
1668   n_to_dequeue = svm_msg_q_size (mq);
1669   for (i = 0; i < n_to_dequeue; i++)
1670     {
1671       svm_msg_q_sub_raw (mq, msg);
1672       evt = svm_msg_q_msg_data (mq, msg);
1673       session_evt_add_to_list (wrk, evt);
1674       svm_msg_q_free_msg (mq, msg);
1675     }
1676
1677   return n_to_dequeue;
1678 }
1679
1680 static void
1681 session_wrk_update_state (session_worker_t *wrk)
1682 {
1683   vlib_main_t *vm = wrk->vm;
1684
1685   if (wrk->state == SESSION_WRK_POLLING)
1686     {
1687       if (clib_llist_elts (wrk->event_elts) == 4 &&
1688           vlib_last_vectors_per_main_loop (vm) < 1)
1689         {
1690           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1691           vlib_node_set_state (vm, session_queue_node.index,
1692                                VLIB_NODE_STATE_INTERRUPT);
1693         }
1694     }
1695   else if (wrk->state == SESSION_WRK_INTERRUPT)
1696     {
1697       if (clib_llist_elts (wrk->event_elts) > 4 ||
1698           vlib_last_vectors_per_main_loop (vm) > 1)
1699         {
1700           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
1701           vlib_node_set_state (vm, session_queue_node.index,
1702                                VLIB_NODE_STATE_POLLING);
1703         }
1704       else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
1705         {
1706           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
1707         }
1708     }
1709   else
1710     {
1711       if (clib_llist_elts (wrk->event_elts))
1712         {
1713           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1714         }
1715     }
1716 }
1717
1718 static uword
1719 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
1720                        vlib_frame_t * frame)
1721 {
1722   u32 thread_index = vm->thread_index, __clib_unused n_evts;
1723   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
1724   session_main_t *smm = vnet_get_session_main ();
1725   session_worker_t *wrk = &smm->wrk[thread_index];
1726   clib_llist_index_t ei, next_ei, old_ti;
1727   int n_tx_packets;
1728
1729   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
1730
1731   session_wrk_update_time (wrk, vlib_time_now (vm));
1732
1733   /*
1734    *  Update transport time
1735    */
1736   transport_update_time (wrk->last_vlib_time, thread_index);
1737   n_tx_packets = vec_len (wrk->pending_tx_buffers);
1738   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
1739
1740   /*
1741    *  Dequeue new internal mq events
1742    */
1743
1744   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
1745   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
1746
1747   /*
1748    * Handle control events
1749    */
1750
1751   ei = wrk->ctrl_head;
1752   ctrl_he = clib_llist_elt (wrk->event_elts, ei);
1753   next_ei = clib_llist_next_index (ctrl_he, evt_list);
1754   old_ti = clib_llist_prev_index (ctrl_he, evt_list);
1755   while (ei != old_ti)
1756     {
1757       ei = next_ei;
1758       elt = clib_llist_elt (wrk->event_elts, next_ei);
1759       next_ei = clib_llist_next_index (elt, evt_list);
1760       clib_llist_remove (wrk->event_elts, evt_list, elt);
1761       session_event_dispatch_ctrl (wrk, elt);
1762     }
1763
1764   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
1765
1766   /*
1767    * Handle the new io events.
1768    */
1769
1770   new_he = clib_llist_elt (wrk->event_elts, wrk->new_head);
1771   old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
1772   old_ti = clib_llist_prev_index (old_he, evt_list);
1773
1774   ei = clib_llist_next_index (new_he, evt_list);
1775   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
1776     {
1777       elt = clib_llist_elt (wrk->event_elts, ei);
1778       ei = clib_llist_next_index (elt, evt_list);
1779       clib_llist_remove (wrk->event_elts, evt_list, elt);
1780       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1781     }
1782
1783   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
1784
1785   /*
1786    * Handle the old io events, if we had any prior to processing the new ones
1787    */
1788
1789   if (old_ti != wrk->old_head)
1790     {
1791       old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
1792       ei = clib_llist_next_index (old_he, evt_list);
1793
1794       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
1795         {
1796           elt = clib_llist_elt (wrk->event_elts, ei);
1797           next_ei = clib_llist_next_index (elt, evt_list);
1798           clib_llist_remove (wrk->event_elts, evt_list, elt);
1799
1800           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1801
1802           if (ei == old_ti)
1803             break;
1804
1805           ei = next_ei;
1806         };
1807     }
1808
1809   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
1810
1811   if (vec_len (wrk->pending_tx_buffers))
1812     session_flush_pending_tx_buffers (wrk, node);
1813
1814   vlib_node_increment_counter (vm, session_queue_node.index,
1815                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
1816
1817   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
1818
1819   if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
1820     session_wrk_update_state (wrk);
1821
1822   return n_tx_packets;
1823 }
1824
1825 /* *INDENT-OFF* */
1826 VLIB_REGISTER_NODE (session_queue_node) =
1827 {
1828   .function = session_queue_node_fn,
1829   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
1830   .name = "session-queue",
1831   .format_trace = format_session_queue_trace,
1832   .type = VLIB_NODE_TYPE_INPUT,
1833   .n_errors = ARRAY_LEN (session_queue_error_strings),
1834   .error_strings = session_queue_error_strings,
1835   .state = VLIB_NODE_STATE_DISABLED,
1836 };
1837 /* *INDENT-ON* */
1838
1839 static clib_error_t *
1840 session_wrk_tfd_read_ready (clib_file_t *cf)
1841 {
1842   session_worker_t *wrk = session_main_get_worker (cf->private_data);
1843   u64 buf;
1844   int rv;
1845
1846   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
1847   rv = read (wrk->timerfd, &buf, sizeof (buf));
1848   if (rv < 0 && errno != EAGAIN)
1849     clib_unix_warning ("failed");
1850   return 0;
1851 }
1852
1853 static clib_error_t *
1854 session_wrk_tfd_write_ready (clib_file_t *cf)
1855 {
1856   return 0;
1857 }
1858
1859 void
1860 session_wrk_enable_adaptive_mode (session_worker_t *wrk)
1861 {
1862   u32 thread_index = wrk->vm->thread_index;
1863   clib_file_t template = { 0 };
1864
1865   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
1866     clib_warning ("timerfd_create");
1867
1868   template.read_function = session_wrk_tfd_read_ready;
1869   template.write_function = session_wrk_tfd_write_ready;
1870   template.file_descriptor = wrk->timerfd;
1871   template.private_data = thread_index;
1872   template.polling_thread_index = thread_index;
1873   template.description = format (0, "session-wrk-tfd-%u", thread_index);
1874
1875   wrk->timerfd_file = clib_file_add (&file_main, &template);
1876   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
1877 }
1878
1879 static clib_error_t *
1880 session_queue_exit (vlib_main_t * vm)
1881 {
1882   if (vlib_get_n_threads () < 2)
1883     return 0;
1884
1885   /*
1886    * Shut off (especially) worker-thread session nodes.
1887    * Otherwise, vpp can crash as the main thread unmaps the
1888    * API segment.
1889    */
1890   vlib_worker_thread_barrier_sync (vm);
1891   session_node_enable_disable (0 /* is_enable */ );
1892   vlib_worker_thread_barrier_release (vm);
1893   return 0;
1894 }
1895
1896 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1897
1898 static uword
1899 session_queue_run_on_main (vlib_main_t * vm)
1900 {
1901   vlib_node_runtime_t *node;
1902
1903   node = vlib_node_get_runtime (vm, session_queue_node.index);
1904   return session_queue_node_fn (vm, node, 0);
1905 }
1906
1907 static uword
1908 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1909                        vlib_frame_t * f)
1910 {
1911   uword *event_data = 0;
1912   f64 timeout = 1.0;
1913   uword event_type;
1914
1915   while (1)
1916     {
1917       vlib_process_wait_for_event_or_clock (vm, timeout);
1918       event_type = vlib_process_get_events (vm, (uword **) & event_data);
1919
1920       switch (event_type)
1921         {
1922         case SESSION_Q_PROCESS_RUN_ON_MAIN:
1923           /* Run session queue node on main thread */
1924           session_queue_run_on_main (vm);
1925           break;
1926         case SESSION_Q_PROCESS_STOP:
1927           vlib_node_set_state (vm, session_queue_process_node.index,
1928                                VLIB_NODE_STATE_DISABLED);
1929           timeout = 100000.0;
1930           break;
1931         case ~0:
1932           /* Timed out. Run on main to ensure all events are handled */
1933           session_queue_run_on_main (vm);
1934           break;
1935         }
1936       vec_reset_length (event_data);
1937     }
1938   return 0;
1939 }
1940
1941 /* *INDENT-OFF* */
1942 VLIB_REGISTER_NODE (session_queue_process_node) =
1943 {
1944   .function = session_queue_process,
1945   .type = VLIB_NODE_TYPE_PROCESS,
1946   .name = "session-queue-process",
1947   .state = VLIB_NODE_STATE_DISABLED,
1948 };
1949 /* *INDENT-ON* */
1950
1951 static_always_inline uword
1952 session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
1953                                 vlib_frame_t * frame)
1954 {
1955   session_main_t *sm = &session_main;
1956   if (!sm->wrk[0].vpp_event_queue)
1957     return 0;
1958   node = vlib_node_get_runtime (vm, session_queue_node.index);
1959   return session_queue_node_fn (vm, node, frame);
1960 }
1961
1962 /* *INDENT-OFF* */
1963 VLIB_REGISTER_NODE (session_queue_pre_input_node) =
1964 {
1965   .function = session_queue_pre_input_inline,
1966   .type = VLIB_NODE_TYPE_PRE_INPUT,
1967   .name = "session-queue-main",
1968   .state = VLIB_NODE_STATE_DISABLED,
1969 };
1970 /* *INDENT-ON* */
1971
1972 /*
1973  * fd.io coding-style-patch-verification: ON
1974  *
1975  * Local Variables:
1976  * eval: (c-set-style "gnu")
1977  * End:
1978  */