3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
31 if sys.version[0] == '2':
38 typedef void (*vac_callback_t)(unsigned char * data, int len);
39 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
40 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
42 int vac_disconnect(void);
43 int vac_read(char **data, int *l, unsigned short timeout);
44 int vac_write(char *data, int len);
45 void vac_free(void * msg);
47 int vac_get_msg_index(unsigned char * name);
48 int vac_msg_table_size(void);
49 int vac_msg_table_max_index(void);
51 void vac_rx_suspend (void);
52 void vac_rx_resume (void);
53 void vac_set_error_handler(vac_error_callback_t);
56 # ffi.dlopen() raises OSError if the library cannot be loaded, so there is no need to check the result.
57 vpp_api = ffi.dlopen('libvppapiclient.so')
def vpp_atexit(vpp_weakref):
    """Clean up the VPP connection on interpreter shutdown.

    This is registered with atexit together with a weak reference to the
    VPP object, so the exit hook itself does not keep the object alive.

    :param vpp_weakref: A zero-argument callable (normally a weakref.ref)
        that returns the VPP instance, or None once that instance has
        already been garbage collected.
    """
    vpp_instance = vpp_weakref()
    # A dead weak reference yields None: nothing is left to disconnect,
    # and dereferencing it would raise AttributeError during shutdown.
    if vpp_instance is None:
        return
    if vpp_instance.connected:
        vpp_instance.logger.debug('Cleaning up VPP on exit')
        vpp_instance.disconnect()
72 if sys.version[0] == '2':
@ffi.callback("void(unsigned char *, int)")
def vac_callback_sync(data, len):
    """Forward a received message to the VPP object's sync-mode handler.

    data and len describe the raw message; they are wrapped in a cffi
    buffer before being passed on. Note that the 'len' parameter shadows
    the builtin of the same name inside this function.
    """
    handler = vpp_object.msg_handler_sync
    handler(ffi.buffer(data, len))
@ffi.callback("void(unsigned char *, int)")
def vac_callback_async(data, len):
    """Forward a received message to the VPP object's async-mode handler.

    data and len describe the raw message; they are wrapped in a cffi
    buffer before being passed on. Note that the 'len' parameter shadows
    the builtin of the same name inside this function.
    """
    handler = vpp_object.msg_handler_async
    handler(ffi.buffer(data, len))
@ffi.callback("void(void *, unsigned char *, int)")
def vac_error_handler(arg, msg, msg_len):
    """Log an error message reported through the error callback.

    The message is read from msg (at most msg_len bytes) and emitted as a
    warning on the VPP object's logger. arg is not used.
    """
    warn = vpp_object.logger.warning
    warn("VPP API client:: %s", ffi.string(msg, msg_len))
class FuncWrapper(object):
    """Callable wrapper around a dynamically created API function.

    The wrapper carries the wrapped function's __name__ and __doc__ and
    forwards keyword-only calls to it.
    """

    def __init__(self, func):
        """Wrap func, a callable that accepts keyword arguments."""
        # __call__ delegates to _func, so it has to be stored here;
        # without it every call raises AttributeError.
        self._func = func
        self.__name__ = func.__name__
        # Keep the wrapped function's documentation reachable through
        # the wrapper instance.
        self.__doc__ = getattr(func, '__doc__', None)

    def __call__(self, **kwargs):
        """Invoke the wrapped function with the given keyword arguments."""
        return self._func(**kwargs)
