dpdk: quad loop dpdk-input node
authorDamjan Marion <[email protected]>
Tue, 15 Nov 2016 18:47:58 +0000 (19:47 +0100)
committerDave Barach <[email protected]>
Fri, 18 Nov 2016 13:42:30 +0000 (13:42 +0000)
Change-Id: I761af883403b6740bd24ce196ae0bfe6bc77b409
Signed-off-by: Damjan Marion <[email protected]>
vnet/vnet/devices/devices.c
vnet/vnet/devices/devices.h
vnet/vnet/devices/dpdk/dpdk.h
vnet/vnet/devices/dpdk/init.c
vnet/vnet/devices/dpdk/node.c
vnet/vnet/feature/feature.h

index 3eef95b..a110019 100644 (file)
  * limitations under the License.
  */
 
+#include <vnet/vnet.h>
 #include <vnet/devices/devices.h>
 #include <vnet/feature/feature.h>
+#include <vnet/ip/ip.h>
+#include <vnet/ethernet/ethernet.h>
 
 static uword
 device_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
@@ -33,6 +36,18 @@ VLIB_REGISTER_NODE (device_input_node) = {
   .next_nodes = VNET_DEVICE_INPUT_NEXT_NODES,
 };
 
+/* Table defines how much we need to advance current data pointer
+   in the buffer if we shortcut to l3 nodes */
+
+const u32 __attribute__((aligned (CLIB_CACHE_LINE_BYTES)))
+device_input_next_node_advance[((VNET_DEVICE_INPUT_N_NEXT_NODES /
+                               CLIB_CACHE_LINE_BYTES) +1) * CLIB_CACHE_LINE_BYTES] =
+{
+      [VNET_DEVICE_INPUT_NEXT_IP4_INPUT] = sizeof (ethernet_header_t),
+      [VNET_DEVICE_INPUT_NEXT_IP6_INPUT] = sizeof (ethernet_header_t),
+      [VNET_DEVICE_INPUT_NEXT_MPLS_INPUT] = sizeof (ethernet_header_t),
+};
+
 VNET_FEATURE_ARC_INIT (device_input, static) =
 {
   .arc_name  = "device-input",
index b9a8aaa..3bd700a 100644 (file)
@@ -38,6 +38,7 @@ typedef enum
 }
 
 extern vlib_node_registration_t device_input_node;
+extern const u32 device_input_next_node_advance[];
 
 #endif /* included_vnet_vnet_device_h */
 
index b970d4a..7e5712f 100644 (file)
@@ -192,6 +192,7 @@ typedef struct
 #define DPDK_DEVICE_FLAG_ADMIN_UP       (1 << 0)
 #define DPDK_DEVICE_FLAG_PROMISC        (1 << 1)
 #define DPDK_DEVICE_FLAG_PMD            (1 << 2)
+#define DPDK_DEVICE_FLAG_MAYBE_MULTISEG (1 << 3)
 
 #define DPDK_DEVICE_FLAG_HAVE_SUBIF     (1 << 5)
 #define DPDK_DEVICE_FLAG_HQOS           (1 << 6)
