svm: make svm queue mutex robust
[vpp.git] / src / svm / queue.c
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009-2019 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <vppinfra/lock.h>
31
32 svm_queue_t *
33 svm_queue_init (void *base, int nels, int elsize)
34 {
35   svm_queue_t *q;
36   pthread_mutexattr_t attr;
37   pthread_condattr_t cattr;
38
39   q = (svm_queue_t *) base;
40   clib_memset (q, 0, sizeof (*q));
41
42   q->elsize = elsize;
43   q->maxsize = nels;
44   q->producer_evtfd = -1;
45   q->consumer_evtfd = -1;
46
47   clib_memset (&attr, 0, sizeof (attr));
48   clib_memset (&cattr, 0, sizeof (cattr));
49
50   if (pthread_mutexattr_init (&attr))
51     clib_unix_warning ("mutexattr_init");
52   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53     clib_unix_warning ("pthread_mutexattr_setpshared");
54   if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
55     clib_unix_warning ("setrobust");
56   if (pthread_mutex_init (&q->mutex, &attr))
57     clib_unix_warning ("mutex_init");
58   if (pthread_mutexattr_destroy (&attr))
59     clib_unix_warning ("mutexattr_destroy");
60   if (pthread_condattr_init (&cattr))
61     clib_unix_warning ("condattr_init");
62   /* prints funny-looking messages in the Linux target */
63   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
64     clib_unix_warning ("condattr_setpshared");
65   if (pthread_cond_init (&q->condvar, &cattr))
66     clib_unix_warning ("cond_init1");
67   if (pthread_condattr_destroy (&cattr))
68     clib_unix_warning ("cond_init2");
69
70   return (q);
71 }
72
73 svm_queue_t *
74 svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
75 {
76   svm_queue_t *q;
77
78   q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
79                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
80   clib_memset (q, 0, sizeof (*q));
81   q = svm_queue_init (q, nels, elsize);
82   q->consumer_pid = consumer_pid;
83
84   return q;
85 }
86
87 /*
88  * svm_queue_free
89  */
90 void
91 svm_queue_free (svm_queue_t * q)
92 {
93   (void) pthread_mutex_destroy (&q->mutex);
94   (void) pthread_cond_destroy (&q->condvar);
95   clib_mem_free (q);
96 }
97
98 void
99 svm_queue_lock (svm_queue_t * q)
100 {
101   int rv = pthread_mutex_lock (&q->mutex);
102   if (PREDICT_FALSE (rv == EOWNERDEAD))
103     pthread_mutex_consistent (&q->mutex);
104 }
105
106 void
107 svm_queue_unlock (svm_queue_t * q)
108 {
109   pthread_mutex_unlock (&q->mutex);
110 }
111
112 int
113 svm_queue_is_full (svm_queue_t * q)
114 {
115   return q->cursize == q->maxsize;
116 }
117
118 static inline void
119 svm_queue_send_signal_inline (svm_queue_t * q, u8 is_prod)
120 {
121   if (q->producer_evtfd == -1)
122     {
123       (void) pthread_cond_broadcast (&q->condvar);
124     }
125   else
126     {
127       int __clib_unused rv, fd;
128       u64 data = 1;
129       ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
130       fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
131       rv = write (fd, &data, sizeof (data));
132       if (PREDICT_FALSE (rv < 0))
133         clib_unix_warning ("signal write on %d returned %d", fd, rv);
134     }
135 }
136
137 void
138 svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
139 {
140   svm_queue_send_signal_inline (q, is_prod);
141 }
142
143 static inline void
144 svm_queue_wait_inline (svm_queue_t * q)
145 {
146   if (q->producer_evtfd == -1)
147     {
148       pthread_cond_wait (&q->condvar, &q->mutex);
149     }
150   else
151     {
152       /* Fake a wait for event. We could use epoll but that would mean
153        * using yet another fd. Should do for now */
154       u32 cursize = q->cursize;
155       svm_queue_unlock (q);
156       while (q->cursize == cursize)
157         CLIB_PAUSE ();
158       svm_queue_lock (q);
159     }
160 }
161
162 void
163 svm_queue_wait (svm_queue_t * q)
164 {
165   svm_queue_wait_inline (q);
166 }
167
168 static inline int
169 svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
170 {
171   struct timespec ts;
172   ts.tv_sec = unix_time_now () + (u32) timeout;
173   ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
174
175   if (q->producer_evtfd == -1)
176     {
177       return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
178     }
179   else
180     {
181       double max_time = unix_time_now () + timeout;
182       u32 cursize = q->cursize;
183       int rv;
184
185       svm_queue_unlock (q);
186       while (q->cursize == cursize && unix_time_now () < max_time)
187         CLIB_PAUSE ();
188       rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
189       svm_queue_lock (q);
190       return rv;
191     }
192 }
193
194 int
195 svm_queue_timedwait (svm_queue_t * q, double timeout)
196 {
197   return svm_queue_timedwait_inline (q, timeout);
198 }
199
200 /*
201  * svm_queue_add_nolock
202  */
203 int
204 svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
205 {
206   i8 *tailp;
207   int need_broadcast = 0;
208
209   if (PREDICT_FALSE (q->cursize == q->maxsize))
210     {
211       while (q->cursize == q->maxsize)
212         svm_queue_wait_inline (q);
213     }
214
215   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
216   clib_memcpy_fast (tailp, elem, q->elsize);
217
218   q->tail++;
219   q->cursize++;
220
221   need_broadcast = (q->cursize == 1);
222
223   if (q->tail == q->maxsize)
224     q->tail = 0;
225
226   if (need_broadcast)
227     svm_queue_send_signal_inline (q, 1);
228   return 0;
229 }
230
231 void
232 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
233 {
234   i8 *tailp;
235
236   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
237   clib_memcpy_fast (tailp, elem, q->elsize);
238
239   q->tail = (q->tail + 1) % q->maxsize;
240   q->cursize++;
241
242   if (q->cursize == 1)
243     svm_queue_send_signal_inline (q, 1);
244 }
245
246
247 /*
248  * svm_queue_add
249  */
250 int
251 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
252 {
253   i8 *tailp;
254   int need_broadcast = 0;
255
256   if (nowait)
257     {
258       /* zero on success */
259       if (pthread_mutex_trylock (&q->mutex))
260         {
261           return (-1);
262         }
263     }
264   else
265     svm_queue_lock (q);
266
267   if (PREDICT_FALSE (q->cursize == q->maxsize))
268     {
269       if (nowait)
270         {
271           svm_queue_unlock (q);
272           return (-2);
273         }
274       while (q->cursize == q->maxsize)
275         svm_queue_wait_inline (q);
276     }
277
278   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
279   clib_memcpy_fast (tailp, elem, q->elsize);
280
281   q->tail++;
282   q->cursize++;
283
284   need_broadcast = (q->cursize == 1);
285
286   if (q->tail == q->maxsize)
287     q->tail = 0;
288
289   if (need_broadcast)
290     svm_queue_send_signal_inline (q, 1);
291
292   svm_queue_unlock (q);
293
294   return 0;
295 }
296
297 /*
298  * svm_queue_add2
299  */
300 int
301 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
302 {
303   i8 *tailp;
304   int need_broadcast = 0;
305
306   if (nowait)
307     {
308       /* zero on success */
309       if (pthread_mutex_trylock (&q->mutex))
310         {
311           return (-1);
312         }
313     }
314   else
315     svm_queue_lock (q);
316
317   if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
318     {
319       if (nowait)
320         {
321           svm_queue_unlock (q);
322           return (-2);
323         }
324       while (q->cursize + 1 == q->maxsize)
325         svm_queue_wait_inline (q);
326     }
327
328   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
329   clib_memcpy_fast (tailp, elem, q->elsize);
330
331   q->tail++;
332   q->cursize++;
333
334   if (q->tail == q->maxsize)
335     q->tail = 0;
336
337   need_broadcast = (q->cursize == 1);
338
339   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
340   clib_memcpy_fast (tailp, elem2, q->elsize);
341
342   q->tail++;
343   q->cursize++;
344
345   if (q->tail == q->maxsize)
346     q->tail = 0;
347
348   if (need_broadcast)
349     svm_queue_send_signal_inline (q, 1);
350
351   svm_queue_unlock (q);
352
353   return 0;
354 }
355
356 /*
357  * svm_queue_sub
358  */
359 int
360 svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
361                u32 time)
362 {
363   i8 *headp;
364   int need_broadcast = 0;
365   int rc = 0;
366
367   if (cond == SVM_Q_NOWAIT)
368     {
369       /* zero on success */
370       if (pthread_mutex_trylock (&q->mutex))
371         {
372           return (-1);
373         }
374     }
375   else
376     svm_queue_lock (q);
377
378   if (PREDICT_FALSE (q->cursize == 0))
379     {
380       if (cond == SVM_Q_NOWAIT)
381         {
382           svm_queue_unlock (q);
383           return (-2);
384         }
385       else if (cond == SVM_Q_TIMEDWAIT)
386         {
387           while (q->cursize == 0 && rc == 0)
388             rc = svm_queue_timedwait_inline (q, time);
389
390           if (rc == ETIMEDOUT)
391             {
392               svm_queue_unlock (q);
393               return ETIMEDOUT;
394             }
395         }
396       else
397         {
398           while (q->cursize == 0)
399             svm_queue_wait_inline (q);
400         }
401     }
402
403   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
404   clib_memcpy_fast (elem, headp, q->elsize);
405
406   q->head++;
407   /* $$$$ JFC shouldn't this be == 0? */
408   if (q->cursize == q->maxsize)
409     need_broadcast = 1;
410
411   q->cursize--;
412
413   if (q->head == q->maxsize)
414     q->head = 0;
415
416   if (need_broadcast)
417     svm_queue_send_signal_inline (q, 0);
418
419   svm_queue_unlock (q);
420
421   return 0;
422 }
423
424 int
425 svm_queue_sub2 (svm_queue_t * q, u8 * elem)
426 {
427   int need_broadcast;
428   i8 *headp;
429
430   svm_queue_lock (q);
431   if (q->cursize == 0)
432     {
433       svm_queue_unlock (q);
434       return -1;
435     }
436
437   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
438   clib_memcpy_fast (elem, headp, q->elsize);
439
440   q->head++;
441   need_broadcast = (q->cursize == q->maxsize / 2);
442   q->cursize--;
443
444   if (PREDICT_FALSE (q->head == q->maxsize))
445     q->head = 0;
446   svm_queue_unlock (q);
447
448   if (need_broadcast)
449     svm_queue_send_signal_inline (q, 0);
450
451   return 0;
452 }
453
454 int
455 svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
456 {
457   int need_broadcast;
458   i8 *headp;
459
460   if (PREDICT_FALSE (q->cursize == 0))
461     {
462       while (q->cursize == 0)
463         ;
464     }
465
466   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
467   clib_memcpy_fast (elem, headp, q->elsize);
468
469   need_broadcast = q->cursize == q->maxsize;
470
471   q->head = (q->head + 1) % q->maxsize;
472   q->cursize--;
473
474   if (PREDICT_FALSE (need_broadcast))
475     svm_queue_send_signal_inline (q, 0);
476
477   return 0;
478 }
479
480 void
481 svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
482 {
483   q->producer_evtfd = fd;
484 }
485
486 void
487 svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
488 {
489   q->consumer_evtfd = fd;
490 }
491
492 /*
493  * fd.io coding-style-patch-verification: ON
494  *
495  * Local Variables:
496  * eval: (c-set-style "gnu")
497  * End:
498  */