fa66119abe2766450ed42bcfbe7e7275e638e8da
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/application_local.h>
25 #include <vnet/session/session_debug.h>
26 #include <svm/queue.h>
27 #include <sys/timerfd.h>
28
29 #define app_check_thread_and_barrier(_fn, _arg)                         \
30   if (!vlib_thread_is_main_w_barrier ())                                \
31     {                                                                   \
32      vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));       \
33       return;                                                           \
34    }
35
36 static transport_endpt_ext_cfg_t *
37 session_mq_get_ext_config (application_t *app, uword offset)
38 {
39   svm_fifo_chunk_t *c;
40   fifo_segment_t *fs;
41
42   fs = application_get_rx_mqs_segment (app);
43   c = fs_chunk_ptr (fs->h, offset);
44   return (transport_endpt_ext_cfg_t *) c->data;
45 }
46
47 static void
48 session_mq_free_ext_config (application_t *app, uword offset)
49 {
50   svm_fifo_chunk_t *c;
51   fifo_segment_t *fs;
52
53   fs = application_get_rx_mqs_segment (app);
54   c = fs_chunk_ptr (fs->h, offset);
55   fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
56 }
57
58 static void
59 session_mq_listen_handler (void *data)
60 {
61   session_listen_msg_t *mp = (session_listen_msg_t *) data;
62   vnet_listen_args_t _a, *a = &_a;
63   app_worker_t *app_wrk;
64   application_t *app;
65   int rv;
66
67   app_check_thread_and_barrier (session_mq_listen_handler, mp);
68
69   app = application_lookup (mp->client_index);
70   if (!app)
71     return;
72
73   clib_memset (a, 0, sizeof (*a));
74   a->sep.is_ip4 = mp->is_ip4;
75   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
76   a->sep.port = mp->port;
77   a->sep.fib_index = mp->vrf;
78   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
79   a->sep.transport_proto = mp->proto;
80   a->app_index = app->app_index;
81   a->wrk_map_index = mp->wrk_index;
82   a->sep_ext.transport_flags = mp->flags;
83
84   if (mp->ext_config)
85     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
86
87   if ((rv = vnet_listen (a)))
88     clib_warning ("listen returned: %U", format_session_error, rv);
89
90   app_wrk = application_get_worker (app, mp->wrk_index);
91   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
92
93   if (mp->ext_config)
94     session_mq_free_ext_config (app, mp->ext_config);
95 }
96
97 static void
98 session_mq_listen_uri_handler (void *data)
99 {
100   session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
101   vnet_listen_args_t _a, *a = &_a;
102   app_worker_t *app_wrk;
103   application_t *app;
104   int rv;
105
106   app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
107
108   app = application_lookup (mp->client_index);
109   if (!app)
110     return;
111
112   clib_memset (a, 0, sizeof (*a));
113   a->uri = (char *) mp->uri;
114   a->app_index = app->app_index;
115   rv = vnet_bind_uri (a);
116
117   app_wrk = application_get_worker (app, 0);
118   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
119 }
120
121 static void
122 session_mq_connect_one (session_connect_msg_t *mp)
123 {
124   vnet_connect_args_t _a, *a = &_a;
125   app_worker_t *app_wrk;
126   application_t *app;
127   int rv;
128
129   app = application_lookup (mp->client_index);
130   if (!app)
131     return;
132
133   clib_memset (a, 0, sizeof (*a));
134   a->sep.is_ip4 = mp->is_ip4;
135   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
136   a->sep.port = mp->port;
137   a->sep.transport_proto = mp->proto;
138   a->sep.peer.fib_index = mp->vrf;
139   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
140   if (mp->is_ip4)
141     {
142       ip46_address_mask_ip4 (&a->sep.ip);
143       ip46_address_mask_ip4 (&a->sep.peer.ip);
144     }
145   a->sep.peer.port = mp->lcl_port;
146   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
147   a->sep_ext.parent_handle = mp->parent_handle;
148   a->sep_ext.transport_flags = mp->flags;
149   a->api_context = mp->context;
150   a->app_index = app->app_index;
151   a->wrk_map_index = mp->wrk_index;
152
153   if (mp->ext_config)
154     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
155
156   if ((rv = vnet_connect (a)))
157     {
158       clib_warning ("connect returned: %U", format_session_error, rv);
159       app_wrk = application_get_worker (app, mp->wrk_index);
160       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
161     }
162
163   if (mp->ext_config)
164     session_mq_free_ext_config (app, mp->ext_config);
165 }
166
167 static void
168 session_mq_handle_connects_rpc (void *arg)
169 {
170   u32 max_connects = 32, n_connects = 0;
171   vlib_main_t *vm = vlib_get_main ();
172   session_evt_elt_t *he, *elt, *next;
173   session_worker_t *fwrk;
174   u8 need_reschedule = 1;
175
176   ASSERT (vlib_get_thread_index () == 0);
177
178   /* Pending connects on linked list pertaining to first worker */
179   fwrk = session_main_get_worker (1);
180
181   vlib_worker_thread_barrier_sync (vm);
182
183   he = pool_elt_at_index (fwrk->event_elts, fwrk->pending_connects);
184   elt = clib_llist_next (fwrk->event_elts, evt_list, he);
185
186   /* Avoid holding the barrier for too long */
187   while (n_connects < max_connects && elt != he)
188     {
189       next = clib_llist_next (fwrk->event_elts, evt_list, elt);
190       clib_llist_remove (fwrk->event_elts, evt_list, elt);
191       session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
192       session_evt_elt_free (fwrk, elt);
193       elt = next;
194       n_connects += 1;
195     }
196
197   if (clib_llist_is_empty (fwrk->event_elts, evt_list, he))
198     {
199       fwrk->pending_connects_ntf = 0;
200       need_reschedule = 0;
201     }
202
203   vlib_worker_thread_barrier_release (vm);
204
205   if (need_reschedule)
206     {
207       vlib_node_set_interrupt_pending (vm, session_queue_node.index);
208       elt = session_evt_alloc_ctrl (session_main_get_worker (0));
209       elt->evt.event_type = SESSION_CTRL_EVT_RPC;
210       elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
211     }
212 }
213
214 static void
215 session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
216 {
217   u32 thread_index = wrk - session_main.wrk;
218   session_evt_elt_t *he;
219
220   /* No workers, so just deal with the connect now */
221   if (PREDICT_FALSE (!thread_index))
222     {
223       session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
224       return;
225     }
226
227   if (PREDICT_FALSE (thread_index != 1))
228     {
229       clib_warning ("Connect on wrong thread. Dropping");
230       return;
231     }
232
233   /* Add to pending list to be handled by main thread */
234   he = pool_elt_at_index (wrk->event_elts, wrk->pending_connects);
235   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
236
237   if (!wrk->pending_connects_ntf)
238     {
239       vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
240                                        session_queue_node.index);
241       session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
242       wrk->pending_connects_ntf = 1;
243     }
244 }
245
246 static void
247 session_mq_connect_uri_handler (void *data)
248 {
249   session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
250   vnet_connect_args_t _a, *a = &_a;
251   app_worker_t *app_wrk;
252   application_t *app;
253   int rv;
254
255   app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
256
257   app = application_lookup (mp->client_index);
258   if (!app)
259     return;
260
261   clib_memset (a, 0, sizeof (*a));
262   a->uri = (char *) mp->uri;
263   a->api_context = mp->context;
264   a->app_index = app->app_index;
265   if ((rv = vnet_connect_uri (a)))
266     {
267       clib_warning ("connect_uri returned: %d", rv);
268       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
269       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
270     }
271 }
272
273 static void
274 session_mq_shutdown_handler (void *data)
275 {
276   session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
277   vnet_shutdown_args_t _a, *a = &_a;
278   application_t *app;
279
280   app = application_lookup (mp->client_index);
281   if (!app)
282     return;
283
284   a->app_index = app->app_index;
285   a->handle = mp->handle;
286   vnet_shutdown_session (a);
287 }
288
289 static void
290 session_mq_disconnect_handler (void *data)
291 {
292   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
293   vnet_disconnect_args_t _a, *a = &_a;
294   application_t *app;
295
296   app = application_lookup (mp->client_index);
297   if (!app)
298     return;
299
300   a->app_index = app->app_index;
301   a->handle = mp->handle;
302   vnet_disconnect_session (a);
303 }
304
305 static void
306 app_mq_detach_handler (void *data)
307 {
308   session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
309   vnet_app_detach_args_t _a, *a = &_a;
310   application_t *app;
311
312   app_check_thread_and_barrier (app_mq_detach_handler, mp);
313
314   app = application_lookup (mp->client_index);
315   if (!app)
316     return;
317
318   a->app_index = app->app_index;
319   a->api_client_index = mp->client_index;
320   vnet_application_detach (a);
321 }
322
323 static void
324 session_mq_unlisten_handler (void *data)
325 {
326   session_unlisten_msg_t *mp = (session_unlisten_msg_t *) data;
327   vnet_unlisten_args_t _a, *a = &_a;
328   app_worker_t *app_wrk;
329   application_t *app;
330   int rv;
331
332   app_check_thread_and_barrier (session_mq_unlisten_handler, mp);
333
334   app = application_lookup (mp->client_index);
335   if (!app)
336     return;
337
338   clib_memset (a, 0, sizeof (*a));
339   a->app_index = app->app_index;
340   a->handle = mp->handle;
341   a->wrk_map_index = mp->wrk_index;
342   if ((rv = vnet_unlisten (a)))
343     clib_warning ("unlisten returned: %d", rv);
344
345   app_wrk = application_get_worker (app, a->wrk_map_index);
346   if (!app_wrk)
347     return;
348
349   mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
350 }
351
352 static void
353 session_mq_accepted_reply_handler (void *data)
354 {
355   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
356   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
357   session_state_t old_state;
358   app_worker_t *app_wrk;
359   session_t *s;
360
361   /* Server isn't interested, kill the session */
362   if (mp->retval)
363     {
364       a->app_index = mp->context;
365       a->handle = mp->handle;
366       vnet_disconnect_session (a);
367       return;
368     }
369
370   /* Mail this back from the main thread. We're not polling in main
371    * thread so we're using other workers for notifications. */
372   if (vlib_num_workers () && vlib_get_thread_index () != 0
373       && session_thread_from_handle (mp->handle) == 0)
374     {
375       vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
376                                  (u8 *) mp, sizeof (*mp));
377       return;
378     }
379
380   s = session_get_from_handle_if_valid (mp->handle);
381   if (!s)
382     return;
383
384   app_wrk = app_worker_get (s->app_wrk_index);
385   if (app_wrk->app_index != mp->context)
386     {
387       clib_warning ("app doesn't own session");
388       return;
389     }
390
391   if (!session_has_transport (s))
392     {
393       s->session_state = SESSION_STATE_READY;
394       if (ct_session_connect_notify (s))
395         return;
396     }
397   else
398     {
399       old_state = s->session_state;
400       s->session_state = SESSION_STATE_READY;
401
402       if (!svm_fifo_is_empty_prod (s->rx_fifo))
403         app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
404
405       /* Closed while waiting for app to reply. Resend disconnect */
406       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
407         {
408           app_worker_close_notify (app_wrk, s);
409           s->session_state = old_state;
410           return;
411         }
412     }
413 }
414
415 static void
416 session_mq_reset_reply_handler (void *data)
417 {
418   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
419   session_reset_reply_msg_t *mp;
420   app_worker_t *app_wrk;
421   session_t *s;
422   application_t *app;
423   u32 index, thread_index;
424
425   mp = (session_reset_reply_msg_t *) data;
426   app = application_lookup (mp->context);
427   if (!app)
428     return;
429
430   session_parse_handle (mp->handle, &index, &thread_index);
431   s = session_get_if_valid (index, thread_index);
432
433   /* No session or not the right session */
434   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
435     return;
436
437   app_wrk = app_worker_get (s->app_wrk_index);
438   if (!app_wrk || app_wrk->app_index != app->app_index)
439     {
440       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
441                     mp->handle);
442       return;
443     }
444
445   /* Client objected to resetting the session, log and continue */
446   if (mp->retval)
447     {
448       clib_warning ("client retval %d", mp->retval);
449       return;
450     }
451
452   /* This comes as a response to a reset, transport only waiting for
453    * confirmation to remove connection state, no need to disconnect */
454   a->handle = mp->handle;
455   a->app_index = app->app_index;
456   vnet_disconnect_session (a);
457 }
458
459 static void
460 session_mq_disconnected_handler (void *data)
461 {
462   session_disconnected_reply_msg_t *rmp;
463   vnet_disconnect_args_t _a, *a = &_a;
464   svm_msg_q_msg_t _msg, *msg = &_msg;
465   session_disconnected_msg_t *mp;
466   app_worker_t *app_wrk;
467   session_event_t *evt;
468   session_t *s;
469   application_t *app;
470   int rv = 0;
471
472   mp = (session_disconnected_msg_t *) data;
473   if (!(s = session_get_from_handle_if_valid (mp->handle)))
474     {
475       clib_warning ("could not disconnect handle %llu", mp->handle);
476       return;
477     }
478   app_wrk = app_worker_get (s->app_wrk_index);
479   app = application_lookup (mp->client_index);
480   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
481     {
482       clib_warning ("could not disconnect session: %llu app: %u",
483                     mp->handle, mp->client_index);
484       return;
485     }
486
487   a->handle = mp->handle;
488   a->app_index = app_wrk->wrk_index;
489   rv = vnet_disconnect_session (a);
490
491   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
492                                        SESSION_MQ_CTRL_EVT_RING,
493                                        SVM_Q_WAIT, msg);
494   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
495   clib_memset (evt, 0, sizeof (*evt));
496   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
497   rmp = (session_disconnected_reply_msg_t *) evt->data;
498   rmp->handle = mp->handle;
499   rmp->context = mp->context;
500   rmp->retval = rv;
501   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
502 }
503
504 static void
505 session_mq_disconnected_reply_handler (void *data)
506 {
507   session_disconnected_reply_msg_t *mp;
508   vnet_disconnect_args_t _a, *a = &_a;
509   application_t *app;
510
511   mp = (session_disconnected_reply_msg_t *) data;
512
513   /* Client objected to disconnecting the session, log and continue */
514   if (mp->retval)
515     {
516       clib_warning ("client retval %d", mp->retval);
517       return;
518     }
519
520   /* Disconnect has been confirmed. Confirm close to transport */
521   app = application_lookup (mp->context);
522   if (app)
523     {
524       a->handle = mp->handle;
525       a->app_index = app->app_index;
526       vnet_disconnect_session (a);
527     }
528 }
529
530 static void
531 session_mq_worker_update_handler (void *data)
532 {
533   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
534   session_worker_update_reply_msg_t *rmp;
535   svm_msg_q_msg_t _msg, *msg = &_msg;
536   app_worker_t *app_wrk;
537   u32 owner_app_wrk_map;
538   session_event_t *evt;
539   session_t *s;
540   application_t *app;
541
542   app = application_lookup (mp->client_index);
543   if (!app)
544     return;
545   if (!(s = session_get_from_handle_if_valid (mp->handle)))
546     {
547       clib_warning ("invalid handle %llu", mp->handle);
548       return;
549     }
550   app_wrk = app_worker_get (s->app_wrk_index);
551   if (app_wrk->app_index != app->app_index)
552     {
553       clib_warning ("app %u does not own session %llu", app->app_index,
554                     mp->handle);
555       return;
556     }
557   owner_app_wrk_map = app_wrk->wrk_map_index;
558   app_wrk = application_get_worker (app, mp->wrk_index);
559
560   /* This needs to come from the new owner */
561   if (mp->req_wrk_index == owner_app_wrk_map)
562     {
563       session_req_worker_update_msg_t *wump;
564
565       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
566                                            SESSION_MQ_CTRL_EVT_RING,
567                                            SVM_Q_WAIT, msg);
568       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
569       clib_memset (evt, 0, sizeof (*evt));
570       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
571       wump = (session_req_worker_update_msg_t *) evt->data;
572       wump->session_handle = mp->handle;
573       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
574       return;
575     }
576
577   app_worker_own_session (app_wrk, s);
578
579   /*
580    * Send reply
581    */
582   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
583                                        SESSION_MQ_CTRL_EVT_RING,
584                                        SVM_Q_WAIT, msg);
585   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
586   clib_memset (evt, 0, sizeof (*evt));
587   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
588   rmp = (session_worker_update_reply_msg_t *) evt->data;
589   rmp->handle = mp->handle;
590   if (s->rx_fifo)
591     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
592   if (s->tx_fifo)
593     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
594   rmp->segment_handle = session_segment_handle (s);
595   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
596
597   /*
598    * Retransmit messages that may have been lost
599    */
600   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
601     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
602
603   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
604     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
605
606   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
607     app_worker_close_notify (app_wrk, s);
608 }
609
610 static void
611 session_mq_app_wrk_rpc_handler (void *data)
612 {
613   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
614   svm_msg_q_msg_t _msg, *msg = &_msg;
615   session_app_wrk_rpc_msg_t *rmp;
616   app_worker_t *app_wrk;
617   session_event_t *evt;
618   application_t *app;
619
620   app = application_lookup (mp->client_index);
621   if (!app)
622     return;
623
624   app_wrk = application_get_worker (app, mp->wrk_index);
625
626   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
627                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
628                                        msg);
629   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
630   clib_memset (evt, 0, sizeof (*evt));
631   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
632   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
633   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
634   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
635 }
636
637 static void
638 session_mq_transport_attr_handler (void *data)
639 {
640   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
641   session_transport_attr_reply_msg_t *rmp;
642   svm_msg_q_msg_t _msg, *msg = &_msg;
643   app_worker_t *app_wrk;
644   session_event_t *evt;
645   application_t *app;
646   session_t *s;
647   int rv;
648
649   app = application_lookup (mp->client_index);
650   if (!app)
651     return;
652
653   if (!(s = session_get_from_handle_if_valid (mp->handle)))
654     {
655       clib_warning ("invalid handle %llu", mp->handle);
656       return;
657     }
658   app_wrk = app_worker_get (s->app_wrk_index);
659   if (app_wrk->app_index != app->app_index)
660     {
661       clib_warning ("app %u does not own session %llu", app->app_index,
662                     mp->handle);
663       return;
664     }
665
666   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
667
668   svm_msg_q_lock_and_alloc_msg_w_ring (
669     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
670   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
671   clib_memset (evt, 0, sizeof (*evt));
672   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
673   rmp = (session_transport_attr_reply_msg_t *) evt->data;
674   rmp->handle = mp->handle;
675   rmp->retval = rv;
676   rmp->is_get = mp->is_get;
677   if (!rv && mp->is_get)
678     rmp->attr = mp->attr;
679   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
680 }
681
682 vlib_node_registration_t session_queue_node;
683
684 typedef struct
685 {
686   u32 session_index;
687   u32 server_thread_index;
688 } session_queue_trace_t;
689
690 /* packet trace format function */
691 static u8 *
692 format_session_queue_trace (u8 * s, va_list * args)
693 {
694   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
695   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
696   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
697
698   s = format (s, "session index %d thread index %d",
699               t->session_index, t->server_thread_index);
700   return s;
701 }
702
703 #define foreach_session_queue_error             \
704 _(TX, "Packets transmitted")                    \
705 _(TIMER, "Timer events")                        \
706 _(NO_BUFFER, "Out of buffers")
707
708 typedef enum
709 {
710 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
711   foreach_session_queue_error
712 #undef _
713     SESSION_QUEUE_N_ERROR,
714 } session_queue_error_t;
715
716 static char *session_queue_error_strings[] = {
717 #define _(sym,string) string,
718   foreach_session_queue_error
719 #undef _
720 };
721
722 enum
723 {
724   SESSION_TX_NO_BUFFERS = -2,
725   SESSION_TX_NO_DATA,
726   SESSION_TX_OK
727 };
728
729 static void
730 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
731                         u32 next_index, u32 * to_next, u16 n_segs,
732                         session_t * s, u32 n_trace)
733 {
734   while (n_trace && n_segs)
735     {
736       vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
737       if (PREDICT_TRUE
738           (vlib_trace_buffer
739            (vm, node, next_index, b, 1 /* follow_chain */ )))
740         {
741           session_queue_trace_t *t =
742             vlib_add_trace (vm, node, b, sizeof (*t));
743           t->session_index = s->session_index;
744           t->server_thread_index = s->thread_index;
745           n_trace--;
746         }
747       to_next++;
748       n_segs--;
749     }
750   vlib_set_trace_count (vm, node, n_trace);
751 }
752
753 always_inline void
754 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
755                             vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
756 {
757   vlib_buffer_t *chain_b, *prev_b;
758   u32 chain_bi0, to_deq, left_from_seg;
759   u16 len_to_deq, n_bytes_read;
760   u8 *data, j;
761
762   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
763   b->total_length_not_including_first_buffer = 0;
764
765   chain_b = b;
766   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
767                             ctx->left_to_snd);
768   to_deq = left_from_seg;
769   for (j = 1; j < ctx->n_bufs_per_seg; j++)
770     {
771       prev_b = chain_b;
772       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
773
774       *n_bufs -= 1;
775       chain_bi0 = ctx->tx_buffers[*n_bufs];
776       chain_b = vlib_get_buffer (vm, chain_bi0);
777       chain_b->current_data = 0;
778       data = vlib_buffer_get_current (chain_b);
779       if (peek_data)
780         {
781           n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
782                                         ctx->sp.tx_offset, len_to_deq, data);
783           ctx->sp.tx_offset += n_bytes_read;
784         }
785       else
786         {
787           if (ctx->transport_vft->transport_options.tx_type ==
788               TRANSPORT_TX_DGRAM)
789             {
790               svm_fifo_t *f = ctx->s->tx_fifo;
791               session_dgram_hdr_t *hdr = &ctx->hdr;
792               u16 deq_now;
793               u32 offset;
794
795               deq_now = clib_min (hdr->data_length - hdr->data_offset,
796                                   len_to_deq);
797               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
798               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
799               ASSERT (n_bytes_read > 0);
800
801               hdr->data_offset += n_bytes_read;
802               if (hdr->data_offset == hdr->data_length)
803                 {
804                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
805                   svm_fifo_dequeue_drop (f, offset);
806                   if (ctx->left_to_snd > n_bytes_read)
807                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
808                                    (u8 *) & ctx->hdr);
809                 }
810               else if (ctx->left_to_snd == n_bytes_read)
811                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
812                                          sizeof (session_dgram_pre_hdr_t));
813             }
814           else
815             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
816                                              len_to_deq, data);
817         }
818       ASSERT (n_bytes_read == len_to_deq);
819       chain_b->current_length = n_bytes_read;
820       b->total_length_not_including_first_buffer += chain_b->current_length;
821
822       /* update previous buffer */
823       prev_b->next_buffer = chain_bi0;
824       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
825
826       /* update current buffer */
827       chain_b->next_buffer = 0;
828
829       to_deq -= n_bytes_read;
830       if (to_deq == 0)
831         break;
832     }
833   ASSERT (to_deq == 0
834           && b->total_length_not_including_first_buffer == left_from_seg);
835   ctx->left_to_snd -= left_from_seg;
836 }
837
838 always_inline void
839 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
840                         vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
841 {
842   u32 len_to_deq;
843   u8 *data0;
844   int n_bytes_read;
845
846   /*
847    * Start with the first buffer in chain
848    */
849   b->error = 0;
850   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
851   b->current_data = 0;
852
853   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
854   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
855
856   if (peek_data)
857     {
858       n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
859                                     len_to_deq, data0);
860       ASSERT (n_bytes_read > 0);
861       /* Keep track of progress locally, transport is also supposed to
862        * increment it independently when pushing the header */
863       ctx->sp.tx_offset += n_bytes_read;
864     }
865   else
866     {
867       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
868         {
869           session_dgram_hdr_t *hdr = &ctx->hdr;
870           svm_fifo_t *f = ctx->s->tx_fifo;
871           u16 deq_now;
872           u32 offset;
873
874           ASSERT (hdr->data_length > hdr->data_offset);
875           deq_now = clib_min (hdr->data_length - hdr->data_offset,
876                               len_to_deq);
877           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
878           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
879           ASSERT (n_bytes_read > 0);
880
881           if (ctx->s->session_state == SESSION_STATE_LISTENING)
882             {
883               ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
884               ctx->tc->rmt_port = hdr->rmt_port;
885             }
886           hdr->data_offset += n_bytes_read;
887           if (hdr->data_offset == hdr->data_length)
888             {
889               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
890               svm_fifo_dequeue_drop (f, offset);
891               if (ctx->left_to_snd > n_bytes_read)
892                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
893                                (u8 *) & ctx->hdr);
894             }
895           else if (ctx->left_to_snd == n_bytes_read)
896             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
897                                      sizeof (session_dgram_pre_hdr_t));
898         }
899       else
900         {
901           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
902                                            len_to_deq, data0);
903           ASSERT (n_bytes_read > 0);
904         }
905     }
906   b->current_length = n_bytes_read;
907   ctx->left_to_snd -= n_bytes_read;
908
909   /*
910    * Fill in the remaining buffers in the chain, if any
911    */
912   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
913     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
914 }
915
916 always_inline u8
917 session_tx_not_ready (session_t * s, u8 peek_data)
918 {
919   if (peek_data)
920     {
921       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
922         return 0;
923       /* Can retransmit for closed sessions but can't send new data if
924        * session is not ready or closed */
925       else if (s->session_state < SESSION_STATE_READY)
926         return 1;
927       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
928         {
929           /* Allow closed transports to still send custom packets.
930            * For instance, tcp may want to send acks in time-wait. */
931           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
932               && (s->flags & SESSION_F_CUSTOM_TX))
933             return 0;
934           return 2;
935         }
936     }
937   return 0;
938 }
939
940 always_inline transport_connection_t *
941 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
942 {
943   if (peek_data)
944     {
945       return ctx->transport_vft->get_connection (ctx->s->connection_index,
946                                                  ctx->s->thread_index);
947     }
948   else
949     {
950       if (ctx->s->session_state == SESSION_STATE_LISTENING)
951         return ctx->transport_vft->get_listener (ctx->s->connection_index);
952       else
953         {
954           return ctx->transport_vft->get_connection (ctx->s->connection_index,
955                                                      ctx->s->thread_index);
956         }
957     }
958 }
959
960 always_inline void
961 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
962                                u32 max_segs, u8 peek_data)
963 {
964   u32 n_bytes_per_buf, n_bytes_per_seg;
965
966   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
967   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
968
969   if (peek_data)
970     {
971       /* Offset in rx fifo from where to peek data */
972       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
973         {
974           ctx->max_len_to_snd = 0;
975           return;
976         }
977       ctx->max_dequeue -= ctx->sp.tx_offset;
978     }
979   else
980     {
981       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
982         {
983           u32 len, chain_limit;
984
985           if (ctx->max_dequeue <= sizeof (ctx->hdr))
986             {
987               ctx->max_len_to_snd = 0;
988               return;
989             }
990
991           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
992                          (u8 *) & ctx->hdr);
993           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
994           len = ctx->hdr.data_length - ctx->hdr.data_offset;
995
996           /* Process multiple dgrams if smaller than min (buf_space, mss).
997            * This avoids handling multiple dgrams if they require buffer
998            * chains */
999           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
1000                                   ctx->sp.snd_mss);
1001           if (ctx->hdr.data_length <= chain_limit)
1002             {
1003               u32 first_dgram_len, dgram_len, offset, max_offset;
1004               session_dgram_hdr_t hdr;
1005
1006               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
1007               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
1008               first_dgram_len = len;
1009               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
1010
1011               while (offset < max_offset)
1012                 {
1013                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
1014                                  (u8 *) & hdr);
1015                   ASSERT (hdr.data_length > hdr.data_offset);
1016                   dgram_len = hdr.data_length - hdr.data_offset;
1017                   if (len + dgram_len > ctx->max_dequeue
1018                       || first_dgram_len != dgram_len)
1019                     break;
1020                   len += dgram_len;
1021                   offset += sizeof (hdr) + hdr.data_length;
1022                 }
1023             }
1024
1025           ctx->max_dequeue = len;
1026         }
1027     }
1028   ASSERT (ctx->max_dequeue > 0);
1029
1030   /* Ensure we're not writing more than transport window allows */
1031   if (ctx->max_dequeue < ctx->sp.snd_space)
1032     {
1033       /* Constrained by tx queue. Try to send only fully formed segments */
1034       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
1035         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
1036         ctx->max_dequeue;
1037       /* TODO Nagle ? */
1038     }
1039   else
1040     {
1041       /* Expectation is that snd_space0 is already a multiple of snd_mss */
1042       ctx->max_len_to_snd = ctx->sp.snd_space;
1043     }
1044
1045   /* Check if we're tx constrained by the node */
1046   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
1047   if (ctx->n_segs_per_evt > max_segs)
1048     {
1049       ctx->n_segs_per_evt = max_segs;
1050       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
1051     }
1052
1053   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
1054   if (ctx->n_segs_per_evt > 1)
1055     {
1056       u32 n_bytes_last_seg, n_bufs_last_seg;
1057
1058       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
1059       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
1060         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
1061       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1062       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
1063       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
1064         + n_bufs_last_seg;
1065     }
1066   else
1067     {
1068       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
1069       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1070       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
1071     }
1072
1073   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
1074   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
1075                                      n_bytes_per_buf -
1076                                      TRANSPORT_MAX_HDRS_LEN);
1077 }
1078
1079 always_inline void
1080 session_tx_maybe_reschedule (session_worker_t * wrk,
1081                              session_tx_context_t * ctx,
1082                              session_evt_elt_t * elt)
1083 {
1084   session_t *s = ctx->s;
1085
1086   svm_fifo_unset_event (s->tx_fifo);
1087   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
1088     if (svm_fifo_set_event (s->tx_fifo))
1089       session_evt_add_head_old (wrk, elt);
1090 }
1091
1092 always_inline int
1093 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
1094                                 vlib_node_runtime_t * node,
1095                                 session_evt_elt_t * elt,
1096                                 int *n_tx_packets, u8 peek_data)
1097 {
1098   u32 n_trace, n_left, pbi, next_index, max_burst;
1099   session_tx_context_t *ctx = &wrk->ctx;
1100   session_main_t *smm = &session_main;
1101   session_event_t *e = &elt->evt;
1102   vlib_main_t *vm = wrk->vm;
1103   transport_proto_t tp;
1104   vlib_buffer_t *pb;
1105   u16 n_bufs, rv;
1106
1107   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
1108     {
1109       if (rv < 2)
1110         session_evt_add_old (wrk, elt);
1111       return SESSION_TX_NO_DATA;
1112     }
1113
1114   next_index = smm->session_type_to_next[ctx->s->session_type];
1115   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
1116
1117   tp = session_get_transport_proto (ctx->s);
1118   ctx->transport_vft = transport_protocol_get_vft (tp);
1119   ctx->tc = session_tx_get_transport (ctx, peek_data);
1120
1121   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
1122     {
1123       if (ctx->transport_vft->flush_data)
1124         ctx->transport_vft->flush_data (ctx->tc);
1125       e->event_type = SESSION_IO_EVT_TX;
1126     }
1127
1128   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
1129     {
1130       u32 n_custom_tx;
1131       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
1132       ctx->sp.max_burst_size = max_burst;
1133       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
1134       *n_tx_packets += n_custom_tx;
1135       if (PREDICT_FALSE
1136           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1137         return SESSION_TX_OK;
1138       max_burst -= n_custom_tx;
1139       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
1140         {
1141           session_evt_add_old (wrk, elt);
1142           return SESSION_TX_OK;
1143         }
1144     }
1145
1146   transport_connection_snd_params (ctx->tc, &ctx->sp);
1147
1148   if (!ctx->sp.snd_space)
1149     {
1150       /* If the deschedule flag was set, remove session from scheduler.
1151        * Transport is responsible for rescheduling this session. */
1152       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
1153         transport_connection_deschedule (ctx->tc);
1154       /* Request to postpone the session, e.g., zero-wnd and transport
1155        * is not currently probing */
1156       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
1157         session_evt_add_old (wrk, elt);
1158       /* This flow queue is "empty" so it should be re-evaluated before
1159        * the ones that have data to send. */
1160       else
1161         session_evt_add_head_old (wrk, elt);
1162
1163       return SESSION_TX_NO_DATA;
1164     }
1165
1166   if (transport_connection_is_tx_paced (ctx->tc))
1167     {
1168       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
1169       if (snd_space < TRANSPORT_PACER_MIN_BURST)
1170         {
1171           session_evt_add_head_old (wrk, elt);
1172           return SESSION_TX_NO_DATA;
1173         }
1174       snd_space = clib_min (ctx->sp.snd_space, snd_space);
1175       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
1176         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
1177     }
1178
1179   /* Check how much we can pull. */
1180   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
1181
1182   if (PREDICT_FALSE (!ctx->max_len_to_snd))
1183     {
1184       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
1185       session_tx_maybe_reschedule (wrk, ctx, elt);
1186       return SESSION_TX_NO_DATA;
1187     }
1188
1189   vec_validate_aligned (ctx->tx_buffers, ctx->n_bufs_needed - 1,
1190                         CLIB_CACHE_LINE_BYTES);
1191   n_bufs = vlib_buffer_alloc (vm, ctx->tx_buffers, ctx->n_bufs_needed);
1192   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
1193     {
1194       if (n_bufs)
1195         vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
1196       session_evt_add_head_old (wrk, elt);
1197       vlib_node_increment_counter (wrk->vm, node->node_index,
1198                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
1199       return SESSION_TX_NO_BUFFERS;
1200     }
1201
1202   if (transport_connection_is_tx_paced (ctx->tc))
1203     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
1204
1205   ctx->left_to_snd = ctx->max_len_to_snd;
1206   n_left = ctx->n_segs_per_evt;
1207
1208   while (n_left >= 4)
1209     {
1210       vlib_buffer_t *b0, *b1;
1211       u32 bi0, bi1;
1212
1213       pbi = ctx->tx_buffers[n_bufs - 3];
1214       pb = vlib_get_buffer (vm, pbi);
1215       vlib_prefetch_buffer_header (pb, STORE);
1216       pbi = ctx->tx_buffers[n_bufs - 4];
1217       pb = vlib_get_buffer (vm, pbi);
1218       vlib_prefetch_buffer_header (pb, STORE);
1219
1220       bi0 = ctx->tx_buffers[--n_bufs];
1221       bi1 = ctx->tx_buffers[--n_bufs];
1222
1223       b0 = vlib_get_buffer (vm, bi0);
1224       b1 = vlib_get_buffer (vm, bi1);
1225
1226       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1227       session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
1228
1229       ctx->transport_vft->push_header (ctx->tc, b0);
1230       ctx->transport_vft->push_header (ctx->tc, b1);
1231
1232       n_left -= 2;
1233
1234       vec_add1 (wrk->pending_tx_buffers, bi0);
1235       vec_add1 (wrk->pending_tx_buffers, bi1);
1236       vec_add1 (wrk->pending_tx_nexts, next_index);
1237       vec_add1 (wrk->pending_tx_nexts, next_index);
1238     }
1239   while (n_left)
1240     {
1241       vlib_buffer_t *b0;
1242       u32 bi0;
1243
1244       if (n_left > 1)
1245         {
1246           pbi = ctx->tx_buffers[n_bufs - 2];
1247           pb = vlib_get_buffer (vm, pbi);
1248           vlib_prefetch_buffer_header (pb, STORE);
1249         }
1250
1251       bi0 = ctx->tx_buffers[--n_bufs];
1252       b0 = vlib_get_buffer (vm, bi0);
1253       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1254
1255       /* Ask transport to push header after current_length and
1256        * total_length_not_including_first_buffer are updated */
1257       ctx->transport_vft->push_header (ctx->tc, b0);
1258
1259       n_left -= 1;
1260
1261       vec_add1 (wrk->pending_tx_buffers, bi0);
1262       vec_add1 (wrk->pending_tx_nexts, next_index);
1263     }
1264
1265   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
1266     session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers,
1267                             ctx->n_segs_per_evt, ctx->s, n_trace);
1268
1269   if (PREDICT_FALSE (n_bufs))
1270     vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
1271
1272   *n_tx_packets += ctx->n_segs_per_evt;
1273
1274   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
1275                ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
1276
1277   ASSERT (ctx->left_to_snd == 0);
1278
1279   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
1280    * check if application enqueued more data and reschedule accordingly */
1281   if (ctx->max_len_to_snd < ctx->max_dequeue)
1282     session_evt_add_old (wrk, elt);
1283   else
1284     session_tx_maybe_reschedule (wrk, ctx, elt);
1285
1286   if (!peek_data)
1287     {
1288       u32 n_dequeued = ctx->max_len_to_snd;
1289       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1290         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
1291       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
1292         session_dequeue_notify (ctx->s);
1293     }
1294   return SESSION_TX_OK;
1295 }
1296
1297 int
1298 session_tx_fifo_peek_and_snd (session_worker_t * wrk,
1299                               vlib_node_runtime_t * node,
1300                               session_evt_elt_t * e, int *n_tx_packets)
1301 {
1302   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
1303 }
1304
1305 int
1306 session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
1307                                  vlib_node_runtime_t * node,
1308                                  session_evt_elt_t * e, int *n_tx_packets)
1309 {
1310   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
1311 }
1312
1313 int
1314 session_tx_fifo_dequeue_internal (session_worker_t * wrk,
1315                                   vlib_node_runtime_t * node,
1316                                   session_evt_elt_t * elt, int *n_tx_packets)
1317 {
1318   transport_send_params_t *sp = &wrk->ctx.sp;
1319   session_t *s = wrk->ctx.s;
1320   u32 n_packets;
1321
1322   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1323     return 0;
1324
1325   /* Clear custom-tx flag used to request reschedule for tx */
1326   s->flags &= ~SESSION_F_CUSTOM_TX;
1327
1328   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
1329                                  TRANSPORT_PACER_MAX_BURST_PKTS);
1330
1331   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
1332   *n_tx_packets += n_packets;
1333
1334   if (s->flags & SESSION_F_CUSTOM_TX)
1335     {
1336       session_evt_add_old (wrk, elt);
1337     }
1338   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
1339     {
1340       svm_fifo_unset_event (s->tx_fifo);
1341       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1342         if (svm_fifo_set_event (s->tx_fifo))
1343           session_evt_add_head_old (wrk, elt);
1344     }
1345
1346   if (sp->max_burst_size &&
1347       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
1348     session_dequeue_notify (s);
1349
1350   return n_packets;
1351 }
1352
1353 always_inline session_t *
1354 session_event_get_session (session_worker_t * wrk, session_event_t * e)
1355 {
1356   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
1357     return 0;
1358
1359   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
1360   return pool_elt_at_index (wrk->sessions, e->session_index);
1361 }
1362
1363 always_inline void
1364 session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
1365 {
1366   clib_llist_index_t ei;
1367   void (*fp) (void *);
1368   session_event_t *e;
1369   session_t *s;
1370
1371   ei = clib_llist_entry_index (wrk->event_elts, elt);
1372   e = &elt->evt;
1373
1374   switch (e->event_type)
1375     {
1376     case SESSION_CTRL_EVT_RPC:
1377       fp = e->rpc_args.fp;
1378       (*fp) (e->rpc_args.arg);
1379       break;
1380     case SESSION_CTRL_EVT_HALF_CLOSE:
1381       s = session_get_from_handle_if_valid (e->session_handle);
1382       if (PREDICT_FALSE (!s))
1383         break;
1384       session_transport_half_close (s);
1385       break;
1386     case SESSION_CTRL_EVT_CLOSE:
1387       s = session_get_from_handle_if_valid (e->session_handle);
1388       if (PREDICT_FALSE (!s))
1389         break;
1390       session_transport_close (s);
1391       break;
1392     case SESSION_CTRL_EVT_RESET:
1393       s = session_get_from_handle_if_valid (e->session_handle);
1394       if (PREDICT_FALSE (!s))
1395         break;
1396       session_transport_reset (s);
1397       break;
1398     case SESSION_CTRL_EVT_LISTEN:
1399       session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
1400       break;
1401     case SESSION_CTRL_EVT_LISTEN_URI:
1402       session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
1403       break;
1404     case SESSION_CTRL_EVT_UNLISTEN:
1405       session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
1406       break;
1407     case SESSION_CTRL_EVT_CONNECT:
1408       session_mq_connect_handler (wrk, elt);
1409       break;
1410     case SESSION_CTRL_EVT_CONNECT_URI:
1411       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
1412       break;
1413     case SESSION_CTRL_EVT_SHUTDOWN:
1414       session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
1415       break;
1416     case SESSION_CTRL_EVT_DISCONNECT:
1417       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
1418       break;
1419     case SESSION_CTRL_EVT_DISCONNECTED:
1420       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
1421       break;
1422     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
1423       session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
1424       break;
1425     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
1426       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
1427                                                                     elt));
1428       break;
1429     case SESSION_CTRL_EVT_RESET_REPLY:
1430       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
1431       break;
1432     case SESSION_CTRL_EVT_WORKER_UPDATE:
1433       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
1434       break;
1435     case SESSION_CTRL_EVT_APP_DETACH:
1436       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
1437       break;
1438     case SESSION_CTRL_EVT_APP_WRK_RPC:
1439       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
1440       break;
1441     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
1442       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
1443       break;
1444     default:
1445       clib_warning ("unhandled event type %d", e->event_type);
1446     }
1447
1448   /* Regrab elements in case pool moved */
1449   elt = pool_elt_at_index (wrk->event_elts, ei);
1450   if (!clib_llist_elt_is_linked (elt, evt_list))
1451     {
1452       e = &elt->evt;
1453       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
1454         session_evt_ctrl_data_free (wrk, elt);
1455       session_evt_elt_free (wrk, elt);
1456     }
1457   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
1458 }
1459
1460 always_inline void
1461 session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
1462                            session_evt_elt_t * elt, int *n_tx_packets)
1463 {
1464   session_main_t *smm = &session_main;
1465   app_worker_t *app_wrk;
1466   clib_llist_index_t ei;
1467   session_event_t *e;
1468   session_t *s;
1469
1470   ei = clib_llist_entry_index (wrk->event_elts, elt);
1471   e = &elt->evt;
1472
1473   switch (e->event_type)
1474     {
1475     case SESSION_IO_EVT_TX_FLUSH:
1476     case SESSION_IO_EVT_TX:
1477       s = session_event_get_session (wrk, e);
1478       if (PREDICT_FALSE (!s))
1479         break;
1480       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
1481       wrk->ctx.s = s;
1482       /* Spray packets in per session type frames, since they go to
1483        * different nodes */
1484       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
1485       break;
1486     case SESSION_IO_EVT_RX:
1487       s = session_event_get_session (wrk, e);
1488       if (!s)
1489         break;
1490       transport_app_rx_evt (session_get_transport_proto (s),
1491                             s->connection_index, s->thread_index);
1492       break;
1493     case SESSION_IO_EVT_BUILTIN_RX:
1494       s = session_event_get_session (wrk, e);
1495       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
1496         break;
1497       svm_fifo_unset_event (s->rx_fifo);
1498       app_wrk = app_worker_get (s->app_wrk_index);
1499       app_worker_builtin_rx (app_wrk, s);
1500       break;
1501     case SESSION_IO_EVT_BUILTIN_TX:
1502       s = session_get_from_handle_if_valid (e->session_handle);
1503       wrk->ctx.s = s;
1504       if (PREDICT_TRUE (s != 0))
1505         session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
1506       break;
1507     default:
1508       clib_warning ("unhandled event type %d", e->event_type);
1509     }
1510
1511   SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
1512
1513   /* Regrab elements in case pool moved */
1514   elt = pool_elt_at_index (wrk->event_elts, ei);
1515   if (!clib_llist_elt_is_linked (elt, evt_list))
1516     session_evt_elt_free (wrk, elt);
1517 }
1518
1519 /* *INDENT-OFF* */
1520 static const u32 session_evt_msg_sizes[] = {
1521 #define _(symc, sym)                                                    \
1522   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
1523   foreach_session_ctrl_evt
1524 #undef _
1525 };
1526 /* *INDENT-ON* */
1527
1528 always_inline void
1529 session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
1530 {
1531   session_evt_elt_t *elt;
1532
1533   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
1534     {
1535       elt = session_evt_alloc_ctrl (wrk);
1536       if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
1537         {
1538           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
1539           elt->evt.event_type = evt->event_type;
1540           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
1541                             session_evt_msg_sizes[evt->event_type]);
1542         }
1543       else
1544         {
1545           /* Internal control events fit into io events footprint */
1546           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1547         }
1548     }
1549   else
1550     {
1551       elt = session_evt_alloc_new (wrk);
1552       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1553     }
1554 }
1555
1556 static void
1557 session_flush_pending_tx_buffers (session_worker_t * wrk,
1558                                   vlib_node_runtime_t * node)
1559 {
1560   vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers,
1561                                wrk->pending_tx_nexts,
1562                                vec_len (wrk->pending_tx_nexts));
1563   vec_reset_length (wrk->pending_tx_buffers);
1564   vec_reset_length (wrk->pending_tx_nexts);
1565 }
1566
1567 int
1568 session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
1569 {
1570   svm_msg_q_msg_t _msg, *msg = &_msg;
1571   u32 i, n_to_dequeue = 0;
1572   session_event_t *evt;
1573
1574   n_to_dequeue = svm_msg_q_size (mq);
1575   for (i = 0; i < n_to_dequeue; i++)
1576     {
1577       svm_msg_q_sub_raw (mq, msg);
1578       evt = svm_msg_q_msg_data (mq, msg);
1579       session_evt_add_to_list (wrk, evt);
1580       svm_msg_q_free_msg (mq, msg);
1581     }
1582
1583   return n_to_dequeue;
1584 }
1585
1586 static void
1587 session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
1588 {
1589   struct itimerspec its;
1590
1591   its.it_value.tv_sec = 0;
1592   its.it_value.tv_nsec = time_ns;
1593   its.it_interval.tv_sec = 0;
1594   its.it_interval.tv_nsec = its.it_value.tv_nsec;
1595
1596   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
1597     clib_warning ("timerfd_settime");
1598 }
1599
1600 always_inline u64
1601 session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
1602 {
1603   if (state == SESSION_WRK_INTERRUPT)
1604     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
1605   else if (state == SESSION_WRK_IDLE)
1606     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
1607   else
1608     return 0;
1609 }
1610
1611 static inline void
1612 session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
1613 {
1614   u64 time_ns;
1615
1616   wrk->state = state;
1617   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
1618   session_wrk_timerfd_update (wrk, time_ns);
1619 }
1620
1621 static void
1622 session_wrk_update_state (session_worker_t *wrk)
1623 {
1624   vlib_main_t *vm = wrk->vm;
1625
1626   if (wrk->state == SESSION_WRK_POLLING)
1627     {
1628       if (pool_elts (wrk->event_elts) == 3 &&
1629           vlib_last_vectors_per_main_loop (vm) < 1)
1630         {
1631           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1632           vlib_node_set_state (vm, session_queue_node.index,
1633                                VLIB_NODE_STATE_INTERRUPT);
1634         }
1635     }
1636   else if (wrk->state == SESSION_WRK_INTERRUPT)
1637     {
1638       if (pool_elts (wrk->event_elts) > 3 ||
1639           vlib_last_vectors_per_main_loop (vm) > 1)
1640         {
1641           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
1642           vlib_node_set_state (vm, session_queue_node.index,
1643                                VLIB_NODE_STATE_POLLING);
1644         }
1645       else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
1646         {
1647           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
1648         }
1649     }
1650   else
1651     {
1652       if (pool_elts (wrk->event_elts))
1653         {
1654           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1655         }
1656     }
1657 }
1658
1659 static uword
1660 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
1661                        vlib_frame_t * frame)
1662 {
1663   u32 thread_index = vm->thread_index, __clib_unused n_evts;
1664   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
1665   session_main_t *smm = vnet_get_session_main ();
1666   session_worker_t *wrk = &smm->wrk[thread_index];
1667   clib_llist_index_t ei, next_ei, old_ti;
1668   int n_tx_packets;
1669
1670   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
1671
1672   session_wrk_update_time (wrk, vlib_time_now (vm));
1673
1674   /*
1675    *  Update transport time
1676    */
1677   transport_update_time (wrk->last_vlib_time, thread_index);
1678   n_tx_packets = vec_len (wrk->pending_tx_buffers);
1679   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
1680
1681   /*
1682    *  Dequeue new internal mq events
1683    */
1684
1685   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
1686   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
1687
1688   /*
1689    * Handle control events
1690    */
1691
1692   ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
1693
1694   clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
1695     clib_llist_remove (wrk->event_elts, evt_list, elt);
1696     session_event_dispatch_ctrl (wrk, elt);
1697   }));
1698
1699   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
1700
1701   /*
1702    * Handle the new io events.
1703    */
1704
1705   new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
1706   old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
1707   old_ti = clib_llist_prev_index (old_he, evt_list);
1708
1709   ei = clib_llist_next_index (new_he, evt_list);
1710   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
1711     {
1712       elt = pool_elt_at_index (wrk->event_elts, ei);
1713       ei = clib_llist_next_index (elt, evt_list);
1714       clib_llist_remove (wrk->event_elts, evt_list, elt);
1715       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1716     }
1717
1718   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
1719
1720   /*
1721    * Handle the old io events, if we had any prior to processing the new ones
1722    */
1723
1724   if (old_ti != wrk->old_head)
1725     {
1726       old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
1727       ei = clib_llist_next_index (old_he, evt_list);
1728
1729       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
1730         {
1731           elt = pool_elt_at_index (wrk->event_elts, ei);
1732           next_ei = clib_llist_next_index (elt, evt_list);
1733           clib_llist_remove (wrk->event_elts, evt_list, elt);
1734
1735           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1736
1737           if (ei == old_ti)
1738             break;
1739
1740           ei = next_ei;
1741         };
1742     }
1743
1744   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
1745
1746   if (vec_len (wrk->pending_tx_buffers))
1747     session_flush_pending_tx_buffers (wrk, node);
1748
1749   vlib_node_increment_counter (vm, session_queue_node.index,
1750                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
1751
1752   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
1753
1754   if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
1755     session_wrk_update_state (wrk);
1756
1757   return n_tx_packets;
1758 }
1759
1760 /* *INDENT-OFF* */
1761 VLIB_REGISTER_NODE (session_queue_node) =
1762 {
1763   .function = session_queue_node_fn,
1764   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
1765   .name = "session-queue",
1766   .format_trace = format_session_queue_trace,
1767   .type = VLIB_NODE_TYPE_INPUT,
1768   .n_errors = ARRAY_LEN (session_queue_error_strings),
1769   .error_strings = session_queue_error_strings,
1770   .state = VLIB_NODE_STATE_DISABLED,
1771 };
1772 /* *INDENT-ON* */
1773
1774 static clib_error_t *
1775 session_wrk_tfd_read_ready (clib_file_t *cf)
1776 {
1777   session_worker_t *wrk = session_main_get_worker (cf->private_data);
1778   u64 buf;
1779   int rv;
1780
1781   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
1782   rv = read (wrk->timerfd, &buf, sizeof (buf));
1783   if (rv < 0 && errno != EAGAIN)
1784     clib_unix_warning ("failed");
1785   return 0;
1786 }
1787
1788 static clib_error_t *
1789 session_wrk_tfd_write_ready (clib_file_t *cf)
1790 {
1791   return 0;
1792 }
1793
1794 void
1795 session_wrk_enable_adaptive_mode (session_worker_t *wrk)
1796 {
1797   u32 thread_index = wrk->vm->thread_index;
1798   clib_file_t template = { 0 };
1799
1800   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
1801     clib_warning ("timerfd_create");
1802
1803   template.read_function = session_wrk_tfd_read_ready;
1804   template.write_function = session_wrk_tfd_write_ready;
1805   template.file_descriptor = wrk->timerfd;
1806   template.private_data = thread_index;
1807   template.polling_thread_index = thread_index;
1808   template.description = format (0, "session-wrk-tfd-%u", thread_index);
1809
1810   wrk->timerfd_file = clib_file_add (&file_main, &template);
1811   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
1812 }
1813
1814 static clib_error_t *
1815 session_queue_exit (vlib_main_t * vm)
1816 {
1817   if (vlib_get_n_threads () < 2)
1818     return 0;
1819
1820   /*
1821    * Shut off (especially) worker-thread session nodes.
1822    * Otherwise, vpp can crash as the main thread unmaps the
1823    * API segment.
1824    */
1825   vlib_worker_thread_barrier_sync (vm);
1826   session_node_enable_disable (0 /* is_enable */ );
1827   vlib_worker_thread_barrier_release (vm);
1828   return 0;
1829 }
1830
1831 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1832
1833 static uword
1834 session_queue_run_on_main (vlib_main_t * vm)
1835 {
1836   vlib_node_runtime_t *node;
1837
1838   node = vlib_node_get_runtime (vm, session_queue_node.index);
1839   return session_queue_node_fn (vm, node, 0);
1840 }
1841
1842 static uword
1843 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1844                        vlib_frame_t * f)
1845 {
1846   uword *event_data = 0;
1847   f64 timeout = 1.0;
1848   uword event_type;
1849
1850   while (1)
1851     {
1852       vlib_process_wait_for_event_or_clock (vm, timeout);
1853       event_type = vlib_process_get_events (vm, (uword **) & event_data);
1854
1855       switch (event_type)
1856         {
1857         case SESSION_Q_PROCESS_RUN_ON_MAIN:
1858           /* Run session queue node on main thread */
1859           session_queue_run_on_main (vm);
1860           break;
1861         case SESSION_Q_PROCESS_STOP:
1862           vlib_node_set_state (vm, session_queue_process_node.index,
1863                                VLIB_NODE_STATE_DISABLED);
1864           timeout = 100000.0;
1865           break;
1866         case ~0:
1867           /* Timed out. Run on main to ensure all events are handled */
1868           session_queue_run_on_main (vm);
1869           break;
1870         }
1871       vec_reset_length (event_data);
1872     }
1873   return 0;
1874 }
1875
1876 /* *INDENT-OFF* */
1877 VLIB_REGISTER_NODE (session_queue_process_node) =
1878 {
1879   .function = session_queue_process,
1880   .type = VLIB_NODE_TYPE_PROCESS,
1881   .name = "session-queue-process",
1882   .state = VLIB_NODE_STATE_DISABLED,
1883 };
1884 /* *INDENT-ON* */
1885
1886 static_always_inline uword
1887 session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
1888                                 vlib_frame_t * frame)
1889 {
1890   session_main_t *sm = &session_main;
1891   if (!sm->wrk[0].vpp_event_queue)
1892     return 0;
1893   node = vlib_node_get_runtime (vm, session_queue_node.index);
1894   return session_queue_node_fn (vm, node, frame);
1895 }
1896
1897 /* *INDENT-OFF* */
1898 VLIB_REGISTER_NODE (session_queue_pre_input_node) =
1899 {
1900   .function = session_queue_pre_input_inline,
1901   .type = VLIB_NODE_TYPE_PRE_INPUT,
1902   .name = "session-queue-main",
1903   .state = VLIB_NODE_STATE_DISABLED,
1904 };
1905 /* *INDENT-ON* */
1906
1907 /*
1908  * fd.io coding-style-patch-verification: ON
1909  *
1910  * Local Variables:
1911  * eval: (c-set-style "gnu")
1912  * End:
1913  */