/* lib/librte_eventdev/rte_event_eth_rx_adapter.c (DPDK 18.11-rc1) */
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation.
 * All rights reserved.
 */
#if defined(LINUX)
#include <sys/epoll.h>
#endif
#include <unistd.h>

#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_dev.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>
#include <rte_interrupts.h>

#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
#include "rte_event_eth_rx_adapter.h"

#define BATCH_SIZE              32
#define BLOCK_CNT_THRESHOLD     10
#define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
#define ETH_RX_ADAPTER_MEM_NAME_LEN     32

#define RSS_KEY_SIZE    40
/* value written to intr thread pipe to signal thread exit */
#define ETH_BRIDGE_INTR_THREAD_EXIT     1
/* Sentinel value to detect an uninitialized file descriptor */
#define INIT_FD         -1

/*
 * Used to store port and queue ID of interrupting Rx queue
 */
union queue_data {
        RTE_STD_C11
        void *ptr;
        struct {
                uint16_t port;
                uint16_t queue;
        };
};
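
/*
 * Illustrative sketch (not part of the adapter): because the anonymous
 * struct and the pointer share storage, a <port, queue> pair can travel
 * through the epoll user data pointer without any allocation:
 *
 *	union queue_data qd = { .port = 3, .queue = 7 };
 *	void *data = qd.ptr;			-> stored in epdata.data
 *	union queue_data out = { .ptr = data };
 *	-> out.port == 3 && out.queue == 7
 */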

/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
        /* Eth port to poll */
        uint16_t eth_dev_id;
        /* Eth rx queue to poll */
        uint16_t eth_rx_qid;
};

/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
        /* Count of events in this buffer */
        uint16_t count;
        /* Array of events in this buffer */
        struct rte_event events[ETH_EVENT_BUFFER_SIZE];
};

struct rte_event_eth_rx_adapter {
        /* RSS key */
        uint8_t rss_key_be[RSS_KEY_SIZE];
        /* Event device identifier */
        uint8_t eventdev_id;
        /* Per ethernet device structure */
        struct eth_device_info *eth_devices;
        /* Event port identifier */
        uint8_t event_port_id;
        /* Lock to serialize config updates with service function */
        rte_spinlock_t rx_lock;
        /* Max mbufs processed in any service function invocation */
        uint32_t max_nb_rx;
        /* Receive queues that need to be polled */
        struct eth_rx_poll_entry *eth_rx_poll;
        /* Size of the eth_rx_poll array */
        uint16_t num_rx_polled;
        /* Weighted round robin schedule */
        uint32_t *wrr_sched;
        /* wrr_sched[] size */
        uint32_t wrr_len;
        /* Next entry in wrr[] to begin polling */
        uint32_t wrr_pos;
        /* Event burst buffer */
        struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
        /* Per adapter stats */
        struct rte_event_eth_rx_adapter_stats stats;
        /* Block count, counts up to BLOCK_CNT_THRESHOLD */
        uint16_t enq_block_count;
        /* Block start ts */
        uint64_t rx_enq_block_start_ts;
        /* epoll fd used to wait for Rx interrupts */
        int epd;
        /* Number of interrupt driven Rx queues */
        uint32_t num_rx_intr;
        /* Used to send <dev id, queue id> of interrupting Rx queues from
         * the interrupt thread to the Rx thread
         */
        struct rte_ring *intr_ring;
        /* Rx Queue data (dev id, queue id) for the last non-empty
         * queue polled
         */
        union queue_data qd;
        /* queue_data is valid */
        int qd_valid;
        /* Interrupt ring lock, synchronizes Rx thread
         * and interrupt thread
         */
        rte_spinlock_t intr_ring_lock;
        /* event array passed to rte_epoll_wait */
        struct rte_epoll_event *epoll_events;
        /* Count of interrupt vectors in use */
        uint32_t num_intr_vec;
        /* Thread blocked on Rx interrupts */
        pthread_t rx_intr_thread;
        /* Configuration callback for rte_service configuration */
        rte_event_eth_rx_adapter_conf_cb conf_cb;
        /* Configuration callback argument */
        void *conf_arg;
        /* Set if the default conf callback is being used */
        int default_cb_arg;
        /* Service initialization state */
        uint8_t service_inited;
        /* Total count of Rx queues in adapter */
        uint32_t nb_queues;
        /* Memory allocation name */
        char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
        /* Socket identifier cached from eventdev */
        int socket_id;
        /* Per adapter EAL service */
        uint32_t service_id;
        /* Adapter started flag */
        uint8_t rxa_started;
        /* Adapter ID */
        uint8_t id;
} __rte_cache_aligned;

/* Per eth device */
struct eth_device_info {
        struct rte_eth_dev *dev;
        struct eth_rx_queue_info *rx_queue;
        /* Rx callback */
        rte_event_eth_rx_adapter_cb_fn cb_fn;
        /* Rx callback argument */
        void *cb_arg;
        /* Set if ethdev->eventdev packet transfer uses a
         * hardware mechanism
         */
        uint8_t internal_event_port;
        /* Set if the adapter is processing rx queues for
         * this eth device and packet processing has been
         * started, allows for the code to know if the PMD
         * rx_adapter_stop callback needs to be invoked
         */
        uint8_t dev_rx_started;
        /* Number of queues added for this device */
        uint16_t nb_dev_queues;
        /* Number of poll based queues
         * If nb_rx_poll > 0, the start callback will
         * be invoked if not already invoked
         */
        uint16_t nb_rx_poll;
        /* Number of interrupt based queues
         * If nb_rx_intr > 0, the start callback will
         * be invoked if not already invoked.
         */
        uint16_t nb_rx_intr;
        /* Number of queues that use the shared interrupt */
        uint16_t nb_shared_intr;
        /* sum(wrr(q)) for all queues within the device
         * useful when deleting all device queues
         */
        uint32_t wrr_len;
        /* Intr based queue index to start polling from, this is used
         * if the number of shared interrupts is non-zero
         */
        uint16_t next_q_idx;
        /* Intr based queue indices */
        uint16_t *intr_queue;
        /* Set if the device generates a per Rx queue interrupt for
         * queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
         */
        int multi_intr_cap;
        /* shared interrupt enabled */
        int shared_intr_enabled;
};

/* Per Rx queue */
struct eth_rx_queue_info {
        int queue_enabled;      /* True if added */
        int intr_enabled;
        uint16_t wt;            /* Polling weight */
        uint8_t event_queue_id; /* Event queue to enqueue packets to */
        uint8_t sched_type;     /* Sched type for events */
        uint8_t priority;       /* Event priority */
        uint32_t flow_id;       /* App provided flow identifier */
        uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
};
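
/*
 * The two flow id fields combine as
 *	ev->flow_id = (flow_id & flow_id_mask) | (rss & ~flow_id_mask)
 * in rxa_buffer_mbufs() below. For example, if the application supplied
 * a flow id, flow_id_mask is ~0 and the RSS hash is ignored; otherwise
 * the mask is 0 and the event flow id is simply the RSS hash.
 */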

static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;

static inline int
rxa_validate_id(uint8_t id)
{
        return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}

#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
        if (!rxa_validate_id(id)) { \
                RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
                return retval; \
        } \
} while (0)

static inline int
rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
        return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
}

/* Greatest common divisor */
static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
{
        uint16_t r = a % b;

        return r ? rxa_gcd_u16(b, r) : b;
}

/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
static int
rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
         unsigned int n, int *cw,
         struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
         uint16_t gcd, int prev)
{
        int i = prev;
        uint16_t w;

        while (1) {
                uint16_t q;
                uint16_t d;

                i = (i + 1) % n;
                if (i == 0) {
                        *cw = *cw - gcd;
                        if (*cw <= 0)
                                *cw = max_wt;
                }

                q = eth_rx_poll[i].eth_rx_qid;
                d = eth_rx_poll[i].eth_dev_id;
                w = rx_adapter->eth_devices[d].rx_queue[q].wt;

                if ((int)w >= *cw)
                        return i;
        }
}
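
/*
 * Worked example: three polled queues with weights {4, 3, 2} give
 * max_wt = 4, gcd = 1 and a 9 entry schedule. Starting from cw = -1,
 * prev = -1, successive calls return
 *
 *	0, 0, 1, 0, 1, 2, 0, 1, 2
 *
 * i.e. queue 0 is visited four times, queue 1 three times and queue 2
 * twice, with the heavier queues served first in each round.
 */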

static inline int
rxa_shared_intr(struct eth_device_info *dev_info,
        int rx_queue_id)
{
        int multi_intr_cap;

        if (dev_info->dev->intr_handle == NULL)
                return 0;

        multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
        return !multi_intr_cap ||
                rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
}

static inline int
rxa_intr_queue(struct eth_device_info *dev_info,
        int rx_queue_id)
{
        struct eth_rx_queue_info *queue_info;

        queue_info = &dev_info->rx_queue[rx_queue_id];
        return dev_info->rx_queue &&
                !dev_info->internal_event_port &&
                queue_info->queue_enabled && queue_info->wt == 0;
}

static inline int
rxa_polled_queue(struct eth_device_info *dev_info,
        int rx_queue_id)
{
        struct eth_rx_queue_info *queue_info;

        queue_info = &dev_info->rx_queue[rx_queue_id];
        return !dev_info->internal_event_port &&
                dev_info->rx_queue &&
                queue_info->queue_enabled && queue_info->wt != 0;
}

/* Calculate the change in number of interrupt vectors after an Rx queue
 * ID is added/deleted
 */
static int
rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
{
        uint16_t i;
        int n, s;
        uint16_t nbq;

        nbq = dev_info->dev->data->nb_rx_queues;
        n = 0; /* non shared count */
        s = 0; /* shared count */

        if (rx_queue_id == -1) {
                for (i = 0; i < nbq; i++) {
                        if (!rxa_shared_intr(dev_info, i))
                                n += add ? !rxa_intr_queue(dev_info, i) :
                                        rxa_intr_queue(dev_info, i);
                        else
                                s += add ? !rxa_intr_queue(dev_info, i) :
                                        rxa_intr_queue(dev_info, i);
                }

                if (s > 0) {
                        if ((add && dev_info->nb_shared_intr == 0) ||
                                (!add && dev_info->nb_shared_intr))
                                n += 1;
                }
        } else {
                if (!rxa_shared_intr(dev_info, rx_queue_id))
                        n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
                                rxa_intr_queue(dev_info, rx_queue_id);
                else
                        n = add ? !dev_info->nb_shared_intr :
                                dev_info->nb_shared_intr == 1;
        }

        return add ? n : -n;
}
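
/*
 * Worked example: adding all queues (rx_queue_id == -1) of a device with
 * three Rx queues that all map to the shared interrupt, none of them in
 * interrupt mode yet, gives n = 0 and s = 3; since nb_shared_intr is
 * still 0 the shared vector contributes one, so the function returns 1,
 * the single vector the three queues will share.
 */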

/* Calculate nb_rx_intr after deleting interrupt mode rx queues
 */
static void
rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
                        struct eth_device_info *dev_info,
                        int rx_queue_id,
                        uint32_t *nb_rx_intr)
{
        uint32_t intr_diff;

        if (rx_queue_id == -1)
                intr_diff = dev_info->nb_rx_intr;
        else
                intr_diff = rxa_intr_queue(dev_info, rx_queue_id);

        *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
}

/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
 * interrupt queues could currently be poll mode Rx queues
 */
static void
rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
                        struct eth_device_info *dev_info,
                        int rx_queue_id,
                        uint32_t *nb_rx_poll,
                        uint32_t *nb_rx_intr,
                        uint32_t *nb_wrr)
{
        uint32_t intr_diff;
        uint32_t poll_diff;
        uint32_t wrr_len_diff;

        if (rx_queue_id == -1) {
                intr_diff = dev_info->dev->data->nb_rx_queues -
                                                dev_info->nb_rx_intr;
                poll_diff = dev_info->nb_rx_poll;
                wrr_len_diff = dev_info->wrr_len;
        } else {
                intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
                poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
                wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
                                        0;
        }

        *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
        *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
        *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
}

/* Calculate size of the eth_rx_poll and wrr_sched arrays
 * after deleting poll mode rx queues
 */
static void
rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
                        struct eth_device_info *dev_info,
                        int rx_queue_id,
                        uint32_t *nb_rx_poll,
                        uint32_t *nb_wrr)
{
        uint32_t poll_diff;
        uint32_t wrr_len_diff;

        if (rx_queue_id == -1) {
                poll_diff = dev_info->nb_rx_poll;
                wrr_len_diff = dev_info->wrr_len;
        } else {
                poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
                wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
                                        0;
        }

        *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
        *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
}

/* Calculate nb_rx_* after adding poll mode rx queues
 */
static void
rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
                        struct eth_device_info *dev_info,
                        int rx_queue_id,
                        uint16_t wt,
                        uint32_t *nb_rx_poll,
                        uint32_t *nb_rx_intr,
                        uint32_t *nb_wrr)
{
        uint32_t intr_diff;
        uint32_t poll_diff;
        uint32_t wrr_len_diff;

        if (rx_queue_id == -1) {
                intr_diff = dev_info->nb_rx_intr;
                poll_diff = dev_info->dev->data->nb_rx_queues -
                                                dev_info->nb_rx_poll;
                wrr_len_diff = wt * dev_info->dev->data->nb_rx_queues
                                - dev_info->wrr_len;
        } else {
                intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
                poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
                wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
                                wt - dev_info->rx_queue[rx_queue_id].wt :
                                wt;
        }

        *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
        *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
        *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
}

/* Calculate nb_rx_* after adding rx_queue_id */
static void
rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
                struct eth_device_info *dev_info,
                int rx_queue_id,
                uint16_t wt,
                uint32_t *nb_rx_poll,
                uint32_t *nb_rx_intr,
                uint32_t *nb_wrr)
{
        if (wt != 0)
                rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
                                        wt, nb_rx_poll, nb_rx_intr, nb_wrr);
        else
                rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
                                        nb_rx_poll, nb_rx_intr, nb_wrr);
}

/* Calculate nb_rx_* after deleting rx_queue_id */
static void
rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
                struct eth_device_info *dev_info,
                int rx_queue_id,
                uint32_t *nb_rx_poll,
                uint32_t *nb_rx_intr,
                uint32_t *nb_wrr)
{
        rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
                                nb_wrr);
        rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
                                nb_rx_intr);
}

/*
 * Allocate the rx_poll array
 */
static struct eth_rx_poll_entry *
rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
        uint32_t num_rx_polled)
{
        size_t len;

        len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
                                                        RTE_CACHE_LINE_SIZE);
        return rte_zmalloc_socket(rx_adapter->mem_name,
                                len,
                                RTE_CACHE_LINE_SIZE,
                                rx_adapter->socket_id);
}

/*
 * Allocate the WRR array
 */
static uint32_t *
rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
{
        size_t len;

        len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
                        RTE_CACHE_LINE_SIZE);
        return rte_zmalloc_socket(rx_adapter->mem_name,
                                len,
                                RTE_CACHE_LINE_SIZE,
                                rx_adapter->socket_id);
}

static int
rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
                uint32_t nb_poll,
                uint32_t nb_wrr,
                struct eth_rx_poll_entry **rx_poll,
                uint32_t **wrr_sched)
{
        if (nb_poll == 0) {
                *rx_poll = NULL;
                *wrr_sched = NULL;
                return 0;
        }

        *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
        if (*rx_poll == NULL) {
                *wrr_sched = NULL;
                return -ENOMEM;
        }

        *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
        if (*wrr_sched == NULL) {
                rte_free(*rx_poll);
                return -ENOMEM;
        }
        return 0;
}

/* Precalculate WRR polling sequence for all queues in rx_adapter */
static void
rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
                struct eth_rx_poll_entry *rx_poll,
                uint32_t *rx_wrr)
{
        uint16_t d;
        uint16_t q;
        unsigned int i;
        int prev = -1;
        int cw = -1;

        /* Initialize variables for calculation of wrr schedule */
        uint16_t max_wrr_pos = 0;
        unsigned int poll_q = 0;
        uint16_t max_wt = 0;
        uint16_t gcd = 0;

        if (rx_poll == NULL)
                return;

        /* Generate array of all queues to poll, the size of this
         * array is poll_q
         */
        RTE_ETH_FOREACH_DEV(d) {
                uint16_t nb_rx_queues;
                struct eth_device_info *dev_info =
                                &rx_adapter->eth_devices[d];
                nb_rx_queues = dev_info->dev->data->nb_rx_queues;
                if (dev_info->rx_queue == NULL)
                        continue;
                if (dev_info->internal_event_port)
                        continue;
                dev_info->wrr_len = 0;
                for (q = 0; q < nb_rx_queues; q++) {
                        struct eth_rx_queue_info *queue_info =
                                &dev_info->rx_queue[q];
                        uint16_t wt;

                        if (!rxa_polled_queue(dev_info, q))
                                continue;
                        wt = queue_info->wt;
                        rx_poll[poll_q].eth_dev_id = d;
                        rx_poll[poll_q].eth_rx_qid = q;
                        max_wrr_pos += wt;
                        dev_info->wrr_len += wt;
                        max_wt = RTE_MAX(max_wt, wt);
                        gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
                        poll_q++;
                }
        }

        /* Generate polling sequence based on weights */
        prev = -1;
        cw = -1;
        for (i = 0; i < max_wrr_pos; i++) {
                rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
                                     rx_poll, max_wt, gcd, prev);
                prev = rx_wrr[i];
        }
}

static inline void
rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
        struct ipv6_hdr **ipv6_hdr)
{
        struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
        struct vlan_hdr *vlan_hdr;

        *ipv4_hdr = NULL;
        *ipv6_hdr = NULL;

        switch (eth_hdr->ether_type) {
        case RTE_BE16(ETHER_TYPE_IPv4):
                *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
                break;

        case RTE_BE16(ETHER_TYPE_IPv6):
                *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
                break;

        case RTE_BE16(ETHER_TYPE_VLAN):
                vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
                switch (vlan_hdr->eth_proto) {
                case RTE_BE16(ETHER_TYPE_IPv4):
                        *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
                        break;
                case RTE_BE16(ETHER_TYPE_IPv6):
                        *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
                        break;
                default:
                        break;
                }
                break;

        default:
                break;
        }
}

/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
        uint32_t input_len;
        void *tuple;
        struct rte_ipv4_tuple ipv4_tuple;
        struct rte_ipv6_tuple ipv6_tuple;
        struct ipv4_hdr *ipv4_hdr;
        struct ipv6_hdr *ipv6_hdr;

        rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);

        if (ipv4_hdr) {
                ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
                ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
                tuple = &ipv4_tuple;
                input_len = RTE_THASH_V4_L3_LEN;
        } else if (ipv6_hdr) {
                rte_thash_load_v6_addrs(ipv6_hdr,
                                        (union rte_thash_tuple *)&ipv6_tuple);
                tuple = &ipv6_tuple;
                input_len = RTE_THASH_V6_L3_LEN;
        } else
                return 0;

        return rte_softrss_be(tuple, input_len, rss_key_be);
}
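
/*
 * A minimal sketch (the rxa_example_* name is illustrative, not part of
 * this file): rte_softrss_be() expects a byte swapped key, which can be
 * produced from a regular 40 byte RSS key with rte_thash's conversion
 * helper; the adapter's create path fills rss_key_be[] in a similar way.
 */
static __rte_unused void
rxa_example_convert_rss_key(const uint8_t key[RSS_KEY_SIZE],
                        uint8_t key_be[RSS_KEY_SIZE])
{
        /* Converts the key 32 bits at a time; RSS_KEY_SIZE is in bytes */
        rte_convert_rss_key((const uint32_t *)key, (uint32_t *)key_be,
                        RSS_KEY_SIZE);
}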

static inline int
rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
        return !!rx_adapter->enq_block_count;
}

static inline void
rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
        if (rx_adapter->rx_enq_block_start_ts)
                return;

        rx_adapter->enq_block_count++;
        if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
                return;

        rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}

static inline void
rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
                    struct rte_event_eth_rx_adapter_stats *stats)
{
        if (unlikely(!stats->rx_enq_start_ts))
                stats->rx_enq_start_ts = rte_get_tsc_cycles();

        if (likely(!rxa_enq_blocked(rx_adapter)))
                return;

        rx_adapter->enq_block_count = 0;
        if (rx_adapter->rx_enq_block_start_ts) {
                stats->rx_enq_end_ts = rte_get_tsc_cycles();
                stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
                    rx_adapter->rx_enq_block_start_ts;
                rx_adapter->rx_enq_block_start_ts = 0;
        }
}

/* Add event to buffer, free space check is done prior to calling
 * this function
 */
static inline void
rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
                struct rte_event *ev)
{
        struct rte_eth_event_enqueue_buffer *buf =
            &rx_adapter->event_enqueue_buffer;
        rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
}

/* Enqueue buffered events to event device */
static inline uint16_t
rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
        struct rte_eth_event_enqueue_buffer *buf =
            &rx_adapter->event_enqueue_buffer;
        struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

        uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
                                        rx_adapter->event_port_id,
                                        buf->events,
                                        buf->count);
        if (n != buf->count) {
                memmove(buf->events,
                        &buf->events[n],
                        (buf->count - n) * sizeof(struct rte_event));
                stats->rx_enq_retry++;
        }

        n ? rxa_enq_block_end_ts(rx_adapter, stats) :
                rxa_enq_block_start_ts(rx_adapter);

        buf->count -= n;
        stats->rx_enq_count += n;

        return n;
}
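
/*
 * Worked example of a partial flush: with count == 32 and n == 20
 * accepted by the event device, the 12 unsent events are shifted to the
 * front of the buffer, rx_enq_retry is bumped and count drops to 12, so
 * the next flush retries those events before any new ones are buffered.
 */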

static inline void
rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
                uint16_t eth_dev_id,
                uint16_t rx_queue_id,
                struct rte_mbuf **mbufs,
                uint16_t num)
{
        uint32_t i;
        struct eth_device_info *dev_info =
                                        &rx_adapter->eth_devices[eth_dev_id];
        struct eth_rx_queue_info *eth_rx_queue_info =
                                        &dev_info->rx_queue[rx_queue_id];
        struct rte_eth_event_enqueue_buffer *buf =
                                        &rx_adapter->event_enqueue_buffer;
        int32_t qid = eth_rx_queue_info->event_queue_id;
        uint8_t sched_type = eth_rx_queue_info->sched_type;
        uint8_t priority = eth_rx_queue_info->priority;
        uint32_t flow_id;
        struct rte_event events[BATCH_SIZE];
        struct rte_mbuf *m = mbufs[0];
        uint32_t rss_mask;
        uint32_t rss;
        int do_rss;
        uint64_t ts;
        struct rte_mbuf *cb_mbufs[BATCH_SIZE];
        uint16_t nb_cb;

        /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
        rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
        do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;

        if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
                ts = rte_get_tsc_cycles();
                for (i = 0; i < num; i++) {
                        m = mbufs[i];

                        m->timestamp = ts;
                        m->ol_flags |= PKT_RX_TIMESTAMP;
                }
        }

        nb_cb = dev_info->cb_fn ? dev_info->cb_fn(eth_dev_id, rx_queue_id,
                                                ETH_EVENT_BUFFER_SIZE,
                                                buf->count, mbufs,
                                                num,
                                                dev_info->cb_arg,
                                                cb_mbufs) :
                                                num;
        if (nb_cb < num) {
                mbufs = cb_mbufs;
                num = nb_cb;
        }

        for (i = 0; i < num; i++) {
                m = mbufs[i];
                struct rte_event *ev = &events[i];

                rss = do_rss ?
                        rxa_do_softrss(m, rx_adapter->rss_key_be) :
                        m->hash.rss;
                flow_id =
                    eth_rx_queue_info->flow_id &
                                eth_rx_queue_info->flow_id_mask;
                flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
                ev->flow_id = flow_id;
                ev->op = RTE_EVENT_OP_NEW;
                ev->sched_type = sched_type;
                ev->queue_id = qid;
                ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
                ev->sub_event_type = 0;
                ev->priority = priority;
                ev->mbuf = m;

                rxa_buffer_event(rx_adapter, ev);
        }
}
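
/*
 * A minimal sketch of an application Rx callback matching the
 * dev_info->cb_fn invocation above (the rxa_example_* name and the drop
 * policy are illustrative assumptions, not part of this file): mbufs
 * that survive the filter are written to cb_mbufs and counted; when
 * fewer than num are returned, rxa_buffer_mbufs() switches to cb_mbufs.
 */
static __rte_unused uint16_t
rxa_example_rx_cb(uint16_t eth_dev_id, uint16_t rx_queue_id,
                uint32_t enq_buf_size, uint32_t enq_buf_count,
                struct rte_mbuf **mbufs, uint16_t num,
                void *cb_arg, struct rte_mbuf **cb_mbufs)
{
        uint16_t i, nb_cb = 0;

        RTE_SET_USED(eth_dev_id);
        RTE_SET_USED(rx_queue_id);
        RTE_SET_USED(enq_buf_size);
        RTE_SET_USED(enq_buf_count);
        RTE_SET_USED(cb_arg);

        for (i = 0; i < num; i++) {
                /* Keep packets of at least 64 bytes, drop the rest */
                if (rte_pktmbuf_pkt_len(mbufs[i]) >= 64)
                        cb_mbufs[nb_cb++] = mbufs[i];
                else
                        rte_pktmbuf_free(mbufs[i]);
        }
        return nb_cb;
}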

/* Enqueue packets from <port, q> to event buffer */
static inline uint32_t
rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
        uint16_t port_id,
        uint16_t queue_id,
        uint32_t rx_count,
        uint32_t max_rx,
        int *rxq_empty)
{
        struct rte_mbuf *mbufs[BATCH_SIZE];
        struct rte_eth_event_enqueue_buffer *buf =
                                        &rx_adapter->event_enqueue_buffer;
        struct rte_event_eth_rx_adapter_stats *stats =
                                        &rx_adapter->stats;
        uint16_t n;
        uint32_t nb_rx = 0;

        if (rxq_empty)
                *rxq_empty = 0;
        /* Don't do a batch dequeue from the rx queue if there isn't
         * enough space in the enqueue buffer.
         */
        while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
                if (buf->count >= BATCH_SIZE)
                        rxa_flush_event_buffer(rx_adapter);

                stats->rx_poll_count++;
                n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
                if (unlikely(!n)) {
                        if (rxq_empty)
                                *rxq_empty = 1;
                        break;
                }
                rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
                nb_rx += n;
                if (rx_count + nb_rx > max_rx)
                        break;
        }

        if (buf->count >= BATCH_SIZE)
                rxa_flush_event_buffer(rx_adapter);

        return nb_rx;
}

static inline void
rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
                void *data)
{
        uint16_t port_id;
        uint16_t queue;
        int err;
        union queue_data qd;
        struct eth_device_info *dev_info;
        struct eth_rx_queue_info *queue_info;
        int *intr_enabled;

        qd.ptr = data;
        port_id = qd.port;
        queue = qd.queue;

        dev_info = &rx_adapter->eth_devices[port_id];
        queue_info = &dev_info->rx_queue[queue];
        rte_spinlock_lock(&rx_adapter->intr_ring_lock);
        if (rxa_shared_intr(dev_info, queue))
                intr_enabled = &dev_info->shared_intr_enabled;
        else
                intr_enabled = &queue_info->intr_enabled;

        if (*intr_enabled) {
                *intr_enabled = 0;
                err = rte_ring_enqueue(rx_adapter->intr_ring, data);
                /* Entry should always be available.
                 * The ring size equals the maximum number of interrupt
                 * vectors supported (an interrupt vector is shared in
                 * case of shared interrupts)
                 */
                if (err)
                        RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
                                " to ring: %s", strerror(-err));
                else
                        rte_eth_dev_rx_intr_disable(port_id, queue);
        }
        rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
}

static int
rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
                        uint32_t num_intr_vec)
{
        if (rx_adapter->num_intr_vec + num_intr_vec >
                                RTE_EVENT_ETH_INTR_RING_SIZE) {
                RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
                " %d needed %d limit %d", rx_adapter->num_intr_vec,
                num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
                return -ENOSPC;
        }

        return 0;
}

/* Delete entries for (dev, queue) from the interrupt ring */
static void
rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
                        struct eth_device_info *dev_info,
                        uint16_t rx_queue_id)
{
        int i, n;
        union queue_data qd;

        rte_spinlock_lock(&rx_adapter->intr_ring_lock);

        n = rte_ring_count(rx_adapter->intr_ring);
        for (i = 0; i < n; i++) {
                rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
                if (!rxa_shared_intr(dev_info, rx_queue_id)) {
                        if (qd.port == dev_info->dev->data->port_id &&
                                qd.queue == rx_queue_id)
                                continue;
                } else {
                        if (qd.port == dev_info->dev->data->port_id)
                                continue;
                }
                rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
        }

        rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
}

/* pthread callback handling interrupt mode receive queues
 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
 * interrupting queue to the adapter's ring buffer for interrupt events.
 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
 * the adapter service function.
 */
static void *
rxa_intr_thread(void *arg)
{
        struct rte_event_eth_rx_adapter *rx_adapter = arg;
        struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
        int n, i;

        while (1) {
                n = rte_epoll_wait(rx_adapter->epd, epoll_events,
                                RTE_EVENT_ETH_INTR_RING_SIZE, -1);
                if (unlikely(n < 0))
                        RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
                                        n);
                for (i = 0; i < n; i++) {
                        rxa_intr_ring_enqueue(rx_adapter,
                                        epoll_events[i].epdata.data);
                }
        }

        return NULL;
}
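
/*
 * Note: the loop above never exits on its own; rxa_destroy_intr_thread()
 * terminates it with pthread_cancel(), relying on the underlying
 * epoll_wait() call being a cancellation point.
 */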

/* Dequeue <port, q> from interrupt ring and enqueue received
 * mbufs to eventdev
 */
static inline uint32_t
rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
{
        uint32_t n;
        uint32_t nb_rx = 0;
        int rxq_empty;
        struct rte_eth_event_enqueue_buffer *buf;
        rte_spinlock_t *ring_lock;
        uint8_t max_done = 0;

        if (rx_adapter->num_rx_intr == 0)
                return 0;

        if (rte_ring_count(rx_adapter->intr_ring) == 0
                && !rx_adapter->qd_valid)
                return 0;

        buf = &rx_adapter->event_enqueue_buffer;
        ring_lock = &rx_adapter->intr_ring_lock;

        if (buf->count >= BATCH_SIZE)
                rxa_flush_event_buffer(rx_adapter);

        while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
                struct eth_device_info *dev_info;
                uint16_t port;
                uint16_t queue;
                union queue_data qd = rx_adapter->qd;
                int err;

                if (!rx_adapter->qd_valid) {
                        struct eth_rx_queue_info *queue_info;

                        rte_spinlock_lock(ring_lock);
                        err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
                        if (err) {
                                rte_spinlock_unlock(ring_lock);
                                break;
                        }

                        port = qd.port;
                        queue = qd.queue;
                        rx_adapter->qd = qd;
                        rx_adapter->qd_valid = 1;
                        dev_info = &rx_adapter->eth_devices[port];
                        if (rxa_shared_intr(dev_info, queue))
                                dev_info->shared_intr_enabled = 1;
                        else {
                                queue_info = &dev_info->rx_queue[queue];
                                queue_info->intr_enabled = 1;
                        }
                        rte_eth_dev_rx_intr_enable(port, queue);
                        rte_spinlock_unlock(ring_lock);
                } else {
                        port = qd.port;
                        queue = qd.queue;

                        dev_info = &rx_adapter->eth_devices[port];
                }

                if (rxa_shared_intr(dev_info, queue)) {
                        uint16_t i;
                        uint16_t nb_queues;

                        nb_queues = dev_info->dev->data->nb_rx_queues;
                        n = 0;
                        for (i = dev_info->next_q_idx; i < nb_queues; i++) {
                                uint8_t enq_buffer_full;

                                if (!rxa_intr_queue(dev_info, i))
                                        continue;
                                n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
                                        rx_adapter->max_nb_rx,
                                        &rxq_empty);
                                nb_rx += n;

                                enq_buffer_full = !rxq_empty && n == 0;
                                max_done = nb_rx > rx_adapter->max_nb_rx;

                                if (enq_buffer_full || max_done) {
                                        dev_info->next_q_idx = i;
                                        goto done;
                                }
                        }

                        rx_adapter->qd_valid = 0;

                        /* Reinitialize for next interrupt */
                        dev_info->next_q_idx = dev_info->multi_intr_cap ?
                                                RTE_MAX_RXTX_INTR_VEC_ID - 1 :
                                                0;
                } else {
                        n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
                                rx_adapter->max_nb_rx,
                                &rxq_empty);
                        rx_adapter->qd_valid = !rxq_empty;
                        nb_rx += n;
                        if (nb_rx > rx_adapter->max_nb_rx)
                                break;
                }
        }

done:
        rx_adapter->stats.rx_intr_packets += nb_rx;
        return nb_rx;
}

/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, which is
 * drained anytime it holds >= BATCH_SIZE packets.
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device; this back pressures the
 * eth device. In virtual device environments this back pressure is relayed
 * to the hypervisor's switching layer where adjustments can be made to
 * deal with it.
 */
static inline uint32_t
rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
        uint32_t num_queue;
        uint32_t nb_rx = 0;
        struct rte_eth_event_enqueue_buffer *buf;
        uint32_t wrr_pos;
        uint32_t max_nb_rx;

        wrr_pos = rx_adapter->wrr_pos;
        max_nb_rx = rx_adapter->max_nb_rx;
        buf = &rx_adapter->event_enqueue_buffer;

        /* Iterate through a WRR sequence */
        for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
                unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
                uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
                uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

                /* Don't do a batch dequeue from the rx queue if there isn't
                 * enough space in the enqueue buffer.
                 */
                if (buf->count >= BATCH_SIZE)
                        rxa_flush_event_buffer(rx_adapter);
                if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
                        rx_adapter->wrr_pos = wrr_pos;
                        return nb_rx;
                }

                nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
                                NULL);
                if (nb_rx > max_nb_rx) {
                        rx_adapter->wrr_pos =
                                    (wrr_pos + 1) % rx_adapter->wrr_len;
                        break;
                }

                if (++wrr_pos == rx_adapter->wrr_len)
                        wrr_pos = 0;
        }
        return nb_rx;
}

static int
rxa_service_func(void *args)
{
        struct rte_event_eth_rx_adapter *rx_adapter = args;
        struct rte_event_eth_rx_adapter_stats *stats;

        if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
                return 0;
        if (!rx_adapter->rxa_started) {
                rte_spinlock_unlock(&rx_adapter->rx_lock);
                return 0;
        }

        stats = &rx_adapter->stats;
        stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
        stats->rx_packets += rxa_poll(rx_adapter);
        rte_spinlock_unlock(&rx_adapter->rx_lock);
        return 0;
}

static int
rte_event_eth_rx_adapter_init(void)
{
        const char *name = "rte_event_eth_rx_adapter_array";
        const struct rte_memzone *mz;
        unsigned int sz;

        sz = sizeof(*event_eth_rx_adapter) *
            RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
        sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

        mz = rte_memzone_lookup(name);
        if (mz == NULL) {
                mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
                                                 RTE_CACHE_LINE_SIZE);
                if (mz == NULL) {
                        RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
                                        PRId32, rte_errno);
                        return -rte_errno;
                }
        }

        event_eth_rx_adapter = mz->addr;
        return 0;
}

static inline struct rte_event_eth_rx_adapter *
rxa_id_to_adapter(uint8_t id)
{
        return event_eth_rx_adapter ?
                event_eth_rx_adapter[id] : NULL;
}

static int
rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
                struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
        int ret;
        struct rte_eventdev *dev;
        struct rte_event_dev_config dev_conf;
        int started;
        uint8_t port_id;
        struct rte_event_port_conf *port_conf = arg;
        struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        dev_conf = dev->data->dev_conf;

        started = dev->data->dev_started;
        if (started)
                rte_event_dev_stop(dev_id);
        port_id = dev_conf.nb_event_ports;
        dev_conf.nb_event_ports += 1;
        ret = rte_event_dev_configure(dev_id, &dev_conf);
        if (ret) {
                RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
                                                dev_id);
                if (started) {
                        if (rte_event_dev_start(dev_id))
                                return -EIO;
                }
                return ret;
        }

        ret = rte_event_port_setup(dev_id, port_id, port_conf);
        if (ret) {
                RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
                                        port_id);
                return ret;
        }

        conf->event_port_id = port_id;
        conf->max_nb_rx = 128;
        if (started)
                ret = rte_event_dev_start(dev_id);
        rx_adapter->default_cb_arg = 1;
        return ret;
}
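
/*
 * A minimal application side sketch (the rxa_example_* name is an
 * illustrative assumption, not part of this file) showing how the default
 * conf callback above ends up being used: create an adapter with an event
 * port config, then add every Rx queue of one ethdev in poll mode with
 * servicing weight 1.
 */
static __rte_unused int
rxa_example_setup(uint8_t id, uint8_t dev_id, uint16_t eth_dev_id,
                struct rte_event_port_conf *port_conf)
{
        struct rte_event_eth_rx_adapter_queue_conf qconf = {
                .ev = {
                        .queue_id = 0,
                        .sched_type = RTE_SCHED_TYPE_ATOMIC,
                        .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
                },
                .servicing_weight = 1,
        };
        int ret;

        ret = rte_event_eth_rx_adapter_create(id, dev_id, port_conf);
        if (ret)
                return ret;

        /* An rx_queue_id of -1 adds all Rx queues of the device */
        return rte_event_eth_rx_adapter_queue_add(id, eth_dev_id, -1, &qconf);
}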

static int
rxa_epoll_create1(void)
{
#if defined(LINUX)
        int fd;
        fd = epoll_create1(EPOLL_CLOEXEC);
        return fd < 0 ? -errno : fd;
#elif defined(BSD)
        return -ENOTSUP;
#endif
}

static int
rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
{
        if (rx_adapter->epd != INIT_FD)
                return 0;

        rx_adapter->epd = rxa_epoll_create1();
        if (rx_adapter->epd < 0) {
                int err = rx_adapter->epd;
                rx_adapter->epd = INIT_FD;
                RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
                return err;
        }

        return 0;
}

static int
rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
{
        int err;
        char thread_name[RTE_MAX_THREAD_NAME_LEN];

        if (rx_adapter->intr_ring)
                return 0;

        rx_adapter->intr_ring = rte_ring_create("intr_ring",
                                        RTE_EVENT_ETH_INTR_RING_SIZE,
                                        rte_socket_id(), 0);
        if (!rx_adapter->intr_ring)
                return -ENOMEM;

        rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
                                        RTE_EVENT_ETH_INTR_RING_SIZE *
                                        sizeof(struct rte_epoll_event),
                                        RTE_CACHE_LINE_SIZE,
                                        rx_adapter->socket_id);
        if (!rx_adapter->epoll_events) {
                err = -ENOMEM;
                goto error;
        }

        rte_spinlock_init(&rx_adapter->intr_ring_lock);

        snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
                        "rx-intr-thread-%d", rx_adapter->id);

        err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
                                NULL, rxa_intr_thread, rx_adapter);
        if (!err) {
                rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
                return 0;
        }

        RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
error:
        rte_ring_free(rx_adapter->intr_ring);
        rx_adapter->intr_ring = NULL;
        rte_free(rx_adapter->epoll_events);
        rx_adapter->epoll_events = NULL;
        return err;
}

static int
rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
{
        int err;

        err = pthread_cancel(rx_adapter->rx_intr_thread);
        if (err)
                RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
                                err);

        err = pthread_join(rx_adapter->rx_intr_thread, NULL);
        if (err)
                RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);

        rte_free(rx_adapter->epoll_events);
        rte_ring_free(rx_adapter->intr_ring);
        rx_adapter->intr_ring = NULL;
        rx_adapter->epoll_events = NULL;
        return 0;
}

static int
rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
{
        int ret;

        if (rx_adapter->num_rx_intr == 0)
                return 0;

        ret = rxa_destroy_intr_thread(rx_adapter);
        if (ret)
                return ret;

        close(rx_adapter->epd);
        rx_adapter->epd = INIT_FD;

        return ret;
}

static int
rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
        struct eth_device_info *dev_info,
        uint16_t rx_queue_id)
{
        int err;
        uint16_t eth_dev_id = dev_info->dev->data->port_id;
        int sintr = rxa_shared_intr(dev_info, rx_queue_id);

        err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
        if (err) {
                RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
                        rx_queue_id);
                return err;
        }

        err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
                                        rx_adapter->epd,
                                        RTE_INTR_EVENT_DEL,
                                        0);
        if (err)
                RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);

        /* Clear the flag that rxa_config_intr() set for this queue */
        if (sintr)
                dev_info->shared_intr_enabled = 0;
        else
                dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
        return err;
}

static int
rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
                struct eth_device_info *dev_info,
                int rx_queue_id)
{
        int err;
        int i;
        int s;

        if (dev_info->nb_rx_intr == 0)
                return 0;

        err = 0;
        if (rx_queue_id == -1) {
                s = dev_info->nb_shared_intr;
                for (i = 0; i < dev_info->nb_rx_intr; i++) {
                        int sintr;
                        uint16_t q;

                        q = dev_info->intr_queue[i];
                        sintr = rxa_shared_intr(dev_info, q);
                        s -= sintr;

                        if (!sintr || s == 0) {
                                err = rxa_disable_intr(rx_adapter, dev_info,
                                                q);
                                if (err)
                                        return err;
                                rxa_intr_ring_del_entries(rx_adapter, dev_info,
                                                        q);
                        }
                }
        } else {
                if (!rxa_intr_queue(dev_info, rx_queue_id))
                        return 0;
                if (!rxa_shared_intr(dev_info, rx_queue_id) ||
                                dev_info->nb_shared_intr == 1) {
                        err = rxa_disable_intr(rx_adapter, dev_info,
                                        rx_queue_id);
                        if (err)
                                return err;
                        rxa_intr_ring_del_entries(rx_adapter, dev_info,
                                                rx_queue_id);
                }

                for (i = 0; i < dev_info->nb_rx_intr; i++) {
                        if (dev_info->intr_queue[i] == rx_queue_id) {
                                for (; i < dev_info->nb_rx_intr - 1; i++)
                                        dev_info->intr_queue[i] =
                                                dev_info->intr_queue[i + 1];
                                break;
                        }
                }
        }

        return err;
}
1459
1460 static int
1461 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1462         struct eth_device_info *dev_info,
1463         uint16_t rx_queue_id)
1464 {
1465         int err, err1;
1466         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1467         union queue_data qd;
1468         int init_fd;
1469         uint16_t *intr_queue;
1470         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1471
1472         if (rxa_intr_queue(dev_info, rx_queue_id))
1473                 return 0;
1474
1475         intr_queue = dev_info->intr_queue;
1476         if (dev_info->intr_queue == NULL) {
1477                 size_t len =
1478                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1479                 dev_info->intr_queue =
1480                         rte_zmalloc_socket(
1481                                 rx_adapter->mem_name,
1482                                 len,
1483                                 0,
1484                                 rx_adapter->socket_id);
1485                 if (dev_info->intr_queue == NULL)
1486                         return -ENOMEM;
1487         }
1488
1489         init_fd = rx_adapter->epd;
1490         err = rxa_init_epd(rx_adapter);
1491         if (err)
1492                 goto err_free_queue;
1493
1494         qd.port = eth_dev_id;
1495         qd.queue = rx_queue_id;
1496
1497         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1498                                         rx_adapter->epd,
1499                                         RTE_INTR_EVENT_ADD,
1500                                         qd.ptr);
1501         if (err) {
1502                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1503                         " Rx Queue %u err %d", rx_queue_id, err);
1504                 goto err_del_fd;
1505         }
1506
1507         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1508         if (err) {
1509                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1510                                 " Rx Queue %u err %d", rx_queue_id, err);
1511
1512                 goto err_del_event;
1513         }
1514
1515         err = rxa_create_intr_thread(rx_adapter);
1516         if (!err) {
1517                 if (sintr)
1518                         dev_info->shared_intr_enabled = 1;
1519                 else
1520                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1521                 return 0;
1522         }
1523
             /* use err1 so the thread creation error in err is returned */
1525         err1 = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1526         if (err1)
1527                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1528                                 " Rx Queue %u err %d", rx_queue_id, err1);
1529 err_del_event:
1530         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1531                                         rx_adapter->epd,
1532                                         RTE_INTR_EVENT_DEL,
1533                                         0);
1534         if (err1) {
1535                 RTE_EDEV_LOG_ERR("Could not delete event for"
1536                                 " Rx Queue %u err %d", rx_queue_id, err1);
1537         }
1538 err_del_fd:
1539         if (init_fd == INIT_FD) {
1540                 close(rx_adapter->epd);
1541                 rx_adapter->epd = -1;
1542         }
1543 err_free_queue:
1544         if (intr_queue == NULL) {
1545                 rte_free(dev_info->intr_queue);
                     dev_info->intr_queue = NULL;
             }
1546
1547         return err;
1548 }
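
/*
 * Illustrative sketch (not built, hypothetical names): the pattern
 * rxa_config_intr() uses above, reduced to its essentials. A queue's
 * Rx interrupt is registered with an epoll instance through
 * rte_eth_dev_rx_intr_ctl_q() and then waited on; the per-thread
 * epoll fd and the 10 ms timeout are assumptions.
 */
#if 0   /* example only */
static void
example_wait_rx_intr(uint16_t port, uint16_t queue)
{
        struct rte_epoll_event ev;
        union queue_data qd = { .port = port, .queue = queue };

        /* register the queue interrupt and unmask it */
        rte_eth_dev_rx_intr_ctl_q(port, queue, RTE_EPOLL_PER_THREAD,
                                RTE_INTR_EVENT_ADD, qd.ptr);
        rte_eth_dev_rx_intr_enable(port, queue);

        /* block until the NIC signals the queue, then mask it again */
        if (rte_epoll_wait(RTE_EPOLL_PER_THREAD, &ev, 1, 10) > 0)
                rte_eth_dev_rx_intr_disable(port, queue);
}
#endif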
1549
1550 static int
1551 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1552         struct eth_device_info *dev_info,
1553         int rx_queue_id)
1555 {
1556         int i, j, err;
1557         int si = -1;
1558         int shared_done = (dev_info->nb_shared_intr > 0);
1559
1560         if (rx_queue_id != -1) {
1561                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1562                         return 0;
1563                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1564         }
1565
1566         err = 0;
1567         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1569                 if (rxa_shared_intr(dev_info, i) && shared_done)
1570                         continue;
1571
1572                 err = rxa_config_intr(rx_adapter, dev_info, i);
1573
1574                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1575                 if (shared_done) {
1576                         si = i;
1577                         dev_info->shared_intr_enabled = 1;
1578                 }
1579                 if (err)
1580                         break;
1581         }
1582
1583         if (err == 0)
1584                 return 0;
1585
1586         shared_done = (dev_info->nb_shared_intr > 0);
1587         for (j = 0; j < i; j++) {
1588                 if (rxa_intr_queue(dev_info, j))
1589                         continue;
1590                 if (rxa_shared_intr(dev_info, j) && si != j)
1591                         continue;
1592                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1593                 if (err)
1594                         break;
1596         }
1597
1598         return err;
1599 }
1600
1602 static int
1603 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1604 {
1605         int ret;
1606         struct rte_service_spec service;
1607         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1608
1609         if (rx_adapter->service_inited)
1610                 return 0;
1611
1612         memset(&service, 0, sizeof(service));
1613         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1614                 "rte_event_eth_rx_adapter_%d", id);
1615         service.socket_id = rx_adapter->socket_id;
1616         service.callback = rxa_service_func;
1617         service.callback_userdata = rx_adapter;
1618         /* Service function handles locking for queue add/del updates */
1619         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1620         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1621         if (ret) {
1622                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1623                         service.name, ret);
1624                 return ret;
1625         }
1626
1627         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1628                 &rx_adapter_conf, rx_adapter->conf_arg);
1629         if (ret) {
1630                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1631                         ret);
1632                 goto err_done;
1633         }
1634         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1635         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1636         rx_adapter->service_inited = 1;
1637         rx_adapter->epd = INIT_FD;
1638         return 0;
1639
1640 err_done:
1641         rte_service_component_unregister(rx_adapter->service_id);
1642         return ret;
1643 }
1644
1645 static void
1646 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1647                 struct eth_device_info *dev_info,
1648                 int32_t rx_queue_id,
1649                 uint8_t add)
1650 {
1651         struct eth_rx_queue_info *queue_info;
1652         int enabled;
1653         uint16_t i;
1654
1655         if (dev_info->rx_queue == NULL)
1656                 return;
1657
1658         if (rx_queue_id == -1) {
1659                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1660                         rxa_update_queue(rx_adapter, dev_info, i, add);
1661         } else {
1662                 queue_info = &dev_info->rx_queue[rx_queue_id];
1663                 enabled = queue_info->queue_enabled;
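                     /*
                      * Adjust the counts only when the enabled state
                      * actually changes, so repeated adds or deletes of
                      * the same queue remain idempotent.
                      */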
1664                 if (add) {
1665                         rx_adapter->nb_queues += !enabled;
1666                         dev_info->nb_dev_queues += !enabled;
1667                 } else {
1668                         rx_adapter->nb_queues -= enabled;
1669                         dev_info->nb_dev_queues -= enabled;
1670                 }
1671                 queue_info->queue_enabled = !!add;
1672         }
1673 }
1674
1675 static void
1676 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1677         struct eth_device_info *dev_info,
1678         int32_t rx_queue_id)
1679 {
1680         int pollq;
1681         int intrq;
1682         int sintrq;
1683
1684
1685         if (rx_adapter->nb_queues == 0)
1686                 return;
1687
1688         if (rx_queue_id == -1) {
1689                 uint16_t nb_rx_queues;
1690                 uint16_t i;
1691
1692                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1693                 for (i = 0; i < nb_rx_queues; i++)
1694                         rxa_sw_del(rx_adapter, dev_info, i);
1695                 return;
1696         }
1697
1698         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1699         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1700         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1701         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1702         rx_adapter->num_rx_polled -= pollq;
1703         dev_info->nb_rx_poll -= pollq;
1704         rx_adapter->num_rx_intr -= intrq;
1705         dev_info->nb_rx_intr -= intrq;
1706         dev_info->nb_shared_intr -= intrq && sintrq;
1707 }
1708
1709 static void
1710 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1711         struct eth_device_info *dev_info,
1712         int32_t rx_queue_id,
1713         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1714 {
1715         struct eth_rx_queue_info *queue_info;
1716         const struct rte_event *ev = &conf->ev;
1717         int pollq;
1718         int intrq;
1719         int sintrq;
1720
1721         if (rx_queue_id == -1) {
1722                 uint16_t nb_rx_queues;
1723                 uint16_t i;
1724
1725                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1726                 for (i = 0; i < nb_rx_queues; i++)
1727                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1728                 return;
1729         }
1730
1731         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1732         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1733         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1734
1735         queue_info = &dev_info->rx_queue[rx_queue_id];
1736         queue_info->event_queue_id = ev->queue_id;
1737         queue_info->sched_type = ev->sched_type;
1738         queue_info->priority = ev->priority;
1739         queue_info->wt = conf->servicing_weight;
1740
1741         if (conf->rx_queue_flags &
1742                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1743                 queue_info->flow_id = ev->flow_id;
1744                 queue_info->flow_id_mask = ~0;
1745         }
1746
1747         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1748         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1749                 rx_adapter->num_rx_polled += !pollq;
1750                 dev_info->nb_rx_poll += !pollq;
1751                 rx_adapter->num_rx_intr -= intrq;
1752                 dev_info->nb_rx_intr -= intrq;
1753                 dev_info->nb_shared_intr -= intrq && sintrq;
1754         }
1755
1756         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1757                 rx_adapter->num_rx_polled -= pollq;
1758                 dev_info->nb_rx_poll -= pollq;
1759                 rx_adapter->num_rx_intr += !intrq;
1760                 dev_info->nb_rx_intr += !intrq;
1761                 dev_info->nb_shared_intr += !intrq && sintrq;
1762                 if (dev_info->nb_shared_intr == 1) {
1763                         if (dev_info->multi_intr_cap)
1764                                 dev_info->next_q_idx =
1765                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1766                         else
1767                                 dev_info->next_q_idx = 0;
1768                 }
1769         }
1770 }
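
/*
 * Illustrative sketch (not built, assumes <string.h>): a queue
 * configuration using the RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID
 * flag handled above, so every event from the queue carries the given
 * flow id instead of an RSS-derived one. Event queue 0 and flow id 42
 * are hypothetical.
 */
#if 0   /* example only */
static void
example_pinned_flow_conf(struct rte_event_eth_rx_adapter_queue_conf *conf)
{
        memset(conf, 0, sizeof(*conf));
        conf->ev.queue_id = 0;
        conf->ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
        conf->ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
        conf->ev.flow_id = 42;
        conf->rx_queue_flags = RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
        conf->servicing_weight = 1;
}
#endif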
1771
1772 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1773                 uint16_t eth_dev_id,
1774                 int rx_queue_id,
1775                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1776 {
1777         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1778         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1779         int ret;
1780         struct eth_rx_poll_entry *rx_poll;
1781         struct eth_rx_queue_info *rx_queue;
1782         uint32_t *rx_wrr;
1783         uint16_t nb_rx_queues;
1784         uint32_t nb_rx_poll, nb_wrr;
1785         uint32_t nb_rx_intr;
1786         int num_intr_vec;
1787         uint16_t wt;
1788
1789         if (queue_conf->servicing_weight == 0) {
1790                 struct rte_eth_dev_data *data = dev_info->dev->data;
1791
1792                 temp_conf = *queue_conf;
1793                 if (!data->dev_conf.intr_conf.rxq) {
1794                         /* If Rx interrupts are disabled, set wt = 1 */
1795                         temp_conf.servicing_weight = 1;
1796                 }
1797                 queue_conf = &temp_conf;
1798         }
1799
1800         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1801         rx_queue = dev_info->rx_queue;
1802         wt = queue_conf->servicing_weight;
1803
1804         if (dev_info->rx_queue == NULL) {
1805                 dev_info->rx_queue =
1806                     rte_zmalloc_socket(rx_adapter->mem_name,
1807                                        nb_rx_queues *
1808                                        sizeof(struct eth_rx_queue_info), 0,
1809                                        rx_adapter->socket_id);
1810                 if (dev_info->rx_queue == NULL)
1811                         return -ENOMEM;
1812         }
1813         rx_wrr = NULL;
1814         rx_poll = NULL;
1815
1816         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1817                         queue_conf->servicing_weight,
1818                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1819
1820         if (dev_info->dev->intr_handle)
1821                 dev_info->multi_intr_cap =
1822                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
1823
1824         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1825                                 &rx_poll, &rx_wrr);
1826         if (ret)
1827                 goto err_free_rxqueue;
1828
1829         if (wt == 0) {
1830                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1831
1832                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1833                 if (ret)
1834                         goto err_free_rxqueue;
1835
1836                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1837                 if (ret)
1838                         goto err_free_rxqueue;
1839         } else {
1841                 num_intr_vec = 0;
1842                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1843                         num_intr_vec = rxa_nb_intr_vect(dev_info,
1844                                                 rx_queue_id, 0);
1845                         /* interrupt based queues are being converted to
1846                          * poll mode queues, delete the interrupt configuration
1847                          * for those.
1848                          */
1849                         ret = rxa_del_intr_queue(rx_adapter,
1850                                                 dev_info, rx_queue_id);
1851                         if (ret)
1852                                 goto err_free_rxqueue;
1853                 }
1854         }
1855
1856         if (nb_rx_intr == 0) {
1857                 ret = rxa_free_intr_resources(rx_adapter);
1858                 if (ret)
1859                         goto err_free_rxqueue;
1860         }
1861
1862         if (wt == 0) {
1863                 uint16_t i;
1864
1865                 if (rx_queue_id == -1) {
1866                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1867                                 dev_info->intr_queue[i] = i;
1868                 } else {
1869                         if (!rxa_intr_queue(dev_info, rx_queue_id))
1870                                 dev_info->intr_queue[nb_rx_intr - 1] =
1871                                         rx_queue_id;
1872                 }
1873         }
1874
1877         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1878         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1879
1880         rte_free(rx_adapter->eth_rx_poll);
1881         rte_free(rx_adapter->wrr_sched);
1882
1883         rx_adapter->eth_rx_poll = rx_poll;
1884         rx_adapter->wrr_sched = rx_wrr;
1885         rx_adapter->wrr_len = nb_wrr;
1886         rx_adapter->num_intr_vec += num_intr_vec;
1887         return 0;
1888
1889 err_free_rxqueue:
1890         if (rx_queue == NULL) {
1891                 rte_free(dev_info->rx_queue);
1892                 dev_info->rx_queue = NULL;
1893         }
1894
1895         rte_free(rx_poll);
1896         rte_free(rx_wrr);
1897
1898         return ret;
1899 }
1900
1901 static int
1902 rxa_ctrl(uint8_t id, int start)
1903 {
1904         struct rte_event_eth_rx_adapter *rx_adapter;
1905         struct rte_eventdev *dev;
1906         struct eth_device_info *dev_info;
1907         uint32_t i;
1908         int use_service = 0;
1909         int stop = !start;
1910
1911         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1912         rx_adapter = rxa_id_to_adapter(id);
1913         if (rx_adapter == NULL)
1914                 return -EINVAL;
1915
1916         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1917
1918         RTE_ETH_FOREACH_DEV(i) {
1919                 dev_info = &rx_adapter->eth_devices[i];
1920                 /* if start, check for num dev queues */
1921                 if (start && !dev_info->nb_dev_queues)
1922                         continue;
1923                 /* if stop, check if dev has been started */
1924                 if (stop && !dev_info->dev_rx_started)
1925                         continue;
1926                 use_service |= !dev_info->internal_event_port;
1927                 dev_info->dev_rx_started = start;
1928                 if (dev_info->internal_event_port == 0)
1929                         continue;
1930                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1931                                                 &rte_eth_devices[i]) :
1932                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1933                                                 &rte_eth_devices[i]);
1934         }
1935
1936         if (use_service) {
1937                 rte_spinlock_lock(&rx_adapter->rx_lock);
1938                 rx_adapter->rxa_started = start;
1939                 rte_service_runstate_set(rx_adapter->service_id, start);
1940                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1941         }
1942
1943         return 0;
1944 }
1945
1946 int
1947 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1948                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
1949                                 void *conf_arg)
1950 {
1951         struct rte_event_eth_rx_adapter *rx_adapter;
1952         int ret;
1953         int socket_id;
1954         uint16_t i;
1955         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1956         const uint8_t default_rss_key[] = {
1957                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1958                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1959                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1960                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1961                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1962         };
1963
1964         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1965         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1966         if (conf_cb == NULL)
1967                 return -EINVAL;
1968
1969         if (event_eth_rx_adapter == NULL) {
1970                 ret = rte_event_eth_rx_adapter_init();
1971                 if (ret)
1972                         return ret;
1973         }
1974
1975         rx_adapter = rxa_id_to_adapter(id);
1976         if (rx_adapter != NULL) {
1977                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1978                 return -EEXIST;
1979         }
1980
1981         socket_id = rte_event_dev_socket_id(dev_id);
1982         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1983                 "rte_event_eth_rx_adapter_%d",
1984                 id);
1985
1986         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1987                         RTE_CACHE_LINE_SIZE, socket_id);
1988         if (rx_adapter == NULL) {
1989                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1990                 return -ENOMEM;
1991         }
1992
1993         rx_adapter->eventdev_id = dev_id;
1994         rx_adapter->socket_id = socket_id;
1995         rx_adapter->conf_cb = conf_cb;
1996         rx_adapter->conf_arg = conf_arg;
1997         rx_adapter->id = id;
1998         strcpy(rx_adapter->mem_name, mem_name);
1999         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2000                                         RTE_MAX_ETHPORTS *
2001                                         sizeof(struct eth_device_info), 0,
2002                                         socket_id);
2003         rte_convert_rss_key((const uint32_t *)default_rss_key,
2004                         (uint32_t *)rx_adapter->rss_key_be,
2005                             RTE_DIM(default_rss_key));
2006
2007         if (rx_adapter->eth_devices == NULL) {
2008                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2009                 rte_free(rx_adapter);
2010                 return -ENOMEM;
2011         }
2012         rte_spinlock_init(&rx_adapter->rx_lock);
2013         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2014                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2015
2016         event_eth_rx_adapter[id] = rx_adapter;
2017         if (conf_cb == rxa_default_conf_cb)
2018                 rx_adapter->default_cb_arg = 1;
2019         return 0;
2020 }
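
/*
 * Illustrative sketch (not built): a minimal application-supplied
 * conf_cb for rte_event_eth_rx_adapter_create_ext(). It hands the
 * adapter a pre-configured event port instead of letting the default
 * callback reconfigure the event device; passing the port id through
 * conf_arg and the burst limit of 128 are assumptions.
 */
#if 0   /* example only */
static int
example_conf_cb(uint8_t id, uint8_t evdev_id,
                struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
        uint8_t *event_port_id = arg;

        RTE_SET_USED(id);
        RTE_SET_USED(evdev_id);

        conf->event_port_id = *event_port_id;
        conf->max_nb_rx = 128;
        return 0;
}
#endif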
2021
2022 int
2023 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2024                 struct rte_event_port_conf *port_config)
2025 {
2026         struct rte_event_port_conf *pc;
2027         int ret;
2028
2029         if (port_config == NULL)
2030                 return -EINVAL;
2031         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2032
2033         pc = rte_malloc(NULL, sizeof(*pc), 0);
2034         if (pc == NULL)
2035                 return -ENOMEM;
2036         *pc = *port_config;
2037         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2038                                         rxa_default_conf_cb,
2039                                         pc);
2040         if (ret)
2041                 rte_free(pc);
2042         return ret;
2043 }
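
/*
 * Illustrative usage sketch (not built): creating an adapter with the
 * default configuration callback. The port configuration values are
 * hypothetical; an application would normally derive them from the
 * limits reported by rte_event_dev_info_get().
 */
#if 0   /* example only */
static int
example_create_adapter(uint8_t adapter_id, uint8_t evdev_id)
{
        struct rte_event_port_conf port_conf = {
                .new_event_threshold = 4096,
                .dequeue_depth = 128,
                .enqueue_depth = 128,
        };

        return rte_event_eth_rx_adapter_create(adapter_id, evdev_id,
                                        &port_conf);
}
#endif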
2044
2045 int
2046 rte_event_eth_rx_adapter_free(uint8_t id)
2047 {
2048         struct rte_event_eth_rx_adapter *rx_adapter;
2049
2050         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2051
2052         rx_adapter = rxa_id_to_adapter(id);
2053         if (rx_adapter == NULL)
2054                 return -EINVAL;
2055
2056         if (rx_adapter->nb_queues) {
2057                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2058                                 rx_adapter->nb_queues);
2059                 return -EBUSY;
2060         }
2061
2062         if (rx_adapter->default_cb_arg)
2063                 rte_free(rx_adapter->conf_arg);
2064         rte_free(rx_adapter->eth_devices);
2065         rte_free(rx_adapter);
2066         event_eth_rx_adapter[id] = NULL;
2067
2068         return 0;
2069 }
2070
2071 int
2072 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2073                 uint16_t eth_dev_id,
2074                 int32_t rx_queue_id,
2075                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2076 {
2077         int ret;
2078         uint32_t cap;
2079         struct rte_event_eth_rx_adapter *rx_adapter;
2080         struct rte_eventdev *dev;
2081         struct eth_device_info *dev_info;
2082
2083         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2084         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2085
2086         rx_adapter = rxa_id_to_adapter(id);
2087         if ((rx_adapter == NULL) || (queue_conf == NULL))
2088                 return -EINVAL;
2089
2090         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2091         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2092                                                 eth_dev_id,
2093                                                 &cap);
2094         if (ret) {
2095                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2096                         " eth port %" PRIu16, id, eth_dev_id);
2097                 return ret;
2098         }
2099
2100         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2101                 && (queue_conf->rx_queue_flags &
2102                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2103                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2104                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2105                                 eth_dev_id, id);
2106                 return -EINVAL;
2107         }
2108
2109         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2110                 (rx_queue_id != -1)) {
2111                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2112                         "event queue, eth port: %" PRIu16 " adapter id: %"
2113                         PRIu8, eth_dev_id, id);
2114                 return -EINVAL;
2115         }
2116
2117         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2118                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2119                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2120                          (uint16_t)rx_queue_id);
2121                 return -EINVAL;
2122         }
2123
2124         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2125
2126         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2127                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2128                                         -ENOTSUP);
2129                 if (dev_info->rx_queue == NULL) {
2130                         dev_info->rx_queue =
2131                             rte_zmalloc_socket(rx_adapter->mem_name,
2132                                         dev_info->dev->data->nb_rx_queues *
2133                                         sizeof(struct eth_rx_queue_info), 0,
2134                                         rx_adapter->socket_id);
2135                         if (dev_info->rx_queue == NULL)
2136                                 return -ENOMEM;
2137                 }
2138
2139                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2140                                 &rte_eth_devices[eth_dev_id],
2141                                 rx_queue_id, queue_conf);
2142                 if (ret == 0) {
2143                         dev_info->internal_event_port = 1;
2144                         rxa_update_queue(rx_adapter,
2145                                         &rx_adapter->eth_devices[eth_dev_id],
2146                                         rx_queue_id,
2147                                         1);
2148                 }
2149         } else {
2150                 rte_spinlock_lock(&rx_adapter->rx_lock);
2151                 dev_info->internal_event_port = 0;
2152                 ret = rxa_init_service(rx_adapter, id);
2153                 if (ret == 0) {
2154                         uint32_t service_id = rx_adapter->service_id;
2155                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2156                                         queue_conf);
2157                         rte_service_component_runstate_set(service_id,
2158                                 rxa_sw_adapter_queue_count(rx_adapter));
2159                 }
2160                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2161         }
2162
2163         return ret;
2167 }
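
/*
 * Illustrative usage sketch (not built, assumes <string.h>):
 * connecting every Rx queue of an ethernet port to a single atomic
 * event queue. Event queue 0 and servicing weight 1 are hypothetical.
 */
#if 0   /* example only */
static int
example_add_all_queues(uint8_t adapter_id, uint16_t eth_port)
{
        struct rte_event_eth_rx_adapter_queue_conf qconf;

        memset(&qconf, 0, sizeof(qconf));
        qconf.ev.queue_id = 0;
        qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
        qconf.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
        qconf.servicing_weight = 1;

        /* rx_queue_id of -1 adds all Rx queues of the port */
        return rte_event_eth_rx_adapter_queue_add(adapter_id, eth_port, -1,
                                        &qconf);
}
#endif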
2168
2169 int
2170 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2171                                 int32_t rx_queue_id)
2172 {
2173         int ret = 0;
2174         struct rte_eventdev *dev;
2175         struct rte_event_eth_rx_adapter *rx_adapter;
2176         struct eth_device_info *dev_info;
2177         uint32_t cap;
2178         uint32_t nb_rx_poll = 0;
2179         uint32_t nb_wrr = 0;
2180         uint32_t nb_rx_intr;
2181         struct eth_rx_poll_entry *rx_poll = NULL;
2182         uint32_t *rx_wrr = NULL;
2183         int num_intr_vec;
2184
2185         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2186         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2187
2188         rx_adapter = rxa_id_to_adapter(id);
2189         if (rx_adapter == NULL)
2190                 return -EINVAL;
2191
2192         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2193         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2194                                                 eth_dev_id,
2195                                                 &cap);
2196         if (ret)
2197                 return ret;
2198
2199         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2200                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2201                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2202                          (uint16_t)rx_queue_id);
2203                 return -EINVAL;
2204         }
2205
2206         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2207
2208         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2209                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2210                                  -ENOTSUP);
2211                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2212                                                 &rte_eth_devices[eth_dev_id],
2213                                                 rx_queue_id);
2214                 if (ret == 0) {
2215                         rxa_update_queue(rx_adapter,
2216                                         &rx_adapter->eth_devices[eth_dev_id],
2217                                         rx_queue_id,
2218                                         0);
2219                         if (dev_info->nb_dev_queues == 0) {
2220                                 rte_free(dev_info->rx_queue);
2221                                 dev_info->rx_queue = NULL;
2222                         }
2223                 }
2224         } else {
2225                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2226                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2227
2228                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2229                         &rx_poll, &rx_wrr);
2230                 if (ret)
2231                         return ret;
2232
2233                 rte_spinlock_lock(&rx_adapter->rx_lock);
2234
2235                 num_intr_vec = 0;
2236                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2238                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2239                                                 rx_queue_id, 0);
2240                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2241                                         rx_queue_id);
2242                         if (ret)
2243                                 goto unlock_ret;
2244                 }
2245
2246                 if (nb_rx_intr == 0) {
2247                         ret = rxa_free_intr_resources(rx_adapter);
2248                         if (ret)
2249                                 goto unlock_ret;
2250                 }
2251
2252                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2253                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2254
2255                 rte_free(rx_adapter->eth_rx_poll);
2256                 rte_free(rx_adapter->wrr_sched);
2257
2258                 if (nb_rx_intr == 0) {
2259                         rte_free(dev_info->intr_queue);
2260                         dev_info->intr_queue = NULL;
2261                 }
2262
2263                 rx_adapter->eth_rx_poll = rx_poll;
2264                 rx_adapter->wrr_sched = rx_wrr;
2265                 rx_adapter->wrr_len = nb_wrr;
2266                 rx_adapter->num_intr_vec += num_intr_vec;
2267
2268                 if (dev_info->nb_dev_queues == 0) {
2269                         rte_free(dev_info->rx_queue);
2270                         dev_info->rx_queue = NULL;
2271                 }
2272 unlock_ret:
2273                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2274                 if (ret) {
2275                         rte_free(rx_poll);
2276                         rte_free(rx_wrr);
2277                         return ret;
2278                 }
2279
2280                 rte_service_component_runstate_set(rx_adapter->service_id,
2281                                 rxa_sw_adapter_queue_count(rx_adapter));
2282         }
2283
2284         return ret;
2285 }
2286
2287 int
2288 rte_event_eth_rx_adapter_start(uint8_t id)
2289 {
2290         return rxa_ctrl(id, 1);
2291 }
2292
2293 int
2294 rte_event_eth_rx_adapter_stop(uint8_t id)
2295 {
2296         return rxa_ctrl(id, 0);
2297 }
2298
2299 int
2300 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2301                                struct rte_event_eth_rx_adapter_stats *stats)
2302 {
2303         struct rte_event_eth_rx_adapter *rx_adapter;
2304         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2305         struct rte_event_eth_rx_adapter_stats dev_stats;
2306         struct rte_eventdev *dev;
2307         struct eth_device_info *dev_info;
2308         uint32_t i;
2309         int ret;
2310
2311         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2312
2313         rx_adapter = rxa_id_to_adapter(id);
2314         if (rx_adapter == NULL || stats == NULL)
2315                 return -EINVAL;
2316
2317         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2318         memset(stats, 0, sizeof(*stats));
2319         RTE_ETH_FOREACH_DEV(i) {
2320                 dev_info = &rx_adapter->eth_devices[i];
2321                 if (dev_info->internal_event_port == 0 ||
2322                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2323                         continue;
2324                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2325                                                 &rte_eth_devices[i],
2326                                                 &dev_stats);
2327                 if (ret)
2328                         continue;
2329                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2330                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2331         }
2332
2333         if (rx_adapter->service_inited)
2334                 *stats = rx_adapter->stats;
2335
2336         stats->rx_packets += dev_stats_sum.rx_packets;
2337         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2338         return 0;
2339 }
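
/*
 * Illustrative usage sketch (not built, assumes <stdio.h> and
 * <inttypes.h>): reading the combined adapter and driver counters
 * returned above. The field names follow
 * struct rte_event_eth_rx_adapter_stats.
 */
#if 0   /* example only */
static void
example_dump_stats(uint8_t adapter_id)
{
        struct rte_event_eth_rx_adapter_stats stats;

        if (rte_event_eth_rx_adapter_stats_get(adapter_id, &stats))
                return;

        printf("rx_packets %" PRIu64 " rx_enq_count %" PRIu64
                " rx_dropped %" PRIu64 "\n",
                stats.rx_packets, stats.rx_enq_count, stats.rx_dropped);
}
#endif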
2340
2341 int
2342 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2343 {
2344         struct rte_event_eth_rx_adapter *rx_adapter;
2345         struct rte_eventdev *dev;
2346         struct eth_device_info *dev_info;
2347         uint32_t i;
2348
2349         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2350
2351         rx_adapter = rxa_id_to_adapter(id);
2352         if (rx_adapter == NULL)
2353                 return -EINVAL;
2354
2355         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2356         RTE_ETH_FOREACH_DEV(i) {
2357                 dev_info = &rx_adapter->eth_devices[i];
2358                 if (dev_info->internal_event_port == 0 ||
2359                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2360                         continue;
2361                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2362                                                         &rte_eth_devices[i]);
2363         }
2364
2365         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2366         return 0;
2367 }
2368
2369 int
2370 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2371 {
2372         struct rte_event_eth_rx_adapter *rx_adapter;
2373
2374         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2375
2376         rx_adapter = rxa_id_to_adapter(id);
2377         if (rx_adapter == NULL || service_id == NULL)
2378                 return -EINVAL;
2379
2380         if (rx_adapter->service_inited)
2381                 *service_id = rx_adapter->service_id;
2382
2383         return rx_adapter->service_inited ? 0 : -ESRCH;
2384 }
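
/*
 * Illustrative usage sketch (not built, assumes <rte_service.h>):
 * when the adapter has no internal event port it runs as a service,
 * and the application must map that service to a service lcore
 * before starting the adapter. The lcore id is supplied by the
 * caller and is hypothetical here.
 */
#if 0   /* example only */
static int
example_launch_adapter(uint8_t adapter_id, uint32_t service_lcore)
{
        uint32_t service_id;

        if (rte_event_eth_rx_adapter_service_id_get(adapter_id,
                                                &service_id))
                return -ESRCH;

        /* run the adapter service function on a dedicated lcore */
        rte_service_lcore_add(service_lcore);
        rte_service_map_lcore_set(service_id, service_lcore, 1);
        rte_service_runstate_set(service_id, 1);
        rte_service_lcore_start(service_lcore);

        return rte_event_eth_rx_adapter_start(adapter_id);
}
#endif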
2385
2386 int rte_event_eth_rx_adapter_cb_register(uint8_t id,
2387                                         uint16_t eth_dev_id,
2388                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2389                                         void *cb_arg)
2390 {
2391         struct rte_event_eth_rx_adapter *rx_adapter;
2392         struct eth_device_info *dev_info;
2393         uint32_t cap;
2394         int ret;
2395
2396         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2397         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2398
2399         rx_adapter = rxa_id_to_adapter(id);
2400         if (rx_adapter == NULL)
2401                 return -EINVAL;
2402
2403         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2404         if (dev_info->rx_queue == NULL)
2405                 return -EINVAL;
2406
2407         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2408                                                 eth_dev_id,
2409                                                 &cap);
2410         if (ret) {
2411                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2412                         " eth port %" PRIu16, id, eth_dev_id);
2413                 return ret;
2414         }
2415
2416         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2417                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2418                                 PRIu16, eth_dev_id);
2419                 return -EINVAL;
2420         }
2421
2422         rte_spinlock_lock(&rx_adapter->rx_lock);
2423         dev_info->cb_fn = cb_fn;
2424         dev_info->cb_arg = cb_arg;
2425         rte_spinlock_unlock(&rx_adapter->rx_lock);
2426
2427         return 0;
2428 }
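
/*
 * Illustrative sketch (not built): a pass-through Rx callback. The
 * prototype below is assumed to match rte_event_eth_rx_adapter_cb_fn
 * as declared in rte_event_eth_rx_adapter.h for this release; the
 * return value is the number of events the adapter should enqueue.
 */
#if 0   /* example only */
static uint16_t
example_rx_cb(uint16_t eth_dev_id, uint16_t queue_id,
        uint32_t enqueue_buf_size, uint32_t enqueue_buf_count,
        struct rte_event *ev, uint16_t nb_event, void *cb_arg)
{
        RTE_SET_USED(eth_dev_id);
        RTE_SET_USED(queue_id);
        RTE_SET_USED(enqueue_buf_size);
        RTE_SET_USED(enqueue_buf_count);
        RTE_SET_USED(ev);
        RTE_SET_USED(cb_arg);

        /* keep every event; return fewer to drop events from the tail */
        return nb_event;
}
#endif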