};
always_inline int
-session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_main_t * smm, session_fifo_event_t * e0,
- stream_session_t * s0, u32 thread_index, int *n_tx_packets,
- u8 peek_data)
+session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
+ session_manager_main_t * smm,
+ session_fifo_event_t * e0,
+ stream_session_t * s0, u32 thread_index,
+ int *n_tx_packets, u8 peek_data)
{
u32 n_trace = vlib_get_trace_count (vm, node);
u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
snd_space0 = transport_vft->send_space (tc0);
snd_mss0 = transport_vft->send_mss (tc0);
+ /* Can't make any progress */
if (snd_space0 == 0 || svm_fifo_max_dequeue (s0->server_tx_fifo) == 0
|| snd_mss0 == 0)
- return 0;
+ {
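+      /* Keep the event around so it is retried on a later dispatch */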
+ vec_add1 (smm->evts_partially_read[thread_index], *e0);
+ return 0;
+ }
ASSERT (e0->enqueue_length > 0);
if (peek_data)
{
-      /* Offset in rx fifo from where to peek data */
-      rx_offset = transport_vft->rx_fifo_offset (tc0);
+      /* Offset in tx fifo from where to peek data */
+      rx_offset = transport_vft->tx_fifo_offset (tc0);
}
  /* TODO check if transport is willing to send len_to_snd0
   * bytes (Nagle) */
if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE))
{
/* Keep track of how much we've dequeued and exit */
- e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
+ if (left_to_snd0 != max_len_to_snd0)
+ {
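+	  /* Trim the event by what was already sent and requeue the rest */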
+ e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
+ vec_add1 (smm->evts_partially_read[thread_index], *e0);
+ }
+
return -1;
}
t0->server_thread_index = s0->thread_index;
}
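+      /* Dequeue at most one MSS worth of data per packet */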
+ len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
+
+ /* *INDENT-OFF* */
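+  /* Log event id, length, bytes dequeued and bytes left to send */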
if (1)
{
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "evt-dequeue: id %d length %d",.format_args =
- "i4i4",};
+ ELOG_TYPE_DECLARE (e) = {
+ .format = "evt-deq: id %d len %d rd %d wnd %d",
+ .format_args = "i4i4i4i4",
+ };
struct
{
- u32 data[2];
+ u32 data[4];
} *ed;
ed = ELOG_DATA (&vm->elog_main, e);
ed->data[0] = e0->event_id;
ed->data[1] = e0->enqueue_length;
+ ed->data[2] = len_to_deq0;
+ ed->data[3] = left_to_snd0;
}
-
- len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
+ /* *INDENT-ON* */
/* Make room for headers */
data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
}
int
-session_fifo_rx_peek (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_main_t * smm, session_fifo_event_t * e0,
- stream_session_t * s0, u32 thread_index, int *n_tx_pkts)
+session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
+ session_manager_main_t * smm,
+ session_fifo_event_t * e0,
+ stream_session_t * s0, u32 thread_index,
+ int *n_tx_pkts)
{
- return session_fifo_rx_i (vm, node, smm, e0, s0, thread_index, n_tx_pkts,
- 1);
+ return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
+ n_tx_pkts, 1);
}
int
-session_fifo_rx_dequeue (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_main_t * smm,
- session_fifo_event_t * e0, stream_session_t * s0,
- u32 thread_index, int *n_tx_pkts)
+session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
+ session_manager_main_t * smm,
+ session_fifo_event_t * e0,
+ stream_session_t * s0, u32 thread_index,
+ int *n_tx_pkts)
{
- return session_fifo_rx_i (vm, node, smm, e0, s0, thread_index, n_tx_pkts,
- 0);
+ return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
+ n_tx_pkts, 0);
}
static uword
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
session_fifo_event_t *my_fifo_events, *e;
- u32 n_to_dequeue;
+ u32 n_to_dequeue, n_events;
unix_shared_memory_queue_t *q;
int n_tx_packets = 0;
u32 my_thread_index = vm->cpu_index;
/* min number of events we can dequeue without blocking */
n_to_dequeue = q->cursize;
- if (n_to_dequeue == 0)
- return 0;
-
my_fifo_events = smm->fifo_events[my_thread_index];
- /* If we didn't manage to process previous events try going
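+  /* Nothing new to dequeue and nothing left over from last time */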
+ if (n_to_dequeue == 0 && vec_len (my_fifo_events) == 0)
+ return 0;
+
+ /*
+ * If we didn't manage to process previous events try going
* over them again without dequeuing new ones.
- * XXX: Block senders to sessions that can't keep up */
+ */
+ /* XXX: Block senders to sessions that can't keep up */
if (vec_len (my_fifo_events) >= 100)
goto skip_dequeue;
smm->fifo_events[my_thread_index] = my_fifo_events;
skip_dequeue:
-
- for (i = 0; i < n_to_dequeue; i++)
+ n_events = vec_len (my_fifo_events);
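+  /* Loop over leftover and newly dequeued events alike */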
+ for (i = 0; i < n_events; i++)
{
svm_fifo_t *f0; /* $$$ prefetch 1 ahead maybe */
stream_session_t *s0;
/* $$$ add multiple event queues, per vpp worker thread */
ASSERT (server_thread_index0 == my_thread_index);
- s0 = pool_elt_at_index (smm->sessions[my_thread_index],
- server_session_index0);
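+      /* The session may have been closed and freed since the event was queued */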
+ s0 = stream_session_get_if_valid (server_session_index0,
+ my_thread_index);
+
+      if (PREDICT_FALSE (!s0))
+	{
+	  /* Don't dereference a freed session; warn only in debug images */
+	  if (CLIB_DEBUG)
+	    clib_warning ("It's dead, Jim!");
+	  continue;
+	}
+
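+      /* Skip events for sessions that have already been closed */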
+ if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
+ continue;
ASSERT (s0->thread_index == my_thread_index);
done:
/* Couldn't process all events. Probably out of buffers */
- if (PREDICT_FALSE (i < n_to_dequeue))
+ if (PREDICT_FALSE (i < n_events))
{
session_fifo_event_t *partially_read =
smm->evts_partially_read[my_thread_index];
- vec_add (partially_read, &my_fifo_events[i], n_to_dequeue - i);
+ vec_add (partially_read, &my_fifo_events[i], n_events - i);
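+      /* Leftover events become the input vector for the next dispatch */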
vec_free (my_fifo_events);
smm->fifo_events[my_thread_index] = partially_read;
smm->evts_partially_read[my_thread_index] = 0;
.n_errors = ARRAY_LEN (session_queue_error_strings),
.error_strings = session_queue_error_strings,
.n_next_nodes = SESSION_QUEUE_N_NEXT,
- /* .state = VLIB_NODE_STATE_DISABLED, enable on-demand? */
- /* edit / add dispositions here */
+ .state = VLIB_NODE_STATE_DISABLED,
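+  /* Start disabled; enabled on demand by the session layer */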
.next_nodes =
{
[SESSION_QUEUE_NEXT_DROP] = "error-drop",