#include <svm/queue.h>
#include <sys/timerfd.h>
-#define app_check_thread_and_barrier(_fn, _arg) \
- if (!vlib_thread_is_main_w_barrier ()) \
- { \
- vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg)); \
- return; \
- }
+static inline void
+session_wrk_send_evt_to_main (session_worker_t *wrk, session_evt_elt_t *elt)
+{
+ session_evt_elt_t *he;
+ uword thread_index;
+ u8 is_empty;
+
+ thread_index = wrk->vm->thread_index;
+ he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
+ is_empty = clib_llist_is_empty (wrk->event_elts, evt_list, he);
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
+ if (is_empty)
+ session_send_rpc_evt_to_thread (0, session_wrk_handle_evts_main_rpc,
+ uword_to_pointer (thread_index, void *));
+}
+
+#define app_check_thread_and_barrier(_wrk, _elt) \
+ if (!vlib_thread_is_main_w_barrier ()) \
+ { \
+ session_wrk_send_evt_to_main (wrk, elt); \
+ return; \
+ }
static void
session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
}
static void
-session_mq_listen_handler (void *data)
+session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
- session_listen_msg_t *mp = (session_listen_msg_t *) data;
vnet_listen_args_t _a, *a = &_a;
+ session_listen_msg_t *mp;
app_worker_t *app_wrk;
application_t *app;
int rv;
- app_check_thread_and_barrier (session_mq_listen_handler, mp);
+ app_check_thread_and_barrier (wrk, elt);
+ mp = session_evt_ctrl_data (wrk, elt);
app = application_lookup (mp->client_index);
if (!app)
return;
}
static void
-session_mq_listen_uri_handler (void *data)
+session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
- session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
vnet_listen_args_t _a, *a = &_a;
+ session_listen_uri_msg_t *mp;
app_worker_t *app_wrk;
application_t *app;
int rv;
- app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
+ app_check_thread_and_barrier (wrk, elt);
+ mp = session_evt_ctrl_data (wrk, elt);
app = application_lookup (mp->client_index);
if (!app)
return;
a->sep.port = mp->port;
a->sep.transport_proto = mp->proto;
a->sep.peer.fib_index = mp->vrf;
+ a->sep.dscp = mp->dscp;
clib_memcpy_fast (&a->sep.peer.ip, &mp->lcl_ip, sizeof (mp->lcl_ip));
if (mp->is_ip4)
{
}
static void
-session_mq_connect_uri_handler (void *data)
+session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
- session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
vnet_connect_args_t _a, *a = &_a;
+ session_connect_uri_msg_t *mp;
app_worker_t *app_wrk;
application_t *app;
int rv;
- app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
+ app_check_thread_and_barrier (wrk, elt);
+ mp = session_evt_ctrl_data (wrk, elt);
app = application_lookup (mp->client_index);
if (!app)
return;
}
static void
-app_mq_detach_handler (void *data)
+app_mq_detach_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
- session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
vnet_app_detach_args_t _a, *a = &_a;
+ session_app_detach_msg_t *mp;
application_t *app;
- app_check_thread_and_barrier (app_mq_detach_handler, mp);
+ app_check_thread_and_barrier (wrk, elt);
+ mp = session_evt_ctrl_data (wrk, elt);
app = application_lookup (mp->client_index);
if (!app)
return;
}
static void
-session_mq_unlisten_rpc (session_unlisten_msg_t *mp)
+session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
- vlib_main_t *vm = vlib_get_main ();
vnet_unlisten_args_t _a, *a = &_a;
+ session_unlisten_msg_t *mp;
app_worker_t *app_wrk;
session_handle_t sh;
application_t *app;
- u32 context;
int rv;
+ app_check_thread_and_barrier (wrk, elt);
+
+ mp = session_evt_ctrl_data (wrk, elt);
sh = mp->handle;
- context = mp->context;
app = application_lookup (mp->client_index);
if (!app)
a->handle = sh;
a->wrk_map_index = mp->wrk_index;
- vlib_worker_thread_barrier_sync (vm);
-
if ((rv = vnet_unlisten (a)))
clib_warning ("unlisten returned: %d", rv);
- vlib_worker_thread_barrier_release (vm);
-
app_wrk = application_get_worker (app, a->wrk_map_index);
if (!app_wrk)
return;
- mq_send_unlisten_reply (app_wrk, sh, context, rv);
- clib_mem_free (mp);
+ mq_send_unlisten_reply (app_wrk, sh, mp->context, rv);
}
static void
-session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
+session_mq_accepted_reply_handler (session_worker_t *wrk,
+ session_evt_elt_t *elt)
{
- u32 thread_index = wrk - session_main.wrk;
- session_unlisten_msg_t *mp, *arg;
-
- mp = session_evt_ctrl_data (wrk, elt);
- arg = clib_mem_alloc (sizeof (session_unlisten_msg_t));
- clib_memcpy_fast (arg, mp, sizeof (*arg));
-
- if (PREDICT_FALSE (!thread_index))
- {
- session_mq_unlisten_rpc (arg);
- return;
- }
-
- session_send_rpc_evt_to_thread_force (0, session_mq_unlisten_rpc, arg);
-}
-
-static void
-session_mq_accepted_reply_handler (void *data)
-{
- session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+ session_accepted_reply_msg_t *mp;
session_state_t old_state;
app_worker_t *app_wrk;
session_t *s;
- /* Server isn't interested, kill the session */
- if (mp->retval)
- {
- a->app_index = mp->context;
- a->handle = mp->handle;
- vnet_disconnect_session (a);
- return;
- }
+ mp = session_evt_ctrl_data (wrk, elt);
/* Mail this back from the main thread. We're not polling in main
* thread so we're using other workers for notifications. */
- if (vlib_num_workers () && vlib_get_thread_index () != 0
- && session_thread_from_handle (mp->handle) == 0)
+ if (session_thread_from_handle (mp->handle) == 0 && vlib_num_workers () &&
+ vlib_get_thread_index () != 0)
{
- vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
- (u8 *) mp, sizeof (*mp));
+ session_wrk_send_evt_to_main (wrk, elt);
return;
}
return;
}
- if (!session_has_transport (s))
+ /* Server isn't interested, disconnect the session */
+ if (mp->retval)
{
- s->session_state = SESSION_STATE_READY;
- if (ct_session_connect_notify (s, SESSION_E_NONE))
- return;
+ a->app_index = mp->context;
+ a->handle = mp->handle;
+ vnet_disconnect_session (a);
+ return;
}
- else
+
+ /* Special handling for cut-through sessions */
+ if (!session_has_transport (s))
{
- old_state = s->session_state;
- s->session_state = SESSION_STATE_READY;
+ session_set_state (s, SESSION_STATE_READY);
+ ct_session_connect_notify (s, SESSION_E_NONE);
+ return;
+ }
- if (!svm_fifo_is_empty_prod (s->rx_fifo))
- app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
+ old_state = s->session_state;
+ session_set_state (s, SESSION_STATE_READY);
- /* Closed while waiting for app to reply. Resend disconnect */
- if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
- {
- app_worker_close_notify (app_wrk, s);
- s->session_state = old_state;
- return;
- }
+ if (!svm_fifo_is_empty_prod (s->rx_fifo))
+ app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
+
+ /* Closed while waiting for app to reply. Resend disconnect */
+ if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
+ {
+ app_worker_close_notify (app_wrk, s);
+ session_set_state (s, old_state);
+ return;
}
}
svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
}
+void
+session_wrk_handle_evts_main_rpc (void *args)
+{
+ vlib_main_t *vm = vlib_get_main ();
+ clib_llist_index_t ei, next_ei;
+ session_evt_elt_t *he, *elt;
+ session_worker_t *fwrk;
+ u32 thread_index;
+
+ vlib_worker_thread_barrier_sync (vm);
+
+ thread_index = pointer_to_uword (args);
+ fwrk = session_main_get_worker (thread_index);
+
+ he = clib_llist_elt (fwrk->event_elts, fwrk->evts_pending_main);
+ ei = clib_llist_next_index (he, evt_list);
+
+ while (ei != fwrk->evts_pending_main)
+ {
+ elt = clib_llist_elt (fwrk->event_elts, ei);
+ next_ei = clib_llist_next_index (elt, evt_list);
+ clib_llist_remove (fwrk->event_elts, evt_list, elt);
+ switch (elt->evt.event_type)
+ {
+ case SESSION_CTRL_EVT_LISTEN:
+ session_mq_listen_handler (fwrk, elt);
+ break;
+ case SESSION_CTRL_EVT_UNLISTEN:
+ session_mq_unlisten_handler (fwrk, elt);
+ break;
+ case SESSION_CTRL_EVT_APP_DETACH:
+ app_mq_detach_handler (fwrk, elt);
+ break;
+ case SESSION_CTRL_EVT_CONNECT_URI:
+ session_mq_connect_uri_handler (fwrk, elt);
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED_REPLY:
+ session_mq_accepted_reply_handler (fwrk, elt);
+ break;
+ default:
+ clib_warning ("unhandled %u", elt->evt.event_type);
+ ALWAYS_ASSERT (0);
+ break;
+ }
+
+ /* Regrab element in case pool moved */
+ elt = clib_llist_elt (fwrk->event_elts, ei);
+ session_evt_ctrl_data_free (fwrk, elt);
+ clib_llist_put (fwrk->event_elts, elt);
+ ei = next_ei;
+ }
+
+ vlib_worker_thread_barrier_release (vm);
+}
+
vlib_node_registration_t session_queue_node;
typedef struct
return s;
}
-#define foreach_session_queue_error \
-_(TX, "Packets transmitted") \
-_(TIMER, "Timer events") \
-_(NO_BUFFER, "Out of buffers")
+#define foreach_session_queue_error \
+ _ (TX, tx, INFO, "Packets transmitted") \
+ _ (TIMER, timer, INFO, "Timer events") \
+ _ (NO_BUFFER, no_buffer, ERROR, "Out of buffers")
typedef enum
{
-#define _(sym,str) SESSION_QUEUE_ERROR_##sym,
+#define _(f, n, s, d) SESSION_QUEUE_ERROR_##f,
foreach_session_queue_error
#undef _
SESSION_QUEUE_N_ERROR,
} session_queue_error_t;
-static char *session_queue_error_strings[] = {
-#define _(sym,string) string,
+static vlib_error_desc_t session_error_counters[] = {
+#define _(f, n, s, d) { #n, d, VL_COUNTER_SEVERITY_##s },
foreach_session_queue_error
#undef _
};
};
static void
-session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
- u32 next_index, u32 * to_next, u16 n_segs,
- session_t * s, u32 n_trace)
+session_tx_trace_frame (vlib_main_t *vm, vlib_node_runtime_t *node,
+ u32 next_index, vlib_buffer_t **bufs, u16 n_segs,
+ session_t *s, u32 n_trace)
{
+ vlib_buffer_t **b = bufs;
+
while (n_trace && n_segs)
{
- vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
- if (PREDICT_TRUE
- (vlib_trace_buffer
- (vm, node, next_index, b, 1 /* follow_chain */ )))
+ if (PREDICT_TRUE (vlib_trace_buffer (vm, node, next_index, b[0],
+ 1 /* follow_chain */)))
{
session_queue_trace_t *t =
- vlib_add_trace (vm, node, b, sizeof (*t));
+ vlib_add_trace (vm, node, b[0], sizeof (*t));
t->session_index = s->session_index;
t->server_thread_index = s->thread_index;
n_trace--;
}
- to_next++;
+ b++;
n_segs--;
}
vlib_set_trace_count (vm, node, n_trace);
}
+always_inline int
+session_tx_fill_dma_transfers (session_worker_t *wrk,
+ session_tx_context_t *ctx, vlib_buffer_t *b)
+{
+ vlib_main_t *vm = wrk->vm;
+ u32 len_to_deq;
+ u8 *data0 = NULL;
+ int n_bytes_read, len_write;
+ svm_fifo_seg_t data_fs[2];
+
+ u32 n_segs = 2;
+ u16 n_transfers = 0;
+ /*
+ * Start with the first buffer in chain
+ */
+ b->error = 0;
+ b->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
+ b->current_data = 0;
+ data0 = vlib_buffer_make_headroom (b, TRANSPORT_MAX_HDRS_LEN);
+ len_to_deq = clib_min (ctx->left_to_snd, ctx->deq_per_first_buf);
+
+ n_bytes_read = svm_fifo_segments (ctx->s->tx_fifo, ctx->sp.tx_offset,
+ data_fs, &n_segs, len_to_deq);
+
+ len_write = n_bytes_read;
+ ASSERT (n_bytes_read == len_to_deq);
+
+ while (n_bytes_read)
+ {
+ wrk->batch_num++;
+ vlib_dma_batch_add (vm, wrk->batch, data0, data_fs[n_transfers].data,
+ data_fs[n_transfers].len);
+ data0 += data_fs[n_transfers].len;
+ n_bytes_read -= data_fs[n_transfers].len;
+ n_transfers++;
+ }
+ return len_write;
+}
+
+always_inline int
+session_tx_fill_dma_transfers_tail (session_worker_t *wrk,
+ session_tx_context_t *ctx,
+ vlib_buffer_t *b, u32 len_to_deq, u8 *data)
+{
+ vlib_main_t *vm = wrk->vm;
+ int n_bytes_read, len_write;
+ svm_fifo_seg_t data_fs[2];
+ u32 n_segs = 2;
+ u16 n_transfers = 0;
+
+ n_bytes_read = svm_fifo_segments (ctx->s->tx_fifo, ctx->sp.tx_offset,
+ data_fs, &n_segs, len_to_deq);
+
+ len_write = n_bytes_read;
+
+ ASSERT (n_bytes_read == len_to_deq);
+
+ while (n_bytes_read)
+ {
+ wrk->batch_num++;
+ vlib_dma_batch_add (vm, wrk->batch, data, data_fs[n_transfers].data,
+ data_fs[n_transfers].len);
+ data += data_fs[n_transfers].len;
+ n_bytes_read -= data_fs[n_transfers].len;
+ n_transfers++;
+ }
+
+ return len_write;
+}
+
+always_inline int
+session_tx_copy_data (session_worker_t *wrk, session_tx_context_t *ctx,
+ vlib_buffer_t *b, u32 len_to_deq, u8 *data0)
+{
+ int n_bytes_read;
+ if (PREDICT_TRUE (!wrk->dma_enabled))
+ n_bytes_read =
+ svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data0);
+ else
+ n_bytes_read = session_tx_fill_dma_transfers (wrk, ctx, b);
+ return n_bytes_read;
+}
+
+always_inline int
+session_tx_copy_data_tail (session_worker_t *wrk, session_tx_context_t *ctx,
+ vlib_buffer_t *b, u32 len_to_deq, u8 *data)
+{
+ int n_bytes_read;
+ if (PREDICT_TRUE (!wrk->dma_enabled))
+ n_bytes_read =
+ svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data);
+ else
+ n_bytes_read =
+ session_tx_fill_dma_transfers_tail (wrk, ctx, b, len_to_deq, data);
+ return n_bytes_read;
+}
+
always_inline void
-session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
- vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
+session_tx_fifo_chain_tail (session_worker_t *wrk, session_tx_context_t *ctx,
+ vlib_buffer_t *b, u16 *n_bufs, u8 peek_data)
{
+ vlib_main_t *vm = wrk->vm;
vlib_buffer_t *chain_b, *prev_b;
u32 chain_bi0, to_deq, left_from_seg;
- u16 len_to_deq, n_bytes_read;
+ int len_to_deq, n_bytes_read;
u8 *data, j;
b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
data = vlib_buffer_get_current (chain_b);
if (peek_data)
{
- n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
- ctx->sp.tx_offset, len_to_deq, data);
+ n_bytes_read =
+ session_tx_copy_data_tail (wrk, ctx, b, len_to_deq, data);
ctx->sp.tx_offset += n_bytes_read;
}
else
}
always_inline void
-session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
- vlib_buffer_t * b, u16 * n_bufs, u8 peek_data)
+session_tx_fill_buffer (session_worker_t *wrk, session_tx_context_t *ctx,
+ vlib_buffer_t *b, u16 *n_bufs, u8 peek_data)
{
u32 len_to_deq;
u8 *data0;
int n_bytes_read;
-
/*
* Start with the first buffer in chain
*/
if (peek_data)
{
- n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
- len_to_deq, data0);
+ n_bytes_read = session_tx_copy_data (wrk, ctx, b, len_to_deq, data0);
ASSERT (n_bytes_read > 0);
/* Keep track of progress locally, transport is also supposed to
* increment it independently when pushing the header */
ASSERT (n_bytes_read > 0);
}
}
+
b->current_length = n_bytes_read;
ctx->left_to_snd -= n_bytes_read;
* Fill in the remaining buffers in the chain, if any
*/
if (PREDICT_FALSE (ctx->n_bufs_per_seg > 1 && ctx->left_to_snd))
- session_tx_fifo_chain_tail (vm, ctx, b, n_bufs, peek_data);
+ session_tx_fifo_chain_tail (wrk, ctx, b, n_bufs, peek_data);
}
always_inline u8
svm_fifo_unset_event (s->tx_fifo);
if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
- if (svm_fifo_set_event (s->tx_fifo))
- session_evt_add_head_old (wrk, elt);
+ {
+ if (svm_fifo_set_event (s->tx_fifo))
+ session_evt_add_head_old (wrk, elt);
+ }
+ else
+ {
+ transport_connection_deschedule (ctx->tc);
+ }
+}
+
+always_inline void
+session_tx_add_pending_buffer (session_worker_t *wrk, u32 bi, u32 next_index)
+{
+ if (PREDICT_TRUE (!wrk->dma_enabled))
+ {
+ vec_add1 (wrk->pending_tx_buffers, bi);
+ vec_add1 (wrk->pending_tx_nexts, next_index);
+ }
+ else
+ {
+ session_dma_transfer *dma_transfer = &wrk->dma_trans[wrk->trans_tail];
+ vec_add1 (dma_transfer->pending_tx_buffers, bi);
+ vec_add1 (dma_transfer->pending_tx_nexts, next_index);
+ }
}
always_inline int
}
}
+ /* Connection previously descheduled because it had no data to send.
+ * Clear descheduled flag and reset pacer if in use */
+ if (transport_connection_is_descheduled (ctx->tc))
+ transport_connection_clear_descheduled (ctx->tc);
+
transport_connection_snd_params (ctx->tc, &ctx->sp);
if (!ctx->sp.snd_space)
ctx->left_to_snd = ctx->max_len_to_snd;
n_left = ctx->n_segs_per_evt;
+ vec_validate (ctx->transport_pending_bufs, n_left);
+
while (n_left >= 4)
{
vlib_buffer_t *b0, *b1;
b0 = vlib_get_buffer (vm, bi0);
b1 = vlib_get_buffer (vm, bi1);
- session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
- session_tx_fill_buffer (vm, ctx, b1, &n_bufs, peek_data);
-
- ctx->transport_vft->push_header (ctx->tc, b0);
- ctx->transport_vft->push_header (ctx->tc, b1);
+ session_tx_fill_buffer (wrk, ctx, b0, &n_bufs, peek_data);
+ session_tx_fill_buffer (wrk, ctx, b1, &n_bufs, peek_data);
+ ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left] = b0;
+ ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left + 1] = b1;
n_left -= 2;
- vec_add1 (wrk->pending_tx_buffers, bi0);
- vec_add1 (wrk->pending_tx_buffers, bi1);
- vec_add1 (wrk->pending_tx_nexts, next_index);
- vec_add1 (wrk->pending_tx_nexts, next_index);
+ session_tx_add_pending_buffer (wrk, bi0, next_index);
+ session_tx_add_pending_buffer (wrk, bi1, next_index);
}
while (n_left)
{
bi0 = ctx->tx_buffers[--n_bufs];
b0 = vlib_get_buffer (vm, bi0);
- session_tx_fill_buffer (vm, ctx, b0, &n_bufs, peek_data);
-
- /* Ask transport to push header after current_length and
- * total_length_not_including_first_buffer are updated */
- ctx->transport_vft->push_header (ctx->tc, b0);
+ session_tx_fill_buffer (wrk, ctx, b0, &n_bufs, peek_data);
+ ctx->transport_pending_bufs[ctx->n_segs_per_evt - n_left] = b0;
n_left -= 1;
- vec_add1 (wrk->pending_tx_buffers, bi0);
- vec_add1 (wrk->pending_tx_nexts, next_index);
+ session_tx_add_pending_buffer (wrk, bi0, next_index);
}
+ /* Ask transport to push headers */
+ ctx->transport_vft->push_header (ctx->tc, ctx->transport_pending_bufs,
+ ctx->n_segs_per_evt);
+
if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0))
- session_tx_trace_frame (vm, node, next_index, wrk->pending_tx_buffers,
+ session_tx_trace_frame (vm, node, next_index, ctx->transport_pending_bufs,
ctx->n_segs_per_evt, ctx->s, n_trace);
if (PREDICT_FALSE (n_bufs))
*n_tx_packets += ctx->n_segs_per_evt;
SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
- ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
+ ctx->s->tx_fifo->shr->has_event, wrk->last_vlib_time);
ASSERT (ctx->left_to_snd == 0);
/* Clear custom-tx flag used to request reschedule for tx */
s->flags &= ~SESSION_F_CUSTOM_TX;
+ sp->flags = 0;
+ sp->bytes_dequeued = 0;
sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
TRANSPORT_PACER_MAX_BURST_PKTS);
session_evt_add_head_old (wrk, elt);
}
- if (sp->max_burst_size &&
- svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
+ if (sp->bytes_dequeued &&
+ svm_fifo_needs_deq_ntf (s->tx_fifo, sp->bytes_dequeued))
session_dequeue_notify (s);
return n_packets;
session_transport_reset (s);
break;
case SESSION_CTRL_EVT_LISTEN:
- session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
+ session_mq_listen_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_LISTEN_URI:
- session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
+ session_mq_listen_uri_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_UNLISTEN:
session_mq_unlisten_handler (wrk, elt);
session_mq_connect_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_CONNECT_URI:
- session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
+ session_mq_connect_uri_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_SHUTDOWN:
session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
break;
case SESSION_CTRL_EVT_ACCEPTED_REPLY:
- session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
+ session_mq_accepted_reply_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
break;
case SESSION_CTRL_EVT_APP_DETACH:
- app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
+ app_mq_detach_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_APP_WRK_RPC:
session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
clib_warning ("unhandled event type %d", e->event_type);
}
- SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
+ SESSION_EVT (SESSION_EVT_IO_EVT_COUNTS, e->event_type, 1, wrk);
/* Regrab elements in case pool moved */
elt = clib_llist_elt (wrk->event_elts, ei);
};
/* *INDENT-ON* */
+always_inline void
+session_update_time_subscribers (session_main_t *smm, clib_time_type_t now,
+ u32 thread_index)
+{
+ session_update_time_fn *fn;
+
+ vec_foreach (fn, smm->update_time_fns)
+ (*fn) (now, thread_index);
+}
+
always_inline void
session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
{
/*
* Update transport time
*/
- transport_update_time (wrk->last_vlib_time, thread_index);
+ session_update_time_subscribers (smm, wrk->last_vlib_time, thread_index);
n_tx_packets = vec_len (wrk->pending_tx_buffers);
SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
+ if (PREDICT_FALSE (wrk->dma_enabled))
+ {
+ if (wrk->trans_head == ((wrk->trans_tail + 1) & (wrk->trans_size - 1)))
+ return 0;
+ wrk->batch = vlib_dma_batch_new (vm, wrk->config_index);
+ }
+
/*
* Dequeue new internal mq events
*/
};
}
+ if (PREDICT_FALSE (wrk->dma_enabled))
+ {
+ if (wrk->batch_num)
+ {
+ vlib_dma_batch_set_cookie (vm, wrk->batch, wrk->trans_tail);
+ wrk->batch_num = 0;
+ wrk->trans_tail++;
+ if (wrk->trans_tail == wrk->trans_size)
+ wrk->trans_tail = 0;
+ }
+
+ vlib_dma_batch_submit (vm, wrk->batch);
+ }
+
SESSION_EVT (SESSION_EVT_DSP_CNTRS, OLD_IO_EVTS, wrk);
if (vec_len (wrk->pending_tx_buffers))
}
/* *INDENT-OFF* */
-VLIB_REGISTER_NODE (session_queue_node) =
-{
+VLIB_REGISTER_NODE (session_queue_node) = {
.function = session_queue_node_fn,
.flags = VLIB_NODE_FLAG_TRACE_SUPPORTED,
.name = "session-queue",
.format_trace = format_session_queue_trace,
.type = VLIB_NODE_TYPE_INPUT,
- .n_errors = ARRAY_LEN (session_queue_error_strings),
- .error_strings = session_queue_error_strings,
+ .n_errors = SESSION_QUEUE_N_ERROR,
+ .error_counters = session_error_counters,
.state = VLIB_NODE_STATE_DISABLED,
};
/* *INDENT-ON* */