/*
 * Copyright (c) 2015 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <vnet/vnet.h>
#include <vppinfra/vec.h>
#include <vppinfra/error.h>
#include <vppinfra/format.h>
#include <vppinfra/xxhash.h>

#include <vnet/ethernet/ethernet.h>
#include <vnet/devices/dpdk/dpdk.h>
#include <vnet/classify/vnet_classify.h>
#include <vnet/mpls-gre/packet.h>

#include "dpdk_priv.h"

#ifndef MAX
#define MAX(a,b) ((a) < (b) ? (b) : (a))
#endif

#ifndef MIN
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#endif

/*
 * At least in certain versions of ESXi, vmware e1000's don't honor the
 * "strip rx CRC" bit. Set this flag to work around that bug FOR UNIT TEST ONLY.
 *
 * If wireshark complains like so:
 *
 * "Frame check sequence: 0x00000000 [incorrect, should be <hex-num>]"
 * and you're using ESXi emulated e1000's, set this flag FOR UNIT TEST ONLY.
 *
 * Note: do NOT check in this file with this workaround enabled! You'll lose
 * actual data from e.g. 10xGE interfaces. The extra 4 bytes annoy
 * wireshark, but they're harmless...
 */
#define VMWARE_LENGTH_BUG_WORKAROUND 0

typedef struct {
  u32 cached_next_index;

  /* convenience variables */
  vlib_main_t * vlib_main;
  vnet_main_t * vnet_main;
} handoff_dispatch_main_t;

typedef struct {
  u32 buffer_index;
  u32 next_index;
  u32 sw_if_index;
} handoff_dispatch_trace_t;

/* packet trace format function */
static u8 * format_handoff_dispatch_trace (u8 * s, va_list * args)
{
  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
  CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
  handoff_dispatch_trace_t * t = va_arg (*args, handoff_dispatch_trace_t *);

  s = format (s, "HANDOFF_DISPATCH: sw_if_index %d next_index %d buffer 0x%x",
      t->sw_if_index,
      t->next_index,
      t->buffer_index);
  return s;
}

handoff_dispatch_main_t handoff_dispatch_main;

vlib_node_registration_t handoff_dispatch_node;

#define foreach_handoff_dispatch_error \
_(EXAMPLE, "example packets")

typedef enum {
#define _(sym,str) HANDOFF_DISPATCH_ERROR_##sym,
  foreach_handoff_dispatch_error
#undef _
  HANDOFF_DISPATCH_N_ERROR,
} handoff_dispatch_error_t;

static char * handoff_dispatch_error_strings[] = {
#define _(sym,string) string,
  foreach_handoff_dispatch_error
#undef _
};

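/*
 * Mark a handoff queue element as ready for the consumer thread. The
 * memory barrier ensures the element's contents (buffer indices, vector
 * count) are globally visible before the valid flag is set.
 */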
static inline
void vlib_put_handoff_queue_elt (vlib_frame_queue_elt_t * hf)
{
  CLIB_MEMORY_BARRIER();
  hf->valid = 1;
}

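/*
 * Dispatch buffers handed off by the IO thread(s): each buffer carries
 * its target next-node index in vnet_buffer(b)->io_handoff.next_index,
 * set by dpdk_io_thread() below. Uses the standard dual/single-loop
 * pattern with speculative enqueue to the cached next frame.
 */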
static uword
handoff_dispatch_node_fn (vlib_main_t * vm,
                  vlib_node_runtime_t * node,
                  vlib_frame_t * frame)
{
  u32 n_left_from, * from, * to_next;
  dpdk_rx_next_t next_index;

  from = vlib_frame_vector_args (frame);
  n_left_from = frame->n_vectors;
  next_index = node->cached_next_index;

  while (n_left_from > 0)
    {
      u32 n_left_to_next;

      vlib_get_next_frame (vm, node, next_index,
                           to_next, n_left_to_next);

      while (n_left_from >= 4 && n_left_to_next >= 2)
        {
          u32 bi0, bi1;
          vlib_buffer_t * b0, * b1;
          u32 next0, next1;
          u32 sw_if_index0, sw_if_index1;

          /* Prefetch next iteration. */
          {
            vlib_buffer_t * p2, * p3;

            p2 = vlib_get_buffer (vm, from[2]);
            p3 = vlib_get_buffer (vm, from[3]);

            vlib_prefetch_buffer_header (p2, LOAD);
            vlib_prefetch_buffer_header (p3, LOAD);
          }

          /* speculatively enqueue b0 and b1 to the current next frame */
          to_next[0] = bi0 = from[0];
          to_next[1] = bi1 = from[1];
          from += 2;
          to_next += 2;
          n_left_from -= 2;
          n_left_to_next -= 2;

          b0 = vlib_get_buffer (vm, bi0);
          b1 = vlib_get_buffer (vm, bi1);

          next0 = vnet_buffer(b0)->io_handoff.next_index;
          next1 = vnet_buffer(b1)->io_handoff.next_index;

          if (PREDICT_FALSE(b0->flags & VLIB_BUFFER_IS_TRACED))
            {
              vlib_trace_buffer (vm, node, next0, b0, /* follow_chain */ 0);
              handoff_dispatch_trace_t *t =
                vlib_add_trace (vm, node, b0, sizeof (*t));
              sw_if_index0 = vnet_buffer(b0)->sw_if_index[VLIB_RX];
              t->sw_if_index = sw_if_index0;
              t->next_index = next0;
              t->buffer_index = bi0;
            }
          if (PREDICT_FALSE(b1->flags & VLIB_BUFFER_IS_TRACED))
            {
              vlib_trace_buffer (vm, node, next1, b1, /* follow_chain */ 0);
              handoff_dispatch_trace_t *t =
                vlib_add_trace (vm, node, b1, sizeof (*t));
              sw_if_index1 = vnet_buffer(b1)->sw_if_index[VLIB_RX];
              t->sw_if_index = sw_if_index1;
              t->next_index = next1;
              t->buffer_index = bi1;
            }

          /* verify speculative enqueues, maybe switch current next frame */
          vlib_validate_buffer_enqueue_x2 (vm, node, next_index,
                                           to_next, n_left_to_next,
                                           bi0, bi1, next0, next1);
        }

      while (n_left_from > 0 && n_left_to_next > 0)
        {
          u32 bi0;
          vlib_buffer_t * b0;
          u32 next0;
          u32 sw_if_index0;

          /* speculatively enqueue b0 to the current next frame */
          bi0 = from[0];
          to_next[0] = bi0;
          from += 1;
          to_next += 1;
          n_left_from -= 1;
          n_left_to_next -= 1;

          b0 = vlib_get_buffer (vm, bi0);

          next0 = vnet_buffer(b0)->io_handoff.next_index;

          if (PREDICT_FALSE(b0->flags & VLIB_BUFFER_IS_TRACED))
            {
              vlib_trace_buffer (vm, node, next0, b0, /* follow_chain */ 0);
              handoff_dispatch_trace_t *t =
                vlib_add_trace (vm, node, b0, sizeof (*t));
              sw_if_index0 = vnet_buffer(b0)->sw_if_index[VLIB_RX];
              t->sw_if_index = sw_if_index0;
              t->next_index = next0;
              t->buffer_index = bi0;
            }

          /* verify speculative enqueue, maybe switch current next frame */
          vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                                           to_next, n_left_to_next,
                                           bi0, next0);
        }

      vlib_put_next_frame (vm, node, next_index, n_left_to_next);
    }

  return frame->n_vectors;
}

VLIB_REGISTER_NODE (handoff_dispatch_node) = {
  .function = handoff_dispatch_node_fn,
  .name = "handoff-dispatch",
  .vector_size = sizeof (u32),
  .format_trace = format_handoff_dispatch_trace,
  .type = VLIB_NODE_TYPE_INTERNAL,
  .flags = VLIB_NODE_FLAG_IS_HANDOFF,

  .n_errors = ARRAY_LEN(handoff_dispatch_error_strings),
  .error_strings = handoff_dispatch_error_strings,

  .n_next_nodes = DPDK_RX_N_NEXT,

  .next_nodes = {
        [DPDK_RX_NEXT_DROP] = "error-drop",
        [DPDK_RX_NEXT_ETHERNET_INPUT] = "ethernet-input",
        [DPDK_RX_NEXT_IP4_INPUT] = "ip4-input",
        [DPDK_RX_NEXT_IP6_INPUT] = "ip6-input",
        [DPDK_RX_NEXT_MPLS_INPUT] = "mpls-gre-input",
  },
};

clib_error_t *handoff_dispatch_init (vlib_main_t *vm)
{
  handoff_dispatch_main_t * mp = &handoff_dispatch_main;

  mp->vlib_main = vm;
  mp->vnet_main = &vnet_main;

  return 0;
}

VLIB_INIT_FUNCTION (handoff_dispatch_init);

u32 dpdk_get_handoff_node_index (void)
{
  return handoff_dispatch_node.index;
}

static char * dpdk_error_strings[] = {
#define _(n,s) s,
    foreach_dpdk_error
#undef _
};

typedef struct {
  u32 buffer_index;
  u16 device_index;
  u16 queue_index;
  struct rte_mbuf mb;
  vlib_buffer_t buffer; /* Copy of VLIB buffer; pkt data stored in pre_data. */
} dpdk_rx_dma_trace_t;

static u8 * format_dpdk_rx_dma_trace (u8 * s, va_list * va)
{
  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*va, vlib_main_t *);
  CLIB_UNUSED (vlib_node_t * node) = va_arg (*va, vlib_node_t *);
  CLIB_UNUSED (vnet_main_t * vnm) = vnet_get_main();
  dpdk_rx_dma_trace_t * t = va_arg (*va, dpdk_rx_dma_trace_t *);
  dpdk_main_t * dm = &dpdk_main;
  dpdk_device_t * xd = vec_elt_at_index (dm->devices, t->device_index);
  format_function_t * f;
  uword indent = format_get_indent (s);
  vnet_sw_interface_t * sw = vnet_get_sw_interface (vnm, xd->vlib_sw_if_index);

  s = format (s, "%U rx queue %d",
              format_vnet_sw_interface_name, vnm, sw,
              t->queue_index);

  s = format (s, "\n%Ubuffer 0x%x: %U",
              format_white_space, indent,
              t->buffer_index,
              format_vlib_buffer, &t->buffer);

#ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
  s = format (s, "\n%U%U",
              format_white_space, indent,
              format_dpdk_rx_rte_mbuf, &t->mb);
#else
  s = format (s, "\n%U%U",
              format_white_space, indent,
              format_dpdk_rte_mbuf, &t->mb);
#endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
  f = node->format_buffer;
  if (!f)
    f = format_hex_bytes;
  s = format (s, "\n%U%U", format_white_space, indent,
              f, t->buffer.pre_data, sizeof (t->buffer.pre_data));

  return s;
}

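/*
 * Map DPDK mbuf offload flags (and, for DPDK >= 2.1, packet_type) to a
 * next-node index and an error code. Packets flagged with checksum or
 * FCS errors are steered to error-drop; otherwise IP4/IP6/MPLS packets
 * go straight to their input nodes and everything else goes to
 * ethernet-input.
 */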
always_inline void
dpdk_rx_next_and_error_from_mb_flags_x1 (dpdk_device_t *xd, struct rte_mbuf *mb,
                                         vlib_buffer_t *b0,
                                         u8 * next0, u8 * error0)
{
  u8 is0_ip4, is0_ip6, is0_mpls, n0;
  uint16_t mb_flags = mb->ol_flags;

  if (PREDICT_FALSE(mb_flags & (
#ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
       PKT_EXT_RX_PKT_ERROR | PKT_EXT_RX_BAD_FCS   |
#endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
        PKT_RX_IP_CKSUM_BAD  | PKT_RX_L4_CKSUM_BAD
    )))
    {
      /* some error was flagged. determine the drop reason */
      n0 = DPDK_RX_NEXT_DROP;
      *error0 =
#ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
        (mb_flags & PKT_EXT_RX_PKT_ERROR) ? DPDK_ERROR_RX_PACKET_ERROR :
        (mb_flags & PKT_EXT_RX_BAD_FCS) ? DPDK_ERROR_RX_BAD_FCS :
#endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
        (mb_flags & PKT_RX_IP_CKSUM_BAD) ? DPDK_ERROR_IP_CHECKSUM_ERROR :
        (mb_flags & PKT_RX_L4_CKSUM_BAD) ? DPDK_ERROR_L4_CHECKSUM_ERROR :
        DPDK_ERROR_NONE;
    }
  else
    {
      *error0 = DPDK_ERROR_NONE;
      if (xd->per_interface_next_index != ~0)
        n0 = xd->per_interface_next_index;
      else if (mb_flags & PKT_RX_VLAN_PKT)
        n0 = DPDK_RX_NEXT_ETHERNET_INPUT;
      else
        {
          n0 = DPDK_RX_NEXT_ETHERNET_INPUT;
#if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
          is0_ip4 = RTE_ETH_IS_IPV4_HDR(mb->packet_type) != 0;
#else
          is0_ip4 = (mb_flags & (PKT_RX_IPV4_HDR | PKT_RX_IPV4_HDR_EXT)) != 0;
#endif

          if (PREDICT_TRUE(is0_ip4))
            n0 = DPDK_RX_NEXT_IP4_INPUT;
          else
            {
#if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
              is0_ip6 = RTE_ETH_IS_IPV6_HDR(mb->packet_type) != 0;
#else
              is0_ip6 =
                      (mb_flags & (PKT_RX_IPV6_HDR | PKT_RX_IPV6_HDR_EXT)) != 0;
#endif
              if (PREDICT_TRUE(is0_ip6))
                n0 = DPDK_RX_NEXT_IP6_INPUT;
              else
                {
                  ethernet_header_t *h0 = (ethernet_header_t *) b0->data;
                  is0_mpls = (h0->type == clib_host_to_net_u16(ETHERNET_TYPE_MPLS_UNICAST));
                  n0 = is0_mpls ? DPDK_RX_NEXT_MPLS_INPUT : n0;
                }
            }
        }
    }
  *next0 = n0;
}

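/*
 * Add rx traces for a vector of buffers. Each trace records the device,
 * queue, a copy of the rte_mbuf, and a copy of the vlib buffer header
 * with the first bytes of packet data saved in pre_data.
 */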
void dpdk_rx_trace (dpdk_main_t * dm,
                    vlib_node_runtime_t * node,
                    dpdk_device_t * xd,
                    u16 queue_id,
                    u32 * buffers,
                    uword n_buffers)
{
  vlib_main_t * vm = vlib_get_main();
  u32 * b, n_left;
  u8 next0;

  n_left = n_buffers;
  b = buffers;

  while (n_left >= 1)
    {
      u32 bi0;
      vlib_buffer_t * b0;
      dpdk_rx_dma_trace_t * t0;
      struct rte_mbuf *mb;
      u8 error0;

      bi0 = b[0];
      n_left -= 1;

      b0 = vlib_get_buffer (vm, bi0);
      mb = ((struct rte_mbuf *)b0) - 1;
      dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
                                               &next0, &error0);
      vlib_trace_buffer (vm, node, next0, b0, /* follow_chain */ 0);
      t0 = vlib_add_trace (vm, node, b0, sizeof (t0[0]));
      t0->queue_index = queue_id;
      t0->device_index = xd->device_index;
      t0->buffer_index = bi0;

      memcpy (&t0->mb, mb, sizeof (t0->mb));
      memcpy (&t0->buffer, b0, sizeof (b0[0]) - sizeof (b0->pre_data));
      memcpy (t0->buffer.pre_data, b0->data, sizeof (t0->buffer.pre_data));

#ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
      /*
       * Clear overloaded TX offload flags when a DPDK driver
       * is using them for RX flags (e.g. Cisco VIC Ethernet driver)
       */
      mb->ol_flags &= PKT_EXT_RX_CLR_TX_FLAGS_MASK;
#endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */

      b += 1;
    }
}

/*
 * dpdk_efd_update_counters()
 * Update EFD (early-fast-discard) counters
 */
void dpdk_efd_update_counters (dpdk_device_t *xd,
                               u32 n_buffers,
                               u16 enabled)
{
  if (enabled & DPDK_EFD_MONITOR_ENABLED)
    {
      u64 now = clib_cpu_time_now();
      if (xd->efd_agent.last_poll_time > 0)
        {
          u64 elapsed_time = (now - xd->efd_agent.last_poll_time);
          if (elapsed_time > xd->efd_agent.max_poll_delay)
            xd->efd_agent.max_poll_delay = elapsed_time;
        }
      xd->efd_agent.last_poll_time = now;
    }

  xd->efd_agent.total_packet_cnt += n_buffers;
  xd->efd_agent.last_burst_sz = n_buffers;

  if (n_buffers > xd->efd_agent.max_burst_sz)
    xd->efd_agent.max_burst_sz = n_buffers;

  if (PREDICT_FALSE(n_buffers == VLIB_FRAME_SIZE))
    {
      xd->efd_agent.full_frames_cnt++;
      xd->efd_agent.consec_full_frames_cnt++;
    }
  else
    {
      xd->efd_agent.consec_full_frames_cnt = 0;
    }
}

/* is_efd_discardable()
 *   Returns a non-zero DPDK error code if the packet meets the
 *   early-fast-discard criteria, zero otherwise.
 */
u32 is_efd_discardable (vlib_thread_main_t *tm,
                        vlib_buffer_t * b0,
                        struct rte_mbuf *mb)
{
  ethernet_header_t *eh = (ethernet_header_t *) b0->data;

  if (eh->type == clib_host_to_net_u16(ETHERNET_TYPE_IP4))
    {
      ip4_header_t *ipv4 =
          (ip4_header_t *)&(b0->data[sizeof(ethernet_header_t)]);
      u8 pkt_prec = (ipv4->tos >> 5);

      return (tm->efd.ip_prec_bitmap & (1 << pkt_prec) ?
                  DPDK_ERROR_IPV4_EFD_DROP_PKTS : DPDK_ERROR_NONE);
    }
  else if (eh->type == clib_net_to_host_u16(ETHERNET_TYPE_IP6))
    {
      ip6_header_t *ipv6 =
          (ip6_header_t *)&(b0->data[sizeof(ethernet_header_t)]);
      u8 pkt_tclass =
          ((ipv6->ip_version_traffic_class_and_flow_label >> 20) & 0xff);

      return (tm->efd.ip_prec_bitmap & (1 << pkt_tclass) ?
                  DPDK_ERROR_IPV6_EFD_DROP_PKTS : DPDK_ERROR_NONE);
    }
  else if (eh->type == clib_net_to_host_u16(ETHERNET_TYPE_MPLS_UNICAST))
    {
      mpls_unicast_header_t *mpls =
          (mpls_unicast_header_t *)&(b0->data[sizeof(ethernet_header_t)]);
      u8 pkt_exp = ((mpls->label_exp_s_ttl >> 9) & 0x07);

      return (tm->efd.mpls_exp_bitmap & (1 << pkt_exp) ?
                  DPDK_ERROR_MPLS_EFD_DROP_PKTS : DPDK_ERROR_NONE);
    }
  else if ((eh->type == clib_net_to_host_u16(ETHERNET_TYPE_VLAN)) ||
           (eh->type == clib_net_to_host_u16(ETHERNET_TYPE_DOT1AD)))
    {
      ethernet_vlan_header_t *vlan =
          (ethernet_vlan_header_t *)&(b0->data[sizeof(ethernet_header_t)]);
      u8 pkt_cos = ((vlan->priority_cfi_and_id >> 13) & 0x07);

      return (tm->efd.vlan_cos_bitmap & (1 << pkt_cos) ?
                  DPDK_ERROR_VLAN_EFD_DROP_PKTS : DPDK_ERROR_NONE);
    }

  return DPDK_ERROR_NONE;
}

/*
 * This function is used when there are no worker threads.
 * The main thread performs IO and forwards the packets.
 */
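/*
 * Note: the b0 = (vlib_buffer_t *)(mb+1) casts throughout this file rely
 * on the vlib buffer header being laid out immediately after the
 * rte_mbuf within each buffer, which is how these buffer pools are
 * constructed (see also mb = ((struct rte_mbuf *)b0) - 1 above).
 */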
static inline u32 dpdk_device_input ( dpdk_main_t * dm,
                                      dpdk_device_t * xd,
                                      vlib_node_runtime_t * node,
                                      u32 cpu_index,
                                      u16 queue_id)
{
  u32 n_buffers;
  u32 next_index = DPDK_RX_NEXT_ETHERNET_INPUT;
  u32 n_left_to_next, * to_next;
  u32 mb_index;
  vlib_main_t * vm = vlib_get_main();
  uword n_rx_bytes = 0;
  u32 n_trace, trace_cnt __attribute__((unused));
  vlib_buffer_free_list_t * fl;
  u8 efd_discard_burst = 0;
  u16 ip_align_offset = 0;
  u32 buffer_flags_template;

  if (xd->admin_up == 0)
    return 0;

  n_buffers = dpdk_rx_burst(dm, xd, queue_id);

  if (n_buffers == 0)
    {
      /* check if EFD (dpdk) is enabled */
      if (PREDICT_FALSE(dm->efd.enabled))
        {
          /* reset a few stats */
          xd->efd_agent.last_poll_time = 0;
          xd->efd_agent.last_burst_sz = 0;
        }
      return 0;
    }

  if (xd->pmd == VNET_DPDK_PMD_THUNDERX)
      ip_align_offset = 6;

  buffer_flags_template = dm->buffer_flags_template;

  vec_reset_length (xd->d_trace_buffers);
  trace_cnt = n_trace = vlib_get_trace_count (vm, node);

  fl = vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);

  /*
   * DAW-FIXME: VMXNET3 device stop/start doesn't work,
   * therefore fake the stop in the dpdk driver by
   * silently dropping all of the incoming pkts instead of
   * stopping the driver / hardware.
   */
  if (PREDICT_FALSE(xd->admin_up != 1))
    {
      for (mb_index = 0; mb_index < n_buffers; mb_index++)
        rte_pktmbuf_free (xd->rx_vectors[queue_id][mb_index]);

      return 0;
    }

  /* Check for congestion if EFD (Early-Fast-Discard) is enabled
   * in any mode (e.g. dpdk, monitor, or drop_all)
   */
  if (PREDICT_FALSE(dm->efd.enabled))
    {
      /* update EFD counters */
      dpdk_efd_update_counters(xd, n_buffers, dm->efd.enabled);

      if (PREDICT_FALSE(dm->efd.enabled & DPDK_EFD_DROPALL_ENABLED))
        {
          /* discard all received packets */
          for (mb_index = 0; mb_index < n_buffers; mb_index++)
            rte_pktmbuf_free(xd->rx_vectors[queue_id][mb_index]);

          xd->efd_agent.discard_cnt += n_buffers;
          increment_efd_drop_counter(vm,
                                     DPDK_ERROR_VLAN_EFD_DROP_PKTS,
                                     n_buffers);

          return 0;
        }

      if (PREDICT_FALSE(xd->efd_agent.consec_full_frames_cnt >=
                        dm->efd.consec_full_frames_hi_thresh))
        {
          u32 device_queue_sz = rte_eth_rx_queue_count(xd->device_index,
                                                       queue_id);
          if (device_queue_sz >= dm->efd.queue_hi_thresh)
            {
              /* dpdk device queue has reached the critical threshold */
              xd->efd_agent.congestion_cnt++;

              /* apply EFD to packets from the burst */
              efd_discard_burst = 1;
            }
        }
    }

  mb_index = 0;

  while (n_buffers > 0)
    {
      u32 bi0;
      u8 next0, error0;
      u32 l3_offset0;
      vlib_buffer_t * b0, * b_seg, * b_chain = 0;
      u32 cntr_type;

      vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);

      while (n_buffers > 0 && n_left_to_next > 0)
        {
          u8 nb_seg = 1;
          struct rte_mbuf *mb = xd->rx_vectors[queue_id][mb_index];
          struct rte_mbuf *mb_seg = mb->next;

          if (PREDICT_TRUE(n_buffers > 2))
          {
              struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index+2];
              vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
              CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, STORE);
              CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
          }

          ASSERT(mb);

          b0 = (vlib_buffer_t *)(mb+1);

          /* check whether EFD is looking for packets to discard */
          if (PREDICT_FALSE(efd_discard_burst))
            {
              vlib_thread_main_t * tm = vlib_get_thread_main();

              if (PREDICT_TRUE(cntr_type = is_efd_discardable(tm, b0, mb)))
                {
                  rte_pktmbuf_free(mb);
                  xd->efd_agent.discard_cnt++;
                  increment_efd_drop_counter(vm,
                                             cntr_type,
                                             1);
                  n_buffers--;
                  mb_index++;
                  continue;
                }
            }

          /* Prefetch one next segment if it exists. */
          if (PREDICT_FALSE(mb->nb_segs > 1))
            {
              struct rte_mbuf *pfmb = mb->next;
              vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
              CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
              CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
              b_chain = b0;
            }

          vlib_buffer_init_for_free_list (b0, fl);
          b0->clone_count = 0;

          bi0 = vlib_get_buffer_index (vm, b0);

          to_next[0] = bi0;
          to_next++;
          n_left_to_next--;

          dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
                                                   &next0, &error0);
#ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
          /*
           * Clear overloaded TX offload flags when a DPDK driver
           * is using them for RX flags (e.g. Cisco VIC Ethernet driver)
           */

          if (PREDICT_TRUE(trace_cnt == 0))
            mb->ol_flags &= PKT_EXT_RX_CLR_TX_FLAGS_MASK;
          else
            trace_cnt--;
#endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */

          b0->error = node->errors[error0];

          l3_offset0 = ((next0 == DPDK_RX_NEXT_IP4_INPUT ||
                         next0 == DPDK_RX_NEXT_IP6_INPUT ||
                         next0 == DPDK_RX_NEXT_MPLS_INPUT) ?
                        sizeof (ethernet_header_t) : 0);

          b0->current_data = l3_offset0;
          b0->current_length = mb->data_len - l3_offset0;

          if (PREDICT_FALSE (ip_align_offset != 0))
            {
              if (next0 == DPDK_RX_NEXT_IP4_INPUT ||
                  next0 == DPDK_RX_NEXT_IP6_INPUT)
                b0->current_data += ip_align_offset;
            }

          b0->flags = buffer_flags_template;

          if (VMWARE_LENGTH_BUG_WORKAROUND)
              b0->current_length -= 4;

          vnet_buffer(b0)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
          vnet_buffer(b0)->sw_if_index[VLIB_TX] = (u32)~0;
          n_rx_bytes += mb->pkt_len;

          /* Process subsequent segments of multi-segment packets */
          while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
            {
              ASSERT(mb_seg != 0);

              b_seg = (vlib_buffer_t *)(mb_seg+1);
              vlib_buffer_init_for_free_list (b_seg, fl);
              b_seg->clone_count = 0;

              ASSERT((b_seg->flags & VLIB_BUFFER_NEXT_PRESENT) == 0);
              ASSERT(b_seg->current_data == 0);

              /*
               * The driver (e.g. virtio) may not put the packet data at the start
               * of the segment, so don't assume b_seg->current_data == 0 is correct.
               */
              b_seg->current_data = (mb_seg->buf_addr + mb_seg->data_off) - (void *)b_seg->data;

              b_seg->current_length = mb_seg->data_len;
              b0->total_length_not_including_first_buffer +=
                mb_seg->data_len;

              b_chain->flags |= VLIB_BUFFER_NEXT_PRESENT;
              b_chain->next_buffer = vlib_get_buffer_index (vm, b_seg);

              b_chain = b_seg;
              mb_seg = mb_seg->next;
              nb_seg++;
            }

          /*
           * Turn this on if you run into
           * "bad monkey" contexts, and you want to know exactly
           * which nodes they've visited... See main.c...
           */
          VLIB_BUFFER_TRACE_TRAJECTORY_INIT(b0);

          vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                                           to_next, n_left_to_next,
                                           bi0, next0);
          if (PREDICT_FALSE (n_trace > mb_index))
            vec_add1 (xd->d_trace_buffers, bi0);
          n_buffers--;
          mb_index++;
        }
      vlib_put_next_frame (vm, node, next_index, n_left_to_next);
    }

  if (PREDICT_FALSE (vec_len (xd->d_trace_buffers) > 0))
    {
      dpdk_rx_trace (dm, node, xd, queue_id, xd->d_trace_buffers,
                     vec_len (xd->d_trace_buffers));
      vlib_set_trace_count (vm, node, n_trace - vec_len (xd->d_trace_buffers));
    }

  vlib_increment_combined_counter
    (vnet_get_main()->interface_main.combined_sw_if_counters
     + VNET_INTERFACE_COUNTER_RX,
     cpu_index,
     xd->vlib_sw_if_index,
     mb_index, n_rx_bytes);

  dpdk_worker_t * dw = vec_elt_at_index(dm->workers, cpu_index);
  dw->aggregate_rx_packets += mb_index;

  return mb_index;
}

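/*
 * Under VIRL, throttle the input node to roughly 1000 vectors/sec by
 * sleeping 1ms per dispatch; nanosleep() is restarted with the
 * remaining time if it is interrupted by a signal.
 */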
#if VIRL > 0
#define VIRL_SPEED_LIMIT()                         \
  /* Limit the input rate to 1000 vectors / sec */ \
  {                                                \
    struct timespec ts, tsrem;                     \
                                                   \
    ts.tv_sec = 0;                                 \
    ts.tv_nsec = 1000*1000; /* 1ms */              \
                                                   \
    while (nanosleep(&ts, &tsrem) < 0)             \
      {                                            \
        ts = tsrem;                                \
      }                                            \
  }
#else
#define VIRL_SPEED_LIMIT()
#endif


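/*
 * Single-queue input node used when RSS is not configured: polls every
 * device assigned to this cpu on queue 0. dpdk_input_rss() below is the
 * multi-queue variant and polls the per-device queue_id instead.
 */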
static uword
dpdk_input (vlib_main_t * vm,
            vlib_node_runtime_t * node,
            vlib_frame_t * f)
{
  dpdk_main_t * dm = &dpdk_main;
  dpdk_device_t * xd;
  uword n_rx_packets = 0;
  dpdk_device_and_queue_t * dq;
  u32 cpu_index = os_get_cpu_number();

  /*
   * Poll all devices on this cpu for input/interrupts.
   */
  vec_foreach (dq, dm->devices_by_cpu[cpu_index])
    {
      xd = vec_elt_at_index(dm->devices, dq->device);
      ASSERT(dq->queue_id == 0);
      n_rx_packets += dpdk_device_input (dm, xd, node, cpu_index, 0);
    }

  VIRL_SPEED_LIMIT()

  return n_rx_packets;
}

uword
dpdk_input_rss (vlib_main_t * vm,
      vlib_node_runtime_t * node,
      vlib_frame_t * f)
{
  dpdk_main_t * dm = &dpdk_main;
  dpdk_device_t * xd;
  uword n_rx_packets = 0;
  dpdk_device_and_queue_t * dq;
  u32 cpu_index = os_get_cpu_number();

  /*
   * Poll all devices on this cpu for input/interrupts.
   */
  vec_foreach (dq, dm->devices_by_cpu[cpu_index])
    {
      xd = vec_elt_at_index(dm->devices, dq->device);
      n_rx_packets += dpdk_device_input (dm, xd, node, cpu_index, dq->queue_id);
    }

  VIRL_SPEED_LIMIT()

  return n_rx_packets;
}

VLIB_REGISTER_NODE (dpdk_input_node) = {
  .function = dpdk_input,
  .type = VLIB_NODE_TYPE_INPUT,
  .name = "dpdk-input",

  /* Will be enabled if/when hardware is detected. */
  .state = VLIB_NODE_STATE_DISABLED,

  .format_buffer = format_ethernet_header_with_length,
  .format_trace = format_dpdk_rx_dma_trace,

  .n_errors = DPDK_N_ERROR,
  .error_strings = dpdk_error_strings,

  .n_next_nodes = DPDK_RX_N_NEXT,
  .next_nodes = {
    [DPDK_RX_NEXT_DROP] = "error-drop",
    [DPDK_RX_NEXT_ETHERNET_INPUT] = "ethernet-input",
    [DPDK_RX_NEXT_IP4_INPUT] = "ip4-input-no-checksum",
    [DPDK_RX_NEXT_IP6_INPUT] = "ip6-input",
    [DPDK_RX_NEXT_MPLS_INPUT] = "mpls-gre-input",
  },
};

/*
 * Override the next nodes for the dpdk input nodes.
 * Must be invoked prior to VLIB_INIT_FUNCTION calls.
 */
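/*
 * Example (with a hypothetical feature node name):
 *
 *   dpdk_set_next_node (DPDK_RX_NEXT_IP4_INPUT, "my-ip4-feature");
 *
 * would redirect all ip4 packets from the dpdk rx path into the
 * "my-ip4-feature" node instead of "ip4-input-no-checksum".
 */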
void dpdk_set_next_node (dpdk_rx_next_t next, char *name)
{
  vlib_node_registration_t *r = &dpdk_input_node;
  vlib_node_registration_t *r_io = &dpdk_io_input_node;
  vlib_node_registration_t *r_handoff = &handoff_dispatch_node;

  switch (next)
    {
    case DPDK_RX_NEXT_IP4_INPUT:
    case DPDK_RX_NEXT_IP6_INPUT:
    case DPDK_RX_NEXT_MPLS_INPUT:
    case DPDK_RX_NEXT_ETHERNET_INPUT:
      r->next_nodes[next] = name;
      r_io->next_nodes[next] = name;
      r_handoff->next_nodes[next] = name;
      break;

    default:
      clib_warning ("%s: illegal next %d\n", __FUNCTION__, next);
      break;
    }
}

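/*
 * Reserve a frame queue element for the given worker: atomically
 * advance the queue tail, wait for a free ring slot, then spin until
 * the previous occupant of the slot has been consumed (valid == 0).
 */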
inline vlib_frame_queue_elt_t *
vlib_get_handoff_queue_elt (u32 vlib_worker_index)
{
  vlib_frame_queue_t *fq;
  vlib_frame_queue_elt_t *elt;
  u64 new_tail;

  fq = vlib_frame_queues[vlib_worker_index];
  ASSERT (fq);

  new_tail = __sync_add_and_fetch (&fq->tail, 1);

  /* Wait until a ring slot is available */
  while (new_tail >= fq->head_hint + fq->nelts)
      vlib_worker_thread_barrier_check ();

  elt = fq->elts + (new_tail & (fq->nelts-1));

  /* this would be very bad... */
  while (elt->valid)
    ;

  elt->msg_type = VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME;
  elt->last_n_vectors = elt->n_vectors = 0;

  return elt;
}

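/* Return the cached in-progress element for this worker, if any;
 * otherwise reserve a fresh one and cache it. */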
static inline vlib_frame_queue_elt_t *
dpdk_get_handoff_queue_elt (
    u32 vlib_worker_index,
    vlib_frame_queue_elt_t ** handoff_queue_elt_by_worker_index)
{
  vlib_frame_queue_elt_t *elt;

  if (handoff_queue_elt_by_worker_index [vlib_worker_index])
      return handoff_queue_elt_by_worker_index [vlib_worker_index];

  elt = vlib_get_handoff_queue_elt (vlib_worker_index);

  handoff_queue_elt_by_worker_index [vlib_worker_index] = elt;

  return elt;
}

static inline vlib_frame_queue_t *
is_vlib_handoff_queue_congested (
    u32 vlib_worker_index,
    u32 queue_hi_thresh,
    vlib_frame_queue_t ** handoff_queue_by_worker_index)
{
  vlib_frame_queue_t *fq;

  fq = handoff_queue_by_worker_index [vlib_worker_index];
  if (fq != (vlib_frame_queue_t *)(~0))
      return fq;

  fq = vlib_frame_queues[vlib_worker_index];
  ASSERT (fq);

  if (PREDICT_FALSE(fq->tail >= (fq->head_hint + queue_hi_thresh))) {
    /* a valid entry in the array will indicate the queue has reached
     * the specified threshold and is congested
     */
    handoff_queue_by_worker_index [vlib_worker_index] = fq;
    fq->enqueue_full_events++;
    return fq;
  }

  return NULL;
}

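/*
 * Flow hashing for worker selection: the per-protocol *_get_key()
 * functions below fold the relevant header fields into a 64-bit key,
 * which dpdk_io_thread() runs through clib_xxhash() and reduces to a
 * worker, i.e. worker = first_worker_index + (hash % num_workers),
 * or hash & (num_workers - 1) when the count is a power of two.
 */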
static inline u64 ipv4_get_key (ip4_header_t *ip)
{
   u64 hash_key;

   hash_key = *((u64*)(&ip->address_pair)) ^ ip->protocol;

   return hash_key;
}

static inline u64 ipv6_get_key (ip6_header_t *ip)
{
   u64 hash_key;

   hash_key = ip->src_address.as_u64[0] ^
              ip->src_address.as_u64[1] ^
              ip->dst_address.as_u64[0] ^
              ip->dst_address.as_u64[1] ^
              ip->protocol;

   return hash_key;
}

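/*
 * MPLS key: scan up to five labels for the bottom-of-stack (S) bit and
 * hash the encapsulated IP header if one follows. The masks below are
 * in host byte order and are byte-swapped with clib_net_to_host_u32()
 * before being applied to the on-wire label word.
 */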
#define MPLS_BOTTOM_OF_STACK_BIT_MASK   0x00000100U
#define MPLS_LABEL_MASK                 0xFFFFF000U

static inline u64 mpls_get_key (mpls_unicast_header_t *m)
{
   u64 hash_key;
   u8 ip_ver;

   /* find the bottom of the MPLS label stack. */
   if (PREDICT_TRUE(m->label_exp_s_ttl &
                    clib_net_to_host_u32(MPLS_BOTTOM_OF_STACK_BIT_MASK))) {
       goto bottom_lbl_found;
   }
   m++;

   if (PREDICT_TRUE(m->label_exp_s_ttl &
                    clib_net_to_host_u32(MPLS_BOTTOM_OF_STACK_BIT_MASK))) {
       goto bottom_lbl_found;
   }
   m++;

   if (m->label_exp_s_ttl & clib_net_to_host_u32(MPLS_BOTTOM_OF_STACK_BIT_MASK)) {
       goto bottom_lbl_found;
   }
   m++;

   if (m->label_exp_s_ttl & clib_net_to_host_u32(MPLS_BOTTOM_OF_STACK_BIT_MASK)) {
       goto bottom_lbl_found;
   }
   m++;

   if (m->label_exp_s_ttl & clib_net_to_host_u32(MPLS_BOTTOM_OF_STACK_BIT_MASK)) {
       goto bottom_lbl_found;
   }

   /* the bottom label was not found - use the last label */
   hash_key = m->label_exp_s_ttl & clib_net_to_host_u32(MPLS_LABEL_MASK);

   return hash_key;


bottom_lbl_found:
   m++;
   ip_ver = (*((u8 *)m) >> 4);

   /* find out if it is IPV4 or IPV6 header */
   if (PREDICT_TRUE(ip_ver == 4)) {
       hash_key = ipv4_get_key((ip4_header_t *)m);
   } else if (PREDICT_TRUE(ip_ver == 6)) {
       hash_key = ipv6_get_key((ip6_header_t *)m);
   } else {
       /* use the bottom label */
       hash_key = (m-1)->label_exp_s_ttl & clib_net_to_host_u32(MPLS_LABEL_MASK);
   }

   return hash_key;
}

static inline u64 eth_get_key (ethernet_header_t *h0)
{
   u64 hash_key;

   if (PREDICT_TRUE(h0->type == clib_host_to_net_u16(ETHERNET_TYPE_IP4))) {
       hash_key = ipv4_get_key((ip4_header_t *)(h0+1));
   } else if (h0->type == clib_host_to_net_u16(ETHERNET_TYPE_IP6)) {
       hash_key = ipv6_get_key((ip6_header_t *)(h0+1));
   } else if (h0->type == clib_host_to_net_u16(ETHERNET_TYPE_MPLS_UNICAST)) {
       hash_key = mpls_get_key((mpls_unicast_header_t *)(h0+1));
   } else if ((h0->type == clib_host_to_net_u16(ETHERNET_TYPE_VLAN)) ||
              (h0->type == clib_host_to_net_u16(ETHERNET_TYPE_DOT1AD))) {
       ethernet_vlan_header_t * outer = (ethernet_vlan_header_t *)(h0 + 1);

       outer = (outer->type == clib_host_to_net_u16(ETHERNET_TYPE_VLAN)) ?
                                  outer+1 : outer;
       if (PREDICT_TRUE(outer->type == clib_host_to_net_u16(ETHERNET_TYPE_IP4))) {
           hash_key = ipv4_get_key((ip4_header_t *)(outer+1));
       } else if (outer->type == clib_host_to_net_u16 (ETHERNET_TYPE_IP6)) {
           hash_key = ipv6_get_key((ip6_header_t *)(outer+1));
       } else if (outer->type == clib_host_to_net_u16(ETHERNET_TYPE_MPLS_UNICAST)) {
           hash_key = mpls_get_key((mpls_unicast_header_t *)(outer+1));
       } else {
           hash_key = outer->type;
       }
   } else {
       hash_key = 0;
   }

   return hash_key;
}

/*
 * This function is used when dedicated IO threads feed the worker threads.
 *
 * Devices are allocated to this thread based on instances and instance_id.
 * If instances==0 then the function automatically determines the number
 * of instances of this thread, and allocates devices between them.
 * If instances != 0, then instance_id must be in the range 0..instances-1.
 * The function allocates devices among the specified number of instances,
 * with this thread having the given instance id. This option is used for
 * splitting devices among differently named "io"-type threads.
 */
void dpdk_io_thread (vlib_worker_thread_t * w,
                     u32 instances,
                     u32 instance_id,
                     char *worker_name,
                     dpdk_io_thread_callback_t callback)
{
  vlib_main_t * vm = vlib_get_main();
  vlib_thread_main_t * tm = vlib_get_thread_main();
  vlib_thread_registration_t * tr;
  dpdk_main_t * dm = &dpdk_main;
  char *io_name = w->registration->name;
  dpdk_device_t * xd;
  dpdk_device_t ** my_devices = 0;
  vlib_frame_queue_elt_t ** handoff_queue_elt_by_worker_index = 0;
  vlib_frame_queue_t ** congested_handoff_queue_by_worker_index = 0;
  vlib_frame_queue_elt_t * hf = 0;
  int i;
  u32 n_left_to_next_worker = 0, * to_next_worker = 0;
  u32 next_worker_index = 0;
  u32 current_worker_index = ~0;
  u32 cpu_index = os_get_cpu_number();
  u32 num_workers = 0;
  u32 num_devices = 0;
  uword * p;
  u16 queue_id = 0;
  vlib_node_runtime_t * node_trace;
  u32 first_worker_index = 0;
  u32 buffer_flags_template;

  /* Wait until the dpdk init sequence is complete */
  while (dm->io_thread_release == 0)
    vlib_worker_thread_barrier_check();

  clib_time_init (&vm->clib_time);

  p = hash_get_mem (tm->thread_registrations_by_name, worker_name);
  ASSERT (p);
  tr = (vlib_thread_registration_t *) p[0];
  if (tr)
    {
      num_workers = tr->count;
      first_worker_index = tr->first_index;
    }

  /* Allocate devices to this thread */
  if (instances == 0)
    {
      /* auto-assign */
      instance_id = w->instance_id;

      p = hash_get_mem (tm->thread_registrations_by_name, io_name);
      tr = (vlib_thread_registration_t *) p[0];
      /* Otherwise, how did we get here */
      ASSERT (tr && tr->count);
      instances = tr->count;
    }
  else
    {
      /* manually assign */
      ASSERT (instance_id < instances);
    }

  vec_validate (handoff_queue_elt_by_worker_index,
                first_worker_index + num_workers - 1);

  vec_validate_init_empty (congested_handoff_queue_by_worker_index,
                           first_worker_index + num_workers - 1,
                           (vlib_frame_queue_t *)(~0));

  /* packet tracing is triggered on the dpdk-input node for ease-of-use */
  node_trace = vlib_node_get_runtime (vm, dpdk_input_node.index);

  buffer_flags_template = dm->buffer_flags_template;

  /* And handle them... */
  while (1)
    {
      u32 n_buffers;
      u32 mb_index;
      uword n_rx_bytes = 0;
      u32 n_trace, trace_cnt __attribute__((unused));
      vlib_buffer_free_list_t * fl;
      u32 hash;
      u64 hash_key;
      u8 efd_discard_burst;

      vlib_worker_thread_barrier_check ();

      /* Invoke callback if supplied */
      if (PREDICT_FALSE(callback != NULL))
          callback(vm);

      if (PREDICT_FALSE(vec_len(dm->devices) != num_devices))
      {
        vec_reset_length(my_devices);
        vec_foreach (xd, dm->devices)
          {
            if (((xd - dm->devices) % tr->count) == instance_id)
              {
                fprintf(stderr, "i/o thread %d (cpu %d) takes port %d\n",
                        instance_id, (int) os_get_cpu_number(), (int) (xd - dm->devices));
                vec_add1 (my_devices, xd);
              }
          }
        num_devices = vec_len(dm->devices);
      }

      for (i = 0; i < vec_len (my_devices); i++)
      {
          xd = my_devices[i];

          if (!xd->admin_up)
            continue;

          n_buffers = dpdk_rx_burst(dm, xd, 0 /* queue_id */);

          if (n_buffers == 0)
            {
              /* check if EFD (dpdk) is enabled */
              if (PREDICT_FALSE(dm->efd.enabled))
                {
                  /* reset a few stats */
                  xd->efd_agent.last_poll_time = 0;
                  xd->efd_agent.last_burst_sz = 0;
                }
              continue;
            }

          vec_reset_length (xd->d_trace_buffers);
          trace_cnt = n_trace = vlib_get_trace_count (vm, node_trace);

          /*
           * DAW-FIXME: VMXNET3 device stop/start doesn't work,
           * therefore fake the stop in the dpdk driver by
           * silently dropping all of the incoming pkts instead of
           * stopping the driver / hardware.
           */
          if (PREDICT_FALSE(xd->admin_up != 1))
            {
              for (mb_index = 0; mb_index < n_buffers; mb_index++)
                rte_pktmbuf_free (xd->rx_vectors[queue_id][mb_index]);
              continue;
            }

          /* reset EFD action for the burst */
          efd_discard_burst = 0;

          /* Check for congestion if EFD (Early-Fast-Discard) is enabled
           * in any mode (e.g. dpdk, monitor, or drop_all)
           */
          if (PREDICT_FALSE(dm->efd.enabled))
            {
              /* update EFD counters */
              dpdk_efd_update_counters(xd, n_buffers, dm->efd.enabled);

              if (PREDICT_FALSE(dm->efd.enabled & DPDK_EFD_DROPALL_ENABLED))
                {
                  /* drop all received packets */
                  for (mb_index = 0; mb_index < n_buffers; mb_index++)
                    rte_pktmbuf_free(xd->rx_vectors[queue_id][mb_index]);

                  xd->efd_agent.discard_cnt += n_buffers;
                  increment_efd_drop_counter(vm,
                                             DPDK_ERROR_VLAN_EFD_DROP_PKTS,
                                             n_buffers);

                  continue;
                }

              if (PREDICT_FALSE(xd->efd_agent.consec_full_frames_cnt >=
                                dm->efd.consec_full_frames_hi_thresh))
                {
                  u32 device_queue_sz = rte_eth_rx_queue_count(xd->device_index,
                                                               queue_id);
                  if (device_queue_sz >= dm->efd.queue_hi_thresh)
                    {
                      /* dpdk device queue has reached the critical threshold */
                      xd->efd_agent.congestion_cnt++;

                      /* apply EFD to packets from the burst */
                      efd_discard_burst = 1;
                    }
                }
            }

          fl = vlib_buffer_get_free_list
            (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);

          mb_index = 0;

          while (n_buffers > 0)
            {
              u32 bi0;
              u8 next0, error0;
              u32 l3_offset0;
              vlib_buffer_t * b0, * b_seg, * b_chain = 0;
              ethernet_header_t * h0;
              u8 nb_seg = 1;
              struct rte_mbuf *mb = xd->rx_vectors[queue_id][mb_index];
              struct rte_mbuf *mb_seg = mb->next;

              if (PREDICT_TRUE(n_buffers > 2))
                {
                  struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index+2];
                  vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
                  CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
                  CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
                  CLIB_PREFETCH (bp->data, CLIB_CACHE_LINE_BYTES, LOAD);
                }
1329                 
1330               b0 = (vlib_buffer_t *)(mb+1);
1331
1332               /* check whether EFD is looking for packets to discard */
1333               if (PREDICT_FALSE(efd_discard_burst))
1334                 {
1335                   u32 cntr_type;
1336                   if (PREDICT_TRUE(cntr_type = is_efd_discardable(tm, b0, mb)))
1337                     {
1338                       rte_pktmbuf_free(mb);
1339                       xd->efd_agent.discard_cnt++;
1340                       increment_efd_drop_counter(vm, 
1341                                                  cntr_type,
1342                                                  1);
1343
1344                       n_buffers--;
1345                       mb_index++;
1346                       continue;
1347                     }
1348                 }
1349               
1350               /* Prefetch one next segment if it exists */
1351               if (PREDICT_FALSE(mb->nb_segs > 1))
1352                 {
1353                   struct rte_mbuf *pfmb = mb->next;
1354                   vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
1355                   CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
1356                   CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
1357                   b_chain = b0;
1358                 }
1359
1360               bi0 = vlib_get_buffer_index (vm, b0);
1361               vlib_buffer_init_for_free_list (b0, fl);
1362               b0->clone_count = 0;
1363
1364               dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
1365                                                        &next0, &error0);
1366 #ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
1367               /*
1368                * Clear overloaded TX offload flags when a DPDK driver
1369                * is using them for RX flags (e.g. Cisco VIC Ethernet driver)
1370                */
1371               if (PREDICT_TRUE(trace_cnt == 0))
1372                 mb->ol_flags &= PKT_EXT_RX_CLR_TX_FLAGS_MASK;
1373               else
1374                 trace_cnt--;
1375 #endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
1376
1377               if (error0)
1378                   clib_warning ("bi %d error %d", bi0, error0);
1379
1380               b0->error = 0;
1381
1382               l3_offset0 = ((next0 == DPDK_RX_NEXT_IP4_INPUT ||
1383                              next0 == DPDK_RX_NEXT_IP6_INPUT || 
1384                              next0 == DPDK_RX_NEXT_MPLS_INPUT) ? 
1385                             sizeof (ethernet_header_t) : 0);
1386
1387               b0->current_data = l3_offset0;
1388               b0->current_length = mb->data_len - l3_offset0;
1389
1390               b0->flags = buffer_flags_template;
1391
1392               if (VMWARE_LENGTH_BUG_WORKAROUND)
1393                   b0->current_length -= 4;
1394                 
1395               vnet_buffer(b0)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
1396               vnet_buffer(b0)->sw_if_index[VLIB_TX] = (u32)~0;
1397               vnet_buffer(b0)->io_handoff.next_index = next0;
1398               n_rx_bytes += mb->pkt_len;
1399
1400               /* Process subsequent segments of multi-segment packets */
1401               while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
1402                 {
1403                   ASSERT(mb_seg != 0);
1404  
1405                   b_seg = (vlib_buffer_t *)(mb_seg+1);
1406                   vlib_buffer_init_for_free_list (b_seg, fl);
1407                   b_seg->clone_count = 0;
1408  
1409                   ASSERT((b_seg->flags & VLIB_BUFFER_NEXT_PRESENT) == 0);
1410                   ASSERT(b_seg->current_data == 0);
1411  
1412                   /*
1413                    * The driver (e.g. virtio) may not put the packet data at the start
1414                    * of the segment, so don't assume b_seg->current_data == 0 is correct.
1415                    */
1416                   b_seg->current_data = (mb_seg->buf_addr + mb_seg->data_off) - (void *)b_seg->data;
1417
1418                   b_seg->current_length = mb_seg->data_len;
1419                   b0->total_length_not_including_first_buffer +=
1420                     mb_seg->data_len;
1421  
1422                   b_chain->flags |= VLIB_BUFFER_NEXT_PRESENT;
1423                   b_chain->next_buffer = vlib_get_buffer_index (vm, b_seg);
1424  
1425                   b_chain = b_seg;
1426                   mb_seg = mb_seg->next;
1427                   nb_seg++;
1428                 }
1429
1430               /*
1431                * Turn this on if you run into
1432                * "bad monkey" contexts, and you want to know exactly
1433                * which nodes they've visited... See main.c...
1434                */
1435               VLIB_BUFFER_TRACE_TRAJECTORY_INIT(b0);
1436  
1437               if (PREDICT_FALSE (n_trace > mb_index))
1438                 vec_add1 (xd->d_trace_buffers, bi0);
1439
1440               next_worker_index = first_worker_index;
1441
1442               /* 
1443                * Force unknown traffic onto worker 0, 
1444                * and into ethernet-input. $$$$ add more hashes.
1445                */
1446               h0 = (ethernet_header_t *) b0->data;
1447
1448               /* Compute ingress LB hash */
1449               hash_key = eth_get_key(h0);
1450               hash = (u32)clib_xxhash(hash_key);
1451
1452               if (PREDICT_TRUE (is_pow2(num_workers)))
1453                 next_worker_index += hash & (num_workers - 1);
1454               else
1455                 next_worker_index += hash % num_workers;
1456
1457               /* if EFD is enabled and not already discarding from dpdk,
1458                * check the worker ring/queue for congestion
1459                */
1460               if (PREDICT_FALSE(tm->efd.enabled && !efd_discard_burst))
1461                 {
1462                   vlib_frame_queue_t *fq;
1463
1464                   /* fq will be valid if the ring is congested */
1465                   fq = is_vlib_handoff_queue_congested(
1466                       next_worker_index, tm->efd.queue_hi_thresh,
1467                       congested_handoff_queue_by_worker_index);
1468                   
1469                   if (PREDICT_FALSE(fq != NULL))
1470                     {
1471                       u32 cntr_type;
1472                       if (PREDICT_TRUE(cntr_type =
1473                                        is_efd_discardable(tm, b0, mb)))
1474                         {
1475                           /* discard the packet */
1476                           fq->enqueue_efd_discards++;
1477                           increment_efd_drop_counter(vm, cntr_type, 1);
1478                           rte_pktmbuf_free(mb);
1479                           n_buffers--;
1480                           mb_index++;
1481                           continue;
1482                         }
1483                     }
1484                 }
1485               
1486               if (next_worker_index != current_worker_index)
1487                 {
1488                   if (hf)
1489                     hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
1490
1491                   hf = dpdk_get_handoff_queue_elt(
1492                            next_worker_index,
1493                            handoff_queue_elt_by_worker_index);
1494                       
1495                   n_left_to_next_worker = VLIB_FRAME_SIZE - hf->n_vectors;
1496                   to_next_worker = &hf->buffer_index[hf->n_vectors];
1497                   current_worker_index = next_worker_index;
1498                 }
1499               
1500               /* enqueue to correct worker thread */
1501               to_next_worker[0] = bi0;
1502               to_next_worker++;
1503               n_left_to_next_worker--;
1504
1505               if (n_left_to_next_worker == 0)
1506                 {
1507                   hf->n_vectors = VLIB_FRAME_SIZE;
1508                   vlib_put_handoff_queue_elt(hf);
1509                   current_worker_index = ~0;
1510                   handoff_queue_elt_by_worker_index[next_worker_index] = 0;
1511                   hf = 0;
1512                 }
1513                   
1514               n_buffers--;
1515               mb_index++;
1516             }
1517
1518           if (PREDICT_FALSE (vec_len (xd->d_trace_buffers) > 0))
1519             {
1520               /* credit the trace to the trace node */
1521               dpdk_rx_trace (dm, node_trace, xd, queue_id, xd->d_trace_buffers,
1522                              vec_len (xd->d_trace_buffers));
1523               vlib_set_trace_count (vm, node_trace, n_trace - vec_len (xd->d_trace_buffers));
1524             }
1525
1526           vlib_increment_combined_counter 
1527             (vnet_get_main()->interface_main.combined_sw_if_counters
1528              + VNET_INTERFACE_COUNTER_RX,
1529              cpu_index, 
1530              xd->vlib_sw_if_index,
1531              mb_index, n_rx_bytes);
1532
1533           dpdk_worker_t * dw = vec_elt_at_index(dm->workers, cpu_index);
1534           dw->aggregate_rx_packets += mb_index;
1535         }
1536
1537       if (hf)
1538         hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
1539
1540       /* Ship frames to the worker nodes */
1541       for (i = 0; i < vec_len (handoff_queue_elt_by_worker_index); i++)
1542         {
1543           if (handoff_queue_elt_by_worker_index[i])
1544             {
1545               hf = handoff_queue_elt_by_worker_index[i];
1546               /* 
1547                * It works better to let the handoff node
1548                * rate-adapt, so always ship the handoff queue element.
1549                */
1550               if (1 || hf->n_vectors == hf->last_n_vectors)
1551                 {
1552                   vlib_put_handoff_queue_elt(hf);
1553                   handoff_queue_elt_by_worker_index[i] = 0;
1554                 }
1555               else
1556                 hf->last_n_vectors = hf->n_vectors;
1557             }
1558           congested_handoff_queue_by_worker_index[i] = (vlib_frame_queue_t *)(~0);
1559         }
1560       hf = 0;
1561       current_worker_index = ~0;
1562
1563       vlib_increment_main_loop_counter (vm);
1564     }
1565 }
1566
1567 /*
1568  * This function is used when the main thread performs IO and feeds the
1569  * worker threads.
1570  */
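     /*
      * Per device, each pass: pull a burst of mbufs via dpdk_rx_burst(),
      * apply the EFD discard policy when enabled, initialize a vlib buffer
      * header for each mbuf (chaining multi-segment packets), hash the
      * ethernet header to pick a worker, and batch buffer indices into
      * per-worker handoff frame-queue elements, which are shipped when
      * full and again at the end of each pass.
      */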
1571 static uword
1572 dpdk_io_input (vlib_main_t * vm,
1573                vlib_node_runtime_t * node,
1574                vlib_frame_t * f)
1575 {
1576   dpdk_main_t * dm = &dpdk_main;
1577   dpdk_device_t * xd;
1578   vlib_thread_main_t * tm = vlib_get_thread_main();
1579   uword n_rx_packets = 0;
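       /* function-local statics: in this mode a single (main) thread
        * performs IO, so unsynchronized statics are safe */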
1580   static vlib_frame_queue_elt_t ** handoff_queue_elt_by_worker_index;
1581   static vlib_frame_queue_t ** congested_handoff_queue_by_worker_index = 0;
1582   vlib_frame_queue_elt_t * hf = 0;
1583   int i;
1584   u32 n_left_to_next_worker = 0, * to_next_worker = 0;
1585   u32 next_worker_index = 0;
1586   u32 current_worker_index = ~0;
1587   u32 cpu_index = os_get_cpu_number();
1588   static int num_workers_set;
1589   static u32 num_workers;
1590   u16 queue_id = 0;
1591   vlib_node_runtime_t * node_trace;
1592   static u32 first_worker_index;
1593   u32 buffer_flags_template;
1594
1595   if (PREDICT_FALSE(num_workers_set == 0))
1596     {
1597       uword * p;
1598       vlib_thread_registration_t * tr;
1599       /* Only the standard vnet worker threads are supported */
1600       p = hash_get_mem (tm->thread_registrations_by_name, "workers");
1601       tr = p ? (vlib_thread_registration_t *) p[0] : 0;
1602       if (tr) 
1603         {
1604           num_workers = tr->count;
1605           first_worker_index = tr->first_index;
1606         }
1607       num_workers_set = 1;
1608     }
1609
1610   if (PREDICT_FALSE(handoff_queue_elt_by_worker_index == 0))
1611     {
1612       vec_validate (handoff_queue_elt_by_worker_index, tm->n_vlib_mains - 1);
1613       
1614       vec_validate_init_empty (congested_handoff_queue_by_worker_index,
1615                                first_worker_index + num_workers - 1,
1616                                (vlib_frame_queue_t *)(~0));
1617     }
1618
1619   /* packet tracing is triggered on the dpdk-input node for ease-of-use */
1620   node_trace = vlib_node_get_runtime (vm, dpdk_input_node.index);
1621
1622   buffer_flags_template = dm->buffer_flags_template;
1623
1624   vec_foreach (xd, dm->devices)
1625     {
1626       u32 n_buffers;
1627       u32 mb_index;
1628       uword n_rx_bytes = 0;
1629       u32 n_trace, trace_cnt __attribute__((unused));
1630       vlib_buffer_free_list_t * fl;
1631       u32 hash;
1632       u64 hash_key;
1633       u8 efd_discard_burst = 0;
1634
1635       if (!xd->admin_up)
1636         continue;
1637
1638       n_buffers = dpdk_rx_burst(dm, xd, queue_id);
1639
1640       if (n_buffers == 0)
1641         {
1642           /* check if EFD (dpdk) is enabled */
1643           if (PREDICT_FALSE(dm->efd.enabled))
1644             {
1645               /* reset a few stats */
1646               xd->efd_agent.last_poll_time = 0;
1647               xd->efd_agent.last_burst_sz = 0;
1648             }
1649           continue;
1650         }
1651
1652       vec_reset_length (xd->d_trace_buffers);
1653       trace_cnt = n_trace = vlib_get_trace_count (vm, node_trace);
1654         
1655       /*
1656        * DAW-FIXME: VMXNET3 device stop/start doesn't work, 
1657        * therefore fake the stop in the dpdk driver by
1658        * silently dropping all of the incoming pkts instead of 
1659        * stopping the driver / hardware.
1660        */
1661       if (PREDICT_FALSE(xd->admin_up != 1))
1662         {
1663           for (mb_index = 0; mb_index < n_buffers; mb_index++)
1664             rte_pktmbuf_free (xd->rx_vectors[queue_id][mb_index]);
1665           continue;
1666         }
1667
1668       /* Check for congestion if EFD (Early-Fast-Discard) is enabled
1669        * in any mode (e.g. dpdk, monitor, or drop_all)
1670        */
1671       if (PREDICT_FALSE(dm->efd.enabled))
1672         {
1673           /* update EFD counters */
1674           dpdk_efd_update_counters(xd, n_buffers, dm->efd.enabled);
1675
1676           if (PREDICT_FALSE(dm->efd.enabled & DPDK_EFD_DROPALL_ENABLED))
1677             {
1678               /* discard all received packets */
1679               for (mb_index = 0; mb_index < n_buffers; mb_index++)
1680                 rte_pktmbuf_free(xd->rx_vectors[queue_id][mb_index]);
1681
1682               xd->efd_agent.discard_cnt += n_buffers;
1683               increment_efd_drop_counter(vm, 
1684                                          DPDK_ERROR_VLAN_EFD_DROP_PKTS,
1685                                          n_buffers);
1686             
1687               continue;
1688             }
1689           
1690           if (PREDICT_FALSE(xd->efd_agent.consec_full_frames_cnt >=
1691                             dm->efd.consec_full_frames_hi_thresh))
1692             {
1693               u32 device_queue_sz = rte_eth_rx_queue_count(xd->device_index,
1694                                                            queue_id);
1695               if (device_queue_sz >= dm->efd.queue_hi_thresh)
1696                 {
1697                   /* dpdk device queue has reached the critical threshold */
1698                   xd->efd_agent.congestion_cnt++;
1699
1700                   /* apply EFD to packets from the burst */
1701                   efd_discard_burst = 1;
1702                 }
1703             }
1704         }
1705       
1706       fl = vlib_buffer_get_free_list 
1707         (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
1708           
1709       mb_index = 0;
1710
1711       while (n_buffers > 0)
1712         {
1713           u32 bi0;
1714           u8 next0, error0;
1715           u32 l3_offset0;
1716           vlib_buffer_t * b0, * b_seg, * b_chain = 0;
1717           ethernet_header_t * h0;
1718           u8 nb_seg = 1;
1719           struct rte_mbuf *mb = xd->rx_vectors[queue_id][mb_index];
1720           struct rte_mbuf *mb_seg = mb->next;
1721
1722           if (PREDICT_TRUE(n_buffers > 2)) /* prefetch looks 2 mbufs ahead */
1723             {
1724               struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index+2];
1725               vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
1726               CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
1727               CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
1728               CLIB_PREFETCH (bp->data, CLIB_CACHE_LINE_BYTES, LOAD);
1729             }
1730                 
1731           b0 = (vlib_buffer_t *)(mb+1);
1732                 
1733           /* check whether EFD is looking for packets to discard */
1734           if (PREDICT_FALSE(efd_discard_burst))
1735             {
1736               u32 cntr_type;
1737               if (PREDICT_TRUE(cntr_type = is_efd_discardable(tm, b0, mb)))
1738                 {
1739                   rte_pktmbuf_free(mb);
1740                   xd->efd_agent.discard_cnt++;
1741                   increment_efd_drop_counter(vm, 
1742                                              cntr_type,
1743                                              1);
1744
1745                   n_buffers--;
1746                   mb_index++;
1747                   continue;
1748                 }
1749             }
1750
1751           /* Prefetch the next segment, if any, and start the buffer chain */
1752           if (PREDICT_FALSE(mb->nb_segs > 1))
1753             {
1754               struct rte_mbuf *pfmb = mb->next;
1755               vlib_buffer_t *bp = (vlib_buffer_t *)(pfmb+1);
1756               CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
1757               CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
1758               b_chain = b0;
1759             }
1760
1761           bi0 = vlib_get_buffer_index (vm, b0);
1762           vlib_buffer_init_for_free_list (b0, fl);
1763           b0->clone_count = 0;
1764
1765           dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
1766                                                    &next0, &error0);
1767 #ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS
1768           /*
1769            * Clear overloaded TX offload flags when a DPDK driver
1770            * is using them for RX flags (e.g. Cisco VIC Ethernet driver)
1771            */
1772           if (PREDICT_TRUE(trace_cnt == 0))
1773             mb->ol_flags &= PKT_EXT_RX_CLR_TX_FLAGS_MASK;
1774           else
1775             trace_cnt--; /* preserve original flags on packets being traced */
1776 #endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */
1777
1778           if (error0)
1779             clib_warning ("bi %d error %d", bi0, error0);
1780
1781           b0->error = 0;
1782
1783           l3_offset0 = ((next0 == DPDK_RX_NEXT_IP4_INPUT ||
1784                          next0 == DPDK_RX_NEXT_IP6_INPUT || 
1785                          next0 == DPDK_RX_NEXT_MPLS_INPUT) ? 
1786                         sizeof (ethernet_header_t) : 0);
1787
1788           b0->current_data = l3_offset0;
1789           b0->current_length = mb->data_len - l3_offset0;
1790
1791           b0->flags = buffer_flags_template;
1792                 
1793           if (VMWARE_LENGTH_BUG_WORKAROUND)
1794               b0->current_length -= 4;
1795
1796           vnet_buffer(b0)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
1797           vnet_buffer(b0)->sw_if_index[VLIB_TX] = (u32)~0;
1798           vnet_buffer(b0)->io_handoff.next_index = next0;
1799           n_rx_bytes += mb->pkt_len;
1800
1801           /* Process subsequent segments of multi-segment packets */
1802           while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
1803             {
1804               ASSERT(mb_seg != 0);
1805  
1806               b_seg = (vlib_buffer_t *)(mb_seg+1);
1807               vlib_buffer_init_for_free_list (b_seg, fl);
1808               b_seg->clone_count = 0;
1809  
1810               ASSERT((b_seg->flags & VLIB_BUFFER_NEXT_PRESENT) == 0);
1812  
1813               /*
1814                * The driver (e.g. virtio) may not put the packet data at the start
1815                * of the segment, so don't assume b_seg->current_data == 0 is correct.
1816                */
1817               b_seg->current_data = (mb_seg->buf_addr + mb_seg->data_off) - (void *)b_seg->data;
1818
1819               b_seg->current_length = mb_seg->data_len;
1820               b0->total_length_not_including_first_buffer +=
1821                 mb_seg->data_len;
1822  
1823               b_chain->flags |= VLIB_BUFFER_NEXT_PRESENT;
1824               b_chain->next_buffer = vlib_get_buffer_index (vm, b_seg);
1825  
1826               b_chain = b_seg;
1827               mb_seg = mb_seg->next;
1828               nb_seg++;
1829             }
1830  
1831           /*
1832            * Turn this on if you run into
1833            * "bad monkey" contexts, and you want to know exactly
1834            * which nodes they've visited... See main.c...
1835            */
1836           VLIB_BUFFER_TRACE_TRAJECTORY_INIT(b0);
1837  
1838           if (PREDICT_FALSE (n_trace > mb_index))
1839             vec_add1 (xd->d_trace_buffers, bi0);
1840
1841           next_worker_index = first_worker_index;
1842
1843           /* 
1844            * Force unknown traffic onto worker 0, 
1845            * and into ethernet-input. $$$$ add more hashes.
1846            */
1847           h0 = (ethernet_header_t *) b0->data;
1848
1849           /* Compute ingress LB hash */
1850           hash_key = eth_get_key(h0);
1851           hash = (u32)clib_xxhash(hash_key);
1852
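               /*
                * Map the hash into [first_worker_index, first_worker_index +
                * num_workers): with a power-of-two worker count, the mask is
                * equivalent to the modulo and avoids an integer divide.
                */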
1853           if (PREDICT_TRUE (is_pow2(num_workers)))
1854             next_worker_index += hash & (num_workers - 1);
1855           else
1856             next_worker_index += hash % num_workers;
1857
1858           /* if EFD is enabled and not already discarding from dpdk,
1859            * check the worker ring/queue for congestion
1860            */
1861           if (PREDICT_FALSE(tm->efd.enabled && !efd_discard_burst))
1862             {
1863               vlib_frame_queue_t *fq;
1864
1865               /* fq will be valid if the ring is congested */
1866               fq = is_vlib_handoff_queue_congested(
1867                   next_worker_index, tm->efd.queue_hi_thresh,
1868                   congested_handoff_queue_by_worker_index);
1869               
1870               if (PREDICT_FALSE(fq != NULL))
1871                 {
1872                   u32 cntr_type;
1873                   if (PREDICT_TRUE(cntr_type =
1874                                    is_efd_discardable(tm, b0, mb)))
1875                     {
1876                       /* discard the packet */
1877                       fq->enqueue_efd_discards++;
1878                       increment_efd_drop_counter(vm, cntr_type, 1);
1879                       rte_pktmbuf_free(mb);
1880                       n_buffers--;
1881                       mb_index++;
1882                       continue;
1883                     }
1884                 }
1885             }
1886           
1887           if (next_worker_index != current_worker_index)
1888             {
1889               if (hf)
1890                 hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
1891
1892               hf = dpdk_get_handoff_queue_elt(
1893                      next_worker_index,
1894                      handoff_queue_elt_by_worker_index);
1895
1896               n_left_to_next_worker = VLIB_FRAME_SIZE - hf->n_vectors;
1897               to_next_worker = &hf->buffer_index[hf->n_vectors];
1898               current_worker_index = next_worker_index;
1899             }
1900           
1901           /* enqueue to correct worker thread */
1902           to_next_worker[0] = bi0;
1903           to_next_worker++;
1904           n_left_to_next_worker--;
1905
1906           if (n_left_to_next_worker == 0)
1907             {
1908               hf->n_vectors = VLIB_FRAME_SIZE;
1909               vlib_put_handoff_queue_elt(hf);
1910               current_worker_index = ~0;
1911               handoff_queue_elt_by_worker_index[next_worker_index] = 0;
1912               hf = 0;
1913             }
1914           
1915           n_buffers--;
1916           mb_index++;
1917         }
1918
1919       if (PREDICT_FALSE (vec_len (xd->d_trace_buffers) > 0))
1920         {
1921           /* credit the trace to the trace node */
1922           dpdk_rx_trace (dm, node_trace, xd, queue_id, xd->d_trace_buffers,
1923                          vec_len (xd->d_trace_buffers));
1924           vlib_set_trace_count (vm, node_trace, n_trace - vec_len (xd->d_trace_buffers));
1925         }
1926
1927       vlib_increment_combined_counter 
1928         (vnet_get_main()->interface_main.combined_sw_if_counters
1929          + VNET_INTERFACE_COUNTER_RX,
1930          cpu_index, 
1931          xd->vlib_sw_if_index,
1932          mb_index, n_rx_bytes);
1933
1934       dpdk_worker_t * dw = vec_elt_at_index(dm->workers, cpu_index);
1935       dw->aggregate_rx_packets += mb_index;
1936       n_rx_packets += mb_index;
1937     }
1938
1939   if (hf)
1940     hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
1941   
1942   /* Ship frames to the worker nodes */
1943   for (i = 0; i < vec_len (handoff_queue_elt_by_worker_index); i++)
1944     {
1945       if (handoff_queue_elt_by_worker_index[i])
1946         {
1947           hf = handoff_queue_elt_by_worker_index[i];
1948           /* 
1949            * It works better to let the handoff node
1950            * rate-adapt, so always ship the handoff queue element.
1951            */
1952           if (1 || hf->n_vectors == hf->last_n_vectors)
1953             {
1954               vlib_put_handoff_queue_elt(hf);
1955               handoff_queue_elt_by_worker_index[i] = 0;
1956             }
1957           else
1958             hf->last_n_vectors = hf->n_vectors;
1959         }
1960       congested_handoff_queue_by_worker_index[i] = (vlib_frame_queue_t *)(~0);
1961     }
1962   hf = 0;
1963   current_worker_index = ~0;
1964   return n_rx_packets;
1965 }
1966
1967 VLIB_REGISTER_NODE (dpdk_io_input_node) = {
1968   .function = dpdk_io_input,
1969   .type = VLIB_NODE_TYPE_INPUT,
1970   .name = "dpdk-io-input",
1971
1972   /* Will be enabled if/when hardware is detected. */
1973   .state = VLIB_NODE_STATE_DISABLED,
1974
1975   .format_buffer = format_ethernet_header_with_length,
1976   .format_trace = format_dpdk_rx_dma_trace,
1977
1978   .n_errors = DPDK_N_ERROR,
1979   .error_strings = dpdk_error_strings,
1980
1981   .n_next_nodes = DPDK_RX_N_NEXT,
1982   .next_nodes = {
1983     [DPDK_RX_NEXT_DROP] = "error-drop",
1984     [DPDK_RX_NEXT_ETHERNET_INPUT] = "ethernet-input",
1985     [DPDK_RX_NEXT_IP4_INPUT] = "ip4-input-no-checksum",
1986     [DPDK_RX_NEXT_IP6_INPUT] = "ip6-input",
1987     [DPDK_RX_NEXT_MPLS_INPUT] = "mpls-gre-input",
1988   },
1989 };
1990
1991 /*
1992  * set_efd_bitmap()
1993  * Set the bits below the value (LESS_THAN) or at/above it (GREATER_OR_EQUAL).
1994  */
1995 void
1996 set_efd_bitmap (u8 *bitmap, u32 value, u32 op)
1997 {
1998     int ix;
1999
2000     *bitmap = 0;
2001     for (ix = 0; ix < 8; ix++) {
2002         if (((op == EFD_OPERATION_LESS_THAN) && (ix < value)) ||
2003             ((op == EFD_OPERATION_GREATER_OR_EQUAL) && (ix >= value))){
2004             (*bitmap) |= (1 << ix);
2005         }
2006     }
2007 }
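
     /*
      * Illustrative sketch, not a vpp API: how a bitmap built by
      * set_efd_bitmap() would be consulted. With value 3 and
      * EFD_OPERATION_LESS_THAN the bitmap is 0x07 (values 0..2 marked);
      * with EFD_OPERATION_GREATER_OR_EQUAL it is 0xf8 (values 3..7).
      * efd_bitmap_example_match() is a hypothetical helper for this example.
      */
     static inline u32
     efd_bitmap_example_match (u8 bitmap, u32 value)
     {
         return (bitmap >> (value & 0x7)) & 1; /* 1 => value marked discardable */
     }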
2008
2009 void
2010 efd_config (u32 enabled, 
2011             u32 ip_prec,  u32 ip_op,
2012             u32 mpls_exp, u32 mpls_op,
2013             u32 vlan_cos, u32 vlan_op)
2014 {
2015    vlib_thread_main_t * tm = vlib_get_thread_main();
2016    dpdk_main_t * dm = &dpdk_main;
2017
2018    if (enabled) {
2019        tm->efd.enabled |= VLIB_EFD_DISCARD_ENABLED;
2020        dm->efd.enabled |= DPDK_EFD_DISCARD_ENABLED;
2021    } else {
2022        tm->efd.enabled &= ~VLIB_EFD_DISCARD_ENABLED;
2023        dm->efd.enabled &= ~DPDK_EFD_DISCARD_ENABLED;
2024    }
2025
2026    set_efd_bitmap(&tm->efd.ip_prec_bitmap, ip_prec, ip_op);
2027    set_efd_bitmap(&tm->efd.mpls_exp_bitmap, mpls_exp, mpls_op);
2028    set_efd_bitmap(&tm->efd.vlan_cos_bitmap, vlan_cos, vlan_op);
2030 }
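
     /*
      * Example (illustrative only): enable EFD and mark IP precedence,
      * MPLS EXP and VLAN CoS values below 3 as discardable under congestion:
      *
      *   efd_config (1,
      *               3, EFD_OPERATION_LESS_THAN,
      *               3, EFD_OPERATION_LESS_THAN,
      *               3, EFD_OPERATION_LESS_THAN);
      */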