dpdk: cleanup, add rx queue struct 13/27413/2
authorDamjan Marion <damarion@cisco.com>
Wed, 3 Jun 2020 18:13:59 +0000 (20:13 +0200)
committerDamjan Marion <dmarion@me.com>
Thu, 4 Jun 2020 12:08:09 +0000 (12:08 +0000)
Type: improvement
Change-Id: I3defde103ab245404de42d2be7abcb2c43d49a60
Signed-off-by: Damjan Marion <damarion@cisco.com>
src/plugins/dpdk/device/common.c
src/plugins/dpdk/device/device.c
src/plugins/dpdk/device/dpdk.h
src/plugins/dpdk/device/init.c
src/plugins/dpdk/device/node.c

index 41b32a6..18d4555 100644 (file)
@@ -112,10 +112,11 @@ dpdk_device_setup (dpdk_device_t * xd)
        dpdk_device_error (xd, "rte_eth_tx_queue_setup", rv);
     }
 
-  vec_validate_aligned (xd->buffer_pool_for_queue, xd->rx_q_used - 1,
+  vec_validate_aligned (xd->rx_queues, xd->rx_q_used - 1,
                        CLIB_CACHE_LINE_BYTES);
   for (j = 0; j < xd->rx_q_used; j++)
     {
+      dpdk_rx_queue_t *rxq = vec_elt_at_index (xd->rx_queues, j);
       uword tidx = vnet_get_device_input_thread_index (dm->vnet_main,
                                                       xd->hw_if_index, j);
       unsigned lcore = vlib_worker_threads[tidx].cpu_id;
@@ -132,7 +133,7 @@ dpdk_device_setup (dpdk_device_t * xd)
        rv = rte_eth_rx_queue_setup (xd->port_id, j, xd->nb_rx_desc,
                                     SOCKET_ID_ANY, 0, mp);
 
-      xd->buffer_pool_for_queue[j] = bp->index;
+      rxq->buffer_pool_index = bp->index;
 
       if (rv < 0)
        dpdk_device_error (xd, "rte_eth_rx_queue_setup", rv);
index 1ef2c5d..2467b7d 100644 (file)
@@ -158,26 +158,18 @@ static_always_inline
                                struct rte_mbuf **mb, u32 n_left)
 {
   dpdk_main_t *dm = &dpdk_main;
+  dpdk_rx_queue_t *rxq;
   u32 n_retry;
   int n_sent = 0;
   int queue_id;
 
   n_retry = 16;
-  queue_id = vm->thread_index;
+  queue_id = vm->thread_index % xd->tx_q_used;
+  rxq = vec_elt_at_index (xd->rx_queues, queue_id);
 
   do
     {
-      /*
-       * This device only supports one TX queue,
-       * and we're running multi-threaded...
-       */
-      if (PREDICT_FALSE (xd->lockp != 0))
-       {
-         queue_id = queue_id % xd->tx_q_used;
-         while (clib_atomic_test_and_set (xd->lockp[queue_id]))
-           /* zzzz */
-           queue_id = (queue_id + 1) % xd->tx_q_used;
-       }
+      clib_spinlock_lock_if_init (&rxq->lock);
 
       if (PREDICT_TRUE (xd->flags & DPDK_DEVICE_FLAG_PMD))
        {
@@ -191,8 +183,7 @@ static_always_inline
          n_sent = 0;
        }
 
-      if (PREDICT_FALSE (xd->lockp != 0))
-       clib_atomic_release (xd->lockp[queue_id]);
+      clib_spinlock_unlock_if_init (&rxq->lock);
 
       if (PREDICT_FALSE (n_sent < 0))
        {
@@ -205,8 +196,6 @@ static_always_inline
                                         xd->hw_if_index)->tx_node_index;
 
          vlib_error_count (vm, node_index, DPDK_TX_FUNC_ERROR_BAD_RETVAL, 1);
-         clib_warning ("rte_eth_tx_burst[%d]: error %d",
-                       xd->port_id, n_sent);
          return n_left;        // untransmitted packets
        }
       n_left -= n_sent;
index 46d36a2..0ace4f3 100644 (file)
@@ -156,13 +156,16 @@ typedef struct
 typedef struct
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
-  volatile u32 **lockp;
+  clib_spinlock_t lock;
+  u8 buffer_pool_index;
+} dpdk_rx_queue_t;
 
-  /* Instance ID to access internal device array. */
-  dpdk_portid_t device_index;
+typedef struct
+{
+  CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
 
-  /* DPDK device port number */
-  dpdk_portid_t port_id;
+  /* Instance ID to access internal device array. */
+  u32 device_index;
 
   u32 hw_if_index;
   u32 sw_if_index;
@@ -170,13 +173,20 @@ typedef struct
   /* next node index if we decide to steal the rx graph arc */
   u32 per_interface_next_index;
 
+  /* DPDK device port number */
+  dpdk_portid_t port_id;
   dpdk_pmd_t pmd:8;
   i8 cpu_socket;
 
   u16 flags;
 
-  u16 nb_tx_desc;
+  dpdk_rx_queue_t *rx_queues;
+  u16 tx_q_used;
+
     CLIB_CACHE_LINE_ALIGN_MARK (cacheline1);
+  u16 rx_q_used;
+  u16 nb_tx_desc;
+  u16 nb_rx_desc;
 
   u8 *name;
   u8 *interface_name_suffix;
@@ -185,11 +195,6 @@ typedef struct
   u16 num_subifs;
 
   /* PMD related */
-  u16 tx_q_used;
-  u16 rx_q_used;
-  u16 nb_rx_desc;
-  u16 *cpu_socket_id_by_queue;
-  u8 *buffer_pool_for_queue;
   struct rte_eth_conf port_conf;
   struct rte_eth_txconf tx_conf;
 
@@ -226,21 +231,6 @@ typedef struct
 #define DPDK_LINK_POLL_INTERVAL       (3.0)
 #define DPDK_MIN_LINK_POLL_INTERVAL   (0.001)  /* 1msec */
 
-typedef struct
-{
-  u32 device;
-  u16 queue_id;
-} dpdk_device_and_queue_t;
-
-#ifndef DPDK_HQOS_DBG_BYPASS
-#define DPDK_HQOS_DBG_BYPASS 0
-#endif
-
-#ifndef HQOS_FLUSH_COUNT_THRESHOLD
-#define HQOS_FLUSH_COUNT_THRESHOLD              100000
-#endif
-
-
 #define foreach_dpdk_device_config_item \
   _ (num_rx_queues) \
   _ (num_tx_queues) \
index 7911ccc..b69e2b1 100644 (file)
@@ -143,19 +143,6 @@ dpdk_flag_change (vnet_main_t * vnm, vnet_hw_interface_t * hi, u32 flags)
   return old;
 }
 
-static void
-dpdk_device_lock_init (dpdk_device_t * xd)
-{
-  int q;
-  vec_validate (xd->lockp, xd->tx_q_used - 1);
-  for (q = 0; q < xd->tx_q_used; q++)
-    {
-      xd->lockp[q] = clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES,
-                                            CLIB_CACHE_LINE_BYTES);
-      clib_memset ((void *) xd->lockp[q], 0, CLIB_CACHE_LINE_BYTES);
-    }
-}
-
 static int
 dpdk_port_crc_strip_enabled (dpdk_device_t * xd)
 {
@@ -623,7 +610,11 @@ dpdk_lib_init (dpdk_main_t * dm)
        rte_eth_macaddr_get (i, (void *) addr);
 
       if (xd->tx_q_used < tm->n_vlib_mains)
-       dpdk_device_lock_init (xd);
+        for (int q = 0; q < xd->tx_q_used; q++)
+         {
+           dpdk_rx_queue_t *rxq = vec_elt_at_index (xd->rx_queues, q);
+           clib_spinlock_init (&rxq->lock);
+         }
 
       xd->port_id = i;
       xd->device_index = xd - dm->devices;
index a16d485..256c399 100644 (file)
@@ -287,6 +287,7 @@ dpdk_device_input (vlib_main_t * vm, dpdk_main_t * dm, dpdk_device_t * xd,
                   vlib_node_runtime_t * node, u32 thread_index, u16 queue_id)
 {
   uword n_rx_packets = 0, n_rx_bytes;
+  dpdk_rx_queue_t *rxq = vec_elt_at_index (xd->rx_queues, queue_id);
   u32 n_left, n_trace;
   u32 *buffers;
   u32 next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
@@ -324,7 +325,7 @@ dpdk_device_input (vlib_main_t * vm, dpdk_main_t * dm, dpdk_device_t * xd,
   bt->error = node->errors[DPDK_ERROR_NONE];
   /* as DPDK is allocating empty buffers from mempool provided before interface
      start for each queue, it is safe to store this in the template */
-  bt->buffer_pool_index = xd->buffer_pool_for_queue[queue_id];
+  bt->buffer_pool_index = rxq->buffer_pool_index;
   bt->ref_count = 1;
   vnet_buffer (bt)->feature_arc_index = 0;
   bt->current_config_index = 0;