Disable for-us udp/tcp checksum validation by default
[vpp.git] / vnet / vnet / devices / dpdk / node.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <vnet/vnet.h>
16 #include <vppinfra/vec.h>
17 #include <vppinfra/error.h>
18 #include <vppinfra/format.h>
19 #include <vppinfra/xxhash.h>
20
21 #include <vnet/ethernet/ethernet.h>
22 #include <vnet/devices/dpdk/dpdk.h>
23 #include <vnet/classify/vnet_classify.h>
24 #include <vnet/mpls-gre/packet.h>
25
26 #include "dpdk_priv.h"
27
28 #ifndef MAX
29 #define MAX(a,b) ((a) < (b) ? (b) : (a))
30 #endif
31
32 #ifndef MIN
33 #define MIN(a,b) ((a) < (b) ? (a) : (b))
34 #endif
35
36 /*
37  * At least in certain versions of ESXi, vmware e1000's don't honor the
38  * "strip rx CRC" bit. Set this flag to work around that bug FOR UNIT TEST ONLY.
39  *
40  * If wireshark complains like so:
41  *
42  * "Frame check sequence: 0x00000000 [incorrect, should be <hex-num>]"
43  * and you're using ESXi emulated e1000's, set this flag FOR UNIT TEST ONLY.
44  *
45  * Note: do NOT check in this file with this workaround enabled! You'll lose
46  * actual data from e.g. 10xGE interfaces. The extra 4 bytes annoy
47  * wireshark, but they're harmless...
48  */
49 #define VMWARE_LENGTH_BUG_WORKAROUND 0
50
51 typedef struct {
52   u32 cached_next_index;
53
54   /* convenience variables */
55   vlib_main_t * vlib_main;
56   vnet_main_t * vnet_main;
57 } handoff_dispatch_main_t;
58
59 typedef struct {
60   u32 buffer_index;
61   u32 next_index;
62   u32 sw_if_index;
63 } handoff_dispatch_trace_t;
64
65 /* packet trace format function */
66 static u8 * format_handoff_dispatch_trace (u8 * s, va_list * args)
67 {
68   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
69   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
70   handoff_dispatch_trace_t * t = va_arg (*args, handoff_dispatch_trace_t *);
71   
72   s = format (s, "HANDOFF_DISPATCH: sw_if_index %d next_index %d buffer 0x%x",
73       t->sw_if_index,
74       t->next_index,
75       t->buffer_index);
76   return s;
77 }
78
79 handoff_dispatch_main_t handoff_dispatch_main;
80
81 vlib_node_registration_t handoff_dispatch_node;
82
83 #define foreach_handoff_dispatch_error \
84 _(EXAMPLE, "example packets")
85
86 typedef enum {
87 #define _(sym,str) HANDOFF_DISPATCH_ERROR_##sym,
88   foreach_handoff_dispatch_error
89 #undef _
90   HANDOFF_DISPATCH_N_ERROR,
91 } handoff_dispatch_error_t;
92
93 static char * handoff_dispatch_error_strings[] = {
94 #define _(sym,string) string,
95   foreach_handoff_dispatch_error
96 #undef _
97 };
98
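/*
 * Producer side of the handoff frame queue: the element is fully written
 * first, then the memory barrier ensures those writes are visible before
 * the consuming worker thread sees valid == 1.
 */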
99 static inline
100 void vlib_put_handoff_queue_elt (vlib_frame_queue_elt_t * hf)
101 {
102   CLIB_MEMORY_BARRIER();
103   hf->valid = 1;
104 }
105
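/*
 * Standard vlib dual/single-loop dispatch: prefetch two buffers ahead,
 * speculatively enqueue pairs into the cached next frame, then validate.
 * The per-buffer next index was stashed in vnet_buffer(b)->io_handoff
 * by the IO thread that handed the packet off.
 */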
106 static uword
107 handoff_dispatch_node_fn (vlib_main_t * vm,
108                   vlib_node_runtime_t * node,
109                   vlib_frame_t * frame)
110 {
111   u32 n_left_from, * from, * to_next;
112   dpdk_rx_next_t next_index;
113
114   from = vlib_frame_vector_args (frame);
115   n_left_from = frame->n_vectors;
116   next_index = node->cached_next_index;
117
118   while (n_left_from > 0)
119     {
120       u32 n_left_to_next;
121
122       vlib_get_next_frame (vm, node, next_index,
123                            to_next, n_left_to_next);
124
125       while (n_left_from >= 4 && n_left_to_next >= 2)
126         {
127           u32 bi0, bi1;
128           vlib_buffer_t * b0, * b1;
129           u32 next0, next1;
130           u32 sw_if_index0, sw_if_index1;
131           
132           /* Prefetch next iteration. */
133           {
134             vlib_buffer_t * p2, * p3;
135             
136             p2 = vlib_get_buffer (vm, from[2]);
137             p3 = vlib_get_buffer (vm, from[3]);
138             
139             vlib_prefetch_buffer_header (p2, LOAD);
140             vlib_prefetch_buffer_header (p3, LOAD);
141           }
142
143           /* speculatively enqueue b0 and b1 to the current next frame */
144           to_next[0] = bi0 = from[0];
145           to_next[1] = bi1 = from[1];
146           from += 2;
147           to_next += 2;
148           n_left_from -= 2;
149           n_left_to_next -= 2;
150
151           b0 = vlib_get_buffer (vm, bi0);
152           b1 = vlib_get_buffer (vm, bi1);
153
154           next0 = vnet_buffer(b0)->io_handoff.next_index;
155           next1 = vnet_buffer(b1)->io_handoff.next_index;
156
157           if (PREDICT_FALSE(b0->flags & VLIB_BUFFER_IS_TRACED))
158             {
159               vlib_trace_buffer (vm, node, next0, b0, /* follow_chain */ 0);
160               handoff_dispatch_trace_t *t =
161                 vlib_add_trace (vm, node, b0, sizeof (*t));
162               sw_if_index0 = vnet_buffer(b0)->sw_if_index[VLIB_RX];
163               t->sw_if_index = sw_if_index0;
164               t->next_index = next0;
165               t->buffer_index = bi0;
166             }
167           if (PREDICT_FALSE(b1->flags & VLIB_BUFFER_IS_TRACED))
168             {
169               vlib_trace_buffer (vm, node, next1, b1, /* follow_chain */ 0);
170               handoff_dispatch_trace_t *t =
171                 vlib_add_trace (vm, node, b1, sizeof (*t));
172               sw_if_index1 = vnet_buffer(b1)->sw_if_index[VLIB_RX];
173               t->sw_if_index = sw_if_index1;
174               t->next_index = next1;
175               t->buffer_index = bi1;
176             }
177             
178           /* verify speculative enqueues, maybe switch current next frame */
179           vlib_validate_buffer_enqueue_x2 (vm, node, next_index,
180                                            to_next, n_left_to_next,
181                                            bi0, bi1, next0, next1);
182         }
183       
184       while (n_left_from > 0 && n_left_to_next > 0)
185         {
186           u32 bi0;
187           vlib_buffer_t * b0;
188           u32 next0;
189           u32 sw_if_index0;
190
191           /* speculatively enqueue b0 to the current next frame */
192           bi0 = from[0];
193           to_next[0] = bi0;
194           from += 1;
195           to_next += 1;
196           n_left_from -= 1;
197           n_left_to_next -= 1;
198
199           b0 = vlib_get_buffer (vm, bi0);
200
201           next0 = vnet_buffer(b0)->io_handoff.next_index;
202
203           if (PREDICT_FALSE(b0->flags & VLIB_BUFFER_IS_TRACED))
204             {
205               vlib_trace_buffer (vm, node, next0, b0, /* follow_chain */ 0);
206               handoff_dispatch_trace_t *t =
207                 vlib_add_trace (vm, node, b0, sizeof (*t));
208               sw_if_index0 = vnet_buffer(b0)->sw_if_index[VLIB_RX];
209               t->sw_if_index = sw_if_index0;
210               t->next_index = next0;
211               t->buffer_index = bi0;
212            }
213
214           /* verify speculative enqueue, maybe switch current next frame */
215           vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
216                                            to_next, n_left_to_next,
217                                            bi0, next0);
218         }
219
220       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
221     }
222
223   return frame->n_vectors;
224 }
225
226 VLIB_REGISTER_NODE (handoff_dispatch_node) = {
227   .function = handoff_dispatch_node_fn,
228   .name = "handoff-dispatch",
229   .vector_size = sizeof (u32),
230   .format_trace = format_handoff_dispatch_trace,
231   .type = VLIB_NODE_TYPE_INTERNAL,
232   .flags = VLIB_NODE_FLAG_IS_HANDOFF,
233   
234   .n_errors = ARRAY_LEN(handoff_dispatch_error_strings),
235   .error_strings = handoff_dispatch_error_strings,
236
237   .n_next_nodes = DPDK_RX_N_NEXT,
238
239   .next_nodes = {
240         [DPDK_RX_NEXT_DROP] = "error-drop",
241         [DPDK_RX_NEXT_ETHERNET_INPUT] = "ethernet-input",
242         [DPDK_RX_NEXT_IP4_INPUT] = "ip4-input",
243         [DPDK_RX_NEXT_IP6_INPUT] = "ip6-input",
244         [DPDK_RX_NEXT_MPLS_INPUT] = "mpls-gre-input",
245   },
246 };
247
248 clib_error_t *handoff_dispatch_init (vlib_main_t *vm)
249 {
250   handoff_dispatch_main_t * mp = &handoff_dispatch_main;
251     
252   mp->vlib_main = vm;
253   mp->vnet_main = &vnet_main;
254
255   return 0;
256 }
257
258 VLIB_INIT_FUNCTION (handoff_dispatch_init);
259
260 u32 dpdk_get_handoff_node_index (void)
261 {
262   return handoff_dispatch_node.index;
263 }
264
265 static char * dpdk_error_strings[] = {
266 #define _(n,s) s,
267     foreach_dpdk_error
268 #undef _
269 };
270
271 typedef struct {
272   u32 buffer_index;
273   u16 device_index;
274   u16 queue_index;
275   struct rte_mbuf mb;
276   vlib_buffer_t buffer; /* Copy of VLIB buffer; pkt data stored in pre_data. */
277 } dpdk_rx_dma_trace_t;
278
279 static u8 * format_dpdk_rx_dma_trace (u8 * s, va_list * va)
280 {
281   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*va, vlib_main_t *);
282   CLIB_UNUSED (vlib_node_t * node) = va_arg (*va, vlib_node_t *);
283   CLIB_UNUSED (vnet_main_t * vnm) = vnet_get_main();
284   dpdk_rx_dma_trace_t * t = va_arg (*va, dpdk_rx_dma_trace_t *);
285   dpdk_main_t * dm = &dpdk_main;
286   dpdk_device_t * xd = vec_elt_at_index (dm->devices, t->device_index);
287   format_function_t * f;
288   uword indent = format_get_indent (s);
289   vnet_sw_interface_t * sw = vnet_get_sw_interface (vnm, xd->vlib_sw_if_index);
290
291   s = format (s, "%U rx queue %d",
292               format_vnet_sw_interface_name, vnm, sw,
293               t->queue_index);
294
295   s = format (s, "\n%Ubuffer 0x%x: %U",
296               format_white_space, indent,
297               t->buffer_index,
298               format_vlib_buffer, &t->buffer);
299
300 #ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
301   s = format (s, "\n%U%U",
302               format_white_space, indent,
303               format_dpdk_rx_rte_mbuf, &t->mb);
304 #else
305   s = format (s, "\n%U%U",
306               format_white_space, indent,
307               format_dpdk_rte_mbuf, &t->mb);
308 #endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
309   f = node->format_buffer;
310   if (!f)
311     f = format_hex_bytes;
312   s = format (s, "\n%U%U", format_white_space, indent,
313               f, t->buffer.pre_data, sizeof (t->buffer.pre_data));
314
315   return s;
316 }
317
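/*
 * Map rte_mbuf offload flags (and, on DPDK >= 2.1, mb->packet_type) to a
 * next-node index and a DPDK error code: checksum/FCS errors go to
 * error-drop, IPv4/IPv6/MPLS go straight to their input nodes, and
 * everything else falls back to ethernet-input.
 */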
318 always_inline void
319 dpdk_rx_next_and_error_from_mb_flags_x1 (dpdk_device_t *xd, struct rte_mbuf *mb,
320                                          vlib_buffer_t *b0,
321                                          u8 * next0, u8 * error0)
322 {
323   u8 is0_ip4, is0_ip6, is0_mpls, n0;
324   uint16_t mb_flags = mb->ol_flags;
325
326   if (PREDICT_FALSE(mb_flags & (
327 #ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
328        PKT_EXT_RX_PKT_ERROR | PKT_EXT_RX_BAD_FCS   |
329 #endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
330         PKT_RX_IP_CKSUM_BAD  | PKT_RX_L4_CKSUM_BAD
331     ))) 
332     {
333       /* some error was flagged. determine the drop reason */ 
334       n0 = DPDK_RX_NEXT_DROP;
335       *error0 = 
336 #ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
337         (mb_flags & PKT_EXT_RX_PKT_ERROR) ? DPDK_ERROR_RX_PACKET_ERROR : 
338         (mb_flags & PKT_EXT_RX_BAD_FCS) ? DPDK_ERROR_RX_BAD_FCS : 
339 #endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
340         (mb_flags & PKT_RX_IP_CKSUM_BAD) ? DPDK_ERROR_IP_CHECKSUM_ERROR : 
341         (mb_flags & PKT_RX_L4_CKSUM_BAD) ? DPDK_ERROR_L4_CHECKSUM_ERROR : 
342         DPDK_ERROR_NONE;
343     }
344   else
345     {
346       *error0 = DPDK_ERROR_NONE;
347       if (xd->per_interface_next_index != ~0)
348         n0 = xd->per_interface_next_index;
349       else if (mb_flags & PKT_RX_VLAN_PKT)
350         n0 = DPDK_RX_NEXT_ETHERNET_INPUT;
351       else
352         {
353           n0 = DPDK_RX_NEXT_ETHERNET_INPUT;
354 #if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
355           is0_ip4 = (mb->packet_type & (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV4_EXT)) != 0;
356 #else
357           is0_ip4 = (mb_flags & (PKT_RX_IPV4_HDR | PKT_RX_IPV4_HDR_EXT)) != 0;
358 #endif
359
360           if (PREDICT_TRUE(is0_ip4))
361             n0 = DPDK_RX_NEXT_IP4_INPUT;
362           else
363             {
364 #if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
365               is0_ip6 =
366               (mb->packet_type & (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L3_IPV6_EXT)) != 0;
367 #else
368               is0_ip6 = 
369                       (mb_flags & (PKT_RX_IPV6_HDR | PKT_RX_IPV6_HDR_EXT)) != 0;
370 #endif
371               if (PREDICT_TRUE(is0_ip6))
372                 n0 = DPDK_RX_NEXT_IP6_INPUT;
373               else
374                 {
375                   ethernet_header_t *h0 = (ethernet_header_t *) b0->data;
376                   is0_mpls = (h0->type == clib_host_to_net_u16(ETHERNET_TYPE_MPLS_UNICAST));
377                   n0 = is0_mpls ? DPDK_RX_NEXT_MPLS_INPUT : n0;
378                 }
379             }
380         }
381     }
382   *next0 = n0;
383 }
384
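/*
 * Note: the vlib_buffer_t lives immediately after the rte_mbuf header in
 * the same buffer, so the two are interconvertible with pointer arithmetic:
 * mb = ((struct rte_mbuf *)b0) - 1 and b0 = (vlib_buffer_t *)(mb + 1).
 */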
385 void dpdk_rx_trace (dpdk_main_t * dm,
386                     vlib_node_runtime_t * node,
387                     dpdk_device_t * xd,
388                     u16 queue_id,
389                     u32 * buffers,
390                     uword n_buffers)
391 {
392   vlib_main_t * vm = vlib_get_main();
393   u32 * b, n_left;
394   u8 next0;
395
396   n_left = n_buffers;
397   b = buffers;
398
399   while (n_left >= 1)
400     {
401       u32 bi0;
402       vlib_buffer_t * b0;
403       dpdk_rx_dma_trace_t * t0;
404       struct rte_mbuf *mb;
405       u8 error0;
406
407       bi0 = b[0];
408       n_left -= 1;
409
410       b0 = vlib_get_buffer (vm, bi0);
411       mb = ((struct rte_mbuf *)b0) - 1;
412       dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
413                                                &next0, &error0);
414       vlib_trace_buffer (vm, node, next0, b0, /* follow_chain */ 0);
415       t0 = vlib_add_trace (vm, node, b0, sizeof (t0[0]));
416       t0->queue_index = queue_id;
417       t0->device_index = xd->device_index;
418       t0->buffer_index = bi0;
419
420       memcpy (&t0->mb, mb, sizeof (t0->mb));
421       memcpy (&t0->buffer, b0, sizeof (b0[0]) - sizeof (b0->pre_data));
422       memcpy (t0->buffer.pre_data, b0->data, sizeof (t0->buffer.pre_data));
423
424 #ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
425       /*
426        * Clear overloaded TX offload flags when a DPDK driver
427        * is using them for RX flags (e.g. Cisco VIC Ethernet driver)
428        */
429       mb->ol_flags &= PKT_EXT_RX_CLR_TX_FLAGS_MASK;
430 #endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
431
432       b += 1;
433     }
434 }
435
436 /*
437  * dpdk_efd_update_counters()
438  * Update EFD (early-fast-discard) counters
439  */
440 void dpdk_efd_update_counters (dpdk_device_t *xd,
441                                u32 n_buffers,
442                                u16 enabled)
443 {
444   if (enabled & DPDK_EFD_MONITOR_ENABLED)
445     {
446       u64 now = clib_cpu_time_now();
447       if (xd->efd_agent.last_poll_time > 0)
448         {
449           u64 elapsed_time = (now - xd->efd_agent.last_poll_time);
450           if (elapsed_time > xd->efd_agent.max_poll_delay)
451             xd->efd_agent.max_poll_delay = elapsed_time;
452         }
453       xd->efd_agent.last_poll_time = now;
454     }
455   
456   xd->efd_agent.total_packet_cnt += n_buffers;
457   xd->efd_agent.last_burst_sz = n_buffers;
458
459   if (n_buffers > xd->efd_agent.max_burst_sz)
460     xd->efd_agent.max_burst_sz = n_buffers;
461
462   if (PREDICT_FALSE(n_buffers == VLIB_FRAME_SIZE))
463     {
464       xd->efd_agent.full_frames_cnt++;
465       xd->efd_agent.consec_full_frames_cnt++;
466     }
467   else
468     {
469       xd->efd_agent.consec_full_frames_cnt = 0;
470     }
471 }
472
473 /* is_efd_discardable()
474  *   returns a non-zero DPDK error code if the packet meets the
475  *           early-fast-discard criteria, zero otherwise
476  */
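/*
 * A set bit in the corresponding bitmap (tm->efd.ip_prec_bitmap for IP
 * precedence / traffic class, mpls_exp_bitmap for MPLS EXP, vlan_cos_bitmap
 * for VLAN CoS) marks that traffic class as eligible for discard.
 */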
477 u32 is_efd_discardable (vlib_thread_main_t *tm,
478                         vlib_buffer_t * b0,
479                         struct rte_mbuf *mb)
480 {
481   ethernet_header_t *eh = (ethernet_header_t *) b0->data;
482
483   if (eh->type == clib_host_to_net_u16(ETHERNET_TYPE_IP4))
484     {
485       ip4_header_t *ipv4 =
486           (ip4_header_t *)&(b0->data[sizeof(ethernet_header_t)]);
487       u8 pkt_prec = (ipv4->tos >> 5);
488           
489       return (tm->efd.ip_prec_bitmap & (1 << pkt_prec) ?
490                   DPDK_ERROR_IPV4_EFD_DROP_PKTS : DPDK_ERROR_NONE);
491     }
492   else if (eh->type == clib_net_to_host_u16(ETHERNET_TYPE_IP6))
493     {
494       ip6_header_t *ipv6 =
495           (ip6_header_t *)&(b0->data[sizeof(ethernet_header_t)]);
496       u8 pkt_tclass =
497           ((ipv6->ip_version_traffic_class_and_flow_label >> 20) & 0xff);
498           
499       return (tm->efd.ip_prec_bitmap & (1 << pkt_tclass) ?
500                   DPDK_ERROR_IPV6_EFD_DROP_PKTS : DPDK_ERROR_NONE);
501     }
502   else if (eh->type == clib_net_to_host_u16(ETHERNET_TYPE_MPLS_UNICAST))
503     {
504       mpls_unicast_header_t *mpls =
505           (mpls_unicast_header_t *)&(b0->data[sizeof(ethernet_header_t)]);
506       u8 pkt_exp = ((mpls->label_exp_s_ttl >> 9) & 0x07);
507
508       return (tm->efd.mpls_exp_bitmap & (1 << pkt_exp) ?
509                   DPDK_ERROR_MPLS_EFD_DROP_PKTS : DPDK_ERROR_NONE);
510     }
511   else if ((eh->type == clib_net_to_host_u16(ETHERNET_TYPE_VLAN)) ||
512            (eh->type == clib_net_to_host_u16(ETHERNET_TYPE_DOT1AD)))
513     {
514       ethernet_vlan_header_t *vlan =
515           (ethernet_vlan_header_t *)&(b0->data[sizeof(ethernet_header_t)]);
516       u8 pkt_cos = ((vlan->priority_cfi_and_id >> 13) & 0x07);
517
518       return (tm->efd.vlan_cos_bitmap & (1 << pkt_cos) ?
519                   DPDK_ERROR_VLAN_EFD_DROP_PKTS : DPDK_ERROR_NONE);
520     }
521
522   return DPDK_ERROR_NONE;
523 }
524
525 /*
526  * This function is used when there are no worker threads.
527  * The main thread performs IO and forwards the packets. 
528  */
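/*
 * Returns the number of mbufs taken from the rx burst; the same count is
 * used for the per-interface rx counters at the bottom of the function.
 */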
529 static inline u32 dpdk_device_input ( dpdk_main_t * dm, 
530                                       dpdk_device_t * xd,
531                                       vlib_node_runtime_t * node,
532                                       u32 cpu_index,
533                                       u16 queue_id)
534 {
535   u32 n_buffers;
536   u32 next_index = DPDK_RX_NEXT_ETHERNET_INPUT;
537   u32 n_left_to_next, * to_next;
538   u32 mb_index;
539   vlib_main_t * vm = vlib_get_main();
540   uword n_rx_bytes = 0;
541   u32 n_trace, trace_cnt __attribute__((unused));
542   vlib_buffer_free_list_t * fl;
543   u8 efd_discard_burst = 0;
544   u16 ip_align_offset = 0;
545   u32 buffer_flags_template;
546   
547   if (xd->admin_up == 0)
548     return 0;
549
550   n_buffers = dpdk_rx_burst(dm, xd, queue_id);
551
552   if (n_buffers == 0)
553     {
554       /* check if EFD (dpdk) is enabled */
555       if (PREDICT_FALSE(dm->efd.enabled))
556         {
557           /* reset a few stats */
558           xd->efd_agent.last_poll_time = 0;
559           xd->efd_agent.last_burst_sz = 0;
560         }
561       return 0;
562     }
563
564   if (xd->pmd == VNET_DPDK_PMD_THUNDERX)
565       ip_align_offset = 6;
566
567   buffer_flags_template = dm->buffer_flags_template;
568
569   vec_reset_length (xd->d_trace_buffers);
570   trace_cnt = n_trace = vlib_get_trace_count (vm, node);
571
572   fl = vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
573
574   /*
575    * DAW-FIXME: VMXNET3 device stop/start doesn't work, 
576    * therefore fake the stop in the dpdk driver by
577    * silently dropping all of the incoming pkts instead of 
578    * stopping the driver / hardware.
579    */
580   if (PREDICT_FALSE(xd->admin_up != 1))
581     {
582       for (mb_index = 0; mb_index < n_buffers; mb_index++)
583         rte_pktmbuf_free (xd->rx_vectors[queue_id][mb_index]);
584       
585       return 0;
586     }
587
588   /* Check for congestion if EFD (Early-Fast-Discard) is enabled
589    * in any mode (e.g. dpdk, monitor, or drop_all)
590    */
591   if (PREDICT_FALSE(dm->efd.enabled))
592     {
593       /* update EFD counters */
594       dpdk_efd_update_counters(xd, n_buffers, dm->efd.enabled);
595
596       if (PREDICT_FALSE(dm->efd.enabled & DPDK_EFD_DROPALL_ENABLED))
597         {
598           /* discard all received packets */
599           for (mb_index = 0; mb_index < n_buffers; mb_index++)
600             rte_pktmbuf_free(xd->rx_vectors[queue_id][mb_index]);
601
602           xd->efd_agent.discard_cnt += n_buffers;
603           increment_efd_drop_counter(vm, 
604                                      DPDK_ERROR_VLAN_EFD_DROP_PKTS,
605                                      n_buffers);
606
607           return 0;
608         }
609       
610       if (PREDICT_FALSE(xd->efd_agent.consec_full_frames_cnt >=
611                         dm->efd.consec_full_frames_hi_thresh))
612         {
613           u32 device_queue_sz = rte_eth_rx_queue_count(xd->device_index,
614                                                        queue_id);
615           if (device_queue_sz >= dm->efd.queue_hi_thresh)
616             {
617               /* dpdk device queue has reached the critical threshold */
618               xd->efd_agent.congestion_cnt++;
619
620               /* apply EFD to packets from the burst */
621               efd_discard_burst = 1;
622             }
623         }
624     }
625   
626   mb_index = 0;
627
628   while (n_buffers > 0)
629     {
630       u32 bi0;
631       u8 next0, error0;
632       u32 l3_offset0;
633       vlib_buffer_t * b0, * b_seg, * b_chain = 0;
634       u32 cntr_type;
635
636       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
637
638       while (n_buffers > 0 && n_left_to_next > 0)
639         {
640           u8 nb_seg = 1;
641           struct rte_mbuf *mb = xd->rx_vectors[queue_id][mb_index];
642           struct rte_mbuf *mb_seg = mb->next;
643
644           if (PREDICT_TRUE(n_buffers > 2))
645           {
646               struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index+2];
647               vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
648               CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, STORE);
649               CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
650           }
651
652           ASSERT(mb);
653
654           b0 = (vlib_buffer_t *)(mb+1);
655
656           /* check whether EFD is looking for packets to discard */
657           if (PREDICT_FALSE(efd_discard_burst))
658             {
659               vlib_thread_main_t * tm = vlib_get_thread_main();
660               
661               if (PREDICT_TRUE(cntr_type = is_efd_discardable(tm, b0, mb)))
662                 {
663                   rte_pktmbuf_free(mb);
664                   xd->efd_agent.discard_cnt++;
665                   increment_efd_drop_counter(vm, 
666                                              cntr_type,
667                                              1);
668                   n_buffers--;
669                   mb_index++;
670                   continue;
671                 }
672             }
673
674           /* Prefetch one next segment if it exists. */
675           if (PREDICT_FALSE(mb->nb_segs > 1))
676             {
677               struct rte_mbuf *pfmb = mb->next;
678               vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
679               CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
680               CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
681               b_chain = b0;
682             }
683
684           vlib_buffer_init_for_free_list (b0, fl);
685           b0->clone_count = 0;
686           
687           bi0 = vlib_get_buffer_index (vm, b0);
688
689           to_next[0] = bi0;
690           to_next++;
691           n_left_to_next--;
692           
693           dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
694                                                    &next0, &error0);
695 #ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
696           /*
697            * Clear overloaded TX offload flags when a DPDK driver
698            * is using them for RX flags (e.g. Cisco VIC Ethernet driver)
699            */
700
701           if (PREDICT_TRUE(trace_cnt == 0))
702             mb->ol_flags &= PKT_EXT_RX_CLR_TX_FLAGS_MASK;
703           else
704             trace_cnt--;
705 #endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
706
707           b0->error = node->errors[error0];
708
709           l3_offset0 = ((next0 == DPDK_RX_NEXT_IP4_INPUT ||
710                          next0 == DPDK_RX_NEXT_IP6_INPUT ||
711                          next0 == DPDK_RX_NEXT_MPLS_INPUT) ? 
712                         sizeof (ethernet_header_t) : 0);
713
714           b0->current_data = l3_offset0;
715           b0->current_length = mb->data_len - l3_offset0;
716
717           if (PREDICT_FALSE (ip_align_offset != 0))
718             {
719               if (next0 == DPDK_RX_NEXT_IP4_INPUT ||
720                   next0 == DPDK_RX_NEXT_IP6_INPUT)
721                 b0->current_data += ip_align_offset;
722             }
723              
724           b0->flags = buffer_flags_template;
725
726           if (VMWARE_LENGTH_BUG_WORKAROUND)
727               b0->current_length -= 4;
728
729           vnet_buffer(b0)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
730           vnet_buffer(b0)->sw_if_index[VLIB_TX] = (u32)~0;
731           n_rx_bytes += mb->pkt_len;
732
733           /* Process subsequent segments of multi-segment packets */
734           while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
735             {
736               ASSERT(mb_seg != 0);
737
738               b_seg = (vlib_buffer_t *)(mb_seg+1);
739               vlib_buffer_init_for_free_list (b_seg, fl);
740               b_seg->clone_count = 0;
741
742               ASSERT((b_seg->flags & VLIB_BUFFER_NEXT_PRESENT) == 0);
743               ASSERT(b_seg->current_data == 0);
744
745               /*
746                * The driver (e.g. virtio) may not put the packet data at the start
747                * of the segment, so don't assume b_seg->current_data == 0 is correct.
748                */
749               b_seg->current_data = (mb_seg->buf_addr + mb_seg->data_off) - (void *)b_seg->data;
750
751               b_seg->current_length = mb_seg->data_len;
752               b0->total_length_not_including_first_buffer +=
753                 mb_seg->data_len;
754
755               b_chain->flags |= VLIB_BUFFER_NEXT_PRESENT;
756               b_chain->next_buffer = vlib_get_buffer_index (vm, b_seg);
757
758               b_chain = b_seg;
759               mb_seg = mb_seg->next;
760               nb_seg++;
761             } 
762
763           /*
764            * Turn this on if you run into
765            * "bad monkey" contexts, and you want to know exactly
766            * which nodes they've visited... See main.c...
767            */
768           VLIB_BUFFER_TRACE_TRAJECTORY_INIT(b0);
769
770           vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
771                                            to_next, n_left_to_next,
772                                            bi0, next0);
773           if (PREDICT_FALSE (n_trace > mb_index))
774             vec_add1 (xd->d_trace_buffers, bi0);
775           n_buffers--;
776           mb_index++;
777         }
778       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
779     }
780
781   if (PREDICT_FALSE (vec_len (xd->d_trace_buffers) > 0))
782     {
783       dpdk_rx_trace (dm, node, xd, queue_id, xd->d_trace_buffers,
784                      vec_len (xd->d_trace_buffers));
785       vlib_set_trace_count (vm, node, n_trace - vec_len (xd->d_trace_buffers));
786     }
787   
788   vlib_increment_combined_counter 
789     (vnet_get_main()->interface_main.combined_sw_if_counters
790      + VNET_INTERFACE_COUNTER_RX,
791      cpu_index, 
792      xd->vlib_sw_if_index,
793      mb_index, n_rx_bytes);
794
795   dpdk_worker_t * dw = vec_elt_at_index(dm->workers, cpu_index);
796   dw->aggregate_rx_packets += mb_index;
797
798   return mb_index;
799 }
800
801 #if VIRL > 0
802 #define VIRL_SPEED_LIMIT()                         \
803   /* Limit the input rate to 1000 vectors / sec */ \
804   {                                                \
805     struct timespec ts, tsrem;                     \
806                                                    \
807     ts.tv_sec = 0;                                 \
808     ts.tv_nsec = 1000*1000; /* 1ms */              \
809                                                    \
810     while (nanosleep(&ts, &tsrem) < 0)             \
811       {                                            \
812         ts = tsrem;                                \
813       }                                            \
814   }
815 #else
816 #define VIRL_SPEED_LIMIT()
817 #endif
818
819
820 static uword
821 dpdk_input (vlib_main_t * vm,
822             vlib_node_runtime_t * node,
823             vlib_frame_t * f)
824 {
825   dpdk_main_t * dm = &dpdk_main;
826   dpdk_device_t * xd;
827   uword n_rx_packets = 0;
828   dpdk_device_and_queue_t * dq;
829   u32 cpu_index = os_get_cpu_number();
830
831   /*
832    * Poll all devices on this cpu for input/interrupts.
833    */
834   vec_foreach (dq, dm->devices_by_cpu[cpu_index])
835     {
836       xd = vec_elt_at_index(dm->devices, dq->device);
837       ASSERT(dq->queue_id == 0);
838       n_rx_packets += dpdk_device_input (dm, xd, node, cpu_index, 0);
839     }
840
841   VIRL_SPEED_LIMIT()
842
843   return n_rx_packets;
844 }
845
846 uword
847 dpdk_input_rss (vlib_main_t * vm,
848       vlib_node_runtime_t * node,
849       vlib_frame_t * f)
850 {
851   dpdk_main_t * dm = &dpdk_main;
852   dpdk_device_t * xd;
853   uword n_rx_packets = 0;
854   dpdk_device_and_queue_t * dq;
855   u32 cpu_index = os_get_cpu_number();
856
857   /*
858    * Poll all devices on this cpu for input/interrupts.
859    */
860   vec_foreach (dq, dm->devices_by_cpu[cpu_index])
861     {
862       xd = vec_elt_at_index(dm->devices, dq->device);
863       n_rx_packets += dpdk_device_input (dm, xd, node, cpu_index, dq->queue_id);
864     }
865
866   VIRL_SPEED_LIMIT()
867
868   return n_rx_packets;
869 }
870
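/*
 * Note: DPDK_RX_NEXT_IP4_INPUT points at "ip4-input-no-checksum" here, so
 * software ip4 header checksum verification is presumably skipped by
 * default; hardware-reported checksum errors were already dropped in
 * dpdk_rx_next_and_error_from_mb_flags_x1.
 */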
871 VLIB_REGISTER_NODE (dpdk_input_node) = {
872   .function = dpdk_input,
873   .type = VLIB_NODE_TYPE_INPUT,
874   .name = "dpdk-input",
875
876   /* Will be enabled if/when hardware is detected. */
877   .state = VLIB_NODE_STATE_DISABLED,
878
879   .format_buffer = format_ethernet_header_with_length,
880   .format_trace = format_dpdk_rx_dma_trace,
881
882   .n_errors = DPDK_N_ERROR,
883   .error_strings = dpdk_error_strings,
884
885   .n_next_nodes = DPDK_RX_N_NEXT,
886   .next_nodes = {
887     [DPDK_RX_NEXT_DROP] = "error-drop",
888     [DPDK_RX_NEXT_ETHERNET_INPUT] = "ethernet-input",
889     [DPDK_RX_NEXT_IP4_INPUT] = "ip4-input-no-checksum",
890     [DPDK_RX_NEXT_IP6_INPUT] = "ip6-input",
891     [DPDK_RX_NEXT_MPLS_INPUT] = "mpls-gre-input",
892   },
893 };
894
895 /*
896  * Override the next nodes for the dpdk input nodes.
897  * Must be invoked prior to VLIB_INIT_FUNCTION calls.
898  */
899 void dpdk_set_next_node (dpdk_rx_next_t next, char *name)
900 {
901   vlib_node_registration_t *r = &dpdk_input_node;
902   vlib_node_registration_t *r_io = &dpdk_io_input_node;
903   vlib_node_registration_t *r_handoff = &handoff_dispatch_node;
904
905   switch (next)
906     {
907     case DPDK_RX_NEXT_IP4_INPUT:
908     case DPDK_RX_NEXT_IP6_INPUT:
909     case DPDK_RX_NEXT_MPLS_INPUT:
910     case DPDK_RX_NEXT_ETHERNET_INPUT:
911       r->next_nodes[next] = name;
912       r_io->next_nodes[next] = name;
913       r_handoff->next_nodes[next] = name;
914       break;
915
916     default:
917       clib_warning ("%s: illegal next %d\n", __FUNCTION__, next);
918       break;
919     }
920 }
921
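/*
 * Reserve the next slot in the worker's frame queue: atomically bump the
 * tail, spin (checking the thread barrier) until the ring has room, then
 * wait for any previous occupant of the slot to be consumed (valid == 0)
 * before handing the element out.
 */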
922 inline vlib_frame_queue_elt_t * 
923 vlib_get_handoff_queue_elt (u32 vlib_worker_index) 
924 {
925   vlib_frame_queue_t *fq;
926   vlib_frame_queue_elt_t *elt;
927   u64 new_tail;
928   
929   fq = vlib_frame_queues[vlib_worker_index];
930   ASSERT (fq);
931
932   new_tail = __sync_add_and_fetch (&fq->tail, 1);
933
934   /* Wait until a ring slot is available */
935   while (new_tail >= fq->head_hint + fq->nelts)
936       vlib_worker_thread_barrier_check ();
937
938   elt = fq->elts + (new_tail & (fq->nelts-1));
939
940   /* this would be very bad... */
941   while (elt->valid) 
942     ;
943
944   elt->msg_type = VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME;
945   elt->last_n_vectors = elt->n_vectors = 0;
946
947   return elt;
948 }
949
950 static inline vlib_frame_queue_elt_t *
951 dpdk_get_handoff_queue_elt ( 
952     u32 vlib_worker_index, 
953     vlib_frame_queue_elt_t ** handoff_queue_elt_by_worker_index)
954 {
955   vlib_frame_queue_elt_t *elt;
956
957   if (handoff_queue_elt_by_worker_index [vlib_worker_index])
958       return handoff_queue_elt_by_worker_index [vlib_worker_index];
959
960   elt = vlib_get_handoff_queue_elt (vlib_worker_index);
961
962   handoff_queue_elt_by_worker_index [vlib_worker_index] = elt;
963
964   return elt;
965 }
966
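/*
 * Returns the worker's frame queue if its depth has reached queue_hi_thresh,
 * NULL otherwise. The cache array uses ~0 for "not known to be congested",
 * so a queue already found congested is returned without being re-checked
 * for the remainder of the burst.
 */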
967 static inline vlib_frame_queue_t *
968 is_vlib_handoff_queue_congested (
969     u32 vlib_worker_index,
970     u32 queue_hi_thresh,
971     vlib_frame_queue_t ** handoff_queue_by_worker_index)
972 {
973   vlib_frame_queue_t *fq;
974
975   fq = handoff_queue_by_worker_index [vlib_worker_index];
976   if (fq != (vlib_frame_queue_t *)(~0)) 
977       return fq;
978   
979   fq = vlib_frame_queues[vlib_worker_index];
980   ASSERT (fq);
981
982   if (PREDICT_FALSE(fq->tail >= (fq->head_hint + queue_hi_thresh))) {
983     /* a valid entry in the array will indicate the queue has reached
984      * the specified threshold and is congested
985      */
986     handoff_queue_by_worker_index [vlib_worker_index] = fq;
987     fq->enqueue_full_events++;
988     return fq;
989   }
990
991   return NULL;
992 }
993
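/* Flow-hash key helpers used to spread packets across worker threads. */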
994 static inline u64 ipv4_get_key (ip4_header_t *ip)
995 {
996    u64  hash_key;
997
998    hash_key = *((u64*)(&ip->address_pair)) ^ ip->protocol;
999
1000    return hash_key;
1001 }
1002
1003 static inline u64 ipv6_get_key (ip6_header_t *ip)
1004 {
1005    u64  hash_key;
1006
1007    hash_key = ip->src_address.as_u64[0] ^
1008               ip->src_address.as_u64[1] ^
1009               ip->dst_address.as_u64[0] ^
1010               ip->dst_address.as_u64[1] ^
1011               ip->protocol;
1012
1013    return hash_key;
1014 }
1015
1016
1017 #define MPLS_BOTTOM_OF_STACK_BIT_MASK   0x00000100U
1018 #define MPLS_LABEL_MASK                 0xFFFFF000U
1019
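/*
 * Walk up to five labels looking for the bottom-of-stack bit; if found, key
 * on the encapsulated IPv4/IPv6 header that follows, otherwise fall back to
 * hashing the bottom (or last examined) label.
 */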
1020 static inline u64 mpls_get_key (mpls_unicast_header_t *m)
1021 {
1022    u64                     hash_key;
1023    u8                      ip_ver;
1024
1025
1026    /* find the bottom of the MPLS label stack. */
1027    if (PREDICT_TRUE(m->label_exp_s_ttl & 
1028                     clib_net_to_host_u32(MPLS_BOTTOM_OF_STACK_BIT_MASK))) {
1029        goto bottom_lbl_found;
1030    }
1031    m++;
1032
1033    if (PREDICT_TRUE(m->label_exp_s_ttl & 
1034                     clib_net_to_host_u32(MPLS_BOTTOM_OF_STACK_BIT_MASK))) {
1035        goto bottom_lbl_found;
1036    }
1037    m++;
1038
1039    if (m->label_exp_s_ttl & clib_net_to_host_u32(MPLS_BOTTOM_OF_STACK_BIT_MASK)) {
1040        goto bottom_lbl_found;
1041    }
1042    m++;
1043
1044    if (m->label_exp_s_ttl & clib_net_to_host_u32(MPLS_BOTTOM_OF_STACK_BIT_MASK)) {
1045        goto bottom_lbl_found;
1046    }
1047    m++;
1048
1049    if (m->label_exp_s_ttl & clib_net_to_host_u32(MPLS_BOTTOM_OF_STACK_BIT_MASK)) {
1050        goto bottom_lbl_found;
1051    }
1052    
1053    /* the bottom label was not found - use the last label */
1054    hash_key = m->label_exp_s_ttl & clib_net_to_host_u32(MPLS_LABEL_MASK);
1055
1056    return hash_key;
1057    
1058
1059 bottom_lbl_found:
1060    m++;
1061    ip_ver = (*((u8 *)m) >> 4);
1062
1063    /* find out whether it is an IPv4 or IPv6 header */
1064    if (PREDICT_TRUE(ip_ver == 4)) {
1065        hash_key = ipv4_get_key((ip4_header_t *)m);
1066    } else if (PREDICT_TRUE(ip_ver == 6)) {
1067        hash_key = ipv6_get_key((ip6_header_t *)m);
1068    } else {
1069        /* use the bottom label */
1070        hash_key = (m-1)->label_exp_s_ttl & clib_net_to_host_u32(MPLS_LABEL_MASK);
1071    }
1072
1073    return hash_key;
1074
1075 }
1076
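/*
 * Derive the load-balance key from the ethernet header: dispatch on
 * ethertype, peel at most two VLAN tags, and hash the inner IPv4/IPv6/MPLS
 * header where possible; unknown ethertypes key on the type itself (or 0).
 */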
1077 static inline u64 eth_get_key (ethernet_header_t *h0)
1078 {
1079    u64 hash_key;
1080
1081
1082    if (PREDICT_TRUE(h0->type == clib_host_to_net_u16(ETHERNET_TYPE_IP4))) {
1083        hash_key = ipv4_get_key((ip4_header_t *)(h0+1));
1084    } else if (h0->type == clib_host_to_net_u16(ETHERNET_TYPE_IP6)) {
1085        hash_key = ipv6_get_key((ip6_header_t *)(h0+1));
1086    } else if (h0->type == clib_host_to_net_u16(ETHERNET_TYPE_MPLS_UNICAST)) {
1087        hash_key = mpls_get_key((mpls_unicast_header_t *)(h0+1));
1088    } else if ((h0->type == clib_host_to_net_u16(ETHERNET_TYPE_VLAN)) || 
1089               (h0->type == clib_host_to_net_u16(ETHERNET_TYPE_DOT1AD))) {
1090        ethernet_vlan_header_t * outer = (ethernet_vlan_header_t *)(h0 + 1);
1091        
1092        outer = (outer->type == clib_host_to_net_u16(ETHERNET_TYPE_VLAN)) ? 
1093                                   outer+1 : outer;
1094        if (PREDICT_TRUE(outer->type == clib_host_to_net_u16(ETHERNET_TYPE_IP4))) {
1095            hash_key = ipv4_get_key((ip4_header_t *)(outer+1));
1096        } else if (outer->type == clib_host_to_net_u16 (ETHERNET_TYPE_IP6)) {
1097            hash_key = ipv6_get_key((ip6_header_t *)(outer+1));
1098        } else if (outer->type == clib_host_to_net_u16(ETHERNET_TYPE_MPLS_UNICAST)) {
1099            hash_key = mpls_get_key((mpls_unicast_header_t *)(outer+1));
1100        }  else {
1101            hash_key = outer->type; 
1102        }
1103    } else {
1104        hash_key  = 0;
1105    }
1106
1107    return hash_key;
1108 }
1109
1110 /*
1111  * This function is used when dedicated IO threads feed the worker threads.
1112  *
1113  * Devices are allocated to this thread based on instances and instance_id.
1114  * If instances==0 then the function automatically determines the number
1115  * of instances of this thread, and allocates devices among them.
1116  * If instances != 0, then instance_id must be in the range 0..instances-1.
1117  * The function allocates devices among the specified number of instances,
1118  * with this thread having the given instance id. This option is used for 
1119  * splitting devices among differently named "io"-type threads.
1120  */
1121 void dpdk_io_thread (vlib_worker_thread_t * w,
1122                      u32 instances,
1123                      u32 instance_id,
1124                      char *worker_name,
1125                      dpdk_io_thread_callback_t callback)
1126 {
1127   vlib_main_t * vm = vlib_get_main();
1128   vlib_thread_main_t * tm = vlib_get_thread_main();
1129   vlib_thread_registration_t * tr;
1130   dpdk_main_t * dm = &dpdk_main;
1131   char *io_name = w->registration->name;
1132   dpdk_device_t * xd;
1133   dpdk_device_t ** my_devices = 0;
1134   vlib_frame_queue_elt_t ** handoff_queue_elt_by_worker_index = 0;
1135   vlib_frame_queue_t ** congested_handoff_queue_by_worker_index = 0;
1136   vlib_frame_queue_elt_t * hf = 0;
1137   int i;
1138   u32 n_left_to_next_worker = 0, * to_next_worker = 0;
1139   u32 next_worker_index = 0;
1140   u32 current_worker_index = ~0;
1141   u32 cpu_index = os_get_cpu_number();
1142   u32 num_workers = 0;
1143   u32 num_devices = 0;
1144   uword * p;
1145   u16 queue_id = 0;
1146   vlib_node_runtime_t * node_trace;
1147   u32 first_worker_index = 0;
1148   u32 buffer_flags_template;
1149   
1150   /* Wait until the dpdk init sequence is complete */
1151   while (dm->io_thread_release == 0)
1152     vlib_worker_thread_barrier_check();
1153
1154   clib_time_init (&vm->clib_time);
1155
1156   p = hash_get_mem (tm->thread_registrations_by_name, worker_name);
1157   ASSERT (p);
1158   tr = (vlib_thread_registration_t *) p[0];
1159   if (tr) 
1160     {
1161       num_workers = tr->count;
1162       first_worker_index = tr->first_index;
1163     }
1164
1165   /* Allocate devices to this thread */
1166   if (instances == 0) 
1167     {
1168       /* auto-assign */
1169       instance_id = w->instance_id;
1170
1171       p = hash_get_mem (tm->thread_registrations_by_name, io_name);
1172       tr = (vlib_thread_registration_t *) p[0];
1173       /* Otherwise, how did we get here */
1174       ASSERT (tr && tr->count);
1175       instances = tr->count;
1176     }
1177   else
1178     {
1179       /* manually assign */
1180       ASSERT (instance_id < instances);
1181     }
1182
1183   vec_validate (handoff_queue_elt_by_worker_index,
1184                 first_worker_index + num_workers - 1);
1185
1186   vec_validate_init_empty (congested_handoff_queue_by_worker_index,
1187                            first_worker_index + num_workers - 1,
1188                            (vlib_frame_queue_t *)(~0));
1189
1190   /* packet tracing is triggered on the dpdk-input node for ease-of-use */
1191   node_trace = vlib_node_get_runtime (vm, dpdk_input_node.index);
1192
1193   buffer_flags_template = dm->buffer_flags_template;
1194
1195   /* And handle them... */
1196   while (1)
1197     {
1198       u32 n_buffers;
1199       u32 mb_index;
1200       uword n_rx_bytes = 0;
1201       u32 n_trace, trace_cnt __attribute__((unused));
1202       vlib_buffer_free_list_t * fl;
1203       u32 hash;
1204       u64 hash_key;
1205       u8 efd_discard_burst;
1206
1207       vlib_worker_thread_barrier_check ();
1208
1209       /* Invoke callback if supplied */
1210       if (PREDICT_FALSE(callback != NULL))
1211           callback(vm);
1212
1213       if (PREDICT_FALSE(vec_len(dm->devices) != num_devices))
1214       {
1215         vec_reset_length(my_devices);
1216         vec_foreach (xd, dm->devices)
1217           {
1218             if (((xd - dm->devices) % tr->count) == instance_id)
1219               {
1220                 fprintf(stderr, "i/o thread %d (cpu %d) takes port %d\n",
1221                         instance_id, (int) os_get_cpu_number(), (int) (xd - dm->devices));
1222                 vec_add1 (my_devices, xd);
1223               }
1224           }
1225         num_devices = vec_len(dm->devices);
1226       }
1227
1228       for (i = 0; i < vec_len (my_devices); i++)
1229       {
1230           xd = my_devices[i];
1231
1232           if (!xd->admin_up)
1233             continue;
1234
1235           n_buffers = dpdk_rx_burst(dm, xd, 0 /* queue_id */);
1236
1237           if (n_buffers == 0)
1238             {
1239               /* check if EFD (dpdk) is enabled */
1240               if (PREDICT_FALSE(dm->efd.enabled))
1241                 {
1242                   /* reset a few stats */
1243                   xd->efd_agent.last_poll_time = 0;
1244                   xd->efd_agent.last_burst_sz = 0;
1245                 }
1246               continue;
1247             }
1248
1249           vec_reset_length (xd->d_trace_buffers);
1250           trace_cnt = n_trace = vlib_get_trace_count (vm, node_trace);
1251         
1252           /*
1253            * DAW-FIXME: VMXNET3 device stop/start doesn't work, 
1254            * therefore fake the stop in the dpdk driver by
1255            * silently dropping all of the incoming pkts instead of 
1256            * stopping the driver / hardware.
1257            */
1258           if (PREDICT_FALSE(xd->admin_up != 1))
1259             {
1260               for (mb_index = 0; mb_index < n_buffers; mb_index++)
1261                 rte_pktmbuf_free (xd->rx_vectors[queue_id][mb_index]);
1262               continue;
1263             }
1264
1265           /* reset EFD action for the burst */
1266           efd_discard_burst = 0;
1267           
1268           /* Check for congestion if EFD (Early-Fast-Discard) is enabled
1269            * in any mode (e.g. dpdk, monitor, or drop_all)
1270            */
1271           if (PREDICT_FALSE(dm->efd.enabled))
1272             {
1273               /* update EFD counters */
1274               dpdk_efd_update_counters(xd, n_buffers, dm->efd.enabled);
1275
1276               if (PREDICT_FALSE(dm->efd.enabled & DPDK_EFD_DROPALL_ENABLED))
1277                 {
1278                   /* drop all received packets */
1279                   for (mb_index = 0; mb_index < n_buffers; mb_index++)
1280                     rte_pktmbuf_free(xd->rx_vectors[queue_id][mb_index]);
1281
1282                   xd->efd_agent.discard_cnt += n_buffers;
1283                   increment_efd_drop_counter(vm, 
1284                                              DPDK_ERROR_VLAN_EFD_DROP_PKTS,
1285                                              n_buffers);
1286
1287                   continue;
1288                 }
1289
1290               if (PREDICT_FALSE(xd->efd_agent.consec_full_frames_cnt >=
1291                                 dm->efd.consec_full_frames_hi_thresh))
1292                 {
1293                   u32 device_queue_sz = rte_eth_rx_queue_count(xd->device_index,
1294                                                                queue_id);
1295                   if (device_queue_sz >= dm->efd.queue_hi_thresh)
1296                     {
1297                       /* dpdk device queue has reached the critical threshold */
1298                       xd->efd_agent.congestion_cnt++;
1299
1300                       /* apply EFD to packets from the burst */
1301                       efd_discard_burst = 1;
1302                     }
1303                 }
1304             }
1305
1306           fl = vlib_buffer_get_free_list 
1307             (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
1308         
1309           mb_index = 0;
1310
1311           while (n_buffers > 0)
1312             {
1313               u32 bi0;
1314               u8 next0, error0;
1315               u32 l3_offset0;
1316               vlib_buffer_t * b0, * b_seg, * b_chain = 0;
1317               ethernet_header_t * h0;
1318               u8 nb_seg = 1;
1319               struct rte_mbuf *mb = xd->rx_vectors[queue_id][mb_index];
1320               struct rte_mbuf *mb_seg = mb->next;
1321                 
1322               if (PREDICT_TRUE(n_buffers > 2))
1323                 {
1324                   struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index+2];
1325                   vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
1326                   CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
1327                   CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
1328                   CLIB_PREFETCH (bp->data, CLIB_CACHE_LINE_BYTES, LOAD);
1329                 }
1330                 
1331               b0 = (vlib_buffer_t *)(mb+1);
1332
1333               /* check whether EFD is looking for packets to discard */
1334               if (PREDICT_FALSE(efd_discard_burst))
1335                 {
1336                   u32 cntr_type;
1337                   if (PREDICT_TRUE(cntr_type = is_efd_discardable(tm, b0, mb)))
1338                     {
1339                       rte_pktmbuf_free(mb);
1340                       xd->efd_agent.discard_cnt++;
1341                       increment_efd_drop_counter(vm, 
1342                                                  cntr_type,
1343                                                  1);
1344
1345                       n_buffers--;
1346                       mb_index++;
1347                       continue;
1348                     }
1349                 }
1350               
1351               /* Prefetch one next segment if it exists */
1352               if (PREDICT_FALSE(mb->nb_segs > 1))
1353                 {
1354                   struct rte_mbuf *pfmb = mb->next;
1355                   vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
1356                   CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
1357                   CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
1358                   b_chain = b0;
1359                 }
1360
1361               bi0 = vlib_get_buffer_index (vm, b0);
1362               vlib_buffer_init_for_free_list (b0, fl);
1363               b0->clone_count = 0;
1364
1365               dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
1366                                                        &next0, &error0);
1367 #ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
1368               /*
1369                * Clear overloaded TX offload flags when a DPDK driver
1370                * is using them for RX flags (e.g. Cisco VIC Ethernet driver)
1371                */
1372               if (PREDICT_TRUE(trace_cnt == 0))
1373                 mb->ol_flags &= PKT_EXT_RX_CLR_TX_FLAGS_MASK;
1374               else
1375                 trace_cnt--;
1376 #endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
1377
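              /*
               * In this IO path rx errors are only logged; the error index is
               * not attached to the buffer. Error-flagged packets were given
               * next0 == DPDK_RX_NEXT_DROP above and are dropped after handoff.
               */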
1378               if (error0)
1379                   clib_warning ("bi %d error %d", bi0, error0);
1380
1381               b0->error = 0;
1382
1383               l3_offset0 = ((next0 == DPDK_RX_NEXT_IP4_INPUT ||
1384                              next0 == DPDK_RX_NEXT_IP6_INPUT || 
1385                              next0 == DPDK_RX_NEXT_MPLS_INPUT) ? 
1386                             sizeof (ethernet_header_t) : 0);
1387
1388               b0->current_data = l3_offset0;
1389               b0->current_length = mb->data_len - l3_offset0;
1390
1391               b0->flags = buffer_flags_template;
1392
1393               if (VMWARE_LENGTH_BUG_WORKAROUND)
1394                   b0->current_length -= 4;
1395                 
1396               vnet_buffer(b0)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
1397               vnet_buffer(b0)->sw_if_index[VLIB_TX] = (u32)~0;
1398               vnet_buffer(b0)->io_handoff.next_index = next0;
1399               n_rx_bytes += mb->pkt_len;
1400
1401               /* Process subsequent segments of multi-segment packets */
1402               while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
1403                 {
1404                   ASSERT(mb_seg != 0);
1405  
1406                   b_seg = (vlib_buffer_t *)(mb_seg+1);
1407                   vlib_buffer_init_for_free_list (b_seg, fl);
1408                   b_seg->clone_count = 0;
1409  
1410                   ASSERT((b_seg->flags & VLIB_BUFFER_NEXT_PRESENT) == 0);
1411                   ASSERT(b_seg->current_data == 0);
1412  
1413                   /*
1414                    * The driver (e.g. virtio) may not put the packet data at the start
1415                    * of the segment, so don't assume b_seg->current_data == 0 is correct.
1416                    */
1417                   b_seg->current_data = (mb_seg->buf_addr + mb_seg->data_off) - (void *)b_seg->data;
1418
1419                   b_seg->current_length = mb_seg->data_len;
1420                   b0->total_length_not_including_first_buffer +=
1421                     mb_seg->data_len;
1422  
1423                   b_chain->flags |= VLIB_BUFFER_NEXT_PRESENT;
1424                   b_chain->next_buffer = vlib_get_buffer_index (vm, b_seg);
1425  
1426                   b_chain = b_seg;
1427                   mb_seg = mb_seg->next;
1428                   nb_seg++;
1429                 }
1430
1431               /*
1432                * Turn this on if you run into
1433                * "bad monkey" contexts, and you want to know exactly
1434                * which nodes they've visited... See main.c...
1435                */
1436               VLIB_BUFFER_TRACE_TRAJECTORY_INIT(b0);
1437  
1438               if (PREDICT_FALSE (n_trace > mb_index))
1439                 vec_add1 (xd->d_trace_buffers, bi0);
1440
1441               next_worker_index = first_worker_index;
1442
1443               /* 
1444                * Force unknown traffic onto worker 0, 
1445                * and into ethernet-input. $$$$ add more hashes.
1446                */
1447               h0 = (ethernet_header_t *) b0->data;
1448
1449               /* Compute ingress LB hash */
1450               hash_key = eth_get_key(h0);
1451               hash = (u32)clib_xxhash(hash_key);
1452
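              /*
               * Map the flow hash onto [first_worker_index, first_worker_index
               * + num_workers): mask when the worker count is a power of two,
               * otherwise take the modulo.
               */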
1453               if (PREDICT_TRUE (is_pow2(num_workers)))
1454                 next_worker_index += hash & (num_workers - 1);
1455               else
1456                 next_worker_index += hash % num_workers;
1457
1458               /* if EFD is enabled and not already discarding from dpdk,
1459                * check the worker ring/queue for congestion
1460                */
1461               if (PREDICT_FALSE(tm->efd.enabled && !efd_discard_burst))
1462                 {
1463                   vlib_frame_queue_t *fq;
1464
1465                   /* fq will be valid if the ring is congested */
1466                   fq = is_vlib_handoff_queue_congested(
1467                       next_worker_index, tm->efd.queue_hi_thresh,
1468                       congested_handoff_queue_by_worker_index);
1469                   
1470                   if (PREDICT_FALSE(fq != NULL))
1471                     {
1472                       u32 cntr_type;
1473                       if (PREDICT_TRUE(cntr_type =
1474                                        is_efd_discardable(tm, b0, mb)))
1475                         {
1476                           /* discard the packet */
1477                           fq->enqueue_efd_discards++;
1478                           increment_efd_drop_counter(vm, cntr_type, 1);
1479                           rte_pktmbuf_free(mb);
1480                           n_buffers--;
1481                           mb_index++;
1482                           continue;
1483                         }
1484                     }
1485                 }
1486               
1487               if (next_worker_index != current_worker_index)
1488                 {
1489                   if (hf)
1490                     hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
1491
1492                   hf = dpdk_get_handoff_queue_elt(
1493                            next_worker_index,
1494                            handoff_queue_elt_by_worker_index);
1495                       
1496                   n_left_to_next_worker = VLIB_FRAME_SIZE - hf->n_vectors;
1497                   to_next_worker = &hf->buffer_index[hf->n_vectors];
1498                   current_worker_index = next_worker_index;
1499                 }
1500               
1501               /* enqueue to correct worker thread */
1502               to_next_worker[0] = bi0;
1503               to_next_worker++;
1504               n_left_to_next_worker--;
1505
1506               if (n_left_to_next_worker == 0)
1507                 {
1508                   hf->n_vectors = VLIB_FRAME_SIZE;
1509                   vlib_put_handoff_queue_elt(hf);
1510                   current_worker_index = ~0;
1511                   handoff_queue_elt_by_worker_index[next_worker_index] = 0;
1512                   hf = 0;
1513                 }
1514                   
1515               n_buffers--;
1516               mb_index++;
1517             }
1518
1519           if (PREDICT_FALSE (vec_len (xd->d_trace_buffers) > 0))
1520             {
1521               /* credit the trace to the trace node */
1522               dpdk_rx_trace (dm, node_trace, xd, queue_id, xd->d_trace_buffers,
1523                              vec_len (xd->d_trace_buffers));
1524               vlib_set_trace_count (vm, node_trace, n_trace - vec_len (xd->d_trace_buffers));
1525             }
1526
1527           vlib_increment_combined_counter 
1528             (vnet_get_main()->interface_main.combined_sw_if_counters
1529              + VNET_INTERFACE_COUNTER_RX,
1530              cpu_index, 
1531              xd->vlib_sw_if_index,
1532              mb_index, n_rx_bytes);
1533
1534           dpdk_worker_t * dw = vec_elt_at_index(dm->workers, cpu_index);
1535           dw->aggregate_rx_packets += mb_index;
1536         }
1537
1538       if (hf)
1539         hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
1540
1541       /* Ship frames to the worker nodes */
1542       for (i = 0; i < vec_len (handoff_queue_elt_by_worker_index); i++)
1543         {
1544           if (handoff_queue_elt_by_worker_index[i])
1545             {
1546               hf = handoff_queue_elt_by_worker_index[i];
1547               /* 
1548                * It works better to let the handoff node
1548                * rate-adapt, so always ship the handoff queue element.
1550                */
1551               if (1 || hf->n_vectors == hf->last_n_vectors)
1552                 {
1553                   vlib_put_handoff_queue_elt(hf);
1554                   handoff_queue_elt_by_worker_index[i] = 0;
1555                 }
1556               else
1557                 hf->last_n_vectors = hf->n_vectors;
1558             }
1559           congested_handoff_queue_by_worker_index[i] = (vlib_frame_queue_t *)(~0);
1560         }
1561       hf = 0;
1562       current_worker_index = ~0;
1563
1564       vlib_increment_main_loop_counter (vm);
1565     }
1566 }
1567
1568 /*
1569  * This function is used when the main thread performs IO and feeds the
1570  * worker threads.
1571  */
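/*
 * A sketch of the flow as implemented below: pull a burst of mbufs from each
 * device, convert each mbuf in place to a vlib buffer, compute a flow hash
 * over the ethernet header, and append the buffer index to the handoff frame
 * element of the chosen worker.  Full (or leftover) elements are handed off
 * via vlib_put_handoff_queue_elt so the worker threads can pick them up.
 */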
1572 static uword
1573 dpdk_io_input (vlib_main_t * vm,
1574                vlib_node_runtime_t * node,
1575                vlib_frame_t * f)
1576 {
1577   dpdk_main_t * dm = &dpdk_main;
1578   dpdk_device_t * xd;
1579   vlib_thread_main_t * tm = vlib_get_thread_main();
1580   uword n_rx_packets = 0;
1581   static vlib_frame_queue_elt_t ** handoff_queue_elt_by_worker_index;
1582   static vlib_frame_queue_t ** congested_handoff_queue_by_worker_index = 0;
1583   vlib_frame_queue_elt_t * hf = 0;
1584   int i;
1585   u32 n_left_to_next_worker = 0, * to_next_worker = 0;
1586   u32 next_worker_index = 0;
1587   u32 current_worker_index = ~0;
1588   u32 cpu_index = os_get_cpu_number();
1589   static int num_workers_set;
1590   static u32 num_workers;
1591   u16 queue_id = 0;
1592   vlib_node_runtime_t * node_trace;
1593   static u32 first_worker_index;
1594   u32 buffer_flags_template;
1595
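  /* On the first call, look up and cache the "workers" thread registration:
   * the worker count and the vlib index of the first worker drive the
   * load balancing below. */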
1596   if (PREDICT_FALSE(num_workers_set == 0))
1597     {
1598       uword * p;
1599       vlib_thread_registration_t * tr;
1600       /* Only the standard vnet worker threads are supported */
1601       p = hash_get_mem (tm->thread_registrations_by_name, "workers");
1602       tr = (vlib_thread_registration_t *) p[0];
1603       if (tr) 
1604         {
1605           num_workers = tr->count;
1606           first_worker_index = tr->first_index;
1607         }
1608       num_workers_set = 1;
1609     }
1610
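  /* Lazily allocate the per-worker bookkeeping: one pending handoff element
   * slot per vlib main, plus a congestion marker per worker (initialized to
   * ~0, i.e. not known to be congested). */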
1611   if (PREDICT_FALSE(handoff_queue_elt_by_worker_index == 0))
1612     {
1613       vec_validate (handoff_queue_elt_by_worker_index, tm->n_vlib_mains - 1);
1614       
1615       vec_validate_init_empty (congested_handoff_queue_by_worker_index,
1616                                first_worker_index + num_workers - 1,
1617                                (vlib_frame_queue_t *)(~0));
1618     }
1619
1620   /* packet tracing is triggered on the dpdk-input node for ease-of-use */
1621   node_trace = vlib_node_get_runtime (vm, dpdk_input_node.index);
1622
1623   buffer_flags_template = dm->buffer_flags_template;
1624
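  /* Poll rx queue 0 of every admin-up device, convert the burst to vlib
   * buffers and distribute the buffers to the worker threads. */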
1625   vec_foreach (xd, dm->devices)
1626     {
1627       u32 n_buffers;
1628       u32 mb_index;
1629       uword n_rx_bytes = 0;
1630       u32 n_trace, trace_cnt __attribute__((unused));
1631       vlib_buffer_free_list_t * fl;
1632       u32 hash;
1633       u64 hash_key;
1634       u8 efd_discard_burst = 0;
1635
1636       if (!xd->admin_up)
1637         continue;
1638
1639       n_buffers = dpdk_rx_burst(dm, xd, queue_id );
1640
1641       if (n_buffers == 0)
1642         {
1643           /* check if EFD (dpdk) is enabled */
1644           if (PREDICT_FALSE(dm->efd.enabled))
1645             {
1646               /* no packets this poll: reset the EFD burst-tracking state */
1647               xd->efd_agent.last_poll_time = 0;
1648               xd->efd_agent.last_burst_sz = 0;
1649             }
1650           continue;
1651         }
1652
1653       vec_reset_length (xd->d_trace_buffers);
1654       trace_cnt = n_trace = vlib_get_trace_count (vm, node_trace);
1655         
1656       /*
1657        * DAW-FIXME: VMXNET3 device stop/start doesn't work, 
1658        * therefore fake the stop in the dpdk driver by
1659        * silently dropping all incoming packets instead of 
1660        * stopping the driver / hardware.
1661        */
1662       if (PREDICT_FALSE(xd->admin_up != 1))
1663         {
1664           for (mb_index = 0; mb_index < n_buffers; mb_index++)
1665             rte_pktmbuf_free (xd->rx_vectors[queue_id][mb_index]);
1666           continue;
1667         }
1668
1669       /* Check for congestion if EFD (Early-Fast-Discard) is enabled
1670        * in any mode (e.g. dpdk, monitor, or drop_all)
1671        */
1672       if (PREDICT_FALSE(dm->efd.enabled))
1673         {
1674           /* update EFD counters */
1675           dpdk_efd_update_counters(xd, n_buffers, dm->efd.enabled);
1676
1677           if (PREDICT_FALSE(dm->efd.enabled & DPDK_EFD_DROPALL_ENABLED))
1678             {
1679               /* discard all received packets */
1680               for (mb_index = 0; mb_index < n_buffers; mb_index++)
1681                 rte_pktmbuf_free(xd->rx_vectors[queue_id][mb_index]);
1682
1683               xd->efd_agent.discard_cnt += n_buffers;
1684               increment_efd_drop_counter(vm, 
1685                                          DPDK_ERROR_VLAN_EFD_DROP_PKTS,
1686                                          n_buffers);
1687             
1688               continue;
1689             }
1690           
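          /* A sustained run of full rx bursts suggests we are falling behind;
           * confirm by sampling the device queue depth before arming
           * per-packet discard for this burst. */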
1691           if (PREDICT_FALSE(xd->efd_agent.consec_full_frames_cnt >=
1692                             dm->efd.consec_full_frames_hi_thresh))
1693             {
1694               u32 device_queue_sz = rte_eth_rx_queue_count(xd->device_index,
1695                                                            queue_id);
1696               if (device_queue_sz >= dm->efd.queue_hi_thresh)
1697                 {
1698                   /* dpdk device queue has reached the critical threshold */
1699                   xd->efd_agent.congestion_cnt++;
1700
1701                   /* apply EFD to packets from the burst */
1702                   efd_discard_burst = 1;
1703                 }
1704             }
1705         }
1706       
1707       fl = vlib_buffer_get_free_list 
1708         (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
1709           
1710       mb_index = 0;
1711
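      /* Per-packet loop: convert each mbuf to a vlib buffer, pick a worker
       * from the flow hash, and append the buffer to that worker's handoff
       * frame element. */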
1712       while (n_buffers > 0)
1713         {
1714           u32 bi0;
1715           u8 next0, error0;
1716           u32 l3_offset0;
1717           vlib_buffer_t * b0, * b_seg, * b_chain = 0;
1718           ethernet_header_t * h0;
1719           u8 nb_seg = 1;
1720           struct rte_mbuf *mb = xd->rx_vectors[queue_id][mb_index];
1721           struct rte_mbuf *mb_seg = mb->next;
1722
1723           if (PREDICT_TRUE(n_buffers > 1))
1724             {
1725               struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index+2];
1726               vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
1727               CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
1728               CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
1729               CLIB_PREFETCH (bp->data, CLIB_CACHE_LINE_BYTES, LOAD);
1730             }
1731                 
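          /* The vlib_buffer_t header lives in the same buffer, immediately
           * after the rte_mbuf header, so mb+1 is the vlib view of this packet. */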
1732           b0 = (vlib_buffer_t *)(mb+1);
1733                 
1734           /* check whether EFD is looking for packets to discard */
1735           if (PREDICT_FALSE(efd_discard_burst))
1736             {
1737               u32 cntr_type;
1738               if (PREDICT_TRUE(cntr_type = is_efd_discardable(tm, b0, mb)))
1739                 {
1740                   rte_pktmbuf_free(mb);
1741                   xd->efd_agent.discard_cnt++;
1742                   increment_efd_drop_counter(vm, 
1743                                              cntr_type,
1744                                              1);
1745
1746                   n_buffers--;
1747                   mb_index++;
1748                   continue;
1749                 }
1750             }
1751
1752           /* Prefetch the next segment if one exists */
1753           if (PREDICT_FALSE(mb->nb_segs > 1))
1754             {
1755               struct rte_mbuf *pfmb = mb->next;
1756               vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
1757               CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
1758               CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
1759               b_chain = b0;
1760             }
1761
1762           bi0 = vlib_get_buffer_index (vm, b0);
1763           vlib_buffer_init_for_free_list (b0, fl);
1764           b0->clone_count = 0;
1765
1766           dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
1767                                                    &next0, &error0);
1768 #ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
1769           /*
1770            * Clear overloaded TX offload flags when a DPDK driver
1771            * is using them for RX flags (e.g. Cisco VIC Ethernet driver)
1772            */
1773           if (PREDICT_TRUE(trace_cnt == 0))
1774             mb->ol_flags &= PKT_EXT_RX_CLR_TX_FLAGS_MASK;
1775           else
1776             trace_cnt--;
1777 #endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
1778
1779           if (error0)
1780             clib_warning ("bi %d error %d", bi0, error0);
1781
1782           b0->error = 0;
1783
1784           l3_offset0 = ((next0 == DPDK_RX_NEXT_IP4_INPUT ||
1785                          next0 == DPDK_RX_NEXT_IP6_INPUT || 
1786                          next0 == DPDK_RX_NEXT_MPLS_INPUT) ? 
1787                         sizeof (ethernet_header_t) : 0);
1788
1789           b0->current_data = l3_offset0;
1790           b0->current_length = mb->data_len - l3_offset0;
1791
1792           b0->flags = buffer_flags_template;
1793                 
1794           if (VMWARE_LENGTH_BUG_WORKAROUND)
1795               b0->current_length -= 4;
1796
1797           vnet_buffer(b0)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
1798           vnet_buffer(b0)->sw_if_index[VLIB_TX] = (u32)~0;
1799           vnet_buffer(b0)->io_handoff.next_index = next0;
1800           n_rx_bytes += mb->pkt_len;
1801
1802           /* Process subsequent segments of multi-segment packets */
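          /* Each additional mbuf segment gets its own vlib buffer, chained to
           * the previous one via next_buffer / VLIB_BUFFER_NEXT_PRESENT, while
           * the head buffer accumulates total_length_not_including_first_buffer. */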
1803           while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
1804             {
1805               ASSERT(mb_seg != 0);
1806  
1807               b_seg = (vlib_buffer_t *)(mb_seg+1);
1808               vlib_buffer_init_for_free_list (b_seg, fl);
1809               b_seg->clone_count = 0;
1810  
1811               ASSERT((b_seg->flags & VLIB_BUFFER_NEXT_PRESENT) == 0);
1812               ASSERT(b_seg->current_data == 0);
1813  
1814               /*
1815                * The driver (e.g. virtio) may not place the packet data at the start
1816                * of the segment, so derive current_data from the mbuf's data_off.
1817                */
1818               b_seg->current_data = (mb_seg->buf_addr + mb_seg->data_off) - (void *)b_seg->data;
1819
1820               b_seg->current_length = mb_seg->data_len;
1821               b0->total_length_not_including_first_buffer +=
1822                 mb_seg->data_len;
1823  
1824               b_chain->flags |= VLIB_BUFFER_NEXT_PRESENT;
1825               b_chain->next_buffer = vlib_get_buffer_index (vm, b_seg);
1826  
1827               b_chain = b_seg;
1828               mb_seg = mb_seg->next;
1829               nb_seg++;
1830             }
1831  
1832           /*
1833            * Turn this on if you run into
1834            * "bad monkey" contexts, and you want to know exactly
1835            * which nodes they've visited... See main.c...
1836            */
1837           VLIB_BUFFER_TRACE_TRAJECTORY_INIT(b0);
1838  
1839           if (PREDICT_FALSE (n_trace > mb_index))
1840             vec_add1 (xd->d_trace_buffers, bi0);
1841
1842           next_worker_index = first_worker_index;
1843
1844           /* 
1845            * Force unknown traffic onto worker 0, 
1846            * and into ethernet-input. $$$$ add more hashes.
1847            */
1848           h0 = (ethernet_header_t *) b0->data;
1849
1850           /* Compute ingress LB hash */
1851           hash_key = eth_get_key(h0);
1852           hash = (u32)clib_xxhash(hash_key);
1853
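          /* Spread flows across the workers: use a mask when the worker count
           * is a power of two, modulo otherwise. */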
1854           if (PREDICT_TRUE (is_pow2(num_workers)))
1855             next_worker_index += hash & (num_workers - 1);
1856           else
1857             next_worker_index += hash % num_workers;
1858
1859           /* if EFD is enabled and not already discarding from dpdk,
1860            * check the worker ring/queue for congestion
1861            */
1862           if (PREDICT_FALSE(tm->efd.enabled && !efd_discard_burst))
1863             {
1864               vlib_frame_queue_t *fq;
1865
1866               /* fq will be valid if the ring is congested */
1867               fq = is_vlib_handoff_queue_congested(
1868                   next_worker_index, tm->efd.queue_hi_thresh,
1869                   congested_handoff_queue_by_worker_index);
1870               
1871               if (PREDICT_FALSE(fq != NULL))
1872                 {
1873                   u32 cntr_type;
1874                   if (PREDICT_TRUE(cntr_type =
1875                                    is_efd_discardable(tm, b0, mb)))
1876                     {
1877                       /* discard the packet */
1878                       fq->enqueue_efd_discards++;
1879                       increment_efd_drop_counter(vm, cntr_type, 1);
1880                       rte_pktmbuf_free(mb);
1881                       n_buffers--;
1882                       mb_index++;
1883                       continue;
1884                     }
1885                 }
1886             }
1887           
1888           if (next_worker_index != current_worker_index)
1889             {
1890               if (hf)
1891                 hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
1892
1893               hf = dpdk_get_handoff_queue_elt(
1894                      next_worker_index,
1895                      handoff_queue_elt_by_worker_index);
1896
1897               n_left_to_next_worker = VLIB_FRAME_SIZE - hf->n_vectors;
1898               to_next_worker = &hf->buffer_index[hf->n_vectors];
1899               current_worker_index = next_worker_index;
1900             }
1901           
1902           /* enqueue to correct worker thread */
1903           to_next_worker[0] = bi0;
1904           to_next_worker++;
1905           n_left_to_next_worker--;
1906
1907           if (n_left_to_next_worker == 0)
1908             {
1909               hf->n_vectors = VLIB_FRAME_SIZE;
1910               vlib_put_handoff_queue_elt(hf);
1911               current_worker_index = ~0;
1912               handoff_queue_elt_by_worker_index[next_worker_index] = 0;
1913               hf = 0;
1914             }
1915           
1916           n_buffers--;
1917           mb_index++;
1918         }
1919
1920       if (PREDICT_FALSE (vec_len (xd->d_trace_buffers) > 0))
1921         {
1922           /* credit the trace to the trace node */
1923           dpdk_rx_trace (dm, node_trace, xd, queue_id, xd->d_trace_buffers,
1924                          vec_len (xd->d_trace_buffers));
1925           vlib_set_trace_count (vm, node_trace, n_trace - vec_len (xd->d_trace_buffers));
1926         }
1927
1928       vlib_increment_combined_counter 
1929         (vnet_get_main()->interface_main.combined_sw_if_counters
1930          + VNET_INTERFACE_COUNTER_RX,
1931          cpu_index, 
1932          xd->vlib_sw_if_index,
1933          mb_index, n_rx_bytes);
1934
1935       dpdk_worker_t * dw = vec_elt_at_index(dm->workers, cpu_index);
1936       dw->aggregate_rx_packets += mb_index;
1937       n_rx_packets += mb_index;
1938     }
1939
1940   if (hf)
1941     hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
1942   
1943   /* Ship frames to the worker nodes */
1944   for (i = 0; i < vec_len (handoff_queue_elt_by_worker_index); i++)
1945     {
1946       if (handoff_queue_elt_by_worker_index[i])
1947         {
1948           hf = handoff_queue_elt_by_worker_index[i];
1949           /* 
1950            * It works better to let the handoff node
1951            * rate-adapt, so always ship the handoff queue element.
1952            */
1953           if (1 || hf->n_vectors == hf->last_n_vectors)
1954             {
1955               vlib_put_handoff_queue_elt(hf);
1956               handoff_queue_elt_by_worker_index[i] = 0;
1957             }
1958           else
1959             hf->last_n_vectors = hf->n_vectors;
1960         }
1961       congested_handoff_queue_by_worker_index[i] = (vlib_frame_queue_t *)(~0);
1962     }
1963   hf = 0;
1964   current_worker_index = ~0;
1965   return n_rx_packets;
1966 }
1967
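/*
 * Node registration for dpdk-io-input.  The error strings and rx next nodes
 * are the dpdk-wide tables (presumably shared with dpdk-input).
 */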
1968 VLIB_REGISTER_NODE (dpdk_io_input_node) = {
1969   .function = dpdk_io_input,
1970   .type = VLIB_NODE_TYPE_INPUT,
1971   .name = "dpdk-io-input",
1972
1973   /* Will be enabled if/when hardware is detected. */
1974   .state = VLIB_NODE_STATE_DISABLED,
1975
1976   .format_buffer = format_ethernet_header_with_length,
1977   .format_trace = format_dpdk_rx_dma_trace,
1978
1979   .n_errors = DPDK_N_ERROR,
1980   .error_strings = dpdk_error_strings,
1981
1982   .n_next_nodes = DPDK_RX_N_NEXT,
1983   .next_nodes = {
1984     [DPDK_RX_NEXT_DROP] = "error-drop",
1985     [DPDK_RX_NEXT_ETHERNET_INPUT] = "ethernet-input",
1986     [DPDK_RX_NEXT_IP4_INPUT] = "ip4-input-no-checksum",
1987     [DPDK_RX_NEXT_IP6_INPUT] = "ip6-input",
1988     [DPDK_RX_NEXT_MPLS_INPUT] = "mpls-gre-input",
1989   },
1990 };
1991
1992 /*
1993  * set_efd_bitmap()
1994  * Based on the operation type, set the bits below (LESS_THAN) or at/above (GREATER_OR_EQUAL) the given index value
1995  */
1996 void
1997 set_efd_bitmap (u8 *bitmap, u32 value, u32 op)
1998 {
1999     int ix;
2000
2001     *bitmap = 0;
2002     for (ix = 0; ix < 8; ix++) {
2003         if (((op == EFD_OPERATION_LESS_THAN) && (ix < value)) ||
2004             ((op == EFD_OPERATION_GREATER_OR_EQUAL) && (ix >= value))){
2005             (*bitmap) |= (1 << ix);
2006         }
2007     }
2008 }
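/*
 * For example, set_efd_bitmap (&bm, 3, EFD_OPERATION_LESS_THAN) leaves
 * bm == 0x07 (bits 0..2 set), while EFD_OPERATION_GREATER_OR_EQUAL with the
 * same value leaves bm == 0xf8 (bits 3..7 set).
 */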
2009
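/*
 * efd_config()
 * Propagate the enable flag to both the vlib thread main and the dpdk main,
 * then program the discard bitmaps for IP precedence, MPLS EXP and VLAN CoS.
 */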
2010 void
2011 efd_config (u32 enabled, 
2012             u32 ip_prec,  u32 ip_op,
2013             u32 mpls_exp, u32 mpls_op,
2014             u32 vlan_cos, u32 vlan_op)
2015 {
2016    vlib_thread_main_t * tm = vlib_get_thread_main();
2017    dpdk_main_t * dm = &dpdk_main;
2018
2019    if (enabled) {
2020        tm->efd.enabled |= VLIB_EFD_DISCARD_ENABLED;
2021        dm->efd.enabled |= DPDK_EFD_DISCARD_ENABLED;
2022    } else {
2023        tm->efd.enabled &= ~VLIB_EFD_DISCARD_ENABLED;
2024        dm->efd.enabled &= ~DPDK_EFD_DISCARD_ENABLED;
2025    }
2026
2027    set_efd_bitmap(&tm->efd.ip_prec_bitmap, ip_prec, ip_op);
2028    set_efd_bitmap(&tm->efd.mpls_exp_bitmap, mpls_exp, mpls_op);
2029    set_efd_bitmap(&tm->efd.vlan_cos_bitmap, vlan_cos, vlan_op);
2030
2031 }