#include <vnet/session/session.h>
#include <vnet/session/application.h>
#include <vnet/session/application_interface.h>
+#include <vnet/session/application_local.h>
#include <vnet/session/session_debug.h>
#include <svm/queue.h>
+static void session_mq_accepted_reply_handler (void *data);
+
+static void
+accepted_notify_cb (void *data, u32 data_len)
+{
+ session_mq_accepted_reply_handler (data);
+}
+
static void
session_mq_accepted_reply_handler (void *data)
{
vnet_disconnect_args_t _a = { 0 }, *a = &_a;
session_state_t old_state;
app_worker_t *app_wrk;
- local_session_t *ls;
session_t *s;
/* Server isn't interested, kill the session */
return;
}
- if (session_handle_is_local (mp->handle))
+ /* Mail this back from the main thread. We're not polling in main
+ * thread so we're using other workers for notifications. */
+ if (vlib_num_workers () && vlib_get_thread_index () != 0
+ && session_thread_from_handle (mp->handle) == 0)
{
- ls = app_worker_get_local_session_from_handle (mp->handle);
- if (!ls)
- {
- clib_warning ("unknown local handle 0x%lx", mp->handle);
- return;
- }
- app_wrk = app_worker_get (ls->app_wrk_index);
- if (app_wrk->app_index != mp->context)
- {
- clib_warning ("server %u doesn't own local handle 0x%lx",
- mp->context, mp->handle);
- return;
- }
- if (app_worker_local_session_connect_notify (ls))
+ vl_api_rpc_call_main_thread (accepted_notify_cb, data,
+ sizeof (session_accepted_reply_msg_t));
+ return;
+ }
+
+ s = session_get_from_handle_if_valid (mp->handle);
+ if (!s)
+ return;
+
+ app_wrk = app_worker_get (s->app_wrk_index);
+ if (app_wrk->app_index != mp->context)
+ {
+ clib_warning ("app doesn't own session");
+ return;
+ }
+
+ if (!session_has_transport (s))
+ {
+ s->session_state = SESSION_STATE_READY;
+ if (ct_session_connect_notify (s))
return;
- ls->session_state = SESSION_STATE_READY;
}
else
{
- s = session_get_from_handle_if_valid (mp->handle);
- if (!s)
- return;
-
- app_wrk = app_worker_get (s->app_wrk_index);
- if (app_wrk->app_index != mp->context)
- {
- clib_warning ("app doesn't own session");
- return;
- }
-
old_state = s->session_state;
s->session_state = SESSION_STATE_READY;
if (!svm_fifo_is_empty (s->rx_fifo))
- app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX);
+ app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
/* Closed while waiting for app to reply. Resend disconnect */
if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
* Retransmit messages that may have been lost
*/
if (s->tx_fifo && !svm_fifo_is_empty (s->tx_fifo))
- session_send_io_evt_to_thread (s->tx_fifo, FIFO_EVENT_APP_TX);
+ session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
- app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX);
+ app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
app->cb_fns.session_disconnect_callback (s);
{
vlib_buffer_t *chain_b, *prev_b;
u32 chain_bi0, to_deq, left_from_seg;
- session_manager_worker_t *wrk;
+ session_worker_t *wrk;
u16 len_to_deq, n_bytes_read;
u8 *data, j;
- wrk = session_manager_get_worker (ctx->s->thread_index);
+ wrk = session_main_get_worker (ctx->s->thread_index);
b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
b->total_length_not_including_first_buffer = 0;
/* *INDENT-OFF* */
SESSION_EVT_DBG(SESSION_EVT_DEQ, ctx->s, ({
- ed->data[0] = FIFO_EVENT_APP_TX;
+ ed->data[0] = SESSION_IO_EVT_TX;
ed->data[1] = ctx->max_dequeue;
ed->data[2] = len_to_deq;
ed->data[3] = ctx->left_to_snd;
ctx->max_len_to_snd = max_segs * ctx->snd_mss;
}
- n_bytes_per_buf = VLIB_BUFFER_DATA_SIZE;
+ n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->snd_mss;
ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
always_inline int
session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_worker_t * wrk,
+ session_worker_t * wrk,
session_event_t * e, int *n_tx_packets,
u8 peek_data)
{
u32 next_index, next0, next1, *to_next, n_left_to_next, n_left, pbi;
u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0;
- session_manager_main_t *smm = &session_manager_main;
+ session_main_t *smm = &session_main;
session_tx_context_t *ctx = &wrk->ctx;
transport_proto_t tp;
vlib_buffer_t *pb;
int
session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_worker_t * wrk,
+ session_worker_t * wrk,
session_event_t * e, int *n_tx_pkts)
{
return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 1);
int
session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_worker_t * wrk,
+ session_worker_t * wrk,
session_event_t * e, int *n_tx_pkts)
{
return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 0);
int
session_tx_fifo_dequeue_internal (vlib_main_t * vm,
vlib_node_runtime_t * node,
- session_manager_worker_t * wrk,
+ session_worker_t * wrk,
session_event_t * e, int *n_tx_pkts)
{
session_t *s = wrk->ctx.s;
- application_t *app;
- if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+ if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
return 0;
- app = application_get (s->t_app_index);
svm_fifo_unset_event (s->tx_fifo);
- return app->cb_fns.builtin_app_tx_callback (s);
+ return transport_custom_tx (session_get_transport_proto (s), s);
}
always_inline session_t *
session_event_get_session (session_event_t * e, u8 thread_index)
{
- return session_get_if_valid (e->fifo->master_session_index, thread_index);
+ return session_get_if_valid (e->session_index, thread_index);
}
static void
-session_update_dispatch_period (session_manager_worker_t * wrk, f64 now,
+session_update_dispatch_period (session_worker_t * wrk, f64 now,
u32 thread_index)
{
if (wrk->last_tx_packets)
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
{
- session_manager_main_t *smm = vnet_get_session_manager_main ();
+ session_main_t *smm = vnet_get_session_main ();
u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
- session_manager_worker_t *wrk = &smm->wrk[thread_index];
+ session_worker_t *wrk = &smm->wrk[thread_index];
session_event_t *e, *fifo_events;
svm_msg_q_msg_t _msg, *msg = &_msg;
f64 now = vlib_time_now (vm);
switch (e->event_type)
{
case SESSION_IO_EVT_TX_FLUSH:
- case FIFO_EVENT_APP_TX:
+ case SESSION_IO_EVT_TX:
/* Don't try to send more that one frame per dispatch cycle */
if (n_tx_packets == VLIB_FRAME_SIZE)
{
continue;
}
break;
- case FIFO_EVENT_DISCONNECT:
+ case SESSION_CTRL_EVT_CLOSE:
s = session_get_from_handle_if_valid (e->session_handle);
if (PREDICT_FALSE (!s))
break;
session_transport_close (s);
break;
- case FIFO_EVENT_BUILTIN_RX:
+ case SESSION_IO_EVT_BUILTIN_RX:
s = session_event_get_session (e, thread_index);
if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
continue;
app = application_get (app_wrk->app_index);
app->cb_fns.builtin_app_rx_callback (s);
break;
- case FIFO_EVENT_BUILTIN_TX:
+ case SESSION_IO_EVT_BUILTIN_TX:
s = session_get_from_handle_if_valid (e->session_handle);
wrk->ctx.s = s;
if (PREDICT_TRUE (s != 0))
session_tx_fifo_dequeue_internal (vm, node, wrk, e,
&n_tx_packets);
break;
- case FIFO_EVENT_RPC:
+ case SESSION_CTRL_EVT_RPC:
fp = e->rpc_args.fp;
(*fp) (e->rpc_args.arg);
break;
void
dump_thread_0_event_queue (void)
{
- session_manager_main_t *smm = vnet_get_session_manager_main ();
+ session_main_t *smm = vnet_get_session_main ();
vlib_main_t *vm = &vlib_global_main;
u32 my_thread_index = vm->thread_index;
session_event_t _e, *e = &_e;
switch (e->event_type)
{
- case FIFO_EVENT_APP_TX:
+ case SESSION_IO_EVT_TX:
s0 = session_event_get_session (e, my_thread_index);
fformat (stdout, "[%04d] TX session %d\n", i, s0->session_index);
break;
- case FIFO_EVENT_DISCONNECT:
+ case SESSION_CTRL_EVT_CLOSE:
s0 = session_get_from_handle (e->session_handle);
fformat (stdout, "[%04d] disconnect session %d\n", i,
s0->session_index);
break;
- case FIFO_EVENT_BUILTIN_RX:
+ case SESSION_IO_EVT_BUILTIN_RX:
s0 = session_event_get_session (e, my_thread_index);
fformat (stdout, "[%04d] builtin_rx %d\n", i, s0->session_index);
break;
- case FIFO_EVENT_RPC:
+ case SESSION_CTRL_EVT_RPC:
fformat (stdout, "[%04d] RPC call %llx with %llx\n",
i, (u64) (uword) (e->rpc_args.fp),
(u64) (uword) (e->rpc_args.arg));
session_t *s;
switch (e->event_type)
{
- case FIFO_EVENT_APP_RX:
- case FIFO_EVENT_APP_TX:
- case FIFO_EVENT_BUILTIN_RX:
- if (e->fifo == f)
+ case SESSION_IO_EVT_RX:
+ case SESSION_IO_EVT_TX:
+ case SESSION_IO_EVT_BUILTIN_RX:
+ if (e->session_index == f->master_session_index)
return 1;
break;
- case FIFO_EVENT_DISCONNECT:
+ case SESSION_CTRL_EVT_CLOSE:
break;
- case FIFO_EVENT_RPC:
+ case SESSION_CTRL_EVT_RPC:
s = session_get_from_handle (e->session_handle);
if (!s)
{
session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
{
session_event_t *pending_event_vector, *evt;
- session_manager_worker_t *wrk;
+ session_worker_t *wrk;
int i, index, found = 0;
svm_msg_q_msg_t *msg;
svm_msg_q_ring_t *ring;
ASSERT (e);
thread_index = f->master_thread_index;
- wrk = session_manager_get_worker (thread_index);
+ wrk = session_main_get_worker (thread_index);
/*
* Search evt queue
};
/* *INDENT-ON* */
+static_always_inline uword
+session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
+ vlib_frame_t * frame)
+{
+ session_main_t *sm = &session_main;
+ if (!sm->wrk[0].vpp_event_queue)
+ return 0;
+ return session_queue_node_fn (vm, node, frame);
+}
+
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (session_queue_pre_input_node) =
+{
+ .function = session_queue_pre_input_inline,
+ .type = VLIB_NODE_TYPE_PRE_INPUT,
+ .name = "session-queue-main",
+ .state = VLIB_NODE_STATE_DISABLED,
+};
+/* *INDENT-ON* */
+
/*
* fd.io coding-style-patch-verification: ON
*