host stack: update stale copyright
[vpp.git] / src / svm / queue.c
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009-2019 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <vppinfra/lock.h>
31
32 svm_queue_t *
33 svm_queue_init (void *base, int nels, int elsize)
34 {
35   svm_queue_t *q;
36   pthread_mutexattr_t attr;
37   pthread_condattr_t cattr;
38
39   q = (svm_queue_t *) base;
40   clib_memset (q, 0, sizeof (*q));
41
42   q->elsize = elsize;
43   q->maxsize = nels;
44   q->producer_evtfd = -1;
45   q->consumer_evtfd = -1;
46
47   clib_memset (&attr, 0, sizeof (attr));
48   clib_memset (&cattr, 0, sizeof (cattr));
49
50   if (pthread_mutexattr_init (&attr))
51     clib_unix_warning ("mutexattr_init");
52   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53     clib_unix_warning ("pthread_mutexattr_setpshared");
54   if (pthread_mutex_init (&q->mutex, &attr))
55     clib_unix_warning ("mutex_init");
56   if (pthread_mutexattr_destroy (&attr))
57     clib_unix_warning ("mutexattr_destroy");
58   if (pthread_condattr_init (&cattr))
59     clib_unix_warning ("condattr_init");
60   /* prints funny-looking messages in the Linux target */
61   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
62     clib_unix_warning ("condattr_setpshared");
63   if (pthread_cond_init (&q->condvar, &cattr))
64     clib_unix_warning ("cond_init1");
65   if (pthread_condattr_destroy (&cattr))
66     clib_unix_warning ("cond_init2");
67
68   return (q);
69 }
70
71 svm_queue_t *
72 svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
73 {
74   svm_queue_t *q;
75
76   q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
77                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
78   clib_memset (q, 0, sizeof (*q));
79   q = svm_queue_init (q, nels, elsize);
80   q->consumer_pid = consumer_pid;
81
82   return q;
83 }
84
85 /*
86  * svm_queue_free
87  */
88 void
89 svm_queue_free (svm_queue_t * q)
90 {
91   (void) pthread_mutex_destroy (&q->mutex);
92   (void) pthread_cond_destroy (&q->condvar);
93   clib_mem_free (q);
94 }
95
96 void
97 svm_queue_lock (svm_queue_t * q)
98 {
99   pthread_mutex_lock (&q->mutex);
100 }
101
102 void
103 svm_queue_unlock (svm_queue_t * q)
104 {
105   pthread_mutex_unlock (&q->mutex);
106 }
107
108 int
109 svm_queue_is_full (svm_queue_t * q)
110 {
111   return q->cursize == q->maxsize;
112 }
113
114 static inline void
115 svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
116 {
117   if (q->producer_evtfd == -1)
118     {
119       (void) pthread_cond_broadcast (&q->condvar);
120     }
121   else
122     {
123       int __clib_unused rv, fd;
124       u64 data = 1;
125       ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
126       fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
127       rv = write (fd, &data, sizeof (data));
128     }
129 }
130
131 static inline void
132 svm_queue_wait_inline (svm_queue_t * q)
133 {
134   if (q->producer_evtfd == -1)
135     {
136       pthread_cond_wait (&q->condvar, &q->mutex);
137     }
138   else
139     {
140       /* Fake a wait for event. We could use epoll but that would mean
141        * using yet another fd. Should do for now */
142       u32 cursize = q->cursize;
143       pthread_mutex_unlock (&q->mutex);
144       while (q->cursize == cursize)
145         CLIB_PAUSE ();
146       pthread_mutex_lock (&q->mutex);
147     }
148 }
149
150 void
151 svm_queue_wait (svm_queue_t * q)
152 {
153   svm_queue_wait_inline (q);
154 }
155
156 static inline int
157 svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
158 {
159   struct timespec ts;
160   ts.tv_sec = unix_time_now () + (u32) timeout;
161   ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
162
163   if (q->producer_evtfd == -1)
164     {
165       return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
166     }
167   else
168     {
169       double max_time = unix_time_now () + timeout;
170       u32 cursize = q->cursize;
171       int rv;
172
173       pthread_mutex_unlock (&q->mutex);
174       while (q->cursize == cursize && unix_time_now () < max_time)
175         CLIB_PAUSE ();
176       rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
177       pthread_mutex_lock (&q->mutex);
178       return rv;
179     }
180 }
181
182 int
183 svm_queue_timedwait (svm_queue_t * q, double timeout)
184 {
185   return svm_queue_timedwait_inline (q, timeout);
186 }
187
188 /*
189  * svm_queue_add_nolock
190  */
191 int
192 svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
193 {
194   i8 *tailp;
195   int need_broadcast = 0;
196
197   if (PREDICT_FALSE (q->cursize == q->maxsize))
198     {
199       while (q->cursize == q->maxsize)
200         svm_queue_wait_inline (q);
201     }
202
203   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
204   clib_memcpy_fast (tailp, elem, q->elsize);
205
206   q->tail++;
207   q->cursize++;
208
209   need_broadcast = (q->cursize == 1);
210
211   if (q->tail == q->maxsize)
212     q->tail = 0;
213
214   if (need_broadcast)
215     svm_queue_send_signal (q, 1);
216   return 0;
217 }
218
219 void
220 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
221 {
222   i8 *tailp;
223
224   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
225   clib_memcpy_fast (tailp, elem, q->elsize);
226
227   q->tail = (q->tail + 1) % q->maxsize;
228   q->cursize++;
229
230   if (q->cursize == 1)
231     svm_queue_send_signal (q, 1);
232 }
233
234
235 /*
236  * svm_queue_add
237  */
238 int
239 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
240 {
241   i8 *tailp;
242   int need_broadcast = 0;
243
244   if (nowait)
245     {
246       /* zero on success */
247       if (pthread_mutex_trylock (&q->mutex))
248         {
249           return (-1);
250         }
251     }
252   else
253     pthread_mutex_lock (&q->mutex);
254
255   if (PREDICT_FALSE (q->cursize == q->maxsize))
256     {
257       if (nowait)
258         {
259           pthread_mutex_unlock (&q->mutex);
260           return (-2);
261         }
262       while (q->cursize == q->maxsize)
263         svm_queue_wait_inline (q);
264     }
265
266   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
267   clib_memcpy_fast (tailp, elem, q->elsize);
268
269   q->tail++;
270   q->cursize++;
271
272   need_broadcast = (q->cursize == 1);
273
274   if (q->tail == q->maxsize)
275     q->tail = 0;
276
277   if (need_broadcast)
278     svm_queue_send_signal (q, 1);
279
280   pthread_mutex_unlock (&q->mutex);
281
282   return 0;
283 }
284
285 /*
286  * svm_queue_add2
287  */
288 int
289 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
290 {
291   i8 *tailp;
292   int need_broadcast = 0;
293
294   if (nowait)
295     {
296       /* zero on success */
297       if (pthread_mutex_trylock (&q->mutex))
298         {
299           return (-1);
300         }
301     }
302   else
303     pthread_mutex_lock (&q->mutex);
304
305   if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
306     {
307       if (nowait)
308         {
309           pthread_mutex_unlock (&q->mutex);
310           return (-2);
311         }
312       while (q->cursize + 1 == q->maxsize)
313         svm_queue_wait_inline (q);
314     }
315
316   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
317   clib_memcpy_fast (tailp, elem, q->elsize);
318
319   q->tail++;
320   q->cursize++;
321
322   if (q->tail == q->maxsize)
323     q->tail = 0;
324
325   need_broadcast = (q->cursize == 1);
326
327   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
328   clib_memcpy_fast (tailp, elem2, q->elsize);
329
330   q->tail++;
331   q->cursize++;
332
333   if (q->tail == q->maxsize)
334     q->tail = 0;
335
336   if (need_broadcast)
337     svm_queue_send_signal (q, 1);
338
339   pthread_mutex_unlock (&q->mutex);
340
341   return 0;
342 }
343
344 /*
345  * svm_queue_sub
346  */
347 int
348 svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
349                u32 time)
350 {
351   i8 *headp;
352   int need_broadcast = 0;
353   int rc = 0;
354
355   if (cond == SVM_Q_NOWAIT)
356     {
357       /* zero on success */
358       if (pthread_mutex_trylock (&q->mutex))
359         {
360           return (-1);
361         }
362     }
363   else
364     pthread_mutex_lock (&q->mutex);
365
366   if (PREDICT_FALSE (q->cursize == 0))
367     {
368       if (cond == SVM_Q_NOWAIT)
369         {
370           pthread_mutex_unlock (&q->mutex);
371           return (-2);
372         }
373       else if (cond == SVM_Q_TIMEDWAIT)
374         {
375           while (q->cursize == 0 && rc == 0)
376             rc = svm_queue_timedwait_inline (q, time);
377
378           if (rc == ETIMEDOUT)
379             {
380               pthread_mutex_unlock (&q->mutex);
381               return ETIMEDOUT;
382             }
383         }
384       else
385         {
386           while (q->cursize == 0)
387             svm_queue_wait_inline (q);
388         }
389     }
390
391   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
392   clib_memcpy_fast (elem, headp, q->elsize);
393
394   q->head++;
395   /* $$$$ JFC shouldn't this be == 0? */
396   if (q->cursize == q->maxsize)
397     need_broadcast = 1;
398
399   q->cursize--;
400
401   if (q->head == q->maxsize)
402     q->head = 0;
403
404   if (need_broadcast)
405     svm_queue_send_signal (q, 0);
406
407   pthread_mutex_unlock (&q->mutex);
408
409   return 0;
410 }
411
412 int
413 svm_queue_sub2 (svm_queue_t * q, u8 * elem)
414 {
415   int need_broadcast;
416   i8 *headp;
417
418   pthread_mutex_lock (&q->mutex);
419   if (q->cursize == 0)
420     {
421       pthread_mutex_unlock (&q->mutex);
422       return -1;
423     }
424
425   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
426   clib_memcpy_fast (elem, headp, q->elsize);
427
428   q->head++;
429   need_broadcast = (q->cursize == q->maxsize / 2);
430   q->cursize--;
431
432   if (PREDICT_FALSE (q->head == q->maxsize))
433     q->head = 0;
434   pthread_mutex_unlock (&q->mutex);
435
436   if (need_broadcast)
437     svm_queue_send_signal (q, 0);
438
439   return 0;
440 }
441
442 int
443 svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
444 {
445   i8 *headp;
446
447   if (PREDICT_FALSE (q->cursize == 0))
448     {
449       while (q->cursize == 0)
450         ;
451     }
452
453   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
454   clib_memcpy_fast (elem, headp, q->elsize);
455
456   q->head = (q->head + 1) % q->maxsize;
457   q->cursize--;
458
459   return 0;
460 }
461
462 void
463 svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
464 {
465   q->producer_evtfd = fd;
466 }
467
468 void
469 svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
470 {
471   q->consumer_evtfd = fd;
472 }
473
474 /*
475  * fd.io coding-style-patch-verification: ON
476  *
477  * Local Variables:
478  * eval: (c-set-style "gnu")
479  * End:
480  */