+static void
+svm_msg_q_init_mutex (svm_msg_q_shared_queue_t *sq)
+{
+ pthread_mutexattr_t attr;
+ pthread_condattr_t cattr;
+
+ clib_memset (&attr, 0, sizeof (attr));
+ clib_memset (&cattr, 0, sizeof (cattr));
+
+ if (pthread_mutexattr_init (&attr))
+ clib_unix_warning ("mutexattr_init");
+ if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
+ clib_unix_warning ("pthread_mutexattr_setpshared");
+ if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
+ clib_unix_warning ("setrobust");
+ if (pthread_mutex_init (&sq->mutex, &attr))
+ clib_unix_warning ("mutex_init");
+ if (pthread_mutexattr_destroy (&attr))
+ clib_unix_warning ("mutexattr_destroy");
+ if (pthread_condattr_init (&cattr))
+ clib_unix_warning ("condattr_init");
+ if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
+ clib_unix_warning ("condattr_setpshared");
+ if (pthread_cond_init (&sq->condvar, &cattr))
+ clib_unix_warning ("cond_init1");
+ if (pthread_condattr_destroy (&cattr))
+ clib_unix_warning ("cond_init2");
+}
+
+svm_msg_q_shared_t *
+svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
+{
+ svm_msg_q_ring_shared_t *ring;
+ svm_msg_q_shared_queue_t *sq;
+ svm_msg_q_shared_t *smq;
+ u32 q_sz, offset;
+ int i;
+
+ q_sz = sizeof (*sq) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+
+ smq = (svm_msg_q_shared_t *) base;
+ sq = smq->q;
+ clib_memset (sq, 0, sizeof (*sq));
+ sq->elsize = sizeof (svm_msg_q_msg_t);
+ sq->maxsize = cfg->q_nitems;
+ smq->n_rings = cfg->n_rings;
+ ring = (void *) ((u8 *) smq->q + q_sz);
+ for (i = 0; i < cfg->n_rings; i++)
+ {
+ ring->elsize = cfg->ring_cfgs[i].elsize;
+ ring->nitems = cfg->ring_cfgs[i].nitems;
+ ring->cursize = ring->head = ring->tail = 0;
+ offset = sizeof (*ring) + ring->nitems * ring->elsize;
+ ring = (void *) ((u8 *) ring + offset);
+ }
+
+ svm_msg_q_init_mutex (sq);
+
+ return smq;
+}
+
+uword
+svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg)