X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=81c93064d38dd185cd6ef35475c14e8668dec398;hb=c01d578a625fb136bc33b0eb9c19907769a67989;hp=c5b2124acbd7493ff6a16f8ef7977dd3a7e20f99;hpb=74cac8839efae6a69baea031fb01602ef8907e8a;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index c5b2124acbd..81c93064d38 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -69,6 +69,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index, case FIFO_EVENT_BUILTIN_RX: evt->fifo = data; break; + case FIFO_EVENT_BUILTIN_TX: case FIFO_EVENT_DISCONNECT: evt->session_handle = session_handle ((stream_session_t *) data); break; @@ -89,10 +90,10 @@ session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type) } int -session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index, +session_send_io_evt_to_thread_custom (void *data, u32 thread_index, session_evt_type_t evt_type) { - return session_send_evt_to_thread (f, 0, thread_index, evt_type); + return session_send_evt_to_thread (data, 0, thread_index, evt_type); } int @@ -152,6 +153,14 @@ session_free (stream_session_t * s) memset (s, 0xFA, sizeof (*s)); } +void +session_free_w_fifos (stream_session_t * s) +{ + segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo, + s->server_tx_fifo); + session_free (s); +} + int session_alloc_fifos (segment_manager_t * sm, stream_session_t * s) { @@ -188,7 +197,7 @@ session_alloc_for_connection (transport_connection_t * tc) s = session_alloc (thread_index); s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4); s->session_state = SESSION_STATE_CONNECTING; - s->enqueue_epoch = ~0; + s->enqueue_epoch = (u64) ~ 0; /* Attach transport to session and vice versa */ s->connection_index = tc->c_index; @@ -384,13 +393,13 @@ session_enqueue_stream_connection (transport_connection_t * tc, * by calling stream_server_flush_enqueue_events () */ session_manager_main_t *smm = vnet_get_session_manager_main (); u32 thread_index = s->thread_index; - u32 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index]; + u64 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index]; if (s->enqueue_epoch != enqueue_epoch) { s->enqueue_epoch = enqueue_epoch; vec_add1 (smm->session_to_enqueue[tc->proto][thread_index], - s - smm->sessions[thread_index]); + s->session_index); } } @@ -425,13 +434,13 @@ session_enqueue_dgram_connection (stream_session_t * s, * by calling stream_server_flush_enqueue_events () */ session_manager_main_t *smm = vnet_get_session_manager_main (); u32 thread_index = s->thread_index; - u32 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index]; + u64 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index]; if (s->enqueue_epoch != enqueue_epoch) { s->enqueue_epoch = enqueue_epoch; vec_add1 (smm->session_to_enqueue[proto][thread_index], - s - smm->sessions[thread_index]); + s->session_index); } } return enqueued; @@ -486,7 +495,7 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes) * @return 0 on success or negative number if failed to send notification. */ static inline int -session_enqueue_notify (stream_session_t * s, u8 lock) +session_enqueue_notify (stream_session_t * s) { app_worker_t *app; @@ -504,10 +513,7 @@ session_enqueue_notify (stream_session_t * s, u8 lock) })); /* *INDENT-ON* */ - if (lock) - return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); - - return app_worker_send_event (app, s, FIFO_EVENT_APP_RX); + return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); } int @@ -519,10 +525,7 @@ session_dequeue_notify (stream_session_t * s) if (PREDICT_FALSE (!app)) return -1; - if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL) - return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); - - return app_worker_send_event (app, s, FIFO_EVENT_APP_TX); + return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_TX); } /** @@ -537,14 +540,11 @@ int session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index) { session_manager_main_t *smm = &session_manager_main; - transport_service_type_t tp_service; - int i, errors = 0, lock; stream_session_t *s; + int i, errors = 0; u32 *indices; indices = smm->session_to_enqueue[transport_proto][thread_index]; - tp_service = transport_protocol_service_type (transport_proto); - lock = tp_service == TRANSPORT_SERVICE_CL; for (i = 0; i < vec_len (indices); i++) { @@ -554,7 +554,7 @@ session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index) errors++; continue; } - if (PREDICT_FALSE (session_enqueue_notify (s, lock))) + if (PREDICT_FALSE (session_enqueue_notify (s))) errors++; } @@ -760,7 +760,9 @@ stream_session_disconnect_notify (transport_connection_t * tc) stream_session_t *s; s = session_get (tc->s_index, tc->thread_index); - s->session_state = SESSION_STATE_CLOSING; + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + return; + s->session_state = SESSION_STATE_TRANSPORT_CLOSING; app_wrk = app_worker_get_if_valid (s->app_wrk_index); if (!app_wrk) return; @@ -782,10 +784,7 @@ stream_session_delete (stream_session_t * s) if ((rv = session_lookup_del_session (s))) clib_warning ("hash delete error, rv %d", rv); - /* Cleanup fifo segments */ - segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo, - s->server_tx_fifo); - session_free (s); + session_free_w_fifos (s); } /** @@ -802,10 +801,35 @@ stream_session_delete_notify (transport_connection_t * tc) stream_session_t *s; /* App might've been removed already */ - s = session_get_if_valid (tc->s_index, tc->thread_index); - if (!s) + if (!(s = session_get_if_valid (tc->s_index, tc->thread_index))) return; - stream_session_delete (s); + + /* Make sure we don't try to send anything more */ + svm_fifo_dequeue_drop_all (s->server_tx_fifo); + + switch (s->session_state) + { + case SESSION_STATE_TRANSPORT_CLOSING: + /* If transport finishes or times out before we get a reply + * from the app, do the whole disconnect since we might still + * have lingering events */ + stream_session_disconnect (s); + s->session_state = SESSION_STATE_CLOSED; + break; + case SESSION_STATE_CLOSING: + /* Cleanup lookup table. Transport needs to still be valid */ + session_lookup_del_session (s); + s->session_state = SESSION_STATE_CLOSED; + break; + case SESSION_STATE_CLOSED: + case SESSION_STATE_ACCEPTING: + stream_session_delete (s); + break; + default: + /* Assume connection was not yet added the lookup table */ + session_free_w_fifos (s); + break; + } } /** @@ -838,7 +862,7 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index, /* Find the server */ listener = listen_session_get (listener_index); - app_wrk = app_worker_get (listener->app_wrk_index); + app_wrk = application_listener_select_worker (listener, 0); sm = app_worker_get_listen_segment_manager (app_wrk, listener); if ((rv = session_alloc_and_init (sm, tc, 1, &s))) @@ -1059,11 +1083,9 @@ stream_session_disconnect (stream_session_t * s) s->session_state = SESSION_STATE_CLOSING; /* If we are in the handler thread, or being called with the worker barrier - * held (api/cli), just append a new event to pending disconnects vector. */ - if ((thread_index == 0 && !vlib_get_current_process (vlib_get_main ())) - || thread_index == s->thread_index) + * held, just append a new event to pending disconnects vector. */ + if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index) { - ASSERT (s->thread_index == thread_index || thread_index == 0); vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1); memset (evt, 0, sizeof (*evt)); evt->session_handle = session_handle (s); @@ -1083,6 +1105,12 @@ stream_session_disconnect (stream_session_t * s) void stream_session_disconnect_transport (stream_session_t * s) { + /* If transport is already closed, just free the session */ + if (s->session_state == SESSION_STATE_CLOSED) + { + session_free_w_fifos (s); + return; + } s->session_state = SESSION_STATE_CLOSED; tp_vfts[session_get_transport_proto (s)].close (s->connection_index, s->thread_index); @@ -1106,9 +1134,7 @@ stream_session_cleanup (stream_session_t * s) s->thread_index); /* Since we called cleanup, no delete notification will come. So, make * sure the session is properly freed. */ - segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo, - s->server_tx_fifo); - session_free (s); + session_free_w_fifos (s); } transport_service_type_t