399077decb095b18ce7a553192c2ffd48dc3650a
[vpp.git] / src / vnet / session / node.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vlib/vlib.h>
17 #include <vnet/vnet.h>
18 #include <vnet/pg/pg.h>
19 #include <vnet/ip/ip.h>
20
21 #include <vnet/tcp/tcp.h>
22
23 #include <vppinfra/hash.h>
24 #include <vppinfra/error.h>
25 #include <vppinfra/elog.h>
26 #include <vlibmemory/unix_shared_memory_queue.h>
27
28 #include <vnet/udp/udp_packet.h>
29 #include <vnet/lisp-cp/packets.h>
30 #include <math.h>
31
32 vlib_node_registration_t session_queue_node;
33
34 typedef struct
35 {
36   u32 session_index;
37   u32 server_thread_index;
38 } session_queue_trace_t;
39
40 /* packet trace format function */
41 static u8 *
42 format_session_queue_trace (u8 * s, va_list * args)
43 {
44   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
45   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
46   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
47
48   s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
49               t->session_index, t->server_thread_index);
50   return s;
51 }
52
53 vlib_node_registration_t session_queue_node;
54
55 #define foreach_session_queue_error                 \
56 _(TX, "Packets transmitted")                    \
57 _(TIMER, "Timer events")
58
59 typedef enum
60 {
61 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
62   foreach_session_queue_error
63 #undef _
64     SESSION_QUEUE_N_ERROR,
65 } session_queue_error_t;
66
67 static char *session_queue_error_strings[] = {
68 #define _(sym,string) string,
69   foreach_session_queue_error
70 #undef _
71 };
72
73 static u32 session_type_to_next[] = {
74   SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT,
75   SESSION_QUEUE_NEXT_IP4_LOOKUP,
76   SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT,
77   SESSION_QUEUE_NEXT_IP6_LOOKUP,
78 };
79
80 always_inline int
81 session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node,
82                    session_manager_main_t * smm, session_fifo_event_t * e0,
83                    stream_session_t * s0, u32 thread_index, int *n_tx_packets,
84                    u8 peek_data)
85 {
86   u32 n_trace = vlib_get_trace_count (vm, node);
87   u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
88   u32 n_frame_bytes, n_frames_per_evt;
89   transport_connection_t *tc0;
90   transport_proto_vft_t *transport_vft;
91   u32 next_index, next0, *to_next, n_left_to_next, bi0;
92   vlib_buffer_t *b0;
93   u32 rx_offset;
94   u16 snd_mss0;
95   u8 *data0;
96   int i;
97
98   next_index = next0 = session_type_to_next[s0->session_type];
99
100   transport_vft = session_get_transport_vft (s0->session_type);
101   tc0 = transport_vft->get_connection (s0->connection_index, thread_index);
102
103   /* Make sure we have space to send and there's something to dequeue */
104   snd_space0 = transport_vft->send_space (tc0);
105   snd_mss0 = transport_vft->send_mss (tc0);
106
107   /* Can't make any progress */
108   if (snd_space0 == 0 || svm_fifo_max_dequeue (s0->server_tx_fifo) == 0
109       || snd_mss0 == 0)
110     {
111       vec_add1 (smm->evts_partially_read[thread_index], *e0);
112       return 0;
113     }
114
115   ASSERT (e0->enqueue_length > 0);
116
117   /* Ensure we're not writing more than transport window allows */
118   max_len_to_snd0 = clib_min (e0->enqueue_length, snd_space0);
119
120   if (peek_data)
121     {
122       /* Offset in rx fifo from where to peek data  */
123       rx_offset = transport_vft->rx_fifo_offset (tc0);
124     }
125
126   /* TODO check if transport is willing to send len_to_snd0
127    * bytes (Nagle) */
128
129   n_frame_bytes = snd_mss0 * VLIB_FRAME_SIZE;
130   n_frames_per_evt = ceil ((double) max_len_to_snd0 / n_frame_bytes);
131
132   n_bufs = vec_len (smm->tx_buffers[thread_index]);
133   left_to_snd0 = max_len_to_snd0;
134   for (i = 0; i < n_frames_per_evt; i++)
135     {
136       /* Make sure we have at least one full frame of buffers ready */
137       if (PREDICT_FALSE (n_bufs < VLIB_FRAME_SIZE))
138         {
139           vec_validate (smm->tx_buffers[thread_index],
140                         n_bufs + VLIB_FRAME_SIZE - 1);
141           n_bufs +=
142             vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][n_bufs],
143                                VLIB_FRAME_SIZE);
144
145           /* buffer shortage
146            * XXX 0.9 because when debugging we might not get a full frame */
147           if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE))
148             {
149               /* Keep track of how much we've dequeued and exit */
150               if (left_to_snd0 != max_len_to_snd0)
151                 {
152                   e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
153                   vec_add1 (smm->evts_partially_read[thread_index], *e0);
154                 }
155
156               return -1;
157             }
158
159           _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
160         }
161
162       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
163       while (left_to_snd0 && n_left_to_next)
164         {
165           /* Get free buffer */
166           n_bufs--;
167           bi0 = smm->tx_buffers[thread_index][n_bufs];
168           _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
169
170           b0 = vlib_get_buffer (vm, bi0);
171           b0->error = 0;
172           b0->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID
173             | VNET_BUFFER_LOCALLY_ORIGINATED;
174           b0->current_data = 0;
175
176           /* RX on the local interface. tx in default fib */
177           vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
178           vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
179
180           /* usual speculation, or the enqueue_x1 macro will barf */
181           to_next[0] = bi0;
182           to_next += 1;
183           n_left_to_next -= 1;
184
185           VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
186           if (PREDICT_FALSE (n_trace > 0))
187             {
188               session_queue_trace_t *t0;
189               vlib_trace_buffer (vm, node, next_index, b0,
190                                  1 /* follow_chain */ );
191               vlib_set_trace_count (vm, node, --n_trace);
192               t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
193               t0->session_index = s0->session_index;
194               t0->server_thread_index = s0->thread_index;
195             }
196
197           /* *INDENT-OFF* */
198           if (1)
199             {
200               ELOG_TYPE_DECLARE (e) = {
201                   .format = "evt-dequeue: id %d length %d",
202                   .format_args = "i4i4",
203               };
204               struct
205               {
206                 u32 data[2];
207               } *ed;
208               ed = ELOG_DATA (&vm->elog_main, e);
209               ed->data[0] = e0->event_id;
210               ed->data[1] = e0->enqueue_length;
211             }
212           /* *INDENT-ON* */
213
214           len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
215
216           /* Make room for headers */
217           data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
218
219           /* Dequeue the data
220            * TODO 1) peek instead of dequeue
221            *      2) buffer chains */
222           if (peek_data)
223             {
224               int n_bytes_read;
225               n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
226                                             rx_offset, len_to_deq0, data0);
227               if (n_bytes_read < 0)
228                 goto dequeue_fail;
229
230               /* Keep track of progress locally, transport is also supposed to
231                * increment it independently when pushing header */
232               rx_offset += n_bytes_read;
233             }
234           else
235             {
236               if (svm_fifo_dequeue_nowait (s0->server_tx_fifo, s0->pid,
237                                            len_to_deq0, data0) < 0)
238                 goto dequeue_fail;
239             }
240
241           b0->current_length = len_to_deq0;
242
243           /* Ask transport to push header */
244           transport_vft->push_header (tc0, b0);
245
246           left_to_snd0 -= len_to_deq0;
247           *n_tx_packets = *n_tx_packets + 1;
248
249           vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
250                                            to_next, n_left_to_next,
251                                            bi0, next0);
252         }
253       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
254     }
255
256   /* If we couldn't dequeue all bytes store progress */
257   if (max_len_to_snd0 < e0->enqueue_length)
258     {
259       e0->enqueue_length -= max_len_to_snd0;
260       vec_add1 (smm->evts_partially_read[thread_index], *e0);
261     }
262   return 0;
263
264 dequeue_fail:
265   /* Can't read from fifo. Store event rx progress, save as partially read,
266    * return buff to free list and return  */
267   e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
268   vec_add1 (smm->evts_partially_read[thread_index], *e0);
269
270   to_next -= 1;
271   n_left_to_next += 1;
272   _vec_len (smm->tx_buffers[thread_index]) += 1;
273
274   clib_warning ("dequeue fail");
275   return 0;
276 }
277
278 int
279 session_fifo_rx_peek (vlib_main_t * vm, vlib_node_runtime_t * node,
280                       session_manager_main_t * smm, session_fifo_event_t * e0,
281                       stream_session_t * s0, u32 thread_index, int *n_tx_pkts)
282 {
283   return session_fifo_rx_i (vm, node, smm, e0, s0, thread_index, n_tx_pkts,
284                             1);
285 }
286
287 int
288 session_fifo_rx_dequeue (vlib_main_t * vm, vlib_node_runtime_t * node,
289                          session_manager_main_t * smm,
290                          session_fifo_event_t * e0, stream_session_t * s0,
291                          u32 thread_index, int *n_tx_pkts)
292 {
293   return session_fifo_rx_i (vm, node, smm, e0, s0, thread_index, n_tx_pkts,
294                             0);
295 }
296
297 static uword
298 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
299                        vlib_frame_t * frame)
300 {
301   session_manager_main_t *smm = vnet_get_session_manager_main ();
302   session_fifo_event_t *my_fifo_events, *e;
303   u32 n_to_dequeue, n_events;
304   unix_shared_memory_queue_t *q;
305   int n_tx_packets = 0;
306   u32 my_thread_index = vm->cpu_index;
307   int i, rv;
308
309   /*
310    *  Update TCP time
311    */
312   tcp_update_time (vlib_time_now (vm), my_thread_index);
313
314   /*
315    * Get vpp queue events
316    */
317   q = smm->vpp_event_queues[my_thread_index];
318   if (PREDICT_FALSE (q == 0))
319     return 0;
320
321   /* min number of events we can dequeue without blocking */
322   n_to_dequeue = q->cursize;
323   my_fifo_events = smm->fifo_events[my_thread_index];
324
325   if (n_to_dequeue == 0 && vec_len (my_fifo_events) == 0)
326     return 0;
327
328   /*
329    * If we didn't manage to process previous events try going
330    * over them again without dequeuing new ones.
331    */
332   /* XXX: Block senders to sessions that can't keep up */
333   if (vec_len (my_fifo_events) >= 100)
334     goto skip_dequeue;
335
336   /* See you in the next life, don't be late */
337   if (pthread_mutex_trylock (&q->mutex))
338     return 0;
339
340   for (i = 0; i < n_to_dequeue; i++)
341     {
342       vec_add2 (my_fifo_events, e, 1);
343       unix_shared_memory_queue_sub_raw (q, (u8 *) e);
344     }
345
346   /* The other side of the connection is not polling */
347   if (q->cursize < (q->maxsize / 8))
348     (void) pthread_cond_broadcast (&q->condvar);
349   pthread_mutex_unlock (&q->mutex);
350
351   smm->fifo_events[my_thread_index] = my_fifo_events;
352
353 skip_dequeue:
354   n_events = vec_len (my_fifo_events);
355   for (i = 0; i < n_events; i++)
356     {
357       svm_fifo_t *f0;           /* $$$ prefetch 1 ahead maybe */
358       stream_session_t *s0;
359       u32 server_session_index0, server_thread_index0;
360       session_fifo_event_t *e0;
361
362       e0 = &my_fifo_events[i];
363       f0 = e0->fifo;
364       server_session_index0 = f0->server_session_index;
365       server_thread_index0 = f0->server_thread_index;
366
367       /* $$$ add multiple event queues, per vpp worker thread */
368       ASSERT (server_thread_index0 == my_thread_index);
369
370       s0 = stream_session_get_if_valid (server_session_index0,
371                                         my_thread_index);
372       if (!s0)
373         {
374           clib_warning ("It's dead Jim!");
375           continue;
376         }
377
378       ASSERT (s0->thread_index == my_thread_index);
379
380       switch (e0->event_type)
381         {
382         case FIFO_EVENT_SERVER_TX:
383           /* Spray packets in per session type frames, since they go to
384            * different nodes */
385           rv = (smm->session_rx_fns[s0->session_type]) (vm, node, smm, e0, s0,
386                                                         my_thread_index,
387                                                         &n_tx_packets);
388           if (rv < 0)
389             goto done;
390
391           break;
392
393         default:
394           clib_warning ("unhandled event type %d", e0->event_type);
395         }
396     }
397
398 done:
399
400   /* Couldn't process all events. Probably out of buffers */
401   if (PREDICT_FALSE (i < n_events))
402     {
403       session_fifo_event_t *partially_read =
404         smm->evts_partially_read[my_thread_index];
405       vec_add (partially_read, &my_fifo_events[i], n_events - i);
406       vec_free (my_fifo_events);
407       smm->fifo_events[my_thread_index] = partially_read;
408       smm->evts_partially_read[my_thread_index] = 0;
409     }
410   else
411     {
412       vec_free (smm->fifo_events[my_thread_index]);
413       smm->fifo_events[my_thread_index] =
414         smm->evts_partially_read[my_thread_index];
415       smm->evts_partially_read[my_thread_index] = 0;
416     }
417
418   vlib_node_increment_counter (vm, session_queue_node.index,
419                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
420
421   return n_tx_packets;
422 }
423
424 /* *INDENT-OFF* */
425 VLIB_REGISTER_NODE (session_queue_node) =
426 {
427   .function = session_queue_node_fn,
428   .name = "session-queue",
429   .format_trace = format_session_queue_trace,
430   .type = VLIB_NODE_TYPE_INPUT,
431   .n_errors = ARRAY_LEN (session_queue_error_strings),
432   .error_strings = session_queue_error_strings,
433   .n_next_nodes = SESSION_QUEUE_N_NEXT,
434   .state = VLIB_NODE_STATE_DISABLED,
435   .next_nodes =
436   {
437       [SESSION_QUEUE_NEXT_DROP] = "error-drop",
438       [SESSION_QUEUE_NEXT_IP4_LOOKUP] = "ip4-lookup",
439       [SESSION_QUEUE_NEXT_IP6_LOOKUP] = "ip6-lookup",
440       [SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT] = "tcp4-output",
441       [SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT] = "tcp6-output",
442   },
443 };
444 /* *INDENT-ON* */
445
446 /*
447  * fd.io coding-style-patch-verification: ON
448  *
449  * Local Variables:
450  * eval: (c-set-style "gnu")
451  * End:
452  */