VPP-659 TCP improvements
[vpp.git] / src / vnet / session / node.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vlib/vlib.h>
17 #include <vnet/vnet.h>
18 #include <vnet/pg/pg.h>
19 #include <vnet/ip/ip.h>
20
21 #include <vnet/tcp/tcp.h>
22
23 #include <vppinfra/hash.h>
24 #include <vppinfra/error.h>
25 #include <vppinfra/elog.h>
26 #include <vlibmemory/unix_shared_memory_queue.h>
27
28 #include <vnet/udp/udp_packet.h>
29 #include <vnet/lisp-cp/packets.h>
30 #include <math.h>
31
32 vlib_node_registration_t session_queue_node;
33
34 typedef struct
35 {
36   u32 session_index;
37   u32 server_thread_index;
38 } session_queue_trace_t;
39
40 /* packet trace format function */
41 static u8 *
42 format_session_queue_trace (u8 * s, va_list * args)
43 {
44   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
45   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
46   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
47
48   s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
49               t->session_index, t->server_thread_index);
50   return s;
51 }
52
53 vlib_node_registration_t session_queue_node;
54
55 #define foreach_session_queue_error                 \
56 _(TX, "Packets transmitted")                    \
57 _(TIMER, "Timer events")
58
59 typedef enum
60 {
61 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
62   foreach_session_queue_error
63 #undef _
64     SESSION_QUEUE_N_ERROR,
65 } session_queue_error_t;
66
67 static char *session_queue_error_strings[] = {
68 #define _(sym,string) string,
69   foreach_session_queue_error
70 #undef _
71 };
72
73 static u32 session_type_to_next[] = {
74   SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT,
75   SESSION_QUEUE_NEXT_IP4_LOOKUP,
76   SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT,
77   SESSION_QUEUE_NEXT_IP6_LOOKUP,
78 };
79
80 always_inline int
81 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
82                                 session_manager_main_t * smm,
83                                 session_fifo_event_t * e0,
84                                 stream_session_t * s0, u32 thread_index,
85                                 int *n_tx_packets, u8 peek_data)
86 {
87   u32 n_trace = vlib_get_trace_count (vm, node);
88   u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
89   u32 n_frame_bytes, n_frames_per_evt;
90   transport_connection_t *tc0;
91   transport_proto_vft_t *transport_vft;
92   u32 next_index, next0, *to_next, n_left_to_next, bi0;
93   vlib_buffer_t *b0;
94   u32 rx_offset;
95   u16 snd_mss0;
96   u8 *data0;
97   int i;
98
99   next_index = next0 = session_type_to_next[s0->session_type];
100
101   transport_vft = session_get_transport_vft (s0->session_type);
102   tc0 = transport_vft->get_connection (s0->connection_index, thread_index);
103
104   /* Make sure we have space to send and there's something to dequeue */
105   snd_space0 = transport_vft->send_space (tc0);
106   snd_mss0 = transport_vft->send_mss (tc0);
107
108   /* Can't make any progress */
109   if (snd_space0 == 0 || svm_fifo_max_dequeue (s0->server_tx_fifo) == 0
110       || snd_mss0 == 0)
111     {
112       vec_add1 (smm->evts_partially_read[thread_index], *e0);
113       return 0;
114     }
115
116   ASSERT (e0->enqueue_length > 0);
117
118   /* Ensure we're not writing more than transport window allows */
119   max_len_to_snd0 = clib_min (e0->enqueue_length, snd_space0);
120
121   if (peek_data)
122     {
123       /* Offset in rx fifo from where to peek data  */
124       rx_offset = transport_vft->tx_fifo_offset (tc0);
125     }
126
127   /* TODO check if transport is willing to send len_to_snd0
128    * bytes (Nagle) */
129
130   n_frame_bytes = snd_mss0 * VLIB_FRAME_SIZE;
131   n_frames_per_evt = ceil ((double) max_len_to_snd0 / n_frame_bytes);
132
133   n_bufs = vec_len (smm->tx_buffers[thread_index]);
134   left_to_snd0 = max_len_to_snd0;
135   for (i = 0; i < n_frames_per_evt; i++)
136     {
137       /* Make sure we have at least one full frame of buffers ready */
138       if (PREDICT_FALSE (n_bufs < VLIB_FRAME_SIZE))
139         {
140           vec_validate (smm->tx_buffers[thread_index],
141                         n_bufs + VLIB_FRAME_SIZE - 1);
142           n_bufs +=
143             vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][n_bufs],
144                                VLIB_FRAME_SIZE);
145
146           /* buffer shortage
147            * XXX 0.9 because when debugging we might not get a full frame */
148           if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE))
149             {
150               /* Keep track of how much we've dequeued and exit */
151               if (left_to_snd0 != max_len_to_snd0)
152                 {
153                   e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
154                   vec_add1 (smm->evts_partially_read[thread_index], *e0);
155                 }
156
157               return -1;
158             }
159
160           _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
161         }
162
163       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
164       while (left_to_snd0 && n_left_to_next)
165         {
166           /* Get free buffer */
167           n_bufs--;
168           bi0 = smm->tx_buffers[thread_index][n_bufs];
169           _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
170
171           b0 = vlib_get_buffer (vm, bi0);
172           b0->error = 0;
173           b0->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID
174             | VNET_BUFFER_LOCALLY_ORIGINATED;
175           b0->current_data = 0;
176
177           /* RX on the local interface. tx in default fib */
178           vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
179           vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
180
181           /* usual speculation, or the enqueue_x1 macro will barf */
182           to_next[0] = bi0;
183           to_next += 1;
184           n_left_to_next -= 1;
185
186           VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
187           if (PREDICT_FALSE (n_trace > 0))
188             {
189               session_queue_trace_t *t0;
190               vlib_trace_buffer (vm, node, next_index, b0,
191                                  1 /* follow_chain */ );
192               vlib_set_trace_count (vm, node, --n_trace);
193               t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
194               t0->session_index = s0->session_index;
195               t0->server_thread_index = s0->thread_index;
196             }
197
198           len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
199
200           /* *INDENT-OFF* */
201           if (1)
202             {
203               ELOG_TYPE_DECLARE (e) = {
204                   .format = "evt-deq: id %d len %d rd %d wnd %d",
205                   .format_args = "i4i4i4i4",
206               };
207               struct
208               {
209                 u32 data[4];
210               } *ed;
211               ed = ELOG_DATA (&vm->elog_main, e);
212               ed->data[0] = e0->event_id;
213               ed->data[1] = e0->enqueue_length;
214               ed->data[2] = len_to_deq0;
215               ed->data[3] = left_to_snd0;
216             }
217           /* *INDENT-ON* */
218
219           /* Make room for headers */
220           data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
221
222           /* Dequeue the data
223            * TODO 1) peek instead of dequeue
224            *      2) buffer chains */
225           if (peek_data)
226             {
227               int n_bytes_read;
228               n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
229                                             rx_offset, len_to_deq0, data0);
230               if (n_bytes_read < 0)
231                 goto dequeue_fail;
232
233               /* Keep track of progress locally, transport is also supposed to
234                * increment it independently when pushing header */
235               rx_offset += n_bytes_read;
236             }
237           else
238             {
239               if (svm_fifo_dequeue_nowait (s0->server_tx_fifo, s0->pid,
240                                            len_to_deq0, data0) < 0)
241                 goto dequeue_fail;
242             }
243
244           b0->current_length = len_to_deq0;
245
246           /* Ask transport to push header */
247           transport_vft->push_header (tc0, b0);
248
249           left_to_snd0 -= len_to_deq0;
250           *n_tx_packets = *n_tx_packets + 1;
251
252           vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
253                                            to_next, n_left_to_next,
254                                            bi0, next0);
255         }
256       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
257     }
258
259   /* If we couldn't dequeue all bytes store progress */
260   if (max_len_to_snd0 < e0->enqueue_length)
261     {
262       e0->enqueue_length -= max_len_to_snd0;
263       vec_add1 (smm->evts_partially_read[thread_index], *e0);
264     }
265   return 0;
266
267 dequeue_fail:
268   /* Can't read from fifo. Store event rx progress, save as partially read,
269    * return buff to free list and return  */
270   e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
271   vec_add1 (smm->evts_partially_read[thread_index], *e0);
272
273   to_next -= 1;
274   n_left_to_next += 1;
275   _vec_len (smm->tx_buffers[thread_index]) += 1;
276
277   clib_warning ("dequeue fail");
278   return 0;
279 }
280
281 int
282 session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
283                               session_manager_main_t * smm,
284                               session_fifo_event_t * e0,
285                               stream_session_t * s0, u32 thread_index,
286                               int *n_tx_pkts)
287 {
288   return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
289                                          n_tx_pkts, 1);
290 }
291
292 int
293 session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
294                                  session_manager_main_t * smm,
295                                  session_fifo_event_t * e0,
296                                  stream_session_t * s0, u32 thread_index,
297                                  int *n_tx_pkts)
298 {
299   return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
300                                          n_tx_pkts, 0);
301 }
302
303 static uword
304 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
305                        vlib_frame_t * frame)
306 {
307   session_manager_main_t *smm = vnet_get_session_manager_main ();
308   session_fifo_event_t *my_fifo_events, *e;
309   u32 n_to_dequeue, n_events;
310   unix_shared_memory_queue_t *q;
311   int n_tx_packets = 0;
312   u32 my_thread_index = vm->cpu_index;
313   int i, rv;
314
315   /*
316    *  Update TCP time
317    */
318   tcp_update_time (vlib_time_now (vm), my_thread_index);
319
320   /*
321    * Get vpp queue events
322    */
323   q = smm->vpp_event_queues[my_thread_index];
324   if (PREDICT_FALSE (q == 0))
325     return 0;
326
327   /* min number of events we can dequeue without blocking */
328   n_to_dequeue = q->cursize;
329   my_fifo_events = smm->fifo_events[my_thread_index];
330
331   if (n_to_dequeue == 0 && vec_len (my_fifo_events) == 0)
332     return 0;
333
334   /*
335    * If we didn't manage to process previous events try going
336    * over them again without dequeuing new ones.
337    */
338   /* XXX: Block senders to sessions that can't keep up */
339   if (vec_len (my_fifo_events) >= 100)
340     goto skip_dequeue;
341
342   /* See you in the next life, don't be late */
343   if (pthread_mutex_trylock (&q->mutex))
344     return 0;
345
346   for (i = 0; i < n_to_dequeue; i++)
347     {
348       vec_add2 (my_fifo_events, e, 1);
349       unix_shared_memory_queue_sub_raw (q, (u8 *) e);
350     }
351
352   /* The other side of the connection is not polling */
353   if (q->cursize < (q->maxsize / 8))
354     (void) pthread_cond_broadcast (&q->condvar);
355   pthread_mutex_unlock (&q->mutex);
356
357   smm->fifo_events[my_thread_index] = my_fifo_events;
358
359 skip_dequeue:
360   n_events = vec_len (my_fifo_events);
361   for (i = 0; i < n_events; i++)
362     {
363       svm_fifo_t *f0;           /* $$$ prefetch 1 ahead maybe */
364       stream_session_t *s0;
365       u32 server_session_index0, server_thread_index0;
366       session_fifo_event_t *e0;
367
368       e0 = &my_fifo_events[i];
369       f0 = e0->fifo;
370       server_session_index0 = f0->server_session_index;
371       server_thread_index0 = f0->server_thread_index;
372
373       /* $$$ add multiple event queues, per vpp worker thread */
374       ASSERT (server_thread_index0 == my_thread_index);
375
376       s0 = stream_session_get_if_valid (server_session_index0,
377                                         my_thread_index);
378
379       if (CLIB_DEBUG && !s0)
380         {
381           clib_warning ("It's dead, Jim!");
382           continue;
383         }
384
385       if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
386         continue;
387
388       ASSERT (s0->thread_index == my_thread_index);
389
390       switch (e0->event_type)
391         {
392         case FIFO_EVENT_SERVER_TX:
393           /* Spray packets in per session type frames, since they go to
394            * different nodes */
395           rv = (smm->session_rx_fns[s0->session_type]) (vm, node, smm, e0, s0,
396                                                         my_thread_index,
397                                                         &n_tx_packets);
398           if (rv < 0)
399             goto done;
400
401           break;
402
403         default:
404           clib_warning ("unhandled event type %d", e0->event_type);
405         }
406     }
407
408 done:
409
410   /* Couldn't process all events. Probably out of buffers */
411   if (PREDICT_FALSE (i < n_events))
412     {
413       session_fifo_event_t *partially_read =
414         smm->evts_partially_read[my_thread_index];
415       vec_add (partially_read, &my_fifo_events[i], n_events - i);
416       vec_free (my_fifo_events);
417       smm->fifo_events[my_thread_index] = partially_read;
418       smm->evts_partially_read[my_thread_index] = 0;
419     }
420   else
421     {
422       vec_free (smm->fifo_events[my_thread_index]);
423       smm->fifo_events[my_thread_index] =
424         smm->evts_partially_read[my_thread_index];
425       smm->evts_partially_read[my_thread_index] = 0;
426     }
427
428   vlib_node_increment_counter (vm, session_queue_node.index,
429                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
430
431   return n_tx_packets;
432 }
433
434 /* *INDENT-OFF* */
435 VLIB_REGISTER_NODE (session_queue_node) =
436 {
437   .function = session_queue_node_fn,
438   .name = "session-queue",
439   .format_trace = format_session_queue_trace,
440   .type = VLIB_NODE_TYPE_INPUT,
441   .n_errors = ARRAY_LEN (session_queue_error_strings),
442   .error_strings = session_queue_error_strings,
443   .n_next_nodes = SESSION_QUEUE_N_NEXT,
444   .state = VLIB_NODE_STATE_DISABLED,
445   .next_nodes =
446   {
447       [SESSION_QUEUE_NEXT_DROP] = "error-drop",
448       [SESSION_QUEUE_NEXT_IP4_LOOKUP] = "ip4-lookup",
449       [SESSION_QUEUE_NEXT_IP6_LOOKUP] = "ip6-lookup",
450       [SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT] = "tcp4-output",
451       [SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT] = "tcp6-output",
452   },
453 };
454 /* *INDENT-ON* */
455
456 /*
457  * fd.io coding-style-patch-verification: ON
458  *
459  * Local Variables:
460  * eval: (c-set-style "gnu")
461  * End:
462  */