dpdk: tx code rework 44/12444/6
authorDamjan Marion <damarion@cisco.com>
Sat, 5 May 2018 10:30:28 +0000 (12:30 +0200)
committerDamjan Marion <dmarion.lists@gmail.com>
Wed, 9 May 2018 09:32:39 +0000 (09:32 +0000)
Change-Id: Ifea9c772e8784642433b92091f5769eb9ec06890
Signed-off-by: Damjan Marion <damarion@cisco.com>
src/plugins/dpdk/device/device.c
src/plugins/dpdk/device/dpdk.h
src/plugins/dpdk/device/init.c
src/vppinfra/vector_avx2.h

index 9ae3f9c..044c872 100644 (file)
@@ -26,7 +26,6 @@
 
 #define foreach_dpdk_tx_func_error                     \
   _(BAD_RETVAL, "DPDK tx function returned an error")  \
-  _(RING_FULL, "Tx packet drops (ring full)")          \
   _(PKT_DROP, "Tx packet drops (dpdk tx failure)")     \
   _(REPL_FAIL, "Tx packet drops (replication failure)")
 
@@ -111,10 +110,9 @@ dpdk_replicate_packet_mb (vlib_buffer_t * b)
 }
 
 static void
-dpdk_tx_trace_buffer (dpdk_main_t * dm,
-                     vlib_node_runtime_t * node,
-                     dpdk_device_t * xd,
-                     u16 queue_id, u32 buffer_index, vlib_buffer_t * buffer)
+dpdk_tx_trace_buffer (dpdk_main_t * dm, vlib_node_runtime_t * node,
+                     dpdk_device_t * xd, u16 queue_id,
+                     vlib_buffer_t * buffer)
 {
   vlib_main_t *vm = vlib_get_main ();
   dpdk_tx_trace_t *t0;
@@ -125,7 +123,7 @@ dpdk_tx_trace_buffer (dpdk_main_t * dm,
   t0 = vlib_add_trace (vm, node, buffer, sizeof (t0[0]));
   t0->queue_index = queue_id;
   t0->device_index = xd->device_index;
-  t0->buffer_index = buffer_index;
+  t0->buffer_index = vlib_get_buffer_index (vm, buffer);
   clib_memcpy (&t0->mb, mb, sizeof (t0->mb));
   clib_memcpy (&t0->buffer, buffer,
               sizeof (buffer[0]) - sizeof (buffer->pre_data));
@@ -181,58 +179,26 @@ dpdk_validate_rte_mbuf (vlib_main_t * vm, vlib_buffer_t * b,
 }
 
 /*
- * This function calls the dpdk's tx_burst function to transmit the packets
- * on the tx_vector. It manages a lock per-device if the device does not
+ * This function calls the dpdk's tx_burst function to transmit the packets.
+ * It manages a lock per-device if the device does not
  * support multiple queues. It returns the number of packets untransmitted
- * on the tx_vector. If all packets are transmitted (the normal case), the
- * function returns 0.
- *
- * The function assumes there is at least one packet on the tx_vector.
+ * If all packets are transmitted (the normal case), the function returns 0.
  */
 static_always_inline
   u32 tx_burst_vector_internal (vlib_main_t * vm,
                                dpdk_device_t * xd,
-                               struct rte_mbuf **tx_vector)
+                               struct rte_mbuf **mb, u32 n_left)
 {
   dpdk_main_t *dm = &dpdk_main;
-  u32 n_packets;
-  u32 tx_head;
-  u32 tx_tail;
   u32 n_retry;
-  int rv;
+  int n_sent = 0;
   int queue_id;
-  tx_ring_hdr_t *ring;
-
-  ring = vec_header (tx_vector, sizeof (*ring));
-
-  n_packets = ring->tx_head - ring->tx_tail;
-
-  tx_head = ring->tx_head % xd->nb_tx_desc;
-
-  /*
-   * Ensure rte_eth_tx_burst is not called with 0 packets, which can lead to
-   * unpredictable results.
-   */
-  ASSERT (n_packets > 0);
-
-  /*
-   * Check for tx_vector overflow. If this fails it is a system configuration
-   * error. The ring should be sized big enough to handle the largest un-flowed
-   * off burst from a traffic manager. A larger size also helps performance
-   * a bit because it decreases the probability of having to issue two tx_burst
-   * calls due to a ring wrap.
-   */
-  ASSERT (n_packets < xd->nb_tx_desc);
-  ASSERT (ring->tx_tail == 0);
 
   n_retry = 16;
   queue_id = vm->thread_index;
 
   do
     {
-      /* start the burst at the tail */
-      tx_tail = ring->tx_tail % xd->nb_tx_desc;
-
       /*
        * This device only supports one TX queue,
        * and we're running multi-threaded...
@@ -253,30 +219,25 @@ static_always_inline
 
          ASSERT (hqos->swq != NULL);
 
-         dpdk_hqos_metadata_set (hqos,
-                                 &tx_vector[tx_tail], tx_head - tx_tail);
-         rv = rte_ring_sp_enqueue_burst (hqos->swq,
-                                         (void **) &tx_vector[tx_tail],
-                                         (uint16_t) (tx_head - tx_tail), 0);
+         dpdk_hqos_metadata_set (hqos, mb, n_left);
+         n_sent = rte_ring_sp_enqueue_burst (hqos->swq, (void **) mb,
+                                             n_left, 0);
        }
       else if (PREDICT_TRUE (xd->flags & DPDK_DEVICE_FLAG_PMD))
        {
          /* no wrap, transmit in one burst */
-         rv = rte_eth_tx_burst (xd->device_index,
-                                (uint16_t) queue_id,
-                                &tx_vector[tx_tail],
-                                (uint16_t) (tx_head - tx_tail));
+         n_sent = rte_eth_tx_burst (xd->device_index, queue_id, mb, n_left);
        }
       else
        {
          ASSERT (0);
-         rv = 0;
+         n_sent = 0;
        }
 
       if (PREDICT_FALSE (xd->lockp != 0))
        *xd->lockp[queue_id] = 0;
 
-      if (PREDICT_FALSE (rv < 0))
+      if (PREDICT_FALSE (n_sent < 0))
        {
          // emit non-fatal message, bump counter
          vnet_main_t *vnm = dm->vnet_main;
@@ -288,24 +249,21 @@ static_always_inline
 
          vlib_error_count (vm, node_index, DPDK_TX_FUNC_ERROR_BAD_RETVAL, 1);
          clib_warning ("rte_eth_tx_burst[%d]: error %d", xd->device_index,
-                       rv);
-         return n_packets;     // untransmitted packets
+                       n_sent);
+         return n_left;        // untransmitted packets
        }
-      ring->tx_tail += (u16) rv;
-      n_packets -= (uint16_t) rv;
+      n_left -= n_sent;
+      mb += n_sent;
     }
