return 0;
}
-/**
- * Allocate vpp event queue (once) per worker thread
- */
-void
-session_vpp_event_queue_allocate (session_manager_main_t * smm,
- u32 thread_index)
-{
- api_main_t *am = &api_main;
- void *oldheap;
-
- if (smm->vpp_event_queues[thread_index] == 0)
- {
- /* Allocate event fifo in the /vpe-api shared-memory segment */
- oldheap = svm_push_data_heap (am->vlib_rp);
-
- smm->vpp_event_queues[thread_index] =
- unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
- sizeof (session_fifo_event_t),
- 0 /* consumer pid */ ,
- 0
- /* (do not) send signal when queue non-empty */
- );
-
- svm_pop_heap (oldheap);
- }
-}
-
int
stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
stream_session_t ** ret_s)
return rv;
/* Create the session */
- pool_get (smm->sessions[thread_index], s);
+ pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
memset (s, 0, sizeof (*s));
/* Initialize backpointers */
pool_index = s - smm->sessions[thread_index];
- server_rx_fifo->server_session_index = pool_index;
- server_rx_fifo->server_thread_index = thread_index;
+ server_rx_fifo->master_session_index = pool_index;
+ server_rx_fifo->master_thread_index = thread_index;
- server_tx_fifo->server_session_index = pool_index;
- server_tx_fifo->server_thread_index = thread_index;
+ server_tx_fifo->master_session_index = pool_index;
+ server_tx_fifo->master_thread_index = thread_index;
s->server_rx_fifo = server_rx_fifo;
s->server_tx_fifo = server_tx_fifo;
return 0;
}
+/** Enqueue buffer chain tail */
+always_inline int
+session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
+ u32 offset, u8 is_in_order)
+{
+ vlib_buffer_t *chain_b;
+ u32 chain_bi = b->next_buffer;
+ vlib_main_t *vm = vlib_get_main ();
+ u8 *data, len;
+ u16 written = 0;
+ int rv = 0;
+
+ do
+ {
+ chain_b = vlib_get_buffer (vm, chain_bi);
+ data = vlib_buffer_get_current (chain_b);
+ len = chain_b->current_length;
+ if (is_in_order)
+ {
+ rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
+ if (rv < len)
+ {
+ return (rv > 0) ? (written + rv) : written;
+ }
+ written += rv;
+ }
+ else
+ {
+ rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
+ data);
+ if (rv)
+ return -1;
+ offset += len;
+ }
+ }
+ while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
+ ? chain_b->next_buffer : 0));
+
+ if (is_in_order)
+ return written;
+
+ return 0;
+}
+
/*
* Enqueue data for delivery to session peer. Does not notify peer of enqueue
* event but on request can queue notification events for later delivery by
* calling stream_server_flush_enqueue_events().
*
* @param tc Transport connection which is to be enqueued data
- * @param data Data to be enqueued
- * @param len Length of data to be enqueued
+ * @param b Buffer to be enqueued
+ * @param offset Offset at which to start enqueueing if out-of-order
* @param queue_event Flag to indicate if peer is to be notified or if event
* is to be queued. The former is useful when more data is
* enqueued and only one event is to be generated.
+ * @param is_in_order Flag to indicate if data is in order
* @return Number of bytes enqueued or a negative value if enqueueing failed.
*/
int
-stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
- u8 queue_event)
+stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
+ u32 offset, u8 queue_event, u8 is_in_order)
{
stream_session_t *s;
- int enqueued;
+ int enqueued = 0, rv;
s = stream_session_get (tc->s_index, tc->thread_index);
- /* Make sure there's enough space left. We might've filled the pipes */
- if (PREDICT_FALSE (len > svm_fifo_max_enqueue (s->server_rx_fifo)))
- return -1;
-
- enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, s->pid, len, data);
+ if (is_in_order)
+ {
+ enqueued =
+ svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
+ vlib_buffer_get_current (b));
+ if (PREDICT_FALSE
+ ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0))
+ {
+ rv = session_enqueue_chain_tail (s, b, 0, 1);
+ if (rv <= 0)
+ return enqueued;
+ enqueued += rv;
+ }
+ }
+ else
+ {
+ rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
+ b->current_length,
+ vlib_buffer_get_current (b));
+ if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
+ rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
+ if (rv)
+ return -1;
+ }
if (queue_event)
{
}
}
- return enqueued;
+ if (is_in_order)
+ return enqueued;
+
+ return 0;
}
/** Check if we have space in rx fifo to push more bytes */
u32 offset, u32 max_bytes)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
- return svm_fifo_peek (s->server_tx_fifo, s->pid, offset, max_bytes, buffer);
+ return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
}
u32
stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
- return svm_fifo_dequeue_drop (s->server_tx_fifo, s->pid, max_bytes);
+ return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
}
/**
{
/* Fabricate event */
evt.fifo = s->server_rx_fifo;
- evt.event_type = FIFO_EVENT_SERVER_RX;
+ evt.event_type = FIFO_EVENT_APP_RX;
evt.event_id = serial_number++;
/* Add event to server's event queue */
return errors;
}
+/**
+ * Init fifo tail and head pointers
+ *
+ * Useful if transport uses absolute offsets for tracking ooo segments.
+ */
+void
+stream_session_init_fifos_pointers (transport_connection_t * tc,
+ u32 rx_pointer, u32 tx_pointer)
+{
+ stream_session_t *s;
+ s = stream_session_get (tc->s_index, tc->thread_index);
+ svm_fifo_init_pointers (s->server_rx_fifo, rx_pointer);
+ svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
+}
+
void
stream_session_connect_notify (transport_connection_t * tc, u8 sst,
u8 is_fail)
return 0;
}
+void
+session_send_session_evt_to_thread (u64 session_handle,
+ fifo_event_type_t evt_type,
+ u32 thread_index)
+{
+ static u16 serial_number = 0;
+ session_fifo_event_t evt;
+ unix_shared_memory_queue_t *q;
+
+ /* Fabricate event */
+ evt.session_handle = session_handle;
+ evt.event_type = evt_type;
+ evt.event_id = serial_number++;
+
+ q = session_manager_get_vpp_event_queue (thread_index);
+
+ /* Based on request block (or not) for lack of space */
+ if (PREDICT_TRUE (q->cursize < q->maxsize))
+ unix_shared_memory_queue_add (q, (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ else
+ {
+ clib_warning ("queue full");
+ return;
+ }
+}
+
/**
* Disconnect session and propagate to transport. This should eventually
* result in a delete notification that allows us to cleanup session state.
* Called for both active/passive disconnects.
+ *
+ * Should be called from the session's thread.
*/
void
stream_session_disconnect (stream_session_t * s)
{
-// session_fifo_event_t evt;
-
s->session_state = SESSION_STATE_CLOSED;
- /* RPC to vpp evt queue in the right thread */
-
tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
-
-// {
-// /* Fabricate event */
-// evt.fifo = s->server_rx_fifo;
-// evt.event_type = FIFO_EVENT_SERVER_RX;
-// evt.event_id = serial_number++;
-//
-// /* Based on request block (or not) for lack of space */
-// if (PREDICT_TRUE(q->cursize < q->maxsize))
-// unix_shared_memory_queue_add (app->event_queue, (u8 *) &evt,
-// 0 /* do wait for mutex */);
-// else
-// {
-// clib_warning("fifo full");
-// return -1;
-// }
-// }
}
/**
return &tp_vfts[type];
}
+/**
+ * Allocate vpp event queue (once) per worker thread
+ */
+void
+session_vpp_event_queue_allocate (session_manager_main_t * smm,
+ u32 thread_index)
+{
+ api_main_t *am = &api_main;
+ void *oldheap;
+
+ if (smm->vpp_event_queues[thread_index] == 0)
+ {
+ /* Allocate event fifo in the /vpe-api shared-memory segment */
+ oldheap = svm_push_data_heap (am->vlib_rp);
+
+ smm->vpp_event_queues[thread_index] =
+ unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
+ sizeof (session_fifo_event_t),
+ 0 /* consumer pid */ ,
+ 0
+ /* (do not) send signal when queue non-empty */
+ );
+
+ svm_pop_heap (oldheap);
+ }
+}
+
static clib_error_t *
session_manager_main_enable (vlib_main_t * vm)
{
vec_validate (smm->sessions, num_threads - 1);
vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
vec_validate (smm->tx_buffers, num_threads - 1);
- vec_validate (smm->fifo_events, num_threads - 1);
- vec_validate (smm->evts_partially_read, num_threads - 1);
+ vec_validate (smm->pending_event_vector, num_threads - 1);
+ vec_validate (smm->free_event_vector, num_threads - 1);
vec_validate (smm->current_enqueue_epoch, num_threads - 1);
vec_validate (smm->vpp_event_queues, num_threads - 1);
+ for (i = 0; i < num_threads; i++)
+ {
+ vec_validate (smm->free_event_vector[i], 0);
+ _vec_len (smm->free_event_vector[i]) = 0;
+ vec_validate (smm->pending_event_vector[i], 0);
+ _vec_len (smm->pending_event_vector[i]) = 0;
+ }
+
#if SESSION_DBG
vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
#endif
for (i = 0; i < 200000; i++)
{
stream_session_t *ss;
- pool_get (smm->sessions[0], ss);
+ pool_get_aligned (smm->sessions[0], ss, CLIB_CACHE_LINE_BYTES);
memset (ss, 0, sizeof (*ss));
}
return 0;
}
+void
+session_node_enable_disable (u8 is_en)
+{
+ u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
+ /* *INDENT-OFF* */
+ foreach_vlib_main (({
+ vlib_node_set_state (this_vlib_main, session_queue_node.index,
+ state);
+ }));
+ /* *INDENT-ON* */
+}
+
clib_error_t *
vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
{
if (session_manager_main.is_enabled)
return 0;
- vlib_node_set_state (vm, session_queue_node.index,
- VLIB_NODE_STATE_POLLING);
+ session_node_enable_disable (is_en);
return session_manager_main_enable (vm);
}
else
{
session_manager_main.is_enabled = 0;
- vlib_node_set_state (vm, session_queue_node.index,
- VLIB_NODE_STATE_DISABLED);
+ session_node_enable_disable (is_en);
}
return 0;