2 * Copyright (c) 2016 Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
/* Sizing constants -- presumably LPM rule/tbl8 limits (TODO confirm use). */
#define MAX_RULES 0x100
#define MAX_TBL8 0x800

/* RX/TX descriptor ring sizes per queue. */
#define RX_RING_SIZE 0x400
#define TX_RING_SIZE 0x800

/* Per-lcore mempool cache size and total mbuf count per pool. */
#define MPOOL_CACHE_SIZE 0x100
#define MPOOL_NB_BUF 0x20000

/* mbuf data size for the fragmentation pool: headroom + max UDP header. */
#define FRAG_MBUF_BUF_SIZE (RTE_PKTMBUF_HEADROOM + TLE_UDP_MAX_HDR)
/* Fragment lifetime (expressed via MS_PER_S) and frag-table bucket size. */
#define FRAG_TTL MS_PER_S
#define FRAG_TBL_BUCKET_ENTRIES 16

#define FIRST_PORT 0x8000

/* Checksum offload masks validated against device capabilities in port_init(). */
#define RX_CSUM_OFFLOAD (DEV_RX_OFFLOAD_IPV4_CKSUM | DEV_RX_OFFLOAD_UDP_CKSUM)
#define TX_CSUM_OFFLOAD (DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM)

/* Command-line option letters and their long-option names. */
#define OPT_SHORT_SBULK 'B'
#define OPT_LONG_SBULK "sburst"

#define OPT_SHORT_PROMISC 'P'
#define OPT_LONG_PROMISC "promisc"

#define OPT_SHORT_RBUFS 'R'
#define OPT_LONG_RBUFS "rbufs"

#define OPT_SHORT_SBUFS 'S'
#define OPT_LONG_SBUFS "sbufs"

#define OPT_SHORT_STREAMS 's'
#define OPT_LONG_STREAMS "streams"

#define OPT_SHORT_FECFG 'f'
#define OPT_LONG_FECFG "fecfg"

#define OPT_SHORT_BECFG 'b'
#define OPT_LONG_BECFG "becfg"
/* Per-lcore back-end (BE) and front-end (FE) context pointers. */
RTE_DEFINE_PER_LCORE(struct netbe_lcore *, _be);
RTE_DEFINE_PER_LCORE(struct netfe_lcore *, _fe);

/*
 * getopt_long() option table, mapping long names to the short letters above.
 * NOTE(review): the terminating {NULL, 0, 0, 0} entry and closing brace are
 * not visible in this excerpt -- confirm against the full source.
 */
