VPP-598: tcp stack initial commit
[vpp.git] / src / vlibmemory / unix_shared_memory_queue.c
1 /*
2  *------------------------------------------------------------------
3  * unix_shared_memory_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <pthread.h>
24 #include <vppinfra/mem.h>
25 #include <vppinfra/format.h>
26 #include <vppinfra/cache.h>
27 #include <vlibmemory/unix_shared_memory_queue.h>
28 #include <signal.h>
29
30 /*
31  * unix_shared_memory_queue_init
32  *
33  * nels = number of elements on the queue
34  * elsize = element size, presumably 4 and cacheline-size will
35  *          be popular choices.
36  * pid   = consumer pid
37  *
38  * The idea is to call this function in the queue consumer,
39  * and e-mail the queue pointer to the producer(s).
40  *
41  * The vpp process / main thread allocates one of these
42  * at startup; its main input queue. The vpp main input queue
43  * has a pointer to it in the shared memory segment header.
44  *
45  * You probably want to be on an svm data heap before calling this
46  * function.
47  */
48 unix_shared_memory_queue_t *
49 unix_shared_memory_queue_init (int nels,
50                                int elsize,
51                                int consumer_pid,
52                                int signal_when_queue_non_empty)
53 {
54   unix_shared_memory_queue_t *q;
55   pthread_mutexattr_t attr;
56   pthread_condattr_t cattr;
57
58   q = clib_mem_alloc_aligned (sizeof (unix_shared_memory_queue_t)
59                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
60   memset (q, 0, sizeof (*q));
61
62   q->elsize = elsize;
63   q->maxsize = nels;
64   q->consumer_pid = consumer_pid;
65   q->signal_when_queue_non_empty = signal_when_queue_non_empty;
66
67   memset (&attr, 0, sizeof (attr));
68   memset (&cattr, 0, sizeof (cattr));
69
70   if (pthread_mutexattr_init (&attr))
71     clib_unix_warning ("mutexattr_init");
72   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
73     clib_unix_warning ("pthread_mutexattr_setpshared");
74   if (pthread_mutex_init (&q->mutex, &attr))
75     clib_unix_warning ("mutex_init");
76   if (pthread_mutexattr_destroy (&attr))
77     clib_unix_warning ("mutexattr_destroy");
78   if (pthread_condattr_init (&cattr))
79     clib_unix_warning ("condattr_init");
80   /* prints funny-looking messages in the Linux target */
81   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
82     clib_unix_warning ("condattr_setpshared");
83   if (pthread_cond_init (&q->condvar, &cattr))
84     clib_unix_warning ("cond_init1");
85   if (pthread_condattr_destroy (&cattr))
86     clib_unix_warning ("cond_init2");
87
88   return (q);
89 }
90
91 /*
92  * unix_shared_memory_queue_free
93  */
94 void
95 unix_shared_memory_queue_free (unix_shared_memory_queue_t * q)
96 {
97   (void) pthread_mutex_destroy (&q->mutex);
98   (void) pthread_cond_destroy (&q->condvar);
99   clib_mem_free (q);
100 }
101
102 void
103 unix_shared_memory_queue_lock (unix_shared_memory_queue_t * q)
104 {
105   pthread_mutex_lock (&q->mutex);
106 }
107
108 void
109 unix_shared_memory_queue_unlock (unix_shared_memory_queue_t * q)
110 {
111   pthread_mutex_unlock (&q->mutex);
112 }
113
114 int
115 unix_shared_memory_queue_is_full (unix_shared_memory_queue_t * q)
116 {
117   return q->cursize == q->maxsize;
118 }
119
120 /*
121  * unix_shared_memory_queue_add_nolock
122  */
123 int
124 unix_shared_memory_queue_add_nolock (unix_shared_memory_queue_t * q,
125                                      u8 * elem)
126 {
127   i8 *tailp;
128   int need_broadcast = 0;
129
130   if (PREDICT_FALSE (q->cursize == q->maxsize))
131     {
132       while (q->cursize == q->maxsize)
133         {
134           (void) pthread_cond_wait (&q->condvar, &q->mutex);
135         }
136     }
137
138   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
139   clib_memcpy (tailp, elem, q->elsize);
140
141   q->tail++;
142   q->cursize++;
143
144   need_broadcast = (q->cursize == 1);
145
146   if (q->tail == q->maxsize)
147     q->tail = 0;
148
149   if (need_broadcast)
150     {
151       (void) pthread_cond_broadcast (&q->condvar);
152       if (q->signal_when_queue_non_empty)
153         kill (q->consumer_pid, q->signal_when_queue_non_empty);
154     }
155   return 0;
156 }
157
158 int
159 unix_shared_memory_queue_add_raw (unix_shared_memory_queue_t * q, u8 * elem)
160 {
161   i8 *tailp;
162
163   if (PREDICT_FALSE (q->cursize == q->maxsize))
164     {
165       while (q->cursize == q->maxsize)
166         ;
167     }
168
169   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
170   clib_memcpy (tailp, elem, q->elsize);
171
172   q->tail++;
173   q->cursize++;
174
175   if (q->tail == q->maxsize)
176     q->tail = 0;
177   return 0;
178 }
179
180
181 /*
182  * unix_shared_memory_queue_add
183  */
184 int
185 unix_shared_memory_queue_add (unix_shared_memory_queue_t * q,
186                               u8 * elem, int nowait)
187 {
188   i8 *tailp;
189   int need_broadcast = 0;
190
191   if (nowait)
192     {
193       /* zero on success */
194       if (pthread_mutex_trylock (&q->mutex))
195         {
196           return (-1);
197         }
198     }
199   else
200     pthread_mutex_lock (&q->mutex);
201
202   if (PREDICT_FALSE (q->cursize == q->maxsize))
203     {
204       if (nowait)
205         {
206           pthread_mutex_unlock (&q->mutex);
207           return (-2);
208         }
209       while (q->cursize == q->maxsize)
210         {
211           (void) pthread_cond_wait (&q->condvar, &q->mutex);
212         }
213     }
214
215   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
216   clib_memcpy (tailp, elem, q->elsize);
217
218   q->tail++;
219   q->cursize++;
220
221   need_broadcast = (q->cursize == 1);
222
223   if (q->tail == q->maxsize)
224     q->tail = 0;
225
226   if (need_broadcast)
227     {
228       (void) pthread_cond_broadcast (&q->condvar);
229       if (q->signal_when_queue_non_empty)
230         kill (q->consumer_pid, q->signal_when_queue_non_empty);
231     }
232   pthread_mutex_unlock (&q->mutex);
233
234   return 0;
235 }
236
237 /*
238  * unix_shared_memory_queue_sub
239  */
240 int
241 unix_shared_memory_queue_sub (unix_shared_memory_queue_t * q,
242                               u8 * elem, int nowait)
243 {
244   i8 *headp;
245   int need_broadcast = 0;
246
247   if (nowait)
248     {
249       /* zero on success */
250       if (pthread_mutex_trylock (&q->mutex))
251         {
252           return (-1);
253         }
254     }
255   else
256     pthread_mutex_lock (&q->mutex);
257
258   if (PREDICT_FALSE (q->cursize == 0))
259     {
260       if (nowait)
261         {
262           pthread_mutex_unlock (&q->mutex);
263           return (-2);
264         }
265       while (q->cursize == 0)
266         {
267           (void) pthread_cond_wait (&q->condvar, &q->mutex);
268         }
269     }
270
271   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
272   clib_memcpy (elem, headp, q->elsize);
273
274   q->head++;
275   /* $$$$ JFC shouldn't this be == 0? */
276   if (q->cursize == q->maxsize)
277     need_broadcast = 1;
278
279   q->cursize--;
280
281   if (q->head == q->maxsize)
282     q->head = 0;
283
284   if (need_broadcast)
285     (void) pthread_cond_broadcast (&q->condvar);
286
287   pthread_mutex_unlock (&q->mutex);
288
289   return 0;
290 }
291
292 int
293 unix_shared_memory_queue_sub_raw (unix_shared_memory_queue_t * q, u8 * elem)
294 {
295   i8 *headp;
296
297   if (PREDICT_FALSE (q->cursize == 0))
298     {
299       while (q->cursize == 0)
300         ;
301     }
302
303   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
304   clib_memcpy (elem, headp, q->elsize);
305
306   q->head++;
307   q->cursize--;
308
309   if (q->head == q->maxsize)
310     q->head = 0;
311   return 0;
312 }
313
314 /*
315  * fd.io coding-style-patch-verification: ON
316  *
317  * Local Variables:
318  * eval: (c-set-style "gnu")
319  * End:
320  */