/* DPDK 17.11-rc3: lib/librte_eventdev/rte_event_eth_rx_adapter.c */
1 #include <rte_cycles.h>
2 #include <rte_common.h>
3 #include <rte_dev.h>
4 #include <rte_errno.h>
5 #include <rte_ethdev.h>
6 #include <rte_log.h>
7 #include <rte_malloc.h>
8 #include <rte_service_component.h>
9 #include <rte_thash.h>
10
11 #include "rte_eventdev.h"
12 #include "rte_eventdev_pmd.h"
13 #include "rte_event_eth_rx_adapter.h"
14
15 #define BATCH_SIZE              32
16 #define BLOCK_CNT_THRESHOLD     10
17 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
18
19 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
20 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
21
22 #define RSS_KEY_SIZE    40
23
24 /*
25  * There is an instance of this struct per polled Rx queue added to the
26  * adapter
27  */
28 struct eth_rx_poll_entry {
29         /* Eth port to poll */
30         uint8_t eth_dev_id;
31         /* Eth rx queue to poll */
32         uint16_t eth_rx_qid;
33 };
34
35 /* Instance per adapter */
36 struct rte_eth_event_enqueue_buffer {
37         /* Count of events in this buffer */
38         uint16_t count;
39         /* Array of events in this buffer */
40         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
41 };
42
43 struct rte_event_eth_rx_adapter {
44         /* RSS key */
45         uint8_t rss_key_be[RSS_KEY_SIZE];
46         /* Event device identifier */
47         uint8_t eventdev_id;
48         /* Per ethernet device structure */
49         struct eth_device_info *eth_devices;
50         /* Event port identifier */
51         uint8_t event_port_id;
52         /* Lock to serialize config updates with service function */
53         rte_spinlock_t rx_lock;
54         /* Max mbufs processed in any service function invocation */
55         uint32_t max_nb_rx;
56         /* Receive queues that need to be polled */
57         struct eth_rx_poll_entry *eth_rx_poll;
58         /* Size of the eth_rx_poll array */
59         uint16_t num_rx_polled;
60         /* Weighted round robin schedule */
61         uint32_t *wrr_sched;
62         /* wrr_sched[] size */
63         uint32_t wrr_len;
64         /* Next entry in wrr[] to begin polling */
65         uint32_t wrr_pos;
66         /* Event burst buffer */
67         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
68         /* Per adapter stats */
69         struct rte_event_eth_rx_adapter_stats stats;
70         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
71         uint16_t enq_block_count;
72         /* Block start ts */
73         uint64_t rx_enq_block_start_ts;
74         /* Configuration callback for rte_service configuration */
75         rte_event_eth_rx_adapter_conf_cb conf_cb;
76         /* Configuration callback argument */
77         void *conf_arg;
78         /* Set if  default_cb is being used */
79         int default_cb_arg;
80         /* Service initialization state */
81         uint8_t service_inited;
82         /* Total count of Rx queues in adapter */
83         uint32_t nb_queues;
84         /* Memory allocation name */
85         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
86         /* Socket identifier cached from eventdev */
87         int socket_id;
88         /* Per adapter EAL service */
89         uint32_t service_id;
90 } __rte_cache_aligned;
91
92 /* Per eth device */
93 struct eth_device_info {
94         struct rte_eth_dev *dev;
95         struct eth_rx_queue_info *rx_queue;
96         /* Set if ethdev->eventdev packet transfer uses a
97          * hardware mechanism
98          */
99         uint8_t internal_event_port;
100         /* Set if the adapter is processing Rx queues for
101          * this eth device and packet processing has been
102          * started; lets the code know whether the PMD
103          * rx_adapter_stop callback needs to be invoked
104          */
105         uint8_t dev_rx_started;
106         /* If nb_dev_queues > 0, the start callback will
107          * be invoked if not already invoked
108          */
109         uint16_t nb_dev_queues;
110 };
111
112 /* Per Rx queue */
113 struct eth_rx_queue_info {
114         int queue_enabled;      /* True if added */
115         uint16_t wt;            /* Polling weight */
116         uint8_t event_queue_id; /* Event queue to enqueue packets to */
117         uint8_t sched_type;     /* Sched type for events */
118         uint8_t priority;       /* Event priority */
119         uint32_t flow_id;       /* App provided flow identifier */
120         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
121 };
122
123 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
124
125 static inline int
126 valid_id(uint8_t id)
127 {
128         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
129 }
130
131 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
132         if (!valid_id(id)) { \
133                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
134                 return retval; \
135         } \
136 } while (0)
137
138 static inline int
139 sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
140 {
141         return rx_adapter->num_rx_polled;
142 }
143
144 /* Greatest common divisor */
145 static uint16_t gcd_u16(uint16_t a, uint16_t b)
146 {
147         uint16_t r = a % b;
148
149         return r ? gcd_u16(b, r) : b;
150 }
151
152 /* Returns the next queue in the polling sequence
153  *
154  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
155  */
156 static int
157 wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
158          unsigned int n, int *cw,
159          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
160          uint16_t gcd, int prev)
161 {
162         int i = prev;
163         uint16_t w;
164
165         while (1) {
166                 uint16_t q;
167                 uint8_t d;
168
169                 i = (i + 1) % n;
170                 if (i == 0) {
171                         *cw = *cw - gcd;
172                         if (*cw <= 0)
173                                 *cw = max_wt;
174                 }
175
176                 q = eth_rx_poll[i].eth_rx_qid;
177                 d = eth_rx_poll[i].eth_dev_id;
178                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
179
180                 if ((int)w >= *cw)
181                         return i;
182         }
183 }
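/*
 * For illustration: with three polled queues of weights {4, 2, 1},
 * max_wt = 4 and gcd = 1, repeated calls to wrr_next() starting from
 * prev = -1, cw = -1 return the index sequence
 *
 *	0, 0, 0, 1, 0, 1, 2
 *
 * i.e. queue 0 is polled four times, queue 1 twice and queue 2 once per
 * cycle of length 4 + 2 + 1 = 7; eth_poll_wrr_calc() below stores one such
 * cycle in wrr_sched[].
 */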
184
185 /* Precalculate WRR polling sequence for all queues in rx_adapter */
186 static int
187 eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
188 {
189         uint8_t d;
190         uint16_t q;
191         unsigned int i;
192
193         /* Initialize variables for calculation of wrr schedule */
194         uint16_t max_wrr_pos = 0;
195         unsigned int poll_q = 0;
196         uint16_t max_wt = 0;
197         uint16_t gcd = 0;
198
199         struct eth_rx_poll_entry *rx_poll = NULL;
200         uint32_t *rx_wrr = NULL;
201
202         if (rx_adapter->num_rx_polled) {
203                 size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
204                                 sizeof(*rx_adapter->eth_rx_poll),
205                                 RTE_CACHE_LINE_SIZE);
206                 rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
207                                              len,
208                                              RTE_CACHE_LINE_SIZE,
209                                              rx_adapter->socket_id);
210                 if (rx_poll == NULL)
211                         return -ENOMEM;
212
213                 /* Generate the array of all queues to poll; after the
214                  * loop, poll_q holds the number of entries
215                  */
216                 for (d = 0; d < rte_eth_dev_count(); d++) {
217                         uint16_t nb_rx_queues;
218                         struct eth_device_info *dev_info =
219                                         &rx_adapter->eth_devices[d];
220                         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
221                         if (dev_info->rx_queue == NULL)
222                                 continue;
223                         for (q = 0; q < nb_rx_queues; q++) {
224                                 struct eth_rx_queue_info *queue_info =
225                                         &dev_info->rx_queue[q];
226                                 if (queue_info->queue_enabled == 0)
227                                         continue;
228
229                                 uint16_t wt = queue_info->wt;
230                                 rx_poll[poll_q].eth_dev_id = d;
231                                 rx_poll[poll_q].eth_rx_qid = q;
232                                 max_wrr_pos += wt;
233                                 max_wt = RTE_MAX(max_wt, wt);
234                                 gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
235                                 poll_q++;
236                         }
237                 }
238
239                 len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
240                                 RTE_CACHE_LINE_SIZE);
241                 rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
242                                             len,
243                                             RTE_CACHE_LINE_SIZE,
244                                             rx_adapter->socket_id);
245                 if (rx_wrr == NULL) {
246                         rte_free(rx_poll);
247                         return -ENOMEM;
248                 }
249
250                 /* Generate polling sequence based on weights */
251                 int prev = -1;
252                 int cw = -1;
253                 for (i = 0; i < max_wrr_pos; i++) {
254                         rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
255                                              rx_poll, max_wt, gcd, prev);
256                         prev = rx_wrr[i];
257                 }
258         }
259
260         rte_free(rx_adapter->eth_rx_poll);
261         rte_free(rx_adapter->wrr_sched);
262
263         rx_adapter->eth_rx_poll = rx_poll;
264         rx_adapter->wrr_sched = rx_wrr;
265         rx_adapter->wrr_len = max_wrr_pos;
266
267         return 0;
268 }
269
270 static inline void
271 mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
272         struct ipv6_hdr **ipv6_hdr)
273 {
274         struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
275         struct vlan_hdr *vlan_hdr;
276
277         *ipv4_hdr = NULL;
278         *ipv6_hdr = NULL;
279
280         switch (eth_hdr->ether_type) {
281         case RTE_BE16(ETHER_TYPE_IPv4):
282                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
283                 break;
284
285         case RTE_BE16(ETHER_TYPE_IPv6):
286                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
287                 break;
288
289         case RTE_BE16(ETHER_TYPE_VLAN):
290                 vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
291                 switch (vlan_hdr->eth_proto) {
292                 case RTE_BE16(ETHER_TYPE_IPv4):
293                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
294                         break;
295                 case RTE_BE16(ETHER_TYPE_IPv6):
296                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
297                         break;
298                 default:
299                         break;
300                 }
301                 break;
302
303         default:
304                 break;
305         }
306 }
307
308 /* Calculate RSS hash for IPv4/6 */
309 static inline uint32_t
310 do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
311 {
312         uint32_t input_len;
313         void *tuple;
314         struct rte_ipv4_tuple ipv4_tuple;
315         struct rte_ipv6_tuple ipv6_tuple;
316         struct ipv4_hdr *ipv4_hdr;
317         struct ipv6_hdr *ipv6_hdr;
318
319         mtoip(m, &ipv4_hdr, &ipv6_hdr);
320
321         if (ipv4_hdr) {
322                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
323                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
324                 tuple = &ipv4_tuple;
325                 input_len = RTE_THASH_V4_L3_LEN;
326         } else if (ipv6_hdr) {
327                 rte_thash_load_v6_addrs(ipv6_hdr,
328                                         (union rte_thash_tuple *)&ipv6_tuple);
329                 tuple = &ipv6_tuple;
330                 input_len = RTE_THASH_V6_L3_LEN;
331         } else
332                 return 0;
333
334         return rte_softrss_be(tuple, input_len, rss_key_be);
335 }
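/*
 * Note: do_softrss() is reached only when the mbufs carry no
 * PKT_RX_RSS_HASH hash and the application did not supply a flow id for
 * the queue (see fill_event_buffer() below); non-IP packets hash to 0.
 */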
336
337 static inline int
338 rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
339 {
340         return !!rx_adapter->enq_block_count;
341 }
342
343 static inline void
344 rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
345 {
346         if (rx_adapter->rx_enq_block_start_ts)
347                 return;
348
349         rx_adapter->enq_block_count++;
350         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
351                 return;
352
353         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
354 }
355
356 static inline void
357 rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
358                     struct rte_event_eth_rx_adapter_stats *stats)
359 {
360         if (unlikely(!stats->rx_enq_start_ts))
361                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
362
363         if (likely(!rx_enq_blocked(rx_adapter)))
364                 return;
365
366         rx_adapter->enq_block_count = 0;
367         if (rx_adapter->rx_enq_block_start_ts) {
368                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
369                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
370                     rx_adapter->rx_enq_block_start_ts;
371                 rx_adapter->rx_enq_block_start_ts = 0;
372         }
373 }
374
375 /* Add event to buffer; the free space check is done prior to calling
376  * this function
377  */
378 static inline void
379 buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
380                   struct rte_event *ev)
381 {
382         struct rte_eth_event_enqueue_buffer *buf =
383             &rx_adapter->event_enqueue_buffer;
384         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
385 }
386
387 /* Enqueue buffered events to event device */
388 static inline uint16_t
389 flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
390 {
391         struct rte_eth_event_enqueue_buffer *buf =
392             &rx_adapter->event_enqueue_buffer;
393         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
394
395         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
396                                         rx_adapter->event_port_id,
397                                         buf->events,
398                                         buf->count);
399         if (n != buf->count) {
400                 memmove(buf->events,
401                         &buf->events[n],
402                         (buf->count - n) * sizeof(struct rte_event));
403                 stats->rx_enq_retry++;
404         }
405
406         n ? rx_enq_block_end_ts(rx_adapter, stats) :
407                 rx_enq_block_start_ts(rx_adapter);
408
409         buf->count -= n;
410         stats->rx_enq_count += n;
411
412         return n;
413 }
414
415 static inline void
416 fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
417         uint8_t dev_id,
418         uint16_t rx_queue_id,
419         struct rte_mbuf **mbufs,
420         uint16_t num)
421 {
422         uint32_t i;
423         struct eth_device_info *eth_device_info =
424                                         &rx_adapter->eth_devices[dev_id];
425         struct eth_rx_queue_info *eth_rx_queue_info =
426                                         &eth_device_info->rx_queue[rx_queue_id];
427
428         int32_t qid = eth_rx_queue_info->event_queue_id;
429         uint8_t sched_type = eth_rx_queue_info->sched_type;
430         uint8_t priority = eth_rx_queue_info->priority;
431         uint32_t flow_id;
432         struct rte_event events[BATCH_SIZE];
433         struct rte_mbuf *m = mbufs[0];
434         uint32_t rss_mask;
435         uint32_t rss;
436         int do_rss;
437
438         /* 0xffffffff if PKT_RX_RSS_HASH is set, otherwise 0 */
439         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
440         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
441
442         for (i = 0; i < num; i++) {
443                 m = mbufs[i];
444                 struct rte_event *ev = &events[i];
445
446                 rss = do_rss ?
447                         do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
448                 flow_id =
449                     eth_rx_queue_info->flow_id &
450                                 eth_rx_queue_info->flow_id_mask;
451                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
452
453                 ev->flow_id = flow_id;
454                 ev->op = RTE_EVENT_OP_NEW;
455                 ev->sched_type = sched_type;
456                 ev->queue_id = qid;
457                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
458                 ev->sub_event_type = 0;
459                 ev->priority = priority;
460                 ev->mbuf = m;
461
462                 buf_event_enqueue(rx_adapter, ev);
463         }
464 }
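/*
 * For illustration of the flow id composition above: with
 * RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID set at queue add time,
 * flow_id_mask is ~0 and every event carries the configured flow id;
 * otherwise the mask is 0 and the flow id is taken entirely from the RSS
 * hash (m->hash.rss or the software hash):
 *
 *	flow_id = (eth_rx_queue_info->flow_id & flow_id_mask) |
 *			(rss & ~flow_id_mask);
 */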
465
466 /*
467  * Polls receive queues added to the event adapter and enqueues received
468  * packets to the event device.
469  *
470  * The receive code enqueues initially to a temporary buffer; the
471  * buffer is drained any time it holds >= BATCH_SIZE packets.
472  *
473  * If there isn't space available in the temporary buffer, packets from the
474  * Rx queue aren't dequeued from the eth device; this back-pressures the
475  * eth device. In virtual device environments, the back pressure is relayed
476  * to the hypervisor's switching layer, where adjustments can be made to
477  * deal with it.
478  */
479 static inline uint32_t
480 eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
481 {
482         uint32_t num_queue;
483         uint16_t n;
484         uint32_t nb_rx = 0;
485         struct rte_mbuf *mbufs[BATCH_SIZE];
486         struct rte_eth_event_enqueue_buffer *buf;
487         uint32_t wrr_pos;
488         uint32_t max_nb_rx;
489
490         wrr_pos = rx_adapter->wrr_pos;
491         max_nb_rx = rx_adapter->max_nb_rx;
492         buf = &rx_adapter->event_enqueue_buffer;
493         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
494
495         /* Iterate through a WRR sequence */
496         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
497                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
498                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
499                 uint8_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
500
501                 /* Don't do a batch dequeue from the rx queue if there isn't
502                  * enough space in the enqueue buffer.
503                  */
504                 if (buf->count >= BATCH_SIZE)
505                         flush_event_buffer(rx_adapter);
506                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count))
507                         break;
508
509                 stats->rx_poll_count++;
510                 n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
511
512                 if (n) {
513                         stats->rx_packets += n;
514                         /* The check before rte_eth_rx_burst() ensures that
515                          * all n mbufs can be buffered
516                          */
517                         fill_event_buffer(rx_adapter, d, qid, mbufs, n);
518                         nb_rx += n;
519                         if (nb_rx > max_nb_rx) {
520                                 rx_adapter->wrr_pos =
521                                     (wrr_pos + 1) % rx_adapter->wrr_len;
522                                 return nb_rx;
523                         }
524                 }
525
526                 if (++wrr_pos == rx_adapter->wrr_len)
527                         wrr_pos = 0;
528         }
529
530         return nb_rx;
531 }
532
533 static int
534 event_eth_rx_adapter_service_func(void *args)
535 {
536         struct rte_event_eth_rx_adapter *rx_adapter = args;
537         struct rte_eth_event_enqueue_buffer *buf;
538
539         buf = &rx_adapter->event_enqueue_buffer;
540         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
541                 return 0;
542         if (eth_rx_poll(rx_adapter) == 0 && buf->count)
543                 flush_event_buffer(rx_adapter);
544         rte_spinlock_unlock(&rx_adapter->rx_lock);
545         return 0;
546 }
547
548 static int
549 rte_event_eth_rx_adapter_init(void)
550 {
551         const char *name = "rte_event_eth_rx_adapter_array";
552         const struct rte_memzone *mz;
553         unsigned int sz;
554
555         sz = sizeof(*event_eth_rx_adapter) *
556             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
557         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
558
559         mz = rte_memzone_lookup(name);
560         if (mz == NULL) {
561                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
562                                                  RTE_CACHE_LINE_SIZE);
563                 if (mz == NULL) {
564                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
565                                         PRId32, rte_errno);
566                         return -rte_errno;
567                 }
568         }
569
570         event_eth_rx_adapter = mz->addr;
571         return 0;
572 }
573
574 static inline struct rte_event_eth_rx_adapter *
575 id_to_rx_adapter(uint8_t id)
576 {
577         return event_eth_rx_adapter ?
578                 event_eth_rx_adapter[id] : NULL;
579 }
580
581 static int
582 default_conf_cb(uint8_t id, uint8_t dev_id,
583                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
584 {
585         int ret;
586         struct rte_eventdev *dev;
587         struct rte_event_dev_config dev_conf;
588         int started;
589         uint8_t port_id;
590         struct rte_event_port_conf *port_conf = arg;
591         struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);
592
593         dev = &rte_eventdevs[rx_adapter->eventdev_id];
594         dev_conf = dev->data->dev_conf;
595
596         started = dev->data->dev_started;
597         if (started)
598                 rte_event_dev_stop(dev_id);
599         port_id = dev_conf.nb_event_ports;
600         dev_conf.nb_event_ports += 1;
601         ret = rte_event_dev_configure(dev_id, &dev_conf);
602         if (ret) {
603                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
604                                                 dev_id);
605                 if (started)
606                         rte_event_dev_start(dev_id);
607                 return ret;
608         }
609
610         ret = rte_event_port_setup(dev_id, port_id, port_conf);
611         if (ret) {
612                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
613                                         port_id);
614                 return ret;
615         }
616
617         conf->event_port_id = port_id;
618         conf->max_nb_rx = 128;
619         if (started)
620                 rte_event_dev_start(dev_id);
621         rx_adapter->default_cb_arg = 1;
622         return ret;
623 }
624
625 static int
626 init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
627 {
628         int ret;
629         struct rte_service_spec service;
630         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
631
632         if (rx_adapter->service_inited)
633                 return 0;
634
635         memset(&service, 0, sizeof(service));
636         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
637                 "rte_event_eth_rx_adapter_%d", id);
638         service.socket_id = rx_adapter->socket_id;
639         service.callback = event_eth_rx_adapter_service_func;
640         service.callback_userdata = rx_adapter;
641         /* Service function handles locking for queue add/del updates */
642         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
643         ret = rte_service_component_register(&service, &rx_adapter->service_id);
644         if (ret) {
645                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
646                         service.name, ret);
647                 return ret;
648         }
649
650         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
651                 &rx_adapter_conf, rx_adapter->conf_arg);
652         if (ret) {
653                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
654                         ret);
655                 goto err_done;
656         }
657         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
658         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
659         rx_adapter->service_inited = 1;
660         return 0;
661
662 err_done:
663         rte_service_component_unregister(rx_adapter->service_id);
664         return ret;
665 }
666
667
668 static void
669 update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
670                 struct eth_device_info *dev_info,
671                 int32_t rx_queue_id,
672                 uint8_t add)
673 {
674         struct eth_rx_queue_info *queue_info;
675         int enabled;
676         uint16_t i;
677
678         if (dev_info->rx_queue == NULL)
679                 return;
680
681         if (rx_queue_id == -1) {
682                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
683                         update_queue_info(rx_adapter, dev_info, i, add);
684         } else {
685                 queue_info = &dev_info->rx_queue[rx_queue_id];
686                 enabled = queue_info->queue_enabled;
687                 if (add) {
688                         rx_adapter->nb_queues += !enabled;
689                         dev_info->nb_dev_queues += !enabled;
690                 } else {
691                         rx_adapter->nb_queues -= enabled;
692                         dev_info->nb_dev_queues -= enabled;
693                 }
694                 queue_info->queue_enabled = !!add;
695         }
696 }
697
698 static int
699 event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
700                             struct eth_device_info *dev_info,
701                             uint16_t rx_queue_id)
702 {
703         struct eth_rx_queue_info *queue_info;
704
705         if (rx_adapter->nb_queues == 0)
706                 return 0;
707
708         queue_info = &dev_info->rx_queue[rx_queue_id];
709         rx_adapter->num_rx_polled -= queue_info->queue_enabled;
710         update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
711         return 0;
712 }
713
714 static void
715 event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
716                 struct eth_device_info *dev_info,
717                 uint16_t rx_queue_id,
718                 const struct rte_event_eth_rx_adapter_queue_conf *conf)
719
720 {
721         struct eth_rx_queue_info *queue_info;
722         const struct rte_event *ev = &conf->ev;
723
724         queue_info = &dev_info->rx_queue[rx_queue_id];
725         queue_info->event_queue_id = ev->queue_id;
726         queue_info->sched_type = ev->sched_type;
727         queue_info->priority = ev->priority;
728         queue_info->wt = conf->servicing_weight;
729
730         if (conf->rx_queue_flags &
731                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
732                 queue_info->flow_id = ev->flow_id;
733                 queue_info->flow_id_mask = ~0;
734         }
735
736         /* The same queue can be added more than once */
737         rx_adapter->num_rx_polled += !queue_info->queue_enabled;
738         update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
739 }
740
741 static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
742                 uint8_t eth_dev_id,
743                 int rx_queue_id,
744                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
745 {
746         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
747         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
748         uint32_t i;
749         int ret;
750
751         if (queue_conf->servicing_weight == 0) {
752
753                 struct rte_eth_dev_data *data = dev_info->dev->data;
754                 if (data->dev_conf.intr_conf.rxq) {
755                         RTE_EDEV_LOG_ERR("Interrupt driven queues"
756                                         " not supported");
757                         return -ENOTSUP;
758                 }
759                 temp_conf = *queue_conf;
760
761                 /* Rx interrupts are disabled, so poll the queue with weight 1 */
762                 temp_conf.servicing_weight = 1;
763                 queue_conf = &temp_conf;
764         }
765
766         if (dev_info->rx_queue == NULL) {
767                 dev_info->rx_queue =
768                     rte_zmalloc_socket(rx_adapter->mem_name,
769                                        dev_info->dev->data->nb_rx_queues *
770                                        sizeof(struct eth_rx_queue_info), 0,
771                                        rx_adapter->socket_id);
772                 if (dev_info->rx_queue == NULL)
773                         return -ENOMEM;
774         }
775
776         if (rx_queue_id == -1) {
777                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
778                         event_eth_rx_adapter_queue_add(rx_adapter,
779                                                 dev_info, i,
780                                                 queue_conf);
781         } else {
782                 event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
783                                           (uint16_t)rx_queue_id,
784                                           queue_conf);
785         }
786
787         ret = eth_poll_wrr_calc(rx_adapter);
788         if (ret) {
789                 event_eth_rx_adapter_queue_del(rx_adapter,
790                                         dev_info, rx_queue_id);
791                 return ret;
792         }
793
794         return ret;
795 }
796
797 static int
798 rx_adapter_ctrl(uint8_t id, int start)
799 {
800         struct rte_event_eth_rx_adapter *rx_adapter;
801         struct rte_eventdev *dev;
802         struct eth_device_info *dev_info;
803         uint32_t i;
804         int use_service = 0;
805         int stop = !start;
806
807         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
808         rx_adapter = id_to_rx_adapter(id);
809         if (rx_adapter == NULL)
810                 return -EINVAL;
811
812         dev = &rte_eventdevs[rx_adapter->eventdev_id];
813
814         for (i = 0; i < rte_eth_dev_count(); i++) {
815                 dev_info = &rx_adapter->eth_devices[i];
816                 /* if start, check for num dev queues */
817                 if (start && !dev_info->nb_dev_queues)
818                         continue;
819                 /* if stop, check if the dev has been started */
820                 if (stop && !dev_info->dev_rx_started)
821                         continue;
822                 use_service |= !dev_info->internal_event_port;
823                 dev_info->dev_rx_started = start;
824                 if (dev_info->internal_event_port == 0)
825                         continue;
826                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
827                                                 &rte_eth_devices[i]) :
828                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
829                                                 &rte_eth_devices[i]);
830         }
831
832         if (use_service)
833                 rte_service_runstate_set(rx_adapter->service_id, start);
834
835         return 0;
836 }
837
838 int
839 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
840                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
841                                 void *conf_arg)
842 {
843         struct rte_event_eth_rx_adapter *rx_adapter;
844         int ret;
845         int socket_id;
846         uint8_t i;
847         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
848         const uint8_t default_rss_key[] = {
849                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
850                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
851                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
852                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
853                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
854         };
855
856         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
857         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
858         if (conf_cb == NULL)
859                 return -EINVAL;
860
861         if (event_eth_rx_adapter == NULL) {
862                 ret = rte_event_eth_rx_adapter_init();
863                 if (ret)
864                         return ret;
865         }
866
867         rx_adapter = id_to_rx_adapter(id);
868         if (rx_adapter != NULL) {
869                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
870                 return -EEXIST;
871         }
872
873         socket_id = rte_event_dev_socket_id(dev_id);
874         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
875                 "rte_event_eth_rx_adapter_%d",
876                 id);
877
878         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
879                         RTE_CACHE_LINE_SIZE, socket_id);
880         if (rx_adapter == NULL) {
881                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
882                 return -ENOMEM;
883         }
884
885         rx_adapter->eventdev_id = dev_id;
886         rx_adapter->socket_id = socket_id;
887         rx_adapter->conf_cb = conf_cb;
888         rx_adapter->conf_arg = conf_arg;
889         strcpy(rx_adapter->mem_name, mem_name);
890         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
891                                         rte_eth_dev_count() *
892                                         sizeof(struct eth_device_info), 0,
893                                         socket_id);
894         rte_convert_rss_key((const uint32_t *)default_rss_key,
895                         (uint32_t *)rx_adapter->rss_key_be,
896                             RTE_DIM(default_rss_key));
897
898         if (rx_adapter->eth_devices == NULL) {
899                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
900                 rte_free(rx_adapter);
901                 return -ENOMEM;
902         }
903         rte_spinlock_init(&rx_adapter->rx_lock);
904         for (i = 0; i < rte_eth_dev_count(); i++)
905                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
906
907         event_eth_rx_adapter[id] = rx_adapter;
908         if (conf_cb == default_conf_cb)
909                 rx_adapter->default_cb_arg = 1;
910         return 0;
911 }
912
913 int
914 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
915                 struct rte_event_port_conf *port_config)
916 {
917         struct rte_event_port_conf *pc;
918         int ret;
919
920         if (port_config == NULL)
921                 return -EINVAL;
922         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
923
924         pc = rte_malloc(NULL, sizeof(*pc), 0);
925         if (pc == NULL)
926                 return -ENOMEM;
927         *pc = *port_config;
928         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
929                                         default_conf_cb,
930                                         pc);
931         if (ret)
932                 rte_free(pc);
933         return ret;
934 }
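/*
 * Typical usage (a minimal sketch, assuming an application that has already
 * configured event device 0 and ethdev port 0, and assuming the
 * rte_event_port_conf and queue_conf layouts from the 17.11 headers; the
 * adapter id, port ids and queue depths below are example values):
 *
 *	struct rte_event_port_conf port_conf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
 *		.ev.queue_id = 0,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		.servicing_weight = 1,
 *	};
 *	int err;
 *
 *	err = rte_event_eth_rx_adapter_create(0, 0, &port_conf);
 *	if (err == 0)
 *		err = rte_event_eth_rx_adapter_queue_add(0, 0, -1, &queue_conf);
 *	if (err == 0)
 *		err = rte_event_eth_rx_adapter_start(0);
 *
 * For eventdevs without an internal event port the adapter runs as a
 * service; see the note near rte_event_eth_rx_adapter_service_id_get()
 * below for mapping it to a service core.
 */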
935
936 int
937 rte_event_eth_rx_adapter_free(uint8_t id)
938 {
939         struct rte_event_eth_rx_adapter *rx_adapter;
940
941         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
942
943         rx_adapter = id_to_rx_adapter(id);
944         if (rx_adapter == NULL)
945                 return -EINVAL;
946
947         if (rx_adapter->nb_queues) {
948                 RTE_EDEV_LOG_ERR("%" PRIu32 " Rx queues not deleted",
949                                 rx_adapter->nb_queues);
950                 return -EBUSY;
951         }
952
953         if (rx_adapter->default_cb_arg)
954                 rte_free(rx_adapter->conf_arg);
955         rte_free(rx_adapter->eth_devices);
956         rte_free(rx_adapter);
957         event_eth_rx_adapter[id] = NULL;
958
959         return 0;
960 }
961
962 int
963 rte_event_eth_rx_adapter_queue_add(uint8_t id,
964                 uint8_t eth_dev_id,
965                 int32_t rx_queue_id,
966                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
967 {
968         int ret;
969         uint32_t cap;
970         struct rte_event_eth_rx_adapter *rx_adapter;
971         struct rte_eventdev *dev;
972         struct eth_device_info *dev_info;
973         int start_service;
974
975         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
976         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
977
978         rx_adapter = id_to_rx_adapter(id);
979         if ((rx_adapter == NULL) || (queue_conf == NULL))
980                 return -EINVAL;
981
982         dev = &rte_eventdevs[rx_adapter->eventdev_id];
983         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
984                                                 eth_dev_id,
985                                                 &cap);
986         if (ret) {
987                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
988                         " eth port %" PRIu8, id, eth_dev_id);
989                 return ret;
990         }
991
992         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
993                 && (queue_conf->rx_queue_flags &
994                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
995                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
996                                 " eth port: %" PRIu8 " adapter id: %" PRIu8,
997                                 eth_dev_id, id);
998                 return -EINVAL;
999         }
1000
1001         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
1002                 (rx_queue_id != -1)) {
1003                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
1004                         "event queue, adapter id %u eth port %u", id, eth_dev_id);
1005                 return -EINVAL;
1006         }
1007
1008         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1009                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1010                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1011                          (uint16_t)rx_queue_id);
1012                 return -EINVAL;
1013         }
1014
1015         start_service = 0;
1016         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1017
1018         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1019                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
1020                                         -ENOTSUP);
1021                 if (dev_info->rx_queue == NULL) {
1022                         dev_info->rx_queue =
1023                             rte_zmalloc_socket(rx_adapter->mem_name,
1024                                         dev_info->dev->data->nb_rx_queues *
1025                                         sizeof(struct eth_rx_queue_info), 0,
1026                                         rx_adapter->socket_id);
1027                         if (dev_info->rx_queue == NULL)
1028                                 return -ENOMEM;
1029                 }
1030
1031                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
1032                                 &rte_eth_devices[eth_dev_id],
1033                                 rx_queue_id, queue_conf);
1034                 if (ret == 0) {
1035                         update_queue_info(rx_adapter,
1036                                         &rx_adapter->eth_devices[eth_dev_id],
1037                                         rx_queue_id,
1038                                         1);
1039                 }
1040         } else {
1041                 rte_spinlock_lock(&rx_adapter->rx_lock);
1042                 ret = init_service(rx_adapter, id);
1043                 if (ret == 0)
1044                         ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
1045                                         queue_conf);
1046                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1047                 if (ret == 0)
1048                         start_service = !!sw_rx_adapter_queue_count(rx_adapter);
1049         }
1050
1051         if (ret)
1052                 return ret;
1053
1054         if (start_service)
1055                 rte_service_component_runstate_set(rx_adapter->service_id, 1);
1056
1057         return 0;
1058 }
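/*
 * For illustration, a queue_conf that pins all packets from Rx queue 3 of
 * eth port 1 to flow id 7 (the port, queue and flow id are example values;
 * the flag requires RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID, and a
 * specific rx_queue_id requires RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ):
 *
 *	struct rte_event_eth_rx_adapter_queue_conf conf = {
 *		.rx_queue_flags = RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID,
 *		.servicing_weight = 1,
 *		.ev.queue_id = 0,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.ev.flow_id = 7,
 *	};
 *
 *	rte_event_eth_rx_adapter_queue_add(id, 1, 3, &conf);
 */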
1059
1060 int
1061 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
1062                                 int32_t rx_queue_id)
1063 {
1064         int ret = 0;
1065         struct rte_eventdev *dev;
1066         struct rte_event_eth_rx_adapter *rx_adapter;
1067         struct eth_device_info *dev_info;
1068         uint32_t cap;
1069         uint16_t i;
1070
1071         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1072         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1073
1074         rx_adapter = id_to_rx_adapter(id);
1075         if (rx_adapter == NULL)
1076                 return -EINVAL;
1077
1078         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1079         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1080                                                 eth_dev_id,
1081                                                 &cap);
1082         if (ret)
1083                 return ret;
1084
1085         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1086                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1087                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1088                          (uint16_t)rx_queue_id);
1089                 return -EINVAL;
1090         }
1091
1092         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1093
1094         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1095                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
1096                                  -ENOTSUP);
1097                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
1098                                                 &rte_eth_devices[eth_dev_id],
1099                                                 rx_queue_id);
1100                 if (ret == 0) {
1101                         update_queue_info(rx_adapter,
1102                                         &rx_adapter->eth_devices[eth_dev_id],
1103                                         rx_queue_id,
1104                                         0);
1105                         if (dev_info->nb_dev_queues == 0) {
1106                                 rte_free(dev_info->rx_queue);
1107                                 dev_info->rx_queue = NULL;
1108                         }
1109                 }
1110         } else {
1111                 int rc;
1112                 rte_spinlock_lock(&rx_adapter->rx_lock);
1113                 if (rx_queue_id == -1) {
1114                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1115                                 event_eth_rx_adapter_queue_del(rx_adapter,
1116                                                         dev_info,
1117                                                         i);
1118                 } else {
1119                         event_eth_rx_adapter_queue_del(rx_adapter,
1120                                                 dev_info,
1121                                                 (uint16_t)rx_queue_id);
1122                 }
1123
1124                 rc = eth_poll_wrr_calc(rx_adapter);
1125                 if (rc)
1126                         RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
1127                                         rc);
1128
1129                 if (dev_info->nb_dev_queues == 0) {
1130                         rte_free(dev_info->rx_queue);
1131                         dev_info->rx_queue = NULL;
1132                 }
1133
1134                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1135                 rte_service_component_runstate_set(rx_adapter->service_id,
1136                                 sw_rx_adapter_queue_count(rx_adapter));
1137         }
1138
1139         return ret;
1140 }
1141
1142
1143 int
1144 rte_event_eth_rx_adapter_start(uint8_t id)
1145 {
1146         return rx_adapter_ctrl(id, 1);
1147 }
1148
1149 int
1150 rte_event_eth_rx_adapter_stop(uint8_t id)
1151 {
1152         return rx_adapter_ctrl(id, 0);
1153 }
1154
1155 int
1156 rte_event_eth_rx_adapter_stats_get(uint8_t id,
1157                                struct rte_event_eth_rx_adapter_stats *stats)
1158 {
1159         struct rte_event_eth_rx_adapter *rx_adapter;
1160         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
1161         struct rte_event_eth_rx_adapter_stats dev_stats;
1162         struct rte_eventdev *dev;
1163         struct eth_device_info *dev_info;
1164         uint32_t i;
1165         int ret;
1166
1167         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1168
1169         rx_adapter = id_to_rx_adapter(id);
1170         if (rx_adapter  == NULL || stats == NULL)
1171                 return -EINVAL;
1172
1173         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1174         memset(stats, 0, sizeof(*stats));
1175         for (i = 0; i < rte_eth_dev_count(); i++) {
1176                 dev_info = &rx_adapter->eth_devices[i];
1177                 if (dev_info->internal_event_port == 0 ||
1178                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
1179                         continue;
1180                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
1181                                                 &rte_eth_devices[i],
1182                                                 &dev_stats);
1183                 if (ret)
1184                         continue;
1185                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
1186                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
1187         }
1188
1189         if (rx_adapter->service_inited)
1190                 *stats = rx_adapter->stats;
1191
1192         stats->rx_packets += dev_stats_sum.rx_packets;
1193         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
1194         return 0;
1195 }
1196
1197 int
1198 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
1199 {
1200         struct rte_event_eth_rx_adapter *rx_adapter;
1201         struct rte_eventdev *dev;
1202         struct eth_device_info *dev_info;
1203         uint32_t i;
1204
1205         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1206
1207         rx_adapter = id_to_rx_adapter(id);
1208         if (rx_adapter == NULL)
1209                 return -EINVAL;
1210
1211         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1212         for (i = 0; i < rte_eth_dev_count(); i++) {
1213                 dev_info = &rx_adapter->eth_devices[i];
1214                 if (dev_info->internal_event_port == 0 ||
1215                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
1216                         continue;
1217                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
1218                                                         &rte_eth_devices[i]);
1219         }
1220
1221         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
1222         return 0;
1223 }
1224
1225 int
1226 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1227 {
1228         struct rte_event_eth_rx_adapter *rx_adapter;
1229
1230         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1231
1232         rx_adapter = id_to_rx_adapter(id);
1233         if (rx_adapter == NULL || service_id == NULL)
1234                 return -EINVAL;
1235
1236         if (rx_adapter->service_inited)
1237                 *service_id = rx_adapter->service_id;
1238
1239         return rx_adapter->service_inited ? 0 : -ESRCH;
1240 }
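/*
 * For adapters that use a software service (no internal event port), the
 * application must map the returned service id to a service core before
 * packets flow. A minimal sketch, assuming the rte_service lcore API from
 * rte_service.h and using lcore 1 as an example value:
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *		rte_service_lcore_add(1);
 *		rte_service_map_lcore_set(service_id, 1, 1);
 *		rte_service_lcore_start(1);
 *	}
 */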