-  while (rv && n_packets && (n_retry > 0));
+  while (n_sent && n_left && (n_retry > 0));
 
-  return n_packets;
+  return n_left;
 }
 
 static_always_inline void
-dpdk_prefetch_buffer_by_index (vlib_main_t * vm, u32 bi)
+dpdk_prefetch_buffer (vlib_main_t * vm, struct rte_mbuf *mb)
 {
-  vlib_buffer_t *b;
-  struct rte_mbuf *mb;
-  b = vlib_get_buffer (vm, bi);
-  mb = rte_mbuf_from_vlib_buffer (b);
+  vlib_buffer_t *b = vlib_buffer_from_rte_mbuf (mb);
   CLIB_PREFETCH (mb, 2 * CLIB_CACHE_LINE_BYTES, STORE);
   CLIB_PREFETCH (b, CLIB_CACHE_LINE_BYTES, LOAD);
 }
@@ -315,7 +273,6 @@ dpdk_buffer_recycle (vlib_main_t * vm, vlib_node_runtime_t * node,
                     vlib_buffer_t * b, u32 bi, struct rte_mbuf **mbp)
 {
   dpdk_main_t *dm = &dpdk_main;
-  u32 my_cpu = vm->thread_index;
   struct rte_mbuf *mb_new;
 
   if (PREDICT_FALSE (b->flags & VLIB_BUFFER_RECYCLE) == 0)
@@ -331,7 +288,7 @@ dpdk_buffer_recycle (vlib_main_t * vm, vlib_node_runtime_t * node,
   else
     *mbp = mb_new;
 
-  vec_add1 (dm->recycle[my_cpu], bi);
+  vec_add1 (dm->recycle[vm->thread_index], bi);
 }
 
 static_always_inline void
@@ -367,9 +324,8 @@ dpdk_buffer_tx_offload (dpdk_device_t * xd, vlib_buffer_t * b,
 
 /*
  * Transmits the packets on the frame to the interface associated with the
- * node. It first copies packets on the frame to a tx_vector containing the
- * rte_mbuf pointers. It then passes this vector to tx_burst_vector_internal
- * which calls the dpdk tx_burst function.
+ * node. It first copies packets on the frame to a per-thread arrays
+ * containing the rte_mbuf pointers.
  */
 uword
 CLIB_MULTIARCH_FN (dpdk_interface_tx) (vlib_main_t * vm,
@@ -382,46 +338,25 @@ CLIB_MULTIARCH_FN (dpdk_interface_tx) (vlib_main_t * vm,
   u32 n_packets = f->n_vectors;
   u32 n_left;
   u32 *from;
-  struct rte_mbuf **tx_vector;
-  u16 i;
-  u16 nb_tx_desc = xd->nb_tx_desc;
-  int queue_id;
-  u32 my_cpu;
-  u32 tx_pkts = 0;
-  tx_ring_hdr_t *ring;
-  u32 n_on_ring;
-
-  my_cpu = vm->thread_index;
-
-  queue_id = my_cpu;
-
-  tx_vector = xd->tx_vectors[queue_id];
-  ring = vec_header (tx_vector, sizeof (*ring));
+  u32 thread_index = vm->thread_index;
+  int queue_id = thread_index;
+  u32 tx_pkts = 0, all_or_flags = 0;
+  dpdk_per_thread_data_t *ptd = vec_elt_at_index (dm->per_thread_data,
+                                                 thread_index);
+  struct rte_mbuf **mb;
+  vlib_buffer_t *b[4];
+#ifdef CLIB_HAVE_VEC256
+  u64x4 off4 = u64x4_splat (buffer_main.buffer_mem_start -
+                           sizeof (struct rte_mbuf));
+  u32x8 permute_mask = { 0, 4, 1, 5, 2, 6, 3, 7 };
+  u32x8 zero = { 0 };
+#endif
 
-  n_on_ring = ring->tx_head - ring->tx_tail;
   from = vlib_frame_vector_args (f);
 
   ASSERT (n_packets <= VLIB_FRAME_SIZE);
 
-  if (PREDICT_FALSE (n_on_ring + n_packets > nb_tx_desc))
-    {
-      /*
-       * Overflowing the ring should never happen.
-       * If it does then drop the whole frame.
-       */
-      vlib_error_count (vm, node->node_index, DPDK_TX_FUNC_ERROR_RING_FULL,
-                       n_packets);
-
-      while (n_packets--)
-       {
-         u32 bi0 = from[n_packets];
-         vlib_buffer_t *b0 = vlib_get_buffer (vm, bi0);
-         struct rte_mbuf *mb0 = rte_mbuf_from_vlib_buffer (b0);
-         rte_pktmbuf_free (mb0);
-       }
-      return n_on_ring;
-    }
-
+  /* TX PCAP tracing */
   if (PREDICT_FALSE (dm->tx_pcap_enable))
     {
       n_left = n_packets;
@@ -437,170 +372,162 @@ CLIB_MULTIARCH_FN (dpdk_interface_tx) (vlib_main_t * vm,
        }
     }
 
+  /* calculate rte_mbuf pointers out of buffer indices */
+  from = vlib_frame_vector_args (f);
+  n_left = n_packets;
+  mb = ptd->mbufs;
+  while (n_left >= 8)
+    {
+#ifdef CLIB_HAVE_VEC256
+      u32x8 bi0, bi1;
+      u64x4 mb0, mb1;
+      /* load 4 bufer indices into lower part of 256-bit register */
+      bi0 = u32x8_insert_lo (zero, u32x4_load_unaligned (from));
+      bi1 = u32x8_insert_lo (zero, u32x4_load_unaligned (from + 4));
+      /* permute 256-bit register so each buffer index is in own u64 */
+      mb0 = (u64x4) u32x8_permute (bi0, permute_mask);
+      mb1 = (u64x4) u32x8_permute (bi1, permute_mask);
+      /* shift and add to get rte_mbuf pointer */
+      mb0 <<= CLIB_LOG2_CACHE_LINE_BYTES;
+      mb1 <<= CLIB_LOG2_CACHE_LINE_BYTES;
+      u64x4_store_unaligned (mb0 + off4, mb);
+      u64x4_store_unaligned (mb1 + off4, mb + 4);
+#else
+      mb[0] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[0]));
+      mb[1] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[1]));
+      mb[2] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[2]));
+      mb[3] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[3]));
+      mb[4] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[4]));
+      mb[5] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[5]));
+      mb[6] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[6]));
+      mb[7] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[7]));
+#endif
+      from += 8;
+      mb += 8;
+      n_left -= 8;
+    }
+  while (n_left)
+    {
+      mb[0] = rte_mbuf_from_vlib_buffer (vlib_get_buffer (vm, from[0]));
+      from++;
+      mb++;
+      n_left--;
+    }
   from = vlib_frame_vector_args (f);
   n_left = n_packets;
