misc: add address sanitizer heap instrumentation
[vpp.git] / src / vlibmemory / memory_shared.c
1 /*
2  *------------------------------------------------------------------
3  * memclnt_shared.c - API message handling, common code for both clients
4  * and the vlib process itself.
5  *
6  *
7  * Copyright (c) 2009 Cisco and/or its affiliates.
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at:
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *------------------------------------------------------------------
20  */
21
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <stddef.h>
25 #include <string.h>
26 #include <unistd.h>
27 #include <signal.h>
28
29 #include <vppinfra/format.h>
30 #include <vppinfra/byte_order.h>
31 #include <vppinfra/error.h>
32 #include <vppinfra/elog.h>
33 #include <svm/queue.h>
34 #include <vlib/vlib.h>
35 #include <vlib/unix/unix.h>
36 #include <vlibmemory/memory_api.h>
37 #include <vlibmemory/vl_memory_msg_enum.h>
38
39 #define vl_typedefs
40 #include <vlibmemory/vl_memory_api_h.h>
41 #undef vl_typedefs
42
43 #define DEBUG_MESSAGE_BUFFER_OVERRUN 0
44
45 CLIB_NOSANITIZE_ADDR static inline void *
46 vl_msg_api_alloc_internal (int nbytes, int pool, int may_return_null)
47 {
48   int i;
49   msgbuf_t *rv;
50   ring_alloc_t *ap;
51   svm_queue_t *q;
52   void *oldheap;
53   vl_shmem_hdr_t *shmem_hdr;
54   api_main_t *am = &api_main;
55
56   shmem_hdr = am->shmem_hdr;
57
58 #if DEBUG_MESSAGE_BUFFER_OVERRUN > 0
59   nbytes += 4;
60 #endif
61
62   ASSERT (pool == 0 || vlib_get_thread_index () == 0);
63
64   if (shmem_hdr == 0)
65     {
66       clib_warning ("shared memory header NULL");
67       return 0;
68     }
69
70   /* account for the msgbuf_t header */
71   nbytes += sizeof (msgbuf_t);
72
73   if (shmem_hdr->vl_rings == 0)
74     {
75       clib_warning ("vl_rings NULL");
76       ASSERT (0);
77       abort ();
78     }
79
80   if (shmem_hdr->client_rings == 0)
81     {
82       clib_warning ("client_rings NULL");
83       ASSERT (0);
84       abort ();
85     }
86
87   ap = pool ? shmem_hdr->vl_rings : shmem_hdr->client_rings;
88   for (i = 0; i < vec_len (ap); i++)
89     {
90       /* Too big? */
91       if (nbytes > ap[i].size)
92         {
93           continue;
94         }
95
96       q = ap[i].rp;
97       if (pool == 0)
98         {
99           pthread_mutex_lock (&q->mutex);
100         }
101       rv = (msgbuf_t *) (&q->data[0] + q->head * q->elsize);
102       /*
103        * Is this item still in use?
104        */
105       if (rv->q)
106         {
107           u32 now = (u32) time (0);
108
109           if (PREDICT_TRUE (rv->gc_mark_timestamp == 0))
110             rv->gc_mark_timestamp = now;
111           else
112             {
113               if (now - rv->gc_mark_timestamp > 10)
114                 {
115                   if (CLIB_DEBUG > 0)
116                     {
117                       u16 *msg_idp, msg_id;
118                       clib_warning
119                         ("garbage collect pool %d ring %d index %d", pool, i,
120                          q->head);
121                       msg_idp = (u16 *) (rv->data);
122                       msg_id = clib_net_to_host_u16 (*msg_idp);
123                       if (msg_id < vec_len (api_main.msg_names))
124                         clib_warning ("msg id %d name %s", (u32) msg_id,
125                                       api_main.msg_names[msg_id]);
126                     }
127                   shmem_hdr->garbage_collects++;
128                   goto collected;
129                 }
130             }
131
132
133           /* yes, loser; try next larger pool */
134           ap[i].misses++;
135           if (pool == 0)
136             pthread_mutex_unlock (&q->mutex);
137           continue;
138         }
139     collected:
140
141       /* OK, we have a winner */
142       ap[i].hits++;
143       /*
144        * Remember the source queue, although we
145        * don't need to know the queue to free the item.
146        */
147       rv->q = q;
148       rv->gc_mark_timestamp = 0;
149       q->head++;
150       if (q->head == q->maxsize)
151         q->head = 0;
152
153       if (pool == 0)
154         pthread_mutex_unlock (&q->mutex);
155       goto out;
156     }
157
158   /*
159    * Request too big, or head element of all size-compatible rings
160    * still in use. Fall back to shared-memory malloc.
161    */
162   am->ring_misses++;
163
164   pthread_mutex_lock (&am->vlib_rp->mutex);
165   oldheap = svm_push_data_heap (am->vlib_rp);
166   if (may_return_null)
167     {
168       rv = clib_mem_alloc_or_null (nbytes);
169       if (PREDICT_FALSE (rv == 0))
170         {
171           svm_pop_heap (oldheap);
172           pthread_mutex_unlock (&am->vlib_rp->mutex);
173           return 0;
174         }
175     }
176   else
177     rv = clib_mem_alloc (nbytes);
178
179   rv->q = 0;
180   rv->gc_mark_timestamp = 0;
181   svm_pop_heap (oldheap);
182   pthread_mutex_unlock (&am->vlib_rp->mutex);
183
184 out:
185 #if DEBUG_MESSAGE_BUFFER_OVERRUN > 0
186   {
187     nbytes -= 4;
188     u32 *overrun;
189     overrun = (u32 *) (rv->data + nbytes - sizeof (msgbuf_t));
190     *overrun = 0x1badbabe;
191   }
192 #endif
193   rv->data_len = htonl (nbytes - sizeof (msgbuf_t));
194
195   VL_MSG_API_UNPOISON (rv->data);
196   return (rv->data);
197 }
198
199 void *
200 vl_msg_api_alloc (int nbytes)
201 {
202   int pool;
203   api_main_t *am = &api_main;
204   vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
205
206   /*
207    * Clients use pool-0, vlib proc uses pool 1
208    */
209   pool = (am->our_pid == shmem_hdr->vl_pid);
210   return vl_msg_api_alloc_internal (nbytes, pool, 0 /* may_return_null */ );
211 }
212
213 void *
214 vl_msg_api_alloc_zero (int nbytes)
215 {
216   void *ret;
217
218   ret = vl_msg_api_alloc (nbytes);
219   clib_memset (ret, 0, nbytes);
220   return ret;
221 }
222
223 void *
224 vl_msg_api_alloc_or_null (int nbytes)
225 {
226   int pool;
227   api_main_t *am = &api_main;
228   vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
229
230   pool = (am->our_pid == shmem_hdr->vl_pid);
231   return vl_msg_api_alloc_internal (nbytes, pool, 1 /* may_return_null */ );
232 }
233
234 void *
235 vl_msg_api_alloc_as_if_client (int nbytes)
236 {
237   return vl_msg_api_alloc_internal (nbytes, 0, 0 /* may_return_null */ );
238 }
239
240 void *
241 vl_msg_api_alloc_zero_as_if_client (int nbytes)
242 {
243   void *ret;
244
245   ret = vl_msg_api_alloc_as_if_client (nbytes);
246   clib_memset (ret, 0, nbytes);
247   return ret;
248 }
249
250 void *
251 vl_msg_api_alloc_as_if_client_or_null (int nbytes)
252 {
253   return vl_msg_api_alloc_internal (nbytes, 0, 1 /* may_return_null */ );
254 }
255
256 void *
257 vl_mem_api_alloc_as_if_client_w_reg (vl_api_registration_t * reg, int nbytes)
258 {
259   api_main_t *am = &api_main;
260   vl_shmem_hdr_t *save_shmem_hdr = am->shmem_hdr;
261   svm_region_t *vlib_rp, *save_vlib_rp = am->vlib_rp;
262   void *msg;
263
264   vlib_rp = am->vlib_rp = reg->vlib_rp;
265   am->shmem_hdr = (void *) vlib_rp->user_ctx;
266
267   msg = vl_msg_api_alloc_internal (nbytes, 0, 0 /* may_return_null */ );
268
269   am->shmem_hdr = save_shmem_hdr;
270   am->vlib_rp = save_vlib_rp;
271
272   return msg;
273 }
274
275 void
276 vl_msg_api_free (void *a)
277 {
278   msgbuf_t *rv;
279   void *oldheap;
280   api_main_t *am = &api_main;
281
282   rv = (msgbuf_t *) (((u8 *) a) - offsetof (msgbuf_t, data));
283
284   /*
285    * Here's the beauty of the scheme.  Only one proc/thread has
286    * control of a given message buffer. To free a buffer, we just clear the
287    * queue field, and leave. No locks, no hits, no errors...
288    */
289   if (rv->q)
290     {
291       rv->q = 0;
292       rv->gc_mark_timestamp = 0;
293 #if DEBUG_MESSAGE_BUFFER_OVERRUN > 0
294       {
295         u32 *overrun;
296         overrun = (u32 *) (rv->data + ntohl (rv->data_len));
297         ASSERT (*overrun == 0x1badbabe);
298       }
299 #endif
300       VL_MSG_API_POISON (rv->data);
301       return;
302     }
303
304   pthread_mutex_lock (&am->vlib_rp->mutex);
305   oldheap = svm_push_data_heap (am->vlib_rp);
306
307 #if DEBUG_MESSAGE_BUFFER_OVERRUN > 0
308   {
309     u32 *overrun;
310     overrun = (u32 *) (rv->data + ntohl (rv->data_len));
311     ASSERT (*overrun == 0x1badbabe);
312   }
313 #endif
314
315   clib_mem_free (rv);
316   svm_pop_heap (oldheap);
317   pthread_mutex_unlock (&am->vlib_rp->mutex);
318 }
319
320 static void
321 vl_msg_api_free_nolock (void *a)
322 {
323   msgbuf_t *rv;
324   void *oldheap;
325   api_main_t *am = &api_main;
326
327   rv = (msgbuf_t *) (((u8 *) a) - offsetof (msgbuf_t, data));
328   /*
329    * Here's the beauty of the scheme.  Only one proc/thread has
330    * control of a given message buffer. To free a buffer, we just clear the
331    * queue field, and leave. No locks, no hits, no errors...
332    */
333   if (rv->q)
334     {
335       rv->q = 0;
336       VL_MSG_API_POISON (rv->data);
337       return;
338     }
339
340   oldheap = svm_push_data_heap (am->vlib_rp);
341   clib_mem_free (rv);
342   svm_pop_heap (oldheap);
343 }
344
345 void
346 vl_set_memory_root_path (const char *name)
347 {
348   api_main_t *am = &api_main;
349
350   am->root_path = name;
351 }
352
353 void
354 vl_set_memory_uid (int uid)
355 {
356   api_main_t *am = &api_main;
357
358   am->api_uid = uid;
359 }
360
361 void
362 vl_set_memory_gid (int gid)
363 {
364   api_main_t *am = &api_main;
365
366   am->api_gid = gid;
367 }
368
369 void
370 vl_set_global_memory_baseva (u64 baseva)
371 {
372   api_main_t *am = &api_main;
373
374   am->global_baseva = baseva;
375 }
376
377 void
378 vl_set_global_memory_size (u64 size)
379 {
380   api_main_t *am = &api_main;
381
382   am->global_size = size;
383 }
384
385 void
386 vl_set_api_memory_size (u64 size)
387 {
388   api_main_t *am = &api_main;
389
390   am->api_size = size;
391 }
392
393 void
394 vl_set_global_pvt_heap_size (u64 size)
395 {
396   api_main_t *am = &api_main;
397
398   am->global_pvt_heap_size = size;
399 }
400
401 void
402 vl_set_api_pvt_heap_size (u64 size)
403 {
404   api_main_t *am = &api_main;
405
406   am->api_pvt_heap_size = size;
407 }
408
409 static void
410 vl_api_default_mem_config (vl_shmem_hdr_t * shmem_hdr)
411 {
412   api_main_t *am = &api_main;
413   u32 vlib_input_queue_length;
414
415   /* vlib main input queue */
416   vlib_input_queue_length = 1024;
417   if (am->vlib_input_queue_length)
418     vlib_input_queue_length = am->vlib_input_queue_length;
419
420   shmem_hdr->vl_input_queue =
421     svm_queue_alloc_and_init (vlib_input_queue_length, sizeof (uword),
422                               getpid ());
423
424 #define _(sz,n)                                                 \
425     do {                                                        \
426         ring_alloc_t _rp;                                       \
427         _rp.rp = svm_queue_alloc_and_init ((n), (sz), 0);       \
428         _rp.size = (sz);                                        \
429         _rp.nitems = n;                                         \
430         _rp.hits = 0;                                           \
431         _rp.misses = 0;                                         \
432         vec_add1(shmem_hdr->vl_rings, _rp);                     \
433     } while (0);
434
435   foreach_vl_aring_size;
436 #undef _
437
438 #define _(sz,n)                                                 \
439     do {                                                        \
440         ring_alloc_t _rp;                                       \
441         _rp.rp = svm_queue_alloc_and_init ((n), (sz), 0);       \
442         _rp.size = (sz);                                        \
443         _rp.nitems = n;                                         \
444         _rp.hits = 0;                                           \
445         _rp.misses = 0;                                         \
446         vec_add1(shmem_hdr->client_rings, _rp);                 \
447     } while (0);
448
449   foreach_clnt_aring_size;
450 #undef _
451 }
452
453 void
454 vl_api_mem_config (vl_shmem_hdr_t * hdr, vl_api_shm_elem_config_t * config)
455 {
456   vl_api_shm_elem_config_t *c;
457   ring_alloc_t *rp;
458   u32 size;
459
460   if (!config)
461     {
462       vl_api_default_mem_config (hdr);
463       return;
464     }
465
466   vec_foreach (c, config)
467   {
468     switch (c->type)
469       {
470       case VL_API_QUEUE:
471         hdr->vl_input_queue = svm_queue_alloc_and_init (c->count, c->size,
472                                                         getpid ());
473         continue;
474       case VL_API_VLIB_RING:
475         vec_add2 (hdr->vl_rings, rp, 1);
476         break;
477       case VL_API_CLIENT_RING:
478         vec_add2 (hdr->client_rings, rp, 1);
479         break;
480       default:
481         clib_warning ("unknown config type: %d", c->type);
482         continue;
483       }
484
485     size = sizeof (ring_alloc_t) + c->size;
486     rp->rp = svm_queue_alloc_and_init (c->count, size, 0);
487     rp->size = size;
488     rp->nitems = c->count;
489     rp->hits = 0;
490     rp->misses = 0;
491   }
492 }
493
494 void
495 vl_init_shmem (svm_region_t * vlib_rp, vl_api_shm_elem_config_t * config,
496                int is_vlib, int is_private_region)
497 {
498   api_main_t *am = &api_main;
499   vl_shmem_hdr_t *shmem_hdr = 0;
500   void *oldheap;
501   ASSERT (vlib_rp);
502
503   /* $$$$ need private region config parameters */
504
505   oldheap = svm_push_data_heap (vlib_rp);
506
507   vec_validate (shmem_hdr, 0);
508   shmem_hdr->version = VL_SHM_VERSION;
509   shmem_hdr->clib_file_index = VL_API_INVALID_FI;
510
511   /* Set up the queue and msg ring allocator */
512   vl_api_mem_config (shmem_hdr, config);
513
514   if (is_private_region == 0)
515     {
516       am->shmem_hdr = shmem_hdr;
517       am->vlib_rp = vlib_rp;
518       am->our_pid = getpid ();
519       if (is_vlib)
520         am->shmem_hdr->vl_pid = am->our_pid;
521     }
522   else
523     shmem_hdr->vl_pid = am->our_pid;
524
525   svm_pop_heap (oldheap);
526
527   /*
528    * After absolutely everything that a client might see is set up,
529    * declare the shmem region valid
530    */
531   vlib_rp->user_ctx = shmem_hdr;
532
533   pthread_mutex_unlock (&vlib_rp->mutex);
534 }
535
536 int
537 vl_map_shmem (const char *region_name, int is_vlib)
538 {
539   svm_map_region_args_t _a, *a = &_a;
540   svm_region_t *vlib_rp, *root_rp;
541   api_main_t *am = &api_main;
542   int i;
543   struct timespec ts, tsrem;
544   char *vpe_api_region_suffix = "-vpe-api";
545
546   clib_memset (a, 0, sizeof (*a));
547
548   if (strstr (region_name, vpe_api_region_suffix))
549     {
550       u8 *root_path = format (0, "%s", region_name);
551       _vec_len (root_path) = (vec_len (root_path) -
552                               strlen (vpe_api_region_suffix));
553       vec_terminate_c_string (root_path);
554       a->root_path = (const char *) root_path;
555       am->root_path = (const char *) root_path;
556     }
557
558   if (is_vlib == 0)
559     {
560       int tfd;
561       u8 *api_name;
562       /*
563        * Clients wait for vpp to set up the root / API regioins
564        */
565       if (am->root_path)
566         api_name = format (0, "/dev/shm/%s-%s%c", am->root_path,
567                            region_name + 1, 0);
568       else
569         api_name = format (0, "/dev/shm%s%c", region_name, 0);
570
571       /* Wait up to 100 seconds... */
572       for (i = 0; i < 10000; i++)
573         {
574           ts.tv_sec = 0;
575           ts.tv_nsec = 10000 * 1000;    /* 10 ms */
576           while (nanosleep (&ts, &tsrem) < 0)
577             ts = tsrem;
578           tfd = open ((char *) api_name, O_RDWR);
579           if (tfd >= 0)
580             break;
581         }
582       vec_free (api_name);
583       if (tfd < 0)
584         {
585           clib_warning ("region init fail");
586           return -2;
587         }
588       close (tfd);
589       svm_region_init_chroot_uid_gid (am->root_path, getuid (), getgid ());
590     }
591
592   if (a->root_path != NULL)
593     {
594       a->name = "/vpe-api";
595     }
596   else
597     a->name = region_name;
598   a->size = am->api_size ? am->api_size : (16 << 20);
599   a->flags = SVM_FLAGS_MHEAP;
600   a->uid = am->api_uid;
601   a->gid = am->api_gid;
602   a->pvt_heap_size = am->api_pvt_heap_size;
603
604   vlib_rp = svm_region_find_or_create (a);
605
606   if (vlib_rp == 0)
607     return (-2);
608
609   pthread_mutex_lock (&vlib_rp->mutex);
610   /* Has someone else set up the shared-memory variable table? */
611   if (vlib_rp->user_ctx)
612     {
613       am->shmem_hdr = (void *) vlib_rp->user_ctx;
614       am->our_pid = getpid ();
615       if (is_vlib)
616         {
617           svm_queue_t *q;
618           uword old_msg;
619           /*
620            * application restart. Reset cached pids, API message
621            * rings, list of clients; otherwise, various things
622            * fail. (e.g. queue non-empty notification)
623            */
624
625           /* ghosts keep the region from disappearing properly */
626           svm_client_scan_this_region_nolock (vlib_rp);
627           am->shmem_hdr->application_restarts++;
628           q = am->shmem_hdr->vl_input_queue;
629           am->shmem_hdr->vl_pid = getpid ();
630           q->consumer_pid = am->shmem_hdr->vl_pid;
631           /* Drain the input queue, freeing msgs */
632           for (i = 0; i < 10; i++)
633             {
634               if (pthread_mutex_trylock (&q->mutex) == 0)
635                 {
636                   pthread_mutex_unlock (&q->mutex);
637                   goto mutex_ok;
638                 }
639               ts.tv_sec = 0;
640               ts.tv_nsec = 10000 * 1000;        /* 10 ms */
641               while (nanosleep (&ts, &tsrem) < 0)
642                 ts = tsrem;
643             }
644           /* Mutex buggered, "fix" it */
645           clib_memset (&q->mutex, 0, sizeof (q->mutex));
646           clib_warning ("forcibly release main input queue mutex");
647
648         mutex_ok:
649           am->vlib_rp = vlib_rp;
650           while (svm_queue_sub (q, (u8 *) & old_msg, SVM_Q_NOWAIT, 0)
651                  != -2 /* queue underflow */ )
652             {
653               vl_msg_api_free_nolock ((void *) old_msg);
654               am->shmem_hdr->restart_reclaims++;
655             }
656           pthread_mutex_unlock (&vlib_rp->mutex);
657           root_rp = svm_get_root_rp ();
658           ASSERT (root_rp);
659           /* Clean up the root region client list */
660           pthread_mutex_lock (&root_rp->mutex);
661           svm_client_scan_this_region_nolock (root_rp);
662           pthread_mutex_unlock (&root_rp->mutex);
663         }
664       else
665         {
666           pthread_mutex_unlock (&vlib_rp->mutex);
667         }
668       am->vlib_rp = vlib_rp;
669       vec_add1 (am->mapped_shmem_regions, vlib_rp);
670       return 0;
671     }
672   /* Clients simply have to wait... */
673   if (!is_vlib)
674     {
675       pthread_mutex_unlock (&vlib_rp->mutex);
676
677       /* Wait up to 100 seconds... */
678       for (i = 0; i < 10000; i++)
679         {
680           ts.tv_sec = 0;
681           ts.tv_nsec = 10000 * 1000;    /* 10 ms */
682           while (nanosleep (&ts, &tsrem) < 0)
683             ts = tsrem;
684           if (vlib_rp->user_ctx)
685             goto ready;
686         }
687       /* Clean up and leave... */
688       svm_region_unmap (vlib_rp);
689       clib_warning ("region init fail");
690       return (-2);
691
692     ready:
693       am->shmem_hdr = (void *) vlib_rp->user_ctx;
694       am->our_pid = getpid ();
695       am->vlib_rp = vlib_rp;
696       vec_add1 (am->mapped_shmem_regions, vlib_rp);
697       return 0;
698     }
699
700   /* Nope, it's our problem... */
701   vl_init_shmem (vlib_rp, 0 /* default config */ , 1 /* is vlib */ ,
702                  0 /* is_private_region */ );
703
704   vec_add1 (am->mapped_shmem_regions, vlib_rp);
705   return 0;
706 }
707
708 void
709 vl_register_mapped_shmem_region (svm_region_t * rp)
710 {
711   api_main_t *am = &api_main;
712
713   vec_add1 (am->mapped_shmem_regions, rp);
714 }
715
716 static void
717 vl_unmap_shmem_internal (u8 is_client)
718 {
719   svm_region_t *rp;
720   int i;
721   api_main_t *am = &api_main;
722
723   if (!svm_get_root_rp ())
724     return;
725
726   for (i = 0; i < vec_len (am->mapped_shmem_regions); i++)
727     {
728       rp = am->mapped_shmem_regions[i];
729       is_client ? svm_region_unmap_client (rp) : svm_region_unmap (rp);
730     }
731
732   vec_free (am->mapped_shmem_regions);
733   am->shmem_hdr = 0;
734
735   is_client ? svm_region_exit_client () : svm_region_exit ();
736
737   /* $$$ more careful cleanup, valgrind run... */
738   vec_free (am->msg_handlers);
739   vec_free (am->msg_endian_handlers);
740   vec_free (am->msg_print_handlers);
741 }
742
743 void
744 vl_unmap_shmem (void)
745 {
746   vl_unmap_shmem_internal (0);
747 }
748
749 void
750 vl_unmap_shmem_client (void)
751 {
752   vl_unmap_shmem_internal (1);
753 }
754
755 void
756 vl_msg_api_send_shmem (svm_queue_t * q, u8 * elem)
757 {
758   api_main_t *am = &api_main;
759   void *msg = (void *) *(uword *) elem;
760
761   if (am->tx_trace && am->tx_trace->enabled)
762     vl_msg_api_trace (am, am->tx_trace, msg);
763
764   /*
765    * Announce a probable binary API client bug:
766    * some client's input queue is stuffed.
767    * The situation may be recoverable, or not.
768    */
769   if (PREDICT_FALSE
770       (am->vl_clients /* vpp side */  && (q->cursize == q->maxsize)))
771     {
772       if (PREDICT_FALSE (am->elog_trace_api_messages))
773         {
774           /* *INDENT-OFF* */
775           ELOG_TYPE_DECLARE (e) =
776             {
777               .format = "api-client-queue-stuffed: %x%x",
778               .format_args = "i4i4",
779             };
780           /* *INDENT-ON* */
781           struct
782           {
783             u32 hi, low;
784           } *ed;
785           ed = ELOG_DATA (am->elog_main, e);
786           ed->hi = (uword) q >> 32;
787           ed->low = (uword) q & 0xFFFFFFFF;
788           clib_warning ("WARNING: client input queue at %llx is stuffed...",
789                         q);
790         }
791     }
792   VL_MSG_API_POISON (msg);
793   (void) svm_queue_add (q, elem, 0 /* nowait */ );
794 }
795
796 int
797 vl_mem_api_can_send (svm_queue_t * q)
798 {
799   return (q->cursize < q->maxsize);
800 }
801
802 void
803 vl_msg_api_send_shmem_nolock (svm_queue_t * q, u8 * elem)
804 {
805   api_main_t *am = &api_main;
806   void *msg = (void *) *(uword *) elem;
807
808   if (am->tx_trace && am->tx_trace->enabled)
809     vl_msg_api_trace (am, am->tx_trace, msg);
810
811   (void) svm_queue_add_nolock (q, elem);
812   VL_MSG_API_POISON (msg);
813 }
814
815 /*
816  * fd.io coding-style-patch-verification: ON
817  *
818  * Local Variables:
819  * eval: (c-set-style "gnu")
820  * End:
821  */