X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=4081f90948200ff51953a2cc3218906fdec51391;hb=refs%2Fchanges%2F72%2F16572%2F11;hp=f6894868a3a6fd8d48f9bb2e13a23e2d9afa7ed9;hpb=c44a558164a466a74a4c10d4e7d7dd1b9a4b01dd;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index f6894868a3a..4081f909482 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -66,6 +66,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index, evt->rpc_args.arg = args; break; case FIFO_EVENT_APP_TX: + case SESSION_IO_EVT_TX_FLUSH: case FIFO_EVENT_BUILTIN_RX: evt->fifo = data; break; @@ -194,7 +195,6 @@ session_alloc_for_connection (transport_connection_t * tc) s = session_alloc (thread_index); s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4); - s->session_state = SESSION_STATE_CONNECTING; s->enqueue_epoch = (u64) ~ 0; /* Attach transport to session and vice versa */ @@ -629,6 +629,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) } else { + new_s->session_state = SESSION_STATE_CONNECTING; new_s->app_wrk_index = app_wrk->wrk_index; new_si = new_s->session_index; new_ti = new_s->thread_index; @@ -723,7 +724,7 @@ session_dgram_connect_notify (transport_connection_t * tc, return 0; } -void +int stream_session_accept_notify (transport_connection_t * tc) { app_worker_t *app_wrk; @@ -733,9 +734,9 @@ stream_session_accept_notify (transport_connection_t * tc) s = session_get (tc->s_index, tc->thread_index); app_wrk = app_worker_get_if_valid (s->app_wrk_index); if (!app_wrk) - return; + return -1; app = application_get (app_wrk->app_index); - app->cb_fns.session_accept_callback (s); + return app->cb_fns.session_accept_callback (s); } /** @@ -805,7 +806,10 @@ stream_session_delete_notify (transport_connection_t * tc) case SESSION_STATE_TRANSPORT_CLOSING: /* If transport finishes or times out before we get a reply * from the app, do the whole disconnect since we might still - * have lingering events */ + * have lingering events. Cleanup session table in advance + * because transport will soon be closed and closed sessions + * are assumed to have been removed from the lookup table */ + session_lookup_del_session (s); stream_session_disconnect (s); s->session_state = SESSION_STATE_CLOSED; break; @@ -816,11 +820,11 @@ stream_session_delete_notify (transport_connection_t * tc) break; case SESSION_STATE_CLOSED: case SESSION_STATE_ACCEPTING: + case SESSION_STATE_CLOSED_WAITING: stream_session_delete (s); break; default: - /* Assume connection was not yet added the lookup table */ - session_free_w_fifos (s); + stream_session_delete (s); break; } } @@ -835,7 +839,10 @@ stream_session_reset_notify (transport_connection_t * tc) app_worker_t *app_wrk; application_t *app; s = session_get (tc->s_index, tc->thread_index); - s->session_state = SESSION_STATE_CLOSED; + svm_fifo_dequeue_drop_all (s->server_tx_fifo); + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + return; + s->session_state = SESSION_STATE_TRANSPORT_CLOSING; app_wrk = app_worker_get (s->app_wrk_index); app = application_get (app_wrk->app_index); app->cb_fns.session_reset_callback (s); @@ -869,7 +876,7 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index, if (notify) { application_t *app = application_get (app_wrk->app_index); - app->cb_fns.session_accept_callback (s); + return app->cb_fns.session_accept_callback (s); } return 0; @@ -1081,7 +1088,7 @@ stream_session_disconnect (stream_session_t * s) * held, just append a new event to pending disconnects vector. */ if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index) { - wrk = session_manager_get_worker (thread_index); + wrk = session_manager_get_worker (s->thread_index); vec_add2 (wrk->pending_disconnects, evt, 1); clib_memset (evt, 0, sizeof (*evt)); evt->session_handle = session_handle (s); @@ -1107,7 +1114,18 @@ stream_session_disconnect_transport (stream_session_t * s) session_free_w_fifos (s); return; } - s->session_state = SESSION_STATE_CLOSED; + + /* If tx queue wasn't drained, change state to closed waiting for transport. + * This way, the transport, if it so wishes, can continue to try sending the + * outstanding data (in closed state it cannot). It MUST however at one + * point, either after sending everything or after a timeout, call delete + * notify. This will finally lead to the complete cleanup of the session. + */ + if (svm_fifo_max_dequeue (s->server_tx_fifo)) + s->session_state = SESSION_STATE_CLOSED_WAITING; + else + s->session_state = SESSION_STATE_CLOSED; + tp_vfts[session_get_transport_proto (s)].close (s->connection_index, s->thread_index); } @@ -1204,11 +1222,10 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm) for (i = 0; i < vec_len (smm->wrk); i++) { svm_msg_q_cfg_t _cfg, *cfg = &_cfg; - u32 notif_q_size = clib_max (16, evt_q_length >> 4); svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = { {evt_q_length, evt_size, 0} , - {notif_q_size, 256, 0} + {evt_q_length << 1, 256, 0} }; cfg->consumer_pid = 0; cfg->n_rings = 2;