822afebde0c06fb726c9aaeb22339216b307ceeb
[vpp.git] / src / vnet / session / node.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vlib/vlib.h>
17 #include <vnet/vnet.h>
18 #include <vnet/pg/pg.h>
19 #include <vnet/ip/ip.h>
20
21 #include <vnet/tcp/tcp.h>
22
23 #include <vppinfra/hash.h>
24 #include <vppinfra/error.h>
25 #include <vppinfra/elog.h>
26 #include <vlibmemory/unix_shared_memory_queue.h>
27
28 #include <vnet/udp/udp_packet.h>
29 #include <math.h>
30 #include <vnet/session/session_debug.h>
31
32 vlib_node_registration_t session_queue_node;
33
34 typedef struct
35 {
36   u32 session_index;
37   u32 server_thread_index;
38 } session_queue_trace_t;
39
40 /* packet trace format function */
41 static u8 *
42 format_session_queue_trace (u8 * s, va_list * args)
43 {
44   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
45   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
46   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
47
48   s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
49               t->session_index, t->server_thread_index);
50   return s;
51 }
52
53 vlib_node_registration_t session_queue_node;
54
55 #define foreach_session_queue_error                 \
56 _(TX, "Packets transmitted")                    \
57 _(TIMER, "Timer events")
58
59 typedef enum
60 {
61 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
62   foreach_session_queue_error
63 #undef _
64     SESSION_QUEUE_N_ERROR,
65 } session_queue_error_t;
66
67 static char *session_queue_error_strings[] = {
68 #define _(sym,string) string,
69   foreach_session_queue_error
70 #undef _
71 };
72
73 static u32 session_type_to_next[] = {
74   SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT,
75   SESSION_QUEUE_NEXT_IP4_LOOKUP,
76   SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT,
77   SESSION_QUEUE_NEXT_IP6_LOOKUP,
78 };
79
80 always_inline int
81 session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
82                                 session_manager_main_t * smm,
83                                 session_fifo_event_t * e0,
84                                 stream_session_t * s0, u32 thread_index,
85                                 int *n_tx_packets, u8 peek_data)
86 {
87   u32 n_trace = vlib_get_trace_count (vm, node);
88   u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
89   u32 n_frame_bytes, n_frames_per_evt;
90   transport_connection_t *tc0;
91   transport_proto_vft_t *transport_vft;
92   u32 next_index, next0, *to_next, n_left_to_next, bi0;
93   vlib_buffer_t *b0;
94   u32 rx_offset;
95   u16 snd_mss0;
96   u8 *data0;
97   int i;
98
99   next_index = next0 = session_type_to_next[s0->session_type];
100
101   transport_vft = session_get_transport_vft (s0->session_type);
102   tc0 = transport_vft->get_connection (s0->connection_index, thread_index);
103
104   /* Make sure we have space to send and there's something to dequeue */
105   snd_space0 = transport_vft->send_space (tc0);
106   snd_mss0 = transport_vft->send_mss (tc0);
107
108   /* Can't make any progress */
109   if (snd_space0 == 0 || svm_fifo_max_dequeue (s0->server_tx_fifo) == 0
110       || snd_mss0 == 0)
111     {
112       vec_add1 (smm->evts_partially_read[thread_index], *e0);
113       return 0;
114     }
115
116   ASSERT (e0->enqueue_length > 0);
117
118   /* Ensure we're not writing more than transport window allows */
119   max_len_to_snd0 = clib_min (e0->enqueue_length, snd_space0);
120
121   if (peek_data)
122     {
123       /* Offset in rx fifo from where to peek data  */
124       rx_offset = transport_vft->tx_fifo_offset (tc0);
125     }
126
127   /* TODO check if transport is willing to send len_to_snd0
128    * bytes (Nagle) */
129
130   n_frame_bytes = snd_mss0 * VLIB_FRAME_SIZE;
131   n_frames_per_evt = ceil ((double) max_len_to_snd0 / n_frame_bytes);
132
133   n_bufs = vec_len (smm->tx_buffers[thread_index]);
134   left_to_snd0 = max_len_to_snd0;
135   for (i = 0; i < n_frames_per_evt; i++)
136     {
137       /* Make sure we have at least one full frame of buffers ready */
138       if (PREDICT_FALSE (n_bufs < VLIB_FRAME_SIZE))
139         {
140           vec_validate (smm->tx_buffers[thread_index],
141                         n_bufs + VLIB_FRAME_SIZE - 1);
142           n_bufs +=
143             vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][n_bufs],
144                                VLIB_FRAME_SIZE);
145
146           /* buffer shortage
147            * XXX 0.9 because when debugging we might not get a full frame */
148           if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE))
149             {
150               /* Keep track of how much we've dequeued and exit */
151               if (left_to_snd0 != max_len_to_snd0)
152                 {
153                   e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
154                   vec_add1 (smm->evts_partially_read[thread_index], *e0);
155                 }
156
157               return -1;
158             }
159
160           _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
161         }
162
163       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
164       while (left_to_snd0 && n_left_to_next)
165         {
166           /* Get free buffer */
167           n_bufs--;
168           bi0 = smm->tx_buffers[thread_index][n_bufs];
169           _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
170
171           b0 = vlib_get_buffer (vm, bi0);
172           b0->error = 0;
173           b0->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID
174             | VNET_BUFFER_LOCALLY_ORIGINATED;
175           b0->current_data = 0;
176
177           /* RX on the local interface. tx in default fib */
178           vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
179           vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
180
181           /* usual speculation, or the enqueue_x1 macro will barf */
182           to_next[0] = bi0;
183           to_next += 1;
184           n_left_to_next -= 1;
185
186           VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
187           if (PREDICT_FALSE (n_trace > 0))
188             {
189               session_queue_trace_t *t0;
190               vlib_trace_buffer (vm, node, next_index, b0,
191                                  1 /* follow_chain */ );
192               vlib_set_trace_count (vm, node, --n_trace);
193               t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
194               t0->session_index = s0->session_index;
195               t0->server_thread_index = s0->thread_index;
196             }
197
198           len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
199
200           /* *INDENT-OFF* */
201           SESSION_EVT_DBG(s0, SESSION_EVT_DEQ, ({
202               ed->data[0] = e0->event_id;
203               ed->data[1] = e0->enqueue_length;
204               ed->data[2] = len_to_deq0;
205               ed->data[3] = left_to_snd0;
206           }));
207           /* *INDENT-ON* */
208
209           /* Make room for headers */
210           data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
211
212           /* Dequeue the data
213            * TODO 1) peek instead of dequeue
214            *      2) buffer chains */
215           if (peek_data)
216             {
217               int n_bytes_read;
218               n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
219                                             rx_offset, len_to_deq0, data0);
220               if (n_bytes_read < 0)
221                 goto dequeue_fail;
222
223               /* Keep track of progress locally, transport is also supposed to
224                * increment it independently when pushing header */
225               rx_offset += n_bytes_read;
226             }
227           else
228             {
229               if (svm_fifo_dequeue_nowait (s0->server_tx_fifo, s0->pid,
230                                            len_to_deq0, data0) < 0)
231                 goto dequeue_fail;
232             }
233
234           b0->current_length = len_to_deq0;
235
236           /* Ask transport to push header */
237           transport_vft->push_header (tc0, b0);
238
239           left_to_snd0 -= len_to_deq0;
240           *n_tx_packets = *n_tx_packets + 1;
241
242           vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
243                                            to_next, n_left_to_next,
244                                            bi0, next0);
245         }
246       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
247     }
248
249   /* If we couldn't dequeue all bytes store progress */
250   if (max_len_to_snd0 < e0->enqueue_length)
251     {
252       e0->enqueue_length -= max_len_to_snd0;
253       vec_add1 (smm->evts_partially_read[thread_index], *e0);
254     }
255   return 0;
256
257 dequeue_fail:
258   /* Can't read from fifo. Store event rx progress, save as partially read,
259    * return buff to free list and return  */
260   e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
261   vec_add1 (smm->evts_partially_read[thread_index], *e0);
262
263   to_next -= 1;
264   n_left_to_next += 1;
265   _vec_len (smm->tx_buffers[thread_index]) += 1;
266
267   clib_warning ("dequeue fail");
268   return 0;
269 }
270
271 int
272 session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
273                               session_manager_main_t * smm,
274                               session_fifo_event_t * e0,
275                               stream_session_t * s0, u32 thread_index,
276                               int *n_tx_pkts)
277 {
278   return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
279                                          n_tx_pkts, 1);
280 }
281
282 int
283 session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
284                                  session_manager_main_t * smm,
285                                  session_fifo_event_t * e0,
286                                  stream_session_t * s0, u32 thread_index,
287                                  int *n_tx_pkts)
288 {
289   return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
290                                          n_tx_pkts, 0);
291 }
292
293 static uword
294 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
295                        vlib_frame_t * frame)
296 {
297   session_manager_main_t *smm = vnet_get_session_manager_main ();
298   session_fifo_event_t *my_fifo_events, *e;
299   u32 n_to_dequeue, n_events;
300   unix_shared_memory_queue_t *q;
301   int n_tx_packets = 0;
302   u32 my_thread_index = vm->cpu_index;
303   int i, rv;
304
305   /*
306    *  Update TCP time
307    */
308   tcp_update_time (vlib_time_now (vm), my_thread_index);
309
310   /*
311    * Get vpp queue events
312    */
313   q = smm->vpp_event_queues[my_thread_index];
314   if (PREDICT_FALSE (q == 0))
315     return 0;
316
317   /* min number of events we can dequeue without blocking */
318   n_to_dequeue = q->cursize;
319   my_fifo_events = smm->fifo_events[my_thread_index];
320
321   if (n_to_dequeue == 0 && vec_len (my_fifo_events) == 0)
322     return 0;
323
324   /*
325    * If we didn't manage to process previous events try going
326    * over them again without dequeuing new ones.
327    */
328   /* XXX: Block senders to sessions that can't keep up */
329   if (vec_len (my_fifo_events) >= 100)
330     goto skip_dequeue;
331
332   /* See you in the next life, don't be late */
333   if (pthread_mutex_trylock (&q->mutex))
334     return 0;
335
336   for (i = 0; i < n_to_dequeue; i++)
337     {
338       vec_add2 (my_fifo_events, e, 1);
339       unix_shared_memory_queue_sub_raw (q, (u8 *) e);
340     }
341
342   /* The other side of the connection is not polling */
343   if (q->cursize < (q->maxsize / 8))
344     (void) pthread_cond_broadcast (&q->condvar);
345   pthread_mutex_unlock (&q->mutex);
346
347   smm->fifo_events[my_thread_index] = my_fifo_events;
348
349 skip_dequeue:
350   n_events = vec_len (my_fifo_events);
351   for (i = 0; i < n_events; i++)
352     {
353       svm_fifo_t *f0;           /* $$$ prefetch 1 ahead maybe */
354       stream_session_t *s0;
355       u32 server_session_index0, server_thread_index0;
356       session_fifo_event_t *e0;
357
358       e0 = &my_fifo_events[i];
359       f0 = e0->fifo;
360       server_session_index0 = f0->server_session_index;
361       server_thread_index0 = f0->server_thread_index;
362
363       /* $$$ add multiple event queues, per vpp worker thread */
364       ASSERT (server_thread_index0 == my_thread_index);
365
366       s0 = stream_session_get_if_valid (server_session_index0,
367                                         my_thread_index);
368
369       if (CLIB_DEBUG && !s0)
370         {
371           clib_warning ("It's dead, Jim!");
372           continue;
373         }
374
375       if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
376         continue;
377
378       ASSERT (s0->thread_index == my_thread_index);
379
380       switch (e0->event_type)
381         {
382         case FIFO_EVENT_SERVER_TX:
383           /* Spray packets in per session type frames, since they go to
384            * different nodes */
385           rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
386                                                         my_thread_index,
387                                                         &n_tx_packets);
388           if (rv < 0)
389             goto done;
390
391           break;
392
393         default:
394           clib_warning ("unhandled event type %d", e0->event_type);
395         }
396     }
397
398 done:
399
400   /* Couldn't process all events. Probably out of buffers */
401   if (PREDICT_FALSE (i < n_events))
402     {
403       session_fifo_event_t *partially_read =
404         smm->evts_partially_read[my_thread_index];
405       vec_add (partially_read, &my_fifo_events[i], n_events - i);
406       vec_free (my_fifo_events);
407       smm->fifo_events[my_thread_index] = partially_read;
408       smm->evts_partially_read[my_thread_index] = 0;
409     }
410   else
411     {
412       vec_free (smm->fifo_events[my_thread_index]);
413       smm->fifo_events[my_thread_index] =
414         smm->evts_partially_read[my_thread_index];
415       smm->evts_partially_read[my_thread_index] = 0;
416     }
417
418   vlib_node_increment_counter (vm, session_queue_node.index,
419                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
420
421   return n_tx_packets;
422 }
423
424 /* *INDENT-OFF* */
425 VLIB_REGISTER_NODE (session_queue_node) =
426 {
427   .function = session_queue_node_fn,
428   .name = "session-queue",
429   .format_trace = format_session_queue_trace,
430   .type = VLIB_NODE_TYPE_INPUT,
431   .n_errors = ARRAY_LEN (session_queue_error_strings),
432   .error_strings = session_queue_error_strings,
433   .n_next_nodes = SESSION_QUEUE_N_NEXT,
434   .state = VLIB_NODE_STATE_DISABLED,
435   .next_nodes =
436   {
437       [SESSION_QUEUE_NEXT_DROP] = "error-drop",
438       [SESSION_QUEUE_NEXT_IP4_LOOKUP] = "ip4-lookup",
439       [SESSION_QUEUE_NEXT_IP6_LOOKUP] = "ip6-lookup",
440       [SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT] = "tcp4-output",
441       [SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT] = "tcp6-output",
442   },
443 };
444 /* *INDENT-ON* */
445
446 /*
447  * fd.io coding-style-patch-verification: ON
448  *
449  * Local Variables:
450  * eval: (c-set-style "gnu")
451  * End:
452  */