udpfw: dynamic allocation of netbe_* structures
[tldk.git] / examples / udpfwd / main.c
1 /*
2  * Copyright (c) 2016  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include "netbe.h"
17 #include "parse.h"
18
/* Routing-table sizing (presumably consumed by the LPM setup code). */
#define MAX_RULES	0x100
#define MAX_TBL8	0x800

/* Descriptor ring sizes used for every RX / TX queue. */
#define RX_RING_SIZE	0x400
#define TX_RING_SIZE	0x800

/* Geometry of the per-socket packet mbuf pools. */
#define MPOOL_CACHE_SIZE	0x100
#define MPOOL_NB_BUF		0x20000

/*
 * IP fragmentation: header-sized mbufs for fragment heads, reassembly
 * TTL expressed in milliseconds, and entries per fragment-table bucket.
 */
#define FRAG_MBUF_BUF_SIZE	(RTE_PKTMBUF_HEADROOM + TLE_UDP_MAX_HDR)
#define FRAG_TTL		MS_PER_S
#define FRAG_TBL_BUCKET_ENTRIES	16

#define FIRST_PORT	0x8000

/* Checksum offloads this application knows how to request. */
#define RX_CSUM_OFFLOAD	(DEV_RX_OFFLOAD_IPV4_CKSUM | DEV_RX_OFFLOAD_UDP_CKSUM)
#define TX_CSUM_OFFLOAD	(DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM)

/* Command-line options: short letter and long name for each. */
#define OPT_SHORT_SBULK		'B'
#define OPT_LONG_SBULK		"sburst"

#define OPT_SHORT_PROMISC	'P'
#define OPT_LONG_PROMISC	"promisc"

#define OPT_SHORT_RBUFS		'R'
#define OPT_LONG_RBUFS		"rbufs"

#define OPT_SHORT_SBUFS		'S'
#define OPT_LONG_SBUFS		"sbufs"

#define OPT_SHORT_STREAMS	's'
#define OPT_LONG_STREAMS	"streams"

#define OPT_SHORT_FECFG		'f'
#define OPT_LONG_FECFG		"fecfg"

#define OPT_SHORT_BECFG		'b'
#define OPT_LONG_BECFG		"becfg"
57
58 RTE_DEFINE_PER_LCORE(struct netbe_lcore *, _be);
59 RTE_DEFINE_PER_LCORE(struct netfe_lcore *, _fe);
60
61 #include "fwdtbl.h"
62
63 static const struct option long_opt[] = {
64         {OPT_LONG_BECFG, 1, 0, OPT_SHORT_BECFG},
65         {OPT_LONG_FECFG, 1, 0, OPT_SHORT_FECFG},
66         {OPT_LONG_PROMISC, 0, 0, OPT_SHORT_PROMISC},
67         {OPT_LONG_RBUFS, 1, 0, OPT_SHORT_RBUFS},
68         {OPT_LONG_SBUFS, 1, 0, OPT_SHORT_SBUFS},
69         {OPT_LONG_SBULK, 1, 0, OPT_SHORT_SBULK},
70         {OPT_LONG_STREAMS, 1, 0, OPT_SHORT_STREAMS},
71         {NULL, 0, 0, 0}
72 };
73
/*
 * Bytes of RSS hash input for an IPv4/UDP packet:
 * 4-byte source + 4-byte destination address, 2-byte source + 2-byte
 * destination port.
 */
#define IPV4_TUPLE_SIZE 12

/*
 * Bytes of RSS hash input for an IPv6/UDP packet:
 * 16-byte source + 16-byte destination address, 2-byte source + 2-byte
 * destination port.
 */
#define IPV6_TUPLE_SIZE 36

/*
 * Index of the single non-zero byte prepare_hash_key() writes into an
 * IPv4 port's RSS key, so that packets spread across queues according to
 * their UDP destination port.
 */
#define RSS_HASH_KEY_DEST_PORT_LOC_IPV4 15

/* Same as above, for ports configured with an IPv6 address. */
#define RSS_HASH_KEY_DEST_PORT_LOC_IPV6 39

/*
 * Number of rte_eth_rss_reta_entry64 groups that describe a 512-entry
 * redirection table, as passed to rte_eth_dev_rss_reta_update().
 */
#define RSS_RETA_CONF_ARRAY_SIZE (ETH_RSS_RETA_SIZE_512 / RTE_RETA_GROUP_SIZE)
103
/*
 * Grow (or allocate, when loc is NULL) the array 'loc' to hold n elements.
 *
 * The result goes through a temporary: on failure the original
 * allocation is kept (no leak, and 'loc' still matches its element count).
 * The macro then logs and executes 'return -ENOMEM;' in the calling function.
 * The newly added elements are NOT initialised.
 */
#define NETBE_REALLOC(loc, n) do { \
	void *netbe_realloc_tmp_ = rte_realloc((loc), \
		sizeof(*(loc)) * (n), RTE_CACHE_LINE_SIZE); \
	if (netbe_realloc_tmp_ == NULL) { \
		RTE_LOG(ERR, USER1, \
			"%s: failed to reallocate memory\n", \
			__func__); \
		return -ENOMEM; \
	} \
	(loc) = netbe_realloc_tmp_; \
} while (0)
113
114 static volatile int force_quit;
115
116 static struct netbe_cfg becfg;
117 static struct rte_mempool *mpool[RTE_MAX_NUMA_NODES + 1];
118 static struct rte_mempool *frag_mpool[RTE_MAX_NUMA_NODES + 1];
119
120 static const struct rte_eth_conf port_conf_default = {
121         .rxmode = {
122                 .max_rx_pkt_len = ETHER_MAX_VLAN_FRAME_LEN,
123                 .hw_vlan_strip = 1,
124                 .jumbo_frame = 1,
125         },
126 };
127
128 #include "parse.h"
129 #include "main_dpdk_legacy.h"
130
131 static void
132 sig_handle(int signum)
133 {
134         RTE_LOG(ERR, USER1, "%s(%d)\n", __func__, signum);
135         force_quit = 1;
136 }
137
138 static void
139 prepare_hash_key(struct netbe_port *uprt, uint8_t key_size, uint16_t family)
140 {
141         uint32_t align_nb_q;
142
143         align_nb_q = rte_align32pow2(uprt->nb_lcore);
144         memset(uprt->hash_key, 0, RSS_HASH_KEY_LENGTH);
145         uprt->hash_key_size = key_size;
146         if (family == AF_INET)
147                 uprt->hash_key[RSS_HASH_KEY_DEST_PORT_LOC_IPV4] = align_nb_q;
148         else
149                 uprt->hash_key[RSS_HASH_KEY_DEST_PORT_LOC_IPV6] = align_nb_q;
150 }
151
/*
 * Map an RSS hash value to a queue index.
 *
 * align_nb_q is the queue count rounded up to a power of two. The result
 * is the low log2(align_nb_q) bits of hash, in reversed bit order
 * (e.g. with 4 queues: 1 -> 2, 2 -> 1).
 *
 * A single queue (align_nb_q <= 1) always maps to queue 0. The explicit
 * check also avoids __builtin_clz(0), whose result is undefined.
 */
