2 * Copyright (c) 2016 Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
/* Limit passed as the second argument to tle_tcp_process() in
 * netbe_lcore_tcp() below. */
19 #define TCP_MAX_PROCESS 0x20
/*
 * Recycle a finished front-end stream: clear its per-stream statistics and
 * put the object back on the lcore's free list.
 * NOTE(review): this chunk is a sampled paste -- each line carries a stray
 * original line number and some lines (return type, braces) are elided;
 * visible tokens are kept verbatim.
 */
22 netfe_stream_term_tcp(struct netfe_lcore *fe, struct netfe_stream *fes)
/* wipe counters so the slot starts clean when reused */
26 memset(&fes->stat, 0, sizeof(fes->stat));
27 netfe_put_stream(fe, &fe->free, fes);
/*
 * Close the underlying TLDK TCP stream, then recycle the front-end stream
 * object (stats reset + return to free list) via netfe_stream_term_tcp().
 */
31 netfe_stream_close_tcp(struct netfe_lcore *fe, struct netfe_stream *fes)
33 tle_tcp_stream_close(fes->s);
34 netfe_stream_term_tcp(fe, fes);
38 * helper function: opens IPv4 and IPv6 streams for selected port.
/*
 * Take a free front-end stream, configure its rx/tx/err events and open a
 * TLDK TCP stream for it in the back-end context selected by @bidx.
 * On success the stream is moved onto the lcore's in-use list and returned;
 * on failure it is closed/recycled (error paths partially elided in this
 * sampled snippet).
 */
40 static struct netfe_stream *
41 netfe_stream_open_tcp(struct netfe_lcore *fe, struct netfe_sprm *sprm,
42 uint32_t lcore, uint16_t op, uint32_t bidx, uint8_t server_mode)
45 struct netfe_stream *fes;
46 struct sockaddr_in *l4;
47 struct sockaddr_in6 *l6;
49 struct tle_tcp_stream_param tprm;
/* grab an unused stream object from the per-lcore free list */
51 fes = netfe_get_stream(&fe->free);
/* server streams wait for SYNs: re-home the rx event on the syn event
 * queue instead of the regular rx queue */
