session: support half-close connection
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/application_local.h>
25 #include <vnet/session/session_debug.h>
26 #include <svm/queue.h>
27 #include <sys/timerfd.h>
28
29 #define app_check_thread_and_barrier(_fn, _arg)                         \
30   if (!vlib_thread_is_main_w_barrier ())                                \
31     {                                                                   \
32      vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));       \
33       return;                                                           \
34    }
35
36 static transport_endpt_ext_cfg_t *
37 session_mq_get_ext_config (application_t *app, uword offset)
38 {
39   svm_fifo_chunk_t *c;
40   fifo_segment_t *fs;
41
42   fs = application_get_rx_mqs_segment (app);
43   c = fs_chunk_ptr (fs->h, offset);
44   return (transport_endpt_ext_cfg_t *) c->data;
45 }
46
47 static void
48 session_mq_free_ext_config (application_t *app, uword offset)
49 {
50   svm_fifo_chunk_t *c;
51   fifo_segment_t *fs;
52
53   fs = application_get_rx_mqs_segment (app);
54   c = fs_chunk_ptr (fs->h, offset);
55   fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
56 }
57
58 static void
59 session_mq_listen_handler (void *data)
60 {
61   session_listen_msg_t *mp = (session_listen_msg_t *) data;
62   vnet_listen_args_t _a, *a = &_a;
63   app_worker_t *app_wrk;
64   application_t *app;
65   int rv;
66
67   app_check_thread_and_barrier (session_mq_listen_handler, mp);
68
69   app = application_lookup (mp->client_index);
70   if (!app)
71     return;
72
73   clib_memset (a, 0, sizeof (*a));
74   a->sep.is_ip4 = mp->is_ip4;
75   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
76   a->sep.port = mp->port;
77   a->sep.fib_index = mp->vrf;
78   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
79   a->sep.transport_proto = mp->proto;
80   a->app_index = app->app_index;
81   a->wrk_map_index = mp->wrk_index;
82   a->sep_ext.transport_flags = mp->flags;
83
84   if (mp->ext_config)
85     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
86
87   if ((rv = vnet_listen (a)))
88     clib_warning ("listen returned: %U", format_session_error, rv);
89
90   app_wrk = application_get_worker (app, mp->wrk_index);
91   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
92
93   if (mp->ext_config)
94     session_mq_free_ext_config (app, mp->ext_config);
95 }
96
97 static void
98 session_mq_listen_uri_handler (void *data)
99 {
100   session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
101   vnet_listen_args_t _a, *a = &_a;
102   app_worker_t *app_wrk;
103   application_t *app;
104   int rv;
105
106   app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
107
108   app = application_lookup (mp->client_index);
109   if (!app)
110     return;
111
112   clib_memset (a, 0, sizeof (*a));
113   a->uri = (char *) mp->uri;
114   a->app_index = app->app_index;
115   rv = vnet_bind_uri (a);
116
117   app_wrk = application_get_worker (app, 0);
118   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
119 }
120
121 static void
122 session_mq_connect_handler (void *data)
123 {
124   session_connect_msg_t *mp = (session_connect_msg_t *) data;
125   vnet_connect_args_t _a, *a = &_a;
126   app_worker_t *app_wrk;
127   application_t *app;
128   int rv;
129
130   app_check_thread_and_barrier (session_mq_connect_handler, mp);
131
132   app = application_lookup (mp->client_index);
133   if (!app)
134     return;
135
136   clib_memset (a, 0, sizeof (*a));
137   a->sep.is_ip4 = mp->is_ip4;
138   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
139   a->sep.port = mp->port;
140   a->sep.transport_proto = mp->proto;
141   a->sep.peer.fib_index = mp->vrf;
142   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
143   if (mp->is_ip4)
144     {
145       ip46_address_mask_ip4 (&a->sep.ip);
146       ip46_address_mask_ip4 (&a->sep.peer.ip);
147     }
148   a->sep.peer.port = mp->lcl_port;
149   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
150   a->sep_ext.parent_handle = mp->parent_handle;
151   a->sep_ext.transport_flags = mp->flags;
152   a->api_context = mp->context;
153   a->app_index = app->app_index;
154   a->wrk_map_index = mp->wrk_index;
155
156   if (mp->ext_config)
157     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
158
159   if ((rv = vnet_connect (a)))
160     {
161       clib_warning ("connect returned: %U", format_session_error, rv);
162       app_wrk = application_get_worker (app, mp->wrk_index);
163       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
164     }
165
166   if (mp->ext_config)
167     session_mq_free_ext_config (app, mp->ext_config);
168 }
169
170 static void
171 session_mq_connect_uri_handler (void *data)
172 {
173   session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
174   vnet_connect_args_t _a, *a = &_a;
175   app_worker_t *app_wrk;
176   application_t *app;
177   int rv;
178
179   app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
180
181   app = application_lookup (mp->client_index);
182   if (!app)
183     return;
184
185   clib_memset (a, 0, sizeof (*a));
186   a->uri = (char *) mp->uri;
187   a->api_context = mp->context;
188   a->app_index = app->app_index;
189   if ((rv = vnet_connect_uri (a)))
190     {
191       clib_warning ("connect_uri returned: %d", rv);
192       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
193       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
194     }
195 }
196
197 static void
198 session_mq_shutdown_handler (void *data)
199 {
200   session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
201   vnet_shutdown_args_t _a, *a = &_a;
202   application_t *app;
203
204   app = application_lookup (mp->client_index);
205   if (!app)
206     return;
207
208   a->app_index = app->app_index;
209   a->handle = mp->handle;
210   vnet_shutdown_session (a);
211 }
212
213 static void
214 session_mq_disconnect_handler (void *data)
215 {
216   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
217   vnet_disconnect_args_t _a, *a = &_a;
218   application_t *app;
219
220   app = application_lookup (mp->client_index);
221   if (!app)
222     return;
223
224   a->app_index = app->app_index;
225   a->handle = mp->handle;
226   vnet_disconnect_session (a);
227 }
228
229 static void
230 app_mq_detach_handler (void *data)
231 {
232   session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
233   vnet_app_detach_args_t _a, *a = &_a;
234   application_t *app;
235
236   app_check_thread_and_barrier (app_mq_detach_handler, mp);
237
238   app = application_lookup (mp->client_index);
239   if (!app)
240     return;
241
242   a->app_index = app->app_index;
243   a->api_client_index = mp->client_index;
244   vnet_application_detach (a);
245 }
246
247 static void
248 session_mq_unlisten_handler (void *data)
249 {
250   session_unlisten_msg_t *mp = (session_unlisten_msg_t *) data;
251   vnet_unlisten_args_t _a, *a = &_a;
252   app_worker_t *app_wrk;
253   application_t *app;
254   int rv;
255
256   app_check_thread_and_barrier (session_mq_unlisten_handler, mp);
257
258   app = application_lookup (mp->client_index);
259   if (!app)
260     return;
261
262   clib_memset (a, 0, sizeof (*a));
263   a->app_index = app->app_index;
264   a->handle = mp->handle;
265   a->wrk_map_index = mp->wrk_index;
266   if ((rv = vnet_unlisten (a)))
267     clib_warning ("unlisten returned: %d", rv);
268
269   app_wrk = application_get_worker (app, a->wrk_map_index);
270   if (!app_wrk)
271     return;
272
273   mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
274 }
275
276 static void
277 session_mq_accepted_reply_handler (void *data)
278 {
279   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
280   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
281   session_state_t old_state;
282   app_worker_t *app_wrk;
283   session_t *s;
284
285   /* Server isn't interested, kill the session */
286   if (mp->retval)
287     {
288       a->app_index = mp->context;
289       a->handle = mp->handle;
290       vnet_disconnect_session (a);
291       return;
292     }
293
294   /* Mail this back from the main thread. We're not polling in main
295    * thread so we're using other workers for notifications. */
296   if (vlib_num_workers () && vlib_get_thread_index () != 0
297       && session_thread_from_handle (mp->handle) == 0)
298     {
299       vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
300                                  (u8 *) mp, sizeof (*mp));
301       return;
302     }
303
304   s = session_get_from_handle_if_valid (mp->handle);
305   if (!s)
306     return;
307
308   app_wrk = app_worker_get (s->app_wrk_index);
309   if (app_wrk->app_index != mp->context)
310     {
311       clib_warning ("app doesn't own session");
312       return;
313     }
314
315   if (!session_has_transport (s))
316     {
317       s->session_state = SESSION_STATE_READY;
318       if (ct_session_connect_notify (s))
319         return;
320     }
321   else
322     {
323       old_state = s->session_state;
324       s->session_state = SESSION_STATE_READY;
325
326       if (!svm_fifo_is_empty_prod (s->rx_fifo))
327         app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
328
329       /* Closed while waiting for app to reply. Resend disconnect */
330       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
331         {
332           app_worker_close_notify (app_wrk, s);
333           s->session_state = old_state;
334           return;
335         }
336     }
337 }
338
339 static void
340 session_mq_reset_reply_handler (void *data)
341 {
342   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
343   session_reset_reply_msg_t *mp;
344   app_worker_t *app_wrk;
345   session_t *s;
346   application_t *app;
347   u32 index, thread_index;
348
349   mp = (session_reset_reply_msg_t *) data;
350   app = application_lookup (mp->context);
351   if (!app)
352     return;
353
354   session_parse_handle (mp->handle, &index, &thread_index);
355   s = session_get_if_valid (index, thread_index);
356
357   /* No session or not the right session */
358   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
359     return;
360
361   app_wrk = app_worker_get (s->app_wrk_index);
362   if (!app_wrk || app_wrk->app_index != app->app_index)
363     {
364       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
365                     mp->handle);
366       return;
367     }
368
369   /* Client objected to resetting the session, log and continue */
370   if (mp->retval)
371     {
372       clib_warning ("client retval %d", mp->retval);
373       return;
374     }
375
376   /* This comes as a response to a reset, transport only waiting for
377    * confirmation to remove connection state, no need to disconnect */
378   a->handle = mp->handle;
379   a->app_index = app->app_index;
380   vnet_disconnect_session (a);
381 }
382
383 static void
384 session_mq_disconnected_handler (void *data)
385 {
386   session_disconnected_reply_msg_t *rmp;
387   vnet_disconnect_args_t _a, *a = &_a;
388   svm_msg_q_msg_t _msg, *msg = &_msg;
389   session_disconnected_msg_t *mp;
390   app_worker_t *app_wrk;
391   session_event_t *evt;
392   session_t *s;
393   application_t *app;
394   int rv = 0;
395
396   mp = (session_disconnected_msg_t *) data;
397   if (!(s = session_get_from_handle_if_valid (mp->handle)))
398     {
399       clib_warning ("could not disconnect handle %llu", mp->handle);
400       return;
401     }
402   app_wrk = app_worker_get (s->app_wrk_index);
403   app = application_lookup (mp->client_index);
404   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
405     {
406       clib_warning ("could not disconnect session: %llu app: %u",
407                     mp->handle, mp->client_index);
408       return;
409     }
410
411   a->handle = mp->handle;
412   a->app_index = app_wrk->wrk_index;
413   rv = vnet_disconnect_session (a);
414
415   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
416                                        SESSION_MQ_CTRL_EVT_RING,
417                                        SVM_Q_WAIT, msg);
418   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
419   clib_memset (evt, 0, sizeof (*evt));
420   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
421   rmp = (session_disconnected_reply_msg_t *) evt->data;
422   rmp->handle = mp->handle;
423   rmp->context = mp->context;
424   rmp->retval = rv;
425   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
426 }
427
428 static void
429 session_mq_disconnected_reply_handler (void *data)
430 {
431   session_disconnected_reply_msg_t *mp;
432   vnet_disconnect_args_t _a, *a = &_a;
433   application_t *app;
434
435   mp = (session_disconnected_reply_msg_t *) data;
436
437   /* Client objected to disconnecting the session, log and continue */
438   if (mp->retval)
439     {
440       clib_warning ("client retval %d", mp->retval);
441       return;
442     }
443
444   /* Disconnect has been confirmed. Confirm close to transport */
445   app = application_lookup (mp->context);
446   if (app)
447     {
448       a->handle = mp->handle;
449       a->app_index = app->app_index;
450       vnet_disconnect_session (a);
451     }
452 }
453
454 static void
455 session_mq_worker_update_handler (void *data)
456 {
457   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
458   session_worker_update_reply_msg_t *rmp;
459   svm_msg_q_msg_t _msg, *msg = &_msg;
460   app_worker_t *app_wrk;
461   u32 owner_app_wrk_map;
462   session_event_t *evt;
463   session_t *s;
464   application_t *app;
465
466   app = application_lookup (mp->client_index);
467   if (!app)
468     return;
469   if (!(s = session_get_from_handle_if_valid (mp->handle)))
470     {
471       clib_warning ("invalid handle %llu", mp->handle);
472       return;
473     }
474   app_wrk = app_worker_get (s->app_wrk_index);
475   if (app_wrk->app_index != app->app_index)
476     {
477       clib_warning ("app %u does not own session %llu", app->app_index,
478                     mp->handle);
479       return;
480     }
481   owner_app_wrk_map = app_wrk->wrk_map_index;
482   app_wrk = application_get_worker (app, mp->wrk_index);
483
484   /* This needs to come from the new owner */
485   if (mp->req_wrk_index == owner_app_wrk_map)
486     {
487       session_req_worker_update_msg_t *wump;
488
489       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
490                                            SESSION_MQ_CTRL_EVT_RING,
491                                            SVM_Q_WAIT, msg);
492       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
493       clib_memset (evt, 0, sizeof (*evt));
494       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
495       wump = (session_req_worker_update_msg_t *) evt->data;
496       wump->session_handle = mp->handle;
497       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
498       return;
499     }
500
501   app_worker_own_session (app_wrk, s);
502
503   /*
504    * Send reply
505    */
506   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
507                                        SESSION_MQ_CTRL_EVT_RING,
508                                        SVM_Q_WAIT, msg);
509   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
510   clib_memset (evt, 0, sizeof (*evt));
511   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
512   rmp = (session_worker_update_reply_msg_t *) evt->data;
513   rmp->handle = mp->handle;
514   if (s->rx_fifo)
515     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
516   if (s->tx_fifo)
517     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
518   rmp->segment_handle = session_segment_handle (s);
519   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
520
521   /*
522    * Retransmit messages that may have been lost
523    */
524   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
525     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
526
527   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
528     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
529
530   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
531     app_worker_close_notify (app_wrk, s);
532 }
533
534 static void
535 session_mq_app_wrk_rpc_handler (void *data)
536 {
537   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
538   svm_msg_q_msg_t _msg, *msg = &_msg;
539   session_app_wrk_rpc_msg_t *rmp;
540   app_worker_t *app_wrk;
541   session_event_t *evt;
542   application_t *app;
543
544   app = application_lookup (mp->client_index);
545   if (!app)
546     return;
547
548   app_wrk = application_get_worker (app, mp->wrk_index);
549
550   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
551                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
552                                        msg);
553   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
554   clib_memset (evt, 0, sizeof (*evt));
555   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
556   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
557   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
558   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
559 }
560
561 static void
562 session_mq_transport_attr_handler (void *data)
563 {
564   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
565   session_transport_attr_reply_msg_t *rmp;
566   svm_msg_q_msg_t _msg, *msg = &_msg;
567   app_worker_t *app_wrk;
568   session_event_t *evt;
569   application_t *app;
570   session_t *s;
571   int rv;
572
573   app = application_lookup (mp->client_index);
574   if (!app)
575     return;
576
577   if (!(s = session_get_from_handle_if_valid (mp->handle)))
578     {
579       clib_warning ("invalid handle %llu", mp->handle);
580       return;
581     }
582   app_wrk = app_worker_get (s->app_wrk_index);
583   if (app_wrk->app_index != app->app_index)
584     {
585       clib_warning ("app %u does not own session %llu", app->app_index,
586                     mp->handle);
587       return;
588     }
589
590   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
591
592   svm_msg_q_lock_and_alloc_msg_w_ring (
593     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
594   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
595   clib_memset (evt, 0, sizeof (*evt));
596   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
597   rmp = (session_transport_attr_reply_msg_t *) evt->data;
598   rmp->handle = mp->handle;
599   rmp->retval = rv;
600   rmp->is_get = mp->is_get;
601   if (!rv && mp->is_get)
602     rmp->attr = mp->attr;
603   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
604 }
605
606 vlib_node_registration_t session_queue_node;
607
608 typedef struct
609 {
610   u32 session_index;
611   u32 server_thread_index;
612 } session_queue_trace_t;
613
614 /* packet trace format function */
615 static u8 *
616 format_session_queue_trace (u8 * s, va_list * args)
617 {
618   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
619   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
620   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
621
622   s = format (s, "session index %d thread index %d",
623               t->session_index, t->server_thread_index);
624   return s;
625 }
626
627 #define foreach_session_queue_error             \
628 _(TX, "Packets transmitted")                    \
629 _(TIMER, "Timer events")                        \
630 _(NO_BUFFER, "Out of buffers")
631
632 typedef enum
633 {
634 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
635   foreach_session_queue_error
636 #undef _
637     SESSION_QUEUE_N_ERROR,
638 } session_queue_error_t;
639
640 static char *session_queue_error_strings[] = {
641 #define _(sym,string) string,
642   foreach_session_queue_error
643 #undef _
644 };
645
646 enum
647 {
648   SESSION_TX_NO_BUFFERS = -2,
649   SESSION_TX_NO_DATA,
650   SESSION_TX_OK
651 };
652
653 static void
654 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
655                         u32 next_index, u32 * to_next, u16 n_segs,
656                         session_t * s, u32 n_trace)
657 {
658   while (n_trace && n_segs)
659     {
660       vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
661       if (PREDICT_TRUE
662           (vlib_trace_buffer
663            (vm, node, next_index, b, 1 /* follow_chain */ )))
664         {
665           session_queue_trace_t *t =
666             vlib_add_trace (vm, node, b, sizeof (*t));
667           t->session_index = s->session_index;
668           t->server_thread_index = s->thread_index;
669           n_trace--;
670         }
671       to_next++;
672       n_segs--;
673     }
674   vlib_set_trace_count (vm, node, n_trace);
675 }
676
677 always_inline void
678 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
679                             vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
680 {
681   vlib_buffer_t *chain_b, *prev_b;
682   u32 chain_bi0, to_deq, left_from_seg;
683   session_worker_t *wrk;
684   u16 len_to_deq, n_bytes_read;
685   u8 *data, j;
686
687   wrk = session_main_get_worker (ctx->s->thread_index);
688   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
689   b->total_length_not_including_first_buffer = 0;
690
691   chain_b = b;
692   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
693                             ctx->left_to_snd);
694   to_deq = left_from_seg;
695   for (j = 1; j < ctx->n_bufs_per_seg; j++)
696     {
697       prev_b = chain_b;
698       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
699
700       *n_bufs -= 1;
701       chain_bi0 = wrk->tx_buffers[*n_bufs];
702       chain_b = vlib_get_buffer (vm, chain_bi0);
703       chain_b->current_data = 0;
704       data = vlib_buffer_get_current (chain_b);
705       if (peek_data)
706         {
707           n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
708                                         ctx->sp.tx_offset, len_to_deq, data);
709           ctx->sp.tx_offset += n_bytes_read;
710         }
711       else
712         {
713           if (ctx->transport_vft->transport_options.tx_type ==
714               TRANSPORT_TX_DGRAM)
715             {
716               svm_fifo_t *f = ctx->s->tx_fifo;
717               session_dgram_hdr_t *hdr = &ctx->hdr;
718               u16 deq_now;
719               u32 offset;
720
721               deq_now = clib_min (hdr->data_length - hdr->data_offset,
722                                   len_to_deq);
723               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
724               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
725               ASSERT (n_bytes_read > 0);
726
727               hdr->data_offset += n_bytes_read;
728               if (hdr->data_offset == hdr->data_length)
729                 {
730                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
731                   svm_fifo_dequeue_drop (f, offset);
732                   if (ctx->left_to_snd > n_bytes_read)
733                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
734                                    (u8 *) & ctx->hdr);
735                 }
736               else if (ctx->left_to_snd == n_bytes_read)
737                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
738                                          sizeof (session_dgram_pre_hdr_t));
739             }
740           else
741             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
742                                              len_to_deq, data);
743         }
744       ASSERT (n_bytes_read == len_to_deq);
745       chain_b->current_length = n_bytes_read;
746       b->total_length_not_including_first_buffer += chain_b->current_length;
747
748       /* update previous buffer */
749       prev_b->next_buffer = chain_bi0;
750       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
751
752       /* update current buffer */
753       chain_b->next_buffer = 0;
754
755       to_deq -= n_bytes_read;
756       if (to_deq == 0)
757         break;
758     }
759   ASSERT (to_deq == 0
760           && b->total_length_not_including_first_buffer == left_from_seg);
761   ctx->left_to_snd -= left_from_seg;
762 }
763
764 always_inline void
765 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
766                         vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
767 {
768   u32 len_to_deq;
769   u8 *data0;
770   int n_bytes_read;
771
772   /*
773    * Start with the first buffer in chain
774    */
775   b->error = 0;
776   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
777   b->current_data = 0;
778
779   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
780   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
781
782   if (peek_data)
783     {
784       n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
785                                     len_to_deq, data0);
786       ASSERT (n_bytes_read > 0);
787       /* Keep track of progress locally, transport is also supposed to
788        * increment it independently when pushing the header */
789       ctx->sp.tx_offset += n_bytes_read;
790     }
791   else
792     {
793       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
794         {
795           session_dgram_hdr_t *hdr = &ctx->hdr;
796           svm_fifo_t *f = ctx->s->tx_fifo;
797           u16 deq_now;
798           u32 offset;
799
800           ASSERT (hdr->data_length > hdr->data_offset);
801           deq_now = clib_min (hdr->data_length - hdr->data_offset,
802                               len_to_deq);
803           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
804           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
805           ASSERT (n_bytes_read > 0);
806
807           if (ctx->s->session_state == SESSION_STATE_LISTENING)
808             {
809               ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
810               ctx->tc->rmt_port = hdr->rmt_port;
811             }
812           hdr->data_offset += n_bytes_read;
813           if (hdr->data_offset == hdr->data_length)
814             {
815               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
816               svm_fifo_dequeue_drop (f, offset);
817               if (ctx->left_to_snd > n_bytes_read)
818                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
819                                (u8 *) & ctx->hdr);
820             }
821           else if (ctx->left_to_snd == n_bytes_read)
822             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
823                                      sizeof (session_dgram_pre_hdr_t));
824         }
825       else
826         {
827           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
828                                            len_to_deq, data0);
829           ASSERT (n_bytes_read > 0);
830         }
831     }
832   b->current_length = n_bytes_read;
833   ctx->left_to_snd -= n_bytes_read;
834
835   /*
836    * Fill in the remaining buffers in the chain, if any
837    */
838   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
839     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
840 }
841
842 always_inline u8
843 session_tx_not_ready (session_t * s, u8 peek_data)
844 {
845   if (peek_data)
846     {
847       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
848         return 0;
849       /* Can retransmit for closed sessions but can't send new data if
850        * session is not ready or closed */
851       else if (s->session_state < SESSION_STATE_READY)
852         return 1;
853       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
854         {
855           /* Allow closed transports to still send custom packets.
856            * For instance, tcp may want to send acks in time-wait. */
857           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
858               && (s->flags & SESSION_F_CUSTOM_TX))
859             return 0;
860           return 2;
861         }
862     }
863   return 0;
864 }
865
866 always_inline transport_connection_t *
867 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
868 {
869   if (peek_data)
870     {
871       return ctx->transport_vft->get_connection (ctx->s->connection_index,
872                                                  ctx->s->thread_index);
873     }
874   else
875     {
876       if (ctx->s->session_state == SESSION_STATE_LISTENING)
877         return ctx->transport_vft->get_listener (ctx->s->connection_index);
878       else
879         {
880           return ctx->transport_vft->get_connection (ctx->s->connection_index,
881                                                      ctx->s->thread_index);
882         }
883     }
884 }
885
886 always_inline void
887 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
888                                u32 max_segs, u8 peek_data)
889 {
890   u32 n_bytes_per_buf, n_bytes_per_seg;
891
892   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
893   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
894
895   if (peek_data)
896     {
897       /* Offset in rx fifo from where to peek data */
898       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
899         {
900           ctx->max_len_to_snd = 0;
901           return;
902         }
903       ctx->max_dequeue -= ctx->sp.tx_offset;
904     }
905   else
906     {
907       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
908         {
909           u32 len, chain_limit;
910
911           if (ctx->max_dequeue <= sizeof (ctx->hdr))
912             {
913               ctx->max_len_to_snd = 0;
914               return;
915             }
916
917           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
918                          (u8 *) & ctx->hdr);
919           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
920           len = ctx->hdr.data_length - ctx->hdr.data_offset;
921
922           /* Process multiple dgrams if smaller than min (buf_space, mss).
923            * This avoids handling multiple dgrams if they require buffer
924            * chains */
925           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
926                                   ctx->sp.snd_mss);
927           if (ctx->hdr.data_length <= chain_limit)
928             {
929               u32 first_dgram_len, dgram_len, offset, max_offset;
930               session_dgram_hdr_t hdr;
931
932               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
933               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
934               first_dgram_len = len;
935               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
936
937               while (offset < max_offset)
938                 {
939                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
940                                  (u8 *) & hdr);
941                   ASSERT (hdr.data_length > hdr.data_offset);
942                   dgram_len = hdr.data_length - hdr.data_offset;
943                   if (len + dgram_len > ctx->max_dequeue
944                       || first_dgram_len != dgram_len)
945                     break;
946                   len += dgram_len;
947                   offset += sizeof (hdr) + hdr.data_length;
948                 }
949             }
950
951           ctx->max_dequeue = len;
952         }
953     }
954   ASSERT (ctx->max_dequeue > 0);
955
956   /* Ensure we're not writing more than transport window allows */
957   if (ctx->max_dequeue < ctx->sp.snd_space)
958     {
959       /* Constrained by tx queue. Try to send only fully formed segments */
960       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
961         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
962         ctx->max_dequeue;
963       /* TODO Nagle ? */
964     }
965   else
966     {
967       /* Expectation is that snd_space0 is already a multiple of snd_mss */
968       ctx->max_len_to_snd = ctx->sp.snd_space;
969     }
970
971   /* Check if we're tx constrained by the node */
972   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
973   if (ctx->n_segs_per_evt > max_segs)
974     {
975       ctx->n_segs_per_evt = max_segs;
976       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
977     }
978
979   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
980   if (ctx->n_segs_per_evt > 1)
981     {
982       u32 n_bytes_last_seg, n_bufs_last_seg;
983
984       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
985       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
986         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
987       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
988       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
989       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
990         + n_bufs_last_seg;
991     }
992   else
993     {
994       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
995       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
996       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
997     }
998
999   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
1000   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
1001                                      n_bytes_per_buf -
1002                                      TRANSPORT_MAX_HDRS_LEN);
1003 }
1004
1005 always_inline void
1006 session_tx_maybe_reschedule (session_worker_t * wrk,
1007                              session_tx_context_t * ctx,
1008                              session_evt_elt_t * elt)
1009 {
1010   session_t *s = ctx->s;
1011
1012   svm_fifo_unset_event (s->tx_fifo);
1013   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
1014     if (svm_fifo_set_event (s->tx_fifo))
1015       session_evt_add_head_old (wrk, elt);
1016 }
1017
1018 always_inline int
1019 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
1020                                 vlib_node_runtime_t * node,
1021                                 session_evt_elt_t * elt,
1022                                 int *n_tx_packets, u8 peek_data)
1023 {
1024   u32 n_trace, n_left, pbi, next_index, max_burst;
1025   session_tx_context_t *ctx = &wrk->ctx;
1026   session_main_t *smm = &session_main;
1027   session_event_t *e = &elt->evt;
1028   vlib_main_t *vm = wrk->vm;
1029   transport_proto_t tp;
1030   vlib_buffer_t *pb;
1031   u16 n_bufs, rv;
1032
1033   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
1034     {
1035       if (rv < 2)
1036         session_evt_add_old (wrk, elt);
1037       return SESSION_TX_NO_DATA;
1038     }
1039
1040   next_index = smm->session_type_to_next[ctx->s->session_type];
1041   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
1042
1043   tp = session_get_transport_proto (ctx->s);
1044   ctx->transport_vft = transport_protocol_get_vft (tp);
1045   ctx->tc = session_tx_get_transport (ctx, peek_data);
1046
1047   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
1048     {
1049       if (ctx->transport_vft->flush_data)
1050         ctx->transport_vft->flush_data (ctx->tc);
1051       e->event_type = SESSION_IO_EVT_TX;
1052     }
1053
1054   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
1055     {
1056       u32 n_custom_tx;
1057       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
1058       ctx->sp.max_burst_size = max_burst;
1059       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
1060       *n_tx_packets += n_custom_tx;
1061       if (PREDICT_FALSE
1062           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1063         return SESSION_TX_OK;
1064       max_burst -= n_custom_tx;
1065       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
1066         {
1067           session_evt_add_old (wrk, elt);
1068           return SESSION_TX_OK;
1069         }
1070     }
1071
1072   transport_connection_snd_params (ctx->tc, &ctx->sp);
1073
1074   if (!ctx->sp.snd_space)
1075     {
1076       /* If the deschedule flag was set, remove session from scheduler.
1077        * Transport is responsible for rescheduling this session. */
1078       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
1079         transport_connection_deschedule (ctx->tc);
1080       /* Request to postpone the session, e.g., zero-wnd and transport
1081        * is not currently probing */
1082       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
1083         session_evt_add_old (wrk, elt);
1084       /* This flow queue is "empty" so it should be re-evaluated before
1085        * the ones that have data to send. */
1086       else
1087         session_evt_add_head_old (wrk, elt);
1088
1089       return SESSION_TX_NO_DATA;
1090     }
1091
1092   if (transport_connection_is_tx_paced (ctx->tc))
1093     {
1094       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
1095       if (snd_space < TRANSPORT_PACER_MIN_BURST)
1096         {
1097           session_evt_add_head_old (wrk, elt);
1098           return SESSION_TX_NO_DATA;
1099         }
1100       snd_space = clib_min (ctx->sp.snd_space, snd_space);
1101       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
1102         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
1103     }
1104
1105   /* Check how much we can pull. */
1106   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
1107
1108   if (PREDICT_FALSE (!ctx->max_len_to_snd))
1109     {
1110       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
1111       session_tx_maybe_reschedule (wrk, ctx, elt);
1112       return SESSION_TX_NO_DATA;
1113     }
1114
1115   vec_validate_aligned (wrk->tx_buffers, ctx->n_bufs_needed - 1,
1116                         CLIB_CACHE_LINE_BYTES);
1117   n_bufs = vlib_buffer_alloc (vm, wrk->tx_buffers, ctx->n_bufs_needed);
1118   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
1119     {
1120       if (n_bufs)
1121         vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
1122       session_evt_add_head_old (wrk, elt);
1123       vlib_node_increment_counter (wrk->vm, node->node_index,
1124                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
1125       return SESSION_TX_NO_BUFFERS;
1126     }
1127
1128   if (transport_connection_is_tx_paced (ctx->tc))
1129     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
1130
1131   ctx->left_to_snd = ctx->max_len_to_snd;
1132   n_left = ctx->n_segs_per_evt;
1133
1134   while (n_left >= 4)
1135     {
1136       vlib_buffer_t *b0, *b1;
1137       u32 bi0, bi1;
1138
1139       pbi = wrk->tx_buffers[n_bufs - 3];
1140       pb = vlib_get_buffer (vm, pbi);
1141       vlib_prefetch_buffer_header (pb, STORE);
1142       pbi = wrk->tx_buffers[n_bufs - 4];
1143       pb = vlib_get_buffer (vm, pbi);
1144       vlib_prefetch_buffer_header (pb, STORE);
1145
1146       bi0 = wrk->tx_buffers[--n_bufs];
1147       bi1 = wrk->tx_buffers[--n_bufs];
1148
1149       b0 = vlib_get_buffer (vm, bi0);
1150       b1 = vlib_get_buffer (vm, bi1);
1151
1152       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1153       session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
1154
1155       ctx->transport_vft->push_header (ctx->tc, b0);
1156       ctx->transport_vft->push_header (ctx->tc, b1);
1157
1158       n_left -= 2;
1159
1160       vec_add1 (wrk->pending_tx_buffers, bi0);
1161       vec_add1 (wrk->pending_tx_buffers, bi1);
1162       vec_add1 (wrk->pending_tx_nexts, next_index);
1163       vec_add1 (wrk->pending_tx_nexts, next_index);
1164     }
1165   while (n_left)
1166     {
1167       vlib_buffer_t *b0;
1168       u32 bi0;
1169
1170       if (n_left > 1)
1171         {
1172           pbi = wrk->tx_buffers[n_bufs - 2];
1173           pb = vlib_get_buffer (vm, pbi);
1174           vlib_prefetch_buffer_header (pb, STORE);
1175         }
1176
1177       bi0 = wrk->tx_buffers[--n_bufs];
1178       b0 = vlib_get_buffer (vm, bi0);
1179       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1180
1181       /* Ask transport to push header after current_length and
1182        * total_length_not_including_first_buffer are updated */
1183       ctx->transport_vft->push_header (ctx->tc, b0);
1184
1185       n_left -= 1;
1186
1187       vec_add1 (wrk->pending_tx_buffers, bi0);
1188       vec_add1 (wrk->pending_tx_nexts, next_index);
1189     }
1190
1191   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
1192     session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers,
1193                             ctx->n_segs_per_evt, ctx->s, n_trace);
1194
1195   if (PREDICT_FALSE (n_bufs))
1196     vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
1197
1198   *n_tx_packets += ctx->n_segs_per_evt;
1199
1200   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
1201                ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
1202
1203   ASSERT (ctx->left_to_snd == 0);
1204
1205   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
1206    * check if application enqueued more data and reschedule accordingly */
1207   if (ctx->max_len_to_snd < ctx->max_dequeue)
1208     session_evt_add_old (wrk, elt);
1209   else
1210     session_tx_maybe_reschedule (wrk, ctx, elt);
1211
1212   if (!peek_data)
1213     {
1214       u32 n_dequeued = ctx->max_len_to_snd;
1215       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1216         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
1217       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
1218         session_dequeue_notify (ctx->s);
1219     }
1220   return SESSION_TX_OK;
1221 }
1222
1223 int
1224 session_tx_fifo_peek_and_snd (session_worker_t * wrk,
1225                               vlib_node_runtime_t * node,
1226                               session_evt_elt_t * e, int *n_tx_packets)
1227 {
1228   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
1229 }
1230
1231 int
1232 session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
1233                                  vlib_node_runtime_t * node,
1234                                  session_evt_elt_t * e, int *n_tx_packets)
1235 {
1236   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
1237 }
1238
1239 int
1240 session_tx_fifo_dequeue_internal (session_worker_t * wrk,
1241                                   vlib_node_runtime_t * node,
1242                                   session_evt_elt_t * elt, int *n_tx_packets)
1243 {
1244   transport_send_params_t *sp = &wrk->ctx.sp;
1245   session_t *s = wrk->ctx.s;
1246   u32 n_packets;
1247
1248   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1249     return 0;
1250
1251   /* Clear custom-tx flag used to request reschedule for tx */
1252   s->flags &= ~SESSION_F_CUSTOM_TX;
1253
1254   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
1255                                  TRANSPORT_PACER_MAX_BURST_PKTS);
1256
1257   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
1258   *n_tx_packets += n_packets;
1259
1260   if (s->flags & SESSION_F_CUSTOM_TX)
1261     {
1262       session_evt_add_old (wrk, elt);
1263     }
1264   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
1265     {
1266       svm_fifo_unset_event (s->tx_fifo);
1267       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1268         if (svm_fifo_set_event (s->tx_fifo))
1269           session_evt_add_head_old (wrk, elt);
1270     }
1271
1272   if (sp->max_burst_size &&
1273       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
1274     session_dequeue_notify (s);
1275
1276   return n_packets;
1277 }
1278
1279 always_inline session_t *
1280 session_event_get_session (session_worker_t * wrk, session_event_t * e)
1281 {
1282   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
1283     return 0;
1284
1285   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
1286   return pool_elt_at_index (wrk->sessions, e->session_index);
1287 }
1288
1289 always_inline void
1290 session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
1291 {
1292   clib_llist_index_t ei;
1293   void (*fp) (void *);
1294   session_event_t *e;
1295   session_t *s;
1296
1297   ei = clib_llist_entry_index (wrk->event_elts, elt);
1298   e = &elt->evt;
1299
1300   switch (e->event_type)
1301     {
1302     case SESSION_CTRL_EVT_RPC:
1303       fp = e->rpc_args.fp;
1304       (*fp) (e->rpc_args.arg);
1305       break;
1306     case SESSION_CTRL_EVT_HALF_CLOSE:
1307       s = session_get_from_handle_if_valid (e->session_handle);
1308       if (PREDICT_FALSE (!s))
1309         break;
1310       session_transport_half_close (s);
1311       break;
1312     case SESSION_CTRL_EVT_CLOSE:
1313       s = session_get_from_handle_if_valid (e->session_handle);
1314       if (PREDICT_FALSE (!s))
1315         break;
1316       session_transport_close (s);
1317       break;
1318     case SESSION_CTRL_EVT_RESET:
1319       s = session_get_from_handle_if_valid (e->session_handle);
1320       if (PREDICT_FALSE (!s))
1321         break;
1322       session_transport_reset (s);
1323       break;
1324     case SESSION_CTRL_EVT_LISTEN:
1325       session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
1326       break;
1327     case SESSION_CTRL_EVT_LISTEN_URI:
1328       session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
1329       break;
1330     case SESSION_CTRL_EVT_UNLISTEN:
1331       session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
1332       break;
1333     case SESSION_CTRL_EVT_CONNECT:
1334       session_mq_connect_handler (session_evt_ctrl_data (wrk, elt));
1335       break;
1336     case SESSION_CTRL_EVT_CONNECT_URI:
1337       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
1338       break;
1339     case SESSION_CTRL_EVT_SHUTDOWN:
1340       session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
1341       break;
1342     case SESSION_CTRL_EVT_DISCONNECT:
1343       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
1344       break;
1345     case SESSION_CTRL_EVT_DISCONNECTED:
1346       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
1347       break;
1348     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
1349       session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
1350       break;
1351     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
1352       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
1353                                                                     elt));
1354       break;
1355     case SESSION_CTRL_EVT_RESET_REPLY:
1356       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
1357       break;
1358     case SESSION_CTRL_EVT_WORKER_UPDATE:
1359       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
1360       break;
1361     case SESSION_CTRL_EVT_APP_DETACH:
1362       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
1363       break;
1364     case SESSION_CTRL_EVT_APP_WRK_RPC:
1365       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
1366       break;
1367     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
1368       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
1369       break;
1370     default:
1371       clib_warning ("unhandled event type %d", e->event_type);
1372     }
1373
1374   /* Regrab elements in case pool moved */
1375   elt = pool_elt_at_index (wrk->event_elts, ei);
1376   if (!clib_llist_elt_is_linked (elt, evt_list))
1377     {
1378       e = &elt->evt;
1379       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
1380         session_evt_ctrl_data_free (wrk, elt);
1381       session_evt_elt_free (wrk, elt);
1382     }
1383   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
1384 }
1385
1386 always_inline void
1387 session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
1388                            session_evt_elt_t * elt, int *n_tx_packets)
1389 {
1390   session_main_t *smm = &session_main;
1391   app_worker_t *app_wrk;
1392   clib_llist_index_t ei;
1393   session_event_t *e;
1394   session_t *s;
1395
1396   ei = clib_llist_entry_index (wrk->event_elts, elt);
1397   e = &elt->evt;
1398
1399   switch (e->event_type)
1400     {
1401     case SESSION_IO_EVT_TX_FLUSH:
1402     case SESSION_IO_EVT_TX:
1403       s = session_event_get_session (wrk, e);
1404       if (PREDICT_FALSE (!s))
1405         break;
1406       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
1407       wrk->ctx.s = s;
1408       /* Spray packets in per session type frames, since they go to
1409        * different nodes */
1410       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
1411       break;
1412     case SESSION_IO_EVT_RX:
1413       s = session_event_get_session (wrk, e);
1414       if (!s)
1415         break;
1416       transport_app_rx_evt (session_get_transport_proto (s),
1417                             s->connection_index, s->thread_index);
1418       break;
1419     case SESSION_IO_EVT_BUILTIN_RX:
1420       s = session_event_get_session (wrk, e);
1421       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
1422         break;
1423       svm_fifo_unset_event (s->rx_fifo);
1424       app_wrk = app_worker_get (s->app_wrk_index);
1425       app_worker_builtin_rx (app_wrk, s);
1426       break;
1427     case SESSION_IO_EVT_BUILTIN_TX:
1428       s = session_get_from_handle_if_valid (e->session_handle);
1429       wrk->ctx.s = s;
1430       if (PREDICT_TRUE (s != 0))
1431         session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
1432       break;
1433     default:
1434       clib_warning ("unhandled event type %d", e->event_type);
1435     }
1436
1437   SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
1438
1439   /* Regrab elements in case pool moved */
1440   elt = pool_elt_at_index (wrk->event_elts, ei);
1441   if (!clib_llist_elt_is_linked (elt, evt_list))
1442     session_evt_elt_free (wrk, elt);
1443 }
1444
1445 /* *INDENT-OFF* */
1446 static const u32 session_evt_msg_sizes[] = {
1447 #define _(symc, sym)                                                    \
1448   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
1449   foreach_session_ctrl_evt
1450 #undef _
1451 };
1452 /* *INDENT-ON* */
1453
1454 always_inline void
1455 session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
1456 {
1457   session_evt_elt_t *elt;
1458
1459   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
1460     {
1461       elt = session_evt_alloc_ctrl (wrk);
1462       if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
1463         {
1464           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
1465           elt->evt.event_type = evt->event_type;
1466           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
1467                             session_evt_msg_sizes[evt->event_type]);
1468         }
1469       else
1470         {
1471           /* Internal control events fit into io events footprint */
1472           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1473         }
1474     }
1475   else
1476     {
1477       elt = session_evt_alloc_new (wrk);
1478       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1479     }
1480 }
1481
1482 static void
1483 session_flush_pending_tx_buffers (session_worker_t * wrk,
1484                                   vlib_node_runtime_t * node)
1485 {
1486   vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers,
1487                                wrk->pending_tx_nexts,
1488                                vec_len (wrk->pending_tx_nexts));
1489   vec_reset_length (wrk->pending_tx_buffers);
1490   vec_reset_length (wrk->pending_tx_nexts);
1491 }
1492
1493 int
1494 session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
1495 {
1496   svm_msg_q_msg_t _msg, *msg = &_msg;
1497   u32 i, n_to_dequeue = 0;
1498   session_event_t *evt;
1499
1500   n_to_dequeue = svm_msg_q_size (mq);
1501   for (i = 0; i < n_to_dequeue; i++)
1502     {
1503       svm_msg_q_sub_raw (mq, msg);
1504       evt = svm_msg_q_msg_data (mq, msg);
1505       session_evt_add_to_list (wrk, evt);
1506       svm_msg_q_free_msg (mq, msg);
1507     }
1508
1509   return n_to_dequeue;
1510 }
1511
1512 static void
1513 session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
1514 {
1515   struct itimerspec its;
1516
1517   its.it_value.tv_sec = 0;
1518   its.it_value.tv_nsec = time_ns;
1519   its.it_interval.tv_sec = 0;
1520   its.it_interval.tv_nsec = its.it_value.tv_nsec;
1521
1522   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
1523     clib_warning ("timerfd_settime");
1524 }
1525
1526 always_inline u64
1527 session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
1528 {
1529   if (state == SESSION_WRK_INTERRUPT)
1530     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
1531   else if (state == SESSION_WRK_IDLE)
1532     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
1533   else
1534     return 0;
1535 }
1536
1537 static inline void
1538 session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
1539 {
1540   u64 time_ns;
1541
1542   wrk->state = state;
1543   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
1544   session_wrk_timerfd_update (wrk, time_ns);
1545 }
1546
1547 static void
1548 session_wrk_update_state (session_worker_t *wrk)
1549 {
1550   vlib_main_t *vm = wrk->vm;
1551
1552   if (wrk->state == SESSION_WRK_POLLING)
1553     {
1554       if (pool_elts (wrk->event_elts) == 3 &&
1555           vlib_last_vectors_per_main_loop (vm) < 1)
1556         {
1557           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1558           vlib_node_set_state (vm, session_queue_node.index,
1559                                VLIB_NODE_STATE_INTERRUPT);
1560         }
1561     }
1562   else if (wrk->state == SESSION_WRK_INTERRUPT)
1563     {
1564       if (pool_elts (wrk->event_elts) > 3 ||
1565           vlib_last_vectors_per_main_loop (vm) > 1)
1566         {
1567           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
1568           vlib_node_set_state (vm, session_queue_node.index,
1569                                VLIB_NODE_STATE_POLLING);
1570         }
1571       else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
1572         {
1573           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
1574         }
1575     }
1576   else
1577     {
1578       if (pool_elts (wrk->event_elts))
1579         {
1580           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1581         }
1582     }
1583 }
1584
1585 static uword
1586 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
1587                        vlib_frame_t * frame)
1588 {
1589   u32 thread_index = vm->thread_index, __clib_unused n_evts;
1590   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
1591   session_main_t *smm = vnet_get_session_main ();
1592   session_worker_t *wrk = &smm->wrk[thread_index];
1593   clib_llist_index_t ei, next_ei, old_ti;
1594   int n_tx_packets;
1595
1596   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
1597
1598   session_wrk_update_time (wrk, vlib_time_now (vm));
1599
1600   /*
1601    *  Update transport time
1602    */
1603   transport_update_time (wrk->last_vlib_time, thread_index);
1604   n_tx_packets = vec_len (wrk->pending_tx_buffers);
1605   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
1606
1607   /*
1608    *  Dequeue new internal mq events
1609    */
1610
1611   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
1612   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
1613
1614   /*
1615    * Handle control events
1616    */
1617
1618   ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
1619
1620   clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
1621     clib_llist_remove (wrk->event_elts, evt_list, elt);
1622     session_event_dispatch_ctrl (wrk, elt);
1623   }));
1624
1625   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
1626
1627   /*
1628    * Handle the new io events.
1629    */
1630
1631   new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
1632   old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
1633   old_ti = clib_llist_prev_index (old_he, evt_list);
1634
1635   ei = clib_llist_next_index (new_he, evt_list);
1636   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
1637     {
1638       elt = pool_elt_at_index (wrk->event_elts, ei);
1639       ei = clib_llist_next_index (elt, evt_list);
1640       clib_llist_remove (wrk->event_elts, evt_list, elt);
1641       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1642     }
1643
1644   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
1645
1646   /*
1647    * Handle the old io events, if we had any prior to processing the new ones
1648    */
1649
1650   if (old_ti != wrk->old_head)
1651     {
1652       old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
1653       ei = clib_llist_next_index (old_he, evt_list);
1654
1655       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
1656         {
1657           elt = pool_elt_at_index (wrk->event_elts, ei);
1658           next_ei = clib_llist_next_index (elt, evt_list);
1659           clib_llist_remove (wrk->event_elts, evt_list, elt);
1660
1661           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1662
1663           if (ei == old_ti)
1664             break;
1665
1666           ei = next_ei;
1667         };
1668     }
1669
1670   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
1671
1672   if (vec_len (wrk->pending_tx_buffers))
1673     session_flush_pending_tx_buffers (wrk, node);
1674
1675   vlib_node_increment_counter (vm, session_queue_node.index,
1676                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
1677
1678   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
1679
1680   if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
1681     session_wrk_update_state (wrk);
1682
1683   return n_tx_packets;
1684 }
1685
1686 /* *INDENT-OFF* */
1687 VLIB_REGISTER_NODE (session_queue_node) =
1688 {
1689   .function = session_queue_node_fn,
1690   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
1691   .name = "session-queue",
1692   .format_trace = format_session_queue_trace,
1693   .type = VLIB_NODE_TYPE_INPUT,
1694   .n_errors = ARRAY_LEN (session_queue_error_strings),
1695   .error_strings = session_queue_error_strings,
1696   .state = VLIB_NODE_STATE_DISABLED,
1697 };
1698 /* *INDENT-ON* */
1699
1700 static clib_error_t *
1701 session_wrk_tfd_read_ready (clib_file_t *cf)
1702 {
1703   session_worker_t *wrk = session_main_get_worker (cf->private_data);
1704   u64 buf;
1705   int rv;
1706
1707   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
1708   rv = read (wrk->timerfd, &buf, sizeof (buf));
1709   if (rv < 0 && errno != EAGAIN)
1710     clib_unix_warning ("failed");
1711   return 0;
1712 }
1713
1714 static clib_error_t *
1715 session_wrk_tfd_write_ready (clib_file_t *cf)
1716 {
1717   return 0;
1718 }
1719
1720 void
1721 session_wrk_enable_adaptive_mode (session_worker_t *wrk)
1722 {
1723   u32 thread_index = wrk->vm->thread_index;
1724   clib_file_t template = { 0 };
1725
1726   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
1727     clib_warning ("timerfd_create");
1728
1729   template.read_function = session_wrk_tfd_read_ready;
1730   template.write_function = session_wrk_tfd_write_ready;
1731   template.file_descriptor = wrk->timerfd;
1732   template.private_data = thread_index;
1733   template.polling_thread_index = thread_index;
1734   template.description = format (0, "session-wrk-tfd-%u", thread_index);
1735
1736   wrk->timerfd_file = clib_file_add (&file_main, &template);
1737   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
1738 }
1739
1740 static clib_error_t *
1741 session_queue_exit (vlib_main_t * vm)
1742 {
1743   if (vlib_get_n_threads () < 2)
1744     return 0;
1745
1746   /*
1747    * Shut off (especially) worker-thread session nodes.
1748    * Otherwise, vpp can crash as the main thread unmaps the
1749    * API segment.
1750    */
1751   vlib_worker_thread_barrier_sync (vm);
1752   session_node_enable_disable (0 /* is_enable */ );
1753   vlib_worker_thread_barrier_release (vm);
1754   return 0;
1755 }
1756
1757 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1758
1759 static uword
1760 session_queue_run_on_main (vlib_main_t * vm)
1761 {
1762   vlib_node_runtime_t *node;
1763
1764   node = vlib_node_get_runtime (vm, session_queue_node.index);
1765   return session_queue_node_fn (vm, node, 0);
1766 }
1767
1768 static uword
1769 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1770                        vlib_frame_t * f)
1771 {
1772   uword *event_data = 0;
1773   f64 timeout = 1.0;
1774   uword event_type;
1775
1776   while (1)
1777     {
1778       vlib_process_wait_for_event_or_clock (vm, timeout);
1779       event_type = vlib_process_get_events (vm, (uword **) & event_data);
1780
1781       switch (event_type)
1782         {
1783         case SESSION_Q_PROCESS_RUN_ON_MAIN:
1784           /* Run session queue node on main thread */
1785           session_queue_run_on_main (vm);
1786           break;
1787         case SESSION_Q_PROCESS_STOP:
1788           vlib_node_set_state (vm, session_queue_process_node.index,
1789                                VLIB_NODE_STATE_DISABLED);
1790           timeout = 100000.0;
1791           break;
1792         case ~0:
1793           /* Timed out. Run on main to ensure all events are handled */
1794           session_queue_run_on_main (vm);
1795           break;
1796         }
1797       vec_reset_length (event_data);
1798     }
1799   return 0;
1800 }
1801
1802 /* *INDENT-OFF* */
1803 VLIB_REGISTER_NODE (session_queue_process_node) =
1804 {
1805   .function = session_queue_process,
1806   .type = VLIB_NODE_TYPE_PROCESS,
1807   .name = "session-queue-process",
1808   .state = VLIB_NODE_STATE_DISABLED,
1809 };
1810 /* *INDENT-ON* */
1811
1812 static_always_inline uword
1813 session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
1814                                 vlib_frame_t * frame)
1815 {
1816   session_main_t *sm = &session_main;
1817   if (!sm->wrk[0].vpp_event_queue)
1818     return 0;
1819   node = vlib_node_get_runtime (vm, session_queue_node.index);
1820   return session_queue_node_fn (vm, node, frame);
1821 }
1822
1823 /* *INDENT-OFF* */
1824 VLIB_REGISTER_NODE (session_queue_pre_input_node) =
1825 {
1826   .function = session_queue_pre_input_inline,
1827   .type = VLIB_NODE_TYPE_PRE_INPUT,
1828   .name = "session-queue-main",
1829   .state = VLIB_NODE_STATE_DISABLED,
1830 };
1831 /* *INDENT-ON* */
1832
1833 /*
1834  * fd.io coding-style-patch-verification: ON
1835  *
1836  * Local Variables:
1837  * eval: (c-set-style "gnu")
1838  * End:
1839  */