Add support for multiple Rx and Tx queues 62/8462/10
authorMichal Mazur <[email protected]>
Tue, 19 Sep 2017 18:18:41 +0000 (20:18 +0200)
committerMichal Mazur <[email protected]>
Thu, 14 Dec 2017 14:18:05 +0000 (15:18 +0100)
 1) Incoming packets can be spread across multiple worker
    threads based on IP and TCP/UDP headers.
 2) Multiple output queues are used in Burst mode if supported
    by hardware (checked in interface capabilities)
 3) Synchronization of output traffic can be disabled due to
    multiple Tx queues - one for each thread.

Change-Id: Ib5ee18103c860eae3b56ffc453a5953c729bb521
Signed-off-by: Michal Mazur <[email protected]>
README.vppodp
src/plugins/odp/cli.c
src/plugins/odp/device.c
src/plugins/odp/node.c
src/plugins/odp/odp_packet.c
src/plugins/odp/odp_packet.h

index 7ff3c86..6e53698 100644 (file)
@@ -74,8 +74,9 @@ Below is a basic verification test.
 Note :For odp-dpdk the port has to bound with dpdk driver prior to test and interface name is passed as 0,1..etc.
 
 1)Configure odp packet interface with mode ie (0-burst,1-queue,2-schedule) default mode is 0.
+  Setting rx-queues to more than 1 will enable RSS.
 
--create pktio-interface name <int name> hw-addr <mac> mode <0/1/2>
+-create pktio-interface name <int name> hw-addr <mac> mode <0/1/2> rx-queues <number of input queues>
 -set int ip address odp-<int name> X.X.X.X/24
 -set int state odp-<int name> up
 
index e965022..4da921a 100755 (executable)
@@ -26,7 +26,8 @@ odp_packet_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
   u8 hwaddr[6];
   u8 *hw_addr_ptr = 0;
   u32 sw_if_index;
-  u32 mode = 0;
+  u32 mode = APPL_MODE_PKT_BURST;
+  u32 rx_queues = 0;
   int r;
 
   if (!unformat_user (input, unformat_line_input, line_input))
@@ -43,6 +44,8 @@ odp_packet_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
        hw_addr_ptr = hwaddr;
       else if (unformat (line_input, "mode %d", &mode))
        ;
+      else if (unformat (line_input, "rx-queues %d", &rx_queues))
+       ;
       else
        return clib_error_return (0, "unknown input `%U'",
                                  format_unformat_error, input);
@@ -52,8 +55,8 @@ odp_packet_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
   if (host_if_name == NULL)
     return clib_error_return (0, "missing host interface name");
 
-  r =
-    odp_packet_create_if (vm, host_if_name, hw_addr_ptr, &sw_if_index, mode);
+  r = odp_packet_create_if (vm, host_if_name, hw_addr_ptr, &sw_if_index,
+                           mode, rx_queues);
   vec_free (host_if_name);
 
   if (r == VNET_API_ERROR_SYSCALL_ERROR_1)
