3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
31 if sys.version[0] == '2':
38 typedef void (*vac_callback_t)(unsigned char * data, int len);
39 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
40 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
42 int vac_disconnect(void);
43 int vac_read(char **data, int *l, unsigned short timeout);
44 int vac_write(char *data, int len);
45 void vac_free(void * msg);
47 int vac_get_msg_index(unsigned char * name);
48 int vac_msg_table_size(void);
49 int vac_msg_table_max_index(void);
51 void vac_rx_suspend (void);
52 void vac_rx_resume (void);
53 void vac_set_error_handler(vac_error_callback_t);
56 # Barfs on failure, no need to check success.
57 vpp_api = ffi.dlopen('libvppapiclient.so')
59 def vpp_atexit(vpp_weakref):
60 """Clean up VPP connection on shutdown."""
61 vpp_instance = vpp_weakref()
62 if vpp_instance.connected:
63 vpp_instance.logger.debug('Cleaning up VPP on exit')
64 vpp_instance.disconnect()
70 if sys.version[0] == '2':
76 @ffi.callback("void(unsigned char *, int)")
77 def vac_callback_sync(data, len):
78 vpp_object.msg_handler_sync(ffi.buffer(data, len))
81 @ffi.callback("void(unsigned char *, int)")
82 def vac_callback_async(data, len):
83 vpp_object.msg_handler_async(ffi.buffer(data, len))
86 @ffi.callback("void(void *, unsigned char *, int)")
87 def vac_error_handler(arg, msg, msg_len):
88 vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
95 class FuncWrapper(object):
96 def __init__(self, func):
98 self.__name__ = func.__name__
100 def __call__(self, **kwargs):
101 return self._func(**kwargs)
107 This class provides the APIs to VPP. The APIs are loaded
108 from provided .api.json files and makes functions accordingly.
109 These functions are documented in the VPP .api files, as they
110 are dynamically created.
112 Additionally, VPP can send callback messages; this class
113 provides a means to register a callback function to receive
114 these messages in a background thread.
116 def __init__(self, apifiles=None, testmode=False, async_thread=True,
117 logger=logging.getLogger('vpp_papi'), loglevel='debug', read_timeout=0):
118 """Create a VPP API object.
120 apifiles is a list of files containing API
121 descriptions that will be loaded - methods will be
122 dynamically created reflecting these APIs. If not
123 provided this will load the API files from VPP's
124 default install location.
129 logging.basicConfig(level=getattr(logging, loglevel.upper()))
134 self.connected = False
135 self.header = struct.Struct('>HI')
137 self.event_callback = None
138 self.message_queue = queue.Queue()
139 self.read_timeout = read_timeout
140 self.vpp_api = vpp_api
141 self.async_thread = async_thread
144 # Pick up API definitions from default directory
146 apifiles = self.find_api_files()
148 # In test mode we don't care that we can't find the API files
154 for file in apifiles:
155 with open(file) as apidef_file:
156 api = json.load(apidef_file)
157 for t in api['types']:
158 self.add_type(t[0], t[1:])
160 for m in api['messages']:
161 self.add_message(m[0], m[1:])
162 self.apifiles = apifiles
165 if len(self.messages) == 0 and not testmode:
166 raise ValueError(1, 'Missing JSON message definitions')
168 # Make sure we allow VPP to clean up the message rings.
169 atexit.register(vpp_atexit, weakref.ref(self))
171 # Register error handler
172 vpp_api.vac_set_error_handler(vac_error_handler)
174 # Support legacy CFFI
175 # from_buffer supported from 1.8.0
176 (major, minor, patch) = [int(s) for s in cffi.__version__.split('.', 3)]
177 if major >= 1 and minor >= 8:
178 self._write = self._write_new_cffi
180 self._write = self._write_legacy_cffi
182 class ContextId(object):
183 """Thread-safe provider of unique context IDs."""
186 self.lock = threading.Lock()
189 """Get a new unique (or, at least, not recently used) context."""
193 get_context = ContextId()
196 def find_api_dir(cls):
197 """Attempt to find the best directory in which API definition
198 files may reside. If the value VPP_API_DIR exists in the environment
199 then it is first on the search list. If we're inside a recognized
200 location in a VPP source tree (src/scripts and src/vpp-api/python)
201 then entries from there to the likely locations in build-root are
202 added. Finally the location used by system packages is added.
204 :returns: A single directory name, or None if no such directory
209 if 'VPP_API_DIR' in os.environ:
210 dirs.append(os.environ['VPP_API_DIR'])
212 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
213 # in which case, plot a course to likely places in the src tree
214 import __main__ as main
215 if hasattr(main, '__file__'):
216 # get the path of the calling script
217 localdir = os.path.dirname(os.path.realpath(main.__file__))
219 # use cwd if there is no calling script
221 localdir_s = localdir.split(os.path.sep)
224 """Match dir against right-hand components of the script dir"""
225 d = dir.split('/') # param 'dir' assumes a / separator
227 return len(localdir_s) > l and localdir_s[-l:] == d
229 def sdir(srcdir, variant):
230 """Build a path from srcdir to the staged API files of
231 'variant' (typically '' or '_debug')"""
232 # Since 'core' and 'plugin' files are staged
233 # in separate directories, we target the parent dir.
234 return os.path.sep.join((
237 'install-vpp%s-native' % variant,
245 if dmatch('src/scripts'):
246 srcdir = os.path.sep.join(localdir_s[:-2])
247 elif dmatch('src/vpp-api/python'):
248 srcdir = os.path.sep.join(localdir_s[:-3])
250 # we're apparently running tests
251 srcdir = os.path.sep.join(localdir_s[:-1])
254 # we're in the source tree, try both the debug and release
256 x = 'vpp/share/vpp/api'
257 dirs.append(sdir(srcdir, '_debug'))
258 dirs.append(sdir(srcdir, ''))
260 # Test for staged copies of the scripts
261 # For these, since we explicitly know if we're running a debug versus
262 # release variant, target only the relevant directory
263 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
264 srcdir = os.path.sep.join(localdir_s[:-4])
265 dirs.append(sdir(srcdir, '_debug'))
266 if dmatch('build-root/install-vpp-native/vpp/bin'):
267 srcdir = os.path.sep.join(localdir_s[:-4])
268 dirs.append(sdir(srcdir, ''))
270 # finally, try the location system packages typically install into
271 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
273 # check the directories for existance; first one wins
275 if os.path.isdir(dir):
281 def find_api_files(cls, api_dir=None, patterns='*'):
282 """Find API definition files from the given directory tree with the
283 given pattern. If no directory is given then find_api_dir() is used
284 to locate one. If no pattern is given then all definition files found
285 in the directory tree are used.
287 :param api_dir: A directory tree in which to locate API definition
288 files; subdirectories are descended into.
289 If this is None then find_api_dir() is called to discover it.
290 :param patterns: A list of patterns to use in each visited directory
291 when looking for files.
292 This can be a list/tuple object or a comma-separated string of
293 patterns. Each value in the list will have leading/trialing
295 The pattern specifies the first part of the filename, '.api.json'
297 The results are de-duplicated, thus overlapping patterns are fine.
298 If this is None it defaults to '*' meaning "all API files".
299 :returns: A list of file paths for the API files found.
302 api_dir = cls.find_api_dir()
304 raise RuntimeError("api_dir cannot be located")
306 if isinstance(patterns, list) or isinstance(patterns, tuple):
307 patterns = [p.strip() + '.api.json' for p in patterns]
309 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
312 for root, dirnames, files in os.walk(api_dir):
313 # iterate all given patterns and de-dup the result
314 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
315 for filename in files:
316 api_files.append(os.path.join(root, filename))
321 """Debug function: report current VPP API status to stdout."""
322 print('Connected') if self.connected else print('Not Connected')
323 print('Read API definitions from', ', '.join(self.apifiles))
325 def __struct(self, t, n=None, e=-1, vl=None):
326 """Create a packing structure for a message."""
327 base_types = {'u8': 'B',
337 if e > 0 and t == 'u8':
339 s = struct.Struct('>' + str(e) + 's')
342 # Fixed array of base type
343 s = struct.Struct('>' + base_types[t])
344 return s.size, [e, s]
346 # Old style variable array
347 s = struct.Struct('>' + base_types[t])
348 return s.size, [-1, s]
350 # Variable length array
352 s = struct.Struct('>s')
353 return s.size, [vl, s]
355 s = struct.Struct('>' + base_types[t])
356 return s.size, [vl, s]
358 s = struct.Struct('>' + base_types[t])
361 if t in self.messages:
362 size = self.messages[t]['sizes'][0]
364 # Return a list in case of array
366 return size, [e, lambda self, encode, buf, offset, args: (
367 self.__struct_type(encode, self.messages[t], buf, offset,
370 return size, [vl, lambda self, encode, buf, offset, args: (
371 self.__struct_type(encode, self.messages[t], buf, offset,
375 raise NotImplementedError(1,
376 'No support for compound types ' + t)
377 return size, lambda self, encode, buf, offset, args: (
378 self.__struct_type(encode, self.messages[t], buf, offset, args)
381 raise ValueError(1, 'Invalid message type: ' + t)
383 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
384 """Get a message packer or unpacker."""
386 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
388 return self.__struct_type_decode(msgdef, buf, offset)
390 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
395 if k not in msgdef['args']:
396 raise ValueError(1,'Non existing argument [' + k + ']' + \
397 ' used in call to: ' + \
398 self.id_names[kwargs['_vl_msg_id']] + '()' )
400 for k, v in vpp_iterator(msgdef['args']):
405 e = kwargs[v[0]] if v[0] in kwargs else v[0]
406 if e != len(kwargs[k]):
407 raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, e, len(kwargs[k]))))
410 size += v[1](self, True, buf, off + size,
415 if l != len(kwargs[k]):
416 raise ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
420 buf[off:off + l] = bytearray(kwargs[k])
425 v[1].pack_into(buf, off + size, i)
429 size = v(self, True, buf, off, kwargs[k])
431 if type(kwargs[k]) is str and v.size < len(kwargs[k]):
432 raise ValueError(1, 'Input list length mismatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
433 v.pack_into(buf, off, kwargs[k])
436 size = v.size if not type(v) is list else 0
438 return off + size - offset
440 def __getitem__(self, name):
441 if name in self.messages:
442 return self.messages[name]
445 def get_size(self, sizes, kwargs):
446 total_size = sizes[0]
448 if e in kwargs and type(kwargs[e]) is list:
449 total_size += len(kwargs[e]) * sizes[1][e]
452 def encode(self, msgdef, kwargs):
453 # Make suitably large buffer
454 size = self.get_size(msgdef['sizes'], kwargs)
455 buf = bytearray(size)
457 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
458 return buf[:offset + size]
460 def decode(self, msgdef, buf):
461 return self.__struct_type(False, msgdef, buf, 0, None)[1]
463 def __struct_type_decode(self, msgdef, buf, offset):
467 for k, v in vpp_iterator(msgdef['args']):
471 if callable(v[1]): # compound type
473 if v[0] in msgdef['args']: # vla
479 (s, l) = v[1](self, False, buf, off + size, None)
484 if type(v[0]) is int:
485 size = len(buf) - off
488 res.append(buf[off:off + size])
490 e = v[0] if type(v[0]) is int else res[v[2]]
492 e = (len(buf) - off) / v[1].size
497 lst.append(v[1].unpack_from(buf, off + size)[0])
502 (s, l) = v(self, False, buf, off, None)
506 res.append(v.unpack_from(buf, off)[0])
509 return off + size - offset, msgdef['return_tuple']._make(res)
511 def ret_tup(self, name):
512 if name in self.messages and 'return_tuple' in self.messages[name]:
513 return self.messages[name]['return_tuple']
516 def duplicate_check_ok(self, name, msgdef):
519 if type(c) is dict and 'crc' in c:
523 # We can get duplicates because of imports
524 if crc == self.messages[name]['crc']:
528 def add_message(self, name, msgdef, typeonly=False):
529 if name in self.messages:
531 if self.duplicate_check_ok(name, msgdef):
533 raise ValueError('Duplicate message name: ' + name)
535 args = collections.OrderedDict()
536 argtypes = collections.OrderedDict()
541 for i, f in enumerate(msgdef):
542 if type(f) is dict and 'crc' in f:
543 msg['crc'] = f['crc']
547 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
548 raise ValueError('Variable Length Array must be last: ' + name)
549 size, s = self.__struct(*f)
551 if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
553 sizes[field_name] = size
555 sizes[field_name] = size
556 total_size += s[0] * size
558 sizes[field_name] = size
561 argtypes[field_name] = field_type
562 if len(f) == 4: # Find offset to # elements field
563 idx = list(args.keys()).index(f[3]) - i
564 args[field_name].append(idx)
565 fields.append(field_name)
566 msg['return_tuple'] = collections.namedtuple(name, fields,
568 self.messages[name] = msg
569 self.messages[name]['args'] = args
570 self.messages[name]['argtypes'] = argtypes
571 self.messages[name]['typeonly'] = typeonly
572 self.messages[name]['sizes'] = [total_size, sizes]
573 return self.messages[name]
575 def add_type(self, name, typedef):
576 return self.add_message('vl_api_' + name + '_t', typedef,
579 def make_function(self, name, i, msgdef, multipart, async):
581 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
583 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
585 args = self.messages[name]['args']
586 argtypes = self.messages[name]['argtypes']
587 f.__name__ = str(name)
588 f.__doc__ = ", ".join(["%s %s" %
589 (argtypes[k], k) for k in args.keys()])
594 if not hasattr(self, "_api"):
595 raise Exception("Not connected, api definitions not available")
598 def _register_functions(self, async=False):
599 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
600 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
602 for name, msgdef in vpp_iterator(self.messages):
603 if self.messages[name]['typeonly']:
605 crc = self.messages[name]['crc']
606 n = name + '_' + crc[2:]
607 i = vpp_api.vac_get_msg_index(n.encode())
609 self.id_msgdef[i] = msgdef
610 self.id_names[i] = name
611 multipart = True if name.find('_dump') > 0 else False
612 f = self.make_function(name, i, msgdef, multipart, async)
613 setattr(self._api, name, FuncWrapper(f))
615 # old API stuff starts here - will be removed in 17.07
616 if hasattr(self, name):
618 3, "Conflicting name in JSON definition: `%s'" % name)
619 setattr(self, name, f)
620 # old API stuff ends here
623 'No such message type or failed CRC checksum: %s', n)
625 def _write_new_cffi(self, buf):
626 """Send a binary-packed message to VPP."""
627 if not self.connected:
628 raise IOError(1, 'Not connected')
629 return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
631 def _write_legacy_cffi(self, buf):
632 """Send a binary-packed message to VPP."""
633 if not self.connected:
634 raise IOError(1, 'Not connected')
635 return vpp_api.vac_write(str(buf), len(buf))
638 if not self.connected:
639 raise IOError(1, 'Not connected')
640 mem = ffi.new("char **")
641 size = ffi.new("int *")
642 rv = vpp_api.vac_read(mem, size, self.read_timeout)
644 raise IOError(rv, 'vac_read failed')
645 msg = bytes(ffi.buffer(mem[0], size[0]))
646 vpp_api.vac_free(mem[0])
649 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
651 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
652 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
654 raise IOError(2, 'Connect failed')
655 self.connected = True
657 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
658 self._register_functions(async=async)
660 # Initialise control ping
661 crc = self.messages['control_ping']['crc']
662 self.control_ping_index = vpp_api.vac_get_msg_index(
663 ('control_ping' + '_' + crc[2:]).encode())
664 self.control_ping_msgdef = self.messages['control_ping']
665 if self.async_thread:
666 self.event_thread = threading.Thread(
667 target=self.thread_msg_handler)
668 self.event_thread.daemon = True
669 self.event_thread.start()
672 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
675 name - the name of the client.
676 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
677 async - if true, messages are sent without waiting for a reply
678 rx_qlen - the length of the VPP message receive queue between
681 msg_handler = vac_callback_sync if not async else vac_callback_async
682 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
685 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
686 """Attach to VPP in synchronous mode. Application must poll for events.
688 name - the name of the client.
689 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
690 rx_qlen - the length of the VPP message receive queue between
694 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
697 def disconnect(self):
698 """Detach from VPP."""
699 rv = vpp_api.vac_disconnect()
700 self.connected = False
701 self.message_queue.put("terminate event thread")
704 def msg_handler_sync(self, msg):
705 """Process an incoming message from VPP in sync mode.
707 The message may be a reply or it may be an async notification.
709 r = self.decode_incoming_msg(msg)
713 # If we have a context, then use the context to find any
714 # request waiting for a reply
716 if hasattr(r, 'context') and r.context > 0:
720 # No context -> async notification that we feed to the callback
721 self.message_queue.put_nowait(r)
723 raise IOError(2, 'RPC reply message received in event handler')
725 def decode_incoming_msg(self, msg):
727 self.logger.warning('vpp_api.read failed')
730 i, ci = self.header.unpack_from(msg, 0)
731 if self.id_names[i] == 'rx_thread_exit':
735 # Decode message and returns a tuple.
737 msgdef = self.id_msgdef[i]
739 raise IOError(2, 'Reply message undefined')
741 r = self.decode(msgdef, msg)
745 def msg_handler_async(self, msg):
746 """Process a message from VPP in async mode.
748 In async mode, all messages are returned to the callback.
750 r = self.decode_incoming_msg(msg)
754 msgname = type(r).__name__
756 if self.event_callback:
757 self.event_callback(msgname, r)
759 def _control_ping(self, context):
760 """Send a ping command."""
761 self._call_vpp_async(self.control_ping_index,
762 self.control_ping_msgdef,
765 def _call_vpp(self, i, msgdef, multipart, **kwargs):
766 """Given a message, send the message and await a reply.
768 msgdef - the message packing definition
769 i - the message type index
770 multipart - True if the message returns multiple
772 context - context number - chosen at random if not
774 The remainder of the kwargs are the arguments to the API call.
776 The return value is the message or message array containing
777 the response. It will raise an IOError exception if there was
778 no response within the timeout window.
781 if 'context' not in kwargs:
782 context = self.get_context()
783 kwargs['context'] = context
785 context = kwargs['context']
786 kwargs['_vl_msg_id'] = i
787 b = self.encode(msgdef, kwargs)
789 vpp_api.vac_rx_suspend()
793 # Send a ping after the request - we use its response
794 # to detect that we have seen all results.
795 self._control_ping(context)
797 # Block until we get a reply.
802 raise IOError(2, 'VPP API client: read failed')
804 r = self.decode_incoming_msg(msg)
805 msgname = type(r).__name__
806 if context not in r or r.context == 0 or context != r.context:
807 self.message_queue.put_nowait(r)
813 if msgname == 'control_ping_reply':
818 vpp_api.vac_rx_resume()
822 def _call_vpp_async(self, i, msgdef, **kwargs):
823 """Given a message, send the message and await a reply.
825 msgdef - the message packing definition
826 i - the message type index
827 context - context number - chosen at random if not
829 The remainder of the kwargs are the arguments to the API call.
831 if 'context' not in kwargs:
832 context = self.get_context()
833 kwargs['context'] = context
835 context = kwargs['context']
836 kwargs['_vl_msg_id'] = i
837 b = self.encode(msgdef, kwargs)
841 def register_event_callback(self, callback):
842 """Register a callback for async messages.
844 This will be called for async notifications in sync mode,
845 and all messages in async mode. In sync mode, replies to
846 requests will not come here.
848 callback is a fn(msg_type_name, msg_type) that will be
849 called when a message comes in. While this function is
850 executing, note that (a) you are in a background thread and
851 may wish to use threading.Lock to protect your datastructures,
852 and (b) message processing from VPP will stop (so if you take
853 a long while about it you may provoke reply timeouts or cause
854 VPP to fill the RX buffer). Passing None will disable the
857 self.event_callback = callback
859 def thread_msg_handler(self):
860 """Python thread calling the user registerd message handler.
862 This is to emulate the old style event callback scheme. Modern
863 clients should provide their own thread to poll the event
867 r = self.message_queue.get()
868 if r == "terminate event thread":
870 msgname = type(r).__name__
871 if self.event_callback:
872 self.event_callback(msgname, r)
875 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4