0780cca1c3235926f3fdf52fd669db9ed98304ff
[vpp.git] / src / svm / message_queue.h
1 /*
2  * Copyright (c) 2018-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Unidirectional shared-memory multi-ring message queue
18  */
19
20 #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 #define SRC_SVM_MESSAGE_QUEUE_H_
22
23 #include <vppinfra/clib.h>
24 #include <vppinfra/error.h>
25 #include <vppinfra/lock.h>
26 #include <svm/queue.h>
27
28 typedef struct svm_msg_q_shr_queue_
29 {
30   pthread_mutex_t mutex;  /* 8 bytes */
31   pthread_cond_t condvar; /* 8 bytes */
32   u32 head;
33   u32 tail;
34   volatile u32 cursize;
35   u32 maxsize;
36   u32 elsize;
37   u32 pad;
38   u8 data[0];
39 } svm_msg_q_shared_queue_t;
40
41 typedef struct svm_msg_q_queue_
42 {
43   svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
44   int evtfd;                     /**< producer/consumer eventfd */
45   clib_spinlock_t lock;          /**< private lock for multi-producer */
46 } svm_msg_q_queue_t;
47
48 typedef struct svm_msg_q_ring_shared_
49 {
50   volatile u32 cursize;                 /**< current size of the ring */
51   u32 nitems;                           /**< max size of the ring */
52   volatile u32 head;                    /**< current head (for dequeue) */
53   volatile u32 tail;                    /**< current tail (for enqueue) */
54   u32 elsize;                           /**< size of an element */
55   u8 data[0];                           /**< chunk of memory for msg data */
56 } svm_msg_q_ring_shared_t;
57
58 typedef struct svm_msg_q_ring_
59 {
60   u32 nitems;                   /**< max size of the ring */
61   u32 elsize;                   /**< size of an element */
62   svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
63 } __clib_packed svm_msg_q_ring_t;
64
65 typedef struct svm_msg_q_shared_
66 {
67   u32 n_rings;                   /**< number of rings after q */
68   u32 pad;                       /**< 8 byte alignment for q */
69   svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
70 } __clib_packed svm_msg_q_shared_t;
71
72 typedef struct svm_msg_q_
73 {
74   svm_msg_q_queue_t q;                  /**< queue for exchanging messages */
75   svm_msg_q_ring_t *rings;              /**< rings with message data*/
76 } __clib_packed svm_msg_q_t;
77
78 typedef struct svm_msg_q_ring_cfg_
79 {
80   u32 nitems;
81   u32 elsize;
82   void *data;
83 } svm_msg_q_ring_cfg_t;
84
85 typedef struct svm_msg_q_cfg_
86 {
87   int consumer_pid;                     /**< pid of msg consumer */
88   u32 q_nitems;                         /**< msg queue size (not rings) */
89   u32 n_rings;                          /**< number of msg rings */
90   svm_msg_q_ring_cfg_t *ring_cfgs;      /**< array of ring cfgs */
91 } svm_msg_q_cfg_t;
92
93 typedef union
94 {
95   struct
96   {
97     u32 ring_index;                     /**< ring index, could be u8 */
98     u32 elt_index;                      /**< index in ring */
99   };
100   u64 as_u64;
101 } svm_msg_q_msg_t;
102
103 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
104
105 typedef enum svm_msg_q_wait_type_
106 {
107   SVM_MQ_WAIT_EMPTY,
108   SVM_MQ_WAIT_FULL
109 } svm_msg_q_wait_type_t;
110
111 /**
112  * Allocate message queue
113  *
114  * Allocates a message queue on the heap. Based on the configuration options,
115  * apart from the message queue this also allocates (one or multiple)
116  * shared-memory rings for the messages.
117  *
118  * @param cfg           configuration options: queue len, consumer pid,
119  *                      ring configs
120  * @return              message queue
121  */
122 svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
123 svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
124 uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
125
126 void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
127
128 /**
129  * Cleanup mq's private data
130  */
131 void svm_msg_q_cleanup (svm_msg_q_t *mq);
132
133 /**
134  * Free message queue
135  *
136  * @param mq            message queue to be freed
137  */
138 void svm_msg_q_free (svm_msg_q_t * mq);
139
140 /**
141  * Allocate message buffer
142  *
143  * Message is allocated on the first available ring capable of holding
144  * the requested number of bytes.
145  *
146  * @param mq            message queue
147  * @param nbytes        number of bytes needed for message
148  * @return              message structure pointing to the ring and position
149  *                      allocated
150  */
151 svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
152
153 /**
154  * Allocate message buffer on ring
155  *
156  * Message is allocated, on requested ring. The caller MUST check that
157  * the ring is not full.
158  *
159  * @param mq            message queue
160  * @param ring_index    ring on which the allocation should occur
161  * @return              message structure pointing to the ring and position
162  *                      allocated
163  */
164 svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
165
166 /**
167  * Lock message queue and allocate message buffer on ring
168  *
169  * This should be used when multiple writers/readers are expected to
170  * compete for the rings/queue. Message should be enqueued by calling
171  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
172  * the message in enqueued.
173  *
174  * @param mq            message queue
175  * @param ring_index    ring on which the allocation should occur
176  * @param noblock       flag that indicates if request should block
177  * @param msg           pointer to message to be filled in
178  * @return              0 on success, negative number otherwise
179  */
180 int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
181                                          u8 noblock, svm_msg_q_msg_t * msg);
182
183 /**
184  * Free message buffer
185  *
186  * Marks message buffer on ring as free.
187  *
188  * @param mq            message queue
189  * @param msg           message to be freed
190  */
191 void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
192
193 /**
194  * Producer enqueue one message to queue
195  *
196  * Prior to calling this, the producer should've obtained a message buffer
197  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
198  *
199  * @param mq            message queue
200  * @param msg           message (pointer to ring position) to be enqueued
201  * @param nowait        flag to indicate if request is blocking or not
202  * @return              success status
203  */
204 int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
205
206 /**
207  * Producer enqueue one message to queue with mutex held
208  *
209  * Prior to calling this, the producer should've obtained a message buffer
210  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
211  * the queue mutex is held.
212  *
213  * @param mq            message queue
214  * @param msg           message (pointer to ring position) to be enqueued
215  * @return              success status
216  */
217 void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
218
219 /**
220  * Consumer dequeue one message from queue
221  *
222  * This returns the message pointing to the data in the message rings.
223  * Should only be used in single consumer scenarios as no locks are grabbed.
224  * The consumer is expected to call @ref svm_msg_q_free_msg once it
225  * finishes processing/copies the message data.
226  *
227  * @param mq            message queue
228  * @param msg           pointer to structure where message is to be received
229  * @param cond          flag that indicates if request should block or not
230  * @param time          time to wait if condition it SVM_Q_TIMEDWAIT
231  * @return              success status
232  */
233 int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
234                    svm_q_conditional_wait_t cond, u32 time);
235
236 /**
237  * Consumer dequeue one message from queue
238  *
239  * Returns the message pointing to the data in the message rings. Should only
240  * be used in single consumer scenarios as no locks are grabbed. The consumer
241  * is expected to call @ref svm_msg_q_free_msg once it finishes
242  * processing/copies the message data.
243  *
244  * @param mq            message queue
245  * @param msg           pointer to structure where message is to be received
246  * @return              success status
247  */
248 int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
249
250 /**
251  * Consumer dequeue multiple messages from queue
252  *
253  * Returns the message pointing to the data in the message rings. Should only
254  * be used in single consumer scenarios as no locks are grabbed. The consumer
255  * is expected to call @ref svm_msg_q_free_msg once it finishes
256  * processing/copies the message data.
257  *
258  * @param mq            message queue
259  * @param msg_buf       pointer to array of messages to received
260  * @param n_msgs        lengt of msg_buf array
261  * @return              number of messages dequeued
262  */
263 int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
264                              u32 n_msgs);
265
266 /**
267  * Get data for message in queue
268  *
269  * @param mq            message queue
270  * @param msg           message for which the data is requested
271  * @return              pointer to data
272  */
273 void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
274
275 /**
276  * Get message queue ring
277  *
278  * @param mq            message queue
279  * @param ring_index    index of ring
280  * @return              pointer to ring
281  */
282 svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
283
284 /**
285  * Set event fd for queue
286  *
287  * If set, queue will exclusively use eventfds for signaling. Moreover,
288  * afterwards, the queue should only be used in non-blocking mode. Waiting
289  * for events should be done externally using something like epoll.
290  *
291  * @param mq            message queue
292  * @param fd            consumer eventfd
293  */
294 void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
295
296 /**
297  * Allocate event fd for queue
298  */
299 int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
300
301 /**
302  * Format message queue, shows msg count for each ring
303  */
304 u8 *format_svm_msg_q (u8 *s, va_list *args);
305
306 /**
307  * Check length of message queue
308  */
309 static inline u32
310 svm_msg_q_size (svm_msg_q_t *mq)
311 {
312   return clib_atomic_load_relax_n (&mq->q.shr->cursize);
313 }
314
315 /**
316  * Check if message queue is full
317  */
318 static inline u8
319 svm_msg_q_is_full (svm_msg_q_t * mq)
320 {
321   return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
322 }
323
324 static inline u8
325 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
326 {
327   svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
328   return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
329 }
330
331 static inline u8
332 svm_msg_q_or_ring_is_full (svm_msg_q_t *mq, u32 ring_index)
333 {
334   return (svm_msg_q_is_full (mq) || svm_msg_q_ring_is_full (mq, ring_index));
335 }
336
337 /**
338  * Check if message queue is empty
339  */
340 static inline u8
341 svm_msg_q_is_empty (svm_msg_q_t * mq)
342 {
343   return (svm_msg_q_size (mq) == 0);
344 }
345
346 /**
347  * Check if message is invalid
348  */
349 static inline u8
350 svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
351 {
352   return (msg->as_u64 == (u64) ~ 0);
353 }
354
355 /**
356  * Try locking message queue
357  */
358 static inline int
359 svm_msg_q_try_lock (svm_msg_q_t * mq)
360 {
361   if (mq->q.evtfd == -1)
362     {
363       int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
364       if (PREDICT_FALSE (rv == EOWNERDEAD))
365         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
366       return rv;
367     }
368   else
369     {
370       return !clib_spinlock_trylock (&mq->q.lock);
371     }
372 }
373
374 /**
375  * Lock, or block trying, the message queue
376  */
377 static inline int
378 svm_msg_q_lock (svm_msg_q_t * mq)
379 {
380   if (mq->q.evtfd == -1)
381     {
382       int rv = pthread_mutex_lock (&mq->q.shr->mutex);
383       if (PREDICT_FALSE (rv == EOWNERDEAD))
384         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
385       return rv;
386     }
387   else
388     {
389       clib_spinlock_lock (&mq->q.lock);
390       return 0;
391     }
392 }
393
394 /**
395  * Unlock message queue
396  */
397 static inline void
398 svm_msg_q_unlock (svm_msg_q_t * mq)
399 {
400   if (mq->q.evtfd == -1)
401     {
402       pthread_mutex_unlock (&mq->q.shr->mutex);
403     }
404   else
405     {
406       clib_spinlock_unlock (&mq->q.lock);
407     }
408 }
409
410 /**
411  * Wait for message queue event
412  *
413  * When eventfds are not configured, the shared memory mutex is locked
414  * before waiting on the condvar. Typically called by consumers.
415  */
416 int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
417
418 /**
419  * Wait for message queue event as producer
420  *
421  * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
422  * be held. Should only be called by producers.
423  */
424 int svm_msg_q_wait_prod (svm_msg_q_t *mq);
425
426 /**
427  * Wait for message queue or ring event as producer
428  *
429  * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
430  * be held. Should only be called by producers.
431  */
432 int svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index);
433
434 /**
435  * Timed wait for message queue event
436  *
437  * Must be called with mutex held.
438  *
439  * @param mq            message queue
440  * @param timeout       time in seconds
441  */
442 int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
443
444 static inline int
445 svm_msg_q_get_eventfd (svm_msg_q_t *mq)
446 {
447   return mq->q.evtfd;
448 }
449
450 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
451
452 /*
453  * fd.io coding-style-patch-verification: ON
454  *
455  * Local Variables:
456  * eval: (c-set-style "gnu")
457  * End:
458  */