 * Copyright (c) 2016 Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
sig_handle(int signum)
	RTE_LOG(ERR, USER1, "%s(%d)\n", __func__, signum);
netfe_stream_dump(const struct netfe_stream *fes, struct sockaddr_storage *la,
	struct sockaddr_storage *ra)
	struct sockaddr_in *l4, *r4;
	struct sockaddr_in6 *l6, *r6;
	uint16_t lport, rport;
	char laddr[INET6_ADDRSTRLEN];
	char raddr[INET6_ADDRSTRLEN];

	if (la->ss_family == AF_INET) {
		l4 = (struct sockaddr_in *)la;
		r4 = (struct sockaddr_in *)ra;
		lport = l4->sin_port;
		rport = r4->sin_port;
	} else if (la->ss_family == AF_INET6) {
		l6 = (struct sockaddr_in6 *)la;
		r6 = (struct sockaddr_in6 *)ra;
		lport = l6->sin6_port;
		rport = r6->sin6_port;
	} else {
		RTE_LOG(ERR, USER1, "stream@%p - unknown family=%hu\n",
			fes->s, la->ss_family);
		return;
	}

	format_addr(la, laddr, sizeof(laddr));
	format_addr(ra, raddr, sizeof(raddr));

	RTE_LOG(INFO, USER1, "stream@%p={s=%p,"
		"family=%hu,proto=%s,laddr=%s,lport=%hu,raddr=%s,rport=%hu;"
		"rxp=%" PRIu64 ",rxb=%" PRIu64
		",txp=%" PRIu64 ",txb=%" PRIu64 ","
		"rxev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "],"
		"txev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "]"
		"};\n",
		fes, fes->s, la->ss_family, proto_name[fes->proto],
		laddr, ntohs(lport), raddr, ntohs(rport),
		fes->stat.rxp, fes->stat.rxb,
		fes->stat.txp, fes->stat.txb,
		fes->stat.rxev[TLE_SEV_IDLE],
		fes->stat.rxev[TLE_SEV_DOWN],
		fes->stat.rxev[TLE_SEV_UP],
		fes->stat.txev[TLE_SEV_IDLE],
		fes->stat.txev[TLE_SEV_DOWN],
		fes->stat.txev[TLE_SEV_UP]);
static inline uint32_t
netfe_get_streams(struct netfe_stream_list *list, struct netfe_stream *rs[],
	uint32_t num)
	struct netfe_stream *s;

	n = RTE_MIN(list->num, num);
	for (i = 0, s = LIST_FIRST(&list->head);
			i != n;
			i++, s = LIST_NEXT(s, link)) {
		rs[i] = s;
	}

	if (s == NULL)
		/* we retrieved all free entries */
		LIST_INIT(&list->head);
	else
		LIST_FIRST(&list->head) = s;
static inline struct netfe_stream *
netfe_get_stream(struct netfe_stream_list *list)
	struct netfe_stream *s;

	netfe_get_streams(list, &s, 1);
netfe_put_streams(struct netfe_lcore *fe, struct netfe_stream_list *list,
	struct netfe_stream *fs[], uint32_t num)
	n = RTE_MIN(fe->snum - list->num, num);
	if (n != num)
		RTE_LOG(ERR, USER1, "%s: list overflow by %u\n", __func__,
			num - n);

	for (i = 0; i != n; i++)
		LIST_INSERT_HEAD(&list->head, fs[i], link);
netfe_put_stream(struct netfe_lcore *fe, struct netfe_stream_list *list,
	struct netfe_stream *s)
	if (list->num == fe->snum) {
		RTE_LOG(ERR, USER1, "%s: list is full\n", __func__);
		return;
	}

	netfe_put_streams(fe, list, &s, 1);
netfe_rem_stream(struct netfe_stream_list *list, struct netfe_stream *s)
	LIST_REMOVE(s, link);
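
/*
 * Release the stream's events, close the TLDK stream and return the
 * descriptor to the per-lcore free list.
 */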
netfe_stream_close(struct netfe_lcore *fe, struct netfe_stream *fes)
	tle_stream_close(fes->s);
	tle_event_free(fes->txev);
	tle_event_free(fes->rxev);
	tle_event_free(fes->erev);
	memset(fes, 0, sizeof(*fes));
	netfe_put_stream(fe, &fe->free, fes);
 * Helper function: verify that the given RX queue serves the given UDP port.
 */
verify_queue_for_port(const struct netbe_dev *prtq, const uint16_t lport)
	uint32_t align_nb_q, qid;

	align_nb_q = rte_align32pow2(prtq->port.nb_lcore);
	qid = (lport % align_nb_q) % prtq->port.nb_lcore;
	if (prtq->rxqid == qid)
		return 1;

	return 0;
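
/*
 * Illustrative example (not from the original source): with 6 lcores,
 * align_nb_q = rte_align32pow2(6) = 8, so UDP port 12345 maps to
 * qid = (12345 % 8) % 6 = 1, i.e. only the lcore owning RX queue 1 of
 * this port accepts streams bound to that port.
 */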
pkt_buf_empty(struct pkt_buf *pb)
	for (i = 0; i != pb->num; i++) {
		x += pb->pkt[i]->pkt_len;
		NETFE_PKT_DUMP(pb->pkt[i]);
		rte_pktmbuf_free(pb->pkt[i]);
pkt_buf_fill(uint32_t lcore, struct pkt_buf *pb, uint32_t dlen)
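	/*
	 * Note: in this app the per-socket mempool array appears to be
	 * indexed by NUMA socket id + 1, hence the offset below.
	 */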
	sid = rte_lcore_to_socket_id(lcore) + 1;

	for (i = pb->num; i != RTE_DIM(pb->pkt); i++) {
		pb->pkt[i] = rte_pktmbuf_alloc(mpool[sid]);
		if (pb->pkt[i] == NULL)
			break;
		rte_pktmbuf_append(pb->pkt[i], dlen);
netbe_lcore_setup(struct netbe_lcore *lc)
	RTE_LOG(NOTICE, USER1, "%s:(lcore=%u, proto=%s, ctx=%p) start\n",
		__func__, lc->id, proto_name[lc->proto], lc->ctx);
	/*
	 * Wait for the FE lcores to start, so that the BE doesn't drop
	 * packets for streams the FE hasn't opened yet.
	 * This is useful when running with pcap PMDs.
	 * TODO: find a better way, or make this timeout a command-line
	 * parameter.
	 */
	for (i = 0; i != lc->prtq_num && rc == 0; i++) {
		RTE_LOG(NOTICE, USER1,
			"%s:%u(port=%u, q=%u, proto=%s, dev=%p)\n",
			__func__, i, lc->prtq[i].port.id, lc->prtq[i].rxqid,
			proto_name[lc->proto], lc->prtq[i].dev);
		rc = setup_rx_cb(&lc->prtq[i].port, lc, lc->prtq[i].rxqid,

	RTE_PER_LCORE(_be) = lc;
netbe_lcore_clear(void)
	struct netbe_lcore *lc;

	lc = RTE_PER_LCORE(_be);

	RTE_LOG(NOTICE, USER1, "%s(lcore=%u, proto=%s, ctx: %p) finish\n",
		__func__, lc->id, proto_name[lc->proto], lc->ctx);
	for (i = 0; i != lc->prtq_num; i++) {
		RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, q=%u, lcore=%u, dev=%p) "
			"in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "}, "
			"in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "};\n",
			__func__, i, lc->prtq[i].port.id, lc->prtq[i].rxqid,
			lc->prtq[i].rx_stat.in,
			lc->prtq[i].rx_stat.up,
			lc->prtq[i].rx_stat.drop,
			lc->prtq[i].tx_stat.down,
			lc->prtq[i].tx_stat.out,
			lc->prtq[i].tx_stat.drop);

	RTE_LOG(NOTICE, USER1, "tcp_stat={\n");
	for (i = 0; i != RTE_DIM(lc->tcp_stat.flags); i++) {
		if (lc->tcp_stat.flags[i] != 0)
			RTE_LOG(NOTICE, USER1, "[flag=%#x]==%" PRIu64 ";\n",
				i, lc->tcp_stat.flags[i]);

	RTE_LOG(NOTICE, USER1, "};\n");

	for (i = 0; i != lc->prtq_num; i++)
		for (j = 0; j != lc->prtq[i].tx_buf.num; j++)
			rte_pktmbuf_free(lc->prtq[i].tx_buf.pkt[j]);

	RTE_PER_LCORE(_be) = NULL;
netbe_add_ipv4_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
	uint8_t idx)
	uint32_t addr, depth;
	char str[INET_ADDRSTRLEN];

	addr = rte_be_to_cpu_32(dst->ipv4.s_addr);

	inet_ntop(AF_INET, &dst->ipv4, str, sizeof(str));
	rc = rte_lpm_add(lc->lpm4, addr, depth, idx);
	RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
		"mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
		__func__, lc->id, dst->port, lc->dst4[idx].dev,
		str, depth, lc->dst4[idx].mtu,
		dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
		dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
		dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
netbe_add_ipv6_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
	uint8_t idx)
	char str[INET6_ADDRSTRLEN];

	rc = rte_lpm6_add(lc->lpm6, (uint8_t *)(uintptr_t)dst->ipv6.s6_addr,
	inet_ntop(AF_INET6, &dst->ipv6, str, sizeof(str));
	RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
		"mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
		__func__, lc->id, dst->port, lc->dst6[idx].dev,
		str, depth, lc->dst6[idx].mtu, /* IPv6 route: use dst6, not dst4 */
		dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
		dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
		dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
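
/*
 * Build the prebuilt L2/L3 header template for a destination:
 * [ether (14B)][ipv4 (20B) or ipv6 (40B)]. TLDK copies this template
 * in front of outgoing packets and fills in the per-packet fields.
 */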
fill_dst(struct tle_dest *dst, struct netbe_dev *bed,
	const struct netbe_dest *bdp, uint16_t l3_type, int32_t sid,
	uint8_t proto_id)
	struct ether_hdr *eth;
	struct ipv4_hdr *ip4h;
	struct ipv6_hdr *ip6h;

	dst->head_mp = frag_mpool[sid + 1];
	dst->mtu = RTE_MIN(bdp->mtu, bed->port.mtu);
	dst->l2_len = sizeof(*eth);

	eth = (struct ether_hdr *)dst->hdr;
	ether_addr_copy(&bed->port.mac, &eth->s_addr);
	ether_addr_copy(&bdp->mac, &eth->d_addr);
	eth->ether_type = rte_cpu_to_be_16(l3_type);

	if (l3_type == ETHER_TYPE_IPv4) {
		dst->l3_len = sizeof(*ip4h);
		ip4h = (struct ipv4_hdr *)(eth + 1);
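		/* version 4, IHL 5 (20-byte header): version_ihl == 0x45 */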
		ip4h->version_ihl = 4 << 4 |
			sizeof(*ip4h) / IPV4_IHL_MULTIPLIER;
		ip4h->time_to_live = 64;
		ip4h->next_proto_id = proto_id;
	} else if (l3_type == ETHER_TYPE_IPv6) {
		dst->l3_len = sizeof(*ip6h);
		ip6h = (struct ipv6_hdr *)(eth + 1);
		/* IPv6 version lives in the top nibble of the 32-bit
		 * big-endian vtc_flow field */
		ip6h->vtc_flow = rte_cpu_to_be_32(6 << 28);
		ip6h->proto = proto_id;
		ip6h->hop_limits = 64;
netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family,
	const struct netbe_dest *dst, uint32_t dnum)
	if (family == AF_INET) {
		m = RTE_DIM(lc->dst4);
		l3_type = ETHER_TYPE_IPv4;
	} else {
		m = RTE_DIM(lc->dst6);
		l3_type = ETHER_TYPE_IPv6;
	}

	RTE_LOG(ERR, USER1, "%s(lcore=%u, family=%hu, dnum=%u) exceeds "
		"maximum allowed number of destinations(%u);\n",
		__func__, lc->id, family, dnum, m);

	sid = rte_lcore_to_socket_id(lc->id);
	proto = (becfg.proto == TLE_PROTO_UDP) ? IPPROTO_UDP : IPPROTO_TCP;

	for (i = 0; i != dnum && rc == 0; i++) {
		fill_dst(dp + i, lc->prtq + dev_idx, dst + i, l3_type, sid,
			proto);
		if (family == AF_INET)
			rc = netbe_add_ipv4_route(lc, dst + i, n + i);
		else
			rc = netbe_add_ipv6_route(lc, dst + i, n + i);

	if (family == AF_INET)
		lc->dst4_num = n + i;
	else
		lc->dst6_num = n + i;
fill_arp_reply(struct netbe_dev *dev, struct rte_mbuf *m)
	struct ether_hdr *eth;
	struct arp_hdr *ahdr;
	struct arp_ipv4 *adata;

	/* set up the ethernet data */
	eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
	eth->d_addr = eth->s_addr;
	eth->s_addr = dev->port.mac;

	/* set up the arp data */
	ahdr = rte_pktmbuf_mtod_offset(m, struct arp_hdr *, m->l2_len);
	adata = &ahdr->arp_data;

	ahdr->arp_op = rte_cpu_to_be_16(ARP_OP_REPLY);

	tip = adata->arp_tip;
	adata->arp_tip = adata->arp_sip;
	adata->arp_sip = tip;

	adata->arp_tha = adata->arp_sha;
	adata->arp_sha = dev->port.mac;
/* This is a partial ARP reply implementation of RFC 826.
 * The algorithm in the RFC is as follows:
 *
 * ?Do I have the hardware type in ar$hrd?
 * Yes: (almost definitely)
 *   [optionally check the hardware length ar$hln]
 *   ?Do I speak the protocol in ar$pro?
 *   Yes:
 *     [optionally check the protocol length ar$pln]
 *     Merge_flag := false
 *     If the pair <protocol type, sender protocol address> is
 *         already in my translation table, update the sender
 *         hardware address field of the entry with the new
 *         information in the packet and set Merge_flag to true.
 *     ?Am I the target protocol address?
 *     Yes:
 *       If Merge_flag is false, add the triplet <protocol type,
 *           sender protocol address, sender hardware address> to
 *           the translation table.
 *       ?Is the opcode ares_op$REQUEST? (NOW look at the opcode!!)
 *       Yes:
 *         Swap hardware and protocol fields, putting the local
 *             hardware and protocol addresses in the sender fields.
 *         Set the ar$op field to ares_op$REPLY
 *         Send the packet to the (new) target hardware address on
 *             the same hardware on which the request was received.
 *
 * So, in our implementation we skip updating the local cache;
 * we assume the local cache is OK and simply send the reply.
 */
send_arp_reply(struct netbe_dev *dev, struct pkt_buf *pb)
	for (i = 0; i != num; i++) {
		fill_arp_reply(dev, m[i]);
		NETBE_PKT_DUMP(m[i]);
	}

	n = rte_eth_tx_burst(dev->port.id, dev->txqid, m, num);
	NETBE_TRACE("%s: sent n=%u arp replies\n", __func__, n);

	/* free mbufs with unsent arp response */
	for (i = n; i != num; i++)
		rte_pktmbuf_free(m[i]);
netbe_rx(struct netbe_lcore *lc, uint32_t pidx)
	struct rte_mbuf *pkt[MAX_PKT_BURST];
	struct rte_mbuf *rp[MAX_PKT_BURST];
	int32_t rc[MAX_PKT_BURST];
	struct pkt_buf *abuf;

	n = rte_eth_rx_burst(lc->prtq[pidx].port.id,
		lc->prtq[pidx].rxqid, pkt, RTE_DIM(pkt));

	lc->prtq[pidx].rx_stat.in += n;
	NETBE_TRACE("%s(%u): rte_eth_rx_burst(%u, %u) returns %u\n",
		__func__, lc->id, lc->prtq[pidx].port.id,
		lc->prtq[pidx].rxqid, n);
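
	/* rp[]/rc[] return the packets rejected by TLDK together with
	 * their per-packet error codes; those mbufs are freed below */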
	k = tle_rx_bulk(lc->prtq[pidx].dev, pkt, rp, rc, n);

	lc->prtq[pidx].rx_stat.up += k;
	lc->prtq[pidx].rx_stat.drop += n - k;
	NETBE_TRACE("%s(%u): tle_%s_rx_bulk(%p, %u) returns %u\n",
		__func__, lc->id, proto_name[lc->proto],
		lc->prtq[pidx].dev, n, k);

	for (j = 0; j != n - k; j++) {
		NETBE_TRACE("%s:%d(port=%u) rp[%u]={%p, %d};\n",
			__func__, __LINE__, lc->prtq[pidx].port.id,
			j, rp[j], rc[j]);
		rte_pktmbuf_free(rp[j]);
	}

	/* respond to incoming arp requests */
	abuf = &lc->prtq[pidx].arp_buf;
	if (abuf->num == 0)
		return;

	send_arp_reply(&lc->prtq[pidx], abuf);
netbe_tx(struct netbe_lcore *lc, uint32_t pidx)
	struct rte_mbuf **mb;

	n = lc->prtq[pidx].tx_buf.num;
	k = RTE_DIM(lc->prtq[pidx].tx_buf.pkt) - n;
	mb = lc->prtq[pidx].tx_buf.pkt;
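
	/* pull more packets from TLDK only while at least half of the
	 * tx_buf slots are free, so that tle_tx_bulk() runs on larger
	 * bursts */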
	if (k >= RTE_DIM(lc->prtq[pidx].tx_buf.pkt) / 2) {
		j = tle_tx_bulk(lc->prtq[pidx].dev, mb + n, k);
		n += j;
		lc->prtq[pidx].tx_stat.down += j;
	}

	NETBE_TRACE("%s(%u): tle_%s_tx_bulk(%p) returns %u,\n"
		"total pkts to send: %u\n",
		__func__, lc->id, proto_name[lc->proto],
		lc->prtq[pidx].dev, j, n);

	for (j = 0; j != n; j++)
		NETBE_PKT_DUMP(mb[j]);

	k = rte_eth_tx_burst(lc->prtq[pidx].port.id,
		lc->prtq[pidx].txqid, mb, n);

	lc->prtq[pidx].tx_stat.out += k;
	lc->prtq[pidx].tx_stat.drop += n - k;
	NETBE_TRACE("%s(%u): rte_eth_tx_burst(%u, %u, %u) returns %u\n",
		__func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].txqid,
		n, k);

	lc->prtq[pidx].tx_buf.num = n - k;
	for (j = k; j != n; j++)
		/* compact: move unsent mbufs to the buffer head */
		mb[j - k] = mb[j];
static inline void
netbe_lcore(void)
	struct netbe_lcore *lc;

	lc = RTE_PER_LCORE(_be);

	for (i = 0; i != lc->prtq_num; i++) {
		netbe_rx(lc, i);
		netbe_tx(lc, i);
netfe_rxtx_get_mss(struct netfe_stream *fes)
	switch (fes->proto) {
	case TLE_PROTO_TCP:
		return tle_tcp_stream_get_mss(fes->s);

	case TLE_PROTO_UDP:
		/* The UDP code doesn't have MSS discovery, so we have to
		 * assume an arbitrary MTU. Use the default mbuf data space,
		 * as TLDK uses this internally as the maximum segment size.
		 */
		return RTE_MBUF_DEFAULT_DATAROOM - TLE_DST_MAX_HDR;
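		/* RTE_MBUF_DEFAULT_DATAROOM is 2048 by default and
		 * TLE_DST_MAX_HDR reserves room for the prebuilt L2/L3
		 * headers, so each UDP packet carries
		 * (2048 - TLE_DST_MAX_HDR) payload bytes */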
netfe_rxtx_dispatch_reply(uint32_t lcore, struct netfe_stream *fes)
	uint32_t cnt_mtu_pkts;
	uint32_t cnt_all_pkts;

	sid = rte_lcore_to_socket_id(lcore) + 1;
	mtu = netfe_rxtx_get_mss(fes);

	cnt_mtu_pkts = (fes->txlen / mtu);
	cnt_all_pkts = cnt_mtu_pkts;
	len_tail = fes->txlen - (mtu * cnt_mtu_pkts);
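	/* e.g. txlen = 5000 with mtu = 1460 gives cnt_mtu_pkts = 3 and a
	 * len_tail of 620 bytes carried in one extra packet (values are
	 * illustrative only) */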
	if (pb->num + cnt_all_pkts >= RTE_DIM(pb->pkt)) {
		NETFE_TRACE("%s(%u): insufficient space for outbound burst\n",

	if (rte_pktmbuf_alloc_bulk(mpool[sid], &pb->pkt[pb->num], cnt_all_pkts)
			!= 0) {
		NETFE_TRACE("%s(%u): rte_pktmbuf_alloc_bulk() failed\n",
	src = tx_content.data;

	/* Full MTU packets */
	for (idx_pkt = 0; idx_pkt < cnt_mtu_pkts; idx_pkt++, n++) {
		rte_pktmbuf_reset(pb->pkt[n]);
		dst = rte_pktmbuf_append(pb->pkt[n], mtu);
		len = RTE_MIN(mtu, csz);
		rte_memcpy(dst, src, len);

	/* Last non-MTU packet, if any */
	rte_pktmbuf_reset(pb->pkt[n]);
	dst = rte_pktmbuf_append(pb->pkt[n], len_tail);
	len = RTE_MIN(len_tail, csz);
	rte_memcpy(dst, src, len);
netfe_rx_process(uint32_t lcore, struct netfe_stream *fes)
	uint64_t count_bytes;

	n = fes->pbuf.num;
	k = RTE_DIM(fes->pbuf.pkt) - n;

	/* packet buffer is full, can't receive any new packets. */
	if (k == 0) {
		tle_event_idle(fes->rxev);
		fes->stat.rxev[TLE_SEV_IDLE]++;
		return;
	}
	n = tle_stream_recv(fes->s, fes->pbuf.pkt + n, k);

	NETFE_TRACE("%s(%u): tle_%s_stream_recv(%p, %u) returns %u\n",
		__func__, lcore, proto_name[fes->proto], fes->s, k, n);

	/* free all received mbufs. */
	if (fes->op == RXONLY)
		fes->stat.rxb += pkt_buf_empty(&fes->pbuf);
	else if (fes->op == RXTX) {
		/* RXTX mode. Count incoming bytes then discard.
		 * If receive threshold (rxlen) exceeded, send out a packet.
		 */
		count_bytes = pkt_buf_empty(&fes->pbuf);
		fes->stat.rxb += count_bytes;
		fes->rx_run_len += count_bytes;
		if (fes->rx_run_len >= fes->rxlen) {
			/* Idle Rx as buffer needed for Tx */
			tle_event_idle(fes->rxev);
			fes->stat.rxev[TLE_SEV_IDLE]++;

			/* Discard surplus bytes. For now pipelining of
			 * requests is not supported.
			 */
			netfe_rxtx_dispatch_reply(lcore, fes);

			/* Kick off a Tx event */
			tle_event_active(fes->txev, TLE_SEV_UP);
			fes->stat.txev[TLE_SEV_UP]++;

	/* mark stream as writable */
	else if (k == RTE_DIM(fes->pbuf.pkt)) {
		if (fes->op == ECHO) {
			tle_event_active(fes->txev, TLE_SEV_UP);
			fes->stat.txev[TLE_SEV_UP]++;
		} else if (fes->op == FWD) {
			tle_event_raise(fes->txev);
			fes->stat.txev[TLE_SEV_UP]++;
#endif /* COMMON_H_ */