session: coverity warnings
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/session.h>
22 #include <vnet/session/application.h>
23 #include <vnet/session/application_interface.h>
24 #include <vnet/session/session_debug.h>
25 #include <svm/queue.h>
26
27 static void
28 session_mq_accepted_reply_handler (void *data)
29 {
30   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
31   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
32   local_session_t *ls;
33   stream_session_t *s;
34
35   /* Server isn't interested, kill the session */
36   if (mp->retval)
37     {
38       a->app_index = mp->context;
39       a->handle = mp->handle;
40       vnet_disconnect_session (a);
41       return;
42     }
43
44   if (session_handle_is_local (mp->handle))
45     {
46       ls = application_get_local_session_from_handle (mp->handle);
47       if (!ls || ls->app_index != mp->context)
48         {
49           clib_warning ("server %u doesn't own local handle %llu",
50                         mp->context, mp->handle);
51           return;
52         }
53       if (application_local_session_connect_notify (ls))
54         return;
55       ls->session_state = SESSION_STATE_READY;
56     }
57   else
58     {
59       s = session_get_from_handle_if_valid (mp->handle);
60       if (!s)
61         {
62           clib_warning ("session doesn't exist");
63           return;
64         }
65       if (s->app_index != mp->context)
66         {
67           clib_warning ("app doesn't own session");
68           return;
69         }
70       s->session_state = SESSION_STATE_READY;
71     }
72 }
73
74 static void
75 session_mq_reset_reply_handler (void *data)
76 {
77   session_reset_reply_msg_t *mp;
78   application_t *app;
79   stream_session_t *s;
80   u32 index, thread_index;
81
82   mp = (session_reset_reply_msg_t *) data;
83   app = application_lookup (mp->client_index);
84   if (!app)
85     return;
86
87   session_parse_handle (mp->handle, &index, &thread_index);
88   s = session_get_if_valid (index, thread_index);
89   if (s == 0 || app->index != s->app_index)
90     {
91       clib_warning ("Invalid session!");
92       return;
93     }
94
95   /* Client objected to resetting the session, log and continue */
96   if (mp->retval)
97     {
98       clib_warning ("client retval %d", mp->retval);
99       return;
100     }
101
102   /* This comes as a response to a reset, transport only waiting for
103    * confirmation to remove connection state, no need to disconnect */
104   stream_session_cleanup (s);
105 }
106
107 static void
108 session_mq_disconnected_handler (void *data)
109 {
110   session_disconnected_reply_msg_t *rmp;
111   vnet_disconnect_args_t _a, *a = &_a;
112   svm_msg_q_msg_t _msg, *msg = &_msg;
113   session_disconnected_msg_t *mp;
114   session_event_t *evt;
115   stream_session_t *s;
116   application_t *app;
117   int rv = 0;
118
119   mp = (session_disconnected_msg_t *) data;
120   app = application_lookup (mp->client_index);
121   s = session_get_from_handle_if_valid (mp->handle);
122   if (!(app && s && s->app_index == app->index))
123     {
124       clib_warning ("could not disconnect session: %llu app: %u", mp->handle,
125                     mp->client_index);
126       return;
127     }
128
129   a->handle = mp->handle;
130   a->app_index = app->index;
131   rv = vnet_disconnect_session (a);
132
133   svm_msg_q_lock_and_alloc_msg_w_ring (app->event_queue,
134                                        SESSION_MQ_CTRL_EVT_RING,
135                                        SVM_Q_WAIT, msg);
136   svm_msg_q_unlock (app->event_queue);
137   evt = svm_msg_q_msg_data (app->event_queue, msg);
138   memset (evt, 0, sizeof (*evt));
139   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
140   rmp = (session_disconnected_reply_msg_t *) evt->data;
141   rmp->handle = mp->handle;
142   rmp->context = mp->context;
143   rmp->retval = rv;
144   svm_msg_q_add (app->event_queue, msg, SVM_Q_WAIT);
145 }
146
147 static void
148 session_mq_disconnected_reply_handler (void *data)
149 {
150   session_disconnected_reply_msg_t *mp;
151   vnet_disconnect_args_t _a, *a = &_a;
152   application_t *app;
153
154   mp = (session_disconnected_reply_msg_t *) data;
155
156   /* Client objected to disconnecting the session, log and continue */
157   if (mp->retval)
158     {
159       clib_warning ("client retval %d", mp->retval);
160       return;
161     }
162
163   /* Disconnect has been confirmed. Confirm close to transport */
164   app = application_lookup (mp->context);
165   if (app)
166     {
167       a->handle = mp->handle;
168       a->app_index = app->index;
169       vnet_disconnect_session (a);
170     }
171 }
172
173 vlib_node_registration_t session_queue_node;
174
175 typedef struct
176 {
177   u32 session_index;
178   u32 server_thread_index;
179 } session_queue_trace_t;
180
181 /* packet trace format function */
182 static u8 *
183 format_session_queue_trace (u8 * s, va_list * args)
184 {
185   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
186   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
187   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
188
189   s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
190               t->session_index, t->server_thread_index);
191   return s;
192 }
193
194 #define foreach_session_queue_error             \
195 _(TX, "Packets transmitted")                    \
196 _(TIMER, "Timer events")                        \
197 _(NO_BUFFER, "Out of buffers")
198
199 typedef enum
200 {
201 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
202   foreach_session_queue_error
203 #undef _
204     SESSION_QUEUE_N_ERROR,
205 } session_queue_error_t;
206
207 static char *session_queue_error_strings[] = {
208 #define _(sym,string) string,
209   foreach_session_queue_error
210 #undef _
211 };
212
213 enum
214 {
215   SESSION_TX_NO_BUFFERS = -2,
216   SESSION_TX_NO_DATA,
217   SESSION_TX_OK
218 };
219
220 static void
221 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
222                         u32 next_index, u32 * to_next, u16 n_segs,
223                         stream_session_t * s, u32 n_trace)
224 {
225   session_queue_trace_t *t;
226   vlib_buffer_t *b;
227   int i;
228
229   for (i = 0; i < clib_min (n_trace, n_segs); i++)
230     {
231       b = vlib_get_buffer (vm, to_next[i - n_segs]);
232       vlib_trace_buffer (vm, node, next_index, b, 1 /* follow_chain */ );
233       t = vlib_add_trace (vm, node, b, sizeof (*t));
234       t->session_index = s->session_index;
235       t->server_thread_index = s->thread_index;
236     }
237   vlib_set_trace_count (vm, node, n_trace - i);
238 }
239
240 always_inline void
241 session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
242                             vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
243 {
244   session_manager_main_t *smm = &session_manager_main;
245   vlib_buffer_t *chain_b, *prev_b;
246   u32 chain_bi0, to_deq, left_from_seg;
247   u16 len_to_deq, n_bytes_read;
248   u8 *data, j;
249
250   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
251   b->total_length_not_including_first_buffer = 0;
252
253   chain_b = b;
254   left_from_seg = clib_min (ctx->snd_mss - b->current_length,
255                             ctx->left_to_snd);
256   to_deq = left_from_seg;
257   for (j = 1; j < ctx->n_bufs_per_seg; j++)
258     {
259       prev_b = chain_b;
260       len_to_deq = clib_min (to_deq, ctx->deq_per_buf);
261
262       *n_bufs -= 1;
263       chain_bi0 = smm->tx_buffers[ctx->s->thread_index][*n_bufs];
264       _vec_len (smm->tx_buffers[ctx->s->thread_index]) = *n_bufs;
265
266       chain_b = vlib_get_buffer (vm, chain_bi0);
267       chain_b->current_data = 0;
268       data = vlib_buffer_get_current (chain_b);
269       if (peek_data)
270         {
271           n_bytes_read = svm_fifo_peek (ctx->s->server_tx_fifo,
272                                         ctx->tx_offset, len_to_deq, data);
273           ctx->tx_offset += n_bytes_read;
274         }
275       else
276         {
277           if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
278             {
279               svm_fifo_t *f = ctx->s->server_tx_fifo;
280               session_dgram_hdr_t *hdr = &ctx->hdr;
281               u16 deq_now;
282               deq_now = clib_min (hdr->data_length - hdr->data_offset,
283                                   len_to_deq);
284               n_bytes_read = svm_fifo_peek (f, hdr->data_offset, deq_now,
285                                             data);
286               ASSERT (n_bytes_read > 0);
287
288               hdr->data_offset += n_bytes_read;
289               if (hdr->data_offset == hdr->data_length)
290                 svm_fifo_dequeue_drop (f, hdr->data_length);
291             }
292           else
293             n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->server_tx_fifo,
294                                                     len_to_deq, data);
295         }
296       ASSERT (n_bytes_read == len_to_deq);
297       chain_b->current_length = n_bytes_read;
298       b->total_length_not_including_first_buffer += chain_b->current_length;
299
300       /* update previous buffer */
301       prev_b->next_buffer = chain_bi0;
302       prev_b->flags |= VLIB_BUFFER_NEXT_PRESENT;
303
304       /* update current buffer */
305       chain_b->next_buffer = 0;
306
307       to_deq -= n_bytes_read;
308       if (to_deq == 0)
309         break;
310     }
311   ASSERT (to_deq == 0
312           && b->total_length_not_including_first_buffer == left_from_seg);
313   ctx->left_to_snd -= left_from_seg;
314 }
315
316 always_inline int
317 session_output_try_get_buffers (vlib_main_t * vm,
318                                 session_manager_main_t * smm,
319                                 u32 thread_index, u16 * n_bufs, u32 wanted)
320 {
321   u32 n_alloc;
322   vec_validate_aligned (smm->tx_buffers[thread_index], wanted - 1,
323                         CLIB_CACHE_LINE_BYTES);
324   n_alloc = vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][*n_bufs],
325                                wanted - *n_bufs);
326   *n_bufs += n_alloc;
327   _vec_len (smm->tx_buffers[thread_index]) = *n_bufs;
328   return n_alloc;
329 }
330
331 always_inline void
332 session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
333                         vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
334 {
335   u32 len_to_deq;
336   u8 *data0;
337   int n_bytes_read;
338
339   /*
340    * Start with the first buffer in chain
341    */
342   b->error = 0;
343   b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
344   b->current_data = 0;
345
346   data0 = vlib_buffer_make_headroom (b, MAX_HDRS_LEN);
347   len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
348
349   if (peek_data)
350     {
351       n_bytes_read = svm_fifo_peek (ctx->s->server_tx_fifo, ctx->tx_offset,
352                                     len_to_deq, data0);
353       ASSERT (n_bytes_read > 0);
354       /* Keep track of progress locally, transport is also supposed to
355        * increment it independently when pushing the header */
356       ctx->tx_offset += n_bytes_read;
357     }
358   else
359     {
360       if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
361         {
362           session_dgram_hdr_t *hdr = &ctx->hdr;
363           svm_fifo_t *f = ctx->s->server_tx_fifo;
364           u16 deq_now;
365           u32 offset;
366
367           ASSERT (hdr->data_length > hdr->data_offset);
368           deq_now = clib_min (hdr->data_length - hdr->data_offset,
369                               len_to_deq);
370           offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
371           n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
372           ASSERT (n_bytes_read > 0);
373
374           if (ctx->s->session_state == SESSION_STATE_LISTENING)
375             {
376               ip_copy (&ctx->tc->rmt_ip, &hdr->rmt_ip, ctx->tc->is_ip4);
377               ctx->tc->rmt_port = hdr->rmt_port;
378             }
379           hdr->data_offset += n_bytes_read;
380           if (hdr->data_offset == hdr->data_length)
381             {
382               offset = hdr->data_length + SESSION_CONN_HDR_LEN;
383               svm_fifo_dequeue_drop (f, offset);
384             }
385         }
386       else
387         {
388           n_bytes_read = svm_fifo_dequeue_nowait (ctx->s->server_tx_fifo,
389                                                   len_to_deq, data0);
390           ASSERT (n_bytes_read > 0);
391         }
392     }
393   b->current_length = n_bytes_read;
394   ctx->left_to_snd -= n_bytes_read;
395
396   /*
397    * Fill in the remaining buffers in the chain, if any
398    */
399   if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
400     session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
401
402   /* *INDENT-OFF* */
403   SESSION_EVT_DBG(SESSION_EVT_DEQ, ctx->s, ({
404         ed->data[0] = FIFO_EVENT_APP_TX;
405         ed->data[1] = ctx->max_dequeue;
406         ed->data[2] = len_to_deq;
407         ed->data[3] = ctx->left_to_snd;
408   }));
409   /* *INDENT-ON* */
410 }
411
412 always_inline u8
413 session_tx_not_ready (stream_session_t * s, u8 peek_data)
414 {
415   if (peek_data)
416     {
417       /* Can retransmit for closed sessions but can't send new data if
418        * session is not ready or closed */
419       if (s->session_state < SESSION_STATE_READY)
420         return 1;
421       if (s->session_state == SESSION_STATE_CLOSED)
422         return 2;
423     }
424   return 0;
425 }
426
427 always_inline transport_connection_t *
428 session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data)
429 {
430   if (peek_data)
431     {
432       return ctx->transport_vft->get_connection (ctx->s->connection_index,
433                                                  ctx->s->thread_index);
434     }
435   else
436     {
437       if (ctx->s->session_state == SESSION_STATE_LISTENING)
438         return ctx->transport_vft->get_listener (ctx->s->connection_index);
439       else
440         {
441           return ctx->transport_vft->get_connection (ctx->s->connection_index,
442                                                      ctx->s->thread_index);
443         }
444     }
445 }
446
447 always_inline void
448 session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
449                                u32 max_segs, u8 peek_data)
450 {
451   u32 n_bytes_per_buf, n_bytes_per_seg;
452   ctx->max_dequeue = svm_fifo_max_dequeue (ctx->s->server_tx_fifo);
453   if (peek_data)
454     {
455       /* Offset in rx fifo from where to peek data */
456       ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc);
457       if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue))
458         {
459           ctx->max_len_to_snd = 0;
460           return;
461         }
462       ctx->max_dequeue -= ctx->tx_offset;
463     }
464   else
465     {
466       if (ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
467         {
468           if (ctx->max_dequeue <= sizeof (ctx->hdr))
469             {
470               ctx->max_len_to_snd = 0;
471               return;
472             }
473           svm_fifo_peek (ctx->s->server_tx_fifo, 0, sizeof (ctx->hdr),
474                          (u8 *) & ctx->hdr);
475           ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
476           ctx->max_dequeue = ctx->hdr.data_length - ctx->hdr.data_offset;
477         }
478     }
479   ASSERT (ctx->max_dequeue > 0);
480
481   /* Ensure we're not writing more than transport window allows */
482   if (ctx->max_dequeue < ctx->snd_space)
483     {
484       /* Constrained by tx queue. Try to send only fully formed segments */
485       ctx->max_len_to_snd =
486         (ctx->max_dequeue > ctx->snd_mss) ?
487         ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue;
488       /* TODO Nagle ? */
489     }
490   else
491     {
492       /* Expectation is that snd_space0 is already a multiple of snd_mss */
493       ctx->max_len_to_snd = ctx->snd_space;
494     }
495
496   /* Check if we're tx constrained by the node */
497   ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss);
498   if (ctx->n_segs_per_evt > max_segs)
499     {
500       ctx->n_segs_per_evt = max_segs;
501       ctx->max_len_to_snd = max_segs * ctx->snd_mss;
502     }
503
504   n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm,
505                                                        VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
506   ASSERT (n_bytes_per_buf > MAX_HDRS_LEN);
507   n_bytes_per_seg = MAX_HDRS_LEN + ctx->snd_mss;
508   ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
509   ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf);
510   ctx->deq_per_first_buf = clib_min (ctx->snd_mss,
511                                      n_bytes_per_buf - MAX_HDRS_LEN);
512 }
513
514 always_inline int
515 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
516                                 session_event_t * e,
517                                 stream_session_t * s, int *n_tx_packets,
518                                 u8 peek_data)
519 {
520   u32 next_index, next0, next1, *to_next, n_left_to_next;
521   u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0;
522   u32 thread_index = s->thread_index, n_left, pbi;
523   session_manager_main_t *smm = &session_manager_main;
524   session_tx_context_t *ctx = &smm->ctx[thread_index];
525   transport_proto_t tp;
526   vlib_buffer_t *pb;
527   u16 n_bufs, rv;
528
529   if (PREDICT_FALSE ((rv = session_tx_not_ready (s, peek_data))))
530     {
531       if (rv < 2)
532         vec_add1 (smm->pending_event_vector[thread_index], *e);
533       return SESSION_TX_NO_DATA;
534     }
535
536   next_index = smm->session_type_to_next[s->session_type];
537   next0 = next1 = next_index;
538
539   tp = session_get_transport_proto (s);
540   ctx->s = s;
541   ctx->transport_vft = transport_protocol_get_vft (tp);
542   ctx->tc = session_tx_get_transport (ctx, peek_data);
543   ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
544   ctx->snd_space = ctx->transport_vft->send_space (ctx->tc);
545   if (ctx->snd_space == 0 || ctx->snd_mss == 0)
546     {
547       vec_add1 (smm->pending_event_vector[thread_index], *e);
548       return SESSION_TX_NO_DATA;
549     }
550
551   /* Allow enqueuing of a new event */
552   svm_fifo_unset_event (s->server_tx_fifo);
553
554   /* Check how much we can pull. */
555   session_tx_set_dequeue_params (vm, ctx, VLIB_FRAME_SIZE - *n_tx_packets,
556                                  peek_data);
557
558   if (PREDICT_FALSE (!ctx->max_len_to_snd))
559     return SESSION_TX_NO_DATA;
560
561   n_bufs = vec_len (smm->tx_buffers[thread_index]);
562   n_bufs_needed = ctx->n_segs_per_evt * ctx->n_bufs_per_seg;
563
564   /*
565    * Make sure we have at least one full frame of buffers ready
566    */
567   if (n_bufs < n_bufs_needed)
568     {
569       session_output_try_get_buffers (vm, smm, thread_index, &n_bufs,
570                                       ctx->n_bufs_per_seg * VLIB_FRAME_SIZE);
571       if (PREDICT_FALSE (n_bufs < n_bufs_needed))
572         {
573           vec_add1 (smm->pending_event_vector[thread_index], *e);
574           return SESSION_TX_NO_BUFFERS;
575         }
576     }
577
578   /*
579    * Write until we fill up a frame
580    */
581   vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
582   if (PREDICT_FALSE (ctx->n_segs_per_evt > n_left_to_next))
583     {
584       ctx->n_segs_per_evt = n_left_to_next;
585       ctx->max_len_to_snd = ctx->snd_mss * n_left_to_next;
586     }
587   ctx->left_to_snd = ctx->max_len_to_snd;
588   n_left = ctx->n_segs_per_evt;
589
590   while (n_left >= 4)
591     {
592       vlib_buffer_t *b0, *b1;
593       u32 bi0, bi1;
594
595       pbi = smm->tx_buffers[thread_index][n_bufs - 3];
596       pb = vlib_get_buffer (vm, pbi);
597       vlib_prefetch_buffer_header (pb, STORE);
598       pbi = smm->tx_buffers[thread_index][n_bufs - 4];
599       pb = vlib_get_buffer (vm, pbi);
600       vlib_prefetch_buffer_header (pb, STORE);
601
602       to_next[0] = bi0 = smm->tx_buffers[thread_index][--n_bufs];
603       to_next[1] = bi1 = smm->tx_buffers[thread_index][--n_bufs];
604
605       b0 = vlib_get_buffer (vm, bi0);
606       b1 = vlib_get_buffer (vm, bi1);
607
608       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
609       session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
610
611       ctx->transport_vft->push_header (ctx->tc, b0);
612       ctx->transport_vft->push_header (ctx->tc, b1);
613
614       to_next += 2;
615       n_left_to_next -= 2;
616       n_left -= 2;
617
618       VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
619       VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b1);
620
621       vlib_validate_buffer_enqueue_x2 (vm, node, next_index, to_next,
622                                        n_left_to_next, bi0, bi1, next0,
623                                        next1);
624     }
625   while (n_left)
626     {
627       vlib_buffer_t *b0;
628       u32 bi0;
629
630       if (n_left > 1)
631         {
632           pbi = smm->tx_buffers[thread_index][n_bufs - 2];
633           pb = vlib_get_buffer (vm, pbi);
634           vlib_prefetch_buffer_header (pb, STORE);
635         }
636
637       to_next[0] = bi0 = smm->tx_buffers[thread_index][--n_bufs];
638       b0 = vlib_get_buffer (vm, bi0);
639       session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
640
641       /* Ask transport to push header after current_length and
642        * total_length_not_including_first_buffer are updated */
643       ctx->transport_vft->push_header (ctx->tc, b0);
644
645       to_next += 1;
646       n_left_to_next -= 1;
647       n_left -= 1;
648
649       VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
650
651       vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
652                                        n_left_to_next, bi0, next0);
653     }
654
655   if (PREDICT_FALSE (n_trace > 0))
656     session_tx_trace_frame (vm, node, next_index, to_next,
657                             ctx->n_segs_per_evt, s, n_trace);
658
659   _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
660   *n_tx_packets += ctx->n_segs_per_evt;
661   vlib_put_next_frame (vm, node, next_index, n_left_to_next);
662
663   /* If we couldn't dequeue all bytes mark as partially read */
664   ASSERT (ctx->left_to_snd == 0);
665   if (ctx->max_len_to_snd < ctx->max_dequeue)
666     if (svm_fifo_set_event (s->server_tx_fifo))
667       vec_add1 (smm->pending_event_vector[thread_index], *e);
668
669   if (!peek_data && ctx->transport_vft->tx_type == TRANSPORT_TX_DGRAM)
670     {
671       /* Fix dgram pre header */
672       if (ctx->max_len_to_snd < ctx->max_dequeue)
673         svm_fifo_overwrite_head (s->server_tx_fifo, (u8 *) & ctx->hdr,
674                                  sizeof (session_dgram_pre_hdr_t));
675       /* More data needs to be read */
676       else if (svm_fifo_max_dequeue (s->server_tx_fifo) > 0)
677         if (svm_fifo_set_event (s->server_tx_fifo))
678           vec_add1 (smm->pending_event_vector[thread_index], *e);
679     }
680   return SESSION_TX_OK;
681 }
682
683 int
684 session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
685                               session_event_t * e,
686                               stream_session_t * s, int *n_tx_pkts)
687 {
688   return session_tx_fifo_read_and_snd_i (vm, node, e, s, n_tx_pkts, 1);
689 }
690
691 int
692 session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
693                                  session_event_t * e,
694                                  stream_session_t * s, int *n_tx_pkts)
695 {
696   return session_tx_fifo_read_and_snd_i (vm, node, e, s, n_tx_pkts, 0);
697 }
698
699 int
700 session_tx_fifo_dequeue_internal (vlib_main_t * vm,
701                                   vlib_node_runtime_t * node,
702                                   session_event_t * e,
703                                   stream_session_t * s, int *n_tx_pkts)
704 {
705   application_t *app;
706   app = application_get (s->opaque);
707   svm_fifo_unset_event (s->server_tx_fifo);
708   return app->cb_fns.builtin_app_tx_callback (s);
709 }
710
711 always_inline stream_session_t *
712 session_event_get_session (session_event_t * e, u8 thread_index)
713 {
714   return session_get_if_valid (e->fifo->master_session_index, thread_index);
715 }
716
717 static uword
718 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
719                        vlib_frame_t * frame)
720 {
721   session_manager_main_t *smm = vnet_get_session_manager_main ();
722   u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
723   session_event_t *pending_events, *e;
724   session_event_t *fifo_events;
725   svm_msg_q_msg_t _msg, *msg = &_msg;
726   f64 now = vlib_time_now (vm);
727   int n_tx_packets = 0, i, rv;
728   application_t *app;
729   svm_msg_q_t *mq;
730   void (*fp) (void *);
731
732   SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
733
734   /*
735    *  Update transport time
736    */
737   transport_update_time (now, thread_index);
738
739   /*
740    * Get vpp queue events that we can dequeue without blocking
741    */
742   mq = smm->vpp_event_queues[thread_index];
743   fifo_events = smm->free_event_vector[thread_index];
744   n_to_dequeue = svm_msg_q_size (mq);
745   pending_events = smm->pending_event_vector[thread_index];
746
747   if (!n_to_dequeue && !vec_len (pending_events)
748       && !vec_len (smm->pending_disconnects[thread_index]))
749     return 0;
750
751   SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
752
753   /*
754    * If we didn't manage to process previous events try going
755    * over them again without dequeuing new ones.
756    * XXX: Handle senders to sessions that can't keep up
757    */
758   if (0 && vec_len (pending_events) >= 100)
759     {
760       clib_warning ("too many fifo events unsolved");
761       goto skip_dequeue;
762     }
763
764   /* See you in the next life, don't be late
765    * XXX: we may need priorities here */
766   if (svm_msg_q_try_lock (mq))
767     return 0;
768
769   for (i = 0; i < n_to_dequeue; i++)
770     {
771       vec_add2 (fifo_events, e, 1);
772       svm_msg_q_sub_w_lock (mq, msg);
773       clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
774       svm_msg_q_free_msg (mq, msg);
775     }
776
777   svm_msg_q_unlock (mq);
778
779   vec_append (fifo_events, pending_events);
780   vec_append (fifo_events, smm->pending_disconnects[thread_index]);
781
782   _vec_len (pending_events) = 0;
783   smm->pending_event_vector[thread_index] = pending_events;
784   _vec_len (smm->pending_disconnects[thread_index]) = 0;
785
786 skip_dequeue:
787   n_events = vec_len (fifo_events);
788   for (i = 0; i < n_events; i++)
789     {
790       stream_session_t *s;      /* $$$ prefetch 1 ahead maybe */
791       session_event_t *e;
792       u32 to_dequeue;
793
794       e = &fifo_events[i];
795       switch (e->event_type)
796         {
797         case FIFO_EVENT_APP_TX:
798           /* Don't try to send more that one frame per dispatch cycle */
799           if (n_tx_packets == VLIB_FRAME_SIZE)
800             {
801               vec_add1 (smm->pending_event_vector[thread_index], *e);
802               break;
803             }
804
805           s = session_event_get_session (e, thread_index);
806           if (PREDICT_FALSE (!s))
807             {
808               clib_warning ("It's dead, Jim!");
809               continue;
810             }
811           to_dequeue = svm_fifo_max_dequeue (s->server_tx_fifo);
812
813           /* Spray packets in per session type frames, since they go to
814            * different nodes */
815           rv = (smm->session_tx_fns[s->session_type]) (vm, node, e, s,
816                                                        &n_tx_packets);
817           if (PREDICT_TRUE (rv == SESSION_TX_OK))
818             {
819               /* Notify app there's tx space if not polling */
820               if (PREDICT_FALSE (to_dequeue == s->server_tx_fifo->nitems
821                                  && !svm_fifo_has_event (s->server_tx_fifo)))
822                 session_dequeue_notify (s);
823             }
824           else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
825             {
826               vlib_node_increment_counter (vm, node->node_index,
827                                            SESSION_QUEUE_ERROR_NO_BUFFER, 1);
828               continue;
829             }
830           break;
831         case FIFO_EVENT_DISCONNECT:
832           /* Make sure stream disconnects run after the pending list is
833            * drained */
834           s = session_get_from_handle (e->session_handle);
835           if (!e->postponed)
836             {
837               e->postponed = 1;
838               vec_add1 (smm->pending_disconnects[thread_index], *e);
839               continue;
840             }
841           /* If tx queue is still not empty, wait */
842           if (svm_fifo_max_dequeue (s->server_tx_fifo))
843             {
844               vec_add1 (smm->pending_disconnects[thread_index], *e);
845               continue;
846             }
847
848           stream_session_disconnect_transport (s);
849           break;
850         case FIFO_EVENT_BUILTIN_RX:
851           s = session_event_get_session (e, thread_index);
852           if (PREDICT_FALSE (!s))
853             continue;
854           svm_fifo_unset_event (s->server_rx_fifo);
855           app = application_get (s->app_index);
856           app->cb_fns.builtin_app_rx_callback (s);
857           break;
858         case FIFO_EVENT_RPC:
859           fp = e->rpc_args.fp;
860           (*fp) (e->rpc_args.arg);
861           break;
862         case SESSION_CTRL_EVT_DISCONNECTED:
863           session_mq_disconnected_handler (e->data);
864           break;
865         case SESSION_CTRL_EVT_ACCEPTED_REPLY:
866           session_mq_accepted_reply_handler (e->data);
867           break;
868         case SESSION_CTRL_EVT_CONNECTED_REPLY:
869           break;
870         case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
871           session_mq_disconnected_reply_handler (e->data);
872           break;
873         case SESSION_CTRL_EVT_RESET_REPLY:
874           session_mq_reset_reply_handler (e->data);
875           break;
876         default:
877           clib_warning ("unhandled event type %d", e->event_type);
878         }
879     }
880
881   _vec_len (fifo_events) = 0;
882   smm->free_event_vector[thread_index] = fifo_events;
883
884   vlib_node_increment_counter (vm, session_queue_node.index,
885                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
886
887   SESSION_EVT_DBG (SESSION_EVT_DISPATCH_END, smm, thread_index);
888
889   return n_tx_packets;
890 }
891
892 /* *INDENT-OFF* */
893 VLIB_REGISTER_NODE (session_queue_node) =
894 {
895   .function = session_queue_node_fn,
896   .name = "session-queue",
897   .format_trace = format_session_queue_trace,
898   .type = VLIB_NODE_TYPE_INPUT,
899   .n_errors = ARRAY_LEN (session_queue_error_strings),
900   .error_strings = session_queue_error_strings,
901   .state = VLIB_NODE_STATE_DISABLED,
902 };
903 /* *INDENT-ON* */
904
905 void
906 dump_thread_0_event_queue (void)
907 {
908   session_manager_main_t *smm = vnet_get_session_manager_main ();
909   vlib_main_t *vm = &vlib_global_main;
910   u32 my_thread_index = vm->thread_index;
911   session_event_t _e, *e = &_e;
912   svm_msg_q_ring_t *ring;
913   stream_session_t *s0;
914   svm_msg_q_msg_t *msg;
915   svm_msg_q_t *mq;
916   int i, index;
917
918   mq = smm->vpp_event_queues[my_thread_index];
919   index = mq->q->head;
920
921   for (i = 0; i < mq->q->cursize; i++)
922     {
923       msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
924       ring = svm_msg_q_ring (mq, msg->ring_index);
925       clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
926
927       switch (e->event_type)
928         {
929         case FIFO_EVENT_APP_TX:
930           s0 = session_event_get_session (e, my_thread_index);
931           fformat (stdout, "[%04d] TX session %d\n", i, s0->session_index);
932           break;
933
934         case FIFO_EVENT_DISCONNECT:
935           s0 = session_get_from_handle (e->session_handle);
936           fformat (stdout, "[%04d] disconnect session %d\n", i,
937                    s0->session_index);
938           break;
939
940         case FIFO_EVENT_BUILTIN_RX:
941           s0 = session_event_get_session (e, my_thread_index);
942           fformat (stdout, "[%04d] builtin_rx %d\n", i, s0->session_index);
943           break;
944
945         case FIFO_EVENT_RPC:
946           fformat (stdout, "[%04d] RPC call %llx with %llx\n",
947                    i, (u64) (e->rpc_args.fp), (u64) (e->rpc_args.arg));
948           break;
949
950         default:
951           fformat (stdout, "[%04d] unhandled event type %d\n",
952                    i, e->event_type);
953           break;
954         }
955
956       index++;
957
958       if (index == mq->q->maxsize)
959         index = 0;
960     }
961 }
962
963 static u8
964 session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
965 {
966   stream_session_t *s;
967   switch (e->event_type)
968     {
969     case FIFO_EVENT_APP_RX:
970     case FIFO_EVENT_APP_TX:
971     case FIFO_EVENT_BUILTIN_RX:
972       if (e->fifo == f)
973         return 1;
974       break;
975     case FIFO_EVENT_DISCONNECT:
976       break;
977     case FIFO_EVENT_RPC:
978       s = session_get_from_handle (e->session_handle);
979       if (!s)
980         {
981           clib_warning ("session has event but doesn't exist!");
982           break;
983         }
984       if (s->server_rx_fifo == f || s->server_tx_fifo == f)
985         return 1;
986       break;
987     default:
988       break;
989     }
990   return 0;
991 }
992
993 u8
994 session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
995 {
996   session_manager_main_t *smm = vnet_get_session_manager_main ();
997   svm_msg_q_t *mq;
998   session_event_t *pending_event_vector, *evt;
999   int i, index, found = 0;
1000   svm_msg_q_msg_t *msg;
1001   svm_msg_q_ring_t *ring;
1002   u8 thread_index;
1003
1004   ASSERT (e);
1005   thread_index = f->master_thread_index;
1006   /*
1007    * Search evt queue
1008    */
1009   mq = smm->vpp_event_queues[thread_index];
1010   index = mq->q->head;
1011   for (i = 0; i < mq->q->cursize; i++)
1012     {
1013       msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
1014       ring = svm_msg_q_ring (mq, msg->ring_index);
1015       clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
1016       found = session_node_cmp_event (e, f);
1017       if (found)
1018         return 1;
1019       if (++index == mq->q->maxsize)
1020         index = 0;
1021     }
1022   /*
1023    * Search pending events vector
1024    */
1025   pending_event_vector = smm->pending_event_vector[thread_index];
1026   vec_foreach (evt, pending_event_vector)
1027   {
1028     found = session_node_cmp_event (evt, f);
1029     if (found)
1030       {
1031         clib_memcpy (e, evt, sizeof (*evt));
1032         break;
1033       }
1034   }
1035   return found;
1036 }
1037
1038 static clib_error_t *
1039 session_queue_exit (vlib_main_t * vm)
1040 {
1041   if (vec_len (vlib_mains) < 2)
1042     return 0;
1043
1044   /*
1045    * Shut off (especially) worker-thread session nodes.
1046    * Otherwise, vpp can crash as the main thread unmaps the
1047    * API segment.
1048    */
1049   vlib_worker_thread_barrier_sync (vm);
1050   session_node_enable_disable (0 /* is_enable */ );
1051   vlib_worker_thread_barrier_release (vm);
1052   return 0;
1053 }
1054
1055 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
1056
1057 static uword
1058 session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
1059                        vlib_frame_t * f)
1060 {
1061   f64 now, timeout = 1.0;
1062   uword *event_data = 0;
1063   uword event_type;
1064
1065   while (1)
1066     {
1067       vlib_process_wait_for_event_or_clock (vm, timeout);
1068       now = vlib_time_now (vm);
1069       event_type = vlib_process_get_events (vm, (uword **) & event_data);
1070
1071       switch (event_type)
1072         {
1073         case SESSION_Q_PROCESS_FLUSH_FRAMES:
1074           /* Flush the frames by updating all transports times */
1075           transport_update_time (now, 0);
1076           break;
1077         case SESSION_Q_PROCESS_STOP:
1078           timeout = 100000.0;
1079           break;
1080         case ~0:
1081           /* Timed out. Update time for all transports to trigger all
1082            * outstanding retransmits. */
1083           transport_update_time (now, 0);
1084           break;
1085         }
1086       vec_reset_length (event_data);
1087     }
1088   return 0;
1089 }
1090
1091 /* *INDENT-OFF* */
1092 VLIB_REGISTER_NODE (session_queue_process_node) =
1093 {
1094   .function = session_queue_process,
1095   .type = VLIB_NODE_TYPE_PROCESS,
1096   .name = "session-queue-process",
1097   .state = VLIB_NODE_STATE_DISABLED,
1098 };
1099 /* *INDENT-ON* */
1100
1101
1102 /*
1103  * fd.io coding-style-patch-verification: ON
1104  *
1105  * Local Variables:
1106  * eval: (c-set-style "gnu")
1107  * End:
1108  */