/*
 * lib/librte_eventdev/rte_event_eth_rx_adapter.c (DPDK 17.11.4, deb_dpdk.git)
 */
1 #include <rte_cycles.h>
2 #include <rte_common.h>
3 #include <rte_dev.h>
4 #include <rte_errno.h>
5 #include <rte_ethdev.h>
6 #include <rte_log.h>
7 #include <rte_malloc.h>
8 #include <rte_service_component.h>
9 #include <rte_thash.h>
10
11 #include "rte_eventdev.h"
12 #include "rte_eventdev_pmd.h"
13 #include "rte_event_eth_rx_adapter.h"
14
15 #define BATCH_SIZE              32
16 #define BLOCK_CNT_THRESHOLD     10
17 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
18
19 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
20 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
21
22 #define RSS_KEY_SIZE    40
23
24 /*
25  * There is an instance of this struct per polled Rx queue added to the
26  * adapter
27  */
28 struct eth_rx_poll_entry {
29         /* Eth port to poll */
30         uint8_t eth_dev_id;
31         /* Eth rx queue to poll */
32         uint16_t eth_rx_qid;
33 };
34
35 /* Instance per adapter */
36 struct rte_eth_event_enqueue_buffer {
37         /* Count of events in this buffer */
38         uint16_t count;
39         /* Array of events in this buffer */
40         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
41 };
42
43 struct rte_event_eth_rx_adapter {
44         /* RSS key */
45         uint8_t rss_key_be[RSS_KEY_SIZE];
46         /* Event device identifier */
47         uint8_t eventdev_id;
48         /* Per ethernet device structure */
49         struct eth_device_info *eth_devices;
50         /* Event port identifier */
51         uint8_t event_port_id;
52         /* Lock to serialize config updates with service function */
53         rte_spinlock_t rx_lock;
54         /* Max mbufs processed in any service function invocation */
55         uint32_t max_nb_rx;
56         /* Receive queues that need to be polled */
57         struct eth_rx_poll_entry *eth_rx_poll;
58         /* Size of the eth_rx_poll array */
59         uint16_t num_rx_polled;
60         /* Weighted round robin schedule */
61         uint32_t *wrr_sched;
62         /* wrr_sched[] size */
63         uint32_t wrr_len;
64         /* Next entry in wrr[] to begin polling */
65         uint32_t wrr_pos;
66         /* Event burst buffer */
67         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
68         /* Per adapter stats */
69         struct rte_event_eth_rx_adapter_stats stats;
70         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
71         uint16_t enq_block_count;
72         /* Block start ts */
73         uint64_t rx_enq_block_start_ts;
74         /* Configuration callback for rte_service configuration */
75         rte_event_eth_rx_adapter_conf_cb conf_cb;
76         /* Configuration callback argument */
77         void *conf_arg;
78         /* Set if  default_cb is being used */
79         int default_cb_arg;
80         /* Service initialization state */
81         uint8_t service_inited;
82         /* Total count of Rx queues in adapter */
83         uint32_t nb_queues;
84         /* Memory allocation name */
85         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
86         /* Socket identifier cached from eventdev */
87         int socket_id;
88         /* Per adapter EAL service */
89         uint32_t service_id;
90         /* Adapter started flag */
91         uint8_t rxa_started;
92 } __rte_cache_aligned;
93
94 /* Per eth device */
95 struct eth_device_info {
96         struct rte_eth_dev *dev;
97         struct eth_rx_queue_info *rx_queue;
98         /* Set if ethdev->eventdev packet transfer uses a
99          * hardware mechanism
100          */
101         uint8_t internal_event_port;
102         /* Set if the adapter is processing Rx queues for
103          * this eth device and packet processing has been
104          * started; this lets the code know whether the PMD
105          * rx_adapter_stop callback needs to be invoked
106          */
107         uint8_t dev_rx_started;
108         /* If nb_dev_queues > 0, the start callback will
109          * be invoked if not already invoked
110          */
111         uint16_t nb_dev_queues;
112 };
113
114 /* Per Rx queue */
115 struct eth_rx_queue_info {
116         int queue_enabled;      /* True if added */
117         uint16_t wt;            /* Polling weight */
118         uint8_t event_queue_id; /* Event queue to enqueue packets to */
119         uint8_t sched_type;     /* Sched type for events */
120         uint8_t priority;       /* Event priority */
121         uint32_t flow_id;       /* App provided flow identifier */
122         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
123 };
124
125 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
126
127 static inline int
128 valid_id(uint8_t id)
129 {
130         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
131 }
132
133 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
134         if (!valid_id(id)) { \
135                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
136                 return retval; \
137         } \
138 } while (0)
139
140 static inline int
141 sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
142 {
143         return rx_adapter->num_rx_polled;
144 }
145
146 /* Greatest common divisor */
147 static uint16_t gcd_u16(uint16_t a, uint16_t b)
148 {
149         uint16_t r = a % b;
150
151         return r ? gcd_u16(b, r) : b;
152 }
153
154 /* Returns the next queue in the polling sequence
155  *
156  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
157  */
158 static int
159 wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
160          unsigned int n, int *cw,
161          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
162          uint16_t gcd, int prev)
163 {
164         int i = prev;
165         uint16_t w;
166
167         while (1) {
168                 uint16_t q;
169                 uint8_t d;
170
171                 i = (i + 1) % n;
172                 if (i == 0) {
173                         *cw = *cw - gcd;
174                         if (*cw <= 0)
175                                 *cw = max_wt;
176                 }
177
178                 q = eth_rx_poll[i].eth_rx_qid;
179                 d = eth_rx_poll[i].eth_dev_id;
180                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
181
182                 if ((int)w >= *cw)
183                         return i;
184         }
185 }
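/*
 * Worked example (illustrative only, not part of the adapter logic): with
 * three polled queues of weights 4, 3 and 2, max_wt = 4 and gcd = 1,
 * successive calls to wrr_next() starting from prev = -1 and cw = -1 return
 * the poll-entry indexes 0 0 1 0 1 2 0 1 2, i.e. over one WRR cycle the
 * weight-4 queue is polled four times, the weight-3 queue three times and
 * the weight-2 queue twice.
 */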
186
187 /* Precalculate WRR polling sequence for all queues in rx_adapter */
188 static int
189 eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
190 {
191         uint8_t d;
192         uint16_t q;
193         unsigned int i;
194
195         /* Initialize variables for calculation of wrr schedule */
196         uint16_t max_wrr_pos = 0;
197         unsigned int poll_q = 0;
198         uint16_t max_wt = 0;
199         uint16_t gcd = 0;
200
201         struct eth_rx_poll_entry *rx_poll = NULL;
202         uint32_t *rx_wrr = NULL;
203
204         if (rx_adapter->num_rx_polled) {
205                 size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
206                                 sizeof(*rx_adapter->eth_rx_poll),
207                                 RTE_CACHE_LINE_SIZE);
208                 rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
209                                              len,
210                                              RTE_CACHE_LINE_SIZE,
211                                              rx_adapter->socket_id);
212                 if (rx_poll == NULL)
213                         return -ENOMEM;
214
215                 /* Generate the array of all queues to poll; poll_q is
216                  * the number of entries filled in
217                  */
218                 for (d = 0; d < rte_eth_dev_count(); d++) {
219                         uint16_t nb_rx_queues;
220                         struct eth_device_info *dev_info =
221                                         &rx_adapter->eth_devices[d];
222                         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
223                         if (dev_info->rx_queue == NULL)
224                                 continue;
225                         if (dev_info->internal_event_port)
226                                 continue;
227                         for (q = 0; q < nb_rx_queues; q++) {
228                                 struct eth_rx_queue_info *queue_info =
229                                         &dev_info->rx_queue[q];
230                                 if (queue_info->queue_enabled == 0)
231                                         continue;
232
233                                 uint16_t wt = queue_info->wt;
234                                 rx_poll[poll_q].eth_dev_id = d;
235                                 rx_poll[poll_q].eth_rx_qid = q;
236                                 max_wrr_pos += wt;
237                                 max_wt = RTE_MAX(max_wt, wt);
238                                 gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
239                                 poll_q++;
240                         }
241                 }
242
243                 len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
244                                 RTE_CACHE_LINE_SIZE);
245                 rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
246                                             len,
247                                             RTE_CACHE_LINE_SIZE,
248                                             rx_adapter->socket_id);
249                 if (rx_wrr == NULL) {
250                         rte_free(rx_poll);
251                         return -ENOMEM;
252                 }
253
254                 /* Generate polling sequence based on weights */
255                 int prev = -1;
256                 int cw = -1;
257                 for (i = 0; i < max_wrr_pos; i++) {
258                         rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
259                                              rx_poll, max_wt, gcd, prev);
260                         prev = rx_wrr[i];
261                 }
262         }
263
264         rte_free(rx_adapter->eth_rx_poll);
265         rte_free(rx_adapter->wrr_sched);
266
267         rx_adapter->eth_rx_poll = rx_poll;
268         rx_adapter->wrr_sched = rx_wrr;
269         rx_adapter->wrr_len = max_wrr_pos;
270
271         return 0;
272 }
273
274 static inline void
275 mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
276         struct ipv6_hdr **ipv6_hdr)
277 {
278         struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
279         struct vlan_hdr *vlan_hdr;
280
281         *ipv4_hdr = NULL;
282         *ipv6_hdr = NULL;
283
284         switch (eth_hdr->ether_type) {
285         case RTE_BE16(ETHER_TYPE_IPv4):
286                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
287                 break;
288
289         case RTE_BE16(ETHER_TYPE_IPv6):
290                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
291                 break;
292
293         case RTE_BE16(ETHER_TYPE_VLAN):
294                 vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
295                 switch (vlan_hdr->eth_proto) {
296                 case RTE_BE16(ETHER_TYPE_IPv4):
297                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
298                         break;
299                 case RTE_BE16(ETHER_TYPE_IPv6):
300                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
301                         break;
302                 default:
303                         break;
304                 }
305                 break;
306
307         default:
308                 break;
309         }
310 }
311
312 /* Calculate RSS hash for IPv4/6 */
313 static inline uint32_t
314 do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
315 {
316         uint32_t input_len;
317         void *tuple;
318         struct rte_ipv4_tuple ipv4_tuple;
319         struct rte_ipv6_tuple ipv6_tuple;
320         struct ipv4_hdr *ipv4_hdr;
321         struct ipv6_hdr *ipv6_hdr;
322
323         mtoip(m, &ipv4_hdr, &ipv6_hdr);
324
325         if (ipv4_hdr) {
326                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
327                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
328                 tuple = &ipv4_tuple;
329                 input_len = RTE_THASH_V4_L3_LEN;
330         } else if (ipv6_hdr) {
331                 rte_thash_load_v6_addrs(ipv6_hdr,
332                                         (union rte_thash_tuple *)&ipv6_tuple);
333                 tuple = &ipv6_tuple;
334                 input_len = RTE_THASH_V6_L3_LEN;
335         } else
336                 return 0;
337
338         return rte_softrss_be(tuple, input_len, rss_key_be);
339 }
340
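/*
 * Enqueue "block" accounting: when flush_event_buffer() fails to enqueue any
 * events, rx_enq_block_start_ts() counts the consecutive failures and, once
 * BLOCK_CNT_THRESHOLD is reached, records a TSC timestamp. The next
 * successful enqueue calls rx_enq_block_end_ts(), which adds the elapsed
 * cycles to stats->rx_enq_block_cycles and clears the block state.
 */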
341 static inline int
342 rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
343 {
344         return !!rx_adapter->enq_block_count;
345 }
346
347 static inline void
348 rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
349 {
350         if (rx_adapter->rx_enq_block_start_ts)
351                 return;
352
353         rx_adapter->enq_block_count++;
354         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
355                 return;
356
357         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
358 }
359
360 static inline void
361 rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
362                     struct rte_event_eth_rx_adapter_stats *stats)
363 {
364         if (unlikely(!stats->rx_enq_start_ts))
365                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
366
367         if (likely(!rx_enq_blocked(rx_adapter)))
368                 return;
369
370         rx_adapter->enq_block_count = 0;
371         if (rx_adapter->rx_enq_block_start_ts) {
372                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
373                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
374                     rx_adapter->rx_enq_block_start_ts;
375                 rx_adapter->rx_enq_block_start_ts = 0;
376         }
377 }
378
379 /* Add event to buffer, free space check is done prior to calling
380  * this function
381  */
382 static inline void
383 buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
384                   struct rte_event *ev)
385 {
386         struct rte_eth_event_enqueue_buffer *buf =
387             &rx_adapter->event_enqueue_buffer;
388         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
389 }
390
391 /* Enqueue buffered events to event device */
392 static inline uint16_t
393 flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
394 {
395         struct rte_eth_event_enqueue_buffer *buf =
396             &rx_adapter->event_enqueue_buffer;
397         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
398
399         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
400                                         rx_adapter->event_port_id,
401                                         buf->events,
402                                         buf->count);
403         if (n != buf->count) {
404                 memmove(buf->events,
405                         &buf->events[n],
406                         (buf->count - n) * sizeof(struct rte_event));
407                 stats->rx_enq_retry++;
408         }
409
410         n ? rx_enq_block_end_ts(rx_adapter, stats) :
411                 rx_enq_block_start_ts(rx_adapter);
412
413         buf->count -= n;
414         stats->rx_enq_count += n;
415
416         return n;
417 }
418
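/* Convert a burst of mbufs received on the given port/queue into events
 * using the per-queue event configuration and append them to the adapter's
 * enqueue buffer.
 */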
419 static inline void
420 fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
421         uint16_t eth_dev_id,
422         uint16_t rx_queue_id,
423         struct rte_mbuf **mbufs,
424         uint16_t num)
425 {
426         uint32_t i;
427         struct eth_device_info *eth_device_info =
428                                         &rx_adapter->eth_devices[eth_dev_id];
429         struct eth_rx_queue_info *eth_rx_queue_info =
430                                         &eth_device_info->rx_queue[rx_queue_id];
431
432         int32_t qid = eth_rx_queue_info->event_queue_id;
433         uint8_t sched_type = eth_rx_queue_info->sched_type;
434         uint8_t priority = eth_rx_queue_info->priority;
435         uint32_t flow_id;
436         struct rte_event events[BATCH_SIZE];
437         struct rte_mbuf *m = mbufs[0];
438         uint32_t rss_mask;
439         uint32_t rss;
440         int do_rss;
441
442         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
443         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
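        /* Compute a software RSS hash only if the mbuf carries no hardware
         * RSS hash and the application did not supply a flow id.
         */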
444         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
445
446         for (i = 0; i < num; i++) {
447                 m = mbufs[i];
448                 struct rte_event *ev = &events[i];
449
450                 rss = do_rss ?
451                         do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
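                /* Use the application-supplied flow id where a flow id mask
                 * was configured; otherwise take the flow id from the RSS
                 * hash computed or reported above.
                 */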
452                 flow_id =
453                     eth_rx_queue_info->flow_id &
454                                 eth_rx_queue_info->flow_id_mask;
455                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
456
457                 ev->flow_id = flow_id;
458                 ev->op = RTE_EVENT_OP_NEW;
459                 ev->sched_type = sched_type;
460                 ev->queue_id = qid;
461                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
462                 ev->sub_event_type = 0;
463                 ev->priority = priority;
464                 ev->mbuf = m;
465
466                 buf_event_enqueue(rx_adapter, ev);
467         }
468 }
469
470 /*
471  * Polls receive queues added to the event adapter and enqueues received
472  * packets to the event device.
473  *
474  * The receive code enqueues initially to a temporary buffer; the
475  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
476  *
477  * If there isn't space available in the temporary buffer, packets from the
478  * Rx queue aren't dequeued from the eth device; this back-pressures the
479  * eth device. In virtual device environments, this back pressure is relayed
480  * to the hypervisor's switching layer, where adjustments can be made to
481  * deal with it.
482  */
483 static inline void
484 eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
485 {
486         uint32_t num_queue;
487         uint16_t n;
488         uint32_t nb_rx = 0;
489         struct rte_mbuf *mbufs[BATCH_SIZE];
490         struct rte_eth_event_enqueue_buffer *buf;
491         uint32_t wrr_pos;
492         uint32_t max_nb_rx;
493
494         wrr_pos = rx_adapter->wrr_pos;
495         max_nb_rx = rx_adapter->max_nb_rx;
496         buf = &rx_adapter->event_enqueue_buffer;
497         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
498
499         /* Iterate through a WRR sequence */
500         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
501                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
502                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
503                 uint8_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
504
505                 /* Don't do a batch dequeue from the rx queue if there isn't
506                  * enough space in the enqueue buffer.
507                  */
508                 if (buf->count >= BATCH_SIZE)
509                         flush_event_buffer(rx_adapter);
510                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
511                         rx_adapter->wrr_pos = wrr_pos;
512                         return;
513                 }
514
515                 stats->rx_poll_count++;
516                 n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
517
518                 if (n) {
519                         stats->rx_packets += n;
520                         /* The check before rte_eth_rx_burst() ensures that
521                          * all n mbufs can be buffered
522                          */
523                         fill_event_buffer(rx_adapter, d, qid, mbufs, n);
524                         nb_rx += n;
525                         if (nb_rx > max_nb_rx) {
526                                 rx_adapter->wrr_pos =
527                                     (wrr_pos + 1) % rx_adapter->wrr_len;
528                                 break;
529                         }
530                 }
531
532                 if (++wrr_pos == rx_adapter->wrr_len)
533                         wrr_pos = 0;
534         }
535
536         if (buf->count >= BATCH_SIZE)
537                 flush_event_buffer(rx_adapter);
538 }
539
540 static int
541 event_eth_rx_adapter_service_func(void *args)
542 {
543         struct rte_event_eth_rx_adapter *rx_adapter = args;
544
545         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
546                 return 0;
547         if (!rx_adapter->rxa_started) {
548                 rte_spinlock_unlock(&rx_adapter->rx_lock);
549                 return 0;
550         }
551         eth_rx_poll(rx_adapter);
552         rte_spinlock_unlock(&rx_adapter->rx_lock);
553         return 0;
554 }
555
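/* Reserve the memzone that holds the global array of adapter pointers,
 * reusing it if it has already been created.
 */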
556 static int
557 rte_event_eth_rx_adapter_init(void)
558 {
559         const char *name = "rte_event_eth_rx_adapter_array";
560         const struct rte_memzone *mz;
561         unsigned int sz;
562
563         sz = sizeof(*event_eth_rx_adapter) *
564             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
565         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
566
567         mz = rte_memzone_lookup(name);
568         if (mz == NULL) {
569                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
570                                                  RTE_CACHE_LINE_SIZE);
571                 if (mz == NULL) {
572                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
573                                         PRId32, rte_errno);
574                         return -rte_errno;
575                 }
576         }
577
578         event_eth_rx_adapter = mz->addr;
579         return 0;
580 }
581
582 static inline struct rte_event_eth_rx_adapter *
583 id_to_rx_adapter(uint8_t id)
584 {
585         return event_eth_rx_adapter ?
586                 event_eth_rx_adapter[id] : NULL;
587 }
588
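/* Default configuration callback: grows the event device configuration by
 * one event port, sets that port up with the rte_event_port_conf passed in
 * through arg, and stops/restarts the event device around the
 * reconfiguration if it was already started.
 */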
589 static int
590 default_conf_cb(uint8_t id, uint8_t dev_id,
591                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
592 {
593         int ret;
594         struct rte_eventdev *dev;
595         struct rte_event_dev_config dev_conf;
596         int started;
597         uint8_t port_id;
598         struct rte_event_port_conf *port_conf = arg;
599         struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);
600
601         dev = &rte_eventdevs[rx_adapter->eventdev_id];
602         dev_conf = dev->data->dev_conf;
603
604         started = dev->data->dev_started;
605         if (started)
606                 rte_event_dev_stop(dev_id);
607         port_id = dev_conf.nb_event_ports;
608         dev_conf.nb_event_ports += 1;
609         ret = rte_event_dev_configure(dev_id, &dev_conf);
610         if (ret) {
611                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
612                                                 dev_id);
613                 if (started)
614                         rte_event_dev_start(dev_id);
615                 return ret;
616         }
617
618         ret = rte_event_port_setup(dev_id, port_id, port_conf);
619         if (ret) {
620                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
621                                         port_id);
622                 return ret;
623         }
624
625         conf->event_port_id = port_id;
626         conf->max_nb_rx = 128;
627         if (started)
628                 rte_event_dev_start(dev_id);
629         rx_adapter->default_cb_arg = 1;
630         return ret;
631 }
632
633 static int
634 init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
635 {
636         int ret;
637         struct rte_service_spec service;
638         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
639
640         if (rx_adapter->service_inited)
641                 return 0;
642
643         memset(&service, 0, sizeof(service));
644         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
645                 "rte_event_eth_rx_adapter_%d", id);
646         service.socket_id = rx_adapter->socket_id;
647         service.callback = event_eth_rx_adapter_service_func;
648         service.callback_userdata = rx_adapter;
649         /* Service function handles locking for queue add/del updates */
650         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
651         ret = rte_service_component_register(&service, &rx_adapter->service_id);
652         if (ret) {
653                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
654                         service.name, ret);
655                 return ret;
656         }
657
658         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
659                 &rx_adapter_conf, rx_adapter->conf_arg);
660         if (ret) {
661                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
662                         ret);
663                 goto err_done;
664         }
665         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
666         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
667         rx_adapter->service_inited = 1;
668         return 0;
669
670 err_done:
671         rte_service_component_unregister(rx_adapter->service_id);
672         return ret;
673 }
674
675
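/* Enable or disable a single Rx queue (or all Rx queues of the device when
 * rx_queue_id is -1) and keep the per-adapter and per-device queue counts
 * in sync.
 */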
676 static void
677 update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
678                 struct eth_device_info *dev_info,
679                 int32_t rx_queue_id,
680                 uint8_t add)
681 {
682         struct eth_rx_queue_info *queue_info;
683         int enabled;
684         uint16_t i;
685
686         if (dev_info->rx_queue == NULL)
687                 return;
688
689         if (rx_queue_id == -1) {
690                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
691                         update_queue_info(rx_adapter, dev_info, i, add);
692         } else {
693                 queue_info = &dev_info->rx_queue[rx_queue_id];
694                 enabled = queue_info->queue_enabled;
695                 if (add) {
696                         rx_adapter->nb_queues += !enabled;
697                         dev_info->nb_dev_queues += !enabled;
698                 } else {
699                         rx_adapter->nb_queues -= enabled;
700                         dev_info->nb_dev_queues -= enabled;
701                 }
702                 queue_info->queue_enabled = !!add;
703         }
704 }
705
706 static int
707 event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
708                             struct eth_device_info *dev_info,
709                             uint16_t rx_queue_id)
710 {
711         struct eth_rx_queue_info *queue_info;
712
713         if (rx_adapter->nb_queues == 0)
714                 return 0;
715
716         queue_info = &dev_info->rx_queue[rx_queue_id];
717         rx_adapter->num_rx_polled -= queue_info->queue_enabled;
718         update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
719         return 0;
720 }
721
722 static void
723 event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
724                 struct eth_device_info *dev_info,
725                 uint16_t rx_queue_id,
726                 const struct rte_event_eth_rx_adapter_queue_conf *conf)
727
728 {
729         struct eth_rx_queue_info *queue_info;
730         const struct rte_event *ev = &conf->ev;
731
732         queue_info = &dev_info->rx_queue[rx_queue_id];
733         queue_info->event_queue_id = ev->queue_id;
734         queue_info->sched_type = ev->sched_type;
735         queue_info->priority = ev->priority;
736         queue_info->wt = conf->servicing_weight;
737
738         if (conf->rx_queue_flags &
739                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
740                 queue_info->flow_id = ev->flow_id;
741                 queue_info->flow_id_mask = ~0;
742         }
743
744         /* The same queue can be added more than once */
745         rx_adapter->num_rx_polled += !queue_info->queue_enabled;
746         update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
747 }
748
749 static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
750                 uint8_t eth_dev_id,
751                 int rx_queue_id,
752                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
753 {
754         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
755         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
756         uint32_t i;
757         int ret;
758
759         if (queue_conf->servicing_weight == 0) {
760
761                 struct rte_eth_dev_data *data = dev_info->dev->data;
762                 if (data->dev_conf.intr_conf.rxq) {
763                         RTE_EDEV_LOG_ERR("Interrupt driven queues"
764                                         " not supported");
765                         return -ENOTSUP;
766                 }
767                 temp_conf = *queue_conf;
768
769                 /* If Rx interrupts are disabled set wt = 1 */
770                 temp_conf.servicing_weight = 1;
771                 queue_conf = &temp_conf;
772         }
773
774         if (dev_info->rx_queue == NULL) {
775                 dev_info->rx_queue =
776                     rte_zmalloc_socket(rx_adapter->mem_name,
777                                        dev_info->dev->data->nb_rx_queues *
778                                        sizeof(struct eth_rx_queue_info), 0,
779                                        rx_adapter->socket_id);
780                 if (dev_info->rx_queue == NULL)
781                         return -ENOMEM;
782         }
783
784         if (rx_queue_id == -1) {
785                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
786                         event_eth_rx_adapter_queue_add(rx_adapter,
787                                                 dev_info, i,
788                                                 queue_conf);
789         } else {
790                 event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
791                                           (uint16_t)rx_queue_id,
792                                           queue_conf);
793         }
794
795         ret = eth_poll_wrr_calc(rx_adapter);
796         if (ret) {
797                 event_eth_rx_adapter_queue_del(rx_adapter,
798                                         dev_info, rx_queue_id);
799                 return ret;
800         }
801
802         return ret;
803 }
804
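/* Common start/stop handling: invokes the PMD Rx adapter start/stop
 * callbacks for devices with an internal event port, and toggles the
 * adapter's service run state when any device relies on the software
 * polling service.
 */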
805 static int
806 rx_adapter_ctrl(uint8_t id, int start)
807 {
808         struct rte_event_eth_rx_adapter *rx_adapter;
809         struct rte_eventdev *dev;
810         struct eth_device_info *dev_info;
811         uint32_t i;
812         int use_service = 0;
813         int stop = !start;
814
815         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
816         rx_adapter = id_to_rx_adapter(id);
817         if (rx_adapter == NULL)
818                 return -EINVAL;
819
820         dev = &rte_eventdevs[rx_adapter->eventdev_id];
821
822         for (i = 0; i < rte_eth_dev_count(); i++) {
823                 dev_info = &rx_adapter->eth_devices[i];
824                 /* if starting, check for num dev queues */
825                 if (start && !dev_info->nb_dev_queues)
826                         continue;
827                 /* if stopping, check if dev has been started */
828                 if (stop && !dev_info->dev_rx_started)
829                         continue;
830                 use_service |= !dev_info->internal_event_port;
831                 dev_info->dev_rx_started = start;
832                 if (dev_info->internal_event_port == 0)
833                         continue;
834                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
835                                                 &rte_eth_devices[i]) :
836                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
837                                                 &rte_eth_devices[i]);
838         }
839
840         if (use_service) {
841                 rte_spinlock_lock(&rx_adapter->rx_lock);
842                 rx_adapter->rxa_started = start;
843                 rte_service_runstate_set(rx_adapter->service_id, start);
844                 rte_spinlock_unlock(&rx_adapter->rx_lock);
845         }
846
847         return 0;
848 }
849
850 int
851 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
852                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
853                                 void *conf_arg)
854 {
855         struct rte_event_eth_rx_adapter *rx_adapter;
856         int ret;
857         int socket_id;
858         uint8_t i;
859         char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
860         const uint8_t default_rss_key[] = {
861                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
862                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
863                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
864                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
865                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
866         };
867
868         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
869         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
870         if (conf_cb == NULL)
871                 return -EINVAL;
872
873         if (event_eth_rx_adapter == NULL) {
874                 ret = rte_event_eth_rx_adapter_init();
875                 if (ret)
876                         return ret;
877         }
878
879         rx_adapter = id_to_rx_adapter(id);
880         if (rx_adapter != NULL) {
881                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
882                 return -EEXIST;
883         }
884
885         socket_id = rte_event_dev_socket_id(dev_id);
886         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
887                 "rte_event_eth_rx_adapter_%d",
888                 id);
889
890         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
891                         RTE_CACHE_LINE_SIZE, socket_id);
892         if (rx_adapter == NULL) {
893                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
894                 return -ENOMEM;
895         }
896
897         rx_adapter->eventdev_id = dev_id;
898         rx_adapter->socket_id = socket_id;
899         rx_adapter->conf_cb = conf_cb;
900         rx_adapter->conf_arg = conf_arg;
901         strcpy(rx_adapter->mem_name, mem_name);
902         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
903                                         rte_eth_dev_count() *
904                                         sizeof(struct eth_device_info), 0,
905                                         socket_id);
906         rte_convert_rss_key((const uint32_t *)default_rss_key,
907                         (uint32_t *)rx_adapter->rss_key_be,
908                             RTE_DIM(default_rss_key));
909
910         if (rx_adapter->eth_devices == NULL) {
911                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
912                 rte_free(rx_adapter);
913                 return -ENOMEM;
914         }
915         rte_spinlock_init(&rx_adapter->rx_lock);
916         for (i = 0; i < rte_eth_dev_count(); i++)
917                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
918
919         event_eth_rx_adapter[id] = rx_adapter;
920         if (conf_cb == default_conf_cb)
921                 rx_adapter->default_cb_arg = 1;
922         return 0;
923 }
924
925 int
926 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
927                 struct rte_event_port_conf *port_config)
928 {
929         struct rte_event_port_conf *pc;
930         int ret;
931
932         if (port_config == NULL)
933                 return -EINVAL;
934         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
935
936         pc = rte_malloc(NULL, sizeof(*pc), 0);
937         if (pc == NULL)
938                 return -ENOMEM;
939         *pc = *port_config;
940         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
941                                         default_conf_cb,
942                                         pc);
943         if (ret)
944                 rte_free(pc);
945         return ret;
946 }
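/*
 * Illustrative application-side usage sketch (not part of the library; the
 * adapter/eventdev/port ids and the service lcore below are assumptions):
 *
 *	struct rte_event_port_conf port_conf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.rx_queue_flags = 0,
 *		.servicing_weight = 1,
 *		.ev = {
 *			.queue_id = 0,
 *			.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		},
 *	};
 *	uint32_t service_id;
 *
 *	rte_event_eth_rx_adapter_create(0, 0, &port_conf);
 *	rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qconf);
 *	if (rte_event_eth_rx_adapter_service_id_get(0, &service_id) == 0)
 *		rte_service_map_lcore_set(service_id, service_lcore, 1);
 *	rte_event_eth_rx_adapter_start(0);
 */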
947
948 int
949 rte_event_eth_rx_adapter_free(uint8_t id)
950 {
951         struct rte_event_eth_rx_adapter *rx_adapter;
952
953         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
954
955         rx_adapter = id_to_rx_adapter(id);
956         if (rx_adapter == NULL)
957                 return -EINVAL;
958
959         if (rx_adapter->nb_queues) {
960                 RTE_EDEV_LOG_ERR("%" PRIu32 " Rx queues not deleted",
961                                 rx_adapter->nb_queues);
962                 return -EBUSY;
963         }
964
965         if (rx_adapter->default_cb_arg)
966                 rte_free(rx_adapter->conf_arg);
967         rte_free(rx_adapter->eth_devices);
968         rte_free(rx_adapter);
969         event_eth_rx_adapter[id] = NULL;
970
971         return 0;
972 }
973
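/* Public queue add: when the eventdev/ethdev pair reports the INTERNAL_PORT
 * capability, the add is delegated to the PMD via eth_rx_adapter_queue_add;
 * otherwise the queue is added to the software polling service, the WRR
 * schedule is recalculated, and the service run state is enabled if there
 * are queues to poll.
 */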
974 int
975 rte_event_eth_rx_adapter_queue_add(uint8_t id,
976                 uint8_t eth_dev_id,
977                 int32_t rx_queue_id,
978                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
979 {
980         int ret;
981         uint32_t cap;
982         struct rte_event_eth_rx_adapter *rx_adapter;
983         struct rte_eventdev *dev;
984         struct eth_device_info *dev_info;
985         int start_service;
986
987         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
988         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
989
990         rx_adapter = id_to_rx_adapter(id);
991         if ((rx_adapter == NULL) || (queue_conf == NULL))
992                 return -EINVAL;
993
994         dev = &rte_eventdevs[rx_adapter->eventdev_id];
995         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
996                                                 eth_dev_id,
997                                                 &cap);
998         if (ret) {
999                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
1000                         " eth port %" PRIu8, id, eth_dev_id);
1001                 return ret;
1002         }
1003
1004         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
1005                 && (queue_conf->rx_queue_flags &
1006                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
1007                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
1008                                 " eth port: %" PRIu8 " adapter id: %" PRIu8,
1009                                 eth_dev_id, id);
1010                 return -EINVAL;
1011         }
1012
1013         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
1014                 (rx_queue_id != -1)) {
1015                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
1016                         "event queue id %u eth port %u", id, eth_dev_id);
1017                 return -EINVAL;
1018         }
1019
1020         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1021                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1022                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1023                          (uint16_t)rx_queue_id);
1024                 return -EINVAL;
1025         }
1026
1027         start_service = 0;
1028         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1029
1030         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1031                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
1032                                         -ENOTSUP);
1033                 if (dev_info->rx_queue == NULL) {
1034                         dev_info->rx_queue =
1035                             rte_zmalloc_socket(rx_adapter->mem_name,
1036                                         dev_info->dev->data->nb_rx_queues *
1037                                         sizeof(struct eth_rx_queue_info), 0,
1038                                         rx_adapter->socket_id);
1039                         if (dev_info->rx_queue == NULL)
1040                                 return -ENOMEM;
1041                 }
1042
1043                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
1044                                 &rte_eth_devices[eth_dev_id],
1045                                 rx_queue_id, queue_conf);
1046                 if (ret == 0) {
1047                         dev_info->internal_event_port = 1;
1048                         update_queue_info(rx_adapter,
1049                                         &rx_adapter->eth_devices[eth_dev_id],
1050                                         rx_queue_id,
1051                                         1);
1052                 }
1053         } else {
1054                 rte_spinlock_lock(&rx_adapter->rx_lock);
1055                 dev_info->internal_event_port = 0;
1056                 ret = init_service(rx_adapter, id);
1057                 if (ret == 0)
1058                         ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
1059                                         queue_conf);
1060                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1061                 if (ret == 0)
1062                         start_service = !!sw_rx_adapter_queue_count(rx_adapter);
1063         }
1064
1065         if (ret)
1066                 return ret;
1067
1068         if (start_service)
1069                 rte_service_component_runstate_set(rx_adapter->service_id, 1);
1070
1071         return 0;
1072 }
1073
1074 int
1075 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
1076                                 int32_t rx_queue_id)
1077 {
1078         int ret = 0;
1079         struct rte_eventdev *dev;
1080         struct rte_event_eth_rx_adapter *rx_adapter;
1081         struct eth_device_info *dev_info;
1082         uint32_t cap;
1083         uint16_t i;
1084
1085         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1086         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1087
1088         rx_adapter = id_to_rx_adapter(id);
1089         if (rx_adapter == NULL)
1090                 return -EINVAL;
1091
1092         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1093         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1094                                                 eth_dev_id,
1095                                                 &cap);
1096         if (ret)
1097                 return ret;
1098
1099         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1100                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1101                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1102                          (uint16_t)rx_queue_id);
1103                 return -EINVAL;
1104         }
1105
1106         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1107
1108         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1109                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
1110                                  -ENOTSUP);
1111                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
1112                                                 &rte_eth_devices[eth_dev_id],
1113                                                 rx_queue_id);
1114                 if (ret == 0) {
1115                         update_queue_info(rx_adapter,
1116                                         &rx_adapter->eth_devices[eth_dev_id],
1117                                         rx_queue_id,
1118                                         0);
1119                         if (dev_info->nb_dev_queues == 0) {
1120                                 rte_free(dev_info->rx_queue);
1121                                 dev_info->rx_queue = NULL;
1122                         }
1123                 }
1124         } else {
1125                 int rc;
1126                 rte_spinlock_lock(&rx_adapter->rx_lock);
1127                 if (rx_queue_id == -1) {
1128                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1129                                 event_eth_rx_adapter_queue_del(rx_adapter,
1130                                                         dev_info,
1131                                                         i);
1132                 } else {
1133                         event_eth_rx_adapter_queue_del(rx_adapter,
1134                                                 dev_info,
1135                                                 (uint16_t)rx_queue_id);
1136                 }
1137
1138                 rc = eth_poll_wrr_calc(rx_adapter);
1139                 if (rc)
1140                         RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
1141                                         rc);
1142
1143                 if (dev_info->nb_dev_queues == 0) {
1144                         rte_free(dev_info->rx_queue);
1145                         dev_info->rx_queue = NULL;
1146                 }
1147
1148                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1149                 rte_service_component_runstate_set(rx_adapter->service_id,
1150                                 sw_rx_adapter_queue_count(rx_adapter));
1151         }
1152
1153         return ret;
1154 }
1155
1156
1157 int
1158 rte_event_eth_rx_adapter_start(uint8_t id)
1159 {
1160         return rx_adapter_ctrl(id, 1);
1161 }
1162
1163 int
1164 rte_event_eth_rx_adapter_stop(uint8_t id)
1165 {
1166         return rx_adapter_ctrl(id, 0);
1167 }
1168
1169 int
1170 rte_event_eth_rx_adapter_stats_get(uint8_t id,
1171                                struct rte_event_eth_rx_adapter_stats *stats)
1172 {
1173         struct rte_event_eth_rx_adapter *rx_adapter;
1174         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
1175         struct rte_event_eth_rx_adapter_stats dev_stats;
1176         struct rte_eventdev *dev;
1177         struct eth_device_info *dev_info;
1178         uint32_t i;
1179         int ret;
1180
1181         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1182
1183         rx_adapter = id_to_rx_adapter(id);
1184         if (rx_adapter  == NULL || stats == NULL)
1185                 return -EINVAL;
1186
1187         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1188         memset(stats, 0, sizeof(*stats));
1189         for (i = 0; i < rte_eth_dev_count(); i++) {
1190                 dev_info = &rx_adapter->eth_devices[i];
1191                 if (dev_info->internal_event_port == 0 ||
1192                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
1193                         continue;
1194                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
1195                                                 &rte_eth_devices[i],
1196                                                 &dev_stats);
1197                 if (ret)
1198                         continue;
1199                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
1200                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
1201         }
1202
1203         if (rx_adapter->service_inited)
1204                 *stats = rx_adapter->stats;
1205
1206         stats->rx_packets += dev_stats_sum.rx_packets;
1207         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
1208         return 0;
1209 }
1210
1211 int
1212 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
1213 {
1214         struct rte_event_eth_rx_adapter *rx_adapter;
1215         struct rte_eventdev *dev;
1216         struct eth_device_info *dev_info;
1217         uint32_t i;
1218
1219         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1220
1221         rx_adapter = id_to_rx_adapter(id);
1222         if (rx_adapter == NULL)
1223                 return -EINVAL;
1224
1225         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1226         for (i = 0; i < rte_eth_dev_count(); i++) {
1227                 dev_info = &rx_adapter->eth_devices[i];
1228                 if (dev_info->internal_event_port == 0 ||
1229                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
1230                         continue;
1231                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
1232                                                         &rte_eth_devices[i]);
1233         }
1234
1235         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
1236         return 0;
1237 }
1238
1239 int
1240 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1241 {
1242         struct rte_event_eth_rx_adapter *rx_adapter;
1243
1244         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1245
1246         rx_adapter = id_to_rx_adapter(id);
1247         if (rx_adapter == NULL || service_id == NULL)
1248                 return -EINVAL;
1249
1250         if (rx_adapter->service_inited)
1251                 *service_id = rx_adapter->service_id;
1252
1253         return rx_adapter->service_inited ? 0 : -ESRCH;
1254 }