d30df33a5e79060dabd72de16bd029c0f0dc6e68
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/application_local.h>
25 #include <vnet/session/session_debug.h>
26 #include <svm/queue.h>
27 #include <sys/timerfd.h>
28
29 #define app_check_thread_and_barrier(_fn, _arg)                         \
30   if (!vlib_thread_is_main_w_barrier ())                                \
31     {                                                                   \
32      vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));       \
33       return;                                                           \
34    }
35
36 static transport_endpt_ext_cfg_t *
37 session_mq_get_ext_config (application_t *app, uword offset)
38 {
39   svm_fifo_chunk_t *c;
40   fifo_segment_t *fs;
41
42   fs = application_get_rx_mqs_segment (app);
43   c = fs_chunk_ptr (fs->h, offset);
44   return (transport_endpt_ext_cfg_t *) c->data;
45 }
46
47 static void
48 session_mq_free_ext_config (application_t *app, uword offset)
49 {
50   svm_fifo_chunk_t *c;
51   fifo_segment_t *fs;
52
53   fs = application_get_rx_mqs_segment (app);
54   c = fs_chunk_ptr (fs->h, offset);
55   fifo_segment_collect_chunk (fs, 0 /* only one slice */, c);
56 }
57
58 static void
59 session_mq_listen_handler (void *data)
60 {
61   session_listen_msg_t *mp = (session_listen_msg_t *) data;
62   vnet_listen_args_t _a, *a = &_a;
63   app_worker_t *app_wrk;
64   application_t *app;
65   int rv;
66
67   app_check_thread_and_barrier (session_mq_listen_handler, mp);
68
69   app = application_lookup (mp->client_index);
70   if (!app)
71     return;
72
73   clib_memset (a, 0, sizeof (*a));
74   a->sep.is_ip4 = mp->is_ip4;
75   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
76   a->sep.port = mp->port;
77   a->sep.fib_index = mp->vrf;
78   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
79   a->sep.transport_proto = mp->proto;
80   a->app_index = app->app_index;
81   a->wrk_map_index = mp->wrk_index;
82   a->sep_ext.transport_flags = mp->flags;
83
84   if (mp->ext_config)
85     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
86
87   if ((rv = vnet_listen (a)))
88     clib_warning ("listen returned: %U", format_session_error, rv);
89
90   app_wrk = application_get_worker (app, mp->wrk_index);
91   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
92
93   if (mp->ext_config)
94     session_mq_free_ext_config (app, mp->ext_config);
95 }
96
97 static void
98 session_mq_listen_uri_handler (void *data)
99 {
100   session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
101   vnet_listen_args_t _a, *a = &_a;
102   app_worker_t *app_wrk;
103   application_t *app;
104   int rv;
105
106   app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
107
108   app = application_lookup (mp->client_index);
109   if (!app)
110     return;
111
112   clib_memset (a, 0, sizeof (*a));
113   a->uri = (char *) mp->uri;
114   a->app_index = app->app_index;
115   rv = vnet_bind_uri (a);
116
117   app_wrk = application_get_worker (app, 0);
118   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
119 }
120
121 static void
122 session_mq_connect_handler (void *data)
123 {
124   session_connect_msg_t *mp = (session_connect_msg_t *) data;
125   vnet_connect_args_t _a, *a = &_a;
126   app_worker_t *app_wrk;
127   application_t *app;
128   int rv;
129
130   app_check_thread_and_barrier (session_mq_connect_handler, mp);
131
132   app = application_lookup (mp->client_index);
133   if (!app)
134     return;
135
136   clib_memset (a, 0, sizeof (*a));
137   a->sep.is_ip4 = mp->is_ip4;
138   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
139   a->sep.port = mp->port;
140   a->sep.transport_proto = mp->proto;
141   a->sep.peer.fib_index = mp->vrf;
142   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
143   if (mp->is_ip4)
144     {
145       ip46_address_mask_ip4 (&a->sep.ip);
146       ip46_address_mask_ip4 (&a->sep.peer.ip);
147     }
148   a->sep.peer.port = mp->lcl_port;
149   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
150   a->sep_ext.parent_handle = mp->parent_handle;
151   a->sep_ext.transport_flags = mp->flags;
152   a->api_context = mp->context;
153   a->app_index = app->app_index;
154   a->wrk_map_index = mp->wrk_index;
155
156   if (mp->ext_config)
157     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
158
159   if ((rv = vnet_connect (a)))
160     {
161       clib_warning ("connect returned: %U", format_session_error, rv);
162       app_wrk = application_get_worker (app, mp->wrk_index);
163       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
164     }
165
166   if (mp->ext_config)
167     session_mq_free_ext_config (app, mp->ext_config);
168 }
169
170 static void
171 session_mq_connect_uri_handler (void *data)
172 {
173   session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
174   vnet_connect_args_t _a, *a = &_a;
175   app_worker_t *app_wrk;
176   application_t *app;
177   int rv;
178
179   app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
180
181   app = application_lookup (mp->client_index);
182   if (!app)
183     return;
184
185   clib_memset (a, 0, sizeof (*a));
186   a->uri = (char *) mp->uri;
187   a->api_context = mp->context;
188   a->app_index = app->app_index;
189   if ((rv = vnet_connect_uri (a)))
190     {
191       clib_warning ("connect_uri returned: %d", rv);
192       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
193       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
194     }
195 }
196
197 static void
198 session_mq_disconnect_handler (void *data)
199 {
200   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
201   vnet_disconnect_args_t _a, *a = &_a;
202   application_t *app;
203
204   app = application_lookup (mp->client_index);
205   if (!app)
206     return;
207
208   a->app_index = app->app_index;
209   a->handle = mp->handle;
210   vnet_disconnect_session (a);
211 }
212
213 static void
214 app_mq_detach_handler (void *data)
215 {
216   session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
217   vnet_app_detach_args_t _a, *a = &_a;
218   application_t *app;
219
220   app_check_thread_and_barrier (app_mq_detach_handler, mp);
221
222   app = application_lookup (mp->client_index);
223   if (!app)
224     return;
225
226   a->app_index = app->app_index;
227   a->api_client_index = mp->client_index;
228   vnet_application_detach (a);
229 }
230
231 static void
232 session_mq_unlisten_handler (void *data)
233 {
234   session_unlisten_msg_t *mp = (session_unlisten_msg_t *) data;
235   vnet_unlisten_args_t _a, *a = &_a;
236   app_worker_t *app_wrk;
237   application_t *app;
238   int rv;
239
240   app_check_thread_and_barrier (session_mq_unlisten_handler, mp);
241
242   app = application_lookup (mp->client_index);
243   if (!app)
244     return;
245
246   clib_memset (a, 0, sizeof (*a));
247   a->app_index = app->app_index;
248   a->handle = mp->handle;
249   a->wrk_map_index = mp->wrk_index;
250   if ((rv = vnet_unlisten (a)))
251     clib_warning ("unlisten returned: %d", rv);
252
253   app_wrk = application_get_worker (app, a->wrk_map_index);
254   if (!app_wrk)
255     return;
256
257   mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
258 }
259
260 static void
261 session_mq_accepted_reply_handler (void *data)
262 {
263   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
264   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
265   session_state_t old_state;
266   app_worker_t *app_wrk;
267   session_t *s;
268
269   /* Server isn't interested, kill the session */
270   if (mp->retval)
271     {
272       a->app_index = mp->context;
273       a->handle = mp->handle;
274       vnet_disconnect_session (a);
275       return;
276     }
277
278   /* Mail this back from the main thread. We're not polling in main
279    * thread so we're using other workers for notifications. */
280   if (vlib_num_workers () && vlib_get_thread_index () != 0
281       && session_thread_from_handle (mp->handle) == 0)
282     {
283       vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
284                                  (u8 *) mp, sizeof (*mp));
285       return;
286     }
287
288   s = session_get_from_handle_if_valid (mp->handle);
289   if (!s)
290     return;
291
292   app_wrk = app_worker_get (s->app_wrk_index);
293   if (app_wrk->app_index != mp->context)
294     {
295       clib_warning ("app doesn't own session");
296       return;
297     }
298
299   if (!session_has_transport (s))
300     {
301       s->session_state = SESSION_STATE_READY;
302       if (ct_session_connect_notify (s))
303         return;
304     }
305   else
306     {
307       old_state = s->session_state;
308       s->session_state = SESSION_STATE_READY;
309
310       if (!svm_fifo_is_empty_prod (s->rx_fifo))
311         app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
312
313       /* Closed while waiting for app to reply. Resend disconnect */
314       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
315         {
316           app_worker_close_notify (app_wrk, s);
317           s->session_state = old_state;
318           return;
319         }
320     }
321 }
322
323 static void
324 session_mq_reset_reply_handler (void *data)
325 {
326   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
327   session_reset_reply_msg_t *mp;
328   app_worker_t *app_wrk;
329   session_t *s;
330   application_t *app;
331   u32 index, thread_index;
332
333   mp = (session_reset_reply_msg_t *) data;
334   app = application_lookup (mp->context);
335   if (!app)
336     return;
337
338   session_parse_handle (mp->handle, &index, &thread_index);
339   s = session_get_if_valid (index, thread_index);
340
341   /* No session or not the right session */
342   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
343     return;
344
345   app_wrk = app_worker_get (s->app_wrk_index);
346   if (!app_wrk || app_wrk->app_index != app->app_index)
347     {
348       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
349                     mp->handle);
350       return;
351     }
352
353   /* Client objected to resetting the session, log and continue */
354   if (mp->retval)
355     {
356       clib_warning ("client retval %d", mp->retval);
357       return;
358     }
359
360   /* This comes as a response to a reset, transport only waiting for
361    * confirmation to remove connection state, no need to disconnect */
362   a->handle = mp->handle;
363   a->app_index = app->app_index;
364   vnet_disconnect_session (a);
365 }
366
367 static void
368 session_mq_disconnected_handler (void *data)
369 {
370   session_disconnected_reply_msg_t *rmp;
371   vnet_disconnect_args_t _a, *a = &_a;
372   svm_msg_q_msg_t _msg, *msg = &_msg;
373   session_disconnected_msg_t *mp;
374   app_worker_t *app_wrk;
375   session_event_t *evt;
376   session_t *s;
377   application_t *app;
378   int rv = 0;
379
380   mp = (session_disconnected_msg_t *) data;
381   if (!(s = session_get_from_handle_if_valid (mp->handle)))
382     {
383       clib_warning ("could not disconnect handle %llu", mp->handle);
384       return;
385     }
386   app_wrk = app_worker_get (s->app_wrk_index);
387   app = application_lookup (mp->client_index);
388   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
389     {
390       clib_warning ("could not disconnect session: %llu app: %u",
391                     mp->handle, mp->client_index);
392       return;
393     }
394
395   a->handle = mp->handle;
396   a->app_index = app_wrk->wrk_index;
397   rv = vnet_disconnect_session (a);
398
399   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
400                                        SESSION_MQ_CTRL_EVT_RING,
401                                        SVM_Q_WAIT, msg);
402   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
403   clib_memset (evt, 0, sizeof (*evt));
404   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
405   rmp = (session_disconnected_reply_msg_t *) evt->data;
406   rmp->handle = mp->handle;
407   rmp->context = mp->context;
408   rmp->retval = rv;
409   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
410 }
411
412 static void
413 session_mq_disconnected_reply_handler (void *data)
414 {
415   session_disconnected_reply_msg_t *mp;
416   vnet_disconnect_args_t _a, *a = &_a;
417   application_t *app;
418
419   mp = (session_disconnected_reply_msg_t *) data;
420
421   /* Client objected to disconnecting the session, log and continue */
422   if (mp->retval)
423     {
424       clib_warning ("client retval %d", mp->retval);
425       return;
426     }
427
428   /* Disconnect has been confirmed. Confirm close to transport */
429   app = application_lookup (mp->context);
430   if (app)
431     {
432       a->handle = mp->handle;
433       a->app_index = app->app_index;
434       vnet_disconnect_session (a);
435     }
436 }
437
438 static void
439 session_mq_worker_update_handler (void *data)
440 {
441   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
442   session_worker_update_reply_msg_t *rmp;
443   svm_msg_q_msg_t _msg, *msg = &_msg;
444   app_worker_t *app_wrk;
445   u32 owner_app_wrk_map;
446   session_event_t *evt;
447   session_t *s;
448   application_t *app;
449
450   app = application_lookup (mp->client_index);
451   if (!app)
452     return;
453   if (!(s = session_get_from_handle_if_valid (mp->handle)))
454     {
455       clib_warning ("invalid handle %llu", mp->handle);
456       return;
457     }
458   app_wrk = app_worker_get (s->app_wrk_index);
459   if (app_wrk->app_index != app->app_index)
460     {
461       clib_warning ("app %u does not own session %llu", app->app_index,
462                     mp->handle);
463       return;
464     }
465   owner_app_wrk_map = app_wrk->wrk_map_index;
466   app_wrk = application_get_worker (app, mp->wrk_index);
467
468   /* This needs to come from the new owner */
469   if (mp->req_wrk_index == owner_app_wrk_map)
470     {
471       session_req_worker_update_msg_t *wump;
472
473       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
474                                            SESSION_MQ_CTRL_EVT_RING,
475                                            SVM_Q_WAIT, msg);
476       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
477       clib_memset (evt, 0, sizeof (*evt));
478       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
479       wump = (session_req_worker_update_msg_t *) evt->data;
480       wump->session_handle = mp->handle;
481       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
482       return;
483     }
484
485   app_worker_own_session (app_wrk, s);
486
487   /*
488    * Send reply
489    */
490   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
491                                        SESSION_MQ_CTRL_EVT_RING,
492                                        SVM_Q_WAIT, msg);
493   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
494   clib_memset (evt, 0, sizeof (*evt));
495   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
496   rmp = (session_worker_update_reply_msg_t *) evt->data;
497   rmp->handle = mp->handle;
498   if (s->rx_fifo)
499     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
500   if (s->tx_fifo)
501     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
502   rmp->segment_handle = session_segment_handle (s);
503   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
504
505   /*
506    * Retransmit messages that may have been lost
507    */
508   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
509     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
510
511   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
512     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
513
514   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
515     app_worker_close_notify (app_wrk, s);
516 }
517
518 static void
519 session_mq_app_wrk_rpc_handler (void *data)
520 {
521   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
522   svm_msg_q_msg_t _msg, *msg = &_msg;
523   session_app_wrk_rpc_msg_t *rmp;
524   app_worker_t *app_wrk;
525   session_event_t *evt;
526   application_t *app;
527
528   app = application_lookup (mp->client_index);
529   if (!app)
530     return;
531
532   app_wrk = application_get_worker (app, mp->wrk_index);
533
534   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
535                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
536                                        msg);
537   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
538   clib_memset (evt, 0, sizeof (*evt));
539   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
540   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
541   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
542   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
543 }
544
545 static void
546 session_mq_transport_attr_handler (void *data)
547 {
548   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
549   session_transport_attr_reply_msg_t *rmp;
550   svm_msg_q_msg_t _msg, *msg = &_msg;
551   app_worker_t *app_wrk;
552   session_event_t *evt;
553   application_t *app;
554   session_t *s;
555   int rv;
556
557   app = application_lookup (mp->client_index);
558   if (!app)
559     return;
560
561   if (!(s = session_get_from_handle_if_valid (mp->handle)))
562     {
563       clib_warning ("invalid handle %llu", mp->handle);
564       return;
565     }
566   app_wrk = app_worker_get (s->app_wrk_index);
567   if (app_wrk->app_index != app->app_index)
568     {
569       clib_warning ("app %u does not own session %llu", app->app_index,
570                     mp->handle);
571       return;
572     }
573
574   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
575
576   svm_msg_q_lock_and_alloc_msg_w_ring (
577     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
578   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
579   clib_memset (evt, 0, sizeof (*evt));
580   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
581   rmp = (session_transport_attr_reply_msg_t *) evt->data;
582   rmp->handle = mp->handle;
583   rmp->retval = rv;
584   rmp->is_get = mp->is_get;
585   if (!rv && mp->is_get)
586     rmp->attr = mp->attr;
587   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
588 }
589
590 vlib_node_registration_t session_queue_node;
591
592 typedef struct
593 {
594   u32 session_index;
595   u32 server_thread_index;
596 } session_queue_trace_t;
597
598 /* packet trace format function */
599 static u8 *
600 format_session_queue_trace (u8 * s, va_list * args)
601 {
602   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
603   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
604   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
605
606   s = format (s, "session index %d thread index %d",
607               t->session_index, t->server_thread_index);
608   return s;
609 }
610
611 #define foreach_session_queue_error             \
612 _(TX, "Packets transmitted")                    \
613 _(TIMER, "Timer events")                        \
614 _(NO_BUFFER, "Out of buffers")
615
616 typedef enum
617 {
618 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
619   foreach_session_queue_error
620 #undef _
621     SESSION_QUEUE_N_ERROR,
622 } session_queue_error_t;
623
624 static char *session_queue_error_strings[] = {
625 #define _(sym,string) string,
626   foreach_session_queue_error
627 #undef _
628 };
629
630 enum
631 {
632   SESSION_TX_NO_BUFFERS = -2,
633   SESSION_TX_NO_DATA,
634   SESSION_TX_OK
635 };
636
637 static void
638 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
639                         u32 next_index, u32 * to_next, u16 n_segs,
640                         session_t * s, u32 n_trace)
641 {
642   while (n_trace && n_segs)
643     {
644       vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
645       if (PREDICT_TRUE
646           (vlib_trace_buffer
647            (vm, node, next_index, b, 1 /* follow_chain */ )))
648         {
649           session_queue_trace_t *t =
650             vlib_add_trace (vm, node, b, sizeof (*t));
651           t->session_index = s->session_index;
652           t->server_thread_index = s->thread_index;
653           n_trace--;
654         }
655       to_next++;
656       n_segs--;
657     }
658   vlib_set_trace_count (vm, node, n_trace);
659 }
660
661 always_inline void
662 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
663                             vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
664 {
665   vlib_buffer_t *chain_b, *prev_b;
666   u32 chain_bi0, to_deq, left_from_seg;
667   session_worker_t *wrk;
668   u16 len_to_deq, n_bytes_read;
669   u8 *data, j;
670
671   wrk = session_main_get_worker (ctx->s->thread_index);
672   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
673   b->total_length_not_including_first_buffer = 0;
674
675   chain_b = b;
676   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
677                             ctx->left_to_snd);
678   to_deq = left_from_seg;
679   for (j = 1; j < ctx->n_bufs_per_seg; j++)
680     {
681       prev_b = chain_b;
682       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
683
684       *n_bufs -= 1;
685       chain_bi0 = wrk->tx_buffers[*n_bufs];
686       chain_b = vlib_get_buffer (vm, chain_bi0);
687       chain_b->current_data = 0;
688       data = vlib_buffer_get_current (chain_b);
689       if (peek_data)
690         {
691           n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
692                                         ctx->sp.tx_offset, len_to_deq, data);
693           ctx->sp.tx_offset += n_bytes_read;
694         }
695       else
696         {
697           if (ctx->transport_vft->transport_options.tx_type ==
698               TRANSPORT_TX_DGRAM)
699             {
700               svm_fifo_t *f = ctx->s->tx_fifo;
701               session_dgram_hdr_t *hdr = &ctx->hdr;
702               u16 deq_now;
703               u32 offset;
704
705               deq_now = clib_min (hdr->data_length - hdr->data_offset,
706                                   len_to_deq);
707               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
708               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
709               ASSERT (n_bytes_read > 0);
710
711               hdr->data_offset += n_bytes_read;
712               if (hdr->data_offset == hdr->data_length)
713                 {
714                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
715                   svm_fifo_dequeue_drop (f, offset);
716                   if (ctx->left_to_snd > n_bytes_read)
717                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
718                                    (u8 *) & ctx->hdr);
719                 }
720               else if (ctx->left_to_snd == n_bytes_read)
721                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
722                                          sizeof (session_dgram_pre_hdr_t));
723             }
724           else
725             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
726                                              len_to_deq, data);
727         }
728       ASSERT (n_bytes_read == len_to_deq);
729       chain_b->current_length = n_bytes_read;
730       b->total_length_not_including_first_buffer += chain_b->current_length;
731
732       /* update previous buffer */
733       prev_b->next_buffer = chain_bi0;
734       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
735
736       /* update current buffer */
737       chain_b->next_buffer = 0;
738
739       to_deq -= n_bytes_read;
740       if (to_deq == 0)
741         break;
742     }
743   ASSERT (to_deq == 0
744           && b->total_length_not_including_first_buffer == left_from_seg);
745   ctx->left_to_snd -= left_from_seg;
746 }
747
748 always_inline void
749 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
750                         vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
751 {
752   u32 len_to_deq;
753   u8 *data0;
754   int n_bytes_read;
755
756   /*
757    * Start with the first buffer in chain
758    */
759   b->error = 0;
760   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
761   b->current_data = 0;
762
763   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
764   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
765
766   if (peek_data)
767     {
768       n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
769                                     len_to_deq, data0);
770       ASSERT (n_bytes_read > 0);
771       /* Keep track of progress locally, transport is also supposed to
772        * increment it independently when pushing the header */
773       ctx->sp.tx_offset += n_bytes_read;
774     }
775   else
776     {
777       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
778         {
779           session_dgram_hdr_t *hdr = &ctx->hdr;
780           svm_fifo_t *f = ctx->s->tx_fifo;
781           u16 deq_now;
782           u32 offset;
783
784           ASSERT (hdr->data_length > hdr->data_offset);
785           deq_now = clib_min (hdr->data_length - hdr->data_offset,
786                               len_to_deq);
787           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
788           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
789           ASSERT (n_bytes_read > 0);
790
791           if (ctx->s->session_state == SESSION_STATE_LISTENING)
792             {
793               ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
794               ctx->tc->rmt_port = hdr->rmt_port;
795             }
796           hdr->data_offset += n_bytes_read;
797           if (hdr->data_offset == hdr->data_length)
798             {
799               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
800               svm_fifo_dequeue_drop (f, offset);
801               if (ctx->left_to_snd > n_bytes_read)
802                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
803                                (u8 *) & ctx->hdr);
804             }
805           else if (ctx->left_to_snd == n_bytes_read)
806             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
807                                      sizeof (session_dgram_pre_hdr_t));
808         }
809       else
810         {
811           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
812                                            len_to_deq, data0);
813           ASSERT (n_bytes_read > 0);
814         }
815     }
816   b->current_length = n_bytes_read;
817   ctx->left_to_snd -= n_bytes_read;
818
819   /*
820    * Fill in the remaining buffers in the chain, if any
821    */
822   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
823     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
824 }
825
826 always_inline u8
827 session_tx_not_ready (session_t * s, u8 peek_data)
828 {
829   if (peek_data)
830     {
831       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
832         return 0;
833       /* Can retransmit for closed sessions but can't send new data if
834        * session is not ready or closed */
835       else if (s->session_state < SESSION_STATE_READY)
836         return 1;
837       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
838         {
839           /* Allow closed transports to still send custom packets.
840            * For instance, tcp may want to send acks in time-wait. */
841           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
842               && (s->flags & SESSION_F_CUSTOM_TX))
843             return 0;
844           return 2;
845         }
846     }
847   return 0;
848 }
849
850 always_inline transport_connection_t *
851 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
852 {
853   if (peek_data)
854     {
855       return ctx->transport_vft->get_connection (ctx->s->connection_index,
856                                                  ctx->s->thread_index);
857     }
858   else
859     {
860       if (ctx->s->session_state == SESSION_STATE_LISTENING)
861         return ctx->transport_vft->get_listener (ctx->s->connection_index);
862       else
863         {
864           return ctx->transport_vft->get_connection (ctx->s->connection_index,
865                                                      ctx->s->thread_index);
866         }
867     }
868 }
869
870 always_inline void
871 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
872                                u32 max_segs, u8 peek_data)
873 {
874   u32 n_bytes_per_buf, n_bytes_per_seg;
875
876   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
877   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
878
879   if (peek_data)
880     {
881       /* Offset in rx fifo from where to peek data */
882       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
883         {
884           ctx->max_len_to_snd = 0;
885           return;
886         }
887       ctx->max_dequeue -= ctx->sp.tx_offset;
888     }
889   else
890     {
891       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
892         {
893           u32 len, chain_limit;
894
895           if (ctx->max_dequeue <= sizeof (ctx->hdr))
896             {
897               ctx->max_len_to_snd = 0;
898               return;
899             }
900
901           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
902                          (u8 *) & ctx->hdr);
903           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
904           len = ctx->hdr.data_length - ctx->hdr.data_offset;
905
906           /* Process multiple dgrams if smaller than min (buf_space, mss).
907            * This avoids handling multiple dgrams if they require buffer
908            * chains */
909           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
910                                   ctx->sp.snd_mss);
911           if (ctx->hdr.data_length <= chain_limit)
912             {
913               u32 first_dgram_len, dgram_len, offset, max_offset;
914               session_dgram_hdr_t hdr;
915
916               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
917               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
918               first_dgram_len = len;
919               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
920
921               while (offset < max_offset)
922                 {
923                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
924                                  (u8 *) & hdr);
925                   ASSERT (hdr.data_length > hdr.data_offset);
926                   dgram_len = hdr.data_length - hdr.data_offset;
927                   if (len + dgram_len > ctx->max_dequeue
928                       || first_dgram_len != dgram_len)
929                     break;
930                   len += dgram_len;
931                   offset += sizeof (hdr) + hdr.data_length;
932                 }
933             }
934
935           ctx->max_dequeue = len;
936         }
937     }
938   ASSERT (ctx->max_dequeue > 0);
939
940   /* Ensure we're not writing more than transport window allows */
941   if (ctx->max_dequeue < ctx->sp.snd_space)
942     {
943       /* Constrained by tx queue. Try to send only fully formed segments */
944       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
945         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
946         ctx->max_dequeue;
947       /* TODO Nagle ? */
948     }
949   else
950     {
951       /* Expectation is that snd_space0 is already a multiple of snd_mss */
952       ctx->max_len_to_snd = ctx->sp.snd_space;
953     }
954
955   /* Check if we're tx constrained by the node */
956   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
957   if (ctx->n_segs_per_evt > max_segs)
958     {
959       ctx->n_segs_per_evt = max_segs;
960       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
961     }
962
963   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
964   if (ctx->n_segs_per_evt > 1)
965     {
966       u32 n_bytes_last_seg, n_bufs_last_seg;
967
968       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
969       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
970         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
971       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
972       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
973       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
974         + n_bufs_last_seg;
975     }
976   else
977     {
978       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
979       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
980       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
981     }
982
983   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
984   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
985                                      n_bytes_per_buf -
986                                      TRANSPORT_MAX_HDRS_LEN);
987 }
988
989 always_inline void
990 session_tx_maybe_reschedule (session_worker_t * wrk,
991                              session_tx_context_t * ctx,
992                              session_evt_elt_t * elt)
993 {
994   session_t *s = ctx->s;
995
996   svm_fifo_unset_event (s->tx_fifo);
997   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
998     if (svm_fifo_set_event (s->tx_fifo))
999       session_evt_add_head_old (wrk, elt);
1000 }
1001
1002 always_inline int
1003 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
1004                                 vlib_node_runtime_t * node,
1005                                 session_evt_elt_t * elt,
1006                                 int *n_tx_packets, u8 peek_data)
1007 {
1008   u32 n_trace, n_left, pbi, next_index, max_burst;
1009   session_tx_context_t *ctx = &wrk->ctx;
1010   session_main_t *smm = &session_main;
1011   session_event_t *e = &elt->evt;
1012   vlib_main_t *vm = wrk->vm;
1013   transport_proto_t tp;
1014   vlib_buffer_t *pb;
1015   u16 n_bufs, rv;
1016
1017   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
1018     {
1019       if (rv < 2)
1020         session_evt_add_old (wrk, elt);
1021       return SESSION_TX_NO_DATA;
1022     }
1023
1024   next_index = smm->session_type_to_next[ctx->s->session_type];
1025   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
1026
1027   tp = session_get_transport_proto (ctx->s);
1028   ctx->transport_vft = transport_protocol_get_vft (tp);
1029   ctx->tc = session_tx_get_transport (ctx, peek_data);
1030
1031   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
1032     {
1033       if (ctx->transport_vft->flush_data)
1034         ctx->transport_vft->flush_data (ctx->tc);
1035       e->event_type = SESSION_IO_EVT_TX;
1036     }
1037
1038   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
1039     {
1040       u32 n_custom_tx;
1041       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
1042       ctx->sp.max_burst_size = max_burst;
1043       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
1044       *n_tx_packets += n_custom_tx;
1045       if (PREDICT_FALSE
1046           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1047         return SESSION_TX_OK;
1048       max_burst -= n_custom_tx;
1049       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
1050         {
1051           session_evt_add_old (wrk, elt);
1052           return SESSION_TX_OK;
1053         }
1054     }
1055
1056   transport_connection_snd_params (ctx->tc, &ctx->sp);
1057
1058   if (!ctx->sp.snd_space)
1059     {
1060       /* If the deschedule flag was set, remove session from scheduler.
1061        * Transport is responsible for rescheduling this session. */
1062       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
1063         transport_connection_deschedule (ctx->tc);
1064       /* Request to postpone the session, e.g., zero-wnd and transport
1065        * is not currently probing */
1066       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
1067         session_evt_add_old (wrk, elt);
1068       /* This flow queue is "empty" so it should be re-evaluated before
1069        * the ones that have data to send. */
1070       else
1071         session_evt_add_head_old (wrk, elt);
1072
1073       return SESSION_TX_NO_DATA;
1074     }
1075
1076   if (transport_connection_is_tx_paced (ctx->tc))
1077     {
1078       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
1079       if (snd_space < TRANSPORT_PACER_MIN_BURST)
1080         {
1081           session_evt_add_head_old (wrk, elt);
1082           return SESSION_TX_NO_DATA;
1083         }
1084       snd_space = clib_min (ctx->sp.snd_space, snd_space);
1085       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
1086         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
1087     }
1088
1089   /* Check how much we can pull. */
1090   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
1091
1092   if (PREDICT_FALSE (!ctx->max_len_to_snd))
1093     {
1094       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
1095       session_tx_maybe_reschedule (wrk, ctx, elt);
1096       return SESSION_TX_NO_DATA;
1097     }
1098
1099   vec_validate_aligned (wrk->tx_buffers, ctx->n_bufs_needed - 1,
1100                         CLIB_CACHE_LINE_BYTES);
1101   n_bufs = vlib_buffer_alloc (vm, wrk->tx_buffers, ctx->n_bufs_needed);
1102   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
1103     {
1104       if (n_bufs)
1105         vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
1106       session_evt_add_head_old (wrk, elt);
1107       vlib_node_increment_counter (wrk->vm, node->node_index,
1108                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
1109       return SESSION_TX_NO_BUFFERS;
1110     }
1111
1112   if (transport_connection_is_tx_paced (ctx->tc))
1113     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
1114
1115   ctx->left_to_snd = ctx->max_len_to_snd;
1116   n_left = ctx->n_segs_per_evt;
1117
1118   while (n_left >= 4)
1119     {
1120       vlib_buffer_t *b0, *b1;
1121       u32 bi0, bi1;
1122
1123       pbi = wrk->tx_buffers[n_bufs - 3];
1124       pb = vlib_get_buffer (vm, pbi);
1125       vlib_prefetch_buffer_header (pb, STORE);
1126       pbi = wrk->tx_buffers[n_bufs - 4];
1127       pb = vlib_get_buffer (vm, pbi);
1128       vlib_prefetch_buffer_header (pb, STORE);
1129
1130       bi0 = wrk->tx_buffers[--n_bufs];
1131       bi1 = wrk->tx_buffers[--n_bufs];
1132
1133       b0 = vlib_get_buffer (vm, bi0);
1134       b1 = vlib_get_buffer (vm, bi1);
1135
1136       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1137       session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
1138
1139       ctx->transport_vft->push_header (ctx->tc, b0);
1140       ctx->transport_vft->push_header (ctx->tc, b1);
1141
1142       n_left -= 2;
1143
1144       vec_add1 (wrk->pending_tx_buffers, bi0);
1145       vec_add1 (wrk->pending_tx_buffers, bi1);
1146       vec_add1 (wrk->pending_tx_nexts, next_index);
1147       vec_add1 (wrk->pending_tx_nexts, next_index);
1148     }
1149   while (n_left)
1150     {
1151       vlib_buffer_t *b0;
1152       u32 bi0;
1153
1154       if (n_left > 1)
1155         {
1156           pbi = wrk->tx_buffers[n_bufs - 2];
1157           pb = vlib_get_buffer (vm, pbi);
1158           vlib_prefetch_buffer_header (pb, STORE);
1159         }
1160
1161       bi0 = wrk->tx_buffers[--n_bufs];
1162       b0 = vlib_get_buffer (vm, bi0);
1163       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1164
1165       /* Ask transport to push header after current_length and
1166        * total_length_not_including_first_buffer are updated */
1167       ctx->transport_vft->push_header (ctx->tc, b0);
1168
1169       n_left -= 1;
1170
1171       vec_add1 (wrk->pending_tx_buffers, bi0);
1172       vec_add1 (wrk->pending_tx_nexts, next_index);
1173     }
1174
1175   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
1176     session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers,
1177                             ctx->n_segs_per_evt, ctx->s, n_trace);
1178
1179   if (PREDICT_FALSE (n_bufs))
1180     vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
1181
1182   *n_tx_packets += ctx->n_segs_per_evt;
1183
1184   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
1185                ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
1186
1187   ASSERT (ctx->left_to_snd == 0);
1188
1189   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
1190    * check if application enqueued more data and reschedule accordingly */
1191   if (ctx->max_len_to_snd < ctx->max_dequeue)
1192     session_evt_add_old (wrk, elt);
1193   else
1194     session_tx_maybe_reschedule (wrk, ctx, elt);
1195
1196   if (!peek_data)
1197     {
1198       u32 n_dequeued = ctx->max_len_to_snd;
1199       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1200         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
1201       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
1202         session_dequeue_notify (ctx->s);
1203     }
1204   return SESSION_TX_OK;
1205 }
1206
1207 int
1208 session_tx_fifo_peek_and_snd (session_worker_t * wrk,
1209                               vlib_node_runtime_t * node,
1210                               session_evt_elt_t * e, int *n_tx_packets)
1211 {
1212   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
1213 }
1214
1215 int
1216 session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
1217                                  vlib_node_runtime_t * node,
1218                                  session_evt_elt_t * e, int *n_tx_packets)
1219 {
1220   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
1221 }
1222
1223 int
1224 session_tx_fifo_dequeue_internal (session_worker_t * wrk,
1225                                   vlib_node_runtime_t * node,
1226                                   session_evt_elt_t * elt, int *n_tx_packets)
1227 {
1228   transport_send_params_t *sp = &wrk->ctx.sp;
1229   session_t *s = wrk->ctx.s;
1230   u32 n_packets;
1231
1232   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1233     return 0;
1234
1235   /* Clear custom-tx flag used to request reschedule for tx */
1236   s->flags &= ~SESSION_F_CUSTOM_TX;
1237
1238   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
1239                                  TRANSPORT_PACER_MAX_BURST_PKTS);
1240
1241   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
1242   *n_tx_packets += n_packets;
1243
1244   if (s->flags & SESSION_F_CUSTOM_TX)
1245     {
1246       session_evt_add_old (wrk, elt);
1247     }
1248   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
1249     {
1250       svm_fifo_unset_event (s->tx_fifo);
1251       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1252         if (svm_fifo_set_event (s->tx_fifo))
1253           session_evt_add_head_old (wrk, elt);
1254     }
1255
1256   if (sp->max_burst_size &&
1257       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
1258     session_dequeue_notify (s);
1259
1260   return n_packets;
1261 }
1262
1263 always_inline session_t *
1264 session_event_get_session (session_worker_t * wrk, session_event_t * e)
1265 {
1266   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
1267     return 0;
1268
1269   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
1270   return pool_elt_at_index (wrk->sessions, e->session_index);
1271 }
1272
1273 always_inline void
1274 session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
1275 {
1276   clib_llist_index_t ei;
1277   void (*fp) (void *);
1278   session_event_t *e;
1279   session_t *s;
1280
1281   ei = clib_llist_entry_index (wrk->event_elts, elt);
1282   e = &elt->evt;
1283
1284   switch (e->event_type)
1285     {
1286     case SESSION_CTRL_EVT_RPC:
1287       fp = e->rpc_args.fp;
1288       (*fp) (e->rpc_args.arg);
1289       break;
1290     case SESSION_CTRL_EVT_CLOSE:
1291       s = session_get_from_handle_if_valid (e->session_handle);
1292       if (PREDICT_FALSE (!s))
1293         break;
1294       session_transport_close (s);
1295       break;
1296     case SESSION_CTRL_EVT_RESET:
1297       s = session_get_from_handle_if_valid (e->session_handle);
1298       if (PREDICT_FALSE (!s))
1299         break;
1300       session_transport_reset (s);
1301       break;
1302     case SESSION_CTRL_EVT_LISTEN:
1303       session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
1304       break;
1305     case SESSION_CTRL_EVT_LISTEN_URI:
1306       session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
1307       break;
1308     case SESSION_CTRL_EVT_UNLISTEN:
1309       session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
1310       break;
1311     case SESSION_CTRL_EVT_CONNECT:
1312       session_mq_connect_handler (session_evt_ctrl_data (wrk, elt));
1313       break;
1314     case SESSION_CTRL_EVT_CONNECT_URI:
1315       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
1316       break;
1317     case SESSION_CTRL_EVT_DISCONNECT:
1318       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
1319       break;
1320     case SESSION_CTRL_EVT_DISCONNECTED:
1321       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
1322       break;
1323     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
1324       session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
1325       break;
1326     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
1327       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
1328                                                                     elt));
1329       break;
1330     case SESSION_CTRL_EVT_RESET_REPLY:
1331       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
1332       break;
1333     case SESSION_CTRL_EVT_WORKER_UPDATE:
1334       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
1335       break;
1336     case SESSION_CTRL_EVT_APP_DETACH:
1337       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
1338       break;
1339     case SESSION_CTRL_EVT_APP_WRK_RPC:
1340       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
1341       break;
1342     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
1343       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
1344       break;
1345     default:
1346       clib_warning ("unhandled event type %d", e->event_type);
1347     }
1348
1349   /* Regrab elements in case pool moved */
1350   elt = pool_elt_at_index (wrk->event_elts, ei);
1351   if (!clib_llist_elt_is_linked (elt, evt_list))
1352     {
1353       e = &elt->evt;
1354       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
1355         session_evt_ctrl_data_free (wrk, elt);
1356       session_evt_elt_free (wrk, elt);
1357     }
1358   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
1359 }
1360
1361 always_inline void
1362 session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
1363                            session_evt_elt_t * elt, int *n_tx_packets)
1364 {
1365   session_main_t *smm = &session_main;
1366   app_worker_t *app_wrk;
1367   clib_llist_index_t ei;
1368   session_event_t *e;
1369   session_t *s;
1370
1371   ei = clib_llist_entry_index (wrk->event_elts, elt);
1372   e = &elt->evt;
1373
1374   switch (e->event_type)
1375     {
1376     case SESSION_IO_EVT_TX_FLUSH:
1377     case SESSION_IO_EVT_TX:
1378       s = session_event_get_session (wrk, e);
1379       if (PREDICT_FALSE (!s))
1380         break;
1381       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
1382       wrk->ctx.s = s;
1383       /* Spray packets in per session type frames, since they go to
1384        * different nodes */
1385       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
1386       break;
1387     case SESSION_IO_EVT_RX:
1388       s = session_event_get_session (wrk, e);
1389       if (!s)
1390         break;
1391       transport_app_rx_evt (session_get_transport_proto (s),
1392                             s->connection_index, s->thread_index);
1393       break;
1394     case SESSION_IO_EVT_BUILTIN_RX:
1395       s = session_event_get_session (wrk, e);
1396       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
1397         break;
1398       svm_fifo_unset_event (s->rx_fifo);
1399       app_wrk = app_worker_get (s->app_wrk_index);
1400       app_worker_builtin_rx (app_wrk, s);
1401       break;
1402     case SESSION_IO_EVT_BUILTIN_TX:
1403       s = session_get_from_handle_if_valid (e->session_handle);
1404       wrk->ctx.s = s;
1405       if (PREDICT_TRUE (s != 0))
1406         session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
1407       break;
1408     default:
1409       clib_warning ("unhandled event type %d", e->event_type);
1410     }
1411
1412   SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
1413
1414   /* Regrab elements in case pool moved */
1415   elt = pool_elt_at_index (wrk->event_elts, ei);
1416   if (!clib_llist_elt_is_linked (elt, evt_list))
1417     session_evt_elt_free (wrk, elt);
1418 }
1419
1420 /* *INDENT-OFF* */
1421 static const u32 session_evt_msg_sizes[] = {
1422 #define _(symc, sym)                                                    \
1423   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
1424   foreach_session_ctrl_evt
1425 #undef _
1426 };
1427 /* *INDENT-ON* */
1428
1429 always_inline void
1430 session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
1431 {
1432   session_evt_elt_t *elt;
1433
1434   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
1435     {
1436       elt = session_evt_alloc_ctrl (wrk);
1437       if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
1438         {
1439           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
1440           elt->evt.event_type = evt->event_type;
1441           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
1442                             session_evt_msg_sizes[evt->event_type]);
1443         }
1444       else
1445         {
1446           /* Internal control events fit into io events footprint */
1447           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1448         }
1449     }
1450   else
1451     {
1452       elt = session_evt_alloc_new (wrk);
1453       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1454     }
1455 }
1456
1457 static void
1458 session_flush_pending_tx_buffers (session_worker_t * wrk,
1459                                   vlib_node_runtime_t * node)
1460 {
1461   vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers,
1462                                wrk->pending_tx_nexts,
1463                                vec_len (wrk->pending_tx_nexts));
1464   vec_reset_length (wrk->pending_tx_buffers);
1465   vec_reset_length (wrk->pending_tx_nexts);
1466 }
1467
1468 int
1469 session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
1470 {
1471   svm_msg_q_msg_t _msg, *msg = &_msg;
1472   u32 i, n_to_dequeue = 0;
1473   session_event_t *evt;
1474
1475   n_to_dequeue = svm_msg_q_size (mq);
1476   for (i = 0; i < n_to_dequeue; i++)
1477     {
1478       svm_msg_q_sub_raw (mq, msg);
1479       evt = svm_msg_q_msg_data (mq, msg);
1480       session_evt_add_to_list (wrk, evt);
1481       svm_msg_q_free_msg (mq, msg);
1482     }
1483
1484   return n_to_dequeue;
1485 }
1486
1487 static void
1488 session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
1489 {
1490   struct itimerspec its;
1491
1492   its.it_value.tv_sec = 0;
1493   its.it_value.tv_nsec = time_ns;
1494   its.it_interval.tv_sec = 0;
1495   its.it_interval.tv_nsec = its.it_value.tv_nsec;
1496
1497   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
1498     clib_warning ("timerfd_settime");
1499 }
1500
1501 always_inline u64
1502 session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
1503 {
1504   if (state == SESSION_WRK_INTERRUPT)
1505     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
1506   else if (state == SESSION_WRK_IDLE)
1507     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
1508   else
1509     return 0;
1510 }
1511
1512 static inline void
1513 session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
1514 {
1515   u64 time_ns;
1516
1517   wrk->state = state;
1518   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
1519   session_wrk_timerfd_update (wrk, time_ns);
1520 }
1521
1522 static void
1523 session_wrk_update_state (session_worker_t *wrk)
1524 {
1525   vlib_main_t *vm = wrk->vm;
1526
1527   if (wrk->state == SESSION_WRK_POLLING)
1528     {
1529       if (pool_elts (wrk->event_elts) == 3 &&
1530           vlib_last_vectors_per_main_loop (vm) < 1)
1531         {
1532           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1533           vlib_node_set_state (vm, session_queue_node.index,
1534                                VLIB_NODE_STATE_INTERRUPT);
1535         }
1536     }
1537   else if (wrk->state == SESSION_WRK_INTERRUPT)
1538     {
1539       if (pool_elts (wrk->event_elts) > 3 ||
1540           vlib_last_vectors_per_main_loop (vm) > 1)
1541         {
1542           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
1543           vlib_node_set_state (vm, session_queue_node.index,
1544                                VLIB_NODE_STATE_POLLING);
1545         }
1546       else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
1547         {
1548           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
1549         }
1550     }
1551   else
1552     {
1553       if (pool_elts (wrk->event_elts))
1554         {
1555           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1556         }
1557     }
1558 }
1559
1560 static uword
1561 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
1562                        vlib_frame_t * frame)
1563 {
1564   u32 thread_index = vm->thread_index, __clib_unused n_evts;
1565   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
1566   session_main_t *smm = vnet_get_session_main ();
1567   session_worker_t *wrk = &smm->wrk[thread_index];
1568   clib_llist_index_t ei, next_ei, old_ti;
1569   int n_tx_packets;
1570
1571   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
1572
1573   session_wrk_update_time (wrk, vlib_time_now (vm));
1574
1575   /*
1576    *  Update transport time
1577    */
1578   transport_update_time (wrk->last_vlib_time, thread_index);
1579   n_tx_packets = vec_len (wrk->pending_tx_buffers);
1580   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
1581
1582   /*
1583    *  Dequeue new internal mq events
1584    */
1585
1586   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
1587   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
1588
1589   /*
1590    * Handle control events
1591    */
1592
1593   ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
1594
1595   clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
1596     clib_llist_remove (wrk->event_elts, evt_list, elt);
1597     session_event_dispatch_ctrl (wrk, elt);
1598   }));
1599
1600   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
1601
1602   /*
1603    * Handle the new io events.
1604    */
1605
1606   new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
1607   old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
1608   old_ti = clib_llist_prev_index (old_he, evt_list);
1609
1610   ei = clib_llist_next_index (new_he, evt_list);
1611   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
1612     {
1613       elt = pool_elt_at_index (wrk->event_elts, ei);
1614       ei = clib_llist_next_index (elt, evt_list);
1615       clib_llist_remove (wrk->event_elts, evt_list, elt);
1616       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1617     }
1618
1619   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
1620
1621   /*
1622    * Handle the old io events, if we had any prior to processing the new ones
1623    */
1624
1625   if (old_ti != wrk->old_head)
1626     {
1627       old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
1628       ei = clib_llist_next_index (old_he, evt_list);
1629
1630       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
1631         {
1632           elt = pool_elt_at_index (wrk->event_elts, ei);
1633           next_ei = clib_llist_next_index (elt, evt_list);
1634           clib_llist_remove (wrk->event_elts, evt_list, elt);
1635
1636           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1637
1638           if (ei == old_ti)
1639             break;
1640
1641           ei = next_ei;
1642         };
1643     }
1644
1645   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
1646
1647   if (vec_len (wrk->pending_tx_buffers))
1648     session_flush_pending_tx_buffers (wrk, node);
1649
1650   vlib_node_increment_counter (vm, session_queue_node.index,
1651                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
1652
1653   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
1654
1655   if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
1656     session_wrk_update_state (wrk);
1657
1658   return n_tx_packets;
1659 }
1660
1661 /* *INDENT-OFF* */
1662 VLIB_REGISTER_NODE (session_queue_node) =
1663 {
1664   .function = session_queue_node_fn,
1665   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
1666   .name = "session-queue",
1667   .format_trace = format_session_queue_trace,
1668   .type = VLIB_NODE_TYPE_INPUT,
1669   .n_errors = ARRAY_LEN (session_queue_error_strings),
1670   .error_strings = session_queue_error_strings,
1671   .state = VLIB_NODE_STATE_DISABLED,
1672 };
1673 /* *INDENT-ON* */
1674
1675 static clib_error_t *
1676 session_wrk_tfd_read_ready (clib_file_t *cf)
1677 {
1678   session_worker_t *wrk = session_main_get_worker (cf->private_data);
1679   u64 buf;
1680   int rv;
1681
1682   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
1683   rv = read (wrk->timerfd, &buf, sizeof (buf));
1684   if (rv < 0 && errno != EAGAIN)
1685     clib_unix_warning ("failed");
1686   return 0;
1687 }
1688
1689 static clib_error_t *
1690 session_wrk_tfd_write_ready (clib_file_t *cf)
1691 {
1692   return 0;
1693 }
1694
1695 void
1696 session_wrk_enable_adaptive_mode (session_worker_t *wrk)
1697 {
1698   u32 thread_index = wrk->vm->thread_index;
1699   clib_file_t template = { 0 };
1700
1701   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
1702     clib_warning ("timerfd_create");
1703
1704   template.read_function = session_wrk_tfd_read_ready;
1705   template.write_function = session_wrk_tfd_write_ready;
1706   template.file_descriptor = wrk->timerfd;
1707   template.private_data = thread_index;
1708   template.polling_thread_index = thread_index;
1709   template.description = format (0, "session-wrk-tfd-%u", thread_index);
1710
1711   wrk->timerfd_file = clib_file_add (&file_main, &template);
1712   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
1713 }
1714
1715 static clib_error_t *
1716 session_queue_exit (vlib_main_t * vm)
1717 {
1718   if (vlib_get_n_threads () < 2)
1719     return 0;
1720
1721   /*
1722    * Shut off (especially) worker-thread session nodes.
1723    * Otherwise, vpp can crash as the main thread unmaps the
1724    * API segment.
1725    */
1726   vlib_worker_thread_barrier_sync (vm);
1727   session_node_enable_disable (0 /* is_enable */ );
1728   vlib_worker_thread_barrier_release (vm);
1729   return 0;
1730 }
1731
1732 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1733
1734 static uword
1735 session_queue_run_on_main (vlib_main_t * vm)
1736 {
1737   vlib_node_runtime_t *node;
1738
1739   node = vlib_node_get_runtime (vm, session_queue_node.index);
1740   return session_queue_node_fn (vm, node, 0);
1741 }
1742
1743 static uword
1744 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1745                        vlib_frame_t * f)
1746 {
1747   uword *event_data = 0;
1748   f64 timeout = 1.0;
1749   uword event_type;
1750
1751   while (1)
1752     {
1753       vlib_process_wait_for_event_or_clock (vm, timeout);
1754       event_type = vlib_process_get_events (vm, (uword **) & event_data);
1755
1756       switch (event_type)
1757         {
1758         case SESSION_Q_PROCESS_RUN_ON_MAIN:
1759           /* Run session queue node on main thread */
1760           session_queue_run_on_main (vm);
1761           break;
1762         case SESSION_Q_PROCESS_STOP:
1763           vlib_node_set_state (vm, session_queue_process_node.index,
1764                                VLIB_NODE_STATE_DISABLED);
1765           timeout = 100000.0;
1766           break;
1767         case ~0:
1768           /* Timed out. Run on main to ensure all events are handled */
1769           session_queue_run_on_main (vm);
1770           break;
1771         }
1772       vec_reset_length (event_data);
1773     }
1774   return 0;
1775 }
1776
1777 /* *INDENT-OFF* */
1778 VLIB_REGISTER_NODE (session_queue_process_node) =
1779 {
1780   .function = session_queue_process,
1781   .type = VLIB_NODE_TYPE_PROCESS,
1782   .name = "session-queue-process",
1783   .state = VLIB_NODE_STATE_DISABLED,
1784 };
1785 /* *INDENT-ON* */
1786
1787 static_always_inline uword
1788 session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
1789                                 vlib_frame_t * frame)
1790 {
1791   session_main_t *sm = &session_main;
1792   if (!sm->wrk[0].vpp_event_queue)
1793     return 0;
1794   node = vlib_node_get_runtime (vm, session_queue_node.index);
1795   return session_queue_node_fn (vm, node, frame);
1796 }
1797
1798 /* *INDENT-OFF* */
1799 VLIB_REGISTER_NODE (session_queue_pre_input_node) =
1800 {
1801   .function = session_queue_pre_input_inline,
1802   .type = VLIB_NODE_TYPE_PRE_INPUT,
1803   .name = "session-queue-main",
1804   .state = VLIB_NODE_STATE_DISABLED,
1805 };
1806 /* *INDENT-ON* */
1807
1808 /*
1809  * fd.io coding-style-patch-verification: ON
1810  *
1811  * Local Variables:
1812  * eval: (c-set-style "gnu")
1813  * End:
1814  */