return rv;
/* Create the session */
- pool_get (smm->sessions[thread_index], s);
+ pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
memset (s, 0, sizeof (*s));
/* Initialize backpointers */
return 0;
}
+/** Enqueue buffer chain tail */
+always_inline int
+session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
+ u32 offset, u8 is_in_order)
+{
+ vlib_buffer_t *chain_b;
+ u32 chain_bi = b->next_buffer;
+ vlib_main_t *vm = vlib_get_main ();
+ u8 *data, len;
+ u16 written = 0;
+ int rv = 0;
+
+ do
+ {
+ chain_b = vlib_get_buffer (vm, chain_bi);
+ data = vlib_buffer_get_current (chain_b);
+ len = chain_b->current_length;
+ if (is_in_order)
+ {
+ rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
+ if (rv < len)
+ {
+ return (rv > 0) ? (written + rv) : written;
+ }
+ written += rv;
+ }
+ else
+ {
+ rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
+ data);
+ if (rv)
+ return -1;
+ offset += len;
+ }
+ }
+ while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
+ ? chain_b->next_buffer : 0));
+
+ if (is_in_order)
+ return written;
+
+ return 0;
+}
+
/*
* Enqueue data for delivery to session peer. Does not notify peer of enqueue
* event but on request can queue notification events for later delivery by
* calling stream_server_flush_enqueue_events().
*
* @param tc Transport connection which is to be enqueued data
- * @param data Data to be enqueued
- * @param len Length of data to be enqueued
+ * @param b Buffer to be enqueued
+ * @param offset Offset at which to start enqueueing if out-of-order
* @param queue_event Flag to indicate if peer is to be notified or if event
* is to be queued. The former is useful when more data is
* enqueued and only one event is to be generated.
+ * @param is_in_order Flag to indicate if data is in order
* @return Number of bytes enqueued or a negative value if enqueueing failed.
*/
int
-stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
- u8 queue_event)
+stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
+ u32 offset, u8 queue_event, u8 is_in_order)
{
stream_session_t *s;
- int enqueued;
+ int enqueued = 0, rv;
s = stream_session_get (tc->s_index, tc->thread_index);
- /* Make sure there's enough space left. We might've filled the pipes */
- if (PREDICT_FALSE (len > svm_fifo_max_enqueue (s->server_rx_fifo)))
- return -1;
-
- enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
+ if (is_in_order)
+ {
+ enqueued =
+ svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
+ vlib_buffer_get_current (b));
+ if (PREDICT_FALSE
+ ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0))
+ {
+ rv = session_enqueue_chain_tail (s, b, 0, 1);
+ if (rv <= 0)
+ return enqueued;
+ enqueued += rv;
+ }
+ }
+ else
+ {
+ rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
+ b->current_length,
+ vlib_buffer_get_current (b));
+ if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
+ rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
+ if (rv)
+ return -1;
+ }
if (queue_event)
{
}
}
- return enqueued;
+ if (is_in_order)
+ return enqueued;
+
+ return 0;
}
/** Check if we have space in rx fifo to push more bytes */
vec_validate (smm->sessions, num_threads - 1);
vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
vec_validate (smm->tx_buffers, num_threads - 1);
- vec_validate (smm->fifo_events, num_threads - 1);
- vec_validate (smm->evts_partially_read, num_threads - 1);
+ vec_validate (smm->pending_event_vector, num_threads - 1);
+ vec_validate (smm->free_event_vector, num_threads - 1);
vec_validate (smm->current_enqueue_epoch, num_threads - 1);
vec_validate (smm->vpp_event_queues, num_threads - 1);
+ for (i = 0; i < num_threads; i++)
+ {
+ vec_validate (smm->free_event_vector[i], 0);
+ _vec_len (smm->free_event_vector[i]) = 0;
+ vec_validate (smm->pending_event_vector[i], 0);
+ _vec_len (smm->pending_event_vector[i]) = 0;
+ }
+
#if SESSION_DBG
vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
#endif
for (i = 0; i < 200000; i++)
{
stream_session_t *ss;
- pool_get (smm->sessions[0], ss);
+ pool_get_aligned (smm->sessions[0], ss, CLIB_CACHE_LINE_BYTES);
memset (ss, 0, sizeof (*ss));
}