index cf5f329..c040af6 100755 (executable)
@@ -396,6 +396,7 @@ dpdk_lib_init (dpdk_main_t * dm)
        {
          xd->tx_conf.txq_flags &= ~ETH_TXQ_FLAGS_NOMULTSEGS;
          port_conf_template.rxmode.jumbo_frame = 1;
+         xd->flags |= DPDK_DEVICE_FLAG_MAYBE_MULTISEG;
        }
 
       clib_memcpy (&xd->port_conf, &port_conf_template,
index fa89403..417ac39 100644 (file)
 
 #include "dpdk_priv.h"
 
-#ifndef MAX
-#define MAX(a,b) ((a) < (b) ? (b) : (a))
-#endif
-
-#ifndef MIN
-#define MIN(a,b) ((a) < (b) ? (a) : (b))
-#endif
-
-/*
- * At least in certain versions of ESXi, vmware e1000's don't honor the
- * "strip rx CRC" bit. Set this flag to work around that bug FOR UNIT TEST ONLY.
- *
- * If wireshark complains like so:
- *
- * "Frame check sequence: 0x00000000 [incorrect, should be <hex-num>]"
- * and you're using ESXi emulated e1000's, set this flag FOR UNIT TEST ONLY.
- *
- * Note: do NOT check in this file with this workaround enabled! You'll lose
- * actual data from e.g. 10xGE interfaces. The extra 4 bytes annoy
- * wireshark, but they're harmless...
- */
-#define VMWARE_LENGTH_BUG_WORKAROUND 0
-
 static char *dpdk_error_strings[] = {
 #define _(n,s) s,
   foreach_dpdk_error
@@ -82,7 +59,7 @@ dpdk_rx_next_and_error_from_mb_flags_x1 (dpdk_device_t * xd,
                                         vlib_buffer_t * b0, u32 * next0,
                                         u8 * error0)
 {
-  u8 n0;
+  u32 n0;
   uint16_t mb_flags = mb->ol_flags;
 
   if (PREDICT_FALSE (mb_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)))
@@ -209,6 +186,53 @@ dpdk_rx_burst (dpdk_main_t * dm, dpdk_device_t * xd, u16 queue_id)
   return n_buffers;
 }
 
+
+static_always_inline void
+dpdk_process_subseq_segs (vlib_main_t *vm, vlib_buffer_t *b, struct rte_mbuf * mb, vlib_buffer_free_list_t *fl)
+{
+  u8 nb_seg = 1;
+  struct rte_mbuf *mb_seg = 0;
+  vlib_buffer_t *b_seg, *b_chain = 0;
+  mb_seg = mb->next;
+  b_chain = b;
+
+  while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
+    {
+      ASSERT (mb_seg != 0);
+
+      b_seg = vlib_buffer_from_rte_mbuf (mb_seg);
+      vlib_buffer_init_for_free_list (b_seg, fl);
+
+      ASSERT ((b_seg->flags & VLIB_BUFFER_NEXT_PRESENT) == 0);
+      ASSERT (b_seg->current_data == 0);
+
+      /*
+       * The driver (e.g. virtio) may not put the packet data at the start
+       * of the segment, so don't assume b_seg->current_data == 0 is correct.
+       */
+      b_seg->current_data =
+       (mb_seg->buf_addr + mb_seg->data_off) - (void *) b_seg->data;
+
+      b_seg->current_length = mb_seg->data_len;
+      b->total_length_not_including_first_buffer += mb_seg->data_len;
+
+      b_chain->flags |= VLIB_BUFFER_NEXT_PRESENT;
+      b_chain->next_buffer = vlib_get_buffer_index (vm, b_seg);
+
+      b_chain = b_seg;
+      mb_seg = mb_seg->next;
+      nb_seg++;
+    }
+}
+
+static_always_inline void
+dpdk_prefetch_buffer (struct rte_mbuf *mb)
+{
+    vlib_buffer_t *b = vlib_buffer_from_rte_mbuf (mb);
+    CLIB_PREFETCH (mb, CLIB_CACHE_LINE_BYTES, LOAD);
+    CLIB_PREFETCH (b, CLIB_CACHE_LINE_BYTES, STORE);
+}
+
 /*
  * This function is used when there are no worker threads.
  * The main thread performs IO and forwards the packets.
@@ -244,107 +268,201 @@ dpdk_device_input (dpdk_main_t * dm,
   vec_reset_length (xd->d_trace_buffers[cpu_index]);
   trace_cnt = n_trace = vlib_get_trace_count (vm, node);
 
+  if (n_trace > 0)
+    {
+      u32 n = clib_min (n_trace, n_buffers);
+      mb_index = 0;
+
+      while (n--)
+       {
+          struct rte_mbuf *mb = xd->rx_vectors[queue_id][mb_index++];
+          vlib_buffer_t * b = vlib_buffer_from_rte_mbuf (mb);
+          vec_add1 (xd->d_trace_buffers[cpu_index], vlib_get_buffer_index (vm, b));
+       }
+    }
+
   fl = vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
 
   mb_index = 0;
 
   while (n_buffers > 0)
     {
-      u32 bi0, next0;
-      u8 error0;
-      u32 l3_offset0;
-      vlib_buffer_t *b0, *b_seg, *b_chain = 0;
+      vlib_buffer_t *b0, *b1, *b2, *b3;
+      u32 bi0, next0, l3_offset0;
+      u32 bi1, next1, l3_offset1;
+      u32 bi2, next2, l3_offset2;
+      u32 bi3, next3, l3_offset3;
+      u8 error0, error1, error2, error3;
 
       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
 
-      while (n_buffers > 0 && n_left_to_next > 0)
+      while (n_buffers > 8 && n_left_to_next > 4)
        {
-         u8 nb_seg = 1;
-         struct rte_mbuf *mb = xd->rx_vectors[queue_id][mb_index];
-         struct rte_mbuf *mb_seg = mb->next;
+         struct rte_mbuf *mb0 = xd->rx_vectors[queue_id][mb_index];
+         struct rte_mbuf *mb1 = xd->rx_vectors[queue_id][mb_index + 1];
+         struct rte_mbuf *mb2 = xd->rx_vectors[queue_id][mb_index + 2];
+         struct rte_mbuf *mb3 = xd->rx_vectors[queue_id][mb_index + 3];
+
+         dpdk_prefetch_buffer (xd->rx_vectors[queue_id][mb_index + 4]);
+         dpdk_prefetch_buffer (xd->rx_vectors[queue_id][mb_index + 5]);
+         dpdk_prefetch_buffer (xd->rx_vectors[queue_id][mb_index + 6]);
+         dpdk_prefetch_buffer (xd->rx_vectors[queue_id][mb_index + 7]);
 
-         if (PREDICT_TRUE (n_buffers > 2))
+         if (xd->flags & DPDK_DEVICE_FLAG_MAYBE_MULTISEG)
            {
-             struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index + 2];
-             vlib_buffer_t *bp = vlib_buffer_from_rte_mbuf (pfmb);
-             CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, STORE);
-             CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
+             if (PREDICT_FALSE (mb0->nb_segs > 1))
+               dpdk_prefetch_buffer (mb0->next);
+             if (PREDICT_FALSE (mb1->nb_segs > 1))
+               dpdk_prefetch_buffer (mb1->next);
+             if (PREDICT_FALSE (mb2->nb_segs > 1))
+               dpdk_prefetch_buffer (mb2->next);
+             if (PREDICT_FALSE (mb3->nb_segs > 1))
+               dpdk_prefetch_buffer (mb3->next);
            }
 
-         ASSERT (mb);
-
-         b0 = vlib_buffer_from_rte_mbuf (mb);
+         ASSERT (mb0);
+         ASSERT (mb1);
+         ASSERT (mb2);
+         ASSERT (mb3);
 
-         /* Prefetch one next segment if it exists. */
-         if (PREDICT_FALSE (mb->nb_segs > 1))
-           {
-             struct rte_mbuf *pfmb = mb->next;
-             vlib_buffer_t *bp = vlib_buffer_from_rte_mbuf (pfmb);
-             CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
-             CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
-             b_chain = b0;
-           }
+         b0 = vlib_buffer_from_rte_mbuf (mb0);
+         b1 = vlib_buffer_from_rte_mbuf (mb1);
+         b2 = vlib_buffer_from_rte_mbuf (mb2);
+         b3 = vlib_buffer_from_rte_mbuf (mb3);
 
          vlib_buffer_init_for_free_list (b0, fl);
+         vlib_buffer_init_for_free_list (b1, fl);
+         vlib_buffer_init_for_free_list (b2, fl);
+         vlib_buffer_init_for_free_list (b3, fl);
 
          bi0 = vlib_get_buffer_index (vm, b0);
+         bi1 = vlib_get_buffer_index (vm, b1);
+         bi2 = vlib_get_buffer_index (vm, b2);
+         bi3 = vlib_get_buffer_index (vm, b3);
 
          to_next[0] = bi0;
-         to_next++;
-         n_left_to_next--;
+         to_next[1] = bi1;
+         to_next[2] = bi2;
+         to_next[3] = bi3;
+         to_next+=4;
+         n_left_to_next-=4;
+
+         dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb0, b0, &next0, &error0);
+         dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb1, b1, &next1, &error1);
+         dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb2, b2, &next2, &error2);
+         dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb3, b3, &next3, &error3);
 
