session: rpc for connects to main
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/application_local.h>
25 #include <vnet/session/session_debug.h>
26 #include <svm/queue.h>
27 #include <sys/timerfd.h>
28
29 #define app_check_thread_and_barrier(_fn, _arg)                         \
30   if (!vlib_thread_is_main_w_barrier ())                                \
31     {                                                                   \
32      vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));       \
33       return;                                                           \
34    }
35
36 static transport_endpt_ext_cfg_t *
37 session_mq_get_ext_config (application_t *app, uword offset)
38 {
39   svm_fifo_chunk_t *c;
40   fifo_segment_t *fs;
41
42   fs = application_get_rx_mqs_segment (app);
43   c = fs_chunk_ptr (fs->h, offset);
44   return (transport_endpt_ext_cfg_t *) c->data;
45 }
46
47 static void
48 session_mq_free_ext_config (application_t *app, uword offset)
49 {
50   svm_fifo_chunk_t *c;
51   fifo_segment_t *fs;
52
53   fs = application_get_rx_mqs_segment (app);
54   c = fs_chunk_ptr (fs->h, offset);
55   fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
56 }
57
58 static void
59 session_mq_listen_handler (void *data)
60 {
61   session_listen_msg_t *mp = (session_listen_msg_t *) data;
62   vnet_listen_args_t _a, *a = &_a;
63   app_worker_t *app_wrk;
64   application_t *app;
65   int rv;
66
67   app_check_thread_and_barrier (session_mq_listen_handler, mp);
68
69   app = application_lookup (mp->client_index);
70   if (!app)
71     return;
72
73   clib_memset (a, 0, sizeof (*a));
74   a->sep.is_ip4 = mp->is_ip4;
75   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
76   a->sep.port = mp->port;
77   a->sep.fib_index = mp->vrf;
78   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
79   a->sep.transport_proto = mp->proto;
80   a->app_index = app->app_index;
81   a->wrk_map_index = mp->wrk_index;
82   a->sep_ext.transport_flags = mp->flags;
83
84   if (mp->ext_config)
85     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
86
87   if ((rv = vnet_listen (a)))
88     clib_warning ("listen returned: %U", format_session_error, rv);
89
90   app_wrk = application_get_worker (app, mp->wrk_index);
91   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
92
93   if (mp->ext_config)
94     session_mq_free_ext_config (app, mp->ext_config);
95 }
96
97 static void
98 session_mq_listen_uri_handler (void *data)
99 {
100   session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
101   vnet_listen_args_t _a, *a = &_a;
102   app_worker_t *app_wrk;
103   application_t *app;
104   int rv;
105
106   app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
107
108   app = application_lookup (mp->client_index);
109   if (!app)
110     return;
111
112   clib_memset (a, 0, sizeof (*a));
113   a->uri = (char *) mp->uri;
114   a->app_index = app->app_index;
115   rv = vnet_bind_uri (a);
116
117   app_wrk = application_get_worker (app, 0);
118   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
119 }
120
121 static void
122 session_mq_connect_one (session_connect_msg_t *mp)
123 {
124   vnet_connect_args_t _a, *a = &_a;
125   app_worker_t *app_wrk;
126   application_t *app;
127   int rv;
128
129   app = application_lookup (mp->client_index);
130   if (!app)
131     return;
132
133   clib_memset (a, 0, sizeof (*a));
134   a->sep.is_ip4 = mp->is_ip4;
135   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
136   a->sep.port = mp->port;
137   a->sep.transport_proto = mp->proto;
138   a->sep.peer.fib_index = mp->vrf;
139   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
140   if (mp->is_ip4)
141     {
142       ip46_address_mask_ip4 (&a->sep.ip);
143       ip46_address_mask_ip4 (&a->sep.peer.ip);
144     }
145   a->sep.peer.port = mp->lcl_port;
146   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
147   a->sep_ext.parent_handle = mp->parent_handle;
148   a->sep_ext.transport_flags = mp->flags;
149   a->api_context = mp->context;
150   a->app_index = app->app_index;
151   a->wrk_map_index = mp->wrk_index;
152
153   if (mp->ext_config)
154     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
155
156   if ((rv = vnet_connect (a)))
157     {
158       clib_warning ("connect returned: %U", format_session_error, rv);
159       app_wrk = application_get_worker (app, mp->wrk_index);
160       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
161     }
162
163   if (mp->ext_config)
164     session_mq_free_ext_config (app, mp->ext_config);
165 }
166
167 static void
168 session_mq_handle_connects_rpc (void *arg)
169 {
170   u32 max_connects = 32, n_connects = 0;
171   vlib_main_t *vm = vlib_get_main ();
172   session_evt_elt_t *he, *elt, *next;
173   session_worker_t *fwrk;
174   u8 need_reschedule = 1;
175
176   ASSERT (vlib_get_thread_index () == 0);
177
178   /* Pending connects on linked list pertaining to first worker */
179   fwrk = session_main_get_worker (1);
180
181   vlib_worker_thread_barrier_sync (vm);
182
183   he = pool_elt_at_index (fwrk->event_elts, fwrk->pending_connects);
184   elt = clib_llist_next (fwrk->event_elts, evt_list, he);
185
186   /* Avoid holding the barrier for too long */
187   while (n_connects < max_connects && elt != he)
188     {
189       next = clib_llist_next (fwrk->event_elts, evt_list, elt);
190       clib_llist_remove (fwrk->event_elts, evt_list, elt);
191       session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
192       session_evt_elt_free (fwrk, elt);
193       elt = next;
194       n_connects += 1;
195     }
196
197   if (clib_llist_is_empty (fwrk->event_elts, evt_list, he))
198     {
199       fwrk->pending_connects_ntf = 0;
200       need_reschedule = 0;
201     }
202
203   vlib_worker_thread_barrier_release (vm);
204
205   if (need_reschedule)
206     {
207       vlib_node_set_interrupt_pending (vm, session_queue_node.index);
208       elt = session_evt_alloc_ctrl (session_main_get_worker (0));
209       elt->evt.event_type = SESSION_CTRL_EVT_RPC;
210       elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
211     }
212 }
213
214 static void
215 session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
216 {
217   u32 thread_index = wrk - session_main.wrk;
218   session_evt_elt_t *he;
219
220   /* No workers, so just deal with the connect now */
221   if (PREDICT_FALSE (!thread_index))
222     {
223       session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
224       return;
225     }
226
227   if (PREDICT_FALSE (thread_index != 1))
228     {
229       clib_warning ("Connect on wrong thread. Dropping");
230       return;
231     }
232
233   /* Add to pending list to be handled by main thread */
234   he = pool_elt_at_index (wrk->event_elts, wrk->pending_connects);
235   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
236
237   if (!wrk->pending_connects_ntf)
238     {
239       vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
240                                        session_queue_node.index);
241       session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
242       wrk->pending_connects_ntf = 1;
243     }
244 }
245
246 static void
247 session_mq_connect_uri_handler (void *data)
248 {
249   session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
250   vnet_connect_args_t _a, *a = &_a;
251   app_worker_t *app_wrk;
252   application_t *app;
253   int rv;
254
255   app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
256
257   app = application_lookup (mp->client_index);
258   if (!app)
259     return;
260
261   clib_memset (a, 0, sizeof (*a));
262   a->uri = (char *) mp->uri;
263   a->api_context = mp->context;
264   a->app_index = app->app_index;
265   if ((rv = vnet_connect_uri (a)))
266     {
267       clib_warning ("connect_uri returned: %d", rv);
268       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
269       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
270     }
271 }
272
273 static void
274 session_mq_shutdown_handler (void *data)
275 {
276   session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
277   vnet_shutdown_args_t _a, *a = &_a;
278   application_t *app;
279
280   app = application_lookup (mp->client_index);
281   if (!app)
282     return;
283
284   a->app_index = app->app_index;
285   a->handle = mp->handle;
286   vnet_shutdown_session (a);
287 }
288
289 static void
290 session_mq_disconnect_handler (void *data)
291 {
292   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
293   vnet_disconnect_args_t _a, *a = &_a;
294   application_t *app;
295
296   app = application_lookup (mp->client_index);
297   if (!app)
298     return;
299
300   a->app_index = app->app_index;
301   a->handle = mp->handle;
302   vnet_disconnect_session (a);
303 }
304
305 static void
306 app_mq_detach_handler (void *data)
307 {
308   session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
309   vnet_app_detach_args_t _a, *a = &_a;
310   application_t *app;
311
312   app_check_thread_and_barrier (app_mq_detach_handler, mp);
313
314   app = application_lookup (mp->client_index);
315   if (!app)
316     return;
317
318   a->app_index = app->app_index;
319   a->api_client_index = mp->client_index;
320   vnet_application_detach (a);
321 }
322
323 static void
324 session_mq_unlisten_handler (void *data)
325 {
326   session_unlisten_msg_t *mp = (session_unlisten_msg_t *) data;
327   vnet_unlisten_args_t _a, *a = &_a;
328   app_worker_t *app_wrk;
329   application_t *app;
330   int rv;
331
332   app_check_thread_and_barrier (session_mq_unlisten_handler, mp);
333
334   app = application_lookup (mp->client_index);
335   if (!app)
336     return;
337
338   clib_memset (a, 0, sizeof (*a));
339   a->app_index = app->app_index;
340   a->handle = mp->handle;
341   a->wrk_map_index = mp->wrk_index;
342   if ((rv = vnet_unlisten (a)))
343     clib_warning ("unlisten returned: %d", rv);
344
345   app_wrk = application_get_worker (app, a->wrk_map_index);
346   if (!app_wrk)
347     return;
348
349   mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
350 }
351
352 static void
353 session_mq_accepted_reply_handler (void *data)
354 {
355   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
356   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
357   session_state_t old_state;
358   app_worker_t *app_wrk;
359   session_t *s;
360
361   /* Server isn't interested, kill the session */
362   if (mp->retval)
363     {
364       a->app_index = mp->context;
365       a->handle = mp->handle;
366       vnet_disconnect_session (a);
367       return;
368     }
369
370   /* Mail this back from the main thread. We're not polling in main
371    * thread so we're using other workers for notifications. */
372   if (vlib_num_workers () && vlib_get_thread_index () != 0
373       && session_thread_from_handle (mp->handle) == 0)
374     {
375       vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
376                                  (u8 *) mp, sizeof (*mp));
377       return;
378     }
379
380   s = session_get_from_handle_if_valid (mp->handle);
381   if (!s)
382     return;
383
384   app_wrk = app_worker_get (s->app_wrk_index);
385   if (app_wrk->app_index != mp->context)
386     {
387       clib_warning ("app doesn't own session");
388       return;
389     }
390
391   if (!session_has_transport (s))
392     {
393       s->session_state = SESSION_STATE_READY;
394       if (ct_session_connect_notify (s))
395         return;
396     }
397   else
398     {
399       old_state = s->session_state;
400       s->session_state = SESSION_STATE_READY;
401
402       if (!svm_fifo_is_empty_prod (s->rx_fifo))
403         app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
404
405       /* Closed while waiting for app to reply. Resend disconnect */
406       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
407         {
408           app_worker_close_notify (app_wrk, s);
409           s->session_state = old_state;
410           return;
411         }
412     }
413 }
414
415 static void
416 session_mq_reset_reply_handler (void *data)
417 {
418   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
419   session_reset_reply_msg_t *mp;
420   app_worker_t *app_wrk;
421   session_t *s;
422   application_t *app;
423   u32 index, thread_index;
424
425   mp = (session_reset_reply_msg_t *) data;
426   app = application_lookup (mp->context);
427   if (!app)
428     return;
429
430   session_parse_handle (mp->handle, &index, &thread_index);
431   s = session_get_if_valid (index, thread_index);
432
433   /* No session or not the right session */
434   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
435     return;
436
437   app_wrk = app_worker_get (s->app_wrk_index);
438   if (!app_wrk || app_wrk->app_index != app->app_index)
439     {
440       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
441                     mp->handle);
442       return;
443     }
444
445   /* Client objected to resetting the session, log and continue */
446   if (mp->retval)
447     {
448       clib_warning ("client retval %d", mp->retval);
449       return;
450     }
451
452   /* This comes as a response to a reset, transport only waiting for
453    * confirmation to remove connection state, no need to disconnect */
454   a->handle = mp->handle;
455   a->app_index = app->app_index;
456   vnet_disconnect_session (a);
457 }
458
459 static void
460 session_mq_disconnected_handler (void *data)
461 {
462   session_disconnected_reply_msg_t *rmp;
463   vnet_disconnect_args_t _a, *a = &_a;
464   svm_msg_q_msg_t _msg, *msg = &_msg;
465   session_disconnected_msg_t *mp;
466   app_worker_t *app_wrk;
467   session_event_t *evt;
468   session_t *s;
469   application_t *app;
470   int rv = 0;
471
472   mp = (session_disconnected_msg_t *) data;
473   if (!(s = session_get_from_handle_if_valid (mp->handle)))
474     {
475       clib_warning ("could not disconnect handle %llu", mp->handle);
476       return;
477     }
478   app_wrk = app_worker_get (s->app_wrk_index);
479   app = application_lookup (mp->client_index);
480   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
481     {
482       clib_warning ("could not disconnect session: %llu app: %u",
483                     mp->handle, mp->client_index);
484       return;
485     }
486
487   a->handle = mp->handle;
488   a->app_index = app_wrk->wrk_index;
489   rv = vnet_disconnect_session (a);
490
491   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
492                                        SESSION_MQ_CTRL_EVT_RING,
493                                        SVM_Q_WAIT, msg);
494   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
495   clib_memset (evt, 0, sizeof (*evt));
496   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
497   rmp = (session_disconnected_reply_msg_t *) evt->data;
498   rmp->handle = mp->handle;
499   rmp->context = mp->context;
500   rmp->retval = rv;
501   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
502 }
503
504 static void
505 session_mq_disconnected_reply_handler (void *data)
506 {
507   session_disconnected_reply_msg_t *mp;
508   vnet_disconnect_args_t _a, *a = &_a;
509   application_t *app;
510
511   mp = (session_disconnected_reply_msg_t *) data;
512
513   /* Client objected to disconnecting the session, log and continue */
514   if (mp->retval)
515     {
516       clib_warning ("client retval %d", mp->retval);
517       return;
518     }
519
520   /* Disconnect has been confirmed. Confirm close to transport */
521   app = application_lookup (mp->context);
522   if (app)
523     {
524       a->handle = mp->handle;
525       a->app_index = app->app_index;
526       vnet_disconnect_session (a);
527     }
528 }
529
530 static void
531 session_mq_worker_update_handler (void *data)
532 {
533   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
534   session_worker_update_reply_msg_t *rmp;
535   svm_msg_q_msg_t _msg, *msg = &_msg;
536   app_worker_t *app_wrk;
537   u32 owner_app_wrk_map;
538   session_event_t *evt;
539   session_t *s;
540   application_t *app;
541
542   app = application_lookup (mp->client_index);
543   if (!app)
544     return;
545   if (!(s = session_get_from_handle_if_valid (mp->handle)))
546     {
547       clib_warning ("invalid handle %llu", mp->handle);
548       return;
549     }
550   app_wrk = app_worker_get (s->app_wrk_index);
551   if (app_wrk->app_index != app->app_index)
552     {
553       clib_warning ("app %u does not own session %llu", app->app_index,
554                     mp->handle);
555       return;
556     }
557   owner_app_wrk_map = app_wrk->wrk_map_index;
558   app_wrk = application_get_worker (app, mp->wrk_index);
559
560   /* This needs to come from the new owner */
561   if (mp->req_wrk_index == owner_app_wrk_map)
562     {
563       session_req_worker_update_msg_t *wump;
564
565       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
566                                            SESSION_MQ_CTRL_EVT_RING,
567                                            SVM_Q_WAIT, msg);
568       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
569       clib_memset (evt, 0, sizeof (*evt));
570       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
571       wump = (session_req_worker_update_msg_t *) evt->data;
572       wump->session_handle = mp->handle;
573       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
574       return;
575     }
576
577   app_worker_own_session (app_wrk, s);
578
579   /*
580    * Send reply
581    */
582   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
583                                        SESSION_MQ_CTRL_EVT_RING,
584                                        SVM_Q_WAIT, msg);
585   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
586   clib_memset (evt, 0, sizeof (*evt));
587   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
588   rmp = (session_worker_update_reply_msg_t *) evt->data;
589   rmp->handle = mp->handle;
590   if (s->rx_fifo)
591     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
592   if (s->tx_fifo)
593     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
594   rmp->segment_handle = session_segment_handle (s);
595   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
596
597   /*
598    * Retransmit messages that may have been lost
599    */
600   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
601     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
602
603   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
604     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
605
606   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
607     app_worker_close_notify (app_wrk, s);
608 }
609
610 static void
611 session_mq_app_wrk_rpc_handler (void *data)
612 {
613   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
614   svm_msg_q_msg_t _msg, *msg = &_msg;
615   session_app_wrk_rpc_msg_t *rmp;
616   app_worker_t *app_wrk;
617   session_event_t *evt;
618   application_t *app;
619
620   app = application_lookup (mp->client_index);
621   if (!app)
622     return;
623
624   app_wrk = application_get_worker (app, mp->wrk_index);
625
626   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
627                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
628                                        msg);
629   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
630   clib_memset (evt, 0, sizeof (*evt));
631   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
632   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
633   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
634   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
635 }
636
637 static void
638 session_mq_transport_attr_handler (void *data)
639 {
640   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
641   session_transport_attr_reply_msg_t *rmp;
642   svm_msg_q_msg_t _msg, *msg = &_msg;
643   app_worker_t *app_wrk;
644   session_event_t *evt;
645   application_t *app;
646   session_t *s;
647   int rv;
648
649   app = application_lookup (mp->client_index);
650   if (!app)
651     return;
652
653   if (!(s = session_get_from_handle_if_valid (mp->handle)))
654     {
655       clib_warning ("invalid handle %llu", mp->handle);
656       return;
657     }
658   app_wrk = app_worker_get (s->app_wrk_index);
659   if (app_wrk->app_index != app->app_index)
660     {
661       clib_warning ("app %u does not own session %llu", app->app_index,
662                     mp->handle);
663       return;
664     }
665
666   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
667
668   svm_msg_q_lock_and_alloc_msg_w_ring (
669     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
670   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
671   clib_memset (evt, 0, sizeof (*evt));
672   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
673   rmp = (session_transport_attr_reply_msg_t *) evt->data;
674   rmp->handle = mp->handle;
675   rmp->retval = rv;
676   rmp->is_get = mp->is_get;
677   if (!rv && mp->is_get)
678     rmp->attr = mp->attr;
679   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
680 }
681
682 vlib_node_registration_t session_queue_node;
683
684 typedef struct
685 {
686   u32 session_index;
687   u32 server_thread_index;
688 } session_queue_trace_t;
689
690 /* packet trace format function */
691 static u8 *
692 format_session_queue_trace (u8 * s, va_list * args)
693 {
694   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
695   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
696   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
697
698   s = format (s, "session index %d thread index %d",
699               t->session_index, t->server_thread_index);
700   return s;
701 }
702
703 #define foreach_session_queue_error             \
704 _(TX, "Packets transmitted")                    \
705 _(TIMER, "Timer events")                        \
706 _(NO_BUFFER, "Out of buffers")
707
708 typedef enum
709 {
710 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
711   foreach_session_queue_error
712 #undef _
713     SESSION_QUEUE_N_ERROR,
714 } session_queue_error_t;
715
716 static char *session_queue_error_strings[] = {
717 #define _(sym,string) string,
718   foreach_session_queue_error
719 #undef _
720 };
721
722 enum
723 {
724   SESSION_TX_NO_BUFFERS = -2,
725   SESSION_TX_NO_DATA,
726   SESSION_TX_OK
727 };
728
729 static void
730 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
731                         u32 next_index, u32 * to_next, u16 n_segs,
732                         session_t * s, u32 n_trace)
733 {
734   while (n_trace && n_segs)
735     {
736       vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
737       if (PREDICT_TRUE
738           (vlib_trace_buffer
739            (vm, node, next_index, b, 1 /* follow_chain */ )))
740         {
741           session_queue_trace_t *t =
742             vlib_add_trace (vm, node, b, sizeof (*t));
743           t->session_index = s->session_index;
744           t->server_thread_index = s->thread_index;
745           n_trace--;
746         }
747       to_next++;
748       n_segs--;
749     }
750   vlib_set_trace_count (vm, node, n_trace);
751 }
752
753 always_inline void
754 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
755                             vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
756 {
757   vlib_buffer_t *chain_b, *prev_b;
758   u32 chain_bi0, to_deq, left_from_seg;
759   session_worker_t *wrk;
760   u16 len_to_deq, n_bytes_read;
761   u8 *data, j;
762
763   wrk = session_main_get_worker (ctx->s->thread_index);
764   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
765   b->total_length_not_including_first_buffer = 0;
766
767   chain_b = b;
768   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
769                             ctx->left_to_snd);
770   to_deq = left_from_seg;
771   for (j = 1; j < ctx->n_bufs_per_seg; j++)
772     {
773       prev_b = chain_b;
774       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
775
776       *n_bufs -= 1;
777       chain_bi0 = wrk->tx_buffers[*n_bufs];
778       chain_b = vlib_get_buffer (vm, chain_bi0);
779       chain_b->current_data = 0;
780       data = vlib_buffer_get_current (chain_b);
781       if (peek_data)
782         {
783           n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
784                                         ctx->sp.tx_offset, len_to_deq, data);
785           ctx->sp.tx_offset += n_bytes_read;
786         }
787       else
788         {
789           if (ctx->transport_vft->transport_options.tx_type ==
790               TRANSPORT_TX_DGRAM)
791             {
792               svm_fifo_t *f = ctx->s->tx_fifo;
793               session_dgram_hdr_t *hdr = &ctx->hdr;
794               u16 deq_now;
795               u32 offset;
796
797               deq_now = clib_min (hdr->data_length - hdr->data_offset,
798                                   len_to_deq);
799               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
800               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
801               ASSERT (n_bytes_read > 0);
802
803               hdr->data_offset += n_bytes_read;
804               if (hdr->data_offset == hdr->data_length)
805                 {
806                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
807                   svm_fifo_dequeue_drop (f, offset);
808                   if (ctx->left_to_snd > n_bytes_read)
809                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
810                                    (u8 *) & ctx->hdr);
811                 }
812               else if (ctx->left_to_snd == n_bytes_read)
813                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
814                                          sizeof (session_dgram_pre_hdr_t));
815             }
816           else
817             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
818                                              len_to_deq, data);
819         }
820       ASSERT (n_bytes_read == len_to_deq);
821       chain_b->current_length = n_bytes_read;
822       b->total_length_not_including_first_buffer += chain_b->current_length;
823
824       /* update previous buffer */
825       prev_b->next_buffer = chain_bi0;
826       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
827
828       /* update current buffer */
829       chain_b->next_buffer = 0;
830
831       to_deq -= n_bytes_read;
832       if (to_deq == 0)
833         break;
834     }
835   ASSERT (to_deq == 0
836           && b->total_length_not_including_first_buffer == left_from_seg);
837   ctx->left_to_snd -= left_from_seg;
838 }
839
840 always_inline void
841 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
842                         vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
843 {
844   u32 len_to_deq;
845   u8 *data0;
846   int n_bytes_read;
847
848   /*
849    * Start with the first buffer in chain
850    */
851   b->error = 0;
852   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
853   b->current_data = 0;
854
855   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
856   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
857
858   if (peek_data)
859     {
860       n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
861                                     len_to_deq, data0);
862       ASSERT (n_bytes_read > 0);
863       /* Keep track of progress locally, transport is also supposed to
864        * increment it independently when pushing the header */
865       ctx->sp.tx_offset += n_bytes_read;
866     }
867   else
868     {
869       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
870         {
871           session_dgram_hdr_t *hdr = &ctx->hdr;
872           svm_fifo_t *f = ctx->s->tx_fifo;
873           u16 deq_now;
874           u32 offset;
875
876           ASSERT (hdr->data_length > hdr->data_offset);
877           deq_now = clib_min (hdr->data_length - hdr->data_offset,
878                               len_to_deq);
879           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
880           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
881           ASSERT (n_bytes_read > 0);
882
883           if (ctx->s->session_state == SESSION_STATE_LISTENING)
884             {
885               ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
886               ctx->tc->rmt_port = hdr->rmt_port;
887             }
888           hdr->data_offset += n_bytes_read;
889           if (hdr->data_offset == hdr->data_length)
890             {
891               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
892               svm_fifo_dequeue_drop (f, offset);
893               if (ctx->left_to_snd > n_bytes_read)
894                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
895                                (u8 *) & ctx->hdr);
896             }
897           else if (ctx->left_to_snd == n_bytes_read)
898             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
899                                      sizeof (session_dgram_pre_hdr_t));
900         }
901       else
902         {
903           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
904                                            len_to_deq, data0);
905           ASSERT (n_bytes_read > 0);
906         }
907     }
908   b->current_length = n_bytes_read;
909   ctx->left_to_snd -= n_bytes_read;
910
911   /*
912    * Fill in the remaining buffers in the chain, if any
913    */
914   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
915     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
916 }
917
918 always_inline u8
919 session_tx_not_ready (session_t * s, u8 peek_data)
920 {
921   if (peek_data)
922     {
923       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
924         return 0;
925       /* Can retransmit for closed sessions but can't send new data if
926        * session is not ready or closed */
927       else if (s->session_state < SESSION_STATE_READY)
928         return 1;
929       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
930         {
931           /* Allow closed transports to still send custom packets.
932            * For instance, tcp may want to send acks in time-wait. */
933           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
934               && (s->flags & SESSION_F_CUSTOM_TX))
935             return 0;
936           return 2;
937         }
938     }
939   return 0;
940 }
941
942 always_inline transport_connection_t *
943 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
944 {
945   if (peek_data)
946     {
947       return ctx->transport_vft->get_connection (ctx->s->connection_index,
948                                                  ctx->s->thread_index);
949     }
950   else
951     {
952       if (ctx->s->session_state == SESSION_STATE_LISTENING)
953         return ctx->transport_vft->get_listener (ctx->s->connection_index);
954       else
955         {
956           return ctx->transport_vft->get_connection (ctx->s->connection_index,
957                                                      ctx->s->thread_index);
958         }
959     }
960 }
961
962 always_inline void
963 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
964                                u32 max_segs, u8 peek_data)
965 {
966   u32 n_bytes_per_buf, n_bytes_per_seg;
967
968   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
969   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
970
971   if (peek_data)
972     {
973       /* Offset in rx fifo from where to peek data */
974       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
975         {
976           ctx->max_len_to_snd = 0;
977           return;
978         }
979       ctx->max_dequeue -= ctx->sp.tx_offset;
980     }
981   else
982     {
983       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
984         {
985           u32 len, chain_limit;
986
987           if (ctx->max_dequeue <= sizeof (ctx->hdr))
988             {
989               ctx->max_len_to_snd = 0;
990               return;
991             }
992
993           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
994                          (u8 *) & ctx->hdr);
995           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
996           len = ctx->hdr.data_length - ctx->hdr.data_offset;
997
998           /* Process multiple dgrams if smaller than min (buf_space, mss).
999            * This avoids handling multiple dgrams if they require buffer
1000            * chains */
1001           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
1002                                   ctx->sp.snd_mss);
1003           if (ctx->hdr.data_length <= chain_limit)
1004             {
1005               u32 first_dgram_len, dgram_len, offset, max_offset;
1006               session_dgram_hdr_t hdr;
1007
1008               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
1009               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
1010               first_dgram_len = len;
1011               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
1012
1013               while (offset < max_offset)
1014                 {
1015                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
1016                                  (u8 *) & hdr);
1017                   ASSERT (hdr.data_length > hdr.data_offset);
1018                   dgram_len = hdr.data_length - hdr.data_offset;
1019                   if (len + dgram_len > ctx->max_dequeue
1020                       || first_dgram_len != dgram_len)
1021                     break;
1022                   len += dgram_len;
1023                   offset += sizeof (hdr) + hdr.data_length;
1024                 }
1025             }
1026
1027           ctx->max_dequeue = len;
1028         }
1029     }
1030   ASSERT (ctx->max_dequeue > 0);
1031
1032   /* Ensure we're not writing more than transport window allows */
1033   if (ctx->max_dequeue < ctx->sp.snd_space)
1034     {
1035       /* Constrained by tx queue. Try to send only fully formed segments */
1036       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
1037         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
1038         ctx->max_dequeue;
1039       /* TODO Nagle ? */
1040     }
1041   else
1042     {
1043       /* Expectation is that snd_space0 is already a multiple of snd_mss */
1044       ctx->max_len_to_snd = ctx->sp.snd_space;
1045     }
1046
1047   /* Check if we're tx constrained by the node */
1048   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
1049   if (ctx->n_segs_per_evt > max_segs)
1050     {
1051       ctx->n_segs_per_evt = max_segs;
1052       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
1053     }
1054
1055   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
1056   if (ctx->n_segs_per_evt > 1)
1057     {
1058       u32 n_bytes_last_seg, n_bufs_last_seg;
1059
1060       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
1061       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
1062         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
1063       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1064       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
1065       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
1066         + n_bufs_last_seg;
1067     }
1068   else
1069     {
1070       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
1071       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1072       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
1073     }
1074
1075   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
1076   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
1077                                      n_bytes_per_buf -
1078                                      TRANSPORT_MAX_HDRS_LEN);
1079 }
1080
1081 always_inline void
1082 session_tx_maybe_reschedule (session_worker_t * wrk,
1083                              session_tx_context_t * ctx,
1084                              session_evt_elt_t * elt)
1085 {
1086   session_t *s = ctx->s;
1087
1088   svm_fifo_unset_event (s->tx_fifo);
1089   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
1090     if (svm_fifo_set_event (s->tx_fifo))
1091       session_evt_add_head_old (wrk, elt);
1092 }
1093
1094 always_inline int
1095 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
1096                                 vlib_node_runtime_t * node,
1097                                 session_evt_elt_t * elt,
1098                                 int *n_tx_packets, u8 peek_data)
1099 {
1100   u32 n_trace, n_left, pbi, next_index, max_burst;
1101   session_tx_context_t *ctx = &wrk->ctx;
1102   session_main_t *smm = &session_main;
1103   session_event_t *e = &elt->evt;
1104   vlib_main_t *vm = wrk->vm;
1105   transport_proto_t tp;
1106   vlib_buffer_t *pb;
1107   u16 n_bufs, rv;
1108
1109   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
1110     {
1111       if (rv < 2)
1112         session_evt_add_old (wrk, elt);
1113       return SESSION_TX_NO_DATA;
1114     }
1115
1116   next_index = smm->session_type_to_next[ctx->s->session_type];
1117   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
1118
1119   tp = session_get_transport_proto (ctx->s);
1120   ctx->transport_vft = transport_protocol_get_vft (tp);
1121   ctx->tc = session_tx_get_transport (ctx, peek_data);
1122
1123   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
1124     {
1125       if (ctx->transport_vft->flush_data)
1126         ctx->transport_vft->flush_data (ctx->tc);
1127       e->event_type = SESSION_IO_EVT_TX;
1128     }
1129
1130   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
1131     {
1132       u32 n_custom_tx;
1133       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
1134       ctx->sp.max_burst_size = max_burst;
1135       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
1136       *n_tx_packets += n_custom_tx;
1137       if (PREDICT_FALSE
1138           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1139         return SESSION_TX_OK;
1140       max_burst -= n_custom_tx;
1141       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
1142         {
1143           session_evt_add_old (wrk, elt);
1144           return SESSION_TX_OK;
1145         }
1146     }
1147
1148   transport_connection_snd_params (ctx->tc, &ctx->sp);
1149
1150   if (!ctx->sp.snd_space)
1151     {
1152       /* If the deschedule flag was set, remove session from scheduler.
1153        * Transport is responsible for rescheduling this session. */
1154       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
1155         transport_connection_deschedule (ctx->tc);
1156       /* Request to postpone the session, e.g., zero-wnd and transport
1157        * is not currently probing */
1158       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
1159         session_evt_add_old (wrk, elt);
1160       /* This flow queue is "empty" so it should be re-evaluated before
1161        * the ones that have data to send. */
1162       else
1163         session_evt_add_head_old (wrk, elt);
1164
1165       return SESSION_TX_NO_DATA;
1166     }
1167
1168   if (transport_connection_is_tx_paced (ctx->tc))
1169     {
1170       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
1171       if (snd_space < TRANSPORT_PACER_MIN_BURST)
1172         {
1173           session_evt_add_head_old (wrk, elt);
1174           return SESSION_TX_NO_DATA;
1175         }
1176       snd_space = clib_min (ctx->sp.snd_space, snd_space);
1177       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
1178         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
1179     }
1180
1181   /* Check how much we can pull. */
1182   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
1183
1184   if (PREDICT_FALSE (!ctx->max_len_to_snd))
1185     {
1186       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
1187       session_tx_maybe_reschedule (wrk, ctx, elt);
1188       return SESSION_TX_NO_DATA;
1189     }
1190
1191   vec_validate_aligned (wrk->tx_buffers, ctx->n_bufs_needed - 1,
1192                         CLIB_CACHE_LINE_BYTES);
1193   n_bufs = vlib_buffer_alloc (vm, wrk->tx_buffers, ctx->n_bufs_needed);
1194   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
1195     {
1196       if (n_bufs)
1197         vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
1198       session_evt_add_head_old (wrk, elt);
1199       vlib_node_increment_counter (wrk->vm, node->node_index,
1200                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
1201       return SESSION_TX_NO_BUFFERS;
1202     }
1203
1204   if (transport_connection_is_tx_paced (ctx->tc))
1205     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
1206
1207   ctx->left_to_snd = ctx->max_len_to_snd;
1208   n_left = ctx->n_segs_per_evt;
1209
1210   while (n_left >= 4)
1211     {
1212       vlib_buffer_t *b0, *b1;
1213       u32 bi0, bi1;
1214
1215       pbi = wrk->tx_buffers[n_bufs - 3];
1216       pb = vlib_get_buffer (vm, pbi);
1217       vlib_prefetch_buffer_header (pb, STORE);
1218       pbi = wrk->tx_buffers[n_bufs - 4];
1219       pb = vlib_get_buffer (vm, pbi);
1220       vlib_prefetch_buffer_header (pb, STORE);
1221
1222       bi0 = wrk->tx_buffers[--n_bufs];
1223       bi1 = wrk->tx_buffers[--n_bufs];
1224
1225       b0 = vlib_get_buffer (vm, bi0);
1226       b1 = vlib_get_buffer (vm, bi1);
1227
1228       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1229       session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
1230
1231       ctx->transport_vft->push_header (ctx->tc, b0);
1232       ctx->transport_vft->push_header (ctx->tc, b1);
1233
1234       n_left -= 2;
1235
1236       vec_add1 (wrk->pending_tx_buffers, bi0);
1237       vec_add1 (wrk->pending_tx_buffers, bi1);
1238       vec_add1 (wrk->pending_tx_nexts, next_index);
1239       vec_add1 (wrk->pending_tx_nexts, next_index);
1240     }
1241   while (n_left)
1242     {
1243       vlib_buffer_t *b0;
1244       u32 bi0;
1245
1246       if (n_left > 1)
1247         {
1248           pbi = wrk->tx_buffers[n_bufs - 2];
1249           pb = vlib_get_buffer (vm, pbi);
1250           vlib_prefetch_buffer_header (pb, STORE);
1251         }
1252
1253       bi0 = wrk->tx_buffers[--n_bufs];
1254       b0 = vlib_get_buffer (vm, bi0);
1255       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1256
1257       /* Ask transport to push header after current_length and
1258        * total_length_not_including_first_buffer are updated */
1259       ctx->transport_vft->push_header (ctx->tc, b0);
1260
1261       n_left -= 1;
1262
1263       vec_add1 (wrk->pending_tx_buffers, bi0);
1264       vec_add1 (wrk->pending_tx_nexts, next_index);
1265     }
1266
1267   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
1268     session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers,
1269                             ctx->n_segs_per_evt, ctx->s, n_trace);
1270
1271   if (PREDICT_FALSE (n_bufs))
1272     vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
1273
1274   *n_tx_packets += ctx->n_segs_per_evt;
1275
1276   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
1277                ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
1278
1279   ASSERT (ctx->left_to_snd == 0);
1280
1281   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
1282    * check if application enqueued more data and reschedule accordingly */
1283   if (ctx->max_len_to_snd < ctx->max_dequeue)
1284     session_evt_add_old (wrk, elt);
1285   else
1286     session_tx_maybe_reschedule (wrk, ctx, elt);
1287
1288   if (!peek_data)
1289     {
1290       u32 n_dequeued = ctx->max_len_to_snd;
1291       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1292         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
1293       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
1294         session_dequeue_notify (ctx->s);
1295     }
1296   return SESSION_TX_OK;
1297 }
1298
1299 int
1300 session_tx_fifo_peek_and_snd (session_worker_t * wrk,
1301                               vlib_node_runtime_t * node,
1302                               session_evt_elt_t * e, int *n_tx_packets)
1303 {
1304   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
1305 }
1306
1307 int
1308 session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
1309                                  vlib_node_runtime_t * node,
1310                                  session_evt_elt_t * e, int *n_tx_packets)
1311 {
1312   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
1313 }
1314
1315 int
1316 session_tx_fifo_dequeue_internal (session_worker_t * wrk,
1317                                   vlib_node_runtime_t * node,
1318                                   session_evt_elt_t * elt, int *n_tx_packets)
1319 {
1320   transport_send_params_t *sp = &wrk->ctx.sp;
1321   session_t *s = wrk->ctx.s;
1322   u32 n_packets;
1323
1324   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1325     return 0;
1326
1327   /* Clear custom-tx flag used to request reschedule for tx */
1328   s->flags &= ~SESSION_F_CUSTOM_TX;
1329
1330   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
1331                                  TRANSPORT_PACER_MAX_BURST_PKTS);
1332
1333   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
1334   *n_tx_packets += n_packets;
1335
1336   if (s->flags & SESSION_F_CUSTOM_TX)
1337     {
1338       session_evt_add_old (wrk, elt);
1339     }
1340   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
1341     {
1342       svm_fifo_unset_event (s->tx_fifo);
1343       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1344         if (svm_fifo_set_event (s->tx_fifo))
1345           session_evt_add_head_old (wrk, elt);
1346     }
1347
1348   if (sp->max_burst_size &&
1349       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
1350     session_dequeue_notify (s);
1351
1352   return n_packets;
1353 }
1354
1355 always_inline session_t *
1356 session_event_get_session (session_worker_t * wrk, session_event_t * e)
1357 {
1358   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
1359     return 0;
1360
1361   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
1362   return pool_elt_at_index (wrk->sessions, e->session_index);
1363 }
1364
1365 always_inline void
1366 session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
1367 {
1368   clib_llist_index_t ei;
1369   void (*fp) (void *);
1370   session_event_t *e;
1371   session_t *s;
1372
1373   ei = clib_llist_entry_index (wrk->event_elts, elt);
1374   e = &elt->evt;
1375
1376   switch (e->event_type)
1377     {
1378     case SESSION_CTRL_EVT_RPC:
1379       fp = e->rpc_args.fp;
1380       (*fp) (e->rpc_args.arg);
1381       break;
1382     case SESSION_CTRL_EVT_HALF_CLOSE:
1383       s = session_get_from_handle_if_valid (e->session_handle);
1384       if (PREDICT_FALSE (!s))
1385         break;
1386       session_transport_half_close (s);
1387       break;
1388     case SESSION_CTRL_EVT_CLOSE:
1389       s = session_get_from_handle_if_valid (e->session_handle);
1390       if (PREDICT_FALSE (!s))
1391         break;
1392       session_transport_close (s);
1393       break;
1394     case SESSION_CTRL_EVT_RESET:
1395       s = session_get_from_handle_if_valid (e->session_handle);
1396       if (PREDICT_FALSE (!s))
1397         break;
1398       session_transport_reset (s);
1399       break;
1400     case SESSION_CTRL_EVT_LISTEN:
1401       session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
1402       break;
1403     case SESSION_CTRL_EVT_LISTEN_URI:
1404       session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
1405       break;
1406     case SESSION_CTRL_EVT_UNLISTEN:
1407       session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
1408       break;
1409     case SESSION_CTRL_EVT_CONNECT:
1410       session_mq_connect_handler (wrk, elt);
1411       break;
1412     case SESSION_CTRL_EVT_CONNECT_URI:
1413       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
1414       break;
1415     case SESSION_CTRL_EVT_SHUTDOWN:
1416       session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
1417       break;
1418     case SESSION_CTRL_EVT_DISCONNECT:
1419       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
1420       break;
1421     case SESSION_CTRL_EVT_DISCONNECTED:
1422       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
1423       break;
1424     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
1425       session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
1426       break;
1427     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
1428       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
1429                                                                     elt));
1430       break;
1431     case SESSION_CTRL_EVT_RESET_REPLY:
1432       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
1433       break;
1434     case SESSION_CTRL_EVT_WORKER_UPDATE:
1435       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
1436       break;
1437     case SESSION_CTRL_EVT_APP_DETACH:
1438       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
1439       break;
1440     case SESSION_CTRL_EVT_APP_WRK_RPC:
1441       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
1442       break;
1443     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
1444       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
1445       break;
1446     default:
1447       clib_warning ("unhandled event type %d", e->event_type);
1448     }
1449
1450   /* Regrab elements in case pool moved */
1451   elt = pool_elt_at_index (wrk->event_elts, ei);
1452   if (!clib_llist_elt_is_linked (elt, evt_list))
1453     {
1454       e = &elt->evt;
1455       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
1456         session_evt_ctrl_data_free (wrk, elt);
1457       session_evt_elt_free (wrk, elt);
1458     }
1459   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
1460 }
1461
1462 always_inline void
1463 session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
1464                            session_evt_elt_t * elt, int *n_tx_packets)
1465 {
1466   session_main_t *smm = &session_main;
1467   app_worker_t *app_wrk;
1468   clib_llist_index_t ei;
1469   session_event_t *e;
1470   session_t *s;
1471
1472   ei = clib_llist_entry_index (wrk->event_elts, elt);
1473   e = &elt->evt;
1474
1475   switch (e->event_type)
1476     {
1477     case SESSION_IO_EVT_TX_FLUSH:
1478     case SESSION_IO_EVT_TX:
1479       s = session_event_get_session (wrk, e);
1480       if (PREDICT_FALSE (!s))
1481         break;
1482       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
1483       wrk->ctx.s = s;
1484       /* Spray packets in per session type frames, since they go to
1485        * different nodes */
1486       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
1487       break;
1488     case SESSION_IO_EVT_RX:
1489       s = session_event_get_session (wrk, e);
1490       if (!s)
1491         break;
1492       transport_app_rx_evt (session_get_transport_proto (s),
1493                             s->connection_index, s->thread_index);
1494       break;
1495     case SESSION_IO_EVT_BUILTIN_RX:
1496       s = session_event_get_session (wrk, e);
1497       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
1498         break;
1499       svm_fifo_unset_event (s->rx_fifo);
1500       app_wrk = app_worker_get (s->app_wrk_index);
1501       app_worker_builtin_rx (app_wrk, s);
1502       break;
1503     case SESSION_IO_EVT_BUILTIN_TX:
1504       s = session_get_from_handle_if_valid (e->session_handle);
1505       wrk->ctx.s = s;
1506       if (PREDICT_TRUE (s != 0))
1507         session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
1508       break;
1509     default:
1510       clib_warning ("unhandled event type %d", e->event_type);
1511     }
1512
1513   SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
1514
1515   /* Regrab elements in case pool moved */
1516   elt = pool_elt_at_index (wrk->event_elts, ei);
1517   if (!clib_llist_elt_is_linked (elt, evt_list))
1518     session_evt_elt_free (wrk, elt);
1519 }
1520
1521 /* *INDENT-OFF* */
1522 static const u32 session_evt_msg_sizes[] = {
1523 #define _(symc, sym)                                                    \
1524   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
1525   foreach_session_ctrl_evt
1526 #undef _
1527 };
1528 /* *INDENT-ON* */
1529
1530 always_inline void
1531 session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
1532 {
1533   session_evt_elt_t *elt;
1534
1535   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
1536     {
1537       elt = session_evt_alloc_ctrl (wrk);
1538       if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
1539         {
1540           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
1541           elt->evt.event_type = evt->event_type;
1542           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
1543                             session_evt_msg_sizes[evt->event_type]);
1544         }
1545       else
1546         {
1547           /* Internal control events fit into io events footprint */
1548           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1549         }
1550     }
1551   else
1552     {
1553       elt = session_evt_alloc_new (wrk);
1554       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1555     }
1556 }
1557
1558 static void
1559 session_flush_pending_tx_buffers (session_worker_t * wrk,
1560                                   vlib_node_runtime_t * node)
1561 {
1562   vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers,
1563                                wrk->pending_tx_nexts,
1564                                vec_len (wrk->pending_tx_nexts));
1565   vec_reset_length (wrk->pending_tx_buffers);
1566   vec_reset_length (wrk->pending_tx_nexts);
1567 }
1568
1569 int
1570 session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
1571 {
1572   svm_msg_q_msg_t _msg, *msg = &_msg;
1573   u32 i, n_to_dequeue = 0;
1574   session_event_t *evt;
1575
1576   n_to_dequeue = svm_msg_q_size (mq);
1577   for (i = 0; i < n_to_dequeue; i++)
1578     {
1579       svm_msg_q_sub_raw (mq, msg);
1580       evt = svm_msg_q_msg_data (mq, msg);
1581       session_evt_add_to_list (wrk, evt);
1582       svm_msg_q_free_msg (mq, msg);
1583     }
1584
1585   return n_to_dequeue;
1586 }
1587
1588 static void
1589 session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
1590 {
1591   struct itimerspec its;
1592
1593   its.it_value.tv_sec = 0;
1594   its.it_value.tv_nsec = time_ns;
1595   its.it_interval.tv_sec = 0;
1596   its.it_interval.tv_nsec = its.it_value.tv_nsec;
1597
1598   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
1599     clib_warning ("timerfd_settime");
1600 }
1601
1602 always_inline u64
1603 session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
1604 {
1605   if (state == SESSION_WRK_INTERRUPT)
1606     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
1607   else if (state == SESSION_WRK_IDLE)
1608     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
1609   else
1610     return 0;
1611 }
1612
1613 static inline void
1614 session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
1615 {
1616   u64 time_ns;
1617
1618   wrk->state = state;
1619   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
1620   session_wrk_timerfd_update (wrk, time_ns);
1621 }
1622
1623 static void
1624 session_wrk_update_state (session_worker_t *wrk)
1625 {
1626   vlib_main_t *vm = wrk->vm;
1627
1628   if (wrk->state == SESSION_WRK_POLLING)
1629     {
1630       if (pool_elts (wrk->event_elts) == 3 &&
1631           vlib_last_vectors_per_main_loop (vm) < 1)
1632         {
1633           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1634           vlib_node_set_state (vm, session_queue_node.index,
1635                                VLIB_NODE_STATE_INTERRUPT);
1636         }
1637     }
1638   else if (wrk->state == SESSION_WRK_INTERRUPT)
1639     {
1640       if (pool_elts (wrk->event_elts) > 3 ||
1641           vlib_last_vectors_per_main_loop (vm) > 1)
1642         {
1643           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
1644           vlib_node_set_state (vm, session_queue_node.index,
1645                                VLIB_NODE_STATE_POLLING);
1646         }
1647       else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
1648         {
1649           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
1650         }
1651     }
1652   else
1653     {
1654       if (pool_elts (wrk->event_elts))
1655         {
1656           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1657         }
1658     }
1659 }
1660
1661 static uword
1662 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
1663                        vlib_frame_t * frame)
1664 {
1665   u32 thread_index = vm->thread_index, __clib_unused n_evts;
1666   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
1667   session_main_t *smm = vnet_get_session_main ();
1668   session_worker_t *wrk = &smm->wrk[thread_index];
1669   clib_llist_index_t ei, next_ei, old_ti;
1670   int n_tx_packets;
1671
1672   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
1673
1674   session_wrk_update_time (wrk, vlib_time_now (vm));
1675
1676   /*
1677    *  Update transport time
1678    */
1679   transport_update_time (wrk->last_vlib_time, thread_index);
1680   n_tx_packets = vec_len (wrk->pending_tx_buffers);
1681   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
1682
1683   /*
1684    *  Dequeue new internal mq events
1685    */
1686
1687   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
1688   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
1689
1690   /*
1691    * Handle control events
1692    */
1693
1694   ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
1695
1696   clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
1697     clib_llist_remove (wrk->event_elts, evt_list, elt);
1698     session_event_dispatch_ctrl (wrk, elt);
1699   }));
1700
1701   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
1702
1703   /*
1704    * Handle the new io events.
1705    */
1706
1707   new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
1708   old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
1709   old_ti = clib_llist_prev_index (old_he, evt_list);
1710
1711   ei = clib_llist_next_index (new_he, evt_list);
1712   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
1713     {
1714       elt = pool_elt_at_index (wrk->event_elts, ei);
1715       ei = clib_llist_next_index (elt, evt_list);
1716       clib_llist_remove (wrk->event_elts, evt_list, elt);
1717       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1718     }
1719
1720   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
1721
1722   /*
1723    * Handle the old io events, if we had any prior to processing the new ones
1724    */
1725
1726   if (old_ti != wrk->old_head)
1727     {
1728       old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
1729       ei = clib_llist_next_index (old_he, evt_list);
1730
1731       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
1732         {
1733           elt = pool_elt_at_index (wrk->event_elts, ei);
1734           next_ei = clib_llist_next_index (elt, evt_list);
1735           clib_llist_remove (wrk->event_elts, evt_list, elt);
1736
1737           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1738
1739           if (ei == old_ti)
1740             break;
1741
1742           ei = next_ei;
1743         };
1744     }
1745
1746   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
1747
1748   if (vec_len (wrk->pending_tx_buffers))
1749     session_flush_pending_tx_buffers (wrk, node);
1750
1751   vlib_node_increment_counter (vm, session_queue_node.index,
1752                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
1753
1754   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
1755
1756   if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
1757     session_wrk_update_state (wrk);
1758
1759   return n_tx_packets;
1760 }
1761
1762 /* *INDENT-OFF* */
1763 VLIB_REGISTER_NODE (session_queue_node) =
1764 {
1765   .function = session_queue_node_fn,
1766   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
1767   .name = "session-queue",
1768   .format_trace = format_session_queue_trace,
1769   .type = VLIB_NODE_TYPE_INPUT,
1770   .n_errors = ARRAY_LEN (session_queue_error_strings),
1771   .error_strings = session_queue_error_strings,
1772   .state = VLIB_NODE_STATE_DISABLED,
1773 };
1774 /* *INDENT-ON* */
1775
1776 static clib_error_t *
1777 session_wrk_tfd_read_ready (clib_file_t *cf)
1778 {
1779   session_worker_t *wrk = session_main_get_worker (cf->private_data);
1780   u64 buf;
1781   int rv;
1782
1783   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
1784   rv = read (wrk->timerfd, &buf, sizeof (buf));
1785   if (rv < 0 && errno != EAGAIN)
1786     clib_unix_warning ("failed");
1787   return 0;
1788 }
1789
1790 static clib_error_t *
1791 session_wrk_tfd_write_ready (clib_file_t *cf)
1792 {
1793   return 0;
1794 }
1795
1796 void
1797 session_wrk_enable_adaptive_mode (session_worker_t *wrk)
1798 {
1799   u32 thread_index = wrk->vm->thread_index;
1800   clib_file_t template = { 0 };
1801
1802   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
1803     clib_warning ("timerfd_create");
1804
1805   template.read_function = session_wrk_tfd_read_ready;
1806   template.write_function = session_wrk_tfd_write_ready;
1807   template.file_descriptor = wrk->timerfd;
1808   template.private_data = thread_index;
1809   template.polling_thread_index = thread_index;
1810   template.description = format (0, "session-wrk-tfd-%u", thread_index);
1811
1812   wrk->timerfd_file = clib_file_add (&file_main, &template);
1813   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
1814 }
1815
1816 static clib_error_t *
1817 session_queue_exit (vlib_main_t * vm)
1818 {
1819   if (vlib_get_n_threads () < 2)
1820     return 0;
1821
1822   /*
1823    * Shut off (especially) worker-thread session nodes.
1824    * Otherwise, vpp can crash as the main thread unmaps the
1825    * API segment.
1826    */
1827   vlib_worker_thread_barrier_sync (vm);
1828   session_node_enable_disable (0 /* is_enable */ );
1829   vlib_worker_thread_barrier_release (vm);
1830   return 0;
1831 }
1832
1833 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1834
1835 static uword
1836 session_queue_run_on_main (vlib_main_t * vm)
1837 {
1838   vlib_node_runtime_t *node;
1839
1840   node = vlib_node_get_runtime (vm, session_queue_node.index);
1841   return session_queue_node_fn (vm, node, 0);
1842 }
1843
1844 static uword
1845 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1846                        vlib_frame_t * f)
1847 {
1848   uword *event_data = 0;
1849   f64 timeout = 1.0;
1850   uword event_type;
1851
1852   while (1)
1853     {
1854       vlib_process_wait_for_event_or_clock (vm, timeout);
1855       event_type = vlib_process_get_events (vm, (uword **) & event_data);
1856
1857       switch (event_type)
1858         {
1859         case SESSION_Q_PROCESS_RUN_ON_MAIN:
1860           /* Run session queue node on main thread */
1861           session_queue_run_on_main (vm);
1862           break;
1863         case SESSION_Q_PROCESS_STOP:
1864           vlib_node_set_state (vm, session_queue_process_node.index,
1865                                VLIB_NODE_STATE_DISABLED);
1866           timeout = 100000.0;
1867           break;
1868         case ~0:
1869           /* Timed out. Run on main to ensure all events are handled */
1870           session_queue_run_on_main (vm);
1871           break;
1872         }
1873       vec_reset_length (event_data);
1874     }
1875   return 0;
1876 }
1877
1878 /* *INDENT-OFF* */
1879 VLIB_REGISTER_NODE (session_queue_process_node) =
1880 {
1881   .function = session_queue_process,
1882   .type = VLIB_NODE_TYPE_PROCESS,
1883   .name = "session-queue-process",
1884   .state = VLIB_NODE_STATE_DISABLED,
1885 };
1886 /* *INDENT-ON* */
1887
1888 static_always_inline uword
1889 session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
1890                                 vlib_frame_t * frame)
1891 {
1892   session_main_t *sm = &session_main;
1893   if (!sm->wrk[0].vpp_event_queue)
1894     return 0;
1895   node = vlib_node_get_runtime (vm, session_queue_node.index);
1896   return session_queue_node_fn (vm, node, frame);
1897 }
1898
1899 /* *INDENT-OFF* */
1900 VLIB_REGISTER_NODE (session_queue_pre_input_node) =
1901 {
1902   .function = session_queue_pre_input_inline,
1903   .type = VLIB_NODE_TYPE_PRE_INPUT,
1904   .name = "session-queue-main",
1905   .state = VLIB_NODE_STATE_DISABLED,
1906 };
1907 /* *INDENT-ON* */
1908
1909 /*
1910  * fd.io coding-style-patch-verification: ON
1911  *
1912  * Local Variables:
1913  * eval: (c-set-style "gnu")
1914  * End:
1915  */