/*
 * Copyright (c) 2019 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <vnet/session/application_local.h>
#include <vnet/session/session.h>
/* Flags marking which side of an app pair has fully detached from a
 * shared ct segment. A segment can be reclaimed once both are set. */
typedef enum ct_segment_flags_
{
  CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
  CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
} ct_segment_flags_t;
25 typedef struct ct_segment_
28 u32 client_n_sessions;
29 u32 server_n_sessions;
30 ct_segment_flags_t flags;
33 typedef struct ct_segments_
38 ct_segment_t *segments;
41 typedef struct ct_main_
43 ct_connection_t **connections; /**< Per-worker connection pools */
44 u32 n_workers; /**< Number of vpp workers */
45 u32 n_sessions; /**< Cumulative sessions counter */
46 u32 *ho_reusable; /**< Vector of reusable ho indices */
47 clib_spinlock_t ho_reuseable_lock; /**< Lock for reusable ho indices */
48 clib_rwlock_t app_segs_lock; /**< RW lock for seg contexts */
49 uword *app_segs_ctxs_table; /**< App handle to segment pool map */
50 ct_segments_ctx_t *app_seg_ctxs; /**< Pool of ct segment contexts */
53 static ct_main_t ct_main;
55 static ct_connection_t *
56 ct_connection_alloc (u32 thread_index)
60 pool_get_zero (ct_main.connections[thread_index], ct);
61 ct->c_c_index = ct - ct_main.connections[thread_index];
62 ct->c_thread_index = thread_index;
65 ct->seg_ctx_index = ~0;
66 ct->ct_seg_index = ~0;
70 static ct_connection_t *
71 ct_connection_get (u32 ct_index, u32 thread_index)
73 if (pool_is_free_index (ct_main.connections[thread_index], ct_index))
75 return pool_elt_at_index (ct_main.connections[thread_index], ct_index);
79 ct_connection_free (ct_connection_t * ct)
83 u32 thread_index = ct->c_thread_index;
84 memset (ct, 0xfc, sizeof (*ct));
85 pool_put (ct_main.connections[thread_index], ct);
88 pool_put (ct_main.connections[ct->c_thread_index], ct);
91 static ct_connection_t *
92 ct_half_open_alloc (void)
94 ct_main_t *cm = &ct_main;
97 clib_spinlock_lock (&cm->ho_reuseable_lock);
98 vec_foreach (hip, cm->ho_reusable)
99 pool_put_index (cm->connections[0], *hip);
100 vec_reset_length (cm->ho_reusable);
101 clib_spinlock_unlock (&cm->ho_reuseable_lock);
103 return ct_connection_alloc (0);
107 ct_half_open_add_reusable (u32 ho_index)
109 ct_main_t *cm = &ct_main;
111 clib_spinlock_lock (&cm->ho_reuseable_lock);
112 vec_add1 (cm->ho_reusable, ho_index);
113 clib_spinlock_unlock (&cm->ho_reuseable_lock);
117 ct_session_get_peer (session_t * s)
119 ct_connection_t *ct, *peer_ct;
120 ct = ct_connection_get (s->connection_index, s->thread_index);
121 peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
122 return session_get (peer_ct->c_s_index, s->thread_index);
126 ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
129 ct = (ct_connection_t *) session_get_transport (ll);
130 sep->transport_proto = ct->actual_tp;
131 sep->port = ct->c_lcl_port;
132 sep->is_ip4 = ct->c_is_ip4;
133 ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
137 ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
140 ct_segments_ctx_t *seg_ctx;
141 ct_main_t *cm = &ct_main;
142 ct_segment_flags_t flags;
143 segment_manager_t *sm;
144 app_worker_t *app_wrk;
145 ct_segment_t *ct_seg;
154 sm = segment_manager_get (rx_fifo->segment_manager);
155 seg_index = rx_fifo->segment_index;
157 fs = segment_manager_get_segment_w_lock (sm, seg_index);
158 fifo_segment_free_fifo (fs, rx_fifo);
159 fifo_segment_free_fifo (fs, tx_fifo);
160 segment_manager_segment_reader_unlock (sm);
163 * Update segment context
166 clib_rwlock_reader_lock (&cm->app_segs_lock);
168 seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
169 ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
171 if (ct->flags & CT_CONN_F_CLIENT)
174 __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
176 ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
181 __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
183 ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
186 flags = ct_seg->flags;
188 clib_rwlock_reader_unlock (&cm->app_segs_lock);
191 * No need to do any app updates, return
196 if (ct->flags & CT_CONN_F_CLIENT)
198 app_wrk = app_worker_get_if_valid (ct->client_wrk);
199 /* Determine if client app still needs notification, i.e., if it is
200 * still attached. If client detached and this is the last ct session
201 * on this segment, then its connects segment manager should also be
202 * detached, so do not send notification */
205 segment_manager_t *csm;
206 csm = app_worker_get_connect_segment_manager (app_wrk);
207 if (!segment_manager_app_detached (csm))
208 app_worker_del_segment_notify (app_wrk, ct->segment_handle);
211 else if (!segment_manager_app_detached (sm))
213 app_wrk = app_worker_get (ct->server_wrk);
214 app_worker_del_segment_notify (app_wrk, ct->segment_handle);
217 if (!(flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
218 !(flags & CT_SEGMENT_F_SERVER_DETACHED))
222 * Remove segment context because both client and server detached
225 clib_rwlock_writer_lock (&cm->app_segs_lock);
227 seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
228 pool_put_index (seg_ctx->segments, ct->ct_seg_index);
231 * No more segment indices left, remove the segments context
233 if (!pool_elts (seg_ctx->segments))
235 u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
236 table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
237 hash_unset (cm->app_segs_ctxs_table, table_handle);
238 pool_free (seg_ctx->segments);
239 pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
242 clib_rwlock_writer_unlock (&cm->app_segs_lock);
244 segment_manager_lock_and_del_segment (sm, seg_index);
246 /* Cleanup segment manager if needed. If server detaches there's a chance
247 * the client's sessions will hold up segment removal */
248 if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
249 segment_manager_free_safe (sm);
253 ct_session_connect_notify (session_t *ss)
255 u32 ss_index, opaque, thread_index, cnt;
256 ct_connection_t *sct, *cct;
257 ct_segments_ctx_t *seg_ctx;
258 app_worker_t *client_wrk;
259 ct_main_t *cm = &ct_main;
260 ct_segment_t *ct_seg;
264 ss_index = ss->session_index;
265 thread_index = ss->thread_index;
266 sct = (ct_connection_t *) session_get_transport (ss);
267 client_wrk = app_worker_get (sct->client_wrk);
268 opaque = sct->client_opaque;
270 cct = ct_connection_get (sct->peer_index, thread_index);
272 /* Client closed while waiting for reply from server */
275 session_transport_closing_notify (&sct->connection);
276 session_transport_delete_notify (&sct->connection);
277 ct_connection_free (sct);
281 session_half_open_delete_notify (&cct->connection);
282 cct->flags &= ~CT_CONN_F_HALF_OPEN;
285 * Update ct segment context
288 clib_rwlock_reader_lock (&cm->app_segs_lock);
290 seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, sct->seg_ctx_index);
291 ct_seg = pool_elt_at_index (seg_ctx->segments, sct->ct_seg_index);
293 cnt = __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
296 err = app_worker_add_segment_notify (client_wrk, cct->segment_handle);
299 clib_rwlock_reader_unlock (&cm->app_segs_lock);
305 clib_rwlock_reader_unlock (&cm->app_segs_lock);
308 * Alloc client session
311 cs = session_alloc (thread_index);
312 ss = session_get (ss_index, thread_index);
313 cs->session_type = ss->session_type;
314 cs->listener_handle = SESSION_INVALID_HANDLE;
315 cs->session_state = SESSION_STATE_CONNECTING;
316 cs->app_wrk_index = client_wrk->wrk_index;
317 cs->connection_index = cct->c_c_index;
318 cct->seg_ctx_index = sct->seg_ctx_index;
319 cct->ct_seg_index = sct->ct_seg_index;
321 cct->c_s_index = cs->session_index;
322 cct->client_rx_fifo = ss->tx_fifo;
323 cct->client_tx_fifo = ss->rx_fifo;
325 cct->client_rx_fifo->refcnt++;
326 cct->client_tx_fifo->refcnt++;
328 /* This will allocate fifos for the session. They won't be used for
329 * exchanging data but they will be used to close the connection if
330 * the segment manager/worker is freed */
331 if ((err = app_worker_init_connected (client_wrk, cs)))
338 cs->session_state = SESSION_STATE_CONNECTING;
340 if (app_worker_connect_notify (client_wrk, cs, err, opaque))
343 ct_session_dealloc_fifos (cct, cs->rx_fifo, cs->tx_fifo);
348 cs = session_get (cct->c_s_index, cct->c_thread_index);
349 cs->session_state = SESSION_STATE_READY;
354 app_worker_connect_notify (client_wrk, 0, err, opaque);
358 static ct_segment_t *
359 ct_lookup_free_segment (segment_manager_t *sm, ct_segments_ctx_t *seg_ctx,
362 uword free_bytes, max_free_bytes;
363 ct_segment_t *ct_seg, *res = 0;
367 max_free_bytes = pair_bytes;
368 pool_foreach (ct_seg, seg_ctx->segments)
370 /* Client or server has detached so segment cannot be used */
371 if ((ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED) ||
372 (ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED))
374 fs = segment_manager_get_segment (sm, ct_seg->segment_index);
375 free_bytes = fifo_segment_available_bytes (fs);
376 max_fifos = fifo_segment_size (fs) / pair_bytes;
377 if (free_bytes > max_free_bytes &&
378 fifo_segment_num_fifos (fs) / 2 < max_fifos)
380 max_free_bytes = free_bytes;
389 ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
390 session_t *ls, session_t *ll)
392 u32 sm_index, pair_bytes, seg_ctx_index = ~0, ct_seg_index = ~0;
393 u64 seg_handle, table_handle, seg_size;
394 segment_manager_props_t *props;
395 const u32 margin = 16 << 10;
396 ct_segments_ctx_t *seg_ctx;
397 ct_main_t *cm = &ct_main;
398 application_t *server;
399 segment_manager_t *sm;
400 ct_segment_t *ct_seg;
405 sm = app_worker_get_listen_segment_manager (server_wrk, ll);
406 sm_index = segment_manager_index (sm);
407 server = application_get (server_wrk->app_index);
408 props = application_segment_manager_properties (server);
410 table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
411 table_handle = (u64) segment_manager_index (sm) << 32 | table_handle;
414 * Check if we already have a segment that can hold the fifos
417 clib_rwlock_reader_lock (&cm->app_segs_lock);
419 spp = hash_get (cm->app_segs_ctxs_table, table_handle);
422 seg_ctx_index = *spp;
423 seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
424 pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
425 ct_seg = ct_lookup_free_segment (sm, seg_ctx, pair_bytes);
428 ct_seg_index = ct_seg - seg_ctx->segments;
429 fs_index = ct_seg->segment_index;
430 __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
434 clib_rwlock_reader_unlock (&cm->app_segs_lock);
437 * No segment, try to alloc one and notify the server
440 if (ct_seg_index == ~0)
442 seg_size = clib_max (props->segment_size, 128 << 20);
443 fs_index = segment_manager_add_segment (sm, seg_size, 0);
450 /* Make sure the segment is not used for other fifos */
451 fs = segment_manager_get_segment_w_lock (sm, fs_index);
452 fifo_segment_flags (fs) |= FIFO_SEGMENT_F_CUSTOM_USE;
453 segment_manager_segment_reader_unlock (sm);
455 clib_rwlock_writer_lock (&cm->app_segs_lock);
457 if (seg_ctx_index == ~0)
459 pool_get_zero (cm->app_seg_ctxs, seg_ctx);
460 seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
461 hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
462 seg_ctx->server_wrk = server_wrk->wrk_index;
463 seg_ctx->client_wrk = ct->client_wrk;
464 seg_ctx->sm_index = sm_index;
467 seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
469 pool_get_zero (seg_ctx->segments, ct_seg);
470 ct_seg->segment_index = fs_index;
471 ct_seg->server_n_sessions += 1;
472 ct_seg_index = ct_seg - seg_ctx->segments;
474 clib_rwlock_writer_unlock (&cm->app_segs_lock);
476 /* New segment, notify the server. Client notification sent after
477 * server accepts the connection */
478 seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
479 if ((rv = app_worker_add_segment_notify (server_wrk, seg_handle)))
481 segment_manager_lock_and_del_segment (sm, fs_index);
483 clib_rwlock_writer_lock (&cm->app_segs_lock);
484 pool_put_index (seg_ctx->segments, ct_seg_index);
485 clib_rwlock_writer_unlock (&cm->app_segs_lock);
487 goto failed_fix_count;
492 * Allocate and initialize the fifos
494 fs = segment_manager_get_segment_w_lock (sm, fs_index);
495 rv = segment_manager_try_alloc_fifos (
496 fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
497 &ls->rx_fifo, &ls->tx_fifo);
500 segment_manager_segment_reader_unlock (sm);
501 goto failed_fix_count;
504 ls->rx_fifo->shr->master_session_index = ls->session_index;
505 ls->tx_fifo->shr->master_session_index = ls->session_index;
506 ls->rx_fifo->master_thread_index = ls->thread_index;
507 ls->tx_fifo->master_thread_index = ls->thread_index;
508 ls->rx_fifo->segment_manager = sm_index;
509 ls->tx_fifo->segment_manager = sm_index;
510 ls->rx_fifo->segment_index = fs_index;
511 ls->tx_fifo->segment_index = fs_index;
513 seg_handle = segment_manager_segment_handle (sm, fs);
514 segment_manager_segment_reader_unlock (sm);
516 ct->segment_handle = seg_handle;
517 ct->seg_ctx_index = seg_ctx_index;
518 ct->ct_seg_index = ct_seg_index;
524 clib_rwlock_reader_lock (&cm->app_segs_lock);
526 seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
527 ct_seg = pool_elt_at_index (seg_ctx->segments, ct_seg_index);
528 __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
530 clib_rwlock_reader_unlock (&cm->app_segs_lock);
537 ct_accept_rpc_wrk_handler (void *accept_args)
539 u32 cct_index, ho_index, thread_index, ll_index;
540 ct_connection_t *sct, *cct, *ho;
541 transport_connection_t *ll_ct;
542 app_worker_t *server_wrk;
546 * Alloc client ct and initialize from ho
548 thread_index = vlib_get_thread_index ();
549 cct = ct_connection_alloc (thread_index);
550 cct_index = cct->c_c_index;
552 ho_index = pointer_to_uword (accept_args);
553 ho = ct_connection_get (ho_index, 0);
555 /* Unlikely but half-open session and transport could have been freed */
556 if (PREDICT_FALSE (!ho))
558 ct_connection_free (cct);
562 clib_memcpy (cct, ho, sizeof (*ho));
563 cct->c_c_index = cct_index;
564 cct->c_thread_index = thread_index;
565 cct->flags |= CT_CONN_F_HALF_OPEN;
567 /* Notify session layer that half-open is on a different thread
568 * and mark ho connection index reusable. Avoids another rpc
570 session_half_open_migrate_notify (&cct->connection);
571 session_half_open_migrated_notify (&cct->connection);
572 ct_half_open_add_reusable (ho_index);
575 * Alloc and init server transport
578 ll_index = cct->peer_index;
579 ll = listen_session_get (ll_index);
580 sct = ct_connection_alloc (thread_index);
581 /* Transport not necessarily ct but it might, so grab after sct alloc */
582 ll_ct = listen_session_get_transport (ll);
584 /* Make sure cct is valid after sct alloc */
585 cct = ct_connection_get (cct_index, thread_index);
588 sct->c_lcl_port = ll_ct->lcl_port;
589 sct->c_is_ip4 = cct->c_is_ip4;
590 clib_memcpy (&sct->c_lcl_ip, &ll_ct->lcl_ip, sizeof (ll_ct->lcl_ip));
591 sct->client_wrk = cct->client_wrk;
592 sct->c_proto = TRANSPORT_PROTO_NONE;
593 sct->client_opaque = cct->client_opaque;
594 sct->actual_tp = cct->actual_tp;
596 sct->peer_index = cct->c_c_index;
597 cct->peer_index = sct->c_c_index;
600 * Accept server session. Client session is created only after
601 * server confirms accept.
603 ss = session_alloc (thread_index);
604 ll = listen_session_get (ll_index);
605 ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
607 ss->connection_index = sct->c_c_index;
608 ss->listener_handle = listen_session_get_handle (ll);
609 ss->session_state = SESSION_STATE_CREATED;
611 server_wrk = application_listener_select_worker (ll);
612 ss->app_wrk_index = server_wrk->wrk_index;
614 sct->c_s_index = ss->session_index;
615 sct->server_wrk = ss->app_wrk_index;
617 if (ct_init_accepted_session (server_wrk, sct, ss, ll))
619 ct_connection_free (sct);
624 ss->session_state = SESSION_STATE_ACCEPTING;
625 if (app_worker_accept_notify (server_wrk, ss))
627 ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
628 ct_connection_free (sct);
633 cct->segment_handle = sct->segment_handle;
637 ct_connect (app_worker_t * client_wrk, session_t * ll,
638 session_endpoint_cfg_t * sep)
640 u32 thread_index, ho_index;
641 ct_main_t *cm = &ct_main;
644 /* Simple round-robin policy for spreading sessions over workers. We skip
645 * thread index 0, i.e., offset the index by 1, when we have workers as it
646 * is the one dedicated to main thread. Note that n_workers does not include
649 thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
652 * Alloc and init client half-open transport
655 ho = ct_half_open_alloc ();
656 ho_index = ho->c_c_index;
657 ho->c_rmt_port = sep->port;
659 ho->c_is_ip4 = sep->is_ip4;
660 ho->client_opaque = sep->opaque;
661 ho->client_wrk = client_wrk->wrk_index;
662 ho->peer_index = ll->session_index;
663 ho->c_proto = TRANSPORT_PROTO_NONE;
664 ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
665 clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (sep->ip));
666 ho->flags |= CT_CONN_F_CLIENT;
668 ho->actual_tp = sep->transport_proto;
671 * Accept connection on thread selected above. Connected reply comes
672 * after server accepts the connection.
675 session_send_rpc_evt_to_thread_force (thread_index,
676 ct_accept_rpc_wrk_handler,
677 uword_to_pointer (ho_index, void *));
683 ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep)
685 session_endpoint_cfg_t *sep;
688 sep = (session_endpoint_cfg_t *) tep;
689 ct = ct_connection_alloc (0);
690 ct->server_wrk = sep->app_wrk_index;
691 ct->c_is_ip4 = sep->is_ip4;
692 clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
693 ct->c_lcl_port = sep->port;
694 ct->c_s_index = app_listener_index;
695 ct->actual_tp = sep->transport_proto;
696 return ct->c_c_index;
700 ct_stop_listen (u32 ct_index)
703 ct = ct_connection_get (ct_index, 0);
704 ct_connection_free (ct);
708 static transport_connection_t *
709 ct_listener_get (u32 ct_index)
711 return (transport_connection_t *) ct_connection_get (ct_index, 0);
714 static transport_connection_t *
715 ct_half_open_get (u32 ct_index)
717 return (transport_connection_t *) ct_connection_get (ct_index, 0);
721 ct_session_cleanup (u32 conn_index, u32 thread_index)
723 ct_connection_t *ct, *peer_ct;
725 ct = ct_connection_get (conn_index, thread_index);
729 peer_ct = ct_connection_get (ct->peer_index, thread_index);
731 peer_ct->peer_index = ~0;
733 ct_connection_free (ct);
737 ct_cleanup_ho (u32 ho_index)
739 ct_connection_free (ct_connection_get (ho_index, 0));
743 ct_session_connect (transport_endpoint_cfg_t * tep)
745 session_endpoint_cfg_t *sep_ext;
746 session_endpoint_t _sep, *sep = &_sep;
747 app_worker_t *app_wrk;
755 sep_ext = (session_endpoint_cfg_t *) tep;
756 _sep = *(session_endpoint_t *) tep;
757 app_wrk = app_worker_get (sep_ext->app_wrk_index);
758 app = application_get (app_wrk->app_index);
760 sep->transport_proto = sep_ext->original_tp;
761 table_index = application_local_session_table (app);
762 lh = session_lookup_local_endpoint (table_index, sep);
763 if (lh == SESSION_DROP_HANDLE)
764 return SESSION_E_FILTERED;
766 if (lh == SESSION_INVALID_HANDLE)
769 ll = listen_session_get_from_handle (lh);
770 al = app_listener_get_w_session (ll);
773 * Break loop if rule in local table points to connecting app. This
774 * can happen if client is a generic proxy. Route connect through
775 * global table instead.
777 if (al->app_index == app->app_index)
780 return ct_connect (app_wrk, ll, sep_ext);
783 * If nothing found, check the global scope for locally attached
784 * destinations. Make sure first that we're allowed to.
788 if (session_endpoint_is_local (sep))
789 return SESSION_E_NOROUTE;
791 if (!application_has_global_scope (app))
792 return SESSION_E_SCOPE;
794 fib_proto = session_endpoint_fib_proto (sep);
795 table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
796 ll = session_lookup_listener_wildcard (table_index, sep);
798 /* Avoid connecting app to own listener */
799 if (ll && ll->app_index != app->app_index)
800 return ct_connect (app_wrk, ll, sep_ext);
802 /* Failed to connect but no error */
803 return SESSION_E_LOCAL_CONNECT;
807 ct_session_close (u32 ct_index, u32 thread_index)
809 ct_connection_t *ct, *peer_ct;
810 app_worker_t *app_wrk;
813 ct = ct_connection_get (ct_index, thread_index);
814 peer_ct = ct_connection_get (ct->peer_index, thread_index);
817 peer_ct->peer_index = ~0;
818 /* Make sure session was allocated */
819 if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
821 app_wrk = app_worker_get (peer_ct->client_wrk);
822 app_worker_connect_notify (app_wrk, 0, SESSION_E_REFUSED,
823 peer_ct->client_opaque);
825 else if (peer_ct->c_s_index != ~0)
826 session_transport_closing_notify (&peer_ct->connection);
828 ct_connection_free (peer_ct);
831 s = session_get (ct->c_s_index, ct->c_thread_index);
833 if (ct->flags & CT_CONN_F_CLIENT)
835 /* Normal free for client session as the fifos are allocated through
836 * the connects segment manager in a segment that's not shared with
838 session_free_w_fifos (s);
839 ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
843 /* Manual session and fifo segment cleanup to avoid implicit
844 * segment manager cleanups and notifications */
845 app_wrk = app_worker_get_if_valid (s->app_wrk_index);
847 app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
849 ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
853 ct_connection_free (ct);
856 static transport_connection_t *
857 ct_session_get (u32 ct_index, u32 thread_index)
859 return (transport_connection_t *) ct_connection_get (ct_index,
864 format_ct_connection_id (u8 * s, va_list * args)
866 ct_connection_t *ct = va_arg (*args, ct_connection_t *);
871 s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
872 ct->c_s_index, format_transport_proto_short, ct->actual_tp,
873 format_ip4_address, &ct->c_lcl_ip4,
874 clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
875 &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
879 s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
880 ct->c_s_index, format_transport_proto_short, ct->actual_tp,
881 format_ip6_address, &ct->c_lcl_ip6,
882 clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
883 &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
890 ct_custom_tx (void *session, transport_send_params_t * sp)
892 session_t *s = (session_t *) session;
893 if (session_has_transport (s))
895 /* If event enqueued towards peer, remove from scheduler and remove
896 * session tx flag, i.e., accept new tx events. Unset fifo flag now to
897 * avoid missing events if peer did not clear fifo flag yet, which is
898 * interpreted as successful notification and session is descheduled. */
899 svm_fifo_unset_event (s->tx_fifo);
900 if (!ct_session_tx (s))
901 sp->flags = TRANSPORT_SND_F_DESCHED;
903 /* The scheduler uses packet count as a means of upper bounding the amount
904 * of work done per dispatch. So make it look like we have sent something */
909 ct_app_rx_evt (transport_connection_t * tc)
911 ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
914 peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
917 ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
918 return session_dequeue_notify (ps);
922 format_ct_listener (u8 * s, va_list * args)
924 u32 tc_index = va_arg (*args, u32);
925 u32 __clib_unused thread_index = va_arg (*args, u32);
926 u32 __clib_unused verbose = va_arg (*args, u32);
927 ct_connection_t *ct = ct_connection_get (tc_index, 0);
928 s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
930 s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
935 format_ct_half_open (u8 *s, va_list *args)
937 u32 ho_index = va_arg (*args, u32);
938 u32 verbose = va_arg (*args, u32);
939 ct_connection_t *ct = ct_connection_get (ho_index, 0);
940 s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
942 s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
947 format_ct_connection (u8 * s, va_list * args)
949 ct_connection_t *ct = va_arg (*args, ct_connection_t *);
950 u32 verbose = va_arg (*args, u32);
954 s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
957 s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
960 s = format (s, "\n");
967 format_ct_session (u8 * s, va_list * args)
969 u32 ct_index = va_arg (*args, u32);
970 u32 thread_index = va_arg (*args, u32);
971 u32 verbose = va_arg (*args, u32);
974 ct = ct_connection_get (ct_index, thread_index);
977 s = format (s, "empty\n");
981 s = format (s, "%U", format_ct_connection, ct, verbose);
986 ct_enable_disable (vlib_main_t * vm, u8 is_en)
988 ct_main_t *cm = &ct_main;
990 cm->n_workers = vlib_num_workers ();
991 vec_validate (cm->connections, cm->n_workers);
992 clib_spinlock_init (&cm->ho_reuseable_lock);
993 clib_rwlock_init (&cm->app_segs_lock);
998 static const transport_proto_vft_t cut_thru_proto = {
999 .enable = ct_enable_disable,
1000 .start_listen = ct_start_listen,
1001 .stop_listen = ct_stop_listen,
1002 .get_connection = ct_session_get,
1003 .get_listener = ct_listener_get,
1004 .get_half_open = ct_half_open_get,
1005 .cleanup = ct_session_cleanup,
1006 .cleanup_ho = ct_cleanup_ho,
1007 .connect = ct_session_connect,
1008 .close = ct_session_close,
1009 .custom_tx = ct_custom_tx,
1010 .app_rx_evt = ct_app_rx_evt,
1011 .format_listener = format_ct_listener,
1012 .format_half_open = format_ct_half_open,
1013 .format_connection = format_ct_session,
1014 .transport_options = {
1017 .tx_type = TRANSPORT_TX_INTERNAL,
1018 .service_type = TRANSPORT_SERVICE_VC,
1024 ct_session_tx (session_t * s)
1026 ct_connection_t *ct, *peer_ct;
1029 ct = (ct_connection_t *) session_get_transport (s);
1030 peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1033 peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1034 if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1036 return session_enqueue_notify (peer_s);
1039 static clib_error_t *
1040 ct_transport_init (vlib_main_t * vm)
1042 transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1043 FIB_PROTOCOL_IP4, ~0);
1044 transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1045 FIB_PROTOCOL_IP6, ~0);
1049 VLIB_INIT_FUNCTION (ct_transport_init);
/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */