Change libtle_udp to use dring.
[tldk.git] / examples / udpfwd / main.c
1 /*
2  * Copyright (c) 2016  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include "netbe.h"
17 #include "parse.h"
18
19 #define MAX_RULES       0x100
20 #define MAX_TBL8        0x800
21
22 #define RX_RING_SIZE    0x400
23 #define TX_RING_SIZE    0x800
24
25 #define MPOOL_CACHE_SIZE        0x100
26 #define MPOOL_NB_BUF            0x20000
27
28 #define FRAG_MBUF_BUF_SIZE      (RTE_PKTMBUF_HEADROOM + TLE_UDP_MAX_HDR)
29 #define FRAG_TTL                MS_PER_S
30 #define FRAG_TBL_BUCKET_ENTRIES 16
31
32 #define FIRST_PORT      0x8000
33
34 #define RX_CSUM_OFFLOAD (DEV_RX_OFFLOAD_IPV4_CKSUM | DEV_RX_OFFLOAD_UDP_CKSUM)
35 #define TX_CSUM_OFFLOAD (DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM)
36
37 #define OPT_SHORT_SBULK         'B'
38 #define OPT_LONG_SBULK          "sburst"
39
40 #define OPT_SHORT_PROMISC       'P'
41 #define OPT_LONG_PROMISC        "promisc"
42
43 #define OPT_SHORT_RBUFS 'R'
44 #define OPT_LONG_RBUFS  "rbufs"
45
46 #define OPT_SHORT_SBUFS 'S'
47 #define OPT_LONG_SBUFS  "sbufs"
48
49 #define OPT_SHORT_STREAMS       's'
50 #define OPT_LONG_STREAMS        "streams"
51
52 #define OPT_SHORT_FECFG 'f'
53 #define OPT_LONG_FECFG  "fecfg"
54
55 #define OPT_SHORT_BECFG 'b'
56 #define OPT_LONG_BECFG  "becfg"
57
58 RTE_DEFINE_PER_LCORE(struct netfe_lcore *, _fe);
59
60 #include "fwdtbl.h"
61
62 static const struct option long_opt[] = {
63         {OPT_LONG_BECFG, 1, 0, OPT_SHORT_BECFG},
64         {OPT_LONG_FECFG, 1, 0, OPT_SHORT_FECFG},
65         {OPT_LONG_PROMISC, 0, 0, OPT_SHORT_PROMISC},
66         {OPT_LONG_RBUFS, 1, 0, OPT_SHORT_RBUFS},
67         {OPT_LONG_SBUFS, 1, 0, OPT_SHORT_SBUFS},
68         {OPT_LONG_SBULK, 1, 0, OPT_SHORT_SBULK},
69         {OPT_LONG_STREAMS, 1, 0, OPT_SHORT_STREAMS},
70         {NULL, 0, 0, 0}
71 };
72
73 static volatile int force_quit;
74
75 static struct netbe_cfg becfg;
76 static struct rte_mempool *mpool[RTE_MAX_NUMA_NODES + 1];
77 static struct rte_mempool *frag_mpool[RTE_MAX_NUMA_NODES + 1];
78
79 static const struct rte_eth_conf port_conf_default = {
80         .rxmode = {
81                 .max_rx_pkt_len = ETHER_MAX_VLAN_FRAME_LEN,
82                 .hw_vlan_strip = 1,
83                 .jumbo_frame = 1,
84         },
85 };
86
87 #include "parse.h"
88
89 static void
90 sig_handle(int signum)
91 {
92         RTE_LOG(ERR, USER1, "%s(%d)\n", __func__, signum);
93         force_quit = 1;
94 }
95
96 /*
97  * Initilise DPDK port.
98  * In current version, only one queue per port is used.
99  */
100 static int
101 port_init(struct netbe_port *uprt, struct rte_mempool *mp)
102 {
103         int32_t socket, rc;
104         uint16_t q;
105         struct rte_eth_conf port_conf;
106         struct rte_eth_dev_info dev_info;
107
108         const uint16_t rx_rings = 1, tx_rings = 1;
109
110         rte_eth_dev_info_get(uprt->id, &dev_info);
111         if ((dev_info.rx_offload_capa & uprt->rx_offload) != uprt->rx_offload) {
112                 RTE_LOG(ERR, USER1,
113                         "port#%u supported/requested RX offloads don't match, "
114                         "supported: %#x, requested: %#x;\n",
115                         uprt->id, dev_info.rx_offload_capa, uprt->rx_offload);
116                 return -EINVAL;
117         }
118         if ((dev_info.tx_offload_capa & uprt->tx_offload) != uprt->tx_offload) {
119                 RTE_LOG(ERR, USER1,
120                         "port#%u supported/requested TX offloads don't match, "
121                         "supported: %#x, requested: %#x;\n",
122                         uprt->id, dev_info.tx_offload_capa, uprt->tx_offload);
123                 return -EINVAL;
124         }
125
126         port_conf = port_conf_default;
127         if ((uprt->rx_offload & RX_CSUM_OFFLOAD) != 0) {
128                 RTE_LOG(ERR, USER1, "%s(%u): enabling RX csum offload;\n",
129                         __func__, uprt->id);
130                 port_conf.rxmode.hw_ip_checksum = 1;
131         }
132
133         port_conf.rxmode.max_rx_pkt_len = uprt->mtu + ETHER_CRC_LEN;
134
135         rc = rte_eth_dev_configure(uprt->id, rx_rings, tx_rings, &port_conf);
136         RTE_LOG(NOTICE, USER1,
137                 "%s: rte_eth_dev_configure(%u) returns %d;\n",
138                 __func__, uprt->id, rc);
139         if (rc != 0)
140                 return rc;
141
142         socket = rte_eth_dev_socket_id(uprt->id);
143
144         dev_info.default_rxconf.rx_drop_en = 1;
145
146         dev_info.default_txconf.tx_free_thresh = TX_RING_SIZE / 2;
147         if (uprt->tx_offload != 0) {
148                 RTE_LOG(ERR, USER1, "%s(%u): enabling full featured TX;\n",
149                         __func__, uprt->id);
150                 dev_info.default_txconf.txq_flags = 0;
151         }
152
153         for (q = 0; q < rx_rings; q++) {
154                 rc = rte_eth_rx_queue_setup(uprt->id, q, RX_RING_SIZE,
155                         socket, NULL, mp);
156                 if (rc < 0)
157                         return rc;
158         }
159
160         for (q = 0; q < tx_rings; q++) {
161                 rc = rte_eth_tx_queue_setup(uprt->id, q, TX_RING_SIZE,
162                         socket, &dev_info.default_txconf);
163                 if (rc < 0)
164                         return rc;
165         }
166
167
168         return 0;
169 }
170
171 /*
172  * Check that lcore is enabled, not master, and not in use already.
173  */
174 static int
175 check_lcore(uint32_t lc)
176 {
177         if (rte_lcore_is_enabled(lc) == 0) {
178                 RTE_LOG(ERR, USER1, "lcore %u is not enabled\n", lc);
179                 return -EINVAL;
180         }
181         if (rte_get_master_lcore() == lc) {
182                 RTE_LOG(ERR, USER1, "lcore %u is not slave\n", lc);
183                 return -EINVAL;
184         }
185         if (rte_eal_get_lcore_state(lc) == RUNNING) {
186                 RTE_LOG(ERR, USER1, "lcore %u already running %p\n",
187                         lc, lcore_config[lc].f);
188                 return -EINVAL;
189         }
190         return 0;
191 }
192
193 static void
194 log_netbe_prt(const struct netbe_port *uprt)
195 {
196         RTE_LOG(NOTICE, USER1,
197                 "uprt %p = <id = %u, lcore = %u, "
198                 "mtu = %u, rx_offload = %u, tx_offload = %u,\n"
199                 "ipv4 = %#x, "
200                 "ipv6 = %04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx, "
201                 "mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n",
202                 uprt, uprt->id, uprt->lcore,
203                 uprt->mtu, uprt->rx_offload, uprt->tx_offload,
204                 uprt->ipv4,
205                 uprt->ipv6.s6_addr16[0], uprt->ipv6.s6_addr16[1],
206                 uprt->ipv6.s6_addr16[2], uprt->ipv6.s6_addr16[3],
207                 uprt->ipv6.s6_addr16[4], uprt->ipv6.s6_addr16[5],
208                 uprt->ipv6.s6_addr16[6], uprt->ipv6.s6_addr16[7],
209                 uprt->mac.addr_bytes[0], uprt->mac.addr_bytes[1],
210                 uprt->mac.addr_bytes[2], uprt->mac.addr_bytes[3],
211                 uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5]);
212 }
213
214 static void
215 log_netbe_cfg(const struct netbe_cfg *ucfg)
216 {
217         uint32_t i;
218
219         RTE_LOG(NOTICE, USER1,
220                 "ucfg @ %p, prt_num = %u\n", ucfg, ucfg->prt_num);
221
222         for (i = 0; i != ucfg->prt_num; i++)
223                 log_netbe_prt(ucfg->prt + i);
224 }
225
226 static int
227 pool_init(uint32_t sid)
228 {
229         int32_t rc;
230         struct rte_mempool *mp;
231         char name[RTE_MEMPOOL_NAMESIZE];
232
233         snprintf(name, sizeof(name), "MP%u", sid);
234         mp = rte_pktmbuf_pool_create(name, MPOOL_NB_BUF, MPOOL_CACHE_SIZE, 0,
235                 RTE_MBUF_DEFAULT_BUF_SIZE, sid - 1);
236         if (mp == NULL) {
237                 rc = -rte_errno;
238                 RTE_LOG(ERR, USER1, "%s(%d) failed with error code: %d\n",
239                         __func__, sid - 1, rc);
240                 return rc;
241         }
242
243         mpool[sid] = mp;
244         return 0;
245 }
246
247 static int
248 frag_pool_init(uint32_t sid)
249 {
250         int32_t rc;
251         struct rte_mempool *frag_mp;
252         char frag_name[RTE_MEMPOOL_NAMESIZE];
253
254         snprintf(frag_name, sizeof(frag_name), "frag_MP%u", sid);
255         frag_mp = rte_pktmbuf_pool_create(frag_name, MPOOL_NB_BUF,
256                 MPOOL_CACHE_SIZE, 0, FRAG_MBUF_BUF_SIZE, sid - 1);
257         if (frag_mp == NULL) {
258                 rc = -rte_errno;
259                 RTE_LOG(ERR, USER1, "%s(%d) failed with error code: %d\n",
260                         __func__, sid - 1, rc);
261                 return rc;
262         }
263
264         frag_mpool[sid] = frag_mp;
265         return 0;
266 }
267
268
269 /*
270  * Setup all enabled ports.
271  */
272 static void
273 netbe_port_init(struct netbe_cfg *cfg, int argc, char *argv[])
274 {
275         int32_t rc;
276         uint32_t i, n, sid;
277
278         n = RTE_MIN(RTE_DIM(cfg->prt), (uint32_t)argc);
279
280         rc = 0;
281         for (i = 0; i != n; i++) {
282                 rc = parse_netbe_arg(cfg->prt + i, argv[i]);
283                 if (rc != 0)
284                         break;
285
286                 rc = check_lcore(cfg->prt[i].lcore);
287                 if (rc != 0)
288                         break;
289
290                 sid = rte_lcore_to_socket_id(cfg->prt[i].lcore) + 1;
291                 assert(sid < RTE_DIM(mpool));
292
293                 if (mpool[sid] == NULL && (rc = pool_init(sid)) != 0)
294                         break;
295
296                 if (frag_mpool[sid] == NULL && (rc = frag_pool_init(sid)) != 0)
297                         break;
298
299                 rc = port_init(cfg->prt + i, mpool[sid]);
300                 if (rc != 0)
301                         break;
302
303                 rte_eth_macaddr_get(cfg->prt[i].id, &cfg->prt[i].mac);
304                 if (cfg->promisc)
305                         rte_eth_promiscuous_enable(cfg->prt[i].id);
306         }
307
308         if (rc != 0)
309                 rte_exit(EXIT_FAILURE,
310                         "%s: processing of \"%s\" failed with error code: %d\n",
311                         __func__, argv[i], rc);
312
313         cfg->prt_num = i;
314         log_netbe_cfg(cfg);
315 }
316
317 /*
318  * UDP IPv4 destination lookup callback.
319  */
320 static int
321 lpm4_dst_lookup(void *data, const struct in_addr *addr,
322         struct tle_udp_dest *res)
323 {
324         int32_t rc;
325         uint32_t idx;
326         struct netbe_lcore *lc;
327         struct tle_udp_dest *dst;
328
329         lc = data;
330
331         rc = rte_lpm_lookup(lc->lpm4, rte_be_to_cpu_32(addr->s_addr), &idx);
332         if (rc == 0) {
333                 dst = &lc->dst4[idx];
334                 rte_memcpy(res, dst, dst->l2_len + dst->l3_len +
335                         offsetof(struct tle_udp_dest, hdr));
336         }
337         return rc;
338 }
339
340 /*
341  * UDP IPv6 destination lookup callback.
342  */
343 static int
344 lpm6_dst_lookup(void *data, const struct in6_addr *addr,
345         struct tle_udp_dest *res)
346 {
347         int32_t rc;
348         uint8_t idx;
349         struct netbe_lcore *lc;
350         struct tle_udp_dest *dst;
351         uintptr_t p;
352
353         lc = data;
354         p = (uintptr_t)addr->s6_addr;
355
356         rc = rte_lpm6_lookup(lc->lpm6, (uint8_t *)p, &idx);
357         if (rc == 0) {
358                 dst = &lc->dst6[idx];
359                 rte_memcpy(res, dst, dst->l2_len + dst->l3_len +
360                         offsetof(struct tle_udp_dest, hdr));
361         }
362         return rc;
363 }
364
365 static int
366 netbe_add_ipv4_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
367         uint8_t idx)
368 {
369         int32_t rc;
370         uint32_t addr, depth;
371         char str[INET_ADDRSTRLEN];
372
373         depth = dst->prfx;
374         addr = rte_be_to_cpu_32(dst->ipv4.s_addr);
375
376         inet_ntop(AF_INET, &dst->ipv4, str, sizeof(str));
377         rc = rte_lpm_add(lc->lpm4, addr, depth, idx);
378         RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
379                 "ipv4=%s/%u,mtu=%u,"
380                 "mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
381                 "returns %d;\n",
382                 __func__, lc->id, dst->port, lc->dst4[idx].dev,
383                 str, depth, lc->dst4[idx].mtu,
384                 dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
385                 dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
386                 dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
387                 rc);
388         return rc;
389 }
390
391 static int
392 netbe_add_ipv6_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
393         uint8_t idx)
394 {
395         int32_t rc;
396         uint32_t depth;
397         char str[INET6_ADDRSTRLEN];
398
399         depth = dst->prfx;
400
401         rc = rte_lpm6_add(lc->lpm6, (uint8_t *)(uintptr_t)dst->ipv6.s6_addr,
402                 depth, idx);
403
404         inet_ntop(AF_INET6, &dst->ipv6, str, sizeof(str));
405         RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
406                 "ipv6=%s/%u,mtu=%u,"
407                 "mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
408                 "returns %d;\n",
409                 __func__, lc->id, dst->port, lc->dst6[idx].dev,
410                 str, depth, lc->dst4[idx].mtu,
411                 dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
412                 dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
413                 dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
414                 rc);
415         return rc;
416 }
417
418 static int
419 lcore_lpm_init(struct netbe_lcore *lc)
420 {
421         int32_t sid;
422         char str[RTE_LPM_NAMESIZE];
423         const struct rte_lpm_config lpm4_cfg = {
424                 .max_rules = MAX_RULES,
425                 .number_tbl8s = MAX_TBL8,
426         };
427         const struct rte_lpm6_config lpm6_cfg = {
428                 .max_rules = MAX_RULES,
429                 .number_tbl8s = MAX_TBL8,
430         };
431
432         sid = rte_lcore_to_socket_id(lc->id);
433
434         snprintf(str, sizeof(str), "LPM4%u\n", lc->id);
435         lc->lpm4 = rte_lpm_create(str, sid, &lpm4_cfg);
436         RTE_LOG(NOTICE, USER1, "%s(lcore=%u): lpm4=%p;\n",
437                 __func__, lc->id, lc->lpm4);
438         if (lc->lpm4 == NULL)
439                 return -ENOMEM;
440
441         snprintf(str, sizeof(str), "LPM6%u\n", lc->id);
442         lc->lpm6 = rte_lpm6_create(str, sid, &lpm6_cfg);
443         RTE_LOG(NOTICE, USER1, "%s(lcore=%u): lpm6=%p;\n",
444                 __func__, lc->id, lc->lpm6);
445         if (lc->lpm6 == NULL)
446                 return -ENOMEM;
447
448         return 0;
449 }
450
451 static void
452 fill_dst(struct tle_udp_dest *dst, struct netbe_dev *bed,
453         const struct netbe_dest *bdp, uint16_t l3_type, int32_t sid)
454 {
455         struct ether_hdr *eth;
456         struct ipv4_hdr *ip4h;
457         struct ipv6_hdr *ip6h;
458
459         static const struct ipv4_hdr ipv4_tmpl = {
460                 .version_ihl =  4 << 4 | sizeof(*ip4h) / IPV4_IHL_MULTIPLIER,
461                 .time_to_live = 64,
462                 .next_proto_id = IPPROTO_UDP,
463         };
464
465         static const struct ipv6_hdr ipv6_tmpl = {
466                 .vtc_flow = 6 << 4,
467                 .proto = IPPROTO_UDP,
468                 .hop_limits = 64,
469         };
470
471         dst->dev = bed->dev;
472         dst->head_mp = frag_mpool[sid + 1];
473         dst->mtu = RTE_MIN(bdp->mtu, bed->port.mtu);
474         dst->l2_len = sizeof(*eth);
475
476         eth = (struct ether_hdr *)dst->hdr;
477
478         ether_addr_copy(&bed->port.mac, &eth->s_addr);
479         ether_addr_copy(&bdp->mac, &eth->d_addr);
480         eth->ether_type = rte_cpu_to_be_16(l3_type);
481
482         if (l3_type == ETHER_TYPE_IPv4) {
483                 dst->l3_len = sizeof(*ip4h);
484                 ip4h = (struct ipv4_hdr *)(eth + 1);
485                 ip4h[0] = ipv4_tmpl;
486         } else if (l3_type == ETHER_TYPE_IPv6) {
487                 dst->l3_len = sizeof(*ip6h);
488                 ip6h = (struct ipv6_hdr *)(eth + 1);
489                 ip6h[0] = ipv6_tmpl;
490         }
491 }
492
493
494 /*
495  * BE lcore setup routine.
496  */
497 static int
498 lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm,
499         const struct netbe_port prt[], uint32_t prt_num)
500 {
501         int32_t rc, sid;
502         uint32_t i;
503         uint64_t frag_cycles;
504         struct tle_udp_ctx_param cprm;
505         struct tle_udp_dev_param dprm;
506
507         lc->id = prt[0].lcore;
508         lc->prt_num = prt_num;
509
510         sid = rte_lcore_to_socket_id(lc->id);
511
512         rc = lcore_lpm_init(lc);
513         if (rc != 0)
514                 return rc;
515
516         cprm = *ctx_prm;
517         cprm.socket_id = sid;
518         cprm.lookup4 = lpm4_dst_lookup;
519         cprm.lookup4_data = lc;
520         cprm.lookup6 = lpm6_dst_lookup;
521         cprm.lookup6_data = lc;
522
523         /* to facilitate both IPv4 and IPv6. */
524         cprm.max_streams *= 2;
525
526         frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S * FRAG_TTL;
527         lc->ftbl = rte_ip_frag_table_create(cprm.max_streams,
528                 FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams, frag_cycles, sid);
529         RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n",
530                 __func__, lc->id, lc->ftbl);
531
532         lc->ctx = tle_udp_create(&cprm);
533         RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n",
534                 __func__, lc->id, lc->ctx);
535
536         if (lc->ctx == NULL || lc->ftbl == NULL)
537                 rc = ENOMEM;
538
539         for (i = 0; i != prt_num && rc == 0; i++) {
540
541                 memset(&dprm, 0, sizeof(dprm));
542
543                 lc->prt[i].rxqid = 0;
544                 lc->prt[i].txqid = 0;
545                 lc->prt[i].port = prt[i];
546
547                 dprm.rx_offload = prt[i].rx_offload;
548                 dprm.tx_offload = prt[i].tx_offload;
549                 dprm.local_addr4.s_addr = prt[i].ipv4;
550                 memcpy(&dprm.local_addr6,  &prt[i].ipv6, sizeof(prt[i].ipv6));
551
552                 lc->prt[i].dev = tle_udp_add_dev(lc->ctx, &dprm);
553                 RTE_LOG(NOTICE, USER1, "%s(lcore=%u, port=%u), udp_dev: %p;\n",
554                         __func__, lc->id, prt[i].id, lc->prt[i].dev);
555                 if (lc->prt[i].dev == NULL)
556                         rc = -rte_errno;
557         }
558
559         if (rc != 0) {
560                 RTE_LOG(ERR, USER1, "%s(lcore=%u) failed with error code: %d\n",
561                         __func__, lc->id, rc);
562                 tle_udp_destroy(lc->ctx);
563                 rte_ip_frag_table_destroy(lc->ftbl);
564                 rte_lpm_free(lc->lpm4);
565                 rte_lpm6_free(lc->lpm6);
566         }
567
568         return rc;
569 }
570
571 static int
572 prt_lcore_cmp(const void *s1, const void *s2)
573 {
574         const struct netbe_port *p1, *p2;
575
576         p1 = s1;
577         p2 = s2;
578         return p1->lcore - p2->lcore;
579 }
580
581 static void
582 netbe_lcore_init(struct netbe_cfg *cfg, const struct tle_udp_ctx_param *ctx_prm)
583 {
584         int32_t rc;
585         uint32_t i, k, n, num;
586         struct netbe_port sp[RTE_DIM(cfg->prt)];
587
588         num = cfg->prt_num;
589         memcpy(sp, cfg->prt, sizeof(sp[0]) * num);
590         qsort(sp, num, sizeof(sp[0]), prt_lcore_cmp);
591
592         /* Fill ports to be used by each lcore. */
593
594         k = 0;
595         n = 0;
596         rc = 0;
597         for (i = 0; i != num && rc == 0; i++) {
598                 if (sp[n].lcore != sp[i].lcore) {
599                         rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n);
600                         n = i;
601                         k++;
602                 }
603         }
604
605         if (rc == 0 && i != n) {
606                 rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n);
607                 k++;
608         }
609
610         if (rc != 0)
611                 rte_exit(EXIT_FAILURE, "%s: failed with error code: %d\n",
612                         __func__, rc);
613
614         cfg->cpu_num = k;
615 }
616
617 static void
618 netbe_lcore_fini(struct netbe_cfg *cfg)
619 {
620         uint32_t i;
621
622         for (i = 0; i != cfg->cpu_num; i++) {
623                 tle_udp_destroy(cfg->cpu[i].ctx);
624                 rte_ip_frag_table_destroy(cfg->cpu[i].ftbl);
625                 rte_lpm_free(cfg->cpu[i].lpm4);
626                 rte_lpm6_free(cfg->cpu[i].lpm6);
627         }
628
629         memset(cfg->cpu, 0, sizeof(cfg->cpu));
630         cfg->cpu_num = 0;
631 }
632
633 static int
634 netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family,
635         const struct netbe_dest *dst, uint32_t dnum)
636 {
637         int32_t rc, sid;
638         uint16_t l3_type;
639         uint32_t i, n, m;
640         struct tle_udp_dest *dp;
641
642         if (family == AF_INET) {
643                 n = lc->dst4_num;
644                 dp = lc->dst4 + n;
645                 m = RTE_DIM(lc->dst4);
646                 l3_type = ETHER_TYPE_IPv4;
647         } else {
648                 n = lc->dst6_num;
649                 dp = lc->dst6 + n;
650                 m = RTE_DIM(lc->dst6);
651                 l3_type = ETHER_TYPE_IPv6;
652         }
653
654         if (n + dnum >= m) {
655                 RTE_LOG(ERR, USER1, "%s(lcore=%u, family=%hu, dnum=%u) exceeds "
656                         "maximum allowed number of destinations(%u);\n",
657                         __func__, lc->id, family, dnum, m);
658                 return -ENOSPC;
659         }
660
661         sid = rte_lcore_to_socket_id(lc->id);
662         rc = 0;
663
664         for (i = 0; i != dnum && rc == 0; i++) {
665                 fill_dst(dp + i, lc->prt + dev_idx, dst + i, l3_type, sid);
666                 if (family == AF_INET)
667                         rc = netbe_add_ipv4_route(lc, dst + i, n + i);
668                 else
669                         rc = netbe_add_ipv6_route(lc, dst + i, n + i);
670         }
671
672         if (family == AF_INET)
673                 lc->dst4_num = n + i;
674         else
675                 lc->dst6_num = n + i;
676
677         return rc;
678 }
679
680 static int
681 netbe_port2lcore(struct netbe_cfg *cfg, uint32_t port, struct netbe_lcore **plc)
682 {
683         uint32_t i, j;
684         struct netbe_lcore *lc;
685
686         for (i = 0; i != cfg->cpu_num; i++) {
687                 lc = cfg->cpu + i;
688                 for (j = 0; j != cfg->prt_num; j++) {
689                         if (lc->prt[j].port.id == port) {
690                                 *plc = lc;
691                                 return j;
692                         }
693                 }
694         }
695
696         return -ENOENT;
697 }
698
699 static int
700 netbe_dest_cmp(const void *s1, const void *s2)
701 {
702         const struct netbe_dest *p1, *p2;
703
704         p1 = s1;
705         p2 = s2;
706         if (p1->port == p2->port)
707                 return p1->family - p2->family;
708         else
709                 return p1->port - p2->port;
710 }
711
712 static int
713 netbe_dest_init(const char *fname, struct netbe_cfg *cfg)
714 {
715         int32_t rc;
716         uint32_t f, i, j, p;
717         struct netbe_lcore *lc;
718         struct netbe_dest_prm prm;
719
720         rc = netbe_parse_dest(fname, &prm);
721         if (rc != 0)
722                 return rc;
723
724         qsort(prm.dest, prm.nb_dest, sizeof(prm.dest[0]), netbe_dest_cmp);
725
726         rc = 0;
727         for (i = 0; i != prm.nb_dest; i = j) {
728
729                 p = prm.dest[i].port;
730                 f = prm.dest[i].family;
731                 for (j = i + 1; j != prm.nb_dest && p == prm.dest[j].port &&
732                                 f == prm.dest[j].family;
733                                 j++)
734                         ;
735
736                 rc = netbe_port2lcore(cfg, p, &lc);
737                 if (rc < 0) {
738                         RTE_LOG(ERR, USER1, "%s(%s) error at line %u: "
739                                 "port %u not managed by any lcore;\n",
740                                 __func__, fname, prm.dest[i].line, p);
741                         break;
742                 }
743
744                 rc = netbe_add_dest(lc, rc, f, prm.dest + i, j - i);
745                 if (rc != 0)
746                         break;
747         }
748
749         free(prm.dest);
750         return rc;
751 }
752
753 static void
754 netfe_stream_close(struct netfe_lcore *fe, uint32_t dec)
755 {
756         uint32_t sidx;
757
758         fe->sidx -= dec;
759         sidx = fe->sidx;
760         tle_event_free(fe->fs[sidx].txev);
761         tle_event_free(fe->fs[sidx].rxev);
762         tle_udp_stream_close(fe->fs[sidx].s);
763         memset(&fe->fs[sidx], 0, sizeof(fe->fs[sidx]));
764 }
765
766 static void
767 netfe_stream_dump(const struct netfe_stream *fes)
768 {
769         struct sockaddr_in *l4, *r4;
770         struct sockaddr_in6 *l6, *r6;
771         uint16_t lport, rport;
772         struct tle_udp_stream_param sprm;
773         char laddr[INET6_ADDRSTRLEN];
774         char raddr[INET6_ADDRSTRLEN];
775
776         tle_udp_stream_get_param(fes->s, &sprm);
777
778         if (sprm.local_addr.ss_family == AF_INET) {
779
780                 l4 = (struct sockaddr_in *)&sprm.local_addr;
781                 r4 = (struct sockaddr_in *)&sprm.remote_addr;
782
783                 lport = l4->sin_port;
784                 rport = r4->sin_port;
785
786         } else if (sprm.local_addr.ss_family == AF_INET6) {
787
788                 l6 = (struct sockaddr_in6 *)&sprm.local_addr;
789                 r6 = (struct sockaddr_in6 *)&sprm.remote_addr;
790
791                 lport = l6->sin6_port;
792                 rport = r6->sin6_port;
793
794         } else {
795                 RTE_LOG(ERR, USER1, "stream@%p - unknown family=%hu\n",
796                         fes->s, sprm.local_addr.ss_family);
797                 return;
798         }
799
800         format_addr(&sprm.local_addr, laddr, sizeof(laddr));
801         format_addr(&sprm.remote_addr, raddr, sizeof(raddr));
802
803         RTE_LOG(INFO, USER1,
804                 "stream@%p={"
805                 "family=%hu,laddr=%s,lport=%hu,raddr=%s,rport=%hu,"
806                 "stats={"
807                 "rxp=%" PRIu64 ",txp=%" PRIu64 ",drops=%" PRIu64 ","
808                 "rxev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "],"
809                 "txev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "],"
810                 "}};\n",
811                 fes->s,
812                 sprm.local_addr.ss_family,
813                 laddr, ntohs(lport), raddr, ntohs(rport),
814                 fes->stat.rxp, fes->stat.txp, fes->stat.drops,
815                 fes->stat.rxev[TLE_SEV_IDLE],
816                 fes->stat.rxev[TLE_SEV_DOWN],
817                 fes->stat.rxev[TLE_SEV_UP],
818                 fes->stat.txev[TLE_SEV_IDLE],
819                 fes->stat.txev[TLE_SEV_DOWN],
820                 fes->stat.txev[TLE_SEV_UP]);
821 }
822
823
824 /*
825  * helper function: opens IPv4 and IPv6 streams for selected port.
826  */
827 static struct netfe_stream *
828 netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
829         uint32_t lcore, uint16_t op, uint32_t bidx)
830 {
831         int32_t rc;
832         uint32_t sidx;
833         struct netfe_stream *fes;
834
835         sidx = fe->sidx;
836         fes = fe->fs + sidx;
837         if (sidx >= fe->snum) {
838                 rte_errno = ENOBUFS;
839                 return NULL;
840         }
841
842         fes->rxev = tle_event_alloc(fe->rxeq, &fe->fs[sidx]);
843         fes->txev = tle_event_alloc(fe->txeq, &fe->fs[sidx]);
844         sprm->recv_ev = fes->rxev;
845         if (op != FWD)
846                 sprm->send_ev = fes->txev;
847
848         RTE_LOG(ERR, USER1, "%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}\n",
849                 __func__, lcore, sidx, op, fes->rxev, fes->txev);
850         if (fes->rxev == NULL || fes->txev == NULL) {
851                 netfe_stream_close(fe, 0);
852                 rte_errno = ENOMEM;
853                 return NULL;
854         }
855
856         if (op == TXONLY || op == FWD) {
857                 tle_event_active(fes->txev, TLE_SEV_DOWN);
858                 fes->stat.txev[TLE_SEV_DOWN]++;
859         }
860
861         if (op != TXONLY) {
862                 tle_event_active(fes->rxev, TLE_SEV_DOWN);
863                 fes->stat.rxev[TLE_SEV_DOWN]++;
864         }
865
866         fes->s = tle_udp_stream_open(becfg.cpu[bidx].ctx, sprm);
867         if (fes->s == NULL) {
868                 rc = rte_errno;
869                 netfe_stream_close(fe, 0);
870                 rte_errno = rc;
871                 return NULL;
872         }
873
874         fes->op = op;
875         fes->family = sprm->local_addr.ss_family;
876
877         fe->sidx = sidx + 1;
878         return fes;
879
880 }
881
882 static inline int
883 netfe_addr_eq(struct sockaddr_storage *l, struct sockaddr_storage *r,
884         uint16_t family)
885 {
886         struct sockaddr_in *l4, *r4;
887         struct sockaddr_in6 *l6, *r6;
888
889         if (family == AF_INET) {
890                 l4 = (struct sockaddr_in *)l;
891                 r4 = (struct sockaddr_in *)r;
892                 return (l4->sin_port == r4->sin_port &&
893                                 l4->sin_addr.s_addr == r4->sin_addr.s_addr);
894         } else {
895                 l6 = (struct sockaddr_in6 *)l;
896                 r6 = (struct sockaddr_in6 *)r;
897                 return (l6->sin6_port == r6->sin6_port &&
898                                 memcmp(&l6->sin6_addr, &r6->sin6_addr,
899                                 sizeof(l6->sin6_addr)));
900         }
901 }
902
903 static inline void
904 netfe_pkt_addr(const struct rte_mbuf *m, struct sockaddr_storage *ps,
905         uint16_t family)
906 {
907         const struct ipv4_hdr *ip4h;
908         const struct ipv6_hdr *ip6h;
909         const struct udp_hdr *udph;
910         struct sockaddr_in *in4;
911         struct sockaddr_in6 *in6;
912
913         NETFE_PKT_DUMP(m);
914
915         udph = rte_pktmbuf_mtod_offset(m, struct udp_hdr *, -m->l4_len);
916
917         if (family == AF_INET) {
918                 in4 = (struct sockaddr_in *)ps;
919                 ip4h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
920                         -(m->l4_len + m->l3_len));
921                 in4->sin_port = udph->src_port;
922                 in4->sin_addr.s_addr = ip4h->src_addr;
923         } else {
924                 in6 = (struct sockaddr_in6 *)ps;
925                 ip6h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
926                         -(m->l4_len + m->l3_len));
927                 in6->sin6_port = udph->src_port;
928                 rte_memcpy(&in6->sin6_addr, ip6h->src_addr,
929                         sizeof(in6->sin6_addr));
930         }
931 }
932
933 static inline uint32_t
934 pkt_eq_addr(struct rte_mbuf *pkt[], uint32_t num, uint16_t family,
935         struct sockaddr_storage *cur, struct sockaddr_storage *nxt)
936 {
937         uint32_t i;
938
939         for (i = 0; i != num; i++) {
940                 netfe_pkt_addr(pkt[i], nxt, family);
941                 if (netfe_addr_eq(cur, nxt, family) == 0)
942                         break;
943         }
944
945         return i;
946 }
947
948 static inline void
949 pkt_buf_empty(struct pkt_buf *pb)
950 {
951         uint32_t i;
952
953         for (i = 0; i != pb->num; i++)
954                 rte_pktmbuf_free(pb->pkt[i]);
955
956         pb->num = 0;
957 }
958
959 static inline void
960 pkt_buf_fill(uint32_t lcore, struct pkt_buf *pb, uint32_t dlen)
961 {
962         uint32_t i;
963         int32_t sid;
964
965         sid = rte_lcore_to_socket_id(lcore) + 1;
966
967         for (i = pb->num; i != RTE_DIM(pb->pkt); i++) {
968                 pb->pkt[i] = rte_pktmbuf_alloc(mpool[sid]);
969                 if (pb->pkt[i] == NULL)
970                         break;
971                 rte_pktmbuf_append(pb->pkt[i], dlen);
972         }
973
974         pb->num = i;
975 }
976
977 static struct netfe_stream *
978 find_fwd_dst(uint32_t lcore, struct netfe_stream *fes,
979         const struct sockaddr *sa)
980 {
981         uint32_t rc;
982         struct netfe_stream *fed;
983         struct netfe_lcore *fe;
984         struct tle_udp_stream_param sprm;
985
986         fe = RTE_PER_LCORE(_fe);
987
988         fed = fwd_tbl_lkp(fe, fes->family, sa);
989         if (fed != NULL)
990                 return fed;
991
992         /* create a new stream and put it into the fwd table. */
993
994         sprm = fes->fwdprm.prm;
995
996         /* open forward stream with wildcard remote addr. */
997         memset(&sprm.remote_addr.ss_family + 1, 0,
998                 sizeof(sprm.remote_addr) - sizeof(sprm.remote_addr.ss_family));
999         fed = netfe_stream_open(fe, &sprm, lcore, FWD, fes->fwdprm.bidx);
1000         if (fed == NULL)
1001                 return NULL;
1002
1003         rc = fwd_tbl_add(fe, fes->family, sa, fed);
1004         if (rc != 0) {
1005                 netfe_stream_close(fe, 1);
1006                 fed = NULL;
1007         }
1008
1009         fed->fwdprm.prm.remote_addr = *(const struct sockaddr_storage *)sa;
1010         return fed;
1011 }
1012
1013 static inline void
1014 netfe_tx_process(uint32_t lcore, struct netfe_stream *fes)
1015 {
1016         uint32_t i, k, n;
1017
1018         /* refill with new mbufs. */
1019         pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
1020
1021         n = fes->pbuf.num;
1022         if (n == 0)
1023                 return;
1024
1025         k = tle_udp_stream_send(fes->s, fes->pbuf.pkt, n, NULL);
1026         NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) returns %u\n",
1027                 __func__, lcore, fes->s, n, k);
1028         fes->stat.txp += k;
1029         fes->stat.drops += n - k;
1030
1031         if (k == 0)
1032                 return;
1033
1034         /* adjust pbuf array. */
1035         fes->pbuf.num = n - k;
1036         for (i = k; i != n; i++)
1037                 fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
1038 }
1039
1040
1041 static inline void
1042 netfe_fwd(uint32_t lcore, struct netfe_stream *fes)
1043 {
1044         uint32_t i, j, k, n, x;
1045         uint16_t family;
1046         void *pi0, *pi1, *pt;
1047         struct rte_mbuf **pkt;
1048         struct netfe_stream *fed;
1049         struct sockaddr_storage in[2];
1050
1051         family = fes->family;
1052         n = fes->pbuf.num;
1053         pkt = fes->pbuf.pkt;
1054
1055         if (n == 0)
1056                 return;
1057
1058         in[0].ss_family = family;
1059         in[1].ss_family = family;
1060         pi0 = &in[0];
1061         pi1 = &in[1];
1062
1063         netfe_pkt_addr(pkt[0], pi0, family);
1064
1065         x = 0;
1066         for (i = 0; i != n; i = j) {
1067
1068                 j = i + pkt_eq_addr(&pkt[i + 1],
1069                         n - i - 1, family, pi0, pi1) + 1;
1070
1071                 fed = find_fwd_dst(lcore, fes, (const struct sockaddr *)pi0);
1072                 if (fed != NULL) {
1073
1074                         k = tle_udp_stream_send(fed->s, pkt + i, j - i,
1075                                 (const struct sockaddr *)
1076                                 &fes->fwdprm.prm.remote_addr);
1077
1078                         NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) "
1079                                 "returns %u\n",
1080                                 __func__, lcore, fed->s, j - i, k);
1081                         fed->stat.txp += k;
1082                         fed->stat.drops += j - i - k;
1083                         fes->stat.fwp += k;
1084
1085                 } else {
1086                         NETFE_TRACE("%s(%u, %p): no fwd stream for %u pkts;\n",
1087                                 __func__, lcore, fes->s, j - i);
1088                         for (k = i; k != j; k++) {
1089                                 NETFE_TRACE("%s(%u, %p): free(%p);\n",
1090                                 __func__, lcore, fes->s, pkt[k]);
1091                                 rte_pktmbuf_free(pkt[j]);
1092                         }
1093                         fes->stat.drops += j - i;
1094                 }
1095
1096                 /* copy unforwarded mbufs. */
1097                 for (i += k; i != j; i++, x++)
1098                         pkt[x] = pkt[i];
1099
1100                 /* swap the pointers */
1101                 pt = pi0;
1102                 pi0 = pi1;
1103                 pi1 = pt;
1104         }
1105
1106         fes->pbuf.num = x;
1107
1108         if (x != 0) {
1109                 tle_event_raise(fes->txev);
1110                 fes->stat.txev[TLE_SEV_UP]++;
1111         }
1112
1113         if (n == RTE_DIM(fes->pbuf.pkt)) {
1114                 tle_event_active(fes->rxev, TLE_SEV_UP);
1115                 fes->stat.rxev[TLE_SEV_UP]++;
1116         }
1117 }
1118
1119 static inline void
1120 netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
1121 {
1122         uint32_t k, n;
1123
1124         n = fes->pbuf.num;
1125         k = RTE_DIM(fes->pbuf.pkt) - n;
1126
1127         /* packet buffer is full, can't receive any new packets. */
1128         if (k == 0) {
1129                 tle_event_idle(fes->rxev);
1130                 fes->stat.rxev[TLE_SEV_IDLE]++;
1131                 return;
1132         }
1133
1134         n = tle_udp_stream_recv(fes->s, fes->pbuf.pkt + n, k);
1135         if (n == 0)
1136                 return;
1137
1138         NETFE_TRACE("%s(%u): tle_udp_stream_recv(%p, %u) returns %u\n",
1139                 __func__, lcore, fes->s, k, n);
1140
1141         fes->pbuf.num += n;
1142         fes->stat.rxp += n;
1143
1144         /* free all received mbufs. */
1145         if (fes->op == RXONLY)
1146                 pkt_buf_empty(&fes->pbuf);
1147         /* mark stream as writable */
1148         else if (k ==  RTE_DIM(fes->pbuf.pkt)) {
1149                 if (fes->op == RXTX) {
1150                         tle_event_active(fes->txev, TLE_SEV_UP);
1151                         fes->stat.txev[TLE_SEV_UP]++;
1152                 } else if (fes->op == FWD) {
1153                         tle_event_raise(fes->txev);
1154                         fes->stat.txev[TLE_SEV_UP]++;
1155                 }
1156         }
1157 }
1158
1159 static inline void
1160 netfe_rxtx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
1161 {
1162         uint32_t i, j, k, n;
1163         uint16_t family;
1164         void *pi0, *pi1, *pt;
1165         struct rte_mbuf **pkt;
1166         struct sockaddr_storage in[2];
1167
1168         family = fes->family;
1169         n = fes->pbuf.num;
1170         pkt = fes->pbuf.pkt;
1171
1172         /* there is nothing to send. */
1173         if (n == 0) {
1174                 tle_event_idle(fes->txev);
1175                 fes->stat.txev[TLE_SEV_IDLE]++;
1176                 return;
1177         }
1178
1179         in[0].ss_family = family;
1180         in[1].ss_family = family;
1181         pi0 = &in[0];
1182         pi1 = &in[1];
1183
1184         netfe_pkt_addr(pkt[0], pi0, family);
1185
1186         for (i = 0; i != n; i = j) {
1187
1188                 j = i + pkt_eq_addr(&pkt[i + 1],
1189                         n - i - 1, family, pi0, pi1) + 1;
1190
1191                 k = tle_udp_stream_send(fes->s, pkt + i, j - i,
1192                         (const struct sockaddr *)pi0);
1193
1194                 NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) returns %u\n",
1195                         __func__, lcore, fes->s, j - i, k);
1196                 fes->stat.txp += k;
1197                 fes->stat.drops += j - i - k;
1198
1199                 i += k;
1200
1201                 /* stream send buffer is full */
1202                 if (i != j)
1203                         break;
1204
1205                 /* swap the pointers */
1206                 pt = pi0;
1207                 pi0 = pi1;
1208                 pi1 = pt;
1209         }
1210
1211         /* not able to send anything. */
1212         if (i == 0)
1213                 return;
1214
1215         if (n == RTE_DIM(fes->pbuf.pkt)) {
1216                 /* mark stream as readable */
1217                 tle_event_active(fes->rxev, TLE_SEV_UP);
1218                 fes->stat.rxev[TLE_SEV_UP]++;
1219         }
1220
1221         /* adjust pbuf array. */
1222         fes->pbuf.num = n - i;
1223         for (j = i; j != n; j++)
1224                 pkt[j - i] = pkt[j];
1225 }
1226
1227 static int
1228 netfe_lcore(void *arg)
1229 {
1230         size_t sz;
1231         int32_t rc;
1232         uint32_t i, j, n, lcore, snum;
1233         const struct netfe_lcore_prm *prm;
1234         struct netfe_lcore *fe;
1235         struct tle_evq_param eprm;
1236         struct tle_udp_stream_param sprm;
1237         struct netfe_stream *fes, *fs[MAX_PKT_BURST];
1238
1239         lcore = rte_lcore_id();
1240         prm = arg;
1241
1242         snum = prm->max_streams;
1243         RTE_LOG(NOTICE, USER1, "%s(lcore=%u, nb_streams=%u, max_streams=%u)\n",
1244                 __func__, lcore, prm->nb_streams, snum);
1245
1246         memset(&eprm, 0, sizeof(eprm));
1247         eprm.socket_id = rte_lcore_to_socket_id(lcore);
1248         eprm.max_events = snum;
1249
1250         sz = sizeof(*fe) + snum * sizeof(fe->fs[0]);
1251         fe = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
1252                 rte_lcore_to_socket_id(lcore));
1253
1254         if (fe == NULL) {
1255                 RTE_LOG(ERR, USER1, "%s:%d failed to allocate %zu bytes\n",
1256                         __func__, __LINE__, sz);
1257                 return -ENOMEM;
1258         }
1259
1260         RTE_PER_LCORE(_fe) = fe;
1261
1262         fe->snum = snum;
1263         fe->fs = (struct netfe_stream *)(fe + 1);
1264
1265         fe->rxeq = tle_evq_create(&eprm);
1266         fe->txeq = tle_evq_create(&eprm);
1267
1268         RTE_LOG(ERR, USER1, "%s(%u) rx evq=%p, tx evq=%p\n",
1269                 __func__, lcore, fe->rxeq, fe->txeq);
1270         if (fe->rxeq == NULL || fe->txeq == NULL)
1271                 return -ENOMEM;
1272
1273         rc = fwd_tbl_init(fe, AF_INET, lcore);
1274         RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
1275                 __func__, lcore, AF_INET, rc);
1276         if (rc != 0)
1277                 return rc;
1278
1279         rc = fwd_tbl_init(fe, AF_INET6, lcore);
1280         RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
1281                 __func__, lcore, AF_INET6, rc);
1282         if (rc != 0)
1283                 return rc;
1284
1285         /* open all requested streams. */
1286         for (i = 0; i != prm->nb_streams; i++) {
1287                 sprm = prm->stream[i].sprm.prm;
1288                 fes = netfe_stream_open(fe, &sprm, lcore, prm->stream[i].op,
1289                         prm->stream[i].sprm.bidx);
1290                 if (fes == NULL) {
1291                         rc = -rte_errno;
1292                         break;
1293                 }
1294
1295                 netfe_stream_dump(fes);
1296
1297                 if (prm->stream[i].op == FWD) {
1298                         fes->fwdprm = prm->stream[i].fprm;
1299                         rc = fwd_tbl_add(fe,
1300                                 prm->stream[i].fprm.prm.remote_addr.ss_family,
1301                                 (const struct sockaddr *)
1302                                 &prm->stream[i].fprm.prm.remote_addr,
1303                                 fes);
1304                         if (rc != 0) {
1305                                 netfe_stream_close(fe, 1);
1306                                 break;
1307                         }
1308                 } else if (prm->stream[i].op == TXONLY) {
1309                         fes->txlen = prm->stream[i].txlen;
1310                         fes->raddr = sprm.remote_addr;
1311                 }
1312         }
1313
1314         while (fe->sidx >= prm->nb_streams && force_quit == 0) {
1315
1316                 n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs,
1317                         RTE_DIM(fs));
1318
1319                 if (n != 0) {
1320                         NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) "
1321                                 "returns %u\n",
1322                                 __func__, lcore, fe->rxeq, n);
1323                         for (j = 0; j != n; j++)
1324                                 netfe_rx_process(lcore, fs[j]);
1325                 }
1326
1327                 n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs,
1328                         RTE_DIM(fs));
1329
1330                 if (n != 0) {
1331                         NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) "
1332                                 "returns %u\n",
1333                                 __func__, lcore, fe->txeq, n);
1334                         for (j = 0; j != n; j++) {
1335                                 if (fs[j]->op == RXTX)
1336                                         netfe_rxtx_process(lcore, fs[j]);
1337                                 else if (fs[j]->op == FWD)
1338                                         netfe_fwd(lcore, fs[j]);
1339                                 else if (fs[j]->op == TXONLY)
1340                                         netfe_tx_process(lcore, fs[j]);
1341                         }
1342                 }
1343         }
1344
1345         RTE_LOG(NOTICE, USER1, "%s(lcore=%u) finish\n",
1346                 __func__, lcore);
1347
1348         while (fe->sidx != 0) {
1349
1350                 i = fe->sidx - 1;
1351                 netfe_stream_dump(fe->fs + i);
1352                 netfe_stream_close(fe, 1);
1353         }
1354
1355         tle_evq_destroy(fe->txeq);
1356         tle_evq_destroy(fe->rxeq);
1357         rte_free(fe);
1358
1359         return rc;
1360 }
1361
1362 static inline void
1363 netbe_rx(struct netbe_lcore *lc, uint32_t pidx)
1364 {
1365         uint32_t j, k, n;
1366         struct rte_mbuf *pkt[MAX_PKT_BURST];
1367         struct rte_mbuf *rp[MAX_PKT_BURST];
1368         int32_t rc[MAX_PKT_BURST];
1369
1370         n = rte_eth_rx_burst(lc->prt[pidx].port.id,
1371                         lc->prt[pidx].rxqid, pkt, RTE_DIM(pkt));
1372         if (n == 0)
1373                 return;
1374
1375         lc->prt[pidx].rx_stat.in += n;
1376         NETBE_TRACE("%s(%u): rte_eth_rx_burst(%u, %u) returns %u\n",
1377                 __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].rxqid,
1378                 n);
1379
1380         k = tle_udp_rx_bulk(lc->prt[pidx].dev, pkt, rp, rc, n);
1381
1382         lc->prt[pidx].rx_stat.up += k;
1383         lc->prt[pidx].rx_stat.drop += n - k;
1384         NETBE_TRACE("%s(%u): tle_udp_rx_bulk(%p, %u) returns %u\n",
1385                 __func__, lc->id, lc->prt[pidx].dev, n, k);
1386
1387         for (j = 0; j != n - k; j++) {
1388                 NETBE_TRACE("%s:%d(port=%u) rp[%u]={%p, %d};\n",
1389                         __func__, __LINE__, lc->prt[pidx].port.id,
1390                         j, rp[j], rc[j]);
1391                 rte_pktmbuf_free(rp[j]);
1392         }
1393 }
1394
1395 static inline void
1396 netbe_tx(struct netbe_lcore *lc, uint32_t pidx)
1397 {
1398         uint32_t j, k, n;
1399         struct rte_mbuf **mb;
1400
1401         n = lc->prt[pidx].tx_buf.num;
1402         k = RTE_DIM(lc->prt[pidx].tx_buf.pkt) - n;
1403         mb = lc->prt[pidx].tx_buf.pkt;
1404
1405         if (k >= RTE_DIM(lc->prt[pidx].tx_buf.pkt) / 2) {
1406                 j = tle_udp_tx_bulk(lc->prt[pidx].dev, mb + n, k);
1407                 n += j;
1408                 lc->prt[pidx].tx_stat.down += j;
1409         }
1410
1411         if (n == 0)
1412                 return;
1413
1414         NETBE_TRACE("%s(%u): tle_udp_tx_bulk(%p) returns %u,\n"
1415                 "total pkts to send: %u\n",
1416                 __func__, lc->id, lc->prt[pidx].dev, j, n);
1417
1418         for (j = 0; j != n; j++)
1419                 NETBE_PKT_DUMP(mb[j]);
1420
1421         k = rte_eth_tx_burst(lc->prt[pidx].port.id,
1422                         lc->prt[pidx].txqid, mb, n);
1423
1424         lc->prt[pidx].tx_stat.out += k;
1425         lc->prt[pidx].tx_stat.drop += n - k;
1426         NETBE_TRACE("%s(%u): rte_eth_tx_burst(%u, %u, %u) returns %u\n",
1427                 __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].txqid,
1428                 n, k);
1429
1430         lc->prt[pidx].tx_buf.num = n - k;
1431         if (k != 0)
1432                 for (j = k; j != n; j++)
1433                         mb[j - k] = mb[j];
1434 }
1435
1436 static int
1437 netbe_lcore(void *arg)
1438 {
1439         uint32_t i, j;
1440         int32_t rc;
1441         struct netbe_lcore *lc;
1442
1443         lc = arg;
1444         RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) start\n",
1445                 __func__, lc->id, lc->ctx);
1446
1447         /*
1448          * ???????
1449          * wait for FE lcores to start, so BE dont' drop any packets
1450          * because corresponding streams not opened yet by FE.
1451          * usefull when used with pcap PMDS.
1452          * think better way, or should this timeout be a cmdlien parameter.
1453          * ???????
1454          */
1455         rte_delay_ms(10);
1456
1457         for (i = 0; i != lc->prt_num; i++) {
1458                 RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, udp_dev: %p)\n",
1459                         __func__, i, lc->prt[i].port.id, lc->prt[i].dev);
1460                 rc = setup_rx_cb(&lc->prt[i].port, lc);
1461                 if (rc < 0)
1462                         sig_handle(SIGQUIT);
1463         }
1464
1465         while (force_quit == 0) {
1466                 for (i = 0; i != lc->prt_num; i++) {
1467                         netbe_rx(lc, i);
1468                         netbe_tx(lc, i);
1469                 }
1470         }
1471
1472         RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) finish\n",
1473                 __func__, lc->id, lc->ctx);
1474         for (i = 0; i != lc->prt_num; i++) {
1475                 RTE_LOG(NOTICE, USER1, "%s:%u(port=%u) "
1476                         "rx_stats={"
1477                         "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "}, "
1478                         "tx_stats={"
1479                         "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "};\n",
1480                         __func__, i, lc->prt[i].port.id,
1481                         lc->prt[i].rx_stat.in,
1482                         lc->prt[i].rx_stat.up,
1483                         lc->prt[i].rx_stat.drop,
1484                         lc->prt[i].tx_stat.down,
1485                         lc->prt[i].tx_stat.out,
1486                         lc->prt[i].tx_stat.drop);
1487         }
1488
1489
1490         for (i = 0; i != lc->prt_num; i++) {
1491                 for (j = 0; j != lc->prt[i].tx_buf.num; j++)
1492                         rte_pktmbuf_free(lc->prt[i].tx_buf.pkt[j]);
1493         }
1494
1495         return 0;
1496 }
1497
1498 static int
1499 netfe_lcore_cmp(const void *s1, const void *s2)
1500 {
1501         const struct netfe_stream_prm *p1, *p2;
1502
1503         p1 = s1;
1504         p2 = s2;
1505         return p1->lcore - p2->lcore;
1506 }
1507
1508 /*
1509  * Helper functions, finds BE by given local and remote addresses.
1510  */
1511 static int
1512 netbe_find4(const struct in_addr *laddr, const struct in_addr *raddr)
1513 {
1514         uint32_t i, j;
1515         int32_t rc;
1516         uint32_t idx;
1517         struct netbe_lcore *bc;
1518
1519         if (laddr->s_addr == INADDR_ANY) {
1520
1521                 /* we have exactly one BE, use it for all traffic */
1522                 if (becfg.cpu_num == 1)
1523                         return 0;
1524
1525                 /* search by remote address. */
1526                 for (i = 0; i != becfg.cpu_num; i++) {
1527                         bc = becfg.cpu + i;
1528                         rc = rte_lpm_lookup(bc->lpm4,
1529                                 rte_be_to_cpu_32(raddr->s_addr), &idx);
1530                         if (rc == 0)
1531                                 return i;
1532                 }
1533         } else {
1534
1535                 /* search by local address */
1536                 for (i = 0; i != becfg.cpu_num; i++) {
1537                         bc = becfg.cpu + i;
1538                         for (j = 0; j != bc->prt_num; j++)
1539                                 if (laddr->s_addr == bc->prt[j].port.ipv4)
1540                                         return i;
1541                 }
1542         }
1543
1544         return -ENOENT;
1545 }
1546
1547 static int
1548 netbe_find6(const struct in6_addr *laddr, const struct in6_addr *raddr)
1549 {
1550         uint32_t i, j;
1551         int32_t rc;
1552         uint8_t idx;
1553         struct netbe_lcore *bc;
1554
1555         if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) == 0) {
1556
1557                 /* we have exactly one BE, use it for all traffic */
1558                 if (becfg.cpu_num == 1)
1559                         return 0;
1560
1561                 /* search by remote address. */
1562                 for (i = 0; i != becfg.cpu_num; i++) {
1563                         bc = becfg.cpu + i;
1564                         rc = rte_lpm6_lookup(bc->lpm6,
1565                                 (uint8_t *)(uintptr_t)raddr->s6_addr, &idx);
1566                         if (rc == 0)
1567                                 return i;
1568                 }
1569         } else {
1570                 /* search by local address */
1571                 for (i = 0; i != becfg.cpu_num; i++) {
1572                         bc = becfg.cpu + i;
1573                         for (j = 0; j != bc->prt_num; j++)
1574                                 if (memcmp(laddr, &bc->prt[j].port.ipv6,
1575                                                 sizeof(*laddr)) == 0)
1576                                         return i;
1577                 }
1578         }
1579
1580         return -ENOENT;
1581 }
1582
1583 static int
1584 netbe_find(const struct tle_udp_stream_param *p)
1585 {
1586         const struct sockaddr_in *l4, *r4;
1587         const struct sockaddr_in6 *l6, *r6;
1588
1589         if (p->local_addr.ss_family == AF_INET) {
1590                 l4 = (const struct sockaddr_in *)&p->local_addr;
1591                 r4 = (const struct sockaddr_in *)&p->remote_addr;
1592                 return netbe_find4(&l4->sin_addr, &r4->sin_addr);
1593         } else if (p->local_addr.ss_family == AF_INET6) {
1594                 l6 = (const struct sockaddr_in6 *)&p->local_addr;
1595                 r6 = (const struct sockaddr_in6 *)&p->remote_addr;
1596                 return netbe_find6(&l6->sin6_addr, &r6->sin6_addr);
1597         }
1598         return -EINVAL;
1599 }
1600
1601 static int
1602 netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line)
1603 {
1604         int32_t bidx;
1605
1606         bidx = netbe_find(&sp->prm);
1607         if (bidx < 0) {
1608                 RTE_LOG(ERR, USER1, "%s(line=%u): no BE for that stream\n",
1609                         __func__, line);
1610                 return -EINVAL;
1611         }
1612         sp->bidx = bidx;
1613         return 0;
1614 }
1615
1616 /* start front-end processing. */
1617 static int
1618 netfe_launch(struct netfe_lcore_prm *lprm)
1619 {
1620         uint32_t i, j, k, lc, ln, mi;
1621         struct netfe_lcore_prm feprm[RTE_MAX_LCORE];
1622
1623         /* determine on what BE each stream should be open. */
1624         for (i = 0; i != lprm->nb_streams; i++) {
1625
1626                 lc = lprm->stream[i].lcore;
1627                 ln = lprm->stream[i].line;
1628
1629                 if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln) != 0 ||
1630                                 (lprm->stream[i].op == FWD &&
1631                                 netfe_sprm_flll_be(&lprm->stream[i].fprm,
1632                                 ln) != 0))
1633                         return -EINVAL;
1634         }
1635
1636         /* group all fe parameters by lcore. */
1637
1638         memset(feprm, 0, sizeof(feprm));
1639         qsort(lprm->stream, lprm->nb_streams, sizeof(lprm->stream[0]),
1640                 netfe_lcore_cmp);
1641
1642         k = 0;
1643         mi = UINT32_MAX;
1644         for (i = 0; i != lprm->nb_streams; i = j) {
1645
1646                 lc = lprm->stream[i].lcore;
1647                 ln = lprm->stream[i].line;
1648
1649                 if (rte_lcore_is_enabled(lc) == 0) {
1650                         RTE_LOG(ERR, USER1,
1651                                 "%s(line=%u): lcore %u is not enabled\n",
1652                                 __func__, ln, lc);
1653                         return -EINVAL;
1654                 }
1655
1656                 if (rte_get_master_lcore() == lc)
1657                         mi = k;
1658                 else if (rte_eal_get_lcore_state(lc) == RUNNING) {
1659                         RTE_LOG(ERR, USER1,
1660                                 "%s(line=%u): lcore %u already in use\n",
1661                                 __func__, ln, lc);
1662                         return -EINVAL;
1663                 }
1664
1665                 for (j = i + 1; j != lprm->nb_streams &&
1666                                 lc == lprm->stream[j].lcore;
1667                                 j++)
1668                         ;
1669
1670                 feprm[k].max_streams = lprm->max_streams;
1671                 feprm[k].nb_streams = j - i;
1672                 feprm[k].stream = lprm->stream + i;
1673                 k++;
1674         }
1675
1676         /* launch all slave FE lcores. */
1677         for (i = 0; i != k; i++) {
1678                 if (i != mi)
1679                         rte_eal_remote_launch(netfe_lcore, feprm + i,
1680                                 feprm[i].stream[0].lcore);
1681         }
1682
1683         /* launch FE at master lcore. */
1684         if (mi != UINT32_MAX)
1685                 netfe_lcore(feprm + mi);
1686
1687         return 0;
1688 }
1689
1690 int
1691 main(int argc, char *argv[])
1692 {
1693         int32_t opt, opt_idx, rc;
1694         uint32_t i;
1695         uint64_t v;
1696         struct tle_udp_ctx_param ctx_prm;
1697         struct netfe_lcore_prm feprm;
1698         struct rte_eth_stats stats;
1699         char fecfg_fname[PATH_MAX + 1];
1700         char becfg_fname[PATH_MAX + 1];
1701
1702         fecfg_fname[0] = 0;
1703         becfg_fname[0] = 0;
1704
1705         rc = rte_eal_init(argc, argv);
1706         if (rc < 0)
1707                 rte_exit(EXIT_FAILURE,
1708                         "%s: rte_eal_init failed with error code: %d\n",
1709                         __func__, rc);
1710
1711         memset(&ctx_prm, 0, sizeof(ctx_prm));
1712
1713         argc -= rc;
1714         argv += rc;
1715
1716         optind = 0;
1717         optarg = NULL;
1718         while ((opt = getopt_long(argc, argv, "B:PR:S:b:f:s:", long_opt,
1719                         &opt_idx)) != EOF) {
1720                 if (opt == OPT_SHORT_SBULK) {
1721                         rc = parse_uint_val(NULL, optarg, &v);
1722                         if (rc < 0)
1723                                 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
1724                                         "for option: \'%c\'\n",
1725                                         __func__, optarg, opt);
1726                         ctx_prm.send_bulk_size = v;
1727                 } else if (opt == OPT_SHORT_PROMISC) {
1728                         becfg.promisc = 1;
1729                 } else if (opt == OPT_SHORT_RBUFS) {
1730                         rc = parse_uint_val(NULL, optarg, &v);
1731                         if (rc < 0)
1732                                 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
1733                                         "for option: \'%c\'\n",
1734                                         __func__, optarg, opt);
1735                         ctx_prm.max_stream_rbufs = v;
1736                 } else if (opt == OPT_SHORT_SBUFS) {
1737                         rc = parse_uint_val(NULL, optarg, &v);
1738                         if (rc < 0)
1739                                 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
1740                                         "for option: \'%c\'\n",
1741                                         __func__, optarg, opt);
1742                         ctx_prm.max_stream_sbufs = v;
1743                 } else if (opt == OPT_SHORT_STREAMS) {
1744                         rc = parse_uint_val(NULL, optarg, &v);
1745                         if (rc < 0)
1746                                 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
1747                                         "for option: \'%c\'\n",
1748                                         __func__, optarg, opt);
1749                         ctx_prm.max_streams = v;
1750                 } else if (opt == OPT_SHORT_BECFG) {
1751                         snprintf(becfg_fname, sizeof(becfg_fname), "%s",
1752                                 optarg);
1753                 } else if (opt == OPT_SHORT_FECFG) {
1754                         snprintf(fecfg_fname, sizeof(fecfg_fname), "%s",
1755                                 optarg);
1756                 } else {
1757                         rte_exit(EXIT_FAILURE,
1758                                 "%s: unknown option: \'%c\'\n",
1759                                 __func__, opt);
1760                 }
1761         }
1762
1763         signal(SIGINT, sig_handle);
1764
1765         netbe_port_init(&becfg, argc - optind, argv + optind);
1766         netbe_lcore_init(&becfg, &ctx_prm);
1767
1768         if ((rc = netbe_dest_init(becfg_fname, &becfg)) != 0)
1769                 sig_handle(SIGQUIT);
1770
1771         for (i = 0; i != becfg.prt_num && rc == 0; i++) {
1772                 RTE_LOG(NOTICE, USER1, "%s: starting port %u\n",
1773                         __func__, becfg.prt[i].id);
1774                 rc = rte_eth_dev_start(becfg.prt[i].id);
1775                 if (rc != 0) {
1776                         RTE_LOG(ERR, USER1,
1777                                 "%s: rte_eth_dev_start(%u) returned "
1778                                 "error code: %d\n",
1779                                 __func__, becfg.prt[i].id, rc);
1780                         sig_handle(SIGQUIT);
1781                 }
1782         }
1783
1784         feprm.max_streams = ctx_prm.max_streams * becfg.cpu_num;
1785         if (rc == 0 && (rc = netfe_parse_cfg(fecfg_fname, &feprm)) != 0)
1786                 sig_handle(SIGQUIT);
1787
1788         for (i = 0; rc == 0 && i != becfg.cpu_num; i++) {
1789                  rte_eal_remote_launch(netbe_lcore, becfg.cpu + i,
1790                         becfg.cpu[i].id);
1791         }
1792
1793         if (rc == 0 && (rc = netfe_launch(&feprm)) != 0)
1794                 sig_handle(SIGQUIT);
1795
1796         rte_eal_mp_wait_lcore();
1797
1798         for (i = 0; i != becfg.prt_num; i++) {
1799                 RTE_LOG(NOTICE, USER1, "%s: stoping port %u\n",
1800                         __func__, becfg.prt[i].id);
1801                 rte_eth_stats_get(becfg.prt[i].id, &stats);
1802                 RTE_LOG(NOTICE, USER1, "port %u stats={\n"
1803                         "ipackets=%" PRIu64 ";"
1804                         "ibytes=%" PRIu64 ";"
1805                         "ierrors=%" PRIu64 ";\n"
1806                         "opackets=%" PRIu64 ";"
1807                         "obytes=%" PRIu64 ";"
1808                         "oerrors=%" PRIu64 ";\n"
1809                         "}\n",
1810                         becfg.prt[i].id,
1811                         stats.ipackets,
1812                         stats.ibytes,
1813                         stats.ierrors,
1814                         stats.opackets,
1815                         stats.obytes,
1816                         stats.oerrors);
1817                 rte_eth_dev_stop(becfg.prt[i].id);
1818         }
1819
1820         netbe_lcore_fini(&becfg);
1821
1822         return 0;
1823 }