static uint32_t
qidx_from_hash_index(uint32_t hash, uint32_t align_nb_q)
{
	uint32_t i, nb_bit, q;

	if (align_nb_q <= 1)
		return 0;

	/* number of bits needed to index align_nb_q queues */
	nb_bit = (sizeof(uint32_t) * CHAR_BIT) - __builtin_clz(align_nb_q - 1);

	q = 0;
	for (i = 0; i < nb_bit; i++) {
		q = (q << 1) | (hash & 1);
		hash >>= 1;
	}

	return q;
}
167
168 static int
169 update_rss_conf(struct netbe_port *uprt,
170         const struct rte_eth_dev_info *dev_info,
171         struct rte_eth_conf *port_conf)
172 {
173         uint8_t hash_key_size;
174
175         if (uprt->nb_lcore > 1) {
176                 if (dev_info->hash_key_size > 0)
177                         hash_key_size = dev_info->hash_key_size;
178                 else {
179                         RTE_LOG(ERR, USER1,
180                                 "%s: dev_info did not provide a valid hash key size\n",
181                                 __func__);
182                         return -EINVAL;
183                 }
184
185                 if (uprt->ipv4 != INADDR_ANY &&
186                                 memcmp(&uprt->ipv6, &in6addr_any,
187                                 sizeof(uprt->ipv6)) != 0) {
188                         RTE_LOG(ERR, USER1,
189                                 "%s: RSS for both IPv4 and IPv6 not supported!\n",
190                                 __func__);
191                         return -EINVAL;
192                 } else if (uprt->ipv4 != INADDR_ANY) {
193                         prepare_hash_key(uprt, hash_key_size, AF_INET);
194                 } else if (memcmp(&uprt->ipv6, &in6addr_any, sizeof(uprt->ipv6))
195                                 != 0) {
196                         prepare_hash_key(uprt, hash_key_size, AF_INET6);
197                 } else {
198                         RTE_LOG(ERR, USER1,
199                                 "%s: No IPv4 or IPv6 address is found!\n",
200                                 __func__);
201                         return -EINVAL;
202                 }
203                 port_conf->rxmode.mq_mode = ETH_MQ_RX_RSS;
204                 port_conf->rx_adv_conf.rss_conf.rss_hf = ETH_RSS_UDP;
205                 port_conf->rx_adv_conf.rss_conf.rss_key_len = hash_key_size;
206                 port_conf->rx_adv_conf.rss_conf.rss_key = uprt->hash_key;
207         }
208
209         return 0;
210 }
211
212 static int
213 update_rss_reta(struct netbe_port *uprt,
214         const struct rte_eth_dev_info *dev_info)
215 {
216         struct rte_eth_rss_reta_entry64 reta_conf[RSS_RETA_CONF_ARRAY_SIZE];
217         int32_t i, rc, align_nb_q;
218         int32_t q_index, idx, shift;
219
220         if (uprt->nb_lcore > 1) {
221                 if (dev_info->reta_size == 0) {
222                         RTE_LOG(ERR, USER1,
223                                 "%s: Redirection table size 0 is invalid for RSS\n",
224                                 __func__);
225                         return -EINVAL;
226                 }
227                 RTE_LOG(NOTICE, USER1,
228                         "%s: The reta size of port %d is %u\n",
229                         __func__, uprt->id, dev_info->reta_size);
230
231                 if (dev_info->reta_size > ETH_RSS_RETA_SIZE_512) {
232                         RTE_LOG(ERR, USER1,
233                                 "%s: More than %u entries of Reta not supported\n",
234                                 __func__, ETH_RSS_RETA_SIZE_512);
235                         return -EINVAL;
236                 }
237
238                 memset(reta_conf, 0, sizeof(reta_conf));
239                 align_nb_q = rte_align32pow2(uprt->nb_lcore);
240                 for (i = 0; i < align_nb_q; i++) {
241                         q_index = qidx_from_hash_index(i, align_nb_q) %
242                                                 uprt->nb_lcore;
243
244                         idx = i / RTE_RETA_GROUP_SIZE;
245                         shift = i % RTE_RETA_GROUP_SIZE;
246                         reta_conf[idx].mask |= (1ULL << shift);
247                         reta_conf[idx].reta[shift] = q_index;
248                         RTE_LOG(NOTICE, USER1,
249                                 "%s: port=%u RSS reta conf: hash=%u, q=%u\n",
250                                 __func__, uprt->id, i, q_index);
251                 }
252
253                 rc = rte_eth_dev_rss_reta_update(uprt->id,
254                                 reta_conf, dev_info->reta_size);
255                 if (rc != 0) {
256                         RTE_LOG(ERR, USER1,
257                                 "%s: Bad redirection table parameter, rc = %d\n",
258                                 __func__, rc);
259                         return rc;
260                 }
261         }
262
263         return 0;
264 }
265
266 /*
267  * Initilise DPDK port.
268  * In current version, multi-queue per port is used.
269  */
270 static int
271 port_init(struct netbe_port *uprt)
272 {
273         int32_t rc;
274         struct rte_eth_conf port_conf;
275         struct rte_eth_dev_info dev_info;
276
277         rte_eth_dev_info_get(uprt->id, &dev_info);
278         if ((dev_info.rx_offload_capa & uprt->rx_offload) != uprt->rx_offload) {
279                 RTE_LOG(ERR, USER1,
280                         "port#%u supported/requested RX offloads don't match, "
281                         "supported: %#x, requested: %#x;\n",
282                         uprt->id, dev_info.rx_offload_capa, uprt->rx_offload);
283                 return -EINVAL;
284         }
285         if ((dev_info.tx_offload_capa & uprt->tx_offload) != uprt->tx_offload) {
286                 RTE_LOG(ERR, USER1,
287                         "port#%u supported/requested TX offloads don't match, "
288                         "supported: %#x, requested: %#x;\n",
289                         uprt->id, dev_info.tx_offload_capa, uprt->tx_offload);
290                 return -EINVAL;
291         }
292
293         port_conf = port_conf_default;
294         if ((uprt->rx_offload & RX_CSUM_OFFLOAD) != 0) {
295                 RTE_LOG(ERR, USER1, "%s(%u): enabling RX csum offload;\n",
296                         __func__, uprt->id);
297                 port_conf.rxmode.hw_ip_checksum = 1;
298         }
299         port_conf.rxmode.max_rx_pkt_len = uprt->mtu + ETHER_CRC_LEN;
300
301         rc = update_rss_conf(uprt, &dev_info, &port_conf);
302         if (rc != 0)
303                 return rc;
304
305         rc = rte_eth_dev_configure(uprt->id, uprt->nb_lcore, uprt->nb_lcore,
306                         &port_conf);
307         RTE_LOG(NOTICE, USER1,
308                 "%s: rte_eth_dev_configure(prt_id=%u, nb_rxq=%u, nb_txq=%u) "
309                 "returns %d;\n", __func__, uprt->id, uprt->nb_lcore,
310                 uprt->nb_lcore, rc);
311         if (rc != 0)
312                 return rc;
313
314         return 0;
315 }
316
317 static int
318 queue_init(struct netbe_port *uprt, struct rte_mempool *mp)
319 {
320         int32_t socket, rc;
321         uint16_t q;
322         struct rte_eth_dev_info dev_info;
323
324         rte_eth_dev_info_get(uprt->id, &dev_info);
325
326         socket = rte_eth_dev_socket_id(uprt->id);
327
328         dev_info.default_rxconf.rx_drop_en = 1;
329
330         dev_info.default_txconf.tx_free_thresh = TX_RING_SIZE / 2;
331         if (uprt->tx_offload != 0) {
332                 RTE_LOG(ERR, USER1, "%s(%u): enabling full featured TX;\n",
333                         __func__, uprt->id);
334                 dev_info.default_txconf.txq_flags = 0;
335         }
336
337         for (q = 0; q < uprt->nb_lcore; q++) {
338                 rc = rte_eth_rx_queue_setup(uprt->id, q, RX_RING_SIZE,
339                         socket, &dev_info.default_rxconf, mp);
340                 if (rc < 0) {
341                         RTE_LOG(ERR, USER1,
342                                 "%s: rx queue=%u setup failed with error code: %d\n",
343                                 __func__, q, rc);
344                         return rc;
345                 }
346         }
347
348         for (q = 0; q < uprt->nb_lcore; q++) {
349                 rc = rte_eth_tx_queue_setup(uprt->id, q, TX_RING_SIZE,
350                         socket, &dev_info.default_txconf);
351                 if (rc < 0) {
352                         RTE_LOG(ERR, USER1,
353                                 "%s: tx queue=%u setup failed with error code: %d\n",
354                                 __func__, q, rc);
355                         return rc;
356                 }
357         }
358         return 0;
359 }
360
361 /*
362  * Check that lcore is enabled, not master, and not in use already.
363  */
364 static int
365 check_lcore(uint32_t lc)
366 {
367         if (rte_lcore_is_enabled(lc) == 0) {
368                 RTE_LOG(ERR, USER1, "lcore %u is not enabled\n", lc);
369                 return -EINVAL;
370         }
371         if (rte_eal_get_lcore_state(lc) == RUNNING) {
372                 RTE_LOG(ERR, USER1, "lcore %u already running %p\n",
373                         lc, lcore_config[lc].f);
374                 return -EINVAL;
375         }
376         return 0;
377 }
378
379 static void
380 log_netbe_prt(const struct netbe_port *uprt)
381 {
382         uint32_t i;
383         char corelist[2 * RTE_MAX_LCORE + 1];
384         char hashkey[2 * RSS_HASH_KEY_LENGTH];
385
386         memset(corelist, 0, sizeof(corelist));
387         memset(hashkey, 0, sizeof(hashkey));
388         for (i = 0; i < uprt->nb_lcore; i++)
389                 if (i < uprt->nb_lcore - 1)
390                         sprintf(corelist + (2 * i), "%u,", uprt->lcore[i]);
391                 else
392                         sprintf(corelist + (2 * i), "%u", uprt->lcore[i]);
393
394         for (i = 0; i < uprt->hash_key_size; i++)
395                 sprintf(hashkey + (2 * i), "%02x", uprt->hash_key[i]);
396
397         RTE_LOG(NOTICE, USER1,
398                 "uprt %p = <id = %u, lcore = <%s>, mtu = %u, "
399                 "rx_offload = %u, tx_offload = %u,\n"
400                 "ipv4 = %#x, "
401                 "ipv6 = %04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx, "
402                 "mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n"
403                 "hashkey = %s;\n",
404                 uprt, uprt->id, corelist,
405                 uprt->mtu, uprt->rx_offload, uprt->tx_offload,
406                 uprt->ipv4,
407                 uprt->ipv6.s6_addr16[0], uprt->ipv6.s6_addr16[1],
408                 uprt->ipv6.s6_addr16[2], uprt->ipv6.s6_addr16[3],
409                 uprt->ipv6.s6_addr16[4], uprt->ipv6.s6_addr16[5],
410                 uprt->ipv6.s6_addr16[6], uprt->ipv6.s6_addr16[7],
411                 uprt->mac.addr_bytes[0], uprt->mac.addr_bytes[1],
412                 uprt->mac.addr_bytes[2], uprt->mac.addr_bytes[3],
413                 uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5],
414                 hashkey);
415 }
416
417 static void
418 log_netbe_cfg(const struct netbe_cfg *ucfg)
419 {
420         uint32_t i;
421
422         RTE_LOG(NOTICE, USER1,
423                 "ucfg @ %p, prt_num = %u\n", ucfg, ucfg->prt_num);
424
425         for (i = 0; i != ucfg->prt_num; i++)
426                 log_netbe_prt(ucfg->prt + i);
427 }
428
429 static int
430 pool_init(uint32_t sid)
431 {
432         int32_t rc;
433         struct rte_mempool *mp;
434         char name[RTE_MEMPOOL_NAMESIZE];
435
436         snprintf(name, sizeof(name), "MP%u", sid);
437         mp = rte_pktmbuf_pool_create(name, MPOOL_NB_BUF, MPOOL_CACHE_SIZE, 0,
438                 RTE_MBUF_DEFAULT_BUF_SIZE, sid - 1);
439         if (mp == NULL) {
440                 rc = -rte_errno;
441                 RTE_LOG(ERR, USER1, "%s(%d) failed with error code: %d\n",
442                         __func__, sid - 1, rc);
443                 return rc;
444         }
445
446         mpool[sid] = mp;
447         return 0;
448 }
449
450 static int
451 frag_pool_init(uint32_t sid)
452 {
453         int32_t rc;
454         struct rte_mempool *frag_mp;
455         char frag_name[RTE_MEMPOOL_NAMESIZE];
456
457         snprintf(frag_name, sizeof(frag_name), "frag_MP%u", sid);
458         frag_mp = rte_pktmbuf_pool_create(frag_name, MPOOL_NB_BUF,
459                 MPOOL_CACHE_SIZE, 0, FRAG_MBUF_BUF_SIZE, sid - 1);
460         if (frag_mp == NULL) {
461                 rc = -rte_errno;
462                 RTE_LOG(ERR, USER1, "%s(%d) failed with error code: %d\n",
463                         __func__, sid - 1, rc);
464                 return rc;
465         }
466
467         frag_mpool[sid] = frag_mp;
468         return 0;
469 }
470
471 static struct netbe_lcore *
472 find_initilized_lcore(struct netbe_cfg *cfg, uint32_t lc_num)
473 {
474         uint32_t i;
475
476         for (i = 0; i < cfg->cpu_num; i++)
477                 if (cfg->cpu[i].id == lc_num)
478                         return &cfg->cpu[i];
479
480         return NULL;
481 }
482
483 static int
484 calculate_nb_prtq(struct netbe_cfg *cfg)
485 {
486         uint32_t i, j, rc;
487         struct netbe_port *prt;
488         struct netbe_lcore *lc;
489
490         for (i = 0; i != cfg->prt_num; i++) {
491                 prt = &cfg->prt[i];
492                 for (j = 0; j != prt->nb_lcore; j++) {
493                         rc = check_lcore(prt->lcore[j]);
494                         if (rc != 0) {
495                                 RTE_LOG(ERR, USER1,
496                                         "%s: processing failed with err: %d\n",
497                                         __func__, rc);
498                                 return rc;
499                         }
500
501                         lc = find_initilized_lcore(cfg, prt->lcore[j]);
502                         if (lc == NULL) {
503                                 NETBE_REALLOC(cfg->cpu, cfg->cpu_num + 1);
504                                 lc = &cfg->cpu[cfg->cpu_num];
505                                 lc->id = prt->lcore[j];
506                                 cfg->cpu_num++;
507                         }
508
509                         NETBE_REALLOC(lc->prtq, lc->prtq_num + 1);
510                         lc->prtq[lc->prtq_num].rxqid = j;
511                         lc->prtq[lc->prtq_num].txqid = j;
512                         lc->prtq[lc->prtq_num].port = *prt;
513                         lc->prtq_num++;
514                 }
515         }
516
517         return 0;
518 }
519
520 /*
521  * Setup all enabled ports.
522  */
523 static int
524 netbe_port_init(struct netbe_cfg *cfg, int argc, char *argv[])
525 {
526         int32_t rc;
527         uint32_t i, n, sid, j;
528         struct netbe_port *prt;
529
530         n = (uint32_t)argc;
531
532         rc = 0;
533         for (i = 0; i != n; i++) {
534                 NETBE_REALLOC(cfg->prt, cfg->prt_num + 1);
535                 rc = parse_netbe_arg(cfg->prt + i, argv[i]);
536                 if (rc != 0) {
537                         RTE_LOG(ERR, USER1,
538                                 "%s: processing of \"%s\" failed with error code: %d\n",
539                                 __func__, argv[i], rc);
540                         return rc;
541                 }
542                 cfg->prt_num++;
543         }
544
545         /* calculate number of queues per lcore. */
546         rc = calculate_nb_prtq(cfg);
547         if (rc != 0) {
548                 RTE_LOG(ERR, USER1, "%s: processing of arguments failed"
549                         " with error code: %d\n", __func__, rc);
550                 return rc;
551         }
552
553         for (i = 0; i != cfg->prt_num; i++) {
554                 prt = cfg->prt + i;
555                 rc = port_init(prt);
556                 if (rc != 0) {
557                         RTE_LOG(ERR, USER1,
558                                 "%s: port=%u init failed with error code: %d\n",
559                                 __func__, prt->id, rc);
560                         return rc;
561                 }
562                 rte_eth_macaddr_get(prt->id, &prt->mac);
563                 if (cfg->promisc)
564                         rte_eth_promiscuous_enable(prt->id);
565
566                 for (j = 0; j < prt->nb_lcore; j++) {
567                         sid = rte_lcore_to_socket_id(prt->lcore[j]) + 1;
568                         assert(sid < RTE_DIM(mpool));
569
570                         if (mpool[sid] == NULL) {
571                                 rc = pool_init(sid);
572                                 if (rc != 0)
573                                         return rc;
574                         }
575
576                         if (frag_mpool[sid] == NULL) {
577                                 rc = frag_pool_init(sid);
578                                 if (rc != 0)
579                                         return rc;
580                         }
581
582                         rc = queue_init(prt, mpool[sid]);
583                         if (rc != 0) {
584                                 RTE_LOG(ERR, USER1,
585                                         "%s: lcore=%u queue init failed with err: %d\n",
586                                         __func__, prt->lcore[j], rc);
587                                 return rc;
588                         }
589                 }
590         }
591         log_netbe_cfg(cfg);
592
593         return 0;
594 }
595
596 /*
597  * UDP IPv6 destination lookup callback.
598  */
599 static int
600 lpm6_dst_lookup(void *data, const struct in6_addr *addr,
601         struct tle_udp_dest *res)
602 {
603         int32_t rc;
604         uint8_t idx;
605         struct netbe_lcore *lc;
606         struct tle_udp_dest *dst;
607         uintptr_t p;
608
609         lc = data;
610         p = (uintptr_t)addr->s6_addr;
611
612         rc = rte_lpm6_lookup(lc->lpm6, (uint8_t *)p, &idx);
613         if (rc == 0) {
614                 dst = &lc->dst6[idx];
615                 rte_memcpy(res, dst, dst->l2_len + dst->l3_len +
616                         offsetof(struct tle_udp_dest, hdr));
617         }
618         return rc;
619 }
620
621 static int
622 netbe_add_ipv4_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
623         uint8_t idx)
624 {
625         int32_t rc;
626         uint32_t addr, depth;
627         char str[INET_ADDRSTRLEN];
628
629         depth = dst->prfx;
630         addr = rte_be_to_cpu_32(dst->ipv4.s_addr);
631
632         inet_ntop(AF_INET, &dst->ipv4, str, sizeof(str));
633         rc = rte_lpm_add(lc->lpm4, addr, depth, idx);
634         RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
635                 "ipv4=%s/%u,mtu=%u,"
636                 "mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
637                 "returns %d;\n",
638                 __func__, lc->id, dst->port, lc->dst4[idx].dev,
639                 str, depth, lc->dst4[idx].mtu,
640                 dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
641                 dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
642                 dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
643                 rc);
644         return rc;
645 }
646
647 static int
648 netbe_add_ipv6_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
649         uint8_t idx)
650 {
651         int32_t rc;
652         uint32_t depth;
653         char str[INET6_ADDRSTRLEN];
654
655         depth = dst->prfx;
656
657         rc = rte_lpm6_add(lc->lpm6, (uint8_t *)(uintptr_t)dst->ipv6.s6_addr,
658                 depth, idx);
659
660         inet_ntop(AF_INET6, &dst->ipv6, str, sizeof(str));
661         RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
662                 "ipv6=%s/%u,mtu=%u,"
663                 "mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
664                 "returns %d;\n",
665                 __func__, lc->id, dst->port, lc->dst6[idx].dev,
666                 str, depth, lc->dst4[idx].mtu,
667                 dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
668                 dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
669                 dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
670                 rc);
671         return rc;
672 }
673
674 static void
675 fill_dst(struct tle_udp_dest *dst, struct netbe_dev *bed,
676         const struct netbe_dest *bdp, uint16_t l3_type, int32_t sid)
677 {
678         struct ether_hdr *eth;
679         struct ipv4_hdr *ip4h;
680         struct ipv6_hdr *ip6h;
681
682         static const struct ipv4_hdr ipv4_tmpl = {
683                 .version_ihl =  4 << 4 | sizeof(*ip4h) / IPV4_IHL_MULTIPLIER,
684                 .time_to_live = 64,
685                 .next_proto_id = IPPROTO_UDP,
686         };
687
688         static const struct ipv6_hdr ipv6_tmpl = {
689                 .vtc_flow = 6 << 4,
690                 .proto = IPPROTO_UDP,
691                 .hop_limits = 64,
692         };
693
694         dst->dev = bed->dev;
695         dst->head_mp = frag_mpool[sid + 1];
696         dst->mtu = RTE_MIN(bdp->mtu, bed->port.mtu);
697         dst->l2_len = sizeof(*eth);
698
699         eth = (struct ether_hdr *)dst->hdr;
700
701         ether_addr_copy(&bed->port.mac, &eth->s_addr);
702         ether_addr_copy(&bdp->mac, &eth->d_addr);
703         eth->ether_type = rte_cpu_to_be_16(l3_type);
704
705         if (l3_type == ETHER_TYPE_IPv4) {
706                 dst->l3_len = sizeof(*ip4h);
707                 ip4h = (struct ipv4_hdr *)(eth + 1);
708                 ip4h[0] = ipv4_tmpl;
709         } else if (l3_type == ETHER_TYPE_IPv6) {
710                 dst->l3_len = sizeof(*ip6h);
711                 ip6h = (struct ipv6_hdr *)(eth + 1);
712                 ip6h[0] = ipv6_tmpl;
713         }
714 }
715
716 static int
717 create_context(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm)
718 {
719         uint32_t rc = 0, sid;
720         uint64_t frag_cycles;
721         struct tle_udp_ctx_param cprm;
722
723         if (lc->ctx == NULL) {
724                 sid = rte_lcore_to_socket_id(lc->id);
725
726                 rc = lcore_lpm_init(lc);
727                 if (rc != 0)
728                         return rc;
729
730                 cprm = *ctx_prm;
731                 cprm.socket_id = sid;
732                 cprm.lookup4 = lpm4_dst_lookup;
733                 cprm.lookup4_data = lc;
734                 cprm.lookup6 = lpm6_dst_lookup;
735                 cprm.lookup6_data = lc;
736
737                 /* to facilitate both IPv4 and IPv6. */
738                 cprm.max_streams *= 2;
739
740                 frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) /
741                                                 MS_PER_S * FRAG_TTL;
742
743                 lc->ftbl = rte_ip_frag_table_create(cprm.max_streams,
744                         FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams,
745                         frag_cycles, sid);
746
747                 RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n",
748                         __func__, lc->id, lc->ftbl);
749
750                 lc->ctx = tle_udp_create(&cprm);
751
752                 RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n",
753                         __func__, lc->id, lc->ctx);
754
755                 if (lc->ctx == NULL || lc->ftbl == NULL)
756                         rc = ENOMEM;
757         }
758
759         return rc;
760 }
761
762 /*
763  * BE lcore setup routine.
764  */
765 static int
766 lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm,
767         const uint32_t prtqid, const uint16_t *bl_ports, uint32_t nb_bl_ports)
768 {
769         int32_t rc = 0;
770         struct tle_udp_dev_param dprm;
771
772         rc = create_context(lc, ctx_prm);
773
774         if (lc->ctx != NULL) {
775                 memset(&dprm, 0, sizeof(dprm));
776                 dprm.rx_offload = lc->prtq[prtqid].port.rx_offload;
777                 dprm.tx_offload = lc->prtq[prtqid].port.tx_offload;
778                 dprm.local_addr4.s_addr = lc->prtq[prtqid].port.ipv4;
779                 memcpy(&dprm.local_addr6,  &lc->prtq[prtqid].port.ipv6,
780                         sizeof(lc->prtq[prtqid].port.ipv6));
781                 dprm.bl4.nb_port = nb_bl_ports;
782                 dprm.bl4.port = bl_ports;
783                 dprm.bl6.nb_port = nb_bl_ports;
784                 dprm.bl6.port = bl_ports;
785
786                 lc->prtq[prtqid].dev = tle_udp_add_dev(lc->ctx, &dprm);
787
788                 RTE_LOG(NOTICE, USER1,
789                         "%s(lcore=%u, port=%u, qid=%u), udp_dev: %p\n",
790                         __func__, lc->id, lc->prtq[prtqid].port.id,
791                         lc->prtq[prtqid].rxqid, lc->prtq[prtqid].dev);
792
793                 if (lc->prtq[prtqid].dev == NULL)
794                         rc = -rte_errno;
795
796                 if (rc != 0) {
797                         RTE_LOG(ERR, USER1,
798                                 "%s(lcore=%u) failed with error code: %d\n",
799                                 __func__, lc->id, rc);
800                         tle_udp_destroy(lc->ctx);
801                         rte_ip_frag_table_destroy(lc->ftbl);
802                         rte_lpm_free(lc->lpm4);
803                         rte_lpm6_free(lc->lpm6);
804                         rte_free(lc->prtq[prtqid].port.lcore);
805                         lc->prtq[prtqid].port.nb_lcore = 0;
806                         rte_free(lc->prtq);
807                         lc->prtq_num = 0;
808                         return rc;
809                 }
810         }
811
812         return rc;
813 }
814
815 static uint16_t
816 create_blocklist(const struct netbe_port *beprt, uint16_t *bl_ports,
817         uint32_t q)
818 {
819         uint32_t i, j, qid, align_nb_q;
820
821         align_nb_q = rte_align32pow2(beprt->nb_lcore);
822         for (i = 0, j = 0; i < (UINT16_MAX + 1); i++) {
823                 qid = (i % align_nb_q) % beprt->nb_lcore;
824                 if (qid != q)
825                         bl_ports[j++] = i;
826         }
827
828         return j;
829 }
830
831 static int
832 netbe_lcore_init(struct netbe_cfg *cfg,
833         const struct tle_udp_ctx_param *ctx_prm)
834 {
835         int32_t rc;
836         uint32_t i, j, nb_bl_ports = 0, sz;
837         struct netbe_lcore *lc;
838         static uint16_t *bl_ports;
839
840         /* Create the udp context and attached queue for each lcore. */
841         rc = 0;
842         sz = sizeof(uint16_t) * UINT16_MAX;
843         bl_ports = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
844         for (i = 0; i < cfg->cpu_num; i++) {
845                 lc = &cfg->cpu[i];
846                 for (j = 0; j < lc->prtq_num; j++) {
847                         memset((uint8_t *)bl_ports, 0, sz);
848                         /* create list of blocked ports based on q */
849                         nb_bl_ports = create_blocklist(&lc->prtq[j].port,
850                                 bl_ports, lc->prtq[j].rxqid);
851                         RTE_LOG(NOTICE, USER1,
852                                 "lc=%u, q=%u, nb_bl_ports=%u\n",
853                                 lc->id, lc->prtq[j].rxqid, nb_bl_ports);
854
855                         rc = lcore_init(lc, ctx_prm, j, bl_ports, nb_bl_ports);
856                         if (rc != 0) {
857                                 RTE_LOG(ERR, USER1,
858                                         "%s: failed with error code: %d\n",
859                                         __func__, rc);
860                                 rte_free(bl_ports);
861                                 return rc;
862                         }
863                 }
864         }
865         rte_free(bl_ports);
866
867         return 0;
868 }
869
870 static void
871 netbe_lcore_fini(struct netbe_cfg *cfg)
872 {
873         uint32_t i;
874
875         for (i = 0; i != cfg->cpu_num; i++) {
876                 tle_udp_destroy(cfg->cpu[i].ctx);
877                 rte_ip_frag_table_destroy(cfg->cpu[i].ftbl);
878                 rte_lpm_free(cfg->cpu[i].lpm4);
879                 rte_lpm6_free(cfg->cpu[i].lpm6);
880
881                 rte_free(cfg->cpu[i].prtq);
882                 cfg->cpu[i].prtq_num = 0;
883         }
884
885         rte_free(cfg->cpu);
886         cfg->cpu_num = 0;
887         for (i = 0; i != cfg->prt_num; i++) {
888                 rte_free(cfg->prt[i].lcore);
889                 cfg->prt[i].nb_lcore = 0;
890         }
891         rte_free(cfg->prt);
892         cfg->prt_num = 0;
893 }
894
895 static int
896 netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family,
897         const struct netbe_dest *dst, uint32_t dnum)
898 {
899         int32_t rc, sid;
900         uint16_t l3_type;
901         uint32_t i, n, m;
902         struct tle_udp_dest *dp;
903
904         if (family == AF_INET) {
905                 n = lc->dst4_num;
906                 dp = lc->dst4 + n;
907                 m = RTE_DIM(lc->dst4);
908                 l3_type = ETHER_TYPE_IPv4;
909         } else {
910                 n = lc->dst6_num;
911                 dp = lc->dst6 + n;
912                 m = RTE_DIM(lc->dst6);
913                 l3_type = ETHER_TYPE_IPv6;
914         }
915
916         if (n + dnum >= m) {
917                 RTE_LOG(ERR, USER1, "%s(lcore=%u, family=%hu, dnum=%u) exceeds "
918                         "maximum allowed number of destinations(%u);\n",
919                         __func__, lc->id, family, dnum, m);
920                 return -ENOSPC;
921         }
922
923         sid = rte_lcore_to_socket_id(lc->id);
924         rc = 0;
925
926         for (i = 0; i != dnum && rc == 0; i++) {
927                 fill_dst(dp + i, lc->prtq + dev_idx, dst + i, l3_type, sid);
928                 if (family == AF_INET)
929                         rc = netbe_add_ipv4_route(lc, dst + i, n + i);
930                 else
931                         rc = netbe_add_ipv6_route(lc, dst + i, n + i);
932         }
933
934         if (family == AF_INET)
935                 lc->dst4_num = n + i;
936         else
937                 lc->dst6_num = n + i;
938
939         return rc;
940 }
941
942 static int
943 netbe_dest_init(const char *fname, struct netbe_cfg *cfg)
944 {
945         int32_t rc;
946         uint32_t f, i, p;
947         uint32_t k, l, cnt;
948         struct netbe_lcore *lc;
949         struct netbe_dest_prm prm;
950
951         rc = netbe_parse_dest(fname, &prm);
952         if (rc != 0)
953                 return rc;
954
955         rc = 0;
956         for (i = 0; i != prm.nb_dest; i++) {
957
958                 p = prm.dest[i].port;
959                 f = prm.dest[i].family;
960
961                 cnt = 0;
962                 for (k = 0; k != cfg->cpu_num; k++) {
963                         lc = cfg->cpu + k;
964                         for (l = 0; l != lc->prtq_num; l++)
965                                 if (lc->prtq[l].port.id == p) {
966                                         rc = netbe_add_dest(lc, l, f,
967                                                         prm.dest + i, 1);
968                                         if (rc != 0) {
969                                                 RTE_LOG(ERR, USER1,
970                                                         "%s(lcore=%u, family=%u) could not "
971                                                         "add destinations(%u);\n",
972                                                         __func__, lc->id, f, i);
973                                                 return -ENOSPC;
974                                         }
975                                         cnt++;
976                                 }
977                 }
978
979                 if (cnt == 0) {
980                         RTE_LOG(ERR, USER1, "%s(%s) error at line %u: "
981                                 "port %u not managed by any lcore;\n",
982                                 __func__, fname, prm.dest[i].line, p);
983                         break;
984                 }
985         }
986
987         free(prm.dest);
988         return rc;
989 }
990
991 static void
992 netfe_stream_close(struct netfe_lcore *fe, uint32_t dec)
993 {
994         uint32_t sidx;
995
996         fe->sidx -= dec;
997         sidx = fe->sidx;
998         tle_event_free(fe->fs[sidx].txev);
999         tle_event_free(fe->fs[sidx].rxev);
1000         tle_udp_stream_close(fe->fs[sidx].s);
1001         memset(&fe->fs[sidx], 0, sizeof(fe->fs[sidx]));
1002 }
1003
1004 static void
1005 netfe_stream_dump(const struct netfe_stream *fes)
1006 {
1007         struct sockaddr_in *l4, *r4;
1008         struct sockaddr_in6 *l6, *r6;
1009         uint16_t lport, rport;
1010         struct tle_udp_stream_param sprm;
1011         char laddr[INET6_ADDRSTRLEN];
1012         char raddr[INET6_ADDRSTRLEN];
1013
1014         tle_udp_stream_get_param(fes->s, &sprm);
1015
1016         if (sprm.local_addr.ss_family == AF_INET) {
1017
1018                 l4 = (struct sockaddr_in *)&sprm.local_addr;
1019                 r4 = (struct sockaddr_in *)&sprm.remote_addr;
1020
1021                 lport = l4->sin_port;
1022                 rport = r4->sin_port;
1023
1024         } else if (sprm.local_addr.ss_family == AF_INET6) {
1025
1026                 l6 = (struct sockaddr_in6 *)&sprm.local_addr;
1027                 r6 = (struct sockaddr_in6 *)&sprm.remote_addr;
1028
1029                 lport = l6->sin6_port;
1030                 rport = r6->sin6_port;
1031
1032         } else {
1033                 RTE_LOG(ERR, USER1, "stream@%p - unknown family=%hu\n",
1034                         fes->s, sprm.local_addr.ss_family);
1035                 return;
1036         }
1037
1038         format_addr(&sprm.local_addr, laddr, sizeof(laddr));
1039         format_addr(&sprm.remote_addr, raddr, sizeof(raddr));
1040
1041         RTE_LOG(INFO, USER1,
1042                 "stream@%p={"
1043                 "family=%hu,laddr=%s,lport=%hu,raddr=%s,rport=%hu,"
1044                 "stats={"
1045                 "rxp=%" PRIu64 ",txp=%" PRIu64 ",drops=%" PRIu64 ","
1046                 "rxev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "],"
1047                 "txev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "],"
1048                 "}};\n",
1049                 fes->s,
1050                 sprm.local_addr.ss_family,
1051                 laddr, ntohs(lport), raddr, ntohs(rport),
1052                 fes->stat.rxp, fes->stat.txp, fes->stat.drops,
1053                 fes->stat.rxev[TLE_SEV_IDLE],
1054                 fes->stat.rxev[TLE_SEV_DOWN],
1055                 fes->stat.rxev[TLE_SEV_UP],
1056                 fes->stat.txev[TLE_SEV_IDLE],
1057                 fes->stat.txev[TLE_SEV_DOWN],
1058                 fes->stat.txev[TLE_SEV_UP]);
1059 }
1060
1061 /*
1062  * helper function: opens IPv4 and IPv6 streams for selected port.
1063  */
1064 static struct netfe_stream *
1065 netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm,
1066         uint32_t lcore, uint16_t op, uint32_t bidx)
1067 {
1068         int32_t rc;
1069         uint32_t sidx;
1070         struct netfe_stream *fes;
1071         struct sockaddr_in *l4;
1072         struct sockaddr_in6 *l6;
1073         uint16_t errport;
1074
1075         sidx = fe->sidx;
1076         fes = fe->fs + sidx;
1077         if (sidx >= fe->snum) {
1078                 rte_errno = ENOBUFS;
1079                 return NULL;
1080         }
1081
1082         fes->rxev = tle_event_alloc(fe->rxeq, &fe->fs[sidx]);
1083         fes->txev = tle_event_alloc(fe->txeq, &fe->fs[sidx]);
1084         sprm->recv_ev = fes->rxev;
1085         if (op != FWD)
1086                 sprm->send_ev = fes->txev;
1087
1088         RTE_LOG(ERR, USER1,
1089                 "%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}, be_lc=%u\n",
1090                 __func__, lcore, sidx, op, fes->rxev, fes->txev,
1091                 becfg.cpu[bidx].id);
1092         if (fes->rxev == NULL || fes->txev == NULL) {
1093                 netfe_stream_close(fe, 0);
1094                 rte_errno = ENOMEM;
1095                 return NULL;
1096         }
1097
1098         if (op == TXONLY || op == FWD) {
1099                 tle_event_active(fes->txev, TLE_SEV_DOWN);
1100                 fes->stat.txev[TLE_SEV_DOWN]++;
1101         }
1102
1103         if (op != TXONLY) {
1104                 tle_event_active(fes->rxev, TLE_SEV_DOWN);
1105                 fes->stat.rxev[TLE_SEV_DOWN]++;
1106         }
1107
1108         fes->s = tle_udp_stream_open(becfg.cpu[bidx].ctx, sprm);
1109         if (fes->s == NULL) {
1110                 rc = rte_errno;
1111                 netfe_stream_close(fe, 0);
1112                 rte_errno = rc;
1113
1114                 if (sprm->local_addr.ss_family == AF_INET) {
1115                         l4 = (struct sockaddr_in *) &sprm->local_addr;
1116                         errport = ntohs(l4->sin_port);
1117                 } else {
1118                         l6 = (struct sockaddr_in6 *) &sprm->local_addr;
1119                         errport = ntohs(l6->sin6_port);
1120                 }
1121                 RTE_LOG(ERR, USER1, "stream open failed for port %u with error "
1122                         "code=%u, bidx=%u, lc=%u\n",
1123                         errport, rc, bidx, becfg.cpu[bidx].id);
1124                 return NULL;
1125         }
1126
1127         fes->op = op;
1128         fes->family = sprm->local_addr.ss_family;
1129
1130         fe->sidx = sidx + 1;
1131         return fes;
1132 }
1133
/*
 * Compare the port and IP address of two socket addresses of @family.
 * AF_INET compares them as sockaddr_in; any other family compares them as
 * sockaddr_in6.
 * Returns non-zero when both port and address are equal, 0 otherwise.
 */
