2 * Copyright (c) 2016 Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
/* LPM (longest-prefix-match) route table sizing. */
19 #define MAX_RULES 0x100
20 #define MAX_TBL8 0x800
/* Per-queue descriptor ring sizes. */
22 #define RX_RING_SIZE 0x400
23 #define TX_RING_SIZE 0x800
/* mbuf mempool sizing: per-lcore cache and total buffer count. */
25 #define MPOOL_CACHE_SIZE 0x100
26 #define MPOOL_NB_BUF 0x20000
/* Buffer size for IP-fragmentation header mbufs (headroom + max UDP header). */
28 #define FRAG_MBUF_BUF_SIZE (RTE_PKTMBUF_HEADROOM + TLE_UDP_MAX_HDR)
/* Fragment-reassembly TTL (in ms units, converted to cycles later). */
29 #define FRAG_TTL MS_PER_S
30 #define FRAG_TBL_BUCKET_ENTRIES 16
32 #define FIRST_PORT 0x8000
/* RX/TX checksum-offload bit sets requested from the NIC. */
34 #define RX_CSUM_OFFLOAD (DEV_RX_OFFLOAD_IPV4_CKSUM | DEV_RX_OFFLOAD_UDP_CKSUM)
35 #define TX_CSUM_OFFLOAD (DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM)
/* Command-line option letters and their long-option names. */
37 #define OPT_SHORT_SBULK 'B'
38 #define OPT_LONG_SBULK "sburst"
40 #define OPT_SHORT_PROMISC 'P'
41 #define OPT_LONG_PROMISC "promisc"
43 #define OPT_SHORT_RBUFS 'R'
44 #define OPT_LONG_RBUFS "rbufs"
46 #define OPT_SHORT_SBUFS 'S'
47 #define OPT_LONG_SBUFS "sbufs"
49 #define OPT_SHORT_STREAMS 's'
50 #define OPT_LONG_STREAMS "streams"
52 #define OPT_SHORT_FECFG 'f'
53 #define OPT_LONG_FECFG "fecfg"
55 #define OPT_SHORT_BECFG 'b'
56 #define OPT_LONG_BECFG "becfg"
/* Per-lcore pointer to the front-end state (set in netfe_lcore()). */
58 RTE_DEFINE_PER_LCORE(struct netfe_lcore *, _fe);
/* getopt_long() option table, mirrors the OPT_* macros above. */
62 static const struct option long_opt[] = {
63 {OPT_LONG_BECFG, 1, 0, OPT_SHORT_BECFG},
64 {OPT_LONG_FECFG, 1, 0, OPT_SHORT_FECFG},
65 {OPT_LONG_PROMISC, 0, 0, OPT_SHORT_PROMISC},
66 {OPT_LONG_RBUFS, 1, 0, OPT_SHORT_RBUFS},
67 {OPT_LONG_SBUFS, 1, 0, OPT_SHORT_SBUFS},
68 {OPT_LONG_SBULK, 1, 0, OPT_SHORT_SBULK},
69 {OPT_LONG_STREAMS, 1, 0, OPT_SHORT_STREAMS},
/* Set by sig_handle() to request a graceful shutdown of all loops. */
73 static volatile int force_quit;
/* Global back-end config plus per-socket mbuf pools.
 * Index convention: socket_id + 1 (slot 0 reserved) — see pool_init(). */
75 static struct netbe_cfg becfg;
76 static struct rte_mempool *mpool[RTE_MAX_NUMA_NODES + 1];
77 static struct rte_mempool *frag_mpool[RTE_MAX_NUMA_NODES + 1];
/* Default ethdev config used as the template in port_init(). */
79 static const struct rte_eth_conf port_conf_default = {
81 .max_rx_pkt_len = ETHER_MAX_VLAN_FRAME_LEN,
/* Signal handler: logs the signal; sets force_quit (assignment not visible
 * in this chunk — TODO confirm) so the lcore loops exit. */
90 sig_handle(int signum)
92 RTE_LOG(ERR, USER1, "%s(%d)\n", __func__, signum);
97 * Initialise DPDK port.
98 * In current version, only one queue per port is used.
101 port_init(struct netbe_port *uprt, struct rte_mempool *mp)
105 struct rte_eth_conf port_conf;
106 struct rte_eth_dev_info dev_info;
108 const uint16_t rx_rings = 1, tx_rings = 1;
/* Verify the device can satisfy the requested RX/TX offloads. */
110 rte_eth_dev_info_get(uprt->id, &dev_info);
111 if ((dev_info.rx_offload_capa & uprt->rx_offload) != uprt->rx_offload) {
113 "port#%u supported/requested RX offloads don't match, "
114 "supported: %#x, requested: %#x;\n",
115 uprt->id, dev_info.rx_offload_capa, uprt->rx_offload);
118 if ((dev_info.tx_offload_capa & uprt->tx_offload) != uprt->tx_offload) {
120 "port#%u supported/requested TX offloads don't match, "
121 "supported: %#x, requested: %#x;\n",
122 uprt->id, dev_info.tx_offload_capa, uprt->tx_offload);
/* Start from the shared defaults, then enable per-port features. */
126 port_conf = port_conf_default;
127 if ((uprt->rx_offload & RX_CSUM_OFFLOAD) != 0) {
128 RTE_LOG(ERR, USER1, "%s(%u): enabling RX csum offload;\n",
130 port_conf.rxmode.hw_ip_checksum = 1;
133 port_conf.rxmode.max_rx_pkt_len = uprt->mtu + ETHER_CRC_LEN;
135 rc = rte_eth_dev_configure(uprt->id, rx_rings, tx_rings, &port_conf);
136 RTE_LOG(NOTICE, USER1,
137 "%s: rte_eth_dev_configure(%u) returns %d;\n",
138 __func__, uprt->id, rc);
/* Queue setup on the port's own NUMA socket; drop on RX-ring overflow. */
142 socket = rte_eth_dev_socket_id(uprt->id);
144 dev_info.default_rxconf.rx_drop_en = 1;
146 dev_info.default_txconf.tx_free_thresh = TX_RING_SIZE / 2;
147 if (uprt->tx_offload != 0) {
148 RTE_LOG(ERR, USER1, "%s(%u): enabling full featured TX;\n",
/* Clear txq_flags so the PMD enables offload-capable TX path. */
150 dev_info.default_txconf.txq_flags = 0;
153 for (q = 0; q < rx_rings; q++) {
154 rc = rte_eth_rx_queue_setup(uprt->id, q, RX_RING_SIZE,
160 for (q = 0; q < tx_rings; q++) {
161 rc = rte_eth_tx_queue_setup(uprt->id, q, TX_RING_SIZE,
162 socket, &dev_info.default_txconf);
172 * Check that lcore is enabled, not master, and not in use already.
175 check_lcore(uint32_t lc)
177 if (rte_lcore_is_enabled(lc) == 0) {
178 RTE_LOG(ERR, USER1, "lcore %u is not enabled\n", lc);
181 if (rte_get_master_lcore() == lc) {
182 RTE_LOG(ERR, USER1, "lcore %u is not slave\n", lc);
185 if (rte_eal_get_lcore_state(lc) == RUNNING) {
186 RTE_LOG(ERR, USER1, "lcore %u already running %p\n",
187 lc, lcore_config[lc].f);
/* Dump one back-end port's configuration (id, lcore, MTU, offloads,
 * IPv6 address and MAC) to the log. Diagnostic only, no side effects. */
194 log_netbe_prt(const struct netbe_port *uprt)
196 RTE_LOG(NOTICE, USER1,
197 "uprt %p = <id = %u, lcore = %u, "
198 "mtu = %u, rx_offload = %u, tx_offload = %u,\n"
200 "ipv6 = %04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx, "
201 "mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n",
202 uprt, uprt->id, uprt->lcore,
203 uprt->mtu, uprt->rx_offload, uprt->tx_offload,
205 uprt->ipv6.s6_addr16[0], uprt->ipv6.s6_addr16[1],
206 uprt->ipv6.s6_addr16[2], uprt->ipv6.s6_addr16[3],
207 uprt->ipv6.s6_addr16[4], uprt->ipv6.s6_addr16[5],
208 uprt->ipv6.s6_addr16[6], uprt->ipv6.s6_addr16[7],
209 uprt->mac.addr_bytes[0], uprt->mac.addr_bytes[1],
210 uprt->mac.addr_bytes[2], uprt->mac.addr_bytes[3],
211 uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5]);
/* Dump the whole back-end config: header line plus one entry per port. */
215 log_netbe_cfg(const struct netbe_cfg *ucfg)
219 RTE_LOG(NOTICE, USER1,
220 "ucfg @ %p, prt_num = %u\n", ucfg, ucfg->prt_num);
222 for (i = 0; i != ucfg->prt_num; i++)
223 log_netbe_prt(ucfg->prt + i);
/* Create the main pktmbuf pool for mpool[] slot @sid.
 * Note the index convention: @sid is socket_id + 1, so the actual NUMA
 * socket passed to the allocator is sid - 1. */
227 pool_init(uint32_t sid)
230 struct rte_mempool *mp;
231 char name[RTE_MEMPOOL_NAMESIZE];
233 snprintf(name, sizeof(name), "MP%u", sid);
234 mp = rte_pktmbuf_pool_create(name, MPOOL_NB_BUF, MPOOL_CACHE_SIZE, 0,
235 RTE_MBUF_DEFAULT_BUF_SIZE, sid - 1);
238 RTE_LOG(ERR, USER1, "%s(%d) failed with error code: %d\n",
239 __func__, sid - 1, rc);
/* Create the fragmentation-header mbuf pool for frag_mpool[] slot @sid.
 * Same sid == socket_id + 1 convention as pool_init(). */
248 frag_pool_init(uint32_t sid)
251 struct rte_mempool *frag_mp;
252 char frag_name[RTE_MEMPOOL_NAMESIZE];
254 snprintf(frag_name, sizeof(frag_name), "frag_MP%u", sid);
255 frag_mp = rte_pktmbuf_pool_create(frag_name, MPOOL_NB_BUF,
256 MPOOL_CACHE_SIZE, 0, FRAG_MBUF_BUF_SIZE, sid - 1);
257 if (frag_mp == NULL) {
259 RTE_LOG(ERR, USER1, "%s(%d) failed with error code: %d\n",
260 __func__, sid - 1, rc);
264 frag_mpool[sid] = frag_mp;
270 * Setup all enabled ports.
/* Parse one command-line argument per port, validate its lcore, create the
 * per-socket mempools lazily, then configure the device.  Any failure
 * aborts the process via rte_exit(). */
273 netbe_port_init(struct netbe_cfg *cfg, int argc, char *argv[])
278 n = RTE_MIN(RTE_DIM(cfg->prt), (uint32_t)argc);
281 for (i = 0; i != n; i++) {
282 rc = parse_netbe_arg(cfg->prt + i, argv[i]);
286 rc = check_lcore(cfg->prt[i].lcore);
/* +1 matches the mpool[]/frag_mpool[] "socket_id + 1" convention. */
290 sid = rte_lcore_to_socket_id(cfg->prt[i].lcore) + 1;
291 assert(sid < RTE_DIM(mpool));
/* Create each pool only once per socket. */
293 if (mpool[sid] == NULL && (rc = pool_init(sid)) != 0)
296 if (frag_mpool[sid] == NULL && (rc = frag_pool_init(sid)) != 0)
299 rc = port_init(cfg->prt + i, mpool[sid]);
303 rte_eth_macaddr_get(cfg->prt[i].id, &cfg->prt[i].mac);
305 rte_eth_promiscuous_enable(cfg->prt[i].id);
309 rte_exit(EXIT_FAILURE,
310 "%s: processing of \"%s\" failed with error code: %d\n",
311 __func__, argv[i], rc);
318 * UDP IPv4 destination lookup callback.
/* @data is the owning netbe_lcore; on LPM hit, copies the cached
 * destination header (L2 + L3 template) into @res. */
321 lpm4_dst_lookup(void *data, const struct in_addr *addr,
322 struct tle_udp_dest *res)
326 struct netbe_lcore *lc;
327 struct tle_udp_dest *dst;
331 rc = rte_lpm_lookup(lc->lpm4, rte_be_to_cpu_32(addr->s_addr), &idx);
333 dst = &lc->dst4[idx];
/* Copy the fixed fields up to 'hdr' plus the prebuilt l2+l3 header. */
334 rte_memcpy(res, dst, dst->l2_len + dst->l3_len +
335 offsetof(struct tle_udp_dest, hdr));
341 * UDP IPv6 destination lookup callback.
/* IPv6 twin of lpm4_dst_lookup(), using lc->lpm6/lc->dst6. */
344 lpm6_dst_lookup(void *data, const struct in6_addr *addr,
345 struct tle_udp_dest *res)
349 struct netbe_lcore *lc;
350 struct tle_udp_dest *dst;
354 p = (uintptr_t)addr->s6_addr;
356 rc = rte_lpm6_lookup(lc->lpm6, (uint8_t *)p, &idx);
358 dst = &lc->dst6[idx];
359 rte_memcpy(res, dst, dst->l2_len + dst->l3_len +
360 offsetof(struct tle_udp_dest, hdr));
/* Install an IPv4 route into this lcore's LPM table, pointing at
 * destination slot @idx, then log the full route description. */
366 netbe_add_ipv4_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
370 uint32_t addr, depth;
371 char str[INET_ADDRSTRLEN];
374 addr = rte_be_to_cpu_32(dst->ipv4.s_addr);
376 inet_ntop(AF_INET, &dst->ipv4, str, sizeof(str));
377 rc = rte_lpm_add(lc->lpm4, addr, depth, idx);
378 RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
380 "mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
382 __func__, lc->id, dst->port, lc->dst4[idx].dev,
383 str, depth, lc->dst4[idx].mtu,
384 dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
385 dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
386 dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
/* IPv6 twin of netbe_add_ipv4_route(): rte_lpm6_add + log. */
392 netbe_add_ipv6_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
397 char str[INET6_ADDRSTRLEN];
401 rc = rte_lpm6_add(lc->lpm6, (uint8_t *)(uintptr_t)dst->ipv6.s6_addr,
404 inet_ntop(AF_INET6, &dst->ipv6, str, sizeof(str));
405 RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
407 "mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
409 __func__, lc->id, dst->port, lc->dst6[idx].dev,
/* NOTE(review): logs lc->dst4[idx].mtu in the IPv6 path — looks like a
 * copy-paste from the IPv4 routine; probably should be dst6. Log-only. */
410 str, depth, lc->dst4[idx].mtu,
411 dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
412 dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
413 dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
/* Create this lcore's IPv4 and IPv6 LPM tables on its NUMA socket. */
419 lcore_lpm_init(struct netbe_lcore *lc)
422 char str[RTE_LPM_NAMESIZE];
423 const struct rte_lpm_config lpm4_cfg = {
424 .max_rules = MAX_RULES,
425 .number_tbl8s = MAX_TBL8,
427 const struct rte_lpm6_config lpm6_cfg = {
428 .max_rules = MAX_RULES,
429 .number_tbl8s = MAX_TBL8,
432 sid = rte_lcore_to_socket_id(lc->id);
/* NOTE(review): the "\n" inside the table names ("LPM4%u\n"/"LPM6%u\n")
 * embeds a newline in the LPM object name — looks unintended; verify. */
434 snprintf(str, sizeof(str), "LPM4%u\n", lc->id);
435 lc->lpm4 = rte_lpm_create(str, sid, &lpm4_cfg);
436 RTE_LOG(NOTICE, USER1, "%s(lcore=%u): lpm4=%p;\n",
437 __func__, lc->id, lc->lpm4);
438 if (lc->lpm4 == NULL)
441 snprintf(str, sizeof(str), "LPM6%u\n", lc->id);
442 lc->lpm6 = rte_lpm6_create(str, sid, &lpm6_cfg);
443 RTE_LOG(NOTICE, USER1, "%s(lcore=%u): lpm6=%p;\n",
444 __func__, lc->id, lc->lpm6);
445 if (lc->lpm6 == NULL)
/* Build the cached tle_udp_dest entry for one destination: pick the frag
 * mempool and MTU, then prebuild the Ethernet + IPv4/IPv6 header template
 * in dst->hdr for the given @l3_type. */
452 fill_dst(struct tle_udp_dest *dst, struct netbe_dev *bed,
453 const struct netbe_dest *bdp, uint16_t l3_type, int32_t sid)
455 struct ether_hdr *eth;
456 struct ipv4_hdr *ip4h;
457 struct ipv6_hdr *ip6h;
459 static const struct ipv4_hdr ipv4_tmpl = {
460 .version_ihl = 4 << 4 | sizeof(*ip4h) / IPV4_IHL_MULTIPLIER,
462 .next_proto_id = IPPROTO_UDP,
465 static const struct ipv6_hdr ipv6_tmpl = {
467 .proto = IPPROTO_UDP,
/* sid + 1 matches the frag_mpool[] "socket_id + 1" index convention. */
472 dst->head_mp = frag_mpool[sid + 1];
473 dst->mtu = RTE_MIN(bdp->mtu, bed->port.mtu);
474 dst->l2_len = sizeof(*eth);
/* Ethernet header: src = our port MAC, dst = destination MAC. */
476 eth = (struct ether_hdr *)dst->hdr;
478 ether_addr_copy(&bed->port.mac, &eth->s_addr);
479 ether_addr_copy(&bdp->mac, &eth->d_addr);
480 eth->ether_type = rte_cpu_to_be_16(l3_type);
482 if (l3_type == ETHER_TYPE_IPv4) {
483 dst->l3_len = sizeof(*ip4h);
484 ip4h = (struct ipv4_hdr *)(eth + 1);
486 } else if (l3_type == ETHER_TYPE_IPv6) {
487 dst->l3_len = sizeof(*ip6h);
488 ip6h = (struct ipv6_hdr *)(eth + 1);
495 * BE lcore setup routine.
/* Set up one back-end lcore: LPM tables, fragmentation table, UDP context
 * and one tle_udp device per assigned port.  On any failure everything
 * created so far is torn down. */
498 lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm,
499 const struct netbe_port prt[], uint32_t prt_num)
503 uint64_t frag_cycles;
504 struct tle_udp_ctx_param cprm;
505 struct tle_udp_dev_param dprm;
507 lc->id = prt[0].lcore;
508 lc->prt_num = prt_num;
510 sid = rte_lcore_to_socket_id(lc->id);
512 rc = lcore_lpm_init(lc);
/* UDP context parameters: route lookups go through our LPM callbacks. */
517 cprm.socket_id = sid;
518 cprm.lookup4 = lpm4_dst_lookup;
519 cprm.lookup4_data = lc;
520 cprm.lookup6 = lpm6_dst_lookup;
521 cprm.lookup6_data = lc;
523 /* to facilitate both IPv4 and IPv6. */
524 cprm.max_streams *= 2;
/* Convert FRAG_TTL (ms) to TSC cycles for the reassembly table. */
526 frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S * FRAG_TTL;
527 lc->ftbl = rte_ip_frag_table_create(cprm.max_streams,
528 FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams, frag_cycles, sid);
529 RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n",
530 __func__, lc->id, lc->ftbl);
532 lc->ctx = tle_udp_create(&cprm);
533 RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n",
534 __func__, lc->id, lc->ctx);
536 if (lc->ctx == NULL || lc->ftbl == NULL)
/* Register each port as a tle_udp device on this context. */
539 for (i = 0; i != prt_num && rc == 0; i++) {
541 memset(&dprm, 0, sizeof(dprm));
543 lc->prt[i].rxqid = 0;
544 lc->prt[i].txqid = 0;
545 lc->prt[i].port = prt[i];
547 dprm.rx_offload = prt[i].rx_offload;
548 dprm.tx_offload = prt[i].tx_offload;
549 dprm.local_addr4.s_addr = prt[i].ipv4;
550 memcpy(&dprm.local_addr6, &prt[i].ipv6, sizeof(prt[i].ipv6));
552 lc->prt[i].dev = tle_udp_add_dev(lc->ctx, &dprm);
553 RTE_LOG(NOTICE, USER1, "%s(lcore=%u, port=%u), udp_dev: %p;\n",
554 __func__, lc->id, prt[i].id, lc->prt[i].dev);
555 if (lc->prt[i].dev == NULL)
/* Error path: release everything created above. */
560 RTE_LOG(ERR, USER1, "%s(lcore=%u) failed with error code: %d\n",
561 __func__, lc->id, rc);
562 tle_udp_destroy(lc->ctx);
563 rte_ip_frag_table_destroy(lc->ftbl);
564 rte_lpm_free(lc->lpm4);
565 rte_lpm6_free(lc->lpm6);
/* qsort comparator: orders netbe_port entries by owning lcore id. */
572 prt_lcore_cmp(const void *s1, const void *s2)
574 const struct netbe_port *p1, *p2;
578 return p1->lcore - p2->lcore;
/* Group ports by lcore (via sort) and run lcore_init() once per group.
 * Aborts via rte_exit() on the first failure. */
582 netbe_lcore_init(struct netbe_cfg *cfg, const struct tle_udp_ctx_param *ctx_prm)
585 uint32_t i, k, n, num;
586 struct netbe_port sp[RTE_DIM(cfg->prt)];
589 memcpy(sp, cfg->prt, sizeof(sp[0]) * num);
590 qsort(sp, num, sizeof(sp[0]), prt_lcore_cmp);
592 /* Fill ports to be used by each lcore. */
597 for (i = 0; i != num && rc == 0; i++) {
598 if (sp[n].lcore != sp[i].lcore) {
599 rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n);
/* Flush the trailing group after the loop. */
605 if (rc == 0 && i != n) {
606 rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n);
611 rte_exit(EXIT_FAILURE, "%s: failed with error code: %d\n",
/* Tear down every back-end lcore: UDP context, frag table, LPM tables,
 * then clear the per-cpu state. */
618 netbe_lcore_fini(struct netbe_cfg *cfg)
622 for (i = 0; i != cfg->cpu_num; i++) {
623 tle_udp_destroy(cfg->cpu[i].ctx);
624 rte_ip_frag_table_destroy(cfg->cpu[i].ftbl);
625 rte_lpm_free(cfg->cpu[i].lpm4);
626 rte_lpm6_free(cfg->cpu[i].lpm6);
629 memset(cfg->cpu, 0, sizeof(cfg->cpu));
/* Append @dnum destinations for one device to this lcore: fill the cached
 * header templates and install the matching LPM routes, then bump the
 * dst4_num/dst6_num counter for the chosen address family. */
634 netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family,
635 const struct netbe_dest *dst, uint32_t dnum)
640 struct tle_udp_dest *dp;
642 if (family == AF_INET) {
645 m = RTE_DIM(lc->dst4);
646 l3_type = ETHER_TYPE_IPv4;
650 m = RTE_DIM(lc->dst6);
651 l3_type = ETHER_TYPE_IPv6;
/* Reject requests that would overflow the destination array. */
655 RTE_LOG(ERR, USER1, "%s(lcore=%u, family=%hu, dnum=%u) exceeds "
656 "maximum allowed number of destinations(%u);\n",
657 __func__, lc->id, family, dnum, m);
661 sid = rte_lcore_to_socket_id(lc->id);
664 for (i = 0; i != dnum && rc == 0; i++) {
665 fill_dst(dp + i, lc->prt + dev_idx, dst + i, l3_type, sid);
666 if (family == AF_INET)
667 rc = netbe_add_ipv4_route(lc, dst + i, n + i);
669 rc = netbe_add_ipv6_route(lc, dst + i, n + i);
672 if (family == AF_INET)
673 lc->dst4_num = n + i;
675 lc->dst6_num = n + i;
/* Find which back-end lcore manages @port; on success *plc is set to it
 * (return-value details not visible in this chunk — TODO confirm). */
681 netbe_port2lcore(struct netbe_cfg *cfg, uint32_t port, struct netbe_lcore **plc)
684 struct netbe_lcore *lc;
686 for (i = 0; i != cfg->cpu_num; i++) {
688 for (j = 0; j != cfg->prt_num; j++) {
689 if (lc->prt[j].port.id == port) {
/* qsort comparator: orders destinations by port, then by address family,
 * so netbe_dest_init() can process them in contiguous groups. */
700 netbe_dest_cmp(const void *s1, const void *s2)
702 const struct netbe_dest *p1, *p2;
706 if (p1->port == p2->port)
707 return p1->family - p2->family;
709 return p1->port - p2->port;
/* Parse the destination config file, sort entries by (port, family), and
 * hand each contiguous group to the lcore that owns that port. */
713 netbe_dest_init(const char *fname, struct netbe_cfg *cfg)
717 struct netbe_lcore *lc;
718 struct netbe_dest_prm prm;
720 rc = netbe_parse_dest(fname, &prm);
724 qsort(prm.dest, prm.nb_dest, sizeof(prm.dest[0]), netbe_dest_cmp);
/* Walk groups: [i, j) shares the same port and family. */
727 for (i = 0; i != prm.nb_dest; i = j) {
729 p = prm.dest[i].port;
730 f = prm.dest[i].family;
731 for (j = i + 1; j != prm.nb_dest && p == prm.dest[j].port &&
732 f == prm.dest[j].family;
736 rc = netbe_port2lcore(cfg, p, &lc);
738 RTE_LOG(ERR, USER1, "%s(%s) error at line %u: "
739 "port %u not managed by any lcore;\n",
740 __func__, fname, prm.dest[i].line, p);
/* rc doubles as the device index returned by netbe_port2lcore(). */
744 rc = netbe_add_dest(lc, rc, f, prm.dest + i, j - i);
/* Close one front-end stream slot: free its events, close the UDP stream
 * and zero the slot (slot selection via 'dec' not fully visible here). */
754 netfe_stream_close(struct netfe_lcore *fe, uint32_t dec)
760 tle_event_free(fe->fs[sidx].txev);
761 tle_event_free(fe->fs[sidx].rxev);
762 tle_udp_stream_close(fe->fs[sidx].s);
763 memset(&fe->fs[sidx], 0, sizeof(fe->fs[sidx]));
/* Log one stream's addresses, ports and statistics.  Handles both IPv4
 * and IPv6 local addresses; unknown families are logged and skipped. */
767 netfe_stream_dump(const struct netfe_stream *fes)
769 struct sockaddr_in *l4, *r4;
770 struct sockaddr_in6 *l6, *r6;
771 uint16_t lport, rport;
772 struct tle_udp_stream_param sprm;
773 char laddr[INET6_ADDRSTRLEN];
774 char raddr[INET6_ADDRSTRLEN];
776 tle_udp_stream_get_param(fes->s, &sprm);
778 if (sprm.local_addr.ss_family == AF_INET) {
780 l4 = (struct sockaddr_in *)&sprm.local_addr;
781 r4 = (struct sockaddr_in *)&sprm.remote_addr;
783 lport = l4->sin_port;
784 rport = r4->sin_port;
786 } else if (sprm.local_addr.ss_family == AF_INET6) {
788 l6 = (struct sockaddr_in6 *)&sprm.local_addr;
789 r6 = (struct sockaddr_in6 *)&sprm.remote_addr;
791 lport = l6->sin6_port;
792 rport = r6->sin6_port;
795 RTE_LOG(ERR, USER1, "stream@%p - unknown family=%hu\n",
796 fes->s, sprm.local_addr.ss_family);
800 format_addr(&sprm.local_addr, laddr, sizeof(laddr));
801 format_addr(&sprm.remote_addr, raddr, sizeof(raddr));
805 "family=%hu,laddr=%s,lport=%hu,raddr=%s,rport=%hu,"
807 "rxp=%" PRIu64 ",txp=%" PRIu64 ",drops=%" PRIu64 ","
808 "rxev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "],"
809 "txev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "],"
812 sprm.local_addr.ss_family,
813 laddr, ntohs(lport), raddr, ntohs(rport),
814 fes->stat.rxp, fes->stat.txp, fes->stat.drops,
815 fes->stat.rxev[TLE_SEV_IDLE],
816 fes->stat.rxev[TLE_SEV_DOWN],
817 fes->stat.rxev[TLE_SEV_UP],
818 fes->stat.txev[TLE_SEV_IDLE],
819 fes->stat.txev[TLE_SEV_DOWN],
820 fes->stat.txev[TLE_SEV_UP]);
825 * helper function: opens IPv4 and IPv6 streams for selected port.
/* Allocate a new front-end stream slot, bind RX/TX events from the
 * per-lcore event queues, activate the events appropriate for the
 * stream's op mode (RXONLY/TXONLY/RXTX/FWD), then open the UDP stream
 * on the back-end context selected by @bidx. */
827 static struct netfe_stream *
828 netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
829 uint32_t lcore, uint16_t op, uint32_t bidx)
833 struct netfe_stream *fes;
837 if (sidx >= fe->snum) {
842 fes->rxev = tle_event_alloc(fe->rxeq, &fe->fs[sidx]);
843 fes->txev = tle_event_alloc(fe->txeq, &fe->fs[sidx]);
844 sprm->recv_ev = fes->rxev;
846 sprm->send_ev = fes->txev;
848 RTE_LOG(ERR, USER1, "%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}\n",
849 __func__, lcore, sidx, op, fes->rxev, fes->txev);
850 if (fes->rxev == NULL || fes->txev == NULL) {
851 netfe_stream_close(fe, 0);
/* TX-driven modes start with the TX event armed. */
856 if (op == TXONLY || op == FWD) {
857 tle_event_active(fes->txev, TLE_SEV_DOWN);
858 fes->stat.txev[TLE_SEV_DOWN]++;
862 tle_event_active(fes->rxev, TLE_SEV_DOWN);
863 fes->stat.rxev[TLE_SEV_DOWN]++;
866 fes->s = tle_udp_stream_open(becfg.cpu[bidx].ctx, sprm);
867 if (fes->s == NULL) {
869 netfe_stream_close(fe, 0);
875 fes->family = sprm->local_addr.ss_family;
/* Compare two socket addresses of the given family (port + address). */
883 netfe_addr_eq(struct sockaddr_storage *l, struct sockaddr_storage *r,
886 struct sockaddr_in *l4, *r4;
887 struct sockaddr_in6 *l6, *r6;
889 if (family == AF_INET) {
890 l4 = (struct sockaddr_in *)l;
891 r4 = (struct sockaddr_in *)r;
892 return (l4->sin_port == r4->sin_port &&
893 l4->sin_addr.s_addr == r4->sin_addr.s_addr);
895 l6 = (struct sockaddr_in6 *)l;
896 r6 = (struct sockaddr_in6 *)r;
/* NOTE(review): as shown, the raw memcmp() result is used, which is 0
 * (false) when the addresses ARE equal — inverted vs. the IPv4 branch.
 * Verify a "== 0" was not lost in this truncated listing. */
897 return (l6->sin6_port == r6->sin6_port &&
898 memcmp(&l6->sin6_addr, &r6->sin6_addr,
899 sizeof(l6->sin6_addr)));
/* Extract the packet's source address/port into @ps.  Relies on the mbuf
 * l3_len/l4_len fields: the data pointer is assumed to sit just past the
 * UDP header, so negative offsets reach back into the headers. */
904 netfe_pkt_addr(const struct rte_mbuf *m, struct sockaddr_storage *ps,
907 const struct ipv4_hdr *ip4h;
908 const struct ipv6_hdr *ip6h;
909 const struct udp_hdr *udph;
910 struct sockaddr_in *in4;
911 struct sockaddr_in6 *in6;
915 udph = rte_pktmbuf_mtod_offset(m, struct udp_hdr *, -m->l4_len);
917 if (family == AF_INET) {
918 in4 = (struct sockaddr_in *)ps;
919 ip4h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
920 -(m->l4_len + m->l3_len));
921 in4->sin_port = udph->src_port;
922 in4->sin_addr.s_addr = ip4h->src_addr;
924 in6 = (struct sockaddr_in6 *)ps;
925 ip6h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
926 -(m->l4_len + m->l3_len));
927 in6->sin6_port = udph->src_port;
928 rte_memcpy(&in6->sin6_addr, ip6h->src_addr,
929 sizeof(in6->sin6_addr));
/* Count the leading run of packets in @pkt whose source address equals
 * *cur; each candidate's address is extracted into *nxt as a side effect. */
933 static inline uint32_t
934 pkt_eq_addr(struct rte_mbuf *pkt[], uint32_t num, uint16_t family,
935 struct sockaddr_storage *cur, struct sockaddr_storage *nxt)
939 for (i = 0; i != num; i++) {
940 netfe_pkt_addr(pkt[i], nxt, family);
941 if (netfe_addr_eq(cur, nxt, family) == 0)
/* Free every mbuf currently held in the packet buffer. */
949 pkt_buf_empty(struct pkt_buf *pb)
953 for (i = 0; i != pb->num; i++)
954 rte_pktmbuf_free(pb->pkt[i]);
/* Top up the packet buffer with fresh @dlen-byte mbufs from this lcore's
 * socket pool (mpool index is socket_id + 1 by convention). */
960 pkt_buf_fill(uint32_t lcore, struct pkt_buf *pb, uint32_t dlen)
965 sid = rte_lcore_to_socket_id(lcore) + 1;
967 for (i = pb->num; i != RTE_DIM(pb->pkt); i++) {
968 pb->pkt[i] = rte_pktmbuf_alloc(mpool[sid]);
969 if (pb->pkt[i] == NULL)
971 rte_pktmbuf_append(pb->pkt[i], dlen);
/* Look up (or lazily create) the forwarding stream for destination @sa.
 * New streams are opened with a wildcard remote address and registered in
 * the per-lcore forward table. */
977 static struct netfe_stream *
978 find_fwd_dst(uint32_t lcore, struct netfe_stream *fes,
979 const struct sockaddr *sa)
982 struct netfe_stream *fed;
983 struct netfe_lcore *fe;
984 struct tle_udp_stream_param sprm;
986 fe = RTE_PER_LCORE(_fe);
988 fed = fwd_tbl_lkp(fe, fes->family, sa);
992 /* create a new stream and put it into the fwd table. */
994 sprm = fes->fwdprm.prm;
996 /* open forward stream with wildcard remote addr. */
997 memset(&sprm.remote_addr.ss_family + 1, 0,
998 sizeof(sprm.remote_addr) - sizeof(sprm.remote_addr.ss_family));
999 fed = netfe_stream_open(fe, &sprm, lcore, FWD, fes->fwdprm.bidx);
1003 rc = fwd_tbl_add(fe, fes->family, sa, fed);
/* On table-insert failure, release the just-opened stream. */
1005 netfe_stream_close(fe, 1);
1009 fed->fwdprm.prm.remote_addr = *(const struct sockaddr_storage *)sa;
/* TXONLY path: refill the packet buffer with fresh mbufs of txlen bytes,
 * push them into the UDP stream, and compact any unsent tail. */
1014 netfe_tx_process(uint32_t lcore, struct netfe_stream *fes)
1018 /* refill with new mbufs. */
1019 pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
1025 k = tle_udp_stream_send(fes->s, fes->pbuf.pkt, n, NULL);
1026 NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) returns %u\n",
1027 __func__, lcore, fes->s, n, k);
1029 fes->stat.drops += n - k;
1034 /* adjust pbuf array: shift the n-k unsent mbufs to the front. */
1035 fes->pbuf.num = n - k;
1036 for (i = k; i != n; i++)
1037 fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
/* FWD path: group buffered packets by source address, find/create the
 * forwarding stream for each group and send the group through it.
 * Packets with no forwarding stream are freed and counted as drops. */
1042 netfe_fwd(uint32_t lcore, struct netfe_stream *fes)
1044 uint32_t i, j, k, n, x;
1046 void *pi0, *pi1, *pt;
1047 struct rte_mbuf **pkt;
1048 struct netfe_stream *fed;
1049 struct sockaddr_storage in[2];
1051 family = fes->family;
1053 pkt = fes->pbuf.pkt;
1058 in[0].ss_family = family;
1059 in[1].ss_family = family;
1063 netfe_pkt_addr(pkt[0], pi0, family);
/* [i, j) is a run of packets sharing the same source address. */
1066 for (i = 0; i != n; i = j) {
1068 j = i + pkt_eq_addr(&pkt[i + 1],
1069 n - i - 1, family, pi0, pi1) + 1;
1071 fed = find_fwd_dst(lcore, fes, (const struct sockaddr *)pi0);
1074 k = tle_udp_stream_send(fed->s, pkt + i, j - i,
1075 (const struct sockaddr *)
1076 &fes->fwdprm.prm.remote_addr);
1078 NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) "
1080 __func__, lcore, fed->s, j - i, k);
1082 fed->stat.drops += j - i - k;
1086 NETFE_TRACE("%s(%u, %p): no fwd stream for %u pkts;\n",
1087 __func__, lcore, fes->s, j - i);
1088 for (k = i; k != j; k++) {
1089 NETFE_TRACE("%s(%u, %p): free(%p);\n",
1090 __func__, lcore, fes->s, pkt[k]);
/* NOTE(review): loop variable is k but pkt[j] is freed each iteration —
 * looks like it should be rte_pktmbuf_free(pkt[k]); verify upstream. */
1091 rte_pktmbuf_free(pkt[j]);
1093 fes->stat.drops += j - i;
1096 /* copy unforwarded mbufs. */
1097 for (i += k; i != j; i++, x++)
1100 /* swap the pointers */
1109 tle_event_raise(fes->txev);
1110 fes->stat.txev[TLE_SEV_UP]++;
/* Buffer was full: re-arm RX so more packets can be received. */
1113 if (n == RTE_DIM(fes->pbuf.pkt)) {
1114 tle_event_active(fes->rxev, TLE_SEV_UP);
1115 fes->stat.rxev[TLE_SEV_UP]++;
/* RX path: receive into the free tail of the packet buffer; idle the RX
 * event when the buffer is full; wake TX for RXTX/FWD modes. */
1120 netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
1125 k = RTE_DIM(fes->pbuf.pkt) - n;
1127 /* packet buffer is full, can't receive any new packets. */
1129 tle_event_idle(fes->rxev);
1130 fes->stat.rxev[TLE_SEV_IDLE]++;
1134 n = tle_udp_stream_recv(fes->s, fes->pbuf.pkt + n, k);
1138 NETFE_TRACE("%s(%u): tle_udp_stream_recv(%p, %u) returns %u\n",
1139 __func__, lcore, fes->s, k, n);
1144 /* free all received mbufs. */
1145 if (fes->op == RXONLY)
1146 pkt_buf_empty(&fes->pbuf);
1147 /* mark stream as writable */
1148 else if (k == RTE_DIM(fes->pbuf.pkt)) {
1149 if (fes->op == RXTX) {
1150 tle_event_active(fes->txev, TLE_SEV_UP);
1151 fes->stat.txev[TLE_SEV_UP]++;
1152 } else if (fes->op == FWD) {
1153 tle_event_raise(fes->txev);
1154 fes->stat.txev[TLE_SEV_UP]++;
/* RXTX (echo) path: group buffered packets by source address and send
 * each group back to that source on the same stream; compact the unsent
 * tail and re-arm RX when the buffer had been full. */
1160 netfe_rxtx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
1162 uint32_t i, j, k, n;
1164 void *pi0, *pi1, *pt;
1165 struct rte_mbuf **pkt;
1166 struct sockaddr_storage in[2];
1168 family = fes->family;
1170 pkt = fes->pbuf.pkt;
1172 /* there is nothing to send. */
1174 tle_event_idle(fes->txev);
1175 fes->stat.txev[TLE_SEV_IDLE]++;
1179 in[0].ss_family = family;
1180 in[1].ss_family = family;
1184 netfe_pkt_addr(pkt[0], pi0, family);
/* [i, j) is a run of packets sharing the same source address. */
1186 for (i = 0; i != n; i = j) {
1188 j = i + pkt_eq_addr(&pkt[i + 1],
1189 n - i - 1, family, pi0, pi1) + 1;
1191 k = tle_udp_stream_send(fes->s, pkt + i, j - i,
1192 (const struct sockaddr *)pi0);
1194 NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) returns %u\n",
1195 __func__, lcore, fes->s, j - i, k);
1197 fes->stat.drops += j - i - k;
1201 /* stream send buffer is full */
1205 /* swap the pointers */
1211 /* not able to send anything. */
1215 if (n == RTE_DIM(fes->pbuf.pkt)) {
1216 /* mark stream as readable */
1217 tle_event_active(fes->rxev, TLE_SEV_UP);
1218 fes->stat.rxev[TLE_SEV_UP]++;
1221 /* adjust pbuf array: shift the unsent mbufs to the front. */
1222 fes->pbuf.num = n - i;
1223 for (j = i; j != n; j++)
1224 pkt[j - i] = pkt[j];
/* Front-end lcore main: allocate per-lcore state and event queues, build
 * the forward tables, open all configured streams, then poll RX/TX event
 * queues dispatching to the per-op handlers until force_quit. */
1228 netfe_lcore(void *arg)
1232 uint32_t i, j, n, lcore, snum;
1233 const struct netfe_lcore_prm *prm;
1234 struct netfe_lcore *fe;
1235 struct tle_evq_param eprm;
1236 struct tle_udp_stream_param sprm;
1237 struct netfe_stream *fes, *fs[MAX_PKT_BURST];
1239 lcore = rte_lcore_id();
1242 snum = prm->max_streams;
1243 RTE_LOG(NOTICE, USER1, "%s(lcore=%u, nb_streams=%u, max_streams=%u)\n",
1244 __func__, lcore, prm->nb_streams, snum);
1246 memset(&eprm, 0, sizeof(eprm));
1247 eprm.socket_id = rte_lcore_to_socket_id(lcore);
1248 eprm.max_events = snum;
/* One allocation: the netfe_lcore header plus its stream array. */
1250 sz = sizeof(*fe) + snum * sizeof(fe->fs[0]);
1251 fe = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
1252 rte_lcore_to_socket_id(lcore));
1255 RTE_LOG(ERR, USER1, "%s:%d failed to allocate %zu bytes\n",
1256 __func__, __LINE__, sz);
1260 RTE_PER_LCORE(_fe) = fe;
1263 fe->fs = (struct netfe_stream *)(fe + 1);
1265 fe->rxeq = tle_evq_create(&eprm);
1266 fe->txeq = tle_evq_create(&eprm);
1268 RTE_LOG(ERR, USER1, "%s(%u) rx evq=%p, tx evq=%p\n",
1269 __func__, lcore, fe->rxeq, fe->txeq);
1270 if (fe->rxeq == NULL || fe->txeq == NULL)
1273 rc = fwd_tbl_init(fe, AF_INET, lcore);
1274 RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
1275 __func__, lcore, AF_INET, rc);
1279 rc = fwd_tbl_init(fe, AF_INET6, lcore);
1280 RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
1281 __func__, lcore, AF_INET6, rc);
1285 /* open all requested streams. */
1286 for (i = 0; i != prm->nb_streams; i++) {
1287 sprm = prm->stream[i].sprm.prm;
1288 fes = netfe_stream_open(fe, &sprm, lcore, prm->stream[i].op,
1289 prm->stream[i].sprm.bidx);
1295 netfe_stream_dump(fes);
/* FWD streams also get pre-registered in the forward table. */
1297 if (prm->stream[i].op == FWD) {
1298 fes->fwdprm = prm->stream[i].fprm;
1299 rc = fwd_tbl_add(fe,
1300 prm->stream[i].fprm.prm.remote_addr.ss_family,
1301 (const struct sockaddr *)
1302 &prm->stream[i].fprm.prm.remote_addr,
1305 netfe_stream_close(fe, 1);
1308 } else if (prm->stream[i].op == TXONLY) {
1309 fes->txlen = prm->stream[i].txlen;
1310 fes->raddr = sprm.remote_addr;
/* Main poll loop: drain RX events, then TX events, dispatching per op. */
1314 while (fe->sidx >= prm->nb_streams && force_quit == 0) {
1316 n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs,
1320 NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) "
1322 __func__, lcore, fe->rxeq, n);
1323 for (j = 0; j != n; j++)
1324 netfe_rx_process(lcore, fs[j]);
1327 n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs,
1331 NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) "
1333 __func__, lcore, fe->txeq, n);
1334 for (j = 0; j != n; j++) {
1335 if (fs[j]->op == RXTX)
1336 netfe_rxtx_process(lcore, fs[j]);
1337 else if (fs[j]->op == FWD)
1338 netfe_fwd(lcore, fs[j]);
1339 else if (fs[j]->op == TXONLY)
1340 netfe_tx_process(lcore, fs[j]);
/* Shutdown: dump and close every remaining stream, then the queues. */
1345 RTE_LOG(NOTICE, USER1, "%s(lcore=%u) finish\n",
1348 while (fe->sidx != 0) {
1351 netfe_stream_dump(fe->fs + i);
1352 netfe_stream_close(fe, 1);
1355 tle_evq_destroy(fe->txeq);
1356 tle_evq_destroy(fe->rxeq);
/* Back-end RX: burst-read from the port's queue, push into tle_udp;
 * packets tle_udp rejects come back in rp[]/rc[] and are freed here. */
1363 netbe_rx(struct netbe_lcore *lc, uint32_t pidx)
1366 struct rte_mbuf *pkt[MAX_PKT_BURST];
1367 struct rte_mbuf *rp[MAX_PKT_BURST];
1368 int32_t rc[MAX_PKT_BURST];
1370 n = rte_eth_rx_burst(lc->prt[pidx].port.id,
1371 lc->prt[pidx].rxqid, pkt, RTE_DIM(pkt));
1375 lc->prt[pidx].rx_stat.in += n;
1376 NETBE_TRACE("%s(%u): rte_eth_rx_burst(%u, %u) returns %u\n",
1377 __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].rxqid,
1380 k = tle_udp_rx_bulk(lc->prt[pidx].dev, pkt, rp, rc, n);
1382 lc->prt[pidx].rx_stat.up += k;
1383 lc->prt[pidx].rx_stat.drop += n - k;
1384 NETBE_TRACE("%s(%u): tle_udp_rx_bulk(%p, %u) returns %u\n",
1385 __func__, lc->id, lc->prt[pidx].dev, n, k);
1387 for (j = 0; j != n - k; j++) {
1388 NETBE_TRACE("%s:%d(port=%u) rp[%u]={%p, %d};\n",
1389 __func__, __LINE__, lc->prt[pidx].port.id,
1391 rte_pktmbuf_free(rp[j]);
/* Back-end TX: top up the per-port staging buffer from tle_udp (only when
 * at least half the buffer is free), then burst-transmit; unsent mbufs
 * are compacted to the buffer front for the next call. */
1396 netbe_tx(struct netbe_lcore *lc, uint32_t pidx)
1399 struct rte_mbuf **mb;
1401 n = lc->prt[pidx].tx_buf.num;
1402 k = RTE_DIM(lc->prt[pidx].tx_buf.pkt) - n;
1403 mb = lc->prt[pidx].tx_buf.pkt;
1405 if (k >= RTE_DIM(lc->prt[pidx].tx_buf.pkt) / 2) {
1406 j = tle_udp_tx_bulk(lc->prt[pidx].dev, mb + n, k);
1408 lc->prt[pidx].tx_stat.down += j;
1414 NETBE_TRACE("%s(%u): tle_udp_tx_bulk(%p) returns %u,\n"
1415 "total pkts to send: %u\n",
1416 __func__, lc->id, lc->prt[pidx].dev, j, n);
1418 for (j = 0; j != n; j++)
1419 NETBE_PKT_DUMP(mb[j]);
1421 k = rte_eth_tx_burst(lc->prt[pidx].port.id,
1422 lc->prt[pidx].txqid, mb, n);
1424 lc->prt[pidx].tx_stat.out += k;
1425 lc->prt[pidx].tx_stat.drop += n - k;
1426 NETBE_TRACE("%s(%u): rte_eth_tx_burst(%u, %u, %u) returns %u\n",
1427 __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].txqid,
1430 lc->prt[pidx].tx_buf.num = n - k;
1432 for (j = k; j != n; j++)
/* Back-end lcore main: register RX callbacks for each port, then poll
 * netbe_rx()/netbe_tx() on all ports until force_quit; on exit, log
 * per-port statistics and free any packets still staged for TX. */
1437 netbe_lcore(void *arg)
1441 struct netbe_lcore *lc;
1444 RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) start\n",
1445 __func__, lc->id, lc->ctx);
1449 * wait for FE lcores to start, so BE doesn't drop any packets
1450 * because corresponding streams are not opened yet by FE.
1451 * useful when used with pcap PMDs.
1452 * think of a better way, or should this timeout be a cmdline parameter.
1457 for (i = 0; i != lc->prt_num; i++) {
1458 RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, udp_dev: %p)\n",
1459 __func__, i, lc->prt[i].port.id, lc->prt[i].dev);
1460 rc = setup_rx_cb(&lc->prt[i].port, lc);
/* On setup failure, request a global shutdown. */
1462 sig_handle(SIGQUIT);
1465 while (force_quit == 0) {
1466 for (i = 0; i != lc->prt_num; i++) {
1472 RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) finish\n",
1473 __func__, lc->id, lc->ctx);
1474 for (i = 0; i != lc->prt_num; i++) {
1475 RTE_LOG(NOTICE, USER1, "%s:%u(port=%u) "
1477 "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "}, "
1479 "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "};\n",
1480 __func__, i, lc->prt[i].port.id,
1481 lc->prt[i].rx_stat.in,
1482 lc->prt[i].rx_stat.up,
1483 lc->prt[i].rx_stat.drop,
1484 lc->prt[i].tx_stat.down,
1485 lc->prt[i].tx_stat.out,
1486 lc->prt[i].tx_stat.drop);
/* Release mbufs still queued in the per-port TX staging buffers. */
1490 for (i = 0; i != lc->prt_num; i++) {
1491 for (j = 0; j != lc->prt[i].tx_buf.num; j++)
1492 rte_pktmbuf_free(lc->prt[i].tx_buf.pkt[j]);
/* qsort comparator: orders front-end stream params by owning lcore id. */
1499 netfe_lcore_cmp(const void *s1, const void *s2)
1501 const struct netfe_stream_prm *p1, *p2;
1505 return p1->lcore - p2->lcore;
1509 * Helper functions, finds BE by given local and remote addresses.
1512 netbe_find4(const struct in_addr *laddr, const struct in_addr *raddr)
1517 struct netbe_lcore *bc;
1519 if (laddr->s_addr == INADDR_ANY) {
1521 /* we have exactly one BE, use it for all traffic */
1522 if (becfg.cpu_num == 1)
1525 /* search by remote address. */
1526 for (i = 0; i != becfg.cpu_num; i++) {
1528 rc = rte_lpm_lookup(bc->lpm4,
1529 rte_be_to_cpu_32(raddr->s_addr), &idx);
1535 /* search by local address */
1536 for (i = 0; i != becfg.cpu_num; i++) {
1538 for (j = 0; j != bc->prt_num; j++)
1539 if (laddr->s_addr == bc->prt[j].port.ipv4)
/*
 * IPv6 counterpart of netbe_find4(): find the BE lcore for the given
 * IPv6 local/remote address pair.
 *  - local address equals in6addr_any: single BE takes everything,
 *    otherwise each BE's IPv6 LPM table is probed with the remote
 *    address;
 *  - otherwise: pick the BE owning a port configured with that local
 *    IPv6 address.
 * NOTE(review): declarations of i/j/rc/idx, loop bodies and the return
 * statements are elided from this view of the file.
 */
1548 netbe_find6(const struct in6_addr *laddr, const struct in6_addr *raddr)
1553 struct netbe_lcore *bc;
/* wildcard local address (::) -- compare whole 128-bit address */
1555 if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) == 0) {
1557 /* we have exactly one BE, use it for all traffic */
1558 if (becfg.cpu_num == 1)
1561 /* search by remote address. */
1562 for (i = 0; i != becfg.cpu_num; i++) {
/* s6_addr is a byte array; cast feeds it to the LPM6 lookup */
1564 rc = rte_lpm6_lookup(bc->lpm6,
1565 (uint8_t *)(uintptr_t)raddr->s6_addr, &idx);
1570 /* search by local address */
1571 for (i = 0; i != becfg.cpu_num; i++) {
/* match the local address against every port owned by this BE */
1573 for (j = 0; j != bc->prt_num; j++)
1574 if (memcmp(laddr, &bc->prt[j].port.ipv6,
1575 sizeof(*laddr)) == 0)
/*
 * Address-family dispatcher: routes the stream parameters to
 * netbe_find4() or netbe_find6() based on the local address family.
 * NOTE(review): braces and the fall-through path for any other address
 * family are elided from this view of the file.
 */
1584 netbe_find(const struct tle_udp_stream_param *p)
1586 const struct sockaddr_in *l4, *r4;
1587 const struct sockaddr_in6 *l6, *r6;
1589 if (p->local_addr.ss_family == AF_INET) {
/* reinterpret the generic sockaddr_storage as IPv4 sockaddrs */
1590 l4 = (const struct sockaddr_in *)&p->local_addr;
1591 r4 = (const struct sockaddr_in *)&p->remote_addr;
1592 return netbe_find4(&l4->sin_addr, &r4->sin_addr);
1593 } else if (p->local_addr.ss_family == AF_INET6) {
/* reinterpret the generic sockaddr_storage as IPv6 sockaddrs */
1594 l6 = (const struct sockaddr_in6 *)&p->local_addr;
1595 r6 = (const struct sockaddr_in6 *)&p->remote_addr;
1596 return netbe_find6(&l6->sin6_addr, &r6->sin6_addr);
/*
 * Resolve which BE should serve the stream described by 'sp', via
 * netbe_find(); 'line' is the config-file line number used for error
 * reporting.  Callers (netfe_launch) treat a non-zero return as
 * failure.  ("flll" looks like a typo of "fill" -- kept as-is since
 * callers use this spelling.)
 * NOTE(review): return type, braces, the bidx declaration/failure check
 * and the storing of the result into 'sp' are elided from this view.
 */
1602 netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line)
1606 bidx = netbe_find(&sp->prm);
/* no BE matched this stream's local/remote addresses */
1608 RTE_LOG(ERR, USER1, "%s(line=%u): no BE for that stream\n",
1616 /* start front-end processing. */
/*
 * netfe_launch() -- distributes the configured FE streams across lcores
 * and starts netfe_lcore() on each of them:
 *  1) bind every stream (and, for FWD streams, the forwarding side too)
 *     to a BE via netfe_sprm_flll_be();
 *  2) sort the stream array by lcore, then slice each same-lcore run
 *     into a per-lcore netfe_lcore_prm entry (feprm[]), rejecting
 *     lcores that are disabled or already RUNNING;
 *  3) remote-launch netfe_lcore() on the slave lcores; the master
 *     lcore's slice (index 'mi') is run synchronously last.
 * Callers treat a non-zero return as failure.
 * NOTE(review): braces, error-path returns, and the k/mi bookkeeping
 * increments are elided from this view of the file.
 */
1618 netfe_launch(struct netfe_lcore_prm *lprm)
1620 uint32_t i, j, k, lc, ln, mi;
/* per-lcore FE parameter slices built from the sorted stream array */
1621 struct netfe_lcore_prm feprm[RTE_MAX_LCORE];
1623 /* determine on what BE each stream should be open. */
1624 for (i = 0; i != lprm->nb_streams; i++) {
1626 lc = lprm->stream[i].lcore;
1627 ln = lprm->stream[i].line;
/* FWD streams additionally need a BE for the forwarding parameters */
1629 if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln) != 0 ||
1630 (lprm->stream[i].op == FWD &&
1631 netfe_sprm_flll_be(&lprm->stream[i].fprm,
1636 /* group all fe parameters by lcore. */
1638 memset(feprm, 0, sizeof(feprm));
1639 qsort(lprm->stream, lprm->nb_streams, sizeof(lprm->stream[0]),
/* walk each run of streams sharing the same lcore; j marks run end */
1644 for (i = 0; i != lprm->nb_streams; i = j) {
1646 lc = lprm->stream[i].lcore;
1647 ln = lprm->stream[i].line;
1649 if (rte_lcore_is_enabled(lc) == 0) {
1651 "%s(line=%u): lcore %u is not enabled\n",
/* master lcore is not remote-launched; its slice runs at the end */
1656 if (rte_get_master_lcore() == lc)
1658 else if (rte_eal_get_lcore_state(lc) == RUNNING) {
1660 "%s(line=%u): lcore %u already in use\n",
/* advance j past every stream belonging to this lcore */
1665 for (j = i + 1; j != lprm->nb_streams &&
1666 lc == lprm->stream[j].lcore;
1670 feprm[k].max_streams = lprm->max_streams;
1671 feprm[k].nb_streams = j - i;
/* slice [i, j) of the sorted stream array belongs to this lcore */
1672 feprm[k].stream = lprm->stream + i;
1676 /* launch all slave FE lcores. */
1677 for (i = 0; i != k; i++) {
1679 rte_eal_remote_launch(netfe_lcore, feprm + i,
1680 feprm[i].stream[0].lcore);
1683 /* launch FE at master lcore. */
/* mi == UINT32_MAX means no streams were assigned to the master */
1684 if (mi != UINT32_MAX)
1685 netfe_lcore(feprm + mi)
1691 main(int argc, char *argv[])
1693 int32_t opt, opt_idx, rc;
1696 struct tle_udp_ctx_param ctx_prm;
1697 struct netfe_lcore_prm feprm;
1698 struct rte_eth_stats stats;
1699 char fecfg_fname[PATH_MAX + 1];
1700 char becfg_fname[PATH_MAX + 1];
1705 rc = rte_eal_init(argc, argv);
1707 rte_exit(EXIT_FAILURE,
1708 "%s: rte_eal_init failed with error code: %d\n",
1711 memset(&ctx_prm, 0, sizeof(ctx_prm));
1718 while ((opt = getopt_long(argc, argv, "B:PR:S:b:f:s:", long_opt,
1719 &opt_idx)) != EOF) {
1720 if (opt == OPT_SHORT_SBULK) {
1721 rc = parse_uint_val(NULL, optarg, &v);
1723 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
1724 "for option: \'%c\'\n",
1725 __func__, optarg, opt);
1726 ctx_prm.send_bulk_size = v;
1727 } else if (opt == OPT_SHORT_PROMISC) {
1729 } else if (opt == OPT_SHORT_RBUFS) {
1730 rc = parse_uint_val(NULL, optarg, &v);
1732 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
1733 "for option: \'%c\'\n",
1734 __func__, optarg, opt);
1735 ctx_prm.max_stream_rbufs = v;
1736 } else if (opt == OPT_SHORT_SBUFS) {
1737 rc = parse_uint_val(NULL, optarg, &v);
1739 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
1740 "for option: \'%c\'\n",
1741 __func__, optarg, opt);
1742 ctx_prm.max_stream_sbufs = v;
1743 } else if (opt == OPT_SHORT_STREAMS) {
1744 rc = parse_uint_val(NULL, optarg, &v);
1746 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
1747 "for option: \'%c\'\n",
1748 __func__, optarg, opt);
1749 ctx_prm.max_streams = v;
1750 } else if (opt == OPT_SHORT_BECFG) {
1751 snprintf(becfg_fname, sizeof(becfg_fname), "%s",
1753 } else if (opt == OPT_SHORT_FECFG) {
1754 snprintf(fecfg_fname, sizeof(fecfg_fname), "%s",
1757 rte_exit(EXIT_FAILURE,
1758 "%s: unknown option: \'%c\'\n",
1763 signal(SIGINT, sig_handle);
1765 netbe_port_init(&becfg, argc - optind, argv + optind);
1766 netbe_lcore_init(&becfg, &ctx_prm);
1768 if ((rc = netbe_dest_init(becfg_fname, &becfg)) != 0)
1769 sig_handle(SIGQUIT);
1771 for (i = 0; i != becfg.prt_num && rc == 0; i++) {
1772 RTE_LOG(NOTICE, USER1, "%s: starting port %u\n",
1773 __func__, becfg.prt[i].id);
1774 rc = rte_eth_dev_start(becfg.prt[i].id);
1777 "%s: rte_eth_dev_start(%u) returned "
1779 __func__, becfg.prt[i].id, rc);
1780 sig_handle(SIGQUIT);
1784 feprm.max_streams = ctx_prm.max_streams * becfg.cpu_num;
1785 if (rc == 0 && (rc = netfe_parse_cfg(fecfg_fname, &feprm)) != 0)
1786 sig_handle(SIGQUIT);
1788 for (i = 0; rc == 0 && i != becfg.cpu_num; i++) {
1789 rte_eal_remote_launch(netbe_lcore, becfg.cpu + i,
1793 if (rc == 0 && (rc = netfe_launch(&feprm)) != 0)
1794 sig_handle(SIGQUIT);
1796 rte_eal_mp_wait_lcore();
1798 for (i = 0; i != becfg.prt_num; i++) {
1799 RTE_LOG(NOTICE, USER1, "%s: stoping port %u\n",
1800 __func__, becfg.prt[i].id);
1801 rte_eth_stats_get(becfg.prt[i].id, &stats);
1802 RTE_LOG(NOTICE, USER1, "port %u stats={\n"
1803 "ipackets=%" PRIu64 ";"
1804 "ibytes=%" PRIu64 ";"
1805 "ierrors=%" PRIu64 ";\n"
1806 "opackets=%" PRIu64 ";"
1807 "obytes=%" PRIu64 ";"
1808 "oerrors=%" PRIu64 ";\n"
1817 rte_eth_dev_stop(becfg.prt[i].id);
1820 netbe_lcore_fini(&becfg);