55dda1044b6e46e1c0df93c9692a72bd9677c507
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import glob
26 import atexit
27 from cffi import FFI
28
29 if sys.version[0] == '2':
30     import Queue as queue
31 else:
32     import queue as queue
33
34 ffi = FFI()
35 ffi.cdef("""
36 typedef void (*vac_callback_t)(unsigned char * data, int len);
37 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
38 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
39     int rx_qlen);
40 int vac_disconnect(void);
41 int vac_read(char **data, int *l, unsigned short timeout);
42 int vac_write(char *data, int len);
43 void vac_free(void * msg);
44
45 int vac_get_msg_index(unsigned char * name);
46 int vac_msg_table_size(void);
47 int vac_msg_table_max_index(void);
48
49 void vac_rx_suspend (void);
50 void vac_rx_resume (void);
51 void vac_set_error_handler(vac_error_callback_t);
52  """)
53
54 # Barfs on failure, no need to check success.
55 vpp_api = ffi.dlopen('libvppapiclient.so')
56
57
58 def vpp_atexit(self):
59     """Clean up VPP connection on shutdown."""
60     if self.connected:
61         self.logger.debug('Cleaning up VPP on exit')
62         self.disconnect()
63
64 vpp_object = None
65
66
67 def vpp_iterator(d):
68     if sys.version[0] == '2':
69         return d.iteritems()
70     else:
71         return d.items()
72
73
74 @ffi.callback("void(unsigned char *, int)")
75 def vac_callback_sync(data, len):
76     vpp_object.msg_handler_sync(ffi.buffer(data, len))
77
78
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_async(data, len):
81     vpp_object.msg_handler_async(ffi.buffer(data, len))
82
83
84 @ffi.callback("void(void *, unsigned char *, int)")
85 def vac_error_handler(arg, msg, msg_len):
86     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
87
88
89 class Empty(object):
90     pass
91
92
93 class FuncWrapper(object):
94     def __init__(self, func):
95         self._func = func
96         self.__name__ = func.__name__
97
98     def __call__(self, **kwargs):
99         return self._func(**kwargs)
100
101
102 class VPP():
103     """VPP interface.
104
105     This class provides the APIs to VPP.  The APIs are loaded
106     from provided .api.json files and makes functions accordingly.
107     These functions are documented in the VPP .api files, as they
108     are dynamically created.
109
110     Additionally, VPP can send callback messages; this class
111     provides a means to register a callback function to receive
112     these messages in a background thread.
113     """
114     def __init__(self, apifiles=None, testmode=False, async_thread=True,
115                  logger=logging.getLogger('vpp_papi'), loglevel='debug'):
116         """Create a VPP API object.
117
118         apifiles is a list of files containing API
119         descriptions that will be loaded - methods will be
120         dynamically created reflecting these APIs.  If not
121         provided this will load the API files from VPP's
122         default install location.
123         """
124         global vpp_object
125         vpp_object = self
126         self.logger = logger
127         logging.basicConfig(level=getattr(logging, loglevel.upper()))
128
129         self.messages = {}
130         self.id_names = []
131         self.id_msgdef = []
132         self.buffersize = 10000
133         self.connected = False
134         self.header = struct.Struct('>HI')
135         self.apifiles = []
136         self.event_callback = None
137         self.message_queue = queue.Queue()
138         self.read_timeout = 0
139         self.vpp_api = vpp_api
140         if async_thread:
141             self.event_thread = threading.Thread(
142                 target=self.thread_msg_handler)
143             self.event_thread.daemon = True
144             self.event_thread.start()
145
146         if not apifiles:
147             # Pick up API definitions from default directory
148             apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
149
150         for file in apifiles:
151             with open(file) as apidef_file:
152                 api = json.load(apidef_file)
153                 for t in api['types']:
154                     self.add_type(t[0], t[1:])
155
156                 for m in api['messages']:
157                     self.add_message(m[0], m[1:])
158         self.apifiles = apifiles
159
160         # Basic sanity check
161         if len(self.messages) == 0 and not testmode:
162             raise ValueError(1, 'Missing JSON message definitions')
163
164         # Make sure we allow VPP to clean up the message rings.
165         atexit.register(vpp_atexit, self)
166
167         # Register error handler
168         vpp_api.vac_set_error_handler(vac_error_handler)
169
170     class ContextId(object):
171         """Thread-safe provider of unique context IDs."""
172         def __init__(self):
173             self.context = 0
174             self.lock = threading.Lock()
175
176         def __call__(self):
177             """Get a new unique (or, at least, not recently used) context."""
178             with self.lock:
179                 self.context += 1
180                 return self.context
181     get_context = ContextId()
182
183     def status(self):
184         """Debug function: report current VPP API status to stdout."""
185         print('Connected') if self.connected else print('Not Connected')
186         print('Read API definitions from', ', '.join(self.apifiles))
187
188     def __struct(self, t, n=None, e=-1, vl=None):
189         """Create a packing structure for a message."""
190         base_types = {'u8': 'B',
191                       'u16': 'H',
192                       'u32': 'I',
193                       'i32': 'i',
194                       'u64': 'Q',
195                       'f64': 'd', }
196         pack = None
197         if t in base_types:
198             pack = base_types[t]
199             if not vl:
200                 if e > 0 and t == 'u8':
201                     # Fixed byte array
202                     return struct.Struct('>' + str(e) + 's')
203                 if e > 0:
204                     # Fixed array of base type
205                     return [e, struct.Struct('>' + base_types[t])]
206                 elif e == 0:
207                     # Old style variable array
208                     return [-1, struct.Struct('>' + base_types[t])]
209             else:
210                 # Variable length array
211                 return [vl, struct.Struct('>s')] if t == 'u8' else \
212                     [vl, struct.Struct('>' + base_types[t])]
213
214             return struct.Struct('>' + base_types[t])
215
216         if t in self.messages:
217             # Return a list in case of array
218             if e > 0 and not vl:
219                 return [e, lambda self, encode, buf, offset, args: (
220                     self.__struct_type(encode, self.messages[t], buf, offset,
221                                        args))]
222             if vl:
223                 return [vl, lambda self, encode, buf, offset, args: (
224                     self.__struct_type(encode, self.messages[t], buf, offset,
225                                        args))]
226             elif e == 0:
227                 # Old style VLA
228                 raise NotImplementedError(1,
229                                           'No support for compound types ' + t)
230             return lambda self, encode, buf, offset, args: (
231                 self.__struct_type(encode, self.messages[t], buf, offset, args)
232             )
233
234         raise ValueError(1, 'Invalid message type: ' + t)
235
236     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
237         """Get a message packer or unpacker."""
238         if encode:
239             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
240         else:
241             return self.__struct_type_decode(msgdef, buf, offset)
242
243     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
244         off = offset
245         size = 0
246
247         for k in kwargs:
248             if k not in msgdef['args']:
249                 raise ValueError(1, 'Invalid field-name in message call ' + k)
250
251         for k, v in vpp_iterator(msgdef['args']):
252             off += size
253             if k in kwargs:
254                 if type(v) is list:
255                     if callable(v[1]):
256                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
257                         size = 0
258                         for i in range(e):
259                             size += v[1](self, True, buf, off + size,
260                                          kwargs[k][i])
261                     else:
262                         if v[0] in kwargs:
263                             l = kwargs[v[0]]
264                         else:
265                             l = len(kwargs[k])
266                         if v[1].size == 1:
267                             buf[off:off + l] = bytearray(kwargs[k])
268                             size = l
269                         else:
270                             size = 0
271                             for i in kwargs[k]:
272                                 v[1].pack_into(buf, off + size, i)
273                                 size += v[1].size
274                 else:
275                     if callable(v):
276                         size = v(self, True, buf, off, kwargs[k])
277                     else:
278                         v.pack_into(buf, off, kwargs[k])
279                         size = v.size
280             else:
281                 size = v.size if not type(v) is list else 0
282
283         return off + size - offset
284
285     def __getitem__(self, name):
286         if name in self.messages:
287             return self.messages[name]
288         return None
289
290     def encode(self, msgdef, kwargs):
291         # Make suitably large buffer
292         buf = bytearray(self.buffersize)
293         offset = 0
294         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
295         return buf[:offset + size]
296
297     def decode(self, msgdef, buf):
298         return self.__struct_type(False, msgdef, buf, 0, None)[1]
299
300     def __struct_type_decode(self, msgdef, buf, offset):
301         res = []
302         off = offset
303         size = 0
304         for k, v in vpp_iterator(msgdef['args']):
305             off += size
306             if type(v) is list:
307                 lst = []
308                 if callable(v[1]):  # compound type
309                     size = 0
310                     if v[0] in msgdef['args']:  # vla
311                         e = res[v[2]]
312                     else:  # fixed array
313                         e = v[0]
314                     res.append(lst)
315                     for i in range(e):
316                         (s, l) = v[1](self, False, buf, off + size, None)
317                         lst.append(l)
318                         size += s
319                     continue
320                 if v[1].size == 1:
321                     if type(v[0]) is int:
322                         size = len(buf) - off
323                     else:
324                         size = res[v[2]]
325                     res.append(buf[off:off + size])
326                 else:
327                     e = v[0] if type(v[0]) is int else res[v[2]]
328                     if e == -1:
329                         e = (len(buf) - off) / v[1].size
330                     lst = []
331                     res.append(lst)
332                     size = 0
333                     for i in range(e):
334                         lst.append(v[1].unpack_from(buf, off + size)[0])
335                         size += v[1].size
336             else:
337                 if callable(v):
338                     (s, l) = v(self, False, buf, off, None)
339                     res.append(l)
340                     size += s
341                 else:
342                     res.append(v.unpack_from(buf, off)[0])
343                     size = v.size
344
345         return off + size - offset, msgdef['return_tuple']._make(res)
346
347     def ret_tup(self, name):
348         if name in self.messages and 'return_tuple' in self.messages[name]:
349             return self.messages[name]['return_tuple']
350         return None
351
352     def add_message(self, name, msgdef, typeonly=False):
353         if name in self.messages:
354             raise ValueError('Duplicate message name: ' + name)
355
356         args = collections.OrderedDict()
357         argtypes = collections.OrderedDict()
358         fields = []
359         msg = {}
360         for i, f in enumerate(msgdef):
361             if type(f) is dict and 'crc' in f:
362                 msg['crc'] = f['crc']
363                 continue
364             field_type = f[0]
365             field_name = f[1]
366             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
367                 raise ValueError('Variable Length Array must be last: ' + name)
368             args[field_name] = self.__struct(*f)
369             argtypes[field_name] = field_type
370             if len(f) == 4:  # Find offset to # elements field
371                 idx = list(args.keys()).index(f[3]) - i
372                 args[field_name].append(idx)
373             fields.append(field_name)
374         msg['return_tuple'] = collections.namedtuple(name, fields,
375                                                      rename=True)
376         self.messages[name] = msg
377         self.messages[name]['args'] = args
378         self.messages[name]['argtypes'] = argtypes
379         self.messages[name]['typeonly'] = typeonly
380         return self.messages[name]
381
382     def add_type(self, name, typedef):
383         return self.add_message('vl_api_' + name + '_t', typedef,
384                                 typeonly=True)
385
386     def make_function(self, name, i, msgdef, multipart, async):
387         if (async):
388             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
389         else:
390             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
391                                                  **kwargs))
392         args = self.messages[name]['args']
393         argtypes = self.messages[name]['argtypes']
394         f.__name__ = str(name)
395         f.__doc__ = ", ".join(["%s %s" %
396                                (argtypes[k], k) for k in args.keys()])
397         return f
398
399     @property
400     def api(self):
401         if not hasattr(self, "_api"):
402             raise Exception("Not connected, api definitions not available")
403         return self._api
404
405     def _register_functions(self, async=False):
406         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
407         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
408         self._api = Empty()
409         for name, msgdef in vpp_iterator(self.messages):
410             if self.messages[name]['typeonly']:
411                 continue
412             crc = self.messages[name]['crc']
413             n = name + '_' + crc[2:]
414             i = vpp_api.vac_get_msg_index(n.encode())
415             if i > 0:
416                 self.id_msgdef[i] = msgdef
417                 self.id_names[i] = name
418                 multipart = True if name.find('_dump') > 0 else False
419                 f = self.make_function(name, i, msgdef, multipart, async)
420                 setattr(self._api, name, FuncWrapper(f))
421
422                 # old API stuff starts here - will be removed in 17.07
423                 if hasattr(self, name):
424                     raise NameError(
425                         3, "Conflicting name in JSON definition: `%s'" % name)
426                 setattr(self, name, f)
427                 # old API stuff ends here
428             else:
429                 self.logger.debug(
430                     'No such message type or failed CRC checksum: %s', n)
431
432     def _write(self, buf):
433         """Send a binary-packed message to VPP."""
434         if not self.connected:
435             raise IOError(1, 'Not connected')
436         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
437
438     def _read(self):
439         if not self.connected:
440             raise IOError(1, 'Not connected')
441         mem = ffi.new("char **")
442         size = ffi.new("int *")
443         rv = vpp_api.vac_read(mem, size, self.read_timeout)
444         if rv:
445             raise IOError(rv, 'vac_read failed')
446         msg = bytes(ffi.buffer(mem[0], size[0]))
447         vpp_api.vac_free(mem[0])
448         return msg
449
450     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
451                          async):
452         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
453         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
454         if rv != 0:
455             raise IOError(2, 'Connect failed')
456         self.connected = True
457
458         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
459         self._register_functions(async=async)
460
461         # Initialise control ping
462         crc = self.messages['control_ping']['crc']
463         self.control_ping_index = vpp_api.vac_get_msg_index(
464             ('control_ping' + '_' + crc[2:]).encode())
465         self.control_ping_msgdef = self.messages['control_ping']
466         return rv
467
468     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
469         """Attach to VPP.
470
471         name - the name of the client.
472         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
473         async - if true, messages are sent without waiting for a reply
474         rx_qlen - the length of the VPP message receive queue between
475         client and server.
476         """
477         msg_handler = vac_callback_sync if not async else vac_callback_async
478         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
479                                      async)
480
481     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
482         """Attach to VPP in synchronous mode. Application must poll for events.
483
484         name - the name of the client.
485         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
486         rx_qlen - the length of the VPP message receive queue between
487         client and server.
488         """
489
490         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
491                                      async=False)
492
493     def disconnect(self):
494         """Detach from VPP."""
495         rv = vpp_api.vac_disconnect()
496         self.connected = False
497         return rv
498
499     def msg_handler_sync(self, msg):
500         """Process an incoming message from VPP in sync mode.
501
502         The message may be a reply or it may be an async notification.
503         """
504         r = self.decode_incoming_msg(msg)
505         if r is None:
506             return
507
508         # If we have a context, then use the context to find any
509         # request waiting for a reply
510         context = 0
511         if hasattr(r, 'context') and r.context > 0:
512             context = r.context
513
514         msgname = type(r).__name__
515
516         if context == 0:
517             # No context -> async notification that we feed to the callback
518             self.message_queue.put_nowait(r)
519         else:
520             raise IOError(2, 'RPC reply message received in event handler')
521
522     def decode_incoming_msg(self, msg):
523         if not msg:
524             self.logger.warning('vpp_api.read failed')
525             return
526
527         i, ci = self.header.unpack_from(msg, 0)
528         if self.id_names[i] == 'rx_thread_exit':
529             return
530
531         #
532         # Decode message and returns a tuple.
533         #
534         msgdef = self.id_msgdef[i]
535         if not msgdef:
536             raise IOError(2, 'Reply message undefined')
537
538         r = self.decode(msgdef, msg)
539
540         return r
541
542     def msg_handler_async(self, msg):
543         """Process a message from VPP in async mode.
544
545         In async mode, all messages are returned to the callback.
546         """
547         r = self.decode_incoming_msg(msg)
548         if r is None:
549             return
550
551         msgname = type(r).__name__
552
553         if self.event_callback:
554             self.event_callback(msgname, r)
555
556     def _control_ping(self, context):
557         """Send a ping command."""
558         self._call_vpp_async(self.control_ping_index,
559                              self.control_ping_msgdef,
560                              context=context)
561
562     def _call_vpp(self, i, msgdef, multipart, **kwargs):
563         """Given a message, send the message and await a reply.
564
565         msgdef - the message packing definition
566         i - the message type index
567         multipart - True if the message returns multiple
568         messages in return.
569         context - context number - chosen at random if not
570         supplied.
571         The remainder of the kwargs are the arguments to the API call.
572
573         The return value is the message or message array containing
574         the response.  It will raise an IOError exception if there was
575         no response within the timeout window.
576         """
577
578         if 'context' not in kwargs:
579             context = self.get_context()
580             kwargs['context'] = context
581         else:
582             context = kwargs['context']
583         kwargs['_vl_msg_id'] = i
584         b = self.encode(msgdef, kwargs)
585
586         vpp_api.vac_rx_suspend()
587         self._write(b)
588
589         if multipart:
590             # Send a ping after the request - we use its response
591             # to detect that we have seen all results.
592             self._control_ping(context)
593
594         # Block until we get a reply.
595         rl = []
596         while (True):
597             msg = self._read()
598             if not msg:
599                 raise IOError(2, 'VPP API client: read failed')
600
601             r = self.decode_incoming_msg(msg)
602             msgname = type(r).__name__
603             if context not in r or r.context == 0 or context != r.context:
604                 self.message_queue.put_nowait(r)
605                 continue
606
607             if not multipart:
608                 rl = r
609                 break
610             if msgname == 'control_ping_reply':
611                 break
612
613             rl.append(r)
614
615         vpp_api.vac_rx_resume()
616
617         return rl
618
619     def _call_vpp_async(self, i, msgdef, **kwargs):
620         """Given a message, send the message and await a reply.
621
622         msgdef - the message packing definition
623         i - the message type index
624         context - context number - chosen at random if not
625         supplied.
626         The remainder of the kwargs are the arguments to the API call.
627         """
628         if 'context' not in kwargs:
629             context = self.get_context()
630             kwargs['context'] = context
631         else:
632             context = kwargs['context']
633         kwargs['_vl_msg_id'] = i
634         b = self.encode(msgdef, kwargs)
635
636         self._write(b)
637
638     def register_event_callback(self, callback):
639         """Register a callback for async messages.
640
641         This will be called for async notifications in sync mode,
642         and all messages in async mode.  In sync mode, replies to
643         requests will not come here.
644
645         callback is a fn(msg_type_name, msg_type) that will be
646         called when a message comes in.  While this function is
647         executing, note that (a) you are in a background thread and
648         may wish to use threading.Lock to protect your datastructures,
649         and (b) message processing from VPP will stop (so if you take
650         a long while about it you may provoke reply timeouts or cause
651         VPP to fill the RX buffer).  Passing None will disable the
652         callback.
653         """
654         self.event_callback = callback
655
656     def thread_msg_handler(self):
657         """Python thread calling the user registerd message handler.
658
659         This is to emulate the old style event callback scheme. Modern
660         clients should provide their own thread to poll the event
661         queue.
662         """
663         while True:
664             r = self.message_queue.get()
665             msgname = type(r).__name__
666             if self.event_callback:
667                 self.event_callback(msgname, r)