session: cleanup event llist usage
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/application_local.h>
25 #include <vnet/session/session_debug.h>
26 #include <svm/queue.h>
27 #include <sys/timerfd.h>
28
29 #define app_check_thread_and_barrier(_fn, _arg)                         \
30   if (!vlib_thread_is_main_w_barrier ())                                \
31     {                                                                   \
32      vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));       \
33       return;                                                           \
34    }
35
36 static void
37 session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
38 {
39   struct itimerspec its;
40
41   its.it_value.tv_sec = 0;
42   its.it_value.tv_nsec = time_ns;
43   its.it_interval.tv_sec = 0;
44   its.it_interval.tv_nsec = its.it_value.tv_nsec;
45
46   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
47     clib_warning ("timerfd_settime");
48 }
49
50 always_inline u64
51 session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
52 {
53   if (state == SESSION_WRK_INTERRUPT)
54     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
55   else if (state == SESSION_WRK_IDLE)
56     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
57   else
58     return 0;
59 }
60
61 static inline void
62 session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
63 {
64   u64 time_ns;
65
66   wrk->state = state;
67   if (wrk->timerfd == -1)
68     return;
69   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
70   session_wrk_timerfd_update (wrk, time_ns);
71 }
72
73 static transport_endpt_ext_cfg_t *
74 session_mq_get_ext_config (application_t *app, uword offset)
75 {
76   svm_fifo_chunk_t *c;
77   fifo_segment_t *fs;
78
79   fs = application_get_rx_mqs_segment (app);
80   c = fs_chunk_ptr (fs->h, offset);
81   return (transport_endpt_ext_cfg_t *) c->data;
82 }
83
84 static void
85 session_mq_free_ext_config (application_t *app, uword offset)
86 {
87   svm_fifo_chunk_t *c;
88   fifo_segment_t *fs;
89
90   fs = application_get_rx_mqs_segment (app);
91   c = fs_chunk_ptr (fs->h, offset);
92   fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
93 }
94
95 static void
96 session_mq_listen_handler (void *data)
97 {
98   session_listen_msg_t *mp = (session_listen_msg_t *) data;
99   vnet_listen_args_t _a, *a = &_a;
100   app_worker_t *app_wrk;
101   application_t *app;
102   int rv;
103
104   app_check_thread_and_barrier (session_mq_listen_handler, mp);
105
106   app = application_lookup (mp->client_index);
107   if (!app)
108     return;
109
110   clib_memset (a, 0, sizeof (*a));
111   a->sep.is_ip4 = mp->is_ip4;
112   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
113   a->sep.port = mp->port;
114   a->sep.fib_index = mp->vrf;
115   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
116   a->sep.transport_proto = mp->proto;
117   a->app_index = app->app_index;
118   a->wrk_map_index = mp->wrk_index;
119   a->sep_ext.transport_flags = mp->flags;
120
121   if (mp->ext_config)
122     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
123
124   if ((rv = vnet_listen (a)))
125     clib_warning ("listen returned: %U", format_session_error, rv);
126
127   app_wrk = application_get_worker (app, mp->wrk_index);
128   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
129
130   if (mp->ext_config)
131     session_mq_free_ext_config (app, mp->ext_config);
132 }
133
134 static void
135 session_mq_listen_uri_handler (void *data)
136 {
137   session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
138   vnet_listen_args_t _a, *a = &_a;
139   app_worker_t *app_wrk;
140   application_t *app;
141   int rv;
142
143   app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
144
145   app = application_lookup (mp->client_index);
146   if (!app)
147     return;
148
149   clib_memset (a, 0, sizeof (*a));
150   a->uri = (char *) mp->uri;
151   a->app_index = app->app_index;
152   rv = vnet_bind_uri (a);
153
154   app_wrk = application_get_worker (app, 0);
155   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
156 }
157
158 static void
159 session_mq_connect_one (session_connect_msg_t *mp)
160 {
161   vnet_connect_args_t _a, *a = &_a;
162   app_worker_t *app_wrk;
163   application_t *app;
164   int rv;
165
166   app = application_lookup (mp->client_index);
167   if (!app)
168     return;
169
170   clib_memset (a, 0, sizeof (*a));
171   a->sep.is_ip4 = mp->is_ip4;
172   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
173   a->sep.port = mp->port;
174   a->sep.transport_proto = mp->proto;
175   a->sep.peer.fib_index = mp->vrf;
176   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
177   if (mp->is_ip4)
178     {
179       ip46_address_mask_ip4 (&a->sep.ip);
180       ip46_address_mask_ip4 (&a->sep.peer.ip);
181     }
182   a->sep.peer.port = mp->lcl_port;
183   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
184   a->sep_ext.parent_handle = mp->parent_handle;
185   a->sep_ext.transport_flags = mp->flags;
186   a->api_context = mp->context;
187   a->app_index = app->app_index;
188   a->wrk_map_index = mp->wrk_index;
189
190   if (mp->ext_config)
191     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
192
193   if ((rv = vnet_connect (a)))
194     {
195       clib_warning ("connect returned: %U", format_session_error, rv);
196       app_wrk = application_get_worker (app, mp->wrk_index);
197       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
198     }
199
200   if (mp->ext_config)
201     session_mq_free_ext_config (app, mp->ext_config);
202 }
203
204 static void
205 session_mq_handle_connects_rpc (void *arg)
206 {
207   u32 max_connects = 32, n_connects = 0;
208   vlib_main_t *vm = vlib_get_main ();
209   session_evt_elt_t *he, *elt, *next;
210   session_worker_t *fwrk, *wrk;
211
212   ASSERT (vlib_get_thread_index () == 0);
213
214   /* Pending connects on linked list pertaining to first worker */
215   fwrk = session_main_get_worker (1);
216
217   vlib_worker_thread_barrier_sync (vm);
218
219   he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
220   elt = clib_llist_next (fwrk->event_elts, evt_list, he);
221
222   /* Avoid holding the barrier for too long */
223   while (n_connects < max_connects && elt != he)
224     {
225       next = clib_llist_next (fwrk->event_elts, evt_list, elt);
226       clib_llist_remove (fwrk->event_elts, evt_list, elt);
227       session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
228       clib_llist_put (fwrk->event_elts, elt);
229       elt = next;
230       n_connects += 1;
231     }
232
233   /* Switch worker to poll mode if it was in interrupt mode and had work or
234    * back to interrupt if threshold of loops without a connect is passed.
235    * While in poll mode, reprogram connects rpc */
236   wrk = session_main_get_worker (0);
237   if (wrk->state != SESSION_WRK_POLLING)
238     {
239       if (!n_connects)
240         goto done;
241
242       session_wrk_set_state (wrk, SESSION_WRK_POLLING);
243       vlib_node_set_state (vm, session_queue_node.index,
244                            VLIB_NODE_STATE_POLLING);
245       wrk->no_connect_loops = 0;
246     }
247   else
248     {
249       if (!n_connects)
250         {
251           if (++wrk->no_connect_loops > 1e5)
252             {
253               session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
254               vlib_node_set_state (vm, session_queue_node.index,
255                                    VLIB_NODE_STATE_INTERRUPT);
256               fwrk->pending_connects_ntf = 0;
257               goto done;
258             }
259         }
260       else
261         wrk->no_connect_loops = 0;
262     }
263
264   elt = session_evt_alloc_ctrl (wrk);
265   elt->evt.event_type = SESSION_CTRL_EVT_RPC;
266   elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
267
268 done:
269   vlib_worker_thread_barrier_release (vm);
270 }
271
272 static void
273 session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
274 {
275   u32 thread_index = wrk - session_main.wrk;
276   session_evt_elt_t *he;
277
278   /* No workers, so just deal with the connect now */
279   if (PREDICT_FALSE (!thread_index))
280     {
281       session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
282       return;
283     }
284
285   if (PREDICT_FALSE (thread_index != 1))
286     {
287       clib_warning ("Connect on wrong thread. Dropping");
288       return;
289     }
290
291   /* Add to pending list to be handled by main thread */
292   he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
293   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
294
295   if (!wrk->pending_connects_ntf)
296     {
297       vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
298                                        session_queue_node.index);
299       session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
300       wrk->pending_connects_ntf = 1;
301     }
302 }
303
304 static void
305 session_mq_connect_uri_handler (void *data)
306 {
307   session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
308   vnet_connect_args_t _a, *a = &_a;
309   app_worker_t *app_wrk;
310   application_t *app;
311   int rv;
312
313   app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
314
315   app = application_lookup (mp->client_index);
316   if (!app)
317     return;
318
319   clib_memset (a, 0, sizeof (*a));
320   a->uri = (char *) mp->uri;
321   a->api_context = mp->context;
322   a->app_index = app->app_index;
323   if ((rv = vnet_connect_uri (a)))
324     {
325       clib_warning ("connect_uri returned: %d", rv);
326       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
327       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
328     }
329 }
330
331 static void
332 session_mq_shutdown_handler (void *data)
333 {
334   session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
335   vnet_shutdown_args_t _a, *a = &_a;
336   application_t *app;
337
338   app = application_lookup (mp->client_index);
339   if (!app)
340     return;
341
342   a->app_index = app->app_index;
343   a->handle = mp->handle;
344   vnet_shutdown_session (a);
345 }
346
347 static void
348 session_mq_disconnect_handler (void *data)
349 {
350   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
351   vnet_disconnect_args_t _a, *a = &_a;
352   application_t *app;
353
354   app = application_lookup (mp->client_index);
355   if (!app)
356     return;
357
358   a->app_index = app->app_index;
359   a->handle = mp->handle;
360   vnet_disconnect_session (a);
361 }
362
363 static void
364 app_mq_detach_handler (void *data)
365 {
366   session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
367   vnet_app_detach_args_t _a, *a = &_a;
368   application_t *app;
369
370   app_check_thread_and_barrier (app_mq_detach_handler, mp);
371
372   app = application_lookup (mp->client_index);
373   if (!app)
374     return;
375
376   a->app_index = app->app_index;
377   a->api_client_index = mp->client_index;
378   vnet_application_detach (a);
379 }
380
381 static void
382 session_mq_unlisten_handler (void *data)
383 {
384   session_unlisten_msg_t *mp = (session_unlisten_msg_t *) data;
385   vnet_unlisten_args_t _a, *a = &_a;
386   app_worker_t *app_wrk;
387   application_t *app;
388   int rv;
389
390   app_check_thread_and_barrier (session_mq_unlisten_handler, mp);
391
392   app = application_lookup (mp->client_index);
393   if (!app)
394     return;
395
396   clib_memset (a, 0, sizeof (*a));
397   a->app_index = app->app_index;
398   a->handle = mp->handle;
399   a->wrk_map_index = mp->wrk_index;
400   if ((rv = vnet_unlisten (a)))
401     clib_warning ("unlisten returned: %d", rv);
402
403   app_wrk = application_get_worker (app, a->wrk_map_index);
404   if (!app_wrk)
405     return;
406
407   mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
408 }
409
410 static void
411 session_mq_accepted_reply_handler (void *data)
412 {
413   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
414   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
415   session_state_t old_state;
416   app_worker_t *app_wrk;
417   session_t *s;
418
419   /* Server isn't interested, kill the session */
420   if (mp->retval)
421     {
422       a->app_index = mp->context;
423       a->handle = mp->handle;
424       vnet_disconnect_session (a);
425       return;
426     }
427
428   /* Mail this back from the main thread. We're not polling in main
429    * thread so we're using other workers for notifications. */
430   if (vlib_num_workers () && vlib_get_thread_index () != 0
431       && session_thread_from_handle (mp->handle) == 0)
432     {
433       vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
434                                  (u8 *) mp, sizeof (*mp));
435       return;
436     }
437
438   s = session_get_from_handle_if_valid (mp->handle);
439   if (!s)
440     return;
441
442   app_wrk = app_worker_get (s->app_wrk_index);
443   if (app_wrk->app_index != mp->context)
444     {
445       clib_warning ("app doesn't own session");
446       return;
447     }
448
449   if (!session_has_transport (s))
450     {
451       s->session_state = SESSION_STATE_READY;
452       if (ct_session_connect_notify (s))
453         return;
454     }
455   else
456     {
457       old_state = s->session_state;
458       s->session_state = SESSION_STATE_READY;
459
460       if (!svm_fifo_is_empty_prod (s->rx_fifo))
461         app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
462
463       /* Closed while waiting for app to reply. Resend disconnect */
464       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
465         {
466           app_worker_close_notify (app_wrk, s);
467           s->session_state = old_state;
468           return;
469         }
470     }
471 }
472
473 static void
474 session_mq_reset_reply_handler (void *data)
475 {
476   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
477   session_reset_reply_msg_t *mp;
478   app_worker_t *app_wrk;
479   session_t *s;
480   application_t *app;
481   u32 index, thread_index;
482
483   mp = (session_reset_reply_msg_t *) data;
484   app = application_lookup (mp->context);
485   if (!app)
486     return;
487
488   session_parse_handle (mp->handle, &index, &thread_index);
489   s = session_get_if_valid (index, thread_index);
490
491   /* No session or not the right session */
492   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
493     return;
494
495   app_wrk = app_worker_get (s->app_wrk_index);
496   if (!app_wrk || app_wrk->app_index != app->app_index)
497     {
498       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
499                     mp->handle);
500       return;
501     }
502
503   /* Client objected to resetting the session, log and continue */
504   if (mp->retval)
505     {
506       clib_warning ("client retval %d", mp->retval);
507       return;
508     }
509
510   /* This comes as a response to a reset, transport only waiting for
511    * confirmation to remove connection state, no need to disconnect */
512   a->handle = mp->handle;
513   a->app_index = app->app_index;
514   vnet_disconnect_session (a);
515 }
516
517 static void
518 session_mq_disconnected_handler (void *data)
519 {
520   session_disconnected_reply_msg_t *rmp;
521   vnet_disconnect_args_t _a, *a = &_a;
522   svm_msg_q_msg_t _msg, *msg = &_msg;
523   session_disconnected_msg_t *mp;
524   app_worker_t *app_wrk;
525   session_event_t *evt;
526   session_t *s;
527   application_t *app;
528   int rv = 0;
529
530   mp = (session_disconnected_msg_t *) data;
531   if (!(s = session_get_from_handle_if_valid (mp->handle)))
532     {
533       clib_warning ("could not disconnect handle %llu", mp->handle);
534       return;
535     }
536   app_wrk = app_worker_get (s->app_wrk_index);
537   app = application_lookup (mp->client_index);
538   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
539     {
540       clib_warning ("could not disconnect session: %llu app: %u",
541                     mp->handle, mp->client_index);
542       return;
543     }
544
545   a->handle = mp->handle;
546   a->app_index = app_wrk->wrk_index;
547   rv = vnet_disconnect_session (a);
548
549   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
550                                        SESSION_MQ_CTRL_EVT_RING,
551                                        SVM_Q_WAIT, msg);
552   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
553   clib_memset (evt, 0, sizeof (*evt));
554   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
555   rmp = (session_disconnected_reply_msg_t *) evt->data;
556   rmp->handle = mp->handle;
557   rmp->context = mp->context;
558   rmp->retval = rv;
559   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
560 }
561
562 static void
563 session_mq_disconnected_reply_handler (void *data)
564 {
565   session_disconnected_reply_msg_t *mp;
566   vnet_disconnect_args_t _a, *a = &_a;
567   application_t *app;
568
569   mp = (session_disconnected_reply_msg_t *) data;
570
571   /* Client objected to disconnecting the session, log and continue */
572   if (mp->retval)
573     {
574       clib_warning ("client retval %d", mp->retval);
575       return;
576     }
577
578   /* Disconnect has been confirmed. Confirm close to transport */
579   app = application_lookup (mp->context);
580   if (app)
581     {
582       a->handle = mp->handle;
583       a->app_index = app->app_index;
584       vnet_disconnect_session (a);
585     }
586 }
587
588 static void
589 session_mq_worker_update_handler (void *data)
590 {
591   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
592   session_worker_update_reply_msg_t *rmp;
593   svm_msg_q_msg_t _msg, *msg = &_msg;
594   app_worker_t *app_wrk;
595   u32 owner_app_wrk_map;
596   session_event_t *evt;
597   session_t *s;
598   application_t *app;
599
600   app = application_lookup (mp->client_index);
601   if (!app)
602     return;
603   if (!(s = session_get_from_handle_if_valid (mp->handle)))
604     {
605       clib_warning ("invalid handle %llu", mp->handle);
606       return;
607     }
608   app_wrk = app_worker_get (s->app_wrk_index);
609   if (app_wrk->app_index != app->app_index)
610     {
611       clib_warning ("app %u does not own session %llu", app->app_index,
612                     mp->handle);
613       return;
614     }
615   owner_app_wrk_map = app_wrk->wrk_map_index;
616   app_wrk = application_get_worker (app, mp->wrk_index);
617
618   /* This needs to come from the new owner */
619   if (mp->req_wrk_index == owner_app_wrk_map)
620     {
621       session_req_worker_update_msg_t *wump;
622
623       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
624                                            SESSION_MQ_CTRL_EVT_RING,
625                                            SVM_Q_WAIT, msg);
626       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
627       clib_memset (evt, 0, sizeof (*evt));
628       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
629       wump = (session_req_worker_update_msg_t *) evt->data;
630       wump->session_handle = mp->handle;
631       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
632       return;
633     }
634
635   app_worker_own_session (app_wrk, s);
636
637   /*
638    * Send reply
639    */
640   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
641                                        SESSION_MQ_CTRL_EVT_RING,
642                                        SVM_Q_WAIT, msg);
643   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
644   clib_memset (evt, 0, sizeof (*evt));
645   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
646   rmp = (session_worker_update_reply_msg_t *) evt->data;
647   rmp->handle = mp->handle;
648   if (s->rx_fifo)
649     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
650   if (s->tx_fifo)
651     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
652   rmp->segment_handle = session_segment_handle (s);
653   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
654
655   /*
656    * Retransmit messages that may have been lost
657    */
658   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
659     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
660
661   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
662     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
663
664   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
665     app_worker_close_notify (app_wrk, s);
666 }
667
668 static void
669 session_mq_app_wrk_rpc_handler (void *data)
670 {
671   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
672   svm_msg_q_msg_t _msg, *msg = &_msg;
673   session_app_wrk_rpc_msg_t *rmp;
674   app_worker_t *app_wrk;
675   session_event_t *evt;
676   application_t *app;
677
678   app = application_lookup (mp->client_index);
679   if (!app)
680     return;
681
682   app_wrk = application_get_worker (app, mp->wrk_index);
683
684   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
685                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
686                                        msg);
687   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
688   clib_memset (evt, 0, sizeof (*evt));
689   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
690   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
691   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
692   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
693 }
694
695 static void
696 session_mq_transport_attr_handler (void *data)
697 {
698   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
699   session_transport_attr_reply_msg_t *rmp;
700   svm_msg_q_msg_t _msg, *msg = &_msg;
701   app_worker_t *app_wrk;
702   session_event_t *evt;
703   application_t *app;
704   session_t *s;
705   int rv;
706
707   app = application_lookup (mp->client_index);
708   if (!app)
709     return;
710
711   if (!(s = session_get_from_handle_if_valid (mp->handle)))
712     {
713       clib_warning ("invalid handle %llu", mp->handle);
714       return;
715     }
716   app_wrk = app_worker_get (s->app_wrk_index);
717   if (app_wrk->app_index != app->app_index)
718     {
719       clib_warning ("app %u does not own session %llu", app->app_index,
720                     mp->handle);
721       return;
722     }
723
724   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
725
726   svm_msg_q_lock_and_alloc_msg_w_ring (
727     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
728   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
729   clib_memset (evt, 0, sizeof (*evt));
730   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
731   rmp = (session_transport_attr_reply_msg_t *) evt->data;
732   rmp->handle = mp->handle;
733   rmp->retval = rv;
734   rmp->is_get = mp->is_get;
735   if (!rv && mp->is_get)
736     rmp->attr = mp->attr;
737   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
738 }
739
740 vlib_node_registration_t session_queue_node;
741
742 typedef struct
743 {
744   u32 session_index;
745   u32 server_thread_index;
746 } session_queue_trace_t;
747
748 /* packet trace format function */
749 static u8 *
750 format_session_queue_trace (u8 * s, va_list * args)
751 {
752   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
753   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
754   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
755
756   s = format (s, "session index %d thread index %d",
757               t->session_index, t->server_thread_index);
758   return s;
759 }
760
761 #define foreach_session_queue_error             \
762 _(TX, "Packets transmitted")                    \
763 _(TIMER, "Timer events")                        \
764 _(NO_BUFFER, "Out of buffers")
765
766 typedef enum
767 {
768 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
769   foreach_session_queue_error
770 #undef _
771     SESSION_QUEUE_N_ERROR,
772 } session_queue_error_t;
773
774 static char *session_queue_error_strings[] = {
775 #define _(sym,string) string,
776   foreach_session_queue_error
777 #undef _
778 };
779
780 enum
781 {
782   SESSION_TX_NO_BUFFERS = -2,
783   SESSION_TX_NO_DATA,
784   SESSION_TX_OK
785 };
786
787 static void
788 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
789                         u32 next_index, u32 * to_next, u16 n_segs,
790                         session_t * s, u32 n_trace)
791 {
792   while (n_trace && n_segs)
793     {
794       vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
795       if (PREDICT_TRUE
796           (vlib_trace_buffer
797            (vm, node, next_index, b, 1 /* follow_chain */ )))
798         {
799           session_queue_trace_t *t =
800             vlib_add_trace (vm, node, b, sizeof (*t));
801           t->session_index = s->session_index;
802           t->server_thread_index = s->thread_index;
803           n_trace--;
804         }
805       to_next++;
806       n_segs--;
807     }
808   vlib_set_trace_count (vm, node, n_trace);
809 }
810
811 always_inline void
812 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
813                             vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
814 {
815   vlib_buffer_t *chain_b, *prev_b;
816   u32 chain_bi0, to_deq, left_from_seg;
817   u16 len_to_deq, n_bytes_read;
818   u8 *data, j;
819
820   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
821   b->total_length_not_including_first_buffer = 0;
822
823   chain_b = b;
824   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
825                             ctx->left_to_snd);
826   to_deq = left_from_seg;
827   for (j = 1; j < ctx->n_bufs_per_seg; j++)
828     {
829       prev_b = chain_b;
830       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
831
832       *n_bufs -= 1;
833       chain_bi0 = ctx->tx_buffers[*n_bufs];
834       chain_b = vlib_get_buffer (vm, chain_bi0);
835       chain_b->current_data = 0;
836       data = vlib_buffer_get_current (chain_b);
837       if (peek_data)
838         {
839           n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
840                                         ctx->sp.tx_offset, len_to_deq, data);
841           ctx->sp.tx_offset += n_bytes_read;
842         }
843       else
844         {
845           if (ctx->transport_vft->transport_options.tx_type ==
846               TRANSPORT_TX_DGRAM)
847             {
848               svm_fifo_t *f = ctx->s->tx_fifo;
849               session_dgram_hdr_t *hdr = &ctx->hdr;
850               u16 deq_now;
851               u32 offset;
852
853               deq_now = clib_min (hdr->data_length - hdr->data_offset,
854                                   len_to_deq);
855               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
856               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
857               ASSERT (n_bytes_read > 0);
858
859               hdr->data_offset += n_bytes_read;
860               if (hdr->data_offset == hdr->data_length)
861                 {
862                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
863                   svm_fifo_dequeue_drop (f, offset);
864                   if (ctx->left_to_snd > n_bytes_read)
865                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
866                                    (u8 *) & ctx->hdr);
867                 }
868               else if (ctx->left_to_snd == n_bytes_read)
869                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
870                                          sizeof (session_dgram_pre_hdr_t));
871             }
872           else
873             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
874                                              len_to_deq, data);
875         }
876       ASSERT (n_bytes_read == len_to_deq);
877       chain_b->current_length = n_bytes_read;
878       b->total_length_not_including_first_buffer += chain_b->current_length;
879
880       /* update previous buffer */
881       prev_b->next_buffer = chain_bi0;
882       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
883
884       /* update current buffer */
885       chain_b->next_buffer = 0;
886
887       to_deq -= n_bytes_read;
888       if (to_deq == 0)
889         break;
890     }
891   ASSERT (to_deq == 0
892           && b->total_length_not_including_first_buffer == left_from_seg);
893   ctx->left_to_snd -= left_from_seg;
894 }
895
896 always_inline void
897 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
898                         vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
899 {
900   u32 len_to_deq;
901   u8 *data0;
902   int n_bytes_read;
903
904   /*
905    * Start with the first buffer in chain
906    */
907   b->error = 0;
908   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
909   b->current_data = 0;
910
911   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
912   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
913
914   if (peek_data)
915     {
916       n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
917                                     len_to_deq, data0);
918       ASSERT (n_bytes_read > 0);
919       /* Keep track of progress locally, transport is also supposed to
920        * increment it independently when pushing the header */
921       ctx->sp.tx_offset += n_bytes_read;
922     }
923   else
924     {
925       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
926         {
927           session_dgram_hdr_t *hdr = &ctx->hdr;
928           svm_fifo_t *f = ctx->s->tx_fifo;
929           u16 deq_now;
930           u32 offset;
931
932           ASSERT (hdr->data_length > hdr->data_offset);
933           deq_now = clib_min (hdr->data_length - hdr->data_offset,
934                               len_to_deq);
935           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
936           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
937           ASSERT (n_bytes_read > 0);
938
939           if (ctx->s->session_state == SESSION_STATE_LISTENING)
940             {
941               ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
942               ctx->tc->rmt_port = hdr->rmt_port;
943             }
944           hdr->data_offset += n_bytes_read;
945           if (hdr->data_offset == hdr->data_length)
946             {
947               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
948               svm_fifo_dequeue_drop (f, offset);
949               if (ctx->left_to_snd > n_bytes_read)
950                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
951                                (u8 *) & ctx->hdr);
952             }
953           else if (ctx->left_to_snd == n_bytes_read)
954             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
955                                      sizeof (session_dgram_pre_hdr_t));
956         }
957       else
958         {
959           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
960                                            len_to_deq, data0);
961           ASSERT (n_bytes_read > 0);
962         }
963     }
964   b->current_length = n_bytes_read;
965   ctx->left_to_snd -= n_bytes_read;
966
967   /*
968    * Fill in the remaining buffers in the chain, if any
969    */
970   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
971     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
972 }
973
974 always_inline u8
975 session_tx_not_ready (session_t * s, u8 peek_data)
976 {
977   if (peek_data)
978     {
979       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
980         return 0;
981       /* Can retransmit for closed sessions but can't send new data if
982        * session is not ready or closed */
983       else if (s->session_state < SESSION_STATE_READY)
984         return 1;
985       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
986         {
987           /* Allow closed transports to still send custom packets.
988            * For instance, tcp may want to send acks in time-wait. */
989           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
990               && (s->flags & SESSION_F_CUSTOM_TX))
991             return 0;
992           return 2;
993         }
994     }
995   return 0;
996 }
997
998 always_inline transport_connection_t *
999 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
1000 {
1001   if (peek_data)
1002     {
1003       return ctx->transport_vft->get_connection (ctx->s->connection_index,
1004                                                  ctx->s->thread_index);
1005     }
1006   else
1007     {
1008       if (ctx->s->session_state == SESSION_STATE_LISTENING)
1009         return ctx->transport_vft->get_listener (ctx->s->connection_index);
1010       else
1011         {
1012           return ctx->transport_vft->get_connection (ctx->s->connection_index,
1013                                                      ctx->s->thread_index);
1014         }
1015     }
1016 }
1017
1018 always_inline void
1019 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
1020                                u32 max_segs, u8 peek_data)
1021 {
1022   u32 n_bytes_per_buf, n_bytes_per_seg;
1023
1024   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
1025   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
1026
1027   if (peek_data)
1028     {
1029       /* Offset in rx fifo from where to peek data */
1030       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
1031         {
1032           ctx->max_len_to_snd = 0;
1033           return;
1034         }
1035       ctx->max_dequeue -= ctx->sp.tx_offset;
1036     }
1037   else
1038     {
1039       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1040         {
1041           u32 len, chain_limit;
1042
1043           if (ctx->max_dequeue <= sizeof (ctx->hdr))
1044             {
1045               ctx->max_len_to_snd = 0;
1046               return;
1047             }
1048
1049           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
1050                          (u8 *) & ctx->hdr);
1051           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
1052           len = ctx->hdr.data_length - ctx->hdr.data_offset;
1053
1054           /* Process multiple dgrams if smaller than min (buf_space, mss).
1055            * This avoids handling multiple dgrams if they require buffer
1056            * chains */
1057           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
1058                                   ctx->sp.snd_mss);
1059           if (ctx->hdr.data_length <= chain_limit)
1060             {
1061               u32 first_dgram_len, dgram_len, offset, max_offset;
1062               session_dgram_hdr_t hdr;
1063
1064               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
1065               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
1066               first_dgram_len = len;
1067               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
1068
1069               while (offset < max_offset)
1070                 {
1071                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
1072                                  (u8 *) & hdr);
1073                   ASSERT (hdr.data_length > hdr.data_offset);
1074                   dgram_len = hdr.data_length - hdr.data_offset;
1075                   if (len + dgram_len > ctx->max_dequeue
1076                       || first_dgram_len != dgram_len)
1077                     break;
1078                   len += dgram_len;
1079                   offset += sizeof (hdr) + hdr.data_length;
1080                 }
1081             }
1082
1083           ctx->max_dequeue = len;
1084         }
1085     }
1086   ASSERT (ctx->max_dequeue > 0);
1087
1088   /* Ensure we're not writing more than transport window allows */
1089   if (ctx->max_dequeue < ctx->sp.snd_space)
1090     {
1091       /* Constrained by tx queue. Try to send only fully formed segments */
1092       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
1093         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
1094         ctx->max_dequeue;
1095       /* TODO Nagle ? */
1096     }
1097   else
1098     {
1099       /* Expectation is that snd_space0 is already a multiple of snd_mss */
1100       ctx->max_len_to_snd = ctx->sp.snd_space;
1101     }
1102
1103   /* Check if we're tx constrained by the node */
1104   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
1105   if (ctx->n_segs_per_evt > max_segs)
1106     {
1107       ctx->n_segs_per_evt = max_segs;
1108       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
1109     }
1110
1111   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
1112   if (ctx->n_segs_per_evt > 1)
1113     {
1114       u32 n_bytes_last_seg, n_bufs_last_seg;
1115
1116       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
1117       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
1118         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
1119       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1120       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
1121       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
1122         + n_bufs_last_seg;
1123     }
1124   else
1125     {
1126       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
1127       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1128       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
1129     }
1130
1131   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
1132   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
1133                                      n_bytes_per_buf -
1134                                      TRANSPORT_MAX_HDRS_LEN);
1135 }
1136
1137 always_inline void
1138 session_tx_maybe_reschedule (session_worker_t * wrk,
1139                              session_tx_context_t * ctx,
1140                              session_evt_elt_t * elt)
1141 {
1142   session_t *s = ctx->s;
1143
1144   svm_fifo_unset_event (s->tx_fifo);
1145   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
1146     if (svm_fifo_set_event (s->tx_fifo))
1147       session_evt_add_head_old (wrk, elt);
1148 }
1149
1150 always_inline int
1151 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
1152                                 vlib_node_runtime_t * node,
1153                                 session_evt_elt_t * elt,
1154                                 int *n_tx_packets, u8 peek_data)
1155 {
1156   u32 n_trace, n_left, pbi, next_index, max_burst;
1157   session_tx_context_t *ctx = &wrk->ctx;
1158   session_main_t *smm = &session_main;
1159   session_event_t *e = &elt->evt;
1160   vlib_main_t *vm = wrk->vm;
1161   transport_proto_t tp;
1162   vlib_buffer_t *pb;
1163   u16 n_bufs, rv;
1164
1165   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
1166     {
1167       if (rv < 2)
1168         session_evt_add_old (wrk, elt);
1169       return SESSION_TX_NO_DATA;
1170     }
1171
1172   next_index = smm->session_type_to_next[ctx->s->session_type];
1173   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
1174
1175   tp = session_get_transport_proto (ctx->s);
1176   ctx->transport_vft = transport_protocol_get_vft (tp);
1177   ctx->tc = session_tx_get_transport (ctx, peek_data);
1178
1179   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
1180     {
1181       if (ctx->transport_vft->flush_data)
1182         ctx->transport_vft->flush_data (ctx->tc);
1183       e->event_type = SESSION_IO_EVT_TX;
1184     }
1185
1186   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
1187     {
1188       u32 n_custom_tx;
1189       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
1190       ctx->sp.max_burst_size = max_burst;
1191       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
1192       *n_tx_packets += n_custom_tx;
1193       if (PREDICT_FALSE
1194           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1195         return SESSION_TX_OK;
1196       max_burst -= n_custom_tx;
1197       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
1198         {
1199           session_evt_add_old (wrk, elt);
1200           return SESSION_TX_OK;
1201         }
1202     }
1203
1204   transport_connection_snd_params (ctx->tc, &ctx->sp);
1205
1206   if (!ctx->sp.snd_space)
1207     {
1208       /* If the deschedule flag was set, remove session from scheduler.
1209        * Transport is responsible for rescheduling this session. */
1210       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
1211         transport_connection_deschedule (ctx->tc);
1212       /* Request to postpone the session, e.g., zero-wnd and transport
1213        * is not currently probing */
1214       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
1215         session_evt_add_old (wrk, elt);
1216       /* This flow queue is "empty" so it should be re-evaluated before
1217        * the ones that have data to send. */
1218       else
1219         session_evt_add_head_old (wrk, elt);
1220
1221       return SESSION_TX_NO_DATA;
1222     }
1223
1224   if (transport_connection_is_tx_paced (ctx->tc))
1225     {
1226       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
1227       if (snd_space < TRANSPORT_PACER_MIN_BURST)
1228         {
1229           session_evt_add_head_old (wrk, elt);
1230           return SESSION_TX_NO_DATA;
1231         }
1232       snd_space = clib_min (ctx->sp.snd_space, snd_space);
1233       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
1234         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
1235     }
1236
1237   /* Check how much we can pull. */
1238   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
1239
1240   if (PREDICT_FALSE (!ctx->max_len_to_snd))
1241     {
1242       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
1243       session_tx_maybe_reschedule (wrk, ctx, elt);
1244       return SESSION_TX_NO_DATA;
1245     }
1246
1247   vec_validate_aligned (ctx->tx_buffers, ctx->n_bufs_needed - 1,
1248                         CLIB_CACHE_LINE_BYTES);
1249   n_bufs = vlib_buffer_alloc (vm, ctx->tx_buffers, ctx->n_bufs_needed);
1250   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
1251     {
1252       if (n_bufs)
1253         vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
1254       session_evt_add_head_old (wrk, elt);
1255       vlib_node_increment_counter (wrk->vm, node->node_index,
1256                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
1257       return SESSION_TX_NO_BUFFERS;
1258     }
1259
1260   if (transport_connection_is_tx_paced (ctx->tc))
1261     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
1262
1263   ctx->left_to_snd = ctx->max_len_to_snd;
1264   n_left = ctx->n_segs_per_evt;
1265
1266   while (n_left >= 4)
1267     {
1268       vlib_buffer_t *b0, *b1;
1269       u32 bi0, bi1;
1270
1271       pbi = ctx->tx_buffers[n_bufs - 3];
1272       pb = vlib_get_buffer (vm, pbi);
1273       vlib_prefetch_buffer_header (pb, STORE);
1274       pbi = ctx->tx_buffers[n_bufs - 4];
1275       pb = vlib_get_buffer (vm, pbi);
1276       vlib_prefetch_buffer_header (pb, STORE);
1277
1278       bi0 = ctx->tx_buffers[--n_bufs];
1279       bi1 = ctx->tx_buffers[--n_bufs];
1280
1281       b0 = vlib_get_buffer (vm, bi0);
1282       b1 = vlib_get_buffer (vm, bi1);
1283
1284       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1285       session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
1286
1287       ctx->transport_vft->push_header (ctx->tc, b0);
1288       ctx->transport_vft->push_header (ctx->tc, b1);
1289
1290       n_left -= 2;
1291
1292       vec_add1 (wrk->pending_tx_buffers, bi0);
1293       vec_add1 (wrk->pending_tx_buffers, bi1);
1294       vec_add1 (wrk->pending_tx_nexts, next_index);
1295       vec_add1 (wrk->pending_tx_nexts, next_index);
1296     }
1297   while (n_left)
1298     {
1299       vlib_buffer_t *b0;
1300       u32 bi0;
1301
1302       if (n_left > 1)
1303         {
1304           pbi = ctx->tx_buffers[n_bufs - 2];
1305           pb = vlib_get_buffer (vm, pbi);
1306           vlib_prefetch_buffer_header (pb, STORE);
1307         }
1308
1309       bi0 = ctx->tx_buffers[--n_bufs];
1310       b0 = vlib_get_buffer (vm, bi0);
1311       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1312
1313       /* Ask transport to push header after current_length and
1314        * total_length_not_including_first_buffer are updated */
1315       ctx->transport_vft->push_header (ctx->tc, b0);
1316
1317       n_left -= 1;
1318
1319       vec_add1 (wrk->pending_tx_buffers, bi0);
1320       vec_add1 (wrk->pending_tx_nexts, next_index);
1321     }
1322
1323   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
1324     session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers,
1325                             ctx->n_segs_per_evt, ctx->s, n_trace);
1326
1327   if (PREDICT_FALSE (n_bufs))
1328     vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
1329
1330   *n_tx_packets += ctx->n_segs_per_evt;
1331
1332   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
1333                ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
1334
1335   ASSERT (ctx->left_to_snd == 0);
1336
1337   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
1338    * check if application enqueued more data and reschedule accordingly */
1339   if (ctx->max_len_to_snd < ctx->max_dequeue)
1340     session_evt_add_old (wrk, elt);
1341   else
1342     session_tx_maybe_reschedule (wrk, ctx, elt);
1343
1344   if (!peek_data)
1345     {
1346       u32 n_dequeued = ctx->max_len_to_snd;
1347       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1348         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
1349       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
1350         session_dequeue_notify (ctx->s);
1351     }
1352   return SESSION_TX_OK;
1353 }
1354
1355 int
1356 session_tx_fifo_peek_and_snd (session_worker_t * wrk,
1357                               vlib_node_runtime_t * node,
1358                               session_evt_elt_t * e, int *n_tx_packets)
1359 {
1360   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
1361 }
1362
1363 int
1364 session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
1365                                  vlib_node_runtime_t * node,
1366                                  session_evt_elt_t * e, int *n_tx_packets)
1367 {
1368   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
1369 }
1370
1371 int
1372 session_tx_fifo_dequeue_internal (session_worker_t * wrk,
1373                                   vlib_node_runtime_t * node,
1374                                   session_evt_elt_t * elt, int *n_tx_packets)
1375 {
1376   transport_send_params_t *sp = &wrk->ctx.sp;
1377   session_t *s = wrk->ctx.s;
1378   u32 n_packets;
1379
1380   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1381     return 0;
1382
1383   /* Clear custom-tx flag used to request reschedule for tx */
1384   s->flags &= ~SESSION_F_CUSTOM_TX;
1385
1386   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
1387                                  TRANSPORT_PACER_MAX_BURST_PKTS);
1388
1389   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
1390   *n_tx_packets += n_packets;
1391
1392   if (s->flags & SESSION_F_CUSTOM_TX)
1393     {
1394       session_evt_add_old (wrk, elt);
1395     }
1396   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
1397     {
1398       svm_fifo_unset_event (s->tx_fifo);
1399       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1400         if (svm_fifo_set_event (s->tx_fifo))
1401           session_evt_add_head_old (wrk, elt);
1402     }
1403
1404   if (sp->max_burst_size &&
1405       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
1406     session_dequeue_notify (s);
1407
1408   return n_packets;
1409 }
1410
1411 always_inline session_t *
1412 session_event_get_session (session_worker_t * wrk, session_event_t * e)
1413 {
1414   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
1415     return 0;
1416
1417   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
1418   return pool_elt_at_index (wrk->sessions, e->session_index);
1419 }
1420
1421 always_inline void
1422 session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
1423 {
1424   clib_llist_index_t ei;
1425   void (*fp) (void *);
1426   session_event_t *e;
1427   session_t *s;
1428
1429   ei = clib_llist_entry_index (wrk->event_elts, elt);
1430   e = &elt->evt;
1431
1432   switch (e->event_type)
1433     {
1434     case SESSION_CTRL_EVT_RPC:
1435       fp = e->rpc_args.fp;
1436       (*fp) (e->rpc_args.arg);
1437       break;
1438     case SESSION_CTRL_EVT_HALF_CLOSE:
1439       s = session_get_from_handle_if_valid (e->session_handle);
1440       if (PREDICT_FALSE (!s))
1441         break;
1442       session_transport_half_close (s);
1443       break;
1444     case SESSION_CTRL_EVT_CLOSE:
1445       s = session_get_from_handle_if_valid (e->session_handle);
1446       if (PREDICT_FALSE (!s))
1447         break;
1448       session_transport_close (s);
1449       break;
1450     case SESSION_CTRL_EVT_RESET:
1451       s = session_get_from_handle_if_valid (e->session_handle);
1452       if (PREDICT_FALSE (!s))
1453         break;
1454       session_transport_reset (s);
1455       break;
1456     case SESSION_CTRL_EVT_LISTEN:
1457       session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
1458       break;
1459     case SESSION_CTRL_EVT_LISTEN_URI:
1460       session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
1461       break;
1462     case SESSION_CTRL_EVT_UNLISTEN:
1463       session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
1464       break;
1465     case SESSION_CTRL_EVT_CONNECT:
1466       session_mq_connect_handler (wrk, elt);
1467       break;
1468     case SESSION_CTRL_EVT_CONNECT_URI:
1469       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
1470       break;
1471     case SESSION_CTRL_EVT_SHUTDOWN:
1472       session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
1473       break;
1474     case SESSION_CTRL_EVT_DISCONNECT:
1475       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
1476       break;
1477     case SESSION_CTRL_EVT_DISCONNECTED:
1478       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
1479       break;
1480     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
1481       session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
1482       break;
1483     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
1484       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
1485                                                                     elt));
1486       break;
1487     case SESSION_CTRL_EVT_RESET_REPLY:
1488       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
1489       break;
1490     case SESSION_CTRL_EVT_WORKER_UPDATE:
1491       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
1492       break;
1493     case SESSION_CTRL_EVT_APP_DETACH:
1494       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
1495       break;
1496     case SESSION_CTRL_EVT_APP_WRK_RPC:
1497       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
1498       break;
1499     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
1500       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
1501       break;
1502     default:
1503       clib_warning ("unhandled event type %d", e->event_type);
1504     }
1505
1506   /* Regrab elements in case pool moved */
1507   elt = clib_llist_elt (wrk->event_elts, ei);
1508   if (!clib_llist_elt_is_linked (elt, evt_list))
1509     {
1510       e = &elt->evt;
1511       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
1512         session_evt_ctrl_data_free (wrk, elt);
1513       clib_llist_put (wrk->event_elts, elt);
1514     }
1515   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
1516 }
1517
1518 always_inline void
1519 session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
1520                            session_evt_elt_t * elt, int *n_tx_packets)
1521 {
1522   session_main_t *smm = &session_main;
1523   app_worker_t *app_wrk;
1524   clib_llist_index_t ei;
1525   session_event_t *e;
1526   session_t *s;
1527
1528   ei = clib_llist_entry_index (wrk->event_elts, elt);
1529   e = &elt->evt;
1530
1531   switch (e->event_type)
1532     {
1533     case SESSION_IO_EVT_TX_FLUSH:
1534     case SESSION_IO_EVT_TX:
1535       s = session_event_get_session (wrk, e);
1536       if (PREDICT_FALSE (!s))
1537         break;
1538       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
1539       wrk->ctx.s = s;
1540       /* Spray packets in per session type frames, since they go to
1541        * different nodes */
1542       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
1543       break;
1544     case SESSION_IO_EVT_RX:
1545       s = session_event_get_session (wrk, e);
1546       if (!s)
1547         break;
1548       transport_app_rx_evt (session_get_transport_proto (s),
1549                             s->connection_index, s->thread_index);
1550       break;
1551     case SESSION_IO_EVT_BUILTIN_RX:
1552       s = session_event_get_session (wrk, e);
1553       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
1554         break;
1555       svm_fifo_unset_event (s->rx_fifo);
1556       app_wrk = app_worker_get (s->app_wrk_index);
1557       app_worker_builtin_rx (app_wrk, s);
1558       break;
1559     case SESSION_IO_EVT_BUILTIN_TX:
1560       s = session_get_from_handle_if_valid (e->session_handle);
1561       wrk->ctx.s = s;
1562       if (PREDICT_TRUE (s != 0))
1563         session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
1564       break;
1565     default:
1566       clib_warning ("unhandled event type %d", e->event_type);
1567     }
1568
1569   SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
1570
1571   /* Regrab elements in case pool moved */
1572   elt = clib_llist_elt (wrk->event_elts, ei);
1573   if (!clib_llist_elt_is_linked (elt, evt_list))
1574     clib_llist_put (wrk->event_elts, elt);
1575 }
1576
1577 /* *INDENT-OFF* */
1578 static const u32 session_evt_msg_sizes[] = {
1579 #define _(symc, sym)                                                    \
1580   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
1581   foreach_session_ctrl_evt
1582 #undef _
1583 };
1584 /* *INDENT-ON* */
1585
1586 always_inline void
1587 session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
1588 {
1589   session_evt_elt_t *elt;
1590
1591   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
1592     {
1593       elt = session_evt_alloc_ctrl (wrk);
1594       if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
1595         {
1596           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
1597           elt->evt.event_type = evt->event_type;
1598           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
1599                             session_evt_msg_sizes[evt->event_type]);
1600         }
1601       else
1602         {
1603           /* Internal control events fit into io events footprint */
1604           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1605         }
1606     }
1607   else
1608     {
1609       elt = session_evt_alloc_new (wrk);
1610       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1611     }
1612 }
1613
1614 static void
1615 session_flush_pending_tx_buffers (session_worker_t * wrk,
1616                                   vlib_node_runtime_t * node)
1617 {
1618   vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers,
1619                                wrk->pending_tx_nexts,
1620                                vec_len (wrk->pending_tx_nexts));
1621   vec_reset_length (wrk->pending_tx_buffers);
1622   vec_reset_length (wrk->pending_tx_nexts);
1623 }
1624
1625 int
1626 session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
1627 {
1628   svm_msg_q_msg_t _msg, *msg = &_msg;
1629   u32 i, n_to_dequeue = 0;
1630   session_event_t *evt;
1631
1632   n_to_dequeue = svm_msg_q_size (mq);
1633   for (i = 0; i < n_to_dequeue; i++)
1634     {
1635       svm_msg_q_sub_raw (mq, msg);
1636       evt = svm_msg_q_msg_data (mq, msg);
1637       session_evt_add_to_list (wrk, evt);
1638       svm_msg_q_free_msg (mq, msg);
1639     }
1640
1641   return n_to_dequeue;
1642 }
1643
1644 static void
1645 session_wrk_update_state (session_worker_t *wrk)
1646 {
1647   vlib_main_t *vm = wrk->vm;
1648
1649   if (wrk->state == SESSION_WRK_POLLING)
1650     {
1651       if (clib_llist_elts (wrk->event_elts) == 4 &&
1652           vlib_last_vectors_per_main_loop (vm) < 1)
1653         {
1654           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1655           vlib_node_set_state (vm, session_queue_node.index,
1656                                VLIB_NODE_STATE_INTERRUPT);
1657         }
1658     }
1659   else if (wrk->state == SESSION_WRK_INTERRUPT)
1660     {
1661       if (clib_llist_elts (wrk->event_elts) > 4 ||
1662           vlib_last_vectors_per_main_loop (vm) > 1)
1663         {
1664           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
1665           vlib_node_set_state (vm, session_queue_node.index,
1666                                VLIB_NODE_STATE_POLLING);
1667         }
1668       else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
1669         {
1670           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
1671         }
1672     }
1673   else
1674     {
1675       if (clib_llist_elts (wrk->event_elts))
1676         {
1677           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1678         }
1679     }
1680 }
1681
1682 static uword
1683 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
1684                        vlib_frame_t * frame)
1685 {
1686   u32 thread_index = vm->thread_index, __clib_unused n_evts;
1687   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
1688   session_main_t *smm = vnet_get_session_main ();
1689   session_worker_t *wrk = &smm->wrk[thread_index];
1690   clib_llist_index_t ei, next_ei, old_ti;
1691   int n_tx_packets;
1692
1693   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
1694
1695   session_wrk_update_time (wrk, vlib_time_now (vm));
1696
1697   /*
1698    *  Update transport time
1699    */
1700   transport_update_time (wrk->last_vlib_time, thread_index);
1701   n_tx_packets = vec_len (wrk->pending_tx_buffers);
1702   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
1703
1704   /*
1705    *  Dequeue new internal mq events
1706    */
1707
1708   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
1709   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
1710
1711   /*
1712    * Handle control events
1713    */
1714
1715   ei = wrk->ctrl_head;
1716   ctrl_he = clib_llist_elt (wrk->event_elts, ei);
1717   next_ei = clib_llist_next_index (ctrl_he, evt_list);
1718   old_ti = clib_llist_prev_index (ctrl_he, evt_list);
1719   while (ei != old_ti)
1720     {
1721       ei = next_ei;
1722       elt = clib_llist_elt (wrk->event_elts, next_ei);
1723       next_ei = clib_llist_next_index (elt, evt_list);
1724       clib_llist_remove (wrk->event_elts, evt_list, elt);
1725       session_event_dispatch_ctrl (wrk, elt);
1726     }
1727
1728   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
1729
1730   /*
1731    * Handle the new io events.
1732    */
1733
1734   new_he = clib_llist_elt (wrk->event_elts, wrk->new_head);
1735   old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
1736   old_ti = clib_llist_prev_index (old_he, evt_list);
1737
1738   ei = clib_llist_next_index (new_he, evt_list);
1739   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
1740     {
1741       elt = clib_llist_elt (wrk->event_elts, ei);
1742       ei = clib_llist_next_index (elt, evt_list);
1743       clib_llist_remove (wrk->event_elts, evt_list, elt);
1744       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1745     }
1746
1747   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
1748
1749   /*
1750    * Handle the old io events, if we had any prior to processing the new ones
1751    */
1752
1753   if (old_ti != wrk->old_head)
1754     {
1755       old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
1756       ei = clib_llist_next_index (old_he, evt_list);
1757
1758       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
1759         {
1760           elt = clib_llist_elt (wrk->event_elts, ei);
1761           next_ei = clib_llist_next_index (elt, evt_list);
1762           clib_llist_remove (wrk->event_elts, evt_list, elt);
1763
1764           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1765
1766           if (ei == old_ti)
1767             break;
1768
1769           ei = next_ei;
1770         };
1771     }
1772
1773   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
1774
1775   if (vec_len (wrk->pending_tx_buffers))
1776     session_flush_pending_tx_buffers (wrk, node);
1777
1778   vlib_node_increment_counter (vm, session_queue_node.index,
1779                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
1780
1781   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
1782
1783   if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
1784     session_wrk_update_state (wrk);
1785
1786   return n_tx_packets;
1787 }
1788
1789 /* *INDENT-OFF* */
1790 VLIB_REGISTER_NODE (session_queue_node) =
1791 {
1792   .function = session_queue_node_fn,
1793   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
1794   .name = "session-queue",
1795   .format_trace = format_session_queue_trace,
1796   .type = VLIB_NODE_TYPE_INPUT,
1797   .n_errors = ARRAY_LEN (session_queue_error_strings),
1798   .error_strings = session_queue_error_strings,
1799   .state = VLIB_NODE_STATE_DISABLED,
1800 };
1801 /* *INDENT-ON* */
1802
1803 static clib_error_t *
1804 session_wrk_tfd_read_ready (clib_file_t *cf)
1805 {
1806   session_worker_t *wrk = session_main_get_worker (cf->private_data);
1807   u64 buf;
1808   int rv;
1809
1810   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
1811   rv = read (wrk->timerfd, &buf, sizeof (buf));
1812   if (rv < 0 && errno != EAGAIN)
1813     clib_unix_warning ("failed");
1814   return 0;
1815 }
1816
1817 static clib_error_t *
1818 session_wrk_tfd_write_ready (clib_file_t *cf)
1819 {
1820   return 0;
1821 }
1822
1823 void
1824 session_wrk_enable_adaptive_mode (session_worker_t *wrk)
1825 {
1826   u32 thread_index = wrk->vm->thread_index;
1827   clib_file_t template = { 0 };
1828
1829   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
1830     clib_warning ("timerfd_create");
1831
1832   template.read_function = session_wrk_tfd_read_ready;
1833   template.write_function = session_wrk_tfd_write_ready;
1834   template.file_descriptor = wrk->timerfd;
1835   template.private_data = thread_index;
1836   template.polling_thread_index = thread_index;
1837   template.description = format (0, "session-wrk-tfd-%u", thread_index);
1838
1839   wrk->timerfd_file = clib_file_add (&file_main, &template);
1840   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
1841 }
1842
1843 static clib_error_t *
1844 session_queue_exit (vlib_main_t * vm)
1845 {
1846   if (vlib_get_n_threads () < 2)
1847     return 0;
1848
1849   /*
1850    * Shut off (especially) worker-thread session nodes.
1851    * Otherwise, vpp can crash as the main thread unmaps the
1852    * API segment.
1853    */
1854   vlib_worker_thread_barrier_sync (vm);
1855   session_node_enable_disable (0 /* is_enable */ );
1856   vlib_worker_thread_barrier_release (vm);
1857   return 0;
1858 }
1859
1860 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1861
1862 static uword
1863 session_queue_run_on_main (vlib_main_t * vm)
1864 {
1865   vlib_node_runtime_t *node;
1866
1867   node = vlib_node_get_runtime (vm, session_queue_node.index);
1868   return session_queue_node_fn (vm, node, 0);
1869 }
1870
1871 static uword
1872 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1873                        vlib_frame_t * f)
1874 {
1875   uword *event_data = 0;
1876   f64 timeout = 1.0;
1877   uword event_type;
1878
1879   while (1)
1880     {
1881       vlib_process_wait_for_event_or_clock (vm, timeout);
1882       event_type = vlib_process_get_events (vm, (uword **) & event_data);
1883
1884       switch (event_type)
1885         {
1886         case SESSION_Q_PROCESS_RUN_ON_MAIN:
1887           /* Run session queue node on main thread */
1888           session_queue_run_on_main (vm);
1889           break;
1890         case SESSION_Q_PROCESS_STOP:
1891           vlib_node_set_state (vm, session_queue_process_node.index,
1892                                VLIB_NODE_STATE_DISABLED);
1893           timeout = 100000.0;
1894           break;
1895         case ~0:
1896           /* Timed out. Run on main to ensure all events are handled */
1897           session_queue_run_on_main (vm);
1898           break;
1899         }
1900       vec_reset_length (event_data);
1901     }
1902   return 0;
1903 }
1904
1905 /* *INDENT-OFF* */
1906 VLIB_REGISTER_NODE (session_queue_process_node) =
1907 {
1908   .function = session_queue_process,
1909   .type = VLIB_NODE_TYPE_PROCESS,
1910   .name = "session-queue-process",
1911   .state = VLIB_NODE_STATE_DISABLED,
1912 };
1913 /* *INDENT-ON* */
1914
1915 static_always_inline uword
1916 session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
1917                                 vlib_frame_t * frame)
1918 {
1919   session_main_t *sm = &session_main;
1920   if (!sm->wrk[0].vpp_event_queue)
1921     return 0;
1922   node = vlib_node_get_runtime (vm, session_queue_node.index);
1923   return session_queue_node_fn (vm, node, frame);
1924 }
1925
1926 /* *INDENT-OFF* */
1927 VLIB_REGISTER_NODE (session_queue_pre_input_node) =
1928 {
1929   .function = session_queue_pre_input_inline,
1930   .type = VLIB_NODE_TYPE_PRE_INPUT,
1931   .name = "session-queue-main",
1932   .state = VLIB_NODE_STATE_DISABLED,
1933 };
1934 /* *INDENT-ON* */
1935
1936 /*
1937  * fd.io coding-style-patch-verification: ON
1938  *
1939  * Local Variables:
1940  * eval: (c-set-style "gnu")
1941  * End:
1942  */