api: remove garbage from sockclnt_create reply
[vpp.git] / src / vlibmemory / memory_shared.c
1 /*
2  *------------------------------------------------------------------
3  * memclnt_shared.c - API message handling, common code for both clients
4  * and the vlib process itself.
5  *
6  *
7  * Copyright (c) 2009 Cisco and/or its affiliates.
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at:
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *------------------------------------------------------------------
20  */
21
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <stddef.h>
25 #include <string.h>
26 #include <unistd.h>
27 #include <signal.h>
28
29 #include <vppinfra/format.h>
30 #include <vppinfra/byte_order.h>
31 #include <vppinfra/error.h>
32 #include <svm/queue.h>
33 #include <vlib/vlib.h>
34 #include <vlib/unix/unix.h>
35 #include <vlibmemory/memory_api.h>
36 #include <vlibmemory/vl_memory_msg_enum.h>
37
38 #define vl_typedefs
39 #include <vlibmemory/vl_memory_api_h.h>
40 #undef vl_typedefs
41
42 #define DEBUG_MESSAGE_BUFFER_OVERRUN 0
43
44 static inline void *
45 vl_msg_api_alloc_internal (int nbytes, int pool, int may_return_null)
46 {
47   int i;
48   msgbuf_t *rv;
49   ring_alloc_t *ap;
50   svm_queue_t *q;
51   void *oldheap;
52   vl_shmem_hdr_t *shmem_hdr;
53   api_main_t *am = &api_main;
54
55   shmem_hdr = am->shmem_hdr;
56
57 #if DEBUG_MESSAGE_BUFFER_OVERRUN > 0
58   nbytes += 4;
59 #endif
60
61   ASSERT (pool == 0 || vlib_get_thread_index () == 0);
62
63   if (shmem_hdr == 0)
64     {
65       clib_warning ("shared memory header NULL");
66       return 0;
67     }
68
69   /* account for the msgbuf_t header */
70   nbytes += sizeof (msgbuf_t);
71
72   if (shmem_hdr->vl_rings == 0)
73     {
74       clib_warning ("vl_rings NULL");
75       ASSERT (0);
76       abort ();
77     }
78
79   if (shmem_hdr->client_rings == 0)
80     {
81       clib_warning ("client_rings NULL");
82       ASSERT (0);
83       abort ();
84     }
85
86   ap = pool ? shmem_hdr->vl_rings : shmem_hdr->client_rings;
87   for (i = 0; i < vec_len (ap); i++)
88     {
89       /* Too big? */
90       if (nbytes > ap[i].size)
91         {
92           continue;
93         }
94
95       q = ap[i].rp;
96       if (pool == 0)
97         {
98           pthread_mutex_lock (&q->mutex);
99         }
100       rv = (msgbuf_t *) (&q->data[0] + q->head * q->elsize);
101       /*
102        * Is this item still in use?
103        */
104       if (rv->q)
105         {
106           u32 now = (u32) time (0);
107
108           if (PREDICT_TRUE (rv->gc_mark_timestamp == 0))
109             rv->gc_mark_timestamp = now;
110           else
111             {
112               if (now - rv->gc_mark_timestamp > 10)
113                 {
114                   if (CLIB_DEBUG > 0)
115                     {
116                       u16 *msg_idp, msg_id;
117                       clib_warning
118                         ("garbage collect pool %d ring %d index %d", pool, i,
119                          q->head);
120                       msg_idp = (u16 *) (rv->data);
121                       msg_id = clib_net_to_host_u16 (*msg_idp);
122                       if (msg_id < vec_len (api_main.msg_names))
123                         clib_warning ("msg id %d name %s", (u32) msg_id,
124                                       api_main.msg_names[msg_id]);
125                     }
126                   shmem_hdr->garbage_collects++;
127                   goto collected;
128                 }
129             }
130
131
132           /* yes, loser; try next larger pool */
133           ap[i].misses++;
134           if (pool == 0)
135             pthread_mutex_unlock (&q->mutex);
136           continue;
137         }
138     collected:
139
140       /* OK, we have a winner */
141       ap[i].hits++;
142       /*
143        * Remember the source queue, although we
144        * don't need to know the queue to free the item.
145        */
146       rv->q = q;
147       rv->gc_mark_timestamp = 0;
148       q->head++;
149       if (q->head == q->maxsize)
150         q->head = 0;
151
152       if (pool == 0)
153         pthread_mutex_unlock (&q->mutex);
154       goto out;
155     }
156
157   /*
158    * Request too big, or head element of all size-compatible rings
159    * still in use. Fall back to shared-memory malloc.
160    */
161   am->ring_misses++;
162
163   pthread_mutex_lock (&am->vlib_rp->mutex);
164   oldheap = svm_push_data_heap (am->vlib_rp);
165   if (may_return_null)
166     {
167       rv = clib_mem_alloc_or_null (nbytes);
168       if (PREDICT_FALSE (rv == 0))
169         {
170           svm_pop_heap (oldheap);
171           pthread_mutex_unlock (&am->vlib_rp->mutex);
172           return 0;
173         }
174     }
175   else
176     rv = clib_mem_alloc (nbytes);
177
178   rv->q = 0;
179   rv->gc_mark_timestamp = 0;
180   svm_pop_heap (oldheap);
181   pthread_mutex_unlock (&am->vlib_rp->mutex);
182
183 out:
184 #if DEBUG_MESSAGE_BUFFER_OVERRUN > 0
185   {
186     nbytes -= 4;
187     u32 *overrun;
188     overrun = (u32 *) (rv->data + nbytes - sizeof (msgbuf_t));
189     *overrun = 0x1badbabe;
190   }
191 #endif
192   rv->data_len = htonl (nbytes - sizeof (msgbuf_t));
193
194   return (rv->data);
195 }
196
197 void *
198 vl_msg_api_alloc (int nbytes)
199 {
200   int pool;
201   api_main_t *am = &api_main;
202   vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
203
204   /*
205    * Clients use pool-0, vlib proc uses pool 1
206    */
207   pool = (am->our_pid == shmem_hdr->vl_pid);
208   return vl_msg_api_alloc_internal (nbytes, pool, 0 /* may_return_null */ );
209 }
210
211 void *
212 vl_msg_api_alloc_zero (int nbytes)
213 {
214   void *ret;
215
216   ret = vl_msg_api_alloc (nbytes);
217   clib_memset (ret, 0, nbytes);
218   return ret;
219 }
220
221 void *
222 vl_msg_api_alloc_or_null (int nbytes)
223 {
224   int pool;
225   api_main_t *am = &api_main;
226   vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
227
228   pool = (am->our_pid == shmem_hdr->vl_pid);
229   return vl_msg_api_alloc_internal (nbytes, pool, 1 /* may_return_null */ );
230 }
231
232 void *
233 vl_msg_api_alloc_as_if_client (int nbytes)
234 {
235   return vl_msg_api_alloc_internal (nbytes, 0, 0 /* may_return_null */ );
236 }
237
238 void *
239 vl_msg_api_alloc_zero_as_if_client (int nbytes)
240 {
241   void *ret;
242
243   ret = vl_msg_api_alloc_as_if_client (nbytes);
244   clib_memset (ret, 0, nbytes);
245   return ret;
246 }
247
248 void *
249 vl_msg_api_alloc_as_if_client_or_null (int nbytes)
250 {
251   return vl_msg_api_alloc_internal (nbytes, 0, 1 /* may_return_null */ );
252 }
253
254 void *
255 vl_mem_api_alloc_as_if_client_w_reg (vl_api_registration_t * reg, int nbytes)
256 {
257   api_main_t *am = &api_main;
258   vl_shmem_hdr_t *save_shmem_hdr = am->shmem_hdr;
259   svm_region_t *vlib_rp, *save_vlib_rp = am->vlib_rp;
260   void *msg;
261
262   vlib_rp = am->vlib_rp = reg->vlib_rp;
263   am->shmem_hdr = (void *) vlib_rp->user_ctx;
264
265   msg = vl_msg_api_alloc_internal (nbytes, 0, 0 /* may_return_null */ );
266
267   am->shmem_hdr = save_shmem_hdr;
268   am->vlib_rp = save_vlib_rp;
269
270   return msg;
271 }
272
273 void
274 vl_msg_api_free (void *a)
275 {
276   msgbuf_t *rv;
277   void *oldheap;
278   api_main_t *am = &api_main;
279
280   rv = (msgbuf_t *) (((u8 *) a) - offsetof (msgbuf_t, data));
281
282   /*
283    * Here's the beauty of the scheme.  Only one proc/thread has
284    * control of a given message buffer. To free a buffer, we just clear the
285    * queue field, and leave. No locks, no hits, no errors...
286    */
287   if (rv->q)
288     {
289       rv->q = 0;
290       rv->gc_mark_timestamp = 0;
291 #if DEBUG_MESSAGE_BUFFER_OVERRUN > 0
292       {
293         u32 *overrun;
294         overrun = (u32 *) (rv->data + ntohl (rv->data_len));
295         ASSERT (*overrun == 0x1badbabe);
296       }
297 #endif
298       return;
299     }
300
301   pthread_mutex_lock (&am->vlib_rp->mutex);
302   oldheap = svm_push_data_heap (am->vlib_rp);
303
304 #if DEBUG_MESSAGE_BUFFER_OVERRUN > 0
305   {
306     u32 *overrun;
307     overrun = (u32 *) (rv->data + ntohl (rv->data_len));
308     ASSERT (*overrun == 0x1badbabe);
309   }
310 #endif
311
312   clib_mem_free (rv);
313   svm_pop_heap (oldheap);
314   pthread_mutex_unlock (&am->vlib_rp->mutex);
315 }
316
317 static void
318 vl_msg_api_free_nolock (void *a)
319 {
320   msgbuf_t *rv;
321   void *oldheap;
322   api_main_t *am = &api_main;
323
324   rv = (msgbuf_t *) (((u8 *) a) - offsetof (msgbuf_t, data));
325   /*
326    * Here's the beauty of the scheme.  Only one proc/thread has
327    * control of a given message buffer. To free a buffer, we just clear the
328    * queue field, and leave. No locks, no hits, no errors...
329    */
330   if (rv->q)
331     {
332       rv->q = 0;
333       return;
334     }
335
336   oldheap = svm_push_data_heap (am->vlib_rp);
337   clib_mem_free (rv);
338   svm_pop_heap (oldheap);
339 }
340
341 void
342 vl_set_memory_root_path (const char *name)
343 {
344   api_main_t *am = &api_main;
345
346   am->root_path = name;
347 }
348
349 void
350 vl_set_memory_uid (int uid)
351 {
352   api_main_t *am = &api_main;
353
354   am->api_uid = uid;
355 }
356
357 void
358 vl_set_memory_gid (int gid)
359 {
360   api_main_t *am = &api_main;
361
362   am->api_gid = gid;
363 }
364
365 void
366 vl_set_global_memory_baseva (u64 baseva)
367 {
368   api_main_t *am = &api_main;
369
370   am->global_baseva = baseva;
371 }
372
373 void
374 vl_set_global_memory_size (u64 size)
375 {
376   api_main_t *am = &api_main;
377
378   am->global_size = size;
379 }
380
381 void
382 vl_set_api_memory_size (u64 size)
383 {
384   api_main_t *am = &api_main;
385
386   am->api_size = size;
387 }
388
389 void
390 vl_set_global_pvt_heap_size (u64 size)
391 {
392   api_main_t *am = &api_main;
393
394   am->global_pvt_heap_size = size;
395 }
396
397 void
398 vl_set_api_pvt_heap_size (u64 size)
399 {
400   api_main_t *am = &api_main;
401
402   am->api_pvt_heap_size = size;
403 }
404
405 static void
406 vl_api_default_mem_config (vl_shmem_hdr_t * shmem_hdr)
407 {
408   api_main_t *am = &api_main;
409   u32 vlib_input_queue_length;
410
411   /* vlib main input queue */
412   vlib_input_queue_length = 1024;
413   if (am->vlib_input_queue_length)
414     vlib_input_queue_length = am->vlib_input_queue_length;
415
416   shmem_hdr->vl_input_queue =
417     svm_queue_alloc_and_init (vlib_input_queue_length, sizeof (uword),
418                               getpid ());
419
420 #define _(sz,n)                                                 \
421     do {                                                        \
422         ring_alloc_t _rp;                                       \
423         _rp.rp = svm_queue_alloc_and_init ((n), (sz), 0);       \
424         _rp.size = (sz);                                        \
425         _rp.nitems = n;                                         \
426         _rp.hits = 0;                                           \
427         _rp.misses = 0;                                         \
428         vec_add1(shmem_hdr->vl_rings, _rp);                     \
429     } while (0);
430
431   foreach_vl_aring_size;
432 #undef _
433
434 #define _(sz,n)                                                 \
435     do {                                                        \
436         ring_alloc_t _rp;                                       \
437         _rp.rp = svm_queue_alloc_and_init ((n), (sz), 0);       \
438         _rp.size = (sz);                                        \
439         _rp.nitems = n;                                         \
440         _rp.hits = 0;                                           \
441         _rp.misses = 0;                                         \
442         vec_add1(shmem_hdr->client_rings, _rp);                 \
443     } while (0);
444
445   foreach_clnt_aring_size;
446 #undef _
447 }
448
449 void
450 vl_api_mem_config (vl_shmem_hdr_t * hdr, vl_api_shm_elem_config_t * config)
451 {
452   vl_api_shm_elem_config_t *c;
453   ring_alloc_t *rp;
454   u32 size;
455
456   if (!config)
457     {
458       vl_api_default_mem_config (hdr);
459       return;
460     }
461
462   vec_foreach (c, config)
463   {
464     switch (c->type)
465       {
466       case VL_API_QUEUE:
467         hdr->vl_input_queue = svm_queue_alloc_and_init (c->count, c->size,
468                                                         getpid ());
469         continue;
470       case VL_API_VLIB_RING:
471         vec_add2 (hdr->vl_rings, rp, 1);
472         break;
473       case VL_API_CLIENT_RING:
474         vec_add2 (hdr->client_rings, rp, 1);
475         break;
476       default:
477         clib_warning ("unknown config type: %d", c->type);
478         continue;
479       }
480
481     size = sizeof (ring_alloc_t) + c->size;
482     rp->rp = svm_queue_alloc_and_init (c->count, size, 0);
483     rp->size = size;
484     rp->nitems = c->count;
485     rp->hits = 0;
486     rp->misses = 0;
487   }
488 }
489
490 void
491 vl_init_shmem (svm_region_t * vlib_rp, vl_api_shm_elem_config_t * config,
492                int is_vlib, int is_private_region)
493 {
494   api_main_t *am = &api_main;
495   vl_shmem_hdr_t *shmem_hdr = 0;
496   void *oldheap;
497   ASSERT (vlib_rp);
498
499   /* $$$$ need private region config parameters */
500
501   oldheap = svm_push_data_heap (vlib_rp);
502
503   vec_validate (shmem_hdr, 0);
504   shmem_hdr->version = VL_SHM_VERSION;
505   shmem_hdr->clib_file_index = VL_API_INVALID_FI;
506
507   /* Set up the queue and msg ring allocator */
508   vl_api_mem_config (shmem_hdr, config);
509
510   if (is_private_region == 0)
511     {
512       am->shmem_hdr = shmem_hdr;
513       am->vlib_rp = vlib_rp;
514       am->our_pid = getpid ();
515       if (is_vlib)
516         am->shmem_hdr->vl_pid = am->our_pid;
517     }
518   else
519     shmem_hdr->vl_pid = am->our_pid;
520
521   svm_pop_heap (oldheap);
522
523   /*
524    * After absolutely everything that a client might see is set up,
525    * declare the shmem region valid
526    */
527   vlib_rp->user_ctx = shmem_hdr;
528
529   pthread_mutex_unlock (&vlib_rp->mutex);
530 }
531
532 int
533 vl_map_shmem (const char *region_name, int is_vlib)
534 {
535   svm_map_region_args_t _a, *a = &_a;
536   svm_region_t *vlib_rp, *root_rp;
537   api_main_t *am = &api_main;
538   int i;
539   struct timespec ts, tsrem;
540   char *vpe_api_region_suffix = "-vpe-api";
541
542   clib_memset (a, 0, sizeof (*a));
543
544   if (strstr (region_name, vpe_api_region_suffix))
545     {
546       u8 *root_path = format (0, "%s", region_name);
547       _vec_len (root_path) = (vec_len (root_path) -
548                               strlen (vpe_api_region_suffix));
549       vec_terminate_c_string (root_path);
550       a->root_path = (const char *) root_path;
551       am->root_path = (const char *) root_path;
552     }
553
554   if (is_vlib == 0)
555     {
556       int tfd;
557       u8 *api_name;
558       /*
559        * Clients wait for vpp to set up the root / API regioins
560        */
561       if (am->root_path)
562         api_name = format (0, "/dev/shm/%s-%s%c", am->root_path,
563                            region_name + 1, 0);
564       else
565         api_name = format (0, "/dev/shm%s%c", region_name, 0);
566
567       /* Wait up to 100 seconds... */
568       for (i = 0; i < 10000; i++)
569         {
570           ts.tv_sec = 0;
571           ts.tv_nsec = 10000 * 1000;    /* 10 ms */
572           while (nanosleep (&ts, &tsrem) < 0)
573             ts = tsrem;
574           tfd = open ((char *) api_name, O_RDWR);
575           if (tfd >= 0)
576             break;
577         }
578       vec_free (api_name);
579       if (tfd < 0)
580         {
581           clib_warning ("region init fail");
582           return -2;
583         }
584       close (tfd);
585       svm_region_init_chroot_uid_gid (am->root_path, getuid (), getgid ());
586     }
587
588   if (a->root_path != NULL)
589     {
590       a->name = "/vpe-api";
591     }
592   else
593     a->name = region_name;
594   a->size = am->api_size ? am->api_size : (16 << 20);
595   a->flags = SVM_FLAGS_MHEAP;
596   a->uid = am->api_uid;
597   a->gid = am->api_gid;
598   a->pvt_heap_size = am->api_pvt_heap_size;
599
600   vlib_rp = svm_region_find_or_create (a);
601
602   if (vlib_rp == 0)
603     return (-2);
604
605   pthread_mutex_lock (&vlib_rp->mutex);
606   /* Has someone else set up the shared-memory variable table? */
607   if (vlib_rp->user_ctx)
608     {
609       am->shmem_hdr = (void *) vlib_rp->user_ctx;
610       am->our_pid = getpid ();
611       if (is_vlib)
612         {
613           svm_queue_t *q;
614           uword old_msg;
615           /*
616            * application restart. Reset cached pids, API message
617            * rings, list of clients; otherwise, various things
618            * fail. (e.g. queue non-empty notification)
619            */
620
621           /* ghosts keep the region from disappearing properly */
622           svm_client_scan_this_region_nolock (vlib_rp);
623           am->shmem_hdr->application_restarts++;
624           q = am->shmem_hdr->vl_input_queue;
625           am->shmem_hdr->vl_pid = getpid ();
626           q->consumer_pid = am->shmem_hdr->vl_pid;
627           /* Drain the input queue, freeing msgs */
628           for (i = 0; i < 10; i++)
629             {
630               if (pthread_mutex_trylock (&q->mutex) == 0)
631                 {
632                   pthread_mutex_unlock (&q->mutex);
633                   goto mutex_ok;
634                 }
635               ts.tv_sec = 0;
636               ts.tv_nsec = 10000 * 1000;        /* 10 ms */
637               while (nanosleep (&ts, &tsrem) < 0)
638                 ts = tsrem;
639             }
640           /* Mutex buggered, "fix" it */
641           clib_memset (&q->mutex, 0, sizeof (q->mutex));
642           clib_warning ("forcibly release main input queue mutex");
643
644         mutex_ok:
645           am->vlib_rp = vlib_rp;
646           while (svm_queue_sub (q, (u8 *) & old_msg, SVM_Q_NOWAIT, 0)
647                  != -2 /* queue underflow */ )
648             {
649               vl_msg_api_free_nolock ((void *) old_msg);
650               am->shmem_hdr->restart_reclaims++;
651             }
652           pthread_mutex_unlock (&vlib_rp->mutex);
653           root_rp = svm_get_root_rp ();
654           ASSERT (root_rp);
655           /* Clean up the root region client list */
656           pthread_mutex_lock (&root_rp->mutex);
657           svm_client_scan_this_region_nolock (root_rp);
658           pthread_mutex_unlock (&root_rp->mutex);
659         }
660       else
661         {
662           pthread_mutex_unlock (&vlib_rp->mutex);
663         }
664       am->vlib_rp = vlib_rp;
665       vec_add1 (am->mapped_shmem_regions, vlib_rp);
666       return 0;
667     }
668   /* Clients simply have to wait... */
669   if (!is_vlib)
670     {
671       pthread_mutex_unlock (&vlib_rp->mutex);
672
673       /* Wait up to 100 seconds... */
674       for (i = 0; i < 10000; i++)
675         {
676           ts.tv_sec = 0;
677           ts.tv_nsec = 10000 * 1000;    /* 10 ms */
678           while (nanosleep (&ts, &tsrem) < 0)
679             ts = tsrem;
680           if (vlib_rp->user_ctx)
681             goto ready;
682         }
683       /* Clean up and leave... */
684       svm_region_unmap (vlib_rp);
685       clib_warning ("region init fail");
686       return (-2);
687
688     ready:
689       am->shmem_hdr = (void *) vlib_rp->user_ctx;
690       am->our_pid = getpid ();
691       am->vlib_rp = vlib_rp;
692       vec_add1 (am->mapped_shmem_regions, vlib_rp);
693       return 0;
694     }
695
696   /* Nope, it's our problem... */
697   vl_init_shmem (vlib_rp, 0 /* default config */ , 1 /* is vlib */ ,
698                  0 /* is_private_region */ );
699
700   vec_add1 (am->mapped_shmem_regions, vlib_rp);
701   return 0;
702 }
703
704 void
705 vl_register_mapped_shmem_region (svm_region_t * rp)
706 {
707   api_main_t *am = &api_main;
708
709   vec_add1 (am->mapped_shmem_regions, rp);
710 }
711
712 static void
713 vl_unmap_shmem_internal (u8 is_client)
714 {
715   svm_region_t *rp;
716   int i;
717   api_main_t *am = &api_main;
718
719   if (!svm_get_root_rp ())
720     return;
721
722   for (i = 0; i < vec_len (am->mapped_shmem_regions); i++)
723     {
724       rp = am->mapped_shmem_regions[i];
725       is_client ? svm_region_unmap_client (rp) : svm_region_unmap (rp);
726     }
727
728   vec_free (am->mapped_shmem_regions);
729   am->shmem_hdr = 0;
730
731   is_client ? svm_region_exit_client () : svm_region_exit ();
732
733   /* $$$ more careful cleanup, valgrind run... */
734   vec_free (am->msg_handlers);
735   vec_free (am->msg_endian_handlers);
736   vec_free (am->msg_print_handlers);
737 }
738
739 void
740 vl_unmap_shmem (void)
741 {
742   vl_unmap_shmem_internal (0);
743 }
744
745 void
746 vl_unmap_shmem_client (void)
747 {
748   vl_unmap_shmem_internal (1);
749 }
750
751 void
752 vl_msg_api_send_shmem (svm_queue_t * q, u8 * elem)
753 {
754   api_main_t *am = &api_main;
755   uword *trace = (uword *) elem;
756
757   if (am->tx_trace && am->tx_trace->enabled)
758     vl_msg_api_trace (am, am->tx_trace, (void *) trace[0]);
759
760   /*
761    * Announce a probable binary API client bug:
762    * some client's input queue is stuffed.
763    * The situation may be recoverable, or not.
764    */
765   if (PREDICT_FALSE
766       (am->vl_clients /* vpp side */  && (q->cursize == q->maxsize)))
767     clib_warning ("WARNING: client input queue at %llx is stuffed...", q);
768   (void) svm_queue_add (q, elem, 0 /* nowait */ );
769 }
770
771 int
772 vl_mem_api_can_send (svm_queue_t * q)
773 {
774   return (q->cursize < q->maxsize);
775 }
776
777 void
778 vl_msg_api_send_shmem_nolock (svm_queue_t * q, u8 * elem)
779 {
780   api_main_t *am = &api_main;
781   uword *trace = (uword *) elem;
782
783   if (am->tx_trace && am->tx_trace->enabled)
784     vl_msg_api_trace (am, am->tx_trace, (void *) trace[0]);
785
786   (void) svm_queue_add_nolock (q, elem);
787 }
788
789 /*
790  * fd.io coding-style-patch-verification: ON
791  *
792  * Local Variables:
793  * eval: (c-set-style "gnu")
794  * End:
795  */