if (!session_has_transport (s))
{
+ s->session_state = SESSION_STATE_READY;
if (ct_session_connect_notify (s))
return;
- s->session_state = SESSION_STATE_READY;
}
else
{
old_state = s->session_state;
s->session_state = SESSION_STATE_READY;
- if (!svm_fifo_is_empty (s->rx_fifo))
+
+ if (!svm_fifo_is_empty_prod (s->rx_fifo))
app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
/* Closed while waiting for app to reply. Resend disconnect */
if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
{
- application_t *app = application_get (app_wrk->app_index);
- app->cb_fns.session_disconnect_callback (s);
+ app_worker_close_notify (app_wrk, s);
s->session_state = old_state;
return;
}
app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
- app->cb_fns.session_disconnect_callback (s);
+ app_worker_close_notify (app_wrk, s);
}
vlib_node_registration_t session_queue_node;
{
vlib_buffer_t *chain_b, *prev_b;
u32 chain_bi0, to_deq, left_from_seg;
- session_manager_worker_t *wrk;
+ session_worker_t *wrk;
u16 len_to_deq, n_bytes_read;
u8 *data, j;
- wrk = session_manager_get_worker (ctx->s->thread_index);
+ wrk = session_main_get_worker (ctx->s->thread_index);
b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
b->total_length_not_including_first_buffer = 0;
u32 max_segs, u8 peek_data)
{
u32 n_bytes_per_buf, n_bytes_per_seg;
- ctx->max_dequeue = svm_fifo_max_dequeue (ctx->s->tx_fifo);
+ ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
if (peek_data)
{
/* Offset in rx fifo from where to peek data */
always_inline int
session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_worker_t * wrk,
+ session_worker_t * wrk,
session_event_t * e, int *n_tx_packets,
u8 peek_data)
{
u32 next_index, next0, next1, *to_next, n_left_to_next, n_left, pbi;
u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0;
- session_manager_main_t *smm = &session_manager_main;
+ session_main_t *smm = &session_main;
session_tx_context_t *ctx = &wrk->ctx;
transport_proto_t tp;
vlib_buffer_t *pb;
svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
sizeof (session_dgram_pre_hdr_t));
/* More data needs to be read */
- else if (svm_fifo_max_dequeue (ctx->s->tx_fifo) > 0)
+ else if (svm_fifo_max_dequeue_cons (ctx->s->tx_fifo) > 0)
if (svm_fifo_set_event (ctx->s->tx_fifo))
vec_add1 (wrk->pending_event_vector, *e);
}
int
session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_worker_t * wrk,
+ session_worker_t * wrk,
session_event_t * e, int *n_tx_pkts)
{
return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 1);
int
session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_worker_t * wrk,
+ session_worker_t * wrk,
session_event_t * e, int *n_tx_pkts)
{
return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 0);
int
session_tx_fifo_dequeue_internal (vlib_main_t * vm,
vlib_node_runtime_t * node,
- session_manager_worker_t * wrk,
+ session_worker_t * wrk,
session_event_t * e, int *n_tx_pkts)
{
session_t *s = wrk->ctx.s;
- application_t *app;
- if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+ if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
return 0;
- app = application_get (s->t_app_index);
svm_fifo_unset_event (s->tx_fifo);
- return app->cb_fns.builtin_app_tx_callback (s);
+ return transport_custom_tx (session_get_transport_proto (s), s);
}
always_inline session_t *
session_event_get_session (session_event_t * e, u8 thread_index)
{
- return session_get_if_valid (e->fifo->master_session_index, thread_index);
+ return session_get_if_valid (e->session_index, thread_index);
}
static void
-session_update_dispatch_period (session_manager_worker_t * wrk, f64 now,
+session_update_dispatch_period (session_worker_t * wrk, f64 now,
u32 thread_index)
{
if (wrk->last_tx_packets)
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
{
- session_manager_main_t *smm = vnet_get_session_manager_main ();
+ session_main_t *smm = vnet_get_session_main ();
u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
- session_manager_worker_t *wrk = &smm->wrk[thread_index];
+ session_worker_t *wrk = &smm->wrk[thread_index];
session_event_t *e, *fifo_events;
svm_msg_q_msg_t _msg, *msg = &_msg;
f64 now = vlib_time_now (vm);
int n_tx_packets = 0, i, rv;
app_worker_t *app_wrk;
- application_t *app;
svm_msg_q_t *mq;
void (*fp) (void *);
clib_warning ("session was freed!");
continue;
}
+ CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
wrk->ctx.s = s;
/* Spray packets in per session type frames, since they go to
* different nodes */
continue;
}
break;
+ case SESSION_IO_EVT_RX:
+ s = session_event_get_session (e, thread_index);
+ if (!s)
+ break;
+ transport_app_rx_evt (session_get_transport_proto (s),
+ s->connection_index, s->thread_index);
+ break;
case SESSION_CTRL_EVT_CLOSE:
s = session_get_from_handle_if_valid (e->session_handle);
if (PREDICT_FALSE (!s))
* and the tx queue is still not empty, try to wait for some
* dispatch cycles */
if (!e->postponed
- || (e->postponed < 200 && svm_fifo_max_dequeue (s->tx_fifo)))
+ || (e->postponed < 200
+ && svm_fifo_max_dequeue_cons (s->tx_fifo)))
{
e->postponed += 1;
vec_add1 (wrk->pending_disconnects, *e);
continue;
svm_fifo_unset_event (s->rx_fifo);
app_wrk = app_worker_get (s->app_wrk_index);
- app = application_get (app_wrk->app_index);
- app->cb_fns.builtin_app_rx_callback (s);
+ app_worker_builtin_rx (app_wrk, s);
break;
case SESSION_IO_EVT_BUILTIN_TX:
s = session_get_from_handle_if_valid (e->session_handle);
void
dump_thread_0_event_queue (void)
{
- session_manager_main_t *smm = vnet_get_session_manager_main ();
+ session_main_t *smm = vnet_get_session_main ();
vlib_main_t *vm = &vlib_global_main;
u32 my_thread_index = vm->thread_index;
session_event_t _e, *e = &_e;
case SESSION_IO_EVT_RX:
case SESSION_IO_EVT_TX:
case SESSION_IO_EVT_BUILTIN_RX:
- if (e->fifo == f)
+ if (e->session_index == f->master_session_index)
return 1;
break;
case SESSION_CTRL_EVT_CLOSE:
session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
{
session_event_t *pending_event_vector, *evt;
- session_manager_worker_t *wrk;
+ session_worker_t *wrk;
int i, index, found = 0;
svm_msg_q_msg_t *msg;
svm_msg_q_ring_t *ring;
ASSERT (e);
thread_index = f->master_thread_index;
- wrk = session_manager_get_worker (thread_index);
+ wrk = session_main_get_worker (thread_index);
/*
* Search evt queue
};
/* *INDENT-ON* */
+static_always_inline uword
+session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
+ vlib_frame_t * frame)
+{
+ session_main_t *sm = &session_main;
+ if (!sm->wrk[0].vpp_event_queue)
+ return 0;
+ return session_queue_node_fn (vm, node, frame);
+}
+
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (session_queue_pre_input_node) =
+{
+ .function = session_queue_pre_input_inline,
+ .type = VLIB_NODE_TYPE_PRE_INPUT,
+ .name = "session-queue-main",
+ .state = VLIB_NODE_STATE_DISABLED,
+};
+/* *INDENT-ON* */
+
/*
* fd.io coding-style-patch-verification: ON
*