int
stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
- stream_session_t ** ret_s)
+ u8 alloc_fifos, stream_session_t ** ret_s)
{
session_manager_main_t *smm = &session_manager_main;
svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
ASSERT (thread_index == vlib_get_thread_index ());
- if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
- &server_tx_fifo,
- &fifo_segment_index)))
- return rv;
-
/* Create the session */
pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
memset (s, 0, sizeof (*s));
-
- /* Initialize backpointers */
pool_index = s - smm->sessions[thread_index];
- server_rx_fifo->master_session_index = pool_index;
- server_rx_fifo->master_thread_index = thread_index;
- server_tx_fifo->master_session_index = pool_index;
- server_tx_fifo->master_thread_index = thread_index;
+ /* Allocate fifos */
+ if (alloc_fifos)
+ {
+ if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
+ &server_tx_fifo,
+ &fifo_segment_index)))
+ {
+ pool_put (smm->sessions[thread_index], s);
+ return rv;
+ }
+ /* Initialize backpointers */
+ server_rx_fifo->master_session_index = pool_index;
+ server_rx_fifo->master_thread_index = thread_index;
- s->server_rx_fifo = server_rx_fifo;
- s->server_tx_fifo = server_tx_fifo;
+ server_tx_fifo->master_session_index = pool_index;
+ server_tx_fifo->master_thread_index = thread_index;
+
+ s->server_rx_fifo = server_rx_fifo;
+ s->server_tx_fifo = server_tx_fifo;
+ s->svm_segment_index = fifo_segment_index;
+ }
/* Initialize state machine, such as it is... */
s->session_type = session_type_from_proto_and_ip (tc->transport_proto,
tc->is_ip4);
s->session_state = SESSION_STATE_CONNECTING;
- s->svm_segment_index = fifo_segment_index;
s->thread_index = thread_index;
s->session_index = pool_index;
return 0;
}
-/** Enqueue buffer chain tail */
+/**
+ * Discards bytes from buffer chain
+ *
+ * It discards n_bytes_to_drop starting at first buffer after chain_b
+ */
+always_inline void
+session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
+ vlib_buffer_t ** chain_b,
+ u32 n_bytes_to_drop)
+{
+ vlib_buffer_t *next = *chain_b;
+ u32 to_drop = n_bytes_to_drop;
+ ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
+ while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
+ {
+ next = vlib_get_buffer (vm, next->next_buffer);
+ if (next->current_length > to_drop)
+ {
+ vlib_buffer_advance (next, to_drop);
+ to_drop = 0;
+ }
+ else
+ {
+ to_drop -= next->current_length;
+ next->current_length = 0;
+ }
+ }
+ *chain_b = next;
+
+ if (to_drop == 0)
+ b->total_length_not_including_first_buffer -= n_bytes_to_drop;
+}
+
+/**
+ * Enqueue buffer chain tail
+ */
always_inline int
session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
u32 offset, u8 is_in_order)
{
vlib_buffer_t *chain_b;
- u32 chain_bi = b->next_buffer;
+ u32 chain_bi, len, diff;
vlib_main_t *vm = vlib_get_main ();
- u8 *data, len;
- u16 written = 0;
+ u8 *data;
+ u32 written = 0;
int rv = 0;
+ if (is_in_order && offset)
+ {
+ diff = offset - b->current_length;
+ if (diff > b->total_length_not_including_first_buffer)
+ return 0;
+ chain_b = b;
+ session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
+ chain_bi = vlib_get_buffer_index (vm, chain_b);
+ }
+ else
+ chain_bi = b->next_buffer;
+
do
{
chain_b = vlib_get_buffer (vm, chain_bi);
data = vlib_buffer_get_current (chain_b);
len = chain_b->current_length;
+ if (!len)
+ continue;
if (is_in_order)
{
rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
- if (rv < len)
+ if (rv == len)
+ {
+ written += rv;
+ }
+ else if (rv < len)
{
return (rv > 0) ? (written + rv) : written;
}
- written += rv;
+ else if (rv > len)
+ {
+ written += rv;
+
+ /* written more than what was left in chain */
+ if (written > b->total_length_not_including_first_buffer)
+ return written;
+
+ /* drop the bytes that have already been delivered */
+ session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
+ }
}
else
{
rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
data);
if (rv)
- return -1;
+ {
+ clib_warning ("failed to enqueue multi-buffer seg");
+ return -1;
+ }
offset += len;
}
}
u32 offset, u8 queue_event, u8 is_in_order)
{
stream_session_t *s;
- int enqueued = 0, rv;
+ int enqueued = 0, rv, in_order_off;
s = stream_session_get (tc->s_index, tc->thread_index);
if (is_in_order)
{
- enqueued =
- svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
- vlib_buffer_get_current (b));
- if (PREDICT_FALSE
- ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0))
+ enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
+ b->current_length,
+ vlib_buffer_get_current (b));
+ if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
+ && enqueued >= 0))
{
- rv = session_enqueue_chain_tail (s, b, 0, 1);
- if (rv <= 0)
- return enqueued;
- enqueued += rv;
+ in_order_off = enqueued > b->current_length ? enqueued : 0;
+ rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
+ if (rv > 0)
+ enqueued += rv;
}
}
else
b->current_length,
vlib_buffer_get_current (b));
if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
- rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
- if (rv)
- return -1;
+ session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
+ /* if something was enqueued, report even this as success for ooo
+ * segment handling */
+ return rv;
}
if (queue_event)
}
}
- if (is_in_order)
- return enqueued;
-
- return 0;
+ return enqueued;
}
/** Check if we have space in rx fifo to push more bytes */
stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
- if (s->session_state != SESSION_STATE_READY)
+ if (!s->server_tx_fifo)
return 0;
return svm_fifo_max_dequeue (s->server_tx_fifo);
}
static u32 serial_number;
if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
- return 0;
+ {
+ /* Session is closed so app will never clean up. Flush rx fifo */
+ u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
+ if (to_dequeue)
+ svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
+ return 0;
+ }
/* Get session's server */
- app = application_get (s->app_index);
+ app = application_get_if_valid (s->app_index);
+
+ if (PREDICT_FALSE (app == 0))
+ {
+ clib_warning ("invalid s->app_index = %d", s->app_index);
+ return 0;
+ }
/* Built-in server? Hand event to the callback... */
if (app->cb_fns.builtin_server_rx_callback)
stream_session_t *s0;
/* Get session */
- s0 = stream_session_get (session_indices_to_enqueue[i], thread_index);
- if (stream_session_enqueue_notify (s0, 0 /* don't block */ ))
+ s0 = stream_session_get_if_valid (session_indices_to_enqueue[i],
+ thread_index);
+ if (s0 == 0 || stream_session_enqueue_notify (s0, 0 /* don't block */ ))
{
errors++;
}
application_t *app;
stream_session_t *new_s = 0;
u64 handle;
- u32 api_context = 0;
+ u32 opaque = 0;
int error = 0;
+ u8 st;
+ st = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4);
handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip,
tc->lcl_port, tc->rmt_port,
- tc->transport_proto);
+ st);
if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
{
- clib_warning ("This can't be good!");
+ TCP_DBG ("half-open was removed!");
return -1;
}
- /* Get the app's index from the handle we stored when opening connection */
- app = application_get (handle >> 32);
- api_context = tc->s_index;
+ /* Cleanup half-open table */
+ stream_session_half_open_table_del (tc);
+
+ /* Get the app's index from the handle we stored when opening connection
+ * and the opaque (api_context for external apps) from transport session
+ * index */
+ app = application_get_if_valid (handle >> 32);
+ if (!app)
+ return -1;
+
+ opaque = tc->s_index;
if (!is_fail)
{
segment_manager_t *sm;
+ u8 alloc_fifos;
sm = application_get_connect_segment_manager (app);
-
+ alloc_fifos = application_is_proxy (app);
/* Create new session (svm segments are allocated if needed) */
- if (stream_session_create_i (sm, tc, &new_s))
+ if (stream_session_create_i (sm, tc, alloc_fifos, &new_s))
{
is_fail = 1;
error = -1;
}
/* Notify client application */
- if (app->cb_fns.session_connected_callback (app->index, api_context, new_s,
+ if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
is_fail))
{
clib_warning ("failed to notify app");
new_s->session_state = SESSION_STATE_READY;
}
- /* Cleanup session lookup */
- stream_session_half_open_table_del (tc);
-
return error;
}
}
/**
- * Cleans up session and associated app if needed.
+ * Cleans up session and lookup table.
*/
void
stream_session_delete (stream_session_t * s)
/**
* Notification from transport that connection is being deleted
*
- * This should be called only on previously fully established sessions. For
- * instance failed connects should call stream_session_connect_notify and
- * indicate that the connect has failed.
+ * This removes the session if it is still valid. It should be called only on
+ * previously fully established sessions. For instance failed connects should
+ * call stream_session_connect_notify and indicate that the connect has
+ * failed.
*/
void
stream_session_delete_notify (transport_connection_t * tc)
/* App might've been removed already */
s = stream_session_get_if_valid (tc->s_index, tc->thread_index);
if (!s)
- {
- return;
- }
+ return;
stream_session_delete (s);
}
server = application_get (listener->app_index);
sm = application_get_listen_segment_manager (server, listener);
- if ((rv = stream_session_create_i (sm, tc, &s)))
+ if ((rv = stream_session_create_i (sm, tc, 1, &s)))
return rv;
s->app_index = server->index;
u32 thread_index)
{
static u16 serial_number = 0;
+ u32 tries = 0;
session_fifo_event_t evt;
unix_shared_memory_queue_t *q;
evt.event_id = serial_number++;
q = session_manager_get_vpp_event_queue (thread_index);
-
- /* Based on request block (or not) for lack of space */
- if (PREDICT_TRUE (q->cursize < q->maxsize))
+ while (unix_shared_memory_queue_add (q, (u8 *) & evt, 1))
{
- if (unix_shared_memory_queue_add (q, (u8 *) & evt,
- 1 /* do wait for mutex */ ))
+ if (tries++ == 3)
{
- clib_warning ("failed to enqueue evt");
+ TCP_DBG ("failed to enqueue evt");
+ break;
}
}
- else
- {
- clib_warning ("queue full");
- return;
- }
}
/**
session_manager_main_t *smm = &session_manager_main;
vlib_thread_main_t *vtm = vlib_get_thread_main ();
u32 num_threads;
+ u32 preallocated_sessions_per_worker;
int i;
num_threads = 1 /* main thread */ + vtm->n_threads;
for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
session_vpp_event_queue_allocate (smm, i);
- /* $$$$ preallocate hack config parameter */
- for (i = 0; i < smm->preallocated_sessions; i++)
+ /* Preallocate sessions */
+ if (smm->preallocated_sessions)
{
- stream_session_t *ss __attribute__ ((unused));
- pool_get_aligned (smm->sessions[0], ss, CLIB_CACHE_LINE_BYTES);
- }
+ if (num_threads == 1)
+ {
+ pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
+ }
+ else
+ {
+ int j;
+ preallocated_sessions_per_worker =
+ (1.1 * (f64) smm->preallocated_sessions /
+ (f64) (num_threads - 1));
- for (i = 0; i < smm->preallocated_sessions; i++)
- pool_put_index (smm->sessions[0], i);
+ for (j = 1; j < num_threads; j++)
+ {
+ pool_init_fixed (smm->sessions[j],
+ preallocated_sessions_per_worker);
+ }
+ }
+ }
session_lookup_init ();
{
session_manager_main_t *smm = &session_manager_main;
u32 nitems;
+ uword tmp;
while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
{
else
clib_warning ("event queue length %d too small, ignored", nitems);
}
- if (unformat (input, "preallocated-sessions %d",
- &smm->preallocated_sessions))
+ else if (unformat (input, "preallocated-sessions %d",
+ &smm->preallocated_sessions))
+ ;
+ else if (unformat (input, "v4-session-table-buckets %d",
+ &smm->configured_v4_session_table_buckets))
+ ;
+ else if (unformat (input, "v4-halfopen-table-buckets %d",
+ &smm->configured_v4_halfopen_table_buckets))
;
+ else if (unformat (input, "v6-session-table-buckets %d",
+ &smm->configured_v6_session_table_buckets))
+ ;
+ else if (unformat (input, "v6-halfopen-table-buckets %d",
+ &smm->configured_v6_halfopen_table_buckets))
+ ;
+ else if (unformat (input, "v4-session-table-memory %U",
+ unformat_memory_size, &tmp))
+ {
+ if (tmp >= 0x100000000)
+ return clib_error_return (0, "memory size %llx (%lld) too large",
+ tmp, tmp);
+ smm->configured_v4_session_table_memory = tmp;
+ }
+ else if (unformat (input, "v4-halfopen-table-memory %U",
+ unformat_memory_size, &tmp))
+ {
+ if (tmp >= 0x100000000)
+ return clib_error_return (0, "memory size %llx (%lld) too large",
+ tmp, tmp);
+ smm->configured_v4_halfopen_table_memory = tmp;
+ }
+ else if (unformat (input, "v6-session-table-memory %U",
+ unformat_memory_size, &tmp))
+ {
+ if (tmp >= 0x100000000)
+ return clib_error_return (0, "memory size %llx (%lld) too large",
+ tmp, tmp);
+ smm->configured_v6_session_table_memory = tmp;
+ }
+ else if (unformat (input, "v6-halfopen-table-memory %U",
+ unformat_memory_size, &tmp))
+ {
+ if (tmp >= 0x100000000)
+ return clib_error_return (0, "memory size %llx (%lld) too large",
+ tmp, tmp);
+ smm->configured_v6_halfopen_table_memory = tmp;
+ }
else
return clib_error_return (0, "unknown input `%U'",
format_unformat_error, input);