vcl: use events for epoll/select/read/write
[vpp.git] / src / svm / message_queue.h
1 /*
2  * Copyright (c) 2018 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 /**
16  * @file
17  * @brief Unidirectional shared-memory multi-ring message queue
18  */
19
20 #ifndef SRC_SVM_MESSAGE_QUEUE_H_
21 #define SRC_SVM_MESSAGE_QUEUE_H_
22
23 #include <vppinfra/clib.h>
24 #include <vppinfra/error.h>
25 #include <vppinfra/time.h>
26 #include <svm/queue.h>
27
28 typedef struct svm_msg_q_ring_
29 {
30   volatile u32 cursize;                 /**< current size of the ring */
31   u32 nitems;                           /**< max size of the ring */
32   volatile u32 head;                    /**< current head (for dequeue) */
33   volatile u32 tail;                    /**< current tail (for enqueue) */
34   u32 elsize;                           /**< size of an element */
35   u8 *data;                             /**< chunk of memory for msg data */
36 } __clib_packed svm_msg_q_ring_t;
37
38 typedef struct svm_msg_q_
39 {
40   svm_queue_t *q;                       /**< queue for exchanging messages */
41   svm_msg_q_ring_t *rings;              /**< rings with message data*/
42 } __clib_packed svm_msg_q_t;
43
44 typedef struct svm_msg_q_ring_cfg_
45 {
46   u32 nitems;
47   u32 elsize;
48   void *data;
49 } svm_msg_q_ring_cfg_t;
50
51 typedef struct svm_msg_q_cfg_
52 {
53   int consumer_pid;                     /**< pid of msg consumer */
54   u32 q_nitems;                         /**< msg queue size (not rings) */
55   u32 n_rings;                          /**< number of msg rings */
56   svm_msg_q_ring_cfg_t *ring_cfgs;      /**< array of ring cfgs */
57 } svm_msg_q_cfg_t;
58
59 typedef union
60 {
61   struct
62   {
63     u32 ring_index;                     /**< ring index, could be u8 */
64     u32 elt_index;                      /**< index in ring */
65   };
66   u64 as_u64;
67 } svm_msg_q_msg_t;
68
69 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
70 /**
71  * Allocate message queue
72  *
73  * Allocates a message queue on the heap. Based on the configuration options,
74  * apart from the message queue this also allocates (one or multiple)
75  * shared-memory rings for the messages.
76  *
77  * @param cfg           configuration options: queue len, consumer pid,
78  *                      ring configs
79  * @return              message queue
80  */
81 svm_msg_q_t *svm_msg_q_alloc (svm_msg_q_cfg_t * cfg);
82
83 /**
84  * Free message queue
85  *
86  * @param mq            message queue to be freed
87  */
88 void svm_msg_q_free (svm_msg_q_t * mq);
89
90 /**
91  * Allocate message buffer
92  *
93  * Message is allocated on the first available ring capable of holding
94  * the requested number of bytes.
95  *
96  * @param mq            message queue
97  * @param nbytes        number of bytes needed for message
98  * @return              message structure pointing to the ring and position
99  *                      allocated
100  */
101 svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
102
103 /**
104  * Allocate message buffer on ring
105  *
106  * Message is allocated, on requested ring. The caller MUST check that
107  * the ring is not full.
108  *
109  * @param mq            message queue
110  * @param ring_index    ring on which the allocation should occur
111  * @return              message structure pointing to the ring and position
112  *                      allocated
113  */
114 svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
115
116 /**
117  * Lock message queue and allocate message buffer on ring
118  *
119  * This should be used when multiple writers/readers are expected to
120  * compete for the rings/queue. Message should be enqueued by calling
121  * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
122  * the message in enqueued.
123  *
124  * @param mq            message queue
125  * @param ring_index    ring on which the allocation should occur
126  * @param noblock       flag that indicates if request should block
127  * @param msg           pointer to message to be filled in
128  * @return              0 on success, negative number otherwise
129  */
130 int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
131                                          u8 noblock, svm_msg_q_msg_t * msg);
132
133 /**
134  * Free message buffer
135  *
136  * Marks message buffer on ring as free.
137  *
138  * @param mq            message queue
139  * @param msg           message to be freed
140  */
141 void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
142
143 /**
144  * Producer enqueue one message to queue
145  *
146  * Prior to calling this, the producer should've obtained a message buffer
147  * from one of the rings by calling @ref svm_msg_q_alloc_msg.
148  *
149  * @param mq            message queue
150  * @param msg           message (pointer to ring position) to be enqueued
151  * @param nowait        flag to indicate if request is blocking or not
152  * @return              success status
153  */
154 int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
155
156 /**
157  * Producer enqueue one message to queue with mutex held
158  *
159  * Prior to calling this, the producer should've obtained a message buffer
160  * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
161  * the queue mutex is held.
162  *
163  * @param mq            message queue
164  * @param msg           message (pointer to ring position) to be enqueued
165  * @return              success status
166  */
167 void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
168
169 /**
170  * Consumer dequeue one message from queue
171  *
172  * This returns the message pointing to the data in the message rings.
173  * The consumer is expected to call @ref svm_msg_q_free_msg once it
174  * finishes processing/copies the message data.
175  *
176  * @param mq            message queue
177  * @param msg           pointer to structure where message is to be received
178  * @param cond          flag that indicates if request should block or not
179  * @param time          time to wait if condition it SVM_Q_TIMEDWAIT
180  * @return              success status
181  */
182 int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
183                    svm_q_conditional_wait_t cond, u32 time);
184
185 /**
186  * Consumer dequeue one message from queue with mutex held
187  *
188  * Returns the message pointing to the data in the message rings under the
189  * assumption that the message queue lock is already held. The consumer is
190  * expected to call @ref svm_msg_q_free_msg once it finishes
191  * processing/copies the message data.
192  *
193  * @param mq            message queue
194  * @param msg           pointer to structure where message is to be received
195  * @return              success status
196  */
197 void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
198
199 /**
200  * Get data for message in queue
201  *
202  * @param mq            message queue
203  * @param msg           message for which the data is requested
204  * @return              pointer to data
205  */
206 void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
207
208 /**
209  * Get message queue ring
210  *
211  * @param mq            message queue
212  * @param ring_index    index of ring
213  * @return              pointer to ring
214  */
215 svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
216
217 /**
218  * Check if message queue is full
219  */
220 static inline u8
221 svm_msg_q_is_full (svm_msg_q_t * mq)
222 {
223   return (mq->q->cursize == mq->q->maxsize);
224 }
225
226 static inline u8
227 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
228 {
229   ASSERT (ring_index < vec_len (mq->rings));
230   return (mq->rings[ring_index].cursize == mq->rings[ring_index].nitems);
231 }
232
233 /**
234  * Check if message queue is empty
235  */
236 static inline u8
237 svm_msg_q_is_empty (svm_msg_q_t * mq)
238 {
239   return (mq->q->cursize == 0);
240 }
241
242 /**
243  * Check length of message queue
244  */
245 static inline u32
246 svm_msg_q_size (svm_msg_q_t * mq)
247 {
248   return mq->q->cursize;
249 }
250
251 /**
252  * Check if message is invalid
253  */
254 static inline u8
255 svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
256 {
257   return (msg->as_u64 == (u64) ~ 0);
258 }
259
260 /**
261  * Try locking message queue
262  */
263 static inline int
264 svm_msg_q_try_lock (svm_msg_q_t * mq)
265 {
266   return pthread_mutex_trylock (&mq->q->mutex);
267 }
268
269 /**
270  * Lock, or block trying, the message queue
271  */
272 static inline int
273 svm_msg_q_lock (svm_msg_q_t * mq)
274 {
275   return pthread_mutex_lock (&mq->q->mutex);
276 }
277
278 /**
279  * Unlock message queue
280  */
281 static inline void
282 svm_msg_q_unlock (svm_msg_q_t * mq)
283 {
284   /* The other side of the connection is not polling */
285   if (mq->q->cursize < (mq->q->maxsize / 8))
286     (void) pthread_cond_broadcast (&mq->q->condvar);
287   pthread_mutex_unlock (&mq->q->mutex);
288 }
289
290 /**
291  * Wait for message queue event
292  *
293  * Must be called with mutex held
294  */
295 static inline void
296 svm_msg_q_wait (svm_msg_q_t * mq)
297 {
298   pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
299 }
300
301 /**
302  * Timed wait for message queue event
303  *
304  * Must be called with mutex held.
305  *
306  * @param mq            message queue
307  * @param timeout       time in seconds
308  */
309 static inline int
310 svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
311 {
312   struct timespec ts;
313
314   ts.tv_sec = unix_time_now () + (u32) timeout;
315   ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
316   if (pthread_cond_timedwait (&mq->q->condvar, &mq->q->mutex, &ts))
317     return -1;
318   return 0;
319 }
320
321 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
322
323 /*
324  * fd.io coding-style-patch-verification: ON
325  *
326  * Local Variables:
327  * eval: (c-set-style "gnu")
328  * End:
329  */