session svm: non blocking mq
[vpp.git] / src / svm / message_queue.h
1 /*
2  * Copyright (c) 2018-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Unidirectional shared-memory multi-ring message queue
18  */
19
20 #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 #define SRC_SVM_MESSAGE_QUEUE_H_
22
23 #include <vppinfra/clib.h>
24 #include <vppinfra/error.h>
25 #include <vppinfra/lock.h>
26 #include <svm/queue.h>
27
28 typedef struct svm_msg_q_shr_queue_
29 {
30   pthread_mutex_t mutex;  /* 8 bytes */
31   pthread_cond_t condvar; /* 8 bytes */
32   u32 head;
33   u32 tail;
34   volatile u32 cursize;
35   u32 maxsize;
36   u32 elsize;
37   u32 pad;
38   u8 data[0];
39 } svm_msg_q_shared_queue_t;
40
41 typedef struct svm_msg_q_queue_
42 {
43   svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
44   int evtfd;                     /**< producer/consumer eventfd */
45   clib_spinlock_t lock;          /**< private lock for multi-producer */
46 } svm_msg_q_queue_t;
47
48 typedef struct svm_msg_q_ring_shared_
49 {
50   volatile u32 cursize;                 /**< current size of the ring */
51   u32 nitems;                           /**< max size of the ring */
52   volatile u32 head;                    /**< current head (for dequeue) */
53   volatile u32 tail;                    /**< current tail (for enqueue) */
54   u32 elsize;                           /**< size of an element */
55   u8 data[0];                           /**< chunk of memory for msg data */
56 } svm_msg_q_ring_shared_t;
57
58 typedef struct svm_msg_q_ring_
59 {
60   u32 nitems;                   /**< max size of the ring */
61   u32 elsize;                   /**< size of an element */
62   svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
63 } __clib_packed svm_msg_q_ring_t;
64
65 typedef struct svm_msg_q_shared_
66 {
67   u32 n_rings;                   /**< number of rings after q */
68   u32 pad;                       /**< 8 byte alignment for q */
69   svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
70 } __clib_packed svm_msg_q_shared_t;
71
72 typedef struct svm_msg_q_
73 {
74   svm_msg_q_queue_t q;                  /**< queue for exchanging messages */
75   svm_msg_q_ring_t *rings;              /**< rings with message data*/
76 } __clib_packed svm_msg_q_t;
77
78 typedef struct svm_msg_q_ring_cfg_
79 {
80   u32 nitems;
81   u32 elsize;
82   void *data;
83 } svm_msg_q_ring_cfg_t;
84
85 typedef struct svm_msg_q_cfg_
86 {
87   int consumer_pid;                     /**< pid of msg consumer */
88   u32 q_nitems;                         /**< msg queue size (not rings) */
89   u32 n_rings;                          /**< number of msg rings */
90   svm_msg_q_ring_cfg_t *ring_cfgs;      /**< array of ring cfgs */
91 } svm_msg_q_cfg_t;
92
93 typedef union
94 {
95   struct
96   {
97     u32 ring_index;                     /**< ring index, could be u8 */
98     u32 elt_index;                      /**< index in ring */
99   };
100   u64 as_u64;
101 } svm_msg_q_msg_t;
102
103 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
104
105 typedef enum svm_msg_q_wait_type_
106 {
107   SVM_MQ_WAIT_EMPTY,
108   SVM_MQ_WAIT_FULL
109 } svm_msg_q_wait_type_t;
110
111 /**
112  * Allocate message queue
113  *
114  * Allocates a message queue on the heap. Based on the configuration options,
115  * apart from the message queue this also allocates (one or multiple)
116  * shared-memory rings for the messages.
117  *
118  * @param cfg           configuration options: queue len, consumer pid,
119  *                      ring configs
120  * @return              message queue
121  */
122 svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
123 svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
124 uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
125
126 void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
127
128 /**
129  * Free message queue
130  *
131  * @param mq            message queue to be freed
132  */
133 void svm_msg_q_free (svm_msg_q_t * mq);
134
135 /**
136  * Allocate message buffer
137  *
138  * Message is allocated on the first available ring capable of holding
139  * the requested number of bytes.
140  *
141  * @param mq            message queue
142  * @param nbytes        number of bytes needed for message
143  * @return              message structure pointing to the ring and position
144  *                      allocated
145  */
146 svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
147
148 /**
149  * Allocate message buffer on ring
150  *
151  * Message is allocated, on requested ring. The caller MUST check that
152  * the ring is not full.
153  *
154  * @param mq            message queue
155  * @param ring_index    ring on which the allocation should occur
156  * @return              message structure pointing to the ring and position
157  *                      allocated
158  */
159 svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
160
161 /**
162  * Lock message queue and allocate message buffer on ring
163  *
164  * This should be used when multiple writers/readers are expected to
165  * compete for the rings/queue. Message should be enqueued by calling
166  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
167  * the message in enqueued.
168  *
169  * @param mq            message queue
170  * @param ring_index    ring on which the allocation should occur
171  * @param noblock       flag that indicates if request should block
172  * @param msg           pointer to message to be filled in
173  * @return              0 on success, negative number otherwise
174  */
175 int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
176                                          u8 noblock, svm_msg_q_msg_t * msg);
177
178 /**
179  * Free message buffer
180  *
181  * Marks message buffer on ring as free.
182  *
183  * @param mq            message queue
184  * @param msg           message to be freed
185  */
186 void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
187
188 /**
189  * Producer enqueue one message to queue
190  *
191  * Prior to calling this, the producer should've obtained a message buffer
192  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
193  *
194  * @param mq            message queue
195  * @param msg           message (pointer to ring position) to be enqueued
196  * @param nowait        flag to indicate if request is blocking or not
197  * @return              success status
198  */
199 int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
200
201 /**
202  * Producer enqueue one message to queue with mutex held
203  *
204  * Prior to calling this, the producer should've obtained a message buffer
205  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
206  * the queue mutex is held.
207  *
208  * @param mq            message queue
209  * @param msg           message (pointer to ring position) to be enqueued
210  * @return              success status
211  */
212 void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
213
214 /**
215  * Consumer dequeue one message from queue
216  *
217  * This returns the message pointing to the data in the message rings.
218  * Should only be used in single consumer scenarios as no locks are grabbed.
219  * The consumer is expected to call @ref svm_msg_q_free_msg once it
220  * finishes processing/copies the message data.
221  *
222  * @param mq            message queue
223  * @param msg           pointer to structure where message is to be received
224  * @param cond          flag that indicates if request should block or not
225  * @param time          time to wait if condition it SVM_Q_TIMEDWAIT
226  * @return              success status
227  */
228 int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
229                    svm_q_conditional_wait_t cond, u32 time);
230
231 /**
232  * Consumer dequeue one message from queue
233  *
234  * Returns the message pointing to the data in the message rings. Should only
235  * be used in single consumer scenarios as no locks are grabbed. The consumer
236  * is expected to call @ref svm_msg_q_free_msg once it finishes
237  * processing/copies the message data.
238  *
239  * @param mq            message queue
240  * @param msg           pointer to structure where message is to be received
241  * @return              success status
242  */
243 int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
244
245 /**
246  * Consumer dequeue multiple messages from queue
247  *
248  * Returns the message pointing to the data in the message rings. Should only
249  * be used in single consumer scenarios as no locks are grabbed. The consumer
250  * is expected to call @ref svm_msg_q_free_msg once it finishes
251  * processing/copies the message data.
252  *
253  * @param mq            message queue
254  * @param msg_buf       pointer to array of messages to received
255  * @param n_msgs        lengt of msg_buf array
256  * @return              number of messages dequeued
257  */
258 int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
259                              u32 n_msgs);
260
261 /**
262  * Get data for message in queue
263  *
264  * @param mq            message queue
265  * @param msg           message for which the data is requested
266  * @return              pointer to data
267  */
268 void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
269
270 /**
271  * Get message queue ring
272  *
273  * @param mq            message queue
274  * @param ring_index    index of ring
275  * @return              pointer to ring
276  */
277 svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
278
279 /**
280  * Set event fd for queue
281  *
282  * If set, queue will exclusively use eventfds for signaling. Moreover,
283  * afterwards, the queue should only be used in non-blocking mode. Waiting
284  * for events should be done externally using something like epoll.
285  *
286  * @param mq            message queue
287  * @param fd            consumer eventfd
288  */
289 void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
290
291 /**
292  * Allocate event fd for queue
293  */
294 int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
295
296 /**
297  * Format message queue, shows msg count for each ring
298  */
299 u8 *format_svm_msg_q (u8 *s, va_list *args);
300
301 /**
302  * Check length of message queue
303  */
304 static inline u32
305 svm_msg_q_size (svm_msg_q_t *mq)
306 {
307   return clib_atomic_load_relax_n (&mq->q.shr->cursize);
308 }
309
310 /**
311  * Check if message queue is full
312  */
313 static inline u8
314 svm_msg_q_is_full (svm_msg_q_t * mq)
315 {
316   return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
317 }
318
319 static inline u8
320 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
321 {
322   svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
323   return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
324 }
325
326 /**
327  * Check if message queue is empty
328  */
329 static inline u8
330 svm_msg_q_is_empty (svm_msg_q_t * mq)
331 {
332   return (svm_msg_q_size (mq) == 0);
333 }
334
335 /**
336  * Check if message is invalid
337  */
338 static inline u8
339 svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
340 {
341   return (msg->as_u64 == (u64) ~ 0);
342 }
343
344 /**
345  * Try locking message queue
346  */
347 static inline int
348 svm_msg_q_try_lock (svm_msg_q_t * mq)
349 {
350   if (mq->q.evtfd == -1)
351     {
352       int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
353       if (PREDICT_FALSE (rv == EOWNERDEAD))
354         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
355       return rv;
356     }
357   else
358     {
359       return !clib_spinlock_trylock (&mq->q.lock);
360     }
361 }
362
363 /**
364  * Lock, or block trying, the message queue
365  */
366 static inline int
367 svm_msg_q_lock (svm_msg_q_t * mq)
368 {
369   if (mq->q.evtfd == -1)
370     {
371       int rv = pthread_mutex_lock (&mq->q.shr->mutex);
372       if (PREDICT_FALSE (rv == EOWNERDEAD))
373         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
374       return rv;
375     }
376   else
377     {
378       clib_spinlock_lock (&mq->q.lock);
379       return 0;
380     }
381 }
382
383 /**
384  * Unlock message queue
385  */
386 static inline void
387 svm_msg_q_unlock (svm_msg_q_t * mq)
388 {
389   if (mq->q.evtfd == -1)
390     {
391       pthread_mutex_unlock (&mq->q.shr->mutex);
392     }
393   else
394     {
395       clib_spinlock_unlock (&mq->q.lock);
396     }
397 }
398
399 /**
400  * Wait for message queue event
401  *
402  * Must be called with mutex held. The queue only works non-blocking
403  * with eventfds, so handle blocking calls as an exception here.
404  */
405 int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
406
407 /**
408  * Timed wait for message queue event
409  *
410  * Must be called with mutex held.
411  *
412  * @param mq            message queue
413  * @param timeout       time in seconds
414  */
415 int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
416
417 static inline int
418 svm_msg_q_get_eventfd (svm_msg_q_t *mq)
419 {
420   return mq->q.evtfd;
421 }
422
423 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
424
425 /*
426  * fd.io coding-style-patch-verification: ON
427  *
428  * Local Variables:
429  * eval: (c-set-style "gnu")
430  * End:
431  */