rdma: use rings for buffers management 05/21505/2
authorBenoît Ganne <bganne@cisco.com>
Wed, 21 Aug 2019 13:11:43 +0000 (15:11 +0200)
committerDamjan Marion <dmarion@me.com>
Mon, 26 Aug 2019 14:10:36 +0000 (14:10 +0000)
Refactor rdma driver for improved performance and prepare for raw
datapath access.

Type: refactor

Change-Id: Iae31872055a6947708ea9f430bd1dc083ea63b5a
Signed-off-by: Benoît Ganne <bganne@cisco.com>
src/plugins/rdma/device.c
src/plugins/rdma/format.c
src/plugins/rdma/input.c
src/plugins/rdma/output.c
src/plugins/rdma/rdma.h

index 62dff21..532f4f5 100644 (file)
@@ -266,8 +266,7 @@ rdma_async_event_error_ready (clib_file_t * f)
 {
   rdma_main_t *rm = &rdma_main;
   rdma_device_t *rd = vec_elt_at_index (rm->devices, f->private_data);
-  return clib_error_return (0, "RDMA async event error for device %U",
-                           format_vlib_pci_addr, &rd->pci_addr);
+  return clib_error_return (0, "RDMA: %s: async event error", rd->name);
 }
 
 static clib_error_t *
@@ -293,8 +292,7 @@ rdma_async_event_read_ready (clib_file_t * f)
     case IBV_EVENT_DEVICE_FATAL:
       rd->flags &= ~RDMA_DEVICE_F_LINK_UP;
       vnet_hw_interface_set_flags (vnm, rd->hw_if_index, 0);
-      vlib_log_emerg (rm->log_class, "Fatal RDMA error for device %U",
-                     format_vlib_pci_addr, &rd->pci_addr);
+      vlib_log_emerg (rm->log_class, "%s: fatal error", rd->name);
       break;
     default:
       rdma_log__ (VLIB_LOG_LEVEL_ERR, rd, "unhandeld RDMA async event %i",
@@ -326,8 +324,7 @@ rdma_async_event_init (rdma_device_t * rd)
   t.file_descriptor = rd->ctx->async_fd;
   t.error_function = rdma_async_event_error_ready;
   t.private_data = rd->dev_instance;
-  t.description =
-    format (0, "RMDA %U async event", format_vlib_pci_addr, &rd->pci_addr);
+  t.description = format (0, "%s async event", rd->name);
 
   rd->async_event_clib_file_index = clib_file_add (&file_main, &t);
   return 0;
@@ -393,6 +390,7 @@ rdma_dev_cleanup (rdma_device_t * rd)
   vec_free (rd->rxqs);
   vec_free (rd->txqs);
   vec_free (rd->name);
+  vlib_pci_free_device_info (rd->pci);
   pool_put (rm->devices, rd);
 }
 
@@ -406,6 +404,7 @@ rdma_rxq_init (vlib_main_t * vm, rdma_device_t * rd, u16 qid, u32 n_desc)
   vec_validate_aligned (rd->rxqs, qid, CLIB_CACHE_LINE_BYTES);
   rxq = vec_elt_at_index (rd->rxqs, qid);
   rxq->size = n_desc;
+  vec_validate_aligned (rxq->bufs, n_desc - 1, CLIB_CACHE_LINE_BYTES);
 
   if ((rxq->cq = ibv_create_cq (rd->ctx, n_desc, NULL, NULL, 0)) == 0)
     return clib_error_return_unix (0, "Create CQ Failed");
@@ -482,6 +481,7 @@ rdma_txq_init (vlib_main_t * vm, rdma_device_t * rd, u16 qid, u32 n_desc)
   vec_validate_aligned (rd->txqs, qid, CLIB_CACHE_LINE_BYTES);
   txq = vec_elt_at_index (rd->txqs, qid);
   txq->size = n_desc;
+  vec_validate_aligned (txq->bufs, n_desc - 1, CLIB_CACHE_LINE_BYTES);
 
   if ((txq->cq = ibv_create_cq (rd->ctx, n_desc, NULL, NULL, 0)) == 0)
     return clib_error_return_unix (0, "Create CQ Failed");
@@ -492,7 +492,6 @@ rdma_txq_init (vlib_main_t * vm, rdma_device_t * rd, u16 qid, u32 n_desc)
   qpia.cap.max_send_wr = n_desc;
   qpia.cap.max_send_sge = 1;
   qpia.qp_type = IBV_QPT_RAW_PACKET;
-  qpia.sq_sig_all = 1;
 
   if ((txq->qp = ibv_create_qp (rd->pd, &qpia)) == 0)
     return clib_error_return_unix (0, "Queue Pair create failed");
