VPP-598: tcp stack initial commit
[vpp.git] / src / vnet / session / node.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vlib/vlib.h>
17 #include <vnet/vnet.h>
18 #include <vnet/pg/pg.h>
19 #include <vnet/ip/ip.h>
20
21 #include <vnet/tcp/tcp.h>
22
23 #include <vppinfra/hash.h>
24 #include <vppinfra/error.h>
25 #include <vppinfra/elog.h>
26 #include <vlibmemory/unix_shared_memory_queue.h>
27
28 #include <vnet/udp/udp_packet.h>
29 #include <vnet/lisp-cp/packets.h>
30 #include <math.h>
31
32 vlib_node_registration_t session_queue_node;
33
34 typedef struct
35 {
36   u32 session_index;
37   u32 server_thread_index;
38 } session_queue_trace_t;
39
40 /* packet trace format function */
41 static u8 *
42 format_session_queue_trace (u8 * s, va_list * args)
43 {
44   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
45   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
46   session_queue_trace_t *t = va_arg (*args, session_queue_trace_t *);
47
48   s = format (s, "SESSION_QUEUE: session index %d, server thread index %d",
49               t->session_index, t->server_thread_index);
50   return s;
51 }
52
53 vlib_node_registration_t session_queue_node;
54
55 #define foreach_session_queue_error                 \
56 _(TX, "Packets transmitted")                    \
57 _(TIMER, "Timer events")
58
59 typedef enum
60 {
61 #define _(sym,str) SESSION_QUEUE_ERROR_##sym,
62   foreach_session_queue_error
63 #undef _
64     SESSION_QUEUE_N_ERROR,
65 } session_queue_error_t;
66
67 static char *session_queue_error_strings[] = {
68 #define _(sym,string) string,
69   foreach_session_queue_error
70 #undef _
71 };
72
73 static u32 session_type_to_next[] = {
74   SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT,
75   SESSION_QUEUE_NEXT_IP4_LOOKUP,
76   SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT,
77   SESSION_QUEUE_NEXT_IP6_LOOKUP,
78 };
79
80 always_inline int
81 session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node,
82                    session_manager_main_t * smm, session_fifo_event_t * e0,
83                    stream_session_t * s0, u32 thread_index, int *n_tx_packets,
84                    u8 peek_data)
85 {
86   u32 n_trace = vlib_get_trace_count (vm, node);
87   u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
88   u32 n_frame_bytes, n_frames_per_evt;
89   transport_connection_t *tc0;
90   transport_proto_vft_t *transport_vft;
91   u32 next_index, next0, *to_next, n_left_to_next, bi0;
92   vlib_buffer_t *b0;
93   u32 rx_offset;
94   u16 snd_mss0;
95   u8 *data0;
96   int i;
97
98   next_index = next0 = session_type_to_next[s0->session_type];
99
100   transport_vft = session_get_transport_vft (s0->session_type);
101   tc0 = transport_vft->get_connection (s0->connection_index, thread_index);
102
103   /* Make sure we have space to send and there's something to dequeue */
104   snd_space0 = transport_vft->send_space (tc0);
105   snd_mss0 = transport_vft->send_mss (tc0);
106
107   if (snd_space0 == 0 || svm_fifo_max_dequeue (s0->server_tx_fifo) == 0
108       || snd_mss0 == 0)
109     return 0;
110
111   ASSERT (e0->enqueue_length > 0);
112
113   /* Ensure we're not writing more than transport window allows */
114   max_len_to_snd0 = clib_min (e0->enqueue_length, snd_space0);
115
116   if (peek_data)
117     {
118       /* Offset in rx fifo from where to peek data  */
119       rx_offset = transport_vft->rx_fifo_offset (tc0);
120     }
121
122   /* TODO check if transport is willing to send len_to_snd0
123    * bytes (Nagle) */
124
125   n_frame_bytes = snd_mss0 * VLIB_FRAME_SIZE;
126   n_frames_per_evt = ceil ((double) max_len_to_snd0 / n_frame_bytes);
127
128   n_bufs = vec_len (smm->tx_buffers[thread_index]);
129   left_to_snd0 = max_len_to_snd0;
130   for (i = 0; i < n_frames_per_evt; i++)
131     {
132       /* Make sure we have at least one full frame of buffers ready */
133       if (PREDICT_FALSE (n_bufs < VLIB_FRAME_SIZE))
134         {
135           vec_validate (smm->tx_buffers[thread_index],
136                         n_bufs + VLIB_FRAME_SIZE - 1);
137           n_bufs +=
138             vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][n_bufs],
139                                VLIB_FRAME_SIZE);
140
141           /* buffer shortage
142            * XXX 0.9 because when debugging we might not get a full frame */
143           if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE))
144             {
145               /* Keep track of how much we've dequeued and exit */
146               e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
147               return -1;
148             }
149
150           _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
151         }
152
153       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
154       while (left_to_snd0 && n_left_to_next)
155         {
156           /* Get free buffer */
157           n_bufs--;
158           bi0 = smm->tx_buffers[thread_index][n_bufs];
159           _vec_len (smm->tx_buffers[thread_index]) = n_bufs;
160
161           b0 = vlib_get_buffer (vm, bi0);
162           b0->error = 0;
163           b0->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID
164             | VNET_BUFFER_LOCALLY_ORIGINATED;
165           b0->current_data = 0;
166
167           /* RX on the local interface. tx in default fib */
168           vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
169           vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
170
171           /* usual speculation, or the enqueue_x1 macro will barf */
172           to_next[0] = bi0;
173           to_next += 1;
174           n_left_to_next -= 1;
175
176           VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
177           if (PREDICT_FALSE (n_trace > 0))
178             {
179               session_queue_trace_t *t0;
180               vlib_trace_buffer (vm, node, next_index, b0,
181                                  1 /* follow_chain */ );
182               vlib_set_trace_count (vm, node, --n_trace);
183               t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
184               t0->session_index = s0->session_index;
185               t0->server_thread_index = s0->thread_index;
186             }
187
188           if (1)
189             {
190               ELOG_TYPE_DECLARE (e) =
191               {
192               .format = "evt-dequeue: id %d length %d",.format_args =
193                   "i4i4",};
194               struct
195               {
196                 u32 data[2];
197               } *ed;
198               ed = ELOG_DATA (&vm->elog_main, e);
199               ed->data[0] = e0->event_id;
200               ed->data[1] = e0->enqueue_length;
201             }
202
203           len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
204
205           /* Make room for headers */
206           data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
207
208           /* Dequeue the data
209            * TODO 1) peek instead of dequeue
210            *      2) buffer chains */
211           if (peek_data)
212             {
213               int n_bytes_read;
214               n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
215                                             rx_offset, len_to_deq0, data0);
216               if (n_bytes_read < 0)
217                 goto dequeue_fail;
218
219               /* Keep track of progress locally, transport is also supposed to
220                * increment it independently when pushing header */
221               rx_offset += n_bytes_read;
222             }
223           else
224             {
225               if (svm_fifo_dequeue_nowait (s0->server_tx_fifo, s0->pid,
226                                            len_to_deq0, data0) < 0)
227                 goto dequeue_fail;
228             }
229
230           b0->current_length = len_to_deq0;
231
232           /* Ask transport to push header */
233           transport_vft->push_header (tc0, b0);
234
235           left_to_snd0 -= len_to_deq0;
236           *n_tx_packets = *n_tx_packets + 1;
237
238           vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
239                                            to_next, n_left_to_next,
240                                            bi0, next0);
241         }
242       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
243     }
244
245   /* If we couldn't dequeue all bytes store progress */
246   if (max_len_to_snd0 < e0->enqueue_length)
247     {
248       e0->enqueue_length -= max_len_to_snd0;
249       vec_add1 (smm->evts_partially_read[thread_index], *e0);
250     }
251   return 0;
252
253 dequeue_fail:
254   /* Can't read from fifo. Store event rx progress, save as partially read,
255    * return buff to free list and return  */
256   e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
257   vec_add1 (smm->evts_partially_read[thread_index], *e0);
258
259   to_next -= 1;
260   n_left_to_next += 1;
261   _vec_len (smm->tx_buffers[thread_index]) += 1;
262
263   clib_warning ("dequeue fail");
264   return 0;
265 }
266
267 int
268 session_fifo_rx_peek (vlib_main_t * vm, vlib_node_runtime_t * node,
269                       session_manager_main_t * smm, session_fifo_event_t * e0,
270                       stream_session_t * s0, u32 thread_index, int *n_tx_pkts)
271 {
272   return session_fifo_rx_i (vm, node, smm, e0, s0, thread_index, n_tx_pkts,
273                             1);
274 }
275
276 int
277 session_fifo_rx_dequeue (vlib_main_t * vm, vlib_node_runtime_t * node,
278                          session_manager_main_t * smm,
279                          session_fifo_event_t * e0, stream_session_t * s0,
280                          u32 thread_index, int *n_tx_pkts)
281 {
282   return session_fifo_rx_i (vm, node, smm, e0, s0, thread_index, n_tx_pkts,
283                             0);
284 }
285
286 static uword
287 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
288                        vlib_frame_t * frame)
289 {
290   session_manager_main_t *smm = vnet_get_session_manager_main ();
291   session_fifo_event_t *my_fifo_events, *e;
292   u32 n_to_dequeue;
293   unix_shared_memory_queue_t *q;
294   int n_tx_packets = 0;
295   u32 my_thread_index = vm->cpu_index;
296   int i, rv;
297
298   /*
299    *  Update TCP time
300    */
301   tcp_update_time (vlib_time_now (vm), my_thread_index);
302
303   /*
304    * Get vpp queue events
305    */
306   q = smm->vpp_event_queues[my_thread_index];
307   if (PREDICT_FALSE (q == 0))
308     return 0;
309
310   /* min number of events we can dequeue without blocking */
311   n_to_dequeue = q->cursize;
312   if (n_to_dequeue == 0)
313     return 0;
314
315   my_fifo_events = smm->fifo_events[my_thread_index];
316
317   /* If we didn't manage to process previous events try going
318    * over them again without dequeuing new ones.
319    * XXX: Block senders to sessions that can't keep up */
320   if (vec_len (my_fifo_events) >= 100)
321     goto skip_dequeue;
322
323   /* See you in the next life, don't be late */
324   if (pthread_mutex_trylock (&q->mutex))
325     return 0;
326
327   for (i = 0; i < n_to_dequeue; i++)
328     {
329       vec_add2 (my_fifo_events, e, 1);
330       unix_shared_memory_queue_sub_raw (q, (u8 *) e);
331     }
332
333   /* The other side of the connection is not polling */
334   if (q->cursize < (q->maxsize / 8))
335     (void) pthread_cond_broadcast (&q->condvar);
336   pthread_mutex_unlock (&q->mutex);
337
338   smm->fifo_events[my_thread_index] = my_fifo_events;
339
340 skip_dequeue:
341
342   for (i = 0; i < n_to_dequeue; i++)
343     {
344       svm_fifo_t *f0;           /* $$$ prefetch 1 ahead maybe */
345       stream_session_t *s0;
346       u32 server_session_index0, server_thread_index0;
347       session_fifo_event_t *e0;
348
349       e0 = &my_fifo_events[i];
350       f0 = e0->fifo;
351       server_session_index0 = f0->server_session_index;
352       server_thread_index0 = f0->server_thread_index;
353
354       /* $$$ add multiple event queues, per vpp worker thread */
355       ASSERT (server_thread_index0 == my_thread_index);
356
357       s0 = pool_elt_at_index (smm->sessions[my_thread_index],
358                               server_session_index0);
359
360       ASSERT (s0->thread_index == my_thread_index);
361
362       switch (e0->event_type)
363         {
364         case FIFO_EVENT_SERVER_TX:
365           /* Spray packets in per session type frames, since they go to
366            * different nodes */
367           rv = (smm->session_rx_fns[s0->session_type]) (vm, node, smm, e0, s0,
368                                                         my_thread_index,
369                                                         &n_tx_packets);
370           if (rv < 0)
371             goto done;
372
373           break;
374
375         default:
376           clib_warning ("unhandled event type %d", e0->event_type);
377         }
378     }
379
380 done:
381
382   /* Couldn't process all events. Probably out of buffers */
383   if (PREDICT_FALSE (i < n_to_dequeue))
384     {
385       session_fifo_event_t *partially_read =
386         smm->evts_partially_read[my_thread_index];
387       vec_add (partially_read, &my_fifo_events[i], n_to_dequeue - i);
388       vec_free (my_fifo_events);
389       smm->fifo_events[my_thread_index] = partially_read;
390       smm->evts_partially_read[my_thread_index] = 0;
391     }
392   else
393     {
394       vec_free (smm->fifo_events[my_thread_index]);
395       smm->fifo_events[my_thread_index] =
396         smm->evts_partially_read[my_thread_index];
397       smm->evts_partially_read[my_thread_index] = 0;
398     }
399
400   vlib_node_increment_counter (vm, session_queue_node.index,
401                                SESSION_QUEUE_ERROR_TX, n_tx_packets);
402
403   return n_tx_packets;
404 }
405
406 /* *INDENT-OFF* */
407 VLIB_REGISTER_NODE (session_queue_node) =
408 {
409   .function = session_queue_node_fn,
410   .name = "session-queue",
411   .format_trace = format_session_queue_trace,
412   .type = VLIB_NODE_TYPE_INPUT,
413   .n_errors = ARRAY_LEN (session_queue_error_strings),
414   .error_strings = session_queue_error_strings,
415   .n_next_nodes = SESSION_QUEUE_N_NEXT,
416   /* .state = VLIB_NODE_STATE_DISABLED, enable on-demand? */
417   /* edit / add dispositions here */
418   .next_nodes =
419   {
420       [SESSION_QUEUE_NEXT_DROP] = "error-drop",
421       [SESSION_QUEUE_NEXT_IP4_LOOKUP] = "ip4-lookup",
422       [SESSION_QUEUE_NEXT_IP6_LOOKUP] = "ip6-lookup",
423       [SESSION_QUEUE_NEXT_TCP_IP4_OUTPUT] = "tcp4-output",
424       [SESSION_QUEUE_NEXT_TCP_IP6_OUTPUT] = "tcp6-output",
425   },
426 };
427 /* *INDENT-ON* */
428
429 /*
430  * fd.io coding-style-patch-verification: ON
431  *
432  * Local Variables:
433  * eval: (c-set-style "gnu")
434  * End:
435  */