static inline int
netfe_addr_eq(struct sockaddr_storage *l, struct sockaddr_storage *r,
	uint16_t family)
{
	const struct sockaddr_in *l4, *r4;
	const struct sockaddr_in6 *l6, *r6;

	if (family == AF_INET) {
		l4 = (const struct sockaddr_in *)l;
		r4 = (const struct sockaddr_in *)r;
		return (l4->sin_port == r4->sin_port &&
				l4->sin_addr.s_addr == r4->sin_addr.s_addr);
	}

	l6 = (const struct sockaddr_in6 *)l;
	r6 = (const struct sockaddr_in6 *)r;
	/* memcmp() returns 0 when the two addresses are identical. */
	return (l6->sin6_port == r6->sin6_port &&
			memcmp(&l6->sin6_addr, &r6->sin6_addr,
			sizeof(l6->sin6_addr)) == 0);
}
1154
1155 static inline void
1156 netfe_pkt_addr(const struct rte_mbuf *m, struct sockaddr_storage *ps,
1157         uint16_t family)
1158 {
1159         const struct ipv4_hdr *ip4h;
1160         const struct ipv6_hdr *ip6h;
1161         const struct udp_hdr *udph;
1162         struct sockaddr_in *in4;
1163         struct sockaddr_in6 *in6;
1164
1165         NETFE_PKT_DUMP(m);
1166
1167         udph = rte_pktmbuf_mtod_offset(m, struct udp_hdr *, -m->l4_len);
1168
1169         if (family == AF_INET) {
1170                 in4 = (struct sockaddr_in *)ps;
1171                 ip4h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
1172                         -(m->l4_len + m->l3_len));
1173                 in4->sin_port = udph->src_port;
1174                 in4->sin_addr.s_addr = ip4h->src_addr;
1175         } else {
1176                 in6 = (struct sockaddr_in6 *)ps;
1177                 ip6h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
1178                         -(m->l4_len + m->l3_len));
1179                 in6->sin6_port = udph->src_port;
1180                 rte_memcpy(&in6->sin6_addr, ip6h->src_addr,
1181                         sizeof(in6->sin6_addr));
1182         }
1183 }
1184
/*
 * Count how many leading packets of pkt[0..num) have the same source
 * address/port as @cur.
 * Each examined packet's source is written into @nxt. When the run stops
 * early, @nxt therefore holds the source of the first packet that
 * differs.
 */
