X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=23c58ef2d8915b10b847b5cc9da761ea6505d1ec;hb=e111bbd121b7c2ca4e2a002fd8ed4ffcea5222ff;hp=852f87da677501fd87b8405189959f96cae8e009;hpb=fc20c8e50f2784ad62b97bdb0094605d2b86f596;p=vpp.git

diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 852f87da677..23c58ef2d89 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -17,11 +17,13 @@
  * @brief Session and session manager
  */

+#include 
 #include 
 #include 
 #include 
 #include 
 #include 
+#include 

 session_main_t session_main;

@@ -58,7 +60,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
       evt->session_index = *(u32 *) data;
       break;
-    case SESSION_IO_EVT_BUILTIN_TX:
+    case SESSION_IO_EVT_TX_MAIN:
     case SESSION_CTRL_EVT_CLOSE:
     case SESSION_CTRL_EVT_RESET:
       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
@@ -215,15 +217,12 @@ session_alloc (u32 thread_index)
 void
 session_free (session_t * s)
 {
-  if (CLIB_DEBUG)
-    {
-      u8 thread_index = s->thread_index;
-      clib_memset (s, 0xFA, sizeof (*s));
-      pool_put (session_main.wrk[thread_index].sessions, s);
-      return;
-    }
+  session_worker_t *wrk = &session_main.wrk[s->thread_index];
+
   SESSION_EVT (SESSION_EVT_FREE, s);
-  pool_put (session_main.wrk[s->thread_index].sessions, s);
+  if (CLIB_DEBUG)
+    clib_memset (s, 0xFA, sizeof (*s));
+  pool_put (wrk->sessions, s);
 }

 u8
@@ -241,18 +240,26 @@ session_is_valid (u32 si, u8 thread_index)
       || s->session_state <= SESSION_STATE_LISTENING)
     return 1;

-  if (s->session_state == SESSION_STATE_CONNECTING &&
+  if ((s->session_state == SESSION_STATE_CONNECTING ||
+       s->session_state == SESSION_STATE_TRANSPORT_CLOSED) &&
      (s->flags & SESSION_F_HALF_OPEN))
     return 1;

   tc = session_get_transport (s);
-  if (s->connection_index != tc->c_index
-      || s->thread_index != tc->thread_index || tc->s_index != si)
+  if (s->connection_index != tc->c_index ||
+      s->thread_index != tc->thread_index || tc->s_index != si)
     return 0;

   return 1;
 }

+void
+session_cleanup (session_t *s)
+{
+  segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
+  session_free (s);
+}
+
 static void
 session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
 {
@@ -260,16 +267,21 @@ session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
   if (!app_wrk)
-    return;
+    {
+      if (ntf == SESSION_CLEANUP_TRANSPORT)
+	return;
+
+      session_cleanup (s);
+      return;
+    }
   app_worker_cleanup_notify (app_wrk, s, ntf);
 }

 void
-session_free_w_fifos (session_t * s)
+session_program_cleanup (session_t *s)
 {
+  ASSERT (s->session_state == SESSION_STATE_TRANSPORT_DELETED);
   session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
-  segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
-  session_free (s);
 }

 /**
@@ -286,7 +298,7 @@ session_delete (session_t * s)
   if ((rv = session_lookup_del_session (s)))
     clib_warning ("session %u hash delete rv %d", s->session_index, rv);

-  session_free_w_fifos (s);
+  session_program_cleanup (s);
 }

 void
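For application authors, the practical upshot of the cleanup split above is that a session can now produce two cleanup notifications: SESSION_CLEANUP_TRANSPORT when transport state goes away and SESSION_CLEANUP_SESSION when the session itself is reclaimed, with session_cleanup () freeing fifos and session directly if no app worker is attached. A minimal sketch of a builtin app's cleanup callback consuming the two stages (the callback name and body are illustrative, not part of this patch):

    /* Hypothetical builtin-app cleanup callback, registered via the app
     * callback vft; it is invoked once per cleanup stage. */
    static void
    my_app_session_cleanup_cb (session_t *s, session_cleanup_ntf_t ntf)
    {
      if (ntf == SESSION_CLEANUP_TRANSPORT)
        {
          /* Transport state is gone but the fifos are still valid, so
           * drain any rx data the app still wants. */
          return;
        }
      /* SESSION_CLEANUP_SESSION: drop per-session app state; fifos and
       * the session object are deallocated after this returns. */
    }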
@@ -301,16 +313,27 @@ session_cleanup_half_open (session_handle_t ho_handle)
        * session should be removed.
        */
       if (ho->connection_index == ~0)
 	{
-	  ho->session_state = SESSION_STATE_CLOSED;
+	  session_set_state (ho, SESSION_STATE_CLOSED);
 	  return;
 	}
       /* Migrated transports are no longer half-opens */
       transport_cleanup (session_get_transport_proto (ho),
 			 ho->connection_index, ho->app_index /* overloaded */);
     }
-  else
-    transport_cleanup_half_open (session_get_transport_proto (ho),
-				 ho->connection_index);
+  else if (ho->session_state != SESSION_STATE_TRANSPORT_DELETED)
+    {
+      /* Cleanup half-open session lookup table if need be */
+      if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
+	{
+	  transport_connection_t *tc;
+	  tc = transport_get_half_open (session_get_transport_proto (ho),
+					ho->connection_index);
+	  if (tc && !(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
+	    session_lookup_del_half_open (tc);
+	}
+      transport_cleanup_half_open (session_get_transport_proto (ho),
+				   ho->connection_index);
+    }
   session_free (ho);
 }

@@ -319,10 +342,12 @@ session_half_open_free (session_t *ho)
 {
   app_worker_t *app_wrk;

-  ASSERT (vlib_get_thread_index () <= 1);
-  app_wrk = app_worker_get (ho->app_wrk_index);
-  app_worker_del_half_open (app_wrk, ho);
-  session_free (ho);
+  ASSERT (vlib_get_thread_index () <= transport_cl_thread ());
+  app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
+  if (app_wrk)
+    app_worker_del_half_open (app_wrk, ho);
+  else
+    session_free (ho);
 }

 static void
@@ -335,16 +360,26 @@ session_half_open_free_rpc (void *args)
 void
 session_half_open_delete_notify (transport_connection_t *tc)
 {
+  session_t *ho = ho_session_get (tc->s_index);
+
+  /* Cleanup half-open lookup table if need be */
+  if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
+    {
+      if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
+	session_lookup_del_half_open (tc);
+    }
+  session_set_state (ho, SESSION_STATE_TRANSPORT_DELETED);
+
   /* Notification from ctrl thread accepted without rpc */
-  if (!tc->thread_index)
+  if (tc->thread_index == transport_cl_thread ())
     {
-      session_half_open_free (ho_session_get (tc->s_index));
+      session_half_open_free (ho);
     }
   else
     {
       void *args = uword_to_pointer ((uword) tc->s_index, void *);
-      session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
-					    args);
+      session_send_rpc_evt_to_thread_force (transport_cl_thread (),
+					    session_half_open_free_rpc, args);
     }
 }

@@ -353,6 +388,9 @@ session_half_open_migrate_notify (transport_connection_t *tc)
 {
   session_t *ho;

+  /* Support half-open migrations only for transports with no lookup */
+  ASSERT (tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP);
+
   ho = ho_session_get (tc->s_index);
   ho->flags |= SESSION_F_IS_MIGRATING;
   ho->connection_index = ~0;
@@ -388,7 +426,7 @@ session_alloc_for_connection (transport_connection_t * tc)

   s = session_alloc (thread_index);
   s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
-  s->session_state = SESSION_STATE_CLOSED;
+  session_set_state (s, SESSION_STATE_CLOSED);

   /* Attach transport to session and vice versa */
   s->connection_index = tc->c_index;
@@ -535,10 +573,158 @@ session_fifo_tuning (session_t * s, svm_fifo_t * f,
     }
 }

+void
+session_wrk_program_app_wrk_evts (session_worker_t *wrk, u32 app_wrk_index)
+{
+  u8 need_interrupt;
+
+  ASSERT ((wrk - session_main.wrk) == vlib_get_thread_index ());
+  need_interrupt = clib_bitmap_is_zero (wrk->app_wrks_pending_ntf);
+  wrk->app_wrks_pending_ntf =
+    clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk_index, 1);
+
+  if (need_interrupt)
+    vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
+}
+
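The pending-notification bitmap gives an edge-triggered hand-off to the new session_input node: the interrupt fires only on the empty to non-empty transition, and every app worker flagged before the node runs is serviced in one pass. A sketch of the drain side under that assumption (the real consumer is the session_input node implementation, which this diff does not include):

    /* Producer and consumer run on the same worker thread (see the ASSERT
     * above), so the drain can simply walk the bitmap and then release it;
     * the next session_wrk_program_app_wrk_evts () call re-arms the
     * interrupt because the bitmap is empty again. */
    static void
    app_wrk_pending_drain_sketch (session_worker_t *wrk)
    {
      u32 app_wrk_index;

      clib_bitmap_foreach (app_wrk_index, wrk->app_wrks_pending_ntf)
        {
          /* ... flush events queued for app worker app_wrk_index ... */
        }
      clib_bitmap_free (wrk->app_wrks_pending_ntf);
    }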
+always_inline void
+session_program_io_event (app_worker_t *app_wrk, session_t *s,
+			  session_evt_type_t et, u8 is_cl)
+{
+  if (is_cl)
+    {
+      /* Special events for connectionless sessions */
+      et += SESSION_IO_EVT_BUILTIN_RX - SESSION_IO_EVT_RX;
+
+      ASSERT (s->thread_index == 0);
+      session_event_t evt = {
+	.event_type = et,
+	.session_handle = session_handle (s),
+      };
+
+      app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
+    }
+  else
+    {
+      app_worker_add_event (app_wrk, s, et);
+    }
+}
+
+static inline int
+session_notify_subscribers (u32 app_index, session_t *s, svm_fifo_t *f,
+			    session_evt_type_t evt_type)
+{
+  app_worker_t *app_wrk;
+  application_t *app;
+  u8 is_cl;
+  int i;
+
+  app = application_get (app_index);
+  if (!app)
+    return -1;
+
+  is_cl = s->thread_index != vlib_get_thread_index ();
+  for (i = 0; i < f->shr->n_subscribers; i++)
+    {
+      app_wrk = application_get_worker (app, f->shr->subscribers[i]);
+      if (!app_wrk)
+	continue;
+      session_program_io_event (app_wrk, s, evt_type, is_cl ? 1 : 0);
+    }
+
+  return 0;
+}
+
+always_inline int
+session_enqueue_notify_inline (session_t *s, u8 is_cl)
+{
+  app_worker_t *app_wrk;
+
+  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+  if (PREDICT_FALSE (!app_wrk))
+    return -1;
+
+  session_program_io_event (app_wrk, s, SESSION_IO_EVT_RX, is_cl);
+
+  if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo)))
+    return session_notify_subscribers (app_wrk->app_index, s, s->rx_fifo,
+				       SESSION_IO_EVT_RX);
+
+  return 0;
+}
+
+int
+session_enqueue_notify (session_t *s)
+{
+  return session_enqueue_notify_inline (s, 0 /* is_cl */);
+}
+
+int
+session_enqueue_notify_cl (session_t *s)
+{
+  return session_enqueue_notify_inline (s, 1 /* is_cl */);
+}
+
+int
+session_dequeue_notify (session_t *s)
+{
+  app_worker_t *app_wrk;
+  u8 is_cl;
+
+  /* Unset as soon as event is requested */
+  svm_fifo_clear_deq_ntf (s->tx_fifo);
+
+  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+  if (PREDICT_FALSE (!app_wrk))
+    return -1;
+
+  is_cl = s->session_state == SESSION_STATE_LISTENING ||
+	  s->session_state == SESSION_STATE_OPENED;
+  session_program_io_event (app_wrk, s, SESSION_IO_EVT_TX, is_cl ? 1 : 0);
+
+  if (PREDICT_FALSE (svm_fifo_n_subscribers (s->tx_fifo)))
+    return session_notify_subscribers (app_wrk->app_index, s, s->tx_fifo,
+				       SESSION_IO_EVT_TX);
+
+  return 0;
+}
+
+/**
+ * Flushes queue of sessions that are to be notified of new data
+ * enqueued events.
+ *
+ * @param transport_proto transport protocol whose queue is to be flushed
+ * @param thread_index thread index for which the flush is to be performed
+ */
+void
+session_main_flush_enqueue_events (transport_proto_t transport_proto,
+				   u32 thread_index)
+{
+  session_worker_t *wrk = session_main_get_worker (thread_index);
+  session_handle_t *handles;
+  session_t *s;
+  u32 i;
+
+  handles = wrk->session_to_enqueue[transport_proto];
+
+  for (i = 0; i < vec_len (handles); i++)
+    {
+      s = session_get_from_handle (handles[i]);
+      session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
+			   0 /* TODO/not needed */);
+      session_enqueue_notify_inline (s,
+				     s->thread_index != thread_index ? 1 : 0);
+    }
+
+  vec_reset_length (handles);
+  wrk->session_to_enqueue[transport_proto] = handles;
+}
+
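The deferred-notify contract is unchanged in spirit but typed more strictly: transports enqueue with queue_event set, which only records the session handle in session_to_enqueue, and the transport's input node calls the flush once per dispatch. Roughly, assuming a TCP-style input node (the node function name is illustrative):

    /* Illustrative transport input node epilogue: per-packet enqueues used
     * queue_event = 1, so a single flush per dispatch coalesces RX
     * notifications instead of sending one per packet. */
    static uword
    my_transport_input_fn (vlib_main_t *vm, vlib_node_runtime_t *node,
                           vlib_frame_t *frame)
    {
      /* per packet, for buffer b on transport connection tc:
       *   session_enqueue_stream_connection (tc, b, 0, 1 [queue_event]); */

      session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP,
                                         vm->thread_index);
      return frame->n_vectors;
    }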
 /*
- * Enqueue data for delivery to session peer. Does not notify peer of enqueue
- * event but on request can queue notification events for later delivery by
- * calling stream_server_flush_enqueue_events().
+ * Enqueue data for delivery to app. If requested, it queues app notification
+ * event for later delivery.
  *
  * @param tc Transport connection which is to be enqueued data
  * @param b Buffer to be enqueued
@@ -587,15 +773,14 @@ session_enqueue_stream_connection (transport_connection_t * tc,

   if (queue_event)
     {
-      /* Queue RX event on this fifo. Eventually these will need to be flushed
-       * by calling stream_server_flush_enqueue_events () */
-      session_worker_t *wrk;
-
-      wrk = session_main_get_worker (s->thread_index);
+      /* Queue RX event on this fifo. Eventually these will need to be
+       * flushed by calling @ref session_main_flush_enqueue_events () */
       if (!(s->flags & SESSION_F_RX_EVT))
 	{
+	  session_worker_t *wrk = session_main_get_worker (s->thread_index);
+	  ASSERT (s->thread_index == vlib_get_thread_index ());
 	  s->flags |= SESSION_F_RX_EVT;
-	  vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
+	  vec_add1 (wrk->session_to_enqueue[tc->proto], session_handle (s));
 	}

       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
@@ -604,10 +789,11 @@ session_enqueue_stream_connection (transport_connection_t * tc,
   return enqueued;
 }

-int
-session_enqueue_dgram_connection (session_t * s,
-				  session_dgram_hdr_t * hdr,
-				  vlib_buffer_t * b, u8 proto, u8 queue_event)
+always_inline int
+session_enqueue_dgram_connection_inline (session_t *s,
+					 session_dgram_hdr_t *hdr,
+					 vlib_buffer_t *b, u8 proto,
+					 u8 queue_event, u32 is_cl)
 {
   int rv;

@@ -616,12 +802,10 @@ session_enqueue_dgram_connection (session_t * s,

   if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
     {
-      /* *INDENT-OFF* */
       svm_fifo_seg_t segs[2] = {
 	  { (u8 *) hdr, sizeof (*hdr) },
 	  { vlib_buffer_get_current (b), b->current_length }
       };
-      /* *INDENT-ON* */

       rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
 				      0 /* allow_partial */ );
@@ -653,15 +837,16 @@ session_enqueue_dgram_connection (session_t * s,

   if (queue_event && rv > 0)
     {
-      /* Queue RX event on this fifo. Eventually these will need to be flushed
-       * by calling stream_server_flush_enqueue_events () */
-      session_worker_t *wrk;
-
-      wrk = session_main_get_worker (s->thread_index);
+      /* Queue RX event on this fifo. Eventually these will need to be
+       * flushed by calling @ref session_main_flush_enqueue_events () */
       if (!(s->flags & SESSION_F_RX_EVT))
 	{
+	  u32 thread_index =
+	    is_cl ? vlib_get_thread_index () : s->thread_index;
+	  session_worker_t *wrk = session_main_get_worker (thread_index);
+	  ASSERT (s->thread_index == vlib_get_thread_index () || is_cl);
 	  s->flags |= SESSION_F_RX_EVT;
-	  vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
+	  vec_add1 (wrk->session_to_enqueue[proto], session_handle (s));
 	}

       session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
@@ -669,6 +854,23 @@
   return rv > 0 ? rv : 0;
 }
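In the rx fifo a datagram is a framed record: a session_dgram_hdr_t immediately followed by its payload, written atomically by the two-segment enqueue above (allow_partial is 0, so the record either fits completely or the enqueue fails). A reader therefore peeks the fixed-size header first to learn the payload length. A rough sketch against the svm_fifo API:

    /* Sketch of the matching read side for one datagram record. */
    static int
    dgram_read_sketch (svm_fifo_t *f, u8 *buf, u32 buf_len)
    {
      session_dgram_hdr_t hdr;

      if (svm_fifo_max_dequeue (f) < sizeof (hdr))
        return 0;
      svm_fifo_peek (f, 0, sizeof (hdr), (u8 *) &hdr);
      ASSERT (hdr.data_length <= buf_len);
      /* consume the header, then the payload */
      svm_fifo_dequeue_drop (f, sizeof (hdr));
      return svm_fifo_dequeue (f, hdr.data_length, buf);
    }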
+int
+session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr,
+				  vlib_buffer_t *b, u8 proto, u8 queue_event)
+{
+  return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
+						  queue_event, 0 /* is_cl */);
+}
+
+int
+session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr,
+				     vlib_buffer_t *b, u8 proto,
+				     u8 queue_event)
+{
+  return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
+						  queue_event, 1 /* is_cl */);
+}
+
 int
 session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
 			    u32 offset, u32 max_bytes)
@@ -692,187 +894,6 @@ session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
   return rv;
 }

-static inline int
-session_notify_subscribers (u32 app_index, session_t * s,
-			    svm_fifo_t * f, session_evt_type_t evt_type)
-{
-  app_worker_t *app_wrk;
-  application_t *app;
-  int i;
-
-  app = application_get (app_index);
-  if (!app)
-    return -1;
-
-  for (i = 0; i < f->shr->n_subscribers; i++)
-    {
-      app_wrk = application_get_worker (app, f->shr->subscribers[i]);
-      if (!app_wrk)
-	continue;
-      if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
-	return -1;
-    }
-
-  return 0;
-}
-
-/**
- * Notify session peer that new data has been enqueued.
- *
- * @param s Stream session for which the event is to be generated.
- * @param lock Flag to indicate if call should lock message queue.
- *
- * @return 0 on success or negative number if failed to send notification.
- */
-static inline int
-session_enqueue_notify_inline (session_t * s)
-{
-  app_worker_t *app_wrk;
-  u32 session_index;
-  u8 n_subscribers;
-
-  session_index = s->session_index;
-  n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
-
-  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
-  if (PREDICT_FALSE (!app_wrk))
-    {
-      SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
-      return 0;
-    }
-
-  SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
-
-  s->flags &= ~SESSION_F_RX_EVT;
-
-  /* Application didn't confirm accept yet */
-  if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
-    return 0;
-
-  if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
-						     SESSION_IO_EVT_RX)))
-    return -1;
-
-  if (PREDICT_FALSE (n_subscribers))
-    {
-      s = session_get (session_index, vlib_get_thread_index ());
-      return session_notify_subscribers (app_wrk->app_index, s,
-					 s->rx_fifo, SESSION_IO_EVT_RX);
-    }
-
-  return 0;
-}
-
-int
-session_enqueue_notify (session_t * s)
-{
-  return session_enqueue_notify_inline (s);
-}
-
-static void
-session_enqueue_notify_rpc (void *arg)
-{
-  u32 session_index = pointer_to_uword (arg);
-  session_t *s;
-
-  s = session_get_if_valid (session_index, vlib_get_thread_index ());
-  if (!s)
-    return;
-
-  session_enqueue_notify (s);
-}
-
-/**
- * Like session_enqueue_notify, but can be called from a thread that does not
- * own the session.
- */
-void
-session_enqueue_notify_thread (session_handle_t sh)
-{
-  u32 thread_index = session_thread_from_handle (sh);
-  u32 session_index = session_index_from_handle (sh);
-
-  /*
-   * Pass session index (u32) as opposed to handle (u64) in case pointers
-   * are not 64-bit.
-   */
-  session_send_rpc_evt_to_thread (thread_index,
-				  session_enqueue_notify_rpc,
-				  uword_to_pointer (session_index, void *));
-}
-
-int
-session_dequeue_notify (session_t * s)
-{
-  app_worker_t *app_wrk;
-
-  svm_fifo_clear_deq_ntf (s->tx_fifo);
-
-  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
-  if (PREDICT_FALSE (!app_wrk))
-    return -1;
-
-  if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
-						     SESSION_IO_EVT_TX)))
-    return -1;
-
-  if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
-    return session_notify_subscribers (app_wrk->app_index, s,
-				       s->tx_fifo, SESSION_IO_EVT_TX);
-
-  return 0;
-}
-
-/**
- * Flushes queue of sessions that are to be notified of new data
- * enqueued events.
- *
- * @param thread_index Thread index for which the flush is to be performed.
- * @return 0 on success or a positive number indicating the number of
- *         failures due to API queue being full.
- */
-int
-session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
-{
-  session_worker_t *wrk = session_main_get_worker (thread_index);
-  session_t *s;
-  int i, errors = 0;
-  u32 *indices;
-
-  indices = wrk->session_to_enqueue[transport_proto];
-
-  for (i = 0; i < vec_len (indices); i++)
-    {
-      s = session_get_if_valid (indices[i], thread_index);
-      if (PREDICT_FALSE (!s))
-	{
-	  errors++;
-	  continue;
-	}
-
-      session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
-			   0 /* TODO/not needed */ );
-
-      if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
-	errors++;
-    }
-
-  vec_reset_length (indices);
-  wrk->session_to_enqueue[transport_proto] = indices;
-
-  return errors;
-}
-
-int
-session_main_flush_all_enqueue_events (u8 transport_proto)
-{
-  vlib_thread_main_t *vtm = vlib_get_thread_main ();
-  int i, errors = 0;
-  for (i = 0; i < 1 + vtm->n_threads; i++)
-    errors += session_main_flush_enqueue_events (transport_proto, i);
-  return errors;
-}
-
 int
 session_stream_connect_notify (transport_connection_t * tc,
 			       session_error_t err)
@@ -887,6 +908,7 @@ session_stream_connect_notify (transport_connection_t * tc,
   session_lookup_del_half_open (tc);

   ho = ho_session_get (tc->s_index);
+  session_set_state (ho, SESSION_STATE_TRANSPORT_CLOSED);
   opaque = ho->opaque;
   app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
   if (!app_wrk)
@@ -896,8 +918,9 @@ session_stream_connect_notify (transport_connection_t * tc,
     return app_worker_connect_notify (app_wrk, s, err, opaque);

   s = session_alloc_for_connection (tc);
-  s->session_state = SESSION_STATE_CONNECTING;
+  session_set_state (s, SESSION_STATE_CONNECTING);
   s->app_wrk_index = app_wrk->wrk_index;
+  s->opaque = opaque;
   new_si = s->session_index;
   new_ti = s->thread_index;

@@ -909,7 +932,7 @@ session_stream_connect_notify (transport_connection_t * tc,
     }

   s = session_get (new_si, new_ti);
-  s->session_state = SESSION_STATE_READY;
+  session_set_state (s, SESSION_STATE_READY);
   session_lookup_add_connection (tc, session_handle (s));

   if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque))
@@ -925,43 +948,20 @@ session_stream_connect_notify (transport_connection_t * tc,
   return 0;
 }

-typedef union session_switch_pool_reply_args_
-{
-  struct
-  {
-    u32 session_index;
-    u16 thread_index;
-    u8 is_closed;
-  };
-  u64 as_u64;
-} session_switch_pool_reply_args_t;
-
-STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
-	       "switch pool reply args size");
-
 static void
-session_switch_pool_reply (void *arg)
+session_switch_pool_closed_rpc (void *arg)
 {
-  session_switch_pool_reply_args_t rargs;
+  session_handle_t sh;
   session_t *s;

-  rargs.as_u64 = pointer_to_uword (arg);
-  s = session_get_if_valid (rargs.session_index, rargs.thread_index);
+  sh = pointer_to_uword (arg);
+  s = session_get_from_handle_if_valid (sh);
   if (!s)
     return;

-  /* Session closed during migration. Clean everything up */
-  if (rargs.is_closed)
-    {
-      transport_cleanup (session_get_transport_proto (s), s->connection_index,
-			 s->thread_index);
-      segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
-      session_free (s);
-      return;
-    }
-
-  /* Notify app that it has data on the new session */
-  session_enqueue_notify (s);
+  transport_cleanup (session_get_transport_proto (s), s->connection_index,
+		     s->thread_index);
+  session_cleanup (s);
 }

 typedef struct _session_switch_pool_args
@@ -979,8 +979,7 @@ static void
 session_switch_pool (void *cb_args)
 {
   session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
-  session_switch_pool_reply_args_t rargs;
-  session_handle_t new_sh;
+  session_handle_t sh, new_sh;
   segment_manager_t *sm;
   app_worker_t *app_wrk;
   session_t *s;
@@ -988,37 +987,32 @@ session_switch_pool (void *cb_args)
   ASSERT (args->thread_index == vlib_get_thread_index ());
   s = session_get (args->session_index, args->thread_index);

-  /* Check if session closed during migration */
-  rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
+  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+  if (!app_wrk)
+    goto app_closed;

-  transport_cleanup (session_get_transport_proto (s), s->connection_index,
-		     s->thread_index);
+  /* Cleanup fifo segment slice state for fifos */
+  sm = app_worker_get_connect_segment_manager (app_wrk);
+  segment_manager_detach_fifo (sm, &s->rx_fifo);
+  segment_manager_detach_fifo (sm, &s->tx_fifo);

-  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
-  if (app_wrk)
-    {
-      /* Cleanup fifo segment slice state for fifos */
-      sm = app_worker_get_connect_segment_manager (app_wrk);
-      segment_manager_detach_fifo (sm, &s->rx_fifo);
-      segment_manager_detach_fifo (sm, &s->tx_fifo);
+  /* Check if session closed during migration */
+  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+    goto app_closed;

-      /* Notify app, using old session, about the migration event */
-      if (!rargs.is_closed)
-	{
-	  new_sh = session_make_handle (args->new_session_index,
-					args->new_thread_index);
-	  app_worker_migrate_notify (app_wrk, s, new_sh);
-	}
-    }
+  new_sh =
+    session_make_handle (args->new_session_index, args->new_thread_index);
+  app_worker_migrate_notify (app_wrk, s, new_sh);

-  /* Trigger app read and fifo updates on the new thread */
-  rargs.session_index = args->new_session_index;
-  rargs.thread_index = args->new_thread_index;
-  session_send_rpc_evt_to_thread (args->new_thread_index,
-				  session_switch_pool_reply,
-				  uword_to_pointer (rargs.as_u64, void *));
+  clib_mem_free (cb_args);
+  return;

-  session_free (s);
+app_closed:
+  /* Session closed during migration. Clean everything up */
+  sh = session_handle (s);
+  session_send_rpc_evt_to_thread (args->new_thread_index,
+				  session_switch_pool_closed_rpc,
+				  uword_to_pointer (sh, void *));

   clib_mem_free (cb_args);
 }
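The replacement path ships a full 64-bit session handle through the RPC argument with uword_to_pointer, where the deleted code deliberately packed a u32-based union to stay pointer-sized; the round-trip is lossless because uword matches the pointer size and VPP targets 64-bit platforms. The packing contract in isolation (handler and caller names here are hypothetical):

    static void
    my_rpc_handler (void *arg)
    {
      session_handle_t sh = pointer_to_uword (arg);
      /* ... look the session up, e.g. via
       * session_get_from_handle_if_valid (sh) ... */
    }

    static void
    my_rpc_request (u32 thread_index, session_handle_t sh)
    {
      session_send_rpc_evt_to_thread (thread_index, my_rpc_handler,
                                      uword_to_pointer (sh, void *));
    }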
@@ -1039,7 +1033,7 @@ session_dgram_connect_notify (transport_connection_t * tc,
    */
   new_s = session_clone_safe (tc->s_index, old_thread_index);
   new_s->connection_index = tc->c_index;
-  new_s->session_state = SESSION_STATE_READY;
+  session_set_state (new_s, SESSION_STATE_READY);
   new_s->flags |= SESSION_F_IS_MIGRATING;

   if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
@@ -1093,11 +1087,11 @@ session_transport_closing_notify (transport_connection_t * tc)
    * accept might be rejected */
   if (s->session_state == SESSION_STATE_ACCEPTING)
     {
-      s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
+      session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
       return;
     }

-  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
+  session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
   app_wrk = app_worker_get (s->app_wrk_index);
   app_worker_close_notify (app_wrk, s);
 }
@@ -1138,7 +1132,7 @@ session_transport_delete_notify (transport_connection_t * tc)
        * because transport will soon be closed and closed sessions
        * are assumed to have been removed from the lookup table */
       session_lookup_del_session (s);
-      s->session_state = SESSION_STATE_TRANSPORT_DELETED;
+      session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
       svm_fifo_dequeue_drop_all (s->tx_fifo);
       break;
@@ -1149,7 +1143,7 @@ session_transport_delete_notify (transport_connection_t * tc)
        * session is just removed because both transport and app have
        * confirmed the close*/
       session_lookup_del_session (s);
-      s->session_state = SESSION_STATE_TRANSPORT_DELETED;
+      session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
       svm_fifo_dequeue_drop_all (s->tx_fifo);
       session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
@@ -1158,6 +1152,7 @@ session_transport_delete_notify (transport_connection_t * tc)
       break;
     case SESSION_STATE_CLOSED:
       session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
+      session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
       session_delete (s);
       break;
     default:
@@ -1193,17 +1188,15 @@ session_transport_closed_notify (transport_connection_t * tc)
     {
       session_transport_closing_notify (tc);
       svm_fifo_dequeue_drop_all (s->tx_fifo);
-      s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
+      session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
     }
   /* If app close has not been received or has not yet resulted in
    * a transport close, only mark the session transport as closed */
   else if (s->session_state <= SESSION_STATE_CLOSING)
-    {
-      s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
-    }
+    session_set_state (s, SESSION_STATE_TRANSPORT_CLOSED);
   /* If app also closed, switch to closed */
   else if (s->session_state == SESSION_STATE_APP_CLOSED)
-    s->session_state = SESSION_STATE_CLOSED;
+    session_set_state (s, SESSION_STATE_CLOSED);

   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
   if (app_wrk)
@@ -1225,10 +1218,10 @@ session_transport_reset_notify (transport_connection_t * tc)
     return;
   if (s->session_state == SESSION_STATE_ACCEPTING)
     {
-      s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
+      session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
       return;
     }
-  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
+  session_set_state (s, SESSION_STATE_TRANSPORT_CLOSING);
   app_wrk = app_worker_get (s->app_wrk_index);
   app_worker_reset_notify (app_wrk, s);
 }
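Every state store in the patch is funneled through session_set_state, which is not itself defined in this diff. A plausible minimal shape, assuming it only centralizes the assignment so transitions can be traced or asserted in one place:

    /* Plausible form of session_set_state; the real definition lives in a
     * session header outside this diff. */
    static inline void
    session_set_state (session_t *s, session_state_t session_state)
    {
      s->session_state = session_state;
      SESSION_EVT (SESSION_EVT_STATE_CHANGE, s);
    }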
@@ -1245,12 +1238,12 @@ session_stream_accept_notify (transport_connection_t * tc)
     return -1;
   if (s->session_state != SESSION_STATE_CREATED)
     return 0;
-  s->session_state = SESSION_STATE_ACCEPTING;
+  session_set_state (s, SESSION_STATE_ACCEPTING);
   if (app_worker_accept_notify (app_wrk, s))
     {
       /* On transport delete, no notifications should be sent. Unless, the
        * accept is retried and successful. */
-      s->session_state = SESSION_STATE_CREATED;
+      session_set_state (s, SESSION_STATE_CREATED);
       return -1;
     }
   return 0;
@@ -1268,7 +1261,7 @@ session_stream_accept (transport_connection_t * tc, u32 listener_index,

   s = session_alloc_for_connection (tc);
   s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
-  s->session_state = SESSION_STATE_CREATED;
+  session_set_state (s, SESSION_STATE_CREATED);

   if ((rv = app_worker_init_accepted (s)))
     {
@@ -1312,7 +1305,7 @@ session_dgram_accept (transport_connection_t * tc, u32 listener_index,
     }

   session_lookup_add_connection (tc, session_handle (s));
-  s->session_state = SESSION_STATE_ACCEPTING;
+  session_set_state (s, SESSION_STATE_ACCEPTING);

   app_wrk = app_worker_get (s->app_wrk_index);
   if ((rv = app_worker_accept_notify (app_wrk, s)))
@@ -1350,7 +1343,8 @@ session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
   app_wrk = app_worker_get (rmt->app_wrk_index);
   s = session_alloc_for_connection (tc);
   s->app_wrk_index = app_wrk->wrk_index;
-  s->session_state = SESSION_STATE_OPENED;
+  s->opaque = rmt->opaque;
+  session_set_state (s, SESSION_STATE_OPENED);
   if (app_worker_init_connected (app_wrk, s))
     {
       session_free (s);
@@ -1475,6 +1469,7 @@ session_listen (session_t * ls, session_endpoint_cfg_t * sep)
    * worker because local tables (for ct sessions) are not backed by a fib */
   ls = listen_session_get (s_index);
   ls->connection_index = tc_index;
+  ls->opaque = sep->opaque;

   return 0;
 }
@@ -1529,9 +1524,15 @@ session_half_close (session_t *s)
 void
 session_close (session_t * s)
 {
-  if (!s)
+  if (!s || (s->flags & SESSION_F_APP_CLOSED))
     return;

+  /* Transports can close and delete their state independent of app closes
+   * and transport initiated state transitions can hide app closes. Instead
+   * of extending the state machine to support separate tracking of app and
+   * transport initiated closes, use a flag. */
+  s->flags |= SESSION_F_APP_CLOSED;
+
   if (s->session_state >= SESSION_STATE_CLOSING)
     {
       /* Session will only be removed once both app and transport
@@ -1542,9 +1543,12 @@ session_close (session_t * s)
       return;
     }

-  /* App closed so stop propagating dequeue notifications */
-  svm_fifo_clear_deq_ntf (s->tx_fifo);
-  s->session_state = SESSION_STATE_CLOSING;
+  /* App closed so stop propagating dequeue notifications.
+   * The app might disconnect before the session is fully connected, in
+   * which case the tx_fifo may not be allocated yet, so clear it only if
+   * it is initialized. */
+  if (s->tx_fifo)
+    svm_fifo_clear_deq_ntf (s->tx_fifo);
+  session_set_state (s, SESSION_STATE_CLOSING);
   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
 }

@@ -1556,9 +1560,12 @@ session_reset (session_t * s)
 {
   if (s->session_state >= SESSION_STATE_CLOSING)
     return;
-  /* Drop all outstanding tx data */
-  svm_fifo_dequeue_drop_all (s->tx_fifo);
-  s->session_state = SESSION_STATE_CLOSING;
+  /* Drop all outstanding tx data.
+   * The app might disconnect before the session is fully connected, in
+   * which case the tx_fifo may not be allocated yet, so drop only if it
+   * is initialized.
+   */
+  if (s->tx_fifo)
+    svm_fifo_dequeue_drop_all (s->tx_fifo);
+  session_set_state (s, SESSION_STATE_CLOSING);
   session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
 }

@@ -1593,10 +1600,10 @@ session_transport_close (session_t * s)
   if (s->session_state >= SESSION_STATE_APP_CLOSED)
     {
       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
-	s->session_state = SESSION_STATE_CLOSED;
+	session_set_state (s, SESSION_STATE_CLOSED);
       /* If transport is already deleted, just free the session */
       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
-	session_free_w_fifos (s);
+	session_program_cleanup (s);
       return;
     }

@@ -1606,7 +1613,7 @@ session_transport_close (session_t * s)
    * delete notify. This will finally lead to the complete cleanup of the
    * session.
    */
-  s->session_state = SESSION_STATE_APP_CLOSED;
+  session_set_state (s, SESSION_STATE_APP_CLOSED);

   transport_close (session_get_transport_proto (s), s->connection_index,
 		   s->thread_index);
@@ -1621,13 +1628,13 @@ session_transport_reset (session_t * s)
   if (s->session_state >= SESSION_STATE_APP_CLOSED)
     {
       if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
-	s->session_state = SESSION_STATE_CLOSED;
+	session_set_state (s, SESSION_STATE_CLOSED);
       else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
-	session_free_w_fifos (s);
+	session_program_cleanup (s);
       return;
     }

-  s->session_state = SESSION_STATE_APP_CLOSED;
+  session_set_state (s, SESSION_STATE_APP_CLOSED);
   transport_reset (session_get_transport_proto (s), s->connection_index,
 		   s->thread_index);
 }
@@ -1726,6 +1733,22 @@ session_segment_handle (session_t * s)
 				f->segment_index);
 }

+void
+session_get_original_dst (transport_endpoint_t *i2o_src,
+			  transport_endpoint_t *i2o_dst,
+			  transport_proto_t transport_proto, u32 *original_dst,
+			  u16 *original_dst_port)
+{
+  session_main_t *smm = vnet_get_session_main ();
+  ip_protocol_t proto =
+    (transport_proto == TRANSPORT_PROTO_TCP ? IPPROTO_TCP : IPPROTO_UDP);
+  if (!smm->original_dst_lookup || !i2o_dst->is_ip4)
+    return;
+  smm->original_dst_lookup (&i2o_src->ip.ip4, i2o_src->port, &i2o_dst->ip.ip4,
+			    i2o_dst->port, proto, original_dst,
+			    original_dst_port);
+}
+
 /* *INDENT-OFF* */
 static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
     session_tx_fifo_peek_and_snd,
@@ -1988,6 +2011,87 @@ session_manager_main_disable (vlib_main_t * vm)
   transport_enable_disable (vm, 0 /* is_en */ );
 }

+/* DMA completion callback; the batch cookie hints at the transfer index */
+void
+session_dma_completion_cb (vlib_main_t *vm, struct vlib_dma_batch *batch)
+{
+  session_worker_t *wrk;
+  wrk = session_main_get_worker (vm->thread_index);
+  session_dma_transfer *dma_transfer;
+
+  dma_transfer = &wrk->dma_trans[wrk->trans_head];
+  vec_add (wrk->pending_tx_buffers, dma_transfer->pending_tx_buffers,
+	   vec_len (dma_transfer->pending_tx_buffers));
+  vec_add (wrk->pending_tx_nexts, dma_transfer->pending_tx_nexts,
+	   vec_len (dma_transfer->pending_tx_nexts));
+  vec_reset_length (dma_transfer->pending_tx_buffers);
+  vec_reset_length (dma_transfer->pending_tx_nexts);
+  wrk->trans_head++;
+  if (wrk->trans_head == wrk->trans_size)
+    wrk->trans_head = 0;
+  return;
+}
+
+static void
+session_prepare_dma_args (vlib_dma_config_t *args)
+{
+  args->max_batches = 16;
+  args->max_transfers = DMA_TRANS_SIZE;
+  args->max_transfer_size = 65536;
+  args->features = 0;
+  args->sw_fallback = 1;
+  args->barrier_before_last = 1;
+  args->callback_fn = session_dma_completion_cb;
+}
+
+static void
+session_node_enable_dma (u8 is_en, int n_vlibs)
+{
+  vlib_dma_config_t args;
+  session_prepare_dma_args (&args);
+  session_worker_t *wrk;
+  vlib_main_t *vm;
+
+  int config_index = -1;
+
+  if (is_en)
+    {
+      vm = vlib_get_main_by_index (0);
+      config_index = vlib_dma_config_add (vm, &args);
+    }
+  else
+    {
+      vm = vlib_get_main_by_index (0);
+      wrk = session_main_get_worker (0);
+      if (wrk->config_index >= 0)
+	vlib_dma_config_del (vm, wrk->config_index);
+    }
+  int i;
+  for (i = 0; i < n_vlibs; i++)
+    {
+      vm = vlib_get_main_by_index (i);
+      wrk = session_main_get_worker (vm->thread_index);
+      wrk->config_index = config_index;
+      if (is_en)
+	{
+	  if (config_index >= 0)
+	    wrk->dma_enabled = true;
+	  wrk->dma_trans = (session_dma_transfer *) clib_mem_alloc (
+	    sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
+	  bzero (wrk->dma_trans,
+		 sizeof (session_dma_transfer) * DMA_TRANS_SIZE);
+	}
+      else
+	{
+	  if (wrk->dma_trans)
+	    clib_mem_free (wrk->dma_trans);
+	}
+      wrk->trans_head = 0;
+      wrk->trans_tail = 0;
+      wrk->trans_size = DMA_TRANS_SIZE;
+    }
+}
+
 void
 session_node_enable_disable (u8 is_en)
 {
@@ -2023,11 +2127,15 @@ session_node_enable_disable (u8 is_en)
 	  if (!sm->poll_main)
 	    continue;
 	}
+      vlib_node_set_state (vm, session_input_node.index, mstate);
       vlib_node_set_state (vm, session_queue_node.index, state);
     }

   if (sm->use_private_rx_mqs)
     application_enable_rx_mqs_nodes (is_en);
+
+  if (sm->dma_enabled)
+    session_node_enable_dma (is_en, n_vlibs);
 }

 clib_error_t *
@@ -2063,6 +2171,8 @@ session_main_init (vlib_main_t * vm)
   smm->use_private_rx_mqs = 0;
   smm->no_adaptive = 0;
   smm->last_transport_proto_type = TRANSPORT_PROTO_HTTP;
+  smm->port_allocator_min_src_port = 1024;
+  smm->port_allocator_max_src_port = 65535;

   return 0;
 }
@@ -2160,6 +2270,10 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input)
       else if (unformat (input, "local-endpoints-table-buckets %d",
			 &smm->local_endpoints_table_buckets))
	;
+      else if (unformat (input, "min-src-port %d", &tmp))
+	smm->port_allocator_min_src_port = tmp;
+      else if (unformat (input, "max-src-port %d", &tmp))
(unformat (input, "max-src-port %d", &tmp)) + smm->port_allocator_max_src_port = tmp; else if (unformat (input, "enable")) smm->session_enable_asap = 1; else if (unformat (input, "use-app-socket-api")) @@ -2170,6 +2284,13 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input) smm->use_private_rx_mqs = 1; else if (unformat (input, "no-adaptive")) smm->no_adaptive = 1; + else if (unformat (input, "use-dma")) + smm->dma_enabled = 1; + else if (unformat (input, "nat44-original-dst-enable")) + { + smm->original_dst_lookup = vlib_get_plugin_symbol ( + "nat_plugin.so", "nat44_original_dst_lookup"); + } /* * Deprecated but maintained for compatibility */