/* lib/librte_eventdev/rte_event_eth_rx_adapter.c (deb_dpdk.git, upstream version 18.02) */
1 #include <rte_cycles.h>
2 #include <rte_common.h>
3 #include <rte_dev.h>
4 #include <rte_errno.h>
5 #include <rte_ethdev.h>
6 #include <rte_log.h>
7 #include <rte_malloc.h>
8 #include <rte_service_component.h>
9 #include <rte_thash.h>
10
11 #include "rte_eventdev.h"
12 #include "rte_eventdev_pmd.h"
13 #include "rte_event_eth_rx_adapter.h"
14
15 #define BATCH_SIZE              32
16 #define BLOCK_CNT_THRESHOLD     10
17 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
18
19 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
20 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
21
22 #define RSS_KEY_SIZE    40
23
24 /*
25  * There is an instance of this struct per polled Rx queue added to the
26  * adapter
27  */
28 struct eth_rx_poll_entry {
29         /* Eth port to poll */
30         uint8_t eth_dev_id;
31         /* Eth rx queue to poll */
32         uint16_t eth_rx_qid;
33 };
34
35 /* Instance per adapter */
36 struct rte_eth_event_enqueue_buffer {
37         /* Count of events in this buffer */
38         uint16_t count;
39         /* Array of events in this buffer */
40         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
41 };
42
43 struct rte_event_eth_rx_adapter {
44         /* RSS key */
45         uint8_t rss_key_be[RSS_KEY_SIZE];
46         /* Event device identifier */
47         uint8_t eventdev_id;
48         /* Per ethernet device structure */
49         struct eth_device_info *eth_devices;
50         /* Event port identifier */
51         uint8_t event_port_id;
52         /* Lock to serialize config updates with service function */
53         rte_spinlock_t rx_lock;
54         /* Max mbufs processed in any service function invocation */
55         uint32_t max_nb_rx;
56         /* Receive queues that need to be polled */
57         struct eth_rx_poll_entry *eth_rx_poll;
58         /* Size of the eth_rx_poll array */
59         uint16_t num_rx_polled;
60         /* Weighted round robin schedule */
61         uint32_t *wrr_sched;
62         /* wrr_sched[] size */
63         uint32_t wrr_len;
64         /* Next entry in wrr_sched[] to begin polling */
65         uint32_t wrr_pos;
66         /* Event burst buffer */
67         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
68         /* Per adapter stats */
69         struct rte_event_eth_rx_adapter_stats stats;
70         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
71         uint16_t enq_block_count;
72         /* Block start ts */
73         uint64_t rx_enq_block_start_ts;
74         /* Configuration callback for rte_service configuration */
75         rte_event_eth_rx_adapter_conf_cb conf_cb;
76         /* Configuration callback argument */
77         void *conf_arg;
78         /* Set if  default_cb is being used */
79         int default_cb_arg;
80         /* Service initialization state */
81         uint8_t service_inited;
82         /* Total count of Rx queues in adapter */
83         uint32_t nb_queues;
84         /* Memory allocation name */
85         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
86         /* Socket identifier cached from eventdev */
87         int socket_id;
88         /* Per adapter EAL service */
89         uint32_t service_id;
90 } __rte_cache_aligned;
91
92 /* Per eth device */
93 struct eth_device_info {
94         struct rte_eth_dev *dev;
95         struct eth_rx_queue_info *rx_queue;
96         /* Set if ethdev->eventdev packet transfer uses a
97          * hardware mechanism
98          */
99         uint8_t internal_event_port;
100         /* Set if the adapter is processing Rx queues for
101          * this eth device and packet processing has been
102          * started; used to determine whether the PMD
103          * rx_adapter_stop callback needs to be invoked
104          */
105         uint8_t dev_rx_started;
106         /* Number of adapter Rx queues on this eth device; if zero,
107          * the device is skipped when the adapter is started
108          */
109         uint16_t nb_dev_queues;
110 };
111
112 /* Per Rx queue */
113 struct eth_rx_queue_info {
114         int queue_enabled;      /* True if added */
115         uint16_t wt;            /* Polling weight */
116         uint8_t event_queue_id; /* Event queue to enqueue packets to */
117         uint8_t sched_type;     /* Sched type for events */
118         uint8_t priority;       /* Event priority */
119         uint32_t flow_id;       /* App provided flow identifier */
120         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
121 };
122
123 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
124
125 static inline int
126 valid_id(uint8_t id)
127 {
128         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
129 }
130
131 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
132         if (!valid_id(id)) { \
133                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
134                 return retval; \
135         } \
136 } while (0)
137
138 static inline int
139 sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
140 {
141         return rx_adapter->num_rx_polled;
142 }
143
144 /* Greatest common divisor */
145 static uint16_t gcd_u16(uint16_t a, uint16_t b)
146 {
147         uint16_t r = a % b;
148
149         return r ? gcd_u16(b, r) : b;
150 }
151
152 /* Returns the next queue in the polling sequence
153  *
154  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
155  */
156 static int
157 wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
158          unsigned int n, int *cw,
159          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
160          uint16_t gcd, int prev)
161 {
162         int i = prev;
163         uint16_t w;
164
165         while (1) {
166                 uint16_t q;
167                 uint8_t d;
168
169                 i = (i + 1) % n;
170                 if (i == 0) {
171                         *cw = *cw - gcd;
172                         if (*cw <= 0)
173                                 *cw = max_wt;
174                 }
175
176                 q = eth_rx_poll[i].eth_rx_qid;
177                 d = eth_rx_poll[i].eth_dev_id;
178                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
179
180                 if ((int)w >= *cw)
181                         return i;
182         }
183 }
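
/*
 * Illustrative note (not part of the upstream sources): with three queues
 * A, B and C of weights 4, 3 and 2 (max_wt = 4, gcd = 1), repeated calls
 * to wrr_next() visit the queues in the order
 *
 *	A A B A B C A B C
 *
 * i.e. per cycle of wrr_len = 4 + 3 + 2 = 9 slots, A is polled 4 times,
 * B 3 times and C 2 times, with the higher weight queues polled earlier
 * in the cycle.
 */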
184
185 /* Precalculate WRR polling sequence for all queues in rx_adapter */
186 static int
187 eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
188 {
189         uint8_t d;
190         uint16_t q;
191         unsigned int i;
192
193         /* Initialize variables for calculation of wrr schedule */
194         uint16_t max_wrr_pos = 0;
195         unsigned int poll_q = 0;
196         uint16_t max_wt = 0;
197         uint16_t gcd = 0;
198
199         struct eth_rx_poll_entry *rx_poll = NULL;
200         uint32_t *rx_wrr = NULL;
201
202         if (rx_adapter->num_rx_polled) {
203                 size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
204                                 sizeof(*rx_adapter->eth_rx_poll),
205                                 RTE_CACHE_LINE_SIZE);
206                 rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
207                                              len,
208                                              RTE_CACHE_LINE_SIZE,
209                                              rx_adapter->socket_id);
210                 if (rx_poll == NULL)
211                         return -ENOMEM;
212
213                 /* Generate the array of all queues to poll; poll_q
214                  * ends up as the number of entries in the array
215                  */
216                 for (d = 0; d < rte_eth_dev_count(); d++) {
217                         uint16_t nb_rx_queues;
218                         struct eth_device_info *dev_info =
219                                         &rx_adapter->eth_devices[d];
220                         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
221                         if (dev_info->rx_queue == NULL)
222                                 continue;
223                         for (q = 0; q < nb_rx_queues; q++) {
224                                 struct eth_rx_queue_info *queue_info =
225                                         &dev_info->rx_queue[q];
226                                 if (queue_info->queue_enabled == 0)
227                                         continue;
228
229                                 uint16_t wt = queue_info->wt;
230                                 rx_poll[poll_q].eth_dev_id = d;
231                                 rx_poll[poll_q].eth_rx_qid = q;
232                                 max_wrr_pos += wt;
233                                 max_wt = RTE_MAX(max_wt, wt);
234                                 gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
235                                 poll_q++;
236                         }
237                 }
238
239                 len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
240                                 RTE_CACHE_LINE_SIZE);
241                 rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
242                                             len,
243                                             RTE_CACHE_LINE_SIZE,
244                                             rx_adapter->socket_id);
245                 if (rx_wrr == NULL) {
246                         rte_free(rx_poll);
247                         return -ENOMEM;
248                 }
249
250                 /* Generate polling sequence based on weights */
251                 int prev = -1;
252                 int cw = -1;
253                 for (i = 0; i < max_wrr_pos; i++) {
254                         rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
255                                              rx_poll, max_wt, gcd, prev);
256                         prev = rx_wrr[i];
257                 }
258         }
259
260         rte_free(rx_adapter->eth_rx_poll);
261         rte_free(rx_adapter->wrr_sched);
262
263         rx_adapter->eth_rx_poll = rx_poll;
264         rx_adapter->wrr_sched = rx_wrr;
265         rx_adapter->wrr_len = max_wrr_pos;
266
267         return 0;
268 }
269
270 static inline void
271 mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
272         struct ipv6_hdr **ipv6_hdr)
273 {
274         struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
275         struct vlan_hdr *vlan_hdr;
276
277         *ipv4_hdr = NULL;
278         *ipv6_hdr = NULL;
279
280         switch (eth_hdr->ether_type) {
281         case RTE_BE16(ETHER_TYPE_IPv4):
282                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
283                 break;
284
285         case RTE_BE16(ETHER_TYPE_IPv6):
286                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
287                 break;
288
289         case RTE_BE16(ETHER_TYPE_VLAN):
290                 vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
291                 switch (vlan_hdr->eth_proto) {
292                 case RTE_BE16(ETHER_TYPE_IPv4):
293                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
294                         break;
295                 case RTE_BE16(ETHER_TYPE_IPv6):
296                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
297                         break;
298                 default:
299                         break;
300                 }
301                 break;
302
303         default:
304                 break;
305         }
306 }
307
308 /* Calculate RSS hash for IPv4/6 */
309 static inline uint32_t
310 do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
311 {
312         uint32_t input_len;
313         void *tuple;
314         struct rte_ipv4_tuple ipv4_tuple;
315         struct rte_ipv6_tuple ipv6_tuple;
316         struct ipv4_hdr *ipv4_hdr;
317         struct ipv6_hdr *ipv6_hdr;
318
319         mtoip(m, &ipv4_hdr, &ipv6_hdr);
320
321         if (ipv4_hdr) {
322                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
323                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
324                 tuple = &ipv4_tuple;
325                 input_len = RTE_THASH_V4_L3_LEN;
326         } else if (ipv6_hdr) {
327                 rte_thash_load_v6_addrs(ipv6_hdr,
328                                         (union rte_thash_tuple *)&ipv6_tuple);
329                 tuple = &ipv6_tuple;
330                 input_len = RTE_THASH_V6_L3_LEN;
331         } else
332                 return 0;
333
334         return rte_softrss_be(tuple, input_len, rss_key_be);
335 }
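
/*
 * Note (not part of the upstream sources): the software hash above is only
 * used when the mbuf carries no PKT_RX_RSS_HASH value and the application
 * did not supply a flow id for the queue. rte_softrss_be() expects the RSS
 * key in the byte-swapped form produced by rte_convert_rss_key(), which is
 * why rte_event_eth_rx_adapter_create_ext() converts the default 40-byte
 * key before storing it in rss_key_be[].
 */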
336
337 static inline int
338 rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
339 {
340         return !!rx_adapter->enq_block_count;
341 }
342
343 static inline void
344 rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
345 {
346         if (rx_adapter->rx_enq_block_start_ts)
347                 return;
348
349         rx_adapter->enq_block_count++;
350         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
351                 return;
352
353         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
354 }
355
356 static inline void
357 rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
358                     struct rte_event_eth_rx_adapter_stats *stats)
359 {
360         if (unlikely(!stats->rx_enq_start_ts))
361                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
362
363         if (likely(!rx_enq_blocked(rx_adapter)))
364                 return;
365
366         rx_adapter->enq_block_count = 0;
367         if (rx_adapter->rx_enq_block_start_ts) {
368                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
369                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
370                     rx_adapter->rx_enq_block_start_ts;
371                 rx_adapter->rx_enq_block_start_ts = 0;
372         }
373 }
374
375 /* Add event to buffer, free space check is done prior to calling
376  * this function
377  */
378 static inline void
379 buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
380                   struct rte_event *ev)
381 {
382         struct rte_eth_event_enqueue_buffer *buf =
383             &rx_adapter->event_enqueue_buffer;
384         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
385 }
386
387 /* Enqueue buffered events to event device */
388 static inline uint16_t
389 flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
390 {
391         struct rte_eth_event_enqueue_buffer *buf =
392             &rx_adapter->event_enqueue_buffer;
393         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
394
395         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
396                                         rx_adapter->event_port_id,
397                                         buf->events,
398                                         buf->count);
399         if (n != buf->count) {
400                 memmove(buf->events,
401                         &buf->events[n],
402                         (buf->count - n) * sizeof(struct rte_event));
403                 stats->rx_enq_retry++;
404         }
405
406         n ? rx_enq_block_end_ts(rx_adapter, stats) :
407                 rx_enq_block_start_ts(rx_adapter);
408
409         buf->count -= n;
410         stats->rx_enq_count += n;
411
412         return n;
413 }
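
/*
 * Note (not part of the upstream sources): rx_enq_block_start_ts() and
 * rx_enq_block_end_ts() above implement the rx_enq_block_cycles statistic.
 * If flush_event_buffer() fails to enqueue anything BLOCK_CNT_THRESHOLD
 * (10) times in a row, a TSC timestamp is recorded; the next successful
 * enqueue adds the elapsed cycles to stats->rx_enq_block_cycles and clears
 * the block state. The statistic therefore measures how long the event
 * device kept rejecting new events, not the cost of individual retries.
 */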
414
415 static inline void
416 fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
417         uint8_t dev_id,
418         uint16_t rx_queue_id,
419         struct rte_mbuf **mbufs,
420         uint16_t num)
421 {
422         uint32_t i;
423         struct eth_device_info *eth_device_info =
424                                         &rx_adapter->eth_devices[dev_id];
425         struct eth_rx_queue_info *eth_rx_queue_info =
426                                         &eth_device_info->rx_queue[rx_queue_id];
427
428         int32_t qid = eth_rx_queue_info->event_queue_id;
429         uint8_t sched_type = eth_rx_queue_info->sched_type;
430         uint8_t priority = eth_rx_queue_info->priority;
431         uint32_t flow_id;
432         struct rte_event events[BATCH_SIZE];
433         struct rte_mbuf *m = mbufs[0];
434         uint32_t rss_mask;
435         uint32_t rss;
436         int do_rss;
437
438         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
439         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
440         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
441
442         for (i = 0; i < num; i++) {
443                 m = mbufs[i];
444                 struct rte_event *ev = &events[i];
445
446                 rss = do_rss ?
447                         do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
448                 flow_id =
449                     eth_rx_queue_info->flow_id &
450                                 eth_rx_queue_info->flow_id_mask;
451                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
452
453                 ev->flow_id = flow_id;
454                 ev->op = RTE_EVENT_OP_NEW;
455                 ev->sched_type = sched_type;
456                 ev->queue_id = qid;
457                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
458                 ev->sub_event_type = 0;
459                 ev->priority = priority;
460                 ev->mbuf = m;
461
462                 buf_event_enqueue(rx_adapter, ev);
463         }
464 }
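
/*
 * Note (not part of the upstream sources): the event flow id computed above
 * is a merge of the application supplied value and the packet's RSS hash:
 *
 *	flow_id = (eth_rx_queue_info->flow_id & flow_id_mask) |
 *		  (rss & ~flow_id_mask)
 *
 * where flow_id_mask is ~0 if the queue was added with
 * RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID and 0 otherwise. For example,
 * a queue added with flow id 7 and the flag set always produces events with
 * flow_id == 7, while a queue added without the flag produces
 * flow_id == m->hash.rss (or the software hash when the NIC did not
 * compute one).
 */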
465
466 /*
467  * Polls receive queues added to the event adapter and enqueues received
468  * packets to the event device.
469  *
470  * The receive code enqueues initially to a temporary buffer; the buffer
471  * is drained whenever it holds >= BATCH_SIZE packets.
472  *
473  * If there isn't space available in the temporary buffer, packets from the
474  * Rx queue aren't dequeued from the eth device; this back pressures the
475  * eth device. In virtual device environments, the back pressure is relayed
476  * to the hypervisor's switching layer, where adjustments can be made to
477  * deal with it.
478  */
479 static inline uint32_t
480 eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
481 {
482         uint32_t num_queue;
483         uint16_t n;
484         uint32_t nb_rx = 0;
485         struct rte_mbuf *mbufs[BATCH_SIZE];
486         struct rte_eth_event_enqueue_buffer *buf;
487         uint32_t wrr_pos;
488         uint32_t max_nb_rx;
489
490         wrr_pos = rx_adapter->wrr_pos;
491         max_nb_rx = rx_adapter->max_nb_rx;
492         buf = &rx_adapter->event_enqueue_buffer;
493         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
494
495         /* Iterate through a WRR sequence */
496         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
497                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
498                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
499                 uint8_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
500
501                 /* Don't do a batch dequeue from the rx queue if there isn't
502                  * enough space in the enqueue buffer.
503                  */
504                 if (buf->count >= BATCH_SIZE)
505                         flush_event_buffer(rx_adapter);
506                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count))
507                         break;
508
509                 stats->rx_poll_count++;
510                 n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
511
512                 if (n) {
513                         stats->rx_packets += n;
514                         /* The check before rte_eth_rx_burst() ensures that
515                          * all n mbufs can be buffered
516                          */
517                         fill_event_buffer(rx_adapter, d, qid, mbufs, n);
518                         nb_rx += n;
519                         if (nb_rx > max_nb_rx) {
520                                 rx_adapter->wrr_pos =
521                                     (wrr_pos + 1) % rx_adapter->wrr_len;
522                                 return nb_rx;
523                         }
524                 }
525
526                 if (++wrr_pos == rx_adapter->wrr_len)
527                         wrr_pos = 0;
528         }
529
530         return nb_rx;
531 }
532
533 static int
534 event_eth_rx_adapter_service_func(void *args)
535 {
536         struct rte_event_eth_rx_adapter *rx_adapter = args;
537         struct rte_eth_event_enqueue_buffer *buf;
538
539         buf = &rx_adapter->event_enqueue_buffer;
540         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
541                 return 0;
542         if (eth_rx_poll(rx_adapter) == 0 && buf->count)
543                 flush_event_buffer(rx_adapter);
544         rte_spinlock_unlock(&rx_adapter->rx_lock);
545         return 0;
546 }
547
548 static int
549 rte_event_eth_rx_adapter_init(void)
550 {
551         const char *name = "rte_event_eth_rx_adapter_array";
552         const struct rte_memzone *mz;
553         unsigned int sz;
554
555         sz = sizeof(*event_eth_rx_adapter) *
556             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
557         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
558
559         mz = rte_memzone_lookup(name);
560         if (mz == NULL) {
561                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
562                                                  RTE_CACHE_LINE_SIZE);
563                 if (mz == NULL) {
564                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
565                                         PRId32, rte_errno);
566                         return -rte_errno;
567                 }
568         }
569
570         event_eth_rx_adapter = mz->addr;
571         return 0;
572 }
573
574 static inline struct rte_event_eth_rx_adapter *
575 id_to_rx_adapter(uint8_t id)
576 {
577         return event_eth_rx_adapter ?
578                 event_eth_rx_adapter[id] : NULL;
579 }
580
581 static int
582 default_conf_cb(uint8_t id, uint8_t dev_id,
583                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
584 {
585         int ret;
586         struct rte_eventdev *dev;
587         struct rte_event_dev_config dev_conf;
588         int started;
589         uint8_t port_id;
590         struct rte_event_port_conf *port_conf = arg;
591         struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);
592
593         dev = &rte_eventdevs[rx_adapter->eventdev_id];
594         dev_conf = dev->data->dev_conf;
595
596         started = dev->data->dev_started;
597         if (started)
598                 rte_event_dev_stop(dev_id);
599         port_id = dev_conf.nb_event_ports;
600         dev_conf.nb_event_ports += 1;
601         ret = rte_event_dev_configure(dev_id, &dev_conf);
602         if (ret) {
603                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
604                                                 dev_id);
605                 if (started) {
606                         if (rte_event_dev_start(dev_id))
607                                 return -EIO;
608                 }
609                 return ret;
610         }
611
612         ret = rte_event_port_setup(dev_id, port_id, port_conf);
613         if (ret) {
614                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
615                                         port_id);
616                 return ret;
617         }
618
619         conf->event_port_id = port_id;
620         conf->max_nb_rx = 128;
621         if (started)
622                 ret = rte_event_dev_start(dev_id);
623         rx_adapter->default_cb_arg = 1;
624         return ret;
625 }
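
/*
 * Illustrative sketch (not part of the upstream sources): an application
 * that has already reserved an event port for the adapter can pass its own
 * callback to rte_event_eth_rx_adapter_create_ext() instead of letting
 * default_conf_cb() reconfigure the device. APP_RX_PORT_ID and
 * APP_MAX_NB_RX below are hypothetical application defined values.
 *
 *	static int
 *	app_rx_adapter_conf_cb(uint8_t id, uint8_t dev_id,
 *			struct rte_event_eth_rx_adapter_conf *conf, void *arg)
 *	{
 *		RTE_SET_USED(id);
 *		RTE_SET_USED(dev_id);
 *		RTE_SET_USED(arg);
 *		conf->event_port_id = APP_RX_PORT_ID;
 *		conf->max_nb_rx = APP_MAX_NB_RX;
 *		return 0;
 *	}
 */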
626
627 static int
628 init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
629 {
630         int ret;
631         struct rte_service_spec service;
632         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
633
634         if (rx_adapter->service_inited)
635                 return 0;
636
637         memset(&service, 0, sizeof(service));
638         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
639                 "rte_event_eth_rx_adapter_%d", id);
640         service.socket_id = rx_adapter->socket_id;
641         service.callback = event_eth_rx_adapter_service_func;
642         service.callback_userdata = rx_adapter;
643         /* Service function handles locking for queue add/del updates */
644         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
645         ret = rte_service_component_register(&service, &rx_adapter->service_id);
646         if (ret) {
647                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
648                         service.name, ret);
649                 return ret;
650         }
651
652         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
653                 &rx_adapter_conf, rx_adapter->conf_arg);
654         if (ret) {
655                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
656                         ret);
657                 goto err_done;
658         }
659         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
660         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
661         rx_adapter->service_inited = 1;
662         return 0;
663
664 err_done:
665         rte_service_component_unregister(rx_adapter->service_id);
666         return ret;
667 }
668
669
670 static void
671 update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
672                 struct eth_device_info *dev_info,
673                 int32_t rx_queue_id,
674                 uint8_t add)
675 {
676         struct eth_rx_queue_info *queue_info;
677         int enabled;
678         uint16_t i;
679
680         if (dev_info->rx_queue == NULL)
681                 return;
682
683         if (rx_queue_id == -1) {
684                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
685                         update_queue_info(rx_adapter, dev_info, i, add);
686         } else {
687                 queue_info = &dev_info->rx_queue[rx_queue_id];
688                 enabled = queue_info->queue_enabled;
689                 if (add) {
690                         rx_adapter->nb_queues += !enabled;
691                         dev_info->nb_dev_queues += !enabled;
692                 } else {
693                         rx_adapter->nb_queues -= enabled;
694                         dev_info->nb_dev_queues -= enabled;
695                 }
696                 queue_info->queue_enabled = !!add;
697         }
698 }
699
700 static int
701 event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
702                             struct eth_device_info *dev_info,
703                             uint16_t rx_queue_id)
704 {
705         struct eth_rx_queue_info *queue_info;
706
707         if (rx_adapter->nb_queues == 0)
708                 return 0;
709
710         queue_info = &dev_info->rx_queue[rx_queue_id];
711         rx_adapter->num_rx_polled -= queue_info->queue_enabled;
712         update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
713         return 0;
714 }
715
716 static void
717 event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
718                 struct eth_device_info *dev_info,
719                 uint16_t rx_queue_id,
720                 const struct rte_event_eth_rx_adapter_queue_conf *conf)
721
722 {
723         struct eth_rx_queue_info *queue_info;
724         const struct rte_event *ev = &conf->ev;
725
726         queue_info = &dev_info->rx_queue[rx_queue_id];
727         queue_info->event_queue_id = ev->queue_id;
728         queue_info->sched_type = ev->sched_type;
729         queue_info->priority = ev->priority;
730         queue_info->wt = conf->servicing_weight;
731
732         if (conf->rx_queue_flags &
733                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
734                 queue_info->flow_id = ev->flow_id;
735                 queue_info->flow_id_mask = ~0;
736         }
737
738         /* The same queue can be added more than once */
739         rx_adapter->num_rx_polled += !queue_info->queue_enabled;
740         update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
741 }
742
743 static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
744                 uint8_t eth_dev_id,
745                 int rx_queue_id,
746                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
747 {
748         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
749         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
750         uint32_t i;
751         int ret;
752
753         if (queue_conf->servicing_weight == 0) {
754
755                 struct rte_eth_dev_data *data = dev_info->dev->data;
756                 if (data->dev_conf.intr_conf.rxq) {
757                         RTE_EDEV_LOG_ERR("Interrupt driven queues"
758                                         " not supported");
759                         return -ENOTSUP;
760                 }
761                 temp_conf = *queue_conf;
762
763                 /* Rx interrupts are not in use, so default wt to 1 */
764                 temp_conf.servicing_weight = 1;
765                 queue_conf = &temp_conf;
766         }
767
768         if (dev_info->rx_queue == NULL) {
769                 dev_info->rx_queue =
770                     rte_zmalloc_socket(rx_adapter->mem_name,
771                                        dev_info->dev->data->nb_rx_queues *
772                                        sizeof(struct eth_rx_queue_info), 0,
773                                        rx_adapter->socket_id);
774                 if (dev_info->rx_queue == NULL)
775                         return -ENOMEM;
776         }
777
778         if (rx_queue_id == -1) {
779                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
780                         event_eth_rx_adapter_queue_add(rx_adapter,
781                                                 dev_info, i,
782                                                 queue_conf);
783         } else {
784                 event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
785                                           (uint16_t)rx_queue_id,
786                                           queue_conf);
787         }
788
789         ret = eth_poll_wrr_calc(rx_adapter);
790         if (ret) {
791                 event_eth_rx_adapter_queue_del(rx_adapter,
792                                         dev_info, rx_queue_id);
793                 return ret;
794         }
795
796         return ret;
797 }
798
799 static int
800 rx_adapter_ctrl(uint8_t id, int start)
801 {
802         struct rte_event_eth_rx_adapter *rx_adapter;
803         struct rte_eventdev *dev;
804         struct eth_device_info *dev_info;
805         uint32_t i;
806         int use_service = 0;
807         int stop = !start;
808
809         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
810         rx_adapter = id_to_rx_adapter(id);
811         if (rx_adapter == NULL)
812                 return -EINVAL;
813
814         dev = &rte_eventdevs[rx_adapter->eventdev_id];
815
816         for (i = 0; i < rte_eth_dev_count(); i++) {
817                 dev_info = &rx_adapter->eth_devices[i];
818                 /* if start  check for num dev queues */
819                 if (start && !dev_info->nb_dev_queues)
820                         continue;
821                 /* if stop check if dev has been started */
822                 if (stop && !dev_info->dev_rx_started)
823                         continue;
824                 use_service |= !dev_info->internal_event_port;
825                 dev_info->dev_rx_started = start;
826                 if (dev_info->internal_event_port == 0)
827                         continue;
828                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
829                                                 &rte_eth_devices[i]) :
830                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
831                                                 &rte_eth_devices[i]);
832         }
833
834         if (use_service)
835                 rte_service_runstate_set(rx_adapter->service_id, start);
836
837         return 0;
838 }
839
840 int
841 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
842                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
843                                 void *conf_arg)
844 {
845         struct rte_event_eth_rx_adapter *rx_adapter;
846         int ret;
847         int socket_id;
848         uint8_t i;
849         char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
850         const uint8_t default_rss_key[] = {
851                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
852                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
853                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
854                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
855                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
856         };
857
858         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
859         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
860         if (conf_cb == NULL)
861                 return -EINVAL;
862
863         if (event_eth_rx_adapter == NULL) {
864                 ret = rte_event_eth_rx_adapter_init();
865                 if (ret)
866                         return ret;
867         }
868
869         rx_adapter = id_to_rx_adapter(id);
870         if (rx_adapter != NULL) {
871                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
872                 return -EEXIST;
873         }
874
875         socket_id = rte_event_dev_socket_id(dev_id);
876         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
877                 "rte_event_eth_rx_adapter_%d",
878                 id);
879
880         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
881                         RTE_CACHE_LINE_SIZE, socket_id);
882         if (rx_adapter == NULL) {
883                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
884                 return -ENOMEM;
885         }
886
887         rx_adapter->eventdev_id = dev_id;
888         rx_adapter->socket_id = socket_id;
889         rx_adapter->conf_cb = conf_cb;
890         rx_adapter->conf_arg = conf_arg;
891         strcpy(rx_adapter->mem_name, mem_name);
892         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
893                                         rte_eth_dev_count() *
894                                         sizeof(struct eth_device_info), 0,
895                                         socket_id);
896         rte_convert_rss_key((const uint32_t *)default_rss_key,
897                         (uint32_t *)rx_adapter->rss_key_be,
898                             RTE_DIM(default_rss_key));
899
900         if (rx_adapter->eth_devices == NULL) {
901                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
902                 rte_free(rx_adapter);
903                 return -ENOMEM;
904         }
905         rte_spinlock_init(&rx_adapter->rx_lock);
906         for (i = 0; i < rte_eth_dev_count(); i++)
907                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
908
909         event_eth_rx_adapter[id] = rx_adapter;
910         if (conf_cb == default_conf_cb)
911                 rx_adapter->default_cb_arg = 1;
912         return 0;
913 }
914
915 int
916 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
917                 struct rte_event_port_conf *port_config)
918 {
919         struct rte_event_port_conf *pc;
920         int ret;
921
922         if (port_config == NULL)
923                 return -EINVAL;
924         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
925
926         pc = rte_malloc(NULL, sizeof(*pc), 0);
927         if (pc == NULL)
928                 return -ENOMEM;
929         *pc = *port_config;
930         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
931                                         default_conf_cb,
932                                         pc);
933         if (ret)
934                 rte_free(pc);
935         return ret;
936 }
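
/*
 * Illustrative sketch (not part of the upstream sources): typical adapter
 * setup with a software event device. Error handling is omitted and
 * RX_ADAPTER_ID, EV_DEV_ID, ETH_PORT_ID and SERVICE_LCORE are hypothetical
 * application defined values; SERVICE_LCORE is assumed to have been added
 * with rte_service_lcore_add().
 *
 *	struct rte_event_port_conf port_conf = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf queue_conf;
 *	uint32_t service_id;
 *
 *	memset(&queue_conf, 0, sizeof(queue_conf));
 *	queue_conf.servicing_weight = 1;
 *	queue_conf.ev.queue_id = 0;
 *	queue_conf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	queue_conf.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 *
 *	rte_event_eth_rx_adapter_create(RX_ADAPTER_ID, EV_DEV_ID, &port_conf);
 *	rte_event_eth_rx_adapter_queue_add(RX_ADAPTER_ID, ETH_PORT_ID, -1,
 *					   &queue_conf);
 *	if (rte_event_eth_rx_adapter_service_id_get(RX_ADAPTER_ID,
 *						    &service_id) == 0)
 *		rte_service_map_lcore_set(service_id, SERVICE_LCORE, 1);
 *	rte_event_eth_rx_adapter_start(RX_ADAPTER_ID);
 */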
937
938 int
939 rte_event_eth_rx_adapter_free(uint8_t id)
940 {
941         struct rte_event_eth_rx_adapter *rx_adapter;
942
943         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
944
945         rx_adapter = id_to_rx_adapter(id);
946         if (rx_adapter == NULL)
947                 return -EINVAL;
948
949         if (rx_adapter->nb_queues) {
950                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
951                                 rx_adapter->nb_queues);
952                 return -EBUSY;
953         }
954
955         if (rx_adapter->default_cb_arg)
956                 rte_free(rx_adapter->conf_arg);
957         rte_free(rx_adapter->eth_devices);
958         rte_free(rx_adapter);
959         event_eth_rx_adapter[id] = NULL;
960
961         return 0;
962 }
963
964 int
965 rte_event_eth_rx_adapter_queue_add(uint8_t id,
966                 uint8_t eth_dev_id,
967                 int32_t rx_queue_id,
968                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
969 {
970         int ret;
971         uint32_t cap;
972         struct rte_event_eth_rx_adapter *rx_adapter;
973         struct rte_eventdev *dev;
974         struct eth_device_info *dev_info;
975         int start_service;
976
977         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
978         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
979
980         rx_adapter = id_to_rx_adapter(id);
981         if ((rx_adapter == NULL) || (queue_conf == NULL))
982                 return -EINVAL;
983
984         dev = &rte_eventdevs[rx_adapter->eventdev_id];
985         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
986                                                 eth_dev_id,
987                                                 &cap);
988         if (ret) {
989                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
990                         " eth port %" PRIu8, id, eth_dev_id);
991                 return ret;
992         }
993
994         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
995                 && (queue_conf->rx_queue_flags &
996                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
997                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
998                                 " eth port: %" PRIu8 " adapter id: %" PRIu8,
999                                 eth_dev_id, id);
1000                 return -EINVAL;
1001         }
1002
1003         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
1004                 (rx_queue_id != -1)) {
1005                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
1006                         "event queue, adapter id %u, eth port %u", id, eth_dev_id);
1007                 return -EINVAL;
1008         }
1009
1010         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1011                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1012                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1013                          (uint16_t)rx_queue_id);
1014                 return -EINVAL;
1015         }
1016
1017         start_service = 0;
1018         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1019
1020         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1021                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
1022                                         -ENOTSUP);
1023                 if (dev_info->rx_queue == NULL) {
1024                         dev_info->rx_queue =
1025                             rte_zmalloc_socket(rx_adapter->mem_name,
1026                                         dev_info->dev->data->nb_rx_queues *
1027                                         sizeof(struct eth_rx_queue_info), 0,
1028                                         rx_adapter->socket_id);
1029                         if (dev_info->rx_queue == NULL)
1030                                 return -ENOMEM;
1031                 }
1032
1033                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
1034                                 &rte_eth_devices[eth_dev_id],
1035                                 rx_queue_id, queue_conf);
1036                 if (ret == 0) {
1037                         update_queue_info(rx_adapter,
1038                                         &rx_adapter->eth_devices[eth_dev_id],
1039                                         rx_queue_id,
1040                                         1);
1041                 }
1042         } else {
1043                 rte_spinlock_lock(&rx_adapter->rx_lock);
1044                 ret = init_service(rx_adapter, id);
1045                 if (ret == 0)
1046                         ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
1047                                         queue_conf);
1048                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1049                 if (ret == 0)
1050                         start_service = !!sw_rx_adapter_queue_count(rx_adapter);
1051         }
1052
1053         if (ret)
1054                 return ret;
1055
1056         if (start_service)
1057                 rte_service_component_runstate_set(rx_adapter->service_id, 1);
1058
1059         return 0;
1060 }
1061
1062 int
1063 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
1064                                 int32_t rx_queue_id)
1065 {
1066         int ret = 0;
1067         struct rte_eventdev *dev;
1068         struct rte_event_eth_rx_adapter *rx_adapter;
1069         struct eth_device_info *dev_info;
1070         uint32_t cap;
1071         uint16_t i;
1072
1073         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1074         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1075
1076         rx_adapter = id_to_rx_adapter(id);
1077         if (rx_adapter == NULL)
1078                 return -EINVAL;
1079
1080         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1081         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1082                                                 eth_dev_id,
1083                                                 &cap);
1084         if (ret)
1085                 return ret;
1086
1087         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1088                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1089                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1090                          (uint16_t)rx_queue_id);
1091                 return -EINVAL;
1092         }
1093
1094         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1095
1096         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1097                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
1098                                  -ENOTSUP);
1099                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
1100                                                 &rte_eth_devices[eth_dev_id],
1101                                                 rx_queue_id);
1102                 if (ret == 0) {
1103                         update_queue_info(rx_adapter,
1104                                         &rx_adapter->eth_devices[eth_dev_id],
1105                                         rx_queue_id,
1106                                         0);
1107                         if (dev_info->nb_dev_queues == 0) {
1108                                 rte_free(dev_info->rx_queue);
1109                                 dev_info->rx_queue = NULL;
1110                         }
1111                 }
1112         } else {
1113                 int rc;
1114                 rte_spinlock_lock(&rx_adapter->rx_lock);
1115                 if (rx_queue_id == -1) {
1116                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1117                                 event_eth_rx_adapter_queue_del(rx_adapter,
1118                                                         dev_info,
1119                                                         i);
1120                 } else {
1121                         event_eth_rx_adapter_queue_del(rx_adapter,
1122                                                 dev_info,
1123                                                 (uint16_t)rx_queue_id);
1124                 }
1125
1126                 rc = eth_poll_wrr_calc(rx_adapter);
1127                 if (rc)
1128                         RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
1129                                         rc);
1130
1131                 if (dev_info->nb_dev_queues == 0) {
1132                         rte_free(dev_info->rx_queue);
1133                         dev_info->rx_queue = NULL;
1134                 }
1135
1136                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1137                 rte_service_component_runstate_set(rx_adapter->service_id,
1138                                 sw_rx_adapter_queue_count(rx_adapter));
1139         }
1140
1141         return ret;
1142 }
1143
1144
1145 int
1146 rte_event_eth_rx_adapter_start(uint8_t id)
1147 {
1148         return rx_adapter_ctrl(id, 1);
1149 }
1150
1151 int
1152 rte_event_eth_rx_adapter_stop(uint8_t id)
1153 {
1154         return rx_adapter_ctrl(id, 0);
1155 }
1156
1157 int
1158 rte_event_eth_rx_adapter_stats_get(uint8_t id,
1159                                struct rte_event_eth_rx_adapter_stats *stats)
1160 {
1161         struct rte_event_eth_rx_adapter *rx_adapter;
1162         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
1163         struct rte_event_eth_rx_adapter_stats dev_stats;
1164         struct rte_eventdev *dev;
1165         struct eth_device_info *dev_info;
1166         uint32_t i;
1167         int ret;
1168
1169         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1170
1171         rx_adapter = id_to_rx_adapter(id);
1172         if (rx_adapter  == NULL || stats == NULL)
1173                 return -EINVAL;
1174
1175         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1176         memset(stats, 0, sizeof(*stats));
1177         for (i = 0; i < rte_eth_dev_count(); i++) {
1178                 dev_info = &rx_adapter->eth_devices[i];
1179                 if (dev_info->internal_event_port == 0 ||
1180                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
1181                         continue;
1182                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
1183                                                 &rte_eth_devices[i],
1184                                                 &dev_stats);
1185                 if (ret)
1186                         continue;
1187                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
1188                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
1189         }
1190
1191         if (rx_adapter->service_inited)
1192                 *stats = rx_adapter->stats;
1193
1194         stats->rx_packets += dev_stats_sum.rx_packets;
1195         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
1196         return 0;
1197 }
1198
1199 int
1200 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
1201 {
1202         struct rte_event_eth_rx_adapter *rx_adapter;
1203         struct rte_eventdev *dev;
1204         struct eth_device_info *dev_info;
1205         uint32_t i;
1206
1207         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1208
1209         rx_adapter = id_to_rx_adapter(id);
1210         if (rx_adapter == NULL)
1211                 return -EINVAL;
1212
1213         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1214         for (i = 0; i < rte_eth_dev_count(); i++) {
1215                 dev_info = &rx_adapter->eth_devices[i];
1216                 if (dev_info->internal_event_port == 0 ||
1217                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
1218                         continue;
1219                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
1220                                                         &rte_eth_devices[i]);
1221         }
1222
1223         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
1224         return 0;
1225 }
1226
1227 int
1228 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1229 {
1230         struct rte_event_eth_rx_adapter *rx_adapter;
1231
1232         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1233
1234         rx_adapter = id_to_rx_adapter(id);
1235         if (rx_adapter == NULL || service_id == NULL)
1236                 return -EINVAL;
1237
1238         if (rx_adapter->service_inited)
1239                 *service_id = rx_adapter->service_id;
1240
1241         return rx_adapter->service_inited ? 0 : -ESRCH;
1242 }