3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
31 if sys.version[0] == '2':
38 typedef void (*vac_callback_t)(unsigned char * data, int len);
39 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
40 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
42 int vac_disconnect(void);
43 int vac_read(char **data, int *l, unsigned short timeout);
44 int vac_write(char *data, int len);
45 void vac_free(void * msg);
47 int vac_get_msg_index(unsigned char * name);
48 int vac_msg_table_size(void);
49 int vac_msg_table_max_index(void);
51 void vac_rx_suspend (void);
52 void vac_rx_resume (void);
53 void vac_set_error_handler(vac_error_callback_t);
56 # Barfs on failure, no need to check success.
57 vpp_api = ffi.dlopen('libvppapiclient.so')
def vpp_atexit(vpp_weakref):
    """Clean up the VPP connection on interpreter shutdown.

    This handler is registered with ``atexit`` together with a weak
    reference to the VPP object, so the object may already have been
    garbage collected by the time the handler runs.

    :param vpp_weakref: zero-argument callable (normally a
        ``weakref.ref``) returning the VPP instance, or None when the
        instance no longer exists.
    """
    vpp_instance = vpp_weakref()
    # A dead weak reference yields None: there is nothing to disconnect.
    if vpp_instance is None:
        return
    if vpp_instance.connected:
        vpp_instance.logger.debug('Cleaning up VPP on exit')
        vpp_instance.disconnect()
70 if sys.version[0] == '2':
@ffi.callback("void(unsigned char *, int)")
def vac_callback_sync(data, len):
    """Receive a raw message from the VPP API client library and hand it
    to the sync-mode message handler of the global ``vpp_object``."""
    payload = ffi.buffer(data, len)
    vpp_object.msg_handler_sync(payload)
@ffi.callback("void(unsigned char *, int)")
def vac_callback_async(data, len):
    """Receive a raw message from the VPP API client library and hand it
    to the async-mode message handler of the global ``vpp_object``."""
    payload = ffi.buffer(data, len)
    vpp_object.msg_handler_async(payload)
@ffi.callback("void(void *, unsigned char *, int)")
def vac_error_handler(arg, msg, msg_len):
    """Log an error message reported by the VPP API client library as a
    warning on the logger of the global ``vpp_object``."""
    text = ffi.string(msg, msg_len)
    vpp_object.logger.warning("VPP API client:: %s", text)
class FuncWrapper(object):
    """Callable wrapper around a dynamically created API function.

    The wrapper carries the wrapped function's ``__name__`` and
    ``__doc__`` and only accepts keyword arguments when called.
    """

    def __init__(self, func):
        """Wrap ``func``.

        :param func: the callable to invoke from ``__call__``; it must
            provide ``__name__`` and ``__doc__`` attributes.
        """
        # __call__ dispatches through self._func, so it has to be stored.
        self._func = func
        self.__name__ = func.__name__
        self.__doc__ = func.__doc__

    def __call__(self, **kwargs):
        """Invoke the wrapped function with the given keyword arguments
        and return its result."""
        return self._func(**kwargs)
