X-Git-Url: https://gerrit.fd.io/r/gitweb?p=deb_dpdk.git;a=blobdiff_plain;f=lib%2Flibrte_eventdev%2Frte_event_eth_rx_adapter.c;h=f5e5a0b53081f71b67916b666ebf9d067041dba8;hp=9aece9f8c3fec528e01137c98c52791def848162;hb=b63264c8342e6a1b6971c79550d2af2024b6a4de;hpb=ca33590b6af032bff57d9cc70455660466a654b2 diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c index 9aece9f8..f5e5a0b5 100644 --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c @@ -1,3 +1,12 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2017 Intel Corporation. + * All rights reserved. + */ +#if defined(LINUX) +#include +#endif +#include + #include #include #include @@ -7,6 +16,7 @@ #include #include #include +#include #include "rte_eventdev.h" #include "rte_eventdev_pmd.h" @@ -20,6 +30,22 @@ #define ETH_RX_ADAPTER_MEM_NAME_LEN 32 #define RSS_KEY_SIZE 40 +/* value written to intr thread pipe to signal thread exit */ +#define ETH_BRIDGE_INTR_THREAD_EXIT 1 +/* Sentinel value to detect initialized file handle */ +#define INIT_FD -1 + +/* + * Used to store port and queue ID of interrupting Rx queue + */ +union queue_data { + RTE_STD_C11 + void *ptr; + struct { + uint16_t port; + uint16_t queue; + }; +}; /* * There is an instance of this struct per polled Rx queue added to the @@ -27,7 +53,7 @@ */ struct eth_rx_poll_entry { /* Eth port to poll */ - uint8_t eth_dev_id; + uint16_t eth_dev_id; /* Eth rx queue to poll */ uint16_t eth_rx_qid; }; @@ -71,6 +97,30 @@ struct rte_event_eth_rx_adapter { uint16_t enq_block_count; /* Block start ts */ uint64_t rx_enq_block_start_ts; + /* epoll fd used to wait for Rx interrupts */ + int epd; + /* Num of interrupt driven interrupt queues */ + uint32_t num_rx_intr; + /* Used to send of interrupting Rx queues from + * the interrupt thread to the Rx thread + */ + struct rte_ring *intr_ring; + /* Rx Queue data (dev id, queue id) for the last non-empty + * queue polled + */ + union queue_data qd; + /* queue_data is valid */ + int qd_valid; + /* Interrupt ring lock, synchronizes Rx thread + * and interrupt thread + */ + rte_spinlock_t intr_ring_lock; + /* event array passed to rte_poll_wait */ + struct rte_epoll_event *epoll_events; + /* Count of interrupt vectors in use */ + uint32_t num_intr_vec; + /* Thread blocked on Rx interrupts */ + pthread_t rx_intr_thread; /* Configuration callback for rte_service configuration */ rte_event_eth_rx_adapter_conf_cb conf_cb; /* Configuration callback argument */ @@ -87,12 +137,20 @@ struct rte_event_eth_rx_adapter { int socket_id; /* Per adapter EAL service */ uint32_t service_id; + /* Adapter started flag */ + uint8_t rxa_started; + /* Adapter ID */ + uint8_t id; } __rte_cache_aligned; /* Per eth device */ struct eth_device_info { struct rte_eth_dev *dev; struct eth_rx_queue_info *rx_queue; + /* Rx callback */ + rte_event_eth_rx_adapter_cb_fn cb_fn; + /* Rx callback argument */ + void *cb_arg; /* Set if ethdev->eventdev packet transfer uses a * hardware mechanism */ @@ -103,15 +161,42 @@ struct eth_device_info { * rx_adapter_stop callback needs to be invoked */ uint8_t dev_rx_started; - /* If nb_dev_queues > 0, the start callback will + /* Number of queues added for this device */ + uint16_t nb_dev_queues; + /* Number of poll based queues + * If nb_rx_poll > 0, the start callback will * be invoked if not already invoked */ - uint16_t nb_dev_queues; + uint16_t nb_rx_poll; + /* Number of interrupt based queues + * If nb_rx_intr > 0, the start callback will + * be invoked if not already invoked. + */ + uint16_t nb_rx_intr; + /* Number of queues that use the shared interrupt */ + uint16_t nb_shared_intr; + /* sum(wrr(q)) for all queues within the device + * useful when deleting all device queues + */ + uint32_t wrr_len; + /* Intr based queue index to start polling from, this is used + * if the number of shared interrupts is non-zero + */ + uint16_t next_q_idx; + /* Intr based queue indices */ + uint16_t *intr_queue; + /* device generates per Rx queue interrupt for queue index + * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1 + */ + int multi_intr_cap; + /* shared interrupt enabled */ + int shared_intr_enabled; }; /* Per Rx queue */ struct eth_rx_queue_info { int queue_enabled; /* True if added */ + int intr_enabled; uint16_t wt; /* Polling weight */ uint8_t event_queue_id; /* Event queue to enqueue packets to */ uint8_t sched_type; /* Sched type for events */ @@ -123,30 +208,30 @@ struct eth_rx_queue_info { static struct rte_event_eth_rx_adapter **event_eth_rx_adapter; static inline int -valid_id(uint8_t id) +rxa_validate_id(uint8_t id) { return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; } #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \ - if (!valid_id(id)) { \ + if (!rxa_validate_id(id)) { \ RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \ return retval; \ } \ } while (0) static inline int -sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter) +rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter) { - return rx_adapter->num_rx_polled; + return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr; } /* Greatest common divisor */ -static uint16_t gcd_u16(uint16_t a, uint16_t b) +static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b) { uint16_t r = a % b; - return r ? gcd_u16(b, r) : b; + return r ? rxa_gcd_u16(b, r) : b; } /* Returns the next queue in the polling sequence @@ -154,7 +239,7 @@ static uint16_t gcd_u16(uint16_t a, uint16_t b) * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling */ static int -wrr_next(struct rte_event_eth_rx_adapter *rx_adapter, +rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter, unsigned int n, int *cw, struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt, uint16_t gcd, int prev) @@ -164,7 +249,7 @@ wrr_next(struct rte_event_eth_rx_adapter *rx_adapter, while (1) { uint16_t q; - uint8_t d; + uint16_t d; i = (i + 1) % n; if (i == 0) { @@ -182,13 +267,298 @@ wrr_next(struct rte_event_eth_rx_adapter *rx_adapter, } } -/* Precalculate WRR polling sequence for all queues in rx_adapter */ +static inline int +rxa_shared_intr(struct eth_device_info *dev_info, + int rx_queue_id) +{ + int multi_intr_cap; + + if (dev_info->dev->intr_handle == NULL) + return 0; + + multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle); + return !multi_intr_cap || + rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1; +} + +static inline int +rxa_intr_queue(struct eth_device_info *dev_info, + int rx_queue_id) +{ + struct eth_rx_queue_info *queue_info; + + queue_info = &dev_info->rx_queue[rx_queue_id]; + return dev_info->rx_queue && + !dev_info->internal_event_port && + queue_info->queue_enabled && queue_info->wt == 0; +} + +static inline int +rxa_polled_queue(struct eth_device_info *dev_info, + int rx_queue_id) +{ + struct eth_rx_queue_info *queue_info; + + queue_info = &dev_info->rx_queue[rx_queue_id]; + return !dev_info->internal_event_port && + dev_info->rx_queue && + queue_info->queue_enabled && queue_info->wt != 0; +} + +/* Calculate change in number of vectors after Rx queue ID is add/deleted */ +static int +rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add) +{ + uint16_t i; + int n, s; + uint16_t nbq; + + nbq = dev_info->dev->data->nb_rx_queues; + n = 0; /* non shared count */ + s = 0; /* shared count */ + + if (rx_queue_id == -1) { + for (i = 0; i < nbq; i++) { + if (!rxa_shared_intr(dev_info, i)) + n += add ? !rxa_intr_queue(dev_info, i) : + rxa_intr_queue(dev_info, i); + else + s += add ? !rxa_intr_queue(dev_info, i) : + rxa_intr_queue(dev_info, i); + } + + if (s > 0) { + if ((add && dev_info->nb_shared_intr == 0) || + (!add && dev_info->nb_shared_intr)) + n += 1; + } + } else { + if (!rxa_shared_intr(dev_info, rx_queue_id)) + n = add ? !rxa_intr_queue(dev_info, rx_queue_id) : + rxa_intr_queue(dev_info, rx_queue_id); + else + n = add ? !dev_info->nb_shared_intr : + dev_info->nb_shared_intr == 1; + } + + return add ? n : -n; +} + +/* Calculate nb_rx_intr after deleting interrupt mode rx queues + */ +static void +rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id, + uint32_t *nb_rx_intr) +{ + uint32_t intr_diff; + + if (rx_queue_id == -1) + intr_diff = dev_info->nb_rx_intr; + else + intr_diff = rxa_intr_queue(dev_info, rx_queue_id); + + *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff; +} + +/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added + * interrupt queues could currently be poll mode Rx queues + */ +static void +rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id, + uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, + uint32_t *nb_wrr) +{ + uint32_t intr_diff; + uint32_t poll_diff; + uint32_t wrr_len_diff; + + if (rx_queue_id == -1) { + intr_diff = dev_info->dev->data->nb_rx_queues - + dev_info->nb_rx_intr; + poll_diff = dev_info->nb_rx_poll; + wrr_len_diff = dev_info->wrr_len; + } else { + intr_diff = !rxa_intr_queue(dev_info, rx_queue_id); + poll_diff = rxa_polled_queue(dev_info, rx_queue_id); + wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt : + 0; + } + + *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff; + *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff; + *nb_wrr = rx_adapter->wrr_len - wrr_len_diff; +} + +/* Calculate size of the eth_rx_poll and wrr_sched arrays + * after deleting poll mode rx queues + */ +static void +rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id, + uint32_t *nb_rx_poll, + uint32_t *nb_wrr) +{ + uint32_t poll_diff; + uint32_t wrr_len_diff; + + if (rx_queue_id == -1) { + poll_diff = dev_info->nb_rx_poll; + wrr_len_diff = dev_info->wrr_len; + } else { + poll_diff = rxa_polled_queue(dev_info, rx_queue_id); + wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt : + 0; + } + + *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff; + *nb_wrr = rx_adapter->wrr_len - wrr_len_diff; +} + +/* Calculate nb_rx_* after adding poll mode rx queues + */ +static void +rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id, + uint16_t wt, + uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, + uint32_t *nb_wrr) +{ + uint32_t intr_diff; + uint32_t poll_diff; + uint32_t wrr_len_diff; + + if (rx_queue_id == -1) { + intr_diff = dev_info->nb_rx_intr; + poll_diff = dev_info->dev->data->nb_rx_queues - + dev_info->nb_rx_poll; + wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues + - dev_info->wrr_len; + } else { + intr_diff = rxa_intr_queue(dev_info, rx_queue_id); + poll_diff = !rxa_polled_queue(dev_info, rx_queue_id); + wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ? + wt - dev_info->rx_queue[rx_queue_id].wt : + wt; + } + + *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff; + *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff; + *nb_wrr = rx_adapter->wrr_len + wrr_len_diff; +} + +/* Calculate nb_rx_* after adding rx_queue_id */ +static void +rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id, + uint16_t wt, + uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, + uint32_t *nb_wrr) +{ + if (wt != 0) + rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id, + wt, nb_rx_poll, nb_rx_intr, nb_wrr); + else + rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id, + nb_rx_poll, nb_rx_intr, nb_wrr); +} + +/* Calculate nb_rx_* after deleting rx_queue_id */ +static void +rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id, + uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, + uint32_t *nb_wrr) +{ + rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll, + nb_wrr); + rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id, + nb_rx_intr); +} + +/* + * Allocate the rx_poll array + */ +static struct eth_rx_poll_entry * +rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter, + uint32_t num_rx_polled) +{ + size_t len; + + len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll), + RTE_CACHE_LINE_SIZE); + return rte_zmalloc_socket(rx_adapter->mem_name, + len, + RTE_CACHE_LINE_SIZE, + rx_adapter->socket_id); +} + +/* + * Allocate the WRR array + */ +static uint32_t * +rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr) +{ + size_t len; + + len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched), + RTE_CACHE_LINE_SIZE); + return rte_zmalloc_socket(rx_adapter->mem_name, + len, + RTE_CACHE_LINE_SIZE, + rx_adapter->socket_id); +} + static int -eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter) +rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter, + uint32_t nb_poll, + uint32_t nb_wrr, + struct eth_rx_poll_entry **rx_poll, + uint32_t **wrr_sched) +{ + + if (nb_poll == 0) { + *rx_poll = NULL; + *wrr_sched = NULL; + return 0; + } + + *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll); + if (*rx_poll == NULL) { + *wrr_sched = NULL; + return -ENOMEM; + } + + *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr); + if (*wrr_sched == NULL) { + rte_free(*rx_poll); + return -ENOMEM; + } + return 0; +} + +/* Precalculate WRR polling sequence for all queues in rx_adapter */ +static void +rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_rx_poll_entry *rx_poll, + uint32_t *rx_wrr) { - uint8_t d; + uint16_t d; uint16_t q; unsigned int i; + int prev = -1; + int cw = -1; /* Initialize variables for calculation of wrr schedule */ uint16_t max_wrr_pos = 0; @@ -196,79 +566,52 @@ eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter) uint16_t max_wt = 0; uint16_t gcd = 0; - struct eth_rx_poll_entry *rx_poll = NULL; - uint32_t *rx_wrr = NULL; + if (rx_poll == NULL) + return; - if (rx_adapter->num_rx_polled) { - size_t len = RTE_ALIGN(rx_adapter->num_rx_polled * - sizeof(*rx_adapter->eth_rx_poll), - RTE_CACHE_LINE_SIZE); - rx_poll = rte_zmalloc_socket(rx_adapter->mem_name, - len, - RTE_CACHE_LINE_SIZE, - rx_adapter->socket_id); - if (rx_poll == NULL) - return -ENOMEM; + /* Generate array of all queues to poll, the size of this + * array is poll_q + */ + RTE_ETH_FOREACH_DEV(d) { + uint16_t nb_rx_queues; + struct eth_device_info *dev_info = + &rx_adapter->eth_devices[d]; + nb_rx_queues = dev_info->dev->data->nb_rx_queues; + if (dev_info->rx_queue == NULL) + continue; + if (dev_info->internal_event_port) + continue; + dev_info->wrr_len = 0; + for (q = 0; q < nb_rx_queues; q++) { + struct eth_rx_queue_info *queue_info = + &dev_info->rx_queue[q]; + uint16_t wt; - /* Generate array of all queues to poll, the size of this - * array is poll_q - */ - for (d = 0; d < rte_eth_dev_count(); d++) { - uint16_t nb_rx_queues; - struct eth_device_info *dev_info = - &rx_adapter->eth_devices[d]; - nb_rx_queues = dev_info->dev->data->nb_rx_queues; - if (dev_info->rx_queue == NULL) + if (!rxa_polled_queue(dev_info, q)) continue; - for (q = 0; q < nb_rx_queues; q++) { - struct eth_rx_queue_info *queue_info = - &dev_info->rx_queue[q]; - if (queue_info->queue_enabled == 0) - continue; - - uint16_t wt = queue_info->wt; - rx_poll[poll_q].eth_dev_id = d; - rx_poll[poll_q].eth_rx_qid = q; - max_wrr_pos += wt; - max_wt = RTE_MAX(max_wt, wt); - gcd = (gcd) ? gcd_u16(gcd, wt) : wt; - poll_q++; - } - } - - len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr), - RTE_CACHE_LINE_SIZE); - rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name, - len, - RTE_CACHE_LINE_SIZE, - rx_adapter->socket_id); - if (rx_wrr == NULL) { - rte_free(rx_poll); - return -ENOMEM; - } - - /* Generate polling sequence based on weights */ - int prev = -1; - int cw = -1; - for (i = 0; i < max_wrr_pos; i++) { - rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw, - rx_poll, max_wt, gcd, prev); - prev = rx_wrr[i]; + wt = queue_info->wt; + rx_poll[poll_q].eth_dev_id = d; + rx_poll[poll_q].eth_rx_qid = q; + max_wrr_pos += wt; + dev_info->wrr_len += wt; + max_wt = RTE_MAX(max_wt, wt); + gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt; + poll_q++; } } - rte_free(rx_adapter->eth_rx_poll); - rte_free(rx_adapter->wrr_sched); - - rx_adapter->eth_rx_poll = rx_poll; - rx_adapter->wrr_sched = rx_wrr; - rx_adapter->wrr_len = max_wrr_pos; - - return 0; + /* Generate polling sequence based on weights */ + prev = -1; + cw = -1; + for (i = 0; i < max_wrr_pos; i++) { + rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw, + rx_poll, max_wt, gcd, prev); + prev = rx_wrr[i]; + } } static inline void -mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr, +rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr, struct ipv6_hdr **ipv6_hdr) { struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *); @@ -307,7 +650,7 @@ mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr, /* Calculate RSS hash for IPv4/6 */ static inline uint32_t -do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be) +rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be) { uint32_t input_len; void *tuple; @@ -316,7 +659,7 @@ do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be) struct ipv4_hdr *ipv4_hdr; struct ipv6_hdr *ipv6_hdr; - mtoip(m, &ipv4_hdr, &ipv6_hdr); + rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr); if (ipv4_hdr) { ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr); @@ -335,13 +678,13 @@ do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be) } static inline int -rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter) +rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter) { return !!rx_adapter->enq_block_count; } static inline void -rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter) +rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter) { if (rx_adapter->rx_enq_block_start_ts) return; @@ -354,13 +697,13 @@ rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter) } static inline void -rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter, +rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter, struct rte_event_eth_rx_adapter_stats *stats) { if (unlikely(!stats->rx_enq_start_ts)) stats->rx_enq_start_ts = rte_get_tsc_cycles(); - if (likely(!rx_enq_blocked(rx_adapter))) + if (likely(!rxa_enq_blocked(rx_adapter))) return; rx_adapter->enq_block_count = 0; @@ -376,8 +719,8 @@ rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter, * this function */ static inline void -buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter, - struct rte_event *ev) +rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter, + struct rte_event *ev) { struct rte_eth_event_enqueue_buffer *buf = &rx_adapter->event_enqueue_buffer; @@ -386,7 +729,7 @@ buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter, /* Enqueue buffered events to event device */ static inline uint16_t -flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter) +rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter) { struct rte_eth_event_enqueue_buffer *buf = &rx_adapter->event_enqueue_buffer; @@ -403,8 +746,8 @@ flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter) stats->rx_enq_retry++; } - n ? rx_enq_block_end_ts(rx_adapter, stats) : - rx_enq_block_start_ts(rx_adapter); + n ? rxa_enq_block_end_ts(rx_adapter, stats) : + rxa_enq_block_start_ts(rx_adapter); buf->count -= n; stats->rx_enq_count += n; @@ -413,18 +756,19 @@ flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter) } static inline void -fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter, - uint8_t dev_id, - uint16_t rx_queue_id, - struct rte_mbuf **mbufs, - uint16_t num) +rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter, + uint16_t eth_dev_id, + uint16_t rx_queue_id, + struct rte_mbuf **mbufs, + uint16_t num) { uint32_t i; - struct eth_device_info *eth_device_info = - &rx_adapter->eth_devices[dev_id]; + struct eth_device_info *dev_info = + &rx_adapter->eth_devices[eth_dev_id]; struct eth_rx_queue_info *eth_rx_queue_info = - ð_device_info->rx_queue[rx_queue_id]; - + &dev_info->rx_queue[rx_queue_id]; + struct rte_eth_event_enqueue_buffer *buf = + &rx_adapter->event_enqueue_buffer; int32_t qid = eth_rx_queue_info->event_queue_id; uint8_t sched_type = eth_rx_queue_info->sched_type; uint8_t priority = eth_rx_queue_info->priority; @@ -434,22 +778,48 @@ fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter, uint32_t rss_mask; uint32_t rss; int do_rss; + uint64_t ts; + struct rte_mbuf *cb_mbufs[BATCH_SIZE]; + uint16_t nb_cb; /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */ rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1); do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask; + if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) { + ts = rte_get_tsc_cycles(); + for (i = 0; i < num; i++) { + m = mbufs[i]; + + m->timestamp = ts; + m->ol_flags |= PKT_RX_TIMESTAMP; + } + } + + + nb_cb = dev_info->cb_fn ? dev_info->cb_fn(eth_dev_id, rx_queue_id, + ETH_EVENT_BUFFER_SIZE, + buf->count, mbufs, + num, + dev_info->cb_arg, + cb_mbufs) : + num; + if (nb_cb < num) { + mbufs = cb_mbufs; + num = nb_cb; + } + for (i = 0; i < num; i++) { m = mbufs[i]; struct rte_event *ev = &events[i]; rss = do_rss ? - do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss; + rxa_do_softrss(m, rx_adapter->rss_key_be) : + m->hash.rss; flow_id = eth_rx_queue_info->flow_id & eth_rx_queue_info->flow_id_mask; flow_id |= rss & ~eth_rx_queue_info->flow_id_mask; - ev->flow_id = flow_id; ev->op = RTE_EVENT_OP_NEW; ev->sched_type = sched_type; @@ -459,10 +829,277 @@ fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter, ev->priority = priority; ev->mbuf = m; - buf_event_enqueue(rx_adapter, ev); + rxa_buffer_event(rx_adapter, ev); } } +/* Enqueue packets from to event buffer */ +static inline uint32_t +rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter, + uint16_t port_id, + uint16_t queue_id, + uint32_t rx_count, + uint32_t max_rx, + int *rxq_empty) +{ + struct rte_mbuf *mbufs[BATCH_SIZE]; + struct rte_eth_event_enqueue_buffer *buf = + &rx_adapter->event_enqueue_buffer; + struct rte_event_eth_rx_adapter_stats *stats = + &rx_adapter->stats; + uint16_t n; + uint32_t nb_rx = 0; + + if (rxq_empty) + *rxq_empty = 0; + /* Don't do a batch dequeue from the rx queue if there isn't + * enough space in the enqueue buffer. + */ + while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) { + if (buf->count >= BATCH_SIZE) + rxa_flush_event_buffer(rx_adapter); + + stats->rx_poll_count++; + n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE); + if (unlikely(!n)) { + if (rxq_empty) + *rxq_empty = 1; + break; + } + rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n); + nb_rx += n; + if (rx_count + nb_rx > max_rx) + break; + } + + if (buf->count >= BATCH_SIZE) + rxa_flush_event_buffer(rx_adapter); + + return nb_rx; +} + +static inline void +rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter, + void *data) +{ + uint16_t port_id; + uint16_t queue; + int err; + union queue_data qd; + struct eth_device_info *dev_info; + struct eth_rx_queue_info *queue_info; + int *intr_enabled; + + qd.ptr = data; + port_id = qd.port; + queue = qd.queue; + + dev_info = &rx_adapter->eth_devices[port_id]; + queue_info = &dev_info->rx_queue[queue]; + rte_spinlock_lock(&rx_adapter->intr_ring_lock); + if (rxa_shared_intr(dev_info, queue)) + intr_enabled = &dev_info->shared_intr_enabled; + else + intr_enabled = &queue_info->intr_enabled; + + if (*intr_enabled) { + *intr_enabled = 0; + err = rte_ring_enqueue(rx_adapter->intr_ring, data); + /* Entry should always be available. + * The ring size equals the maximum number of interrupt + * vectors supported (an interrupt vector is shared in + * case of shared interrupts) + */ + if (err) + RTE_EDEV_LOG_ERR("Failed to enqueue interrupt" + " to ring: %s", strerror(err)); + else + rte_eth_dev_rx_intr_disable(port_id, queue); + } + rte_spinlock_unlock(&rx_adapter->intr_ring_lock); +} + +static int +rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter, + uint32_t num_intr_vec) +{ + if (rx_adapter->num_intr_vec + num_intr_vec > + RTE_EVENT_ETH_INTR_RING_SIZE) { + RTE_EDEV_LOG_ERR("Exceeded intr ring slots current" + " %d needed %d limit %d", rx_adapter->num_intr_vec, + num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE); + return -ENOSPC; + } + + return 0; +} + +/* Delete entries for (dev, queue) from the interrupt ring */ +static void +rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id) +{ + int i, n; + union queue_data qd; + + rte_spinlock_lock(&rx_adapter->intr_ring_lock); + + n = rte_ring_count(rx_adapter->intr_ring); + for (i = 0; i < n; i++) { + rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr); + if (!rxa_shared_intr(dev_info, rx_queue_id)) { + if (qd.port == dev_info->dev->data->port_id && + qd.queue == rx_queue_id) + continue; + } else { + if (qd.port == dev_info->dev->data->port_id) + continue; + } + rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr); + } + + rte_spinlock_unlock(&rx_adapter->intr_ring_lock); +} + +/* pthread callback handling interrupt mode receive queues + * After receiving an Rx interrupt, it enqueues the port id and queue id of the + * interrupting queue to the adapter's ring buffer for interrupt events. + * These events are picked up by rxa_intr_ring_dequeue() which is invoked from + * the adapter service function. + */ +static void * +rxa_intr_thread(void *arg) +{ + struct rte_event_eth_rx_adapter *rx_adapter = arg; + struct rte_epoll_event *epoll_events = rx_adapter->epoll_events; + int n, i; + + while (1) { + n = rte_epoll_wait(rx_adapter->epd, epoll_events, + RTE_EVENT_ETH_INTR_RING_SIZE, -1); + if (unlikely(n < 0)) + RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d", + n); + for (i = 0; i < n; i++) { + rxa_intr_ring_enqueue(rx_adapter, + epoll_events[i].epdata.data); + } + } + + return NULL; +} + +/* Dequeue from interrupt ring and enqueue received + * mbufs to eventdev + */ +static inline uint32_t +rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter) +{ + uint32_t n; + uint32_t nb_rx = 0; + int rxq_empty; + struct rte_eth_event_enqueue_buffer *buf; + rte_spinlock_t *ring_lock; + uint8_t max_done = 0; + + if (rx_adapter->num_rx_intr == 0) + return 0; + + if (rte_ring_count(rx_adapter->intr_ring) == 0 + && !rx_adapter->qd_valid) + return 0; + + buf = &rx_adapter->event_enqueue_buffer; + ring_lock = &rx_adapter->intr_ring_lock; + + if (buf->count >= BATCH_SIZE) + rxa_flush_event_buffer(rx_adapter); + + while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) { + struct eth_device_info *dev_info; + uint16_t port; + uint16_t queue; + union queue_data qd = rx_adapter->qd; + int err; + + if (!rx_adapter->qd_valid) { + struct eth_rx_queue_info *queue_info; + + rte_spinlock_lock(ring_lock); + err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr); + if (err) { + rte_spinlock_unlock(ring_lock); + break; + } + + port = qd.port; + queue = qd.queue; + rx_adapter->qd = qd; + rx_adapter->qd_valid = 1; + dev_info = &rx_adapter->eth_devices[port]; + if (rxa_shared_intr(dev_info, queue)) + dev_info->shared_intr_enabled = 1; + else { + queue_info = &dev_info->rx_queue[queue]; + queue_info->intr_enabled = 1; + } + rte_eth_dev_rx_intr_enable(port, queue); + rte_spinlock_unlock(ring_lock); + } else { + port = qd.port; + queue = qd.queue; + + dev_info = &rx_adapter->eth_devices[port]; + } + + if (rxa_shared_intr(dev_info, queue)) { + uint16_t i; + uint16_t nb_queues; + + nb_queues = dev_info->dev->data->nb_rx_queues; + n = 0; + for (i = dev_info->next_q_idx; i < nb_queues; i++) { + uint8_t enq_buffer_full; + + if (!rxa_intr_queue(dev_info, i)) + continue; + n = rxa_eth_rx(rx_adapter, port, i, nb_rx, + rx_adapter->max_nb_rx, + &rxq_empty); + nb_rx += n; + + enq_buffer_full = !rxq_empty && n == 0; + max_done = nb_rx > rx_adapter->max_nb_rx; + + if (enq_buffer_full || max_done) { + dev_info->next_q_idx = i; + goto done; + } + } + + rx_adapter->qd_valid = 0; + + /* Reinitialize for next interrupt */ + dev_info->next_q_idx = dev_info->multi_intr_cap ? + RTE_MAX_RXTX_INTR_VEC_ID - 1 : + 0; + } else { + n = rxa_eth_rx(rx_adapter, port, queue, nb_rx, + rx_adapter->max_nb_rx, + &rxq_empty); + rx_adapter->qd_valid = !rxq_empty; + nb_rx += n; + if (nb_rx > rx_adapter->max_nb_rx) + break; + } + } + +done: + rx_adapter->stats.rx_intr_packets += nb_rx; + return nb_rx; +} + /* * Polls receive queues added to the event adapter and enqueues received * packets to the event device. @@ -477,12 +1114,10 @@ fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter, * it. */ static inline uint32_t -eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter) +rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter) { uint32_t num_queue; - uint16_t n; uint32_t nb_rx = 0; - struct rte_mbuf *mbufs[BATCH_SIZE]; struct rte_eth_event_enqueue_buffer *buf; uint32_t wrr_pos; uint32_t max_nb_rx; @@ -490,142 +1125,483 @@ eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter) wrr_pos = rx_adapter->wrr_pos; max_nb_rx = rx_adapter->max_nb_rx; buf = &rx_adapter->event_enqueue_buffer; - struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats; + stats = &rx_adapter->stats; /* Iterate through a WRR sequence */ for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) { unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos]; uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid; - uint8_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id; + uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id; /* Don't do a batch dequeue from the rx queue if there isn't * enough space in the enqueue buffer. */ if (buf->count >= BATCH_SIZE) - flush_event_buffer(rx_adapter); - if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) + rxa_flush_event_buffer(rx_adapter); + if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) { + rx_adapter->wrr_pos = wrr_pos; + return nb_rx; + } + + nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx, + NULL); + if (nb_rx > max_nb_rx) { + rx_adapter->wrr_pos = + (wrr_pos + 1) % rx_adapter->wrr_len; break; + } + + if (++wrr_pos == rx_adapter->wrr_len) + wrr_pos = 0; + } + return nb_rx; +} + +static int +rxa_service_func(void *args) +{ + struct rte_event_eth_rx_adapter *rx_adapter = args; + struct rte_event_eth_rx_adapter_stats *stats; + + if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0) + return 0; + if (!rx_adapter->rxa_started) { + return 0; + rte_spinlock_unlock(&rx_adapter->rx_lock); + } + + stats = &rx_adapter->stats; + stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter); + stats->rx_packets += rxa_poll(rx_adapter); + rte_spinlock_unlock(&rx_adapter->rx_lock); + return 0; +} + +static int +rte_event_eth_rx_adapter_init(void) +{ + const char *name = "rte_event_eth_rx_adapter_array"; + const struct rte_memzone *mz; + unsigned int sz; + + sz = sizeof(*event_eth_rx_adapter) * + RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); + + mz = rte_memzone_lookup(name); + if (mz == NULL) { + mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0, + RTE_CACHE_LINE_SIZE); + if (mz == NULL) { + RTE_EDEV_LOG_ERR("failed to reserve memzone err = %" + PRId32, rte_errno); + return -rte_errno; + } + } + + event_eth_rx_adapter = mz->addr; + return 0; +} + +static inline struct rte_event_eth_rx_adapter * +rxa_id_to_adapter(uint8_t id) +{ + return event_eth_rx_adapter ? + event_eth_rx_adapter[id] : NULL; +} + +static int +rxa_default_conf_cb(uint8_t id, uint8_t dev_id, + struct rte_event_eth_rx_adapter_conf *conf, void *arg) +{ + int ret; + struct rte_eventdev *dev; + struct rte_event_dev_config dev_conf; + int started; + uint8_t port_id; + struct rte_event_port_conf *port_conf = arg; + struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id); + + dev = &rte_eventdevs[rx_adapter->eventdev_id]; + dev_conf = dev->data->dev_conf; + + started = dev->data->dev_started; + if (started) + rte_event_dev_stop(dev_id); + port_id = dev_conf.nb_event_ports; + dev_conf.nb_event_ports += 1; + ret = rte_event_dev_configure(dev_id, &dev_conf); + if (ret) { + RTE_EDEV_LOG_ERR("failed to configure event dev %u\n", + dev_id); + if (started) { + if (rte_event_dev_start(dev_id)) + return -EIO; + } + return ret; + } + + ret = rte_event_port_setup(dev_id, port_id, port_conf); + if (ret) { + RTE_EDEV_LOG_ERR("failed to setup event port %u\n", + port_id); + return ret; + } + + conf->event_port_id = port_id; + conf->max_nb_rx = 128; + if (started) + ret = rte_event_dev_start(dev_id); + rx_adapter->default_cb_arg = 1; + return ret; +} + +static int +rxa_epoll_create1(void) +{ +#if defined(LINUX) + int fd; + fd = epoll_create1(EPOLL_CLOEXEC); + return fd < 0 ? -errno : fd; +#elif defined(BSD) + return -ENOTSUP; +#endif +} + +static int +rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter) +{ + if (rx_adapter->epd != INIT_FD) + return 0; + + rx_adapter->epd = rxa_epoll_create1(); + if (rx_adapter->epd < 0) { + int err = rx_adapter->epd; + rx_adapter->epd = INIT_FD; + RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err); + return err; + } + + return 0; +} + +static int +rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter) +{ + int err; + char thread_name[RTE_MAX_THREAD_NAME_LEN]; + + if (rx_adapter->intr_ring) + return 0; + + rx_adapter->intr_ring = rte_ring_create("intr_ring", + RTE_EVENT_ETH_INTR_RING_SIZE, + rte_socket_id(), 0); + if (!rx_adapter->intr_ring) + return -ENOMEM; + + rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name, + RTE_EVENT_ETH_INTR_RING_SIZE * + sizeof(struct rte_epoll_event), + RTE_CACHE_LINE_SIZE, + rx_adapter->socket_id); + if (!rx_adapter->epoll_events) { + err = -ENOMEM; + goto error; + } + + rte_spinlock_init(&rx_adapter->intr_ring_lock); + + snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, + "rx-intr-thread-%d", rx_adapter->id); + + err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name, + NULL, rxa_intr_thread, rx_adapter); + if (!err) { + rte_thread_setname(rx_adapter->rx_intr_thread, thread_name); + return 0; + } + + RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err); +error: + rte_ring_free(rx_adapter->intr_ring); + rx_adapter->intr_ring = NULL; + rx_adapter->epoll_events = NULL; + return err; +} + +static int +rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter) +{ + int err; + + err = pthread_cancel(rx_adapter->rx_intr_thread); + if (err) + RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n", + err); + + err = pthread_join(rx_adapter->rx_intr_thread, NULL); + if (err) + RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err); + + rte_free(rx_adapter->epoll_events); + rte_ring_free(rx_adapter->intr_ring); + rx_adapter->intr_ring = NULL; + rx_adapter->epoll_events = NULL; + return 0; +} + +static int +rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter) +{ + int ret; - stats->rx_poll_count++; - n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE); + if (rx_adapter->num_rx_intr == 0) + return 0; - if (n) { - stats->rx_packets += n; - /* The check before rte_eth_rx_burst() ensures that - * all n mbufs can be buffered - */ - fill_event_buffer(rx_adapter, d, qid, mbufs, n); - nb_rx += n; - if (nb_rx > max_nb_rx) { - rx_adapter->wrr_pos = - (wrr_pos + 1) % rx_adapter->wrr_len; - return nb_rx; - } - } + ret = rxa_destroy_intr_thread(rx_adapter); + if (ret) + return ret; - if (++wrr_pos == rx_adapter->wrr_len) - wrr_pos = 0; - } + close(rx_adapter->epd); + rx_adapter->epd = INIT_FD; - return nb_rx; + return ret; } static int -event_eth_rx_adapter_service_func(void *args) +rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id) { - struct rte_event_eth_rx_adapter *rx_adapter = args; - struct rte_eth_event_enqueue_buffer *buf; + int err; + uint16_t eth_dev_id = dev_info->dev->data->port_id; + int sintr = rxa_shared_intr(dev_info, rx_queue_id); + + err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id); + if (err) { + RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u", + rx_queue_id); + return err; + } - buf = &rx_adapter->event_enqueue_buffer; - if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0) - return 0; - if (eth_rx_poll(rx_adapter) == 0 && buf->count) - flush_event_buffer(rx_adapter); - rte_spinlock_unlock(&rx_adapter->rx_lock); - return 0; + err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, + rx_adapter->epd, + RTE_INTR_EVENT_DEL, + 0); + if (err) + RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err); + + if (sintr) + dev_info->rx_queue[rx_queue_id].intr_enabled = 0; + else + dev_info->shared_intr_enabled = 0; + return err; } static int -rte_event_eth_rx_adapter_init(void) +rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id) { - const char *name = "rte_event_eth_rx_adapter_array"; - const struct rte_memzone *mz; - unsigned int sz; + int err; + int i; + int s; - sz = sizeof(*event_eth_rx_adapter) * - RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; - sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); + if (dev_info->nb_rx_intr == 0) + return 0; - mz = rte_memzone_lookup(name); - if (mz == NULL) { - mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0, - RTE_CACHE_LINE_SIZE); - if (mz == NULL) { - RTE_EDEV_LOG_ERR("failed to reserve memzone err = %" - PRId32, rte_errno); - return -rte_errno; + err = 0; + if (rx_queue_id == -1) { + s = dev_info->nb_shared_intr; + for (i = 0; i < dev_info->nb_rx_intr; i++) { + int sintr; + uint16_t q; + + q = dev_info->intr_queue[i]; + sintr = rxa_shared_intr(dev_info, q); + s -= sintr; + + if (!sintr || s == 0) { + + err = rxa_disable_intr(rx_adapter, dev_info, + q); + if (err) + return err; + rxa_intr_ring_del_entries(rx_adapter, dev_info, + q); + } + } + } else { + if (!rxa_intr_queue(dev_info, rx_queue_id)) + return 0; + if (!rxa_shared_intr(dev_info, rx_queue_id) || + dev_info->nb_shared_intr == 1) { + err = rxa_disable_intr(rx_adapter, dev_info, + rx_queue_id); + if (err) + return err; + rxa_intr_ring_del_entries(rx_adapter, dev_info, + rx_queue_id); + } + + for (i = 0; i < dev_info->nb_rx_intr; i++) { + if (dev_info->intr_queue[i] == rx_queue_id) { + for (; i < dev_info->nb_rx_intr - 1; i++) + dev_info->intr_queue[i] = + dev_info->intr_queue[i + 1]; + break; + } } } - event_eth_rx_adapter = mz->addr; - return 0; + return err; } -static inline struct rte_event_eth_rx_adapter * -id_to_rx_adapter(uint8_t id) +static int +rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id) { - return event_eth_rx_adapter ? - event_eth_rx_adapter[id] : NULL; + int err, err1; + uint16_t eth_dev_id = dev_info->dev->data->port_id; + union queue_data qd; + int init_fd; + uint16_t *intr_queue; + int sintr = rxa_shared_intr(dev_info, rx_queue_id); + + if (rxa_intr_queue(dev_info, rx_queue_id)) + return 0; + + intr_queue = dev_info->intr_queue; + if (dev_info->intr_queue == NULL) { + size_t len = + dev_info->dev->data->nb_rx_queues * sizeof(uint16_t); + dev_info->intr_queue = + rte_zmalloc_socket( + rx_adapter->mem_name, + len, + 0, + rx_adapter->socket_id); + if (dev_info->intr_queue == NULL) + return -ENOMEM; + } + + init_fd = rx_adapter->epd; + err = rxa_init_epd(rx_adapter); + if (err) + goto err_free_queue; + + qd.port = eth_dev_id; + qd.queue = rx_queue_id; + + err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, + rx_adapter->epd, + RTE_INTR_EVENT_ADD, + qd.ptr); + if (err) { + RTE_EDEV_LOG_ERR("Failed to add interrupt event for" + " Rx Queue %u err %d", rx_queue_id, err); + goto err_del_fd; + } + + err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id); + if (err) { + RTE_EDEV_LOG_ERR("Could not enable interrupt for" + " Rx Queue %u err %d", rx_queue_id, err); + + goto err_del_event; + } + + err = rxa_create_intr_thread(rx_adapter); + if (!err) { + if (sintr) + dev_info->shared_intr_enabled = 1; + else + dev_info->rx_queue[rx_queue_id].intr_enabled = 1; + return 0; + } + + + err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id); + if (err) + RTE_EDEV_LOG_ERR("Could not disable interrupt for" + " Rx Queue %u err %d", rx_queue_id, err); +err_del_event: + err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, + rx_adapter->epd, + RTE_INTR_EVENT_DEL, + 0); + if (err1) { + RTE_EDEV_LOG_ERR("Could not delete event for" + " Rx Queue %u err %d", rx_queue_id, err1); + } +err_del_fd: + if (init_fd == INIT_FD) { + close(rx_adapter->epd); + rx_adapter->epd = -1; + } +err_free_queue: + if (intr_queue == NULL) + rte_free(dev_info->intr_queue); + + return err; } static int -default_conf_cb(uint8_t id, uint8_t dev_id, - struct rte_event_eth_rx_adapter_conf *conf, void *arg) +rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id) + { - int ret; - struct rte_eventdev *dev; - struct rte_event_dev_config dev_conf; - int started; - uint8_t port_id; - struct rte_event_port_conf *port_conf = arg; - struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id); + int i, j, err; + int si = -1; + int shared_done = (dev_info->nb_shared_intr > 0); + + if (rx_queue_id != -1) { + if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done) + return 0; + return rxa_config_intr(rx_adapter, dev_info, rx_queue_id); + } - dev = &rte_eventdevs[rx_adapter->eventdev_id]; - dev_conf = dev->data->dev_conf; + err = 0; + for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) { - started = dev->data->dev_started; - if (started) - rte_event_dev_stop(dev_id); - port_id = dev_conf.nb_event_ports; - dev_conf.nb_event_ports += 1; - ret = rte_event_dev_configure(dev_id, &dev_conf); - if (ret) { - RTE_EDEV_LOG_ERR("failed to configure event dev %u\n", - dev_id); - if (started) { - if (rte_event_dev_start(dev_id)) - return -EIO; + if (rxa_shared_intr(dev_info, i) && shared_done) + continue; + + err = rxa_config_intr(rx_adapter, dev_info, i); + + shared_done = err == 0 && rxa_shared_intr(dev_info, i); + if (shared_done) { + si = i; + dev_info->shared_intr_enabled = 1; } - return ret; + if (err) + break; } - ret = rte_event_port_setup(dev_id, port_id, port_conf); - if (ret) { - RTE_EDEV_LOG_ERR("failed to setup event port %u\n", - port_id); - return ret; + if (err == 0) + return 0; + + shared_done = (dev_info->nb_shared_intr > 0); + for (j = 0; j < i; j++) { + if (rxa_intr_queue(dev_info, j)) + continue; + if (rxa_shared_intr(dev_info, j) && si != j) + continue; + err = rxa_disable_intr(rx_adapter, dev_info, j); + if (err) + break; + } - conf->event_port_id = port_id; - conf->max_nb_rx = 128; - if (started) - ret = rte_event_dev_start(dev_id); - rx_adapter->default_cb_arg = 1; - return ret; + return err; } + static int -init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id) +rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id) { int ret; struct rte_service_spec service; @@ -638,7 +1614,7 @@ init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id) snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN, "rte_event_eth_rx_adapter_%d", id); service.socket_id = rx_adapter->socket_id; - service.callback = event_eth_rx_adapter_service_func; + service.callback = rxa_service_func; service.callback_userdata = rx_adapter; /* Service function handles locking for queue add/del updates */ service.capabilities = RTE_SERVICE_CAP_MT_SAFE; @@ -659,6 +1635,7 @@ init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id) rx_adapter->event_port_id = rx_adapter_conf.event_port_id; rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx; rx_adapter->service_inited = 1; + rx_adapter->epd = INIT_FD; return 0; err_done: @@ -666,9 +1643,8 @@ err_done: return ret; } - static void -update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter, +rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter, struct eth_device_info *dev_info, int32_t rx_queue_id, uint8_t add) @@ -682,7 +1658,7 @@ update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter, if (rx_queue_id == -1) { for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) - update_queue_info(rx_adapter, dev_info, i, add); + rxa_update_queue(rx_adapter, dev_info, i, add); } else { queue_info = &dev_info->rx_queue[rx_queue_id]; enabled = queue_info->queue_enabled; @@ -697,31 +1673,65 @@ update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter, } } -static int -event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter, - struct eth_device_info *dev_info, - uint16_t rx_queue_id) +static void +rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int32_t rx_queue_id) { - struct eth_rx_queue_info *queue_info; + int pollq; + int intrq; + int sintrq; + if (rx_adapter->nb_queues == 0) - return 0; + return; - queue_info = &dev_info->rx_queue[rx_queue_id]; - rx_adapter->num_rx_polled -= queue_info->queue_enabled; - update_queue_info(rx_adapter, dev_info, rx_queue_id, 0); - return 0; + if (rx_queue_id == -1) { + uint16_t nb_rx_queues; + uint16_t i; + + nb_rx_queues = dev_info->dev->data->nb_rx_queues; + for (i = 0; i < nb_rx_queues; i++) + rxa_sw_del(rx_adapter, dev_info, i); + return; + } + + pollq = rxa_polled_queue(dev_info, rx_queue_id); + intrq = rxa_intr_queue(dev_info, rx_queue_id); + sintrq = rxa_shared_intr(dev_info, rx_queue_id); + rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0); + rx_adapter->num_rx_polled -= pollq; + dev_info->nb_rx_poll -= pollq; + rx_adapter->num_rx_intr -= intrq; + dev_info->nb_rx_intr -= intrq; + dev_info->nb_shared_intr -= intrq && sintrq; } static void -event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter, - struct eth_device_info *dev_info, - uint16_t rx_queue_id, - const struct rte_event_eth_rx_adapter_queue_conf *conf) - +rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int32_t rx_queue_id, + const struct rte_event_eth_rx_adapter_queue_conf *conf) { struct eth_rx_queue_info *queue_info; const struct rte_event *ev = &conf->ev; + int pollq; + int intrq; + int sintrq; + + if (rx_queue_id == -1) { + uint16_t nb_rx_queues; + uint16_t i; + + nb_rx_queues = dev_info->dev->data->nb_rx_queues; + for (i = 0; i < nb_rx_queues; i++) + rxa_add_queue(rx_adapter, dev_info, i, conf); + return; + } + + pollq = rxa_polled_queue(dev_info, rx_queue_id); + intrq = rxa_intr_queue(dev_info, rx_queue_id); + sintrq = rxa_shared_intr(dev_info, rx_queue_id); queue_info = &dev_info->rx_queue[rx_queue_id]; queue_info->event_queue_id = ev->queue_id; @@ -735,69 +1745,162 @@ event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter, queue_info->flow_id_mask = ~0; } - /* The same queue can be added more than once */ - rx_adapter->num_rx_polled += !queue_info->queue_enabled; - update_queue_info(rx_adapter, dev_info, rx_queue_id, 1); + rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1); + if (rxa_polled_queue(dev_info, rx_queue_id)) { + rx_adapter->num_rx_polled += !pollq; + dev_info->nb_rx_poll += !pollq; + rx_adapter->num_rx_intr -= intrq; + dev_info->nb_rx_intr -= intrq; + dev_info->nb_shared_intr -= intrq && sintrq; + } + + if (rxa_intr_queue(dev_info, rx_queue_id)) { + rx_adapter->num_rx_polled -= pollq; + dev_info->nb_rx_poll -= pollq; + rx_adapter->num_rx_intr += !intrq; + dev_info->nb_rx_intr += !intrq; + dev_info->nb_shared_intr += !intrq && sintrq; + if (dev_info->nb_shared_intr == 1) { + if (dev_info->multi_intr_cap) + dev_info->next_q_idx = + RTE_MAX_RXTX_INTR_VEC_ID - 1; + else + dev_info->next_q_idx = 0; + } + } } -static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter, - uint8_t eth_dev_id, +static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, + uint16_t eth_dev_id, int rx_queue_id, const struct rte_event_eth_rx_adapter_queue_conf *queue_conf) { struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id]; struct rte_event_eth_rx_adapter_queue_conf temp_conf; - uint32_t i; int ret; + struct eth_rx_poll_entry *rx_poll; + struct eth_rx_queue_info *rx_queue; + uint32_t *rx_wrr; + uint16_t nb_rx_queues; + uint32_t nb_rx_poll, nb_wrr; + uint32_t nb_rx_intr; + int num_intr_vec; + uint16_t wt; if (queue_conf->servicing_weight == 0) { - struct rte_eth_dev_data *data = dev_info->dev->data; - if (data->dev_conf.intr_conf.rxq) { - RTE_EDEV_LOG_ERR("Interrupt driven queues" - " not supported"); - return -ENOTSUP; - } - temp_conf = *queue_conf; - /* If Rx interrupts are disabled set wt = 1 */ - temp_conf.servicing_weight = 1; + temp_conf = *queue_conf; + if (!data->dev_conf.intr_conf.rxq) { + /* If Rx interrupts are disabled set wt = 1 */ + temp_conf.servicing_weight = 1; + } queue_conf = &temp_conf; } + nb_rx_queues = dev_info->dev->data->nb_rx_queues; + rx_queue = dev_info->rx_queue; + wt = queue_conf->servicing_weight; + if (dev_info->rx_queue == NULL) { dev_info->rx_queue = rte_zmalloc_socket(rx_adapter->mem_name, - dev_info->dev->data->nb_rx_queues * + nb_rx_queues * sizeof(struct eth_rx_queue_info), 0, rx_adapter->socket_id); if (dev_info->rx_queue == NULL) return -ENOMEM; } + rx_wrr = NULL; + rx_poll = NULL; - if (rx_queue_id == -1) { - for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) - event_eth_rx_adapter_queue_add(rx_adapter, - dev_info, i, - queue_conf); + rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id, + queue_conf->servicing_weight, + &nb_rx_poll, &nb_rx_intr, &nb_wrr); + + if (dev_info->dev->intr_handle) + dev_info->multi_intr_cap = + rte_intr_cap_multiple(dev_info->dev->intr_handle); + + ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr, + &rx_poll, &rx_wrr); + if (ret) + goto err_free_rxqueue; + + if (wt == 0) { + num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1); + + ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec); + if (ret) + goto err_free_rxqueue; + + ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id); + if (ret) + goto err_free_rxqueue; } else { - event_eth_rx_adapter_queue_add(rx_adapter, dev_info, - (uint16_t)rx_queue_id, - queue_conf); + + num_intr_vec = 0; + if (rx_adapter->num_rx_intr > nb_rx_intr) { + num_intr_vec = rxa_nb_intr_vect(dev_info, + rx_queue_id, 0); + /* interrupt based queues are being converted to + * poll mode queues, delete the interrupt configuration + * for those. + */ + ret = rxa_del_intr_queue(rx_adapter, + dev_info, rx_queue_id); + if (ret) + goto err_free_rxqueue; + } } - ret = eth_poll_wrr_calc(rx_adapter); - if (ret) { - event_eth_rx_adapter_queue_del(rx_adapter, - dev_info, rx_queue_id); - return ret; + if (nb_rx_intr == 0) { + ret = rxa_free_intr_resources(rx_adapter); + if (ret) + goto err_free_rxqueue; } - return ret; + if (wt == 0) { + uint16_t i; + + if (rx_queue_id == -1) { + for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) + dev_info->intr_queue[i] = i; + } else { + if (!rxa_intr_queue(dev_info, rx_queue_id)) + dev_info->intr_queue[nb_rx_intr - 1] = + rx_queue_id; + } + } + + + + rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf); + rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr); + + rte_free(rx_adapter->eth_rx_poll); + rte_free(rx_adapter->wrr_sched); + + rx_adapter->eth_rx_poll = rx_poll; + rx_adapter->wrr_sched = rx_wrr; + rx_adapter->wrr_len = nb_wrr; + rx_adapter->num_intr_vec += num_intr_vec; + return 0; + +err_free_rxqueue: + if (rx_queue == NULL) { + rte_free(dev_info->rx_queue); + dev_info->rx_queue = NULL; + } + + rte_free(rx_poll); + rte_free(rx_wrr); + + return 0; } static int -rx_adapter_ctrl(uint8_t id, int start) +rxa_ctrl(uint8_t id, int start) { struct rte_event_eth_rx_adapter *rx_adapter; struct rte_eventdev *dev; @@ -807,13 +1910,13 @@ rx_adapter_ctrl(uint8_t id, int start) int stop = !start; RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); - rx_adapter = id_to_rx_adapter(id); + rx_adapter = rxa_id_to_adapter(id); if (rx_adapter == NULL) return -EINVAL; dev = &rte_eventdevs[rx_adapter->eventdev_id]; - for (i = 0; i < rte_eth_dev_count(); i++) { + RTE_ETH_FOREACH_DEV(i) { dev_info = &rx_adapter->eth_devices[i]; /* if start check for num dev queues */ if (start && !dev_info->nb_dev_queues) @@ -831,8 +1934,12 @@ rx_adapter_ctrl(uint8_t id, int start) &rte_eth_devices[i]); } - if (use_service) + if (use_service) { + rte_spinlock_lock(&rx_adapter->rx_lock); + rx_adapter->rxa_started = start; rte_service_runstate_set(rx_adapter->service_id, start); + rte_spinlock_unlock(&rx_adapter->rx_lock); + } return 0; } @@ -845,7 +1952,7 @@ rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id, struct rte_event_eth_rx_adapter *rx_adapter; int ret; int socket_id; - uint8_t i; + uint16_t i; char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN]; const uint8_t default_rss_key[] = { 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2, @@ -866,7 +1973,7 @@ rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id, return ret; } - rx_adapter = id_to_rx_adapter(id); + rx_adapter = rxa_id_to_adapter(id); if (rx_adapter != NULL) { RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id); return -EEXIST; @@ -888,9 +1995,11 @@ rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id, rx_adapter->socket_id = socket_id; rx_adapter->conf_cb = conf_cb; rx_adapter->conf_arg = conf_arg; + rx_adapter->id = id; strcpy(rx_adapter->mem_name, mem_name); rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name, - rte_eth_dev_count() * + /* FIXME: incompatible with hotplug */ + rte_eth_dev_count_total() * sizeof(struct eth_device_info), 0, socket_id); rte_convert_rss_key((const uint32_t *)default_rss_key, @@ -903,11 +2012,11 @@ rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id, return -ENOMEM; } rte_spinlock_init(&rx_adapter->rx_lock); - for (i = 0; i < rte_eth_dev_count(); i++) + RTE_ETH_FOREACH_DEV(i) rx_adapter->eth_devices[i].dev = &rte_eth_devices[i]; event_eth_rx_adapter[id] = rx_adapter; - if (conf_cb == default_conf_cb) + if (conf_cb == rxa_default_conf_cb) rx_adapter->default_cb_arg = 1; return 0; } @@ -928,7 +2037,7 @@ rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id, return -ENOMEM; *pc = *port_config; ret = rte_event_eth_rx_adapter_create_ext(id, dev_id, - default_conf_cb, + rxa_default_conf_cb, pc); if (ret) rte_free(pc); @@ -942,7 +2051,7 @@ rte_event_eth_rx_adapter_free(uint8_t id) RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); - rx_adapter = id_to_rx_adapter(id); + rx_adapter = rxa_id_to_adapter(id); if (rx_adapter == NULL) return -EINVAL; @@ -963,7 +2072,7 @@ rte_event_eth_rx_adapter_free(uint8_t id) int rte_event_eth_rx_adapter_queue_add(uint8_t id, - uint8_t eth_dev_id, + uint16_t eth_dev_id, int32_t rx_queue_id, const struct rte_event_eth_rx_adapter_queue_conf *queue_conf) { @@ -972,12 +2081,11 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id, struct rte_event_eth_rx_adapter *rx_adapter; struct rte_eventdev *dev; struct eth_device_info *dev_info; - int start_service; RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL); - rx_adapter = id_to_rx_adapter(id); + rx_adapter = rxa_id_to_adapter(id); if ((rx_adapter == NULL) || (queue_conf == NULL)) return -EINVAL; @@ -987,7 +2095,7 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id, &cap); if (ret) { RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8 - "eth port %" PRIu8, id, eth_dev_id); + "eth port %" PRIu16, id, eth_dev_id); return ret; } @@ -995,7 +2103,7 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id, && (queue_conf->rx_queue_flags & RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) { RTE_EDEV_LOG_ERR("Flow ID override is not supported," - " eth port: %" PRIu8 " adapter id: %" PRIu8, + " eth port: %" PRIu16 " adapter id: %" PRIu8, eth_dev_id, id); return -EINVAL; } @@ -1003,7 +2111,8 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id, if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 && (rx_queue_id != -1)) { RTE_EDEV_LOG_ERR("Rx queues can only be connected to single " - "event queue id %u eth port %u", id, eth_dev_id); + "event queue, eth port: %" PRIu16 " adapter id: %" + PRIu8, eth_dev_id, id); return -EINVAL; } @@ -1014,7 +2123,6 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id, return -EINVAL; } - start_service = 0; dev_info = &rx_adapter->eth_devices[eth_dev_id]; if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) { @@ -1034,33 +2142,34 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id, &rte_eth_devices[eth_dev_id], rx_queue_id, queue_conf); if (ret == 0) { - update_queue_info(rx_adapter, + dev_info->internal_event_port = 1; + rxa_update_queue(rx_adapter, &rx_adapter->eth_devices[eth_dev_id], rx_queue_id, 1); } } else { rte_spinlock_lock(&rx_adapter->rx_lock); - ret = init_service(rx_adapter, id); - if (ret == 0) - ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id, + dev_info->internal_event_port = 0; + ret = rxa_init_service(rx_adapter, id); + if (ret == 0) { + uint32_t service_id = rx_adapter->service_id; + ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id, queue_conf); + rte_service_component_runstate_set(service_id, + rxa_sw_adapter_queue_count(rx_adapter)); + } rte_spinlock_unlock(&rx_adapter->rx_lock); - if (ret == 0) - start_service = !!sw_rx_adapter_queue_count(rx_adapter); } if (ret) return ret; - if (start_service) - rte_service_component_runstate_set(rx_adapter->service_id, 1); - return 0; } int -rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id, +rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id, int32_t rx_queue_id) { int ret = 0; @@ -1068,12 +2177,17 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id, struct rte_event_eth_rx_adapter *rx_adapter; struct eth_device_info *dev_info; uint32_t cap; - uint16_t i; + uint32_t nb_rx_poll = 0; + uint32_t nb_wrr = 0; + uint32_t nb_rx_intr; + struct eth_rx_poll_entry *rx_poll = NULL; + uint32_t *rx_wrr = NULL; + int num_intr_vec; RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL); - rx_adapter = id_to_rx_adapter(id); + rx_adapter = rxa_id_to_adapter(id); if (rx_adapter == NULL) return -EINVAL; @@ -1100,7 +2214,7 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id, &rte_eth_devices[eth_dev_id], rx_queue_id); if (ret == 0) { - update_queue_info(rx_adapter, + rxa_update_queue(rx_adapter, &rx_adapter->eth_devices[eth_dev_id], rx_queue_id, 0); @@ -1110,48 +2224,78 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id, } } } else { - int rc; + rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id, + &nb_rx_poll, &nb_rx_intr, &nb_wrr); + + ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr, + &rx_poll, &rx_wrr); + if (ret) + return ret; + rte_spinlock_lock(&rx_adapter->rx_lock); - if (rx_queue_id == -1) { - for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) - event_eth_rx_adapter_queue_del(rx_adapter, - dev_info, - i); - } else { - event_eth_rx_adapter_queue_del(rx_adapter, - dev_info, - (uint16_t)rx_queue_id); + + num_intr_vec = 0; + if (rx_adapter->num_rx_intr > nb_rx_intr) { + + num_intr_vec = rxa_nb_intr_vect(dev_info, + rx_queue_id, 0); + ret = rxa_del_intr_queue(rx_adapter, dev_info, + rx_queue_id); + if (ret) + goto unlock_ret; + } + + if (nb_rx_intr == 0) { + ret = rxa_free_intr_resources(rx_adapter); + if (ret) + goto unlock_ret; + } + + rxa_sw_del(rx_adapter, dev_info, rx_queue_id); + rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr); + + rte_free(rx_adapter->eth_rx_poll); + rte_free(rx_adapter->wrr_sched); + + if (nb_rx_intr == 0) { + rte_free(dev_info->intr_queue); + dev_info->intr_queue = NULL; } - rc = eth_poll_wrr_calc(rx_adapter); - if (rc) - RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32, - rc); + rx_adapter->eth_rx_poll = rx_poll; + rx_adapter->wrr_sched = rx_wrr; + rx_adapter->wrr_len = nb_wrr; + rx_adapter->num_intr_vec += num_intr_vec; if (dev_info->nb_dev_queues == 0) { rte_free(dev_info->rx_queue); dev_info->rx_queue = NULL; } - +unlock_ret: rte_spinlock_unlock(&rx_adapter->rx_lock); + if (ret) { + rte_free(rx_poll); + rte_free(rx_wrr); + return ret; + } + rte_service_component_runstate_set(rx_adapter->service_id, - sw_rx_adapter_queue_count(rx_adapter)); + rxa_sw_adapter_queue_count(rx_adapter)); } return ret; } - int rte_event_eth_rx_adapter_start(uint8_t id) { - return rx_adapter_ctrl(id, 1); + return rxa_ctrl(id, 1); } int rte_event_eth_rx_adapter_stop(uint8_t id) { - return rx_adapter_ctrl(id, 0); + return rxa_ctrl(id, 0); } int @@ -1168,13 +2312,13 @@ rte_event_eth_rx_adapter_stats_get(uint8_t id, RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); - rx_adapter = id_to_rx_adapter(id); + rx_adapter = rxa_id_to_adapter(id); if (rx_adapter == NULL || stats == NULL) return -EINVAL; dev = &rte_eventdevs[rx_adapter->eventdev_id]; memset(stats, 0, sizeof(*stats)); - for (i = 0; i < rte_eth_dev_count(); i++) { + RTE_ETH_FOREACH_DEV(i) { dev_info = &rx_adapter->eth_devices[i]; if (dev_info->internal_event_port == 0 || dev->dev_ops->eth_rx_adapter_stats_get == NULL) @@ -1206,12 +2350,12 @@ rte_event_eth_rx_adapter_stats_reset(uint8_t id) RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); - rx_adapter = id_to_rx_adapter(id); + rx_adapter = rxa_id_to_adapter(id); if (rx_adapter == NULL) return -EINVAL; dev = &rte_eventdevs[rx_adapter->eventdev_id]; - for (i = 0; i < rte_eth_dev_count(); i++) { + RTE_ETH_FOREACH_DEV(i) { dev_info = &rx_adapter->eth_devices[i]; if (dev_info->internal_event_port == 0 || dev->dev_ops->eth_rx_adapter_stats_reset == NULL) @@ -1231,7 +2375,7 @@ rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id) RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); - rx_adapter = id_to_rx_adapter(id); + rx_adapter = rxa_id_to_adapter(id); if (rx_adapter == NULL || service_id == NULL) return -EINVAL; @@ -1240,3 +2384,47 @@ rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id) return rx_adapter->service_inited ? 0 : -ESRCH; } + +int rte_event_eth_rx_adapter_cb_register(uint8_t id, + uint16_t eth_dev_id, + rte_event_eth_rx_adapter_cb_fn cb_fn, + void *cb_arg) +{ + struct rte_event_eth_rx_adapter *rx_adapter; + struct eth_device_info *dev_info; + uint32_t cap; + int ret; + + RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); + RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL); + + rx_adapter = rxa_id_to_adapter(id); + if (rx_adapter == NULL) + return -EINVAL; + + dev_info = &rx_adapter->eth_devices[eth_dev_id]; + if (dev_info->rx_queue == NULL) + return -EINVAL; + + ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id, + eth_dev_id, + &cap); + if (ret) { + RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8 + "eth port %" PRIu16, id, eth_dev_id); + return ret; + } + + if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) { + RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %" + PRIu16, eth_dev_id); + return -EINVAL; + } + + rte_spinlock_lock(&rx_adapter->rx_lock); + dev_info->cb_fn = cb_fn; + dev_info->cb_arg = cb_arg; + rte_spinlock_unlock(&rx_adapter->rx_lock); + + return 0; +}