vcl session: refactor passing of crypto context
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/application_local.h>
25 #include <vnet/session/session_debug.h>
26 #include <svm/queue.h>
27 #include <sys/timerfd.h>
28
29 #define app_check_thread_and_barrier(_fn, _arg)                         \
30   if (!vlib_thread_is_main_w_barrier ())                                \
31     {                                                                   \
32      vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));       \
33       return;                                                           \
34    }
35
36 static transport_endpt_ext_cfg_t *
37 session_mq_get_ext_config (application_t *app, uword offset)
38 {
39   svm_fifo_chunk_t *c;
40   fifo_segment_t *fs;
41
42   fs = application_get_rx_mqs_segment (app);
43   c = fs_chunk_ptr (fs->h, offset);
44   return (transport_endpt_ext_cfg_t *) c->data;
45 }
46
47 static void
48 session_mq_free_ext_config (application_t *app, uword offset)
49 {
50   u32 ctrl_thread = vlib_num_workers () ? 1 : 0;
51   svm_fifo_chunk_t *c;
52   fifo_segment_t *fs;
53
54   fs = application_get_rx_mqs_segment (app);
55   c = fs_chunk_ptr (fs->h, offset);
56   fifo_segment_collect_chunk (fs, ctrl_thread, c);
57 }
58
59 static void
60 session_mq_listen_handler (void *data)
61 {
62   session_listen_msg_t *mp = (session_listen_msg_t *) data;
63   vnet_listen_args_t _a, *a = &_a;
64   app_worker_t *app_wrk;
65   application_t *app;
66   int rv;
67
68   app_check_thread_and_barrier (session_mq_listen_handler, mp);
69
70   app = application_lookup (mp->client_index);
71   if (!app)
72     return;
73
74   clib_memset (a, 0, sizeof (*a));
75   a->sep.is_ip4 = mp->is_ip4;
76   ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
77   a->sep.port = mp->port;
78   a->sep.fib_index = mp->vrf;
79   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
80   a->sep.transport_proto = mp->proto;
81   a->app_index = app->app_index;
82   a->wrk_map_index = mp->wrk_index;
83   a->sep_ext.transport_flags = mp->flags;
84
85   if (mp->ext_config)
86     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
87
88   if ((rv = vnet_listen (a)))
89     clib_warning ("listen returned: %U", format_session_error, rv);
90
91   app_wrk = application_get_worker (app, mp->wrk_index);
92   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
93
94   if (mp->ext_config)
95     session_mq_free_ext_config (app, mp->ext_config);
96 }
97
98 static void
99 session_mq_listen_uri_handler (void *data)
100 {
101   session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
102   vnet_listen_args_t _a, *a = &_a;
103   app_worker_t *app_wrk;
104   application_t *app;
105   int rv;
106
107   app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
108
109   app = application_lookup (mp->client_index);
110   if (!app)
111     return;
112
113   clib_memset (a, 0, sizeof (*a));
114   a->uri = (char *) mp->uri;
115   a->app_index = app->app_index;
116   rv = vnet_bind_uri (a);
117
118   app_wrk = application_get_worker (app, 0);
119   mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
120 }
121
122 static void
123 session_mq_connect_handler (void *data)
124 {
125   session_connect_msg_t *mp = (session_connect_msg_t *) data;
126   vnet_connect_args_t _a, *a = &_a;
127   app_worker_t *app_wrk;
128   application_t *app;
129   int rv;
130
131   app_check_thread_and_barrier (session_mq_connect_handler, mp);
132
133   app = application_lookup (mp->client_index);
134   if (!app)
135     return;
136
137   clib_memset (a, 0, sizeof (*a));
138   a->sep.is_ip4 = mp->is_ip4;
139   clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
140   a->sep.port = mp->port;
141   a->sep.transport_proto = mp->proto;
142   a->sep.peer.fib_index = mp->vrf;
143   clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
144   if (mp->is_ip4)
145     {
146       ip46_address_mask_ip4 (&a->sep.ip);
147       ip46_address_mask_ip4 (&a->sep.peer.ip);
148     }
149   a->sep.peer.port = mp->lcl_port;
150   a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
151   a->sep_ext.parent_handle = mp->parent_handle;
152   a->sep_ext.transport_flags = mp->flags;
153   a->api_context = mp->context;
154   a->app_index = app->app_index;
155   a->wrk_map_index = mp->wrk_index;
156
157   if (mp->ext_config)
158     a->sep_ext.ext_cfg = session_mq_get_ext_config (app, mp->ext_config);
159
160   if ((rv = vnet_connect (a)))
161     {
162       clib_warning ("connect returned: %U", format_session_error, rv);
163       app_wrk = application_get_worker (app, mp->wrk_index);
164       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
165     }
166
167   if (mp->ext_config)
168     session_mq_free_ext_config (app, mp->ext_config);
169 }
170
171 static void
172 session_mq_connect_uri_handler (void *data)
173 {
174   session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
175   vnet_connect_args_t _a, *a = &_a;
176   app_worker_t *app_wrk;
177   application_t *app;
178   int rv;
179
180   app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
181
182   app = application_lookup (mp->client_index);
183   if (!app)
184     return;
185
186   clib_memset (a, 0, sizeof (*a));
187   a->uri = (char *) mp->uri;
188   a->api_context = mp->context;
189   a->app_index = app->app_index;
190   if ((rv = vnet_connect_uri (a)))
191     {
192       clib_warning ("connect_uri returned: %d", rv);
193       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
194       mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
195     }
196 }
197
198 static void
199 session_mq_disconnect_handler (void *data)
200 {
201   session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
202   vnet_disconnect_args_t _a, *a = &_a;
203   application_t *app;
204
205   app = application_lookup (mp->client_index);
206   if (!app)
207     return;
208
209   a->app_index = app->app_index;
210   a->handle = mp->handle;
211   vnet_disconnect_session (a);
212 }
213
214 static void
215 app_mq_detach_handler (void *data)
216 {
217   session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
218   vnet_app_detach_args_t _a, *a = &_a;
219   application_t *app;
220
221   app_check_thread_and_barrier (app_mq_detach_handler, mp);
222
223   app = application_lookup (mp->client_index);
224   if (!app)
225     return;
226
227   a->app_index = app->app_index;
228   a->api_client_index = mp->client_index;
229   vnet_application_detach (a);
230 }
231
232 static void
233 session_mq_unlisten_handler (void *data)
234 {
235   session_unlisten_msg_t *mp = (session_unlisten_msg_t *) data;
236   vnet_unlisten_args_t _a, *a = &_a;
237   app_worker_t *app_wrk;
238   application_t *app;
239   int rv;
240
241   app_check_thread_and_barrier (session_mq_unlisten_handler, mp);
242
243   app = application_lookup (mp->client_index);
244   if (!app)
245     return;
246
247   clib_memset (a, 0, sizeof (*a));
248   a->app_index = app->app_index;
249   a->handle = mp->handle;
250   a->wrk_map_index = mp->wrk_index;
251   if ((rv = vnet_unlisten (a)))
252     clib_warning ("unlisten returned: %d", rv);
253
254   app_wrk = application_get_worker (app, a->wrk_map_index);
255   if (!app_wrk)
256     return;
257
258   mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
259 }
260
261 static void
262 session_mq_accepted_reply_handler (void *data)
263 {
264   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
265   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
266   session_state_t old_state;
267   app_worker_t *app_wrk;
268   session_t *s;
269
270   /* Server isn't interested, kill the session */
271   if (mp->retval)
272     {
273       a->app_index = mp->context;
274       a->handle = mp->handle;
275       vnet_disconnect_session (a);
276       return;
277     }
278
279   /* Mail this back from the main thread. We're not polling in main
280    * thread so we're using other workers for notifications. */
281   if (vlib_num_workers () && vlib_get_thread_index () != 0
282       && session_thread_from_handle (mp->handle) == 0)
283     {
284       vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
285                                  (u8 *) mp, sizeof (*mp));
286       return;
287     }
288
289   s = session_get_from_handle_if_valid (mp->handle);
290   if (!s)
291     return;
292
293   app_wrk = app_worker_get (s->app_wrk_index);
294   if (app_wrk->app_index != mp->context)
295     {
296       clib_warning ("app doesn't own session");
297       return;
298     }
299
300   if (!session_has_transport (s))
301     {
302       s->session_state = SESSION_STATE_READY;
303       if (ct_session_connect_notify (s))
304         return;
305     }
306   else
307     {
308       old_state = s->session_state;
309       s->session_state = SESSION_STATE_READY;
310
311       if (!svm_fifo_is_empty_prod (s->rx_fifo))
312         app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
313
314       /* Closed while waiting for app to reply. Resend disconnect */
315       if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
316         {
317           app_worker_close_notify (app_wrk, s);
318           s->session_state = old_state;
319           return;
320         }
321     }
322 }
323
324 static void
325 session_mq_reset_reply_handler (void *data)
326 {
327   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
328   session_reset_reply_msg_t *mp;
329   app_worker_t *app_wrk;
330   session_t *s;
331   application_t *app;
332   u32 index, thread_index;
333
334   mp = (session_reset_reply_msg_t *) data;
335   app = application_lookup (mp->context);
336   if (!app)
337     return;
338
339   session_parse_handle (mp->handle, &index, &thread_index);
340   s = session_get_if_valid (index, thread_index);
341
342   /* No session or not the right session */
343   if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING)
344     return;
345
346   app_wrk = app_worker_get (s->app_wrk_index);
347   if (!app_wrk || app_wrk->app_index != app->app_index)
348     {
349       clib_warning ("App %u does not own handle 0x%lx!", app->app_index,
350                     mp->handle);
351       return;
352     }
353
354   /* Client objected to resetting the session, log and continue */
355   if (mp->retval)
356     {
357       clib_warning ("client retval %d", mp->retval);
358       return;
359     }
360
361   /* This comes as a response to a reset, transport only waiting for
362    * confirmation to remove connection state, no need to disconnect */
363   a->handle = mp->handle;
364   a->app_index = app->app_index;
365   vnet_disconnect_session (a);
366 }
367
368 static void
369 session_mq_disconnected_handler (void *data)
370 {
371   session_disconnected_reply_msg_t *rmp;
372   vnet_disconnect_args_t _a, *a = &_a;
373   svm_msg_q_msg_t _msg, *msg = &_msg;
374   session_disconnected_msg_t *mp;
375   app_worker_t *app_wrk;
376   session_event_t *evt;
377   session_t *s;
378   application_t *app;
379   int rv = 0;
380
381   mp = (session_disconnected_msg_t *) data;
382   if (!(s = session_get_from_handle_if_valid (mp->handle)))
383     {
384       clib_warning ("could not disconnect handle %llu", mp->handle);
385       return;
386     }
387   app_wrk = app_worker_get (s->app_wrk_index);
388   app = application_lookup (mp->client_index);
389   if (!(app_wrk && app && app->app_index == app_wrk->app_index))
390     {
391       clib_warning ("could not disconnect session: %llu app: %u",
392                     mp->handle, mp->client_index);
393       return;
394     }
395
396   a->handle = mp->handle;
397   a->app_index = app_wrk->wrk_index;
398   rv = vnet_disconnect_session (a);
399
400   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
401                                        SESSION_MQ_CTRL_EVT_RING,
402                                        SVM_Q_WAIT, msg);
403   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
404   clib_memset (evt, 0, sizeof (*evt));
405   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
406   rmp = (session_disconnected_reply_msg_t *) evt->data;
407   rmp->handle = mp->handle;
408   rmp->context = mp->context;
409   rmp->retval = rv;
410   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
411 }
412
413 static void
414 session_mq_disconnected_reply_handler (void *data)
415 {
416   session_disconnected_reply_msg_t *mp;
417   vnet_disconnect_args_t _a, *a = &_a;
418   application_t *app;
419
420   mp = (session_disconnected_reply_msg_t *) data;
421
422   /* Client objected to disconnecting the session, log and continue */
423   if (mp->retval)
424     {
425       clib_warning ("client retval %d", mp->retval);
426       return;
427     }
428
429   /* Disconnect has been confirmed. Confirm close to transport */
430   app = application_lookup (mp->context);
431   if (app)
432     {
433       a->handle = mp->handle;
434       a->app_index = app->app_index;
435       vnet_disconnect_session (a);
436     }
437 }
438
439 static void
440 session_mq_worker_update_handler (void *data)
441 {
442   session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
443   session_worker_update_reply_msg_t *rmp;
444   svm_msg_q_msg_t _msg, *msg = &_msg;
445   app_worker_t *app_wrk;
446   u32 owner_app_wrk_map;
447   session_event_t *evt;
448   session_t *s;
449   application_t *app;
450
451   app = application_lookup (mp->client_index);
452   if (!app)
453     return;
454   if (!(s = session_get_from_handle_if_valid (mp->handle)))
455     {
456       clib_warning ("invalid handle %llu", mp->handle);
457       return;
458     }
459   app_wrk = app_worker_get (s->app_wrk_index);
460   if (app_wrk->app_index != app->app_index)
461     {
462       clib_warning ("app %u does not own session %llu", app->app_index,
463                     mp->handle);
464       return;
465     }
466   owner_app_wrk_map = app_wrk->wrk_map_index;
467   app_wrk = application_get_worker (app, mp->wrk_index);
468
469   /* This needs to come from the new owner */
470   if (mp->req_wrk_index == owner_app_wrk_map)
471     {
472       session_req_worker_update_msg_t *wump;
473
474       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
475                                            SESSION_MQ_CTRL_EVT_RING,
476                                            SVM_Q_WAIT, msg);
477       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
478       clib_memset (evt, 0, sizeof (*evt));
479       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
480       wump = (session_req_worker_update_msg_t *) evt->data;
481       wump->session_handle = mp->handle;
482       svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
483       return;
484     }
485
486   app_worker_own_session (app_wrk, s);
487
488   /*
489    * Send reply
490    */
491   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
492                                        SESSION_MQ_CTRL_EVT_RING,
493                                        SVM_Q_WAIT, msg);
494   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
495   clib_memset (evt, 0, sizeof (*evt));
496   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
497   rmp = (session_worker_update_reply_msg_t *) evt->data;
498   rmp->handle = mp->handle;
499   if (s->rx_fifo)
500     rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
501   if (s->tx_fifo)
502     rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
503   rmp->segment_handle = session_segment_handle (s);
504   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
505
506   /*
507    * Retransmit messages that may have been lost
508    */
509   if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
510     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
511
512   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
513     app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
514
515   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
516     app_worker_close_notify (app_wrk, s);
517 }
518
519 static void
520 session_mq_app_wrk_rpc_handler (void *data)
521 {
522   session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
523   svm_msg_q_msg_t _msg, *msg = &_msg;
524   session_app_wrk_rpc_msg_t *rmp;
525   app_worker_t *app_wrk;
526   session_event_t *evt;
527   application_t *app;
528
529   app = application_lookup (mp->client_index);
530   if (!app)
531     return;
532
533   app_wrk = application_get_worker (app, mp->wrk_index);
534
535   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
536                                        SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
537                                        msg);
538   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
539   clib_memset (evt, 0, sizeof (*evt));
540   evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
541   rmp = (session_app_wrk_rpc_msg_t *) evt->data;
542   clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
543   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
544 }
545
546 static void
547 session_mq_transport_attr_handler (void *data)
548 {
549   session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
550   session_transport_attr_reply_msg_t *rmp;
551   svm_msg_q_msg_t _msg, *msg = &_msg;
552   app_worker_t *app_wrk;
553   session_event_t *evt;
554   application_t *app;
555   session_t *s;
556   int rv;
557
558   app = application_lookup (mp->client_index);
559   if (!app)
560     return;
561
562   if (!(s = session_get_from_handle_if_valid (mp->handle)))
563     {
564       clib_warning ("invalid handle %llu", mp->handle);
565       return;
566     }
567   app_wrk = app_worker_get (s->app_wrk_index);
568   if (app_wrk->app_index != app->app_index)
569     {
570       clib_warning ("app %u does not own session %llu", app->app_index,
571                     mp->handle);
572       return;
573     }
574
575   rv = session_transport_attribute (s, mp->is_get, &mp->attr);
576
577   svm_msg_q_lock_and_alloc_msg_w_ring (
578     app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
579   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
580   clib_memset (evt, 0, sizeof (*evt));
581   evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
582   rmp = (session_transport_attr_reply_msg_t *) evt->data;
583   rmp->handle = mp->handle;
584   rmp->retval = rv;
585   rmp->is_get = mp->is_get;
586   if (!rv && mp->is_get)
587     rmp->attr = mp->attr;
588   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
589 }
590
591 vlib_node_registration_t session_queue_node;
592
593 typedef struct
594 {
595   u32 session_index;
596   u32 server_thread_index;
597 } session_queue_trace_t;
598
599 /* packet trace format function */
600 static u8 *
601 format_session_queue_trace (u8 * s, va_list * args)
602 {
603   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
604   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
605   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
606
607   s = format (s, "session index %d thread index %d",
608               t->session_index, t->server_thread_index);
609   return s;
610 }
611
612 #define foreach_session_queue_error             \
613 _(TX, "Packets transmitted")                    \
614 _(TIMER, "Timer events")                        \
615 _(NO_BUFFER, "Out of buffers")
616
617 typedef enum
618 {
619 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
620   foreach_session_queue_error
621 #undef _
622     SESSION_QUEUE_N_ERROR,
623 } session_queue_error_t;
624
625 static char *session_queue_error_strings[] = {
626 #define _(sym,string) string,
627   foreach_session_queue_error
628 #undef _
629 };
630
631 enum
632 {
633   SESSION_TX_NO_BUFFERS = -2,
634   SESSION_TX_NO_DATA,
635   SESSION_TX_OK
636 };
637
638 static void
639 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
640                         u32 next_index, u32 * to_next, u16 n_segs,
641                         session_t * s, u32 n_trace)
642 {
643   while (n_trace && n_segs)
644     {
645       vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
646       if (PREDICT_TRUE
647           (vlib_trace_buffer
648            (vm, node, next_index, b, 1 /* follow_chain */ )))
649         {
650           session_queue_trace_t *t =
651             vlib_add_trace (vm, node, b, sizeof (*t));
652           t->session_index = s->session_index;
653           t->server_thread_index = s->thread_index;
654           n_trace--;
655         }
656       to_next++;
657       n_segs--;
658     }
659   vlib_set_trace_count (vm, node, n_trace);
660 }
661
662 always_inline void
663 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
664                             vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
665 {
666   vlib_buffer_t *chain_b, *prev_b;
667   u32 chain_bi0, to_deq, left_from_seg;
668   session_worker_t *wrk;
669   u16 len_to_deq, n_bytes_read;
670   u8 *data, j;
671
672   wrk = session_main_get_worker (ctx->s->thread_index);
673   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
674   b->total_length_not_including_first_buffer = 0;
675
676   chain_b = b;
677   left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
678                             ctx->left_to_snd);
679   to_deq = left_from_seg;
680   for (j = 1; j < ctx->n_bufs_per_seg; j++)
681     {
682       prev_b = chain_b;
683       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
684
685       *n_bufs -= 1;
686       chain_bi0 = wrk->tx_buffers[*n_bufs];
687       chain_b = vlib_get_buffer (vm, chain_bi0);
688       chain_b->current_data = 0;
689       data = vlib_buffer_get_current (chain_b);
690       if (peek_data)
691         {
692           n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
693                                         ctx->sp.tx_offset, len_to_deq, data);
694           ctx->sp.tx_offset += n_bytes_read;
695         }
696       else
697         {
698           if (ctx->transport_vft->transport_options.tx_type ==
699               TRANSPORT_TX_DGRAM)
700             {
701               svm_fifo_t *f = ctx->s->tx_fifo;
702               session_dgram_hdr_t *hdr = &ctx->hdr;
703               u16 deq_now;
704               u32 offset;
705
706               deq_now = clib_min (hdr->data_length - hdr->data_offset,
707                                   len_to_deq);
708               offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
709               n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
710               ASSERT (n_bytes_read > 0);
711
712               hdr->data_offset += n_bytes_read;
713               if (hdr->data_offset == hdr->data_length)
714                 {
715                   offset = hdr->data_length + SESSION_CONN_HDR_LEN;
716                   svm_fifo_dequeue_drop (f, offset);
717                   if (ctx->left_to_snd > n_bytes_read)
718                     svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
719                                    (u8 *) & ctx->hdr);
720                 }
721               else if (ctx->left_to_snd == n_bytes_read)
722                 svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
723                                          sizeof (session_dgram_pre_hdr_t));
724             }
725           else
726             n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
727                                              len_to_deq, data);
728         }
729       ASSERT (n_bytes_read == len_to_deq);
730       chain_b->current_length = n_bytes_read;
731       b->total_length_not_including_first_buffer += chain_b->current_length;
732
733       /* update previous buffer */
734       prev_b->next_buffer = chain_bi0;
735       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
736
737       /* update current buffer */
738       chain_b->next_buffer = 0;
739
740       to_deq -= n_bytes_read;
741       if (to_deq == 0)
742         break;
743     }
744   ASSERT (to_deq == 0
745           && b->total_length_not_including_first_buffer == left_from_seg);
746   ctx->left_to_snd -= left_from_seg;
747 }
748
749 always_inline void
750 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
751                         vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
752 {
753   u32 len_to_deq;
754   u8 *data0;
755   int n_bytes_read;
756
757   /*
758    * Start with the first buffer in chain
759    */
760   b->error = 0;
761   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
762   b->current_data = 0;
763
764   data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
765   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
766
767   if (peek_data)
768     {
769       n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
770                                     len_to_deq, data0);
771       ASSERT (n_bytes_read > 0);
772       /* Keep track of progress locally, transport is also supposed to
773        * increment it independently when pushing the header */
774       ctx->sp.tx_offset += n_bytes_read;
775     }
776   else
777     {
778       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
779         {
780           session_dgram_hdr_t *hdr = &ctx->hdr;
781           svm_fifo_t *f = ctx->s->tx_fifo;
782           u16 deq_now;
783           u32 offset;
784
785           ASSERT (hdr->data_length > hdr->data_offset);
786           deq_now = clib_min (hdr->data_length - hdr->data_offset,
787                               len_to_deq);
788           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
789           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
790           ASSERT (n_bytes_read > 0);
791
792           if (ctx->s->session_state == SESSION_STATE_LISTENING)
793             {
794               ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
795               ctx->tc->rmt_port = hdr->rmt_port;
796             }
797           hdr->data_offset += n_bytes_read;
798           if (hdr->data_offset == hdr->data_length)
799             {
800               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
801               svm_fifo_dequeue_drop (f, offset);
802               if (ctx->left_to_snd > n_bytes_read)
803                 svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
804                                (u8 *) & ctx->hdr);
805             }
806           else if (ctx->left_to_snd == n_bytes_read)
807             svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
808                                      sizeof (session_dgram_pre_hdr_t));
809         }
810       else
811         {
812           n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
813                                            len_to_deq, data0);
814           ASSERT (n_bytes_read > 0);
815         }
816     }
817   b->current_length = n_bytes_read;
818   ctx->left_to_snd -= n_bytes_read;
819
820   /*
821    * Fill in the remaining buffers in the chain, if any
822    */
823   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
824     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
825 }
826
827 always_inline u8
828 session_tx_not_ready (session_t * s, u8 peek_data)
829 {
830   if (peek_data)
831     {
832       if (PREDICT_TRUE (s->session_state == SESSION_STATE_READY))
833         return 0;
834       /* Can retransmit for closed sessions but can't send new data if
835        * session is not ready or closed */
836       else if (s->session_state < SESSION_STATE_READY)
837         return 1;
838       else if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
839         {
840           /* Allow closed transports to still send custom packets.
841            * For instance, tcp may want to send acks in time-wait. */
842           if (s->session_state != SESSION_STATE_TRANSPORT_DELETED
843               && (s->flags & SESSION_F_CUSTOM_TX))
844             return 0;
845           return 2;
846         }
847     }
848   return 0;
849 }
850
851 always_inline transport_connection_t *
852 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
853 {
854   if (peek_data)
855     {
856       return ctx->transport_vft->get_connection (ctx->s->connection_index,
857                                                  ctx->s->thread_index);
858     }
859   else
860     {
861       if (ctx->s->session_state == SESSION_STATE_LISTENING)
862         return ctx->transport_vft->get_listener (ctx->s->connection_index);
863       else
864         {
865           return ctx->transport_vft->get_connection (ctx->s->connection_index,
866                                                      ctx->s->thread_index);
867         }
868     }
869 }
870
871 always_inline void
872 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
873                                u32 max_segs, u8 peek_data)
874 {
875   u32 n_bytes_per_buf, n_bytes_per_seg;
876
877   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
878   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
879
880   if (peek_data)
881     {
882       /* Offset in rx fifo from where to peek data */
883       if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
884         {
885           ctx->max_len_to_snd = 0;
886           return;
887         }
888       ctx->max_dequeue -= ctx->sp.tx_offset;
889     }
890   else
891     {
892       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
893         {
894           u32 len, chain_limit;
895
896           if (ctx->max_dequeue <= sizeof (ctx->hdr))
897             {
898               ctx->max_len_to_snd = 0;
899               return;
900             }
901
902           svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
903                          (u8 *) & ctx->hdr);
904           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
905           len = ctx->hdr.data_length - ctx->hdr.data_offset;
906
907           /* Process multiple dgrams if smaller than min (buf_space, mss).
908            * This avoids handling multiple dgrams if they require buffer
909            * chains */
910           chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
911                                   ctx->sp.snd_mss);
912           if (ctx->hdr.data_length <= chain_limit)
913             {
914               u32 first_dgram_len, dgram_len, offset, max_offset;
915               session_dgram_hdr_t hdr;
916
917               ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
918               offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
919               first_dgram_len = len;
920               max_offset = clib_min (ctx->max_dequeue, 16 << 10);
921
922               while (offset < max_offset)
923                 {
924                   svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
925                                  (u8 *) & hdr);
926                   ASSERT (hdr.data_length > hdr.data_offset);
927                   dgram_len = hdr.data_length - hdr.data_offset;
928                   if (len + dgram_len > ctx->max_dequeue
929                       || first_dgram_len != dgram_len)
930                     break;
931                   len += dgram_len;
932                   offset += sizeof (hdr) + hdr.data_length;
933                 }
934             }
935
936           ctx->max_dequeue = len;
937         }
938     }
939   ASSERT (ctx->max_dequeue > 0);
940
941   /* Ensure we're not writing more than transport window allows */
942   if (ctx->max_dequeue < ctx->sp.snd_space)
943     {
944       /* Constrained by tx queue. Try to send only fully formed segments */
945       ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
946         (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
947         ctx->max_dequeue;
948       /* TODO Nagle ? */
949     }
950   else
951     {
952       /* Expectation is that snd_space0 is already a multiple of snd_mss */
953       ctx->max_len_to_snd = ctx->sp.snd_space;
954     }
955
956   /* Check if we're tx constrained by the node */
957   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
958   if (ctx->n_segs_per_evt > max_segs)
959     {
960       ctx->n_segs_per_evt = max_segs;
961       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
962     }
963
964   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
965   if (ctx->n_segs_per_evt > 1)
966     {
967       u32 n_bytes_last_seg, n_bufs_last_seg;
968
969       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
970       n_bytes_last_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd
971         - ((ctx->n_segs_per_evt - 1) * ctx->sp.snd_mss);
972       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
973       n_bufs_last_seg = ceil ((f64) n_bytes_last_seg / n_bytes_per_buf);
974       ctx->n_bufs_needed = ((ctx->n_segs_per_evt - 1) * ctx->n_bufs_per_seg)
975         + n_bufs_last_seg;
976     }
977   else
978     {
979       n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->max_len_to_snd;
980       ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
981       ctx->n_bufs_needed = ctx->n_bufs_per_seg;
982     }
983
984   ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
985   ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
986                                      n_bytes_per_buf -
987                                      TRANSPORT_MAX_HDRS_LEN);
988 }
989
990 always_inline void
991 session_tx_maybe_reschedule (session_worker_t * wrk,
992                              session_tx_context_t * ctx,
993                              session_evt_elt_t * elt)
994 {
995   session_t *s = ctx->s;
996
997   svm_fifo_unset_event (s->tx_fifo);
998   if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
999     if (svm_fifo_set_event (s->tx_fifo))
1000       session_evt_add_head_old (wrk, elt);
1001 }
1002
1003 always_inline int
1004 session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
1005                                 vlib_node_runtime_t * node,
1006                                 session_evt_elt_t * elt,
1007                                 int *n_tx_packets, u8 peek_data)
1008 {
1009   u32 n_trace, n_left, pbi, next_index, max_burst;
1010   session_tx_context_t *ctx = &wrk->ctx;
1011   session_main_t *smm = &session_main;
1012   session_event_t *e = &elt->evt;
1013   vlib_main_t *vm = wrk->vm;
1014   transport_proto_t tp;
1015   vlib_buffer_t *pb;
1016   u16 n_bufs, rv;
1017
1018   if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
1019     {
1020       if (rv < 2)
1021         session_evt_add_old (wrk, elt);
1022       return SESSION_TX_NO_DATA;
1023     }
1024
1025   next_index = smm->session_type_to_next[ctx->s->session_type];
1026   max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
1027
1028   tp = session_get_transport_proto (ctx->s);
1029   ctx->transport_vft = transport_protocol_get_vft (tp);
1030   ctx->tc = session_tx_get_transport (ctx, peek_data);
1031
1032   if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
1033     {
1034       if (ctx->transport_vft->flush_data)
1035         ctx->transport_vft->flush_data (ctx->tc);
1036       e->event_type = SESSION_IO_EVT_TX;
1037     }
1038
1039   if (ctx->s->flags & SESSION_F_CUSTOM_TX)
1040     {
1041       u32 n_custom_tx;
1042       ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
1043       ctx->sp.max_burst_size = max_burst;
1044       n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, &ctx->sp);
1045       *n_tx_packets += n_custom_tx;
1046       if (PREDICT_FALSE
1047           (ctx->s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1048         return SESSION_TX_OK;
1049       max_burst -= n_custom_tx;
1050       if (!max_burst || (ctx->s->flags & SESSION_F_CUSTOM_TX))
1051         {
1052           session_evt_add_old (wrk, elt);
1053           return SESSION_TX_OK;
1054         }
1055     }
1056
1057   transport_connection_snd_params (ctx->tc, &ctx->sp);
1058
1059   if (!ctx->sp.snd_space)
1060     {
1061       /* If the deschedule flag was set, remove session from scheduler.
1062        * Transport is responsible for rescheduling this session. */
1063       if (ctx->sp.flags & TRANSPORT_SND_F_DESCHED)
1064         transport_connection_deschedule (ctx->tc);
1065       /* Request to postpone the session, e.g., zero-wnd and transport
1066        * is not currently probing */
1067       else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
1068         session_evt_add_old (wrk, elt);
1069       /* This flow queue is "empty" so it should be re-evaluated before
1070        * the ones that have data to send. */
1071       else
1072         session_evt_add_head_old (wrk, elt);
1073
1074       return SESSION_TX_NO_DATA;
1075     }
1076
1077   if (transport_connection_is_tx_paced (ctx->tc))
1078     {
1079       u32 snd_space = transport_connection_tx_pacer_burst (ctx->tc);
1080       if (snd_space < TRANSPORT_PACER_MIN_BURST)
1081         {
1082           session_evt_add_head_old (wrk, elt);
1083           return SESSION_TX_NO_DATA;
1084         }
1085       snd_space = clib_min (ctx->sp.snd_space, snd_space);
1086       ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
1087         snd_space - snd_space % ctx->sp.snd_mss : snd_space;
1088     }
1089
1090   /* Check how much we can pull. */
1091   session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
1092
1093   if (PREDICT_FALSE (!ctx->max_len_to_snd))
1094     {
1095       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
1096       session_tx_maybe_reschedule (wrk, ctx, elt);
1097       return SESSION_TX_NO_DATA;
1098     }
1099
1100   vec_validate_aligned (wrk->tx_buffers, ctx->n_bufs_needed - 1,
1101                         CLIB_CACHE_LINE_BYTES);
1102   n_bufs = vlib_buffer_alloc (vm, wrk->tx_buffers, ctx->n_bufs_needed);
1103   if (PREDICT_FALSE (n_bufs < ctx->n_bufs_needed))
1104     {
1105       if (n_bufs)
1106         vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
1107       session_evt_add_head_old (wrk, elt);
1108       vlib_node_increment_counter (wrk->vm, node->node_index,
1109                                    SESSION_QUEUE_ERROR_NO_BUFFER, 1);
1110       return SESSION_TX_NO_BUFFERS;
1111     }
1112
1113   if (transport_connection_is_tx_paced (ctx->tc))
1114     transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
1115
1116   ctx->left_to_snd = ctx->max_len_to_snd;
1117   n_left = ctx->n_segs_per_evt;
1118
1119   while (n_left >= 4)
1120     {
1121       vlib_buffer_t *b0, *b1;
1122       u32 bi0, bi1;
1123
1124       pbi = wrk->tx_buffers[n_bufs - 3];
1125       pb = vlib_get_buffer (vm, pbi);
1126       vlib_prefetch_buffer_header (pb, STORE);
1127       pbi = wrk->tx_buffers[n_bufs - 4];
1128       pb = vlib_get_buffer (vm, pbi);
1129       vlib_prefetch_buffer_header (pb, STORE);
1130
1131       bi0 = wrk->tx_buffers[--n_bufs];
1132       bi1 = wrk->tx_buffers[--n_bufs];
1133
1134       b0 = vlib_get_buffer (vm, bi0);
1135       b1 = vlib_get_buffer (vm, bi1);
1136
1137       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1138       session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
1139
1140       ctx->transport_vft->push_header (ctx->tc, b0);
1141       ctx->transport_vft->push_header (ctx->tc, b1);
1142
1143       n_left -= 2;
1144
1145       VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
1146       VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b1);
1147
1148       vec_add1 (wrk->pending_tx_buffers, bi0);
1149       vec_add1 (wrk->pending_tx_buffers, bi1);
1150       vec_add1 (wrk->pending_tx_nexts, next_index);
1151       vec_add1 (wrk->pending_tx_nexts, next_index);
1152     }
1153   while (n_left)
1154     {
1155       vlib_buffer_t *b0;
1156       u32 bi0;
1157
1158       if (n_left > 1)
1159         {
1160           pbi = wrk->tx_buffers[n_bufs - 2];
1161           pb = vlib_get_buffer (vm, pbi);
1162           vlib_prefetch_buffer_header (pb, STORE);
1163         }
1164
1165       bi0 = wrk->tx_buffers[--n_bufs];
1166       b0 = vlib_get_buffer (vm, bi0);
1167       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
1168
1169       /* Ask transport to push header after current_length and
1170        * total_length_not_including_first_buffer are updated */
1171       ctx->transport_vft->push_header (ctx->tc, b0);
1172
1173       n_left -= 1;
1174
1175       VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
1176
1177       vec_add1 (wrk->pending_tx_buffers, bi0);
1178       vec_add1 (wrk->pending_tx_nexts, next_index);
1179     }
1180
1181   if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
1182     session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers,
1183                             ctx->n_segs_per_evt, ctx->s, n_trace);
1184
1185   if (PREDICT_FALSE (n_bufs))
1186     vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
1187
1188   *n_tx_packets += ctx->n_segs_per_evt;
1189
1190   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
1191                ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
1192
1193   ASSERT (ctx->left_to_snd == 0);
1194
1195   /* If we couldn't dequeue all bytes reschedule as old flow. Otherwise,
1196    * check if application enqueued more data and reschedule accordingly */
1197   if (ctx->max_len_to_snd < ctx->max_dequeue)
1198     session_evt_add_old (wrk, elt);
1199   else
1200     session_tx_maybe_reschedule (wrk, ctx, elt);
1201
1202   if (!peek_data)
1203     {
1204       u32 n_dequeued = ctx->max_len_to_snd;
1205       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
1206         n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
1207       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
1208         session_dequeue_notify (ctx->s);
1209     }
1210   return SESSION_TX_OK;
1211 }
1212
1213 int
1214 session_tx_fifo_peek_and_snd (session_worker_t * wrk,
1215                               vlib_node_runtime_t * node,
1216                               session_evt_elt_t * e, int *n_tx_packets)
1217 {
1218   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1);
1219 }
1220
1221 int
1222 session_tx_fifo_dequeue_and_snd (session_worker_t * wrk,
1223                                  vlib_node_runtime_t * node,
1224                                  session_evt_elt_t * e, int *n_tx_packets)
1225 {
1226   return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0);
1227 }
1228
1229 int
1230 session_tx_fifo_dequeue_internal (session_worker_t * wrk,
1231                                   vlib_node_runtime_t * node,
1232                                   session_evt_elt_t * elt, int *n_tx_packets)
1233 {
1234   transport_send_params_t *sp = &wrk->ctx.sp;
1235   session_t *s = wrk->ctx.s;
1236   u32 n_packets;
1237
1238   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
1239     return 0;
1240
1241   /* Clear custom-tx flag used to request reschedule for tx */
1242   s->flags &= ~SESSION_F_CUSTOM_TX;
1243
1244   sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
1245                                  TRANSPORT_PACER_MAX_BURST_PKTS);
1246
1247   n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
1248   *n_tx_packets += n_packets;
1249
1250   if (s->flags & SESSION_F_CUSTOM_TX)
1251     {
1252       session_evt_add_old (wrk, elt);
1253     }
1254   else if (!(sp->flags & TRANSPORT_SND_F_DESCHED))
1255     {
1256       svm_fifo_unset_event (s->tx_fifo);
1257       if (svm_fifo_max_dequeue_cons (s->tx_fifo))
1258         if (svm_fifo_set_event (s->tx_fifo))
1259           session_evt_add_head_old (wrk, elt);
1260     }
1261
1262   if (sp->max_burst_size &&
1263       svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
1264     session_dequeue_notify (s);
1265
1266   return n_packets;
1267 }
1268
1269 always_inline session_t *
1270 session_event_get_session (session_worker_t * wrk, session_event_t * e)
1271 {
1272   if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
1273     return 0;
1274
1275   ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
1276   return pool_elt_at_index (wrk->sessions, e->session_index);
1277 }
1278
1279 always_inline void
1280 session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
1281 {
1282   clib_llist_index_t ei;
1283   void (*fp) (void *);
1284   session_event_t *e;
1285   session_t *s;
1286
1287   ei = clib_llist_entry_index (wrk->event_elts, elt);
1288   e = &elt->evt;
1289
1290   switch (e->event_type)
1291     {
1292     case SESSION_CTRL_EVT_RPC:
1293       fp = e->rpc_args.fp;
1294       (*fp) (e->rpc_args.arg);
1295       break;
1296     case SESSION_CTRL_EVT_CLOSE:
1297       s = session_get_from_handle_if_valid (e->session_handle);
1298       if (PREDICT_FALSE (!s))
1299         break;
1300       session_transport_close (s);
1301       break;
1302     case SESSION_CTRL_EVT_RESET:
1303       s = session_get_from_handle_if_valid (e->session_handle);
1304       if (PREDICT_FALSE (!s))
1305         break;
1306       session_transport_reset (s);
1307       break;
1308     case SESSION_CTRL_EVT_LISTEN:
1309       session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
1310       break;
1311     case SESSION_CTRL_EVT_LISTEN_URI:
1312       session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
1313       break;
1314     case SESSION_CTRL_EVT_UNLISTEN:
1315       session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
1316       break;
1317     case SESSION_CTRL_EVT_CONNECT:
1318       session_mq_connect_handler (session_evt_ctrl_data (wrk, elt));
1319       break;
1320     case SESSION_CTRL_EVT_CONNECT_URI:
1321       session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
1322       break;
1323     case SESSION_CTRL_EVT_DISCONNECT:
1324       session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
1325       break;
1326     case SESSION_CTRL_EVT_DISCONNECTED:
1327       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
1328       break;
1329     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
1330       session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
1331       break;
1332     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
1333       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
1334                                                                     elt));
1335       break;
1336     case SESSION_CTRL_EVT_RESET_REPLY:
1337       session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
1338       break;
1339     case SESSION_CTRL_EVT_WORKER_UPDATE:
1340       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
1341       break;
1342     case SESSION_CTRL_EVT_APP_DETACH:
1343       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
1344       break;
1345     case SESSION_CTRL_EVT_APP_WRK_RPC:
1346       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
1347       break;
1348     case SESSION_CTRL_EVT_TRANSPORT_ATTR:
1349       session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
1350       break;
1351     default:
1352       clib_warning ("unhandled event type %d", e->event_type);
1353     }
1354
1355   /* Regrab elements in case pool moved */
1356   elt = pool_elt_at_index (wrk->event_elts, ei);
1357   if (!clib_llist_elt_is_linked (elt, evt_list))
1358     {
1359       e = &elt->evt;
1360       if (e->event_type >= SESSION_CTRL_EVT_BOUND)
1361         session_evt_ctrl_data_free (wrk, elt);
1362       session_evt_elt_free (wrk, elt);
1363     }
1364   SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
1365 }
1366
1367 always_inline void
1368 session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
1369                            session_evt_elt_t * elt, int *n_tx_packets)
1370 {
1371   session_main_t *smm = &session_main;
1372   app_worker_t *app_wrk;
1373   clib_llist_index_t ei;
1374   session_event_t *e;
1375   session_t *s;
1376
1377   ei = clib_llist_entry_index (wrk->event_elts, elt);
1378   e = &elt->evt;
1379
1380   switch (e->event_type)
1381     {
1382     case SESSION_IO_EVT_TX_FLUSH:
1383     case SESSION_IO_EVT_TX:
1384       s = session_event_get_session (wrk, e);
1385       if (PREDICT_FALSE (!s))
1386         break;
1387       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
1388       wrk->ctx.s = s;
1389       /* Spray packets in per session type frames, since they go to
1390        * different nodes */
1391       (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
1392       break;
1393     case SESSION_IO_EVT_RX:
1394       s = session_event_get_session (wrk, e);
1395       if (!s)
1396         break;
1397       transport_app_rx_evt (session_get_transport_proto (s),
1398                             s->connection_index, s->thread_index);
1399       break;
1400     case SESSION_IO_EVT_BUILTIN_RX:
1401       s = session_event_get_session (wrk, e);
1402       if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
1403         break;
1404       svm_fifo_unset_event (s->rx_fifo);
1405       app_wrk = app_worker_get (s->app_wrk_index);
1406       app_worker_builtin_rx (app_wrk, s);
1407       break;
1408     case SESSION_IO_EVT_BUILTIN_TX:
1409       s = session_get_from_handle_if_valid (e->session_handle);
1410       wrk->ctx.s = s;
1411       if (PREDICT_TRUE (s != 0))
1412         session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
1413       break;
1414     default:
1415       clib_warning ("unhandled event type %d", e->event_type);
1416     }
1417
1418   SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
1419
1420   /* Regrab elements in case pool moved */
1421   elt = pool_elt_at_index (wrk->event_elts, ei);
1422   if (!clib_llist_elt_is_linked (elt, evt_list))
1423     session_evt_elt_free (wrk, elt);
1424 }
1425
1426 /* *INDENT-OFF* */
1427 static const u32 session_evt_msg_sizes[] = {
1428 #define _(symc, sym)                                                    \
1429   [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
1430   foreach_session_ctrl_evt
1431 #undef _
1432 };
1433 /* *INDENT-ON* */
1434
1435 always_inline void
1436 session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
1437 {
1438   session_evt_elt_t *elt;
1439
1440   if (evt->event_type >= SESSION_CTRL_EVT_RPC)
1441     {
1442       elt = session_evt_alloc_ctrl (wrk);
1443       if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
1444         {
1445           elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
1446           elt->evt.event_type = evt->event_type;
1447           clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
1448                             session_evt_msg_sizes[evt->event_type]);
1449         }
1450       else
1451         {
1452           /* Internal control events fit into io events footprint */
1453           clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1454         }
1455     }
1456   else
1457     {
1458       elt = session_evt_alloc_new (wrk);
1459       clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
1460     }
1461 }
1462
1463 static void
1464 session_flush_pending_tx_buffers (session_worker_t * wrk,
1465                                   vlib_node_runtime_t * node)
1466 {
1467   vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers,
1468                                wrk->pending_tx_nexts,
1469                                vec_len (wrk->pending_tx_nexts));
1470   vec_reset_length (wrk->pending_tx_buffers);
1471   vec_reset_length (wrk->pending_tx_nexts);
1472 }
1473
1474 int
1475 session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
1476 {
1477   svm_msg_q_msg_t _msg, *msg = &_msg;
1478   u32 i, n_to_dequeue = 0;
1479   session_event_t *evt;
1480
1481   n_to_dequeue = svm_msg_q_size (mq);
1482   for (i = 0; i < n_to_dequeue; i++)
1483     {
1484       svm_msg_q_sub_raw (mq, msg);
1485       evt = svm_msg_q_msg_data (mq, msg);
1486       session_evt_add_to_list (wrk, evt);
1487       svm_msg_q_free_msg (mq, msg);
1488     }
1489
1490   return n_to_dequeue;
1491 }
1492
1493 static void
1494 session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
1495 {
1496   struct itimerspec its;
1497
1498   its.it_value.tv_sec = 0;
1499   its.it_value.tv_nsec = time_ns;
1500   its.it_interval.tv_sec = 0;
1501   its.it_interval.tv_nsec = its.it_value.tv_nsec;
1502
1503   if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
1504     clib_warning ("timerfd_settime");
1505 }
1506
1507 always_inline u64
1508 session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
1509 {
1510   if (state == SESSION_WRK_INTERRUPT)
1511     return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
1512   else if (state == SESSION_WRK_IDLE)
1513     return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
1514   else
1515     return 0;
1516 }
1517
1518 static inline void
1519 session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
1520 {
1521   u64 time_ns;
1522
1523   wrk->state = state;
1524   time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
1525   session_wrk_timerfd_update (wrk, time_ns);
1526 }
1527
1528 static void
1529 session_wrk_update_state (session_worker_t *wrk)
1530 {
1531   vlib_main_t *vm = wrk->vm;
1532
1533   if (wrk->state == SESSION_WRK_POLLING)
1534     {
1535       if (pool_elts (wrk->event_elts) == 3 &&
1536           vlib_last_vectors_per_main_loop (vm) < 1)
1537         {
1538           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1539           vlib_node_set_state (vm, session_queue_node.index,
1540                                VLIB_NODE_STATE_INTERRUPT);
1541         }
1542     }
1543   else if (wrk->state == SESSION_WRK_INTERRUPT)
1544     {
1545       if (pool_elts (wrk->event_elts) > 3 ||
1546           vlib_last_vectors_per_main_loop (vm) > 1)
1547         {
1548           session_wrk_set_state (wrk, SESSION_WRK_POLLING);
1549           vlib_node_set_state (vm, session_queue_node.index,
1550                                VLIB_NODE_STATE_POLLING);
1551         }
1552       else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
1553         {
1554           session_wrk_set_state (wrk, SESSION_WRK_IDLE);
1555         }
1556     }
1557   else
1558     {
1559       if (pool_elts (wrk->event_elts))
1560         {
1561           session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
1562         }
1563     }
1564 }
1565
1566 static uword
1567 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
1568                        vlib_frame_t * frame)
1569 {
1570   u32 thread_index = vm->thread_index, __clib_unused n_evts;
1571   session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
1572   session_main_t *smm = vnet_get_session_main ();
1573   session_worker_t *wrk = &smm->wrk[thread_index];
1574   clib_llist_index_t ei, next_ei, old_ti;
1575   int n_tx_packets;
1576
1577   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
1578
1579   session_wrk_update_time (wrk, vlib_time_now (vm));
1580
1581   /*
1582    *  Update transport time
1583    */
1584   transport_update_time (wrk->last_vlib_time, thread_index);
1585   n_tx_packets = vec_len (wrk->pending_tx_buffers);
1586   SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
1587
1588   /*
1589    *  Dequeue new internal mq events
1590    */
1591
1592   n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
1593   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
1594
1595   /*
1596    * Handle control events
1597    */
1598
1599   ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
1600
1601   clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
1602     clib_llist_remove (wrk->event_elts, evt_list, elt);
1603     session_event_dispatch_ctrl (wrk, elt);
1604   }));
1605
1606   SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
1607
1608   /*
1609    * Handle the new io events.
1610    */
1611
1612   new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
1613   old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
1614   old_ti = clib_llist_prev_index (old_he, evt_list);
1615
1616   ei = clib_llist_next_index (new_he, evt_list);
1617   while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
1618     {
1619       elt = pool_elt_at_index (wrk->event_elts, ei);
1620       ei = clib_llist_next_index (elt, evt_list);
1621       clib_llist_remove (wrk->event_elts, evt_list, elt);
1622       session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1623     }
1624
1625   SESSION_EVT (SESSION_EVT_DSP_CNTRS, NEW_IO_EVTS, wrk);
1626
1627   /*
1628    * Handle the old io events, if we had any prior to processing the new ones
1629    */
1630
1631   if (old_ti != wrk->old_head)
1632     {
1633       old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
1634       ei = clib_llist_next_index (old_he, evt_list);
1635
1636       while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
1637         {
1638           elt = pool_elt_at_index (wrk->event_elts, ei);
1639           next_ei = clib_llist_next_index (elt, evt_list);
1640           clib_llist_remove (wrk->event_elts, evt_list, elt);
1641
1642           session_event_dispatch_io (wrk, node, elt, &n_tx_packets);
1643
1644           if (ei == old_ti)
1645             break;
1646
1647           ei = next_ei;
1648         };
1649     }
1650
1651   SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
1652
1653   if (vec_len (wrk->pending_tx_buffers))
1654     session_flush_pending_tx_buffers (wrk, node);
1655
1656   vlib_node_increment_counter (vm, session_queue_node.index,
1657                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
1658
1659   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
1660
1661   if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
1662     session_wrk_update_state (wrk);
1663
1664   return n_tx_packets;
1665 }
1666
1667 /* *INDENT-OFF* */
1668 VLIB_REGISTER_NODE (session_queue_node) =
1669 {
1670   .function = session_queue_node_fn,
1671   .flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
1672   .name = "session-queue",
1673   .format_trace = format_session_queue_trace,
1674   .type = VLIB_NODE_TYPE_INPUT,
1675   .n_errors = ARRAY_LEN (session_queue_error_strings),
1676   .error_strings = session_queue_error_strings,
1677   .state = VLIB_NODE_STATE_DISABLED,
1678 };
1679 /* *INDENT-ON* */
1680
1681 static clib_error_t *
1682 session_wrk_tfd_read_ready (clib_file_t *cf)
1683 {
1684   session_worker_t *wrk = session_main_get_worker (cf->private_data);
1685   u64 buf;
1686   int rv;
1687
1688   vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
1689   rv = read (wrk->timerfd, &buf, sizeof (buf));
1690   if (rv < 0 && errno != EAGAIN)
1691     clib_unix_warning ("failed");
1692   return 0;
1693 }
1694
1695 static clib_error_t *
1696 session_wrk_tfd_write_ready (clib_file_t *cf)
1697 {
1698   return 0;
1699 }
1700
1701 void
1702 session_wrk_enable_adaptive_mode (session_worker_t *wrk)
1703 {
1704   u32 thread_index = wrk->vm->thread_index;
1705   clib_file_t template = { 0 };
1706
1707   if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
1708     clib_warning ("timerfd_create");
1709
1710   template.read_function = session_wrk_tfd_read_ready;
1711   template.write_function = session_wrk_tfd_write_ready;
1712   template.file_descriptor = wrk->timerfd;
1713   template.private_data = thread_index;
1714   template.polling_thread_index = thread_index;
1715   template.description = format (0, "session-wrk-tfd-%u", thread_index);
1716
1717   wrk->timerfd_file = clib_file_add (&file_main, &template);
1718   wrk->flags |= SESSION_WRK_F_ADAPTIVE;
1719 }
1720
1721 static clib_error_t *
1722 session_queue_exit (vlib_main_t * vm)
1723 {
1724   if (vlib_get_n_threads () < 2)
1725     return 0;
1726
1727   /*
1728    * Shut off (especially) worker-thread session nodes.
1729    * Otherwise, vpp can crash as the main thread unmaps the
1730    * API segment.
1731    */
1732   vlib_worker_thread_barrier_sync (vm);
1733   session_node_enable_disable (0 /* is_enable */ );
1734   vlib_worker_thread_barrier_release (vm);
1735   return 0;
1736 }
1737
1738 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1739
1740 static uword
1741 session_queue_run_on_main (vlib_main_t * vm)
1742 {
1743   vlib_node_runtime_t *node;
1744
1745   node = vlib_node_get_runtime (vm, session_queue_node.index);
1746   return session_queue_node_fn (vm, node, 0);
1747 }
1748
1749 static uword
1750 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1751                        vlib_frame_t * f)
1752 {
1753   uword *event_data = 0;
1754   f64 timeout = 1.0;
1755   uword event_type;
1756
1757   while (1)
1758     {
1759       vlib_process_wait_for_event_or_clock (vm, timeout);
1760       event_type = vlib_process_get_events (vm, (uword **) & event_data);
1761
1762       switch (event_type)
1763         {
1764         case SESSION_Q_PROCESS_RUN_ON_MAIN:
1765           /* Run session queue node on main thread */
1766           session_queue_run_on_main (vm);
1767           break;
1768         case SESSION_Q_PROCESS_STOP:
1769           vlib_node_set_state (vm, session_queue_process_node.index,
1770                                VLIB_NODE_STATE_DISABLED);
1771           timeout = 100000.0;
1772           break;
1773         case ~0:
1774           /* Timed out. Run on main to ensure all events are handled */
1775           session_queue_run_on_main (vm);
1776           break;
1777         }
1778       vec_reset_length (event_data);
1779     }
1780   return 0;
1781 }
1782
1783 /* *INDENT-OFF* */
1784 VLIB_REGISTER_NODE (session_queue_process_node) =
1785 {
1786   .function = session_queue_process,
1787   .type = VLIB_NODE_TYPE_PROCESS,
1788   .name = "session-queue-process",
1789   .state = VLIB_NODE_STATE_DISABLED,
1790 };
1791 /* *INDENT-ON* */
1792
1793 static_always_inline uword
1794 session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
1795                                 vlib_frame_t * frame)
1796 {
1797   session_main_t *sm = &session_main;
1798   if (!sm->wrk[0].vpp_event_queue)
1799     return 0;
1800   node = vlib_node_get_runtime (vm, session_queue_node.index);
1801   return session_queue_node_fn (vm, node, frame);
1802 }
1803
1804 /* *INDENT-OFF* */
1805 VLIB_REGISTER_NODE (session_queue_pre_input_node) =
1806 {
1807   .function = session_queue_pre_input_inline,
1808   .type = VLIB_NODE_TYPE_PRE_INPUT,
1809   .name = "session-queue-main",
1810   .state = VLIB_NODE_STATE_DISABLED,
1811 };
1812 /* *INDENT-ON* */
1813
1814 /*
1815  * fd.io coding-style-patch-verification: ON
1816  *
1817  * Local Variables:
1818  * eval: (c-set-style "gnu")
1819  * End:
1820  */