107 This class provides the APIs to VPP. The APIs are loaded
108 from provided .api.json files and makes functions accordingly.
109 These functions are documented in the VPP .api files, as they
110 are dynamically created.
112 Additionally, VPP can send callback messages; this class
113 provides a means to register a callback function to receive
114 these messages in a background thread.
116 def __init__(self, apifiles=None, testmode=False, async_thread=True,
117 logger=logging.getLogger('vpp_papi'), loglevel='debug', read_timeout=0):
118 """Create a VPP API object.
120 apifiles is a list of files containing API
121 descriptions that will be loaded - methods will be
122 dynamically created reflecting these APIs. If not
123 provided this will load the API files from VPP's
124 default install location.
129 logging.basicConfig(level=getattr(logging, loglevel.upper()))
134 self.connected = False
135 self.header = struct.Struct('>HI')
137 self.event_callback = None
138 self.message_queue = queue.Queue()
139 self.read_timeout = read_timeout
140 self.vpp_api = vpp_api
141 self.async_thread = async_thread
144 # Pick up API definitions from default directory
146 apifiles = self.find_api_files()
148 # In test mode we don't care that we can't find the API files
154 for file in apifiles:
155 with open(file) as apidef_file:
156 api = json.load(apidef_file)
157 for t in api['types']:
158 self.add_type(t[0], t[1:])
160 for m in api['messages']:
161 self.add_message(m[0], m[1:])
162 self.apifiles = apifiles
165 if len(self.messages) == 0 and not testmode:
166 raise ValueError(1, 'Missing JSON message definitions')
168 # Make sure we allow VPP to clean up the message rings.
169 atexit.register(vpp_atexit, weakref.ref(self))
171 # Register error handler
172 vpp_api.vac_set_error_handler(vac_error_handler)
174 # Support legacy CFFI
175 # from_buffer supported from 1.8.0
176 (major, minor, patch) = [int(s) for s in cffi.__version__.split('.', 3)]
177 if major >= 1 and minor >= 8:
178 self._write = self._write_new_cffi
180 self._write = self._write_legacy_cffi
182 class ContextId(object):
183 """Thread-safe provider of unique context IDs."""
186 self.lock = threading.Lock()
189 """Get a new unique (or, at least, not recently used) context."""
193 get_context = ContextId()
196 def find_api_dir(cls):
197 """Attempt to find the best directory in which API definition
198 files may reside. If the value VPP_API_DIR exists in the environment
199 then it is first on the search list. If we're inside a recognized
200 location in a VPP source tree (src/scripts and src/vpp-api/python)
201 then entries from there to the likely locations in build-root are
202 added. Finally the location used by system packages is added.
204 :returns: A single directory name, or None if no such directory
209 if 'VPP_API_DIR' in os.environ:
210 dirs.append(os.environ['VPP_API_DIR'])
212 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
213 # in which case, plot a course to likely places in the src tree
214 import __main__ as main
215 if hasattr(main, '__file__'):
216 # get the path of the calling script
217 localdir = os.path.dirname(os.path.realpath(main.__file__))
219 # use cwd if there is no calling script
221 localdir_s = localdir.split(os.path.sep)
224 """Match dir against right-hand components of the script dir"""
225 d = dir.split('/') # param 'dir' assumes a / separator
227 return len(localdir_s) > l and localdir_s[-l:] == d
229 def sdir(srcdir, variant):
230 """Build a path from srcdir to the staged API files of
231 'variant' (typically '' or '_debug')"""
232 # Since 'core' and 'plugin' files are staged
233 # in separate directories, we target the parent dir.
234 return os.path.sep.join((
237 'install-vpp%s-native' % variant,
245 if dmatch('src/scripts'):
246 srcdir = os.path.sep.join(localdir_s[:-2])
247 elif dmatch('src/vpp-api/python'):
248 srcdir = os.path.sep.join(localdir_s[:-3])
250 # we're apparently running tests
251 srcdir = os.path.sep.join(localdir_s[:-1])
254 # we're in the source tree, try both the debug and release
256 x = 'vpp/share/vpp/api'
257 dirs.append(sdir(srcdir, '_debug'))
258 dirs.append(sdir(srcdir, ''))
260 # Test for staged copies of the scripts
261 # For these, since we explicitly know if we're running a debug versus
262 # release variant, target only the relevant directory
263 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
264 srcdir = os.path.sep.join(localdir_s[:-4])
265 dirs.append(sdir(srcdir, '_debug'))
266 if dmatch('build-root/install-vpp-native/vpp/bin'):
267 srcdir = os.path.sep.join(localdir_s[:-4])
268 dirs.append(sdir(srcdir, ''))
270 # finally, try the location system packages typically install into
271 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
273 # check the directories for existence; first one wins
275 if os.path.isdir(dir):
281 def find_api_files(cls, api_dir=None, patterns='*'):
282 """Find API definition files from the given directory tree with the
283 given pattern. If no directory is given then find_api_dir() is used
284 to locate one. If no pattern is given then all definition files found
285 in the directory tree are used.
287 :param api_dir: A directory tree in which to locate API definition
288 files; subdirectories are descended into.
289 If this is None then find_api_dir() is called to discover it.
290 :param patterns: A list of patterns to use in each visited directory
291 when looking for files.
292 This can be a list/tuple object or a comma-separated string of
293 patterns. Each value in the list will have leading/trailing
295 The pattern specifies the first part of the filename, '.api.json'
297 The results are de-duplicated, thus overlapping patterns are fine.
298 If this is None it defaults to '*' meaning "all API files".
299 :returns: A list of file paths for the API files found.
302 api_dir = cls.find_api_dir()
304 raise RuntimeError("api_dir cannot be located")
306 if isinstance(patterns, list) or isinstance(patterns, tuple):
307 patterns = [p.strip() + '.api.json' for p in patterns]
309 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
312 for root, dirnames, files in os.walk(api_dir):
313 # iterate all given patterns and de-dup the result
314 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
315 for filename in files:
316 api_files.append(os.path.join(root, filename))
321 """Debug function: report current VPP API status to stdout."""
322 print('Connected') if self.connected else print('Not Connected')
323 print('Read API definitions from', ', '.join(self.apifiles))
325 def __struct(self, t, n=None, e=-1, vl=None):
326 """Create a packing structure for a message."""
327 base_types = {'u8': 'B',
337 if e > 0 and t == 'u8':
339 s = struct.Struct('>' + str(e) + 's')
342 # Fixed array of base type
343 s = struct.Struct('>' + base_types[t])
344 return s.size, [e, s]
346 # Old style variable array
347 s = struct.Struct('>' + base_types[t])
348 return s.size, [-1, s]
350 # Variable length array
352 s = struct.Struct('>s')
353 return s.size, [vl, s]
355 s = struct.Struct('>' + base_types[t])
356 return s.size, [vl, s]
358 s = struct.Struct('>' + base_types[t])
361 if t in self.messages:
362 size = self.messages[t]['sizes'][0]
364 # Return a list in case of array
366 return size, [e, lambda self, encode, buf, offset, args: (
367 self.__struct_type(encode, self.messages[t], buf, offset,
370 return size, [vl, lambda self, encode, buf, offset, args: (
371 self.__struct_type(encode, self.messages[t], buf, offset,
375 raise NotImplementedError(1,
376 'No support for compound types ' + t)
377 return size, lambda self, encode, buf, offset, args: (
378 self.__struct_type(encode, self.messages[t], buf, offset, args)
381 raise ValueError(1, 'Invalid message type: ' + t)
383 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
384 """Get a message packer or unpacker."""
386 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
388 return self.__struct_type_decode(msgdef, buf, offset)
390 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
395 if k not in msgdef['args']:
396 raise ValueError(1,'Non existing argument [' + k + ']' + \
397 ' used in call to: ' + \
398 self.id_names[kwargs['_vl_msg_id']] + '()' )
400 for k, v in vpp_iterator(msgdef['args']):
405 e = kwargs[v[0]] if v[0] in kwargs else v[0]
406 if e != len(kwargs[k]):
407 raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, e, len(kwargs[k]))))
410 size += v[1](self, True, buf, off + size,
415 if l != len(kwargs[k]):
416 raise ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
420 buf[off:off + l] = bytearray(kwargs[k])
425 v[1].pack_into(buf, off + size, i)
429 size = v(self, True, buf, off, kwargs[k])
431 if type(kwargs[k]) is str and v.size < len(kwargs[k]):
432 raise ValueError(1, 'Input list length mismatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
433 v.pack_into(buf, off, kwargs[k])
436 size = v.size if not type(v) is list else 0
438 return off + size - offset
def __getitem__(self, name):
    """Look up the stored definition of message ``name``.

    :returns: the entry of ``self.messages`` for ``name``, or None when
        no such message is known.
    """
    known = self.messages
    return known[name] if name in known else None
def get_size(self, sizes, kwargs):
    """Compute the buffer size to allocate for encoding a message.

    :param sizes: two-element sequence ``[fixed_size, per_field]`` where
        ``per_field`` maps a field name to the size of one element of
        that field.
    :param kwargs: the call arguments, keyed by field name.
    :returns: ``fixed_size`` plus, for every field whose argument is a
        list, the list length multiplied by that field's element size.
    """
    total_size = sizes[0]
    # Every list-valued argument adds len(list) * element size on top of
    # the fixed part.
    for e in sizes[1]:
        if e in kwargs and type(kwargs[e]) is list:
            total_size += len(kwargs[e]) * sizes[1][e]
    return total_size
452 def encode(self, msgdef, kwargs):
453 # Make suitably large buffer
454 size = self.get_size(msgdef['sizes'], kwargs)
455 buf = bytearray(size)
457 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
458 return buf[:offset + size]
def decode(self, msgdef, buf):
    """Unpack ``buf`` according to ``msgdef``, starting at offset 0.

    :returns: the second element of the result produced by the
        structure (un)packer, i.e. the decoded message.
    """
    unpacked = self.__struct_type(False, msgdef, buf, 0, None)
    return unpacked[1]
463 def __struct_type_decode(self, msgdef, buf, offset):
467 for k, v in vpp_iterator(msgdef['args']):
471 if callable(v[1]): # compound type
473 if v[0] in msgdef['args']: # vla
479 (s, l) = v[1](self, False, buf, off + size, None)
484 if type(v[0]) is int:
485 size = len(buf) - off
488 res.append(buf[off:off + size])
490 e = v[0] if type(v[0]) is int else res[v[2]]
492 e = (len(buf) - off) / v[1].size
497 lst.append(v[1].unpack_from(buf, off + size)[0])
502 (s, l) = v(self, False, buf, off, None)
506 res.append(v.unpack_from(buf, off)[0])
509 return off + size - offset, msgdef['return_tuple']._make(res)
def ret_tup(self, name):
    """Return the ``return_tuple`` entry recorded for message ``name``.

    :returns: the stored entry, or None when the message is unknown or
        has no ``return_tuple`` entry.
    """
    if name not in self.messages:
        return None
    msg = self.messages[name]
    if 'return_tuple' not in msg:
        return None
    return msg['return_tuple']
516 def duplicate_check_ok(self, name, msgdef):
519 if type(c) is dict and 'crc' in c:
523 # We can get duplicates because of imports
524 if crc == self.messages[name]['crc']:
528 def add_message(self, name, msgdef, typeonly=False):
529 if name in self.messages:
531 if self.duplicate_check_ok(name, msgdef):
533 raise ValueError('Duplicate message name: ' + name)
535 args = collections.OrderedDict()
536 argtypes = collections.OrderedDict()
541 for i, f in enumerate(msgdef):
542 if type(f) is dict and 'crc' in f:
543 msg['crc'] = f['crc']
547 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
548 raise ValueError('Variable Length Array must be last: ' + name)
549 size, s = self.__struct(*f)
551 if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
553 sizes[field_name] = size
555 sizes[field_name] = size
556 total_size += s[0] * size
558 sizes[field_name] = size
561 argtypes[field_name] = field_type
562 if len(f) == 4: # Find offset to # elements field
563 idx = list(args.keys()).index(f[3]) - i
564 args[field_name].append(idx)
565 fields.append(field_name)
566 msg['return_tuple'] = collections.namedtuple(name, fields,
568 self.messages[name] = msg
569 self.messages[name]['args'] = args
570 self.messages[name]['argtypes'] = argtypes
571 self.messages[name]['typeonly'] = typeonly
572 self.messages[name]['sizes'] = [total_size, sizes]
573 return self.messages[name]
575 def add_type(self, name, typedef):
576 return self.add_message('vl_api_' + name + '_t', typedef,
579 def make_function(self, name, i, msgdef, multipart, async):
581 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
583 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
585 args = self.messages[name]['args']
586 argtypes = self.messages[name]['argtypes']
587 f.__name__ = str(name)
588 f.__doc__ = ", ".join(["%s %s" %
589 (argtypes[k], k) for k in args.keys()])
594 if not hasattr(self, "_api"):
595 raise Exception("Not connected, api definitions not available")
598 def _register_functions(self, async=False):
599 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
600 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
602 for name, msgdef in vpp_iterator(self.messages):
603 if self.messages[name]['typeonly']:
605 crc = self.messages[name]['crc']
606 n = name + '_' + crc[2:]
607 i = vpp_api.vac_get_msg_index(n.encode())
609 self.id_msgdef[i] = msgdef
610 self.id_names[i] = name
611 multipart = True if name.find('_dump') > 0 else False
612 f = self.make_function(name, i, msgdef, multipart, async)
613 setattr(self._api, name, FuncWrapper(f))
616 'No such message type or failed CRC checksum: %s', n)
def _write_new_cffi(self, buf):
    """Send a binary-packed message to VPP.

    The buffer is handed to ``vac_write`` through ``ffi.from_buffer``.

    :param buf: the packed message.
    :returns: the value returned by ``vac_write``.
    :raises IOError: when the client is not connected.
    """
    if self.connected:
        return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
    raise IOError(1, 'Not connected')
def _write_legacy_cffi(self, buf):
    """Send a binary-packed message to VPP without ``ffi.from_buffer``.

    Used as the write path for CFFI versions that lack ``from_buffer``;
    the payload is copied into an immutable byte string instead.

    :param buf: the packed message (e.g. a bytearray).
    :returns: the value returned by ``vac_write``.
    :raises IOError: when the client is not connected.
    """
    if not self.connected:
        raise IOError(1, 'Not connected')
    # bytes() yields the raw payload on both Python 2 and Python 3;
    # str() on a Python 3 bytearray would give its repr text instead.
    return vpp_api.vac_write(bytes(buf), len(buf))
631 if not self.connected:
632 raise IOError(1, 'Not connected')
633 mem = ffi.new("char **")
634 size = ffi.new("int *")
635 rv = vpp_api.vac_read(mem, size, self.read_timeout)
637 raise IOError(rv, 'vac_read failed')
638 msg = bytes(ffi.buffer(mem[0], size[0]))
639 vpp_api.vac_free(mem[0])
642 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
644 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
645 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
647 raise IOError(2, 'Connect failed')
648 self.connected = True
650 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
651 self._register_functions(async=async)
653 # Initialise control ping
654 crc = self.messages['control_ping']['crc']
655 self.control_ping_index = vpp_api.vac_get_msg_index(
656 ('control_ping' + '_' + crc[2:]).encode())
657 self.control_ping_msgdef = self.messages['control_ping']
658 if self.async_thread:
659 self.event_thread = threading.Thread(
660 target=self.thread_msg_handler)
661 self.event_thread.daemon = True
662 self.event_thread.start()
665 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
668 name - the name of the client.
669 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
670 async - if true, messages are sent without waiting for a reply
671 rx_qlen - the length of the VPP message receive queue between
674 msg_handler = vac_callback_sync if not async else vac_callback_async
675 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
678 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
679 """Attach to VPP in synchronous mode. Application must poll for events.
681 name - the name of the client.
682 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
683 rx_qlen - the length of the VPP message receive queue between
687 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
690 def disconnect(self):
691 """Detach from VPP."""
692 rv = vpp_api.vac_disconnect()
693 self.connected = False
694 self.message_queue.put("terminate event thread")
697 def msg_handler_sync(self, msg):
698 """Process an incoming message from VPP in sync mode.
700 The message may be a reply or it may be an async notification.
702 r = self.decode_incoming_msg(msg)
706 # If we have a context, then use the context to find any
707 # request waiting for a reply
709 if hasattr(r, 'context') and r.context > 0:
713 # No context -> async notification that we feed to the callback
714 self.message_queue.put_nowait(r)
716 raise IOError(2, 'RPC reply message received in event handler')
718 def decode_incoming_msg(self, msg):
720 self.logger.warning('vpp_api.read failed')
723 i, ci = self.header.unpack_from(msg, 0)
724 if self.id_names[i] == 'rx_thread_exit':
728 # Decode message and returns a tuple.
730 msgdef = self.id_msgdef[i]
732 raise IOError(2, 'Reply message undefined')
734 r = self.decode(msgdef, msg)
738 def msg_handler_async(self, msg):
739 """Process a message from VPP in async mode.
741 In async mode, all messages are returned to the callback.
743 r = self.decode_incoming_msg(msg)
747 msgname = type(r).__name__
749 if self.event_callback:
750 self.event_callback(msgname, r)
752 def _control_ping(self, context):
753 """Send a ping command."""
754 self._call_vpp_async(self.control_ping_index,
755 self.control_ping_msgdef,
758 def _call_vpp(self, i, msgdef, multipart, **kwargs):
759 """Given a message, send the message and await a reply.
761 msgdef - the message packing definition
762 i - the message type index
763 multipart - True if the message returns multiple
765 context - context number - chosen at random if not
767 The remainder of the kwargs are the arguments to the API call.
769 The return value is the message or message array containing
770 the response. It will raise an IOError exception if there was
771 no response within the timeout window.
774 if 'context' not in kwargs:
775 context = self.get_context()
776 kwargs['context'] = context
778 context = kwargs['context']
779 kwargs['_vl_msg_id'] = i
780 b = self.encode(msgdef, kwargs)
782 vpp_api.vac_rx_suspend()
786 # Send a ping after the request - we use its response
787 # to detect that we have seen all results.
788 self._control_ping(context)
790 # Block until we get a reply.
795 raise IOError(2, 'VPP API client: read failed')
797 r = self.decode_incoming_msg(msg)
798 msgname = type(r).__name__
799 if context not in r or r.context == 0 or context != r.context:
800 self.message_queue.put_nowait(r)
806 if msgname == 'control_ping_reply':
811 vpp_api.vac_rx_resume()
815 def _call_vpp_async(self, i, msgdef, **kwargs):
816 """Given a message, send the message and await a reply.
818 msgdef - the message packing definition
819 i - the message type index
820 context - context number - chosen at random if not
822 The remainder of the kwargs are the arguments to the API call.
824 if 'context' not in kwargs:
825 context = self.get_context()
826 kwargs['context'] = context
828 context = kwargs['context']
829 kwargs['_vl_msg_id'] = i
830 b = self.encode(msgdef, kwargs)
834 def register_event_callback(self, callback):
835 """Register a callback for async messages.
837 This will be called for async notifications in sync mode,
838 and all messages in async mode. In sync mode, replies to
839 requests will not come here.
841 callback is a fn(msg_type_name, msg_type) that will be
842 called when a message comes in. While this function is
843 executing, note that (a) you are in a background thread and
844 may wish to use threading.Lock to protect your datastructures,
845 and (b) message processing from VPP will stop (so if you take
846 a long while about it you may provoke reply timeouts or cause
847 VPP to fill the RX buffer). Passing None will disable the
850 self.event_callback = callback
852 def thread_msg_handler(self):
853 """Python thread calling the user registered message handler.
855 This is to emulate the old style event callback scheme. Modern
856 clients should provide their own thread to poll the event
860 r = self.message_queue.get()
861 if r == "terminate event thread":
863 msgname = type(r).__name__
864 if self.event_callback:
865 self.event_callback(msgname, r)
868 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4