Initial commit of vpp code.
[vpp.git] / vlib-api / vlibmemory / unix_shared_memory_queue.c
1 /* 
2  *------------------------------------------------------------------
3  * unix_shared_memory_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <pthread.h>
24 #include <vppinfra/mem.h>
25 #include <vppinfra/format.h>
26 #include <vppinfra/cache.h>
27 #include <vlibmemory/unix_shared_memory_queue.h>
28 #include <signal.h>
29
30 /*
31  * unix_shared_memory_queue_init
32  * 
33  * nels = number of elements on the queue
34  * elsize = element size, presumably 4 and cacheline-size will
35  *          be popular choices.
36  * coid  = consumer coid, from ChannelCreate
37  * pid   = consumer pid
38  * pulse_code  = pulse code consumer expects
39  * pulse_value = pulse value consumer expects
40  * consumer_prio = consumer's priority, so pulses won't change
41  *                 the consumer's priority.
42  *
43  * The idea is to call this function in the queue consumer,
44  * and e-mail the queue pointer to the producer(s).
45  *
46  * The spp process / main thread allocates one of these
47  * at startup; its main input queue. The spp main input queue
48  * has a pointer to it in the shared memory segment header.
49  * 
50  * You probably want to be on an svm data heap before calling this 
51  * function.
52  */
53 unix_shared_memory_queue_t *
54 unix_shared_memory_queue_init(int nels, 
55                               int elsize, 
56                               int consumer_pid,
57                               int signal_when_queue_non_empty)
58 {
59     unix_shared_memory_queue_t *q;
60     pthread_mutexattr_t attr;
61     pthread_condattr_t cattr;
62
63     q = clib_mem_alloc_aligned(sizeof(unix_shared_memory_queue_t) 
64                                + nels*elsize, CLIB_CACHE_LINE_BYTES);
65     memset(q, 0, sizeof (*q));
66
67     q->elsize = elsize;
68     q->maxsize = nels;
69     q->consumer_pid = consumer_pid;
70     q->signal_when_queue_non_empty = signal_when_queue_non_empty;
71
72     memset(&attr,0,sizeof(attr));
73     memset(&cattr,0,sizeof(attr));
74     
75     if (pthread_mutexattr_init(&attr))
76         clib_unix_warning("mutexattr_init");
77     if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED))
78         clib_unix_warning("pthread_mutexattr_setpshared");
79     if (pthread_mutex_init(&q->mutex, &attr))
80         clib_unix_warning("mutex_init");
81     if (pthread_mutexattr_destroy(&attr))
82         clib_unix_warning("mutexattr_destroy");
83     if (pthread_condattr_init(&cattr)) 
84         clib_unix_warning("condattr_init");
85     /* prints funny-looking messages in the Linux target */
86     if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED))
87         clib_unix_warning("condattr_setpshared");
88     if (pthread_cond_init(&q->condvar, &cattr))
89         clib_unix_warning("cond_init1");
90     if(pthread_condattr_destroy(&cattr))
91         clib_unix_warning("cond_init2");
92
93     return(q);
94 }
95
96 /*
97  * unix_shared_memory_queue_free
98  */
99 void unix_shared_memory_queue_free(unix_shared_memory_queue_t *q)
100 {
101     (void) pthread_mutex_destroy(&q->mutex);
102     (void) pthread_cond_destroy(&q->condvar);
103     clib_mem_free(q);
104 }
105
106 void unix_shared_memory_queue_lock (unix_shared_memory_queue_t *q)
107 {
108     pthread_mutex_lock(&q->mutex);
109 }
110
111 void unix_shared_memory_queue_unlock (unix_shared_memory_queue_t *q)
112 {
113     pthread_mutex_unlock(&q->mutex);
114 }
115
116 int unix_shared_memory_queue_is_full (unix_shared_memory_queue_t *q)
117 {
118     return q->cursize == q->maxsize;
119 }
120
121 /*
122  * unix_shared_memory_queue_add_nolock
123  */
124 int unix_shared_memory_queue_add_nolock (unix_shared_memory_queue_t *q, 
125                                          u8 *elem)
126 {
127     i8 *tailp;
128     int need_broadcast=0;
129     
130     if (PREDICT_FALSE(q->cursize == q->maxsize)) {
131         while(q->cursize == q->maxsize) {
132             (void) pthread_cond_wait(&q->condvar, &q->mutex);
133         }
134     }
135         
136     tailp = (i8 *)(&q->data[0] + q->elsize*q->tail);
137     memcpy(tailp, elem, q->elsize);
138
139     q->tail++;
140     q->cursize++;
141
142     need_broadcast = (q->cursize == 1);
143
144     if (q->tail == q->maxsize)
145         q->tail = 0;
146
147     if (need_broadcast) {
148         (void) pthread_cond_broadcast(&q->condvar);
149         if (q->signal_when_queue_non_empty)
150             kill (q->consumer_pid, q->signal_when_queue_non_empty);
151     }
152     return 0;
153 }
154
155 int unix_shared_memory_queue_add_raw (unix_shared_memory_queue_t *q, 
156                                       u8 *elem)
157 {
158     i8 *tailp;
159     
160     if (PREDICT_FALSE(q->cursize == q->maxsize)) {
161         while(q->cursize == q->maxsize)
162             ;
163     }
164         
165     tailp = (i8 *)(&q->data[0] + q->elsize*q->tail);
166     memcpy(tailp, elem, q->elsize);
167
168     q->tail++;
169     q->cursize++;
170
171     if (q->tail == q->maxsize)
172         q->tail = 0;
173     return 0;
174 }
175
176
177 /*
178  * unix_shared_memory_queue_add
179  */
180 int unix_shared_memory_queue_add (unix_shared_memory_queue_t *q, 
181                                   u8 *elem, int nowait)
182 {
183     i8 *tailp;
184     int need_broadcast=0;
185     
186     if (nowait) {
187         /* zero on success */
188         if (pthread_mutex_trylock (&q->mutex)) {
189             return (-1);
190         }
191     } else 
192         pthread_mutex_lock(&q->mutex);
193     
194     if (PREDICT_FALSE(q->cursize == q->maxsize)) {
195         if (nowait) {
196             pthread_mutex_unlock(&q->mutex);
197             return (-2);
198         }
199         while(q->cursize == q->maxsize) {
200             (void) pthread_cond_wait(&q->condvar, &q->mutex);
201         }
202     }
203         
204     tailp = (i8 *)(&q->data[0] + q->elsize*q->tail);
205     memcpy(tailp, elem, q->elsize);
206
207     q->tail++;
208     q->cursize++;
209
210     need_broadcast = (q->cursize == 1);
211
212     if (q->tail == q->maxsize)
213         q->tail = 0;
214
215     if (need_broadcast) {
216         (void) pthread_cond_broadcast(&q->condvar);
217         if (q->signal_when_queue_non_empty)
218             kill (q->consumer_pid, q->signal_when_queue_non_empty);
219     }
220     pthread_mutex_unlock(&q->mutex);
221
222     return 0;
223 }
224
225 /*
226  * unix_shared_memory_queue_sub
227  */
228 int unix_shared_memory_queue_sub(unix_shared_memory_queue_t *q, 
229                                  u8 *elem, int nowait)
230 {
231     i8 *headp;
232     int need_broadcast=0;
233     
234     if (nowait) {
235         /* zero on success */
236         if (pthread_mutex_trylock (&q->mutex)) {
237             return (-1);
238         }
239     } else 
240         pthread_mutex_lock(&q->mutex);
241     
242     if (PREDICT_FALSE(q->cursize == 0)) {
243         if (nowait) {
244             pthread_mutex_unlock(&q->mutex);
245             return (-2);
246         }
247         while (q->cursize == 0) {
248             (void) pthread_cond_wait(&q->condvar, &q->mutex);
249         }
250     }
251     
252     headp = (i8 *)(&q->data[0] + q->elsize*q->head);
253     memcpy(elem, headp, q->elsize);
254     
255     q->head++;
256     if (q->cursize == q->maxsize)
257         need_broadcast = 1;
258
259     q->cursize--;
260
261     if(q->head == q->maxsize)
262         q->head = 0;
263
264     if (need_broadcast) 
265         (void) pthread_cond_broadcast(&q->condvar);
266     
267     pthread_mutex_unlock(&q->mutex);
268
269     return 0;
270 }
271
272 int unix_shared_memory_queue_sub_raw (unix_shared_memory_queue_t *q, 
273                                       u8 *elem)
274 {
275     i8 *headp;
276     
277     if (PREDICT_FALSE(q->cursize == 0)) {
278         while (q->cursize == 0) 
279             ;
280     }
281     
282     headp = (i8 *)(&q->data[0] + q->elsize*q->head);
283     memcpy(elem, headp, q->elsize);
284     
285     q->head++;
286     q->cursize--;
287
288     if(q->head == q->maxsize)
289         q->head = 0;
290     return 0;
291 }