/**
* Default fifo and segment size. TODO config.
*/
-u32 default_fifo_size = 1 << 12;
-u32 default_segment_size = 1 << 20;
+static u32 default_fifo_size = 1 << 12;
+static u32 default_segment_size = 1 << 20;
+static u32 default_app_evt_queue_size = 128;
/**
 * Resolve the properties for a segment manager.
 *
 * The segment manager stores an app *worker* index; properties live on the
 * owning application, so hop worker -> app before the lookup.
 *
 * @param sm	segment manager; must be attached (valid app_wrk_index)
 * @return	pointer to the owning application's properties
 */
segment_manager_properties_t *
segment_manager_properties_get (segment_manager_t * sm)
{
  app_worker_t *app_wrk = app_worker_get (sm->app_wrk_index);
  return application_get_segment_manager_properties (app_wrk->app_index);
}
segment_manager_properties_t *
props->add_segment_size = default_segment_size;
props->rx_fifo_size = default_fifo_size;
props->tx_fifo_size = default_fifo_size;
+ props->evt_q_size = default_app_evt_queue_size;
return props;
}
/**
 * Check whether the segment manager has been detached from its app worker.
 *
 * @param sm	segment manager to check
 * @return	1 if detached (worker index invalidated), 0 otherwise
 */
static u8
segment_manager_app_detached (segment_manager_t * sm)
{
  return (sm->app_wrk_index == SEGMENT_MANAGER_INVALID_APP_INDEX);
}
/**
 * Detach the segment manager from its app worker.
 *
 * Only invalidates the back-reference; segments are torn down separately.
 *
 * @param sm	segment manager to detach
 */
