X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=952a5a90141fa7f06afd690f449ef8c72c776b17;hb=ab2f6dbf;hp=9790ec25eddf01bdf47043a64f28cc8610187173;hpb=3ec66b023280b1aa4b2e92ae475ceb03e5ed3910;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 9790ec25edd..952a5a90141 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -488,12 +488,12 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes) static inline int session_enqueue_notify (stream_session_t * s, u8 lock) { - application_t *app; + app_worker_t *app; - app = application_get_if_valid (s->app_index); + app = app_worker_get_if_valid (s->app_wrk_index); if (PREDICT_FALSE (!app)) { - TCP_DBG ("invalid s->app_index = %d", s->app_index); + SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index); return 0; } @@ -505,24 +505,24 @@ session_enqueue_notify (stream_session_t * s, u8 lock) /* *INDENT-ON* */ if (lock) - return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); + return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); - return application_send_event (app, s, FIFO_EVENT_APP_RX); + return app_worker_send_event (app, s, FIFO_EVENT_APP_RX); } int session_dequeue_notify (stream_session_t * s) { - application_t *app; + app_worker_t *app; - app = application_get_if_valid (s->app_index); + app = app_worker_get_if_valid (s->app_wrk_index); if (PREDICT_FALSE (!app)) return -1; if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL) - return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); + return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX); - return application_send_event (app, s, FIFO_EVENT_APP_TX); + return app_worker_send_event (app, s, FIFO_EVENT_APP_TX); } /** @@ -596,6 +596,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) u32 opaque = 0, new_ti, new_si; stream_session_t *new_s = 0; segment_manager_t *sm; + app_worker_t *app_wrk; application_t *app; u8 alloc_fifos; int error = 0; @@ -615,17 +616,18 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) /* Get the app's index from the handle we stored when opening connection * and the opaque (api_context for external apps) from transport session * index */ - app = application_get_if_valid (handle >> 32); - if (!app) + app_wrk = app_worker_get_if_valid (handle >> 32); + if (!app_wrk) return -1; opaque = tc->s_index; + app = application_get (app_wrk->app_index); /* * Allocate new session with fifos (svm segments are allocated if needed) */ if (!is_fail) { - sm = application_get_connect_segment_manager (app); + sm = app_worker_get_connect_segment_manager (app_wrk); alloc_fifos = !application_is_builtin_proxy (app); if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s)) { @@ -634,7 +636,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) } else { - new_s->app_index = app->index; + new_s->app_wrk_index = app_wrk->wrk_index; new_si = new_s->session_index; new_ti = new_s->thread_index; } @@ -643,8 +645,8 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) /* * Notify client application */ - if (app->cb_fns.session_connected_callback (app->index, opaque, new_s, - is_fail)) + if (app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, + new_s, is_fail)) { SESSION_DBG ("failed to notify app"); if (!is_fail) @@ -731,12 +733,16 @@ session_dgram_connect_notify (transport_connection_t * tc, void stream_session_accept_notify (transport_connection_t * tc) { - application_t *server; + app_worker_t *app_wrk; + application_t *app; stream_session_t *s; s = session_get (tc->s_index, tc->thread_index); - server = application_get (s->app_index); - server->cb_fns.session_accept_callback (s); + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (!app_wrk) + return; + app = application_get (app_wrk->app_index); + app->cb_fns.session_accept_callback (s); } /** @@ -749,14 +755,17 @@ stream_session_accept_notify (transport_connection_t * tc) void stream_session_disconnect_notify (transport_connection_t * tc) { - application_t *server; + app_worker_t *app_wrk; + application_t *app; stream_session_t *s; s = session_get (tc->s_index, tc->thread_index); s->session_state = SESSION_STATE_CLOSING; - server = application_get_if_valid (s->app_index); - if (server) - server->cb_fns.session_disconnect_callback (s); + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (!app_wrk) + return; + app = application_get (app_wrk->app_index); + app->cb_fns.session_disconnect_callback (s); } /** @@ -806,10 +815,12 @@ void stream_session_reset_notify (transport_connection_t * tc) { stream_session_t *s; + app_worker_t *app_wrk; application_t *app; s = session_get (tc->s_index, tc->thread_index); s->session_state = SESSION_STATE_CLOSED; - app = application_get (s->app_index); + app_wrk = app_worker_get (s->app_wrk_index); + app = application_get (app_wrk->app_index); app->cb_fns.session_reset_callback (s); } @@ -820,38 +831,40 @@ int stream_session_accept (transport_connection_t * tc, u32 listener_index, u8 notify) { - application_t *server; stream_session_t *s, *listener; + app_worker_t *app_wrk; segment_manager_t *sm; int rv; /* Find the server */ listener = listen_session_get (listener_index); - server = application_get (listener->app_index); + app_wrk = app_worker_get (listener->app_wrk_index); - sm = application_get_listen_segment_manager (server, listener); + sm = app_worker_get_listen_segment_manager (app_wrk, listener); if ((rv = session_alloc_and_init (sm, tc, 1, &s))) return rv; - s->app_index = server->index; + s->app_wrk_index = app_wrk->wrk_index; s->listener_index = listener_index; s->session_state = SESSION_STATE_ACCEPTING; /* Shoulder-tap the server */ if (notify) { - server->cb_fns.session_accept_callback (s); + application_t *app = application_get (app_wrk->app_index); + app->cb_fns.session_accept_callback (s); } return 0; } int -session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; transport_endpoint_t *tep; segment_manager_t *sm; + app_worker_t *app_wrk; stream_session_t *s; application_t *app; int rv; @@ -868,22 +881,23 @@ session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque) /* For dgram type of service, allocate session and fifos now. */ - app = application_get (app_index); - sm = application_get_connect_segment_manager (app); + app_wrk = app_worker_get (app_wrk_index); + sm = app_worker_get_connect_segment_manager (app_wrk); if (session_alloc_and_init (sm, tc, 1, &s)) return -1; - s->app_index = app->index; + s->app_wrk_index = app_wrk->wrk_index; s->session_state = SESSION_STATE_OPENED; /* Tell the app about the new event fifo for this session */ - app->cb_fns.session_connected_callback (app->index, opaque, s, 0); + app = application_get (app_wrk->app_index); + app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, s, 0); return 0; } int -session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; transport_endpoint_t *tep; @@ -907,7 +921,7 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) * is needed when the connect notify comes and we have to notify the * external app */ - handle = (((u64) app_index) << 32) | (u64) tc->c_index; + handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index; session_lookup_add_half_open (tc, handle); /* Store api_context (opaque) for when the reply comes. Not the nicest @@ -918,10 +932,10 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) } int -session_open_app (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { session_endpoint_extended_t *sep = (session_endpoint_extended_t *) rmt; - sep->app_index = app_index; + sep->app_wrk_index = app_wrk_index; sep->opaque = opaque; return tp_vfts[rmt->transport_proto].open ((transport_endpoint_t *) sep); @@ -951,118 +965,50 @@ static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = { * on open completion. */ int -session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_service_type_t tst = tp_vfts[rmt->transport_proto].service_type; - return session_open_srv_fns[tst] (app_index, rmt, opaque); + return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque); } +/** + * Ask transport to listen on session endpoint. + * + * @param s Session for which listen will be called. Note that unlike + * established sessions, listen sessions are not associated to a + * thread. + * @param sep Local endpoint to be listened on. + */ int -session_listen_vc (stream_session_t * s, session_endpoint_t * sep) -{ - transport_connection_t *tc; - u32 tci; - - /* Transport bind/listen */ - tci = tp_vfts[sep->transport_proto].bind (s->session_index, - session_endpoint_to_transport - (sep)); - - if (tci == (u32) ~ 0) - return -1; - - /* Attach transport to session */ - s->connection_index = tci; - tc = tp_vfts[sep->transport_proto].get_listener (tci); - - /* Weird but handle it ... */ - if (tc == 0) - return -1; - - /* Add to the main lookup table */ - session_lookup_add_connection (tc, s->session_index); - return 0; -} - -int -session_listen_cl (stream_session_t * s, session_endpoint_t * sep) +session_listen (stream_session_t * ls, session_endpoint_extended_t * sep) { transport_connection_t *tc; - application_t *server; - segment_manager_t *sm; - u32 tci; + transport_endpoint_t *tep; + u32 tc_index; - /* Transport bind/listen */ - tci = tp_vfts[sep->transport_proto].bind (s->session_index, - session_endpoint_to_transport - (sep)); + /* Transport bind/listen */ + tep = session_endpoint_to_transport (sep); + tc_index = tp_vfts[sep->transport_proto].bind (ls->session_index, tep); - if (tci == (u32) ~ 0) + if (tc_index == (u32) ~ 0) return -1; /* Attach transport to session */ - s->connection_index = tci; - tc = tp_vfts[sep->transport_proto].get_listener (tci); - - /* Weird but handle it ... */ - if (tc == 0) - return -1; + ls->connection_index = tc_index; - server = application_get (s->app_index); - sm = application_get_listen_segment_manager (server, s); - if (session_alloc_fifos (sm, s)) - return -1; - - /* Add to the main lookup table */ - session_lookup_add_connection (tc, s->session_index); + /* Add to the main lookup table after transport was initialized */ + tc = tp_vfts[sep->transport_proto].get_listener (tc_index); + session_lookup_add_connection (tc, ls->session_index); return 0; } -int -session_listen_app (stream_session_t * s, session_endpoint_t * sep) -{ - session_endpoint_extended_t esep; - clib_memcpy (&esep, sep, sizeof (*sep)); - esep.app_index = s->app_index; - - return tp_vfts[sep->transport_proto].bind (s->session_index, - (transport_endpoint_t *) & esep); -} - -typedef int (*session_listen_service_fn) (stream_session_t *, - session_endpoint_t *); - -/* *INDENT-OFF* */ -static session_listen_service_fn -session_listen_srv_fns[TRANSPORT_N_SERVICES] = { - session_listen_vc, - session_listen_cl, - session_listen_app, -}; -/* *INDENT-ON* */ - -/** - * Ask transport to listen on local transport endpoint. - * - * @param s Session for which listen will be called. Note that unlike - * established sessions, listen sessions are not associated to a - * thread. - * @param tep Local endpoint to be listened on. - */ -int -stream_session_listen (stream_session_t * s, session_endpoint_t * sep) -{ - transport_service_type_t tst = tp_vfts[sep->transport_proto].service_type; - return session_listen_srv_fns[tst] (s, sep); -} - /** * Ask transport to stop listening on local transport endpoint. * * @param s Session to stop listening on. It must be in state LISTENING. */ int -stream_session_stop_listen (stream_session_t * s) +session_stop_listen (stream_session_t * s) { transport_proto_t tp = session_get_transport_proto (s); transport_connection_t *tc;