ip: Replace Sematics for Interface IP addresses
[vpp.git] / src / svm / queue.c
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009-2019 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <vppinfra/lock.h>
31
32 svm_queue_t *
33 svm_queue_init (void *base, int nels, int elsize)
34 {
35   svm_queue_t *q;
36   pthread_mutexattr_t attr;
37   pthread_condattr_t cattr;
38
39   q = (svm_queue_t *) base;
40   clib_memset (q, 0, sizeof (*q));
41
42   q->elsize = elsize;
43   q->maxsize = nels;
44   q->producer_evtfd = -1;
45   q->consumer_evtfd = -1;
46
47   clib_memset (&attr, 0, sizeof (attr));
48   clib_memset (&cattr, 0, sizeof (cattr));
49
50   if (pthread_mutexattr_init (&attr))
51     clib_unix_warning ("mutexattr_init");
52   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53     clib_unix_warning ("pthread_mutexattr_setpshared");
54   if (pthread_mutex_init (&q->mutex, &attr))
55     clib_unix_warning ("mutex_init");
56   if (pthread_mutexattr_destroy (&attr))
57     clib_unix_warning ("mutexattr_destroy");
58   if (pthread_condattr_init (&cattr))
59     clib_unix_warning ("condattr_init");
60   /* prints funny-looking messages in the Linux target */
61   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
62     clib_unix_warning ("condattr_setpshared");
63   if (pthread_cond_init (&q->condvar, &cattr))
64     clib_unix_warning ("cond_init1");
65   if (pthread_condattr_destroy (&cattr))
66     clib_unix_warning ("cond_init2");
67
68   return (q);
69 }
70
71 svm_queue_t *
72 svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
73 {
74   svm_queue_t *q;
75
76   q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
77                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
78   clib_memset (q, 0, sizeof (*q));
79   q = svm_queue_init (q, nels, elsize);
80   q->consumer_pid = consumer_pid;
81
82   return q;
83 }
84
85 /*
86  * svm_queue_free
87  */
88 void
89 svm_queue_free (svm_queue_t * q)
90 {
91   (void) pthread_mutex_destroy (&q->mutex);
92   (void) pthread_cond_destroy (&q->condvar);
93   clib_mem_free (q);
94 }
95
96 void
97 svm_queue_lock (svm_queue_t * q)
98 {
99   pthread_mutex_lock (&q->mutex);
100 }
101
102 void
103 svm_queue_unlock (svm_queue_t * q)
104 {
105   pthread_mutex_unlock (&q->mutex);
106 }
107
108 int
109 svm_queue_is_full (svm_queue_t * q)
110 {
111   return q->cursize == q->maxsize;
112 }
113
114 static inline void
115 svm_queue_send_signal_inline (svm_queue_t * q, u8 is_prod)
116 {
117   if (q->producer_evtfd == -1)
118     {
119       (void) pthread_cond_broadcast (&q->condvar);
120     }
121   else
122     {
123       int __clib_unused rv, fd;
124       u64 data = 1;
125       ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
126       fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
127       rv = write (fd, &data, sizeof (data));
128       if (PREDICT_FALSE (rv < 0))
129         clib_unix_warning ("signal write on %d returned %d", fd, rv);
130     }
131 }
132
133 void
134 svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
135 {
136   svm_queue_send_signal_inline (q, is_prod);
137 }
138
139 static inline void
140 svm_queue_wait_inline (svm_queue_t * q)
141 {
142   if (q->producer_evtfd == -1)
143     {
144       pthread_cond_wait (&q->condvar, &q->mutex);
145     }
146   else
147     {
148       /* Fake a wait for event. We could use epoll but that would mean
149        * using yet another fd. Should do for now */
150       u32 cursize = q->cursize;
151       svm_queue_unlock (q);
152       while (q->cursize == cursize)
153         CLIB_PAUSE ();
154       svm_queue_lock (q);
155     }
156 }
157
158 void
159 svm_queue_wait (svm_queue_t * q)
160 {
161   svm_queue_wait_inline (q);
162 }
163
164 static inline int
165 svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
166 {
167   struct timespec ts;
168   ts.tv_sec = unix_time_now () + (u32) timeout;
169   ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
170
171   if (q->producer_evtfd == -1)
172     {
173       return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
174     }
175   else
176     {
177       double max_time = unix_time_now () + timeout;
178       u32 cursize = q->cursize;
179       int rv;
180
181       svm_queue_unlock (q);
182       while (q->cursize == cursize && unix_time_now () < max_time)
183         CLIB_PAUSE ();
184       rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
185       svm_queue_lock (q);
186       return rv;
187     }
188 }
189
190 int
191 svm_queue_timedwait (svm_queue_t * q, double timeout)
192 {
193   return svm_queue_timedwait_inline (q, timeout);
194 }
195
196 /*
197  * svm_queue_add_nolock
198  */
199 int
200 svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
201 {
202   i8 *tailp;
203   int need_broadcast = 0;
204
205   if (PREDICT_FALSE (q->cursize == q->maxsize))
206     {
207       while (q->cursize == q->maxsize)
208         svm_queue_wait_inline (q);
209     }
210
211   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
212   clib_memcpy_fast (tailp, elem, q->elsize);
213
214   q->tail++;
215   q->cursize++;
216
217   need_broadcast = (q->cursize == 1);
218
219   if (q->tail == q->maxsize)
220     q->tail = 0;
221
222   if (need_broadcast)
223     svm_queue_send_signal_inline (q, 1);
224   return 0;
225 }
226
227 void
228 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
229 {
230   i8 *tailp;
231
232   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
233   clib_memcpy_fast (tailp, elem, q->elsize);
234
235   q->tail = (q->tail + 1) % q->maxsize;
236   q->cursize++;
237
238   if (q->cursize == 1)
239     svm_queue_send_signal_inline (q, 1);
240 }
241
242
243 /*
244  * svm_queue_add
245  */
246 int
247 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
248 {
249   i8 *tailp;
250   int need_broadcast = 0;
251
252   if (nowait)
253     {
254       /* zero on success */
255       if (pthread_mutex_trylock (&q->mutex))
256         {
257           return (-1);
258         }
259     }
260   else
261     svm_queue_lock (q);
262
263   if (PREDICT_FALSE (q->cursize == q->maxsize))
264     {
265       if (nowait)
266         {
267           svm_queue_unlock (q);
268           return (-2);
269         }
270       while (q->cursize == q->maxsize)
271         svm_queue_wait_inline (q);
272     }
273
274   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
275   clib_memcpy_fast (tailp, elem, q->elsize);
276
277   q->tail++;
278   q->cursize++;
279
280   need_broadcast = (q->cursize == 1);
281
282   if (q->tail == q->maxsize)
283     q->tail = 0;
284
285   if (need_broadcast)
286     svm_queue_send_signal_inline (q, 1);
287
288   svm_queue_unlock (q);
289
290   return 0;
291 }
292
293 /*
294  * svm_queue_add2
295  */
296 int
297 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
298 {
299   i8 *tailp;
300   int need_broadcast = 0;
301
302   if (nowait)
303     {
304       /* zero on success */
305       if (pthread_mutex_trylock (&q->mutex))
306         {
307           return (-1);
308         }
309     }
310   else
311     svm_queue_lock (q);
312
313   if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
314     {
315       if (nowait)
316         {
317           svm_queue_unlock (q);
318           return (-2);
319         }
320       while (q->cursize + 1 == q->maxsize)
321         svm_queue_wait_inline (q);
322     }
323
324   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
325   clib_memcpy_fast (tailp, elem, q->elsize);
326
327   q->tail++;
328   q->cursize++;
329
330   if (q->tail == q->maxsize)
331     q->tail = 0;
332
333   need_broadcast = (q->cursize == 1);
334
335   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
336   clib_memcpy_fast (tailp, elem2, q->elsize);
337
338   q->tail++;
339   q->cursize++;
340
341   if (q->tail == q->maxsize)
342     q->tail = 0;
343
344   if (need_broadcast)
345     svm_queue_send_signal_inline (q, 1);
346
347   svm_queue_unlock (q);
348
349   return 0;
350 }
351
352 /*
353  * svm_queue_sub
354  */
355 int
356 svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
357                u32 time)
358 {
359   i8 *headp;
360   int need_broadcast = 0;
361   int rc = 0;
362
363   if (cond == SVM_Q_NOWAIT)
364     {
365       /* zero on success */
366       if (pthread_mutex_trylock (&q->mutex))
367         {
368           return (-1);
369         }
370     }
371   else
372     svm_queue_lock (q);
373
374   if (PREDICT_FALSE (q->cursize == 0))
375     {
376       if (cond == SVM_Q_NOWAIT)
377         {
378           svm_queue_unlock (q);
379           return (-2);
380         }
381       else if (cond == SVM_Q_TIMEDWAIT)
382         {
383           while (q->cursize == 0 && rc == 0)
384             rc = svm_queue_timedwait_inline (q, time);
385
386           if (rc == ETIMEDOUT)
387             {
388               svm_queue_unlock (q);
389               return ETIMEDOUT;
390             }
391         }
392       else
393         {
394           while (q->cursize == 0)
395             svm_queue_wait_inline (q);
396         }
397     }
398
399   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
400   clib_memcpy_fast (elem, headp, q->elsize);
401
402   q->head++;
403   /* $$$$ JFC shouldn't this be == 0? */
404   if (q->cursize == q->maxsize)
405     need_broadcast = 1;
406
407   q->cursize--;
408
409   if (q->head == q->maxsize)
410     q->head = 0;
411
412   if (need_broadcast)
413     svm_queue_send_signal_inline (q, 0);
414
415   svm_queue_unlock (q);
416
417   return 0;
418 }
419
420 int
421 svm_queue_sub2 (svm_queue_t * q, u8 * elem)
422 {
423   int need_broadcast;
424   i8 *headp;
425
426   svm_queue_lock (q);
427   if (q->cursize == 0)
428     {
429       svm_queue_unlock (q);
430       return -1;
431     }
432
433   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
434   clib_memcpy_fast (elem, headp, q->elsize);
435
436   q->head++;
437   need_broadcast = (q->cursize == q->maxsize / 2);
438   q->cursize--;
439
440   if (PREDICT_FALSE (q->head == q->maxsize))
441     q->head = 0;
442   svm_queue_unlock (q);
443
444   if (need_broadcast)
445     svm_queue_send_signal_inline (q, 0);
446
447   return 0;
448 }
449
450 int
451 svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
452 {
453   int need_broadcast;
454   i8 *headp;
455
456   if (PREDICT_FALSE (q->cursize == 0))
457     {
458       while (q->cursize == 0)
459         ;
460     }
461
462   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
463   clib_memcpy_fast (elem, headp, q->elsize);
464
465   need_broadcast = q->cursize == q->maxsize;
466
467   q->head = (q->head + 1) % q->maxsize;
468   q->cursize--;
469
470   if (PREDICT_FALSE (need_broadcast))
471     svm_queue_send_signal_inline (q, 0);
472
473   return 0;
474 }
475
476 void
477 svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
478 {
479   q->producer_evtfd = fd;
480 }
481
482 void
483 svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
484 {
485   q->consumer_evtfd = fd;
486 }
487
488 /*
489  * fd.io coding-style-patch-verification: ON
490  *
491  * Local Variables:
492  * eval: (c-set-style "gnu")
493  * End:
494  */