-         dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
-                                                  &next0, &error0);
          b0->error = node->errors[error0];
+         b1->error = node->errors[error1];
+         b2->error = node->errors[error2];
+         b3->error = node->errors[error3];
 
-         l3_offset0 = ((next0 == VNET_DEVICE_INPUT_NEXT_IP4_INPUT ||
-                        next0 == VNET_DEVICE_INPUT_NEXT_IP6_INPUT ||
-                        next0 == VNET_DEVICE_INPUT_NEXT_MPLS_INPUT) ?
-                       sizeof (ethernet_header_t) : 0);
+         l3_offset0 = device_input_next_node_advance[next0];
+         l3_offset1 = device_input_next_node_advance[next1];
+         l3_offset2 = device_input_next_node_advance[next2];
+         l3_offset3 = device_input_next_node_advance[next3];
 
-         b0->current_data = l3_offset0;
-         /* Some drivers like fm10k receive frames with
-            mb->data_off > RTE_PKTMBUF_HEADROOM */
-         b0->current_data += mb->data_off - RTE_PKTMBUF_HEADROOM;
-         b0->current_length = mb->data_len - l3_offset0;
+         b0->current_data = l3_offset0 + mb0->data_off - RTE_PKTMBUF_HEADROOM;
+         b1->current_data = l3_offset1 + mb1->data_off - RTE_PKTMBUF_HEADROOM;
+         b2->current_data = l3_offset2 + mb2->data_off - RTE_PKTMBUF_HEADROOM;
+         b3->current_data = l3_offset3 + mb3->data_off - RTE_PKTMBUF_HEADROOM;
 
