int
stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
- stream_session_t ** ret_s)
+ u8 alloc_fifos, stream_session_t ** ret_s)
{
session_manager_main_t *smm = &session_manager_main;
svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
ASSERT (thread_index == vlib_get_thread_index ());
- if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
- &server_tx_fifo,
- &fifo_segment_index)))
- return rv;
-
/* Create the session */
pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES);
memset (s, 0, sizeof (*s));
-
- /* Initialize backpointers */
pool_index = s - smm->sessions[thread_index];
- server_rx_fifo->master_session_index = pool_index;
- server_rx_fifo->master_thread_index = thread_index;
- server_tx_fifo->master_session_index = pool_index;
- server_tx_fifo->master_thread_index = thread_index;
+ /* Allocate fifos */
+ if (alloc_fifos)
+ {
+ if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
+ &server_tx_fifo,
+ &fifo_segment_index)))
+ {
+ pool_put (smm->sessions[thread_index], s);
+ return rv;
+ }
+ /* Initialize backpointers */
+ server_rx_fifo->master_session_index = pool_index;
+ server_rx_fifo->master_thread_index = thread_index;
+
+ server_tx_fifo->master_session_index = pool_index;
+ server_tx_fifo->master_thread_index = thread_index;
- s->server_rx_fifo = server_rx_fifo;
- s->server_tx_fifo = server_tx_fifo;
+ s->server_rx_fifo = server_rx_fifo;
+ s->server_tx_fifo = server_tx_fifo;
+ s->svm_segment_index = fifo_segment_index;
+ }
/* Initialize state machine, such as it is... */
s->session_type = session_type_from_proto_and_ip (tc->transport_proto,
tc->is_ip4);
s->session_state = SESSION_STATE_CONNECTING;
- s->svm_segment_index = fifo_segment_index;
s->thread_index = thread_index;
s->session_index = pool_index;
u32 offset, u8 is_in_order)
{
vlib_buffer_t *chain_b;
- u32 chain_bi = b->next_buffer;
+ u32 chain_bi = b->next_buffer, len;
vlib_main_t *vm = vlib_get_main ();
- u8 *data, len;
+ u8 *data;
u16 written = 0;
int rv = 0;
stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
- if (s->session_state != SESSION_STATE_READY)
+ if (!s->server_tx_fifo)
return 0;
return svm_fifo_max_dequeue (s->server_tx_fifo);
}
return 0;
/* Get session's server */
- app = application_get (s->app_index);
+ app = application_get_if_valid (s->app_index);
+
+ if (PREDICT_FALSE (app == 0))
+ {
+ clib_warning ("invalid s->app_index = %d", s->app_index);
+ return 0;
+ }
/* Built-in server? Hand event to the callback... */
if (app->cb_fns.builtin_server_rx_callback)
stream_session_t *s0;
/* Get session */
- s0 = stream_session_get (session_indices_to_enqueue[i], thread_index);
- if (stream_session_enqueue_notify (s0, 0 /* don't block */ ))
+ s0 = stream_session_get_if_valid (session_indices_to_enqueue[i],
+ thread_index);
+ if (s0 == 0 || stream_session_enqueue_notify (s0, 0 /* don't block */ ))
{
errors++;
}
application_t *app;
stream_session_t *new_s = 0;
u64 handle;
- u32 api_context = 0;
+ u32 opaque = 0;
int error = 0;
handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip,
return -1;
}
- /* Get the app's index from the handle we stored when opening connection */
+ /* Get the app's index from the handle we stored when opening connection
+ * and the opaque (api_context for external apps) from transport session
+ * index*/
app = application_get (handle >> 32);
- api_context = tc->s_index;
+ opaque = tc->s_index;
if (!is_fail)
{
segment_manager_t *sm;
+ u8 alloc_fifos;
sm = application_get_connect_segment_manager (app);
-
+ alloc_fifos = application_is_proxy (app);
/* Create new session (svm segments are allocated if needed) */
- if (stream_session_create_i (sm, tc, &new_s))
+ if (stream_session_create_i (sm, tc, alloc_fifos, &new_s))
{
is_fail = 1;
error = -1;
}
/* Notify client application */
- if (app->cb_fns.session_connected_callback (app->index, api_context, new_s,
+ if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
is_fail))
{
clib_warning ("failed to notify app");
server = application_get (listener->app_index);
sm = application_get_listen_segment_manager (server, listener);
- if ((rv = stream_session_create_i (sm, tc, &s)))
+ if ((rv = stream_session_create_i (sm, tc, 1, &s)))
return rv;
s->app_index = server->index;
session_manager_main_t *smm = &session_manager_main;
vlib_thread_main_t *vtm = vlib_get_thread_main ();
u32 num_threads;
+ u32 preallocated_sessions_per_worker;
int i;
num_threads = 1 /* main thread */ + vtm->n_threads;
for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
session_vpp_event_queue_allocate (smm, i);
- /* $$$$ preallocate hack config parameter */
- for (i = 0; i < smm->preallocated_sessions; i++)
+ /* Preallocate sessions */
+ if (num_threads == 1)
{
- stream_session_t *ss __attribute__ ((unused));
- pool_get_aligned (smm->sessions[0], ss, CLIB_CACHE_LINE_BYTES);
+ for (i = 0; i < smm->preallocated_sessions; i++)
+ {
+ stream_session_t *ss __attribute__ ((unused));
+ pool_get_aligned (smm->sessions[0], ss, CLIB_CACHE_LINE_BYTES);
+ }
+
+ for (i = 0; i < smm->preallocated_sessions; i++)
+ pool_put_index (smm->sessions[0], i);
}
+ else
+ {
+ int j;
+ preallocated_sessions_per_worker = smm->preallocated_sessions /
+ (num_threads - 1);
- for (i = 0; i < smm->preallocated_sessions; i++)
- pool_put_index (smm->sessions[0], i);
+ for (j = 1; j < num_threads; j++)
+ {
+ for (i = 0; i < preallocated_sessions_per_worker; i++)
+ {
+ stream_session_t *ss __attribute__ ((unused));
+ pool_get_aligned (smm->sessions[j], ss, CLIB_CACHE_LINE_BYTES);
+ }
+ for (i = 0; i < preallocated_sessions_per_worker; i++)
+ pool_put_index (smm->sessions[j], i);
+ }
+ }
session_lookup_init ();
{
session_manager_main_t *smm = &session_manager_main;
u32 nitems;
+ uword tmp;
while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
{
else
clib_warning ("event queue length %d too small, ignored", nitems);
}
- if (unformat (input, "preallocated-sessions %d",
- &smm->preallocated_sessions))
+ else if (unformat (input, "preallocated-sessions %d",
+ &smm->preallocated_sessions))
+ ;
+ else if (unformat (input, "v4-session-table-buckets %d",
+ &smm->configured_v4_session_table_buckets))
+ ;
+ else if (unformat (input, "v4-halfopen-table-buckets %d",
+ &smm->configured_v4_halfopen_table_buckets))
+ ;
+ else if (unformat (input, "v6-session-table-buckets %d",
+ &smm->configured_v6_session_table_buckets))
+ ;
+ else if (unformat (input, "v6-halfopen-table-buckets %d",
+ &smm->configured_v6_halfopen_table_buckets))
;
+ else if (unformat (input, "v4-session-table-memory %U",
+ unformat_memory_size, &tmp))
+ {
+ if (tmp >= 0x100000000)
+ return clib_error_return (0, "memory size %llx (%lld) too large",
+ tmp, tmp);
+ smm->configured_v4_session_table_memory = tmp;
+ }
+ else if (unformat (input, "v4-halfopen-table-memory %U",
+ unformat_memory_size, &tmp))
+ {
+ if (tmp >= 0x100000000)
+ return clib_error_return (0, "memory size %llx (%lld) too large",
+ tmp, tmp);
+ smm->configured_v4_halfopen_table_memory = tmp;
+ }
+ else if (unformat (input, "v6-session-table-memory %U",
+ unformat_memory_size, &tmp))
+ {
+ if (tmp >= 0x100000000)
+ return clib_error_return (0, "memory size %llx (%lld) too large",
+ tmp, tmp);
+ smm->configured_v6_session_table_memory = tmp;
+ }
+ else if (unformat (input, "v6-halfopen-table-memory %U",
+ unformat_memory_size, &tmp))
+ {
+ if (tmp >= 0x100000000)
+ return clib_error_return (0, "memory size %llx (%lld) too large",
+ tmp, tmp);
+ smm->configured_v6_halfopen_table_memory = tmp;
+ }
else
return clib_error_return (0, "unknown input `%U'",
format_unformat_error, input);