static inline uint32_t
pkt_eq_addr(struct rte_mbuf *pkt[], uint32_t num, uint16_t family,
	struct sockaddr_storage *cur, struct sockaddr_storage *nxt)
{
	uint32_t cnt;

	cnt = 0;
	while (cnt != num) {
		netfe_pkt_addr(pkt[cnt], nxt, family);
		if (!netfe_addr_eq(cur, nxt, family))
			break;
		cnt++;
	}

	return cnt;
}
1199
1200 static inline void
1201 pkt_buf_empty(struct pkt_buf *pb)
1202 {
1203         uint32_t i;
1204
1205         for (i = 0; i != pb->num; i++)
1206                 rte_pktmbuf_free(pb->pkt[i]);
1207
1208         pb->num = 0;
1209 }
1210
1211 static inline void
1212 pkt_buf_fill(uint32_t lcore, struct pkt_buf *pb, uint32_t dlen)
1213 {
1214         uint32_t i;
1215         int32_t sid;
1216
1217         sid = rte_lcore_to_socket_id(lcore) + 1;
1218
1219         for (i = pb->num; i != RTE_DIM(pb->pkt); i++) {
1220                 pb->pkt[i] = rte_pktmbuf_alloc(mpool[sid]);
1221                 if (pb->pkt[i] == NULL)
1222                         break;
1223                 rte_pktmbuf_append(pb->pkt[i], dlen);
1224         }
1225
1226         pb->num = i;
1227 }
1228
1229 static struct netfe_stream *
1230 find_fwd_dst(uint32_t lcore, struct netfe_stream *fes,
1231         const struct sockaddr *sa)
1232 {
1233         uint32_t rc;
1234         struct netfe_stream *fed;
1235         struct netfe_lcore *fe;
1236         struct tle_udp_stream_param sprm;
1237
1238         fe = RTE_PER_LCORE(_fe);
1239
1240         fed = fwd_tbl_lkp(fe, fes->family, sa);
1241         if (fed != NULL)
1242                 return fed;
1243
1244         /* create a new stream and put it into the fwd table. */
1245
1246         sprm = fes->fwdprm.prm;
1247
1248         /* open forward stream with wildcard remote addr. */
1249         memset(&sprm.remote_addr.ss_family + 1, 0,
1250                 sizeof(sprm.remote_addr) - sizeof(sprm.remote_addr.ss_family));
1251         fed = netfe_stream_open(fe, &sprm, lcore, FWD, fes->fwdprm.bidx);
1252         if (fed == NULL)
1253                 return NULL;
1254
1255         rc = fwd_tbl_add(fe, fes->family, sa, fed);
1256         if (rc != 0) {
1257                 netfe_stream_close(fe, 1);
1258                 fed = NULL;
1259         }
1260
1261         fed->fwdprm.prm.remote_addr = *(const struct sockaddr_storage *)sa;
1262         return fed;
1263 }
1264
1265 static inline void
1266 netfe_tx_process(uint32_t lcore, struct netfe_stream *fes)
1267 {
1268         uint32_t i, k, n;
1269
1270         /* refill with new mbufs. */
1271         pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
1272
1273         n = fes->pbuf.num;
1274         if (n == 0)
1275                 return;
1276
1277         k = tle_udp_stream_send(fes->s, fes->pbuf.pkt, n, NULL);
1278         NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) returns %u\n",
1279                 __func__, lcore, fes->s, n, k);
1280         fes->stat.txp += k;
1281         fes->stat.drops += n - k;
1282
1283         if (k == 0)
1284                 return;
1285
1286         /* adjust pbuf array. */
1287         fes->pbuf.num = n - k;
1288         for (i = k; i != n; i++)
1289                 fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
1290 }
1291
1292 static inline void
1293 netfe_fwd(uint32_t lcore, struct netfe_stream *fes)
1294 {
1295         uint32_t i, j, k, n, x;
1296         uint16_t family;
1297         void *pi0, *pi1, *pt;
1298         struct rte_mbuf **pkt;
1299         struct netfe_stream *fed;
1300         struct sockaddr_storage in[2];
1301
1302         family = fes->family;
1303         n = fes->pbuf.num;
1304         pkt = fes->pbuf.pkt;
1305
1306         if (n == 0)
1307                 return;
1308
1309         in[0].ss_family = family;
1310         in[1].ss_family = family;
1311         pi0 = &in[0];
1312         pi1 = &in[1];
1313
1314         netfe_pkt_addr(pkt[0], pi0, family);
1315
1316         x = 0;
1317         for (i = 0; i != n; i = j) {
1318
1319                 j = i + pkt_eq_addr(&pkt[i + 1],
1320                         n - i - 1, family, pi0, pi1) + 1;
1321
1322                 fed = find_fwd_dst(lcore, fes, (const struct sockaddr *)pi0);
1323                 if (fed != NULL) {
1324
1325                         k = tle_udp_stream_send(fed->s, pkt + i, j - i,
1326                                 (const struct sockaddr *)
1327                                 &fes->fwdprm.prm.remote_addr);
1328
1329                         NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) "
1330                                 "returns %u\n",
1331                                 __func__, lcore, fed->s, j - i, k);
1332                         fed->stat.txp += k;
1333                         fed->stat.drops += j - i - k;
1334                         fes->stat.fwp += k;
1335
1336                 } else {
1337                         NETFE_TRACE("%s(%u, %p): no fwd stream for %u pkts;\n",
1338                                 __func__, lcore, fes->s, j - i);
1339                         for (k = i; k != j; k++) {
1340                                 NETFE_TRACE("%s(%u, %p): free(%p);\n",
1341                                 __func__, lcore, fes->s, pkt[k]);
1342                                 rte_pktmbuf_free(pkt[j]);
1343                         }
1344                         fes->stat.drops += j - i;
1345                 }
1346
1347                 /* copy unforwarded mbufs. */
1348                 for (i += k; i != j; i++, x++)
1349                         pkt[x] = pkt[i];
1350
1351                 /* swap the pointers */
1352                 pt = pi0;
1353                 pi0 = pi1;
1354                 pi1 = pt;
1355         }
1356
1357         fes->pbuf.num = x;
1358
1359         if (x != 0) {
1360                 tle_event_raise(fes->txev);
1361                 fes->stat.txev[TLE_SEV_UP]++;
1362         }
1363
1364         if (n == RTE_DIM(fes->pbuf.pkt)) {
1365                 tle_event_active(fes->rxev, TLE_SEV_UP);
1366                 fes->stat.rxev[TLE_SEV_UP]++;
1367         }
1368 }
1369
1370 static inline void
1371 netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
1372 {
1373         uint32_t k, n;
1374
1375         n = fes->pbuf.num;
1376         k = RTE_DIM(fes->pbuf.pkt) - n;
1377
1378         /* packet buffer is full, can't receive any new packets. */
1379         if (k == 0) {
1380                 tle_event_idle(fes->rxev);
1381                 fes->stat.rxev[TLE_SEV_IDLE]++;
1382                 return;
1383         }
1384
1385         n = tle_udp_stream_recv(fes->s, fes->pbuf.pkt + n, k);
1386         if (n == 0)
1387                 return;
1388
1389         NETFE_TRACE("%s(%u): tle_udp_stream_recv(%p, %u) returns %u\n",
1390                 __func__, lcore, fes->s, k, n);
1391
1392         fes->pbuf.num += n;
1393         fes->stat.rxp += n;
1394
1395         /* free all received mbufs. */
1396         if (fes->op == RXONLY)
1397                 pkt_buf_empty(&fes->pbuf);
1398         /* mark stream as writable */
1399         else if (k ==  RTE_DIM(fes->pbuf.pkt)) {
1400                 if (fes->op == RXTX) {
1401                         tle_event_active(fes->txev, TLE_SEV_UP);
1402                         fes->stat.txev[TLE_SEV_UP]++;
1403                 } else if (fes->op == FWD) {
1404                         tle_event_raise(fes->txev);
1405                         fes->stat.txev[TLE_SEV_UP]++;
1406                 }
1407         }
1408 }
1409
1410 static inline void
1411 netfe_rxtx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
1412 {
1413         uint32_t i, j, k, n;
1414         uint16_t family;
1415         void *pi0, *pi1, *pt;
1416         struct rte_mbuf **pkt;
1417         struct sockaddr_storage in[2];
1418
1419         family = fes->family;
1420         n = fes->pbuf.num;
1421         pkt = fes->pbuf.pkt;
1422
1423         /* there is nothing to send. */
1424         if (n == 0) {
1425                 tle_event_idle(fes->txev);
1426                 fes->stat.txev[TLE_SEV_IDLE]++;
1427                 return;
1428         }
1429
1430         in[0].ss_family = family;
1431         in[1].ss_family = family;
1432         pi0 = &in[0];
1433         pi1 = &in[1];
1434
1435         netfe_pkt_addr(pkt[0], pi0, family);
1436
1437         for (i = 0; i != n; i = j) {
1438
1439                 j = i + pkt_eq_addr(&pkt[i + 1],
1440                         n - i - 1, family, pi0, pi1) + 1;
1441
1442                 k = tle_udp_stream_send(fes->s, pkt + i, j - i,
1443                         (const struct sockaddr *)pi0);
1444
1445                 NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) returns %u\n",
1446                         __func__, lcore, fes->s, j - i, k);
1447                 fes->stat.txp += k;
1448                 fes->stat.drops += j - i - k;
1449
1450                 i += k;
1451
1452                 /* stream send buffer is full */
1453                 if (i != j)
1454                         break;
1455
1456                 /* swap the pointers */
1457                 pt = pi0;
1458                 pi0 = pi1;
1459                 pi1 = pt;
1460         }
1461
1462         /* not able to send anything. */
1463         if (i == 0)
1464                 return;
1465
1466         if (n == RTE_DIM(fes->pbuf.pkt)) {
1467                 /* mark stream as readable */
1468                 tle_event_active(fes->rxev, TLE_SEV_UP);
1469                 fes->stat.rxev[TLE_SEV_UP]++;
1470         }
1471
1472         /* adjust pbuf array. */
1473         fes->pbuf.num = n - i;
1474         for (j = i; j != n; j++)
1475                 pkt[j - i] = pkt[j];
1476 }
1477
1478 static int
1479 netfe_lcore_init(const struct netfe_lcore_prm *prm)
1480 {
1481         size_t sz;
1482         int32_t rc;
1483         uint32_t i, lcore, snum;
1484         struct netfe_lcore *fe;
1485         struct tle_evq_param eprm;
1486         struct tle_udp_stream_param sprm;
1487         struct netfe_stream *fes;
1488
1489         lcore = rte_lcore_id();
1490
1491         snum = prm->max_streams;
1492         RTE_LOG(NOTICE, USER1, "%s(lcore=%u, nb_streams=%u, max_streams=%u)\n",
1493                 __func__, lcore, prm->nb_streams, snum);
1494
1495         memset(&eprm, 0, sizeof(eprm));
1496         eprm.socket_id = rte_lcore_to_socket_id(lcore);
1497         eprm.max_events = snum;
1498
1499         sz = sizeof(*fe) + snum * sizeof(fe->fs[0]);
1500         fe = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
1501                 rte_lcore_to_socket_id(lcore));
1502
1503         if (fe == NULL) {
1504                 RTE_LOG(ERR, USER1, "%s:%d failed to allocate %zu bytes\n",
1505                         __func__, __LINE__, sz);
1506                 return -ENOMEM;
1507         }
1508
1509         RTE_PER_LCORE(_fe) = fe;
1510
1511         fe->snum = snum;
1512         fe->fs = (struct netfe_stream *)(fe + 1);
1513
1514         fe->rxeq = tle_evq_create(&eprm);
1515         fe->txeq = tle_evq_create(&eprm);
1516
1517         RTE_LOG(INFO, USER1, "%s(%u) rx evq=%p, tx evq=%p\n",
1518                 __func__, lcore, fe->rxeq, fe->txeq);
1519         if (fe->rxeq == NULL || fe->txeq == NULL)
1520                 return -ENOMEM;
1521
1522         rc = fwd_tbl_init(fe, AF_INET, lcore);
1523         RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
1524                 __func__, lcore, AF_INET, rc);
1525         if (rc != 0)
1526                 return rc;
1527
1528         rc = fwd_tbl_init(fe, AF_INET6, lcore);
1529         RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n",
1530                 __func__, lcore, AF_INET6, rc);
1531         if (rc != 0)
1532                 return rc;
1533
1534         /* open all requested streams. */
1535         for (i = 0; i != prm->nb_streams; i++) {
1536                 sprm = prm->stream[i].sprm.prm;
1537                 fes = netfe_stream_open(fe, &sprm, lcore, prm->stream[i].op,
1538                         prm->stream[i].sprm.bidx);
1539                 if (fes == NULL) {
1540                         rc = -rte_errno;
1541                         break;
1542                 }
1543
1544                 netfe_stream_dump(fes);
1545
1546                 if (prm->stream[i].op == FWD) {
1547                         fes->fwdprm = prm->stream[i].fprm;
1548                         rc = fwd_tbl_add(fe,
1549                                 prm->stream[i].fprm.prm.remote_addr.ss_family,
1550                                 (const struct sockaddr *)
1551                                 &prm->stream[i].fprm.prm.remote_addr,
1552                                 fes);
1553                         if (rc != 0) {
1554                                 netfe_stream_close(fe, 1);
1555                                 break;
1556                         }
1557                 } else if (prm->stream[i].op == TXONLY) {
1558                         fes->txlen = prm->stream[i].txlen;
1559                         fes->raddr = sprm.remote_addr;
1560                 }
1561         }
1562
1563         return rc;
1564 }
1565
1566 static inline void
1567 netfe_lcore(void)
1568 {
1569         struct netfe_lcore *fe;
1570         uint32_t j, n, lcore;
1571         struct netfe_stream *fs[MAX_PKT_BURST];
1572
1573         fe = RTE_PER_LCORE(_fe);
1574         if (fe == NULL)
1575                 return;
1576
1577         lcore = rte_lcore_id();
1578
1579         n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
1580
1581         if (n != 0) {
1582                 NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) returns %u\n",
1583                         __func__, lcore, fe->rxeq, n);
1584                 for (j = 0; j != n; j++)
1585                         netfe_rx_process(lcore, fs[j]);
1586         }
1587
1588         n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
1589
1590         if (n != 0) {
1591                 NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n",
1592                         __func__, lcore, fe->txeq, n);
1593                 for (j = 0; j != n; j++) {
1594                         if (fs[j]->op == RXTX)
1595                                 netfe_rxtx_process(lcore, fs[j]);
1596                         else if (fs[j]->op == FWD)
1597                                 netfe_fwd(lcore, fs[j]);
1598                         else if (fs[j]->op == TXONLY)
1599                                 netfe_tx_process(lcore, fs[j]);
1600                 }
1601         }
1602 }
1603
1604 static void
1605 netfe_lcore_fini(void)
1606 {
1607         struct netfe_lcore *fe;
1608         uint32_t i;
1609
1610         fe = RTE_PER_LCORE(_fe);
1611         if (fe == NULL)
1612                 return;
1613
1614         while (fe->sidx != 0) {
1615                 i = fe->sidx - 1;
1616                 netfe_stream_dump(fe->fs + i);
1617                 netfe_stream_close(fe, 1);
1618         }
1619
1620         tle_evq_destroy(fe->txeq);
1621         tle_evq_destroy(fe->rxeq);
1622         RTE_PER_LCORE(_fe) = NULL;
1623         rte_free(fe);
1624 }
1625
1626 static inline void
1627 netbe_rx(struct netbe_lcore *lc, uint32_t pidx)
1628 {
1629         uint32_t j, k, n;
1630         struct rte_mbuf *pkt[MAX_PKT_BURST];
1631         struct rte_mbuf *rp[MAX_PKT_BURST];
1632         int32_t rc[MAX_PKT_BURST];
1633
1634         n = rte_eth_rx_burst(lc->prtq[pidx].port.id,
1635                         lc->prtq[pidx].rxqid, pkt, RTE_DIM(pkt));
1636         if (n == 0)
1637                 return;
1638
1639         lc->prtq[pidx].rx_stat.in += n;
1640         NETBE_TRACE("%s(%u): rte_eth_rx_burst(%u, %u) returns %u\n",
1641                 __func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].rxqid,
1642                 n);
1643
1644         k = tle_udp_rx_bulk(lc->prtq[pidx].dev, pkt, rp, rc, n);
1645
1646         lc->prtq[pidx].rx_stat.up += k;
1647         lc->prtq[pidx].rx_stat.drop += n - k;
1648         NETBE_TRACE("%s(%u): tle_udp_rx_bulk(%p, %u) returns %u\n",
1649                 __func__, lc->id, lc->prtq[pidx].dev, n, k);
1650
1651         for (j = 0; j != n - k; j++) {
1652                 NETBE_TRACE("%s:%d(port=%u) rp[%u]={%p, %d};\n",
1653                         __func__, __LINE__, lc->prtq[pidx].port.id,
1654                         j, rp[j], rc[j]);
1655                 rte_pktmbuf_free(rp[j]);
1656         }
1657 }
1658
1659 static inline void
1660 netbe_tx(struct netbe_lcore *lc, uint32_t pidx)
1661 {
1662         uint32_t j, k, n;
1663         struct rte_mbuf **mb;
1664
1665         n = lc->prtq[pidx].tx_buf.num;
1666         k = RTE_DIM(lc->prtq[pidx].tx_buf.pkt) - n;
1667         mb = lc->prtq[pidx].tx_buf.pkt;
1668
1669         if (k >= RTE_DIM(lc->prtq[pidx].tx_buf.pkt) / 2) {
1670                 j = tle_udp_tx_bulk(lc->prtq[pidx].dev, mb + n, k);
1671                 n += j;
1672                 lc->prtq[pidx].tx_stat.down += j;
1673         }
1674
1675         if (n == 0)
1676                 return;
1677
1678         NETBE_TRACE("%s(%u): tle_udp_tx_bulk(%p) returns %u,\n"
1679                 "total pkts to send: %u\n",
1680                 __func__, lc->id, lc->prtq[pidx].dev, j, n);
1681
1682         for (j = 0; j != n; j++)
1683                 NETBE_PKT_DUMP(mb[j]);
1684
1685         k = rte_eth_tx_burst(lc->prtq[pidx].port.id,
1686                         lc->prtq[pidx].txqid, mb, n);
1687
1688         lc->prtq[pidx].tx_stat.out += k;
1689         lc->prtq[pidx].tx_stat.drop += n - k;
1690         NETBE_TRACE("%s(%u): rte_eth_tx_burst(%u, %u, %u) returns %u\n",
1691                 __func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].txqid,
1692                 n, k);
1693
1694         lc->prtq[pidx].tx_buf.num = n - k;
1695         if (k != 0)
1696                 for (j = k; j != n; j++)
1697                         mb[j - k] = mb[j];
1698 }
1699
1700 static int
1701 netbe_lcore_setup(struct netbe_lcore *lc)
1702 {
1703         uint32_t i;
1704         int32_t rc;
1705
1706         RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) start\n",
1707                 __func__, lc->id, lc->ctx);
1708
1709         /*
1710          * ???????
1711          * wait for FE lcores to start, so BE dont' drop any packets
1712          * because corresponding streams not opened yet by FE.
1713          * useful when used with pcap PMDS.
1714          * think better way, or should this timeout be a cmdlien parameter.
1715          * ???????
1716          */
1717         rte_delay_ms(10);
1718
1719         rc = 0;
1720         for (i = 0; i != lc->prtq_num && rc == 0; i++) {
1721                 RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, udp_dev: %p)\n",
1722                         __func__, i, lc->prtq[i].port.id, lc->prtq[i].dev);
1723                 rc = setup_rx_cb(&lc->prtq[i].port, lc, lc->prtq[i].rxqid);
1724                 if (rc < 0)
1725                         return rc;
1726         }
1727
1728         if (rc == 0)
1729                 RTE_PER_LCORE(_be) = lc;
1730         return rc;
1731 }
1732
1733 static inline void
1734 netbe_lcore(void)
1735 {
1736         uint32_t i;
1737         struct netbe_lcore *lc;
1738
1739         lc = RTE_PER_LCORE(_be);
1740         if (lc == NULL)
1741                 return;
1742
1743         for (i = 0; i != lc->prtq_num; i++) {
1744                 netbe_rx(lc, i);
1745                 netbe_tx(lc, i);
1746         }
1747 }
1748
1749 static void
1750 netbe_lcore_clear(void)
1751 {
1752         uint32_t i, j;
1753         struct netbe_lcore *lc;
1754
1755         lc = RTE_PER_LCORE(_be);
1756         if (lc == NULL)
1757                 return;
1758
1759         RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) finish\n",
1760                 __func__, lc->id, lc->ctx);
1761         for (i = 0; i != lc->prtq_num; i++) {
1762                 RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, lcore=%u, q=%u, dev=%p) "
1763                         "rx_stats={"
1764                         "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "}, "
1765                         "tx_stats={"
1766                         "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "};\n",
1767                         __func__, i, lc->prtq[i].port.id, lc->id,
1768                         lc->prtq[i].rxqid,
1769                         lc->prtq[i].dev,
1770                         lc->prtq[i].rx_stat.in,
1771                         lc->prtq[i].rx_stat.up,
1772                         lc->prtq[i].rx_stat.drop,
1773                         lc->prtq[i].tx_stat.down,
1774                         lc->prtq[i].tx_stat.out,
1775                         lc->prtq[i].tx_stat.drop);
1776         }
1777
1778         for (i = 0; i != lc->prtq_num; i++)
1779                 for (j = 0; j != lc->prtq[i].tx_buf.num; j++)
1780                         rte_pktmbuf_free(lc->prtq[i].tx_buf.pkt[j]);
1781
1782         RTE_PER_LCORE(_be) = NULL;
1783 }
1784
1785 static int
1786 lcore_main(void *arg)
1787 {
1788         int32_t rc;
1789         uint32_t lcore;
1790         struct lcore_prm *prm;
1791
1792         prm = arg;
1793         lcore = rte_lcore_id();
1794
1795         RTE_LOG(NOTICE, USER1, "%s(lcore=%u) start\n",
1796                 __func__, lcore);
1797
1798         rc = 0;
1799
1800         /* lcore FE init. */
1801         if (prm->fe.max_streams != 0)
1802                 rc = netfe_lcore_init(&prm->fe);
1803
1804         /* lcore FE init. */
1805         if (rc == 0 && prm->be.lc != NULL)
1806                 rc = netbe_lcore_setup(prm->be.lc);
1807
1808         if (rc != 0)
1809                 sig_handle(SIGQUIT);
1810
1811         while (force_quit == 0) {
1812                 netfe_lcore();
1813                 netbe_lcore();
1814         }
1815
1816         RTE_LOG(NOTICE, USER1, "%s(lcore=%u) finish\n",
1817                 __func__, lcore);
1818
1819         netfe_lcore_fini();
1820         netbe_lcore_clear();
1821
1822         return rc;
1823 }
1824
1825 static int
1826 netfe_lcore_cmp(const void *s1, const void *s2)
1827 {
1828         const struct netfe_stream_prm *p1, *p2;
1829
1830         p1 = s1;
1831         p2 = s2;
1832         return p1->lcore - p2->lcore;
1833 }
1834
1835 static int
1836 netbe_find6(const struct in6_addr *laddr, uint16_t lport,
1837         const struct in6_addr *raddr, uint32_t be_lc)
1838 {
1839         uint32_t i, j;
1840         uint8_t idx;
1841         struct netbe_lcore *bc;
1842
1843         /* we have exactly one BE, use it for all traffic */
1844         if (becfg.cpu_num == 1)
1845                 return 0;
1846
1847         /* search by provided be_lcore */
1848         if (be_lc != LCORE_ID_ANY) {
1849                 for (i = 0; i != becfg.cpu_num; i++) {
1850                         bc = becfg.cpu + i;
1851                         if (be_lc == bc->id)
1852                                 return i;
1853                 }
1854                 RTE_LOG(NOTICE, USER1, "%s: no stream with be_lcore=%u\n",
1855                         __func__, be_lc);
1856                 return -ENOENT;
1857         }
1858
1859         /* search by local address */
1860         if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) != 0) {
1861                 for (i = 0; i != becfg.cpu_num; i++) {
1862                         bc = becfg.cpu + i;
1863                         /* search by queue for the local port */
1864                         for (j = 0; j != bc->prtq_num; j++) {
1865                                 if (memcmp(laddr, &bc->prtq[j].port.ipv6,
1866                                                 sizeof(*laddr)) == 0) {
1867
1868                                         if (lport == 0)
1869                                                 return i;
1870
1871                                         if (verify_queue_for_port(bc->prtq + j,
1872                                                         lport) != 0)
1873                                                 return i;
1874                                 }
1875                         }
1876                 }
1877         }
1878
1879         /* search by remote address */
1880         if (memcmp(raddr, &in6addr_any, sizeof(*raddr)) == 0) {
1881                 for (i = 0; i != becfg.cpu_num; i++) {
1882                         bc = becfg.cpu + i;
1883                         if (rte_lpm6_lookup(bc->lpm6,
1884                                         (uint8_t *)(uintptr_t)raddr->s6_addr,
1885                                         &idx) == 0) {
1886
1887                                 if (lport == 0)
1888                                         return i;
1889
1890                                 /* search by queue for the local port */
1891                                 for (j = 0; j != bc->prtq_num; j++)
1892                                         if (verify_queue_for_port(bc->prtq + j,
1893                                                         lport) != 0)
1894                                                 return i;
1895                         }
1896                 }
1897         }
1898
1899         return -ENOENT;
1900 }
1901
1902 static int
1903 netbe_find(const struct tle_udp_stream_param *p, uint32_t be_lc)
1904 {
1905         const struct sockaddr_in *l4, *r4;
1906         const struct sockaddr_in6 *l6, *r6;
1907
1908         if (p->local_addr.ss_family == AF_INET) {
1909                 l4 = (const struct sockaddr_in *)&p->local_addr;
1910                 r4 = (const struct sockaddr_in *)&p->remote_addr;
1911                 return netbe_find4(&l4->sin_addr, ntohs(l4->sin_port),
1912                                 &r4->sin_addr, be_lc);
1913         } else if (p->local_addr.ss_family == AF_INET6) {
1914                 l6 = (const struct sockaddr_in6 *)&p->local_addr;
1915                 r6 = (const struct sockaddr_in6 *)&p->remote_addr;
1916                 return netbe_find6(&l6->sin6_addr, ntohs(l6->sin6_port),
1917                                 &r6->sin6_addr, be_lc);
1918         }
1919         return -EINVAL;
1920 }
1921
1922 static int
1923 netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line, uint32_t be_lc)
1924 {
1925         int32_t bidx;
1926
1927         bidx = netbe_find(&sp->prm, be_lc);
1928         if (bidx < 0) {
1929                 RTE_LOG(ERR, USER1, "%s(line=%u): no BE for that stream\n",
1930                         __func__, line);
1931                 return -EINVAL;
1932         }
1933         sp->bidx = bidx;
1934         return 0;
1935 }
1936
1937 /* start front-end processing. */
1938 static int
1939 netfe_lcore_fill(struct lcore_prm prm[RTE_MAX_LCORE],
1940         struct netfe_lcore_prm *lprm)
1941 {
1942         uint32_t be_lc;
1943         uint32_t i, j, lc, ln;
1944
1945         /* determine on what BE each stream should be open. */
1946         for (i = 0; i != lprm->nb_streams; i++) {
1947                 lc = lprm->stream[i].lcore;
1948                 ln = lprm->stream[i].line;
1949                 be_lc = lprm->stream[i].be_lcore;
1950                 if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln,
1951                                 be_lc) != 0 ||
1952                                 (lprm->stream[i].op == FWD &&
1953                                 netfe_sprm_flll_be(&lprm->stream[i].fprm, ln,
1954                                         be_lc) != 0))
1955                         return -EINVAL;
1956         }
1957
1958         /* group all fe parameters by lcore. */
1959
1960         qsort(lprm->stream, lprm->nb_streams, sizeof(lprm->stream[0]),
1961                 netfe_lcore_cmp);
1962
1963         for (i = 0; i != lprm->nb_streams; i = j) {
1964
1965                 lc = lprm->stream[i].lcore;
1966                 ln = lprm->stream[i].line;
1967
1968                 if (rte_lcore_is_enabled(lc) == 0) {
1969                         RTE_LOG(ERR, USER1,
1970                                 "%s(line=%u): lcore %u is not enabled\n",
1971                                 __func__, ln, lc);
1972                         return -EINVAL;
1973                 }
1974
1975                 if (rte_get_master_lcore() != lc &&
1976                                 rte_eal_get_lcore_state(lc) == RUNNING) {
1977                         RTE_LOG(ERR, USER1,
1978                                 "%s(line=%u): lcore %u already in use\n",
1979                                 __func__, ln, lc);
1980                         return -EINVAL;
1981                 }
1982
1983                 for (j = i + 1; j != lprm->nb_streams &&
1984                                 lc == lprm->stream[j].lcore;
1985                                 j++)
1986                         ;
1987
1988                 prm[lc].fe.max_streams = lprm->max_streams;
1989                 prm[lc].fe.nb_streams = j - i;
1990                 prm[lc].fe.stream = lprm->stream + i;
1991         }
1992
1993         return 0;
1994 }
1995
1996 int
1997 main(int argc, char *argv[])
1998 {
1999         int32_t opt, opt_idx, rc;
2000         uint32_t i;
2001         uint64_t v;
2002         struct tle_udp_ctx_param ctx_prm;
2003         struct netfe_lcore_prm feprm;
2004         struct rte_eth_stats stats;
2005         char fecfg_fname[PATH_MAX + 1];
2006         char becfg_fname[PATH_MAX + 1];
2007         struct lcore_prm prm[RTE_MAX_LCORE];
2008         struct rte_eth_dev_info dev_info;
2009
2010         fecfg_fname[0] = 0;
2011         becfg_fname[0] = 0;
2012         memset(prm, 0, sizeof(prm));
2013
2014         rc = rte_eal_init(argc, argv);
2015         if (rc < 0)
2016                 rte_exit(EXIT_FAILURE,
2017                         "%s: rte_eal_init failed with error code: %d\n",
2018                         __func__, rc);
2019
2020         memset(&ctx_prm, 0, sizeof(ctx_prm));
2021
2022         argc -= rc;
2023         argv += rc;
2024
2025         optind = 0;
2026         optarg = NULL;
2027         while ((opt = getopt_long(argc, argv, "B:PR:S:b:f:s:", long_opt,
2028                         &opt_idx)) != EOF) {
2029                 if (opt == OPT_SHORT_SBULK) {
2030                         rc = parse_uint_val(NULL, optarg, &v);
2031                         if (rc < 0)
2032                                 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
2033                                         "for option: \'%c\'\n",
2034                                         __func__, optarg, opt);
2035                         ctx_prm.send_bulk_size = v;
2036                 } else if (opt == OPT_SHORT_PROMISC) {
2037                         becfg.promisc = 1;
2038                 } else if (opt == OPT_SHORT_RBUFS) {
2039                         rc = parse_uint_val(NULL, optarg, &v);
2040                         if (rc < 0)
2041                                 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
2042                                         "for option: \'%c\'\n",
2043                                         __func__, optarg, opt);
2044                         ctx_prm.max_stream_rbufs = v;
2045                 } else if (opt == OPT_SHORT_SBUFS) {
2046                         rc = parse_uint_val(NULL, optarg, &v);
2047                         if (rc < 0)
2048                                 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
2049                                         "for option: \'%c\'\n",
2050                                         __func__, optarg, opt);
2051                         ctx_prm.max_stream_sbufs = v;
2052                 } else if (opt == OPT_SHORT_STREAMS) {
2053                         rc = parse_uint_val(NULL, optarg, &v);
2054                         if (rc < 0)
2055                                 rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
2056                                         "for option: \'%c\'\n",
2057                                         __func__, optarg, opt);
2058                         ctx_prm.max_streams = v;
2059                 } else if (opt == OPT_SHORT_BECFG) {
2060                         snprintf(becfg_fname, sizeof(becfg_fname), "%s",
2061                                 optarg);
2062                 } else if (opt == OPT_SHORT_FECFG) {
2063                         snprintf(fecfg_fname, sizeof(fecfg_fname), "%s",
2064                                 optarg);
2065                 } else {
2066                         rte_exit(EXIT_FAILURE,
2067                                 "%s: unknown option: \'%c\'\n",
2068                                 __func__, opt);
2069                 }
2070         }
2071
2072         signal(SIGINT, sig_handle);
2073
2074         rc = netbe_port_init(&becfg, argc - optind, argv + optind);
2075         if (rc != 0)
2076                 rte_exit(EXIT_FAILURE,
2077                         "%s: netbe_port_init failed with error code: %d\n",
2078                         __func__, rc);
2079
2080         rc = netbe_lcore_init(&becfg, &ctx_prm);
2081         if (rc != 0)
2082                 sig_handle(SIGQUIT);
2083
2084         if ((rc = netbe_dest_init(becfg_fname, &becfg)) != 0)
2085                 sig_handle(SIGQUIT);
2086
2087         for (i = 0; i != becfg.prt_num && rc == 0; i++) {
2088                 RTE_LOG(NOTICE, USER1, "%s: starting port %u\n",
2089                         __func__, becfg.prt[i].id);
2090                 rc = rte_eth_dev_start(becfg.prt[i].id);
2091                 if (rc != 0) {
2092                         RTE_LOG(ERR, USER1,
2093                                 "%s: rte_eth_dev_start(%u) returned "
2094                                 "error code: %d\n",
2095                                 __func__, becfg.prt[i].id, rc);
2096                         sig_handle(SIGQUIT);
2097                 }
2098                 rte_eth_dev_info_get(becfg.prt[i].id, &dev_info);
2099                 rc = update_rss_reta(&becfg.prt[i], &dev_info);
2100                 if (rc != 0)
2101                         sig_handle(SIGQUIT);
2102         }
2103
2104         feprm.max_streams = ctx_prm.max_streams * becfg.cpu_num;
2105         if (rc == 0 && (rc = netfe_parse_cfg(fecfg_fname, &feprm)) != 0)
2106                 sig_handle(SIGQUIT);
2107
2108         for (i = 0; rc == 0 && i != becfg.cpu_num; i++)
2109                 prm[becfg.cpu[i].id].be.lc = becfg.cpu + i;
2110
2111         if (rc == 0 && (rc = netfe_lcore_fill(prm, &feprm)) != 0)
2112                 sig_handle(SIGQUIT);
2113
2114         /* launch all slave lcores. */
2115         RTE_LCORE_FOREACH_SLAVE(i) {
2116                 if (prm[i].be.lc != NULL || prm[i].fe.max_streams != 0)
2117                         rte_eal_remote_launch(lcore_main, prm + i, i);
2118         }
2119
2120         /* launch master lcore. */
2121         i = rte_get_master_lcore();
2122         if (prm[i].be.lc != NULL || prm[i].fe.max_streams != 0)
2123                 lcore_main(prm + i);
2124
2125         rte_eal_mp_wait_lcore();
2126
2127         for (i = 0; i != becfg.prt_num; i++) {
2128                 RTE_LOG(NOTICE, USER1, "%s: stoping port %u\n",
2129                         __func__, becfg.prt[i].id);
2130                 rte_eth_stats_get(becfg.prt[i].id, &stats);
2131                 RTE_LOG(NOTICE, USER1, "port %u stats={\n"
2132                         "ipackets=%" PRIu64 ";"
2133                         "ibytes=%" PRIu64 ";"
2134                         "ierrors=%" PRIu64 ";\n"
2135                         "opackets=%" PRIu64 ";"
2136                         "obytes=%" PRIu64 ";"
2137                         "oerrors=%" PRIu64 ";\n"
2138                         "}\n",
2139                         becfg.prt[i].id,
2140                         stats.ipackets,
2141                         stats.ibytes,
2142                         stats.ierrors,
2143                         stats.opackets,
2144                         stats.obytes,
2145                         stats.oerrors);
2146                 rte_eth_dev_stop(becfg.prt[i].id);
2147         }
2148
2149         netbe_lcore_fini(&becfg);
2150
2151         return 0;
2152 }