udp/session: refactor to support dgram mode
[vpp.git] / src / vnet / session / session_node.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <math.h>
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vppinfra/elog.h>
20 #include <vnet/session/transport.h>
21 #include <vnet/session/application.h>
22 #include <vnet/session/session_debug.h>
23 #include <svm/queue.h>
24
25 vlib_node_registration_t session_queue_node;
26
27 typedef struct
28 {
29   u32 session_index;
30   u32 server_thread_index;
31 } session_queue_trace_t;
32
33 /* packet trace format function */
34 static u8 *
35 format_session_queue_trace (u8 * s, va_list * args)
36 {
37   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
38   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
39   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
40
41   s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
42               t->session_index, t->server_thread_index);
43   return s;
44 }
45
46 vlib_node_registration_t session_queue_node;
47
48 #define foreach_session_queue_error             \
49 _(TX, "Packets transmitted")                    \
50 _(TIMER, "Timer events")                        \
51 _(NO_BUFFER, "Out of buffers")
52
53 typedef enum
54 {
55 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
56   foreach_session_queue_error
57 #undef _
58     SESSION_QUEUE_N_ERROR,
59 } session_queue_error_t;
60
61 static char *session_queue_error_strings[] = {
62 #define _(sym,string) string,
63   foreach_session_queue_error
64 #undef _
65 };
66
67 always_inline void
68 session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
69                             u8 thread_index, svm_fifo_t * fifo,
70                             vlib_buffer_t * b0, u32 bi0, u8 n_bufs_per_seg,
71                             u32 left_from_seg, u32 * left_to_snd0,
72                             u16 * n_bufs, u32 * tx_offset, u16 deq_per_buf,
73                             u8 peek_data, transport_tx_fn_type_t tx_type)
74 {
75   vlib_buffer_t *chain_b0, *prev_b0;
76   u32 chain_bi0, to_deq;
77   u16 len_to_deq0, n_bytes_read;
78   u8 *data0, j;
79
80   b0->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
81   b0->total_length_not_including_first_buffer = 0;
82
83   chain_bi0 = bi0;
84   chain_b0 = b0;
85   to_deq = left_from_seg;
86   for (j = 1; j < n_bufs_per_seg; j++)
87     {
88       prev_b0 = chain_b0;
89       len_to_deq0 = clib_min (to_deq, deq_per_buf);
90
91       *n_bufs -= 1;
92       chain_bi0 = smm->tx_buffers[thread_index][*n_bufs];
93       _vec_len (smm->tx_buffers[thread_index]) = *n_bufs;
94
95       chain_b0 = vlib_get_buffer (vm, chain_bi0);
96       chain_b0->current_data = 0;
97       data0 = vlib_buffer_get_current (chain_b0);
98       if (peek_data)
99         {
100           n_bytes_read = svm_fifo_peek (fifo, *tx_offset, len_to_deq0, data0);
101           *tx_offset += n_bytes_read;
102         }
103       else
104         {
105           if (tx_type == TRANSPORT_TX_DGRAM)
106             {
107               session_dgram_hdr_t *hdr;
108               u16 deq_now;
109               hdr = (session_dgram_hdr_t *) svm_fifo_head (fifo);
110               deq_now = clib_min (hdr->data_length - hdr->data_offset,
111                                   len_to_deq0);
112               n_bytes_read = svm_fifo_peek (fifo, hdr->data_offset, deq_now,
113                                             data0);
114               ASSERT (n_bytes_read > 0);
115
116               hdr->data_offset += n_bytes_read;
117               if (hdr->data_offset == hdr->data_length)
118                 svm_fifo_dequeue_drop (fifo, hdr->data_length);
119             }
120           else
121             n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0);
122         }
123       ASSERT (n_bytes_read == len_to_deq0);
124       chain_b0->current_length = n_bytes_read;
125       b0->total_length_not_including_first_buffer += chain_b0->current_length;
126
127       /* update previous buffer */
128       prev_b0->next_buffer = chain_bi0;
129       prev_b0->flags |= VLIB_BUFFER_NEXT_PRESENT;
130
131       /* update current buffer */
132       chain_b0->next_buffer = 0;
133
134       to_deq -= n_bytes_read;
135       if (to_deq == 0)
136         break;
137     }
138   ASSERT (to_deq == 0
139           && b0->total_length_not_including_first_buffer == left_from_seg);
140   *left_to_snd0 -= left_from_seg;
141 }
142
143 always_inline int
144 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
145                                 session_manager_main_t * smm,
146                                 session_fifo_event_t * e0,
147                                 stream_session_t * s0, u32 thread_index,
148                                 int *n_tx_packets, u8 peek_data)
149 {
150   u32 n_trace = vlib_get_trace_count (vm, node);
151   u32 left_to_snd0, max_len_to_snd0, len_to_deq0, snd_space0;
152   u32 n_bufs_per_evt, n_frames_per_evt, n_bufs_per_frame;
153   transport_connection_t *tc0;
154   transport_proto_vft_t *transport_vft;
155   transport_proto_t tp;
156   u32 next_index, next0, *to_next, n_left_to_next, bi0;
157   vlib_buffer_t *b0;
158   u32 tx_offset = 0, max_dequeue0, n_bytes_per_seg, left_for_seg;
159   u16 snd_mss0, n_bufs_per_seg, n_bufs;
160   u8 *data0;
161   int i, n_bytes_read;
162   u32 n_bytes_per_buf, deq_per_buf, deq_per_first_buf;
163   u32 bufs_alloc, bufs_now;
164   session_dgram_hdr_t hdr;
165
166   next_index = next0 = smm->session_type_to_next[s0->session_type];
167   tp = session_get_transport_proto (s0);
168   transport_vft = transport_protocol_get_vft (tp);
169   if (peek_data)
170     {
171       if (PREDICT_FALSE (s0->session_state < SESSION_STATE_READY))
172         {
173           /* Can retransmit for closed sessions but can't send new data if
174            * session is not ready or closed */
175           vec_add1 (smm->pending_event_vector[thread_index], *e0);
176           return 0;
177         }
178       tc0 =
179         transport_vft->get_connection (s0->connection_index, thread_index);
180     }
181   else
182     {
183       if (s0->session_state == SESSION_STATE_LISTENING)
184         tc0 = transport_vft->get_listener (s0->connection_index);
185       else
186         {
187           if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
188             return 0;
189           tc0 = transport_vft->get_connection (s0->connection_index,
190                                                thread_index);
191         }
192     }
193
194   /* Make sure we have space to send and there's something to dequeue */
195   snd_mss0 = transport_vft->send_mss (tc0);
196   snd_space0 = transport_vft->send_space (tc0);
197
198   /* Can't make any progress */
199   if (snd_space0 == 0 || snd_mss0 == 0)
200     {
201       vec_add1 (smm->pending_event_vector[thread_index], *e0);
202       return 0;
203     }
204
205   /* Allow enqueuing of a new event */
206   svm_fifo_unset_event (s0->server_tx_fifo);
207
208   /* Check how much we can pull. */
209   max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo);
210   if (peek_data)
211     {
212       /* Offset in rx fifo from where to peek data */
213       tx_offset = transport_vft->tx_fifo_offset (tc0);
214       if (PREDICT_FALSE (tx_offset >= max_dequeue0))
215         return 0;
216       max_dequeue0 -= tx_offset;
217     }
218   else
219     {
220       if (transport_vft->tx_type == TRANSPORT_TX_DGRAM)
221         {
222           if (max_dequeue0 < sizeof (hdr))
223             return 0;
224           svm_fifo_peek (s0->server_tx_fifo, 0, sizeof (hdr), (u8 *) & hdr);
225           ASSERT (hdr.data_length > hdr.data_offset);
226           max_dequeue0 = hdr.data_length - hdr.data_offset;
227         }
228     }
229   ASSERT (max_dequeue0 > 0);
230
231   /* Ensure we're not writing more than transport window allows */
232   if (max_dequeue0 < snd_space0)
233     {
234       /* Constrained by tx queue. Try to send only fully formed segments */
235       max_len_to_snd0 = (max_dequeue0 > snd_mss0) ?
236         max_dequeue0 - max_dequeue0 % snd_mss0 : max_dequeue0;
237       /* TODO Nagle ? */
238     }
239   else
240     {
241       /* Expectation is that snd_space0 is already a multiple of snd_mss */
242       max_len_to_snd0 = snd_space0;
243     }
244
245   n_bytes_per_buf = vlib_buffer_free_list_buffer_size
246     (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
247   ASSERT (n_bytes_per_buf > MAX_HDRS_LEN);
248   n_bytes_per_seg = MAX_HDRS_LEN + snd_mss0;
249   n_bufs_per_seg = ceil ((double) n_bytes_per_seg / n_bytes_per_buf);
250   n_bufs_per_evt = ceil ((double) max_len_to_snd0 / n_bytes_per_seg);
251   n_frames_per_evt = ceil ((double) n_bufs_per_evt / VLIB_FRAME_SIZE);
252   n_bufs_per_frame = n_bufs_per_seg * VLIB_FRAME_SIZE;
253
254   deq_per_buf = clib_min (snd_mss0, n_bytes_per_buf);
255   deq_per_first_buf = clib_min (snd_mss0, n_bytes_per_buf - MAX_HDRS_LEN);
256
257   n_bufs = vec_len (smm->tx_buffers[thread_index]);
258   left_to_snd0 = max_len_to_snd0;
259   for (i = 0; i < n_frames_per_evt; i++)
260     {
261       /* Make sure we have at least one full frame of buffers ready */
262       if (PREDICT_FALSE (n_bufs < n_bufs_per_frame))
263         {
264           vec_validate (smm->tx_buffers[thread_index],
265                         n_bufs + n_bufs_per_frame - 1);
266           bufs_alloc = 0;
267           do
268             {
269               bufs_now =
270                 vlib_buffer_alloc (vm,
271                                    &smm->tx_buffers[thread_index][n_bufs +
272                                                                   bufs_alloc],
273                                    n_bufs_per_frame - bufs_alloc);
274               bufs_alloc += bufs_now;
275             }
276           while (bufs_now > 0 && ((bufs_alloc + n_bufs < n_bufs_per_frame)));
277
278           n_bufs += bufs_alloc;
279           _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
280
281           if (PREDICT_FALSE (n_bufs < n_bufs_per_frame))
282             {
283               vec_add1 (smm->pending_event_vector[thread_index], *e0);
284               return -1;
285             }
286           ASSERT (n_bufs >= n_bufs_per_frame);
287         }
288
289       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
290       while (left_to_snd0 && n_left_to_next)
291         {
292           /*
293            * Handle first buffer in chain separately
294            */
295
296           len_to_deq0 = clib_min (left_to_snd0, deq_per_first_buf);
297           if (left_to_snd0 > len_to_deq0 && n_left_to_next > 1)
298             {
299               vlib_buffer_t *pb;
300               u32 pbi = smm->tx_buffers[thread_index][n_bufs - 2];
301               pb = vlib_get_buffer (vm, pbi);
302               vlib_prefetch_buffer_header (pb, LOAD);
303             }
304
305           /* Get free buffer */
306           ASSERT (n_bufs >= 1);
307           bi0 = smm->tx_buffers[thread_index][--n_bufs];
308           _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
309
310           /* usual speculation, or the enqueue_x1 macro will barf */
311           to_next[0] = bi0;
312           to_next += 1;
313           n_left_to_next -= 1;
314
315           b0 = vlib_get_buffer (vm, bi0);
316           b0->error = 0;
317           b0->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
318           b0->current_data = 0;
319           b0->total_length_not_including_first_buffer = 0;
320
321           data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
322           if (peek_data)
323             {
324               n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, tx_offset,
325                                             len_to_deq0, data0);
326               if (n_bytes_read <= 0)
327                 goto dequeue_fail;
328               /* Keep track of progress locally, transport is also supposed to
329                * increment it independently when pushing the header */
330               tx_offset += n_bytes_read;
331             }
332           else
333             {
334               if (transport_vft->tx_type == TRANSPORT_TX_DGRAM)
335                 {
336                   svm_fifo_t *f = s0->server_tx_fifo;
337                   u16 deq_now;
338                   u32 offset;
339
340                   ASSERT (hdr.data_length > hdr.data_offset);
341                   deq_now = clib_min (hdr.data_length - hdr.data_offset,
342                                       len_to_deq0);
343                   offset = hdr.data_offset + SESSION_CONN_HDR_LEN;
344                   n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0);
345                   if (PREDICT_FALSE (n_bytes_read <= 0))
346                     goto dequeue_fail;
347
348                   if (s0->session_state == SESSION_STATE_LISTENING)
349                     {
350                       ip_copy (&tc0->rmt_ip, &hdr.rmt_ip, tc0->is_ip4);
351                       tc0->rmt_port = hdr.rmt_port;
352                     }
353                   hdr.data_offset += n_bytes_read;
354                   if (hdr.data_offset == hdr.data_length)
355                     {
356                       offset = hdr.data_length + SESSION_CONN_HDR_LEN;
357                       svm_fifo_dequeue_drop (f, offset);
358                     }
359                 }
360               else
361                 {
362                   n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
363                                                           len_to_deq0, data0);
364                   if (n_bytes_read <= 0)
365                     goto dequeue_fail;
366                 }
367             }
368
369           b0->current_length = n_bytes_read;
370           left_to_snd0 -= n_bytes_read;
371           *n_tx_packets = *n_tx_packets + 1;
372
373           /*
374            * Fill in the remaining buffers in the chain, if any
375            */
376           if (PREDICT_FALSE (n_bufs_per_seg > 1 && left_to_snd0))
377             {
378               left_for_seg = clib_min (snd_mss0 - n_bytes_read, left_to_snd0);
379               session_tx_fifo_chain_tail (smm, vm, thread_index,
380                                           s0->server_tx_fifo, b0, bi0,
381                                           n_bufs_per_seg, left_for_seg,
382                                           &left_to_snd0, &n_bufs, &tx_offset,
383                                           deq_per_buf, peek_data,
384                                           transport_vft->tx_type);
385             }
386
387           /* Ask transport to push header after current_length and
388            * total_length_not_including_first_buffer are updated */
389           transport_vft->push_header (tc0, b0);
390
391           /* *INDENT-OFF* */
392           SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
393               ed->data[0] = e0->event_type;
394               ed->data[1] = max_dequeue0;
395               ed->data[2] = len_to_deq0;
396               ed->data[3] = left_to_snd0;
397           }));
398           /* *INDENT-ON* */
399
400           VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
401
402           if (PREDICT_FALSE (n_trace > 0))
403             {
404               session_queue_trace_t *t0;
405               vlib_trace_buffer (vm, node, next_index, b0,
406                                  1 /* follow_chain */ );
407               vlib_set_trace_count (vm, node, --n_trace);
408               t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
409               t0->session_index = s0->session_index;
410               t0->server_thread_index = s0->thread_index;
411             }
412
413           vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
414                                            to_next, n_left_to_next,
415                                            bi0, next0);
416         }
417       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
418     }
419
420   /* If we couldn't dequeue all bytes mark as partially read */
421   if (max_len_to_snd0 < max_dequeue0)
422     if (svm_fifo_set_event (s0->server_tx_fifo))
423       vec_add1 (smm->pending_event_vector[thread_index], *e0);
424
425   if (!peek_data && transport_vft->tx_type == TRANSPORT_TX_DGRAM)
426     {
427       /* Fix dgram pre header */
428       if (max_len_to_snd0 < max_dequeue0)
429         svm_fifo_overwrite_head (s0->server_tx_fifo, (u8 *) & hdr,
430                                  sizeof (session_dgram_pre_hdr_t));
431       /* More data needs to be read */
432       else if (svm_fifo_max_dequeue (s0->server_tx_fifo) > 0)
433         vec_add1 (smm->pending_event_vector[thread_index], *e0);
434     }
435   return 0;
436
437 dequeue_fail:
438   /*
439    * Can't read from fifo. If we don't already have an event, save as partially
440    * read, return buff to free list and return
441    */
442   clib_warning ("dequeue fail");
443   if (svm_fifo_set_event (s0->server_tx_fifo))
444     {
445       vec_add1 (smm->pending_event_vector[thread_index], *e0);
446     }
447   vlib_put_next_frame (vm, node, next_index, n_left_to_next + 1);
448   _vec_len (smm->tx_buffers[thread_index]) += 1;
449
450   return 0;
451 }
452
453 int
454 session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
455                               session_manager_main_t * smm,
456                               session_fifo_event_t * e0,
457                               stream_session_t * s0, u32 thread_index,
458                               int *n_tx_pkts)
459 {
460   return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
461                                          n_tx_pkts, 1);
462 }
463
464 int
465 session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
466                                  session_manager_main_t * smm,
467                                  session_fifo_event_t * e0,
468                                  stream_session_t * s0, u32 thread_index,
469                                  int *n_tx_pkts)
470 {
471   return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
472                                          n_tx_pkts, 0);
473 }
474
475 int
476 session_tx_fifo_dequeue_internal (vlib_main_t * vm,
477                                   vlib_node_runtime_t * node,
478                                   session_manager_main_t * smm,
479                                   session_fifo_event_t * e0,
480                                   stream_session_t * s0, u32 thread_index,
481                                   int *n_tx_pkts)
482 {
483   application_t *app;
484   app = application_get (s0->opaque);
485   svm_fifo_unset_event (s0->server_tx_fifo);
486   return app->cb_fns.builtin_app_tx_callback (s0);
487 }
488
489 always_inline stream_session_t *
490 session_event_get_session (session_fifo_event_t * e, u8 thread_index)
491 {
492   return session_get_if_valid (e->fifo->master_session_index, thread_index);
493 }
494
495 void
496 dump_thread_0_event_queue (void)
497 {
498   session_manager_main_t *smm = vnet_get_session_manager_main ();
499   vlib_main_t *vm = &vlib_global_main;
500   u32 my_thread_index = vm->thread_index;
501   session_fifo_event_t _e, *e = &_e;
502   stream_session_t *s0;
503   int i, index;
504   i8 *headp;
505
506   svm_queue_t *q;
507   q = smm->vpp_event_queues[my_thread_index];
508
509   index = q->head;
510
511   for (i = 0; i < q->cursize; i++)
512     {
513       headp = (i8 *) (&q->data[0] + q->elsize * index);
514       clib_memcpy (e, headp, q->elsize);
515
516       switch (e->event_type)
517         {
518         case FIFO_EVENT_APP_TX:
519           s0 = session_event_get_session (e, my_thread_index);
520           fformat (stdout, "[%04d] TX session %d\n", i, s0->session_index);
521           break;
522
523         case FIFO_EVENT_DISCONNECT:
524           s0 = session_get_from_handle (e->session_handle);
525           fformat (stdout, "[%04d] disconnect session %d\n", i,
526                    s0->session_index);
527           break;
528
529         case FIFO_EVENT_BUILTIN_RX:
530           s0 = session_event_get_session (e, my_thread_index);
531           fformat (stdout, "[%04d] builtin_rx %d\n", i, s0->session_index);
532           break;
533
534         case FIFO_EVENT_RPC:
535           fformat (stdout, "[%04d] RPC call %llx with %llx\n",
536                    i, (u64) (e->rpc_args.fp), (u64) (e->rpc_args.arg));
537           break;
538
539         default:
540           fformat (stdout, "[%04d] unhandled event type %d\n",
541                    i, e->event_type);
542           break;
543         }
544
545       index++;
546
547       if (index == q->maxsize)
548         index = 0;
549     }
550 }
551
552 static u8
553 session_node_cmp_event (session_fifo_event_t * e, svm_fifo_t * f)
554 {
555   stream_session_t *s;
556   switch (e->event_type)
557     {
558     case FIFO_EVENT_APP_RX:
559     case FIFO_EVENT_APP_TX:
560     case FIFO_EVENT_BUILTIN_RX:
561       if (e->fifo == f)
562         return 1;
563       break;
564     case FIFO_EVENT_DISCONNECT:
565       break;
566     case FIFO_EVENT_RPC:
567       s = session_get_from_handle (e->session_handle);
568       if (!s)
569         {
570           clib_warning ("session has event but doesn't exist!");
571           break;
572         }
573       if (s->server_rx_fifo == f || s->server_tx_fifo == f)
574         return 1;
575       break;
576     default:
577       break;
578     }
579   return 0;
580 }
581
582 u8
583 session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
584 {
585   session_manager_main_t *smm = vnet_get_session_manager_main ();
586   svm_queue_t *q;
587   session_fifo_event_t *pending_event_vector, *evt;
588   int i, index, found = 0;
589   i8 *headp;
590   u8 thread_index;
591
592   ASSERT (e);
593   thread_index = f->master_thread_index;
594   /*
595    * Search evt queue
596    */
597   q = smm->vpp_event_queues[thread_index];
598   index = q->head;
599   for (i = 0; i < q->cursize; i++)
600     {
601       headp = (i8 *) (&q->data[0] + q->elsize * index);
602       clib_memcpy (e, headp, q->elsize);
603       found = session_node_cmp_event (e, f);
604       if (found)
605         return 1;
606       if (++index == q->maxsize)
607         index = 0;
608     }
609   /*
610    * Search pending events vector
611    */
612   pending_event_vector = smm->pending_event_vector[thread_index];
613   vec_foreach (evt, pending_event_vector)
614   {
615     found = session_node_cmp_event (evt, f);
616     if (found)
617       {
618         clib_memcpy (e, evt, sizeof (*evt));
619         break;
620       }
621   }
622   return found;
623 }
624
625 static uword
626 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
627                        vlib_frame_t * frame)
628 {
629   session_manager_main_t *smm = vnet_get_session_manager_main ();
630   session_fifo_event_t *my_pending_event_vector, *pending_disconnects, *e;
631   session_fifo_event_t *my_fifo_events;
632   u32 n_to_dequeue, n_events;
633   svm_queue_t *q;
634   application_t *app;
635   int n_tx_packets = 0;
636   u32 my_thread_index = vm->thread_index;
637   int i, rv;
638   f64 now = vlib_time_now (vm);
639   void (*fp) (void *);
640
641   SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, my_thread_index);
642
643   /*
644    *  Update transport time
645    */
646   transport_update_time (now, my_thread_index);
647
648   /*
649    * Get vpp queue events
650    */
651   q = smm->vpp_event_queues[my_thread_index];
652   if (PREDICT_FALSE (q == 0))
653     return 0;
654
655   my_fifo_events = smm->free_event_vector[my_thread_index];
656
657   /* min number of events we can dequeue without blocking */
658   n_to_dequeue = q->cursize;
659   my_pending_event_vector = smm->pending_event_vector[my_thread_index];
660   pending_disconnects = smm->pending_disconnects[my_thread_index];
661
662   if (!n_to_dequeue && !vec_len (my_pending_event_vector)
663       && !vec_len (pending_disconnects))
664     return 0;
665
666   SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
667
668   /*
669    * If we didn't manage to process previous events try going
670    * over them again without dequeuing new ones.
671    */
672   /* XXX: Block senders to sessions that can't keep up */
673   if (0 && vec_len (my_pending_event_vector) >= 100)
674     {
675       clib_warning ("too many fifo events unsolved");
676       goto skip_dequeue;
677     }
678
679   /* See you in the next life, don't be late */
680   if (pthread_mutex_trylock (&q->mutex))
681     return 0;
682
683   for (i = 0; i < n_to_dequeue; i++)
684     {
685       vec_add2 (my_fifo_events, e, 1);
686       svm_queue_sub_raw (q, (u8 *) e);
687     }
688
689   /* The other side of the connection is not polling */
690   if (q->cursize < (q->maxsize / 8))
691     (void) pthread_cond_broadcast (&q->condvar);
692   pthread_mutex_unlock (&q->mutex);
693
694   vec_append (my_fifo_events, my_pending_event_vector);
695   vec_append (my_fifo_events, smm->pending_disconnects[my_thread_index]);
696
697   _vec_len (my_pending_event_vector) = 0;
698   smm->pending_event_vector[my_thread_index] = my_pending_event_vector;
699   _vec_len (smm->pending_disconnects[my_thread_index]) = 0;
700
701 skip_dequeue:
702   n_events = vec_len (my_fifo_events);
703   for (i = 0; i < n_events; i++)
704     {
705       stream_session_t *s0;     /* $$$ prefetch 1 ahead maybe */
706       session_fifo_event_t *e0;
707
708       e0 = &my_fifo_events[i];
709
710       switch (e0->event_type)
711         {
712         case FIFO_EVENT_APP_TX:
713           s0 = session_event_get_session (e0, my_thread_index);
714
715           if (PREDICT_FALSE (!s0))
716             {
717               clib_warning ("It's dead, Jim!");
718               continue;
719             }
720           /* Spray packets in per session type frames, since they go to
721            * different nodes */
722           rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
723                                                         my_thread_index,
724                                                         &n_tx_packets);
725           /* Out of buffers */
726           if (PREDICT_FALSE (rv < 0))
727             {
728               vlib_node_increment_counter (vm, node->node_index,
729                                            SESSION_QUEUE_ERROR_NO_BUFFER, 1);
730               continue;
731             }
732           break;
733         case FIFO_EVENT_DISCONNECT:
734           /* Make sure disconnects run after the pending list is drained */
735           if (!e0->postponed)
736             {
737               e0->postponed = 1;
738               vec_add1 (smm->pending_disconnects[my_thread_index], *e0);
739               continue;
740             }
741           s0 = session_get_from_handle (e0->session_handle);
742           stream_session_disconnect_transport (s0);
743           break;
744         case FIFO_EVENT_BUILTIN_RX:
745           s0 = session_event_get_session (e0, my_thread_index);
746           if (PREDICT_FALSE (!s0))
747             continue;
748           svm_fifo_unset_event (s0->server_rx_fifo);
749           app = application_get (s0->app_index);
750           app->cb_fns.builtin_app_rx_callback (s0);
751           break;
752         case FIFO_EVENT_RPC:
753           fp = e0->rpc_args.fp;
754           (*fp) (e0->rpc_args.arg);
755           break;
756
757         default:
758           clib_warning ("unhandled event type %d", e0->event_type);
759         }
760     }
761
762   _vec_len (my_fifo_events) = 0;
763   smm->free_event_vector[my_thread_index] = my_fifo_events;
764
765   vlib_node_increment_counter (vm, session_queue_node.index,
766                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
767
768   SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 1);
769
770   return n_tx_packets;
771 }
772
773 /* *INDENT-OFF* */
774 VLIB_REGISTER_NODE (session_queue_node) =
775 {
776   .function = session_queue_node_fn,
777   .name = "session-queue",
778   .format_trace = format_session_queue_trace,
779   .type = VLIB_NODE_TYPE_INPUT,
780   .n_errors = ARRAY_LEN (session_queue_error_strings),
781   .error_strings = session_queue_error_strings,
782   .state = VLIB_NODE_STATE_DISABLED,
783 };
784 /* *INDENT-ON* */
785
786 static clib_error_t *
787 session_queue_exit (vlib_main_t * vm)
788 {
789   if (vec_len (vlib_mains) < 2)
790     return 0;
791
792   /*
793    * Shut off (especially) worker-thread session nodes.
794    * Otherwise, vpp can crash as the main thread unmaps the
795    * API segment.
796    */
797   vlib_worker_thread_barrier_sync (vm);
798   session_node_enable_disable (0 /* is_enable */ );
799   vlib_worker_thread_barrier_release (vm);
800   return 0;
801 }
802
803 VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
804
805 /*
806  * fd.io coding-style-patch-verification: ON
807  *
808  * Local Variables:
809  * eval: (c-set-style "gnu")
810  * End:
811  */