svm: remove fifo segment heap
[vpp.git] / src / svm / message_queue.h
1 /*
2  * Copyright (c) 2018-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Unidirectional shared-memory multi-ring message queue
18  */
19
20 #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 #define SRC_SVM_MESSAGE_QUEUE_H_
22
23 #include <vppinfra/clib.h>
24 #include <vppinfra/error.h>
25 #include <svm/queue.h>
26
27 typedef struct svm_msg_q_ring_
28 {
29   volatile u32 cursize;                 /**< current size of the ring */
30   u32 nitems;                           /**< max size of the ring */
31   volatile u32 head;                    /**< current head (for dequeue) */
32   volatile u32 tail;                    /**< current tail (for enqueue) */
33   u32 elsize;                           /**< size of an element */
34   u8 *data;                             /**< chunk of memory for msg data */
35 } __clib_packed svm_msg_q_ring_t;
36
37 typedef struct svm_msg_q_
38 {
39   svm_queue_t *q;                       /**< queue for exchanging messages */
40   svm_msg_q_ring_t *rings;              /**< rings with message data*/
41 } __clib_packed svm_msg_q_t;
42
43 typedef struct svm_msg_q_ring_cfg_
44 {
45   u32 nitems;
46   u32 elsize;
47   void *data;
48 } svm_msg_q_ring_cfg_t;
49
50 typedef struct svm_msg_q_cfg_
51 {
52   int consumer_pid;                     /**< pid of msg consumer */
53   u32 q_nitems;                         /**< msg queue size (not rings) */
54   u32 n_rings;                          /**< number of msg rings */
55   svm_msg_q_ring_cfg_t *ring_cfgs;      /**< array of ring cfgs */
56 } svm_msg_q_cfg_t;
57
58 typedef union
59 {
60   struct
61   {
62     u32 ring_index;                     /**< ring index, could be u8 */
63     u32 elt_index;                      /**< index in ring */
64   };
65   u64 as_u64;
66 } svm_msg_q_msg_t;
67
68 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
69 /**
70  * Allocate message queue
71  *
72  * Allocates a message queue on the heap. Based on the configuration options,
73  * apart from the message queue this also allocates (one or multiple)
74  * shared-memory rings for the messages.
75  *
76  * @param cfg           configuration options: queue len, consumer pid,
77  *                      ring configs
78  * @return              message queue
79  */
80 svm_msg_q_t *svm_msg_q_alloc (svm_msg_q_cfg_t * cfg);
81 svm_msg_q_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
82 uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
83
84 /**
85  * Free message queue
86  *
87  * @param mq            message queue to be freed
88  */
89 void svm_msg_q_free (svm_msg_q_t * mq);
90
91 /**
92  * Allocate message buffer
93  *
94  * Message is allocated on the first available ring capable of holding
95  * the requested number of bytes.
96  *
97  * @param mq            message queue
98  * @param nbytes        number of bytes needed for message
99  * @return              message structure pointing to the ring and position
100  *                      allocated
101  */
102 svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
103
104 /**
105  * Allocate message buffer on ring
106  *
107  * Message is allocated, on requested ring. The caller MUST check that
108  * the ring is not full.
109  *
110  * @param mq            message queue
111  * @param ring_index    ring on which the allocation should occur
112  * @return              message structure pointing to the ring and position
113  *                      allocated
114  */
115 svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
116
117 /**
118  * Lock message queue and allocate message buffer on ring
119  *
120  * This should be used when multiple writers/readers are expected to
121  * compete for the rings/queue. Message should be enqueued by calling
122  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
123  * the message in enqueued.
124  *
125  * @param mq            message queue
126  * @param ring_index    ring on which the allocation should occur
127  * @param noblock       flag that indicates if request should block
128  * @param msg           pointer to message to be filled in
129  * @return              0 on success, negative number otherwise
130  */
131 int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
132                                          u8 noblock, svm_msg_q_msg_t * msg);
133
134 /**
135  * Free message buffer
136  *
137  * Marks message buffer on ring as free.
138  *
139  * @param mq            message queue
140  * @param msg           message to be freed
141  */
142 void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
143
144 /**
145  * Producer enqueue one message to queue
146  *
147  * Prior to calling this, the producer should've obtained a message buffer
148  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
149  *
150  * @param mq            message queue
151  * @param msg           message (pointer to ring position) to be enqueued
152  * @param nowait        flag to indicate if request is blocking or not
153  * @return              success status
154  */
155 int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
156
157 /**
158  * Producer enqueue one message to queue with mutex held
159  *
160  * Prior to calling this, the producer should've obtained a message buffer
161  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
162  * the queue mutex is held.
163  *
164  * @param mq            message queue
165  * @param msg           message (pointer to ring position) to be enqueued
166  * @return              success status
167  */
168 void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
169
170 /**
171  * Consumer dequeue one message from queue
172  *
173  * This returns the message pointing to the data in the message rings.
174  * The consumer is expected to call @ref svm_msg_q_free_msg once it
175  * finishes processing/copies the message data.
176  *
177  * @param mq            message queue
178  * @param msg           pointer to structure where message is to be received
179  * @param cond          flag that indicates if request should block or not
180  * @param time          time to wait if condition it SVM_Q_TIMEDWAIT
181  * @return              success status
182  */
183 int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
184                    svm_q_conditional_wait_t cond, u32 time);
185
186 /**
187  * Consumer dequeue one message from queue with mutex held
188  *
189  * Returns the message pointing to the data in the message rings under the
190  * assumption that the message queue lock is already held. The consumer is
191  * expected to call @ref svm_msg_q_free_msg once it finishes
192  * processing/copies the message data.
193  *
194  * @param mq            message queue
195  * @param msg           pointer to structure where message is to be received
196  * @return              success status
197  */
198 void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
199
200 /**
201  * Get data for message in queue
202  *
203  * @param mq            message queue
204  * @param msg           message for which the data is requested
205  * @return              pointer to data
206  */
207 void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
208
209 /**
210  * Get message queue ring
211  *
212  * @param mq            message queue
213  * @param ring_index    index of ring
214  * @return              pointer to ring
215  */
216 svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
217
218 /**
219  * Set event fd for queue consumer
220  *
221  * If set, queue will exclusively use eventfds for signaling. Moreover,
222  * afterwards, the queue should only be used in non-blocking mode. Waiting
223  * for events should be done externally using something like epoll.
224  *
225  * @param mq            message queue
226  * @param fd            consumer eventfd
227  */
228 void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd);
229
230 /**
231  * Set event fd for queue producer
232  *
233  * If set, queue will exclusively use eventfds for signaling. Moreover,
234  * afterwards, the queue should only be used in non-blocking mode. Waiting
235  * for events should be done externally using something like epoll.
236  *
237  * @param mq            message queue
238  * @param fd            producer eventfd
239  */
240 void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd);
241
242 /**
243  * Allocate event fd for queue consumer
244  */
245 int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq);
246
247 /**
248  * Allocate event fd for queue consumer
249  */
250 int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq);
251
252
253 /**
254  * Format message queue, shows msg count for each ring
255  */
256 u8 *format_svm_msg_q (u8 * s, va_list * args);
257
258 /**
259  * Check if message queue is full
260  */
261 static inline u8
262 svm_msg_q_is_full (svm_msg_q_t * mq)
263 {
264   return (mq->q->cursize == mq->q->maxsize);
265 }
266
267 static inline u8
268 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
269 {
270   ASSERT (ring_index < vec_len (mq->rings));
271   return (mq->rings[ring_index].cursize == mq->rings[ring_index].nitems);
272 }
273
274 /**
275  * Check if message queue is empty
276  */
277 static inline u8
278 svm_msg_q_is_empty (svm_msg_q_t * mq)
279 {
280   return (mq->q->cursize == 0);
281 }
282
283 /**
284  * Check length of message queue
285  */
286 static inline u32
287 svm_msg_q_size (svm_msg_q_t * mq)
288 {
289   return mq->q->cursize;
290 }
291
292 /**
293  * Check if message is invalid
294  */
295 static inline u8
296 svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
297 {
298   return (msg->as_u64 == (u64) ~ 0);
299 }
300
301 /**
302  * Try locking message queue
303  */
304 static inline int
305 svm_msg_q_try_lock (svm_msg_q_t * mq)
306 {
307   int rv = pthread_mutex_trylock (&mq->q->mutex);
308   if (PREDICT_FALSE (rv == EOWNERDEAD))
309     rv = pthread_mutex_consistent (&mq->q->mutex);
310   return rv;
311 }
312
313 /**
314  * Lock, or block trying, the message queue
315  */
316 static inline int
317 svm_msg_q_lock (svm_msg_q_t * mq)
318 {
319   int rv = pthread_mutex_lock (&mq->q->mutex);
320   if (PREDICT_FALSE (rv == EOWNERDEAD))
321     rv = pthread_mutex_consistent (&mq->q->mutex);
322   return rv;
323 }
324
325 /**
326  * Unlock message queue
327  */
328 static inline void
329 svm_msg_q_unlock (svm_msg_q_t * mq)
330 {
331   pthread_mutex_unlock (&mq->q->mutex);
332 }
333
334 /**
335  * Wait for message queue event
336  *
337  * Must be called with mutex held. The queue only works non-blocking
338  * with eventfds, so handle blocking calls as an exception here.
339  */
340 static inline void
341 svm_msg_q_wait (svm_msg_q_t * mq)
342 {
343   svm_queue_wait (mq->q);
344 }
345
346 /**
347  * Timed wait for message queue event
348  *
349  * Must be called with mutex held.
350  *
351  * @param mq            message queue
352  * @param timeout       time in seconds
353  */
354 static inline int
355 svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
356 {
357   return svm_queue_timedwait (mq->q, timeout);
358 }
359
360 static inline int
361 svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq)
362 {
363   return mq->q->consumer_evtfd;
364 }
365
366 static inline int
367 svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq)
368 {
369   return mq->q->producer_evtfd;
370 }
371
372 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
373
374 /*
375  * fd.io coding-style-patch-verification: ON
376  *
377  * Local Variables:
378  * eval: (c-set-style "gnu")
379  * End:
380  */