-         b0->flags = buffer_flags_template;
+         b0->current_length = mb0->data_len - l3_offset0;
+         b1->current_length = mb1->data_len - l3_offset1;
+         b2->current_length = mb2->data_len - l3_offset2;
+         b3->current_length = mb3->data_len - l3_offset3;
 
-         if (VMWARE_LENGTH_BUG_WORKAROUND)
-           b0->current_length -= 4;
+         b0->flags = buffer_flags_template;
+         b1->flags = buffer_flags_template;
+         b2->flags = buffer_flags_template;
+         b3->flags = buffer_flags_template;
 
          vnet_buffer (b0)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
+         vnet_buffer (b1)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
+         vnet_buffer (b2)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
+         vnet_buffer (b3)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
+
          vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
-         n_rx_bytes += mb->pkt_len;
+         vnet_buffer (b1)->sw_if_index[VLIB_TX] = (u32) ~ 0;
+         vnet_buffer (b2)->sw_if_index[VLIB_TX] = (u32) ~ 0;
+         vnet_buffer (b3)->sw_if_index[VLIB_TX] = (u32) ~ 0;
+
+         n_rx_bytes += mb0->pkt_len;
+         n_rx_bytes += mb1->pkt_len;
+         n_rx_bytes += mb2->pkt_len;
+         n_rx_bytes += mb3->pkt_len;
 
          /* Process subsequent segments of multi-segment packets */
-         while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
+         if (xd->flags & DPDK_DEVICE_FLAG_MAYBE_MULTISEG)
            {
-             ASSERT (mb_seg != 0);
+             dpdk_process_subseq_segs (vm, b0, mb0, fl);
+             dpdk_process_subseq_segs (vm, b1, mb1, fl);
+             dpdk_process_subseq_segs (vm, b2, mb2, fl);
+             dpdk_process_subseq_segs (vm, b3, mb3, fl);
+           }
+
+         /*
+          * Turn this on if you run into
+          * "bad monkey" contexts, and you want to know exactly
+          * which nodes they've visited... See main.c...
+          */
+         VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
+         VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b1);
+         VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b2);
+         VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b3);
 
-             b_seg = vlib_buffer_from_rte_mbuf (mb_seg);
-             vlib_buffer_init_for_free_list (b_seg, fl);
+          /* Do we have any driver RX features configured on the interface? */
+         vnet_feature_start_device_input_x4 (xd->vlib_sw_if_index,
+                                             &next0, &next1, &next2, &next3,
+                                             b0, b1, b2, b3,
+                                             l3_offset0, l3_offset1,
+                                             l3_offset2, l3_offset3);
 