-  i = ring->tx_head % nb_tx_desc;
+  mb = ptd->mbufs;
 
   while (n_left >= 8)
     {
-      u32 bi0, bi1, bi2, bi3;
-      struct rte_mbuf *mb0, *mb1, *mb2, *mb3;
-      vlib_buffer_t *b0, *b1, *b2, *b3;
       u32 or_flags;
 
-      dpdk_prefetch_buffer_by_index (vm, from[4]);
-      dpdk_prefetch_buffer_by_index (vm, from[5]);
-      dpdk_prefetch_buffer_by_index (vm, from[6]);
-      dpdk_prefetch_buffer_by_index (vm, from[7]);
+      dpdk_prefetch_buffer (vm, mb[4]);
+      dpdk_prefetch_buffer (vm, mb[5]);
+      dpdk_prefetch_buffer (vm, mb[6]);
+      dpdk_prefetch_buffer (vm, mb[7]);
 
-      bi0 = from[0];
-      bi1 = from[1];
-      bi2 = from[2];
-      bi3 = from[3];
-      from += 4;
+      b[0] = vlib_buffer_from_rte_mbuf (mb[0]);
+      b[1] = vlib_buffer_from_rte_mbuf (mb[1]);
+      b[2] = vlib_buffer_from_rte_mbuf (mb[2]);
+      b[3] = vlib_buffer_from_rte_mbuf (mb[3]);
 
-      b0 = vlib_get_buffer (vm, bi0);
-      b1 = vlib_get_buffer (vm, bi1);
-      b2 = vlib_get_buffer (vm, bi2);
-      b3 = vlib_get_buffer (vm, bi3);
+      or_flags = b[0]->flags | b[1]->flags | b[2]->flags | b[3]->flags;
+      all_or_flags |= or_flags;
 
-      or_flags = b0->flags | b1->flags | b2->flags | b3->flags;
-
-      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
-      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b1);
-      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b2);
-      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b3);
+      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b[0]);
+      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b[1]);
+      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b[2]);
+      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b[3]);
 
       if (or_flags & VLIB_BUFFER_NEXT_PRESENT)
        {
-         dpdk_validate_rte_mbuf (vm, b0, 1);
-         dpdk_validate_rte_mbuf (vm, b1, 1);
-         dpdk_validate_rte_mbuf (vm, b2, 1);
-         dpdk_validate_rte_mbuf (vm, b3, 1);
+         dpdk_validate_rte_mbuf (vm, b[0], 1);
+         dpdk_validate_rte_mbuf (vm, b[1], 1);
+         dpdk_validate_rte_mbuf (vm, b[2], 1);
+         dpdk_validate_rte_mbuf (vm, b[3], 1);
        }
       else
        {
-         dpdk_validate_rte_mbuf (vm, b0, 0);
-         dpdk_validate_rte_mbuf (vm, b1, 0);
-         dpdk_validate_rte_mbuf (vm, b2, 0);
-         dpdk_validate_rte_mbuf (vm, b3, 0);
+         dpdk_validate_rte_mbuf (vm, b[0], 0);
+         dpdk_validate_rte_mbuf (vm, b[1], 0);
+         dpdk_validate_rte_mbuf (vm, b[2], 0);
+         dpdk_validate_rte_mbuf (vm, b[3], 0);
        }
 