void
segment_manager_app_detach (segment_manager_t * sm)
{
  sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX;
}
always_inline u32
/**
* Remove segment without lock
*/
-always_inline void
+void
segment_manager_del_segment (segment_manager_t * sm,
svm_fifo_segment_private_t * fs)
{
/**
* Removes segment after acquiring writer lock
*/
-always_inline void
+static inline void
segment_manager_lock_and_del_segment (segment_manager_t * sm, u32 fs_index)
{
svm_fifo_segment_private_t *fs;
/**
 * Release a previously acquired segments-pool reader lock.
 *
 * Asserts a reader is actually registered, to catch unbalanced
 * lock/unlock pairs early in debug images.
 *
 * @param sm	segment manager whose segments rwlock is held for reading
 */
void
segment_manager_segment_reader_unlock (segment_manager_t * sm)
{
  ASSERT (sm->segments_rwlock->n_readers > 0);
  clib_rwlock_reader_unlock (&sm->segments_rwlock);
}
* If needed a writer's lock is acquired before allocating a new segment
* to avoid affecting any of the segments pool readers.
*/
-always_inline int
+int
segment_manager_add_segment (segment_manager_t * sm, u32 segment_size)
{
segment_manager_main_t *smm = &segment_manager_main;
- u32 rnd_margin = 128 << 10, seg_index;
+ u32 rnd_margin = 128 << 10, seg_index, page_size;
segment_manager_properties_t *props;
uword baseva = (u64) ~ 0, alloc_size;
svm_fifo_segment_private_t *seg;
* Initialize ssvm segment and svm fifo private header
*/
segment_size = segment_size ? segment_size : props->add_segment_size;
+ page_size = clib_mem_get_page_size ();
+ segment_size = (segment_size + page_size - 1) & ~(page_size - 1);
if (props->segment_type != SSVM_SEGMENT_PRIVATE)
{
seg_name = format (0, "%d-%d%c", getpid (), segment_name_counter++, 0);
*/
int
segment_manager_init (segment_manager_t * sm, u32 first_seg_size,
- u32 evt_q_size, u32 prealloc_fifo_pairs)
+ u32 prealloc_fifo_pairs)
{
u32 rx_fifo_size, tx_fifo_size, pair_size;
u32 rx_rounded_data_size, tx_rounded_data_size;
return seg_index;
}
+ segment = segment_manager_get_segment (sm, seg_index);
if (i == 0)
- sm->event_queue = segment_manager_alloc_queue (sm, evt_q_size);
+ sm->event_queue = segment_manager_alloc_queue (segment, props);
- segment = segment_manager_get_segment (sm, seg_index);
svm_fifo_segment_preallocate_fifo_pairs (segment,
props->rx_fifo_size,
props->tx_fifo_size,
clib_warning ("Failed to allocate segment");
return seg_index;
}
- sm->event_queue = segment_manager_alloc_queue (sm, evt_q_size);
+ segment = segment_manager_get_segment (sm, seg_index);
+ sm->event_queue = segment_manager_alloc_queue (segment, props);
}
return 0;
*/
while (fifo)
{
+ if (fifo->master_thread_index == 255)
+ {
+ svm_fifo_t *next = fifo->next;
+ application_local_session_disconnect_w_index (sm->app_wrk_index,
+ fifo->master_session_index);
+ fifo = next;
+ continue;
+ }
session = session_get (fifo->master_session_index,
fifo->master_thread_index);
stream_session_disconnect (session);
}
}
-always_inline int
-segment_try_alloc_fifos (svm_fifo_segment_private_t * fifo_segment,
- u32 rx_fifo_size, u32 tx_fifo_size,
- svm_fifo_t ** rx_fifo, svm_fifo_t ** tx_fifo)
+int
+segment_manager_try_alloc_fifos (svm_fifo_segment_private_t * fifo_segment,
+ u32 rx_fifo_size, u32 tx_fifo_size,
+ svm_fifo_t ** rx_fifo, svm_fifo_t ** tx_fifo)
{
rx_fifo_size = clib_max (rx_fifo_size, default_fifo_size);
*rx_fifo = svm_fifo_segment_alloc_fifo (fifo_segment, rx_fifo_size,
svm_fifo_t ** tx_fifo,
u32 * fifo_segment_index)
{
- svm_fifo_segment_private_t *fifo_segment;
+ svm_fifo_segment_private_t *fifo_segment = 0;
int alloc_fail = 1, rv = 0, new_fs_index;
segment_manager_properties_t *props;
u8 added_a_segment = 0;
u32 sm_index;
- ASSERT (pool_elts (sm->segments) != 0);
props = segment_manager_properties_get (sm);
/*
/* *INDENT-OFF* */
segment_manager_foreach_segment_w_lock (fifo_segment, sm, ({
- alloc_fail = segment_try_alloc_fifos (fifo_segment, props->rx_fifo_size,
- props->tx_fifo_size, rx_fifo,
- tx_fifo);
+ alloc_fail = segment_manager_try_alloc_fifos (fifo_segment,
+ props->rx_fifo_size,
+ props->tx_fifo_size,
+ rx_fifo, tx_fifo);
/* Exit with lock held, drop it after notifying app */
if (!alloc_fail)
goto alloc_success;
*fifo_segment_index = segment_manager_segment_index (sm, fifo_segment);
if (added_a_segment)
- rv = application_add_segment_notify (sm->app_index,
- &fifo_segment->ssvm);
+ rv = app_worker_add_segment_notify (sm->app_wrk_index,
+ &fifo_segment->ssvm);
/* Drop the lock after app is notified */
segment_manager_segment_reader_unlock (sm);
return rv;
return SESSION_ERROR_SEG_CREATE;
}
fifo_segment = segment_manager_get_segment_w_lock (sm, new_fs_index);
- alloc_fail = segment_try_alloc_fifos (fifo_segment, props->rx_fifo_size,
- props->tx_fifo_size, rx_fifo,
- tx_fifo);
+ alloc_fail = segment_manager_try_alloc_fifos (fifo_segment,
+ props->rx_fifo_size,
+ props->tx_fifo_size,
+ rx_fifo, tx_fifo);
added_a_segment = 1;
goto alloc_check;
}
segment_manager_segment_reader_unlock (sm);
}
+u32
+segment_manager_evt_q_expected_size (u32 q_len)
+{
+ u32 fifo_evt_size, notif_q_size, q_hdrs;
+ u32 msg_q_sz, fifo_evt_ring_sz, session_ntf_ring_sz;
+
+ fifo_evt_size = 1 << max_log2 (sizeof (session_event_t));
+ notif_q_size = clib_max (16, q_len >> 4);
+
+ msg_q_sz = q_len * sizeof (svm_msg_q_msg_t);
+ fifo_evt_ring_sz = q_len * fifo_evt_size;
+ session_ntf_ring_sz = notif_q_size * 256;
+ q_hdrs = sizeof (svm_queue_t) + sizeof (svm_msg_q_t);
+
+ return (msg_q_sz + fifo_evt_ring_sz + session_ntf_ring_sz + q_hdrs);
+}
+
/**
* Allocates shm queue in the first segment
*
* Must be called with lock held
*/
-svm_queue_t *
-segment_manager_alloc_queue (segment_manager_t * sm, u32 queue_size)
+svm_msg_q_t *
+segment_manager_alloc_queue (svm_fifo_segment_private_t * segment,
+ segment_manager_properties_t * props)
{
- svm_fifo_segment_private_t *segment;
- ssvm_shared_header_t *sh;
- svm_queue_t *q;
+ u32 fifo_evt_size, session_evt_size = 256, notif_q_size;
+ svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+ svm_msg_q_t *q;
void *oldheap;
- ASSERT (!pool_is_free_index (sm->segments, 0));
-
- segment = segment_manager_get_segment (sm, 0);
- sh = segment->ssvm.sh;
+ fifo_evt_size = sizeof (session_event_t);
+ notif_q_size = clib_max (16, props->evt_q_size >> 4);
+ /* *INDENT-OFF* */
+ svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+ {props->evt_q_size, fifo_evt_size, 0},
+ {notif_q_size, session_evt_size, 0}
+ };
+ /* *INDENT-ON* */
+ cfg->consumer_pid = 0;
+ cfg->n_rings = 2;
+ cfg->q_nitems = props->evt_q_size;
+ cfg->ring_cfgs = rc;
- oldheap = ssvm_push_heap (sh);
- q = svm_queue_init (queue_size, sizeof (session_fifo_event_t),
- 0 /* consumer pid */ ,
- 0 /* signal when queue non-empty */ );
+ oldheap = ssvm_push_heap (segment->ssvm.sh);
+ q = svm_msg_q_alloc (cfg);
ssvm_pop_heap (oldheap);
+
+ if (props->use_mq_eventfd)
+ {
+ if (svm_msg_q_alloc_producer_eventfd (q))
+ clib_warning ("failed to alloc eventfd");
+ }
return q;
}
/* *INDENT-OFF* */
pool_foreach (sm, smm->segment_managers, ({
- vlib_cli_output (vm, "%-10d%=15d%=12d", segment_manager_index(sm),
- sm->app_index, pool_elts (sm->segments));
+ vlib_cli_output (vm, "%-10d%=15d%=12d", segment_manager_index (sm),
+ sm->app_wrk_index, pool_elts (sm->segments));
}));
/* *INDENT-ON* */