X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=c1d02ab8f09ab05671a26acd294352b65843909d;hb=cdb7170ab44affe51c574eb2218744fff1bdd369;hp=97636a7ac8edd8e5af02c07b61401f426a82f0f5;hpb=f6c4313b6aa7746fe97afd398ce68c2efbef0600;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 97636a7ac8e..c1d02ab8f09 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -23,7 +23,7 @@ #include #include -session_manager_main_t session_manager_main; +session_main_t session_main; static inline int session_send_evt_to_thread (void *data, void *args, u32 thread_index, @@ -34,7 +34,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index, svm_msg_q_t *mq; u32 tries = 0, max_tries; - mq = session_manager_get_vpp_event_queue (thread_index); + mq = session_main_get_vpp_event_queue (thread_index); while (svm_msg_q_try_lock (mq)) { max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3; @@ -66,7 +66,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index, case SESSION_IO_EVT_TX: case SESSION_IO_EVT_TX_FLUSH: case SESSION_IO_EVT_BUILTIN_RX: - evt->fifo = data; + evt->session_index = *(u32 *) data; break; case SESSION_IO_EVT_BUILTIN_TX: case SESSION_CTRL_EVT_CLOSE: @@ -85,7 +85,8 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index, int session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type) { - return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type); + return session_send_evt_to_thread (&f->master_session_index, 0, + f->master_thread_index, evt_type); } int @@ -104,12 +105,19 @@ session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type) SESSION_CTRL_EVT_CLOSE); } +void +session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp, + void *rpc_args) +{ + session_send_evt_to_thread (fp, rpc_args, thread_index, + SESSION_CTRL_EVT_RPC); +} + void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) { if (thread_index != vlib_get_thread_index ()) - session_send_evt_to_thread (fp, rpc_args, thread_index, - SESSION_CTRL_EVT_RPC); + session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args); else { void (*fnp) (void *) = fp; @@ -121,21 +129,14 @@ static void session_program_transport_close (session_t * s) { u32 thread_index = vlib_get_thread_index (); - session_manager_worker_t *wrk; + session_worker_t *wrk; session_event_t *evt; - if (!session_has_transport (s)) - { - /* Polling may not be enabled on main thread so close now */ - session_transport_close (s); - return; - } - /* If we are in the handler thread, or being called with the worker barrier * held, just append a new event to pending disconnects vector. */ if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index) { - wrk = session_manager_get_worker (s->thread_index); + wrk = session_main_get_worker (s->thread_index); vec_add2 (wrk->pending_disconnects, evt, 1); clib_memset (evt, 0, sizeof (*evt)); evt->session_handle = session_handle (s); @@ -148,7 +149,7 @@ session_program_transport_close (session_t * s) session_t * session_alloc (u32 thread_index) { - session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index]; + session_worker_t *wrk = &session_main.wrk[thread_index]; session_t *s; u8 will_expand = 0; pool_get_aligned_will_expand (wrk->sessions, will_expand, @@ -173,16 +174,21 @@ session_alloc (u32 thread_index) void session_free (session_t * s) { - pool_put (session_manager_main.wrk[s->thread_index].sessions, s); if (CLIB_DEBUG) - clib_memset (s, 0xFA, sizeof (*s)); + { + u8 thread_index = s->thread_index; + clib_memset (s, 0xFA, sizeof (*s)); + pool_put (session_main.wrk[thread_index].sessions, s); + return; + } + SESSION_EVT_DBG (SESSION_EVT_FREE, s); + pool_put (session_main.wrk[s->thread_index].sessions, s); } void session_free_w_fifos (session_t * s) { - segment_manager_dealloc_fifos (s->svm_segment_index, s->rx_fifo, - s->tx_fifo); + segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); session_free (s); } @@ -214,7 +220,6 @@ session_alloc_for_connection (transport_connection_t * tc) s = session_alloc (thread_index); s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4); - s->enqueue_epoch = (u64) ~ 0; s->session_state = SESSION_STATE_CLOSED; /* Attach transport to session and vice versa */ @@ -386,12 +391,12 @@ session_enqueue_stream_connection (transport_connection_t * tc, { /* Queue RX event on this fifo. Eventually these will need to be flushed * by calling stream_server_flush_enqueue_events () */ - session_manager_worker_t *wrk; + session_worker_t *wrk; - wrk = session_manager_get_worker (s->thread_index); - if (s->enqueue_epoch != wrk->current_enqueue_epoch[tc->proto]) + wrk = session_main_get_worker (s->thread_index); + if (!(s->flags & SESSION_F_RX_EVT)) { - s->enqueue_epoch = wrk->current_enqueue_epoch[tc->proto]; + s->flags |= SESSION_F_RX_EVT; vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index); } } @@ -406,7 +411,7 @@ session_enqueue_dgram_connection (session_t * s, { int enqueued = 0, rv, in_order_off; - ASSERT (svm_fifo_max_enqueue (s->rx_fifo) + ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo) >= b->current_length + sizeof (*hdr)); svm_fifo_enqueue_nowait (s->rx_fifo, sizeof (session_dgram_hdr_t), @@ -424,53 +429,28 @@ session_enqueue_dgram_connection (session_t * s, { /* Queue RX event on this fifo. Eventually these will need to be flushed * by calling stream_server_flush_enqueue_events () */ - session_manager_worker_t *wrk; + session_worker_t *wrk; - wrk = session_manager_get_worker (s->thread_index); - if (s->enqueue_epoch != wrk->current_enqueue_epoch[proto]) + wrk = session_main_get_worker (s->thread_index); + if (!(s->flags & SESSION_F_RX_EVT)) { - s->enqueue_epoch = wrk->current_enqueue_epoch[proto]; + s->flags |= SESSION_F_RX_EVT; vec_add1 (wrk->session_to_enqueue[proto], s->session_index); } } return enqueued; } -/** Check if we have space in rx fifo to push more bytes */ -u8 -stream_session_no_space (transport_connection_t * tc, u32 thread_index, - u16 data_len) -{ - session_t *s = session_get (tc->s_index, thread_index); - - if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY)) - return 1; - - if (data_len > svm_fifo_max_enqueue (s->rx_fifo)) - return 1; - - return 0; -} - -u32 -session_tx_fifo_max_dequeue (transport_connection_t * tc) -{ - session_t *s = session_get (tc->s_index, tc->thread_index); - if (!s->tx_fifo) - return 0; - return svm_fifo_max_dequeue (s->tx_fifo); -} - int -stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer, - u32 offset, u32 max_bytes) +session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer, + u32 offset, u32 max_bytes) { session_t *s = session_get (tc->s_index, tc->thread_index); return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer); } u32 -session_dequeue_drop (transport_connection_t * tc, u32 max_bytes) +session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes) { session_t *s = session_get (tc->s_index, tc->thread_index); return svm_fifo_dequeue_drop (s->tx_fifo, max_bytes); @@ -509,9 +489,14 @@ session_notify_subscribers (u32 app_index, session_t * s, * @return 0 on success or negative number if failed to send notification. */ static inline int -session_enqueue_notify (session_t * s) +session_enqueue_notify_inline (session_t * s) { app_worker_t *app_wrk; + u32 session_index; + u8 n_subscribers; + + session_index = s->session_index; + n_subscribers = svm_fifo_n_subscribers (s->rx_fifo); app_wrk = app_worker_get_if_valid (s->app_wrk_index); if (PREDICT_FALSE (!app_wrk)) @@ -523,21 +508,31 @@ session_enqueue_notify (session_t * s) /* *INDENT-OFF* */ SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({ ed->data[0] = SESSION_IO_EVT_RX; - ed->data[1] = svm_fifo_max_dequeue (s->rx_fifo); + ed->data[1] = svm_fifo_max_dequeue_prod (s->rx_fifo); })); /* *INDENT-ON* */ + s->flags &= ~SESSION_F_RX_EVT; if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX))) return -1; - if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo))) - return session_notify_subscribers (app_wrk->app_index, s, - s->rx_fifo, SESSION_IO_EVT_RX); + if (PREDICT_FALSE (n_subscribers)) + { + s = session_get (session_index, vlib_get_thread_index ()); + return session_notify_subscribers (app_wrk->app_index, s, + s->rx_fifo, SESSION_IO_EVT_RX); + } return 0; } +int +session_enqueue_notify (session_t * s) +{ + return session_enqueue_notify_inline (s); +} + int session_dequeue_notify (session_t * s) { @@ -569,9 +564,9 @@ session_dequeue_notify (session_t * s) * failures due to API queue being full. */ int -session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index) +session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index) { - session_manager_worker_t *wrk = session_manager_get_worker (thread_index); + session_worker_t *wrk = session_main_get_worker (thread_index); session_t *s; int i, errors = 0; u32 *indices; @@ -586,42 +581,27 @@ session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index) errors++; continue; } - if (PREDICT_FALSE (session_enqueue_notify (s))) + + if (PREDICT_FALSE (session_enqueue_notify_inline (s))) errors++; } vec_reset_length (indices); wrk->session_to_enqueue[transport_proto] = indices; - wrk->current_enqueue_epoch[transport_proto]++; return errors; } int -session_manager_flush_all_enqueue_events (u8 transport_proto) +session_main_flush_all_enqueue_events (u8 transport_proto) { vlib_thread_main_t *vtm = vlib_get_thread_main (); int i, errors = 0; for (i = 0; i < 1 + vtm->n_threads; i++) - errors += session_manager_flush_enqueue_events (transport_proto, i); + errors += session_main_flush_enqueue_events (transport_proto, i); return errors; } -/** - * Init fifo tail and head pointers - * - * Useful if transport uses absolute offsets for tracking ooo segments. - */ -void -stream_session_init_fifos_pointers (transport_connection_t * tc, - u32 rx_pointer, u32 tx_pointer) -{ - session_t *s; - s = session_get (tc->s_index, tc->thread_index); - svm_fifo_init_pointers (s->rx_fifo, rx_pointer); - svm_fifo_init_pointers (s->tx_fifo, tx_pointer); -} - int session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) { @@ -752,18 +732,14 @@ void session_transport_closing_notify (transport_connection_t * tc) { app_worker_t *app_wrk; - application_t *app; session_t *s; s = session_get (tc->s_index, tc->thread_index); if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) return; s->session_state = SESSION_STATE_TRANSPORT_CLOSING; - app_wrk = app_worker_get_if_valid (s->app_wrk_index); - if (!app_wrk) - return; - app = application_get (app_wrk->app_index); - app->cb_fns.session_disconnect_callback (s); + app_wrk = app_worker_get (s->app_wrk_index); + app_worker_close_notify (app_wrk, s); } /** @@ -860,17 +836,16 @@ session_transport_closed_notify (transport_connection_t * tc) void session_transport_reset_notify (transport_connection_t * tc) { - session_t *s; app_worker_t *app_wrk; - application_t *app; + session_t *s; + s = session_get (tc->s_index, tc->thread_index); svm_fifo_dequeue_drop_all (s->tx_fifo); if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) return; s->session_state = SESSION_STATE_TRANSPORT_CLOSING; app_wrk = app_worker_get (s->app_wrk_index); - app = application_get (app_wrk->app_index); - app->cb_fns.session_reset_callback (s); + app_worker_reset_notify (app_wrk, s); } int @@ -922,6 +897,7 @@ session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) transport_connection_t *tc; transport_endpoint_cfg_t *tep; app_worker_t *app_wrk; + session_handle_t sh; session_t *s; int rv; @@ -946,6 +922,9 @@ session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) return -1; } + sh = session_handle (s); + session_lookup_add_connection (tc, sh); + return app_worker_connect_notify (app_wrk, s, opaque); } @@ -1133,7 +1112,7 @@ session_transport_close (session_t * s) * point, either after sending everything or after a timeout, call delete * notify. This will finally lead to the complete cleanup of the session. */ - if (svm_fifo_max_dequeue (s->tx_fifo)) + if (svm_fifo_max_dequeue_cons (s->tx_fifo)) s->session_state = SESSION_STATE_CLOSED_WAITING; else s->session_state = SESSION_STATE_CLOSED; @@ -1173,7 +1152,7 @@ session_transport_cleanup (session_t * s) * vpp uses api svm region for event queues. */ void -session_vpp_event_queues_allocate (session_manager_main_t * smm) +session_vpp_event_queues_allocate (session_main_t * smm) { u32 evt_q_length = 2048, evt_size = sizeof (session_event_t); ssvm_private_t *eqs = &smm->evt_qs_segment; @@ -1236,14 +1215,27 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm) } ssvm_private_t * -session_manager_get_evt_q_segment (void) +session_main_get_evt_q_segment (void) { - session_manager_main_t *smm = &session_manager_main; + session_main_t *smm = &session_main; if (smm->evt_qs_use_memfd_seg) return &smm->evt_qs_segment; return 0; } +u64 +session_segment_handle (session_t * s) +{ + svm_fifo_t *f; + + if (!s->rx_fifo) + return SESSION_INVALID_HANDLE; + + f = s->rx_fifo; + return segment_manager_make_segment_handle (f->segment_manager, + f->segment_index); +} + /* *INDENT-OFF* */ static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = { session_tx_fifo_peek_and_snd, @@ -1264,7 +1256,7 @@ session_register_transport (transport_proto_t transport_proto, const transport_proto_vft_t * vft, u8 is_ip4, u32 output_node) { - session_manager_main_t *smm = &session_manager_main; + session_main_t *smm = &session_main; session_type_t session_type; u32 next_index = ~0; @@ -1299,6 +1291,20 @@ session_get_transport (session_t * s) s->connection_index); } +void +session_get_endpoint (session_t * s, ip46_address_t * ip, u16 * port, + u8 * is_ip4, u8 is_lcl) +{ + if (s->session_state != SESSION_STATE_LISTENING) + return transport_get_endpoint (session_get_transport_proto (s), + s->connection_index, s->thread_index, ip, + port, is_ip4, is_lcl); + else + return transport_get_listener_endpoint (session_get_transport_proto (s), + s->connection_index, ip, port, + is_ip4, is_lcl); +} + transport_connection_t * listen_session_get_transport (session_t * s) { @@ -1306,25 +1312,6 @@ listen_session_get_transport (session_t * s) s->connection_index); } -int -listen_session_get_local_session_endpoint (session_t * listener, - session_endpoint_t * sep) -{ - transport_connection_t *tc; - tc = listen_session_get_transport (listener); - if (!tc) - { - clib_warning ("no transport"); - return -1; - } - - /* N.B. The ip should not be copied because this is the local endpoint */ - sep->port = tc->lcl_port; - sep->transport_proto = tc->proto; - sep->is_ip4 = tc->is_ip4; - return 0; -} - void session_flush_frames_main_thread (vlib_main_t * vm) { @@ -1337,11 +1324,11 @@ static clib_error_t * session_manager_main_enable (vlib_main_t * vm) { segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args; - session_manager_main_t *smm = &session_manager_main; + session_main_t *smm = &session_main; vlib_thread_main_t *vtm = vlib_get_thread_main (); u32 num_threads, preallocated_sessions_per_worker; - session_manager_worker_t *wrk; - int i, j; + session_worker_t *wrk; + int i; num_threads = 1 /* main thread */ + vtm->n_threads; @@ -1351,12 +1338,6 @@ session_manager_main_enable (vlib_main_t * vm) /* Allocate cache line aligned worker contexts */ vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES); - for (i = 0; i < TRANSPORT_N_PROTO; i++) - { - for (j = 0; j < num_threads; j++) - smm->wrk[j].current_enqueue_epoch[i] = 1; - } - for (i = 0; i < num_threads; i++) { wrk = &smm->wrk[i]; @@ -1462,7 +1443,7 @@ vnet_session_enable_disable (vlib_main_t * vm, u8 is_en) clib_error_t *error = 0; if (is_en) { - if (session_manager_main.is_enabled) + if (session_main.is_enabled) return 0; session_node_enable_disable (is_en); @@ -1470,7 +1451,7 @@ vnet_session_enable_disable (vlib_main_t * vm, u8 is_en) } else { - session_manager_main.is_enabled = 0; + session_main.is_enabled = 0; session_node_enable_disable (is_en); } @@ -1480,7 +1461,7 @@ vnet_session_enable_disable (vlib_main_t * vm, u8 is_en) clib_error_t * session_manager_main_init (vlib_main_t * vm) { - session_manager_main_t *smm = &session_manager_main; + session_main_t *smm = &session_main; smm->session_baseva = HIGH_SEGMENT_BASEVA; #if (HIGH_SEGMENT_BASEVA > (4ULL << 30)) smm->session_va_space_size = 128ULL << 30; @@ -1498,7 +1479,7 @@ VLIB_INIT_FUNCTION (session_manager_main_init); static clib_error_t * session_config_fn (vlib_main_t * vm, unformat_input_t * input) { - session_manager_main_t *smm = &session_manager_main; + session_main_t *smm = &session_main; u32 nitems; uword tmp;