#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_dev.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>

#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
#include "rte_event_eth_rx_adapter.h"

#define BATCH_SIZE 32
#define BLOCK_CNT_THRESHOLD 10
#define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
#define ETH_RX_ADAPTER_MEM_NAME_LEN 32

#define RSS_KEY_SIZE 40
/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
	/* Eth port to poll */
	uint8_t eth_dev_id;
	/* Eth rx queue to poll */
	uint16_t eth_rx_qid;
};

/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
	/* Count of events in this buffer */
	uint16_t count;
	/* Array of events in this buffer */
	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
};
struct rte_event_eth_rx_adapter {
	/* RSS key */
	uint8_t rss_key_be[RSS_KEY_SIZE];
	/* Event device identifier */
	uint8_t eventdev_id;
	/* Per ethernet device structure */
	struct eth_device_info *eth_devices;
	/* Event port identifier */
	uint8_t event_port_id;
	/* Lock to serialize config updates with service function */
	rte_spinlock_t rx_lock;
	/* Max mbufs processed in any service function invocation */
	uint32_t max_nb_rx;
	/* Receive queues that need to be polled */
	struct eth_rx_poll_entry *eth_rx_poll;
	/* Size of the eth_rx_poll array */
	uint16_t num_rx_polled;
	/* Weighted round robin schedule */
	uint32_t *wrr_sched;
	/* wrr_sched[] size */
	uint32_t wrr_len;
	/* Next entry in wrr[] to begin polling */
	uint32_t wrr_pos;
	/* Event burst buffer */
	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
	/* Per adapter stats */
	struct rte_event_eth_rx_adapter_stats stats;
	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
	uint16_t enq_block_count;
	/* Block start ts */
	uint64_t rx_enq_block_start_ts;
	/* Configuration callback for rte_service configuration */
	rte_event_eth_rx_adapter_conf_cb conf_cb;
	/* Configuration callback argument */
	void *conf_arg;
	/* Set if default_cb is being used */
	int default_cb_arg;
	/* Service initialization state */
	uint8_t service_inited;
	/* Total count of Rx queues in adapter */
	uint32_t nb_queues;
	/* Memory allocation name */
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	/* Socket identifier cached from eventdev */
	int socket_id;
	/* Per adapter EAL service */
	uint32_t service_id;
	/* Adapter started flag */
	uint8_t rxa_started;
} __rte_cache_aligned;
/* Per eth device */
struct eth_device_info {
	struct rte_eth_dev *dev;
	struct eth_rx_queue_info *rx_queue;
	/* Set if ethdev->eventdev packet transfer uses a
	 * hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Set if the adapter is processing rx queues for
	 * this eth device and packet processing has been
	 * started, allows for the code to know if the PMD
	 * rx_adapter_stop callback needs to be invoked
	 */
	uint8_t dev_rx_started;
	/* If nb_dev_queues > 0, the start callback will
	 * be invoked if not already invoked
	 */
	uint16_t nb_dev_queues;
};

/* Per Rx queue */
struct eth_rx_queue_info {
	int queue_enabled;	/* True if added */
	uint16_t wt;		/* Polling weight */
	uint8_t event_queue_id;	/* Event queue to enqueue packets to */
	uint8_t sched_type;	/* Sched type for events */
	uint8_t priority;	/* Event priority */
	uint32_t flow_id;	/* App provided flow identifier */
	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
};
static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;

static inline int
valid_id(uint8_t id)
{
	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}

#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!valid_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)
static inline int
sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return rx_adapter->num_rx_polled;
}

/* Greatest common divisor */
static uint16_t gcd_u16(uint16_t a, uint16_t b)
{
	uint16_t r = a % b;

	return r ? gcd_u16(b, r) : b;
}
/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
static int
wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
	 unsigned int n, int *cw,
	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
	 uint16_t gcd, int prev)
{
	int i = prev;
	uint16_t w;

	while (1) {
		uint16_t q;
		uint8_t d;

		i = (i + 1) % n;
		if (i == 0) {
			*cw = *cw - gcd;
			if (*cw <= 0)
				*cw = max_wt;
		}

		q = eth_rx_poll[i].eth_rx_qid;
		d = eth_rx_poll[i].eth_dev_id;
		w = rx_adapter->eth_devices[d].rx_queue[q].wt;

		if ((int)w >= *cw)
			return i;
	}
}
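
/*
 * Worked example (added for clarity, not part of the algorithm): with
 * three queues of weights {4, 2, 1}, max_wt = 4 and gcd = 1, repeated
 * calls to wrr_next() yield the 7 slot sequence {0, 0, 0, 1, 0, 1, 2},
 * i.e. queue 0 is polled four times, queue 1 twice and queue 2 once per
 * WRR cycle.
 */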
/* Precalculate WRR polling sequence for all queues in rx_adapter */
static int
eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint8_t d;
	uint16_t q;
	unsigned int i;

	/* Initialize variables for calculation of wrr schedule */
	uint16_t max_wrr_pos = 0;
	unsigned int poll_q = 0;
	uint16_t max_wt = 0;
	uint16_t gcd = 0;

	struct eth_rx_poll_entry *rx_poll = NULL;
	uint32_t *rx_wrr = NULL;

	if (rx_adapter->num_rx_polled) {
		size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
				sizeof(*rx_adapter->eth_rx_poll),
				RTE_CACHE_LINE_SIZE);
		rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
					     len,
					     RTE_CACHE_LINE_SIZE,
					     rx_adapter->socket_id);
		if (rx_poll == NULL)
			return -ENOMEM;

		/* Generate array of all queues to poll, the size of this
		 * array is poll_q
		 */
		for (d = 0; d < rte_eth_dev_count(); d++) {
			uint16_t nb_rx_queues;
			struct eth_device_info *dev_info =
					&rx_adapter->eth_devices[d];
			nb_rx_queues = dev_info->dev->data->nb_rx_queues;
			if (dev_info->rx_queue == NULL)
				continue;
			if (dev_info->internal_event_port)
				continue;
			for (q = 0; q < nb_rx_queues; q++) {
				struct eth_rx_queue_info *queue_info =
					&dev_info->rx_queue[q];
				if (queue_info->queue_enabled == 0)
					continue;

				uint16_t wt = queue_info->wt;
				rx_poll[poll_q].eth_dev_id = d;
				rx_poll[poll_q].eth_rx_qid = q;
				max_wrr_pos += wt;
				max_wt = RTE_MAX(max_wt, wt);
				gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
				poll_q++;
			}
		}

		len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
				RTE_CACHE_LINE_SIZE);
		rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
					    len,
					    RTE_CACHE_LINE_SIZE,
					    rx_adapter->socket_id);
		if (rx_wrr == NULL) {
			rte_free(rx_poll);
			return -ENOMEM;
		}

		/* Generate polling sequence based on weights */
		int prev = -1;
		int cw = -1;
		for (i = 0; i < max_wrr_pos; i++) {
			rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
					     rx_poll, max_wt, gcd, prev);
			prev = rx_wrr[i];
		}
	}

	rte_free(rx_adapter->eth_rx_poll);
	rte_free(rx_adapter->wrr_sched);

	rx_adapter->eth_rx_poll = rx_poll;
	rx_adapter->wrr_sched = rx_wrr;
	rx_adapter->wrr_len = max_wrr_pos;

	return 0;
}
static inline void
mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
	struct ipv6_hdr **ipv6_hdr)
{
	struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
	struct vlan_hdr *vlan_hdr;

	*ipv4_hdr = NULL;
	*ipv6_hdr = NULL;

	switch (eth_hdr->ether_type) {
	case RTE_BE16(ETHER_TYPE_IPv4):
		*ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_IPv6):
		*ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_VLAN):
		vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
		switch (vlan_hdr->eth_proto) {
		case RTE_BE16(ETHER_TYPE_IPv4):
			*ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
			break;
		case RTE_BE16(ETHER_TYPE_IPv6):
			*ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
			break;
		default:
			break;
		}
		break;

	default:
		break;
	}
}
/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
	uint32_t input_len;
	void *tuple;
	struct rte_ipv4_tuple ipv4_tuple;
	struct rte_ipv6_tuple ipv6_tuple;
	struct ipv4_hdr *ipv4_hdr;
	struct ipv6_hdr *ipv6_hdr;

	mtoip(m, &ipv4_hdr, &ipv6_hdr);

	if (ipv4_hdr) {
		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
		tuple = &ipv4_tuple;
		input_len = RTE_THASH_V4_L3_LEN;
	} else if (ipv6_hdr) {
		rte_thash_load_v6_addrs(ipv6_hdr,
					(union rte_thash_tuple *)&ipv6_tuple);
		tuple = &ipv6_tuple;
		input_len = RTE_THASH_V6_L3_LEN;
	} else
		return 0;

	return rte_softrss_be(tuple, input_len, rss_key_be);
}
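
/*
 * do_softrss() is the software fallback: it is only used when the PMD has
 * not set PKT_RX_RSS_HASH in m->ol_flags and no application flow id is
 * configured for the queue (see the do_rss computation in
 * fill_event_buffer() below). Packets that are neither IPv4 nor IPv6
 * hash to 0.
 */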
static inline int
rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return !!rx_adapter->enq_block_count;
}

static inline void
rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->rx_enq_block_start_ts)
		return;

	rx_adapter->enq_block_count++;
	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
		return;

	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}
static inline void
rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
		    struct rte_event_eth_rx_adapter_stats *stats)
{
	if (unlikely(!stats->rx_enq_start_ts))
		stats->rx_enq_start_ts = rte_get_tsc_cycles();

	if (likely(!rx_enq_blocked(rx_adapter)))
		return;

	rx_adapter->enq_block_count = 0;
	if (rx_adapter->rx_enq_block_start_ts) {
		stats->rx_enq_end_ts = rte_get_tsc_cycles();
		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
		    rx_adapter->rx_enq_block_start_ts;
		rx_adapter->rx_enq_block_start_ts = 0;
	}
}
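
/*
 * Together, rx_enq_block_start_ts() and rx_enq_block_end_ts() implement
 * the blocked-cycles accounting: once BLOCK_CNT_THRESHOLD (10)
 * consecutive enqueues fail to send any event, a start timestamp is
 * latched; the first successful enqueue after that adds the elapsed TSC
 * cycles to stats->rx_enq_block_cycles and re-arms the counter.
 */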
/* Add event to buffer, free space check is done prior to calling
 * this function
 */
static inline void
buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
		  struct rte_event *ev)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
}
/* Enqueue buffered events to event device */
static inline uint16_t
flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
					rx_adapter->event_port_id,
					buf->events,
					buf->count);
	if (n != buf->count) {
		memmove(buf->events,
			&buf->events[n],
			(buf->count - n) * sizeof(struct rte_event));
		stats->rx_enq_retry++;
	}

	n ? rx_enq_block_end_ts(rx_adapter, stats) :
		rx_enq_block_start_ts(rx_adapter);

	buf->count -= n;
	stats->rx_enq_count += n;

	return n;
}
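
/*
 * A flush is not all-or-nothing: rte_event_enqueue_new_burst() may accept
 * only n < count events, in which case the unsent tail is moved to the
 * front of the buffer, rx_enq_retry is incremented and the remainder is
 * retried on the next flush.
 */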
static inline void
fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
		uint8_t eth_dev_id,
		uint16_t rx_queue_id,
		struct rte_mbuf **mbufs,
		uint16_t num)
{
	uint32_t i;
	struct eth_device_info *eth_device_info =
					&rx_adapter->eth_devices[eth_dev_id];
	struct eth_rx_queue_info *eth_rx_queue_info =
					&eth_device_info->rx_queue[rx_queue_id];

	int32_t qid = eth_rx_queue_info->event_queue_id;
	uint8_t sched_type = eth_rx_queue_info->sched_type;
	uint8_t priority = eth_rx_queue_info->priority;
	uint32_t flow_id;
	struct rte_event events[BATCH_SIZE];
	struct rte_mbuf *m = mbufs[0];
	uint32_t rss_mask;
	uint32_t rss;
	int do_rss;

	/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;

	for (i = 0; i < num; i++) {
		m = mbufs[i];
		struct rte_event *ev = &events[i];

		rss = do_rss ?
			do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
		flow_id =
		    eth_rx_queue_info->flow_id &
				eth_rx_queue_info->flow_id_mask;
		flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;

		ev->flow_id = flow_id;
		ev->op = RTE_EVENT_OP_NEW;
		ev->sched_type = sched_type;
		ev->queue_id = qid;
		ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
		ev->sub_event_type = 0;
		ev->priority = priority;
		ev->mbuf = m;

		buf_event_enqueue(rx_adapter, ev);
	}
}
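
/*
 * Flow id composition used above, shown explicitly:
 *
 *	flow_id = (queue_flow_id & flow_id_mask) | (rss & ~flow_id_mask)
 *
 * With flow_id_mask == ~0 (application supplied a flow id) the event
 * carries the configured flow id; with flow_id_mask == 0 it carries the
 * RSS hash, from the PMD or from do_softrss() as a fallback.
 */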
/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, the
 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device, this back pressures the
 * eth device, in virtual device environments this back pressure is relayed to
 * the hypervisor's switching layer where adjustments can be made to deal with
 * it.
 */
static inline void
eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t num_queue;
	uint16_t n;
	uint32_t nb_rx = 0;
	struct rte_mbuf *mbufs[BATCH_SIZE];
	struct rte_eth_event_enqueue_buffer *buf;
	uint32_t wrr_pos;
	uint32_t max_nb_rx;

	wrr_pos = rx_adapter->wrr_pos;
	max_nb_rx = rx_adapter->max_nb_rx;
	buf = &rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	/* Iterate through a WRR sequence */
	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
		uint8_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

		/* Don't do a batch dequeue from the rx queue if there isn't
		 * enough space in the enqueue buffer.
		 */
		if (buf->count >= BATCH_SIZE)
			flush_event_buffer(rx_adapter);
		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
			rx_adapter->wrr_pos = wrr_pos;
			return;
		}

		stats->rx_poll_count++;
		n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);

		if (n) {
			stats->rx_packets += n;
			/* The check before rte_eth_rx_burst() ensures that
			 * all n mbufs can be buffered
			 */
			fill_event_buffer(rx_adapter, d, qid, mbufs, n);
			nb_rx += n;
			if (nb_rx > max_nb_rx) {
				rx_adapter->wrr_pos =
				    (wrr_pos + 1) % rx_adapter->wrr_len;
				break;
			}
		}

		if (++wrr_pos == rx_adapter->wrr_len)
			wrr_pos = 0;
	}

	if (buf->count >= BATCH_SIZE)
		flush_event_buffer(rx_adapter);
}
static int
event_eth_rx_adapter_service_func(void *args)
{
	struct rte_event_eth_rx_adapter *rx_adapter = args;

	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
		return 0;
	if (!rx_adapter->rxa_started) {
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		return 0;
	}
	eth_rx_poll(rx_adapter);
	rte_spinlock_unlock(&rx_adapter->rx_lock);
	return 0;
}
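
/*
 * The trylock above keeps the service core from blocking behind a control
 * plane thread that holds rx_lock for a queue add/del; on contention the
 * service returns immediately and polls again on its next invocation.
 * The rxa_started check makes a stopped adapter a no-op without having to
 * unregister the service.
 */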
static int
rte_event_eth_rx_adapter_init(void)
{
	const char *name = "rte_event_eth_rx_adapter_array";
	const struct rte_memzone *mz;
	unsigned int sz;

	sz = sizeof(*event_eth_rx_adapter) *
	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

	mz = rte_memzone_lookup(name);
	if (mz == NULL) {
		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
						 RTE_CACHE_LINE_SIZE);
		if (mz == NULL) {
			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
					PRId32, rte_errno);
			return -rte_errno;
		}
	}

	event_eth_rx_adapter = mz->addr;
	return 0;
}
static inline struct rte_event_eth_rx_adapter *
id_to_rx_adapter(uint8_t id)
{
	return event_eth_rx_adapter ?
		event_eth_rx_adapter[id] : NULL;
}
static int
default_conf_cb(uint8_t id, uint8_t dev_id,
		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
	int ret;
	struct rte_eventdev *dev;
	struct rte_event_dev_config dev_conf;
	int started;
	uint8_t port_id;
	struct rte_event_port_conf *port_conf = arg;
	struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	dev_conf = dev->data->dev_conf;

	started = dev->data->dev_started;
	if (started)
		rte_event_dev_stop(dev_id);
	port_id = dev_conf.nb_event_ports;
	dev_conf.nb_event_ports += 1;
	ret = rte_event_dev_configure(dev_id, &dev_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
				dev_id);
		if (started)
			rte_event_dev_start(dev_id);
		return ret;
	}

	ret = rte_event_port_setup(dev_id, port_id, port_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
				port_id);
		return ret;
	}

	conf->event_port_id = port_id;
	conf->max_nb_rx = 128;
	if (started)
		rte_event_dev_start(dev_id);
	rx_adapter->default_cb_arg = 1;
	return ret;
}
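
/*
 * Note that default_conf_cb() allocates the adapter's event port by
 * growing nb_event_ports by one and reconfiguring the event device; a
 * running device is stopped for the reconfiguration and restarted
 * afterwards. Applications that cannot tolerate the stop/start can
 * supply their own callback via rte_event_eth_rx_adapter_create_ext().
 */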
static int
init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
	int ret;
	struct rte_service_spec service;
	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;

	if (rx_adapter->service_inited)
		return 0;

	memset(&service, 0, sizeof(service));
	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
		"rte_event_eth_rx_adapter_%d", id);
	service.socket_id = rx_adapter->socket_id;
	service.callback = event_eth_rx_adapter_service_func;
	service.callback_userdata = rx_adapter;
	/* Service function handles locking for queue add/del updates */
	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
	ret = rte_service_component_register(&service, &rx_adapter->service_id);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
			service.name, ret);
		return ret;
	}

	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
		&rx_adapter_conf, rx_adapter->conf_arg);
	if (ret) {
		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
			ret);
		goto err_done;
	}
	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
	rx_adapter->service_inited = 1;
	return 0;

err_done:
	rte_service_component_unregister(rx_adapter->service_id);
	return ret;
}
static void
update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int32_t rx_queue_id,
		uint8_t add)
{
	struct eth_rx_queue_info *queue_info;
	int enabled;
	uint16_t i;

	if (dev_info->rx_queue == NULL)
		return;

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			update_queue_info(rx_adapter, dev_info, i, add);
	} else {
		queue_info = &dev_info->rx_queue[rx_queue_id];
		enabled = queue_info->queue_enabled;
		if (add) {
			rx_adapter->nb_queues += !enabled;
			dev_info->nb_dev_queues += !enabled;
		} else {
			rx_adapter->nb_queues -= enabled;
			dev_info->nb_dev_queues -= enabled;
		}
		queue_info->queue_enabled = !!add;
	}
}
static int
event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
			    struct eth_device_info *dev_info,
			    uint16_t rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	if (rx_adapter->nb_queues == 0)
		return 0;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	rx_adapter->num_rx_polled -= queue_info->queue_enabled;
	update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
	return 0;
}
static void
event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		uint16_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *conf)
{
	struct eth_rx_queue_info *queue_info;
	const struct rte_event *ev = &conf->ev;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	queue_info->event_queue_id = ev->queue_id;
	queue_info->sched_type = ev->sched_type;
	queue_info->priority = ev->priority;
	queue_info->wt = conf->servicing_weight;

	if (conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
		queue_info->flow_id = ev->flow_id;
		queue_info->flow_id_mask = ~0;
	}

	/* The same queue can be added more than once */
	rx_adapter->num_rx_polled += !queue_info->queue_enabled;
	update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
}
static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		uint8_t eth_dev_id,
		int rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
	uint32_t i;
	int ret;

	if (queue_conf->servicing_weight == 0) {

		struct rte_eth_dev_data *data = dev_info->dev->data;
		if (data->dev_conf.intr_conf.rxq) {
			RTE_EDEV_LOG_ERR("Interrupt driven queues"
					" not supported");
			return -ENOTSUP;
		}
		temp_conf = *queue_conf;

		/* If Rx interrupts are disabled set wt = 1 */
		temp_conf.servicing_weight = 1;
		queue_conf = &temp_conf;
	}

	if (dev_info->rx_queue == NULL) {
		dev_info->rx_queue =
		    rte_zmalloc_socket(rx_adapter->mem_name,
				       dev_info->dev->data->nb_rx_queues *
				       sizeof(struct eth_rx_queue_info), 0,
				       rx_adapter->socket_id);
		if (dev_info->rx_queue == NULL)
			return -ENOMEM;
	}

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			event_eth_rx_adapter_queue_add(rx_adapter,
						dev_info, i,
						queue_conf);
	} else {
		event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
					(uint16_t)rx_queue_id,
					queue_conf);
	}

	ret = eth_poll_wrr_calc(rx_adapter);
	if (ret) {
		event_eth_rx_adapter_queue_del(rx_adapter,
					dev_info, rx_queue_id);
		return ret;
	}

	return ret;
}
static int
rx_adapter_ctrl(uint8_t id, int start)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int use_service = 0;
	int stop = !start;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];

	for (i = 0; i < rte_eth_dev_count(); i++) {
		dev_info = &rx_adapter->eth_devices[i];
		/* if start check for num dev queues */
		if (start && !dev_info->nb_dev_queues)
			continue;
		/* if stop check if dev has been started */
		if (stop && !dev_info->dev_rx_started)
			continue;
		use_service |= !dev_info->internal_event_port;
		dev_info->dev_rx_started = start;
		if (dev_info->internal_event_port == 0)
			continue;
		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
						&rte_eth_devices[i]) :
			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
						&rte_eth_devices[i]);
	}

	if (use_service) {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		rx_adapter->rxa_started = start;
		rte_service_runstate_set(rx_adapter->service_id, start);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	return 0;
}
int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
				rte_event_eth_rx_adapter_conf_cb conf_cb,
				void *conf_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	int ret;
	int socket_id;
	uint8_t i;
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	const uint8_t default_rss_key[] = {
		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
	};

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
	if (conf_cb == NULL)
		return -EINVAL;

	if (event_eth_rx_adapter == NULL) {
		ret = rte_event_eth_rx_adapter_init();
		if (ret)
			return ret;
	}

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter != NULL) {
		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
		return -EEXIST;
	}

	socket_id = rte_event_dev_socket_id(dev_id);
	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
		"rte_event_eth_rx_adapter_%d",
		id);

	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
			RTE_CACHE_LINE_SIZE, socket_id);
	if (rx_adapter == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
		return -ENOMEM;
	}

	rx_adapter->eventdev_id = dev_id;
	rx_adapter->socket_id = socket_id;
	rx_adapter->conf_cb = conf_cb;
	rx_adapter->conf_arg = conf_arg;
	strcpy(rx_adapter->mem_name, mem_name);
	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
					rte_eth_dev_count() *
					sizeof(struct eth_device_info), 0,
					socket_id);
	rte_convert_rss_key((const uint32_t *)default_rss_key,
			(uint32_t *)rx_adapter->rss_key_be,
			RTE_DIM(default_rss_key));

	if (rx_adapter->eth_devices == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
		rte_free(rx_adapter);
		return -ENOMEM;
	}
	rte_spinlock_init(&rx_adapter->rx_lock);
	for (i = 0; i < rte_eth_dev_count(); i++)
		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

	event_eth_rx_adapter[id] = rx_adapter;
	if (conf_cb == default_conf_cb)
		rx_adapter->default_cb_arg = 1;
	return 0;
}
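
/*
 * A minimal usage sketch for the create/add/start flow (illustrative
 * only; id, dev_id, eth_port and port_conf are assumed to be set up by
 * the application):
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.ev.queue_id = 0,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		.servicing_weight = 1,
 *	};
 *
 *	rte_event_eth_rx_adapter_create(id, dev_id, &port_conf);
 *	rte_event_eth_rx_adapter_queue_add(id, eth_port, -1, &qconf);
 *	rte_event_eth_rx_adapter_start(id);
 */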
int
rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
		struct rte_event_port_conf *port_config)
{
	struct rte_event_port_conf *pc;
	int ret;

	if (port_config == NULL)
		return -EINVAL;
	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	pc = rte_malloc(NULL, sizeof(*pc), 0);
	if (pc == NULL)
		return -ENOMEM;
	*pc = *port_config;
	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
					default_conf_cb,
					pc);
	if (ret)
		rte_free(pc);
	return ret;
}
int
rte_event_eth_rx_adapter_free(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	if (rx_adapter->nb_queues) {
		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
				rx_adapter->nb_queues);
		return -EBUSY;
	}

	if (rx_adapter->default_cb_arg)
		rte_free(rx_adapter->conf_arg);
	rte_free(rx_adapter->eth_devices);
	rte_free(rx_adapter);
	event_eth_rx_adapter[id] = NULL;

	return 0;
}
int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
		uint8_t eth_dev_id,
		int32_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	int ret;
	uint32_t cap;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	int start_service;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if ((rx_adapter == NULL) || (queue_conf == NULL))
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			" eth port %" PRIu8, id, eth_dev_id);
		return ret;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
		&& (queue_conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
				" eth port: %" PRIu8 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
		(rx_queue_id != -1)) {
		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
			"event queue id %u eth port %u", id, eth_dev_id);
		return -EINVAL;
	}

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	start_service = 0;
	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
					-ENOTSUP);
		if (dev_info->rx_queue == NULL) {
			dev_info->rx_queue =
			    rte_zmalloc_socket(rx_adapter->mem_name,
					dev_info->dev->data->nb_rx_queues *
					sizeof(struct eth_rx_queue_info), 0,
					rx_adapter->socket_id);
			if (dev_info->rx_queue == NULL)
				return -ENOMEM;
		}

		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
				&rte_eth_devices[eth_dev_id],
				rx_queue_id, queue_conf);
		if (ret == 0) {
			dev_info->internal_event_port = 1;
			update_queue_info(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					1);
		}
	} else {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		dev_info->internal_event_port = 0;
		ret = init_service(rx_adapter, id);
		if (ret == 0)
			ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
					queue_conf);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		if (ret == 0)
			start_service =
				!!sw_rx_adapter_queue_count(rx_adapter);
	}

	if (ret)
		return ret;

	if (start_service)
		rte_service_component_runstate_set(rx_adapter->service_id, 1);

	return 0;
}
int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
				int32_t rx_queue_id)
{
	int ret = 0;
	struct rte_eventdev *dev;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	uint16_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret)
		return ret;

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
				 -ENOTSUP);
		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id);
		if (ret == 0) {
			update_queue_info(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					0);
			if (dev_info->nb_dev_queues == 0) {
				rte_free(dev_info->rx_queue);
				dev_info->rx_queue = NULL;
			}
		}
	} else {
		int rc;
		rte_spinlock_lock(&rx_adapter->rx_lock);
		if (rx_queue_id == -1) {
			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
				event_eth_rx_adapter_queue_del(rx_adapter,
							dev_info,
							i);
		} else {
			event_eth_rx_adapter_queue_del(rx_adapter,
						dev_info,
						(uint16_t)rx_queue_id);
		}

		rc = eth_poll_wrr_calc(rx_adapter);
		if (rc)
			RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
					rc);

		if (dev_info->nb_dev_queues == 0) {
			rte_free(dev_info->rx_queue);
			dev_info->rx_queue = NULL;
		}

		rte_spinlock_unlock(&rx_adapter->rx_lock);
		rte_service_component_runstate_set(rx_adapter->service_id,
				sw_rx_adapter_queue_count(rx_adapter));
	}

	return ret;
}
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	return rx_adapter_ctrl(id, 1);
}

int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	return rx_adapter_ctrl(id, 0);
}
int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
			       struct rte_event_eth_rx_adapter_stats *stats)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
	struct rte_event_eth_rx_adapter_stats dev_stats;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL || stats == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	memset(stats, 0, sizeof(*stats));
	for (i = 0; i < rte_eth_dev_count(); i++) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
			continue;
		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
						&rte_eth_devices[i],
						&dev_stats);
		if (ret)
			continue;
		dev_stats_sum.rx_packets += dev_stats.rx_packets;
		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
	}

	if (rx_adapter->service_inited)
		*stats = rx_adapter->stats;

	stats->rx_packets += dev_stats_sum.rx_packets;
	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
	return 0;
}
int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	for (i = 0; i < rte_eth_dev_count(); i++) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
			continue;
		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
							&rte_eth_devices[i]);
	}

	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
	return 0;
}
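
/*
 * Usage note: when the adapter runs as a software service (service_inited
 * is set), the application maps the id returned below to a service core,
 * e.g. rte_service_map_lcore_set(service_id, lcore_id, 1), so that the
 * adapter's poll loop actually runs.
 */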
int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL || service_id == NULL)
		return -EINVAL;

	if (rx_adapter->service_inited)
		*service_id = rx_adapter->service_id;

	return rx_adapter->service_inited ? 0 : -ESRCH;
}