svm: allow mq attachments at random offsets
[vpp.git] / src / svm / message_queue.h
1 /*
2  * Copyright (c) 2018-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Unidirectional shared-memory multi-ring message queue
18  */
19
20 #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 #define SRC_SVM_MESSAGE_QUEUE_H_
22
23 #include <vppinfra/clib.h>
24 #include <vppinfra/error.h>
25 #include <svm/queue.h>
26
27 typedef struct svm_msg_q_ring_shared_
28 {
29   volatile u32 cursize;                 /**< current size of the ring */
30   u32 nitems;                           /**< max size of the ring */
31   volatile u32 head;                    /**< current head (for dequeue) */
32   volatile u32 tail;                    /**< current tail (for enqueue) */
33   u32 elsize;                           /**< size of an element */
34   u8 data[0];                           /**< chunk of memory for msg data */
35 } svm_msg_q_ring_shared_t;
36
37 typedef struct svm_msg_q_ring_
38 {
39   u32 nitems;                   /**< max size of the ring */
40   u32 elsize;                   /**< size of an element */
41   svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
42 } __clib_packed svm_msg_q_ring_t;
43
44 typedef struct svm_msg_q_shared_
45 {
46   u32 n_rings;      /**< number of rings after q */
47   u32 pad;          /**< 8 byte alignment for q */
48   svm_queue_t q[0]; /**< queue for exchanging messages */
49 } __clib_packed svm_msg_q_shared_t;
50
51 typedef struct svm_msg_q_
52 {
53   svm_queue_t *q;                       /**< queue for exchanging messages */
54   svm_msg_q_ring_t *rings;              /**< rings with message data*/
55 } __clib_packed svm_msg_q_t;
56
57 typedef struct svm_msg_q_ring_cfg_
58 {
59   u32 nitems;
60   u32 elsize;
61   void *data;
62 } svm_msg_q_ring_cfg_t;
63
64 typedef struct svm_msg_q_cfg_
65 {
66   int consumer_pid;                     /**< pid of msg consumer */
67   u32 q_nitems;                         /**< msg queue size (not rings) */
68   u32 n_rings;                          /**< number of msg rings */
69   svm_msg_q_ring_cfg_t *ring_cfgs;      /**< array of ring cfgs */
70 } svm_msg_q_cfg_t;
71
72 typedef union
73 {
74   struct
75   {
76     u32 ring_index;                     /**< ring index, could be u8 */
77     u32 elt_index;                      /**< index in ring */
78   };
79   u64 as_u64;
80 } svm_msg_q_msg_t;
81
82 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
83 /**
84  * Allocate message queue
85  *
86  * Allocates a message queue on the heap. Based on the configuration options,
87  * apart from the message queue this also allocates (one or multiple)
88  * shared-memory rings for the messages.
89  *
90  * @param cfg           configuration options: queue len, consumer pid,
91  *                      ring configs
92  * @return              message queue
93  */
94 svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
95 svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
96 uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
97
98 void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
99
100 /**
101  * Free message queue
102  *
103  * @param mq            message queue to be freed
104  */
105 void svm_msg_q_free (svm_msg_q_t * mq);
106
107 /**
108  * Allocate message buffer
109  *
110  * Message is allocated on the first available ring capable of holding
111  * the requested number of bytes.
112  *
113  * @param mq            message queue
114  * @param nbytes        number of bytes needed for message
115  * @return              message structure pointing to the ring and position
116  *                      allocated
117  */
118 svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
119
120 /**
121  * Allocate message buffer on ring
122  *
123  * Message is allocated, on requested ring. The caller MUST check that
124  * the ring is not full.
125  *
126  * @param mq            message queue
127  * @param ring_index    ring on which the allocation should occur
128  * @return              message structure pointing to the ring and position
129  *                      allocated
130  */
131 svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
132
133 /**
134  * Lock message queue and allocate message buffer on ring
135  *
136  * This should be used when multiple writers/readers are expected to
137  * compete for the rings/queue. Message should be enqueued by calling
138  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
139  * the message in enqueued.
140  *
141  * @param mq            message queue
142  * @param ring_index    ring on which the allocation should occur
143  * @param noblock       flag that indicates if request should block
144  * @param msg           pointer to message to be filled in
145  * @return              0 on success, negative number otherwise
146  */
147 int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
148                                          u8 noblock, svm_msg_q_msg_t * msg);
149
150 /**
151  * Free message buffer
152  *
153  * Marks message buffer on ring as free.
154  *
155  * @param mq            message queue
156  * @param msg           message to be freed
157  */
158 void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
159
160 /**
161  * Producer enqueue one message to queue
162  *
163  * Prior to calling this, the producer should've obtained a message buffer
164  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
165  *
166  * @param mq            message queue
167  * @param msg           message (pointer to ring position) to be enqueued
168  * @param nowait        flag to indicate if request is blocking or not
169  * @return              success status
170  */
171 int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
172
173 /**
174  * Producer enqueue one message to queue with mutex held
175  *
176  * Prior to calling this, the producer should've obtained a message buffer
177  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
178  * the queue mutex is held.
179  *
180  * @param mq            message queue
181  * @param msg           message (pointer to ring position) to be enqueued
182  * @return              success status
183  */
184 void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
185
186 /**
187  * Consumer dequeue one message from queue
188  *
189  * This returns the message pointing to the data in the message rings.
190  * The consumer is expected to call @ref svm_msg_q_free_msg once it
191  * finishes processing/copies the message data.
192  *
193  * @param mq            message queue
194  * @param msg           pointer to structure where message is to be received
195  * @param cond          flag that indicates if request should block or not
196  * @param time          time to wait if condition it SVM_Q_TIMEDWAIT
197  * @return              success status
198  */
199 int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
200                    svm_q_conditional_wait_t cond, u32 time);
201
202 /**
203  * Consumer dequeue one message from queue with mutex held
204  *
205  * Returns the message pointing to the data in the message rings under the
206  * assumption that the message queue lock is already held. The consumer is
207  * expected to call @ref svm_msg_q_free_msg once it finishes
208  * processing/copies the message data.
209  *
210  * @param mq            message queue
211  * @param msg           pointer to structure where message is to be received
212  * @return              success status
213  */
214 void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
215
216 /**
217  * Get data for message in queue
218  *
219  * @param mq            message queue
220  * @param msg           message for which the data is requested
221  * @return              pointer to data
222  */
223 void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
224
225 /**
226  * Get message queue ring
227  *
228  * @param mq            message queue
229  * @param ring_index    index of ring
230  * @return              pointer to ring
231  */
232 svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
233
234 /**
235  * Set event fd for queue consumer
236  *
237  * If set, queue will exclusively use eventfds for signaling. Moreover,
238  * afterwards, the queue should only be used in non-blocking mode. Waiting
239  * for events should be done externally using something like epoll.
240  *
241  * @param mq            message queue
242  * @param fd            consumer eventfd
243  */
244 void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd);
245
246 /**
247  * Set event fd for queue producer
248  *
249  * If set, queue will exclusively use eventfds for signaling. Moreover,
250  * afterwards, the queue should only be used in non-blocking mode. Waiting
251  * for events should be done externally using something like epoll.
252  *
253  * @param mq            message queue
254  * @param fd            producer eventfd
255  */
256 void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd);
257
258 /**
259  * Allocate event fd for queue consumer
260  */
261 int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq);
262
263 /**
264  * Allocate event fd for queue consumer
265  */
266 int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq);
267
268
269 /**
270  * Format message queue, shows msg count for each ring
271  */
272 u8 *format_svm_msg_q (u8 * s, va_list * args);
273
274 /**
275  * Check if message queue is full
276  */
277 static inline u8
278 svm_msg_q_is_full (svm_msg_q_t * mq)
279 {
280   return (mq->q->cursize == mq->q->maxsize);
281 }
282
283 static inline u8
284 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
285 {
286   svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
287   return (ring->shr->cursize >= ring->nitems);
288 }
289
290 /**
291  * Check if message queue is empty
292  */
293 static inline u8
294 svm_msg_q_is_empty (svm_msg_q_t * mq)
295 {
296   return (mq->q->cursize == 0);
297 }
298
299 /**
300  * Check length of message queue
301  */
302 static inline u32
303 svm_msg_q_size (svm_msg_q_t * mq)
304 {
305   return mq->q->cursize;
306 }
307
308 /**
309  * Check if message is invalid
310  */
311 static inline u8
312 svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
313 {
314   return (msg->as_u64 == (u64) ~ 0);
315 }
316
317 /**
318  * Try locking message queue
319  */
320 static inline int
321 svm_msg_q_try_lock (svm_msg_q_t * mq)
322 {
323   int rv = pthread_mutex_trylock (&mq->q->mutex);
324   if (PREDICT_FALSE (rv == EOWNERDEAD))
325     rv = pthread_mutex_consistent (&mq->q->mutex);
326   return rv;
327 }
328
329 /**
330  * Lock, or block trying, the message queue
331  */
332 static inline int
333 svm_msg_q_lock (svm_msg_q_t * mq)
334 {
335   int rv = pthread_mutex_lock (&mq->q->mutex);
336   if (PREDICT_FALSE (rv == EOWNERDEAD))
337     rv = pthread_mutex_consistent (&mq->q->mutex);
338   return rv;
339 }
340
341 /**
342  * Unlock message queue
343  */
344 static inline void
345 svm_msg_q_unlock (svm_msg_q_t * mq)
346 {
347   pthread_mutex_unlock (&mq->q->mutex);
348 }
349
350 /**
351  * Wait for message queue event
352  *
353  * Must be called with mutex held. The queue only works non-blocking
354  * with eventfds, so handle blocking calls as an exception here.
355  */
356 static inline void
357 svm_msg_q_wait (svm_msg_q_t * mq)
358 {
359   svm_queue_wait (mq->q);
360 }
361
362 /**
363  * Timed wait for message queue event
364  *
365  * Must be called with mutex held.
366  *
367  * @param mq            message queue
368  * @param timeout       time in seconds
369  */
370 static inline int
371 svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
372 {
373   return svm_queue_timedwait (mq->q, timeout);
374 }
375
376 static inline int
377 svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq)
378 {
379   return mq->q->consumer_evtfd;
380 }
381
382 static inline int
383 svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq)
384 {
385   return mq->q->producer_evtfd;
386 }
387
388 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
389
390 /*
391  * fd.io coding-style-patch-verification: ON
392  *
393  * Local Variables:
394  * eval: (c-set-style "gnu")
395  * End:
396  */