-             ASSERT ((b_seg->flags & VLIB_BUFFER_NEXT_PRESENT) == 0);
-             ASSERT (b_seg->current_data == 0);
+         vlib_validate_buffer_enqueue_x4 (vm, node, next_index,
+                                          to_next, n_left_to_next,
+                                          bi0, bi1, bi2, bi3,
+                                          next0, next1, next2, next3);
+         n_buffers-=4;
+         mb_index+=4;
+       }
+      while (n_buffers > 0 && n_left_to_next > 0)
+       {
+         struct rte_mbuf *mb0 = xd->rx_vectors[queue_id][mb_index];
 
-             /*
-              * The driver (e.g. virtio) may not put the packet data at the start
-              * of the segment, so don't assume b_seg->current_data == 0 is correct.
-              */
-             b_seg->current_data =
-               (mb_seg->buf_addr + mb_seg->data_off) - (void *) b_seg->data;
+         ASSERT (mb0);
 
-             b_seg->current_length = mb_seg->data_len;
-             b0->total_length_not_including_first_buffer += mb_seg->data_len;
+         b0 = vlib_buffer_from_rte_mbuf (mb0);
 
-             b_chain->flags |= VLIB_BUFFER_NEXT_PRESENT;
-             b_chain->next_buffer = vlib_get_buffer_index (vm, b_seg);
+         /* Prefetch one next segment if it exists. */
+         if (PREDICT_FALSE (mb0->nb_segs > 1))
+           dpdk_prefetch_buffer (mb0->next);
 
-             b_chain = b_seg;
-             mb_seg = mb_seg->next;
-             nb_seg++;
-           }
+         vlib_buffer_init_for_free_list (b0, fl);
+
+         bi0 = vlib_get_buffer_index (vm, b0);
+
+         to_next[0] = bi0;
+         to_next++;
+         n_left_to_next--;
+
+         dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb0, b0,
+                                                  &next0, &error0);
+         b0->error = node->errors[error0];
+
+         l3_offset0 = device_input_next_node_advance[next0];
+
+         b0->current_data = l3_offset0;
+         b0->current_data += mb0->data_off - RTE_PKTMBUF_HEADROOM;
+         b0->current_length = mb0->data_len - l3_offset0;
+
+         b0->flags = buffer_flags_template;
+
+         vnet_buffer (b0)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;
+         vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
+         n_rx_bytes += mb0->pkt_len;
+
+         /* Process subsequent segments of multi-segment packets */
+         dpdk_process_subseq_segs (vm, b0, mb0, fl);
 
          /*
           * Turn this on if you run into
@@ -359,8 +477,6 @@ dpdk_device_input (dpdk_main_t * dm,
          vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                                           to_next, n_left_to_next,
                                           bi0, next0);
-         if (PREDICT_FALSE (n_trace > mb_index))
-           vec_add1 (xd->d_trace_buffers[cpu_index], bi0);
          n_buffers--;
          mb_index++;
        }
index 96d573b..0b33f05 100644 (file)
@@ -292,6 +292,70 @@ vnet_feature_start_device_input_x2 (u32 sw_if_index,
     }
 }
 
+static_always_inline void
+vnet_feature_start_device_input_x4 (u32 sw_if_index,
+                                   u32 * next0,
+                                   u32 * next1,
+                                   u32 * next2,
+                                   u32 * next3,
+                                   vlib_buffer_t * b0,
+                                   vlib_buffer_t * b1,
+                                   vlib_buffer_t * b2,
+                                   vlib_buffer_t * b3,
+                                   u16 buffer_advanced0,
+                                   u16 buffer_advanced1,
+                                   u16 buffer_advanced2,
+                                   u16 buffer_advanced3)
+{
+  vnet_feature_main_t *fm = &feature_main;
+  vnet_feature_config_main_t *cm;
+  u8 feature_arc_index = fm->device_input_feature_arc_index;
+  cm = &fm->feature_config_mains[feature_arc_index];
+
+  if (PREDICT_FALSE
+      (clib_bitmap_get
+       (fm->sw_if_index_has_features[feature_arc_index], sw_if_index)))
+    {
+      /*
+       * Save next0 so that the last feature in the chain
+       * can skip ethernet-input if indicated...
+       */
+      vnet_buffer (b0)->device_input_feat.saved_next_index = *next0;
+      vnet_buffer (b1)->device_input_feat.saved_next_index = *next1;
+      vnet_buffer (b2)->device_input_feat.saved_next_index = *next2;
+      vnet_buffer (b3)->device_input_feat.saved_next_index = *next3;
+
+      vnet_buffer (b0)->device_input_feat.buffer_advance = buffer_advanced0;
+      vnet_buffer (b1)->device_input_feat.buffer_advance = buffer_advanced1;
+      vnet_buffer (b2)->device_input_feat.buffer_advance = buffer_advanced2;
+      vnet_buffer (b3)->device_input_feat.buffer_advance = buffer_advanced3;
+
+      vlib_buffer_advance (b0, -buffer_advanced0);
+      vlib_buffer_advance (b1, -buffer_advanced1);
+      vlib_buffer_advance (b2, -buffer_advanced2);
+      vlib_buffer_advance (b3, -buffer_advanced3);
+
+      b0->feature_arc_index = feature_arc_index;
+      b1->feature_arc_index = feature_arc_index;
+      b2->feature_arc_index = feature_arc_index;
+      b3->feature_arc_index = feature_arc_index;
+
+      b0->current_config_index =
+       vec_elt (cm->config_index_by_sw_if_index, sw_if_index);
+      b1->current_config_index = b0->current_config_index;
+      b2->current_config_index = b0->current_config_index;
+      b3->current_config_index = b0->current_config_index;
+
+      vnet_get_config_data (&cm->config_main, &b0->current_config_index,
+                           next0, /* # bytes of config data */ 0);
+      vnet_get_config_data (&cm->config_main, &b1->current_config_index,
+                           next1, /* # bytes of config data */ 0);
+      vnet_get_config_data (&cm->config_main, &b2->current_config_index,
+                           next2, /* # bytes of config data */ 0);
+      vnet_get_config_data (&cm->config_main, &b3->current_config_index,
+                           next3, /* # bytes of config data */ 0);
+    }
+}
 
 #define VNET_FEATURES(...)  (char*[]) { __VA_ARGS__, 0}