index 3e11106..68397db 100755 (executable)
@@ -77,11 +77,12 @@ odp_packet_interface_tx (vlib_main_t * vm,
   u32 n_left = frame->n_vectors;
   vnet_interface_output_runtime_t *rd = (void *) node->runtime_data;
   odp_packet_if_t *oif = pool_elt_at_index (om->interfaces, rd->dev_instance);
-  odp_pktout_queue_t pktout;
+  uword queue_index = vlib_get_thread_index () % oif->tx_queues;
+  u32 mode = oif->mode;
   odp_packet_t pkt_tbl[VLIB_FRAME_SIZE];
-  u32 sent, count = 0;
+  odp_event_t evt_tbl[VLIB_FRAME_SIZE];
   vlib_buffer_t *b0;
-  u32 bi;
+  u32 bi, sent, count = 0;
 
   if (PREDICT_FALSE (oif->lockp != 0))
     {
@@ -89,11 +90,6 @@ odp_packet_interface_tx (vlib_main_t * vm,
        ;
     }
 
-  if (odp_pktout_queue (oif->pktio, &pktout, 1) != 1)
-    {
-      return -1;
-    }
-
   while (n_left > 0)
     {
       odp_packet_t pkt;
@@ -121,6 +117,8 @@ odp_packet_interface_tx (vlib_main_t * vm,
          else if (diff < 0)
            odp_packet_pull_tail (pkt, -diff);
          pkt_tbl[count] = pkt;
+         if (mode == APPL_MODE_PKT_QUEUE)
+           evt_tbl[count] = odp_packet_to_event (pkt_tbl[count]);
          count++;
          bi = b0->next_buffer;
        }
@@ -133,7 +131,22 @@ odp_packet_interface_tx (vlib_main_t * vm,
       sent = 0;
       while (count > 0)
        {
-         ret = odp_pktout_send (pktout, &pkt_tbl[sent], count);
+         switch (mode)
+           {
+           case APPL_MODE_PKT_SCHED:
+           case APPL_MODE_PKT_BURST:
+             ret =
+               odp_pktout_send (oif->outq[queue_index], &pkt_tbl[sent],
+                                count);
+             break;
+           case APPL_MODE_PKT_QUEUE:
+             ret = odp_queue_enq_multi (oif->txq[queue_index],
+                                        &evt_tbl[sent], count);
+             break;
+           default:
+             ret = 0;
+             clib_error ("Invalid mode\n");
+           }
          if (odp_unlikely (ret <= 0))
            {
              /* Drop one packet and try again */
index a04ee47..f7e02b7 100755 (executable)
@@ -66,43 +66,48 @@ odp_prefetch_ethertype (odp_packet_t pkt)
 }
 
 always_inline int
-odp_packet_queue_mode (odp_pktio_t pktio, u32 mode, odp_packet_t pkt_tbl[])
+odp_packet_queue_mode (odp_packet_if_t * oif, odp_packet_t pkt_tbl[],
+                      u32 queue_id, u32 req_pkts)
 {
   u32 num_evts = 0, num_pkts = 0;
   int i;
-  odp_queue_t inq;
-  odp_event_t evt_tbl[VLIB_FRAME_SIZE];
-  u64 sched_wait = odp_schedule_wait_time (ODP_TIME_MSEC_IN_NS * 100);
+  odp_pktio_t pktio = oif->pktio;
+  odp_event_t evt_tbl[req_pkts];
+  u64 sched_wait;
 
   if (pktio == ODP_PKTIO_INVALID)
     {
-      clib_warning ("odp_pktio_lookup() failed");
+      clib_warning ("invalid oif->pktio value");
       return 0;
     }
-
-  inq = ODP_QUEUE_INVALID;
-  if ((mode == APPL_MODE_PKT_QUEUE) &&
-      (odp_pktin_event_queue (pktio, &inq, 1) != 1))
+  if ((oif->mode == APPL_MODE_PKT_QUEUE) &&
+      (oif->rxq[queue_id] == ODP_QUEUE_INVALID))
     {
-      clib_warning ("Error:no input queue");
+      clib_warning ("invalid rxq[%d] queue", queue_id);
       return 0;
     }
 
-  while (num_evts < VLIB_FRAME_SIZE)
+  while (req_pkts)
     {
-      if (inq != ODP_QUEUE_INVALID)
-       i = odp_queue_deq_multi (inq, &evt_tbl[num_evts],
-                                VLIB_FRAME_SIZE - num_evts);
+      if (oif->mode == APPL_MODE_PKT_QUEUE)
+       {
+         i = odp_queue_deq_multi (oif->rxq[queue_id],
+                                  &evt_tbl[num_evts], req_pkts);
+       }
       else
-       i = odp_schedule_multi (NULL, sched_wait, &evt_tbl[num_evts],
-                               VLIB_FRAME_SIZE - num_evts);
+       {
+         sched_wait = odp_schedule_wait_time (ODP_TIME_USEC_IN_NS);
+         i = odp_schedule_multi (NULL, sched_wait,
+                                 &evt_tbl[num_evts], req_pkts);
+       }
       if (i <= 0)
        break;
       num_evts += i;
+      req_pkts -= i;
     }
 
   /* convert events to packets, discarding any non-packet events */
-  for (i = 0; i < num_evts; ++i)
+  for (i = 0; i < num_evts; i++)
     {
       if (odp_event_type (evt_tbl[i]) == ODP_EVENT_PACKET)
        pkt_tbl[num_pkts++] = odp_packet_from_event (evt_tbl[i]);
@@ -114,25 +119,20 @@ odp_packet_queue_mode (odp_pktio_t pktio, u32 mode, odp_packet_t pkt_tbl[])
 }
 
 always_inline int
-odp_packet_burst_mode (odp_pktio_t pktio, odp_pktin_queue_t pktin,
-                      odp_packet_t pkt_tbl[], u32 req_pkts)
+odp_packet_burst_mode (odp_packet_if_t * oif, odp_packet_t pkt_tbl[],
+                      u32 queue_id, u32 req_pkts)
 {
   u32 num_pkts = 0;
   int ret;
+  odp_pktin_queue_t inq = oif->inq[queue_id];
 
-  if (odp_pktin_queue (pktio, &pktin, 1) != 1)
-    {
-      clib_warning ("odp_pktio_open() failed: no pktin queue");
-      return 0;
-    }
-
-  while (num_pkts < req_pkts)
+  while (req_pkts)
     {
-      ret = odp_pktin_recv (pktin, &pkt_tbl[num_pkts],
-                           req_pkts - num_pkts);
+      ret = odp_pktin_recv (inq, &pkt_tbl[num_pkts], req_pkts);
       if (ret <= 0)
        break;
       num_pkts += ret;
+      req_pkts -= ret;
     }
 
   return num_pkts;
@@ -211,44 +211,40 @@ odp_trace_buffer_x4 (uword * n_trace, vlib_main_t * vm,
 
 always_inline uword
 odp_packet_device_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
-                           vlib_frame_t * frame, odp_packet_if_t * oif)
+                           vlib_frame_t * frame, odp_packet_if_t * oif,
+                           u32 queue_id)
 {
   u32 next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
   uword n_trace = vlib_get_trace_count (vm, node);
   u32 n_rx_packets = 0;
   u32 n_rx_bytes = 0;
   u32 *to_next = 0;
-  odp_pktin_queue_t pktin = { 0 };
   odp_packet_t pkt_tbl[VLIB_FRAME_SIZE];
   int pkts = 0, i;
-  u32 retry = 8;
-  u32 n_left = 0, n_left_to_next = VLIB_FRAME_SIZE;
+  int n_left = 0, n_left_to_next = VLIB_FRAME_SIZE;
   u32 next0 = next_index;
   u32 next1 = next_index;
   u32 next2 = next_index;
   u32 next3 = next_index;
 
-  while (1)
+  do
     {
       if ((oif->mode == (APPL_MODE_PKT_QUEUE)) ||
          (oif->mode == (APPL_MODE_PKT_SCHED)))
        {
-         pkts = odp_packet_queue_mode (oif->pktio, oif->mode, pkt_tbl);
+         pkts =
+           odp_packet_queue_mode (oif, pkt_tbl, queue_id, n_left_to_next);
        }
       else
        {
-         pkts = odp_packet_burst_mode (oif->pktio, pktin, pkt_tbl,
-                                       n_left_to_next);
+         pkts =
+           odp_packet_burst_mode (oif, pkt_tbl, queue_id, n_left_to_next);
        }
 
       n_left = drop_err_pkts (pkt_tbl, pkts);
       if (n_left == 0)
-       {
-         if (retry--)
-           continue;
-         else
-           break;
-       }
+       break;
+
       i = 0;
 
       if (n_rx_packets == 0)
@@ -360,10 +356,8 @@ odp_packet_device_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
          n_left--;
          n_rx_packets++;
        }
-
-      if (n_left_to_next < 4)
-       break;
     }
+  while (0);
 
   if (n_rx_packets)
     {
@@ -397,7 +391,8 @@ odp_packet_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   {
     oif = pool_elt_at_index (om->interfaces, dq->dev_instance);
     if (oif->is_admin_up)
-      n_rx_packets += odp_packet_device_input_fn (vm, node, frame, oif);
+      n_rx_packets += odp_packet_device_input_fn (vm, node, frame, oif,
+                                                 dq->queue_id);
   }
 
   return n_rx_packets;
index 70f7452..bcb8a48 100755 (executable)
@@ -61,7 +61,8 @@ drop_err_pkts (odp_packet_t pkt_tbl[], unsigned len)
 }
 
 static odp_pktio_t
-create_pktio (const char *dev, odp_pool_t pool, u32 mode)
+create_pktio (const char *dev, odp_pool_t pool, u32 mode,
+             odp_packet_if_t * oif)
 {
   odp_pktio_t pktio;
   int ret;
@@ -69,6 +70,8 @@ create_pktio (const char *dev, odp_pool_t pool, u32 mode)
   odp_pktin_queue_param_t pktin_param;
   odp_pktout_queue_param_t pktout_param;
   odp_pktio_config_t pktio_config;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  odp_pktio_capability_t capa;
 
   odp_pktio_param_init (&pktio_param);
 
@@ -76,12 +79,15 @@ create_pktio (const char *dev, odp_pool_t pool, u32 mode)
     {
     case APPL_MODE_PKT_BURST:
       pktio_param.in_mode = ODP_PKTIN_MODE_DIRECT;
+      pktio_param.out_mode = ODP_PKTOUT_MODE_DIRECT;
       break;
     case APPL_MODE_PKT_QUEUE:
       pktio_param.in_mode = ODP_PKTIN_MODE_QUEUE;
+      pktio_param.out_mode = ODP_PKTOUT_MODE_QUEUE;
       break;
     case APPL_MODE_PKT_SCHED:
       pktio_param.in_mode = ODP_PKTIN_MODE_SCHED;
+      pktio_param.out_mode = ODP_PKTOUT_MODE_DIRECT;
       break;
     default:
       clib_warning ("Invalid mode\n");
@@ -93,6 +99,13 @@ create_pktio (const char *dev, odp_pool_t pool, u32 mode)
   if (pktio == ODP_PKTIO_INVALID)
     {
       clib_warning ("Error: pktio create failed for %s", dev);
+      return 0;
+    }
+
+  if (odp_pktio_capability (pktio, &capa))
+    {
+      clib_warning ("Error: capability query failed %s\n", dev);
+      return 0;
     }
 
   odp_pktio_config_init (&pktio_config);
@@ -100,9 +113,29 @@ create_pktio (const char *dev, odp_pool_t pool, u32 mode)
   odp_pktio_config (pktio, &pktio_config);
 
   odp_pktin_queue_param_init (&pktin_param);
-  pktin_param.classifier_enable = 0;
   pktin_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE;
 
+  if (oif->rx_queues > capa.max_input_queues)
+    {
+      oif->rx_queues = capa.max_input_queues;
+      clib_warning ("Number of RX queues limited to %d\n", oif->rx_queues);
+    }
+
+  if (oif->rx_queues > 1)
+    {
+      if (oif->rx_queues > MAX_QUEUES)
+       oif->rx_queues = MAX_QUEUES;
+
+      pktin_param.classifier_enable = 0;
+      pktin_param.hash_enable = 1;
+      pktin_param.num_queues = oif->rx_queues;
+      pktin_param.hash_proto.proto.ipv4_udp = 1;
+      pktin_param.hash_proto.proto.ipv4_tcp = 1;
+      pktin_param.hash_proto.proto.ipv4 = 1;
+    }
+  else
+    oif->rx_queues = 1;
+
   if (mode == APPL_MODE_PKT_SCHED)
     pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_ATOMIC;
 
@@ -112,9 +145,16 @@ create_pktio (const char *dev, odp_pool_t pool, u32 mode)
     }
 
   odp_pktout_queue_param_init (&pktout_param);
-  /* TODO use multiple output queue and no synchronization
-     pktout_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE;
-   */
+  if (capa.max_output_queues >= tm->n_vlib_mains)
+    {
+      pktout_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE;
+      pktout_param.num_queues = tm->n_vlib_mains;
+      oif->tx_queues = tm->n_vlib_mains;
+    }
+  else
+    {
+      oif->tx_queues = 1;
+    }
 
   if (odp_pktout_queue_config (pktio, &pktout_param))
     {
@@ -132,10 +172,10 @@ create_pktio (const char *dev, odp_pool_t pool, u32 mode)
 
 u32
 odp_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
-                     u32 * sw_if_index, u32 mode)
+                     u32 * sw_if_index, u32 mode, u32 rx_queues)
 {
   odp_packet_main_t *om = odp_packet_main;
-  int ret = 0;
+  int ret = 0, j;
   odp_packet_if_t *oif = 0;
   u8 hw_addr[6];
   clib_error_t *error = 0;
@@ -154,8 +194,13 @@ odp_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
   oif->host_if_name = host_if_name_dup;
   oif->per_interface_next_index = ~0;
 
+  if (mode == APPL_MODE_PKT_SCHED)
+    oif->rx_queues = tm->n_vlib_mains - 1;
+  else
+    oif->rx_queues = rx_queues;
+
   /* Create a pktio instance */
-  oif->pktio = create_pktio ((char *) host_if_name, om->pool, mode);
+  oif->pktio = create_pktio ((char *) host_if_name, om->pool, mode, oif);
   oif->mode = mode;
   om->if_count++;
 
@@ -166,6 +211,17 @@ odp_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
       memset ((void *) oif->lockp, 0, CLIB_CACHE_LINE_BYTES);
     }
 
+  if ((mode == APPL_MODE_PKT_BURST) || (mode == APPL_MODE_PKT_SCHED))
+    {
+      odp_pktin_queue (oif->pktio, oif->inq, oif->rx_queues);
+      odp_pktout_queue (oif->pktio, oif->outq, oif->tx_queues);
+    }
+  else if (mode == APPL_MODE_PKT_QUEUE)
+    {
+      odp_pktin_event_queue (oif->pktio, oif->rxq, oif->rx_queues);
+      odp_pktout_event_queue (oif->pktio, oif->txq, oif->tx_queues);
+    }
+
   /*use configured or generate random MAC address */
   if (hw_addr_set)
     clib_memcpy (hw_addr, hw_addr_set, 6);
@@ -208,8 +264,14 @@ odp_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
   if (sw_if_index)
     *sw_if_index = oif->sw_if_index;
 
-  /* Assign queue 0 of the new interface to first available worker thread */
-  vnet_hw_interface_assign_rx_thread (vnm, oif->hw_if_index, 0, ~0);
+  /* Assign queues of the new interface to first available worker thread */
+  for (j = 0; j < oif->rx_queues; j++)
+    {
+      if (mode == APPL_MODE_PKT_SCHED)
+       vnet_hw_interface_assign_rx_thread (vnm, oif->hw_if_index, 0, j + 1);
+      else
+       vnet_hw_interface_assign_rx_thread (vnm, oif->hw_if_index, j, -1);
+    }
 
   return 0;
 
index f9c793b..81b4a8e 100755 (executable)
@@ -16,6 +16,7 @@
 #define APPL_MODE_PKT_SCHED    2
 
 #define MAX_WORKERS 32
+#define MAX_QUEUES (MAX_WORKERS + 1)
 
 typedef struct
 {
@@ -31,6 +32,12 @@ typedef struct
   u32 per_interface_next_index;
   u8 is_admin_up;
   u32 mode;
+  odp_queue_t rxq[MAX_QUEUES];
+  odp_pktin_queue_t inq[MAX_QUEUES];
+  odp_pktout_queue_t outq[MAX_QUEUES];
+  odp_queue_t txq[MAX_QUEUES];
+  u16 rx_queues;
+  u16 tx_queues;
 } odp_packet_if_t;
 
 typedef struct
@@ -53,7 +60,8 @@ extern vnet_device_class_t odp_packet_device_class;
 extern vlib_node_registration_t odp_packet_input_node;
 
 u32 odp_packet_create_if (vlib_main_t * vm, u8 * host_if_name,
-                         u8 * hw_addr_set, u32 * sw_if_index, u32 mode);
+                         u8 * hw_addr_set, u32 * sw_if_index, u32 mode,
+                         u32 rx_queues);
 u32 odp_packet_delete_if (vlib_main_t * vm, u8 * host_if_name);
 
 u32 drop_err_pkts (odp_packet_t pkt_tbl[], u32 len);