misc: add address sanitizer heap instrumentation
[vpp.git] / src / vpp-api / client / client.c
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <stddef.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/mman.h>
21 #include <sys/stat.h>
22 #include <netinet/in.h>
23 #include <netdb.h>
24 #include <signal.h>
25 #include <stdbool.h>
26 #include <vnet/vnet.h>
27 #include <vlib/vlib.h>
28 #include <vlib/unix/unix.h>
29 #include <vlibapi/api.h>
30 #include <vlibmemory/api.h>
31
32 #include <vpp/api/vpe_msg_enum.h>
33
34 #include "vppapiclient.h"
35
36 bool timeout_cancelled;
37 bool timeout_in_progress;
38 bool rx_thread_done;
39
40 /*
41  * Asynchronous mode:
42  *  Client registers a callback. All messages are sent to the callback.
43  * Synchronous mode:
44  *  Client calls blocking read().
45  *  Clients are expected to collate events on a queue.
46  *  vac_write() -> suspends RX thread
47  *  vac_read() -> resumes RX thread
48  */
49
50 #define vl_typedefs             /* define message structures */
51 #include <vpp/api/vpe_all_api_h.h>
52 #undef vl_typedefs
53
54 #define vl_endianfun             /* define message structures */
55 #include <vpp/api/vpe_all_api_h.h>
56 #undef vl_endianfun
57
58 vlib_main_t vlib_global_main;
59 vlib_main_t **vlib_mains;
60
61 typedef struct {
62   u8 connected_to_vlib;
63   pthread_t rx_thread_handle;
64   pthread_t timeout_thread_handle;
65   pthread_mutex_t queue_lock;
66   pthread_cond_t suspend_cv;
67   pthread_cond_t resume_cv;
68   pthread_mutex_t timeout_lock;
69   u8 timeout_loop;
70   pthread_cond_t timeout_cv;
71   pthread_cond_t timeout_cancel_cv;
72   pthread_cond_t terminate_cv;
73 } vac_main_t;
74
75 vac_main_t vac_main;
76 vac_callback_t vac_callback;
77 u16 read_timeout = 0;
78 bool rx_is_running = false;
79 bool timeout_thread_cancelled = false;
80
81 /* Set to true to enable memory tracing */
82 bool mem_trace = false;
83
84 __attribute__((constructor))
85 static void
86 vac_client_constructor (void)
87 {
88   clib_mem_init (0, 1 << 30);
89 #if USE_DLMALLOC == 0
90   {
91       u8 *heap;
92       mheap_t *h;
93
94       heap = clib_mem_get_per_cpu_heap ();
95       h = mheap_header (heap);
96       /* make the main heap thread-safe */
97       h->flags |= MHEAP_FLAG_THREAD_SAFE;
98   }
99 #endif
100   if (mem_trace)
101     clib_mem_trace (1);
102 }
103
104 __attribute__((destructor))
105 static void
106 vac_client_destructor (void)
107 {
108   if (mem_trace)
109     fformat(stderr, "TRACE: %s",
110             format (0, "%U\n",
111                     format_mheap, clib_mem_get_heap (), 1));
112 }
113
114
115 static void
116 init (void)
117 {
118   vac_main_t *pm = &vac_main;
119   clib_memset(pm, 0, sizeof(*pm));
120   pthread_mutex_init(&pm->queue_lock, NULL);
121   pthread_cond_init(&pm->suspend_cv, NULL);
122   pthread_cond_init(&pm->resume_cv, NULL);
123   pthread_mutex_init(&pm->timeout_lock, NULL);
124   pm->timeout_loop = 1;
125   pthread_cond_init(&pm->timeout_cv, NULL);
126   pthread_cond_init(&pm->timeout_cancel_cv, NULL);
127   pthread_cond_init(&pm->terminate_cv, NULL);
128 }
129
130 static void
131 cleanup (void)
132 {
133   vac_main_t *pm = &vac_main;
134   pthread_mutex_destroy(&pm->queue_lock);
135   pthread_cond_destroy(&pm->suspend_cv);
136   pthread_cond_destroy(&pm->resume_cv);
137   pthread_mutex_destroy(&pm->timeout_lock);
138   pthread_cond_destroy(&pm->timeout_cv);
139   pthread_cond_destroy(&pm->timeout_cancel_cv);
140   pthread_cond_destroy(&pm->terminate_cv);
141   clib_memset(pm, 0, sizeof(*pm));
142 }
143
144 /*
145  * Satisfy external references when -lvlib is not available.
146  */
147 void vlib_cli_output (struct vlib_main_t * vm, char * fmt, ...)
148 {
149   clib_warning ("vlib_cli_output called...");
150 }
151
152 void
153 vac_free (void * msg)
154 {
155   vl_msg_api_free (msg);
156 }
157
158 static void
159 vac_api_handler (void *msg)
160 {
161   u16 id = ntohs(*((u16 *)msg));
162   msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
163   int l = ntohl(msgbuf->data_len);
164   if (l == 0)
165     clib_warning("Message ID %d has wrong length: %d\n", id, l);
166
167   /* Call Python callback */
168   ASSERT(vac_callback);
169   (vac_callback)(msg, l);
170   vac_free(msg);
171 }
172
173 static void *
174 vac_rx_thread_fn (void *arg)
175 {
176   svm_queue_t *q;
177   vl_api_memclnt_keepalive_t *mp;
178   vl_api_memclnt_keepalive_reply_t *rmp;
179   vac_main_t *pm = &vac_main;
180   api_main_t *am = &api_main;
181   vl_shmem_hdr_t *shmem_hdr;
182   uword msg;
183
184   q = am->vl_input_queue;
185
186   while (1)
187     while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0))
188       {
189         VL_MSG_API_UNPOISON((void *)msg);
190         u16 id = ntohs(*((u16 *)msg));
191         switch (id) {
192         case VL_API_RX_THREAD_EXIT:
193           vl_msg_api_free((void *) msg);
194           /* signal waiting threads that this thread is about to terminate */
195           pthread_mutex_lock(&pm->queue_lock);
196           rx_thread_done = true;
197           pthread_cond_signal(&pm->terminate_cv);
198           pthread_mutex_unlock(&pm->queue_lock);
199           pthread_exit(0);
200           return 0;
201           break;
202
203         case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
204           vl_msg_api_free((void * )msg);
205           /* Suspend thread and signal reader */
206           pthread_mutex_lock(&pm->queue_lock);
207           pthread_cond_signal(&pm->suspend_cv);
208           /* Wait for the resume signal */
209           pthread_cond_wait (&pm->resume_cv, &pm->queue_lock);
210           pthread_mutex_unlock(&pm->queue_lock);
211           break;
212
213         case VL_API_MEMCLNT_READ_TIMEOUT:
214           clib_warning("Received read timeout in async thread\n");
215           vl_msg_api_free((void *) msg);
216           break;
217
218         case VL_API_MEMCLNT_KEEPALIVE:
219           mp = (void *)msg;
220           rmp = vl_msg_api_alloc (sizeof (*rmp));
221           clib_memset (rmp, 0, sizeof (*rmp));
222           rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
223           rmp->context = mp->context;
224           shmem_hdr = am->shmem_hdr;
225           vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
226           vl_msg_api_free((void *) msg);
227           break;
228
229         default:
230           vac_api_handler((void *)msg);
231         }
232       }
233 }
234
235 static void *
236 vac_timeout_thread_fn (void *arg)
237 {
238   vl_api_memclnt_read_timeout_t *ep;
239   vac_main_t *pm = &vac_main;
240   api_main_t *am = &api_main;
241   struct timespec ts;
242   struct timeval tv;
243   int rv;
244
245   while (pm->timeout_loop)
246     {
247       /* Wait for poke */
248       pthread_mutex_lock(&pm->timeout_lock);
249       while (!timeout_in_progress)
250         pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock);
251
252       /* Starting timer */
253       gettimeofday(&tv, NULL);
254       ts.tv_sec = tv.tv_sec + read_timeout;
255       ts.tv_nsec = 0;
256
257       if (!timeout_cancelled) {
258         rv = pthread_cond_timedwait (&pm->timeout_cancel_cv,
259                                      &pm->timeout_lock, &ts);
260         if (rv == ETIMEDOUT && !timeout_thread_cancelled) {
261           ep = vl_msg_api_alloc (sizeof (*ep));
262           ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT);
263           vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep);
264         }
265       }
266
267       pthread_mutex_unlock(&pm->timeout_lock);
268     }
269   pthread_exit(0);
270 }
271
272 void
273 vac_rx_suspend (void)
274 {
275   api_main_t *am = &api_main;
276   vac_main_t *pm = &vac_main;
277   vl_api_memclnt_rx_thread_suspend_t *ep;
278
279   if (!pm->rx_thread_handle) return;
280   pthread_mutex_lock(&pm->queue_lock);
281   if (rx_is_running)
282     {
283       ep = vl_msg_api_alloc (sizeof (*ep));
284       ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND);
285       vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep);
286       /* Wait for RX thread to tell us it has suspended */
287       pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock);
288       rx_is_running = false;
289     }
290   pthread_mutex_unlock(&pm->queue_lock);
291 }
292
293 void
294 vac_rx_resume (void)
295 {
296   vac_main_t *pm = &vac_main;
297   if (!pm->rx_thread_handle) return;
298   pthread_mutex_lock(&pm->queue_lock);
299   if (rx_is_running) goto unlock;
300   pthread_cond_signal(&pm->resume_cv);
301   rx_is_running = true;
302  unlock:
303   pthread_mutex_unlock(&pm->queue_lock);
304 }
305
306 static uword *
307 vac_msg_table_get_hash (void)
308 {
309   api_main_t *am = &api_main;
310   return (am->msg_index_by_name_and_crc);
311 }
312
313 int
314 vac_msg_table_size(void)
315 {
316   api_main_t *am = &api_main;
317   return hash_elts(am->msg_index_by_name_and_crc);
318 }
319
320 int
321 vac_connect (char * name, char * chroot_prefix, vac_callback_t cb,
322                int rx_qlen)
323 {
324   rx_thread_done = false;
325   int rv = 0;
326   vac_main_t *pm = &vac_main;
327
328   init();
329   if (chroot_prefix != NULL)
330     vl_set_memory_root_path (chroot_prefix);
331
332   if ((rv = vl_client_api_map("/vpe-api"))) {
333     clib_warning ("vl_client_api_map returned %d", rv);
334     return rv;
335   }
336
337   if (vl_client_connect(name, 0, rx_qlen) < 0) {
338     vl_client_api_unmap();
339     return (-1);
340   }
341
342   if (cb) {
343     /* Start the rx queue thread */
344     rv = pthread_create(&pm->rx_thread_handle, NULL, vac_rx_thread_fn, 0);
345     if (rv) {
346       clib_warning("pthread_create returned %d", rv);
347       vl_client_api_unmap();
348       return (-1);
349     }
350     vac_callback = cb;
351     rx_is_running = true;
352   }
353
354   /* Start read timeout thread */
355   rv = pthread_create(&pm->timeout_thread_handle, NULL,
356                       vac_timeout_thread_fn, 0);
357   if (rv) {
358     clib_warning("pthread_create returned %d", rv);
359     vl_client_api_unmap();
360     return (-1);
361   }
362
363   pm->connected_to_vlib = 1;
364
365   return (0);
366 }
367 static void
368 set_timeout (unsigned short timeout)
369 {
370   vac_main_t *pm = &vac_main;
371   pthread_mutex_lock(&pm->timeout_lock);
372   read_timeout = timeout;
373   timeout_in_progress = true;
374   timeout_cancelled = false;
375   pthread_cond_signal(&pm->timeout_cv);
376   pthread_mutex_unlock(&pm->timeout_lock);
377 }
378
379 static void
380 unset_timeout (void)
381 {
382   vac_main_t *pm = &vac_main;
383   pthread_mutex_lock(&pm->timeout_lock);
384   timeout_in_progress = false;
385   timeout_cancelled = true;
386   pthread_cond_signal(&pm->timeout_cancel_cv);
387   pthread_mutex_unlock(&pm->timeout_lock);
388 }
389
390 int
391 vac_disconnect (void)
392 {
393   api_main_t *am = &api_main;
394   vac_main_t *pm = &vac_main;
395   uword junk;
396   int rv = 0;
397
398   if (!pm->connected_to_vlib) return 0;
399
400   if (pm->rx_thread_handle) {
401     vl_api_rx_thread_exit_t *ep;
402     ep = vl_msg_api_alloc (sizeof (*ep));
403     ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT);
404     vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep);
405
406     /* wait (with timeout) until RX thread has finished */
407     struct timespec ts;
408     struct timeval tv;
409     gettimeofday(&tv, NULL);
410     ts.tv_sec = tv.tv_sec + 5;
411     ts.tv_nsec = 0;
412
413     pthread_mutex_lock(&pm->queue_lock);
414     if (rx_thread_done == false)
415       rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts);
416     pthread_mutex_unlock(&pm->queue_lock);
417
418     /* now join so we wait until thread has -really- finished */
419     if (rv == ETIMEDOUT)
420       pthread_cancel(pm->rx_thread_handle);
421     else
422       pthread_join(pm->rx_thread_handle, (void **) &junk);
423   }
424   if (pm->timeout_thread_handle) {
425     /* cancel, wake then join the timeout thread */
426     pm->timeout_loop = 0;
427     timeout_thread_cancelled = true;
428     set_timeout(0);
429     pthread_join(pm->timeout_thread_handle, (void **) &junk);
430   }
431
432   vl_client_disconnect();
433   vl_client_api_unmap();
434   vac_callback = 0;
435
436   cleanup();
437
438   return (0);
439 }
440
441 int
442 vac_read (char **p, int *l, u16 timeout)
443 {
444   svm_queue_t *q;
445   api_main_t *am = &api_main;
446   vac_main_t *pm = &vac_main;
447   vl_api_memclnt_keepalive_t *mp;
448   vl_api_memclnt_keepalive_reply_t *rmp;
449   uword msg;
450   msgbuf_t *msgbuf;
451   int rv;
452   vl_shmem_hdr_t *shmem_hdr;
453
454   /* svm_queue_sub(below) returns {-1, -2} */
455   if (!pm->connected_to_vlib) return -3;
456
457   *l = 0;
458
459   /* svm_queue_sub(below) returns {-1, -2} */
460   if (am->our_pid == 0) return (-4);
461
462   /* Poke timeout thread */
463   if (timeout)
464     set_timeout(timeout);
465
466   q = am->vl_input_queue;
467
468  again:
469   rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0);
470
471   if (rv == 0) {
472     VL_MSG_API_UNPOISON((void *)msg);
473     u16 msg_id = ntohs(*((u16 *)msg));
474     switch (msg_id) {
475     case VL_API_RX_THREAD_EXIT:
476       vl_msg_api_free((void *) msg);
477       goto error;
478     case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
479       goto error;
480     case VL_API_MEMCLNT_READ_TIMEOUT:
481       goto error;
482     case VL_API_MEMCLNT_KEEPALIVE:
483       /* Handle an alive-check ping from vpp. */
484       mp = (void *)msg;
485       rmp = vl_msg_api_alloc (sizeof (*rmp));
486       clib_memset (rmp, 0, sizeof (*rmp));
487       rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
488       rmp->context = mp->context;
489       shmem_hdr = am->shmem_hdr;
490       vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
491       vl_msg_api_free((void *) msg);
492       /* 
493        * Python code is blissfully unaware of these pings, so
494        * act as if it never happened...
495        */
496       goto again;
497
498     default:
499       msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
500       *l = ntohl(msgbuf->data_len);
501       if (*l == 0) {
502         fprintf(stderr, "Unregistered API message: %d\n", msg_id);
503         goto error;
504       }
505     }
506     *p = (char *)msg;
507
508
509   } else {
510     fprintf(stderr, "Read failed with %d\n", rv);
511   }
512   /* Let timeout notification thread know we're done */
513   if (timeout)
514     unset_timeout();
515
516   return (rv);
517
518  error:
519   if (timeout)
520     unset_timeout();
521   vl_msg_api_free((void *) msg);
522   /* Client might forget to resume RX thread on failure */
523   vac_rx_resume ();
524   return -1;
525 }
526
527 /*
528  * XXX: Makes the assumption that client_index is the first member
529  */
530 typedef VL_API_PACKED(struct _vl_api_header {
531   u16 _vl_msg_id;
532   u32 client_index;
533 }) vl_api_header_t;
534
535 static u32
536 vac_client_index (void)
537 {
538   return (api_main.my_client_index);
539 }
540
541 int
542 vac_write (char *p, int l)
543 {
544   int rv = -1;
545   api_main_t *am = &api_main;
546   vl_api_header_t *mp = vl_msg_api_alloc(l);
547   svm_queue_t *q;
548   vac_main_t *pm = &vac_main;
549
550   if (!pm->connected_to_vlib) return -1;
551   if (!mp) return (-1);
552
553   memcpy(mp, p, l);
554   mp->client_index = vac_client_index();
555   q = am->shmem_hdr->vl_input_queue;
556   rv = svm_queue_add(q, (u8 *)&mp, 0);
557   if (rv != 0) {
558     fprintf(stderr, "vpe_api_write fails: %d\n", rv);
559     /* Clear message */
560     vac_free(mp);
561   }
562   return (rv);
563 }
564
565 int
566 vac_get_msg_index (unsigned char * name)
567 {
568   return vl_msg_api_get_msg_index (name);
569 }
570
571 int
572 vac_msg_table_max_index(void)
573 {
574   int max = 0;
575   hash_pair_t *hp;
576   uword *h = vac_msg_table_get_hash();
577   hash_foreach_pair (hp, h,
578   ({
579     if (hp->value[0] > max)
580       max = hp->value[0];
581   }));
582
583   return max;
584 }
585
586 void
587 vac_set_error_handler (vac_error_callback_t cb)
588 {
589   if (cb) clib_error_register_handler (cb, 0);
590 }