static const struct option long_opt[] = {
	{OPT_LONG_BECFG, 1, 0, OPT_SHORT_BECFG},
	{OPT_LONG_FECFG, 1, 0, OPT_SHORT_FECFG},
	{OPT_LONG_PROMISC, 0, 0, OPT_SHORT_PROMISC},
	{OPT_LONG_RBUFS, 1, 0, OPT_SHORT_RBUFS},
	{OPT_LONG_SBUFS, 1, 0, OPT_SHORT_SBUFS},
	{OPT_LONG_SBULK, 1, 0, OPT_SHORT_SBULK},
	{OPT_LONG_STREAMS, 1, 0, OPT_SHORT_STREAMS},
/*
 * IPv4 input size in bytes for RSS hash key calculation:
 * source address, destination address, source port, and destination port.
 */
#define IPV4_TUPLE_SIZE 12

/*
 * IPv6 input size in bytes for RSS hash key calculation:
 * source address, destination address, source port, and destination port.
 */
#define IPV6_TUPLE_SIZE 36

/*
 * Location to be modified to create the IPv4 hash key which helps
 * to distribute packets based on the destination UDP port.
 */
#define RSS_HASH_KEY_DEST_PORT_LOC_IPV4 15

/*
 * Location to be modified to create the IPv6 hash key which helps
 * to distribute packets based on the destination UDP port.
 */
#define RSS_HASH_KEY_DEST_PORT_LOC_IPV6 39

/*
 * Size of the rte_eth_rss_reta_entry64 array to update through
 * rte_eth_dev_rss_reta_update().
 */
#define RSS_RETA_CONF_ARRAY_SIZE (ETH_RSS_RETA_SIZE_512/RTE_RETA_GROUP_SIZE)
/*
 * Grow (loc) to hold n elements via rte_realloc(), logging on failure.
 * NOTE(review): the macro tail (error handling after the log call and the
 * closing "} while (0)") is not visible in this excerpt.
 */
#define NETBE_REALLOC(loc, n) do { \
	(loc) = rte_realloc((loc), sizeof(*(loc)) * (n), RTE_CACHE_LINE_SIZE); \
	if ((loc) == NULL) { \
		RTE_LOG(ERR, USER1, \
			"%s: failed to reallocate memory\n", \

/* Shutdown request flag -- presumably set by sig_handle(); TODO confirm. */
static volatile int force_quit;

/* Global BE configuration and per-socket mbuf pools (indexed by socket id + 1,
 * see the "sid = rte_lcore_to_socket_id(...) + 1" usage in netbe_port_init). */
static struct netbe_cfg becfg;
static struct rte_mempool *mpool[RTE_MAX_NUMA_NODES + 1];
static struct rte_mempool *frag_mpool[RTE_MAX_NUMA_NODES + 1];

/* Default ethdev configuration used as the base in port_init(). */
static const struct rte_eth_conf port_conf_default = {
	.max_rx_pkt_len = ETHER_MAX_VLAN_FRAME_LEN,

#include "main_dpdk_legacy.h"
/*
 * Signal handler: logs the received signal number.
 * NOTE(review): the rest of the body is not visible; presumably it also sets
 * force_quit to stop the main loop -- confirm in the full source.
 */
sig_handle(int signum)
	RTE_LOG(ERR, USER1, "%s(%d)\n", __func__, signum);
/*
 * Build an RSS hash key that steers packets to queues by destination UDP
 * port: zero the key, record its size, then write the power-of-2-aligned
 * queue count into the family-specific key byte.
 */
prepare_hash_key(struct netbe_port *uprt, uint8_t key_size, uint16_t family)
	align_nb_q = rte_align32pow2(uprt->nb_lcore);
	memset(uprt->hash_key, 0, RSS_HASH_KEY_LENGTH);
	uprt->hash_key_size = key_size;
	if (family == AF_INET)
		uprt->hash_key[RSS_HASH_KEY_DEST_PORT_LOC_IPV4] = align_nb_q;
	/* IPv6 branch -- the original 'else' line is not visible here. */
		uprt->hash_key[RSS_HASH_KEY_DEST_PORT_LOC_IPV6] = align_nb_q;
/*
 * Map an RSS hash/reta index to a queue index by bit manipulation over
 * log2(align_nb_q) bits. Loop body lines are not visible in this excerpt.
 */
qidx_from_hash_index(uint32_t hash, uint32_t align_nb_q)
	uint32_t i, nb_bit, q;

	/* number of significant bits in (align_nb_q - 1) */
	nb_bit = (sizeof(uint32_t) * CHAR_BIT) - __builtin_clz(align_nb_q - 1);
	for (i = 1; i < nb_bit; i++) {
/*
 * Configure RSS for a multi-queue port: validate the device's hash key
 * size, build a dest-port-based key for the port's address family
 * (IPv4 xor IPv6 -- both at once is rejected), and fill port_conf.
 * Several error-path lines (RTE_LOG calls / returns) are not visible here.
 */
update_rss_conf(struct netbe_port *uprt,
	const struct rte_eth_dev_info *dev_info,
	struct rte_eth_conf *port_conf)
	uint8_t hash_key_size;

	/* RSS only matters when more than one queue/lcore serves the port. */
	if (uprt->nb_lcore > 1) {
		if (dev_info->hash_key_size > 0)
			hash_key_size = dev_info->hash_key_size;
			"%s: dev_info did not provide a valid hash key size\n",

		if (uprt->ipv4 != INADDR_ANY &&
			memcmp(&uprt->ipv6, &in6addr_any,
			sizeof(uprt->ipv6)) != 0) {
			"%s: RSS for both IPv4 and IPv6 not supported!\n",
		} else if (uprt->ipv4 != INADDR_ANY) {
			prepare_hash_key(uprt, hash_key_size, AF_INET);
		} else if (memcmp(&uprt->ipv6, &in6addr_any, sizeof(uprt->ipv6))
			prepare_hash_key(uprt, hash_key_size, AF_INET6);
			"%s: No IPv4 or IPv6 address is found!\n",

		/* enable UDP RSS with the key prepared above */
		port_conf->rxmode.mq_mode = ETH_MQ_RX_RSS;
		port_conf->rx_adv_conf.rss_conf.rss_hf = ETH_RSS_UDP;
		port_conf->rx_adv_conf.rss_conf.rss_key_len = hash_key_size;
		port_conf->rx_adv_conf.rss_conf.rss_key = uprt->hash_key;
/*
 * Program the port's RSS redirection table so each reta entry maps to a
 * queue chosen by qidx_from_hash_index(), then push it to the device via
 * rte_eth_dev_rss_reta_update(). Error-path lines are partially missing.
 */
update_rss_reta(struct netbe_port *uprt,
	const struct rte_eth_dev_info *dev_info)
	struct rte_eth_rss_reta_entry64 reta_conf[RSS_RETA_CONF_ARRAY_SIZE];
	int32_t i, rc, align_nb_q;
	int32_t q_index, idx, shift;

	if (uprt->nb_lcore > 1) {
		/* a zero-sized reta means the device cannot do RSS redirection */
		if (dev_info->reta_size == 0) {
			"%s: Redirection table size 0 is invalid for RSS\n",
		RTE_LOG(NOTICE, USER1,
			"%s: The reta size of port %d is %u\n",
			__func__, uprt->id, dev_info->reta_size);

		if (dev_info->reta_size > ETH_RSS_RETA_SIZE_512) {
			"%s: More than %u entries of Reta not supported\n",
			__func__, ETH_RSS_RETA_SIZE_512);

		memset(reta_conf, 0, sizeof(reta_conf));
		align_nb_q = rte_align32pow2(uprt->nb_lcore);
		for (i = 0; i < align_nb_q; i++) {
			/* fold hash index down to an actual queue id */
			q_index = qidx_from_hash_index(i, align_nb_q) %
			idx = i / RTE_RETA_GROUP_SIZE;
			shift = i % RTE_RETA_GROUP_SIZE;
			reta_conf[idx].mask |= (1ULL << shift);
			reta_conf[idx].reta[shift] = q_index;
			RTE_LOG(NOTICE, USER1,
				"%s: port=%u RSS reta conf: hash=%u, q=%u\n",
				__func__, uprt->id, i, q_index);

		rc = rte_eth_dev_rss_reta_update(uprt->id,
			reta_conf, dev_info->reta_size);
			"%s: Bad redirection table parameter, rc = %d\n",
/*
 * Initialise DPDK port.
 * In current version, multi-queue per port is used.
 * Verifies requested RX/TX offloads against device capabilities, applies
 * RSS configuration and calls rte_eth_dev_configure() with one RX/TX
 * queue per serving lcore. Several error-return lines are not visible.
 */
port_init(struct netbe_port *uprt)
	struct rte_eth_conf port_conf;
	struct rte_eth_dev_info dev_info;

	rte_eth_dev_info_get(uprt->id, &dev_info);
	if ((dev_info.rx_offload_capa & uprt->rx_offload) != uprt->rx_offload) {
		"port#%u supported/requested RX offloads don't match, "
		"supported: %#x, requested: %#x;\n",
		uprt->id, dev_info.rx_offload_capa, uprt->rx_offload);
	if ((dev_info.tx_offload_capa & uprt->tx_offload) != uprt->tx_offload) {
		"port#%u supported/requested TX offloads don't match, "
		"supported: %#x, requested: %#x;\n",
		uprt->id, dev_info.tx_offload_capa, uprt->tx_offload);

	port_conf = port_conf_default;
	if ((uprt->rx_offload & RX_CSUM_OFFLOAD) != 0) {
		/* NOTE(review): logged at ERR level though informational */
		RTE_LOG(ERR, USER1, "%s(%u): enabling RX csum offload;\n",
		port_conf.rxmode.hw_ip_checksum = 1;
	port_conf.rxmode.max_rx_pkt_len = uprt->mtu + ETHER_CRC_LEN;

	rc = update_rss_conf(uprt, &dev_info, &port_conf);

	rc = rte_eth_dev_configure(uprt->id, uprt->nb_lcore, uprt->nb_lcore,
	RTE_LOG(NOTICE, USER1,
		"%s: rte_eth_dev_configure(prt_id=%u, nb_rxq=%u, nb_txq=%u) "
		"returns %d;\n", __func__, uprt->id, uprt->nb_lcore,
/*
 * Set up one RX and one TX queue per serving lcore on the given port,
 * using tuned defaults (drop_en on RX, tx_free_thresh at half the ring,
 * full-featured TX path when offloads are requested).
 */
queue_init(struct netbe_port *uprt, struct rte_mempool *mp)
	struct rte_eth_dev_info dev_info;

	rte_eth_dev_info_get(uprt->id, &dev_info);
	socket = rte_eth_dev_socket_id(uprt->id);

	dev_info.default_rxconf.rx_drop_en = 1;
	dev_info.default_txconf.tx_free_thresh = TX_RING_SIZE / 2;
	if (uprt->tx_offload != 0) {
		/* NOTE(review): logged at ERR level though informational */
		RTE_LOG(ERR, USER1, "%s(%u): enabling full featured TX;\n",
		dev_info.default_txconf.txq_flags = 0;

	for (q = 0; q < uprt->nb_lcore; q++) {
		rc = rte_eth_rx_queue_setup(uprt->id, q, RX_RING_SIZE,
			socket, &dev_info.default_rxconf, mp);
			"%s: rx queue=%u setup failed with error code: %d\n",

	for (q = 0; q < uprt->nb_lcore; q++) {
		rc = rte_eth_tx_queue_setup(uprt->id, q, TX_RING_SIZE,
			socket, &dev_info.default_txconf);
			"%s: tx queue=%u setup failed with error code: %d\n",
/*
 * Check that lcore is enabled, not master, and not in use already.
 */
check_lcore(uint32_t lc)
	if (rte_lcore_is_enabled(lc) == 0) {
		RTE_LOG(ERR, USER1, "lcore %u is not enabled\n", lc);
	/* reject lcores that already have an EAL thread running */
	if (rte_eal_get_lcore_state(lc) == RUNNING) {
		RTE_LOG(ERR, USER1, "lcore %u already running %p\n",
			lc, lcore_config[lc].f);
/*
 * Pretty-print one BE port: its lcore list, MTU, offloads, hash key,
 * IPv6 address and MAC address.
 * NOTE(review): corelist formatting at "2 * i" offsets assumes
 * single-digit lcore ids -- confirm against expected deployments.
 */
log_netbe_prt(const struct netbe_port *uprt)
	char corelist[2 * RTE_MAX_LCORE + 1];
	char hashkey[2 * RSS_HASH_KEY_LENGTH];

	memset(corelist, 0, sizeof(corelist));
	memset(hashkey, 0, sizeof(hashkey));
	/* build "a,b,c" list; last element without trailing comma */
	for (i = 0; i < uprt->nb_lcore; i++)
		if (i < uprt->nb_lcore - 1)
			sprintf(corelist + (2 * i), "%u,", uprt->lcore[i]);
			sprintf(corelist + (2 * i), "%u", uprt->lcore[i]);

	/* hex-dump the RSS hash key */
	for (i = 0; i < uprt->hash_key_size; i++)
		sprintf(hashkey + (2 * i), "%02x", uprt->hash_key[i]);

	RTE_LOG(NOTICE, USER1,
		"uprt %p = <id = %u, lcore = <%s>, mtu = %u, "
		"rx_offload = %u, tx_offload = %u,\n"
		"ipv6 = %04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx, "
		"mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n"
		uprt, uprt->id, corelist,
		uprt->mtu, uprt->rx_offload, uprt->tx_offload,
		uprt->ipv6.s6_addr16[0], uprt->ipv6.s6_addr16[1],
		uprt->ipv6.s6_addr16[2], uprt->ipv6.s6_addr16[3],
		uprt->ipv6.s6_addr16[4], uprt->ipv6.s6_addr16[5],
		uprt->ipv6.s6_addr16[6], uprt->ipv6.s6_addr16[7],
		uprt->mac.addr_bytes[0], uprt->mac.addr_bytes[1],
		uprt->mac.addr_bytes[2], uprt->mac.addr_bytes[3],
		uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5],
/* Log the whole BE configuration: header line, then one entry per port. */
log_netbe_cfg(const struct netbe_cfg *ucfg)
	RTE_LOG(NOTICE, USER1,
		"ucfg @ %p, prt_num = %u\n", ucfg, ucfg->prt_num);

	for (i = 0; i != ucfg->prt_num; i++)
		log_netbe_prt(ucfg->prt + i);
/*
 * Create the per-socket pktmbuf pool "MP<sid>".
 * sid is the NUMA socket id + 1 (0 is reserved), so sid - 1 is passed
 * to rte_pktmbuf_pool_create() as the actual socket.
 */
pool_init(uint32_t sid)
	struct rte_mempool *mp;
	char name[RTE_MEMPOOL_NAMESIZE];

	snprintf(name, sizeof(name), "MP%u", sid);
	mp = rte_pktmbuf_pool_create(name, MPOOL_NB_BUF, MPOOL_CACHE_SIZE, 0,
		RTE_MBUF_DEFAULT_BUF_SIZE, sid - 1);
		RTE_LOG(ERR, USER1, "%s(%d) failed with error code: %d\n",
			__func__, sid - 1, rc);
/*
 * Create the per-socket fragmentation pool "frag_MP<sid>" with the small
 * FRAG_MBUF_BUF_SIZE buffers, and record it in frag_mpool[sid].
 * As in pool_init(), sid is socket id + 1.
 */
frag_pool_init(uint32_t sid)
	struct rte_mempool *frag_mp;
	char frag_name[RTE_MEMPOOL_NAMESIZE];

	snprintf(frag_name, sizeof(frag_name), "frag_MP%u", sid);
	frag_mp = rte_pktmbuf_pool_create(frag_name, MPOOL_NB_BUF,
		MPOOL_CACHE_SIZE, 0, FRAG_MBUF_BUF_SIZE, sid - 1);
	if (frag_mp == NULL) {
		RTE_LOG(ERR, USER1, "%s(%d) failed with error code: %d\n",
			__func__, sid - 1, rc);
	frag_mpool[sid] = frag_mp;
/*
 * Return the already-initialised netbe_lcore with id == lc_num,
 * or fall through (NULL -- return line not visible) when none matches.
 */
static struct netbe_lcore *
find_initilized_lcore(struct netbe_cfg *cfg, uint32_t lc_num)
	for (i = 0; i < cfg->cpu_num; i++)
		if (cfg->cpu[i].id == lc_num)
/*
 * For every (port, lcore) pair: validate the lcore, create a netbe_lcore
 * entry for it on first sight, then append a port-queue record whose
 * rx/tx queue ids equal the lcore's index within the port's lcore list.
 */
calculate_nb_prtq(struct netbe_cfg *cfg)
	struct netbe_port *prt;
	struct netbe_lcore *lc;

	for (i = 0; i != cfg->prt_num; i++) {
		for (j = 0; j != prt->nb_lcore; j++) {
			rc = check_lcore(prt->lcore[j]);
				"%s: processing failed with err: %d\n",

			/* reuse an existing lcore record or grow cfg->cpu */
			lc = find_initilized_lcore(cfg, prt->lcore[j]);
				NETBE_REALLOC(cfg->cpu, cfg->cpu_num + 1);
				lc = &cfg->cpu[cfg->cpu_num];
				lc->id = prt->lcore[j];

			NETBE_REALLOC(lc->prtq, lc->prtq_num + 1);
			lc->prtq[lc->prtq_num].rxqid = j;
			lc->prtq[lc->prtq_num].txqid = j;
			lc->prtq[lc->prtq_num].port = *prt;
/*
 * Setup all enabled ports.
 * Parses BE port arguments, distributes queues over lcores, initialises
 * each port (config, MAC, promiscuous mode), lazily creates per-socket
 * mbuf and fragmentation pools, and sets up the queues.
 */
netbe_port_init(struct netbe_cfg *cfg, int argc, char *argv[])
	uint32_t i, n, sid, j;
	struct netbe_port *prt;

	for (i = 0; i != n; i++) {
		NETBE_REALLOC(cfg->prt, cfg->prt_num + 1);
		rc = parse_netbe_arg(cfg->prt + i, argv[i]);
			"%s: processing of \"%s\" failed with error code: %d\n",
			__func__, argv[i], rc);

	/* calculate number of queues per lcore. */
	rc = calculate_nb_prtq(cfg);
		RTE_LOG(ERR, USER1, "%s: processing of arguments failed"
			" with error code: %d\n", __func__, rc);

	for (i = 0; i != cfg->prt_num; i++) {
			"%s: port=%u init failed with error code: %d\n",
			__func__, prt->id, rc);
		rte_eth_macaddr_get(prt->id, &prt->mac);
		rte_eth_promiscuous_enable(prt->id);

		for (j = 0; j < prt->nb_lcore; j++) {
			/* shift socket id by 1 so index 0 stays unused */
			sid = rte_lcore_to_socket_id(prt->lcore[j]) + 1;
			assert(sid < RTE_DIM(mpool));

			if (mpool[sid] == NULL) {

			if (frag_mpool[sid] == NULL) {
				rc = frag_pool_init(sid);

			rc = queue_init(prt, mpool[sid]);
				"%s: lcore=%u queue init failed with err: %d\n",
				__func__, prt->lcore[j], rc);
/*
 * UDP IPv6 destination lookup callback.
 * Resolves addr via the lcore's LPM6 table and copies the matching
 * tle_udp_dest (headers included) into res.
 */
lpm6_dst_lookup(void *data, const struct in6_addr *addr,
	struct tle_udp_dest *res)
	struct netbe_lcore *lc;
	struct tle_udp_dest *dst;

	p = (uintptr_t)addr->s6_addr;
	rc = rte_lpm6_lookup(lc->lpm6, (uint8_t *)p, &idx);
	dst = &lc->dst6[idx];
	/* copy dest descriptor up to and including the prebuilt L2/L3 header */
	rte_memcpy(res, dst, dst->l2_len + dst->l3_len +
		offsetof(struct tle_udp_dest, hdr));
/*
 * Add an IPv4 route for destination slot idx into the lcore's LPM table
 * and log the route parameters.
 */
netbe_add_ipv4_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
	uint32_t addr, depth;
	char str[INET_ADDRSTRLEN];

	/* LPM expects the address in host byte order */
	addr = rte_be_to_cpu_32(dst->ipv4.s_addr);

	inet_ntop(AF_INET, &dst->ipv4, str, sizeof(str));
	rc = rte_lpm_add(lc->lpm4, addr, depth, idx);
	RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
		"mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
		__func__, lc->id, dst->port, lc->dst4[idx].dev,
		str, depth, lc->dst4[idx].mtu,
		dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
		dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
		dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
/*
 * Add an IPv6 route for destination slot idx into the lcore's LPM6 table
 * and log the route parameters (IPv6 twin of netbe_add_ipv4_route).
 */
netbe_add_ipv6_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
	char str[INET6_ADDRSTRLEN];

	rc = rte_lpm6_add(lc->lpm6, (uint8_t *)(uintptr_t)dst->ipv6.s6_addr,

	inet_ntop(AF_INET6, &dst->ipv6, str, sizeof(str));
	RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
		"mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
		__func__, lc->id, dst->port, lc->dst6[idx].dev,
		/* NOTE(review): logs lc->dst4[idx].mtu in the IPv6 path; the
		 * IPv4 function uses dst4 -- this almost certainly should be
		 * lc->dst6[idx].mtu. Copy-paste bug; fix when full source is
		 * available. */
		str, depth, lc->dst4[idx].mtu,
		dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
		dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
		dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
/*
 * Fill a tle_udp_dest descriptor: bind the fragmentation pool for the
 * socket, compute the effective MTU, and prebuild the Ethernet + IPv4/IPv6
 * header template in dst->hdr from the static templates below.
 */
fill_dst(struct tle_udp_dest *dst, struct netbe_dev *bed,
	const struct netbe_dest *bdp, uint16_t l3_type, int32_t sid)
	struct ether_hdr *eth;
	struct ipv4_hdr *ip4h;
	struct ipv6_hdr *ip6h;

	/* template IPv4 header; per-dest fields are patched in below */
	static const struct ipv4_hdr ipv4_tmpl = {
		.version_ihl = 4 << 4 | sizeof(*ip4h) / IPV4_IHL_MULTIPLIER,
		.next_proto_id = IPPROTO_UDP,

	/* template IPv6 header */
	static const struct ipv6_hdr ipv6_tmpl = {
		.proto = IPPROTO_UDP,

	/* frag_mpool is indexed by socket id + 1 */
	dst->head_mp = frag_mpool[sid + 1];
	dst->mtu = RTE_MIN(bdp->mtu, bed->port.mtu);
	dst->l2_len = sizeof(*eth);

	/* Ethernet header: src = our port MAC, dst = route MAC */
	eth = (struct ether_hdr *)dst->hdr;
	ether_addr_copy(&bed->port.mac, &eth->s_addr);
	ether_addr_copy(&bdp->mac, &eth->d_addr);
	eth->ether_type = rte_cpu_to_be_16(l3_type);

	if (l3_type == ETHER_TYPE_IPv4) {
		dst->l3_len = sizeof(*ip4h);
		ip4h = (struct ipv4_hdr *)(eth + 1);
	} else if (l3_type == ETHER_TYPE_IPv6) {
		dst->l3_len = sizeof(*ip6h);
		ip6h = (struct ipv6_hdr *)(eth + 1);
/*
 * Lazily create the per-lcore UDP context: init LPM tables, set the
 * IPv4/IPv6 lookup callbacks, double max_streams to cover both families,
 * create the IP fragmentation table, then tle_udp_create().
 */
create_context(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm)
	uint32_t rc = 0, sid;
	uint64_t frag_cycles;
	struct tle_udp_ctx_param cprm;

	if (lc->ctx == NULL) {
		sid = rte_lcore_to_socket_id(lc->id);

		rc = lcore_lpm_init(lc);

		cprm.socket_id = sid;
		cprm.lookup4 = lpm4_dst_lookup;
		cprm.lookup4_data = lc;
		cprm.lookup6 = lpm6_dst_lookup;
		cprm.lookup6_data = lc;

		/* to facilitate both IPv4 and IPv6. */
		cprm.max_streams *= 2;

		/* TSC cycles per millisecond (rounded up) -- used for frag TTL */
		frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) /

		lc->ftbl = rte_ip_frag_table_create(cprm.max_streams,
			FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams,

		RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n",
			__func__, lc->id, lc->ftbl);

		lc->ctx = tle_udp_create(&cprm);

		RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n",
			__func__, lc->id, lc->ctx);

		if (lc->ctx == NULL || lc->ftbl == NULL)
/*
 * BE lcore setup routine.
 * Creates the lcore's UDP context (if needed), then registers the given
 * port-queue as a tle_udp device with per-family blocked-port lists.
 * On failure, tears down everything the lcore owns.
 */
lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm,
	const uint32_t prtqid, const uint16_t *bl_ports, uint32_t nb_bl_ports)
	struct tle_udp_dev_param dprm;

	rc = create_context(lc, ctx_prm);

	if (lc->ctx != NULL) {
		memset(&dprm, 0, sizeof(dprm));
		dprm.rx_offload = lc->prtq[prtqid].port.rx_offload;
		dprm.tx_offload = lc->prtq[prtqid].port.tx_offload;
		dprm.local_addr4.s_addr = lc->prtq[prtqid].port.ipv4;
		memcpy(&dprm.local_addr6, &lc->prtq[prtqid].port.ipv6,
			sizeof(lc->prtq[prtqid].port.ipv6));
		/* same blocklist is applied to both IPv4 and IPv6 */
		dprm.bl4.nb_port = nb_bl_ports;
		dprm.bl4.port = bl_ports;
		dprm.bl6.nb_port = nb_bl_ports;
		dprm.bl6.port = bl_ports;

		lc->prtq[prtqid].dev = tle_udp_add_dev(lc->ctx, &dprm);

		RTE_LOG(NOTICE, USER1,
			"%s(lcore=%u, port=%u, qid=%u), udp_dev: %p\n",
			__func__, lc->id, lc->prtq[prtqid].port.id,
			lc->prtq[prtqid].rxqid, lc->prtq[prtqid].dev);

		if (lc->prtq[prtqid].dev == NULL)

		/* error path: release everything created for this lcore */
		"%s(lcore=%u) failed with error code: %d\n",
		__func__, lc->id, rc);
		tle_udp_destroy(lc->ctx);
		rte_ip_frag_table_destroy(lc->ftbl);
		rte_lpm_free(lc->lpm4);
		rte_lpm6_free(lc->lpm6);
		rte_free(lc->prtq[prtqid].port.lcore);
		lc->prtq[prtqid].port.nb_lcore = 0;
/*
 * Build the list of UDP ports this queue must NOT handle: walk the entire
 * 16-bit port space, map each port to a queue the same way RSS does
 * (modulo the aligned queue count), and collect ports belonging to other
 * queues. Loop-body lines are not visible in this excerpt.
 */
create_blocklist(const struct netbe_port *beprt, uint16_t *bl_ports,
	uint32_t i, j, qid, align_nb_q;

	align_nb_q = rte_align32pow2(beprt->nb_lcore);
	for (i = 0, j = 0; i < (UINT16_MAX + 1); i++) {
		qid = (i % align_nb_q) % beprt->nb_lcore;
/*
 * Create the udp context and attached queue for each lcore: for every
 * (lcore, port-queue) pair build that queue's blocked-port list and call
 * lcore_init().
 * NOTE(review): bl_ports is sized sizeof(uint16_t) * UINT16_MAX (65535
 * entries) while create_blocklist() iterates over UINT16_MAX + 1 ports --
 * verify the worst-case blocked count cannot reach 65536 entries.
 * NOTE(review): bl_ports allocation result is not checked before use in
 * the visible lines, and 'static' on a function-local allocation pointer
 * looks suspicious -- confirm intent.
 */
netbe_lcore_init(struct netbe_cfg *cfg,
	const struct tle_udp_ctx_param *ctx_prm)
	uint32_t i, j, nb_bl_ports = 0, sz;
	struct netbe_lcore *lc;
	static uint16_t *bl_ports;

	/* Create the udp context and attached queue for each lcore. */
	sz = sizeof(uint16_t) * UINT16_MAX;
	bl_ports = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
	for (i = 0; i < cfg->cpu_num; i++) {
		for (j = 0; j < lc->prtq_num; j++) {
			memset((uint8_t *)bl_ports, 0, sz);
			/* create list of blocked ports based on q */
			nb_bl_ports = create_blocklist(&lc->prtq[j].port,
				bl_ports, lc->prtq[j].rxqid);
			RTE_LOG(NOTICE, USER1,
				"lc=%u, q=%u, nb_bl_ports=%u\n",
				lc->id, lc->prtq[j].rxqid, nb_bl_ports);

			rc = lcore_init(lc, ctx_prm, j, bl_ports, nb_bl_ports);
				"%s: failed with error code: %d\n",
/*
 * Tear down all BE lcore state: UDP contexts, fragmentation tables, LPM
 * tables and port-queue arrays, then the per-port lcore lists.
 */
netbe_lcore_fini(struct netbe_cfg *cfg)
	for (i = 0; i != cfg->cpu_num; i++) {
		tle_udp_destroy(cfg->cpu[i].ctx);
		rte_ip_frag_table_destroy(cfg->cpu[i].ftbl);
		rte_lpm_free(cfg->cpu[i].lpm4);
		rte_lpm6_free(cfg->cpu[i].lpm6);

		rte_free(cfg->cpu[i].prtq);
		cfg->cpu[i].prtq_num = 0;

	for (i = 0; i != cfg->prt_num; i++) {
		rte_free(cfg->prt[i].lcore);
		cfg->prt[i].nb_lcore = 0;
/*
 * Register dnum destinations of the given family on an lcore: checks the
 * per-family capacity (dst4/dst6 arrays), fills each tle_udp_dest via
 * fill_dst() and inserts the corresponding LPM route, then bumps the
 * family's destination count.
 */
netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family,
	const struct netbe_dest *dst, uint32_t dnum)
	struct tle_udp_dest *dp;

	if (family == AF_INET) {
		m = RTE_DIM(lc->dst4);
		l3_type = ETHER_TYPE_IPv4;
		m = RTE_DIM(lc->dst6);
		l3_type = ETHER_TYPE_IPv6;

		/* capacity exceeded for this family */
		RTE_LOG(ERR, USER1, "%s(lcore=%u, family=%hu, dnum=%u) exceeds "
			"maximum allowed number of destinations(%u);\n",
			__func__, lc->id, family, dnum, m);

	sid = rte_lcore_to_socket_id(lc->id);

	for (i = 0; i != dnum && rc == 0; i++) {
		fill_dst(dp + i, lc->prtq + dev_idx, dst + i, l3_type, sid);
		if (family == AF_INET)
			rc = netbe_add_ipv4_route(lc, dst + i, n + i);
			rc = netbe_add_ipv6_route(lc, dst + i, n + i);

	if (family == AF_INET)
		lc->dst4_num = n + i;
		lc->dst6_num = n + i;
/*
 * Parse the destination config file and attach each destination to every
 * lcore that serves its port; logs an error for destinations whose port
 * is not managed by any lcore.
 */
netbe_dest_init(const char *fname, struct netbe_cfg *cfg)
	struct netbe_lcore *lc;
	struct netbe_dest_prm prm;

	rc = netbe_parse_dest(fname, &prm);

	for (i = 0; i != prm.nb_dest; i++) {
		p = prm.dest[i].port;
		f = prm.dest[i].family;

		/* find every lcore/queue pair that serves port p */
		for (k = 0; k != cfg->cpu_num; k++) {
			for (l = 0; l != lc->prtq_num; l++)
				if (lc->prtq[l].port.id == p) {
					rc = netbe_add_dest(lc, l, f,
						"%s(lcore=%u, family=%u) could not "
						"add destinations(%u);\n",
						__func__, lc->id, f, i);

		RTE_LOG(ERR, USER1, "%s(%s) error at line %u: "
			"port %u not managed by any lcore;\n",
			__func__, fname, prm.dest[i].line, p);
/*
 * Close an FE stream slot: free its TX/RX events, close the UDP stream
 * and clear the slot. The sidx computation (presumably fe->sidx - dec)
 * is not visible in this excerpt.
 */
netfe_stream_close(struct netfe_lcore *fe, uint32_t dec)
	tle_event_free(fe->fs[sidx].txev);
	tle_event_free(fe->fs[sidx].rxev);
	tle_udp_stream_close(fe->fs[sidx].s);
	memset(&fe->fs[sidx], 0, sizeof(fe->fs[sidx]));
/*
 * Dump one FE stream: local/remote addresses and ports (both families)
 * plus packet/drop counters and RX/TX event statistics.
 */
netfe_stream_dump(const struct netfe_stream *fes)
	struct sockaddr_in *l4, *r4;
	struct sockaddr_in6 *l6, *r6;
	uint16_t lport, rport;
	struct tle_udp_stream_param sprm;
	char laddr[INET6_ADDRSTRLEN];
	char raddr[INET6_ADDRSTRLEN];

	tle_udp_stream_get_param(fes->s, &sprm);

	if (sprm.local_addr.ss_family == AF_INET) {
		l4 = (struct sockaddr_in *)&sprm.local_addr;
		r4 = (struct sockaddr_in *)&sprm.remote_addr;
		/* ports kept in network byte order; converted at print time */
		lport = l4->sin_port;
		rport = r4->sin_port;
	} else if (sprm.local_addr.ss_family == AF_INET6) {
		l6 = (struct sockaddr_in6 *)&sprm.local_addr;
		r6 = (struct sockaddr_in6 *)&sprm.remote_addr;
		lport = l6->sin6_port;
		rport = r6->sin6_port;
		RTE_LOG(ERR, USER1, "stream@%p - unknown family=%hu\n",
			fes->s, sprm.local_addr.ss_family);

	format_addr(&sprm.local_addr, laddr, sizeof(laddr));
	format_addr(&sprm.remote_addr, raddr, sizeof(raddr));

	RTE_LOG(INFO, USER1,
		"family=%hu,laddr=%s,lport=%hu,raddr=%s,rport=%hu,"
		"rxp=%" PRIu64 ",txp=%" PRIu64 ",drops=%" PRIu64 ","
		"rxev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "],"
		"txev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "],"
		sprm.local_addr.ss_family,
		laddr, ntohs(lport), raddr, ntohs(rport),
		fes->stat.rxp, fes->stat.txp, fes->stat.drops,
		fes->stat.rxev[TLE_SEV_IDLE],
		fes->stat.rxev[TLE_SEV_DOWN],
		fes->stat.rxev[TLE_SEV_UP],
		fes->stat.txev[TLE_SEV_IDLE],
		fes->stat.txev[TLE_SEV_DOWN],
		fes->stat.txev[TLE_SEV_UP]);
/*
 * helper function: opens IPv4 and IPv6 streams for selected port.
 * Takes a free stream slot, allocates RX/TX events, pre-activates them
 * according to op (TXONLY/FWD get TX down, others RX down), opens the
 * tle_udp stream on the chosen BE context, and advances fe->sidx.
 * Returns the stream, or NULL with rte_errno set on failure.
 */
static struct netfe_stream *
netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
	uint32_t lcore, uint16_t op, uint32_t bidx)
	struct netfe_stream *fes;
	struct sockaddr_in *l4;
	struct sockaddr_in6 *l6;

	fes = fe->fs + sidx;
	/* no free stream slots left */
	if (sidx >= fe->snum) {
		rte_errno = ENOBUFS;

	fes->rxev = tle_event_alloc(fe->rxeq, &fe->fs[sidx]);
	fes->txev = tle_event_alloc(fe->txeq, &fe->fs[sidx]);
	sprm->recv_ev = fes->rxev;
	sprm->send_ev = fes->txev;

	"%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}, be_lc=%u\n",
	__func__, lcore, sidx, op, fes->rxev, fes->txev,
	becfg.cpu[bidx].id);
	if (fes->rxev == NULL || fes->txev == NULL) {
		netfe_stream_close(fe, 0);

	if (op == TXONLY || op == FWD) {
		tle_event_active(fes->txev, TLE_SEV_DOWN);
		fes->stat.txev[TLE_SEV_DOWN]++;

		tle_event_active(fes->rxev, TLE_SEV_DOWN);
		fes->stat.rxev[TLE_SEV_DOWN]++;

	fes->s = tle_udp_stream_open(becfg.cpu[bidx].ctx, sprm);
	if (fes->s == NULL) {
		netfe_stream_close(fe, 0);

		/* report which local port failed to open */
		if (sprm->local_addr.ss_family == AF_INET) {
			l4 = (struct sockaddr_in *) &sprm->local_addr;
			errport = ntohs(l4->sin_port);
			l6 = (struct sockaddr_in6 *) &sprm->local_addr;
			errport = ntohs(l6->sin6_port);
		RTE_LOG(ERR, USER1, "stream open failed for port %u with error "
			"code=%u, bidx=%u, lc=%u\n",
			errport, rc, bidx, becfg.cpu[bidx].id);

	fes->family = sprm->local_addr.ss_family;

	fe->sidx = sidx + 1;
/*
 * Compare two socket addresses of the given family (port + address).
 * For AF_INET returns non-zero when equal.
 */
netfe_addr_eq(struct sockaddr_storage *l, struct sockaddr_storage *r,
	struct sockaddr_in *l4, *r4;
	struct sockaddr_in6 *l6, *r6;

	if (family == AF_INET) {
		l4 = (struct sockaddr_in *)l;
		r4 = (struct sockaddr_in *)r;
		return (l4->sin_port == r4->sin_port &&
			l4->sin_addr.s_addr == r4->sin_addr.s_addr);

		l6 = (struct sockaddr_in6 *)l;
		r6 = (struct sockaddr_in6 *)r;
		/* NOTE(review): as shown, the IPv6 branch returns the raw
		 * memcmp() result, i.e. TRUE when the addresses DIFFER --
		 * inverted relative to the IPv4 branch. An "== 0" may have
		 * been lost from this excerpt; verify against full source. */
		return (l6->sin6_port == r6->sin6_port &&
			memcmp(&l6->sin6_addr, &r6->sin6_addr,
			sizeof(l6->sin6_addr)));
/*
 * Extract the packet's SOURCE address and UDP SOURCE port into ps.
 * Relies on m->l3_len/l4_len and the mbuf data pointer sitting just past
 * the headers, so headers are located via negative offsets.
 */
netfe_pkt_addr(const struct rte_mbuf *m, struct sockaddr_storage *ps,
	const struct ipv4_hdr *ip4h;
	const struct ipv6_hdr *ip6h;
	const struct udp_hdr *udph;
	struct sockaddr_in *in4;
	struct sockaddr_in6 *in6;

	/* UDP header lies l4_len bytes before the current data offset */
	udph = rte_pktmbuf_mtod_offset(m, struct udp_hdr *, -m->l4_len);

	if (family == AF_INET) {
		in4 = (struct sockaddr_in *)ps;
		ip4h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
			-(m->l4_len + m->l3_len));
		in4->sin_port = udph->src_port;
		in4->sin_addr.s_addr = ip4h->src_addr;

		in6 = (struct sockaddr_in6 *)ps;
		ip6h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
			-(m->l4_len + m->l3_len));
		in6->sin6_port = udph->src_port;
		rte_memcpy(&in6->sin6_addr, ip6h->src_addr,
			sizeof(in6->sin6_addr));
/*
 * Count how many leading packets in pkt[] share the source address 'cur'
 * (each packet's address is extracted into 'nxt' and compared).
 * The break/return lines are not visible in this excerpt.
 */
static inline uint32_t
pkt_eq_addr(struct rte_mbuf *pkt[], uint32_t num, uint16_t family,
	struct sockaddr_storage *cur, struct sockaddr_storage *nxt)
	for (i = 0; i != num; i++) {
		netfe_pkt_addr(pkt[i], nxt, family);
		if (netfe_addr_eq(cur, nxt, family) == 0)
/* Free every mbuf held in the packet buffer. */
pkt_buf_empty(struct pkt_buf *pb)
	for (i = 0; i != pb->num; i++)
		rte_pktmbuf_free(pb->pkt[i]);
/*
 * Top up the packet buffer with freshly allocated mbufs of dlen payload
 * bytes, drawn from the lcore's socket-local mempool (socket id + 1).
 * Stops early if the pool runs dry.
 */
pkt_buf_fill(uint32_t lcore, struct pkt_buf *pb, uint32_t dlen)
	sid = rte_lcore_to_socket_id(lcore) + 1;

	for (i = pb->num; i != RTE_DIM(pb->pkt); i++) {
		pb->pkt[i] = rte_pktmbuf_alloc(mpool[sid]);
		if (pb->pkt[i] == NULL)
		rte_pktmbuf_append(pb->pkt[i], dlen);
/*
 * Look up (or lazily create) the forwarding stream for peer address sa.
 * On a table miss: clone the stream's forward params, wildcard the remote
 * address, open a new FWD stream, insert it into the fwd table, and
 * remember sa as its remote address.
 */
static struct netfe_stream *
find_fwd_dst(uint32_t lcore, struct netfe_stream *fes,
	const struct sockaddr *sa)
	struct netfe_stream *fed;
	struct netfe_lcore *fe;
	struct tle_udp_stream_param sprm;

	fe = RTE_PER_LCORE(_fe);

	fed = fwd_tbl_lkp(fe, fes->family, sa);

	/* create a new stream and put it into the fwd table. */
	sprm = fes->fwdprm.prm;

	/* open forward stream with wildcard remote addr. */
	memset(&sprm.remote_addr.ss_family + 1, 0,
		sizeof(sprm.remote_addr) - sizeof(sprm.remote_addr.ss_family));
	fed = netfe_stream_open(fe, &sprm, lcore, FWD, fes->fwdprm.bidx);

	rc = fwd_tbl_add(fe, fes->family, sa, fed);
		/* insertion failed: undo the stream open */
		netfe_stream_close(fe, 1);

	fed->fwdprm.prm.remote_addr = *(const struct sockaddr_storage *)sa;
/*
 * TXONLY path: refill the packet buffer with txlen-byte mbufs, push them
 * into the stream, account drops, and compact the unsent tail to the
 * front of the buffer.
 */
netfe_tx_process(uint32_t lcore, struct netfe_stream *fes)
	/* refill with new mbufs. */
	pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);

	k = tle_udp_stream_send(fes->s, fes->pbuf.pkt, n, NULL);
	NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) returns %u\n",
		__func__, lcore, fes->s, n, k);
	fes->stat.drops += n - k;

	/* adjust pbuf array. */
	fes->pbuf.num = n - k;
	for (i = k; i != n; i++)
		fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
/*
 * FWD path: group buffered packets by source address, find/open the
 * forwarding stream for each group, and send the group to it. Packets
 * with no forwarding stream are freed and counted as drops; unsent
 * packets are compacted for the next round.
 */
netfe_fwd(uint32_t lcore, struct netfe_stream *fes)
	uint32_t i, j, k, n, x;
	void *pi0, *pi1, *pt;
	struct rte_mbuf **pkt;
	struct netfe_stream *fed;
	struct sockaddr_storage in[2];

	family = fes->family;
	pkt = fes->pbuf.pkt;

	in[0].ss_family = family;
	in[1].ss_family = family;

	/* address of the first packet seeds the grouping */
	netfe_pkt_addr(pkt[0], pi0, family);

	for (i = 0; i != n; i = j) {
		/* j = end of the run of packets sharing pi0's source addr */
		j = i + pkt_eq_addr(&pkt[i + 1],
			n - i - 1, family, pi0, pi1) + 1;

		fed = find_fwd_dst(lcore, fes, (const struct sockaddr *)pi0);

		k = tle_udp_stream_send(fed->s, pkt + i, j - i,
			(const struct sockaddr *)
			&fes->fwdprm.prm.remote_addr);

		NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) "
			__func__, lcore, fed->s, j - i, k);
		fed->stat.drops += j - i - k;

		NETFE_TRACE("%s(%u, %p): no fwd stream for %u pkts;\n",
			__func__, lcore, fes->s, j - i);
		for (k = i; k != j; k++) {
			NETFE_TRACE("%s(%u, %p): free(%p);\n",
				__func__, lcore, fes->s, pkt[k]);
			/* NOTE(review): frees pkt[j] on every iteration while
			 * the trace above prints pkt[k] -- double-free of
			 * pkt[j] and leak of pkt[i..j-1]. Almost certainly
			 * should be rte_pktmbuf_free(pkt[k]); fix when the
			 * full source is available. */
			rte_pktmbuf_free(pkt[j]);
		fes->stat.drops += j - i;

		/* copy unforwarded mbufs. */
		for (i += k; i != j; i++, x++)

		/* swap the pointers */

	tle_event_raise(fes->txev);
	fes->stat.txev[TLE_SEV_UP]++;

	/* buffer was full: re-arm RX so new packets can be received */
	if (n == RTE_DIM(fes->pbuf.pkt)) {
		tle_event_active(fes->rxev, TLE_SEV_UP);
		fes->stat.rxev[TLE_SEV_UP]++;
/*
 * RX path: receive into the free tail of the packet buffer. If the buffer
 * is full, idle the RX event. RXONLY streams free everything received;
 * RXTX/FWD streams raise/activate the TX event once data is available.
 */
netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
	k = RTE_DIM(fes->pbuf.pkt) - n;

	/* packet buffer is full, can't receive any new packets. */
	tle_event_idle(fes->rxev);
	fes->stat.rxev[TLE_SEV_IDLE]++;

	n = tle_udp_stream_recv(fes->s, fes->pbuf.pkt + n, k);

	NETFE_TRACE("%s(%u): tle_udp_stream_recv(%p, %u) returns %u\n",
		__func__, lcore, fes->s, k, n);

	/* free all received mbufs. */
	if (fes->op == RXONLY)
		pkt_buf_empty(&fes->pbuf);
	/* mark stream as writable */
	else if (k == RTE_DIM(fes->pbuf.pkt)) {
		if (fes->op == RXTX) {
			tle_event_active(fes->txev, TLE_SEV_UP);
			fes->stat.txev[TLE_SEV_UP]++;
		} else if (fes->op == FWD) {
			tle_event_raise(fes->txev);
			fes->stat.txev[TLE_SEV_UP]++;
/*
 * RXTX (echo) path: group buffered packets by source address and send
 * each group back to that source. Stops when the stream's send buffer
 * fills; compacts unsent packets and re-arms RX when the buffer was full.
 */
netfe_rxtx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
	uint32_t i, j, k, n;
	void *pi0, *pi1, *pt;
	struct rte_mbuf **pkt;
	struct sockaddr_storage in[2];

	family = fes->family;
	pkt = fes->pbuf.pkt;

	/* there is nothing to send. */
	tle_event_idle(fes->txev);
	fes->stat.txev[TLE_SEV_IDLE]++;

	in[0].ss_family = family;
	in[1].ss_family = family;

	/* seed grouping with the first packet's source address */
	netfe_pkt_addr(pkt[0], pi0, family);

	for (i = 0; i != n; i = j) {
		j = i + pkt_eq_addr(&pkt[i + 1],
			n - i - 1, family, pi0, pi1) + 1;

		/* echo the group back to its source address pi0 */
		k = tle_udp_stream_send(fes->s, pkt + i, j - i,
			(const struct sockaddr *)pi0);

		NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) returns %u\n",
			__func__, lcore, fes->s, j - i, k);
		fes->stat.drops += j - i - k;

		/* stream send buffer is full */

		/* swap the pointers */

	/* not able to send anything. */

	if (n == RTE_DIM(fes->pbuf.pkt)) {
		/* mark stream as readable */
		tle_event_active(fes->rxev, TLE_SEV_UP);
		fes->stat.rxev[TLE_SEV_UP]++;

	/* adjust pbuf array. */
	fes->pbuf.num = n - i;
	for (j = i; j != n; j++)
		pkt[j - i] = pkt[j];
/*
 * FE lcore setup: allocate the netfe_lcore context plus its stream array
 * in one block, create RX/TX event queues, init per-family forwarding
 * tables, then open every configured stream (registering FWD streams in
 * the fwd table and recording txlen/raddr for TXONLY streams).
 */
netfe_lcore_init(const struct netfe_lcore_prm *prm)
	uint32_t i, lcore, snum;
	struct netfe_lcore *fe;
	struct tle_evq_param eprm;
	struct tle_udp_stream_param sprm;
	struct netfe_stream *fes;

	lcore = rte_lcore_id();

	snum = prm->max_streams;
	RTE_LOG(NOTICE, USER1, "%s(lcore=%u, nb_streams=%u, max_streams=%u)\n",
		__func__, lcore, prm->nb_streams, snum);

	memset(&eprm, 0, sizeof(eprm));
	eprm.socket_id = rte_lcore_to_socket_id(lcore);
	eprm.max_events = snum;

	/* single allocation: context followed by the stream array */
	sz = sizeof(*fe) + snum * sizeof(fe->fs[0]);
	fe = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
		rte_lcore_to_socket_id(lcore));

		RTE_LOG(ERR, USER1, "%s:%d failed to allocate %zu bytes\n",
			__func__, __LINE__, sz);

	RTE_PER_LCORE(_fe) = fe;

	fe->fs = (struct netfe_stream *)(fe + 1);

	fe->rxeq = tle_evq_create(&eprm);
	fe->txeq = tle_evq_create(&eprm);

	RTE_LOG(INFO, USER1, "%s(%u) rx evq=%p, tx evq=%p\n",
		__func__, lcore, fe->rxeq, fe->txeq);
	if (fe->rxeq == NULL || fe->txeq == NULL)

	rc = fwd_tbl_init(fe, AF_INET, lcore);
	RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
		__func__, lcore, AF_INET, rc);

	rc = fwd_tbl_init(fe, AF_INET6, lcore);
	RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
		__func__, lcore, AF_INET6, rc);

	/* open all requested streams. */
	for (i = 0; i != prm->nb_streams; i++) {
		sprm = prm->stream[i].sprm.prm;
		fes = netfe_stream_open(fe, &sprm, lcore, prm->stream[i].op,
			prm->stream[i].sprm.bidx);

		netfe_stream_dump(fes);

		if (prm->stream[i].op == FWD) {
			fes->fwdprm = prm->stream[i].fprm;
			rc = fwd_tbl_add(fe,
				prm->stream[i].fprm.prm.remote_addr.ss_family,
				(const struct sockaddr *)
				&prm->stream[i].fprm.prm.remote_addr,
				netfe_stream_close(fe, 1);
		} else if (prm->stream[i].op == TXONLY) {
			fes->txlen = prm->stream[i].txlen;
			fes->raddr = sprm.remote_addr;
/*
 * Front-end processing loop body (enclosing function header is outside
 * this view — presumably netfe_lcore(); confirm against full source).
 * Drains the RX and TX event queues and dispatches each ready stream
 * according to its configured operation (RXTX / FWD / TXONLY).
 */
struct netfe_lcore *fe;
uint32_t j, n, lcore;
struct netfe_stream *fs[MAX_PKT_BURST];

fe = RTE_PER_LCORE(_fe);

lcore = rte_lcore_id();

/* collect up to MAX_PKT_BURST streams with pending RX events. */
n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));

NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) returns %u\n",
	__func__, lcore, fe->rxeq, n);
for (j = 0; j != n; j++)
	netfe_rx_process(lcore, fs[j]);

/* collect streams with pending TX events and dispatch per stream op. */
n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));

NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n",
	__func__, lcore, fe->txeq, n);
for (j = 0; j != n; j++) {
	if (fs[j]->op == RXTX)
		netfe_rxtx_process(lcore, fs[j]);
	else if (fs[j]->op == FWD)
		netfe_fwd(lcore, fs[j]);
	else if (fs[j]->op == TXONLY)
		netfe_tx_process(lcore, fs[j]);
/*
 * Per-lcore front-end teardown: dump and close every still-open stream,
 * destroy both event queues and clear the per-lcore FE pointer.
 */
netfe_lcore_fini(void)
	struct netfe_lcore *fe;

	fe = RTE_PER_LCORE(_fe);

	/*
	 * close streams until none remain; fe->sidx presumably tracks the
	 * number of open streams — TODO confirm against struct definition.
	 */
	while (fe->sidx != 0) {
		netfe_stream_dump(fe->fs + i);
		netfe_stream_close(fe, 1);

	tle_evq_destroy(fe->txeq);
	tle_evq_destroy(fe->rxeq);
	RTE_PER_LCORE(_fe) = NULL;
/*
 * Back-end RX for one port/queue pair: read a burst of packets from the
 * NIC queue, hand them to the UDP context via tle_udp_rx_bulk(), free
 * whatever the library rejected, and update per-queue RX statistics.
 */
netbe_rx(struct netbe_lcore *lc, uint32_t pidx)
	struct rte_mbuf *pkt[MAX_PKT_BURST];
	struct rte_mbuf *rp[MAX_PKT_BURST];
	int32_t rc[MAX_PKT_BURST];

	n = rte_eth_rx_burst(lc->prtq[pidx].port.id,
		lc->prtq[pidx].rxqid, pkt, RTE_DIM(pkt));

	lc->prtq[pidx].rx_stat.in += n;
	NETBE_TRACE("%s(%u): rte_eth_rx_burst(%u, %u) returns %u\n",
		__func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].rxqid,

	/* k packets accepted by the UDP layer; rp[]/rc[] hold the rejects. */
	k = tle_udp_rx_bulk(lc->prtq[pidx].dev, pkt, rp, rc, n);

	lc->prtq[pidx].rx_stat.up += k;
	lc->prtq[pidx].rx_stat.drop += n - k;
	NETBE_TRACE("%s(%u): tle_udp_rx_bulk(%p, %u) returns %u\n",
		__func__, lc->id, lc->prtq[pidx].dev, n, k);

	/* free the n - k mbufs that the UDP context did not consume. */
	for (j = 0; j != n - k; j++) {
		NETBE_TRACE("%s:%d(port=%u) rp[%u]={%p, %d};\n",
			__func__, __LINE__, lc->prtq[pidx].port.id,
		rte_pktmbuf_free(rp[j]);
/*
 * Back-end TX for one port/queue pair: top up the software TX buffer
 * from the UDP context when at least half of it is free, push the
 * buffered packets to the NIC queue, and keep the unsent tail for the
 * next invocation.
 */
netbe_tx(struct netbe_lcore *lc, uint32_t pidx)
	struct rte_mbuf **mb;

	/* n = packets already buffered, k = free slots in the buffer. */
	n = lc->prtq[pidx].tx_buf.num;
	k = RTE_DIM(lc->prtq[pidx].tx_buf.pkt) - n;
	mb = lc->prtq[pidx].tx_buf.pkt;

	/* refill from the UDP context only when >= half the buffer is free. */
	if (k >= RTE_DIM(lc->prtq[pidx].tx_buf.pkt) / 2) {
		j = tle_udp_tx_bulk(lc->prtq[pidx].dev, mb + n, k);
		lc->prtq[pidx].tx_stat.down += j;

	NETBE_TRACE("%s(%u): tle_udp_tx_bulk(%p) returns %u,\n"
		"total pkts to send: %u\n",
		__func__, lc->id, lc->prtq[pidx].dev, j, n);

	for (j = 0; j != n; j++)
		NETBE_PKT_DUMP(mb[j]);

	k = rte_eth_tx_burst(lc->prtq[pidx].port.id,
		lc->prtq[pidx].txqid, mb, n);

	lc->prtq[pidx].tx_stat.out += k;
	lc->prtq[pidx].tx_stat.drop += n - k;
	NETBE_TRACE("%s(%u): rte_eth_tx_burst(%u, %u, %u) returns %u\n",
		__func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].txqid,

	/*
	 * retain the n - k unsent packets; the loop below presumably
	 * compacts mb[] toward the front (body not visible here — confirm).
	 */
	lc->prtq[pidx].tx_buf.num = n - k;
	for (j = k; j != n; j++)
/*
 * Per-lcore back-end setup: install the RX callback for every
 * port/queue pair handled by this lcore and publish the lcore pointer
 * through the _be per-lcore variable.
 */
netbe_lcore_setup(struct netbe_lcore *lc)
	RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) start\n",
		__func__, lc->id, lc->ctx);

	/*
	 * wait for FE lcores to start, so BE doesn't drop any packets
	 * because corresponding streams are not opened yet by FE.
	 * useful when used with pcap PMDs.
	 * think of a better way, or should this timeout be a cmdline
	 * parameter.
	 */

	for (i = 0; i != lc->prtq_num && rc == 0; i++) {
		RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, udp_dev: %p)\n",
			__func__, i, lc->prtq[i].port.id, lc->prtq[i].dev);

		rc = setup_rx_cb(&lc->prtq[i].port, lc, lc->prtq[i].rxqid);

	RTE_PER_LCORE(_be) = lc;
/*
 * Back-end per-lcore teardown fragment: fetch this lcore's netbe_lcore
 * and walk its port/queue list (per-queue cleanup body not visible in
 * this view). NOTE(review): the enclosing function header is outside
 * this view — presumably netbe_lcore_fini(); confirm against full
 * source.
 */
struct netbe_lcore *lc;

lc = RTE_PER_LCORE(_be);

for (i = 0; i != lc->prtq_num; i++) {
/*
 * Per-lcore back-end final cleanup: log per-queue RX/TX statistics,
 * free any packets still sitting in the software TX buffers and clear
 * the per-lcore BE pointer.
 */
netbe_lcore_clear(void)
	struct netbe_lcore *lc;

	lc = RTE_PER_LCORE(_be);

	RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) finish\n",
		__func__, lc->id, lc->ctx);
	/* per-queue statistics: rx {in,up,drop} and tx {down,out,drop}. */
	for (i = 0; i != lc->prtq_num; i++) {
		RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, lcore=%u, q=%u, dev=%p) "
			"in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "}, "
			"in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "};\n",
			__func__, i, lc->prtq[i].port.id, lc->id,
			lc->prtq[i].rx_stat.in,
			lc->prtq[i].rx_stat.up,
			lc->prtq[i].rx_stat.drop,
			lc->prtq[i].tx_stat.down,
			lc->prtq[i].tx_stat.out,
			lc->prtq[i].tx_stat.drop);

	/* drop packets still pending in the per-queue software TX buffers. */
	for (i = 0; i != lc->prtq_num; i++)
		for (j = 0; j != lc->prtq[i].tx_buf.num; j++)
			rte_pktmbuf_free(lc->prtq[i].tx_buf.pkt[j]);

	RTE_PER_LCORE(_be) = NULL;
/*
 * Per-lcore entry point (launched via rte_eal_remote_launch() or called
 * directly on the master lcore): run FE and/or BE initialisation for
 * this lcore, loop until force_quit is raised, then tear BE state down.
 */
lcore_main(void *arg)
	struct lcore_prm *prm;

	lcore = rte_lcore_id();

	RTE_LOG(NOTICE, USER1, "%s(lcore=%u) start\n",

	/* lcore FE init. */
	if (prm->fe.max_streams != 0)
		rc = netfe_lcore_init(&prm->fe);

	/* lcore BE init. */
	if (rc == 0 && prm->be.lc != NULL)
		rc = netbe_lcore_setup(prm->be.lc);

	/*
	 * error path: request overall shutdown (the guard condition is not
	 * visible in this view — presumably `if (rc != 0)`).
	 */
	sig_handle(SIGQUIT);

	/* main work loop; runs until a signal flips force_quit. */
	while (force_quit == 0) {

	RTE_LOG(NOTICE, USER1, "%s(lcore=%u) finish\n",

	netbe_lcore_clear();
/*
 * qsort() comparator for netfe_stream_prm entries: orders streams by
 * their front-end lcore id so entries for one lcore become contiguous.
 */
netfe_lcore_cmp(const void *s1, const void *s2)
	const struct netfe_stream_prm *p1, *p2;

	return p1->lcore - p2->lcore;
/*
 * Select the back-end lcore that should serve an IPv6 stream:
 * - exactly one BE configured: use it for everything;
 * - explicit be_lc given: match by lcore id;
 * - otherwise match the local address against per-queue port addresses,
 *   falling back to an LPM6 lookup on the remote address.
 */
netbe_find6(const struct in6_addr *laddr, uint16_t lport,
	const struct in6_addr *raddr, uint32_t be_lc)
	struct netbe_lcore *bc;

	/* we have exactly one BE, use it for all traffic */
	if (becfg.cpu_num == 1)

	/* search by provided be_lcore */
	if (be_lc != LCORE_ID_ANY) {
		for (i = 0; i != becfg.cpu_num; i++) {
			if (be_lc == bc->id)
		RTE_LOG(NOTICE, USER1, "%s: no stream with be_lcore=%u\n",

	/* search by local address */
	if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) != 0) {
		for (i = 0; i != becfg.cpu_num; i++) {
			/* search by queue for the local port */
			for (j = 0; j != bc->prtq_num; j++) {
				if (memcmp(laddr, &bc->prtq[j].port.ipv6,
						sizeof(*laddr)) == 0) {
					if (verify_queue_for_port(bc->prtq + j,

	/* search by remote address */
	/*
	 * NOTE(review): the guard below matches when raddr EQUALS
	 * in6addr_any, yet it leads to an LPM6 lookup on that raddr; an
	 * LPM lookup on the wildcard address looks inverted relative to
	 * the "search by remote address" comment (compare the `!= 0` test
	 * used for laddr above). Verify whether this should be `!= 0`.
	 */
	if (memcmp(raddr, &in6addr_any, sizeof(*raddr)) == 0) {
		for (i = 0; i != becfg.cpu_num; i++) {
			if (rte_lpm6_lookup(bc->lpm6,
				(uint8_t *)(uintptr_t)raddr->s6_addr,

			/* search by queue for the local port */
			for (j = 0; j != bc->prtq_num; j++)
				if (verify_queue_for_port(bc->prtq + j,
/*
 * Address-family dispatcher for BE selection: routes the lookup for a
 * new stream to netbe_find4() or netbe_find6() based on the local
 * address family. Ports are converted to host byte order via ntohs()
 * for the lookup.
 */
netbe_find(const struct tle_udp_stream_param *p, uint32_t be_lc)
	const struct sockaddr_in *l4, *r4;
	const struct sockaddr_in6 *l6, *r6;

	if (p->local_addr.ss_family == AF_INET) {
		l4 = (const struct sockaddr_in *)&p->local_addr;
		r4 = (const struct sockaddr_in *)&p->remote_addr;
		return netbe_find4(&l4->sin_addr, ntohs(l4->sin_port),
			&r4->sin_addr, be_lc);
	} else if (p->local_addr.ss_family == AF_INET6) {
		l6 = (const struct sockaddr_in6 *)&p->local_addr;
		r6 = (const struct sockaddr_in6 *)&p->remote_addr;
		return netbe_find6(&l6->sin6_addr, ntohs(l6->sin6_port),
			&r6->sin6_addr, be_lc);
/*
 * Resolve and record the BE device index for one stream's parameters;
 * logs an error when no BE can serve the stream (@line is the config
 * file line, for diagnostics). NOTE(review): "flll" is likely a typo
 * for "fill", but renaming would break callers — left as-is.
 */
netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line, uint32_t be_lc)
	bidx = netbe_find(&sp->prm, be_lc);

	RTE_LOG(ERR, USER1, "%s(line=%u): no BE for that stream\n",
/* start front-end processing. */
/*
 * Distribute the parsed FE stream configuration over lcores: resolve a
 * BE for every stream, sort the streams by their FE lcore so each
 * lcore's entries are contiguous, validate the target lcores, and fill
 * the per-lcore prm[] slots with the resulting stream ranges.
 */
netfe_lcore_fill(struct lcore_prm prm[RTE_MAX_LCORE],
	struct netfe_lcore_prm *lprm)
	uint32_t i, j, lc, ln;

	/* determine on what BE each stream should be open. */
	for (i = 0; i != lprm->nb_streams; i++) {
		lc = lprm->stream[i].lcore;
		ln = lprm->stream[i].line;
		be_lc = lprm->stream[i].be_lcore;
		/* FWD streams need a BE for the forwarding side as well. */
		if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln,
			(lprm->stream[i].op == FWD &&
			netfe_sprm_flll_be(&lprm->stream[i].fprm, ln,

	/* group all fe parameters by lcore. */

	qsort(lprm->stream, lprm->nb_streams, sizeof(lprm->stream[0]),

	/* i..j-1 is one lcore's contiguous run of streams. */
	for (i = 0; i != lprm->nb_streams; i = j) {

		lc = lprm->stream[i].lcore;
		ln = lprm->stream[i].line;

		/* reject streams assigned to a disabled lcore. */
		if (rte_lcore_is_enabled(lc) == 0) {
			"%s(line=%u): lcore %u is not enabled\n",

		/* reject lcores already running something else. */
		if (rte_get_master_lcore() != lc &&
			rte_eal_get_lcore_state(lc) == RUNNING) {
			"%s(line=%u): lcore %u already in use\n",

		/* advance j past all streams sharing this lcore. */
		for (j = i + 1; j != lprm->nb_streams &&
			lc == lprm->stream[j].lcore;

		prm[lc].fe.max_streams = lprm->max_streams;
		prm[lc].fe.nb_streams = j - i;
		prm[lc].fe.stream = lprm->stream + i;
1997 main(int argc, char *argv[])
1999 int32_t opt, opt_idx, rc;
2002 struct tle_udp_ctx_param ctx_prm;
2003 struct netfe_lcore_prm feprm;
2004 struct rte_eth_stats stats;
2005 char fecfg_fname[PATH_MAX + 1];
2006 char becfg_fname[PATH_MAX + 1];
2007 struct lcore_prm prm[RTE_MAX_LCORE];
2008 struct rte_eth_dev_info dev_info;
2012 memset(prm, 0, sizeof(prm));
2014 rc = rte_eal_init(argc, argv);
2016 rte_exit(EXIT_FAILURE,
2017 "%s: rte_eal_init failed with error code: %d\n",
2020 memset(&ctx_prm, 0, sizeof(ctx_prm));
2027 while ((opt = getopt_long(argc, argv, "B:PR:S:b:f:s:", long_opt,
2028 &opt_idx)) != EOF) {
2029 if (opt == OPT_SHORT_SBULK) {
2030 rc = parse_uint_val(NULL, optarg, &v);
2032 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
2033 "for option: \'%c\'\n",
2034 __func__, optarg, opt);
2035 ctx_prm.send_bulk_size = v;
2036 } else if (opt == OPT_SHORT_PROMISC) {
2038 } else if (opt == OPT_SHORT_RBUFS) {
2039 rc = parse_uint_val(NULL, optarg, &v);
2041 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
2042 "for option: \'%c\'\n",
2043 __func__, optarg, opt);
2044 ctx_prm.max_stream_rbufs = v;
2045 } else if (opt == OPT_SHORT_SBUFS) {
2046 rc = parse_uint_val(NULL, optarg, &v);
2048 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
2049 "for option: \'%c\'\n",
2050 __func__, optarg, opt);
2051 ctx_prm.max_stream_sbufs = v;
2052 } else if (opt == OPT_SHORT_STREAMS) {
2053 rc = parse_uint_val(NULL, optarg, &v);
2055 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
2056 "for option: \'%c\'\n",
2057 __func__, optarg, opt);
2058 ctx_prm.max_streams = v;
2059 } else if (opt == OPT_SHORT_BECFG) {
2060 snprintf(becfg_fname, sizeof(becfg_fname), "%s",
2062 } else if (opt == OPT_SHORT_FECFG) {
2063 snprintf(fecfg_fname, sizeof(fecfg_fname), "%s",
2066 rte_exit(EXIT_FAILURE,
2067 "%s: unknown option: \'%c\'\n",
2072 signal(SIGINT, sig_handle);
2074 rc = netbe_port_init(&becfg, argc - optind, argv + optind);
2076 rte_exit(EXIT_FAILURE,
2077 "%s: netbe_port_init failed with error code: %d\n",
2080 rc = netbe_lcore_init(&becfg, &ctx_prm);
2082 sig_handle(SIGQUIT);
2084 if ((rc = netbe_dest_init(becfg_fname, &becfg)) != 0)
2085 sig_handle(SIGQUIT);
2087 for (i = 0; i != becfg.prt_num && rc == 0; i++) {
2088 RTE_LOG(NOTICE, USER1, "%s: starting port %u\n",
2089 __func__, becfg.prt[i].id);
2090 rc = rte_eth_dev_start(becfg.prt[i].id);
2093 "%s: rte_eth_dev_start(%u) returned "
2095 __func__, becfg.prt[i].id, rc);
2096 sig_handle(SIGQUIT);
2098 rte_eth_dev_info_get(becfg.prt[i].id, &dev_info);
2099 rc = update_rss_reta(&becfg.prt[i], &dev_info);
2101 sig_handle(SIGQUIT);
2104 feprm.max_streams = ctx_prm.max_streams * becfg.cpu_num;
2105 if (rc == 0 && (rc = netfe_parse_cfg(fecfg_fname, &feprm)) != 0)
2106 sig_handle(SIGQUIT);
2108 for (i = 0; rc == 0 && i != becfg.cpu_num; i++)
2109 prm[becfg.cpu[i].id].be.lc = becfg.cpu + i;
2111 if (rc == 0 && (rc = netfe_lcore_fill(prm, &feprm)) != 0)
2112 sig_handle(SIGQUIT);
2114 /* launch all slave lcores. */
2115 RTE_LCORE_FOREACH_SLAVE(i) {
2116 if (prm[i].be.lc != NULL || prm[i].fe.max_streams != 0)
2117 rte_eal_remote_launch(lcore_main, prm + i, i);
2120 /* launch master lcore. */
2121 i = rte_get_master_lcore();
2122 if (prm[i].be.lc != NULL || prm[i].fe.max_streams != 0)
2123 lcore_main(prm + i);
2125 rte_eal_mp_wait_lcore();
2127 for (i = 0; i != becfg.prt_num; i++) {
2128 RTE_LOG(NOTICE, USER1, "%s: stoping port %u\n",
2129 __func__, becfg.prt[i].id);
2130 rte_eth_stats_get(becfg.prt[i].id, &stats);
2131 RTE_LOG(NOTICE, USER1, "port %u stats={\n"
2132 "ipackets=%" PRIu64 ";"
2133 "ibytes=%" PRIu64 ";"
2134 "ierrors=%" PRIu64 ";\n"
2135 "opackets=%" PRIu64 ";"
2136 "obytes=%" PRIu64 ";"
2137 "oerrors=%" PRIu64 ";\n"
2146 rte_eth_dev_stop(becfg.prt[i].id);
2149 netbe_lcore_fini(&becfg);