session: avoid reordering unlisten and connect msg
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/application_local.h>
25 #include <vnet/session/session_debug.h>
26 #include <svm/queue.h>
27 #include <sys/timerfd.h>
28
29 #define app_check_thread_and_barrier(_fn, _arg)                         \
30   if (!vlib_thread_is_main_w_barrier ())                                \
31     {                                                                   \
32      vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));       \
33       return;                                                           \
34    }
35
36 static void
37 session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
38 {
39   struct itimerspec its;
40
41   its.it_value.tv_sec = 0;
42   its.it_value.tv_nsec = time_ns;
43   its.it_interval.tv_sec = 0;
44   its.it_interval.tv_nsec = its.it_value.tv_nsec;
45
46   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
47     clib_warning ("timerfd_settime");
48 }
49
50 always_inline u64
51 session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
52 {
53   if (state == SESSION_WRK_INTERRUPT)
54     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
55   else if (state == SESSION_WRK_IDLE)
56     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
57   else
58     return 0;
59 }
60
61 static inline void
62 session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
63 {
64   u64 time_ns;
65
66   wrk->state = state;
67   if (wrk->timerfd == -1)
68     return;
69   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
70   session_wrk_timerfd_update (wrk, time_ns);
71 }
72
73 static transport_endpt_ext_cfg_t *
74 session_mq_get_ext_config (application_t *app, uword offset)
75 {
76   svm_fifo_chunk_t *c;
77   fifo_segment_t *fs;
78
79   fs = application_get_rx_mqs_segment (app);
80   c = fs_chunk_ptr (fs->h, offset);
81   return (transport_endpt_ext_cfg_t *) c->data;
82 }
83
84 static void
85 session_mq_free_ext_config (application_t *app, uword offset)
86 {
87   svm_fifo_chunk_t *c;
88   fifo_segment_t *fs;
89
90   fs = application_get_rx_mqs_segment (app);
91   c = fs_chunk_ptr (fs->h, offset);
92   fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
93 }
94
95 static void
96 session_mq_listen_handler (void *data)
97 {
98   session_listen_msg_t *mp = (session_listen_msg_t *) data;
99   vnet_listen_args_t _a, *a = &_a;
100   app_worker_t *app_wrk;
101   application_t *app;
102   int rv;
103
104   app_check_thread_and_barrier (session_mq_listen_handler, mp);
105
106   app = application_lookup (mp->client_index);
107   if (!app)
108     return;
109
110   clib_memset (a, 0, sizeof (*a));
111   a->sep.is_ip4 = mp->is_ip4;
112   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
113   a->sep.port = mp->port;
114   a->sep.fib_index = mp->vrf;
115   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
116   a->sep.transport_proto = mp->proto;
117   a->app_index = app->app_index;
118   a->wrk_map_index = mp->wrk_index;
119   a->sep_ext.transport_flags = mp->flags;
120
121   if (mp->ext_config)
122     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
123
124   if ((rv = vnet_listen (a)))
125     clib_warning ("listen returned: %U", format_session_error, rv);
126
127   app_wrk = application_get_worker (app, mp->wrk_index);
128   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
129
130   if (mp->ext_config)
131     session_mq_free_ext_config (app, mp->ext_config);
132 }
133
134 static void
135 session_mq_listen_uri_handler (void *data)
136 {
137   session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
138   vnet_listen_args_t _a, *a = &_a;
139   app_worker_t *app_wrk;
140   application_t *app;
141   int rv;
142
143   app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
144
145   app = application_lookup (mp->client_index);
146   if (!app)
147     return;
148
149   clib_memset (a, 0, sizeof (*a));
150   a->uri = (char *) mp->uri;
151   a->app_index = app->app_index;
152   rv = vnet_bind_uri (a);
153
154   app_wrk = application_get_worker (app, 0);
155   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
156 }
157
158 static void
159 session_mq_connect_one (session_connect_msg_t *mp)
160 {
161   vnet_connect_args_t _a, *a = &_a;
162   app_worker_t *app_wrk;
163   application_t *app;
164   int rv;
165
166   app = application_lookup (mp->client_index);
167   if (!app)
168     return;
169
170   clib_memset (a, 0, sizeof (*a));
171   a->sep.is_ip4 = mp->is_ip4;
172   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
173   a->sep.port = mp->port;
174   a->sep.transport_proto = mp->proto;
175   a->sep.peer.fib_index = mp->vrf;
176   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
177   if (mp->is_ip4)
178     {
179       ip46_address_mask_ip4 (&a->sep.ip);
180       ip46_address_mask_ip4 (&a->sep.peer.ip);
181     }
182   a->sep.peer.port = mp->lcl_port;
183   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
184   a->sep_ext.parent_handle = mp->parent_handle;
185   a->sep_ext.transport_flags = mp->flags;
186   a->api_context = mp->context;
187   a->app_index = app->app_index;
188   a->wrk_map_index = mp->wrk_index;
189
190   if (mp->ext_config)
191     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
192
193   if ((rv = vnet_connect (a)))
194     {
195       clib_warning ("connect returned: %U", format_session_error, rv);
196       app_wrk = application_get_worker (app, mp->wrk_index);
197       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
198     }
199
200   if (mp->ext_config)
201     session_mq_free_ext_config (app, mp->ext_config);
202 }
203
204 static void
205 session_mq_handle_connects_rpc (void *arg)
206 {
207   u32 max_connects = 32, n_connects = 0;
208   vlib_main_t *vm = vlib_get_main ();
209   session_evt_elt_t *he, *elt, *next;
210   session_worker_t *fwrk, *wrk;
211
212   ASSERT (vlib_get_thread_index () == 0);
213
214   /* Pending connects on linked list pertaining to first worker */
215   fwrk = session_main_get_worker (1);
216   if (!fwrk->n_pending_connects)
217     goto update_state;
218
219   vlib_worker_thread_barrier_sync (vm);
220
221   he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
222   elt = clib_llist_next (fwrk->event_elts, evt_list, he);
223
224   /* Avoid holding the barrier for too long */
225   while (n_connects < max_connects && elt != he)
226     {
227       next = clib_llist_next (fwrk->event_elts, evt_list, elt);
228       clib_llist_remove (fwrk->event_elts, evt_list, elt);
229       session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
230       clib_llist_put (fwrk->event_elts, elt);
231       elt = next;
232       n_connects += 1;
233     }
234
235   /* Decrement with worker barrier */
236   fwrk->n_pending_connects -= n_connects;
237
238   vlib_worker_thread_barrier_release (vm);
239
240 update_state:
241
242   /* Switch worker to poll mode if it was in interrupt mode and had work or
243    * back to interrupt if threshold of loops without a connect is passed.
244    * While in poll mode, reprogram connects rpc */
245   wrk = session_main_get_worker (0);
246   if (wrk->state != SESSION_WRK_POLLING)
247     {
248       if (n_connects)
249         {
250           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
251           vlib_node_set_state (vm, session_queue_node.index,
252                                VLIB_NODE_STATE_POLLING);
253           wrk->no_connect_loops = 0;
254         }
255     }
256   else
257     {
258       if (!n_connects)
259         {
260           if (++wrk->no_connect_loops > 1e5)
261             {
262               session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
263               vlib_node_set_state (vm, session_queue_node.index,
264                                    VLIB_NODE_STATE_INTERRUPT);
265             }
266         }
267       else
268         wrk->no_connect_loops = 0;
269     }
270
271   if (wrk->state == SESSION_WRK_POLLING)
272     {
273       elt = session_evt_alloc_ctrl (wrk);
274       elt->evt.event_type = SESSION_CTRL_EVT_RPC;
275       elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
276     }
277 }
278
279 static void
280 session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
281 {
282   u32 thread_index = wrk - session_main.wrk;
283   session_evt_elt_t *he;
284
285   /* No workers, so just deal with the connect now */
286   if (PREDICT_FALSE (!thread_index))
287     {
288       session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
289       return;
290     }
291
292   if (PREDICT_FALSE (thread_index != 1))
293     {
294       clib_warning ("Connect on wrong thread. Dropping");
295       return;
296     }
297
298   /* Add to pending list to be handled by main thread */
299   he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
300   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
301
302   /* Decremented with worker barrier */
303   wrk->n_pending_connects += 1;
304   if (wrk->n_pending_connects == 1)
305     {
306       vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
307                                        session_queue_node.index);
308       session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
309     }
310 }
311
312 static void
313 session_mq_connect_uri_handler (void *data)
314 {
315   session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
316   vnet_connect_args_t _a, *a = &_a;
317   app_worker_t *app_wrk;
318   application_t *app;
319   int rv;
320
321   app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
322
323   app = application_lookup (mp->client_index);
324   if (!app)
325     return;
326
327   clib_memset (a, 0, sizeof (*a));
328   a->uri = (char *) mp->uri;
329   a->api_context = mp->context;
330   a->app_index = app->app_index;
331   if ((rv = vnet_connect_uri (a)))
332     {
333       clib_warning ("connect_uri returned: %d", rv);
334       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
335       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
336     }
337 }
338
339 static void
340 session_mq_shutdown_handler (void *data)
341 {
342   session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
343   vnet_shutdown_args_t _a, *a = &_a;
344   application_t *app;
345
346   app = application_lookup (mp->client_index);
347   if (!app)
348     return;
349
350   a->app_index = app->app_index;
351   a->handle = mp->handle;
352   vnet_shutdown_session (a);
353 }
354
355 static void
356 session_mq_disconnect_handler (void *data)
357 {
358   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
359   vnet_disconnect_args_t _a, *a = &_a;
360   application_t *app;
361
362   app = application_lookup (mp->client_index);
363   if (!app)
364     return;
365
366   a->app_index = app->app_index;
367   a->handle = mp->handle;
368   vnet_disconnect_session (a);
369 }
370
371 static void
372 app_mq_detach_handler (void *data)
373 {
374   session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
375   vnet_app_detach_args_t _a, *a = &_a;
376   application_t *app;
377
378   app_check_thread_and_barrier (app_mq_detach_handler, mp);
379
380   app = application_lookup (mp->client_index);
381   if (!app)
382     return;
383
384   a->app_index = app->app_index;
385   a->api_client_index = mp->client_index;
386   vnet_application_detach (a);
387 }
388
389 static void
390 session_mq_unlisten_rpc (session_unlisten_msg_t *mp)
391 {
392   vlib_main_t *vm = vlib_get_main ();
393   vnet_unlisten_args_t _a, *a = &_a;
394   app_worker_t *app_wrk;
395   session_handle_t sh;
396   application_t *app;
397   u32 context;
398   int rv;
399
400   sh = mp->handle;
401   context = mp->context;
402
403   vlib_worker_thread_barrier_sync (vm);
404
405   app = application_lookup (mp->client_index);
406   if (!app)
407     return;
408
409   clib_memset (a, 0, sizeof (*a));
410   a->app_index = app->app_index;
411   a->handle = sh;
412   a->wrk_map_index = mp->wrk_index;
413   if ((rv = vnet_unlisten (a)))
414     clib_warning ("unlisten returned: %d", rv);
415
416   app_wrk = application_get_worker (app, a->wrk_map_index);
417   if (!app_wrk)
418     return;
419
420   vlib_worker_thread_barrier_release (vm);
421
422   mq_send_unlisten_reply (app_wrk, sh, context, rv);
423   clib_mem_free (mp);
424 }
425
426 static void
427 session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
428 {
429   u32 thread_index = wrk - session_main.wrk;
430   session_unlisten_msg_t *mp, *arg;
431
432   mp = session_evt_ctrl_data (wrk, elt);
433   arg = clib_mem_alloc (sizeof (session_unlisten_msg_t));
434   clib_memcpy_fast (arg, mp, sizeof (*arg));
435
436   if (PREDICT_FALSE (!thread_index))
437     {
438       session_mq_unlisten_rpc (arg);
439       return;
440     }
441
442   session_send_rpc_evt_to_thread_force (0, session_mq_unlisten_rpc, arg);
443 }
444
445 static void
446 session_mq_accepted_reply_handler (void *data)
447 {
448   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
449   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
450   session_state_t old_state;
451   app_worker_t *app_wrk;
452   session_t *s;
453
454   /* Server isn't interested, kill the session */
455   if (mp->retval)
456     {
457       a->app_index = mp->context;
458       a->handle = mp->handle;
459       vnet_disconnect_session (a);
460       return;
461     }
462
463   /* Mail this back from the main thread. We're not polling in main
464    * thread so we're using other workers for notifications. */
465   if (vlib_num_workers () && vlib_get_thread_index () != 0
466       && session_thread_from_handle (mp->handle) == 0)
467     {
468       vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
469                                  (u8 *) mp, sizeof (*mp));
470       return;
471     }
472
473   s = session_get_from_handle_if_valid (mp->handle);
474   if (!s)
475     return;
476
477   app_wrk = app_worker_get (s->app_wrk_index);
478   if (app_wrk->app_index != mp->context)
479     {
480       clib_warning ("app doesn't own session");
481       return;
482     }
483
484   if (!session_has_transport (s))
485     {
486       s->session_state = SESSION_STATE_READY;
487       if (ct_session_connect_notify (s, SESSION_E_NONE))
488         return;
489     }
490   else
491     {
492       old_state = s->session_state;
493       s->session_state = SESSION_STATE_READY;
494
495       if (!svm_fifo_is_empty_prod (s->rx_fifo))
496         app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
497
498       /* Closed while waiting for app to reply. Resend disconnect */
499       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
500         {
501           app_worker_close_notify (app_wrk, s);
502           s->session_state = old_state;
503           return;
504         }
505     }
506 }
507
508 static void
509 session_mq_reset_reply_handler (void *data)
510 {
511   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
512   session_reset_reply_msg_t *mp;
513   app_worker_t *app_wrk;
514   session_t *s;
515   application_t *app;
516   u32 index, thread_index;
517
518   mp = (session_reset_reply_msg_t *) data;
519   app = application_lookup (mp->context);
520   if (!app)
521     return;
522
523   session_parse_handle (mp->handle, &index, &thread_index);
524   s = session_get_if_valid (index, thread_index);
525
526   /* No session or not the right session */
527   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
528     return;
529
530   app_wrk = app_worker_get (s->app_wrk_index);
531   if (!app_wrk || app_wrk->app_index != app->app_index)
532     {
533       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
534                     mp->handle);
535       return;
536     }
537
538   /* Client objected to resetting the session, log and continue */
539   if (mp->retval)
540     {
541       clib_warning ("client retval %d", mp->retval);
542       return;
543     }
544
545   /* This comes as a response to a reset, transport only waiting for
546    * confirmation to remove connection state, no need to disconnect */
547   a->handle = mp->handle;
548   a->app_index = app->app_index;
549   vnet_disconnect_session (a);
550 }
551
552 static void
553 session_mq_disconnected_handler (void *data)
554 {
555   session_disconnected_reply_msg_t *rmp;
556   vnet_disconnect_args_t _a, *a = &_a;
557   svm_msg_q_msg_t _msg, *msg = &_msg;
558   session_disconnected_msg_t *mp;
559   app_worker_t *app_wrk;
560   session_event_t *evt;
561   session_t *s;
562   application_t *app;
563   int rv = 0;
564
565   mp = (session_disconnected_msg_t *) data;
566   if (!(s = session_get_from_handle_if_valid (mp->handle)))
567     {
568       clib_warning ("could not disconnect handle %llu", mp->handle);
569       return;
570     }
571   app_wrk = app_worker_get (s->app_wrk_index);
572   app = application_lookup (mp->client_index);
573   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
574     {
575       clib_warning ("could not disconnect session: %llu app: %u",
576                     mp->handle, mp->client_index);
577       return;
578     }
579
580   a->handle = mp->handle;
581   a->app_index = app_wrk->wrk_index;
582   rv = vnet_disconnect_session (a);
583
584   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
585                                        SESSION_MQ_CTRL_EVT_RING,
586                                        SVM_Q_WAIT, msg);
587   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
588   clib_memset (evt, 0, sizeof (*evt));
589   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
590   rmp = (session_disconnected_reply_msg_t *) evt->data;
591   rmp->handle = mp->handle;
592   rmp->context = mp->context;
593   rmp->retval = rv;
594   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
595 }
596
597 static void
598 session_mq_disconnected_reply_handler (void *data)
599 {
600   session_disconnected_reply_msg_t *mp;
601   vnet_disconnect_args_t _a, *a = &_a;
602   application_t *app;
603
604   mp = (session_disconnected_reply_msg_t *) data;
605
606   /* Client objected to disconnecting the session, log and continue */
607   if (mp->retval)
608     {
609       clib_warning ("client retval %d", mp->retval);
610       return;
611     }
612
613   /* Disconnect has been confirmed. Confirm close to transport */
614   app = application_lookup (mp->context);
615   if (app)
616     {
617       a->handle = mp->handle;
618       a->app_index = app->app_index;
619       vnet_disconnect_session (a);
620     }
621 }
622
623 static void
624 session_mq_worker_update_handler (void *data)
625 {
626   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
627   session_worker_update_reply_msg_t *rmp;
628   svm_msg_q_msg_t _msg, *msg = &_msg;
629   app_worker_t *app_wrk;
630   u32 owner_app_wrk_map;
631   session_event_t *evt;
632   session_t *s;
633   application_t *app;
634
635   app = application_lookup (mp->client_index);
636   if (!app)
637     return;
638   if (!(s = session_get_from_handle_if_valid (mp->handle)))
639     {
640       clib_warning ("invalid handle %llu", mp->handle);
641       return;
642     }
643   app_wrk = app_worker_get (s->app_wrk_index);
644   if (app_wrk->app_index != app->app_index)
645     {
646       clib_warning ("app %u does not own session %llu", app->app_index,
647                     mp->handle);
648       return;
649     }
650   owner_app_wrk_map = app_wrk->wrk_map_index;
651   app_wrk = application_get_worker (app, mp->wrk_index);
652
653   /* This needs to come from the new owner */
654   if (mp->req_wrk_index == owner_app_wrk_map)
655     {
656       session_req_worker_update_msg_t *wump;
657
658       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
659                                            SESSION_MQ_CTRL_EVT_RING,
660                                            SVM_Q_WAIT, msg);
661       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
662       clib_memset (evt, 0, sizeof (*evt));
663       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
664       wump = (session_req_worker_update_msg_t *) evt->data;
665       wump->session_handle = mp->handle;
666       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
667       return;
668     }
669
670   app_worker_own_session (app_wrk, s);
671
672   /*
673    * Send reply
674    */
675   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
676                                        SESSION_MQ_CTRL_EVT_RING,
677                                        SVM_Q_WAIT, msg);
678   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
679   clib_memset (evt, 0, sizeof (*evt));
680   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
681   rmp = (session_worker_update_reply_msg_t *) evt->data;
682   rmp->handle = mp->handle;
683   if (s->rx_fifo)
684     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
685   if (s->tx_fifo)
686     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
687   rmp->segment_handle = session_segment_handle (s);
688   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
689
690   /*
691    * Retransmit messages that may have been lost
692    */
693   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
694     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
695
696   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
697     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
698
699   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
700     app_worker_close_notify (app_wrk, s);
701 }
702
703 static void
704 session_mq_app_wrk_rpc_handler (void *data)
705 {
706   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
707   svm_msg_q_msg_t _msg, *msg = &_msg;
708   session_app_wrk_rpc_msg_t *rmp;
709   app_worker_t *app_wrk;
710   session_event_t *evt;
711   application_t *app;
712
713   app = application_lookup (mp->client_index);
714   if (!app)
715     return;
716
717   app_wrk = application_get_worker (app, mp->wrk_index);
718
719   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
720                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
721                                        msg);
722   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
723   clib_memset (evt, 0, sizeof (*evt));
724   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
725   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
726   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
727   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
728 }
729
730 static void
731 session_mq_transport_attr_handler (void *data)
732 {
733   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
734   session_transport_attr_reply_msg_t *rmp;
735   svm_msg_q_msg_t _msg, *msg = &_msg;
736   app_worker_t *app_wrk;
737   session_event_t *evt;
738   application_t *app;
739   session_t *s;
740   int rv;
741
742   app = application_lookup (mp->client_index);
743   if (!app)
744     return;
745
746   if (!(s = session_get_from_handle_if_valid (mp->handle)))
747     {
748       clib_warning ("invalid handle %llu", mp->handle);
749       return;
750     }
751   app_wrk = app_worker_get (s->app_wrk_index);
752   if (app_wrk->app_index != app->app_index)
753     {
754       clib_warning ("app %u does not own session %llu", app->app_index,
755                     mp->handle);
756       return;
757     }
758
759   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
760
761   svm_msg_q_lock_and_alloc_msg_w_ring (
762     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
763   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
764   clib_memset (evt, 0, sizeof (*evt));
765   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
766   rmp = (session_transport_attr_reply_msg_t *) evt->data;
767   rmp->handle = mp->handle;
768   rmp->retval = rv;
769   rmp->is_get = mp->is_get;
770   if (!rv && mp->is_get)
771     rmp->attr = mp->attr;
772   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
773 }
774
775 vlib_node_registration_t session_queue_node;
776
777 typedef struct
778 {
779   u32 session_index;
780   u32 server_thread_index;
781 } session_queue_trace_t;
782
783 /* packet trace format function */
784 static u8 *
785 format_session_queue_trace (u8 * s, va_list * args)
786 {
787   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
788   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
789   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
790
791   s = format (s, "session index %d thread index %d",
792               t->session_index, t->server_thread_index);
793   return s;
794 }
795
796 #define foreach_session_queue_error             \
797 _(TX, "Packets transmitted")                    \
798 _(TIMER, "Timer events")                        \
799 _(NO_BUFFER, "Out of buffers")
800
801 typedef enum
802 {
803 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
804   foreach_session_queue_error
805 #undef _
806     SESSION_QUEUE_N_ERROR,
807 } session_queue_error_t;
808
809 static char *session_queue_error_strings[] = {
810 #define _(sym,string) string,
811   foreach_session_queue_error
812 #undef _
813 };
814
815 enum
816 {
817   SESSION_TX_NO_BUFFERS = -2,
818   SESSION_TX_NO_DATA,
819   SESSION_TX_OK
820 };
821
822 static void
823 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
824                         u32 next_index, u32 * to_next, u16 n_segs,
825                         session_t * s, u32 n_trace)
826 {
827   while (n_trace && n_segs)
828     {
829       vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
830       if (PREDICT_TRUE
831           (vlib_trace_buffer
832            (vm, node, next_index, b, 1 /* follow_chain */ )))
833         {
834           session_queue_trace_t *t =
835             vlib_add_trace (vm, node, b, sizeof (*t));
836           t->session_index = s->session_index;
837           t->server_thread_index = s->thread_index;
838           n_trace--;
839         }
840       to_next++;
841       n_segs--;
842     }
843   vlib_set_trace_count (vm, node, n_trace);
844 }
845
846 always_inline void
847 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
848                             vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
849 {
850   vlib_buffer_t *chain_b, *prev_b;
851   u32 chain_bi0, to_deq, left_from_seg;
852   u16 len_to_deq, n_bytes_read;
853   u8 *data, j;
854
855   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
856   b->total_length_not_including_first_buffer = 0;
857
858   chain_b = b;
859   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
860                             ctx->left_to_snd);
861   to_deq = left_from_seg;
862   for (j = 1; j < ctx->n_bufs_per_seg; j++)
863     {
864       prev_b = chain_b;
865       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
866
867       *n_bufs -= 1;
868       chain_bi0 = ctx->tx_buffers[*n_bufs];
869       chain_b = vlib_get_buffer (vm, chain_bi0);
870       chain_b->current_data = 0;
871       data = vlib_buffer_get_current (chain_b);
872       if (peek_data)
873         {
874           n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
875                                         ctx->sp.tx_offset, len_to_deq, data);
876           ctx->sp.tx_offset += n_bytes_read;
877         }
878       else
879         {
880           if (ctx->transport_vft->transport_options.tx_type ==
881               TRANSPORT_TX_DGRAM)
882             {
883               svm_fifo_t *f = ctx->s->tx_fifo;
884               session_dgram_hdr_t *hdr = &ctx->hdr;
885               u16 deq_now;
886               u32 offset;
887
888               deq_now = clib_min (hdr->data_length - hdr->data_offset,
889                                   len_to_deq);
890               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
891               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
892               ASSERT (n_bytes_read > 0);
893
894               hdr->data_offset += n_bytes_read;
895               if (hdr->data_offset == hdr->data_length)
896                 {
897                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
898                   svm_fifo_dequeue_drop (f, offset);
899                   if (ctx->left_to_snd > n_bytes_read)
900                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
901                                    (u8 *) & ctx->hdr);
902                 }
903               else if (ctx->left_to_snd == n_bytes_read)
904                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
905                                          sizeof (session_dgram_pre_hdr_t));
906             }
907           else
908             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
909                                              len_to_deq, data);
910         }
911       ASSERT (n_bytes_read == len_to_deq);
912       chain_b->current_length = n_bytes_read;
913       b->total_length_not_including_first_buffer += chain_b->current_length;
914
915       /* update previous buffer */
916       prev_b->next_buffer = chain_bi0;
917       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
918
919       /* update current buffer */
920       chain_b->next_buffer = 0;
921
922       to_deq -= n_bytes_read;
923       if (to_deq == 0)
924         break;
925     }
926   ASSERT (to_deq == 0
927           && b->total_length_not_including_first_buffer == left_from_seg);
928   ctx->left_to_snd -= left_from_seg;
929 }
930
931 always_inline void
932 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
933                         vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
934 {
935   u32 len_to_deq;
936   u8 *data0;
937   int n_bytes_read;
938
939   /*
940    * Start with the first buffer in chain
941    */
942   b->error = 0;
943   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
944   b->current_data = 0;
945
946   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
947   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
948
949   if (peek_data)
950     {
951       n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
952                                     len_to_deq, data0);
953       ASSERT (n_bytes_read > 0);
954       /* Keep track of progress locally, transport is also supposed to
955        * increment it independently when pushing the header */
956       ctx->sp.tx_offset += n_bytes_read;
957     }
958   else
959     {
960       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
961         {
962           session_dgram_hdr_t *hdr = &ctx->hdr;
963           svm_fifo_t *f = ctx->s->tx_fifo;
964           u16 deq_now;
965           u32 offset;
966
967           ASSERT (hdr->data_length > hdr->data_offset);
968           deq_now = clib_min (hdr->data_length - hdr->data_offset,
969                               len_to_deq);
970           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
971           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
972           ASSERT (n_bytes_read > 0);
973
974           if (ctx->s->session_state == SESSION_STATE_LISTENING)
975             {
976               ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
977               ctx->tc->rmt_port = hdr->rmt_port;
978             }
979           hdr->data_offset += n_bytes_read;
980           if (hdr->data_offset == hdr->data_length)
981             {
982               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
983               svm_fifo_dequeue_drop (f, offset);
984               if (ctx->left_to_snd > n_bytes_read)
985                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
986                                (u8 *) & ctx->hdr);
987             }
988           else if (ctx->left_to_snd == n_bytes_read)
989             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
990                                      sizeof (session_dgram_pre_hdr_t));
991         }
992       else
993         {
994           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
995                                            len_to_deq, data0);
996           ASSERT (n_bytes_read > 0);
997         }
998     }
999   b->current_length = n_bytes_read;
1000   ctx->left_to_snd -= n_bytes_read;
1001
1002   /*
1003    * Fill in the remaining buffers in the chain, if any
1004    */
1005   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
1006     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
1007 }
1008
1009 always_inline u8
1010 session_tx_not_ready (session_t * s, u8 peek_data)
1011 {
1012   if (peek_data)
1013     {
1014       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
1015         return 0;
1016       /* Can retransmit for closed sessions but can't send new data if
1017        * session is not ready or closed */
1018       else if (s->session_state < SESSION_STATE_READY)
1019         return 1;
1020       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
1021         {
1022           /* Allow closed transports to still send custom packets.
1023            * For instance, tcp may want to send acks in time-wait. */
1024           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
1025               && (s->flags & SESSION_F_CUSTOM_TX))
1026             return 0;
1027           return 2;
1028         }
1029     }
1030   return 0;
1031 }
1032
1033 always_inline transport_connection_t *
1034 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
1035 {
1036   if (peek_data)
1037     {
1038       return ctx->transport_vft->get_connection (ctx->s->connection_index,
1039                                                  ctx->s->thread_index);
1040     }
1041   else
1042     {
1043       if (ctx->s->session_state == SESSION_STATE_LISTENING)
1044         return ctx->transport_vft->get_listener (ctx->s->connection_index);
1045       else
1046         {
1047           return ctx->transport_vft->get_connection (ctx->s->connection_index,
1048                                                      ctx->s->thread_index);
1049         }
1050     }
1051 }
1052
1053 always_inline void
1054 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
1055                                u32 max_segs, u8 peek_data)
1056 {
1057   u32 n_bytes_per_buf, n_bytes_per_seg;
1058
1059   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
1060   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
1061
1062   if (peek_data)
1063     {
1064       /* Offset in rx fifo from where to peek data */
1065       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
1066         {
1067           ctx->max_len_to_snd = 0;
1068           return;
1069         }
1070       ctx->max_dequeue -= ctx->sp.tx_offset;
1071     }
1072   else
1073     {
1074       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1075         {
1076           u32 len, chain_limit;
1077
1078           if (ctx->max_dequeue <= sizeof (ctx->hdr))
1079             {
1080               ctx->max_len_to_snd = 0;
1081               return;
1082             }
1083
1084           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
1085                          (u8 *) & ctx->hdr);
1086           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
1087           len = ctx->hdr.data_length - ctx->hdr.data_offset;
1088
1089           /* Process multiple dgrams if smaller than min (buf_space, mss).
1090            * This avoids handling multiple dgrams if they require buffer
1091            * chains */
1092           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
1093                                   ctx->sp.snd_mss);
1094           if (ctx->hdr.data_length <= chain_limit)
1095             {
1096               u32 first_dgram_len, dgram_len, offset, max_offset;
1097               session_dgram_hdr_t hdr;
1098
1099               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
1100               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
1101               first_dgram_len = len;
1102               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
1103
1104               while (offset < max_offset)
1105                 {
1106                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
1107                                  (u8 *) & hdr);
1108                   ASSERT (hdr.data_length > hdr.data_offset);
1109                   dgram_len = hdr.data_length - hdr.data_offset;
1110                   if (len + dgram_len > ctx->max_dequeue
1111                       || first_dgram_len != dgram_len)
1112                     break;
1113                   len += dgram_len;
1114                   offset += sizeof (hdr) + hdr.data_length;
1115                 }
1116             }
1117
1118           ctx->max_dequeue = len;
1119         }
1120     }
1121   ASSERT (ctx->max_dequeue > 0);
1122
1123   /* Ensure we're not writing more than transport window allows */
1124   if (ctx->max_dequeue < ctx->sp.snd_space)
1125     {
1126       /* Constrained by tx queue. Try to send only fully formed segments */
1127       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
1128         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
1129         ctx->max_dequeue;
1130       /* TODO Nagle ? */
1131     }
1132   else
1133     {
1134       /* Expectation is that snd_space0 is already a multiple of snd_mss */
1135       ctx->max_len_to_snd = ctx->sp.snd_space;
1136     }
1137
1138   /* Check if we're tx constrained by the node */
1139   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
1140   if (ctx->n_segs_per_evt > max_segs)
1141     {
1142       ctx->n_segs_per_evt = max_segs;
1143       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
1144     }
1145
1146   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
1147   if (ctx->n_segs_per_evt > 1)
1148     {
1149       u32 n_bytes_last_seg, n_bufs_last_seg;
1150
1151       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
1152       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
1153         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
1154       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1155       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
1156       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
1157         + n_bufs_last_seg;
1158     }
1159   else
1160     {
1161       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
1162       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
1163       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
1164     }
1165
1166   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
1167   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
1168                                      n_bytes_per_buf -
1169                                      TRANSPORT_MAX_HDRS_LEN);
1170 }
1171
1172 always_inline void
1173 session_tx_maybe_reschedule (session_worker_t * wrk,
1174                              session_tx_context_t * ctx,
1175                              session_evt_elt_t * elt)
1176 {
1177   session_t *s = ctx->s;
1178
1179   svm_fifo_unset_event (s->tx_fifo);
1180   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
1181     if (svm_fifo_set_event (s->tx_fifo))
1182       session_evt_add_head_old (wrk, elt);
1183 }
1184
1185 always_inline int
1186 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
1187                                 vlib_node_runtime_t * node,
1188                                 session_evt_elt_t * elt,
1189                                 int *n_tx_packets, u8 peek_data)
1190 {
1191   u32 n_trace, n_left, pbi, next_index, max_burst;
1192   session_tx_context_t *ctx = &wrk->ctx;
1193   session_main_t *smm = &session_main;
1194   session_event_t *e = &elt->evt;
1195   vlib_main_t *vm = wrk->vm;
1196   transport_proto_t tp;
1197   vlib_buffer_t *pb;
1198   u16 n_bufs, rv;
1199
1200   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
1201     {
1202       if (rv < 2)
1203         session_evt_add_old (wrk, elt);
1204       return SESSION_TX_NO_DATA;
1205     }
1206
1207   next_index = smm->session_type_to_next[ctx->s->session_type];
1208   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
1209
1210   tp = session_get_transport_proto (ctx->s);
1211   ctx->transport_vft = transport_protocol_get_vft (tp);
1212   ctx->tc = session_tx_get_transport (ctx, peek_data);
1213
1214   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
1215     {
1216       if (ctx->transport_vft->flush_data)
1217         ctx->transport_vft->flush_data (ctx->tc);
1218       e->event_type = SESSION_IO_EVT_TX;
1219     }
1220
1221   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
1222     {
1223       u32 n_custom_tx;
1224       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
1225       ctx->sp.max_burst_size = max_burst;
1226       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
1227       *n_tx_packets += n_custom_tx;
1228       if (PREDICT_FALSE
1229           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1230         return SESSION_TX_OK;
1231       max_burst -= n_custom_tx;
1232       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
1233         {
1234           session_evt_add_old (wrk, elt);
1235           return SESSION_TX_OK;
1236         }
1237     }
1238
1239   transport_connection_snd_params (ctx->tc, &ctx->sp);
1240
1241   if (!ctx->sp.snd_space)
1242     {
1243       /* If the deschedule flag was set, remove session from scheduler.
1244        * Transport is responsible for rescheduling this session. */
1245       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
1246         transport_connection_deschedule (ctx->tc);
1247       /* Request to postpone the session, e.g., zero-wnd and transport
1248        * is not currently probing */
1249       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
1250         session_evt_add_old (wrk, elt);
1251       /* This flow queue is "empty" so it should be re-evaluated before
1252        * the ones that have data to send. */
1253       else
1254         session_evt_add_head_old (wrk, elt);
1255
1256       return SESSION_TX_NO_DATA;
1257     }
1258
1259   if (transport_connection_is_tx_paced (ctx->tc))
1260     {
1261       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
1262       if (snd_space < TRANSPORT_PACER_MIN_BURST)
1263         {
1264           session_evt_add_head_old (wrk, elt);
1265           return SESSION_TX_NO_DATA;
1266         }
1267       snd_space = clib_min (ctx->sp.snd_space, snd_space);
1268       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
1269         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
1270     }
1271
1272   /* Check how much we can pull. */
1273   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
1274
1275   if (PREDICT_FALSE (!ctx->max_len_to_snd))
1276     {
1277       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
1278       session_tx_maybe_reschedule (wrk, ctx, elt);
1279       return SESSION_TX_NO_DATA;
1280     }
1281
1282   vec_validate_aligned (ctx->tx_buffers, ctx->n_bufs_needed - 1,
1283                         CLIB_CACHE_LINE_BYTES);
1284   n_bufs = vlib_buffer_alloc (vm, ctx->tx_buffers, ctx->n_bufs_needed);
1285   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
1286     {
1287       if (n_bufs)
1288         vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
1289       session_evt_add_head_old (wrk, elt);
1290       vlib_node_increment_counter (wrk->vm, node->node_index,
1291                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
1292       return SESSION_TX_NO_BUFFERS;
1293     }
1294
1295   if (transport_connection_is_tx_paced (ctx->tc))
1296     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
1297
1298   ctx->left_to_snd = ctx->max_len_to_snd;
1299   n_left = ctx->n_segs_per_evt;
1300
1301   while (n_left >= 4)
1302     {
1303       vlib_buffer_t *b0, *b1;
1304       u32 bi0, bi1;
1305
1306       pbi = ctx->tx_buffers[n_bufs - 3];
1307       pb = vlib_get_buffer (vm, pbi);
1308       vlib_prefetch_buffer_header (pb, STORE);
1309       pbi = ctx->tx_buffers[n_bufs - 4];
1310       pb = vlib_get_buffer (vm, pbi);
1311       vlib_prefetch_buffer_header (pb, STORE);
1312
1313       bi0 = ctx->tx_buffers[--n_bufs];
1314       bi1 = ctx->tx_buffers[--n_bufs];
1315
1316       b0 = vlib_get_buffer (vm, bi0);
1317       b1 = vlib_get_buffer (vm, bi1);
1318
1319       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1320       session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
1321
1322       ctx->transport_vft->push_header (ctx->tc, b0);
1323       ctx->transport_vft->push_header (ctx->tc, b1);
1324
1325       n_left -= 2;
1326
1327       vec_add1 (wrk->pending_tx_buffers, bi0);
1328       vec_add1 (wrk->pending_tx_buffers, bi1);
1329       vec_add1 (wrk->pending_tx_nexts, next_index);
1330       vec_add1 (wrk->pending_tx_nexts, next_index);
1331     }
1332   while (n_left)
1333     {
1334       vlib_buffer_t *b0;
1335       u32 bi0;
1336
1337       if (n_left > 1)
1338         {
1339           pbi = ctx->tx_buffers[n_bufs - 2];
1340           pb = vlib_get_buffer (vm, pbi);
1341           vlib_prefetch_buffer_header (pb, STORE);
1342         }
1343
1344       bi0 = ctx->tx_buffers[--n_bufs];
1345       b0 = vlib_get_buffer (vm, bi0);
1346       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1347
1348       /* Ask transport to push header after current_length and
1349        * total_length_not_including_first_buffer are updated */
1350       ctx->transport_vft->push_header (ctx->tc, b0);
1351
1352       n_left -= 1;
1353
1354       vec_add1 (wrk->pending_tx_buffers, bi0);
1355       vec_add1 (wrk->pending_tx_nexts, next_index);
1356     }
1357
1358   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
1359     session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers,
1360                             ctx->n_segs_per_evt, ctx->s, n_trace);
1361
1362   if (PREDICT_FALSE (n_bufs))
1363     vlib_buffer_free (vm, ctx->tx_buffers, n_bufs);
1364
1365   *n_tx_packets += ctx->n_segs_per_evt;
1366
1367   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
1368                ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
1369
1370   ASSERT (ctx->left_to_snd == 0);
1371
1372   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
1373    * check if application enqueued more data and reschedule accordingly */
1374   if (ctx->max_len_to_snd < ctx->max_dequeue)
1375     session_evt_add_old (wrk, elt);
1376   else
1377     session_tx_maybe_reschedule (wrk, ctx, elt);
1378
1379   if (!peek_data)
1380     {
1381       u32 n_dequeued = ctx->max_len_to_snd;
1382       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1383         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
1384       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
1385         session_dequeue_notify (ctx->s);
1386     }
1387   return SESSION_TX_OK;
1388 }
1389
1390 int
1391 session_tx_fifo_peek_and_snd (session_worker_t * wrk,
1392                               vlib_node_runtime_t * node,
1393                               session_evt_elt_t * e, int *n_tx_packets)
1394 {
1395   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
1396 }
1397
1398 int
1399 session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
1400                                  vlib_node_runtime_t * node,
1401                                  session_evt_elt_t * e, int *n_tx_packets)
1402 {
1403   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
1404 }
1405
1406 int
1407 session_tx_fifo_dequeue_internal (session_worker_t * wrk,
1408                                   vlib_node_runtime_t * node,
1409                                   session_evt_elt_t * elt, int *n_tx_packets)
1410 {
1411   transport_send_params_t *sp = &wrk->ctx.sp;
1412   session_t *s = wrk->ctx.s;
1413   u32 n_packets;
1414
1415   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1416     return 0;
1417
1418   /* Clear custom-tx flag used to request reschedule for tx */
1419   s->flags &= ~SESSION_F_CUSTOM_TX;
1420
1421   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
1422                                  TRANSPORT_PACER_MAX_BURST_PKTS);
1423
1424   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
1425   *n_tx_packets += n_packets;
1426
1427   if (s->flags & SESSION_F_CUSTOM_TX)
1428     {
1429       session_evt_add_old (wrk, elt);
1430     }
1431   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
1432     {
1433       svm_fifo_unset_event (s->tx_fifo);
1434       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1435         if (svm_fifo_set_event (s->tx_fifo))
1436           session_evt_add_head_old (wrk, elt);
1437     }
1438
1439   if (sp->max_burst_size &&
1440       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
1441     session_dequeue_notify (s);
1442
1443   return n_packets;
1444 }
1445
1446 always_inline session_t *
1447 session_event_get_session (session_worker_t * wrk, session_event_t * e)
1448 {
1449   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
1450     return 0;
1451
1452   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
1453   return pool_elt_at_index (wrk->sessions, e->session_index);
1454 }
1455
1456 always_inline void
1457 session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
1458 {
1459   clib_llist_index_t ei;
1460   void (*fp) (void *);
1461   session_event_t *e;
1462   session_t *s;
1463
1464   ei = clib_llist_entry_index (wrk->event_elts, elt);
1465   e = &elt->evt;
1466
1467   switch (e->event_type)
1468     {
1469     case SESSION_CTRL_EVT_RPC:
1470       fp = e->rpc_args.fp;
1471       (*fp) (e->rpc_args.arg);
1472       break;
1473     case SESSION_CTRL_EVT_HALF_CLOSE:
1474       s = session_get_from_handle_if_valid (e->session_handle);
1475       if (PREDICT_FALSE (!s))
1476         break;
1477       session_transport_half_close (s);
1478       break;
1479     case SESSION_CTRL_EVT_CLOSE:
1480       s = session_get_from_handle_if_valid (e->session_handle);
1481       if (PREDICT_FALSE (!s))
1482         break;
1483       session_transport_close (s);
1484       break;
1485     case SESSION_CTRL_EVT_RESET:
1486       s = session_get_from_handle_if_valid (e->session_handle);
1487       if (PREDICT_FALSE (!s))
1488         break;
1489       session_transport_reset (s);
1490       break;
1491     case SESSION_CTRL_EVT_LISTEN:
1492       session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
1493       break;
1494     case SESSION_CTRL_EVT_LISTEN_URI:
1495       session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
1496       break;
1497     case SESSION_CTRL_EVT_UNLISTEN:
1498       session_mq_unlisten_handler (wrk, elt);
1499       break;
1500     case SESSION_CTRL_EVT_CONNECT:
1501       session_mq_connect_handler (wrk, elt);
1502       break;
1503     case SESSION_CTRL_EVT_CONNECT_URI:
1504       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
1505       break;
1506     case SESSION_CTRL_EVT_SHUTDOWN:
1507       session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
1508       break;
1509     case SESSION_CTRL_EVT_DISCONNECT:
1510       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
1511       break;
1512     case SESSION_CTRL_EVT_DISCONNECTED:
1513       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
1514       break;
1515     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
1516       session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
1517       break;
1518     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
1519       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
1520                                                                     elt));
1521       break;
1522     case SESSION_CTRL_EVT_RESET_REPLY:
1523       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
1524       break;
1525     case SESSION_CTRL_EVT_WORKER_UPDATE:
1526       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
1527       break;
1528     case SESSION_CTRL_EVT_APP_DETACH:
1529       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
1530       break;
1531     case SESSION_CTRL_EVT_APP_WRK_RPC:
1532       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
1533       break;
1534     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
1535       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
1536       break;
1537     default:
1538       clib_warning ("unhandled event type %d", e->event_type);
1539     }
1540
1541   /* Regrab elements in case pool moved */
1542   elt = clib_llist_elt (wrk->event_elts, ei);
1543   if (!clib_llist_elt_is_linked (elt, evt_list))
1544     {
1545       e = &elt->evt;
1546       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
1547         session_evt_ctrl_data_free (wrk, elt);
1548       clib_llist_put (wrk->event_elts, elt);
1549     }
1550   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
1551 }
1552
1553 always_inline void
1554 session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
1555                            session_evt_elt_t * elt, int *n_tx_packets)
1556 {
1557   session_main_t *smm = &session_main;
1558   app_worker_t *app_wrk;
1559   clib_llist_index_t ei;
1560   session_event_t *e;
1561   session_t *s;
1562
1563   ei = clib_llist_entry_index (wrk->event_elts, elt);
1564   e = &elt->evt;
1565
1566   switch (e->event_type)
1567     {
1568     case SESSION_IO_EVT_TX_FLUSH:
1569     case SESSION_IO_EVT_TX:
1570       s = session_event_get_session (wrk, e);
1571       if (PREDICT_FALSE (!s))
1572         break;
1573       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
1574       wrk->ctx.s = s;
1575       /* Spray packets in per session type frames, since they go to
1576        * different nodes */
1577       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
1578       break;
1579     case SESSION_IO_EVT_RX:
1580       s = session_event_get_session (wrk, e);
1581       if (!s)
1582         break;
1583       transport_app_rx_evt (session_get_transport_proto (s),
1584                             s->connection_index, s->thread_index);
1585       break;
1586     case SESSION_IO_EVT_BUILTIN_RX:
1587       s = session_event_get_session (wrk, e);
1588       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
1589         break;
1590       svm_fifo_unset_event (s->rx_fifo);
1591       app_wrk = app_worker_get (s->app_wrk_index);
1592       app_worker_builtin_rx (app_wrk, s);
1593       break;
1594     case SESSION_IO_EVT_BUILTIN_TX:
1595       s = session_get_from_handle_if_valid (e->session_handle);
1596       wrk->ctx.s = s;
1597       if (PREDICT_TRUE (s != 0))
1598         session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
1599       break;
1600     default:
1601       clib_warning ("unhandled event type %d", e->event_type);
1602     }
1603
1604   SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
1605
1606   /* Regrab elements in case pool moved */
1607   elt = clib_llist_elt (wrk->event_elts, ei);
1608   if (!clib_llist_elt_is_linked (elt, evt_list))
1609     clib_llist_put (wrk->event_elts, elt);
1610 }
1611
1612 /* *INDENT-OFF* */
1613 static const u32 session_evt_msg_sizes[] = {
1614 #define _(symc, sym)                                                    \
1615   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
1616   foreach_session_ctrl_evt
1617 #undef _
1618 };
1619 /* *INDENT-ON* */
1620
1621 always_inline void
1622 session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
1623 {
1624   session_evt_elt_t *elt;
1625
1626   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
1627     {
1628       elt = session_evt_alloc_ctrl (wrk);
1629       if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
1630         {
1631           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
1632           elt->evt.event_type = evt->event_type;
1633           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
1634                             session_evt_msg_sizes[evt->event_type]);
1635         }
1636       else
1637         {
1638           /* Internal control events fit into io events footprint */
1639           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1640         }
1641     }
1642   else
1643     {
1644       elt = session_evt_alloc_new (wrk);
1645       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1646     }
1647 }
1648
1649 static void
1650 session_flush_pending_tx_buffers (session_worker_t * wrk,
1651                                   vlib_node_runtime_t * node)
1652 {
1653   vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers,
1654                                wrk->pending_tx_nexts,
1655                                vec_len (wrk->pending_tx_nexts));
1656   vec_reset_length (wrk->pending_tx_buffers);
1657   vec_reset_length (wrk->pending_tx_nexts);
1658 }
1659
1660 int
1661 session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
1662 {
1663   svm_msg_q_msg_t _msg, *msg = &_msg;
1664   u32 i, n_to_dequeue = 0;
1665   session_event_t *evt;
1666
1667   n_to_dequeue = svm_msg_q_size (mq);
1668   for (i = 0; i < n_to_dequeue; i++)
1669     {
1670       svm_msg_q_sub_raw (mq, msg);
1671       evt = svm_msg_q_msg_data (mq, msg);
1672       session_evt_add_to_list (wrk, evt);
1673       svm_msg_q_free_msg (mq, msg);
1674     }
1675
1676   return n_to_dequeue;
1677 }
1678
1679 static void
1680 session_wrk_update_state (session_worker_t *wrk)
1681 {
1682   vlib_main_t *vm = wrk->vm;
1683
1684   if (wrk->state == SESSION_WRK_POLLING)
1685     {
1686       if (clib_llist_elts (wrk->event_elts) == 4 &&
1687           vlib_last_vectors_per_main_loop (vm) < 1)
1688         {
1689           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1690           vlib_node_set_state (vm, session_queue_node.index,
1691                                VLIB_NODE_STATE_INTERRUPT);
1692         }
1693     }
1694   else if (wrk->state == SESSION_WRK_INTERRUPT)
1695     {
1696       if (clib_llist_elts (wrk->event_elts) > 4 ||
1697           vlib_last_vectors_per_main_loop (vm) > 1)
1698         {
1699           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
1700           vlib_node_set_state (vm, session_queue_node.index,
1701                                VLIB_NODE_STATE_POLLING);
1702         }
1703       else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
1704         {
1705           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
1706         }
1707     }
1708   else
1709     {
1710       if (clib_llist_elts (wrk->event_elts))
1711         {
1712           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1713         }
1714     }
1715 }
1716
1717 static uword
1718 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
1719                        vlib_frame_t * frame)
1720 {
1721   u32 thread_index = vm->thread_index, __clib_unused n_evts;
1722   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
1723   session_main_t *smm = vnet_get_session_main ();
1724   session_worker_t *wrk = &smm->wrk[thread_index];
1725   clib_llist_index_t ei, next_ei, old_ti;
1726   int n_tx_packets;
1727
1728   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
1729
1730   session_wrk_update_time (wrk, vlib_time_now (vm));
1731
1732   /*
1733    *  Update transport time
1734    */
1735   transport_update_time (wrk->last_vlib_time, thread_index);
1736   n_tx_packets = vec_len (wrk->pending_tx_buffers);
1737   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
1738
1739   /*
1740    *  Dequeue new internal mq events
1741    */
1742
1743   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
1744   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
1745
1746   /*
1747    * Handle control events
1748    */
1749
1750   ei = wrk->ctrl_head;
1751   ctrl_he = clib_llist_elt (wrk->event_elts, ei);
1752   next_ei = clib_llist_next_index (ctrl_he, evt_list);
1753   old_ti = clib_llist_prev_index (ctrl_he, evt_list);
1754   while (ei != old_ti)
1755     {
1756       ei = next_ei;
1757       elt = clib_llist_elt (wrk->event_elts, next_ei);
1758       next_ei = clib_llist_next_index (elt, evt_list);
1759       clib_llist_remove (wrk->event_elts, evt_list, elt);
1760       session_event_dispatch_ctrl (wrk, elt);
1761     }
1762
1763   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
1764
1765   /*
1766    * Handle the new io events.
1767    */
1768
1769   new_he = clib_llist_elt (wrk->event_elts, wrk->new_head);
1770   old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
1771   old_ti = clib_llist_prev_index (old_he, evt_list);
1772
1773   ei = clib_llist_next_index (new_he, evt_list);
1774   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
1775     {
1776       elt = clib_llist_elt (wrk->event_elts, ei);
1777       ei = clib_llist_next_index (elt, evt_list);
1778       clib_llist_remove (wrk->event_elts, evt_list, elt);
1779       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1780     }
1781
1782   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
1783
1784   /*
1785    * Handle the old io events, if we had any prior to processing the new ones
1786    */
1787
1788   if (old_ti != wrk->old_head)
1789     {
1790       old_he = clib_llist_elt (wrk->event_elts, wrk->old_head);
1791       ei = clib_llist_next_index (old_he, evt_list);
1792
1793       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
1794         {
1795           elt = clib_llist_elt (wrk->event_elts, ei);
1796           next_ei = clib_llist_next_index (elt, evt_list);
1797           clib_llist_remove (wrk->event_elts, evt_list, elt);
1798
1799           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1800
1801           if (ei == old_ti)
1802             break;
1803
1804           ei = next_ei;
1805         };
1806     }
1807
1808   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
1809
1810   if (vec_len (wrk->pending_tx_buffers))
1811     session_flush_pending_tx_buffers (wrk, node);
1812
1813   vlib_node_increment_counter (vm, session_queue_node.index,
1814                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
1815
1816   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
1817
1818   if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
1819     session_wrk_update_state (wrk);
1820
1821   return n_tx_packets;
1822 }
1823
1824 /* *INDENT-OFF* */
1825 VLIB_REGISTER_NODE (session_queue_node) =
1826 {
1827   .function = session_queue_node_fn,
1828   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
1829   .name = "session-queue",
1830   .format_trace = format_session_queue_trace,
1831   .type = VLIB_NODE_TYPE_INPUT,
1832   .n_errors = ARRAY_LEN (session_queue_error_strings),
1833   .error_strings = session_queue_error_strings,
1834   .state = VLIB_NODE_STATE_DISABLED,
1835 };
1836 /* *INDENT-ON* */
1837
1838 static clib_error_t *
1839 session_wrk_tfd_read_ready (clib_file_t *cf)
1840 {
1841   session_worker_t *wrk = session_main_get_worker (cf->private_data);
1842   u64 buf;
1843   int rv;
1844
1845   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
1846   rv = read (wrk->timerfd, &buf, sizeof (buf));
1847   if (rv < 0 && errno != EAGAIN)
1848     clib_unix_warning ("failed");
1849   return 0;
1850 }
1851
1852 static clib_error_t *
1853 session_wrk_tfd_write_ready (clib_file_t *cf)
1854 {
1855   return 0;
1856 }
1857
1858 void
1859 session_wrk_enable_adaptive_mode (session_worker_t *wrk)
1860 {
1861   u32 thread_index = wrk->vm->thread_index;
1862   clib_file_t template = { 0 };
1863
1864   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
1865     clib_warning ("timerfd_create");
1866
1867   template.read_function = session_wrk_tfd_read_ready;
1868   template.write_function = session_wrk_tfd_write_ready;
1869   template.file_descriptor = wrk->timerfd;
1870   template.private_data = thread_index;
1871   template.polling_thread_index = thread_index;
1872   template.description = format (0, "session-wrk-tfd-%u", thread_index);
1873
1874   wrk->timerfd_file = clib_file_add (&file_main, &template);
1875   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
1876 }
1877
1878 static clib_error_t *
1879 session_queue_exit (vlib_main_t * vm)
1880 {
1881   if (vlib_get_n_threads () < 2)
1882     return 0;
1883
1884   /*
1885    * Shut off (especially) worker-thread session nodes.
1886    * Otherwise, vpp can crash as the main thread unmaps the
1887    * API segment.
1888    */
1889   vlib_worker_thread_barrier_sync (vm);
1890   session_node_enable_disable (0 /* is_enable */ );
1891   vlib_worker_thread_barrier_release (vm);
1892   return 0;
1893 }
1894
1895 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1896
1897 static uword
1898 session_queue_run_on_main (vlib_main_t * vm)
1899 {
1900   vlib_node_runtime_t *node;
1901
1902   node = vlib_node_get_runtime (vm, session_queue_node.index);
1903   return session_queue_node_fn (vm, node, 0);
1904 }
1905
1906 static uword
1907 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1908                        vlib_frame_t * f)
1909 {
1910   uword *event_data = 0;
1911   f64 timeout = 1.0;
1912   uword event_type;
1913
1914   while (1)
1915     {
1916       vlib_process_wait_for_event_or_clock (vm, timeout);
1917       event_type = vlib_process_get_events (vm, (uword **) & event_data);
1918
1919       switch (event_type)
1920         {
1921         case SESSION_Q_PROCESS_RUN_ON_MAIN:
1922           /* Run session queue node on main thread */
1923           session_queue_run_on_main (vm);
1924           break;
1925         case SESSION_Q_PROCESS_STOP:
1926           vlib_node_set_state (vm, session_queue_process_node.index,
1927                                VLIB_NODE_STATE_DISABLED);
1928           timeout = 100000.0;
1929           break;
1930         case ~0:
1931           /* Timed out. Run on main to ensure all events are handled */
1932           session_queue_run_on_main (vm);
1933           break;
1934         }
1935       vec_reset_length (event_data);
1936     }
1937   return 0;
1938 }
1939
1940 /* *INDENT-OFF* */
1941 VLIB_REGISTER_NODE (session_queue_process_node) =
1942 {
1943   .function = session_queue_process,
1944   .type = VLIB_NODE_TYPE_PROCESS,
1945   .name = "session-queue-process",
1946   .state = VLIB_NODE_STATE_DISABLED,
1947 };
1948 /* *INDENT-ON* */
1949
1950 static_always_inline uword
1951 session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
1952                                 vlib_frame_t * frame)
1953 {
1954   session_main_t *sm = &session_main;
1955   if (!sm->wrk[0].vpp_event_queue)
1956     return 0;
1957   node = vlib_node_get_runtime (vm, session_queue_node.index);
1958   return session_queue_node_fn (vm, node, frame);
1959 }
1960
1961 /* *INDENT-OFF* */
1962 VLIB_REGISTER_NODE (session_queue_pre_input_node) =
1963 {
1964   .function = session_queue_pre_input_inline,
1965   .type = VLIB_NODE_TYPE_PRE_INPUT,
1966   .name = "session-queue-main",
1967   .state = VLIB_NODE_STATE_DISABLED,
1968 };
1969 /* *INDENT-ON* */
1970
1971 /*
1972  * fd.io coding-style-patch-verification: ON
1973  *
1974  * Local Variables:
1975  * eval: (c-set-style "gnu")
1976  * End:
1977  */