rdma: ring db only once per vector on tx
[vpp.git] / src / plugins / rdma / output.c
1 /*
2  *------------------------------------------------------------------
3  * Copyright (c) 2018 Cisco and/or its affiliates.
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *------------------------------------------------------------------
16  */
17
18 #include <vlib/vlib.h>
19 #include <vlib/unix/unix.h>
20 #include <vlib/pci/pci.h>
21 #include <vppinfra/ring.h>
22 #include <vnet/ethernet/ethernet.h>
23 #include <vnet/devices/devices.h>
24 #include <rdma/rdma.h>
25
26 #define RDMA_TX_RETRIES 5
27
28 #define RDMA_TXQ_DV_DSEG_SZ(txq)        (RDMA_MLX5_WQE_DS * RDMA_TXQ_DV_SQ_SZ(txq))
29 #define RDMA_TXQ_DV_DSEG2WQE(d)         (((d) + RDMA_MLX5_WQE_DS - 1) / RDMA_MLX5_WQE_DS)
30
31 /*
32  * MLX5 direct verbs tx/free functions
33  */
34
35 static_always_inline void
36 rdma_device_output_free_mlx5 (vlib_main_t * vm,
37                               const vlib_node_runtime_t * node,
38                               rdma_txq_t * txq)
39 {
40   u16 idx = txq->dv_cq_idx;
41   u32 cq_mask = pow2_mask (txq->dv_cq_log2sz);
42   u32 sq_mask = pow2_mask (txq->dv_sq_log2sz);
43   u32 mask = pow2_mask (txq->bufs_log2sz);
44   u32 buf_sz = RDMA_TXQ_BUF_SZ (txq);
45   u32 log2_cq_sz = txq->dv_cq_log2sz;
46   struct mlx5_cqe64 *cqes = txq->dv_cq_cqes, *cur = cqes + (idx & cq_mask);
47   u8 op_own, saved;
48   const rdma_mlx5_wqe_t *wqe;
49
50   for (;;)
51     {
52       op_own = *(volatile u8 *) &cur->op_own;
53       if (((idx >> log2_cq_sz) & MLX5_CQE_OWNER_MASK) !=
54           (op_own & MLX5_CQE_OWNER_MASK) || (op_own >> 4) == MLX5_CQE_INVALID)
55         break;
56       if (PREDICT_FALSE ((op_own >> 4)) != MLX5_CQE_REQ)
57         vlib_error_count (vm, node->node_index, RDMA_TX_ERROR_COMPLETION, 1);
58       idx++;
59       cur = cqes + (idx & cq_mask);
60     }
61
62   if (idx == txq->dv_cq_idx)
63     return;                     /* nothing to do */
64
65   cur = cqes + ((idx - 1) & cq_mask);
66   saved = cur->op_own;
67   (void) saved;
68   cur->op_own = 0xf0;
69   txq->dv_cq_idx = idx;
70
71   /* retrieve original WQE and get new tail counter */
72   wqe = txq->dv_sq_wqes + (be16toh (cur->wqe_counter) & sq_mask);
73   if (PREDICT_FALSE (wqe->ctrl.imm == RDMA_TXQ_DV_INVALID_ID))
74     return;                     /* can happen if CQE reports error for an intermediate WQE */
75
76   ASSERT (RDMA_TXQ_USED_SZ (txq->head, wqe->ctrl.imm) <= buf_sz &&
77           RDMA_TXQ_USED_SZ (wqe->ctrl.imm, txq->tail) < buf_sz);
78
79   /* free sent buffers and update txq head */
80   vlib_buffer_free_from_ring (vm, txq->bufs, txq->head & mask, buf_sz,
81                               RDMA_TXQ_USED_SZ (txq->head, wqe->ctrl.imm));
82   txq->head = wqe->ctrl.imm;
83
84   /* ring doorbell */
85   CLIB_MEMORY_STORE_BARRIER ();
86   txq->dv_cq_dbrec[0] = htobe32 (idx);
87 }
88
89 static_always_inline void
90 rdma_device_output_tx_mlx5_doorbell (rdma_txq_t * txq, rdma_mlx5_wqe_t * last,
91                                      const u16 tail, u32 sq_mask)
92 {
93   last->ctrl.imm = tail;        /* register item to free */
94   last->ctrl.fm_ce_se = MLX5_WQE_CTRL_CQ_UPDATE;        /* generate a CQE so we can free buffers */
95
96   ASSERT (tail != txq->tail &&
97           RDMA_TXQ_AVAIL_SZ (txq, txq->head, txq->tail) >=
98           RDMA_TXQ_USED_SZ (txq->tail, tail));
99
100   CLIB_MEMORY_STORE_BARRIER ();
101   txq->dv_sq_dbrec[MLX5_SND_DBR] = htobe32 (tail);
102   CLIB_COMPILER_BARRIER ();
103   txq->dv_sq_db[0] = *(u64 *) (txq->dv_sq_wqes + (txq->tail & sq_mask));
104 }
105
106 static_always_inline void
107 rdma_mlx5_wqe_init (rdma_mlx5_wqe_t * wqe, const void *tmpl,
108                     vlib_buffer_t * b, const u16 tail)
109 {
110   u16 sz = b->current_length;
111   const void *cur = vlib_buffer_get_current (b);
112   uword addr = pointer_to_uword (cur);
113
114   clib_memcpy_fast (wqe, tmpl, RDMA_MLX5_WQE_SZ);
115   /* speculatively copy at least MLX5_ETH_L2_INLINE_HEADER_SIZE (18-bytes) */
116   STATIC_ASSERT (STRUCT_SIZE_OF (struct mlx5_wqe_eth_seg, inline_hdr_start) +
117                  STRUCT_SIZE_OF (struct mlx5_wqe_eth_seg,
118                                  inline_hdr) >=
119                  MLX5_ETH_L2_INLINE_HEADER_SIZE, "wrong size");
120   clib_memcpy_fast (wqe->eseg.inline_hdr_start, cur,
121                     MLX5_ETH_L2_INLINE_HEADER_SIZE);
122
123   wqe->wqe_index_lo = tail;
124   wqe->wqe_index_hi = tail >> 8;
125   if (PREDICT_TRUE (sz >= MLX5_ETH_L2_INLINE_HEADER_SIZE))
126     {
127       /* inline_hdr_sz is set to MLX5_ETH_L2_INLINE_HEADER_SIZE
128          in the template */
129       wqe->dseg.byte_count = htobe32 (sz - MLX5_ETH_L2_INLINE_HEADER_SIZE);
130       wqe->dseg.addr = htobe64 (addr + MLX5_ETH_L2_INLINE_HEADER_SIZE);
131     }
132   else
133     {
134       /* dseg.byte_count and desg.addr are set to 0 in the template */
135       wqe->eseg.inline_hdr_sz = htobe16 (sz);
136     }
137 }
138
139 /*
140  * specific data path for chained buffers, supporting ring wrap-around
141  * contrary to the normal path - otherwise we may fail to enqueue chained
142  * buffers because we are close to the end of the ring while we still have
143  * plenty of descriptors available
144  */
145 static_always_inline u32
146 rdma_device_output_tx_mlx5_chained (vlib_main_t * vm,
147                                     const vlib_node_runtime_t * node,
148                                     const rdma_device_t * rd,
149                                     rdma_txq_t * txq, u32 n_left_from, u32 n,
150                                     u32 * bi, vlib_buffer_t ** b,
151                                     rdma_mlx5_wqe_t * wqe, u16 tail)
152 {
153   rdma_mlx5_wqe_t *last = wqe;
154   u32 wqe_n = RDMA_TXQ_AVAIL_SZ (txq, txq->head, tail);
155   u32 sq_mask = pow2_mask (txq->dv_sq_log2sz);
156   u32 mask = pow2_mask (txq->bufs_log2sz);
157   u32 dseg_mask = RDMA_TXQ_DV_DSEG_SZ (txq) - 1;
158   const u32 lkey = wqe[0].dseg.lkey;
159
160   vlib_buffer_copy_indices (txq->bufs + (txq->tail & mask), bi,
161                             n_left_from - n);
162
163   while (n >= 1 && wqe_n >= 1)
164     {
165       u32 *bufs = txq->bufs + (tail & mask);
166       rdma_mlx5_wqe_t *wqe = txq->dv_sq_wqes + (tail & sq_mask);
167
168       /* setup the head WQE */
169       rdma_mlx5_wqe_init (wqe, txq->dv_wqe_tmpl, b[0], tail);
170
171       bufs[0] = bi[0];
172
173       if (b[0]->flags & VLIB_BUFFER_NEXT_PRESENT)
174         {
175           /*
176            * max number of available dseg:
177            *  - 4 dseg per WQEBB available
178            *  - max 32 dseg per WQE (5-bits length field in WQE ctrl)
179            */
180 #define RDMA_MLX5_WQE_DS_MAX    (1 << 5)
181           const u32 dseg_max =
182             clib_min (RDMA_MLX5_WQE_DS * (wqe_n - 1), RDMA_MLX5_WQE_DS_MAX);
183           vlib_buffer_t *chained_b = b[0];
184           u32 chained_n = 0;
185
186           /* there are exactly 4 dseg per WQEBB and we rely on that */
187           STATIC_ASSERT (RDMA_MLX5_WQE_DS *
188                          sizeof (struct mlx5_wqe_data_seg) ==
189                          MLX5_SEND_WQE_BB, "wrong size");
190
191           /*
192            * iterate over fragments, supporting ring wrap-around contrary to
193            * the normal path - otherwise we may fail to enqueue chained
194            * buffers because we are close to the end of the ring while we
195            * still have plenty of descriptors available
196            */
197           while (chained_n < dseg_max
198                  && chained_b->flags & VLIB_BUFFER_NEXT_PRESENT)
199             {
200               struct mlx5_wqe_data_seg *dseg = (void *) txq->dv_sq_wqes;
201               dseg += ((tail + 1) * RDMA_MLX5_WQE_DS + chained_n) & dseg_mask;
202               if (((clib_address_t) dseg & (MLX5_SEND_WQE_BB - 1)) == 0)
203                 {
204                   /*
205                    * start of new WQEBB
206                    * head/tail are shared between buffers and descriptor
207                    * In order to maintain 1:1 correspondance between
208                    * buffer index and descriptor index, we build
209                    * 4-fragments chains and save the head
210                    */
211                   chained_b->flags &= ~(VLIB_BUFFER_NEXT_PRESENT |
212                                         VLIB_BUFFER_TOTAL_LENGTH_VALID);
213                   u32 idx = tail + 1 + RDMA_TXQ_DV_DSEG2WQE (chained_n);
214                   idx &= mask;
215                   txq->bufs[idx] = chained_b->next_buffer;
216                 }
217
218               chained_b = vlib_get_buffer (vm, chained_b->next_buffer);
219               dseg->byte_count = htobe32 (chained_b->current_length);
220               dseg->lkey = lkey;
221               dseg->addr = htobe64 (vlib_buffer_get_current_va (chained_b));
222
223               chained_n += 1;
224             }
225
226           if (chained_b->flags & VLIB_BUFFER_NEXT_PRESENT)
227             {
228               /*
229                * no descriptors left: drop the chain including 1st WQE
230                * skip the problematic packet and continue
231                */
232               vlib_buffer_free_from_ring (vm, txq->bufs, tail & mask,
233                                           RDMA_TXQ_BUF_SZ (txq), 1 +
234                                           RDMA_TXQ_DV_DSEG2WQE (chained_n));
235               vlib_error_count (vm, node->node_index,
236                                 dseg_max == chained_n ?
237                                 RDMA_TX_ERROR_SEGMENT_SIZE_EXCEEDED :
238                                 RDMA_TX_ERROR_NO_FREE_SLOTS, 1);
239
240               /* fixup tail to overwrite wqe head with next packet */
241               tail -= 1;
242             }
243           else
244             {
245               /* update WQE descriptor with new dseg number */
246               ((u8 *) & wqe[0].ctrl.qpn_ds)[3] = RDMA_MLX5_WQE_DS + chained_n;
247
248               tail += RDMA_TXQ_DV_DSEG2WQE (chained_n);
249               wqe_n -= RDMA_TXQ_DV_DSEG2WQE (chained_n);
250
251               last = wqe;
252             }
253         }
254       else
255         {
256           /* not chained */
257           last = wqe;
258         }
259
260       tail += 1;
261       bi += 1;
262       b += 1;
263       wqe_n -= 1;
264       n -= 1;
265     }
266
267   if (n == n_left_from)
268     return 0;                   /* we fail to enqueue even a single packet */
269
270   rdma_device_output_tx_mlx5_doorbell (txq, last, tail, sq_mask);
271   return n_left_from - n;
272 }
273
274 static_always_inline u32
275 rdma_device_output_tx_mlx5 (vlib_main_t * vm,
276                             const vlib_node_runtime_t * node,
277                             const rdma_device_t * rd, rdma_txq_t * txq,
278                             const u32 n_left_from, u32 * bi,
279                             vlib_buffer_t ** b)
280 {
281
282   u32 sq_mask = pow2_mask (txq->dv_sq_log2sz);
283   u32 mask = pow2_mask (txq->bufs_log2sz);
284   rdma_mlx5_wqe_t *wqe;
285   u32 n, n_wrap;
286   u16 tail = txq->tail;
287
288   ASSERT (RDMA_TXQ_BUF_SZ (txq) <= RDMA_TXQ_DV_SQ_SZ (txq));
289
290   /* avoid wrap-around logic in core loop */
291   n = clib_min (n_left_from, RDMA_TXQ_BUF_SZ (txq) - (tail & mask));
292   n_wrap = n_left_from - n;
293
294 wrap_around:
295   wqe = txq->dv_sq_wqes + (tail & sq_mask);
296
297   while (n >= 4)
298     {
299       u32 flags = b[0]->flags | b[1]->flags | b[2]->flags | b[3]->flags;
300       if (PREDICT_FALSE (flags & VLIB_BUFFER_NEXT_PRESENT))
301         return rdma_device_output_tx_mlx5_chained (vm, node, rd, txq,
302                                                    n_left_from, n, bi, b, wqe,
303                                                    tail);
304
305       if (PREDICT_TRUE (n >= 8))
306         {
307           vlib_prefetch_buffer_header (b[4], LOAD);
308           vlib_prefetch_buffer_header (b[5], LOAD);
309           vlib_prefetch_buffer_header (b[6], LOAD);
310           vlib_prefetch_buffer_header (b[7], LOAD);
311           CLIB_PREFETCH (wqe + 4, 4 * sizeof (wqe[0]), STORE);
312         }
313
314       rdma_mlx5_wqe_init (wqe + 0, txq->dv_wqe_tmpl, b[0], tail + 0);
315       rdma_mlx5_wqe_init (wqe + 1, txq->dv_wqe_tmpl, b[1], tail + 1);
316       rdma_mlx5_wqe_init (wqe + 2, txq->dv_wqe_tmpl, b[2], tail + 2);
317       rdma_mlx5_wqe_init (wqe + 3, txq->dv_wqe_tmpl, b[3], tail + 3);
318
319       b += 4;
320       tail += 4;
321       wqe += 4;
322       n -= 4;
323     }
324
325   while (n >= 1)
326     {
327       if (PREDICT_FALSE (b[0]->flags & VLIB_BUFFER_NEXT_PRESENT))
328         return rdma_device_output_tx_mlx5_chained (vm, node, rd, txq,
329                                                    n_left_from, n, bi, b, wqe,
330                                                    tail);
331
332       rdma_mlx5_wqe_init (wqe, txq->dv_wqe_tmpl, b[0], tail);
333
334       b += 1;
335       tail += 1;
336       wqe += 1;
337       n -= 1;
338     }
339
340   if (n_wrap)
341     {
342       n = n_wrap;
343       n_wrap = 0;
344       goto wrap_around;
345     }
346
347   rdma_device_output_tx_mlx5_doorbell (txq, &wqe[-1], tail, sq_mask);
348   return n_left_from;
349 }
350
351 /*
352  * standard ibverb tx/free functions
353  */
354
355 static_always_inline void
356 rdma_device_output_free_ibverb (vlib_main_t * vm,
357                                 const vlib_node_runtime_t * node,
358                                 rdma_txq_t * txq)
359 {
360   struct ibv_wc wc[VLIB_FRAME_SIZE];
361   u32 mask = pow2_mask (txq->bufs_log2sz);
362   u16 tail;
363   int n;
364
365   n = ibv_poll_cq (txq->ibv_cq, VLIB_FRAME_SIZE, wc);
366   if (n <= 0)
367     {
368       if (PREDICT_FALSE (n < 0))
369         vlib_error_count (vm, node->node_index, RDMA_TX_ERROR_COMPLETION, 1);
370       return;
371     }
372
373   while (PREDICT_FALSE (IBV_WC_SUCCESS != wc[n - 1].status))
374     {
375       vlib_error_count (vm, node->node_index, RDMA_TX_ERROR_COMPLETION, 1);
376       n--;
377       if (0 == n)
378         return;
379     }
380
381   tail = wc[n - 1].wr_id;
382   vlib_buffer_free_from_ring (vm, txq->bufs, txq->head & mask,
383                               RDMA_TXQ_BUF_SZ (txq),
384                               RDMA_TXQ_USED_SZ (txq->head, tail));
385   txq->head = tail;
386 }
387
388 static_always_inline u32
389 rdma_device_output_tx_ibverb (vlib_main_t * vm,
390                               const vlib_node_runtime_t * node,
391                               const rdma_device_t * rd, rdma_txq_t * txq,
392                               u32 n_left_from, u32 * bi, vlib_buffer_t ** b)
393 {
394   struct ibv_send_wr wr[VLIB_FRAME_SIZE], *w = wr;
395   struct ibv_sge sge[VLIB_FRAME_SIZE], *s = sge;
396   u32 n = n_left_from;
397
398   while (n >= 4)
399     {
400       if (PREDICT_TRUE (n >= 8))
401         {
402           vlib_prefetch_buffer_header (b[4 + 0], LOAD);
403           vlib_prefetch_buffer_header (b[4 + 1], LOAD);
404           vlib_prefetch_buffer_header (b[4 + 2], LOAD);
405           vlib_prefetch_buffer_header (b[4 + 3], LOAD);
406           CLIB_PREFETCH (&s[4 + 0], 4 * sizeof (s[0]), STORE);
407           clib_prefetch_store (&w[4 + 0]);
408           clib_prefetch_store (&w[4 + 1]);
409           clib_prefetch_store (&w[4 + 2]);
410           clib_prefetch_store (&w[4 + 3]);
411         }
412
413       s[0].addr = vlib_buffer_get_current_va (b[0]);
414       s[0].length = b[0]->current_length;
415       s[0].lkey = rd->lkey;
416
417       s[1].addr = vlib_buffer_get_current_va (b[1]);
418       s[1].length = b[1]->current_length;
419       s[1].lkey = rd->lkey;
420
421       s[2].addr = vlib_buffer_get_current_va (b[2]);
422       s[2].length = b[2]->current_length;
423       s[2].lkey = rd->lkey;
424
425       s[3].addr = vlib_buffer_get_current_va (b[3]);
426       s[3].length = b[3]->current_length;
427       s[3].lkey = rd->lkey;
428
429       clib_memset_u8 (&w[0], 0, sizeof (w[0]));
430       w[0].next = &w[0] + 1;
431       w[0].sg_list = &s[0];
432       w[0].num_sge = 1;
433       w[0].opcode = IBV_WR_SEND;
434
435       clib_memset_u8 (&w[1], 0, sizeof (w[1]));
436       w[1].next = &w[1] + 1;
437       w[1].sg_list = &s[1];
438       w[1].num_sge = 1;
439       w[1].opcode = IBV_WR_SEND;
440
441       clib_memset_u8 (&w[2], 0, sizeof (w[2]));
442       w[2].next = &w[2] + 1;
443       w[2].sg_list = &s[2];
444       w[2].num_sge = 1;
445       w[2].opcode = IBV_WR_SEND;
446
447       clib_memset_u8 (&w[3], 0, sizeof (w[3]));
448       w[3].next = &w[3] + 1;
449       w[3].sg_list = &s[3];
450       w[3].num_sge = 1;
451       w[3].opcode = IBV_WR_SEND;
452
453       s += 4;
454       w += 4;
455       b += 4;
456       n -= 4;
457     }
458
459   while (n >= 1)
460     {
461       s[0].addr = vlib_buffer_get_current_va (b[0]);
462       s[0].length = b[0]->current_length;
463       s[0].lkey = rd->lkey;
464
465       clib_memset_u8 (&w[0], 0, sizeof (w[0]));
466       w[0].next = &w[0] + 1;
467       w[0].sg_list = &s[0];
468       w[0].num_sge = 1;
469       w[0].opcode = IBV_WR_SEND;
470
471       s += 1;
472       w += 1;
473       b += 1;
474       n -= 1;
475     }
476
477   w[-1].wr_id = txq->tail;      /* register item to free */
478   w[-1].next = 0;               /* fix next pointer in WR linked-list */
479   w[-1].send_flags = IBV_SEND_SIGNALED; /* generate a CQE so we can free buffers */
480
481   w = wr;
482   if (PREDICT_FALSE (0 != ibv_post_send (txq->ibv_qp, w, &w)))
483     {
484       vlib_error_count (vm, node->node_index, RDMA_TX_ERROR_SUBMISSION,
485                         n_left_from - (w - wr));
486       n_left_from = w - wr;
487     }
488
489   return n_left_from;
490 }
491
492 /*
493  * common tx/free functions
494  */
495
496 static_always_inline void
497 rdma_device_output_free (vlib_main_t * vm, const vlib_node_runtime_t * node,
498                          rdma_txq_t * txq, int is_mlx5dv)
499 {
500   if (is_mlx5dv)
501     rdma_device_output_free_mlx5 (vm, node, txq);
502   else
503     rdma_device_output_free_ibverb (vm, node, txq);
504 }
505
506 static_always_inline u32
507 rdma_device_output_tx_try (vlib_main_t * vm, const vlib_node_runtime_t * node,
508                            const rdma_device_t * rd, rdma_txq_t * txq,
509                            u32 n_left_from, u32 * bi, int is_mlx5dv)
510 {
511   vlib_buffer_t *b[VLIB_FRAME_SIZE];
512   const u32 mask = pow2_mask (txq->bufs_log2sz);
513
514   /* do not enqueue more packet than ring space */
515   n_left_from = clib_min (n_left_from, RDMA_TXQ_AVAIL_SZ (txq, txq->head,
516                                                           txq->tail));
517   /* if ring is full, do nothing */
518   if (PREDICT_FALSE (n_left_from == 0))
519     return 0;
520
521   vlib_get_buffers (vm, bi, b, n_left_from);
522
523   n_left_from = is_mlx5dv ?
524     rdma_device_output_tx_mlx5 (vm, node, rd, txq, n_left_from, bi, b) :
525     rdma_device_output_tx_ibverb (vm, node, rd, txq, n_left_from, bi, b);
526
527   vlib_buffer_copy_indices_to_ring (txq->bufs, bi, txq->tail & mask,
528                                     RDMA_TXQ_BUF_SZ (txq), n_left_from);
529   txq->tail += n_left_from;
530
531   return n_left_from;
532 }
533
534 static_always_inline uword
535 rdma_device_output_tx (vlib_main_t * vm, vlib_node_runtime_t * node,
536                        vlib_frame_t * frame, rdma_device_t * rd,
537                        int is_mlx5dv)
538 {
539   u32 thread_index = vm->thread_index;
540   rdma_txq_t *txq =
541     vec_elt_at_index (rd->txqs, thread_index % vec_len (rd->txqs));
542   u32 *from;
543   u32 n_left_from;
544   int i;
545
546   ASSERT (RDMA_TXQ_BUF_SZ (txq) >= VLIB_FRAME_SIZE);
547
548   from = vlib_frame_vector_args (frame);
549   n_left_from = frame->n_vectors;
550
551   clib_spinlock_lock_if_init (&txq->lock);
552
553   for (i = 0; i < RDMA_TX_RETRIES && n_left_from > 0; i++)
554     {
555       u32 n_enq;
556       rdma_device_output_free (vm, node, txq, is_mlx5dv);
557       n_enq = rdma_device_output_tx_try (vm, node, rd, txq, n_left_from, from,
558                                          is_mlx5dv);
559
560       n_left_from -= n_enq;
561       from += n_enq;
562     }
563
564   clib_spinlock_unlock_if_init (&txq->lock);
565
566   if (PREDICT_FALSE (n_left_from))
567     {
568       vlib_buffer_free (vm, from, n_left_from);
569       vlib_error_count (vm, node->node_index,
570                         RDMA_TX_ERROR_NO_FREE_SLOTS, n_left_from);
571     }
572
573   return frame->n_vectors - n_left_from;
574 }
575
576 VNET_DEVICE_CLASS_TX_FN (rdma_device_class) (vlib_main_t * vm,
577                                              vlib_node_runtime_t * node,
578                                              vlib_frame_t * frame)
579 {
580   rdma_main_t *rm = &rdma_main;
581   vnet_interface_output_runtime_t *ord = (void *) node->runtime_data;
582   rdma_device_t *rd = pool_elt_at_index (rm->devices, ord->dev_instance);
583
584   if (PREDICT_TRUE (rd->flags & RDMA_DEVICE_F_MLX5DV))
585     return rdma_device_output_tx (vm, node, frame, rd, 1 /* is_mlx5dv */ );
586
587   return rdma_device_output_tx (vm, node, frame, rd, 0 /* is_mlx5dv */ );
588 }
589
590 /*
591  * fd.io coding-style-patch-verification: ON
592  *
593  * Local Variables:
594  * eval: (c-set-style "gnu")
595  * End:
596  */