session: remove ipv6 lookup threading assert
[vpp.git] / src / svm / message_queue.h
1 /*
2  * Copyright (c) 2018-2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Unidirectional shared-memory multi-ring message queue
18  */
19
20 #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 #define SRC_SVM_MESSAGE_QUEUE_H_
22
23 #include <vppinfra/clib.h>
24 #include <vppinfra/error.h>
25 #include <vppinfra/lock.h>
26 #include <svm/queue.h>
27
28 typedef struct svm_msg_q_shr_queue_
29 {
30   pthread_mutex_t mutex;  /* 8 bytes */
31   pthread_cond_t condvar; /* 8 bytes */
32   u32 head;
33   u32 tail;
34   volatile u32 cursize;
35   u32 maxsize;
36   u32 elsize;
37   u32 pad;
38   u8 data[0];
39 } svm_msg_q_shared_queue_t;
40
41 typedef struct svm_msg_q_queue_
42 {
43   svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
44   int evtfd;                     /**< producer/consumer eventfd */
45   clib_spinlock_t lock;          /**< private lock for multi-producer */
46 } svm_msg_q_queue_t;
47
48 typedef struct svm_msg_q_ring_shared_
49 {
50   volatile u32 cursize;                 /**< current size of the ring */
51   u32 nitems;                           /**< max size of the ring */
52   volatile u32 head;                    /**< current head (for dequeue) */
53   volatile u32 tail;                    /**< current tail (for enqueue) */
54   u32 elsize;                           /**< size of an element */
55   u8 data[0];                           /**< chunk of memory for msg data */
56 } svm_msg_q_ring_shared_t;
57
58 typedef struct svm_msg_q_ring_
59 {
60   u32 nitems;                   /**< max size of the ring */
61   u32 elsize;                   /**< size of an element */
62   svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
63 } __clib_packed svm_msg_q_ring_t;
64
65 typedef struct svm_msg_q_shared_
66 {
67   u32 n_rings;                   /**< number of rings after q */
68   u32 pad;                       /**< 8 byte alignment for q */
69   svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
70 } __clib_packed svm_msg_q_shared_t;
71
72 typedef struct svm_msg_q_
73 {
74   svm_msg_q_queue_t q;                  /**< queue for exchanging messages */
75   svm_msg_q_ring_t *rings;              /**< rings with message data*/
76 } __clib_packed svm_msg_q_t;
77
78 typedef struct svm_msg_q_ring_cfg_
79 {
80   u32 nitems;
81   u32 elsize;
82   void *data;
83 } svm_msg_q_ring_cfg_t;
84
85 typedef struct svm_msg_q_cfg_
86 {
87   int consumer_pid;                     /**< pid of msg consumer */
88   u32 q_nitems;                         /**< msg queue size (not rings) */
89   u32 n_rings;                          /**< number of msg rings */
90   svm_msg_q_ring_cfg_t *ring_cfgs;      /**< array of ring cfgs */
91 } svm_msg_q_cfg_t;
92
93 typedef union
94 {
95   struct
96   {
97     u32 ring_index;                     /**< ring index, could be u8 */
98     u32 elt_index;                      /**< index in ring */
99   };
100   u64 as_u64;
101 } svm_msg_q_msg_t;
102
103 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
104
105 typedef enum svm_msg_q_wait_type_
106 {
107   SVM_MQ_WAIT_EMPTY,
108   SVM_MQ_WAIT_FULL
109 } svm_msg_q_wait_type_t;
110
111 /**
112  * Allocate message queue
113  *
114  * Allocates a message queue on the heap. Based on the configuration options,
115  * apart from the message queue this also allocates (one or multiple)
116  * shared-memory rings for the messages.
117  *
118  * @param cfg           configuration options: queue len, consumer pid,
119  *                      ring configs
120  * @return              message queue
121  */
122 svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
123 svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
124 uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
125
126 void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
127
128 /**
129  * Cleanup mq's private data
130  */
131 void svm_msg_q_cleanup (svm_msg_q_t *mq);
132
133 /**
134  * Free message queue
135  *
136  * @param mq            message queue to be freed
137  */
138 void svm_msg_q_free (svm_msg_q_t * mq);
139
140 /**
141  * Allocate message buffer
142  *
143  * Message is allocated on the first available ring capable of holding
144  * the requested number of bytes.
145  *
146  * @param mq            message queue
147  * @param nbytes        number of bytes needed for message
148  * @return              message structure pointing to the ring and position
149  *                      allocated
150  */
151 svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
152
153 /**
154  * Allocate message buffer on ring
155  *
156  * Message is allocated, on requested ring. The caller MUST check that
157  * the ring is not full.
158  *
159  * @param mq            message queue
160  * @param ring_index    ring on which the allocation should occur
161  * @return              message structure pointing to the ring and position
162  *                      allocated
163  */
164 svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
165
166 /**
167  * Lock message queue and allocate message buffer on ring
168  *
169  * This should be used when multiple writers/readers are expected to
170  * compete for the rings/queue. Message should be enqueued by calling
171  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
172  * the message in enqueued.
173  *
174  * @param mq            message queue
175  * @param ring_index    ring on which the allocation should occur
176  * @param noblock       flag that indicates if request should block
177  * @param msg           pointer to message to be filled in
178  * @return              0 on success, negative number otherwise
179  */
180 int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
181                                          u8 noblock, svm_msg_q_msg_t * msg);
182
183 /**
184  * Free message buffer
185  *
186  * Marks message buffer on ring as free.
187  *
188  * @param mq            message queue
189  * @param msg           message to be freed
190  */
191 void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
192
193 /**
194  * Producer enqueue one message to queue
195  *
196  * Must be called with mq locked. Prior to calling this, the producer should've
197  * obtained a message buffer from one of the rings.
198  *
199  * @param mq            message queue
200  * @param msg           message to be enqueued
201  */
202 void svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg);
203
204 /**
205  * Producer enqueue one message to queue
206  *
207  * Prior to calling this, the producer should've obtained a message buffer
208  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
209  *
210  * @param mq            message queue
211  * @param msg           message (pointer to ring position) to be enqueued
212  * @param nowait        flag to indicate if request is blocking or not
213  * @return              success status
214  */
215 int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
216
217 /**
218  * Producer enqueue one message to queue with mutex held
219  *
220  * Prior to calling this, the producer should've obtained a message buffer
221  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
222  * the queue mutex is held.
223  *
224  * @param mq            message queue
225  * @param msg           message (pointer to ring position) to be enqueued
226  * @return              success status
227  */
228 void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
229
230 /**
231  * Consumer dequeue one message from queue
232  *
233  * This returns the message pointing to the data in the message rings.
234  * Should only be used in single consumer scenarios as no locks are grabbed.
235  * The consumer is expected to call @ref svm_msg_q_free_msg once it
236  * finishes processing/copies the message data.
237  *
238  * @param mq            message queue
239  * @param msg           pointer to structure where message is to be received
240  * @param cond          flag that indicates if request should block or not
241  * @param time          time to wait if condition it SVM_Q_TIMEDWAIT
242  * @return              success status
243  */
244 int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
245                    svm_q_conditional_wait_t cond, u32 time);
246
247 /**
248  * Consumer dequeue one message from queue
249  *
250  * Returns the message pointing to the data in the message rings. Should only
251  * be used in single consumer scenarios as no locks are grabbed. The consumer
252  * is expected to call @ref svm_msg_q_free_msg once it finishes
253  * processing/copies the message data.
254  *
255  * @param mq            message queue
256  * @param msg           pointer to structure where message is to be received
257  * @return              success status
258  */
259 int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
260
261 /**
262  * Consumer dequeue multiple messages from queue
263  *
264  * Returns the message pointing to the data in the message rings. Should only
265  * be used in single consumer scenarios as no locks are grabbed. The consumer
266  * is expected to call @ref svm_msg_q_free_msg once it finishes
267  * processing/copies the message data.
268  *
269  * @param mq            message queue
270  * @param msg_buf       pointer to array of messages to received
271  * @param n_msgs        lengt of msg_buf array
272  * @return              number of messages dequeued
273  */
274 int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
275                              u32 n_msgs);
276
277 /**
278  * Get data for message in queue
279  *
280  * @param mq            message queue
281  * @param msg           message for which the data is requested
282  * @return              pointer to data
283  */
284 void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
285
286 /**
287  * Get message queue ring
288  *
289  * @param mq            message queue
290  * @param ring_index    index of ring
291  * @return              pointer to ring
292  */
293 svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
294
295 /**
296  * Set event fd for queue
297  *
298  * If set, queue will exclusively use eventfds for signaling. Moreover,
299  * afterwards, the queue should only be used in non-blocking mode. Waiting
300  * for events should be done externally using something like epoll.
301  *
302  * @param mq            message queue
303  * @param fd            consumer eventfd
304  */
305 void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
306
307 /**
308  * Allocate event fd for queue
309  */
310 int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
311
312 /**
313  * Format message queue, shows msg count for each ring
314  */
315 u8 *format_svm_msg_q (u8 *s, va_list *args);
316
317 /**
318  * Check length of message queue
319  */
320 static inline u32
321 svm_msg_q_size (svm_msg_q_t *mq)
322 {
323   return clib_atomic_load_relax_n (&mq->q.shr->cursize);
324 }
325
326 /**
327  * Check if message queue is full
328  */
329 static inline u8
330 svm_msg_q_is_full (svm_msg_q_t * mq)
331 {
332   return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
333 }
334
335 static inline u8
336 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
337 {
338   svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
339   return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
340 }
341
342 static inline u8
343 svm_msg_q_or_ring_is_full (svm_msg_q_t *mq, u32 ring_index)
344 {
345   return (svm_msg_q_is_full (mq) || svm_msg_q_ring_is_full (mq, ring_index));
346 }
347
348 /**
349  * Check if message queue is empty
350  */
351 static inline u8
352 svm_msg_q_is_empty (svm_msg_q_t * mq)
353 {
354   return (svm_msg_q_size (mq) == 0);
355 }
356
357 /**
358  * Check if message is invalid
359  */
360 static inline u8
361 svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
362 {
363   return (msg->as_u64 == (u64) ~ 0);
364 }
365
366 /**
367  * Try locking message queue
368  */
369 static inline int
370 svm_msg_q_try_lock (svm_msg_q_t * mq)
371 {
372   if (mq->q.evtfd == -1)
373     {
374       int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
375       if (PREDICT_FALSE (rv == EOWNERDEAD))
376         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
377       return rv;
378     }
379   else
380     {
381       return !clib_spinlock_trylock (&mq->q.lock);
382     }
383 }
384
385 /**
386  * Lock, or block trying, the message queue
387  */
388 static inline int
389 svm_msg_q_lock (svm_msg_q_t * mq)
390 {
391   if (mq->q.evtfd == -1)
392     {
393       int rv = pthread_mutex_lock (&mq->q.shr->mutex);
394       if (PREDICT_FALSE (rv == EOWNERDEAD))
395         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
396       return rv;
397     }
398   else
399     {
400       clib_spinlock_lock (&mq->q.lock);
401       return 0;
402     }
403 }
404
405 /**
406  * Unlock message queue
407  */
408 static inline void
409 svm_msg_q_unlock (svm_msg_q_t * mq)
410 {
411   if (mq->q.evtfd == -1)
412     {
413       pthread_mutex_unlock (&mq->q.shr->mutex);
414     }
415   else
416     {
417       clib_spinlock_unlock (&mq->q.lock);
418     }
419 }
420
421 /**
422  * Wait for message queue event
423  *
424  * When eventfds are not configured, the shared memory mutex is locked
425  * before waiting on the condvar. Typically called by consumers.
426  */
427 int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
428
429 /**
430  * Wait for message queue event as producer
431  *
432  * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
433  * be held. Should only be called by producers.
434  */
435 int svm_msg_q_wait_prod (svm_msg_q_t *mq);
436
437 /**
438  * Wait for message queue or ring event as producer
439  *
440  * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
441  * be held. Should only be called by producers.
442  */
443 int svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index);
444
445 /**
446  * Timed wait for message queue event
447  *
448  * Must be called with mutex held.
449  *
450  * @param mq            message queue
451  * @param timeout       time in seconds
452  */
453 int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
454
455 static inline int
456 svm_msg_q_get_eventfd (svm_msg_q_t *mq)
457 {
458   return mq->q.evtfd;
459 }
460
461 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
462
463 /*
464  * fd.io coding-style-patch-verification: ON
465  *
466  * Local Variables:
467  * eval: (c-set-style "gnu")
468  * End:
469  */