109 This class provides the APIs to VPP. The APIs are loaded
110 from provided .api.json files and makes functions accordingly.
111 These functions are documented in the VPP .api files, as they
112 are dynamically created.
114 Additionally, VPP can send callback messages; this class
115 provides a means to register a callback function to receive
116 these messages in a background thread.
118 def __init__(self, apifiles=None, testmode=False, async_thread=True,
119 logger=logging.getLogger('vpp_papi'), loglevel='debug',
121 """Create a VPP API object.
123 apifiles is a list of files containing API
124 descriptions that will be loaded - methods will be
125 dynamically created reflecting these APIs. If not
126 provided this will load the API files from VPP's
127 default install location.
132 logging.basicConfig(level=getattr(logging, loglevel.upper()))
137 self.connected = False
138 self.header = struct.Struct('>HI')
140 self.event_callback = None
141 self.message_queue = queue.Queue()
142 self.read_timeout = read_timeout
143 self.vpp_api = vpp_api
144 self.async_thread = async_thread
147 # Pick up API definitions from default directory
149 apifiles = self.find_api_files()
151 # In test mode we don't care that we can't find the API files
157 for file in apifiles:
158 with open(file) as apidef_file:
159 api = json.load(apidef_file)
160 for t in api['types']:
161 self.add_type(t[0], t[1:])
163 for m in api['messages']:
164 self.add_message(m[0], m[1:])
165 self.apifiles = apifiles
168 if len(self.messages) == 0 and not testmode:
169 raise ValueError(1, 'Missing JSON message definitions')
171 # Make sure we allow VPP to clean up the message rings.
172 atexit.register(vpp_atexit, weakref.ref(self))
174 # Register error handler
175 vpp_api.vac_set_error_handler(vac_error_handler)
177 # Support legacy CFFI
178 # from_buffer supported from 1.8.0
179 (major, minor, patch) = [int(s) for s in
180 cffi.__version__.split('.', 3)]
181 if major >= 1 and minor >= 8:
182 self._write = self._write_new_cffi
184 self._write = self._write_legacy_cffi
186 class ContextId(object):
187 """Thread-safe provider of unique context IDs."""
190 self.lock = threading.Lock()
193 """Get a new unique (or, at least, not recently used) context."""
197 get_context = ContextId()
200 def find_api_dir(cls):
201 """Attempt to find the best directory in which API definition
202 files may reside. If the value VPP_API_DIR exists in the environment
203 then it is first on the search list. If we're inside a recognized
204 location in a VPP source tree (src/scripts and src/vpp-api/python)
205 then entries from there to the likely locations in build-root are
206 added. Finally the location used by system packages is added.
208 :returns: A single directory name, or None if no such directory
213 if 'VPP_API_DIR' in os.environ:
214 dirs.append(os.environ['VPP_API_DIR'])
216 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
217 # in which case, plot a course to likely places in the src tree
218 import __main__ as main
219 if hasattr(main, '__file__'):
220 # get the path of the calling script
221 localdir = os.path.dirname(os.path.realpath(main.__file__))
223 # use cwd if there is no calling script
224 localdir = os.getcwd()
225 localdir_s = localdir.split(os.path.sep)
228 """Match dir against right-hand components of the script dir"""
229 d = dir.split('/') # param 'dir' assumes a / separator
231 return len(localdir_s) > length and localdir_s[-length:] == d
233 def sdir(srcdir, variant):
234 """Build a path from srcdir to the staged API files of
235 'variant' (typically '' or '_debug')"""
236 # Since 'core' and 'plugin' files are staged
237 # in separate directories, we target the parent dir.
238 return os.path.sep.join((
241 'install-vpp%s-native' % variant,
249 if dmatch('src/scripts'):
250 srcdir = os.path.sep.join(localdir_s[:-2])
251 elif dmatch('src/vpp-api/python'):
252 srcdir = os.path.sep.join(localdir_s[:-3])
254 # we're apparently running tests
255 srcdir = os.path.sep.join(localdir_s[:-1])
258 # we're in the source tree, try both the debug and release
260 dirs.append(sdir(srcdir, '_debug'))
261 dirs.append(sdir(srcdir, ''))
263 # Test for staged copies of the scripts
264 # For these, since we explicitly know if we're running a debug versus
265 # release variant, target only the relevant directory
266 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
267 srcdir = os.path.sep.join(localdir_s[:-4])
268 dirs.append(sdir(srcdir, '_debug'))
269 if dmatch('build-root/install-vpp-native/vpp/bin'):
270 srcdir = os.path.sep.join(localdir_s[:-4])
271 dirs.append(sdir(srcdir, ''))
273 # finally, try the location system packages typically install into
274 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
276 # check the directories for existence; first one wins
278 if os.path.isdir(dir):
284 def find_api_files(cls, api_dir=None, patterns='*'):
285 """Find API definition files from the given directory tree with the
286 given pattern. If no directory is given then find_api_dir() is used
287 to locate one. If no pattern is given then all definition files found
288 in the directory tree are used.
290 :param api_dir: A directory tree in which to locate API definition
291 files; subdirectories are descended into.
292 If this is None then find_api_dir() is called to discover it.
293 :param patterns: A list of patterns to use in each visited directory
294 when looking for files.
295 This can be a list/tuple object or a comma-separated string of
296 patterns. Each value in the list will have leading/trailing
298 The pattern specifies the first part of the filename, '.api.json'
300 The results are de-duplicated, thus overlapping patterns are fine.
301 If this is None it defaults to '*' meaning "all API files".
302 :returns: A list of file paths for the API files found.
305 api_dir = cls.find_api_dir()
307 raise RuntimeError("api_dir cannot be located")
309 if isinstance(patterns, list) or isinstance(patterns, tuple):
310 patterns = [p.strip() + '.api.json' for p in patterns]
312 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
315 for root, dirnames, files in os.walk(api_dir):
316 # iterate all given patterns and de-dup the result
317 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
318 for filename in files:
319 api_files.append(os.path.join(root, filename))
324 """Debug function: report current VPP API status to stdout."""
325 print('Connected') if self.connected else print('Not Connected')
326 print('Read API definitions from', ', '.join(self.apifiles))
328 def __struct(self, t, n=None, e=-1, vl=None):
329 """Create a packing structure for a message."""
330 base_types = {'u8': 'B',
339 if e > 0 and t == 'u8':
341 s = struct.Struct('>' + str(e) + 's')
344 # Fixed array of base type
345 s = struct.Struct('>' + base_types[t])
346 return s.size, [e, s]
348 # Old style variable array
349 s = struct.Struct('>' + base_types[t])
350 return s.size, [-1, s]
352 # Variable length array
354 s = struct.Struct('>s')
355 return s.size, [vl, s]
357 s = struct.Struct('>' + base_types[t])
358 return s.size, [vl, s]
360 s = struct.Struct('>' + base_types[t])
363 if t in self.messages:
364 size = self.messages[t]['sizes'][0]
366 # Return a list in case of array
368 return size, [e, lambda self, encode, buf, offset, args: (
369 self.__struct_type(encode, self.messages[t], buf, offset,
372 return size, [vl, lambda self, encode, buf, offset, args: (
373 self.__struct_type(encode, self.messages[t], buf, offset,
377 raise NotImplementedError(1,
378 'No support for compound types ' + t)
379 return size, lambda self, encode, buf, offset, args: (
380 self.__struct_type(encode, self.messages[t], buf, offset, args)
383 raise ValueError(1, 'Invalid message type: ' + t)
385 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
386 """Get a message packer or unpacker."""
388 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
390 return self.__struct_type_decode(msgdef, buf, offset)
392 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
397 if k not in msgdef['args']:
398 raise ValueError(1, 'Non existing argument [' + k + ']' +
399 ' used in call to: ' +
400 self.id_names[kwargs['_vl_msg_id']] + '()')
402 for k, v in vpp_iterator(msgdef['args']):
407 e = kwargs[v[0]] if v[0] in kwargs else v[0]
408 if e != len(kwargs[k]):
410 'Input list length mismatch: '
412 (k, e, len(kwargs[k]))))
415 size += v[1](self, True, buf, off + size,
419 kwargslen = kwargs[v[0]]
420 if kwargslen != len(kwargs[k]):
422 'Input list length mismatch:'
427 kwargslen = len(kwargs[k])
429 buf[off:off + kwargslen] = bytearray(kwargs[k])
434 v[1].pack_into(buf, off + size, i)
438 size = v(self, True, buf, off, kwargs[k])
440 if type(kwargs[k]) is str and v.size < len(kwargs[k]):
442 'Input list length mismatch: '
444 (k, v.size, len(kwargs[k])))
445 v.pack_into(buf, off, kwargs[k])
448 size = v.size if not type(v) is list else 0
450 return off + size - offset
452 def __getitem__(self, name):
453 if name in self.messages:
454 return self.messages[name]
457 def get_size(self, sizes, kwargs):
458 total_size = sizes[0]
460 if e in kwargs and type(kwargs[e]) is list:
461 total_size += len(kwargs[e]) * sizes[1][e]
464 def encode(self, msgdef, kwargs):
465 # Make suitably large buffer
466 size = self.get_size(msgdef['sizes'], kwargs)
467 buf = bytearray(size)
469 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
470 return buf[:offset + size]
def decode(self, msgdef, buf):
    """Unpack the binary message in buf according to msgdef.

    The underlying unpacker returns a pair; only its second element,
    the decoded message, is handed back to the caller.
    """
    unpacked = self.__struct_type(False, msgdef, buf, 0, None)
    return unpacked[1]
475 def __struct_type_decode(self, msgdef, buf, offset):
479 for k, v in vpp_iterator(msgdef['args']):
483 if callable(v[1]): # compound type
485 if v[0] in msgdef['args']: # vla
491 (s, l) = v[1](self, False, buf, off + size, None)
496 if type(v[0]) is int:
497 size = len(buf) - off
500 res.append(buf[off:off + size])
502 e = v[0] if type(v[0]) is int else res[v[2]]
504 e = (len(buf) - off) / v[1].size
509 lst.append(v[1].unpack_from(buf, off + size)[0])
514 (s, l) = v(self, False, buf, off, None)
518 res.append(v.unpack_from(buf, off)[0])
521 return off + size - offset, msgdef['return_tuple']._make(res)
523 def ret_tup(self, name):
524 if name in self.messages and 'return_tuple' in self.messages[name]:
525 return self.messages[name]['return_tuple']
528 def duplicate_check_ok(self, name, msgdef):
531 if type(c) is dict and 'crc' in c:
535 # We can get duplicates because of imports
536 if crc == self.messages[name]['crc']:
540 def add_message(self, name, msgdef, typeonly=False):
541 if name in self.messages:
543 if self.duplicate_check_ok(name, msgdef):
545 raise ValueError('Duplicate message name: ' + name)
547 args = collections.OrderedDict()
548 argtypes = collections.OrderedDict()
553 for i, f in enumerate(msgdef):
554 if type(f) is dict and 'crc' in f:
555 msg['crc'] = f['crc']
559 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
560 raise ValueError('Variable Length Array must be last: ' + name)
561 size, s = self.__struct(*f)
563 if type(s) == list and type(s[0]) == int and \
564 type(s[1]) == struct.Struct:
566 sizes[field_name] = size
568 sizes[field_name] = size
569 total_size += s[0] * size
571 sizes[field_name] = size
574 argtypes[field_name] = field_type
575 if len(f) == 4: # Find offset to # elements field
576 idx = list(args.keys()).index(f[3]) - i
577 args[field_name].append(idx)
578 fields.append(field_name)
579 msg['return_tuple'] = collections.namedtuple(name, fields,
581 self.messages[name] = msg
582 self.messages[name]['args'] = args
583 self.messages[name]['argtypes'] = argtypes
584 self.messages[name]['typeonly'] = typeonly
585 self.messages[name]['sizes'] = [total_size, sizes]
586 return self.messages[name]
588 def add_type(self, name, typedef):
589 return self.add_message('vl_api_' + name + '_t', typedef,
592 def make_function(self, name, i, msgdef, multipart, async):
595 return self._call_vpp_async(i, msgdef, **kwargs)
598 return self._call_vpp(i, msgdef, multipart, **kwargs)
599 args = self.messages[name]['args']
600 argtypes = self.messages[name]['argtypes']
601 f.__name__ = str(name)
602 f.__doc__ = ", ".join(["%s %s" %
603 (argtypes[k], k) for k in args.keys()])
608 if not hasattr(self, "_api"):
609 raise Exception("Not connected, api definitions not available")
612 def _register_functions(self, async=False):
613 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
614 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
616 for name, msgdef in vpp_iterator(self.messages):
617 if self.messages[name]['typeonly']:
619 crc = self.messages[name]['crc']
620 n = name + '_' + crc[2:]
621 i = vpp_api.vac_get_msg_index(n.encode())
623 self.id_msgdef[i] = msgdef
624 self.id_names[i] = name
625 multipart = True if name.find('_dump') > 0 else False
626 f = self.make_function(name, i, msgdef, multipart, async)
627 setattr(self._api, name, FuncWrapper(f))
630 'No such message type or failed CRC checksum: %s', n)
def _write_new_cffi(self, buf):
    """Send a binary-packed message to VPP via ffi.from_buffer.

    Raises IOError if the client is not connected; otherwise returns
    the result of vac_write.
    """
    if self.connected:
        return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
    raise IOError(1, 'Not connected')
def _write_legacy_cffi(self, buf):
    """Send a binary-packed message to VPP, passing a bytes copy of buf.

    Raises IOError if the client is not connected; otherwise returns
    the result of vac_write.
    """
    if self.connected:
        return vpp_api.vac_write(bytes(buf), len(buf))
    raise IOError(1, 'Not connected')
645 if not self.connected:
646 raise IOError(1, 'Not connected')
647 mem = ffi.new("char **")
648 size = ffi.new("int *")
649 rv = vpp_api.vac_read(mem, size, self.read_timeout)
651 raise IOError(rv, 'vac_read failed')
652 msg = bytes(ffi.buffer(mem[0], size[0]))
653 vpp_api.vac_free(mem[0])
656 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
658 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
659 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
661 raise IOError(2, 'Connect failed')
662 self.connected = True
664 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
665 self._register_functions(async=async)
667 # Initialise control ping
668 crc = self.messages['control_ping']['crc']
669 self.control_ping_index = vpp_api.vac_get_msg_index(
670 ('control_ping' + '_' + crc[2:]).encode())
671 self.control_ping_msgdef = self.messages['control_ping']
672 if self.async_thread:
673 self.event_thread = threading.Thread(
674 target=self.thread_msg_handler)
675 self.event_thread.daemon = True
676 self.event_thread.start()
679 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
682 name - the name of the client.
683 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
684 async - if true, messages are sent without waiting for a reply
685 rx_qlen - the length of the VPP message receive queue between
688 msg_handler = vac_callback_sync if not async else vac_callback_async
689 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
692 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
693 """Attach to VPP in synchronous mode. Application must poll for events.
695 name - the name of the client.
696 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
697 rx_qlen - the length of the VPP message receive queue between
701 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
704 def disconnect(self):
705 """Detach from VPP."""
706 rv = vpp_api.vac_disconnect()
707 self.connected = False
708 self.message_queue.put("terminate event thread")
711 def msg_handler_sync(self, msg):
712 """Process an incoming message from VPP in sync mode.
714 The message may be a reply or it may be an async notification.
716 r = self.decode_incoming_msg(msg)
720 # If we have a context, then use the context to find any
721 # request waiting for a reply
723 if hasattr(r, 'context') and r.context > 0:
727 # No context -> async notification that we feed to the callback
728 self.message_queue.put_nowait(r)
730 raise IOError(2, 'RPC reply message received in event handler')
732 def decode_incoming_msg(self, msg):
734 self.logger.warning('vpp_api.read failed')
737 i, ci = self.header.unpack_from(msg, 0)
738 if self.id_names[i] == 'rx_thread_exit':
742 # Decode message and returns a tuple.
744 msgdef = self.id_msgdef[i]
746 raise IOError(2, 'Reply message undefined')
748 r = self.decode(msgdef, msg)
752 def msg_handler_async(self, msg):
753 """Process a message from VPP in async mode.
755 In async mode, all messages are returned to the callback.
757 r = self.decode_incoming_msg(msg)
761 msgname = type(r).__name__
763 if self.event_callback:
764 self.event_callback(msgname, r)
766 def _control_ping(self, context):
767 """Send a ping command."""
768 self._call_vpp_async(self.control_ping_index,
769 self.control_ping_msgdef,
772 def _call_vpp(self, i, msgdef, multipart, **kwargs):
773 """Given a message, send the message and await a reply.
775 msgdef - the message packing definition
776 i - the message type index
777 multipart - True if the message returns multiple
779 context - context number - chosen at random if not
781 The remainder of the kwargs are the arguments to the API call.
783 The return value is the message or message array containing
784 the response. It will raise an IOError exception if there was
785 no response within the timeout window.
788 if 'context' not in kwargs:
789 context = self.get_context()
790 kwargs['context'] = context
792 context = kwargs['context']
793 kwargs['_vl_msg_id'] = i
794 b = self.encode(msgdef, kwargs)
796 vpp_api.vac_rx_suspend()
800 # Send a ping after the request - we use its response
801 # to detect that we have seen all results.
802 self._control_ping(context)
804 # Block until we get a reply.
809 raise IOError(2, 'VPP API client: read failed')
811 r = self.decode_incoming_msg(msg)
812 msgname = type(r).__name__
813 if context not in r or r.context == 0 or context != r.context:
814 self.message_queue.put_nowait(r)
820 if msgname == 'control_ping_reply':
825 vpp_api.vac_rx_resume()
829 def _call_vpp_async(self, i, msgdef, **kwargs):
830 """Given a message, send the message and await a reply.
832 msgdef - the message packing definition
833 i - the message type index
834 context - context number - chosen at random if not
836 The remainder of the kwargs are the arguments to the API call.
838 if 'context' not in kwargs:
839 context = self.get_context()
840 kwargs['context'] = context
842 context = kwargs['context']
843 kwargs['_vl_msg_id'] = i
844 b = self.encode(msgdef, kwargs)
848 def register_event_callback(self, callback):
849 """Register a callback for async messages.
851 This will be called for async notifications in sync mode,
852 and all messages in async mode. In sync mode, replies to
853 requests will not come here.
855 callback is a fn(msg_type_name, msg_type) that will be
856 called when a message comes in. While this function is
857 executing, note that (a) you are in a background thread and
858 may wish to use threading.Lock to protect your datastructures,
859 and (b) message processing from VPP will stop (so if you take
860 a long while about it you may provoke reply timeouts or cause
861 VPP to fill the RX buffer). Passing None will disable the
864 self.event_callback = callback
866 def thread_msg_handler(self):
867 """Python thread calling the user registered message handler.
869 This is to emulate the old style event callback scheme. Modern
870 clients should provide their own thread to poll the event
874 r = self.message_queue.get()
875 if r == "terminate event thread":
877 msgname = type(r).__name__
878 if self.event_callback:
879 self.event_callback(msgname, r)
882 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4