-      mb0 = rte_mbuf_from_vlib_buffer (b0);
-      mb1 = rte_mbuf_from_vlib_buffer (b1);
-      mb2 = rte_mbuf_from_vlib_buffer (b2);
-      mb3 = rte_mbuf_from_vlib_buffer (b3);
-
       if (PREDICT_FALSE ((xd->flags & DPDK_DEVICE_FLAG_TX_OFFLOAD) &&
                         (or_flags &
                          (VNET_BUFFER_F_OFFLOAD_TCP_CKSUM
                           | VNET_BUFFER_F_OFFLOAD_IP_CKSUM
                           | VNET_BUFFER_F_OFFLOAD_UDP_CKSUM))))
        {
-         dpdk_buffer_tx_offload (xd, b0, mb0);
-         dpdk_buffer_tx_offload (xd, b1, mb1);
-         dpdk_buffer_tx_offload (xd, b2, mb2);
-         dpdk_buffer_tx_offload (xd, b3, mb3);
-       }
-
-      if (PREDICT_FALSE (or_flags & VLIB_BUFFER_RECYCLE))
-       {
-         dpdk_buffer_recycle (vm, node, b0, bi0, &mb0);
-         dpdk_buffer_recycle (vm, node, b1, bi1, &mb1);
-         dpdk_buffer_recycle (vm, node, b2, bi2, &mb2);
-         dpdk_buffer_recycle (vm, node, b3, bi3, &mb3);
-
-         /* dont enqueue packets if replication failed as they must
-            be sent back to recycle */
-         if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_REPL_FAIL) == 0))
-           tx_vector[i++ % nb_tx_desc] = mb0;
-         if (PREDICT_TRUE ((b1->flags & VLIB_BUFFER_REPL_FAIL) == 0))
-           tx_vector[i++ % nb_tx_desc] = mb1;
-         if (PREDICT_TRUE ((b2->flags & VLIB_BUFFER_REPL_FAIL) == 0))
-           tx_vector[i++ % nb_tx_desc] = mb2;
-         if (PREDICT_TRUE ((b3->flags & VLIB_BUFFER_REPL_FAIL) == 0))
-           tx_vector[i++ % nb_tx_desc] = mb3;
-       }
-      else
-       {
-         if (PREDICT_FALSE (i + 3 >= nb_tx_desc))
-           {
-             tx_vector[i++ % nb_tx_desc] = mb0;
-             tx_vector[i++ % nb_tx_desc] = mb1;
-             tx_vector[i++ % nb_tx_desc] = mb2;
-             tx_vector[i++ % nb_tx_desc] = mb3;
-             i %= nb_tx_desc;
-           }
-         else
-           {
-             tx_vector[i++] = mb0;
-             tx_vector[i++] = mb1;
-             tx_vector[i++] = mb2;
-             tx_vector[i++] = mb3;
-           }
+         dpdk_buffer_tx_offload (xd, b[0], mb[0]);
+         dpdk_buffer_tx_offload (xd, b[1], mb[1]);
+         dpdk_buffer_tx_offload (xd, b[2], mb[2]);
+         dpdk_buffer_tx_offload (xd, b[3], mb[3]);
        }
 
