From: Florin Coras Date: Tue, 16 Nov 2021 03:01:52 +0000 (-0800) Subject: session: try to coalesce ct accept rpcs X-Git-Tag: v22.06-rc0~231 X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=c215e5a317980447fed3ca0df1895e261769c219;p=vpp.git session: try to coalesce ct accept rpcs Type: improvement Signed-off-by: Florin Coras Change-Id: I11de851949afd90a37c102ed0c00969a4cc73df4 --- diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c index 3a1d48b1a7e..db8d721b631 100644 --- a/src/vnet/session/application_local.c +++ b/src/vnet/session/application_local.c @@ -49,7 +49,9 @@ typedef struct ct_cleanup_req_ typedef struct ct_worker_ { ct_connection_t *connections; /**< Per-worker connection pools */ + u32 *pending_connects; /**< Fifo of pending ho indices */ ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */ + u8 have_connects; /**< Set if connect rpc pending */ u8 have_cleanups; /**< Set if cleanup rpc pending */ } ct_worker_t; @@ -631,22 +633,20 @@ ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct, } static void -ct_accept_rpc_wrk_handler (void *accept_args) +ct_accept_one (u32 thread_index, u32 ho_index) { - u32 cct_index, ho_index, thread_index, ll_index; ct_connection_t *sct, *cct, *ho; transport_connection_t *ll_ct; app_worker_t *server_wrk; + u32 cct_index, ll_index; session_t *ss, *ll; /* * Alloc client ct and initialize from ho */ - thread_index = vlib_get_thread_index (); cct = ct_connection_alloc (thread_index); cct_index = cct->c_c_index; - ho_index = pointer_to_uword (accept_args); ho = ct_connection_get (ho_index, 0); /* Unlikely but half-open session and transport could have been freed */ @@ -738,6 +738,34 @@ ct_accept_rpc_wrk_handler (void *accept_args) } } +static void +ct_accept_rpc_wrk_handler (void *rpc_args) +{ + u32 thread_index, ho_index, n_connects, i, n_pending; + const u32 max_connects = 32; + ct_worker_t *wrk; + + thread_index = pointer_to_uword (rpc_args); + wrk = ct_worker_get (thread_index); + + /* Sub without lock as main enqueues with worker barrier */ + n_pending = clib_fifo_elts (wrk->pending_connects); + n_connects = clib_min (n_pending, max_connects); + + for (i = 0; i < n_connects; i++) + { + clib_fifo_sub1 (wrk->pending_connects, ho_index); + ct_accept_one (thread_index, ho_index); + } + + if (n_pending == n_connects) + wrk->have_connects = 0; + else + session_send_rpc_evt_to_thread_force ( + thread_index, ct_accept_rpc_wrk_handler, + uword_to_pointer (thread_index, void *)); +} + static int ct_connect (app_worker_t * client_wrk, session_t * ll, session_endpoint_cfg_t * sep) @@ -745,6 +773,7 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, u32 thread_index, ho_index; ct_main_t *cm = &ct_main; ct_connection_t *ho; + ct_worker_t *wrk; /* Simple round-robin policy for spreading sessions over workers. We skip * thread index 0, i.e., offset the index by 1, when we have workers as it @@ -777,9 +806,17 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, * after server accepts the connection. */ - session_send_rpc_evt_to_thread_force (thread_index, - ct_accept_rpc_wrk_handler, - uword_to_pointer (ho_index, void *)); + wrk = ct_worker_get (thread_index); + + /* Worker barrier held, add without additional lock */ + clib_fifo_add1 (wrk->pending_connects, ho_index); + if (!wrk->have_connects) + { + wrk->have_connects = 1; + session_send_rpc_evt_to_thread_force ( + thread_index, ct_accept_rpc_wrk_handler, + uword_to_pointer (thread_index, void *)); + } return ho_index; }