int app_worker_reset_notify (app_worker_t * app_wrk, session_t * s);
int app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
session_cleanup_ntf_t ntf);
+int app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
+ session_handle_t new_sh);
int app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s);
int app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s);
segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *,
/** Notify app that session was reset */
void (*session_reset_callback) (session_t * s);
+ /** Notify app that session pool migration happened */
+ void (*session_migrate_callback) (session_t * s, session_handle_t new_sh);
+
/** Direct RX callback for built-in application */
int (*builtin_app_rx_callback) (session_t * session);
return 0;
}
+int
+app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
+ session_handle_t new_sh)
+{
+ application_t *app = application_get (app_wrk->app_index);
+ app->cb_fns.session_migrate_callback (s, new_sh);
+ return 0;
+}
+
int
app_worker_own_session (app_worker_t * app_wrk, session_t * s)
{
u32 new_session_index;
} session_switch_pool_args_t;
+/**
+ * Notify old thread of the session pool switch
+ */
static void
session_switch_pool (void *cb_args)
{
session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
+ app_worker_t *app_wrk;
session_t *s;
+
ASSERT (args->thread_index == vlib_get_thread_index ());
s = session_get (args->session_index, args->thread_index);
s->tx_fifo->master_session_index = args->new_session_index;
s->tx_fifo->master_thread_index = args->new_thread_index;
transport_cleanup (session_get_transport_proto (s), s->connection_index,
s->thread_index);
+
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (app_wrk)
+ {
+ session_handle_t new_sh;
+ new_sh = session_make_handle (args->new_session_index,
+ args->new_thread_index);
+ app_worker_migrate_notify (app_wrk, s, new_sh);
+ }
+
session_free (s);
clib_mem_free (cb_args);
}
return 0;
}
+static void
+mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
+{
+ clib_warning ("not supported");
+}
+
+
static session_cb_vft_t session_mq_cb_vft = {
.session_accept_callback = mq_send_session_accepted_cb,
.session_disconnect_callback = mq_send_session_disconnected_cb,
.session_connected_callback = mq_send_session_connected_cb,
.session_reset_callback = mq_send_session_reset_cb,
+ .session_migrate_callback = mq_send_session_migrate_cb,
.add_segment_callback = send_add_segment_callback,
.del_segment_callback = send_del_segment_callback,
};
*thread_index = session_thread_from_handle (handle);
}
+static inline session_handle_t
+session_make_handle (u32 session_index, u32 thread_index)
+{
+ return (((u64) thread_index << 32) | (u64) session_index);
+}
+
typedef enum
{
SESSION_IO_EVT_RX,