New upstream version 18.08
[deb_dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_event_eth_rx_adapter.h"
24
25 #define BATCH_SIZE              32
26 #define BLOCK_CNT_THRESHOLD     10
27 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
28
29 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
30 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
31
32 #define RSS_KEY_SIZE    40
33 /* value written to intr thread pipe to signal thread exit */
34 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
35 /* Sentinel value to detect an uninitialized file handle */
36 #define INIT_FD         -1
37
38 /*
39  * Used to store port and queue ID of interrupting Rx queue
40  */
41 union queue_data {
42         RTE_STD_C11
43         void *ptr;
44         struct {
45                 uint16_t port;
46                 uint16_t queue;
47         };
48 };
49
50 /*
51  * There is an instance of this struct per polled Rx queue added to the
52  * adapter
53  */
54 struct eth_rx_poll_entry {
55         /* Eth port to poll */
56         uint16_t eth_dev_id;
57         /* Eth rx queue to poll */
58         uint16_t eth_rx_qid;
59 };
60
61 /* Instance per adapter */
62 struct rte_eth_event_enqueue_buffer {
63         /* Count of events in this buffer */
64         uint16_t count;
65         /* Array of events in this buffer */
66         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
67 };
68
69 struct rte_event_eth_rx_adapter {
70         /* RSS key */
71         uint8_t rss_key_be[RSS_KEY_SIZE];
72         /* Event device identifier */
73         uint8_t eventdev_id;
74         /* Per ethernet device structure */
75         struct eth_device_info *eth_devices;
76         /* Event port identifier */
77         uint8_t event_port_id;
78         /* Lock to serialize config updates with service function */
79         rte_spinlock_t rx_lock;
80         /* Max mbufs processed in any service function invocation */
81         uint32_t max_nb_rx;
82         /* Receive queues that need to be polled */
83         struct eth_rx_poll_entry *eth_rx_poll;
84         /* Size of the eth_rx_poll array */
85         uint16_t num_rx_polled;
86         /* Weighted round robin schedule */
87         uint32_t *wrr_sched;
88         /* wrr_sched[] size */
89         uint32_t wrr_len;
90         /* Next entry in wrr[] to begin polling */
91         uint32_t wrr_pos;
92         /* Event burst buffer */
93         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
94         /* Per adapter stats */
95         struct rte_event_eth_rx_adapter_stats stats;
96         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
97         uint16_t enq_block_count;
98         /* Block start ts */
99         uint64_t rx_enq_block_start_ts;
100         /* epoll fd used to wait for Rx interrupts */
101         int epd;
102         /* Num of interrupt driven Rx queues */
103         uint32_t num_rx_intr;
104         /* Used to send <dev id, queue id> of interrupting Rx queues from
105          * the interrupt thread to the Rx thread
106          */
107         struct rte_ring *intr_ring;
108         /* Rx Queue data (dev id, queue id) for the last non-empty
109          * queue polled
110          */
111         union queue_data qd;
112         /* queue_data is valid */
113         int qd_valid;
114         /* Interrupt ring lock, synchronizes Rx thread
115          * and interrupt thread
116          */
117         rte_spinlock_t intr_ring_lock;
118         /* event array passed to rte_epoll_wait */
119         struct rte_epoll_event *epoll_events;
120         /* Count of interrupt vectors in use */
121         uint32_t num_intr_vec;
122         /* Thread blocked on Rx interrupts */
123         pthread_t rx_intr_thread;
124         /* Configuration callback for rte_service configuration */
125         rte_event_eth_rx_adapter_conf_cb conf_cb;
126         /* Configuration callback argument */
127         void *conf_arg;
128         /* Set if the default conf callback is being used */
129         int default_cb_arg;
130         /* Service initialization state */
131         uint8_t service_inited;
132         /* Total count of Rx queues in adapter */
133         uint32_t nb_queues;
134         /* Memory allocation name */
135         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
136         /* Socket identifier cached from eventdev */
137         int socket_id;
138         /* Per adapter EAL service */
139         uint32_t service_id;
140         /* Adapter started flag */
141         uint8_t rxa_started;
142         /* Adapter ID */
143         uint8_t id;
144 } __rte_cache_aligned;
145
146 /* Per eth device */
147 struct eth_device_info {
148         struct rte_eth_dev *dev;
149         struct eth_rx_queue_info *rx_queue;
150         /* Rx callback */
151         rte_event_eth_rx_adapter_cb_fn cb_fn;
152         /* Rx callback argument */
153         void *cb_arg;
154         /* Set if ethdev->eventdev packet transfer uses a
155          * hardware mechanism
156          */
157         uint8_t internal_event_port;
158         /* Set if the adapter is processing rx queues for
159          * this eth device and packet processing has been
160          * started, allows for the code to know if the PMD
161          * rx_adapter_stop callback needs to be invoked
162          */
163         uint8_t dev_rx_started;
164         /* Number of queues added for this device */
165         uint16_t nb_dev_queues;
166         /* Number of poll based queues
167          * If nb_rx_poll > 0, the start callback will
168          * be invoked if not already invoked
169          */
170         uint16_t nb_rx_poll;
171         /* Number of interrupt based queues
172          * If nb_rx_intr > 0, the start callback will
173          * be invoked if not already invoked.
174          */
175         uint16_t nb_rx_intr;
176         /* Number of queues that use the shared interrupt */
177         uint16_t nb_shared_intr;
178         /* sum(wrr(q)) for all queues within the device;
179          * useful when deleting all device queues
180          */
181         uint32_t wrr_len;
182         /* Intr based queue index to start polling from; this is used
183          * if the number of shared interrupts is non-zero
184          */
185         uint16_t next_q_idx;
186         /* Intr based queue indices */
187         uint16_t *intr_queue;
188         /* Set if the device generates a dedicated interrupt per Rx queue;
189          * this applies only to queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
190          */
191         int multi_intr_cap;
192         /* shared interrupt enabled */
193         int shared_intr_enabled;
194 };
195
196 /* Per Rx queue */
197 struct eth_rx_queue_info {
198         int queue_enabled;      /* True if added */
199         int intr_enabled;
200         uint16_t wt;            /* Polling weight */
201         uint8_t event_queue_id; /* Event queue to enqueue packets to */
202         uint8_t sched_type;     /* Sched type for events */
203         uint8_t priority;       /* Event priority */
204         uint32_t flow_id;       /* App provided flow identifier */
205         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
206 };
207
208 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
209
210 static inline int
211 rxa_validate_id(uint8_t id)
212 {
213         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
214 }
215
216 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
217         if (!rxa_validate_id(id)) { \
218                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
219                 return retval; \
220         } \
221 } while (0)
222
223 static inline int
224 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
225 {
226         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
227 }
228
229 /* Greatest common divisor */
230 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
231 {
232         uint16_t r = a % b;
233
234         return r ? rxa_gcd_u16(b, r) : b;
235 }
236
237 /* Returns the next queue in the polling sequence
238  *
239  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
240  */
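/* Implementation note: this is the interleaved weighted round robin scheme
 * from the link above. *cw holds the current weight; each time the scan
 * wraps around to entry 0 it is decreased by the GCD of all queue weights
 * and reset to the maximum weight once it reaches zero or below, so queues
 * are selected in proportion to their configured weights.
 */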
241 static int
242 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
243          unsigned int n, int *cw,
244          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
245          uint16_t gcd, int prev)
246 {
247         int i = prev;
248         uint16_t w;
249
250         while (1) {
251                 uint16_t q;
252                 uint16_t d;
253
254                 i = (i + 1) % n;
255                 if (i == 0) {
256                         *cw = *cw - gcd;
257                         if (*cw <= 0)
258                                 *cw = max_wt;
259                 }
260
261                 q = eth_rx_poll[i].eth_rx_qid;
262                 d = eth_rx_poll[i].eth_dev_id;
263                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
264
265                 if ((int)w >= *cw)
266                         return i;
267         }
268 }
269
270 static inline int
271 rxa_shared_intr(struct eth_device_info *dev_info,
272         int rx_queue_id)
273 {
274         int multi_intr_cap;
275
276         if (dev_info->dev->intr_handle == NULL)
277                 return 0;
278
279         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
280         return !multi_intr_cap ||
281                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
282 }
283
284 static inline int
285 rxa_intr_queue(struct eth_device_info *dev_info,
286         int rx_queue_id)
287 {
288         struct eth_rx_queue_info *queue_info;
289
290         queue_info = &dev_info->rx_queue[rx_queue_id];
291         return dev_info->rx_queue &&
292                 !dev_info->internal_event_port &&
293                 queue_info->queue_enabled && queue_info->wt == 0;
294 }
295
296 static inline int
297 rxa_polled_queue(struct eth_device_info *dev_info,
298         int rx_queue_id)
299 {
300         struct eth_rx_queue_info *queue_info;
301
302         queue_info = &dev_info->rx_queue[rx_queue_id];
303         return !dev_info->internal_event_port &&
304                 dev_info->rx_queue &&
305                 queue_info->queue_enabled && queue_info->wt != 0;
306 }
307
308 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
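/* Queues with a dedicated interrupt each need their own vector. All shared
 * interrupt queues of a device use a single vector, which is only counted
 * when the first shared queue is added or the last one is deleted. The
 * result is positive for an add and negative for a delete.
 */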
309 static int
310 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
311 {
312         uint16_t i;
313         int n, s;
314         uint16_t nbq;
315
316         nbq = dev_info->dev->data->nb_rx_queues;
317         n = 0; /* non shared count */
318         s = 0; /* shared count */
319
320         if (rx_queue_id == -1) {
321                 for (i = 0; i < nbq; i++) {
322                         if (!rxa_shared_intr(dev_info, i))
323                                 n += add ? !rxa_intr_queue(dev_info, i) :
324                                         rxa_intr_queue(dev_info, i);
325                         else
326                                 s += add ? !rxa_intr_queue(dev_info, i) :
327                                         rxa_intr_queue(dev_info, i);
328                 }
329
330                 if (s > 0) {
331                         if ((add && dev_info->nb_shared_intr == 0) ||
332                                 (!add && dev_info->nb_shared_intr))
333                                 n += 1;
334                 }
335         } else {
336                 if (!rxa_shared_intr(dev_info, rx_queue_id))
337                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
338                                 rxa_intr_queue(dev_info, rx_queue_id);
339                 else
340                         n = add ? !dev_info->nb_shared_intr :
341                                 dev_info->nb_shared_intr == 1;
342         }
343
344         return add ? n : -n;
345 }
346
347 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
348  */
349 static void
350 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
351                         struct eth_device_info *dev_info,
352                         int rx_queue_id,
353                         uint32_t *nb_rx_intr)
354 {
355         uint32_t intr_diff;
356
357         if (rx_queue_id == -1)
358                 intr_diff = dev_info->nb_rx_intr;
359         else
360                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
361
362         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
363 }
364
365 /* Calculate nb_rx_* after adding interrupt mode rx queues; the newly added
366  * interrupt queues could currently be poll mode Rx queues
367  */
368 static void
369 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
370                         struct eth_device_info *dev_info,
371                         int rx_queue_id,
372                         uint32_t *nb_rx_poll,
373                         uint32_t *nb_rx_intr,
374                         uint32_t *nb_wrr)
375 {
376         uint32_t intr_diff;
377         uint32_t poll_diff;
378         uint32_t wrr_len_diff;
379
380         if (rx_queue_id == -1) {
381                 intr_diff = dev_info->dev->data->nb_rx_queues -
382                                                 dev_info->nb_rx_intr;
383                 poll_diff = dev_info->nb_rx_poll;
384                 wrr_len_diff = dev_info->wrr_len;
385         } else {
386                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
387                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
388                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
389                                         0;
390         }
391
392         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
393         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
394         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
395 }
396
397 /* Calculate size of the eth_rx_poll and wrr_sched arrays
398  * after deleting poll mode rx queues
399  */
400 static void
401 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
402                         struct eth_device_info *dev_info,
403                         int rx_queue_id,
404                         uint32_t *nb_rx_poll,
405                         uint32_t *nb_wrr)
406 {
407         uint32_t poll_diff;
408         uint32_t wrr_len_diff;
409
410         if (rx_queue_id == -1) {
411                 poll_diff = dev_info->nb_rx_poll;
412                 wrr_len_diff = dev_info->wrr_len;
413         } else {
414                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
415                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
416                                         0;
417         }
418
419         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
420         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
421 }
422
423 /* Calculate nb_rx_* after adding poll mode rx queues
424  */
425 static void
426 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
427                         struct eth_device_info *dev_info,
428                         int rx_queue_id,
429                         uint16_t wt,
430                         uint32_t *nb_rx_poll,
431                         uint32_t *nb_rx_intr,
432                         uint32_t *nb_wrr)
433 {
434         uint32_t intr_diff;
435         uint32_t poll_diff;
436         uint32_t wrr_len_diff;
437
438         if (rx_queue_id == -1) {
439                 intr_diff = dev_info->nb_rx_intr;
440                 poll_diff = dev_info->dev->data->nb_rx_queues -
441                                                 dev_info->nb_rx_poll;
442                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
443                                 - dev_info->wrr_len;
444         } else {
445                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
446                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
447                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
448                                 wt - dev_info->rx_queue[rx_queue_id].wt :
449                                 wt;
450         }
451
452         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
453         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
454         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
455 }
456
457 /* Calculate nb_rx_* after adding rx_queue_id */
458 static void
459 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
460                 struct eth_device_info *dev_info,
461                 int rx_queue_id,
462                 uint16_t wt,
463                 uint32_t *nb_rx_poll,
464                 uint32_t *nb_rx_intr,
465                 uint32_t *nb_wrr)
466 {
467         if (wt != 0)
468                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
469                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
470         else
471                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
472                                         nb_rx_poll, nb_rx_intr, nb_wrr);
473 }
474
475 /* Calculate nb_rx_* after deleting rx_queue_id */
476 static void
477 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
478                 struct eth_device_info *dev_info,
479                 int rx_queue_id,
480                 uint32_t *nb_rx_poll,
481                 uint32_t *nb_rx_intr,
482                 uint32_t *nb_wrr)
483 {
484         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
485                                 nb_wrr);
486         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
487                                 nb_rx_intr);
488 }
489
490 /*
491  * Allocate the rx_poll array
492  */
493 static struct eth_rx_poll_entry *
494 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
495         uint32_t num_rx_polled)
496 {
497         size_t len;
498
499         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
500                                                         RTE_CACHE_LINE_SIZE);
501         return  rte_zmalloc_socket(rx_adapter->mem_name,
502                                 len,
503                                 RTE_CACHE_LINE_SIZE,
504                                 rx_adapter->socket_id);
505 }
506
507 /*
508  * Allocate the WRR array
509  */
510 static uint32_t *
511 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
512 {
513         size_t len;
514
515         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
516                         RTE_CACHE_LINE_SIZE);
517         return  rte_zmalloc_socket(rx_adapter->mem_name,
518                                 len,
519                                 RTE_CACHE_LINE_SIZE,
520                                 rx_adapter->socket_id);
521 }
522
523 static int
524 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
525                 uint32_t nb_poll,
526                 uint32_t nb_wrr,
527                 struct eth_rx_poll_entry **rx_poll,
528                 uint32_t **wrr_sched)
529 {
530
531         if (nb_poll == 0) {
532                 *rx_poll = NULL;
533                 *wrr_sched = NULL;
534                 return 0;
535         }
536
537         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
538         if (*rx_poll == NULL) {
539                 *wrr_sched = NULL;
540                 return -ENOMEM;
541         }
542
543         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
544         if (*wrr_sched == NULL) {
545                 rte_free(*rx_poll);
546                 return -ENOMEM;
547         }
548         return 0;
549 }
550
551 /* Precalculate WRR polling sequence for all queues in rx_adapter */
552 static void
553 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
554                 struct eth_rx_poll_entry *rx_poll,
555                 uint32_t *rx_wrr)
556 {
557         uint16_t d;
558         uint16_t q;
559         unsigned int i;
560         int prev = -1;
561         int cw = -1;
562
563         /* Initialize variables for calculation of wrr schedule */
564         uint16_t max_wrr_pos = 0;
565         unsigned int poll_q = 0;
566         uint16_t max_wt = 0;
567         uint16_t gcd = 0;
568
569         if (rx_poll == NULL)
570                 return;
571
572         /* Generate the array of all queues to poll; poll_q is the size
573          * of this array
574          */
575         RTE_ETH_FOREACH_DEV(d) {
576                 uint16_t nb_rx_queues;
577                 struct eth_device_info *dev_info =
578                                 &rx_adapter->eth_devices[d];
579                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
580                 if (dev_info->rx_queue == NULL)
581                         continue;
582                 if (dev_info->internal_event_port)
583                         continue;
584                 dev_info->wrr_len = 0;
585                 for (q = 0; q < nb_rx_queues; q++) {
586                         struct eth_rx_queue_info *queue_info =
587                                 &dev_info->rx_queue[q];
588                         uint16_t wt;
589
590                         if (!rxa_polled_queue(dev_info, q))
591                                 continue;
592                         wt = queue_info->wt;
593                         rx_poll[poll_q].eth_dev_id = d;
594                         rx_poll[poll_q].eth_rx_qid = q;
595                         max_wrr_pos += wt;
596                         dev_info->wrr_len += wt;
597                         max_wt = RTE_MAX(max_wt, wt);
598                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
599                         poll_q++;
600                 }
601         }
602
603         /* Generate polling sequence based on weights */
604         prev = -1;
605         cw = -1;
606         for (i = 0; i < max_wrr_pos; i++) {
607                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
608                                      rx_poll, max_wt, gcd, prev);
609                 prev = rx_wrr[i];
610         }
611 }
612
613 static inline void
614 rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
615         struct ipv6_hdr **ipv6_hdr)
616 {
617         struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
618         struct vlan_hdr *vlan_hdr;
619
620         *ipv4_hdr = NULL;
621         *ipv6_hdr = NULL;
622
623         switch (eth_hdr->ether_type) {
624         case RTE_BE16(ETHER_TYPE_IPv4):
625                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
626                 break;
627
628         case RTE_BE16(ETHER_TYPE_IPv6):
629                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
630                 break;
631
632         case RTE_BE16(ETHER_TYPE_VLAN):
633                 vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
634                 switch (vlan_hdr->eth_proto) {
635                 case RTE_BE16(ETHER_TYPE_IPv4):
636                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
637                         break;
638                 case RTE_BE16(ETHER_TYPE_IPv6):
639                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
640                         break;
641                 default:
642                         break;
643                 }
644                 break;
645
646         default:
647                 break;
648         }
649 }
650
651 /* Calculate RSS hash for IPv4/6 */
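/* A software hash is only computed when the mbuf carries no NIC provided
 * RSS hash (PKT_RX_RSS_HASH not set) and the application has not supplied
 * a flow id for the queue; see the do_rss computation in rxa_buffer_mbufs().
 */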
652 static inline uint32_t
653 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
654 {
655         uint32_t input_len;
656         void *tuple;
657         struct rte_ipv4_tuple ipv4_tuple;
658         struct rte_ipv6_tuple ipv6_tuple;
659         struct ipv4_hdr *ipv4_hdr;
660         struct ipv6_hdr *ipv6_hdr;
661
662         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
663
664         if (ipv4_hdr) {
665                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
666                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
667                 tuple = &ipv4_tuple;
668                 input_len = RTE_THASH_V4_L3_LEN;
669         } else if (ipv6_hdr) {
670                 rte_thash_load_v6_addrs(ipv6_hdr,
671                                         (union rte_thash_tuple *)&ipv6_tuple);
672                 tuple = &ipv6_tuple;
673                 input_len = RTE_THASH_V6_L3_LEN;
674         } else
675                 return 0;
676
677         return rte_softrss_be(tuple, input_len, rss_key_be);
678 }
679
680 static inline int
681 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
682 {
683         return !!rx_adapter->enq_block_count;
684 }
685
686 static inline void
687 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
688 {
689         if (rx_adapter->rx_enq_block_start_ts)
690                 return;
691
692         rx_adapter->enq_block_count++;
693         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
694                 return;
695
696         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
697 }
698
699 static inline void
700 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
701                     struct rte_event_eth_rx_adapter_stats *stats)
702 {
703         if (unlikely(!stats->rx_enq_start_ts))
704                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
705
706         if (likely(!rxa_enq_blocked(rx_adapter)))
707                 return;
708
709         rx_adapter->enq_block_count = 0;
710         if (rx_adapter->rx_enq_block_start_ts) {
711                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
712                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
713                     rx_adapter->rx_enq_block_start_ts;
714                 rx_adapter->rx_enq_block_start_ts = 0;
715         }
716 }
717
718 /* Add event to buffer; the free space check is done prior to calling
719  * this function
720  */
721 static inline void
722 rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
723                 struct rte_event *ev)
724 {
725         struct rte_eth_event_enqueue_buffer *buf =
726             &rx_adapter->event_enqueue_buffer;
727         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
728 }
729
730 /* Enqueue buffered events to event device */
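/* On a partial enqueue the unsent events are moved to the front of the
 * buffer and the retry counter is incremented. When nothing at all could
 * be enqueued, the blocked-cycles measurement is armed (it starts counting
 * after BLOCK_CNT_THRESHOLD consecutive empty enqueues) and it is closed
 * out by the next call that enqueues at least one event.
 */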
731 static inline uint16_t
732 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
733 {
734         struct rte_eth_event_enqueue_buffer *buf =
735             &rx_adapter->event_enqueue_buffer;
736         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
737
738         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
739                                         rx_adapter->event_port_id,
740                                         buf->events,
741                                         buf->count);
742         if (n != buf->count) {
743                 memmove(buf->events,
744                         &buf->events[n],
745                         (buf->count - n) * sizeof(struct rte_event));
746                 stats->rx_enq_retry++;
747         }
748
749         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
750                 rxa_enq_block_start_ts(rx_adapter);
751
752         buf->count -= n;
753         stats->rx_enq_count += n;
754
755         return n;
756 }
757
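/* Convert received mbufs to events and append them to the enqueue buffer.
 * The event flow id is built from the application supplied per queue flow
 * id, the NIC RSS hash or a software RSS hash; if the first mbuf carries
 * no timestamp, all mbufs in the burst are stamped with the current TSC
 * value, and the optional per device callback may filter or replace the
 * mbufs before they are buffered.
 */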
758 static inline void
759 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
760                 uint16_t eth_dev_id,
761                 uint16_t rx_queue_id,
762                 struct rte_mbuf **mbufs,
763                 uint16_t num)
764 {
765         uint32_t i;
766         struct eth_device_info *dev_info =
767                                         &rx_adapter->eth_devices[eth_dev_id];
768         struct eth_rx_queue_info *eth_rx_queue_info =
769                                         &dev_info->rx_queue[rx_queue_id];
770         struct rte_eth_event_enqueue_buffer *buf =
771                                         &rx_adapter->event_enqueue_buffer;
772         int32_t qid = eth_rx_queue_info->event_queue_id;
773         uint8_t sched_type = eth_rx_queue_info->sched_type;
774         uint8_t priority = eth_rx_queue_info->priority;
775         uint32_t flow_id;
776         struct rte_event events[BATCH_SIZE];
777         struct rte_mbuf *m = mbufs[0];
778         uint32_t rss_mask;
779         uint32_t rss;
780         int do_rss;
781         uint64_t ts;
782         struct rte_mbuf *cb_mbufs[BATCH_SIZE];
783         uint16_t nb_cb;
784
785         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
786         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
787         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
788
789         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
790                 ts = rte_get_tsc_cycles();
791                 for (i = 0; i < num; i++) {
792                         m = mbufs[i];
793
794                         m->timestamp = ts;
795                         m->ol_flags |= PKT_RX_TIMESTAMP;
796                 }
797         }
798
799
800         nb_cb = dev_info->cb_fn ? dev_info->cb_fn(eth_dev_id, rx_queue_id,
801                                                 ETH_EVENT_BUFFER_SIZE,
802                                                 buf->count, mbufs,
803                                                 num,
804                                                 dev_info->cb_arg,
805                                                 cb_mbufs) :
806                                                 num;
807         if (nb_cb < num) {
808                 mbufs = cb_mbufs;
809                 num = nb_cb;
810         }
811
812         for (i = 0; i < num; i++) {
813                 m = mbufs[i];
814                 struct rte_event *ev = &events[i];
815
816                 rss = do_rss ?
817                         rxa_do_softrss(m, rx_adapter->rss_key_be) :
818                         m->hash.rss;
819                 flow_id =
820                     eth_rx_queue_info->flow_id &
821                                 eth_rx_queue_info->flow_id_mask;
822                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
823                 ev->flow_id = flow_id;
824                 ev->op = RTE_EVENT_OP_NEW;
825                 ev->sched_type = sched_type;
826                 ev->queue_id = qid;
827                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
828                 ev->sub_event_type = 0;
829                 ev->priority = priority;
830                 ev->mbuf = m;
831
832                 rxa_buffer_event(rx_adapter, ev);
833         }
834 }
835
836 /* Enqueue packets from <port, q> to event buffer */
837 static inline uint32_t
838 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
839         uint16_t port_id,
840         uint16_t queue_id,
841         uint32_t rx_count,
842         uint32_t max_rx,
843         int *rxq_empty)
844 {
845         struct rte_mbuf *mbufs[BATCH_SIZE];
846         struct rte_eth_event_enqueue_buffer *buf =
847                                         &rx_adapter->event_enqueue_buffer;
848         struct rte_event_eth_rx_adapter_stats *stats =
849                                         &rx_adapter->stats;
850         uint16_t n;
851         uint32_t nb_rx = 0;
852
853         if (rxq_empty)
854                 *rxq_empty = 0;
855         /* Don't do a batch dequeue from the rx queue if there isn't
856          * enough space in the enqueue buffer.
857          */
858         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
859                 if (buf->count >= BATCH_SIZE)
860                         rxa_flush_event_buffer(rx_adapter);
861
862                 stats->rx_poll_count++;
863                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
864                 if (unlikely(!n)) {
865                         if (rxq_empty)
866                                 *rxq_empty = 1;
867                         break;
868                 }
869                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
870                 nb_rx += n;
871                 if (rx_count + nb_rx > max_rx)
872                         break;
873         }
874
875         if (buf->count >= BATCH_SIZE)
876                 rxa_flush_event_buffer(rx_adapter);
877
878         return nb_rx;
879 }
880
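/* Called from the interrupt thread: record the <port, queue> of an
 * interrupting Rx queue in the adapter's interrupt ring and disable further
 * interrupts for it; the interrupt is re-enabled from
 * rxa_intr_ring_dequeue() when the entry is picked up by the service
 * function.
 */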
881 static inline void
882 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
883                 void *data)
884 {
885         uint16_t port_id;
886         uint16_t queue;
887         int err;
888         union queue_data qd;
889         struct eth_device_info *dev_info;
890         struct eth_rx_queue_info *queue_info;
891         int *intr_enabled;
892
893         qd.ptr = data;
894         port_id = qd.port;
895         queue = qd.queue;
896
897         dev_info = &rx_adapter->eth_devices[port_id];
898         queue_info = &dev_info->rx_queue[queue];
899         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
900         if (rxa_shared_intr(dev_info, queue))
901                 intr_enabled = &dev_info->shared_intr_enabled;
902         else
903                 intr_enabled = &queue_info->intr_enabled;
904
905         if (*intr_enabled) {
906                 *intr_enabled = 0;
907                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
908                 /* Entry should always be available.
909                  * The ring size equals the maximum number of interrupt
910                  * vectors supported (an interrupt vector is shared in
911                  * case of shared interrupts)
912                  */
913                 if (err)
914                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
915                                 " to ring: %s", strerror(-err));
916                 else
917                         rte_eth_dev_rx_intr_disable(port_id, queue);
918         }
919         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
920 }
921
922 static int
923 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
924                         uint32_t num_intr_vec)
925 {
926         if (rx_adapter->num_intr_vec + num_intr_vec >
927                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
928                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
929                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
930                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
931                 return -ENOSPC;
932         }
933
934         return 0;
935 }
936
937 /* Delete entries for (dev, queue) from the interrupt ring */
938 static void
939 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
940                         struct eth_device_info *dev_info,
941                         uint16_t rx_queue_id)
942 {
943         int i, n;
944         union queue_data qd;
945
946         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
947
948         n = rte_ring_count(rx_adapter->intr_ring);
949         for (i = 0; i < n; i++) {
950                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
951                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
952                         if (qd.port == dev_info->dev->data->port_id &&
953                                 qd.queue == rx_queue_id)
954                                 continue;
955                 } else {
956                         if (qd.port == dev_info->dev->data->port_id)
957                                 continue;
958                 }
959                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
960         }
961
962         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
963 }
964
965 /* pthread callback handling interrupt mode receive queues
966  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
967  * interrupting queue to the adapter's ring buffer for interrupt events.
968  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
969  * the adapter service function.
970  */
971 static void *
972 rxa_intr_thread(void *arg)
973 {
974         struct rte_event_eth_rx_adapter *rx_adapter = arg;
975         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
976         int n, i;
977
978         while (1) {
979                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
980                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
981                 if (unlikely(n < 0))
982                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
983                                         n);
984                 for (i = 0; i < n; i++) {
985                         rxa_intr_ring_enqueue(rx_adapter,
986                                         epoll_events[i].epdata.data);
987                 }
988         }
989
990         return NULL;
991 }
992
993 /* Dequeue <port, q> from interrupt ring and enqueue received
994  * mbufs to eventdev
995  */
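/* For a queue that uses a shared interrupt, all interrupt mode queues of
 * the device are scanned starting at next_q_idx, since a shared interrupt
 * does not identify which of those queues has packets.
 */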
996 static inline uint32_t
997 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
998 {
999         uint32_t n;
1000         uint32_t nb_rx = 0;
1001         int rxq_empty;
1002         struct rte_eth_event_enqueue_buffer *buf;
1003         rte_spinlock_t *ring_lock;
1004         uint8_t max_done = 0;
1005
1006         if (rx_adapter->num_rx_intr == 0)
1007                 return 0;
1008
1009         if (rte_ring_count(rx_adapter->intr_ring) == 0
1010                 && !rx_adapter->qd_valid)
1011                 return 0;
1012
1013         buf = &rx_adapter->event_enqueue_buffer;
1014         ring_lock = &rx_adapter->intr_ring_lock;
1015
1016         if (buf->count >= BATCH_SIZE)
1017                 rxa_flush_event_buffer(rx_adapter);
1018
1019         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
1020                 struct eth_device_info *dev_info;
1021                 uint16_t port;
1022                 uint16_t queue;
1023                 union queue_data qd  = rx_adapter->qd;
1024                 int err;
1025
1026                 if (!rx_adapter->qd_valid) {
1027                         struct eth_rx_queue_info *queue_info;
1028
1029                         rte_spinlock_lock(ring_lock);
1030                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1031                         if (err) {
1032                                 rte_spinlock_unlock(ring_lock);
1033                                 break;
1034                         }
1035
1036                         port = qd.port;
1037                         queue = qd.queue;
1038                         rx_adapter->qd = qd;
1039                         rx_adapter->qd_valid = 1;
1040                         dev_info = &rx_adapter->eth_devices[port];
1041                         if (rxa_shared_intr(dev_info, queue))
1042                                 dev_info->shared_intr_enabled = 1;
1043                         else {
1044                                 queue_info = &dev_info->rx_queue[queue];
1045                                 queue_info->intr_enabled = 1;
1046                         }
1047                         rte_eth_dev_rx_intr_enable(port, queue);
1048                         rte_spinlock_unlock(ring_lock);
1049                 } else {
1050                         port = qd.port;
1051                         queue = qd.queue;
1052
1053                         dev_info = &rx_adapter->eth_devices[port];
1054                 }
1055
1056                 if (rxa_shared_intr(dev_info, queue)) {
1057                         uint16_t i;
1058                         uint16_t nb_queues;
1059
1060                         nb_queues = dev_info->dev->data->nb_rx_queues;
1061                         n = 0;
1062                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1063                                 uint8_t enq_buffer_full;
1064
1065                                 if (!rxa_intr_queue(dev_info, i))
1066                                         continue;
1067                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1068                                         rx_adapter->max_nb_rx,
1069                                         &rxq_empty);
1070                                 nb_rx += n;
1071
1072                                 enq_buffer_full = !rxq_empty && n == 0;
1073                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1074
1075                                 if (enq_buffer_full || max_done) {
1076                                         dev_info->next_q_idx = i;
1077                                         goto done;
1078                                 }
1079                         }
1080
1081                         rx_adapter->qd_valid = 0;
1082
1083                         /* Reinitialize for next interrupt */
1084                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1085                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1086                                                 0;
1087                 } else {
1088                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1089                                 rx_adapter->max_nb_rx,
1090                                 &rxq_empty);
1091                         rx_adapter->qd_valid = !rxq_empty;
1092                         nb_rx += n;
1093                         if (nb_rx > rx_adapter->max_nb_rx)
1094                                 break;
1095                 }
1096         }
1097
1098 done:
1099         rx_adapter->stats.rx_intr_packets += nb_rx;
1100         return nb_rx;
1101 }
1102
1103 /*
1104  * Polls receive queues added to the event adapter and enqueues received
1105  * packets to the event device.
1106  *
1107  * The receive code enqueues initially to a temporary buffer; the
1108  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets.
1109  *
1110  * If there isn't space available in the temporary buffer, packets from the
1111  * Rx queue aren't dequeued from the eth device; this back pressures the
1112  * eth device. In virtual device environments this back pressure is relayed
1113  * to the hypervisor's switching layer, where adjustments can be made to
1114  * deal with it.
1115  */
1116 static inline uint32_t
1117 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1118 {
1119         uint32_t num_queue;
1120         uint32_t nb_rx = 0;
1121         struct rte_eth_event_enqueue_buffer *buf;
1122         uint32_t wrr_pos;
1123         uint32_t max_nb_rx;
1124
1125         wrr_pos = rx_adapter->wrr_pos;
1126         max_nb_rx = rx_adapter->max_nb_rx;
1127         buf = &rx_adapter->event_enqueue_buffer;
1129
1130         /* Iterate through a WRR sequence */
1131         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1132                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1133                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1134                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1135
1136                 /* Don't do a batch dequeue from the rx queue if there isn't
1137                  * enough space in the enqueue buffer.
1138                  */
1139                 if (buf->count >= BATCH_SIZE)
1140                         rxa_flush_event_buffer(rx_adapter);
1141                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1142                         rx_adapter->wrr_pos = wrr_pos;
1143                         return nb_rx;
1144                 }
1145
1146                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1147                                 NULL);
1148                 if (nb_rx > max_nb_rx) {
1149                         rx_adapter->wrr_pos =
1150                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1151                         break;
1152                 }
1153
1154                 if (++wrr_pos == rx_adapter->wrr_len)
1155                         wrr_pos = 0;
1156         }
1157         return nb_rx;
1158 }
1159
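/* Adapter service function. A trylock is used so that the service core is
 * never blocked behind a control path thread holding rx_lock; if the lock
 * is unavailable, or the adapter has not been started, the call returns
 * immediately and the work is retried on the next service iteration.
 */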
1160 static int
1161 rxa_service_func(void *args)
1162 {
1163         struct rte_event_eth_rx_adapter *rx_adapter = args;
1164         struct rte_event_eth_rx_adapter_stats *stats;
1165
1166         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1167                 return 0;
1168         if (!rx_adapter->rxa_started) {
1169                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1170                 return 0;
1171         }
1172
1173         stats = &rx_adapter->stats;
1174         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1175         stats->rx_packets += rxa_poll(rx_adapter);
1176         rte_spinlock_unlock(&rx_adapter->rx_lock);
1177         return 0;
1178 }
1179
1180 static int
1181 rte_event_eth_rx_adapter_init(void)
1182 {
1183         const char *name = "rte_event_eth_rx_adapter_array";
1184         const struct rte_memzone *mz;
1185         unsigned int sz;
1186
1187         sz = sizeof(*event_eth_rx_adapter) *
1188             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1189         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1190
1191         mz = rte_memzone_lookup(name);
1192         if (mz == NULL) {
1193                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1194                                                  RTE_CACHE_LINE_SIZE);
1195                 if (mz == NULL) {
1196                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1197                                         PRId32, rte_errno);
1198                         return -rte_errno;
1199                 }
1200         }
1201
1202         event_eth_rx_adapter = mz->addr;
1203         return 0;
1204 }
1205
1206 static inline struct rte_event_eth_rx_adapter *
1207 rxa_id_to_adapter(uint8_t id)
1208 {
1209         return event_eth_rx_adapter ?
1210                 event_eth_rx_adapter[id] : NULL;
1211 }
1212
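/* Default configuration callback: reconfigures the event device with one
 * additional event port, sets that port up with the rte_event_port_conf
 * supplied by the application and reports it as the adapter's event port,
 * stopping and restarting the event device if it was already running.
 *
 * Illustrative usage sketch (the values below are hypothetical, not part
 * of this file):
 *
 *   struct rte_event_port_conf pc = {
 *           .new_event_threshold = 4096,
 *           .dequeue_depth = 16,
 *           .enqueue_depth = 16,
 *   };
 *   rte_event_eth_rx_adapter_create(id, dev_id, &pc);
 *
 * which results in this callback being invoked with the supplied port
 * configuration as the 'arg' parameter.
 */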
1213 static int
1214 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1215                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1216 {
1217         int ret;
1218         struct rte_eventdev *dev;
1219         struct rte_event_dev_config dev_conf;
1220         int started;
1221         uint8_t port_id;
1222         struct rte_event_port_conf *port_conf = arg;
1223         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1224
1225         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1226         dev_conf = dev->data->dev_conf;
1227
1228         started = dev->data->dev_started;
1229         if (started)
1230                 rte_event_dev_stop(dev_id);
1231         port_id = dev_conf.nb_event_ports;
1232         dev_conf.nb_event_ports += 1;
1233         ret = rte_event_dev_configure(dev_id, &dev_conf);
1234         if (ret) {
1235                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1236                                                 dev_id);
1237                 if (started) {
1238                         if (rte_event_dev_start(dev_id))
1239                                 return -EIO;
1240                 }
1241                 return ret;
1242         }
1243
1244         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1245         if (ret) {
1246                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1247                                         port_id);
1248                 return ret;
1249         }
1250
1251         conf->event_port_id = port_id;
1252         conf->max_nb_rx = 128;
1253         if (started)
1254                 ret = rte_event_dev_start(dev_id);
1255         rx_adapter->default_cb_arg = 1;
1256         return ret;
1257 }
1258
1259 static int
1260 rxa_epoll_create1(void)
1261 {
1262 #if defined(LINUX)
1263         int fd;
1264         fd = epoll_create1(EPOLL_CLOEXEC);
1265         return fd < 0 ? -errno : fd;
1266 #elif defined(BSD)
1267         return -ENOTSUP;
1268 #endif
1269 }
1270
1271 static int
1272 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1273 {
1274         if (rx_adapter->epd != INIT_FD)
1275                 return 0;
1276
1277         rx_adapter->epd = rxa_epoll_create1();
1278         if (rx_adapter->epd < 0) {
1279                 int err = rx_adapter->epd;
1280                 rx_adapter->epd = INIT_FD;
1281                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1282                 return err;
1283         }
1284
1285         return 0;
1286 }
1287
1288 static int
1289 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1290 {
1291         int err;
1292         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1293
1294         if (rx_adapter->intr_ring)
1295                 return 0;
1296
1297         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1298                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1299                                         rte_socket_id(), 0);
1300         if (!rx_adapter->intr_ring)
1301                 return -ENOMEM;
1302
1303         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1304                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1305                                         sizeof(struct rte_epoll_event),
1306                                         RTE_CACHE_LINE_SIZE,
1307                                         rx_adapter->socket_id);
1308         if (!rx_adapter->epoll_events) {
1309                 err = -ENOMEM;
1310                 goto error;
1311         }
1312
1313         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1314
1315         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1316                         "rx-intr-thread-%d", rx_adapter->id);
1317
1318         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1319                                 NULL, rxa_intr_thread, rx_adapter);
1320         if (!err) {
1321                 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1322                 return 0;
1323         }
1324
1325         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1326 error:
1327         rte_ring_free(rx_adapter->intr_ring);
1328         rx_adapter->intr_ring = NULL;
1329         rx_adapter->epoll_events = NULL;
1330         return err;
1331 }
1332
1333 static int
1334 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1335 {
1336         int err;
1337
1338         err = pthread_cancel(rx_adapter->rx_intr_thread);
1339         if (err)
1340                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1341                                 err);
1342
1343         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1344         if (err)
1345                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1346
1347         rte_free(rx_adapter->epoll_events);
1348         rte_ring_free(rx_adapter->intr_ring);
1349         rx_adapter->intr_ring = NULL;
1350         rx_adapter->epoll_events = NULL;
1351         return 0;
1352 }
1353
1354 static int
1355 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1356 {
1357         int ret;
1358
1359         if (rx_adapter->num_rx_intr == 0)
1360                 return 0;
1361
1362         ret = rxa_destroy_intr_thread(rx_adapter);
1363         if (ret)
1364                 return ret;
1365
1366         close(rx_adapter->epd);
1367         rx_adapter->epd = INIT_FD;
1368
1369         return ret;
1370 }
1371
1372 static int
1373 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1374         struct eth_device_info *dev_info,
1375         uint16_t rx_queue_id)
1376 {
1377         int err;
1378         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1379         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1380
1381         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1382         if (err) {
1383                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1384                         rx_queue_id);
1385                 return err;
1386         }
1387
1388         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1389                                         rx_adapter->epd,
1390                                         RTE_INTR_EVENT_DEL,
1391                                         0);
1392         if (err)
1393                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1394
1395         if (sintr)
1396                 dev_info->shared_intr_enabled = 0;
1397         else
1398                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1399         return err;
1400 }
1401
1402 static int
1403 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1404                 struct eth_device_info *dev_info,
1405                 int rx_queue_id)
1406 {
1407         int err;
1408         int i;
1409         int s;
1410
1411         if (dev_info->nb_rx_intr == 0)
1412                 return 0;
1413
1414         err = 0;
1415         if (rx_queue_id == -1) {
1416                 s = dev_info->nb_shared_intr;
1417                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1418                         int sintr;
1419                         uint16_t q;
1420
1421                         q = dev_info->intr_queue[i];
1422                         sintr = rxa_shared_intr(dev_info, q);
1423                         s -= sintr;
1424
1425                         if (!sintr || s == 0) {
1426
1427                                 err = rxa_disable_intr(rx_adapter, dev_info,
1428                                                 q);
1429                                 if (err)
1430                                         return err;
1431                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1432                                                         q);
1433                         }
1434                 }
1435         } else {
1436                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1437                         return 0;
1438                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1439                                 dev_info->nb_shared_intr == 1) {
1440                         err = rxa_disable_intr(rx_adapter, dev_info,
1441                                         rx_queue_id);
1442                         if (err)
1443                                 return err;
1444                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1445                                                 rx_queue_id);
1446                 }
1447
1448                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1449                         if (dev_info->intr_queue[i] == rx_queue_id) {
1450                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1451                                         dev_info->intr_queue[i] =
1452                                                 dev_info->intr_queue[i + 1];
1453                                 break;
1454                         }
1455                 }
1456         }
1457
1458         return err;
1459 }
1460
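/* Set up interrupt mode for a single Rx queue: allocate the device's
 * intr_queue array and the adapter's epoll fd if needed, register the
 * queue's epoll event data, enable the NIC Rx interrupt and start the
 * interrupt thread if it is not already running. On failure the completed
 * steps are rolled back in reverse order.
 */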
1461 static int
1462 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1463         struct eth_device_info *dev_info,
1464         uint16_t rx_queue_id)
1465 {
1466         int err, err1;
1467         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1468         union queue_data qd;
1469         int init_fd;
1470         uint16_t *intr_queue;
1471         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1472
1473         if (rxa_intr_queue(dev_info, rx_queue_id))
1474                 return 0;
1475
1476         intr_queue = dev_info->intr_queue;
1477         if (dev_info->intr_queue == NULL) {
1478                 size_t len =
1479                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1480                 dev_info->intr_queue =
1481                         rte_zmalloc_socket(
1482                                 rx_adapter->mem_name,
1483                                 len,
1484                                 0,
1485                                 rx_adapter->socket_id);
1486                 if (dev_info->intr_queue == NULL)
1487                         return -ENOMEM;
1488         }
1489
1490         init_fd = rx_adapter->epd;
1491         err = rxa_init_epd(rx_adapter);
1492         if (err)
1493                 goto err_free_queue;
1494
1495         qd.port = eth_dev_id;
1496         qd.queue = rx_queue_id;
1497
1498         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1499                                         rx_adapter->epd,
1500                                         RTE_INTR_EVENT_ADD,
1501                                         qd.ptr);
1502         if (err) {
1503                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1504                         " Rx Queue %u err %d", rx_queue_id, err);
1505                 goto err_del_fd;
1506         }
1507
1508         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1509         if (err) {
1510                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1511                                 " Rx Queue %u err %d", rx_queue_id, err);
1512
1513                 goto err_del_event;
1514         }
1515
1516         err = rxa_create_intr_thread(rx_adapter);
1517         if (!err) {
1518                 if (sintr)
1519                         dev_info->shared_intr_enabled = 1;
1520                 else
1521                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1522                 return 0;
1523         }
1524
1525
1526         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1527         if (err)
1528                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1529                                 " Rx Queue %u err %d", rx_queue_id, err);
1530 err_del_event:
1531         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1532                                         rx_adapter->epd,
1533                                         RTE_INTR_EVENT_DEL,
1534                                         0);
1535         if (err1) {
1536                 RTE_EDEV_LOG_ERR("Could not delete event for"
1537                                 " Rx Queue %u err %d", rx_queue_id, err1);
1538         }
1539 err_del_fd:
1540         if (init_fd == INIT_FD) {
1541                 close(rx_adapter->epd);
1542                 rx_adapter->epd = -1;
1543         }
1544 err_free_queue:
1545         if (intr_queue == NULL)
1546                 rte_free(dev_info->intr_queue);
1547
1548         return err;
1549 }
1550
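/*
 * Configure interrupt mode for a Rx queue, or for all Rx queues of the
 * device when rx_queue_id is -1. Queues that map to an already configured
 * shared interrupt are skipped. If configuring any queue fails, interrupts
 * are disabled again on the queues configured so far.
 */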
1551 static int
1552 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1553         struct eth_device_info *dev_info,
1554         int rx_queue_id)
1555
1556 {
1557         int i, j, err;
1558         int si = -1;
1559         int shared_done = (dev_info->nb_shared_intr > 0);
1560
1561         if (rx_queue_id != -1) {
1562                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1563                         return 0;
1564                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1565         }
1566
1567         err = 0;
1568         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1569
1570                 if (rxa_shared_intr(dev_info, i) && shared_done)
1571                         continue;
1572
1573                 err = rxa_config_intr(rx_adapter, dev_info, i);
1574
1575                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1576                 if (shared_done) {
1577                         si = i;
1578                         dev_info->shared_intr_enabled = 1;
1579                 }
1580                 if (err)
1581                         break;
1582         }
1583
1584         if (err == 0)
1585                 return 0;
1586
1587         shared_done = (dev_info->nb_shared_intr > 0);
1588         for (j = 0; j < i; j++) {
1589                 if (rxa_intr_queue(dev_info, j))
1590                         continue;
1591                 if (rxa_shared_intr(dev_info, j) && si != j)
1592                         continue;
1593                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1594                 if (err)
1595                         break;
1596
1597         }
1598
1599         return err;
1600 }
1601
1602
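/*
 * One time initialization of the adapter's service function: register the
 * service component, invoke the application's configuration callback to
 * obtain the event port id and max_nb_rx, and mark the service as
 * initialized.
 */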
1603 static int
1604 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1605 {
1606         int ret;
1607         struct rte_service_spec service;
1608         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1609
1610         if (rx_adapter->service_inited)
1611                 return 0;
1612
1613         memset(&service, 0, sizeof(service));
1614         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1615                 "rte_event_eth_rx_adapter_%d", id);
1616         service.socket_id = rx_adapter->socket_id;
1617         service.callback = rxa_service_func;
1618         service.callback_userdata = rx_adapter;
1619         /* Service function handles locking for queue add/del updates */
1620         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1621         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1622         if (ret) {
1623                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1624                         service.name, ret);
1625                 return ret;
1626         }
1627
1628         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1629                 &rx_adapter_conf, rx_adapter->conf_arg);
1630         if (ret) {
1631                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1632                         ret);
1633                 goto err_done;
1634         }
1635         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1636         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1637         rx_adapter->service_inited = 1;
1638         rx_adapter->epd = INIT_FD;
1639         return 0;
1640
1641 err_done:
1642         rte_service_component_unregister(rx_adapter->service_id);
1643         return ret;
1644 }
1645
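/*
 * Mark a Rx queue (or all of the device's Rx queues when rx_queue_id is -1)
 * as enabled or disabled and keep the per adapter and per device queue
 * counts in sync.
 */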
1646 static void
1647 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1648                 struct eth_device_info *dev_info,
1649                 int32_t rx_queue_id,
1650                 uint8_t add)
1651 {
1652         struct eth_rx_queue_info *queue_info;
1653         int enabled;
1654         uint16_t i;
1655
1656         if (dev_info->rx_queue == NULL)
1657                 return;
1658
1659         if (rx_queue_id == -1) {
1660                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1661                         rxa_update_queue(rx_adapter, dev_info, i, add);
1662         } else {
1663                 queue_info = &dev_info->rx_queue[rx_queue_id];
1664                 enabled = queue_info->queue_enabled;
1665                 if (add) {
1666                         rx_adapter->nb_queues += !enabled;
1667                         dev_info->nb_dev_queues += !enabled;
1668                 } else {
1669                         rx_adapter->nb_queues -= enabled;
1670                         dev_info->nb_dev_queues -= enabled;
1671                 }
1672                 queue_info->queue_enabled = !!add;
1673         }
1674 }
1675
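/*
 * Remove a Rx queue (or all of the device's Rx queues when rx_queue_id is
 * -1) from the SW adapter and update the polled/interrupt queue counters.
 */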
1676 static void
1677 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1678         struct eth_device_info *dev_info,
1679         int32_t rx_queue_id)
1680 {
1681         int pollq;
1682         int intrq;
1683         int sintrq;
1684
1685
1686         if (rx_adapter->nb_queues == 0)
1687                 return;
1688
1689         if (rx_queue_id == -1) {
1690                 uint16_t nb_rx_queues;
1691                 uint16_t i;
1692
1693                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1694                 for (i = 0; i < nb_rx_queues; i++)
1695                         rxa_sw_del(rx_adapter, dev_info, i);
1696                 return;
1697         }
1698
1699         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1700         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1701         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1702         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1703         rx_adapter->num_rx_polled -= pollq;
1704         dev_info->nb_rx_poll -= pollq;
1705         rx_adapter->num_rx_intr -= intrq;
1706         dev_info->nb_rx_intr -= intrq;
1707         dev_info->nb_shared_intr -= intrq && sintrq;
1708 }
1709
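/*
 * Record the event fields and servicing weight for a Rx queue (or all of
 * the device's Rx queues when rx_queue_id is -1) and update the
 * polled/interrupt queue counters; a zero servicing weight selects
 * interrupt mode, a non-zero weight selects polling.
 */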
1710 static void
1711 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1712         struct eth_device_info *dev_info,
1713         int32_t rx_queue_id,
1714         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1715 {
1716         struct eth_rx_queue_info *queue_info;
1717         const struct rte_event *ev = &conf->ev;
1718         int pollq;
1719         int intrq;
1720         int sintrq;
1721
1722         if (rx_queue_id == -1) {
1723                 uint16_t nb_rx_queues;
1724                 uint16_t i;
1725
1726                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1727                 for (i = 0; i < nb_rx_queues; i++)
1728                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1729                 return;
1730         }
1731
1732         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1733         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1734         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1735
1736         queue_info = &dev_info->rx_queue[rx_queue_id];
1737         queue_info->event_queue_id = ev->queue_id;
1738         queue_info->sched_type = ev->sched_type;
1739         queue_info->priority = ev->priority;
1740         queue_info->wt = conf->servicing_weight;
1741
1742         if (conf->rx_queue_flags &
1743                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1744                 queue_info->flow_id = ev->flow_id;
1745                 queue_info->flow_id_mask = ~0;
1746         }
1747
1748         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1749         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1750                 rx_adapter->num_rx_polled += !pollq;
1751                 dev_info->nb_rx_poll += !pollq;
1752                 rx_adapter->num_rx_intr -= intrq;
1753                 dev_info->nb_rx_intr -= intrq;
1754                 dev_info->nb_shared_intr -= intrq && sintrq;
1755         }
1756
1757         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1758                 rx_adapter->num_rx_polled -= pollq;
1759                 dev_info->nb_rx_poll -= pollq;
1760                 rx_adapter->num_rx_intr += !intrq;
1761                 dev_info->nb_rx_intr += !intrq;
1762                 dev_info->nb_shared_intr += !intrq && sintrq;
1763                 if (dev_info->nb_shared_intr == 1) {
1764                         if (dev_info->multi_intr_cap)
1765                                 dev_info->next_q_idx =
1766                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1767                         else
1768                                 dev_info->next_q_idx = 0;
1769                 }
1770         }
1771 }
1772
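/*
 * Add a Rx queue (or all of the device's Rx queues when rx_queue_id is -1)
 * to the SW service based adapter: recompute the poll and interrupt queue
 * counts, reallocate the poll and WRR arrays, move the queue between
 * interrupt and poll mode as its servicing weight requires, and rebuild the
 * weighted round robin polling sequence.
 */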
1773 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1774                 uint16_t eth_dev_id,
1775                 int rx_queue_id,
1776                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1777 {
1778         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1779         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1780         int ret;
1781         struct eth_rx_poll_entry *rx_poll;
1782         struct eth_rx_queue_info *rx_queue;
1783         uint32_t *rx_wrr;
1784         uint16_t nb_rx_queues;
1785         uint32_t nb_rx_poll, nb_wrr;
1786         uint32_t nb_rx_intr;
1787         int num_intr_vec;
1788         uint16_t wt;
1789
1790         if (queue_conf->servicing_weight == 0) {
1791                 struct rte_eth_dev_data *data = dev_info->dev->data;
1792
1793                 temp_conf = *queue_conf;
1794                 if (!data->dev_conf.intr_conf.rxq) {
1795                         /* If Rx interrupts are disabled set wt = 1 */
1796                         temp_conf.servicing_weight = 1;
1797                 }
1798                 queue_conf = &temp_conf;
1799         }
1800
1801         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1802         rx_queue = dev_info->rx_queue;
1803         wt = queue_conf->servicing_weight;
1804
1805         if (dev_info->rx_queue == NULL) {
1806                 dev_info->rx_queue =
1807                     rte_zmalloc_socket(rx_adapter->mem_name,
1808                                        nb_rx_queues *
1809                                        sizeof(struct eth_rx_queue_info), 0,
1810                                        rx_adapter->socket_id);
1811                 if (dev_info->rx_queue == NULL)
1812                         return -ENOMEM;
1813         }
1814         rx_wrr = NULL;
1815         rx_poll = NULL;
1816
1817         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1818                         queue_conf->servicing_weight,
1819                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1820
1821         if (dev_info->dev->intr_handle)
1822                 dev_info->multi_intr_cap =
1823                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
1824
1825         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1826                                 &rx_poll, &rx_wrr);
1827         if (ret)
1828                 goto err_free_rxqueue;
1829
1830         if (wt == 0) {
1831                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1832
1833                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1834                 if (ret)
1835                         goto err_free_rxqueue;
1836
1837                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1838                 if (ret)
1839                         goto err_free_rxqueue;
1840         } else {
1841
1842                 num_intr_vec = 0;
1843                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1844                         num_intr_vec = rxa_nb_intr_vect(dev_info,
1845                                                 rx_queue_id, 0);
1846                         /* interrupt based queues are being converted to
1847                          * poll mode queues, delete the interrupt configuration
1848                          * for those.
1849                          */
1850                         ret = rxa_del_intr_queue(rx_adapter,
1851                                                 dev_info, rx_queue_id);
1852                         if (ret)
1853                                 goto err_free_rxqueue;
1854                 }
1855         }
1856
1857         if (nb_rx_intr == 0) {
1858                 ret = rxa_free_intr_resources(rx_adapter);
1859                 if (ret)
1860                         goto err_free_rxqueue;
1861         }
1862
1863         if (wt == 0) {
1864                 uint16_t i;
1865
1866                 if (rx_queue_id  == -1) {
1867                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1868                                 dev_info->intr_queue[i] = i;
1869                 } else {
1870                         if (!rxa_intr_queue(dev_info, rx_queue_id))
1871                                 dev_info->intr_queue[nb_rx_intr - 1] =
1872                                         rx_queue_id;
1873                 }
1874         }
1875
1876
1877
1878         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1879         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1880
1881         rte_free(rx_adapter->eth_rx_poll);
1882         rte_free(rx_adapter->wrr_sched);
1883
1884         rx_adapter->eth_rx_poll = rx_poll;
1885         rx_adapter->wrr_sched = rx_wrr;
1886         rx_adapter->wrr_len = nb_wrr;
1887         rx_adapter->num_intr_vec += num_intr_vec;
1888         return 0;
1889
1890 err_free_rxqueue:
1891         if (rx_queue == NULL) {
1892                 rte_free(dev_info->rx_queue);
1893                 dev_info->rx_queue = NULL;
1894         }
1895
1896         rte_free(rx_poll);
1897         rte_free(rx_wrr);
1898
1899         return ret;
1900 }
1901
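/*
 * Common start/stop handler: for devices with an internal event port the
 * PMD's adapter start/stop op is invoked, otherwise the adapter's service
 * function run state is updated.
 */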
1902 static int
1903 rxa_ctrl(uint8_t id, int start)
1904 {
1905         struct rte_event_eth_rx_adapter *rx_adapter;
1906         struct rte_eventdev *dev;
1907         struct eth_device_info *dev_info;
1908         uint32_t i;
1909         int use_service = 0;
1910         int stop = !start;
1911
1912         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1913         rx_adapter = rxa_id_to_adapter(id);
1914         if (rx_adapter == NULL)
1915                 return -EINVAL;
1916
1917         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1918
1919         RTE_ETH_FOREACH_DEV(i) {
1920                 dev_info = &rx_adapter->eth_devices[i];
1921                 /* if start, check for num dev queues */
1922                 if (start && !dev_info->nb_dev_queues)
1923                         continue;
1924                 /* if stop, check if dev has been started */
1925                 if (stop && !dev_info->dev_rx_started)
1926                         continue;
1927                 use_service |= !dev_info->internal_event_port;
1928                 dev_info->dev_rx_started = start;
1929                 if (dev_info->internal_event_port == 0)
1930                         continue;
1931                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1932                                                 &rte_eth_devices[i]) :
1933                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1934                                                 &rte_eth_devices[i]);
1935         }
1936
1937         if (use_service) {
1938                 rte_spinlock_lock(&rx_adapter->rx_lock);
1939                 rx_adapter->rxa_started = start;
1940                 rte_service_runstate_set(rx_adapter->service_id, start);
1941                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1942         }
1943
1944         return 0;
1945 }
1946
1947 int
1948 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1949                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
1950                                 void *conf_arg)
1951 {
1952         struct rte_event_eth_rx_adapter *rx_adapter;
1953         int ret;
1954         int socket_id;
1955         uint16_t i;
1956         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1957         const uint8_t default_rss_key[] = {
1958                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1959                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1960                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1961                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1962                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1963         };
1964
1965         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1966         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1967         if (conf_cb == NULL)
1968                 return -EINVAL;
1969
1970         if (event_eth_rx_adapter == NULL) {
1971                 ret = rte_event_eth_rx_adapter_init();
1972                 if (ret)
1973                         return ret;
1974         }
1975
1976         rx_adapter = rxa_id_to_adapter(id);
1977         if (rx_adapter != NULL) {
1978                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1979                 return -EEXIST;
1980         }
1981
1982         socket_id = rte_event_dev_socket_id(dev_id);
1983         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1984                 "rte_event_eth_rx_adapter_%d",
1985                 id);
1986
1987         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1988                         RTE_CACHE_LINE_SIZE, socket_id);
1989         if (rx_adapter == NULL) {
1990                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1991                 return -ENOMEM;
1992         }
1993
1994         rx_adapter->eventdev_id = dev_id;
1995         rx_adapter->socket_id = socket_id;
1996         rx_adapter->conf_cb = conf_cb;
1997         rx_adapter->conf_arg = conf_arg;
1998         rx_adapter->id = id;
1999         strcpy(rx_adapter->mem_name, mem_name);
2000         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2001                                         /* FIXME: incompatible with hotplug */
2002                                         rte_eth_dev_count_total() *
2003                                         sizeof(struct eth_device_info), 0,
2004                                         socket_id);
2005         rte_convert_rss_key((const uint32_t *)default_rss_key,
2006                         (uint32_t *)rx_adapter->rss_key_be,
2007                             RTE_DIM(default_rss_key));
2008
2009         if (rx_adapter->eth_devices == NULL) {
2010                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2011                 rte_free(rx_adapter);
2012                 return -ENOMEM;
2013         }
2014         rte_spinlock_init(&rx_adapter->rx_lock);
2015         RTE_ETH_FOREACH_DEV(i)
2016                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2017
2018         event_eth_rx_adapter[id] = rx_adapter;
2019         if (conf_cb == rxa_default_conf_cb)
2020                 rx_adapter->default_cb_arg = 1;
2021         return 0;
2022 }
2023
2024 int
2025 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2026                 struct rte_event_port_conf *port_config)
2027 {
2028         struct rte_event_port_conf *pc;
2029         int ret;
2030
2031         if (port_config == NULL)
2032                 return -EINVAL;
2033         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2034
2035         pc = rte_malloc(NULL, sizeof(*pc), 0);
2036         if (pc == NULL)
2037                 return -ENOMEM;
2038         *pc = *port_config;
2039         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2040                                         rxa_default_conf_cb,
2041                                         pc);
2042         if (ret)
2043                 rte_free(pc);
2044         return ret;
2045 }
2046
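/*
 * Illustrative usage sketch (not part of the original file): a typical
 * application flow with the public API in this file, assuming an already
 * configured event device 'evdev_id' and ethdev port 'eth_port', and an
 * application chosen 'port_conf'. The names adapter_id, ev_queue_id and
 * service_lcore are placeholders for this sketch.
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.rx_queue_flags = 0,
 *		.servicing_weight = 1,
 *		.ev = {
 *			.queue_id = ev_queue_id,
 *			.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		},
 *	};
 *	uint32_t service_id;
 *
 *	rte_event_eth_rx_adapter_create(adapter_id, evdev_id, &port_conf);
 *	rte_event_eth_rx_adapter_queue_add(adapter_id, eth_port, -1, &qconf);
 *	if (rte_event_eth_rx_adapter_service_id_get(adapter_id,
 *						&service_id) == 0)
 *		rte_service_map_lcore_set(service_id, service_lcore, 1);
 *	rte_event_eth_rx_adapter_start(adapter_id);
 *
 * Error checks are omitted for brevity; the service mapping is only needed
 * when the adapter uses the SW service path (no internal event port).
 */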
2047 int
2048 rte_event_eth_rx_adapter_free(uint8_t id)
2049 {
2050         struct rte_event_eth_rx_adapter *rx_adapter;
2051
2052         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2053
2054         rx_adapter = rxa_id_to_adapter(id);
2055         if (rx_adapter == NULL)
2056                 return -EINVAL;
2057
2058         if (rx_adapter->nb_queues) {
2059                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2060                                 rx_adapter->nb_queues);
2061                 return -EBUSY;
2062         }
2063
2064         if (rx_adapter->default_cb_arg)
2065                 rte_free(rx_adapter->conf_arg);
2066         rte_free(rx_adapter->eth_devices);
2067         rte_free(rx_adapter);
2068         event_eth_rx_adapter[id] = NULL;
2069
2070         return 0;
2071 }
2072
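/*
 * Add a Rx queue to the adapter. Devices whose eventdev reports the
 * INTERNAL_PORT capability are handed to the PMD's queue add op; all other
 * devices go through the SW service path (rxa_init_service/rxa_sw_add),
 * with the service run state updated to reflect the resulting queue count.
 */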
2073 int
2074 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2075                 uint16_t eth_dev_id,
2076                 int32_t rx_queue_id,
2077                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2078 {
2079         int ret;
2080         uint32_t cap;
2081         struct rte_event_eth_rx_adapter *rx_adapter;
2082         struct rte_eventdev *dev;
2083         struct eth_device_info *dev_info;
2084
2085         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2086         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2087
2088         rx_adapter = rxa_id_to_adapter(id);
2089         if ((rx_adapter == NULL) || (queue_conf == NULL))
2090                 return -EINVAL;
2091
2092         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2093         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2094                                                 eth_dev_id,
2095                                                 &cap);
2096         if (ret) {
2097                 RTE_EDEV_LOG_ERR("Failed to get adapter caps, adapter id %" PRIu8
2098                         " eth port %" PRIu16, id, eth_dev_id);
2099                 return ret;
2100         }
2101
2102         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2103                 && (queue_conf->rx_queue_flags &
2104                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2105                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2106                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2107                                 eth_dev_id, id);
2108                 return -EINVAL;
2109         }
2110
2111         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2112                 (rx_queue_id != -1)) {
2113                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2114                         "event queue, eth port: %" PRIu16 " adapter id: %"
2115                         PRIu8, eth_dev_id, id);
2116                 return -EINVAL;
2117         }
2118
2119         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2120                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2121                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2122                          (uint16_t)rx_queue_id);
2123                 return -EINVAL;
2124         }
2125
2126         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2127
2128         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2129                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2130                                         -ENOTSUP);
2131                 if (dev_info->rx_queue == NULL) {
2132                         dev_info->rx_queue =
2133                             rte_zmalloc_socket(rx_adapter->mem_name,
2134                                         dev_info->dev->data->nb_rx_queues *
2135                                         sizeof(struct eth_rx_queue_info), 0,
2136                                         rx_adapter->socket_id);
2137                         if (dev_info->rx_queue == NULL)
2138                                 return -ENOMEM;
2139                 }
2140
2141                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2142                                 &rte_eth_devices[eth_dev_id],
2143                                 rx_queue_id, queue_conf);
2144                 if (ret == 0) {
2145                         dev_info->internal_event_port = 1;
2146                         rxa_update_queue(rx_adapter,
2147                                         &rx_adapter->eth_devices[eth_dev_id],
2148                                         rx_queue_id,
2149                                         1);
2150                 }
2151         } else {
2152                 rte_spinlock_lock(&rx_adapter->rx_lock);
2153                 dev_info->internal_event_port = 0;
2154                 ret = rxa_init_service(rx_adapter, id);
2155                 if (ret == 0) {
2156                         uint32_t service_id = rx_adapter->service_id;
2157                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2158                                         queue_conf);
2159                         rte_service_component_runstate_set(service_id,
2160                                 rxa_sw_adapter_queue_count(rx_adapter));
2161                 }
2162                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2163         }
2164
2165         if (ret)
2166                 return ret;
2167
2168         return 0;
2169 }
2170
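/*
 * Delete a Rx queue from the adapter. Internal port devices use the PMD's
 * queue del op; for the SW path the poll and WRR arrays are recomputed and
 * interrupt resources that are no longer needed are released.
 */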
2171 int
2172 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2173                                 int32_t rx_queue_id)
2174 {
2175         int ret = 0;
2176         struct rte_eventdev *dev;
2177         struct rte_event_eth_rx_adapter *rx_adapter;
2178         struct eth_device_info *dev_info;
2179         uint32_t cap;
2180         uint32_t nb_rx_poll = 0;
2181         uint32_t nb_wrr = 0;
2182         uint32_t nb_rx_intr;
2183         struct eth_rx_poll_entry *rx_poll = NULL;
2184         uint32_t *rx_wrr = NULL;
2185         int num_intr_vec;
2186
2187         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2188         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2189
2190         rx_adapter = rxa_id_to_adapter(id);
2191         if (rx_adapter == NULL)
2192                 return -EINVAL;
2193
2194         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2195         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2196                                                 eth_dev_id,
2197                                                 &cap);
2198         if (ret)
2199                 return ret;
2200
2201         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2202                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2203                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2204                          (uint16_t)rx_queue_id);
2205                 return -EINVAL;
2206         }
2207
2208         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2209
2210         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2211                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2212                                  -ENOTSUP);
2213                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2214                                                 &rte_eth_devices[eth_dev_id],
2215                                                 rx_queue_id);
2216                 if (ret == 0) {
2217                         rxa_update_queue(rx_adapter,
2218                                         &rx_adapter->eth_devices[eth_dev_id],
2219                                         rx_queue_id,
2220                                         0);
2221                         if (dev_info->nb_dev_queues == 0) {
2222                                 rte_free(dev_info->rx_queue);
2223                                 dev_info->rx_queue = NULL;
2224                         }
2225                 }
2226         } else {
2227                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2228                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2229
2230                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2231                         &rx_poll, &rx_wrr);
2232                 if (ret)
2233                         return ret;
2234
2235                 rte_spinlock_lock(&rx_adapter->rx_lock);
2236
2237                 num_intr_vec = 0;
2238                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2239
2240                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2241                                                 rx_queue_id, 0);
2242                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2243                                         rx_queue_id);
2244                         if (ret)
2245                                 goto unlock_ret;
2246                 }
2247
2248                 if (nb_rx_intr == 0) {
2249                         ret = rxa_free_intr_resources(rx_adapter);
2250                         if (ret)
2251                                 goto unlock_ret;
2252                 }
2253
2254                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2255                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2256
2257                 rte_free(rx_adapter->eth_rx_poll);
2258                 rte_free(rx_adapter->wrr_sched);
2259
2260                 if (nb_rx_intr == 0) {
2261                         rte_free(dev_info->intr_queue);
2262                         dev_info->intr_queue = NULL;
2263                 }
2264
2265                 rx_adapter->eth_rx_poll = rx_poll;
2266                 rx_adapter->wrr_sched = rx_wrr;
2267                 rx_adapter->wrr_len = nb_wrr;
2268                 rx_adapter->num_intr_vec += num_intr_vec;
2269
2270                 if (dev_info->nb_dev_queues == 0) {
2271                         rte_free(dev_info->rx_queue);
2272                         dev_info->rx_queue = NULL;
2273                 }
2274 unlock_ret:
2275                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2276                 if (ret) {
2277                         rte_free(rx_poll);
2278                         rte_free(rx_wrr);
2279                         return ret;
2280                 }
2281
2282                 rte_service_component_runstate_set(rx_adapter->service_id,
2283                                 rxa_sw_adapter_queue_count(rx_adapter));
2284         }
2285
2286         return ret;
2287 }
2288
2289 int
2290 rte_event_eth_rx_adapter_start(uint8_t id)
2291 {
2292         return rxa_ctrl(id, 1);
2293 }
2294
2295 int
2296 rte_event_eth_rx_adapter_stop(uint8_t id)
2297 {
2298         return rxa_ctrl(id, 0);
2299 }
2300
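/*
 * Return adapter statistics: the SW service counters are combined with the
 * per device statistics reported by PMDs that implement an internal event
 * port.
 */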
2301 int
2302 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2303                                struct rte_event_eth_rx_adapter_stats *stats)
2304 {
2305         struct rte_event_eth_rx_adapter *rx_adapter;
2306         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2307         struct rte_event_eth_rx_adapter_stats dev_stats;
2308         struct rte_eventdev *dev;
2309         struct eth_device_info *dev_info;
2310         uint32_t i;
2311         int ret;
2312
2313         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2314
2315         rx_adapter = rxa_id_to_adapter(id);
2316         if (rx_adapter  == NULL || stats == NULL)
2317                 return -EINVAL;
2318
2319         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2320         memset(stats, 0, sizeof(*stats));
2321         RTE_ETH_FOREACH_DEV(i) {
2322                 dev_info = &rx_adapter->eth_devices[i];
2323                 if (dev_info->internal_event_port == 0 ||
2324                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2325                         continue;
2326                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2327                                                 &rte_eth_devices[i],
2328                                                 &dev_stats);
2329                 if (ret)
2330                         continue;
2331                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2332                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2333         }
2334
2335         if (rx_adapter->service_inited)
2336                 *stats = rx_adapter->stats;
2337
2338         stats->rx_packets += dev_stats_sum.rx_packets;
2339         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2340         return 0;
2341 }
2342
2343 int
2344 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2345 {
2346         struct rte_event_eth_rx_adapter *rx_adapter;
2347         struct rte_eventdev *dev;
2348         struct eth_device_info *dev_info;
2349         uint32_t i;
2350
2351         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2352
2353         rx_adapter = rxa_id_to_adapter(id);
2354         if (rx_adapter == NULL)
2355                 return -EINVAL;
2356
2357         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2358         RTE_ETH_FOREACH_DEV(i) {
2359                 dev_info = &rx_adapter->eth_devices[i];
2360                 if (dev_info->internal_event_port == 0 ||
2361                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2362                         continue;
2363                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2364                                                         &rte_eth_devices[i]);
2365         }
2366
2367         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2368         return 0;
2369 }
2370
2371 int
2372 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2373 {
2374         struct rte_event_eth_rx_adapter *rx_adapter;
2375
2376         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2377
2378         rx_adapter = rxa_id_to_adapter(id);
2379         if (rx_adapter == NULL || service_id == NULL)
2380                 return -EINVAL;
2381
2382         if (rx_adapter->service_inited)
2383                 *service_id = rx_adapter->service_id;
2384
2385         return rx_adapter->service_inited ? 0 : -ESRCH;
2386 }
2387
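/*
 * Register a per device Rx callback; only supported for devices serviced by
 * the SW adapter (i.e. without the INTERNAL_PORT capability). The callback
 * pointers are updated under the adapter lock.
 */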
2388 int rte_event_eth_rx_adapter_cb_register(uint8_t id,
2389                                         uint16_t eth_dev_id,
2390                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2391                                         void *cb_arg)
2392 {
2393         struct rte_event_eth_rx_adapter *rx_adapter;
2394         struct eth_device_info *dev_info;
2395         uint32_t cap;
2396         int ret;
2397
2398         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2399         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2400
2401         rx_adapter = rxa_id_to_adapter(id);
2402         if (rx_adapter == NULL)
2403                 return -EINVAL;
2404
2405         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2406         if (dev_info->rx_queue == NULL)
2407                 return -EINVAL;
2408
2409         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2410                                                 eth_dev_id,
2411                                                 &cap);
2412         if (ret) {
2413                 RTE_EDEV_LOG_ERR("Failed to get adapter caps, adapter id %" PRIu8
2414                         " eth port %" PRIu16, id, eth_dev_id);
2415                 return ret;
2416         }
2417
2418         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2419                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2420                                 PRIu16, eth_dev_id);
2421                 return -EINVAL;
2422         }
2423
2424         rte_spinlock_lock(&rx_adapter->rx_lock);
2425         dev_info->cb_fn = cb_fn;
2426         dev_info->cb_arg = cb_arg;
2427         rte_spinlock_unlock(&rx_adapter->rx_lock);
2428
2429         return 0;
2430 }