Trivial: Cleanup some typos.
[vpp.git] / src / vpp-api / client / client.c
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <stddef.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/mman.h>
21 #include <sys/stat.h>
22 #include <netinet/in.h>
23 #include <netdb.h>
24 #include <signal.h>
25 #include <stdbool.h>
26 #include <vnet/vnet.h>
27 #include <vlib/vlib.h>
28 #include <vlib/unix/unix.h>
29 #include <vlibapi/api.h>
30 #include <vlibmemory/api.h>
31
32 #include <vpp/api/vpe_msg_enum.h>
33
34 #include "vppapiclient.h"
35
36 /*
37  * Asynchronous mode:
38  *  Client registers a callback. All messages are sent to the callback.
39  * Synchronous mode:
40  *  Client calls blocking read().
41  *  Clients are expected to collate events on a queue.
42  *  vac_write() -> suspends RX thread
43  *  vac_read() -> resumes RX thread
44  */
45
46 #define vl_typedefs             /* define message structures */
47 #include <vpp/api/vpe_all_api_h.h>
48 #undef vl_typedefs
49
50 #define vl_endianfun             /* define message structures */
51 #include <vpp/api/vpe_all_api_h.h>
52 #undef vl_endianfun
53
54 vlib_main_t vlib_global_main;
55 vlib_main_t **vlib_mains;
56
57 typedef struct {
58   u8 connected_to_vlib;
59   pthread_t rx_thread_handle;
60   pthread_t timeout_thread_handle;
61   pthread_mutex_t queue_lock;
62   pthread_cond_t suspend_cv;
63   pthread_cond_t resume_cv;
64   pthread_mutex_t timeout_lock;
65   u8 timeout_loop;
66   pthread_cond_t timeout_cv;
67   pthread_cond_t timeout_cancel_cv;
68   pthread_cond_t terminate_cv;
69 } vac_main_t;
70
71 vac_main_t vac_main;
72 vac_callback_t vac_callback;
73 u16 read_timeout = 0;
74 bool rx_is_running = false;
75 bool timeout_thread_cancelled = false;
76
77 /* Set to true to enable memory tracing */
78 bool mem_trace = false;
79
80 __attribute__((constructor))
81 static void
82 vac_client_constructor (void)
83 {
84   clib_mem_init (0, 1 << 30);
85 #if USE_DLMALLOC == 0
86   {
87       u8 *heap;
88       mheap_t *h;
89
90       heap = clib_mem_get_per_cpu_heap ();
91       h = mheap_header (heap);
92       /* make the main heap thread-safe */
93       h->flags |= MHEAP_FLAG_THREAD_SAFE;
94   }
95 #endif
96   if (mem_trace)
97     clib_mem_trace (1);
98 }
99
100 __attribute__((destructor))
101 static void
102 vac_client_destructor (void)
103 {
104   if (mem_trace)
105     fformat(stderr, "TRACE: %s",
106             format (0, "%U\n",
107                     format_mheap, clib_mem_get_heap (), 1));
108 }
109
110
111 static void
112 init (void)
113 {
114   vac_main_t *pm = &vac_main;
115   memset(pm, 0, sizeof(*pm));
116   pthread_mutex_init(&pm->queue_lock, NULL);
117   pthread_cond_init(&pm->suspend_cv, NULL);
118   pthread_cond_init(&pm->resume_cv, NULL);
119   pthread_mutex_init(&pm->timeout_lock, NULL);
120   pm->timeout_loop = 1;
121   pthread_cond_init(&pm->timeout_cv, NULL);
122   pthread_cond_init(&pm->timeout_cancel_cv, NULL);
123   pthread_cond_init(&pm->terminate_cv, NULL);
124 }
125
126 static void
127 cleanup (void)
128 {
129   vac_main_t *pm = &vac_main;
130   pthread_mutex_destroy(&pm->queue_lock);
131   pthread_cond_destroy(&pm->suspend_cv);
132   pthread_cond_destroy(&pm->resume_cv);
133   pthread_mutex_destroy(&pm->timeout_lock);
134   pthread_cond_destroy(&pm->timeout_cv);
135   pthread_cond_destroy(&pm->timeout_cancel_cv);
136   pthread_cond_destroy(&pm->terminate_cv);
137   memset(pm, 0, sizeof(*pm));
138 }
139
140 /*
141  * Satisfy external references when -lvlib is not available.
142  */
143 void vlib_cli_output (struct vlib_main_t * vm, char * fmt, ...)
144 {
145   clib_warning ("vlib_cli_output called...");
146 }
147
148 void
149 vac_free (void * msg)
150 {
151   vl_msg_api_free (msg);
152 }
153
154 static void
155 vac_api_handler (void *msg)
156 {
157   u16 id = ntohs(*((u16 *)msg));
158   msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
159   int l = ntohl(msgbuf->data_len);
160   if (l == 0)
161     clib_warning("Message ID %d has wrong length: %d\n", id, l);
162
163   /* Call Python callback */
164   ASSERT(vac_callback);
165   (vac_callback)(msg, l);
166   vac_free(msg);
167 }
168
169 static void *
170 vac_rx_thread_fn (void *arg)
171 {
172   svm_queue_t *q;
173   vl_api_memclnt_keepalive_t *mp;
174   vl_api_memclnt_keepalive_reply_t *rmp;
175   vac_main_t *pm = &vac_main;
176   api_main_t *am = &api_main;
177   vl_shmem_hdr_t *shmem_hdr;
178   uword msg;
179
180   q = am->vl_input_queue;
181
182   while (1)
183     while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0))
184       {
185         u16 id = ntohs(*((u16 *)msg));
186         switch (id) {
187         case VL_API_RX_THREAD_EXIT:
188           vl_msg_api_free((void *) msg);
189           /* signal waiting threads that this thread is about to terminate */
190           pthread_mutex_lock(&pm->queue_lock);
191           pthread_cond_signal(&pm->terminate_cv);
192           pthread_mutex_unlock(&pm->queue_lock);
193           pthread_exit(0);
194           return 0;
195           break;
196
197         case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
198           vl_msg_api_free((void * )msg);
199           /* Suspend thread and signal reader */
200           pthread_mutex_lock(&pm->queue_lock);
201           pthread_cond_signal(&pm->suspend_cv);
202           /* Wait for the resume signal */
203           pthread_cond_wait (&pm->resume_cv, &pm->queue_lock);
204           pthread_mutex_unlock(&pm->queue_lock);
205           break;
206
207         case VL_API_MEMCLNT_READ_TIMEOUT:
208           clib_warning("Received read timeout in async thread\n");
209           vl_msg_api_free((void *) msg);
210           break;
211
212         case VL_API_MEMCLNT_KEEPALIVE:
213           mp = (void *)msg;
214           rmp = vl_msg_api_alloc (sizeof (*rmp));
215           memset (rmp, 0, sizeof (*rmp));
216           rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
217           rmp->context = mp->context;
218           shmem_hdr = am->shmem_hdr;
219           vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
220           vl_msg_api_free((void *) msg);
221           break;
222
223         default:
224           vac_api_handler((void *)msg);
225         }
226       }
227 }
228
229 static void *
230 vac_timeout_thread_fn (void *arg)
231 {
232   vl_api_memclnt_read_timeout_t *ep;
233   vac_main_t *pm = &vac_main;
234   api_main_t *am = &api_main;
235   struct timespec ts;
236   struct timeval tv;
237   u16 timeout;
238   int rv;
239
240   while (pm->timeout_loop)
241     {
242       /* Wait for poke */
243       pthread_mutex_lock(&pm->timeout_lock);
244       pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock);
245       timeout = read_timeout;
246       gettimeofday(&tv, NULL);
247       ts.tv_sec = tv.tv_sec + timeout;
248       ts.tv_nsec = 0;
249       rv = pthread_cond_timedwait (&pm->timeout_cancel_cv,
250                                    &pm->timeout_lock, &ts);
251       pthread_mutex_unlock(&pm->timeout_lock);
252       if (rv == ETIMEDOUT && !timeout_thread_cancelled)
253         {
254           ep = vl_msg_api_alloc (sizeof (*ep));
255           ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT);
256           vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep);
257         }
258     }
259   pthread_exit(0);
260 }
261
262 void
263 vac_rx_suspend (void)
264 {
265   api_main_t *am = &api_main;
266   vac_main_t *pm = &vac_main;
267   vl_api_memclnt_rx_thread_suspend_t *ep;
268
269   if (!pm->rx_thread_handle) return;
270   pthread_mutex_lock(&pm->queue_lock);
271   if (rx_is_running)
272     {
273       ep = vl_msg_api_alloc (sizeof (*ep));
274       ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND);
275       vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep);
276       /* Wait for RX thread to tell us it has suspended */
277       pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock);
278       rx_is_running = false;
279     }
280   pthread_mutex_unlock(&pm->queue_lock);
281 }
282
283 void
284 vac_rx_resume (void)
285 {
286   vac_main_t *pm = &vac_main;
287   if (!pm->rx_thread_handle) return;
288   pthread_mutex_lock(&pm->queue_lock);
289   if (rx_is_running) goto unlock;
290   pthread_cond_signal(&pm->resume_cv);
291   rx_is_running = true;
292  unlock:
293   pthread_mutex_unlock(&pm->queue_lock);
294 }
295
296 static uword *
297 vac_msg_table_get_hash (void)
298 {
299   api_main_t *am = &api_main;
300   return (am->msg_index_by_name_and_crc);
301 }
302
303 int
304 vac_msg_table_size(void)
305 {
306   api_main_t *am = &api_main;
307   return hash_elts(am->msg_index_by_name_and_crc);
308 }
309
310 int
311 vac_connect (char * name, char * chroot_prefix, vac_callback_t cb,
312                int rx_qlen)
313 {
314   int rv = 0;
315   vac_main_t *pm = &vac_main;
316
317   init();
318   if (chroot_prefix != NULL)
319     vl_set_memory_root_path (chroot_prefix);
320
321   if ((rv = vl_client_api_map("/vpe-api"))) {
322     clib_warning ("vl_client_api_map returned %d", rv);
323     return rv;
324   }
325
326   if (vl_client_connect(name, 0, rx_qlen) < 0) {
327     vl_client_api_unmap();
328     return (-1);
329   }
330
331   if (cb) {
332     /* Start the rx queue thread */
333     rv = pthread_create(&pm->rx_thread_handle, NULL, vac_rx_thread_fn, 0);
334     if (rv) {
335       clib_warning("pthread_create returned %d", rv);
336       vl_client_api_unmap();
337       return (-1);
338     }
339     vac_callback = cb;
340     rx_is_running = true;
341   }
342
343   /* Start read timeout thread */
344   rv = pthread_create(&pm->timeout_thread_handle, NULL,
345                       vac_timeout_thread_fn, 0);
346   if (rv) {
347     clib_warning("pthread_create returned %d", rv);
348     vl_client_api_unmap();
349     return (-1);
350   }
351
352   pm->connected_to_vlib = 1;
353
354   return (0);
355 }
356
357 static void
358 set_timeout (unsigned short timeout)
359 {
360   vac_main_t *pm = &vac_main;
361   pthread_mutex_lock(&pm->timeout_lock);
362   read_timeout = timeout;
363   pthread_cond_signal(&pm->timeout_cv);
364   pthread_mutex_unlock(&pm->timeout_lock);
365 }
366
367 static void
368 unset_timeout (void)
369 {
370   vac_main_t *pm = &vac_main;
371   pthread_mutex_lock(&pm->timeout_lock);
372   pthread_cond_signal(&pm->timeout_cancel_cv);
373   pthread_mutex_unlock(&pm->timeout_lock);
374 }
375
376 int
377 vac_disconnect (void)
378 {
379   api_main_t *am = &api_main;
380   vac_main_t *pm = &vac_main;
381   uword junk;
382
383   if (!pm->connected_to_vlib) return 0;
384
385   if (pm->rx_thread_handle) {
386     vl_api_rx_thread_exit_t *ep;
387     ep = vl_msg_api_alloc (sizeof (*ep));
388     ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT);
389     vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep);
390
391     /* wait (with timeout) until RX thread has finished */
392     struct timespec ts;
393     struct timeval tv;
394     gettimeofday(&tv, NULL);
395     ts.tv_sec = tv.tv_sec + 5;
396     ts.tv_nsec = 0;
397     pthread_mutex_lock(&pm->queue_lock);
398     int rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts);
399     pthread_mutex_unlock(&pm->queue_lock);
400     /* now join so we wait until thread has -really- finished */
401     if (rv == ETIMEDOUT)
402       pthread_cancel(pm->rx_thread_handle);
403     else
404       pthread_join(pm->rx_thread_handle, (void **) &junk);
405   }
406   if (pm->timeout_thread_handle) {
407     /* cancel, wake then join the timeout thread */
408     pm->timeout_loop = 0;
409     timeout_thread_cancelled = true;
410     set_timeout(0);
411     pthread_join(pm->timeout_thread_handle, (void **) &junk);
412   }
413
414   vl_client_disconnect();
415   vl_client_api_unmap();
416   vac_callback = 0;
417
418   cleanup();
419
420   return (0);
421 }
422
423 int
424 vac_read (char **p, int *l, u16 timeout)
425 {
426   svm_queue_t *q;
427   api_main_t *am = &api_main;
428   vac_main_t *pm = &vac_main;
429   vl_api_memclnt_keepalive_t *mp;
430   vl_api_memclnt_keepalive_reply_t *rmp;
431   uword msg;
432   msgbuf_t *msgbuf;
433   int rv;
434   vl_shmem_hdr_t *shmem_hdr;
435
436   if (!pm->connected_to_vlib) return -1;
437
438   *l = 0;
439
440   if (am->our_pid == 0) return (-1);
441
442   /* Poke timeout thread */
443   if (timeout)
444     set_timeout(timeout);
445
446   q = am->vl_input_queue;
447
448  again:
449   rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0);
450
451   if (rv == 0) {
452     u16 msg_id = ntohs(*((u16 *)msg));
453     switch (msg_id) {
454     case VL_API_RX_THREAD_EXIT:
455       vl_msg_api_free((void *) msg);
456       return -1;
457     case VL_API_MEMCLNT_RX_THREAD_SUSPEND:
458       goto error;
459     case VL_API_MEMCLNT_READ_TIMEOUT:
460       goto error;
461     case VL_API_MEMCLNT_KEEPALIVE:
462       /* Handle an alive-check ping from vpp. */
463       mp = (void *)msg;
464       rmp = vl_msg_api_alloc (sizeof (*rmp));
465       memset (rmp, 0, sizeof (*rmp));
466       rmp->_vl_msg_id = ntohs(VL_API_MEMCLNT_KEEPALIVE_REPLY);
467       rmp->context = mp->context;
468       shmem_hdr = am->shmem_hdr;
469       vl_msg_api_send_shmem(shmem_hdr->vl_input_queue, (u8 *)&rmp);
470       vl_msg_api_free((void *) msg);
471       /* 
472        * Python code is blissfully unaware of these pings, so
473        * act as if it never happened...
474        */
475       goto again;
476
477     default:
478       msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data));
479       *l = ntohl(msgbuf->data_len);
480       if (*l == 0) {
481         fprintf(stderr, "Unregistered API message: %d\n", msg_id);
482         goto error;
483       }
484     }
485     *p = (char *)msg;
486
487     /* Let timeout notification thread know we're done */
488     unset_timeout();
489
490   } else {
491     fprintf(stderr, "Read failed with %d\n", rv);
492   }
493   return (rv);
494
495  error:
496   vl_msg_api_free((void *) msg);
497   /* Client might forget to resume RX thread on failure */
498   vac_rx_resume ();
499   return -1;
500 }
501
502 /*
503  * XXX: Makes the assumption that client_index is the first member
504  */
505 typedef VL_API_PACKED(struct _vl_api_header {
506   u16 _vl_msg_id;
507   u32 client_index;
508 }) vl_api_header_t;
509
510 static unsigned int
511 vac_client_index (void)
512 {
513   return (api_main.my_client_index);
514 }
515
516 int
517 vac_write (char *p, int l)
518 {
519   int rv = -1;
520   api_main_t *am = &api_main;
521   vl_api_header_t *mp = vl_msg_api_alloc(l);
522   svm_queue_t *q;
523   vac_main_t *pm = &vac_main;
524
525   if (!pm->connected_to_vlib) return -1;
526   if (!mp) return (-1);
527
528   memcpy(mp, p, l);
529   mp->client_index = vac_client_index();
530   q = am->shmem_hdr->vl_input_queue;
531   rv = svm_queue_add(q, (u8 *)&mp, 0);
532   if (rv != 0) {
533     fprintf(stderr, "vpe_api_write fails: %d\n", rv);
534     /* Clear message */
535     vac_free(mp);
536   }
537   return (rv);
538 }
539
540 int
541 vac_get_msg_index (unsigned char * name)
542 {
543   return vl_msg_api_get_msg_index (name);
544 }
545
546 int
547 vac_msg_table_max_index(void)
548 {
549   int max = 0;
550   hash_pair_t *hp;
551   uword *h = vac_msg_table_get_hash();
552   hash_foreach_pair (hp, h,
553   ({
554     if (hp->value[0] > max)
555       max = hp->value[0];
556   }));
557
558   return max;
559 }
560
561 void
562 vac_set_error_handler (vac_error_callback_t cb)
563 {
564   if (cb) clib_error_register_handler (cb, 0);
565 }