3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
30 if sys.version[0] == '2':
37 typedef void (*vac_callback_t)(unsigned char * data, int len);
38 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
39 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
41 int vac_disconnect(void);
42 int vac_read(char **data, int *l, unsigned short timeout);
43 int vac_write(char *data, int len);
44 void vac_free(void * msg);
46 int vac_get_msg_index(unsigned char * name);
47 int vac_msg_table_size(void);
48 int vac_msg_table_max_index(void);
50 void vac_rx_suspend (void);
51 void vac_rx_resume (void);
52 void vac_set_error_handler(vac_error_callback_t);
55 # Barfs on failure, no need to check success.
56 vpp_api = ffi.dlopen('libvppapiclient.so')
59 """Clean up VPP connection on shutdown."""
61 self.logger.debug('Cleaning up VPP on exit')
68 if sys.version[0] == '2':
74 @ffi.callback("void(unsigned char *, int)")
75 def vac_callback_sync(data, len):
76 vpp_object.msg_handler_sync(ffi.buffer(data, len))
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_async(data, len):
81 vpp_object.msg_handler_async(ffi.buffer(data, len))
84 @ffi.callback("void(void *, unsigned char *, int)")
85 def vac_error_handler(arg, msg, msg_len):
86 vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
93 class FuncWrapper(object):
94 def __init__(self, func):
96 self.__name__ = func.__name__
98 def __call__(self, **kwargs):
99 return self._func(**kwargs)
105 This class provides the APIs to VPP. The APIs are loaded
106 from provided .api.json files and makes functions accordingly.
107 These functions are documented in the VPP .api files, as they
108 are dynamically created.
110 Additionally, VPP can send callback messages; this class
111 provides a means to register a callback function to receive
112 these messages in a background thread.
114 def __init__(self, apifiles=None, testmode=False, async_thread=True,
115 logger=logging.getLogger('vpp_papi'), loglevel='debug', read_timeout=0):
116 """Create a VPP API object.
118 apifiles is a list of files containing API
119 descriptions that will be loaded - methods will be
120 dynamically created reflecting these APIs. If not
121 provided this will load the API files from VPP's
122 default install location.
127 logging.basicConfig(level=getattr(logging, loglevel.upper()))
132 self.connected = False
133 self.header = struct.Struct('>HI')
135 self.event_callback = None
136 self.message_queue = queue.Queue()
137 self.read_timeout = read_timeout
138 self.vpp_api = vpp_api
140 self.event_thread = threading.Thread(
141 target=self.thread_msg_handler)
142 self.event_thread.daemon = True
143 self.event_thread.start()
146 # Pick up API definitions from default directory
148 apifiles = self.find_api_files()
150 # In test mode we don't care that we can't find the API files
156 for file in apifiles:
157 with open(file) as apidef_file:
158 api = json.load(apidef_file)
159 for t in api['types']:
160 self.add_type(t[0], t[1:])
162 for m in api['messages']:
163 self.add_message(m[0], m[1:])
164 self.apifiles = apifiles
167 if len(self.messages) == 0 and not testmode:
168 raise ValueError(1, 'Missing JSON message definitions')
170 # Make sure we allow VPP to clean up the message rings.
171 atexit.register(vpp_atexit, self)
173 # Register error handler
174 vpp_api.vac_set_error_handler(vac_error_handler)
176 # Support legacy CFFI
177 # from_buffer supported from 1.8.0
178 (major, minor, patch) = [int(s) for s in cffi.__version__.split('.', 3)]
179 if major >= 1 and minor >= 8:
180 self._write = self._write_new_cffi
182 self._write = self._write_legacy_cffi
184 class ContextId(object):
185 """Thread-safe provider of unique context IDs."""
188 self.lock = threading.Lock()
191 """Get a new unique (or, at least, not recently used) context."""
195 get_context = ContextId()
198 def find_api_dir(cls):
199 """Attempt to find the best directory in which API definition
200 files may reside. If the value VPP_API_DIR exists in the environment
201 then it is first on the search list. If we're inside a recognized
202 location in a VPP source tree (src/scripts and src/vpp-api/python)
203 then entries from there to the likely locations in build-root are
204 added. Finally the location used by system packages is added.
206 :returns: A single directory name, or None if no such directory
211 if 'VPP_API_DIR' in os.environ:
212 dirs.append(os.environ['VPP_API_DIR'])
214 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
215 # in which case, plot a course to likely places in the src tree
216 import __main__ as main
217 if hasattr(main, '__file__'):
218 # get the path of the calling script
219 localdir = os.path.dirname(os.path.realpath(main.__file__))
221 # use cwd if there is no calling script
223 localdir_s = localdir.split(os.path.sep)
226 """Match dir against right-hand components of the script dir"""
227 d = dir.split('/') # param 'dir' assumes a / separator
229 return len(localdir_s) > l and localdir_s[-l:] == d
231 def sdir(srcdir, variant):
232 """Build a path from srcdir to the staged API files of
233 'variant' (typically '' or '_debug')"""
234 # Since 'core' and 'plugin' files are staged
235 # in separate directories, we target the parent dir.
236 return os.path.sep.join((
239 'install-vpp%s-native' % variant,
247 if dmatch('src/scripts'):
248 srcdir = os.path.sep.join(localdir_s[:-2])
249 elif dmatch('src/vpp-api/python'):
250 srcdir = os.path.sep.join(localdir_s[:-3])
252 # we're apparently running tests
253 srcdir = os.path.sep.join(localdir_s[:-1])
256 # we're in the source tree, try both the debug and release
258 x = 'vpp/share/vpp/api'
259 dirs.append(sdir(srcdir, '_debug'))
260 dirs.append(sdir(srcdir, ''))
262 # Test for staged copies of the scripts
263 # For these, since we explicitly know if we're running a debug versus
264 # release variant, target only the relevant directory
265 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
266 srcdir = os.path.sep.join(localdir_s[:-4])
267 dirs.append(sdir(srcdir, '_debug'))
268 if dmatch('build-root/install-vpp-native/vpp/bin'):
269 srcdir = os.path.sep.join(localdir_s[:-4])
270 dirs.append(sdir(srcdir, ''))
272 # finally, try the location system packages typically install into
273 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
275 # check the directories for existance; first one wins
277 if os.path.isdir(dir):
283 def find_api_files(cls, api_dir=None, patterns='*'):
284 """Find API definition files from the given directory tree with the
285 given pattern. If no directory is given then find_api_dir() is used
286 to locate one. If no pattern is given then all definition files found
287 in the directory tree are used.
289 :param api_dir: A directory tree in which to locate API definition
290 files; subdirectories are descended into.
291 If this is None then find_api_dir() is called to discover it.
292 :param patterns: A list of patterns to use in each visited directory
293 when looking for files.
294 This can be a list/tuple object or a comma-separated string of
295 patterns. Each value in the list will have leading/trialing
297 The pattern specifies the first part of the filename, '.api.json'
299 The results are de-duplicated, thus overlapping patterns are fine.
300 If this is None it defaults to '*' meaning "all API files".
301 :returns: A list of file paths for the API files found.
304 api_dir = cls.find_api_dir()
306 raise RuntimeError("api_dir cannot be located")
308 if isinstance(patterns, list) or isinstance(patterns, tuple):
309 patterns = [p.strip() + '.api.json' for p in patterns]
311 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
314 for root, dirnames, files in os.walk(api_dir):
315 # iterate all given patterns and de-dup the result
316 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
317 for filename in files:
318 api_files.append(os.path.join(root, filename))
323 """Debug function: report current VPP API status to stdout."""
324 print('Connected') if self.connected else print('Not Connected')
325 print('Read API definitions from', ', '.join(self.apifiles))
327 def __struct(self, t, n=None, e=-1, vl=None):
328 """Create a packing structure for a message."""
329 base_types = {'u8': 'B',
339 if e > 0 and t == 'u8':
341 s = struct.Struct('>' + str(e) + 's')
344 # Fixed array of base type
345 s = struct.Struct('>' + base_types[t])
346 return s.size, [e, s]
348 # Old style variable array
349 s = struct.Struct('>' + base_types[t])
350 return s.size, [-1, s]
352 # Variable length array
354 s = struct.Struct('>s')
355 return s.size, [vl, s]
357 s = struct.Struct('>' + base_types[t])
358 return s.size, [vl, s]
360 s = struct.Struct('>' + base_types[t])
363 if t in self.messages:
364 size = self.messages[t]['sizes'][0]
366 # Return a list in case of array
368 return size, [e, lambda self, encode, buf, offset, args: (
369 self.__struct_type(encode, self.messages[t], buf, offset,
372 return size, [vl, lambda self, encode, buf, offset, args: (
373 self.__struct_type(encode, self.messages[t], buf, offset,
377 raise NotImplementedError(1,
378 'No support for compound types ' + t)
379 return size, lambda self, encode, buf, offset, args: (
380 self.__struct_type(encode, self.messages[t], buf, offset, args)
383 raise ValueError(1, 'Invalid message type: ' + t)
385 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
386 """Get a message packer or unpacker."""
388 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
390 return self.__struct_type_decode(msgdef, buf, offset)
392 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
397 if k not in msgdef['args']:
398 raise ValueError(1,'Non existing argument [' + k + ']' + \
399 ' used in call to: ' + \
400 self.id_names[kwargs['_vl_msg_id']] + '()' )
402 for k, v in vpp_iterator(msgdef['args']):
407 e = kwargs[v[0]] if v[0] in kwargs else v[0]
408 if e != len(kwargs[k]):
409 raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, e, len(kwargs[k]))))
412 size += v[1](self, True, buf, off + size,
417 if l != len(kwargs[k]):
418 raise ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
422 buf[off:off + l] = bytearray(kwargs[k])
427 v[1].pack_into(buf, off + size, i)
431 size = v(self, True, buf, off, kwargs[k])
433 if type(kwargs[k]) is str and v.size < len(kwargs[k]):
434 raise ValueError(1, 'Input list length mismatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
435 v.pack_into(buf, off, kwargs[k])
438 size = v.size if not type(v) is list else 0
440 return off + size - offset
442 def __getitem__(self, name):
443 if name in self.messages:
444 return self.messages[name]
447 def get_size(self, sizes, kwargs):
448 total_size = sizes[0]
450 if e in kwargs and type(kwargs[e]) is list:
451 total_size += len(kwargs[e]) * sizes[1][e]
454 def encode(self, msgdef, kwargs):
455 # Make suitably large buffer
456 size = self.get_size(msgdef['sizes'], kwargs)
457 buf = bytearray(size)
459 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
460 return buf[:offset + size]
462 def decode(self, msgdef, buf):
463 return self.__struct_type(False, msgdef, buf, 0, None)[1]
465 def __struct_type_decode(self, msgdef, buf, offset):
469 for k, v in vpp_iterator(msgdef['args']):
473 if callable(v[1]): # compound type
475 if v[0] in msgdef['args']: # vla
481 (s, l) = v[1](self, False, buf, off + size, None)
486 if type(v[0]) is int:
487 size = len(buf) - off
490 res.append(buf[off:off + size])
492 e = v[0] if type(v[0]) is int else res[v[2]]
494 e = (len(buf) - off) / v[1].size
499 lst.append(v[1].unpack_from(buf, off + size)[0])
504 (s, l) = v(self, False, buf, off, None)
508 res.append(v.unpack_from(buf, off)[0])
511 return off + size - offset, msgdef['return_tuple']._make(res)
513 def ret_tup(self, name):
514 if name in self.messages and 'return_tuple' in self.messages[name]:
515 return self.messages[name]['return_tuple']
518 def add_message(self, name, msgdef, typeonly=False):
519 if name in self.messages:
520 raise ValueError('Duplicate message name: ' + name)
522 args = collections.OrderedDict()
523 argtypes = collections.OrderedDict()
528 for i, f in enumerate(msgdef):
529 if type(f) is dict and 'crc' in f:
530 msg['crc'] = f['crc']
534 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
535 raise ValueError('Variable Length Array must be last: ' + name)
536 size, s = self.__struct(*f)
538 if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
540 sizes[field_name] = size
542 sizes[field_name] = size
543 total_size += s[0] * size
545 sizes[field_name] = size
548 argtypes[field_name] = field_type
549 if len(f) == 4: # Find offset to # elements field
550 idx = list(args.keys()).index(f[3]) - i
551 args[field_name].append(idx)
552 fields.append(field_name)
553 msg['return_tuple'] = collections.namedtuple(name, fields,
555 self.messages[name] = msg
556 self.messages[name]['args'] = args
557 self.messages[name]['argtypes'] = argtypes
558 self.messages[name]['typeonly'] = typeonly
559 self.messages[name]['sizes'] = [total_size, sizes]
560 return self.messages[name]
562 def add_type(self, name, typedef):
563 return self.add_message('vl_api_' + name + '_t', typedef,
566 def make_function(self, name, i, msgdef, multipart, async):
568 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
570 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
572 args = self.messages[name]['args']
573 argtypes = self.messages[name]['argtypes']
574 f.__name__ = str(name)
575 f.__doc__ = ", ".join(["%s %s" %
576 (argtypes[k], k) for k in args.keys()])
581 if not hasattr(self, "_api"):
582 raise Exception("Not connected, api definitions not available")
585 def _register_functions(self, async=False):
586 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
587 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
589 for name, msgdef in vpp_iterator(self.messages):
590 if self.messages[name]['typeonly']:
592 crc = self.messages[name]['crc']
593 n = name + '_' + crc[2:]
594 i = vpp_api.vac_get_msg_index(n.encode())
596 self.id_msgdef[i] = msgdef
597 self.id_names[i] = name
598 multipart = True if name.find('_dump') > 0 else False
599 f = self.make_function(name, i, msgdef, multipart, async)
600 setattr(self._api, name, FuncWrapper(f))
602 # old API stuff starts here - will be removed in 17.07
603 if hasattr(self, name):
605 3, "Conflicting name in JSON definition: `%s'" % name)
606 setattr(self, name, f)
607 # old API stuff ends here
610 'No such message type or failed CRC checksum: %s', n)
612 def _write_new_cffi(self, buf):
613 """Send a binary-packed message to VPP."""
614 if not self.connected:
615 raise IOError(1, 'Not connected')
616 return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
618 def _write_legacy_cffi(self, buf):
619 """Send a binary-packed message to VPP."""
620 if not self.connected:
621 raise IOError(1, 'Not connected')
622 return vpp_api.vac_write(str(buf), len(buf))
625 if not self.connected:
626 raise IOError(1, 'Not connected')
627 mem = ffi.new("char **")
628 size = ffi.new("int *")
629 rv = vpp_api.vac_read(mem, size, self.read_timeout)
631 raise IOError(rv, 'vac_read failed')
632 msg = bytes(ffi.buffer(mem[0], size[0]))
633 vpp_api.vac_free(mem[0])
636 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
638 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
639 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
641 raise IOError(2, 'Connect failed')
642 self.connected = True
644 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
645 self._register_functions(async=async)
647 # Initialise control ping
648 crc = self.messages['control_ping']['crc']
649 self.control_ping_index = vpp_api.vac_get_msg_index(
650 ('control_ping' + '_' + crc[2:]).encode())
651 self.control_ping_msgdef = self.messages['control_ping']
654 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
657 name - the name of the client.
658 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
659 async - if true, messages are sent without waiting for a reply
660 rx_qlen - the length of the VPP message receive queue between
663 msg_handler = vac_callback_sync if not async else vac_callback_async
664 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
667 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
668 """Attach to VPP in synchronous mode. Application must poll for events.
670 name - the name of the client.
671 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
672 rx_qlen - the length of the VPP message receive queue between
676 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
679 def disconnect(self):
680 """Detach from VPP."""
681 rv = vpp_api.vac_disconnect()
682 self.connected = False
685 def msg_handler_sync(self, msg):
686 """Process an incoming message from VPP in sync mode.
688 The message may be a reply or it may be an async notification.
690 r = self.decode_incoming_msg(msg)
694 # If we have a context, then use the context to find any
695 # request waiting for a reply
697 if hasattr(r, 'context') and r.context > 0:
700 msgname = type(r).__name__
703 # No context -> async notification that we feed to the callback
704 self.message_queue.put_nowait(r)
706 raise IOError(2, 'RPC reply message received in event handler')
708 def decode_incoming_msg(self, msg):
710 self.logger.warning('vpp_api.read failed')
713 i, ci = self.header.unpack_from(msg, 0)
714 if self.id_names[i] == 'rx_thread_exit':
718 # Decode message and returns a tuple.
720 msgdef = self.id_msgdef[i]
722 raise IOError(2, 'Reply message undefined')
724 r = self.decode(msgdef, msg)
728 def msg_handler_async(self, msg):
729 """Process a message from VPP in async mode.
731 In async mode, all messages are returned to the callback.
733 r = self.decode_incoming_msg(msg)
737 msgname = type(r).__name__
739 if self.event_callback:
740 self.event_callback(msgname, r)
742 def _control_ping(self, context):
743 """Send a ping command."""
744 self._call_vpp_async(self.control_ping_index,
745 self.control_ping_msgdef,
748 def _call_vpp(self, i, msgdef, multipart, **kwargs):
749 """Given a message, send the message and await a reply.
751 msgdef - the message packing definition
752 i - the message type index
753 multipart - True if the message returns multiple
755 context - context number - chosen at random if not
757 The remainder of the kwargs are the arguments to the API call.
759 The return value is the message or message array containing
760 the response. It will raise an IOError exception if there was
761 no response within the timeout window.
764 if 'context' not in kwargs:
765 context = self.get_context()
766 kwargs['context'] = context
768 context = kwargs['context']
769 kwargs['_vl_msg_id'] = i
770 b = self.encode(msgdef, kwargs)
772 vpp_api.vac_rx_suspend()
776 # Send a ping after the request - we use its response
777 # to detect that we have seen all results.
778 self._control_ping(context)
780 # Block until we get a reply.
785 raise IOError(2, 'VPP API client: read failed')
787 r = self.decode_incoming_msg(msg)
788 msgname = type(r).__name__
789 if context not in r or r.context == 0 or context != r.context:
790 self.message_queue.put_nowait(r)
796 if msgname == 'control_ping_reply':
801 vpp_api.vac_rx_resume()
805 def _call_vpp_async(self, i, msgdef, **kwargs):
806 """Given a message, send the message and await a reply.
808 msgdef - the message packing definition
809 i - the message type index
810 context - context number - chosen at random if not
812 The remainder of the kwargs are the arguments to the API call.
814 if 'context' not in kwargs:
815 context = self.get_context()
816 kwargs['context'] = context
818 context = kwargs['context']
819 kwargs['_vl_msg_id'] = i
820 b = self.encode(msgdef, kwargs)
824 def register_event_callback(self, callback):
825 """Register a callback for async messages.
827 This will be called for async notifications in sync mode,
828 and all messages in async mode. In sync mode, replies to
829 requests will not come here.
831 callback is a fn(msg_type_name, msg_type) that will be
832 called when a message comes in. While this function is
833 executing, note that (a) you are in a background thread and
834 may wish to use threading.Lock to protect your datastructures,
835 and (b) message processing from VPP will stop (so if you take
836 a long while about it you may provoke reply timeouts or cause
837 VPP to fill the RX buffer). Passing None will disable the
840 self.event_callback = callback
842 def thread_msg_handler(self):
843 """Python thread calling the user registerd message handler.
845 This is to emulate the old style event callback scheme. Modern
846 clients should provide their own thread to poll the event
850 r = self.message_queue.get()
851 msgname = type(r).__name__
852 if self.event_callback:
853 self.event_callback(msgname, r)
856 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4