l4fwd: allow to specify TX payload contents for rxtx mode
[tldk.git] / examples / l4fwd / common.h
1 /*
2  * Copyright (c) 2016  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #ifndef COMMON_H_
17 #define COMMON_H_
18
19 #include <rte_arp.h>
20
21 static void
22 sig_handle(int signum)
23 {
24         RTE_LOG(ERR, USER1, "%s(%d)\n", __func__, signum);
25         force_quit = 1;
26 }
27
28 static void
29 netfe_stream_dump(const struct netfe_stream *fes, struct sockaddr_storage *la,
30         struct sockaddr_storage *ra)
31 {
32         struct sockaddr_in *l4, *r4;
33         struct sockaddr_in6 *l6, *r6;
34         uint16_t lport, rport;
35         char laddr[INET6_ADDRSTRLEN];
36         char raddr[INET6_ADDRSTRLEN];
37
38         if (la->ss_family == AF_INET) {
39
40                 l4 = (struct sockaddr_in *)la;
41                 r4 = (struct sockaddr_in *)ra;
42
43                 lport = l4->sin_port;
44                 rport = r4->sin_port;
45
46         } else if (la->ss_family == AF_INET6) {
47
48                 l6 = (struct sockaddr_in6 *)la;
49                 r6 = (struct sockaddr_in6 *)ra;
50
51                 lport = l6->sin6_port;
52                 rport = r6->sin6_port;
53
54         } else {
55                 RTE_LOG(ERR, USER1, "stream@%p - unknown family=%hu\n",
56                         fes->s, la->ss_family);
57                 return;
58         }
59
60         format_addr(la, laddr, sizeof(laddr));
61         format_addr(ra, raddr, sizeof(raddr));
62
63         RTE_LOG(INFO, USER1, "stream@%p={s=%p,"
64                 "family=%hu,proto=%s,laddr=%s,lport=%hu,raddr=%s,rport=%hu;"
65                 "stats={"
66                 "rxp=%" PRIu64 ",rxb=%" PRIu64
67                 ",txp=%" PRIu64 ",txb=%" PRIu64
68                 ",drops=%" PRIu64 ","
69                 "rxev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "],"
70                 "txev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "]"
71                 "};};\n",
72                 fes, fes->s, la->ss_family, proto_name[fes->proto],
73                 laddr, ntohs(lport), raddr, ntohs(rport),
74                 fes->stat.rxp, fes->stat.rxb,
75                 fes->stat.txp, fes->stat.txb,
76                 fes->stat.drops,
77                 fes->stat.rxev[TLE_SEV_IDLE],
78                 fes->stat.rxev[TLE_SEV_DOWN],
79                 fes->stat.rxev[TLE_SEV_UP],
80                 fes->stat.txev[TLE_SEV_IDLE],
81                 fes->stat.txev[TLE_SEV_DOWN],
82                 fes->stat.txev[TLE_SEV_UP]);
83 }
84
85 static inline uint32_t
86 netfe_get_streams(struct netfe_stream_list *list, struct netfe_stream *rs[],
87         uint32_t num)
88 {
89         struct netfe_stream *s;
90         uint32_t i, n;
91
92         n = RTE_MIN(list->num, num);
93         for (i = 0, s = LIST_FIRST(&list->head);
94                         i != n;
95                         i++, s = LIST_NEXT(s, link)) {
96                 rs[i] = s;
97         }
98
99         if (s == NULL)
100                 /* we retrieved all free entries */
101                 LIST_INIT(&list->head);
102         else
103                 LIST_FIRST(&list->head) = s;
104
105         list->num -= n;
106
107         return n;
108 }
109
110 static inline struct netfe_stream *
111 netfe_get_stream(struct netfe_stream_list *list)
112 {
113         struct netfe_stream *s;
114
115         s = NULL;
116         if (list->num == 0)
117                 return s;
118
119         netfe_get_streams(list, &s, 1);
120
121         return s;
122 }
123
124 static inline void
125 netfe_put_streams(struct netfe_lcore *fe, struct netfe_stream_list *list,
126         struct netfe_stream *fs[], uint32_t num)
127 {
128         uint32_t i, n;
129
130         n = RTE_MIN(fe->snum - list->num, num);
131         if (n != num)
132                 RTE_LOG(ERR, USER1, "%s: list overflow by %u\n", __func__,
133                         num - n);
134
135         for (i = 0; i != n; i++)
136                 LIST_INSERT_HEAD(&list->head, fs[i], link);
137         list->num += n;
138 }
139
140 static inline void
141 netfe_put_stream(struct netfe_lcore *fe, struct netfe_stream_list *list,
142         struct netfe_stream *s)
143 {
144         if (list->num == fe->snum) {
145                 RTE_LOG(ERR, USER1, "%s: list is full\n", __func__);
146                 return;
147         }
148
149         netfe_put_streams(fe, list, &s, 1);
150 }
151
152 static inline void
153 netfe_rem_stream(struct netfe_stream_list *list, struct netfe_stream *s)
154 {
155         LIST_REMOVE(s, link);
156         list->num--;
157 }
158
159 static void
160 netfe_stream_close(struct netfe_lcore *fe, struct netfe_stream *fes)
161 {
162         tle_stream_close(fes->s);
163         tle_event_free(fes->txev);
164         tle_event_free(fes->rxev);
165         tle_event_free(fes->erev);
166         memset(fes, 0, sizeof(*fes));
167         netfe_put_stream(fe, &fe->free, fes);
168 }
169
170 /*
171  * Helper functions, verify the queue for corresponding UDP port.
172  */
173 static uint8_t
174 verify_queue_for_port(const struct netbe_dev *prtq, const uint16_t lport)
175 {
176         uint32_t align_nb_q, qid;
177
178         align_nb_q = rte_align32pow2(prtq->port.nb_lcore);
179         qid = (lport % align_nb_q) % prtq->port.nb_lcore;
180         if (prtq->rxqid == qid)
181                 return 1;
182
183         return 0;
184 }
185
186 static inline size_t
187 pkt_buf_empty(struct pkt_buf *pb)
188 {
189         uint32_t i;
190         size_t x;
191
192         x = 0;
193         for (i = 0; i != pb->num; i++) {
194                 x += pb->pkt[i]->pkt_len;
195                 NETFE_PKT_DUMP(pb->pkt[i]);
196                 rte_pktmbuf_free(pb->pkt[i]);
197         }
198
199         pb->num = 0;
200         return x;
201 }
202
203 static inline void
204 pkt_buf_fill(uint32_t lcore, struct pkt_buf *pb, uint32_t dlen)
205 {
206         uint32_t i;
207         int32_t sid;
208
209         sid = rte_lcore_to_socket_id(lcore) + 1;
210
211         for (i = pb->num; i != RTE_DIM(pb->pkt); i++) {
212                 pb->pkt[i] = rte_pktmbuf_alloc(mpool[sid]);
213                 if (pb->pkt[i] == NULL)
214                         break;
215                 rte_pktmbuf_append(pb->pkt[i], dlen);
216         }
217
218         pb->num = i;
219 }
220
221 static int
222 netbe_lcore_setup(struct netbe_lcore *lc)
223 {
224         uint32_t i;
225         int32_t rc;
226
227         RTE_LOG(NOTICE, USER1, "%s:(lcore=%u, proto=%s, ctx=%p) start\n",
228                 __func__, lc->id, proto_name[lc->proto], lc->ctx);
229
230         /*
231          * ???????
232          * wait for FE lcores to start, so BE dont' drop any packets
233          * because corresponding streams not opened yet by FE.
234          * useful when used with pcap PMDS.
235          * think better way, or should this timeout be a cmdlien parameter.
236          * ???????
237          */
238         rte_delay_ms(10);
239
240         rc = 0;
241         for (i = 0; i != lc->prtq_num && rc == 0; i++) {
242                 RTE_LOG(NOTICE, USER1,
243                         "%s:%u(port=%u, q=%u, proto=%s, dev=%p)\n",
244                         __func__, i, lc->prtq[i].port.id, lc->prtq[i].rxqid,
245                         proto_name[lc->proto], lc->prtq[i].dev);
246
247                 rc = setup_rx_cb(&lc->prtq[i].port, lc, lc->prtq[i].rxqid,
248                         becfg.arp);
249                 if (rc < 0)
250                         return rc;
251         }
252
253         if (rc == 0)
254                 RTE_PER_LCORE(_be) = lc;
255         return rc;
256 }
257
258 static void
259 netbe_lcore_clear(void)
260 {
261         uint32_t i, j;
262         struct netbe_lcore *lc;
263
264         lc = RTE_PER_LCORE(_be);
265         if (lc == NULL)
266                 return;
267
268         RTE_LOG(NOTICE, USER1, "%s(lcore=%u, proto=%s, ctx: %p) finish\n",
269                 __func__, lc->id, proto_name[lc->proto], lc->ctx);
270         for (i = 0; i != lc->prtq_num; i++) {
271                 RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, q=%u, lcore=%u, dev=%p) "
272                         "rx_stats={"
273                         "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "}, "
274                         "tx_stats={"
275                         "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "};\n",
276                         __func__, i, lc->prtq[i].port.id, lc->prtq[i].rxqid,
277                         lc->id,
278                         lc->prtq[i].dev,
279                         lc->prtq[i].rx_stat.in,
280                         lc->prtq[i].rx_stat.up,
281                         lc->prtq[i].rx_stat.drop,
282                         lc->prtq[i].tx_stat.down,
283                         lc->prtq[i].tx_stat.out,
284                         lc->prtq[i].tx_stat.drop);
285         }
286
287         RTE_LOG(NOTICE, USER1, "tcp_stat={\n");
288         for (i = 0; i != RTE_DIM(lc->tcp_stat.flags); i++) {
289                 if (lc->tcp_stat.flags[i] != 0)
290                         RTE_LOG(NOTICE, USER1, "[flag=%#x]==%" PRIu64 ";\n",
291                                 i, lc->tcp_stat.flags[i]);
292         }
293         RTE_LOG(NOTICE, USER1, "};\n");
294
295         for (i = 0; i != lc->prtq_num; i++)
296                 for (j = 0; j != lc->prtq[i].tx_buf.num; j++)
297                         rte_pktmbuf_free(lc->prtq[i].tx_buf.pkt[j]);
298
299         RTE_PER_LCORE(_be) = NULL;
300 }
301
302 static int
303 netbe_add_ipv4_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
304         uint8_t idx)
305 {
306         int32_t rc;
307         uint32_t addr, depth;
308         char str[INET_ADDRSTRLEN];
309
310         depth = dst->prfx;
311         addr = rte_be_to_cpu_32(dst->ipv4.s_addr);
312
313         inet_ntop(AF_INET, &dst->ipv4, str, sizeof(str));
314         rc = rte_lpm_add(lc->lpm4, addr, depth, idx);
315         RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
316                 "ipv4=%s/%u,mtu=%u,"
317                 "mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
318                 "returns %d;\n",
319                 __func__, lc->id, dst->port, lc->dst4[idx].dev,
320                 str, depth, lc->dst4[idx].mtu,
321                 dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
322                 dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
323                 dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
324                 rc);
325         return rc;
326 }
327
328 static int
329 netbe_add_ipv6_route(struct netbe_lcore *lc, const struct netbe_dest *dst,
330         uint8_t idx)
331 {
332         int32_t rc;
333         uint32_t depth;
334         char str[INET6_ADDRSTRLEN];
335
336         depth = dst->prfx;
337
338         rc = rte_lpm6_add(lc->lpm6, (uint8_t *)(uintptr_t)dst->ipv6.s6_addr,
339                 depth, idx);
340
341         inet_ntop(AF_INET6, &dst->ipv6, str, sizeof(str));
342         RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p,"
343                 "ipv6=%s/%u,mtu=%u,"
344                 "mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) "
345                 "returns %d;\n",
346                 __func__, lc->id, dst->port, lc->dst6[idx].dev,
347                 str, depth, lc->dst4[idx].mtu,
348                 dst->mac.addr_bytes[0], dst->mac.addr_bytes[1],
349                 dst->mac.addr_bytes[2], dst->mac.addr_bytes[3],
350                 dst->mac.addr_bytes[4], dst->mac.addr_bytes[5],
351                 rc);
352         return rc;
353 }
354
355 static void
356 fill_dst(struct tle_dest *dst, struct netbe_dev *bed,
357         const struct netbe_dest *bdp, uint16_t l3_type, int32_t sid,
358         uint8_t proto_id)
359 {
360         struct ether_hdr *eth;
361         struct ipv4_hdr *ip4h;
362         struct ipv6_hdr *ip6h;
363
364         dst->dev = bed->dev;
365         dst->head_mp = frag_mpool[sid + 1];
366         dst->mtu = RTE_MIN(bdp->mtu, bed->port.mtu);
367         dst->l2_len = sizeof(*eth);
368
369         eth = (struct ether_hdr *)dst->hdr;
370
371         ether_addr_copy(&bed->port.mac, &eth->s_addr);
372         ether_addr_copy(&bdp->mac, &eth->d_addr);
373         eth->ether_type = rte_cpu_to_be_16(l3_type);
374
375         if (l3_type == ETHER_TYPE_IPv4) {
376                 dst->l3_len = sizeof(*ip4h);
377                 ip4h = (struct ipv4_hdr *)(eth + 1);
378                 ip4h->version_ihl = 4 << 4 |
379                         sizeof(*ip4h) / IPV4_IHL_MULTIPLIER;
380                 ip4h->time_to_live = 64;
381                 ip4h->next_proto_id = proto_id;
382         } else if (l3_type == ETHER_TYPE_IPv6) {
383                 dst->l3_len = sizeof(*ip6h);
384                 ip6h = (struct ipv6_hdr *)(eth + 1);
385                 ip6h->vtc_flow = 6 << 4;
386                 ip6h->proto = proto_id;
387                 ip6h->hop_limits = 64;
388         }
389 }
390
391 static int
392 netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family,
393         const struct netbe_dest *dst, uint32_t dnum)
394 {
395         int32_t rc, sid;
396         uint8_t proto;
397         uint16_t l3_type;
398         uint32_t i, n, m;
399         struct tle_dest *dp;
400
401         if (family == AF_INET) {
402                 n = lc->dst4_num;
403                 dp = lc->dst4 + n;
404                 m = RTE_DIM(lc->dst4);
405                 l3_type = ETHER_TYPE_IPv4;
406         } else {
407                 n = lc->dst6_num;
408                 dp = lc->dst6 + n;
409                 m = RTE_DIM(lc->dst6);
410                 l3_type = ETHER_TYPE_IPv6;
411         }
412
413         if (n + dnum >= m) {
414                 RTE_LOG(ERR, USER1, "%s(lcore=%u, family=%hu, dnum=%u) exceeds "
415                         "maximum allowed number of destinations(%u);\n",
416                         __func__, lc->id, family, dnum, m);
417                 return -ENOSPC;
418         }
419
420         sid = rte_lcore_to_socket_id(lc->id);
421         proto = (becfg.proto == TLE_PROTO_UDP) ? IPPROTO_UDP : IPPROTO_TCP;
422         rc = 0;
423
424         for (i = 0; i != dnum && rc == 0; i++) {
425                 fill_dst(dp + i, lc->prtq + dev_idx, dst + i, l3_type, sid,
426                         proto);
427                 if (family == AF_INET)
428                         rc = netbe_add_ipv4_route(lc, dst + i, n + i);
429                 else
430                         rc = netbe_add_ipv6_route(lc, dst + i, n + i);
431         }
432
433         if (family == AF_INET)
434                 lc->dst4_num = n + i;
435         else
436                 lc->dst6_num = n + i;
437
438         return rc;
439 }
440
441 static inline void
442 fill_arp_reply(struct netbe_dev *dev, struct rte_mbuf *m)
443 {
444         struct ether_hdr *eth;
445         struct arp_hdr *ahdr;
446         struct arp_ipv4 *adata;
447         uint32_t tip;
448
449         /* set up the ethernet data */
450         eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
451         eth->d_addr = eth->s_addr;
452         eth->s_addr = dev->port.mac;
453
454         /* set up the arp data */
455         ahdr = rte_pktmbuf_mtod_offset(m, struct arp_hdr *, m->l2_len);
456         adata = &ahdr->arp_data;
457
458         ahdr->arp_op = rte_cpu_to_be_16(ARP_OP_REPLY);
459
460         tip = adata->arp_tip;
461         adata->arp_tip = adata->arp_sip;
462         adata->arp_sip = tip;
463
464         adata->arp_tha = adata->arp_sha;
465         adata->arp_sha = dev->port.mac;
466 }
467
468 /* this is a semi ARP response implementation of RFC 826
469  * in RFC, it algo is as below
470  *
471  * ?Do I have the hardware type in ar$hrd?
472  * Yes: (almost definitely)
473  * [optionally check the hardware length ar$hln]
474  * ?Do I speak the protocol in ar$pro?
475  * Yes:
476  *  [optionally check the protocol length ar$pln]
477  *  Merge_flag := false
478  *  If the pair <protocol type, sender protocol address> is
479  *      already in my translation table, update the sender
480  *      hardware address field of the entry with the new
481  *      information in the packet and set Merge_flag to true.
482  *  ?Am I the target protocol address?
483  *  Yes:
484  *    If Merge_flag is false, add the triplet <protocol type,
485  *        sender protocol address, sender hardware address> to
486  *        the translation table.
487  *    ?Is the opcode ares_op$REQUEST?  (NOW look at the opcode!!)
488  *    Yes:
489  *      Swap hardware and protocol fields, putting the local
490  *          hardware and protocol addresses in the sender fields.
491  *      Set the ar$op field to ares_op$REPLY
492  *      Send the packet to the (new) target hardware address on
493  *          the same hardware on which the request was received.
494  *
495  * So, in our implementation we skip updating the local cache,
496  * we assume that local cache is ok, so we just reply the packet.
497  */
498
499 static inline void
500 send_arp_reply(struct netbe_dev *dev, struct pkt_buf *pb)
501 {
502         uint32_t i, n, num;
503         struct rte_mbuf **m;
504
505         m = pb->pkt;
506         num = pb->num;
507         for (i = 0; i != num; i++) {
508                 fill_arp_reply(dev, m[i]);
509                 NETBE_PKT_DUMP(m[i]);
510         }
511
512         n = rte_eth_tx_burst(dev->port.id, dev->txqid, m, num);
513         NETBE_TRACE("%s: sent n=%u arp replies\n", __func__, n);
514
515         /* free mbufs with unsent arp response */
516         for (i = n; i != num; i++)
517                 rte_pktmbuf_free(m[i]);
518
519         pb->num = 0;
520 }
521
522 static inline void
523 netbe_rx(struct netbe_lcore *lc, uint32_t pidx)
524 {
525         uint32_t j, k, n;
526         struct rte_mbuf *pkt[MAX_PKT_BURST];
527         struct rte_mbuf *rp[MAX_PKT_BURST];
528         int32_t rc[MAX_PKT_BURST];
529         struct pkt_buf *abuf;
530
531         n = rte_eth_rx_burst(lc->prtq[pidx].port.id,
532                         lc->prtq[pidx].rxqid, pkt, RTE_DIM(pkt));
533
534         if (n != 0) {
535                 lc->prtq[pidx].rx_stat.in += n;
536                 NETBE_TRACE("%s(%u): rte_eth_rx_burst(%u, %u) returns %u\n",
537                         __func__, lc->id, lc->prtq[pidx].port.id,
538                         lc->prtq[pidx].rxqid, n);
539
540                 k = tle_rx_bulk(lc->prtq[pidx].dev, pkt, rp, rc, n);
541
542                 lc->prtq[pidx].rx_stat.up += k;
543                 lc->prtq[pidx].rx_stat.drop += n - k;
544                 NETBE_TRACE("%s(%u): tle_%s_rx_bulk(%p, %u) returns %u\n",
545                         __func__, lc->id, proto_name[lc->proto],
546                         lc->prtq[pidx].dev, n, k);
547
548                 for (j = 0; j != n - k; j++) {
549                         NETBE_TRACE("%s:%d(port=%u) rp[%u]={%p, %d};\n",
550                                 __func__, __LINE__, lc->prtq[pidx].port.id,
551                                 j, rp[j], rc[j]);
552                         rte_pktmbuf_free(rp[j]);
553                 }
554         }
555
556         /* respond to incoming arp requests */
557         abuf = &lc->prtq[pidx].arp_buf;
558         if (abuf->num == 0)
559                 return;
560
561         send_arp_reply(&lc->prtq[pidx], abuf);
562 }
563
564 static inline void
565 netbe_tx(struct netbe_lcore *lc, uint32_t pidx)
566 {
567         uint32_t j, k, n;
568         struct rte_mbuf **mb;
569
570         n = lc->prtq[pidx].tx_buf.num;
571         k = RTE_DIM(lc->prtq[pidx].tx_buf.pkt) - n;
572         mb = lc->prtq[pidx].tx_buf.pkt;
573
574         if (k >= RTE_DIM(lc->prtq[pidx].tx_buf.pkt) / 2) {
575                 j = tle_tx_bulk(lc->prtq[pidx].dev, mb + n, k);
576                 n += j;
577                 lc->prtq[pidx].tx_stat.down += j;
578         }
579
580         if (n == 0)
581                 return;
582
583         NETBE_TRACE("%s(%u): tle_%s_tx_bulk(%p) returns %u,\n"
584                 "total pkts to send: %u\n",
585                 __func__, lc->id, proto_name[lc->proto],
586                 lc->prtq[pidx].dev, j, n);
587
588         for (j = 0; j != n; j++)
589                 NETBE_PKT_DUMP(mb[j]);
590
591         k = rte_eth_tx_burst(lc->prtq[pidx].port.id,
592                         lc->prtq[pidx].txqid, mb, n);
593
594         lc->prtq[pidx].tx_stat.out += k;
595         lc->prtq[pidx].tx_stat.drop += n - k;
596         NETBE_TRACE("%s(%u): rte_eth_tx_burst(%u, %u, %u) returns %u\n",
597                 __func__, lc->id, lc->prtq[pidx].port.id, lc->prtq[pidx].txqid,
598                 n, k);
599
600         lc->prtq[pidx].tx_buf.num = n - k;
601         if (k != 0)
602                 for (j = k; j != n; j++)
603                         mb[j - k] = mb[j];
604 }
605
606 static inline void
607 netbe_lcore(void)
608 {
609         uint32_t i;
610         struct netbe_lcore *lc;
611
612         lc = RTE_PER_LCORE(_be);
613         if (lc == NULL)
614                 return;
615
616         for (i = 0; i != lc->prtq_num; i++) {
617                 netbe_rx(lc, i);
618                 netbe_tx(lc, i);
619         }
620 }
621
622 static inline int
623 netfe_rxtx_get_mss(struct netfe_stream *fes)
624 {
625         switch (fes->proto) {
626         case TLE_PROTO_TCP:
627                 return tle_tcp_stream_get_mss(fes->s);
628
629         case TLE_PROTO_UDP:
630                 /* The UDP code doesn't have MSS discovery, so have to
631                  * assume arbitary MTU. Going to use default mbuf
632                  * data space as TLDK uses this internally as a
633                  * maximum segment size.
634                  */
635                 return RTE_MBUF_DEFAULT_DATAROOM - TLE_DST_MAX_HDR;
636         default:
637                 return -EINVAL;
638         }
639 }
640
641 static inline int
642 netfe_rxtx_dispatch_reply(uint32_t lcore, struct netfe_stream *fes)
643 {
644         struct pkt_buf *pb;
645         int32_t sid;
646         uint32_t n;
647         uint32_t cnt_mtu_pkts;
648         uint32_t cnt_all_pkts;
649         uint32_t idx_pkt;
650         uint32_t len_tail;
651         uint32_t mtu;
652         size_t csz, len;
653         char *dst;
654         const uint8_t *src;
655
656         pb = &fes->pbuf;
657         sid = rte_lcore_to_socket_id(lcore) + 1;
658         mtu = netfe_rxtx_get_mss(fes);
659
660         cnt_mtu_pkts = (fes->txlen / mtu);
661         cnt_all_pkts = cnt_mtu_pkts;
662         len_tail = fes->txlen - (mtu * cnt_mtu_pkts);
663
664         if (len_tail > 0)
665                 cnt_all_pkts++;
666
667         if (pb->num + cnt_all_pkts >= RTE_DIM(pb->pkt)) {
668                 NETFE_TRACE("%s(%u): Insufficent space for outbound burst\n",
669                         __func__, lcore);
670                 return -ENOMEM;
671         }
672         if (rte_pktmbuf_alloc_bulk(mpool[sid], &pb->pkt[pb->num], cnt_all_pkts)
673                         != 0) {
674                 NETFE_TRACE("%s(%u): rte_pktmbuf_alloc_bulk() failed\n",
675                         __func__, lcore);
676                 return -ENOMEM;
677         }
678
679         csz = tx_content.sz;
680         src = tx_content.data;
681
682         n = pb->num;
683
684         /* Full MTU packets */
685         for (idx_pkt = 0; idx_pkt < cnt_mtu_pkts; idx_pkt++, n++) {
686                 rte_pktmbuf_reset(pb->pkt[n]);
687                 dst = rte_pktmbuf_append(pb->pkt[n], mtu);
688                 if (csz > 0) {
689                         len = RTE_MIN(mtu, csz);
690                         rte_memcpy(dst, src, len);
691                         src += len;
692                         csz -= len;
693                 }
694         }
695
696         /* Last non-MTU packet, if any */
697         if (len_tail > 0) {
698                 rte_pktmbuf_reset(pb->pkt[n]);
699                 dst = rte_pktmbuf_append(pb->pkt[n], len_tail);
700                 if (csz > 0) {
701                         len = RTE_MIN(len_tail, csz);
702                         rte_memcpy(dst, src, len);
703                         src += len;
704                         csz -= len;
705                 }
706                 n++;
707         }
708
709         pb->num = n;
710
711         return 0;
712 }
713
714 static inline int
715 netfe_rx_process(uint32_t lcore, struct netfe_stream *fes)
716 {
717         uint32_t k, n;
718         uint64_t count_bytes;
719
720         n = fes->pbuf.num;
721         k = RTE_DIM(fes->pbuf.pkt) - n;
722
723         /* packet buffer is full, can't receive any new packets. */
724         if (k == 0) {
725                 tle_event_idle(fes->rxev);
726                 fes->stat.rxev[TLE_SEV_IDLE]++;
727                 return 0;
728         }
729
730         n = tle_stream_recv(fes->s, fes->pbuf.pkt + n, k);
731         if (n == 0)
732                 return 0;
733
734         NETFE_TRACE("%s(%u): tle_%s_stream_recv(%p, %u) returns %u\n",
735                 __func__, lcore, proto_name[fes->proto], fes->s, k, n);
736
737         fes->pbuf.num += n;
738         fes->stat.rxp += n;
739
740         /* free all received mbufs. */
741         if (fes->op == RXONLY)
742                 fes->stat.rxb += pkt_buf_empty(&fes->pbuf);
743         else if (fes->op == RXTX) {
744                 /* RXTX mode. Count incoming bytes then discard.
745                  * If receive threshold (rxlen) exceeded, send out a packet.
746                  */
747                 count_bytes = pkt_buf_empty(&fes->pbuf);
748                 fes->stat.rxb += count_bytes;
749                 fes->rx_run_len += count_bytes;
750                 if (fes->rx_run_len >= fes->rxlen) {
751                         /* Idle Rx as buffer needed for Tx */
752                         tle_event_idle(fes->rxev);
753                         fes->stat.rxev[TLE_SEV_IDLE]++;
754
755                         /* Discard surplus bytes. For now pipelining of
756                          * requests is not supported.
757                          */
758                         fes->rx_run_len = 0;
759                         netfe_rxtx_dispatch_reply(lcore, fes);
760
761                         /* Kick off a Tx event */
762                         tle_event_active(fes->txev, TLE_SEV_UP);
763                         fes->stat.txev[TLE_SEV_UP]++;
764                 }
765         }
766         /* mark stream as writable */
767         else if (k == RTE_DIM(fes->pbuf.pkt)) {
768                 if (fes->op == ECHO) {
769                         tle_event_active(fes->txev, TLE_SEV_UP);
770                         fes->stat.txev[TLE_SEV_UP]++;
771                 } else if (fes->op == FWD) {
772                         tle_event_raise(fes->txev);
773                         fes->stat.txev[TLE_SEV_UP]++;
774                 }
775         }
776
777         return n;
778 }
779
780 #endif /* COMMON_H_ */