-
       if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_TRACE))
        {
-         if (b0->flags & VLIB_BUFFER_IS_TRACED)
-           dpdk_tx_trace_buffer (dm, node, xd, queue_id, bi0, b0);
-         if (b1->flags & VLIB_BUFFER_IS_TRACED)
-           dpdk_tx_trace_buffer (dm, node, xd, queue_id, bi1, b1);
-         if (b2->flags & VLIB_BUFFER_IS_TRACED)
-           dpdk_tx_trace_buffer (dm, node, xd, queue_id, bi2, b2);
-         if (b3->flags & VLIB_BUFFER_IS_TRACED)
-           dpdk_tx_trace_buffer (dm, node, xd, queue_id, bi3, b3);
+         if (b[0]->flags & VLIB_BUFFER_IS_TRACED)
+           dpdk_tx_trace_buffer (dm, node, xd, queue_id, b[0]);
+         if (b[1]->flags & VLIB_BUFFER_IS_TRACED)
+           dpdk_tx_trace_buffer (dm, node, xd, queue_id, b[1]);
+         if (b[2]->flags & VLIB_BUFFER_IS_TRACED)
+           dpdk_tx_trace_buffer (dm, node, xd, queue_id, b[2]);
+         if (b[3]->flags & VLIB_BUFFER_IS_TRACED)
+           dpdk_tx_trace_buffer (dm, node, xd, queue_id, b[3]);
        }
 
