X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=189c5375fbb8a690d935b94cb88976aa8011a890;hb=b7b929931a07fbb27b43d5cd105f366c3e29807e;hp=7131f51cdda4bacfd9b98b80683fd32fea4271b4;hpb=208daa1adb8069083b67a36e9f16ac27373fa5bf;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 7131f51cdda..189c5375fbb 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -27,49 +27,90 @@ session_manager_main_t session_manager_main; extern transport_proto_vft_t *tp_vfts; -static void -session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type, - u32 thread_index, void *fp, void *rpc_args) +static inline int +session_send_evt_to_thread (void *data, void *args, u32 thread_index, + session_evt_type_t evt_type) { - session_fifo_event_t evt = { {0}, }; - svm_queue_t *q; + session_event_t *evt; + svm_msg_q_msg_t msg; + svm_msg_q_t *mq; u32 tries = 0, max_tries; - evt.event_type = evt_type; - if (evt_type == FIFO_EVENT_RPC) - { - evt.rpc_args.fp = fp; - evt.rpc_args.arg = rpc_args; - } - else - evt.session_handle = session_handle; - - q = session_manager_get_vpp_event_queue (thread_index); - while (svm_queue_add (q, (u8 *) & evt, 1)) + mq = session_manager_get_vpp_event_queue (thread_index); + while (svm_msg_q_try_lock (mq)) { max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3; if (tries++ == max_tries) { SESSION_DBG ("failed to enqueue evt"); - break; + return -1; } } + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) + { + svm_msg_q_unlock (mq); + return -2; + } + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg))) + { + svm_msg_q_unlock (mq); + return -2; + } + evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg); + evt->event_type = evt_type; + switch (evt_type) + { + case FIFO_EVENT_RPC: + evt->rpc_args.fp = data; + evt->rpc_args.arg = args; + break; + case FIFO_EVENT_APP_TX: + case FIFO_EVENT_BUILTIN_RX: + evt->fifo = data; + break; + case FIFO_EVENT_BUILTIN_TX: + case FIFO_EVENT_DISCONNECT: + evt->session_handle = session_handle ((stream_session_t *) data); + break; + default: + clib_warning ("evt unhandled!"); + svm_msg_q_unlock (mq); + return -1; + } + + svm_msg_q_add_and_unlock (mq, &msg); + return 0; } -void -session_send_session_evt_to_thread (u64 session_handle, - fifo_event_type_t evt_type, - u32 thread_index) +int +session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type) { - session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0); + return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type); +} + +int +session_send_io_evt_to_thread_custom (void *data, u32 thread_index, + session_evt_type_t evt_type) +{ + return session_send_evt_to_thread (data, 0, thread_index, evt_type); +} + +int +session_send_ctrl_evt_to_thread (stream_session_t * s, + session_evt_type_t evt_type) +{ + /* only event supported for now is disconnect */ + ASSERT (evt_type == FIFO_EVENT_DISCONNECT); + return session_send_evt_to_thread (s, 0, s->thread_index, + FIFO_EVENT_DISCONNECT); } void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) { if (thread_index != vlib_get_thread_index ()) - session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp, - rpc_args); + session_send_evt_to_thread (fp, rpc_args, thread_index, FIFO_EVENT_RPC); else { void (*fnp) (void *) = fp; @@ -98,7 +139,7 @@ session_alloc (u32 thread_index) pool_get_aligned (session_manager_main.sessions[thread_index], s, CLIB_CACHE_LINE_BYTES); } - memset (s, 0, sizeof (*s)); + clib_memset (s, 0, sizeof (*s)); s->session_index = s - session_manager_main.sessions[thread_index]; s->thread_index = thread_index; return s; @@ -109,7 +150,15 @@ session_free (stream_session_t * s) { pool_put (session_manager_main.sessions[s->thread_index], s); if (CLIB_DEBUG) - memset (s, 0xFA, sizeof (*s)); + clib_memset (s, 0xFA, sizeof (*s)); +} + +void +session_free_w_fifos (stream_session_t * s) +{ + segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo, + s->server_tx_fifo); + session_free (s); } int @@ -148,7 +197,7 @@ session_alloc_for_connection (transport_connection_t * tc) s = session_alloc (thread_index); s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4); s->session_state = SESSION_STATE_CONNECTING; - s->enqueue_epoch = ~0; + s->enqueue_epoch = (u64) ~ 0; /* Attach transport to session and vice versa */ s->connection_index = tc->c_index; @@ -344,13 +393,13 @@ session_enqueue_stream_connection (transport_connection_t * tc, * by calling stream_server_flush_enqueue_events () */ session_manager_main_t *smm = vnet_get_session_manager_main (); u32 thread_index = s->thread_index; - u32 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index]; + u64 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index]; if (s->enqueue_epoch != enqueue_epoch) { s->enqueue_epoch = enqueue_epoch; vec_add1 (smm->session_to_enqueue[tc->proto][thread_index], - s - smm->sessions[thread_index]); + s->session_index); } } @@ -385,13 +434,13 @@ session_enqueue_dgram_connection (stream_session_t * s, * by calling stream_server_flush_enqueue_events () */ session_manager_main_t *smm = vnet_get_session_manager_main (); u32 thread_index = s->thread_index; - u32 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index]; + u64 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index]; if (s->enqueue_epoch != enqueue_epoch) { s->enqueue_epoch = enqueue_epoch; vec_add1 (smm->session_to_enqueue[proto][thread_index], - s - smm->sessions[thread_index]); + s->session_index); } } return enqueued; @@ -440,94 +489,43 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes) /** * Notify session peer that new data has been enqueued. * - * @param s Stream session for which the event is to be generated. - * @param block Flag to indicate if call should block if event queue is full. + * @param s Stream session for which the event is to be generated. + * @param lock Flag to indicate if call should lock message queue. * - * @return 0 on succes or negative number if failed to send notification. + * @return 0 on success or negative number if failed to send notification. */ -static int -session_enqueue_notify (stream_session_t * s, u8 block) +static inline int +session_enqueue_notify (stream_session_t * s) { - application_t *app; - session_fifo_event_t evt; - svm_queue_t *q; + app_worker_t *app; - if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED)) - { - /* Session is closed so app will never clean up. Flush rx fifo */ - svm_fifo_dequeue_drop_all (s->server_rx_fifo); - return 0; - } - - app = application_get_if_valid (s->app_index); - if (PREDICT_FALSE (app == 0)) + app = app_worker_get_if_valid (s->app_wrk_index); + if (PREDICT_FALSE (!app)) { - clib_warning ("invalid s->app_index = %d", s->app_index); + SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index); return 0; } - /* Built-in app? Hand event to the callback... */ - if (app->cb_fns.builtin_app_rx_callback) - return app->cb_fns.builtin_app_rx_callback (s); - - /* If no event, send one */ - if (svm_fifo_set_event (s->server_rx_fifo)) - { - /* Fabricate event */ - evt.fifo = s->server_rx_fifo; - evt.event_type = FIFO_EVENT_APP_RX; - - /* Add event to server's event queue */ - q = app->event_queue; - - /* Based on request block (or not) for lack of space */ - if (block || PREDICT_TRUE (q->cursize < q->maxsize)) - svm_queue_add (app->event_queue, (u8 *) & evt, - 0 /* do wait for mutex */ ); - else - { - clib_warning ("fifo full"); - return -1; - } - } - /* *INDENT-OFF* */ SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({ - ed->data[0] = evt.event_type; + ed->data[0] = FIFO_EVENT_APP_RX; ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo); })); /* *INDENT-ON* */ - return 0; + return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); } int session_dequeue_notify (stream_session_t * s) { - application_t *app; - svm_queue_t *q; + app_worker_t *app; - app = application_get_if_valid (s->app_index); + app = app_worker_get_if_valid (s->app_wrk_index); if (PREDICT_FALSE (!app)) return -1; - if (application_is_builtin (app)) - return 0; - - q = app->event_queue; - if (PREDICT_TRUE (q->cursize < q->maxsize)) - { - session_fifo_event_t evt = { - .event_type = FIFO_EVENT_APP_TX, - .fifo = s->server_tx_fifo - }; - svm_queue_add (app->event_queue, (u8 *) & evt, SVM_Q_WAIT); - } - else - { - return -1; - } - return 0; + return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_TX); } /** @@ -542,16 +540,21 @@ int session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index) { session_manager_main_t *smm = &session_manager_main; - u32 *indices; stream_session_t *s; int i, errors = 0; + u32 *indices; indices = smm->session_to_enqueue[transport_proto][thread_index]; for (i = 0; i < vec_len (indices); i++) { s = session_get_if_valid (indices[i], thread_index); - if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ )) + if (PREDICT_FALSE (!s)) + { + errors++; + continue; + } + if (PREDICT_FALSE (session_enqueue_notify (s))) errors++; } @@ -593,6 +596,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) u32 opaque = 0, new_ti, new_si; stream_session_t *new_s = 0; segment_manager_t *sm; + app_worker_t *app_wrk; application_t *app; u8 alloc_fifos; int error = 0; @@ -612,17 +616,18 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) /* Get the app's index from the handle we stored when opening connection * and the opaque (api_context for external apps) from transport session * index */ - app = application_get_if_valid (handle >> 32); - if (!app) + app_wrk = app_worker_get_if_valid (handle >> 32); + if (!app_wrk) return -1; opaque = tc->s_index; + app = application_get (app_wrk->app_index); /* * Allocate new session with fifos (svm segments are allocated if needed) */ if (!is_fail) { - sm = application_get_connect_segment_manager (app); + sm = app_worker_get_connect_segment_manager (app_wrk); alloc_fifos = !application_is_builtin_proxy (app); if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s)) { @@ -631,7 +636,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) } else { - new_s->app_index = app->index; + new_s->app_wrk_index = app_wrk->wrk_index; new_si = new_s->session_index; new_ti = new_s->thread_index; } @@ -640,8 +645,8 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) /* * Notify client application */ - if (app->cb_fns.session_connected_callback (app->index, opaque, new_s, - is_fail)) + if (app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, + new_s, is_fail)) { SESSION_DBG ("failed to notify app"); if (!is_fail) @@ -728,12 +733,16 @@ session_dgram_connect_notify (transport_connection_t * tc, void stream_session_accept_notify (transport_connection_t * tc) { - application_t *server; + app_worker_t *app_wrk; + application_t *app; stream_session_t *s; s = session_get (tc->s_index, tc->thread_index); - server = application_get (s->app_index); - server->cb_fns.session_accept_callback (s); + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (!app_wrk) + return; + app = application_get (app_wrk->app_index); + app->cb_fns.session_accept_callback (s); } /** @@ -746,13 +755,19 @@ stream_session_accept_notify (transport_connection_t * tc) void stream_session_disconnect_notify (transport_connection_t * tc) { - application_t *server; + app_worker_t *app_wrk; + application_t *app; stream_session_t *s; s = session_get (tc->s_index, tc->thread_index); - server = application_get (s->app_index); - server->cb_fns.session_disconnect_callback (s); - s->session_state = SESSION_STATE_CLOSING; + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + return; + s->session_state = SESSION_STATE_TRANSPORT_CLOSING; + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (!app_wrk) + return; + app = application_get (app_wrk->app_index); + app->cb_fns.session_disconnect_callback (s); } /** @@ -769,10 +784,7 @@ stream_session_delete (stream_session_t * s) if ((rv = session_lookup_del_session (s))) clib_warning ("hash delete error, rv %d", rv); - /* Cleanup fifo segments */ - segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo, - s->server_tx_fifo); - session_free (s); + session_free_w_fifos (s); } /** @@ -789,10 +801,35 @@ stream_session_delete_notify (transport_connection_t * tc) stream_session_t *s; /* App might've been removed already */ - s = session_get_if_valid (tc->s_index, tc->thread_index); - if (!s) + if (!(s = session_get_if_valid (tc->s_index, tc->thread_index))) return; - stream_session_delete (s); + + /* Make sure we don't try to send anything more */ + svm_fifo_dequeue_drop_all (s->server_tx_fifo); + + switch (s->session_state) + { + case SESSION_STATE_TRANSPORT_CLOSING: + /* If transport finishes or times out before we get a reply + * from the app, do the whole disconnect since we might still + * have lingering events */ + stream_session_disconnect (s); + s->session_state = SESSION_STATE_CLOSED; + break; + case SESSION_STATE_CLOSING: + /* Cleanup lookup table. Transport needs to still be valid */ + session_lookup_del_session (s); + s->session_state = SESSION_STATE_CLOSED; + break; + case SESSION_STATE_CLOSED: + case SESSION_STATE_ACCEPTING: + stream_session_delete (s); + break; + default: + /* Assume connection was not yet added the lookup table */ + session_free_w_fifos (s); + break; + } } /** @@ -802,10 +839,12 @@ void stream_session_reset_notify (transport_connection_t * tc) { stream_session_t *s; + app_worker_t *app_wrk; application_t *app; s = session_get (tc->s_index, tc->thread_index); s->session_state = SESSION_STATE_CLOSED; - app = application_get (s->app_index); + app_wrk = app_worker_get (s->app_wrk_index); + app = application_get (app_wrk->app_index); app->cb_fns.session_reset_callback (s); } @@ -816,38 +855,40 @@ int stream_session_accept (transport_connection_t * tc, u32 listener_index, u8 notify) { - application_t *server; stream_session_t *s, *listener; + app_worker_t *app_wrk; segment_manager_t *sm; int rv; /* Find the server */ listener = listen_session_get (listener_index); - server = application_get (listener->app_index); + app_wrk = application_listener_select_worker (listener, 0); - sm = application_get_listen_segment_manager (server, listener); + sm = app_worker_get_listen_segment_manager (app_wrk, listener); if ((rv = session_alloc_and_init (sm, tc, 1, &s))) return rv; - s->app_index = server->index; + s->app_wrk_index = app_wrk->wrk_index; s->listener_index = listener_index; s->session_state = SESSION_STATE_ACCEPTING; /* Shoulder-tap the server */ if (notify) { - server->cb_fns.session_accept_callback (s); + application_t *app = application_get (app_wrk->app_index); + app->cb_fns.session_accept_callback (s); } return 0; } int -session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; transport_endpoint_t *tep; segment_manager_t *sm; + app_worker_t *app_wrk; stream_session_t *s; application_t *app; int rv; @@ -864,22 +905,23 @@ session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque) /* For dgram type of service, allocate session and fifos now. */ - app = application_get (app_index); - sm = application_get_connect_segment_manager (app); + app_wrk = app_worker_get (app_wrk_index); + sm = app_worker_get_connect_segment_manager (app_wrk); if (session_alloc_and_init (sm, tc, 1, &s)) return -1; - s->app_index = app->index; + s->app_wrk_index = app_wrk->wrk_index; s->session_state = SESSION_STATE_OPENED; /* Tell the app about the new event fifo for this session */ - app->cb_fns.session_connected_callback (app->index, opaque, s, 0); + app = application_get (app_wrk->app_index); + app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, s, 0); return 0; } int -session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; transport_endpoint_t *tep; @@ -903,7 +945,7 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) * is needed when the connect notify comes and we have to notify the * external app */ - handle = (((u64) app_index) << 32) | (u64) tc->c_index; + handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index; session_lookup_add_half_open (tc, handle); /* Store api_context (opaque) for when the reply comes. Not the nicest @@ -914,10 +956,10 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) } int -session_open_app (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { session_endpoint_extended_t *sep = (session_endpoint_extended_t *) rmt; - sep->app_index = app_index; + sep->app_wrk_index = app_wrk_index; sep->opaque = opaque; return tp_vfts[rmt->transport_proto].open ((transport_endpoint_t *) sep); @@ -947,118 +989,52 @@ static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = { * on open completion. */ int -session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_service_type_t tst = tp_vfts[rmt->transport_proto].service_type; - return session_open_srv_fns[tst] (app_index, rmt, opaque); + return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque); } +/** + * Ask transport to listen on session endpoint. + * + * @param s Session for which listen will be called. Note that unlike + * established sessions, listen sessions are not associated to a + * thread. + * @param sep Local endpoint to be listened on. + */ int -session_listen_vc (stream_session_t * s, session_endpoint_t * sep) +session_listen (stream_session_t * ls, session_endpoint_extended_t * sep) { transport_connection_t *tc; - u32 tci; + transport_endpoint_t *tep; + u32 tc_index, s_index; - /* Transport bind/listen */ - tci = tp_vfts[sep->transport_proto].bind (s->session_index, - session_endpoint_to_transport - (sep)); + /* Transport bind/listen */ + tep = session_endpoint_to_transport (sep); + s_index = ls->session_index; + tc_index = tp_vfts[sep->transport_proto].bind (s_index, tep); - if (tci == (u32) ~ 0) + if (tc_index == (u32) ~ 0) return -1; /* Attach transport to session */ - s->connection_index = tci; - tc = tp_vfts[sep->transport_proto].get_listener (tci); + ls = listen_session_get (s_index); + ls->connection_index = tc_index; - /* Weird but handle it ... */ - if (tc == 0) - return -1; - - /* Add to the main lookup table */ - session_lookup_add_connection (tc, s->session_index); + /* Add to the main lookup table after transport was initialized */ + tc = tp_vfts[sep->transport_proto].get_listener (tc_index); + session_lookup_add_connection (tc, s_index); return 0; } -int -session_listen_cl (stream_session_t * s, session_endpoint_t * sep) -{ - transport_connection_t *tc; - application_t *server; - segment_manager_t *sm; - u32 tci; - - /* Transport bind/listen */ - tci = tp_vfts[sep->transport_proto].bind (s->session_index, - session_endpoint_to_transport - (sep)); - - if (tci == (u32) ~ 0) - return -1; - - /* Attach transport to session */ - s->connection_index = tci; - tc = tp_vfts[sep->transport_proto].get_listener (tci); - - /* Weird but handle it ... */ - if (tc == 0) - return -1; - - server = application_get (s->app_index); - sm = application_get_listen_segment_manager (server, s); - if (session_alloc_fifos (sm, s)) - return -1; - - /* Add to the main lookup table */ - session_lookup_add_connection (tc, s->session_index); - return 0; -} - -int -session_listen_app (stream_session_t * s, session_endpoint_t * sep) -{ - session_endpoint_extended_t esep; - clib_memcpy (&esep, sep, sizeof (*sep)); - esep.app_index = s->app_index; - - return tp_vfts[sep->transport_proto].bind (s->session_index, - (transport_endpoint_t *) & esep); -} - -typedef int (*session_listen_service_fn) (stream_session_t *, - session_endpoint_t *); - -/* *INDENT-OFF* */ -static session_listen_service_fn -session_listen_srv_fns[TRANSPORT_N_SERVICES] = { - session_listen_vc, - session_listen_cl, - session_listen_app, -}; -/* *INDENT-ON* */ - -/** - * Ask transport to listen on local transport endpoint. - * - * @param s Session for which listen will be called. Note that unlike - * established sessions, listen sessions are not associated to a - * thread. - * @param tep Local endpoint to be listened on. - */ -int -stream_session_listen (stream_session_t * s, session_endpoint_t * sep) -{ - transport_service_type_t tst = tp_vfts[sep->transport_proto].service_type; - return session_listen_srv_fns[tst] (s, sep); -} - /** * Ask transport to stop listening on local transport endpoint. * * @param s Session to stop listening on. It must be in state LISTENING. */ int -stream_session_stop_listen (stream_session_t * s) +session_stop_listen (stream_session_t * s) { transport_proto_t tp = session_get_transport_proto (s); transport_connection_t *tc; @@ -1091,7 +1067,7 @@ stream_session_disconnect (stream_session_t * s) { u32 thread_index = vlib_get_thread_index (); session_manager_main_t *smm = &session_manager_main; - session_fifo_event_t *evt; + session_event_t *evt; if (!s) return; @@ -1107,20 +1083,16 @@ stream_session_disconnect (stream_session_t * s) s->session_state = SESSION_STATE_CLOSING; /* If we are in the handler thread, or being called with the worker barrier - * held (api/cli), just append a new event to pending disconnects vector. */ - if ((thread_index == 0 && !vlib_get_current_process (vlib_get_main ())) - || thread_index == s->thread_index) + * held, just append a new event to pending disconnects vector. */ + if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index) { - ASSERT (s->thread_index == thread_index || thread_index == 0); vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1); - memset (evt, 0, sizeof (*evt)); + clib_memset (evt, 0, sizeof (*evt)); evt->session_handle = session_handle (s); evt->event_type = FIFO_EVENT_DISCONNECT; } else - session_send_session_evt_to_thread (session_handle (s), - FIFO_EVENT_DISCONNECT, - s->thread_index); + session_send_ctrl_evt_to_thread (s, FIFO_EVENT_DISCONNECT); } /** @@ -1133,6 +1105,12 @@ stream_session_disconnect (stream_session_t * s) void stream_session_disconnect_transport (stream_session_t * s) { + /* If transport is already closed, just free the session */ + if (s->session_state == SESSION_STATE_CLOSED) + { + session_free_w_fifos (s); + return; + } s->session_state = SESSION_STATE_CLOSED; tp_vfts[session_get_transport_proto (s)].close (s->connection_index, s->thread_index); @@ -1156,9 +1134,7 @@ stream_session_cleanup (stream_session_t * s) s->thread_index); /* Since we called cleanup, no delete notification will come. So, make * sure the session is properly freed. */ - segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo, - s->server_tx_fifo); - session_free (s); + session_free_w_fifos (s); } transport_service_type_t @@ -1195,7 +1171,7 @@ session_tx_is_dgram (stream_session_t * s) void session_vpp_event_queues_allocate (session_manager_main_t * smm) { - u32 evt_q_length = 2048, evt_size = sizeof (session_fifo_event_t); + u32 evt_q_length = 2048, evt_size = sizeof (session_event_t); ssvm_private_t *eqs = &smm->evt_qs_segment; api_main_t *am = &api_main; u64 eqs_size = 64 << 20; @@ -1217,7 +1193,11 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm) eqs->name = format (0, "%s%c", "evt-qs-segment", 0); eqs->requested_va = smm->session_baseva; - ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD); + if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD)) + { + clib_warning ("failed to initialize queue segment"); + return; + } } if (smm->evt_qs_use_memfd_seg) @@ -1227,8 +1207,23 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm) for (i = 0; i < vec_len (smm->vpp_event_queues); i++) { - smm->vpp_event_queues[i] = svm_queue_init (evt_q_length, evt_size, - vpp_pid, 0); + svm_msg_q_cfg_t _cfg, *cfg = &_cfg; + u32 notif_q_size = clib_max (16, evt_q_length >> 4); + svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = { + {evt_q_length, evt_size, 0} + , + {notif_q_size, 256, 0} + }; + cfg->consumer_pid = 0; + cfg->n_rings = 2; + cfg->q_nitems = evt_q_length; + cfg->ring_cfgs = rc; + smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg); + if (smm->evt_qs_use_memfd_seg) + { + if (svm_msg_q_alloc_consumer_eventfd (smm->vpp_event_queues[i])) + clib_warning ("eventfd returned"); + } } if (smm->evt_qs_use_memfd_seg)