svm: add custom q implementation for mq
[vpp.git] / src / svm / message_queue.h
1 /*
2  * Copyright (c) 2018-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Unidirectional shared-memory multi-ring message queue
18  */
19
20 #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 #define SRC_SVM_MESSAGE_QUEUE_H_
22
23 #include <vppinfra/clib.h>
24 #include <vppinfra/error.h>
25 #include <svm/queue.h>
26
27 typedef struct svm_msg_q_shr_queue_
28 {
29   pthread_mutex_t mutex;  /* 8 bytes */
30   pthread_cond_t condvar; /* 8 bytes */
31   u32 head;
32   u32 tail;
33   volatile u32 cursize;
34   u32 maxsize;
35   u32 elsize;
36   u32 pad;
37   u8 data[0];
38 } svm_msg_q_shared_queue_t;
39
40 typedef struct svm_msg_q_queue_
41 {
42   svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
43   int evtfd;                     /**< producer/consumer eventfd */
44 } svm_msg_q_queue_t;
45
46 typedef struct svm_msg_q_ring_shared_
47 {
48   volatile u32 cursize;                 /**< current size of the ring */
49   u32 nitems;                           /**< max size of the ring */
50   volatile u32 head;                    /**< current head (for dequeue) */
51   volatile u32 tail;                    /**< current tail (for enqueue) */
52   u32 elsize;                           /**< size of an element */
53   u8 data[0];                           /**< chunk of memory for msg data */
54 } svm_msg_q_ring_shared_t;
55
56 typedef struct svm_msg_q_ring_
57 {
58   u32 nitems;                   /**< max size of the ring */
59   u32 elsize;                   /**< size of an element */
60   svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
61 } __clib_packed svm_msg_q_ring_t;
62
63 typedef struct svm_msg_q_shared_
64 {
65   u32 n_rings;                   /**< number of rings after q */
66   u32 pad;                       /**< 8 byte alignment for q */
67   svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
68 } __clib_packed svm_msg_q_shared_t;
69
70 typedef struct svm_msg_q_
71 {
72   svm_msg_q_queue_t q;                  /**< queue for exchanging messages */
73   svm_msg_q_ring_t *rings;              /**< rings with message data*/
74 } __clib_packed svm_msg_q_t;
75
76 typedef struct svm_msg_q_ring_cfg_
77 {
78   u32 nitems;
79   u32 elsize;
80   void *data;
81 } svm_msg_q_ring_cfg_t;
82
83 typedef struct svm_msg_q_cfg_
84 {
85   int consumer_pid;                     /**< pid of msg consumer */
86   u32 q_nitems;                         /**< msg queue size (not rings) */
87   u32 n_rings;                          /**< number of msg rings */
88   svm_msg_q_ring_cfg_t *ring_cfgs;      /**< array of ring cfgs */
89 } svm_msg_q_cfg_t;
90
91 typedef union
92 {
93   struct
94   {
95     u32 ring_index;                     /**< ring index, could be u8 */
96     u32 elt_index;                      /**< index in ring */
97   };
98   u64 as_u64;
99 } svm_msg_q_msg_t;
100
101 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
102 /**
103  * Allocate message queue
104  *
105  * Allocates a message queue on the heap. Based on the configuration options,
106  * apart from the message queue this also allocates (one or multiple)
107  * shared-memory rings for the messages.
108  *
109  * @param cfg           configuration options: queue len, consumer pid,
110  *                      ring configs
111  * @return              message queue
112  */
113 svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
114 svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
115 uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
116
117 void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
118
119 /**
120  * Free message queue
121  *
122  * @param mq            message queue to be freed
123  */
124 void svm_msg_q_free (svm_msg_q_t * mq);
125
126 /**
127  * Allocate message buffer
128  *
129  * Message is allocated on the first available ring capable of holding
130  * the requested number of bytes.
131  *
132  * @param mq            message queue
133  * @param nbytes        number of bytes needed for message
134  * @return              message structure pointing to the ring and position
135  *                      allocated
136  */
137 svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
138
139 /**
140  * Allocate message buffer on ring
141  *
142  * Message is allocated, on requested ring. The caller MUST check that
143  * the ring is not full.
144  *
145  * @param mq            message queue
146  * @param ring_index    ring on which the allocation should occur
147  * @return              message structure pointing to the ring and position
148  *                      allocated
149  */
150 svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
151
152 /**
153  * Lock message queue and allocate message buffer on ring
154  *
155  * This should be used when multiple writers/readers are expected to
156  * compete for the rings/queue. Message should be enqueued by calling
157  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
158  * the message in enqueued.
159  *
160  * @param mq            message queue
161  * @param ring_index    ring on which the allocation should occur
162  * @param noblock       flag that indicates if request should block
163  * @param msg           pointer to message to be filled in
164  * @return              0 on success, negative number otherwise
165  */
166 int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
167                                          u8 noblock, svm_msg_q_msg_t * msg);
168
169 /**
170  * Free message buffer
171  *
172  * Marks message buffer on ring as free.
173  *
174  * @param mq            message queue
175  * @param msg           message to be freed
176  */
177 void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
178
179 /**
180  * Producer enqueue one message to queue
181  *
182  * Prior to calling this, the producer should've obtained a message buffer
183  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
184  *
185  * @param mq            message queue
186  * @param msg           message (pointer to ring position) to be enqueued
187  * @param nowait        flag to indicate if request is blocking or not
188  * @return              success status
189  */
190 int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
191
192 /**
193  * Producer enqueue one message to queue with mutex held
194  *
195  * Prior to calling this, the producer should've obtained a message buffer
196  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
197  * the queue mutex is held.
198  *
199  * @param mq            message queue
200  * @param msg           message (pointer to ring position) to be enqueued
201  * @return              success status
202  */
203 void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
204
205 /**
206  * Consumer dequeue one message from queue
207  *
208  * This returns the message pointing to the data in the message rings.
209  * The consumer is expected to call @ref svm_msg_q_free_msg once it
210  * finishes processing/copies the message data.
211  *
212  * @param mq            message queue
213  * @param msg           pointer to structure where message is to be received
214  * @param cond          flag that indicates if request should block or not
215  * @param time          time to wait if condition it SVM_Q_TIMEDWAIT
216  * @return              success status
217  */
218 int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
219                    svm_q_conditional_wait_t cond, u32 time);
220
221 /**
222  * Consumer dequeue one message from queue with mutex held
223  *
224  * Returns the message pointing to the data in the message rings under the
225  * assumption that the message queue lock is already held. The consumer is
226  * expected to call @ref svm_msg_q_free_msg once it finishes
227  * processing/copies the message data.
228  *
229  * @param mq            message queue
230  * @param msg           pointer to structure where message is to be received
231  * @return              success status
232  */
233 void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
234
235 /**
236  * Get data for message in queue
237  *
238  * @param mq            message queue
239  * @param msg           message for which the data is requested
240  * @return              pointer to data
241  */
242 void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
243
244 /**
245  * Get message queue ring
246  *
247  * @param mq            message queue
248  * @param ring_index    index of ring
249  * @return              pointer to ring
250  */
251 svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
252
253 /**
254  * Set event fd for queue
255  *
256  * If set, queue will exclusively use eventfds for signaling. Moreover,
257  * afterwards, the queue should only be used in non-blocking mode. Waiting
258  * for events should be done externally using something like epoll.
259  *
260  * @param mq            message queue
261  * @param fd            consumer eventfd
262  */
263 void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
264
265 /**
266  * Allocate event fd for queue
267  */
268 int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
269
270 /**
271  * Format message queue, shows msg count for each ring
272  */
273 u8 *format_svm_msg_q (u8 *s, va_list *args);
274
275 /**
276  * Check length of message queue
277  */
278 static inline u32
279 svm_msg_q_size (svm_msg_q_t *mq)
280 {
281   return clib_atomic_load_relax_n (&mq->q.shr->cursize);
282 }
283
284 /**
285  * Check if message queue is full
286  */
287 static inline u8
288 svm_msg_q_is_full (svm_msg_q_t * mq)
289 {
290   return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
291 }
292
293 static inline u8
294 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
295 {
296   svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
297   return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
298 }
299
300 /**
301  * Check if message queue is empty
302  */
303 static inline u8
304 svm_msg_q_is_empty (svm_msg_q_t * mq)
305 {
306   return (svm_msg_q_size (mq) == 0);
307 }
308
309 /**
310  * Check if message is invalid
311  */
312 static inline u8
313 svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
314 {
315   return (msg->as_u64 == (u64) ~ 0);
316 }
317
318 /**
319  * Try locking message queue
320  */
321 static inline int
322 svm_msg_q_try_lock (svm_msg_q_t * mq)
323 {
324   int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
325   if (PREDICT_FALSE (rv == EOWNERDEAD))
326     rv = pthread_mutex_consistent (&mq->q.shr->mutex);
327   return rv;
328 }
329
330 /**
331  * Lock, or block trying, the message queue
332  */
333 static inline int
334 svm_msg_q_lock (svm_msg_q_t * mq)
335 {
336   int rv = pthread_mutex_lock (&mq->q.shr->mutex);
337   if (PREDICT_FALSE (rv == EOWNERDEAD))
338     rv = pthread_mutex_consistent (&mq->q.shr->mutex);
339   return rv;
340 }
341
342 /**
343  * Unlock message queue
344  */
345 static inline void
346 svm_msg_q_unlock (svm_msg_q_t * mq)
347 {
348   pthread_mutex_unlock (&mq->q.shr->mutex);
349 }
350
351 /**
352  * Wait for message queue event
353  *
354  * Must be called with mutex held. The queue only works non-blocking
355  * with eventfds, so handle blocking calls as an exception here.
356  */
357 void svm_msg_q_wait (svm_msg_q_t *mq);
358
359 /**
360  * Timed wait for message queue event
361  *
362  * Must be called with mutex held.
363  *
364  * @param mq            message queue
365  * @param timeout       time in seconds
366  */
367 int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
368
369 static inline int
370 svm_msg_q_get_eventfd (svm_msg_q_t *mq)
371 {
372   return mq->q.evtfd;
373 }
374
375 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
376
377 /*
378  * fd.io coding-style-patch-verification: ON
379  *
380  * Local Variables:
381  * eval: (c-set-style "gnu")
382  * End:
383  */