int
session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
{
- /* only events supported are disconnect and reset */
- ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
- || evt_type == SESSION_CTRL_EVT_RESET);
+ /* only events supported are disconnect, shutdown and reset */
+ ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
+ evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
+ evt_type == SESSION_CTRL_EVT_RESET);
return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
}
void
session_cleanup_half_open (session_handle_t ho_handle)
{
- session_t *s = session_get_from_handle (ho_handle);
- transport_cleanup_half_open (session_get_transport_proto (s),
- s->connection_index);
+ session_t *ho = session_get_from_handle (ho_handle);
+
+ /* App transports can migrate their half-opens */
+ if (ho->flags & SESSION_F_IS_MIGRATING)
+ {
+ /* Session still migrating, move to closed state to signal that the
+ * session should be removed. */
+ if (ho->connection_index == ~0)
+ {
+ ho->session_state = SESSION_STATE_CLOSED;
+ return;
+ }
+ /* Migrated transports are no longer half-opens */
+ transport_cleanup (session_get_transport_proto (ho),
+ ho->connection_index, ho->app_index /* overloaded */);
+ }
+ else
+ transport_cleanup_half_open (session_get_transport_proto (ho),
+ ho->connection_index);
+ session_free (ho);
+}
+
+static void
+session_half_open_free (session_t *ho)
+{
+ app_worker_t *app_wrk;
+
+ ASSERT (vlib_get_thread_index () <= 1);
+ app_wrk = app_worker_get (ho->app_wrk_index);
+ app_worker_del_half_open (app_wrk, ho);
+ session_free (ho);
+}
+
+static void
+session_half_open_free_rpc (void *args)
+{
+ session_t *ho = ho_session_get (pointer_to_uword (args));
+ session_half_open_free (ho);
}
void
session_half_open_delete_notify (transport_connection_t *tc)
{
- app_worker_t *app_wrk;
- session_t *s;
+ /* Notification from ctrl thread accepted without rpc */
+ if (!tc->thread_index)
+ {
+ session_half_open_free (ho_session_get (tc->s_index));
+ }
+ else
+ {
+ void *args = uword_to_pointer ((uword) tc->s_index, void *);
+ session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
+ args);
+ }
+}
- s = session_get (tc->s_index, tc->thread_index);
- app_wrk = app_worker_get (s->app_wrk_index);
- app_worker_del_half_open (app_wrk, s->ho_index);
- session_free (s);
+void
+session_half_open_migrate_notify (transport_connection_t *tc)
+{
+ session_t *ho;
+
+ ho = ho_session_get (tc->s_index);
+ ho->flags |= SESSION_F_IS_MIGRATING;
+ ho->connection_index = ~0;
+}
+
+int
+session_half_open_migrated_notify (transport_connection_t *tc)
+{
+ session_t *ho;
+
+ ho = ho_session_get (tc->s_index);
+
+ /* App probably detached so the half-open must be cleaned up */
+ if (ho->session_state == SESSION_STATE_CLOSED)
+ {
+ session_half_open_delete_notify (tc);
+ return -1;
+ }
+ ho->connection_index = tc->c_index;
+ /* Overload app index for half-open with new thread */
+ ho->app_index = tc->thread_index;
+ return 0;
}
session_t *
return s;
}
-static session_t *
+session_t *
session_alloc_for_half_open (transport_connection_t *tc)
{
session_t *s;
return;
/* Transport thinks that app requested close but it actually didn't.
- * Can happen for tcp if fin and rst are received in close succession. */
+ * Can happen for tcp:
+ * 1)if fin and rst are received in close succession.
+ * 2)if app shutdown the connection. */
if (s->session_state == SESSION_STATE_READY)
{
session_transport_closing_notify (tc);
return rv;
}
- s->session_state = SESSION_STATE_READY;
-
return 0;
}
int
-session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
+session_open_cl (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
{
transport_connection_t *tc;
transport_endpoint_cfg_t *tep;
tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
/* For dgram type of service, allocate session and fifos now */
- app_wrk = app_worker_get (app_wrk_index);
+ app_wrk = app_worker_get (rmt->app_wrk_index);
s = session_alloc_for_connection (tc);
s->app_wrk_index = app_wrk->wrk_index;
s->session_state = SESSION_STATE_OPENED;
}
sh = session_handle (s);
+ *rsh = sh;
+
session_lookup_add_connection (tc, sh);
- return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque);
+ return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, rmt->opaque);
}
int
-session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
+session_open_vc (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
{
transport_connection_t *tc;
transport_endpoint_cfg_t *tep;
app_worker_t *app_wrk;
- session_t *s;
+ session_t *ho;
int rv;
tep = session_endpoint_to_transport_cfg (rmt);
tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
- app_wrk = app_worker_get (app_wrk_index);
+ app_wrk = app_worker_get (rmt->app_wrk_index);
/* If transport offers a vc service, only allocate established
* session once the connection has been established.
* session on transport notification, and to cleanup the half-open
* session if the app detaches before connection establishment.
*/
- s = session_alloc_for_half_open (tc);
- s->app_wrk_index = app_wrk->wrk_index;
- s->ho_index = app_worker_add_half_open (app_wrk, session_handle (s));
- s->opaque = opaque;
+ ho = session_alloc_for_half_open (tc);
+ ho->app_wrk_index = app_wrk->wrk_index;
+ ho->ho_index = app_worker_add_half_open (app_wrk, session_handle (ho));
+ ho->opaque = rmt->opaque;
+ *rsh = session_handle (ho);
- session_lookup_add_half_open (tc, tc->c_index);
+ if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
+ session_lookup_add_half_open (tc, tc->c_index);
return 0;
}
int
-session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
+session_open_app (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
{
- session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) rmt;
- transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (sep);
-
- sep->app_wrk_index = app_wrk_index;
- sep->opaque = opaque;
+ transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (rmt);
+ /* Not supported for now */
+ *rsh = SESSION_INVALID_HANDLE;
return transport_connect (rmt->transport_proto, tep_cfg);
}
-typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
+typedef int (*session_open_service_fn) (session_endpoint_cfg_t *,
+ session_handle_t *);
/* *INDENT-OFF* */
static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
* on open completion.
*/
int
-session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
+session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
{
transport_service_type_t tst;
tst = transport_protocol_service_type (rmt->transport_proto);
- return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
+ return session_open_srv_fns[tst](rmt, rsh);
}
/**
return 0;
}
+/**
+ * Initialize session half-closing procedure.
+ *
+ * Note that half-closing will not change the state of the session.
+ */
+void
+session_half_close (session_t *s)
+{
+ if (!s)
+ return;
+
+ session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
+}
+
/**
* Initialize session closing procedure.
*
session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
}
+/**
+ * Notify transport the session can be half-disconnected.
+ *
+ * Must be called from the session's thread.
+ */
+void
+session_transport_half_close (session_t *s)
+{
+ /* Only READY session can be half-closed */
+ if (s->session_state != SESSION_STATE_READY)
+ {
+ return;
+ }
+
+ transport_half_close (session_get_transport_proto (s), s->connection_index,
+ s->thread_index);
+}
+
/**
* Notify transport the session can be disconnected. This should eventually
* result in a delete notification that allows us to cleanup session state.
wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
+ wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
wrk->vm = vlib_get_main_by_index (i);
wrk->last_vlib_time = vlib_time_now (vm);
wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
+ wrk->timerfd = -1;
vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type);
if (num_threads > 1)
vlib_node_set_state (vm, session_queue_node.index, mstate);
if (is_en)
{
+ session_main_get_worker (0)->state = SESSION_WRK_INTERRUPT;
vlib_node_set_state (vm, session_queue_process_node.index,
state);
n = vlib_get_node (vm, session_queue_process_node.index);
smm->evt_qs_segment_size = 1 << 20;
#endif
- smm->last_transport_proto_type = TRANSPORT_PROTO_DTLS;
+ smm->last_transport_proto_type = TRANSPORT_PROTO_SRTP;
return 0;
}
else if (unformat (input, "segment-baseva 0x%lx", &smm->session_baseva))
;
else if (unformat (input, "use-app-socket-api"))
- appns_sapi_enable ();
+ (void) appns_sapi_enable_disable (1 /* is_enable */);
else if (unformat (input, "poll-main"))
smm->poll_main = 1;
else if (unformat (input, "use-private-rx-mqs"))