+      mb += 4;
       n_left -= 4;
     }
   while (n_left > 0)
     {
-      u32 bi0;
-      struct rte_mbuf *mb0;
-      vlib_buffer_t *b0;
+      b[0] = vlib_buffer_from_rte_mbuf (mb[0]);
+      all_or_flags |= b[0]->flags;
+      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b[0]);
 
-      bi0 = from[0];
-      from++;
+      dpdk_validate_rte_mbuf (vm, b[0], 1);
+      dpdk_buffer_tx_offload (xd, b[0], mb[0]);
 
-      b0 = vlib_get_buffer (vm, bi0);
-      VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
+      if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_TRACE))
+       if (b[0]->flags & VLIB_BUFFER_IS_TRACED)
+         dpdk_tx_trace_buffer (dm, node, xd, queue_id, b[0]);
 
-      dpdk_validate_rte_mbuf (vm, b0, 1);
+      mb++;
+      n_left--;
+    }
 
-      mb0 = rte_mbuf_from_vlib_buffer (b0);
-      dpdk_buffer_tx_offload (xd, b0, mb0);
-      dpdk_buffer_recycle (vm, node, b0, bi0, &mb0);
+  /* run inly if we have buffers to recycle */
+  if (PREDICT_FALSE (all_or_flags & VLIB_BUFFER_RECYCLE))
+    {
+      struct rte_mbuf **mb_old;
+      from = vlib_frame_vector_args (f);
+      n_left = n_packets;
+      mb_old = mb = ptd->mbufs;
+      while (n_left > 0)
+       {
+         b[0] = vlib_buffer_from_rte_mbuf (mb[0]);
+         dpdk_buffer_recycle (vm, node, b[0], from[0], &mb_old[0]);
 
-      if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_TRACE))
-       if (b0->flags & VLIB_BUFFER_IS_TRACED)
-         dpdk_tx_trace_buffer (dm, node, xd, queue_id, bi0, b0);
+         /* in case of REPL_FAIL we need to shift data */
+         mb[0] = mb_old[0];
 
-      if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_REPL_FAIL) == 0))
-       {
-         tx_vector[i % nb_tx_desc] = mb0;
-         i++;
+         if (PREDICT_TRUE ((b[0]->flags & VLIB_BUFFER_REPL_FAIL) == 0))
+           mb++;
+         mb_old++;
+         from++;
+         n_left--;
        }
-      n_left--;
     }
 
-  /* account for additional packets in the ring */
-  ring->tx_head += n_packets;
-  n_on_ring = ring->tx_head - ring->tx_tail;
-
   /* transmit as many packets as possible */
-  n_packets = tx_burst_vector_internal (vm, xd, tx_vector);
-
-  /*
-   * tx_pkts is the number of packets successfully transmitted
-   * This is the number originally on ring minus the number remaining on ring
-   */
-  tx_pkts = n_on_ring - n_packets;
+  n_packets = mb - ptd->mbufs;
+  n_left = tx_burst_vector_internal (vm, xd, ptd->mbufs, n_packets);
 
   {
     /* If there is no callback then drop any non-transmitted packets */
-    if (PREDICT_FALSE (n_packets))
+    if (PREDICT_FALSE (n_left))
       {
        vlib_simple_counter_main_t *cm;
        vnet_main_t *vnm = vnet_get_main ();
@@ -608,31 +535,25 @@ CLIB_MULTIARCH_FN (dpdk_interface_tx) (vlib_main_t * vm,
        cm = vec_elt_at_index (vnm->interface_main.sw_if_counters,
                               VNET_INTERFACE_COUNTER_TX_ERROR);
 
-       vlib_increment_simple_counter (cm, my_cpu, xd->sw_if_index,
-                                      n_packets);
+       vlib_increment_simple_counter (cm, thread_index, xd->sw_if_index,
+                                      n_left);
 
        vlib_error_count (vm, node->node_index, DPDK_TX_FUNC_ERROR_PKT_DROP,
-                         n_packets);
+                         n_left);
 
-       while (n_packets--)
-         rte_pktmbuf_free (tx_vector[ring->tx_tail + n_packets]);
+       while (n_left--)
+         rte_pktmbuf_free (ptd->mbufs[n_packets - n_left]);
       }
-
-    /* Reset head/tail to avoid unnecessary wrap */
-    ring->tx_head = 0;
-    ring->tx_tail = 0;
   }
 
   /* Recycle replicated buffers */