57 if (server_mode != 0) {
58 tle_event_free(fes->rxev);
59 fes->rxev = tle_event_alloc(fe->syneq, fes);
62 if (fes->rxev == NULL) {
63 netfe_stream_close_tcp(fe, fes);
68 /* activate rx, tx and err events for the stream */
69 if (op == TXONLY || op == FWD) {
70 tle_event_active(fes->txev, TLE_SEV_DOWN);
71 fes->stat.txev[TLE_SEV_DOWN]++;
/* rx is armed for everything except pure client-side TXONLY */
74 if (op != TXONLY || server_mode != 0) {
75 tle_event_active(fes->rxev, TLE_SEV_DOWN);
76 fes->stat.rxev[TLE_SEV_DOWN]++;
78 tle_event_active(fes->erev, TLE_SEV_DOWN);
79 fes->stat.erev[TLE_SEV_DOWN]++;
/* build the tle_tcp open parameters: addresses from sprm, events from fes */
81 memset(&tprm, 0, sizeof(tprm));
82 tprm.addr.local = sprm->local_addr;
83 tprm.addr.remote = sprm->remote_addr;
84 tprm.cfg.err_ev = fes->erev;
85 tprm.cfg.recv_ev = fes->rxev;
87 tprm.cfg.send_ev = fes->txev;
89 fes->s = tle_tcp_stream_open(becfg.cpu[bidx].ctx, &tprm);
/* failure path: clean up, then extract the local port (v4 or v6) purely
 * for the error log below */
93 netfe_stream_close_tcp(fe, fes);
96 if (sprm->local_addr.ss_family == AF_INET) {
97 l4 = (struct sockaddr_in *) &sprm->local_addr;
98 errport = ntohs(l4->sin_port);
100 l6 = (struct sockaddr_in6 *) &sprm->local_addr;
101 errport = ntohs(l6->sin6_port);
104 RTE_LOG(ERR, USER1, "stream open failed for port %u with error "
105 "code=%u, bidx=%u, lc=%u\n",
106 errport, rc, bidx, becfg.cpu[bidx].id);
110 RTE_LOG(NOTICE, USER1,
111 "%s(%u)={s=%p, op=%hu, proto=%s, rxev=%p, txev=%p}, belc=%u\n",
112 __func__, lcore, fes->s, op, proto_name[becfg.proto],
113 fes->rxev, fes->txev, becfg.cpu[bidx].id);
/* success: record stream attributes and move it onto the in-use list */
116 fes->proto = becfg.proto;
117 fes->family = sprm->local_addr.ss_family;
118 fes->laddr = sprm->local_addr;
119 netfe_put_stream(fe, &fe->use, fes);
/*
 * Per-lcore front-end initialization for TCP mode:
 *  - allocate the netfe_lcore object plus max_streams stream slots in one
 *    socket-local zeroed allocation,
 *  - create the four event queues (syn/err/rx/tx),
 *  - pre-allocate per-stream events and seed the free list,
 *  - open all streams requested in @prm and either listen (server) or
 *    connect (client) on each.
 * NOTE(review): several guard/return lines are elided in this sampled
 * snippet; visible tokens kept verbatim.
 */
125 netfe_lcore_init_tcp(const struct netfe_lcore_prm *prm)
129 uint32_t i, lcore, snum;
130 struct netfe_lcore *fe;
131 struct tle_evq_param eprm;
132 struct netfe_stream *fes;
133 struct netfe_sprm *sprm;
135 lcore = rte_lcore_id();
137 snum = prm->max_streams;
138 RTE_LOG(NOTICE, USER1, "%s(lcore=%u, nb_streams=%u, max_streams=%u)\n",
139 __func__, lcore, prm->nb_streams, snum);
/* each event queue is sized to hold one event per stream */
141 memset(&eprm, 0, sizeof(eprm));
142 eprm.socket_id = rte_lcore_to_socket_id(lcore);
143 eprm.max_events = snum;
/* single NUMA-local allocation: lcore object followed by the stream array */
145 sz = sizeof(*fe) + snum * sizeof(struct netfe_stream);
146 fe = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
147 rte_lcore_to_socket_id(lcore));
150 RTE_LOG(ERR, USER1, "%s:%d failed to allocate %zu bytes\n",
151 __func__, __LINE__, sz);
155 RTE_PER_LCORE(_fe) = fe;
158 /* initialize the stream pool */
159 LIST_INIT(&fe->free.head);
160 LIST_INIT(&fe->use.head);
162 /* allocate the event queues */
163 fe->syneq = tle_evq_create(&eprm);
164 fe->ereq = tle_evq_create(&eprm);
165 fe->rxeq = tle_evq_create(&eprm);
166 fe->txeq = tle_evq_create(&eprm);
168 RTE_LOG(INFO, USER1, "%s(%u) synevq=%p, erevq=%p, rxevq=%p, txevq=%p\n",
169 __func__, lcore, fe->syneq, fe->ereq, fe->rxeq, fe->txeq);
170 if (fe->syneq == NULL || fe->ereq == NULL || fe->rxeq == NULL ||
/* streams live directly after the lcore struct (see allocation above);
 * give each one rx/tx/err events and push it on the free list */
174 fes = (struct netfe_stream *)(fe + 1);
175 for (i = 0; i != snum; i++) {
176 fes[i].rxev = tle_event_alloc(fe->rxeq, fes + i);
177 fes[i].txev = tle_event_alloc(fe->txeq, fes + i);
178 fes[i].erev = tle_event_alloc(fe->ereq, fes + i);
179 netfe_put_stream(fe, &fe->free, fes + i);
183 /* open all requested streams. */
184 for (i = 0; i != prm->nb_streams; i++) {
185 sprm = &prm->stream[i].sprm;
186 fes = netfe_stream_open_tcp(fe, sprm, lcore, prm->stream[i].op,
187 sprm->bidx, becfg.server);
193 netfe_stream_dump(fes, &sprm->local_addr, &sprm->remote_addr);
/* per-op setup: FWD remembers its forwarding params, TXONLY its
 * payload length and peer address */
195 if (prm->stream[i].op == FWD) {
196 fes->fwdprm = prm->stream[i].fprm;
197 } else if (prm->stream[i].op == TXONLY) {
198 fes->txlen = prm->stream[i].txlen;
199 fes->raddr = prm->stream[i].sprm.remote_addr;
/* server mode listens; otherwise actively connect to the remote */
202 if (becfg.server == 1) {
203 rc = tle_tcp_stream_listen(fes->s);
205 "%s(%u) tle_tcp_stream_listen(stream=%p) "
207 __func__, lcore, fes->s, rc);
211 rc = tle_tcp_stream_connect(fes->s,
212 (const struct sockaddr *)&sprm->remote_addr);
214 "%s(%u) tle_tcp_stream_connect(stream=%p) "
216 __func__, lcore, fes->s, rc);
/*
 * Open a companion forwarding stream for @fes (using its stored fwdprm) and
 * start connecting it to the forward destination. On connect failure the new
 * stream is recycled; on open failure an error is logged.
 * NOTE(review): return statements are elided in this sampled snippet --
 * presumably returns fws on success and NULL on failure; confirm against the
 * full source.
 */
225 static inline struct netfe_stream *
226 netfe_create_fwd_stream(struct netfe_lcore *fe, struct netfe_stream *fes,
227 uint32_t lcore, uint32_t bidx)
230 struct netfe_stream *fws;
/* open as a client-side (server_mode=0) FWD stream */
232 fws = netfe_stream_open_tcp(fe, &fes->fwdprm, lcore, FWD, bidx, 0);
234 rc = tle_tcp_stream_connect(fws->s,
235 (const struct sockaddr *)&fes->fwdprm.remote_addr);
236 NETFE_TRACE("%s(lc=%u, fes=%p): tle_tcp_stream_connect() "
238 __func__, rte_lcore_id(), fes, rc);
/* connect failed: give the stream object back */
241 netfe_stream_term_tcp(fe, fws);
247 RTE_LOG(ERR, USER1, "%s(lc=%u fes=%p) failed to open "
248 "forwarding stream;\n",
249 __func__, rte_lcore_id(), fes);
/*
 * Forward buffered mbufs from @fes to its peer stream (fed): send what we
 * can, free/drop everything if no forward stream exists, then compact the
 * unsent tail of the pbuf array and re-arm events.
 * NOTE(review): the lines selecting `fed`, the sends' surrounding guards and
 * the pbuf compaction assignment are elided in this sampled snippet.
 */
255 netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes)
258 struct rte_mbuf **pkt;
259 struct netfe_stream *fed;
/* push pending packets into the peer (forward-destination) stream */
273 k = tle_tcp_stream_send(fed->s, pkt, n);
275 NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) "
277 __func__, lcore, proto_name[fes->proto],
/* packets the peer could not take count as its drops */
281 fed->stat.drops += n - k;
/* no forward stream: free every pending mbuf and count them as drops */
285 NETFE_TRACE("%s(%u, %p): no fwd stream for %u pkts;\n",
286 __func__, lcore, fes->s, n);
287 for (k = 0; k != n; k++) {
288 NETFE_TRACE("%s(%u, %p): free(%p);\n",
289 __func__, lcore, fes->s, pkt[k]);
290 rte_pktmbuf_free(pkt[k]);
292 fes->stat.drops += n;
295 /* copy unforwarded mbufs. */
296 for (i = 0; i != n - k; i++)
/* something was left unsent: ask for a tx event to retry */
302 tle_event_raise(fes->txev);
303 fes->stat.txev[TLE_SEV_UP]++;
/* buffer was full before this call: rx may have been throttled, re-arm it */
306 if (n == RTE_DIM(fes->pbuf.pkt)) {
307 tle_event_active(fes->rxev, TLE_SEV_UP);
308 fes->stat.rxev[TLE_SEV_UP]++;
/*
 * Handle pending SYN requests on listen stream @fes:
 *  - fetch up to MAX_PKT_BURST SYNs,
 *  - grab that many free front-end streams (k may be < n),
 *  - accept k connections, reject the rest, return unused slots,
 *  - initialize each accepted stream per the listen stream's op mode
 *    (TXONLY / FWD / default rx) and account accept/reject stats.
 * NOTE(review): lcore parameter is declared __rte_unused yet referenced in
 * NETFE_TRACE calls below -- consistent only if NETFE_TRACE compiles away in
 * non-trace builds; confirm against the full source.
 */
313 netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore,
314 struct netfe_stream *fes)
316 uint32_t i, k, n, rc;
317 struct tle_tcp_stream_cfg *prm;
318 struct tle_tcp_accept_param acpt_prm[MAX_PKT_BURST];
319 struct tle_stream *rs[MAX_PKT_BURST];
320 struct tle_syn_req syn_reqs[MAX_PKT_BURST];
321 struct netfe_stream *ts;
322 struct netfe_stream *fs[MAX_PKT_BURST];
/* no callback for accepted streams -- events only */
324 static const struct tle_stream_cb zcb = {.func = NULL, .data = NULL};
326 /* check if any syn requests are waiting */
327 n = tle_tcp_stream_synreqs(fes->s, syn_reqs, RTE_DIM(syn_reqs));
331 NETFE_TRACE("%s(%u): tle_tcp_stream_synreqs(%p, %u) returns %u\n",
332 __func__, lcore, fes->s, MAX_PKT_BURST, n);
334 /* get n free streams */
335 k = netfe_get_streams(&fe->free, fs, n);
337 /* fill accept params to accept k connection requests*/
338 for (i = 0; i != k; i++) {
339 acpt_prm[i].syn = syn_reqs[i];
340 prm = &acpt_prm[i].cfg;
342 prm->recv_ev = fs[i]->rxev;
343 prm->send_ev = fs[i]->txev;
344 prm->err_ev = fs[i]->erev;
345 tle_event_active(fs[i]->erev, TLE_SEV_DOWN);
351 /* accept k new connections */
352 rc = tle_tcp_stream_accept(fes->s, acpt_prm, rs, k);
354 NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n",
355 __func__, lcore, fes->s, k, rc);
358 /* n - rc connections could not be accepted */
359 tle_tcp_reject(fes->s, syn_reqs + rc, n - rc);
361 /* put back k - rc streams free list */
362 netfe_put_streams(fe, &fe->free, fs + rc, k - rc);
365 /* update the params for accepted streams */
366 for (i = 0; i != rc; i++) {
/* new connection inherits proto/family/txlen from the listen stream */
372 ts->proto = fes->proto;
373 ts->family = fes->family;
374 ts->txlen = fes->txlen;
376 if (fes->op == TXONLY) {
377 tle_event_active(ts->txev, TLE_SEV_UP);
378 ts->stat.txev[TLE_SEV_UP]++;
/* non-TXONLY accepted streams start by waiting for rx */
380 tle_event_active(ts->rxev, TLE_SEV_DOWN);
381 ts->stat.rxev[TLE_SEV_DOWN]++;
384 netfe_put_stream(fe, &fe->use, ts);
385 NETFE_TRACE("%s(%u) accept (stream=%p, s=%p)\n",
386 __func__, lcore, ts, rs[i]);
388 /* create a new fwd stream if needed */
389 if (fes->op == FWD) {
390 tle_event_active(ts->txev, TLE_SEV_DOWN);
391 ts->stat.txev[TLE_SEV_DOWN]++;
393 ts->fwds = netfe_create_fwd_stream(fe, fes, lcore,
395 if (ts->fwds != NULL)
/* accept/reject accounting for this burst */
399 fe->tcp_stat.acc += rc;
400 fe->tcp_stat.rej += n - rc;
/*
 * Per-iteration front-end step: drain the syn event queue and run
 * netfe_new_conn_tcp() for every listen stream that has pending SYNs.
 * Uses the per-lcore front-end state (RTE_PER_LCORE(_fe)).
 */
404 netfe_lcore_tcp_req(void)
406 struct netfe_lcore *fe;
407 uint32_t j, n, lcore;
408 struct netfe_stream *fs[MAX_PKT_BURST];
410 fe = RTE_PER_LCORE(_fe);
414 /* look for syn events */
/* evq returns the per-event opaque data, i.e. the netfe_stream pointers */
415 n = tle_evq_get(fe->syneq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
419 lcore = rte_lcore_id();
421 NETFE_TRACE("%s(%u): tle_evq_get(synevq=%p) returns %u\n",
422 __func__, lcore, fe->syneq, n);
424 for (j = 0; j != n; j++)
425 netfe_new_conn_tcp(fe, lcore, fs[j]);
/*
 * Handle streams that raised an error event (e.g. peer reset):
 *  - drain the err event queue,
 *  - optionally dump each affected stream,
 *  - idle the streams' rx/tx/err events and close the TLDK streams in bulk,
 *  - for FWD streams, flush unsent packets and signal the peer stream's
 *    error event so it terminates too,
 *  - finally remove each stream from the in-use list and recycle it.
 * NOTE(review): the lines collecting s[]/rv[]/tv[]/ev[] from fs[] are elided
 * in this sampled snippet.
 */
429 netfe_lcore_tcp_rst(void)
431 struct netfe_lcore *fe;
432 struct netfe_stream *fwds;
434 struct tle_stream *s[MAX_PKT_BURST];
435 struct netfe_stream *fs[MAX_PKT_BURST];
436 struct tle_event *rv[MAX_PKT_BURST];
437 struct tle_event *tv[MAX_PKT_BURST];
438 struct tle_event *ev[MAX_PKT_BURST];
440 fe = RTE_PER_LCORE(_fe);
444 /* look for err events */
445 n = tle_evq_get(fe->ereq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
449 NETFE_TRACE("%s(%u): tle_evq_get(errevq=%p) returns %u\n",
450 __func__, rte_lcore_id(), fe->ereq, n);
452 for (j = 0; j != n; j++) {
453 if (verbose > VERBOSE_NONE) {
454 struct tle_tcp_stream_addr addr;
455 tle_tcp_stream_get_addr(fs[j]->s, &addr);
456 netfe_stream_dump(fs[j], &addr.local, &addr.remote);
/* park all events of the dying streams before closing them */
464 tle_evq_idle(fe->rxeq, rv, n);
465 tle_evq_idle(fe->txeq, tv, n);
466 tle_evq_idle(fe->ereq, ev, n);
468 tle_tcp_stream_close_bulk(s, n);
470 for (j = 0; j != n; j++) {
473 * if forwarding mode, send unsent packets and
474 * signal peer stream to terminate too.
477 if (fwds != NULL && fwds->s != NULL) {
479 /* forward all unsent packets */
480 netfe_fwd_tcp(rte_lcore_id(), fs[j]);
/* nudge the peer's error path so it tears down as well */
483 tle_event_raise(fwds->erev);
487 /* now terminate the stream receiving rst event*/
488 netfe_rem_stream(&fe->use, fs[j]);
489 netfe_stream_term_tcp(fe, fs[j]);
/*
 * Echo-mode (RXTX) tx handler: send the buffered mbufs back out on the same
 * stream, account drops, re-arm rx if the buffer had been full, and compact
 * the unsent tail of the pbuf array.
 * NOTE(review): lcore parameter is declared __rte_unused yet referenced in
 * the NETFE_TRACE call -- presumably NETFE_TRACE compiles away unless
 * tracing is enabled; confirm against the full source.
 */
495 netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
498 struct rte_mbuf **pkt;
503 /* there is nothing to send. */
/* no pending data: stop listening for tx readiness */
505 tle_event_idle(fes->txev);
506 fes->stat.txev[TLE_SEV_IDLE]++;
511 k = tle_tcp_stream_send(fes->s, pkt, n);
513 NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n",
514 __func__, lcore, proto_name[fes->proto],
517 fes->stat.drops += n - k;
519 /* not able to send anything. */
/* a full pbuf means rx was throttled -- re-enable it now that room exists */
523 if (n == RTE_DIM(fes->pbuf.pkt)) {
524 /* mark stream as readable */
525 tle_event_active(fes->rxev, TLE_SEV_UP);
526 fes->stat.rxev[TLE_SEV_UP]++;
529 /* adjust pbuf array. */
530 fes->pbuf.num = n - k;
531 for (i = 0; i != n - k; i++)
/*
 * TXONLY-mode tx handler: top up the stream's packet buffer with freshly
 * generated mbufs of fes->txlen bytes, send as many as the stack accepts,
 * and shift the unsent remainder to the front of the pbuf array.
 */
536 netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes)
540 /* refill with new mbufs. */
541 pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
548 * TODO: cannot use function pointers for unequal param num.
550 k = tle_tcp_stream_send(fes->s, fes->pbuf.pkt, n);
552 NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n",
553 __func__, lcore, proto_name[fes->proto], fes->s, n, k);
/* unsent packets stay buffered but count toward drop statistics here */
555 fes->stat.drops += n - k;
560 /* adjust pbuf array. */
561 fes->pbuf.num = n - k;
562 for (i = k; i != n; i++)
563 fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
/*
 * Main per-iteration front-end work: drain the rx event queue (process
 * received data per stream), then the tx event queue, dispatching each
 * writable stream by its op mode (RXTX echo / FWD forward / TXONLY send).
 */
567 netfe_lcore_tcp(void)
569 struct netfe_lcore *fe;
570 uint32_t j, n, lcore;
571 struct netfe_stream *fs[MAX_PKT_BURST];
573 fe = RTE_PER_LCORE(_fe);
577 lcore = rte_lcore_id();
579 /* look for rx events */
580 n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
583 NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) returns %u\n",
584 __func__, lcore, fe->rxeq, n);
585 for (j = 0; j != n; j++)
586 netfe_rx_process(lcore, fs[j]);
589 /* look for tx events */
590 n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
593 NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n",
594 __func__, lcore, fe->txeq, n);
/* dispatch by op mode; streams with other ops are ignored here */
595 for (j = 0; j != n; j++) {
596 if (fs[j]->op == RXTX)
597 netfe_rxtx_process_tcp(lcore, fs[j]);
598 else if (fs[j]->op == FWD)
599 netfe_fwd_tcp(lcore, fs[j]);
600 else if (fs[j]->op == TXONLY)
601 netfe_tx_process_tcp(lcore, fs[j]);
/*
 * Per-lcore front-end teardown: dump and close every in-use stream, log the
 * accept/reject/terminate statistics, destroy the four event queues and
 * clear the per-lcore pointer.
 * NOTE(review): the line computing `snum` is elided in this sampled snippet
 * -- presumably the count of in-use streams; confirm against the full
 * source.
 */
607 netfe_lcore_fini_tcp(void)
609 struct netfe_lcore *fe;
611 struct tle_tcp_stream_addr addr;
612 struct netfe_stream *fes;
613 uint32_t acc, rej, ter;
615 fe = RTE_PER_LCORE(_fe);
620 for (i = 0; i != snum; i++) {
621 fes = netfe_get_stream(&fe->use);
622 tle_tcp_stream_get_addr(fes->s, &addr);
623 netfe_stream_dump(fes, &addr.local, &addr.remote);
624 netfe_stream_close(fe, fes);
627 acc = fe->tcp_stat.acc;
628 rej = fe->tcp_stat.rej;
629 ter = fe->tcp_stat.ter;
630 RTE_LOG(NOTICE, USER1,
631 "tcp_stats={con_acc=%u,con_rej=%u,con_ter=%u};\n",
/* destroy queues in reverse order of creation */
634 tle_evq_destroy(fe->txeq);
635 tle_evq_destroy(fe->rxeq);
636 tle_evq_destroy(fe->ereq);
637 tle_evq_destroy(fe->syneq);
638 RTE_PER_LCORE(_fe) = NULL;
/*
 * Back-end per-iteration step: for each port queue of this lcore, run the
 * TCP stack's housekeeping with a TCP_MAX_PROCESS limit per call.
 * NOTE(review): per-prtq rx/tx lines inside the loop are elided in this
 * sampled snippet.
 */
643 netbe_lcore_tcp(void)
646 struct netbe_lcore *lc;
648 lc = RTE_PER_LCORE(_be);
652 for (i = 0; i != lc->prtq_num; i++) {
654 tle_tcp_process(lc->ctx, TCP_MAX_PROCESS);
/*
 * lcore entry point for TCP mode: initialize the front-end (if streams were
 * requested) and the back-end for this lcore, then loop until force_quit,
 * servicing SYN requests and error/reset events each iteration; finish by
 * tearing the front-end down.
 * NOTE(review): this function continues past the end of the visible chunk
 * (loop body, back-end calls, return) -- shown lines kept verbatim.
 */
660 lcore_main_tcp(void *arg)
664 struct lcore_prm *prm;
667 lcore = rte_lcore_id();
669 RTE_LOG(NOTICE, USER1, "%s(lcore=%u) start\n",
/* front-end is optional on a pure back-end lcore */
675 if (prm->fe.max_streams != 0)
676 rc = netfe_lcore_init_tcp(&prm->fe);
679 if (rc == 0 && prm->be.lc != NULL)
680 rc = netbe_lcore_setup(prm->be.lc);
685 while (force_quit == 0) {
686 netfe_lcore_tcp_req();
687 netfe_lcore_tcp_rst();
692 RTE_LOG(NOTICE, USER1, "%s(lcore=%u) finish\n",
695 netfe_lcore_fini_tcp();