@@ -549,6 +548,7 @@ rdma_dev_init (vlib_main_t * vm, rdma_device_t * rd, u32 rxq_size,
                            bm->buffer_mem_size,
                            IBV_ACCESS_LOCAL_WRITE)) == 0)
     return clib_error_return_unix (0, "Register MR Failed");
+  rd->lkey = rd->mr->lkey;     /* avoid indirection in datapath */
 
   return 0;
 }
@@ -573,11 +573,13 @@ rdma_create_if (vlib_main_t * vm, rdma_create_if_args_t * args)
 {
   vnet_main_t *vnm = vnet_get_main ();
   rdma_main_t *rm = &rdma_main;
-  rdma_device_t *rd = 0;
-  struct ibv_device **dev_list = 0;
+  rdma_device_t *rd;
+  vlib_pci_addr_t pci_addr;
+  struct ibv_device **dev_list;
   int n_devs;
-  u8 *s = 0, *s2 = 0;
+  u8 *s;
   u16 qid;
+  int i;
 
   args->rxq_size = args->rxq_size ? args->rxq_size : 2 * VLIB_FRAME_SIZE;
   args->txq_size = args->txq_size ? args->txq_size : 2 * VLIB_FRAME_SIZE;
@@ -588,54 +590,57 @@ rdma_create_if (vlib_main_t * vm, rdma_create_if_args_t * args)
       args->rv = VNET_API_ERROR_INVALID_VALUE;
       args->error =
        clib_error_return (0, "rx queue number must be a power of two");
-      return;
+      goto err0;
     }
 
-  if (!is_pow2 (args->rxq_size) || !is_pow2 (args->txq_size))
+  if (args->rxq_size < VLIB_FRAME_SIZE || args->txq_size < VLIB_FRAME_SIZE ||
+      !is_pow2 (args->rxq_size) || !is_pow2 (args->txq_size))
     {
       args->rv = VNET_API_ERROR_INVALID_VALUE;
       args->error =
-       clib_error_return (0, "queue size must be a power of two");
-      return;
+       clib_error_return (0, "queue size must be a power of two >= %i",
+                          VLIB_FRAME_SIZE);
+      goto err0;
     }
 
-  pool_get_zero (rm->devices, rd);
-  rd->dev_instance = rd - rm->devices;
-  rd->per_interface_next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
-  rd->name = vec_dup (args->name);
-
-  /* check if device exist and if it is bound to mlx5_core */
-  s = format (s, "/sys/class/net/%s/device/driver/module%c", args->ifname, 0);
-  s2 = clib_sysfs_link_to_name ((char *) s);
-
-  if (s2 == 0 || strncmp ((char *) s2, "mlx5_core", 9) != 0)
+  dev_list = ibv_get_device_list (&n_devs);
+  if (n_devs == 0)
     {
       args->error =
-       clib_error_return (0,
-                          "invalid interface (only mlx5 supported for now)");
+       clib_error_return_unix (0,
+                               "no RDMA devices available. Is the ib_uverbs module loaded?");
       goto err0;
     }
 
-  /* extract PCI address */
-  vec_reset_length (s);
-  s = format (s, "/sys/class/net/%s/device%c", args->ifname, 0);
-  if (sysfs_path_to_pci_addr ((char *) s, &rd->pci_addr) == 0)
+  /* get PCI address */
+  s = format (0, "/sys/class/net/%s/device%c", args->ifname, 0);
+  if (sysfs_path_to_pci_addr ((char *) s, &pci_addr) == 0)
     {
-      args->error = clib_error_return (0, "cannot find PCI address");
-      goto err0;
+      args->error =
+       clib_error_return (0, "cannot find PCI address for device ");
+      goto err1;
     }
 
-  dev_list = ibv_get_device_list (&n_devs);
-  if (n_devs == 0)
+  pool_get_zero (rm->devices, rd);
+  rd->dev_instance = rd - rm->devices;
+  rd->per_interface_next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
+  rd->name = format (0, "%s", args->name);
+  rd->linux_ifname = format (0, "%s", args->ifname);
+
+  rd->pci = vlib_pci_get_device_info (vm, &pci_addr, &args->error);
+  if (!rd->pci)
+    goto err2;
+  rd->pool = vlib_buffer_pool_get_default_for_numa (vm, rd->pci->numa_node);
+
+  if (strncmp ((char *) rd->pci->driver_name, "mlx5_core", 9))
     {
       args->error =
-       clib_error_return_unix (0,
-                               "no RDMA devices available, errno = %d. "
-                               "Is the ib_uverbs module loaded?", errno);
-      goto err0;
+       clib_error_return (0,
+                          "invalid interface (only mlx5 supported for now)");
+      goto err2;
     }
 
