ip-neighbor: show age instead of time in cli
[vpp.git] / src / svm / queue.c
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009-2019 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <vppinfra/lock.h>
31
32 svm_queue_t *
33 svm_queue_init (void *base, int nels, int elsize)
34 {
35   svm_queue_t *q;
36   pthread_mutexattr_t attr;
37   pthread_condattr_t cattr;
38
39   q = (svm_queue_t *) base;
40   clib_memset (q, 0, sizeof (*q));
41
42   q->elsize = elsize;
43   q->maxsize = nels;
44   q->producer_evtfd = -1;
45   q->consumer_evtfd = -1;
46
47   clib_memset (&attr, 0, sizeof (attr));
48   clib_memset (&cattr, 0, sizeof (cattr));
49
50   if (pthread_mutexattr_init (&attr))
51     clib_unix_warning ("mutexattr_init");
52   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53     clib_unix_warning ("pthread_mutexattr_setpshared");
54   if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
55     clib_unix_warning ("setrobust");
56   if (pthread_mutex_init (&q->mutex, &attr))
57     clib_unix_warning ("mutex_init");
58   if (pthread_mutexattr_destroy (&attr))
59     clib_unix_warning ("mutexattr_destroy");
60   if (pthread_condattr_init (&cattr))
61     clib_unix_warning ("condattr_init");
62   /* prints funny-looking messages in the Linux target */
63   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
64     clib_unix_warning ("condattr_setpshared");
65   if (pthread_cond_init (&q->condvar, &cattr))
66     clib_unix_warning ("cond_init1");
67   if (pthread_condattr_destroy (&cattr))
68     clib_unix_warning ("cond_init2");
69
70   return (q);
71 }
72
73 svm_queue_t *
74 svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
75 {
76   svm_queue_t *q;
77
78   q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
79                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
80   clib_memset (q, 0, sizeof (*q));
81   q = svm_queue_init (q, nels, elsize);
82   q->consumer_pid = consumer_pid;
83
84   return q;
85 }
86
87 /*
88  * svm_queue_free
89  */
90 void
91 svm_queue_free (svm_queue_t * q)
92 {
93   (void) pthread_mutex_destroy (&q->mutex);
94   (void) pthread_cond_destroy (&q->condvar);
95   clib_mem_free (q);
96 }
97
98 void
99 svm_queue_lock (svm_queue_t * q)
100 {
101   int rv = pthread_mutex_lock (&q->mutex);
102   if (PREDICT_FALSE (rv == EOWNERDEAD))
103     pthread_mutex_consistent (&q->mutex);
104 }
105
106 static int
107 svm_queue_trylock (svm_queue_t * q)
108 {
109   int rv = pthread_mutex_trylock (&q->mutex);
110   if (PREDICT_FALSE (rv == EOWNERDEAD))
111     rv = pthread_mutex_consistent (&q->mutex);
112   return rv;
113 }
114
115 void
116 svm_queue_unlock (svm_queue_t * q)
117 {
118   pthread_mutex_unlock (&q->mutex);
119 }
120
121 int
122 svm_queue_is_full (svm_queue_t * q)
123 {
124   return q->cursize == q->maxsize;
125 }
126
127 static inline void
128 svm_queue_send_signal_inline (svm_queue_t * q, u8 is_prod)
129 {
130   if (q->producer_evtfd == -1)
131     {
132       (void) pthread_cond_broadcast (&q->condvar);
133     }
134   else
135     {
136       int __clib_unused rv, fd;
137       u64 data = 1;
138       ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
139       fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
140       rv = write (fd, &data, sizeof (data));
141       if (PREDICT_FALSE (rv < 0))
142         clib_unix_warning ("signal write on %d returned %d", fd, rv);
143     }
144 }
145
146 void
147 svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
148 {
149   svm_queue_send_signal_inline (q, is_prod);
150 }
151
152 static inline void
153 svm_queue_wait_inline (svm_queue_t * q)
154 {
155   if (q->producer_evtfd == -1)
156     {
157       pthread_cond_wait (&q->condvar, &q->mutex);
158     }
159   else
160     {
161       /* Fake a wait for event. We could use epoll but that would mean
162        * using yet another fd. Should do for now */
163       u32 cursize = q->cursize;
164       svm_queue_unlock (q);
165       while (q->cursize == cursize)
166         CLIB_PAUSE ();
167       svm_queue_lock (q);
168     }
169 }
170
171 void
172 svm_queue_wait (svm_queue_t * q)
173 {
174   svm_queue_wait_inline (q);
175 }
176
177 static inline int
178 svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
179 {
180   struct timespec ts;
181   ts.tv_sec = unix_time_now () + (u32) timeout;
182   ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
183
184   if (q->producer_evtfd == -1)
185     {
186       return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
187     }
188   else
189     {
190       double max_time = unix_time_now () + timeout;
191       u32 cursize = q->cursize;
192       int rv;
193
194       svm_queue_unlock (q);
195       while (q->cursize == cursize && unix_time_now () < max_time)
196         CLIB_PAUSE ();
197       rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
198       svm_queue_lock (q);
199       return rv;
200     }
201 }
202
203 int
204 svm_queue_timedwait (svm_queue_t * q, double timeout)
205 {
206   return svm_queue_timedwait_inline (q, timeout);
207 }
208
209 /*
210  * svm_queue_add_nolock
211  */
212 int
213 svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
214 {
215   i8 *tailp;
216   int need_broadcast = 0;
217
218   if (PREDICT_FALSE (q->cursize == q->maxsize))
219     {
220       while (q->cursize == q->maxsize)
221         svm_queue_wait_inline (q);
222     }
223
224   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
225   clib_memcpy_fast (tailp, elem, q->elsize);
226
227   q->tail++;
228   q->cursize++;
229
230   need_broadcast = (q->cursize == 1);
231
232   if (q->tail == q->maxsize)
233     q->tail = 0;
234
235   if (need_broadcast)
236     svm_queue_send_signal_inline (q, 1);
237   return 0;
238 }
239
240 void
241 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
242 {
243   i8 *tailp;
244
245   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
246   clib_memcpy_fast (tailp, elem, q->elsize);
247
248   q->tail = (q->tail + 1) % q->maxsize;
249   q->cursize++;
250
251   if (q->cursize == 1)
252     svm_queue_send_signal_inline (q, 1);
253 }
254
255
256 /*
257  * svm_queue_add
258  */
259 int
260 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
261 {
262   i8 *tailp;
263   int need_broadcast = 0;
264
265   if (nowait)
266     {
267       /* zero on success */
268       if (svm_queue_trylock (q))
269         {
270           return (-1);
271         }
272     }
273   else
274     svm_queue_lock (q);
275
276   if (PREDICT_FALSE (q->cursize == q->maxsize))
277     {
278       if (nowait)
279         {
280           svm_queue_unlock (q);
281           return (-2);
282         }
283       while (q->cursize == q->maxsize)
284         svm_queue_wait_inline (q);
285     }
286
287   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
288   clib_memcpy_fast (tailp, elem, q->elsize);
289
290   q->tail++;
291   q->cursize++;
292
293   need_broadcast = (q->cursize == 1);
294
295   if (q->tail == q->maxsize)
296     q->tail = 0;
297
298   if (need_broadcast)
299     svm_queue_send_signal_inline (q, 1);
300
301   svm_queue_unlock (q);
302
303   return 0;
304 }
305
306 /*
307  * svm_queue_add2
308  */
309 int
310 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
311 {
312   i8 *tailp;
313   int need_broadcast = 0;
314
315   if (nowait)
316     {
317       /* zero on success */
318       if (svm_queue_trylock (q))
319         {
320           return (-1);
321         }
322     }
323   else
324     svm_queue_lock (q);
325
326   if (PREDICT_FALSE (q->cursize + 1 >= q->maxsize))
327     {
328       if (nowait)
329         {
330           svm_queue_unlock (q);
331           return (-2);
332         }
333       while (q->cursize + 1 >= q->maxsize)
334         svm_queue_wait_inline (q);
335     }
336
337   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
338   clib_memcpy_fast (tailp, elem, q->elsize);
339
340   q->tail++;
341   q->cursize++;
342
343   if (q->tail == q->maxsize)
344     q->tail = 0;
345
346   need_broadcast = (q->cursize == 1);
347
348   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
349   clib_memcpy_fast (tailp, elem2, q->elsize);
350
351   q->tail++;
352   q->cursize++;
353
354   if (q->tail == q->maxsize)
355     q->tail = 0;
356
357   if (need_broadcast)
358     svm_queue_send_signal_inline (q, 1);
359
360   svm_queue_unlock (q);
361
362   return 0;
363 }
364
365 /*
366  * svm_queue_sub
367  */
368 int
369 svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
370                u32 time)
371 {
372   i8 *headp;
373   int need_broadcast = 0;
374   int rc = 0;
375
376   if (cond == SVM_Q_NOWAIT)
377     {
378       /* zero on success */
379       if (svm_queue_trylock (q))
380         {
381           return (-1);
382         }
383     }
384   else
385     svm_queue_lock (q);
386
387   if (PREDICT_FALSE (q->cursize == 0))
388     {
389       if (cond == SVM_Q_NOWAIT)
390         {
391           svm_queue_unlock (q);
392           return (-2);
393         }
394       else if (cond == SVM_Q_TIMEDWAIT)
395         {
396           while (q->cursize == 0 && rc == 0)
397             rc = svm_queue_timedwait_inline (q, time);
398
399           if (rc == ETIMEDOUT)
400             {
401               svm_queue_unlock (q);
402               return ETIMEDOUT;
403             }
404         }
405       else
406         {
407           while (q->cursize == 0)
408             svm_queue_wait_inline (q);
409         }
410     }
411
412   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
413   clib_memcpy_fast (elem, headp, q->elsize);
414
415   q->head++;
416   /* $$$$ JFC shouldn't this be == 0? */
417   if (q->cursize == q->maxsize)
418     need_broadcast = 1;
419
420   q->cursize--;
421
422   if (q->head == q->maxsize)
423     q->head = 0;
424
425   if (need_broadcast)
426     svm_queue_send_signal_inline (q, 0);
427
428   svm_queue_unlock (q);
429
430   return 0;
431 }
432
433 int
434 svm_queue_sub2 (svm_queue_t * q, u8 * elem)
435 {
436   int need_broadcast;
437   i8 *headp;
438
439   svm_queue_lock (q);
440   if (q->cursize == 0)
441     {
442       svm_queue_unlock (q);
443       return -1;
444     }
445
446   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
447   clib_memcpy_fast (elem, headp, q->elsize);
448
449   q->head++;
450   need_broadcast = (q->cursize == q->maxsize / 2);
451   q->cursize--;
452
453   if (PREDICT_FALSE (q->head == q->maxsize))
454     q->head = 0;
455   svm_queue_unlock (q);
456
457   if (need_broadcast)
458     svm_queue_send_signal_inline (q, 0);
459
460   return 0;
461 }
462
463 int
464 svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
465 {
466   int need_broadcast;
467   i8 *headp;
468
469   if (PREDICT_FALSE (q->cursize == 0))
470     {
471       while (q->cursize == 0)
472         ;
473     }
474
475   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
476   clib_memcpy_fast (elem, headp, q->elsize);
477
478   need_broadcast = q->cursize == q->maxsize;
479
480   q->head = (q->head + 1) % q->maxsize;
481   q->cursize--;
482
483   if (PREDICT_FALSE (need_broadcast))
484     svm_queue_send_signal_inline (q, 0);
485
486   return 0;
487 }
488
489 void
490 svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
491 {
492   q->producer_evtfd = fd;
493 }
494
495 void
496 svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
497 {
498   q->consumer_evtfd = fd;
499 }
500
501 /*
502  * fd.io coding-style-patch-verification: ON
503  *
504  * Local Variables:
505  * eval: (c-set-style "gnu")
506  * End:
507  */