stream_session_no_space (transport_connection_t * tc, u32 thread_index,
u16 data_len)
{
- stream_session_t *s = stream_session_get (tc->c_index, thread_index);
+ stream_session_t *s = stream_session_get (tc->s_index, thread_index);
if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
return 1;
}
u32
+stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
+{
+ stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
+ if (s->session_state != SESSION_STATE_READY)
+ return 0;
+ return svm_fifo_max_dequeue (s->server_tx_fifo);
+}
+
+int
stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes)
{
svm_fifo_init_pointers (s->server_tx_fifo, tx_pointer);
}
-void
+int
stream_session_connect_notify (transport_connection_t * tc, u8 sst,
u8 is_fail)
{
stream_session_t *new_s = 0;
u64 handle;
u32 api_context = 0;
+ int error = 0;
handle = stream_session_half_open_lookup (smm, &tc->lcl_ip, &tc->rmt_ip,
tc->lcl_port, tc->rmt_port,
if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
{
clib_warning ("This can't be good!");
- return;
+ return -1;
}
/* Get the app's index from the handle we stored when opening connection */
/* Create new session (svm segments are allocated if needed) */
if (stream_session_create_i (sm, tc, &new_s))
- return;
-
- new_s->app_index = app->index;
+ {
+ is_fail = 1;
+ error = -1;
+ }
+ else
+ new_s->app_index = app->index;
}
/* Notify client */
/* Cleanup session lookup */
stream_session_half_open_table_del (smm, sst, tc);
+
+ return error;
}
void
/* Based on request block (or not) for lack of space */
if (PREDICT_TRUE (q->cursize < q->maxsize))
- unix_shared_memory_queue_add (q, (u8 *) & evt,
- 0 /* do wait for mutex */ );
+ {
+ if (unix_shared_memory_queue_add (q, (u8 *) & evt,
+ 1 /* do wait for mutex */ ))
+ {
+ clib_warning ("failed to enqueue evt");
+ }
+ }
else
{
clib_warning ("queue full");
{
api_main_t *am = &api_main;
void *oldheap;
+ u32 event_queue_length = 2048;
if (smm->vpp_event_queues[thread_index] == 0)
{
/* Allocate event fifo in the /vpe-api shared-memory segment */
oldheap = svm_push_data_heap (am->vlib_rp);
+ if (smm->configured_event_queue_length)
+ event_queue_length = smm->configured_event_queue_length;
+
smm->vpp_event_queues[thread_index] =
- unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
- sizeof (session_fifo_event_t),
- 0 /* consumer pid */ ,
- 0
- /* (do not) send signal when queue non-empty */
- );
+ unix_shared_memory_queue_init
+ (event_queue_length,
+ sizeof (session_fifo_event_t), 0 /* consumer pid */ ,
+ 0 /* (do not) send signal when queue non-empty */ );
svm_pop_heap (oldheap);
}
vec_validate (smm->sessions, num_threads - 1);
vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1);
vec_validate (smm->tx_buffers, num_threads - 1);
- vec_validate (smm->fifo_events, num_threads - 1);
- vec_validate (smm->evts_partially_read, num_threads - 1);
+ vec_validate (smm->pending_event_vector, num_threads - 1);
+ vec_validate (smm->free_event_vector, num_threads - 1);
vec_validate (smm->current_enqueue_epoch, num_threads - 1);
vec_validate (smm->vpp_event_queues, num_threads - 1);
+ for (i = 0; i < num_threads; i++)
+ {
+ vec_validate (smm->free_event_vector[i], 0);
+ _vec_len (smm->free_event_vector[i]) = 0;
+ vec_validate (smm->pending_event_vector[i], 0);
+ _vec_len (smm->pending_event_vector[i]) = 0;
+ }
+
#if SESSION_DBG
vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
#endif
for (i = 0; i < 200000; i++)
{
stream_session_t *ss;
- pool_get (smm->sessions[0], ss);
+ pool_get_aligned (smm->sessions[0], ss, CLIB_CACHE_LINE_BYTES);
memset (ss, 0, sizeof (*ss));
}
}
VLIB_INIT_FUNCTION (session_manager_main_init)
+ static clib_error_t *session_config_fn (vlib_main_t * vm,
+ unformat_input_t * input)
+{
+ session_manager_main_t *smm = &session_manager_main;
+ u32 nitems;
+
+ while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+ {
+ if (unformat (input, "event-queue-length %d", &nitems))
+ {
+ if (nitems >= 2048)
+ smm->configured_event_queue_length = nitems;
+ else
+ clib_warning ("event queue length %d too small, ignored", nitems);
+ }
+ else
+ return clib_error_return (0, "unknown input `%U'",
+ format_unformat_error, input);
+ }
+ return 0;
+}
+
+VLIB_CONFIG_FUNCTION (session_config_fn, "session");
+
/*
* fd.io coding-style-patch-verification: ON
*