-  for (int i = 0; i < n_devs; i++)
+  for (i = 0; i < n_devs; i++)
     {
       vlib_pci_addr_t addr;
 
@@ -645,7 +650,7 @@ rdma_create_if (vlib_main_t * vm, rdma_create_if_args_t * args)
       if (sysfs_path_to_pci_addr ((char *) s, &addr) == 0)
        continue;
 
-      if (addr.as_u32 != rd->pci_addr.as_u32)
+      if (addr.as_u32 != rd->pci->addr.as_u32)
        continue;
 
       if ((rd->ctx = ibv_open_device (dev_list[i])))
@@ -654,7 +659,7 @@ rdma_create_if (vlib_main_t * vm, rdma_create_if_args_t * args)
 
   if ((args->error =
        rdma_dev_init (vm, rd, args->rxq_size, args->txq_size, args->rxq_num)))
-    goto err1;
+    goto err2;
 
   if ((args->error = rdma_register_interface (vnm, rd)))
     goto err2;
@@ -675,6 +680,8 @@ rdma_create_if (vlib_main_t * vm, rdma_create_if_args_t * args)
                                    rdma_input_node.index);
   vec_foreach_index (qid, rd->rxqs)
     vnet_hw_interface_assign_rx_thread (vnm, rd->hw_if_index, qid, ~0);
+
+  vec_free (s);
   return;
 
 err3:
@@ -683,10 +690,9 @@ err2:
   rdma_dev_cleanup (rd);
 err1:
   ibv_free_device_list (dev_list);
-err0:
-  vec_free (s2);
   vec_free (s);
   args->rv = VNET_API_ERROR_INVALID_INTERFACE;
+err0:
   vlib_log_err (rm->log_class, "%U", format_clib_error, args->error);
 }
 
index fbd4067..798b21d 100644 (file)
@@ -59,7 +59,9 @@ format_rdma_device (u8 * s, va_list * args)
   rdma_device_t *rd = vec_elt_at_index (rm->devices, i);
   u32 indent = format_get_indent (s);
 
-  s = format (s, "flags: %U", format_rdma_device_flags, rd);
+  s = format (s, "netdev: %s\n", rd->linux_ifname);
+  s = format (s, "%Uflags: %U", format_white_space, indent,
+             format_rdma_device_flags, rd);
   if (rd->error)
     s = format (s, "\n%Uerror %U", format_white_space, indent,
                format_clib_error, rd->error);
