{
s->enqueue_epoch = enqueue_epoch;
vec_add1 (smm->session_to_enqueue[tc->proto][thread_index],
- s - smm->sessions[thread_index]);
+ s->session_index);
}
}
{
s->enqueue_epoch = enqueue_epoch;
vec_add1 (smm->session_to_enqueue[proto][thread_index],
- s - smm->sessions[thread_index]);
+ s->session_index);
}
}
return enqueued;
* @return 0 on success or negative number if failed to send notification.
*/
static inline int
-session_enqueue_notify (stream_session_t * s, u8 lock)
+session_enqueue_notify (stream_session_t * s)
{
- application_t *app;
+ app_worker_t *app;
- app = application_get_if_valid (s->app_index);
+ app = app_worker_get_if_valid (s->app_wrk_index);
if (PREDICT_FALSE (!app))
{
- TCP_DBG ("invalid s->app_index = %d", s->app_index);
+ SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
return 0;
}
}));
/* *INDENT-ON* */
- if (lock)
- return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
-
- return application_send_event (app, s, FIFO_EVENT_APP_RX);
+ return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
}
int
session_dequeue_notify (stream_session_t * s)
{
- application_t *app;
+ app_worker_t *app;
- app = application_get_if_valid (s->app_index);
+ app = app_worker_get_if_valid (s->app_wrk_index);
if (PREDICT_FALSE (!app))
return -1;
- if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL)
- return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
-
- return application_send_event (app, s, FIFO_EVENT_APP_TX);
+ return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_TX);
}
/**
session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
{
session_manager_main_t *smm = &session_manager_main;
- transport_service_type_t tp_service;
- int i, errors = 0, lock;
stream_session_t *s;
+ int i, errors = 0;
u32 *indices;
indices = smm->session_to_enqueue[transport_proto][thread_index];
- tp_service = transport_protocol_service_type (transport_proto);
- lock = tp_service == TRANSPORT_SERVICE_CL;
for (i = 0; i < vec_len (indices); i++)
{
errors++;
continue;
}
- if (PREDICT_FALSE (session_enqueue_notify (s, lock)))
+ if (PREDICT_FALSE (session_enqueue_notify (s)))
errors++;
}
u32 opaque = 0, new_ti, new_si;
stream_session_t *new_s = 0;
segment_manager_t *sm;
+ app_worker_t *app_wrk;
application_t *app;
u8 alloc_fifos;
int error = 0;
/* Get the app's index from the handle we stored when opening connection
* and the opaque (api_context for external apps) from transport session
* index */
- app = application_get_if_valid (handle >> 32);
- if (!app)
+ app_wrk = app_worker_get_if_valid (handle >> 32);
+ if (!app_wrk)
return -1;
opaque = tc->s_index;
+ app = application_get (app_wrk->app_index);
/*
* Allocate new session with fifos (svm segments are allocated if needed)
*/
if (!is_fail)
{
- sm = application_get_connect_segment_manager (app);
+ sm = app_worker_get_connect_segment_manager (app_wrk);
alloc_fifos = !application_is_builtin_proxy (app);
if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s))
{
}
else
{
- new_s->app_index = app->index;
+ new_s->app_wrk_index = app_wrk->wrk_index;
new_si = new_s->session_index;
new_ti = new_s->thread_index;
}
/*
* Notify client application
*/
- if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
- is_fail))
+ if (app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque,
+ new_s, is_fail))
{
SESSION_DBG ("failed to notify app");
if (!is_fail)
void
stream_session_accept_notify (transport_connection_t * tc)
{
- application_t *server;
+ app_worker_t *app_wrk;
+ application_t *app;
stream_session_t *s;
s = session_get (tc->s_index, tc->thread_index);
- server = application_get (s->app_index);
- server->cb_fns.session_accept_callback (s);
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (!app_wrk)
+ return;
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_accept_callback (s);
}
/**
void
stream_session_disconnect_notify (transport_connection_t * tc)
{
- application_t *server;
+ app_worker_t *app_wrk;
+ application_t *app;
stream_session_t *s;
s = session_get (tc->s_index, tc->thread_index);
s->session_state = SESSION_STATE_CLOSING;
- server = application_get_if_valid (s->app_index);
- if (server)
- server->cb_fns.session_disconnect_callback (s);
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (!app_wrk)
+ return;
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_disconnect_callback (s);
}
/**
stream_session_reset_notify (transport_connection_t * tc)
{
stream_session_t *s;
+ app_worker_t *app_wrk;
application_t *app;
s = session_get (tc->s_index, tc->thread_index);
s->session_state = SESSION_STATE_CLOSED;
- app = application_get (s->app_index);
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app = application_get (app_wrk->app_index);
app->cb_fns.session_reset_callback (s);
}
stream_session_accept (transport_connection_t * tc, u32 listener_index,
u8 notify)
{
- application_t *server;
stream_session_t *s, *listener;
+ app_worker_t *app_wrk;
segment_manager_t *sm;
int rv;
/* Find the server */
listener = listen_session_get (listener_index);
- server = application_get (listener->app_index);
+ app_wrk = application_listener_select_worker (listener, 0);
- sm = application_get_listen_segment_manager (server, listener);
+ sm = app_worker_get_listen_segment_manager (app_wrk, listener);
if ((rv = session_alloc_and_init (sm, tc, 1, &s)))
return rv;
- s->app_index = server->index;
+ s->app_wrk_index = app_wrk->wrk_index;
s->listener_index = listener_index;
s->session_state = SESSION_STATE_ACCEPTING;
/* Shoulder-tap the server */
if (notify)
{
- server->cb_fns.session_accept_callback (s);
+ application_t *app = application_get (app_wrk->app_index);
+ app->cb_fns.session_accept_callback (s);
}
return 0;
}
int
-session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
transport_connection_t *tc;
transport_endpoint_t *tep;
segment_manager_t *sm;
+ app_worker_t *app_wrk;
stream_session_t *s;
application_t *app;
int rv;
/* For dgram type of service, allocate session and fifos now.
*/
- app = application_get (app_index);
- sm = application_get_connect_segment_manager (app);
+ app_wrk = app_worker_get (app_wrk_index);
+ sm = app_worker_get_connect_segment_manager (app_wrk);
if (session_alloc_and_init (sm, tc, 1, &s))
return -1;
- s->app_index = app->index;
+ s->app_wrk_index = app_wrk->wrk_index;
s->session_state = SESSION_STATE_OPENED;
/* Tell the app about the new event fifo for this session */
- app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, s, 0);
return 0;
}
int
-session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
transport_connection_t *tc;
transport_endpoint_t *tep;
* is needed when the connect notify comes and we have to notify the
* external app
*/
- handle = (((u64) app_index) << 32) | (u64) tc->c_index;
+ handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
session_lookup_add_half_open (tc, handle);
/* Store api_context (opaque) for when the reply comes. Not the nicest
}
int
-session_open_app (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
session_endpoint_extended_t *sep = (session_endpoint_extended_t *) rmt;
- sep->app_index = app_index;
+ sep->app_wrk_index = app_wrk_index;
sep->opaque = opaque;
return tp_vfts[rmt->transport_proto].open ((transport_endpoint_t *) sep);
* on open completion.
*/
int
-session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
transport_service_type_t tst = tp_vfts[rmt->transport_proto].service_type;
- return session_open_srv_fns[tst] (app_index, rmt, opaque);
-}
-
-int
-session_listen_vc (stream_session_t * s, session_endpoint_t * sep)
-{
- transport_connection_t *tc;
- u32 tci;
-
- /* Transport bind/listen */
- tci = tp_vfts[sep->transport_proto].bind (s->session_index,
- session_endpoint_to_transport
- (sep));
-
- if (tci == (u32) ~ 0)
- return -1;
-
- /* Attach transport to session */
- s->connection_index = tci;
- tc = tp_vfts[sep->transport_proto].get_listener (tci);
-
- /* Weird but handle it ... */
- if (tc == 0)
- return -1;
-
- /* Add to the main lookup table */
- session_lookup_add_connection (tc, s->session_index);
- return 0;
+ return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
}
+/**
+ * Ask transport to listen on session endpoint.
+ *
+ * @param s Session for which listen will be called. Note that unlike
+ * established sessions, listen sessions are not associated to a
+ * thread.
+ * @param sep Local endpoint to be listened on.
+ */
int
-session_listen_cl (stream_session_t * s, session_endpoint_t * sep)
+session_listen (stream_session_t * ls, session_endpoint_extended_t * sep)
{
transport_connection_t *tc;
- application_t *server;
- segment_manager_t *sm;
- u32 tci;
+ transport_endpoint_t *tep;
+ u32 tc_index, s_index;
- /* Transport bind/listen */
- tci = tp_vfts[sep->transport_proto].bind (s->session_index,
- session_endpoint_to_transport
- (sep));
+ /* Transport bind/listen */
+ tep = session_endpoint_to_transport (sep);
+ s_index = ls->session_index;
+ tc_index = tp_vfts[sep->transport_proto].bind (s_index, tep);
- if (tci == (u32) ~ 0)
+ if (tc_index == (u32) ~ 0)
return -1;
/* Attach transport to session */
- s->connection_index = tci;
- tc = tp_vfts[sep->transport_proto].get_listener (tci);
-
- /* Weird but handle it ... */
- if (tc == 0)
- return -1;
-
- server = application_get (s->app_index);
- sm = application_get_listen_segment_manager (server, s);
- if (session_alloc_fifos (sm, s))
- return -1;
+ ls = listen_session_get (s_index);
+ ls->connection_index = tc_index;
- /* Add to the main lookup table */
- session_lookup_add_connection (tc, s->session_index);
+ /* Add to the main lookup table after transport was initialized */
+ tc = tp_vfts[sep->transport_proto].get_listener (tc_index);
+ session_lookup_add_connection (tc, s_index);
return 0;
}
-int
-session_listen_app (stream_session_t * s, session_endpoint_t * sep)
-{
- session_endpoint_extended_t esep;
- clib_memcpy (&esep, sep, sizeof (*sep));
- esep.app_index = s->app_index;
-
- return tp_vfts[sep->transport_proto].bind (s->session_index,
- (transport_endpoint_t *) & esep);
-}
-
-typedef int (*session_listen_service_fn) (stream_session_t *,
- session_endpoint_t *);
-
-/* *INDENT-OFF* */
-static session_listen_service_fn
-session_listen_srv_fns[TRANSPORT_N_SERVICES] = {
- session_listen_vc,
- session_listen_cl,
- session_listen_app,
-};
-/* *INDENT-ON* */
-
-/**
- * Ask transport to listen on local transport endpoint.
- *
- * @param s Session for which listen will be called. Note that unlike
- * established sessions, listen sessions are not associated to a
- * thread.
- * @param tep Local endpoint to be listened on.
- */
-int
-stream_session_listen (stream_session_t * s, session_endpoint_t * sep)
-{
- transport_service_type_t tst = tp_vfts[sep->transport_proto].service_type;
- return session_listen_srv_fns[tst] (s, sep);
-}
-
/**
* Ask transport to stop listening on local transport endpoint.
*
* @param s Session to stop listening on. It must be in state LISTENING.
*/
int
-stream_session_stop_listen (stream_session_t * s)
+session_stop_listen (stream_session_t * s)
{
transport_proto_t tp = session_get_transport_proto (s);
transport_connection_t *tc;