-  if (PREDICT_FALSE (vec_len (dm->recycle[my_cpu])))
+  if (PREDICT_FALSE (vec_len (dm->recycle[thread_index])))
     {
-      vlib_buffer_free (vm, dm->recycle[my_cpu],
-                       vec_len (dm->recycle[my_cpu]));
-      _vec_len (dm->recycle[my_cpu]) = 0;
+      vlib_buffer_free (vm, dm->recycle[thread_index],
+                       vec_len (dm->recycle[thread_index]));
+      _vec_len (dm->recycle[thread_index]) = 0;
     }
 
-  ASSERT (ring->tx_head >= ring->tx_tail);
-
   return tx_pkts;
 }
 
index f02e718..0778659 100644 (file)
@@ -110,17 +110,6 @@ typedef enum
   VNET_DPDK_PORT_TYPE_UNKNOWN,
 } dpdk_port_type_t;
 
-/*
- * The header for the tx_vector in dpdk_device_t.
- * Head and tail are indexes into the tx_vector and are of type
- * u64 so they never overflow.
- */
-typedef struct
-{
-  u64 tx_head;
-  u64 tx_tail;
-} tx_ring_hdr_t;
-
 typedef uint16_t dpdk_portid_t;
 
 typedef struct
@@ -191,9 +180,6 @@ typedef struct
   /* next node index if we decide to steal the rx graph arc */
   u32 per_interface_next_index;
 
-  /* dpdk rte_mbuf rx and tx vectors, VLIB_FRAME_SIZE */
-  struct rte_mbuf ***tx_vectors;       /* one per worker thread */
-
   dpdk_pmd_t pmd:8;
   i8 cpu_socket;
 
index 9ed3efd..83d26ce 100755 (executable)
@@ -256,7 +256,6 @@ dpdk_lib_init (dpdk_main_t * dm)
     {
       u8 addr[6];
       u8 vlan_strip = 0;
-      int j;
       struct rte_eth_dev_info dev_info;
       struct rte_eth_link l;
       dpdk_device_config_t *devconf = 0;
@@ -537,15 +536,6 @@ dpdk_lib_init (dpdk_main_t * dm)
          dq->queue_id = 0;
        }
 
-      vec_validate_aligned (xd->tx_vectors, tm->n_vlib_mains,
-                           CLIB_CACHE_LINE_BYTES);
-      for (j = 0; j < tm->n_vlib_mains; j++)
-       {
-         vec_validate_ha (xd->tx_vectors[j], xd->nb_tx_desc,
-                          sizeof (tx_ring_hdr_t), CLIB_CACHE_LINE_BYTES);
-         vec_reset_length (xd->tx_vectors[j]);
-       }
-
       /* count the number of descriptors used for this device */
       nb_desc += xd->nb_rx_desc + xd->nb_tx_desc * xd->tx_q_used;
 
index ad7e7d4..9c1ce47 100644 (file)
@@ -69,6 +69,18 @@ u32x8_extract_hi (u32x8 v)
   return (u32x4) _mm256_extracti128_si256 ((__m256i) v, 1);
 }
 
+always_inline u32x8
+u32x8_insert_lo (u32x8 v1, u32x4 v2)
+{
+  return (u32x8) _mm256_inserti128_si256 ((__m256i) v1, (__m128i) v2, 0);
+}
+
+always_inline u32x8
+u32x8_insert_hi (u32x8 v1, u32x4 v2)
+{
+  return (u32x8) _mm256_inserti128_si256 ((__m256i) v1, (__m128i) v2, 1);
+}
+
 #endif /* included_vector_avx2_h */
 
 /*