index b2f3c28..3c9481f 100644 (file)
@@ -45,17 +45,30 @@ rdma_device_input_refill (vlib_main_t * vm, rdma_device_t * rd,
                          rdma_rxq_t * rxq)
 {
   u32 n_alloc, n;
-  u32 buffers[VLIB_FRAME_SIZE], *bi = buffers;
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b = bufs;
   struct ibv_recv_wr wr[VLIB_FRAME_SIZE], *w = wr;
   struct ibv_sge sge[VLIB_FRAME_SIZE], *s = sge;
+  u32 slot = rxq->tail & (rxq->size - 1);
 
-  if (PREDICT_FALSE (rxq->n_enq >= rxq->size))
+  /* do not enqueue more packet than ring space */
+  n_alloc = clib_min (VLIB_FRAME_SIZE, rxq->size - (rxq->tail - rxq->head));
+
+  /* do not bother to allocate if too small */
+  if (n_alloc < 16)
+    return;
+
+  /* avoid wrap-around logic in core loop */
+  n_alloc = clib_min (n_alloc, rxq->size - slot);
+
+  n = n_alloc =
+    vlib_buffer_alloc_to_ring_from_pool (vm, rxq->bufs, slot, rxq->size,
+                                        n_alloc, rd->pool);
+
+  /* if ring is full or allocation error, do nothing */
+  if (PREDICT_FALSE (0 == n_alloc))
     return;
 
-  n_alloc = clib_min (VLIB_FRAME_SIZE, rxq->size - rxq->n_enq);
-  n_alloc = n = vlib_buffer_alloc (vm, buffers, n_alloc);
-  vlib_get_buffers (vm, buffers, bufs, n_alloc);
+  vlib_get_buffers (vm, &rxq->bufs[slot], bufs, n_alloc);
 
   while (n >= 4)
     {
@@ -67,42 +80,37 @@ rdma_device_input_refill (vlib_main_t * vm, rdma_device_t * rd,
 
       s[0].addr = vlib_buffer_get_va (b[0]);
       s[0].length = vlib_buffer_get_default_data_size (vm);
-      s[0].lkey = rd->mr->lkey;
+      s[0].lkey = rd->lkey;
 
       s[1].addr = vlib_buffer_get_va (b[1]);
       s[1].length = vlib_buffer_get_default_data_size (vm);
-      s[1].lkey = rd->mr->lkey;
+      s[1].lkey = rd->lkey;
 
       s[2].addr = vlib_buffer_get_va (b[2]);
       s[2].length = vlib_buffer_get_default_data_size (vm);
-      s[2].lkey = rd->mr->lkey;
+      s[2].lkey = rd->lkey;
 
       s[3].addr = vlib_buffer_get_va (b[3]);
       s[3].length = vlib_buffer_get_default_data_size (vm);
-      s[3].lkey = rd->mr->lkey;
+      s[3].lkey = rd->lkey;
 
-      w[0].wr_id = bi[0];
       w[0].next = &w[0] + 1;
       w[0].sg_list = &s[0];
       w[0].num_sge = 1;
 
-      w[1].wr_id = bi[1];
       w[1].next = &w[1] + 1;
       w[1].sg_list = &s[1];
       w[1].num_sge = 1;
 
-      w[2].wr_id = bi[2];
       w[2].next = &w[2] + 1;
       w[2].sg_list = &s[2];
       w[2].num_sge = 1;
 
-      w[3].wr_id = bi[3];
       w[3].next = &w[3] + 1;
       w[3].sg_list = &s[3];
       w[3].num_sge = 1;
 
       s += 4;
-      bi += 4;
       w += 4;
       b += 4;
       n -= 4;
@@ -112,15 +120,13 @@ rdma_device_input_refill (vlib_main_t * vm, rdma_device_t * rd,
     {
       s[0].addr = vlib_buffer_get_va (b[0]);
       s[0].length = vlib_buffer_get_default_data_size (vm);
-      s[0].lkey = rd->mr->lkey;
+      s[0].lkey = rd->lkey;
 
-      w[0].wr_id = bi[0];
       w[0].next = &w[0] + 1;
       w[0].sg_list = &s[0];
       w[0].num_sge = 1;
 
       s += 1;
-      bi += 1;
       w += 1;
       b += 1;
       n -= 1;
@@ -132,10 +138,11 @@ rdma_device_input_refill (vlib_main_t * vm, rdma_device_t * rd,
   if (ibv_post_wq_recv (rxq->wq, wr, &w) != 0)
     {
       n = w - wr;
-      vlib_buffer_free (vm, buffers + n, n_alloc - n);
+      vlib_buffer_free_from_ring (vm, rxq->bufs, slot + n, rxq->size,
+                                 n_alloc - n);
     }
 
-  rxq->n_enq += n;
+  rxq->tail += n;
 }
 
 static_always_inline void
@@ -193,11 +200,16 @@ rdma_device_input_ethernet (vlib_main_t * vm, vlib_node_runtime_t * node,
 }
 
 static_always_inline u32
-rdma_device_input_load_wc (u32 n_left_from, struct ibv_wc * wc, u32 * to_next,
-                          u32 * bufsz)
+rdma_device_input_bufs (vlib_main_t * vm, const rdma_device_t * rd,
+                       u32 * next, u32 * bi, struct ibv_wc * wc,
+                       u32 n_left_from, vlib_buffer_t * bt)
 {
+  vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b = bufs;
   u32 n_rx_bytes[4] = { 0 };
 
+  vlib_get_buffers (vm, bi, bufs, n_left_from);
+  ASSERT (bt->buffer_pool_index == bufs[0]->buffer_pool_index);
+
   while (n_left_from >= 4)
     {
       if (PREDICT_TRUE (n_left_from >= 8))
@@ -206,92 +218,53 @@ rdma_device_input_load_wc (u32 n_left_from, struct ibv_wc * wc, u32 * to_next,
          CLIB_PREFETCH (&wc[4 + 1], CLIB_CACHE_LINE_BYTES, LOAD);
          CLIB_PREFETCH (&wc[4 + 2], CLIB_CACHE_LINE_BYTES, LOAD);
          CLIB_PREFETCH (&wc[4 + 3], CLIB_CACHE_LINE_BYTES, LOAD);
-         CLIB_PREFETCH (&bufsz[4 + 0], 4 * sizeof (bufsz[0]), STORE);
-         CLIB_PREFETCH (&to_next[4 + 0], 4 * sizeof (to_next[0]), STORE);
+         vlib_prefetch_buffer_header (b[4 + 0], STORE);
+         vlib_prefetch_buffer_header (b[4 + 1], STORE);
+         vlib_prefetch_buffer_header (b[4 + 2], STORE);
+         vlib_prefetch_buffer_header (b[4 + 3], STORE);
        }
 
-      to_next[0] = wc[0].wr_id;
-      to_next[1] = wc[1].wr_id;
-      to_next[2] = wc[2].wr_id;
-      to_next[3] = wc[3].wr_id;
+      vlib_buffer_copy_indices (next, bi, 4);
 
-      bufsz[0] = wc[0].byte_len;
-      bufsz[1] = wc[1].byte_len;
-      bufsz[2] = wc[2].byte_len;
-      bufsz[3] = wc[3].byte_len;
+      vlib_buffer_copy_template (b[0], bt);
+      vlib_buffer_copy_template (b[1], bt);
+      vlib_buffer_copy_template (b[2], bt);
+      vlib_buffer_copy_template (b[3], bt);
+
+      b[0]->current_length = wc[0].byte_len;
+      b[1]->current_length = wc[1].byte_len;
+      b[2]->current_length = wc[2].byte_len;
+      b[3]->current_length = wc[3].byte_len;
 
       n_rx_bytes[0] += wc[0].byte_len;
       n_rx_bytes[1] += wc[1].byte_len;
       n_rx_bytes[2] += wc[2].byte_len;
       n_rx_bytes[3] += wc[3].byte_len;
 
+      next += 4;
+      bi += 4;
+      b += 4;
       wc += 4;
-      to_next += 4;
-      bufsz += 4;
       n_left_from -= 4;
     }
 
   while (n_left_from >= 1)
     {
-      to_next[0] = wc[0].wr_id;
-      bufsz[0] = wc[0].byte_len;
+      vlib_buffer_copy_indices (next, bi, 1);
+      vlib_buffer_copy_template (b[0], bt);
+      b[0]->current_length = wc[0].byte_len;
       n_rx_bytes[0] += wc[0].byte_len;
 
+      next += 1;
+      bi += 1;
+      b += 1;
       wc += 1;
-      to_next += 1;
-      bufsz += 1;
       n_left_from -= 1;
     }
 
   return n_rx_bytes[0] + n_rx_bytes[1] + n_rx_bytes[2] + n_rx_bytes[3];
 }
 
-static_always_inline void
-rdma_device_input_bufs_init (u32 n_left_from, vlib_buffer_t ** bufs,
-                            u32 * bufsz, u32 sw_if_index, vlib_buffer_t * bt)
-{
-  vnet_buffer (bt)->sw_if_index[VLIB_RX] = sw_if_index;
-  vnet_buffer (bt)->sw_if_index[VLIB_TX] = ~0;
-  bt->buffer_pool_index = bufs[0]->buffer_pool_index;
-  bt->ref_count = 1;
-
-  while (n_left_from >= 4)
-    {
-      if (PREDICT_TRUE (n_left_from >= 8))
-       {
-         vlib_prefetch_buffer_header (bufs[4 + 0], STORE);
-         vlib_prefetch_buffer_header (bufs[4 + 1], STORE);
-         vlib_prefetch_buffer_header (bufs[4 + 2], STORE);
-         vlib_prefetch_buffer_header (bufs[4 + 3], STORE);
-         CLIB_PREFETCH (&bufsz[4 + 0], 4 * sizeof (bufsz[0]), LOAD);
-       }
-
-      vlib_buffer_copy_template (bufs[0], bt);
-      vlib_buffer_copy_template (bufs[1], bt);
-      vlib_buffer_copy_template (bufs[2], bt);
-      vlib_buffer_copy_template (bufs[3], bt);
-
-      bufs[0]->current_length = bufsz[0];
-      bufs[1]->current_length = bufsz[1];
-      bufs[2]->current_length = bufsz[2];
-      bufs[3]->current_length = bufsz[3];
-
-      bufs += 4;
-      bufsz += 4;
-      n_left_from -= 4;
-    }
-
-  while (n_left_from >= 1)
-    {
-      vlib_buffer_copy_template (bufs[0], bt);
-      bufs[0]->current_length = bufsz[0];
-
-      bufs += 1;
-      bufsz += 1;
-      n_left_from -= 1;
-    }
-}
-
 static_always_inline uword
 rdma_device_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
                          vlib_frame_t * frame, rdma_device_t * rd, u16 qid)
@@ -299,12 +272,16 @@ rdma_device_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
   vnet_main_t *vnm = vnet_get_main ();
   rdma_rxq_t *rxq = vec_elt_at_index (rd->rxqs, qid);
   struct ibv_wc wc[VLIB_FRAME_SIZE];
-  u32 bufsz[VLIB_FRAME_SIZE];
-  vlib_buffer_t *bufs[VLIB_FRAME_SIZE], bt;
+  vlib_buffer_t bt;
   u32 next_index, *to_next, n_left_to_next;
   u32 n_rx_packets, n_rx_bytes;
+  u32 slot, n_tail;
+
+  ASSERT (rxq->size >= VLIB_FRAME_SIZE && is_pow2 (rxq->size));
+  ASSERT (rxq->tail - rxq->head <= rxq->size);
 
   n_rx_packets = ibv_poll_cq (rxq->cq, VLIB_FRAME_SIZE, wc);
+  ASSERT (n_rx_packets <= rxq->tail - rxq->head);
 
   if (PREDICT_FALSE (n_rx_packets <= 0))
     {
@@ -312,30 +289,50 @@ rdma_device_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
       return 0;
     }
 
+  /* init buffer template */
   clib_memset_u64 (&bt, 0,
                   STRUCT_OFFSET_OF (vlib_buffer_t,
                                     template_end) / sizeof (u64));
+  vnet_buffer (&bt)->sw_if_index[VLIB_RX] = rd->sw_if_index;
+  vnet_buffer (&bt)->sw_if_index[VLIB_TX] = ~0;
+  bt.buffer_pool_index = rd->pool;
+  bt.ref_count = 1;
+
+  /* update buffer template for input feature arcs if any */
   next_index = rd->per_interface_next_index;
   if (PREDICT_FALSE (vnet_device_input_have_features (rd->sw_if_index)))
     vnet_feature_start_device_input_x1 (rd->sw_if_index, &next_index, &bt);
 
   vlib_get_new_next_frame (vm, node, next_index, to_next, n_left_to_next);
-  n_rx_bytes = rdma_device_input_load_wc (n_rx_packets, wc, to_next, bufsz);
-  vlib_get_buffers (vm, to_next, bufs, n_rx_packets);
-  rdma_device_input_bufs_init (n_rx_packets, bufs, bufsz, rd->sw_if_index,
-                              &bt);
-  rdma_device_input_trace (vm, node, rd, n_rx_packets, to_next, next_index);
+  ASSERT (n_rx_packets <= n_left_to_next);
+
+  /*
+   * avoid wrap-around logic in core loop
+   * we requested VLIB_FRAME_SIZE packets and rxq->size >= VLIB_FRAME_SIZE
+   *    => we can process all packets in 2 iterations max
+   */
+  slot = rxq->head & (rxq->size - 1);
+  n_tail = clib_min (n_rx_packets, rxq->size - slot);
+  n_rx_bytes =
+    rdma_device_input_bufs (vm, rd, &to_next[0], &rxq->bufs[slot], wc, n_tail,
+                           &bt);
+  if (n_tail < n_rx_packets)
+    n_rx_bytes +=
+      rdma_device_input_bufs (vm, rd, &to_next[n_tail], &rxq->bufs[0], wc,
+                             n_rx_packets - n_tail, &bt);
   rdma_device_input_ethernet (vm, node, rd, next_index);
 
   vlib_put_next_frame (vm, node, next_index, n_left_to_next - n_rx_packets);
 
+  rxq->head += n_rx_packets;
+
+  rdma_device_input_trace (vm, node, rd, n_rx_packets, to_next, next_index);
+
   vlib_increment_combined_counter
     (vnm->interface_main.combined_sw_if_counters +
      VNET_INTERFACE_COUNTER_RX, vm->thread_index,
      rd->hw_if_index, n_rx_packets, n_rx_bytes);
 
-  rxq->n_enq -= n_rx_packets;
-
   rdma_device_input_refill (vm, rd, rxq);
 
   return n_rx_packets;
index ddda81a..0c6848e 100644 (file)
@@ -28,46 +28,45 @@ static_always_inline void
 rdma_device_output_free (vlib_main_t * vm, rdma_txq_t * txq)
 {
   struct ibv_wc wc[VLIB_FRAME_SIZE];
-  u32 to_free[VLIB_FRAME_SIZE];
-  int n_free;
-  int i;
+  u32 tail, slot;
+  int n;
 
-  n_free = ibv_poll_cq (txq->cq, VLIB_FRAME_SIZE, wc);
-  if (n_free <= 0)
+  n = ibv_poll_cq (txq->cq, VLIB_FRAME_SIZE, wc);
+  if (n <= 0)
     return;
 
-  for (i = 0; i < n_free; i++)
-    to_free[i] = wc[i].wr_id;
-
-  vlib_buffer_free (vm, to_free, n_free);
+  tail = wc[n - 1].wr_id;
+  slot = txq->head & (txq->size - 1);
+  vlib_buffer_free_from_ring (vm, txq->bufs, slot, txq->size,
+                             tail - txq->head);
+  txq->head = tail;
 }
 
-VNET_DEVICE_CLASS_TX_FN (rdma_device_class) (vlib_main_t * vm,
-                                            vlib_node_runtime_t * node,
-                                            vlib_frame_t * frame)
+static_always_inline u32
+rmda_device_output_tx (vlib_main_t * vm, const rdma_device_t * rd,
+                      rdma_txq_t * txq, u32 n_left_from, u32 * bi)
 {
-  rdma_main_t *rm = &rdma_main;
-  vnet_interface_output_runtime_t *ord = (void *) node->runtime_data;
-  rdma_device_t *rd = pool_elt_at_index (rm->devices, ord->dev_instance);
-  u32 thread_index = vm->thread_index;
-  rdma_txq_t *txq =
-    vec_elt_at_index (rd->txqs, thread_index % vec_len (rd->txqs));
-  u32 *from, *f, n_left_from;
-  u32 n_tx_packets, n_tx_failed;
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b = bufs;
   struct ibv_send_wr wr[VLIB_FRAME_SIZE], *w = wr;
   struct ibv_sge sge[VLIB_FRAME_SIZE], *s = sge;
-  int i, ret;
+  u32 n, slot = txq->tail & (txq->size - 1);
+  u32 *tx = &txq->bufs[slot];
 
-  f = from = vlib_frame_vector_args (frame);
-  n_left_from = frame->n_vectors;
-  vlib_get_buffers (vm, from, bufs, n_left_from);
+  /* do not enqueue more packet than ring space */
+  n_left_from = clib_min (n_left_from, txq->size - (txq->tail - txq->head));
+  /* avoid wrap-around logic in core loop */
+  n = n_left_from = clib_min (n_left_from, txq->size - slot);
 
+  /* if ring is full, do nothing */
+  if (PREDICT_FALSE (0 == n_left_from))
+    return 0;
+
+  vlib_get_buffers (vm, bi, bufs, n_left_from);
   memset (w, 0, n_left_from * sizeof (w[0]));
 
-  while (n_left_from >= 4)
+  while (n >= 4)
     {
-      if (PREDICT_TRUE (n_left_from >= 8))
+      if (PREDICT_TRUE (n >= 8))
        {
          vlib_prefetch_buffer_header (b[4 + 0], LOAD);
          vlib_prefetch_buffer_header (b[4 + 1], LOAD);
@@ -82,96 +81,126 @@ VNET_DEVICE_CLASS_TX_FN (rdma_device_class) (vlib_main_t * vm,
          CLIB_PREFETCH (&w[4 + 3], CLIB_CACHE_LINE_BYTES, STORE);
        }
 
+      vlib_buffer_copy_indices (tx, bi, 4);
+
       s[0].addr = vlib_buffer_get_current_va (b[0]);
       s[0].length = b[0]->current_length;
-      s[0].lkey = rd->mr->lkey;
+      s[0].lkey = rd->lkey;
 
       s[1].addr = vlib_buffer_get_current_va (b[1]);
       s[1].length = b[1]->current_length;
-      s[1].lkey = rd->mr->lkey;
+      s[1].lkey = rd->lkey;
 
       s[2].addr = vlib_buffer_get_current_va (b[2]);
       s[2].length = b[2]->current_length;
-      s[2].lkey = rd->mr->lkey;
+      s[2].lkey = rd->lkey;
 
       s[3].addr = vlib_buffer_get_current_va (b[3]);
       s[3].length = b[3]->current_length;
-      s[3].lkey = rd->mr->lkey;
+      s[3].lkey = rd->lkey;
 
-      w[0].wr_id = f[0];
       w[0].next = &w[0] + 1;
       w[0].sg_list = &s[0];
       w[0].num_sge = 1;
       w[0].opcode = IBV_WR_SEND;
 
-      w[1].wr_id = f[1];
       w[1].next = &w[1] + 1;
       w[1].sg_list = &s[1];
       w[1].num_sge = 1;
       w[1].opcode = IBV_WR_SEND;
 
-      w[2].wr_id = f[2];
       w[2].next = &w[2] + 1;
       w[2].sg_list = &s[2];
       w[2].num_sge = 1;
       w[2].opcode = IBV_WR_SEND;
 
-      w[3].wr_id = f[3];
       w[3].next = &w[3] + 1;
       w[3].sg_list = &s[3];
       w[3].num_sge = 1;
       w[3].opcode = IBV_WR_SEND;
 
       s += 4;
-      f += 4;
       w += 4;
       b += 4;
-      n_left_from -= 4;
+      bi += 4;
+      tx += 4;
+      n -= 4;
     }
 
-  while (n_left_from >= 1)
+  while (n >= 1)
     {
+      vlib_buffer_copy_indices (tx, bi, 1);
+
       s[0].addr = vlib_buffer_get_current_va (b[0]);
       s[0].length = b[0]->current_length;
-      s[0].lkey = rd->mr->lkey;
+      s[0].lkey = rd->lkey;
 
-      w[0].wr_id = f[0];
       w[0].next = &w[0] + 1;
       w[0].sg_list = &s[0];
       w[0].num_sge = 1;
       w[0].opcode = IBV_WR_SEND;
 
       s += 1;
-      f += 1;
       w += 1;
       b += 1;
-      n_left_from -= 1;
+      bi += 1;
+      tx += 1;
+      n -= 1;
     }
 
-  w[-1].next = 0;              /* fix next pointer in WR linked-list last item */
+  w[-1].wr_id = txq->tail + n_left_from;       /* register item to free */
+  w[-1].next = 0;              /* fix next pointer in WR linked-list */
+  w[-1].send_flags = IBV_SEND_SIGNALED;        /* generate a CQE so we can free buffers */
 
   w = wr;
+  if (PREDICT_FALSE (0 != ibv_post_send (txq->qp, w, &w)))
+    n_left_from = w - wr;
+
+  txq->tail += n_left_from;
+  return n_left_from;
+}
+
+VNET_DEVICE_CLASS_TX_FN (rdma_device_class) (vlib_main_t * vm,
+                                            vlib_node_runtime_t * node,
+                                            vlib_frame_t * frame)
+{
+  rdma_main_t *rm = &rdma_main;
+  vnet_interface_output_runtime_t *ord = (void *) node->runtime_data;
+  rdma_device_t *rd = pool_elt_at_index (rm->devices, ord->dev_instance);
+  u32 thread_index = vm->thread_index;
+  rdma_txq_t *txq =
+    vec_elt_at_index (rd->txqs, thread_index % vec_len (rd->txqs));
+  u32 *from;
+  u32 n_left_from;
+  int i;
+
+  ASSERT (txq->size >= VLIB_FRAME_SIZE && is_pow2 (txq->size));
+  ASSERT (txq->tail - txq->head <= txq->size);
+
+  from = vlib_frame_vector_args (frame);
+  n_left_from = frame->n_vectors;
+
   clib_spinlock_lock_if_init (&txq->lock);
-  for (i = 0; i < 5; i++)
+
+  for (i = 0; i < 5 && n_left_from >= 0; i++)
     {
+      u32 n_enq;
       rdma_device_output_free (vm, txq);
-      ret = ibv_post_send (txq->qp, w, &w);
-      if (0 == ret)
-       break;
+      n_enq = rmda_device_output_tx (vm, rd, txq, n_left_from, from);
+      n_left_from -= n_enq;
+      from += n_enq;
     }
-  clib_spinlock_unlock_if_init (&txq->lock);
 
-  n_tx_packets = 0 == ret ? frame->n_vectors : w - wr;
-  n_tx_failed = frame->n_vectors - n_tx_packets;
+  clib_spinlock_unlock_if_init (&txq->lock);
 
-  if (PREDICT_FALSE (n_tx_failed))
+  if (PREDICT_FALSE (n_left_from))
     {
-      vlib_buffer_free (vm, &from[n_tx_packets], n_tx_failed);
+      vlib_buffer_free (vm, from, n_left_from);
       vlib_error_count (vm, node->node_index,
-                       RDMA_TX_ERROR_NO_FREE_SLOTS, n_tx_failed);
+                       RDMA_TX_ERROR_NO_FREE_SLOTS, n_left_from);
     }
 
-  return n_tx_packets;
+  return frame->n_vectors - n_left_from;
 }
 
 /*
index c7df6f7..0aae498 100644 (file)
@@ -37,40 +37,47 @@ enum
 typedef struct
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
-  u32 size;
-  u32 n_enq;
   struct ibv_cq *cq;
   struct ibv_wq *wq;
+  u32 *bufs;
+  u32 size;
+  u32 head;
+  u32 tail;
 } rdma_rxq_t;
 
 typedef struct
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
-  u32 size;
-  u32 n_enq;
+  clib_spinlock_t lock;
   struct ibv_cq *cq;
   struct ibv_qp *qp;
-  clib_spinlock_t lock;
+  u32 *bufs;
+  u32 size;
+  u32 head;
+  u32 tail;
 } rdma_txq_t;
 
 typedef struct
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
+
+  /* following fields are accessed in datapath */
+  rdma_rxq_t *rxqs;
+  rdma_txq_t *txqs;
   u32 flags;
   u32 per_interface_next_index;
-
-  u32 dev_instance;
   u32 sw_if_index;
   u32 hw_if_index;
+  u32 lkey;                    /* cache of mr->lkey */
+  u8 pool;                     /* buffer pool index */
 
-  u32 async_event_clib_file_index;
-
-  rdma_rxq_t *rxqs;
-  rdma_txq_t *txqs;
-
+  /* fields below are not accessed in datapath */
+  vlib_pci_device_info_t *pci;
   u8 *name;
+  u8 *linux_ifname;
   mac_address_t hwaddr;
-  vlib_pci_addr_t pci_addr;
+  u32 async_event_clib_file_index;
+  u32 dev_instance;
 
   struct ibv_context *ctx;
   struct ibv_pd *pd;
@@ -80,7 +87,6 @@ typedef struct
   struct ibv_flow *flow_ucast;
   struct ibv_flow *flow_mcast;
 
-  /* error */
   clib_error_t *error;
 } rdma_device_t;