af_xdp: make sure all packets are transmitted
[vpp.git] / src / plugins / af_xdp / output.c
1 #include <string.h>
2 #include <vlib/vlib.h>
3 #include <vlib/unix/unix.h>
4 #include <vnet/ethernet/ethernet.h>
5 #include <vnet/devices/devices.h>
6 #include <af_xdp/af_xdp.h>
7
8 #define AF_XDP_TX_RETRIES 5
9
10 static_always_inline void
11 af_xdp_device_output_free (vlib_main_t * vm, const vlib_node_runtime_t * node,
12                            af_xdp_txq_t * txq)
13 {
14   const __u64 *compl;
15   const u32 size = txq->cq.size;
16   const u32 mask = size - 1;
17   u32 bis[VLIB_FRAME_SIZE], *bi = bis;
18   u32 n_wrap, idx;
19   u32 n = xsk_ring_cons__peek (&txq->cq, ARRAY_LEN (bis), &idx);
20   const u32 n_free = n;
21
22   /* we rely on on casting addr (u64) -> bi (u32) to discard XSK offset below */
23   STATIC_ASSERT (BITS (bi[0]) + CLIB_LOG2_CACHE_LINE_BYTES <=
24                  XSK_UNALIGNED_BUF_OFFSET_SHIFT, "wrong size");
25   ASSERT (mask == txq->cq.mask);
26
27   if (!n_free)
28     return;
29
30   compl = xsk_ring_cons__comp_addr (&txq->cq, idx);
31   n = clib_min (n_free, size - (idx & mask));
32   n_wrap = n_free - n;
33
34 wrap_around:
35
36   while (n >= 8)
37     {
38 #ifdef CLIB_HAVE_VEC256
39       u64x4 b0 = (*(u64x4u *) (compl + 0)) >> CLIB_LOG2_CACHE_LINE_BYTES;
40       u64x4 b1 = (*(u64x4u *) (compl + 4)) >> CLIB_LOG2_CACHE_LINE_BYTES;
41       /* permute 256-bit register so lower u32s of each buffer index are
42        * placed into lower 128-bits */
43       const u32x8 mask = { 0, 2, 4, 6, 1, 3, 5, 7 };
44       u32x8 b2 = u32x8_permute ((u32x8) b0, mask);
45       u32x8 b3 = u32x8_permute ((u32x8) b1, mask);
46       /* extract lower 128-bits and save them to the array of buffer indices */
47       *(u32x4u *) (bi + 0) = u32x8_extract_lo (b2);
48       *(u32x4u *) (bi + 4) = u32x8_extract_lo (b3);
49 #else
50       bi[0] = compl[0] >> CLIB_LOG2_CACHE_LINE_BYTES;
51       bi[1] = compl[1] >> CLIB_LOG2_CACHE_LINE_BYTES;
52       bi[2] = compl[2] >> CLIB_LOG2_CACHE_LINE_BYTES;
53       bi[3] = compl[3] >> CLIB_LOG2_CACHE_LINE_BYTES;
54       bi[4] = compl[4] >> CLIB_LOG2_CACHE_LINE_BYTES;
55       bi[5] = compl[5] >> CLIB_LOG2_CACHE_LINE_BYTES;
56       bi[6] = compl[6] >> CLIB_LOG2_CACHE_LINE_BYTES;
57       bi[7] = compl[7] >> CLIB_LOG2_CACHE_LINE_BYTES;
58 #endif
59       compl += 8;
60       bi += 8;
61       n -= 8;
62     }
63
64   while (n >= 1)
65     {
66       bi[0] = compl[0] >> CLIB_LOG2_CACHE_LINE_BYTES;
67       ASSERT (vlib_buffer_is_known (vm, bi[0]) ==
68               VLIB_BUFFER_KNOWN_ALLOCATED);
69       compl += 1;
70       bi += 1;
71       n -= 1;
72     }
73
74   if (n_wrap)
75     {
76       compl = xsk_ring_cons__comp_addr (&txq->cq, 0);
77       n = n_wrap;
78       n_wrap = 0;
79       goto wrap_around;
80     }
81
82   xsk_ring_cons__release (&txq->cq, n_free);
83   vlib_buffer_free (vm, bis, n_free);
84 }
85
86 static_always_inline void
87 af_xdp_device_output_tx_db (vlib_main_t * vm,
88                             const vlib_node_runtime_t * node,
89                             af_xdp_device_t * ad,
90                             af_xdp_txq_t * txq, const u32 n_tx)
91 {
92   xsk_ring_prod__submit (&txq->tx, n_tx);
93
94   if (!xsk_ring_prod__needs_wakeup (&txq->tx))
95     return;
96
97   vlib_error_count (vm, node->node_index, AF_XDP_TX_ERROR_SYSCALL_REQUIRED, 1);
98
99   clib_spinlock_lock_if_init (&txq->syscall_lock);
100
101   if (xsk_ring_prod__needs_wakeup (&txq->tx))
102     {
103       const struct msghdr msg = {};
104       int ret;
105       /* On tx, xsk socket will only tx up to TX_BATCH_SIZE, as defined in
106        * kernel net/xdp/xsk.c. Unfortunately we do not know how much this is,
107        * our only option is to retry until everything is sent... */
108       do
109         {
110           ret = sendmsg (txq->xsk_fd, &msg, MSG_DONTWAIT);
111         }
112       while (ret < 0 && EAGAIN == errno);
113       if (PREDICT_FALSE (ret < 0))
114         {
115           /* not EAGAIN: something bad is happening */
116           vlib_error_count (vm, node->node_index,
117                             AF_XDP_TX_ERROR_SYSCALL_FAILURES, 1);
118           af_xdp_device_error (ad, "tx poll() failed");
119         }
120     }
121
122   clib_spinlock_unlock_if_init (&txq->syscall_lock);
123 }
124
125 static_always_inline u32
126 af_xdp_device_output_tx_try (vlib_main_t * vm,
127                              const vlib_node_runtime_t * node,
128                              af_xdp_device_t * ad, af_xdp_txq_t * txq,
129                              u32 n_tx, u32 * bi)
130 {
131   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b = bufs;
132   const uword start = vm->buffer_main->buffer_mem_start;
133   const u32 size = txq->tx.size;
134   const u32 mask = size - 1;
135   struct xdp_desc *desc;
136   u64 offset, addr;
137   u32 idx, n, n_wrap;
138
139   ASSERT (mask == txq->cq.mask);
140
141   n_tx = xsk_ring_prod__reserve (&txq->tx, n_tx, &idx);
142
143   /* if ring is full, do nothing */
144   if (PREDICT_FALSE (0 == n_tx))
145     return 0;
146
147   vlib_get_buffers (vm, bi, bufs, n_tx);
148
149   desc = xsk_ring_prod__tx_desc (&txq->tx, idx);
150   n = clib_min (n_tx, size - (idx & mask));
151   n_wrap = n_tx - n;
152
153 wrap_around:
154
155   while (n >= 8)
156     {
157       vlib_prefetch_buffer_header (b[4], LOAD);
158       offset =
159         (sizeof (vlib_buffer_t) +
160          b[0]->current_data) << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
161       addr = pointer_to_uword (b[0]) - start;
162       desc[0].addr = offset | addr;
163       desc[0].len = b[0]->current_length;
164
165       vlib_prefetch_buffer_header (b[5], LOAD);
166       offset =
167         (sizeof (vlib_buffer_t) +
168          b[1]->current_data) << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
169       addr = pointer_to_uword (b[1]) - start;
170       desc[1].addr = offset | addr;
171       desc[1].len = b[1]->current_length;
172
173       vlib_prefetch_buffer_header (b[6], LOAD);
174       offset =
175         (sizeof (vlib_buffer_t) +
176          b[2]->current_data) << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
177       addr = pointer_to_uword (b[2]) - start;
178       desc[2].addr = offset | addr;
179       desc[2].len = b[2]->current_length;
180
181       vlib_prefetch_buffer_header (b[7], LOAD);
182       offset =
183         (sizeof (vlib_buffer_t) +
184          b[3]->current_data) << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
185       addr = pointer_to_uword (b[3]) - start;
186       desc[3].addr = offset | addr;
187       desc[3].len = b[3]->current_length;
188
189       desc += 4;
190       b += 4;
191       n -= 4;
192     }
193
194   while (n >= 1)
195     {
196       offset =
197         (sizeof (vlib_buffer_t) +
198          b[0]->current_data) << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
199       addr = pointer_to_uword (b[0]) - start;
200       desc[0].addr = offset | addr;
201       desc[0].len = b[0]->current_length;
202       desc += 1;
203       b += 1;
204       n -= 1;
205     }
206
207   if (n_wrap)
208     {
209       desc = xsk_ring_prod__tx_desc (&txq->tx, 0);
210       n = n_wrap;
211       n_wrap = 0;
212       goto wrap_around;
213     }
214
215   return n_tx;
216 }
217
218 VNET_DEVICE_CLASS_TX_FN (af_xdp_device_class) (vlib_main_t * vm,
219                                                vlib_node_runtime_t * node,
220                                                vlib_frame_t * frame)
221 {
222   af_xdp_main_t *rm = &af_xdp_main;
223   vnet_interface_output_runtime_t *ord = (void *) node->runtime_data;
224   af_xdp_device_t *ad = pool_elt_at_index (rm->devices, ord->dev_instance);
225   const vnet_hw_if_tx_frame_t *tf = vlib_frame_scalar_args (frame);
226   const int shared_queue = tf->shared_queue;
227   af_xdp_txq_t *txq = vec_elt_at_index (ad->txqs, tf->queue_id);
228   u32 *from;
229   u32 n, n_tx;
230   int i;
231
232   from = vlib_frame_vector_args (frame);
233   n_tx = frame->n_vectors;
234
235   if (shared_queue)
236     clib_spinlock_lock (&txq->lock);
237
238   for (i = 0, n = 0; i < AF_XDP_TX_RETRIES && n < n_tx; i++)
239     {
240       u32 n_enq;
241       af_xdp_device_output_free (vm, node, txq);
242       n_enq =
243         af_xdp_device_output_tx_try (vm, node, ad, txq, n_tx - n, from + n);
244       n += n_enq;
245     }
246
247   af_xdp_device_output_tx_db (vm, node, ad, txq, n);
248
249   if (shared_queue)
250     clib_spinlock_unlock (&txq->lock);
251
252   if (PREDICT_FALSE (n != n_tx))
253     {
254       vlib_buffer_free (vm, from + n, n_tx - n);
255       vlib_error_count (vm, node->node_index,
256                         AF_XDP_TX_ERROR_NO_FREE_SLOTS, n_tx - n);
257     }
258
259   return n;
260 }
261
262 /*
263  * fd.io coding-style-patch-verification: ON
264  *
265  * Local Variables:
266  * eval: (c-set-style "gnu")
267  * End:
268  */