3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
31 if sys.version[0] == '2':
38 typedef void (*vac_callback_t)(unsigned char * data, int len);
39 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
40 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
42 int vac_disconnect(void);
43 int vac_read(char **data, int *l, unsigned short timeout);
44 int vac_write(char *data, int len);
45 void vac_free(void * msg);
47 int vac_get_msg_index(unsigned char * name);
48 int vac_msg_table_size(void);
49 int vac_msg_table_max_index(void);
51 void vac_rx_suspend (void);
52 void vac_rx_resume (void);
53 void vac_set_error_handler(vac_error_callback_t);
56 # Barfs on failure, no need to check success.
57 vpp_api = ffi.dlopen('libvppapiclient.so')
60 def vpp_atexit(vpp_weakref):
61 """Clean up VPP connection on shutdown."""
62 vpp_instance = vpp_weakref()
63 if vpp_instance.connected:
64 vpp_instance.logger.debug('Cleaning up VPP on exit')
65 vpp_instance.disconnect()
72 if sys.version[0] == '2':
78 @ffi.callback("void(unsigned char *, int)")
79 def vac_callback_sync(data, len):
80 vpp_object.msg_handler_sync(ffi.buffer(data, len))
83 @ffi.callback("void(unsigned char *, int)")
84 def vac_callback_async(data, len):
85 vpp_object.msg_handler_async(ffi.buffer(data, len))
88 @ffi.callback("void(void *, unsigned char *, int)")
89 def vac_error_handler(arg, msg, msg_len):
90 vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
97 class FuncWrapper(object):
98 def __init__(self, func):
100 self.__name__ = func.__name__
102 def __call__(self, **kwargs):
103 return self._func(**kwargs)
109 This class provides the APIs to VPP. The APIs are loaded
110 from provided .api.json files and makes functions accordingly.
111 These functions are documented in the VPP .api files, as they
112 are dynamically created.
114 Additionally, VPP can send callback messages; this class
115 provides a means to register a callback function to receive
116 these messages in a background thread.
118 def __init__(self, apifiles=None, testmode=False, async_thread=True,
119 logger=None, loglevel=None,
121 """Create a VPP API object.
123 apifiles is a list of files containing API
124 descriptions that will be loaded - methods will be
125 dynamically created reflecting these APIs. If not
126 provided this will load the API files from VPP's
127 default install location.
129 logger, if supplied, is the logging logger object to log to.
130 loglevel, if supplied, is the log level this logger is set
131 to report at (from the loglevels in the logging module).
137 logger = logging.getLogger(__name__)
138 if loglevel is not None:
139 logger.setLevel(loglevel)
146 self.connected = False
147 self.header = struct.Struct('>HI')
149 self.event_callback = None
150 self.message_queue = queue.Queue()
151 self.read_timeout = read_timeout
152 self.vpp_api = vpp_api
153 self.async_thread = async_thread
156 # Pick up API definitions from default directory
158 apifiles = self.find_api_files()
160 # In test mode we don't care that we can't find the API files
166 for file in apifiles:
167 with open(file) as apidef_file:
168 api = json.load(apidef_file)
169 for t in api['types']:
170 self.add_type(t[0], t[1:])
172 for m in api['messages']:
173 self.add_message(m[0], m[1:])
174 self.apifiles = apifiles
177 if len(self.messages) == 0 and not testmode:
178 raise ValueError(1, 'Missing JSON message definitions')
180 # Make sure we allow VPP to clean up the message rings.
181 atexit.register(vpp_atexit, weakref.ref(self))
183 # Register error handler
184 vpp_api.vac_set_error_handler(vac_error_handler)
186 # Support legacy CFFI
187 # from_buffer supported from 1.8.0
188 (major, minor, patch) = [int(s) for s in
189 cffi.__version__.split('.', 3)]
190 if major >= 1 and minor >= 8:
191 self._write = self._write_new_cffi
193 self._write = self._write_legacy_cffi
195 class ContextId(object):
196 """Thread-safe provider of unique context IDs."""
199 self.lock = threading.Lock()
202 """Get a new unique (or, at least, not recently used) context."""
206 get_context = ContextId()
209 def find_api_dir(cls):
210 """Attempt to find the best directory in which API definition
211 files may reside. If the value VPP_API_DIR exists in the environment
212 then it is first on the search list. If we're inside a recognized
213 location in a VPP source tree (src/scripts and src/vpp-api/python)
214 then entries from there to the likely locations in build-root are
215 added. Finally the location used by system packages is added.
217 :returns: A single directory name, or None if no such directory
222 if 'VPP_API_DIR' in os.environ:
223 dirs.append(os.environ['VPP_API_DIR'])
225 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
226 # in which case, plot a course to likely places in the src tree
227 import __main__ as main
228 if hasattr(main, '__file__'):
229 # get the path of the calling script
230 localdir = os.path.dirname(os.path.realpath(main.__file__))
232 # use cwd if there is no calling script
233 localdir = os.getcwd()
234 localdir_s = localdir.split(os.path.sep)
237 """Match dir against right-hand components of the script dir"""
238 d = dir.split('/') # param 'dir' assumes a / separator
240 return len(localdir_s) > length and localdir_s[-length:] == d
242 def sdir(srcdir, variant):
243 """Build a path from srcdir to the staged API files of
244 'variant' (typically '' or '_debug')"""
245 # Since 'core' and 'plugin' files are staged
246 # in separate directories, we target the parent dir.
247 return os.path.sep.join((
250 'install-vpp%s-native' % variant,
258 if dmatch('src/scripts'):
259 srcdir = os.path.sep.join(localdir_s[:-2])
260 elif dmatch('src/vpp-api/python'):
261 srcdir = os.path.sep.join(localdir_s[:-3])
263 # we're apparently running tests
264 srcdir = os.path.sep.join(localdir_s[:-1])
267 # we're in the source tree, try both the debug and release
269 dirs.append(sdir(srcdir, '_debug'))
270 dirs.append(sdir(srcdir, ''))
272 # Test for staged copies of the scripts
273 # For these, since we explicitly know if we're running a debug versus
274 # release variant, target only the relevant directory
275 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
276 srcdir = os.path.sep.join(localdir_s[:-4])
277 dirs.append(sdir(srcdir, '_debug'))
278 if dmatch('build-root/install-vpp-native/vpp/bin'):
279 srcdir = os.path.sep.join(localdir_s[:-4])
280 dirs.append(sdir(srcdir, ''))
282 # finally, try the location system packages typically install into
283 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
285 # check the directories for existance; first one wins
287 if os.path.isdir(dir):
293 def find_api_files(cls, api_dir=None, patterns='*'):
294 """Find API definition files from the given directory tree with the
295 given pattern. If no directory is given then find_api_dir() is used
296 to locate one. If no pattern is given then all definition files found
297 in the directory tree are used.
299 :param api_dir: A directory tree in which to locate API definition
300 files; subdirectories are descended into.
301 If this is None then find_api_dir() is called to discover it.
302 :param patterns: A list of patterns to use in each visited directory
303 when looking for files.
304 This can be a list/tuple object or a comma-separated string of
305 patterns. Each value in the list will have leading/trialing
307 The pattern specifies the first part of the filename, '.api.json'
309 The results are de-duplicated, thus overlapping patterns are fine.
310 If this is None it defaults to '*' meaning "all API files".
311 :returns: A list of file paths for the API files found.
314 api_dir = cls.find_api_dir()
316 raise RuntimeError("api_dir cannot be located")
318 if isinstance(patterns, list) or isinstance(patterns, tuple):
319 patterns = [p.strip() + '.api.json' for p in patterns]
321 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
324 for root, dirnames, files in os.walk(api_dir):
325 # iterate all given patterns and de-dup the result
326 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
327 for filename in files:
328 api_files.append(os.path.join(root, filename))
333 """Debug function: report current VPP API status to stdout."""
334 print('Connected') if self.connected else print('Not Connected')
335 print('Read API definitions from', ', '.join(self.apifiles))
337 def __struct(self, t, n=None, e=-1, vl=None):
338 """Create a packing structure for a message."""
339 base_types = {'u8': 'B',
348 if e > 0 and t == 'u8':
350 s = struct.Struct('>' + str(e) + 's')
353 # Fixed array of base type
354 s = struct.Struct('>' + base_types[t])
355 return s.size, [e, s]
357 # Old style variable array
358 s = struct.Struct('>' + base_types[t])
359 return s.size, [-1, s]
361 # Variable length array
363 s = struct.Struct('>s')
364 return s.size, [vl, s]
366 s = struct.Struct('>' + base_types[t])
367 return s.size, [vl, s]
369 s = struct.Struct('>' + base_types[t])
372 if t in self.messages:
373 size = self.messages[t]['sizes'][0]
375 # Return a list in case of array
377 return size, [e, lambda self, encode, buf, offset, args: (
378 self.__struct_type(encode, self.messages[t], buf, offset,
381 return size, [vl, lambda self, encode, buf, offset, args: (
382 self.__struct_type(encode, self.messages[t], buf, offset,
386 raise NotImplementedError(1,
387 'No support for compound types ' + t)
388 return size, lambda self, encode, buf, offset, args: (
389 self.__struct_type(encode, self.messages[t], buf, offset, args)
392 raise ValueError(1, 'Invalid message type: ' + t)
394 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
395 """Get a message packer or unpacker."""
397 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
399 return self.__struct_type_decode(msgdef, buf, offset)
401 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
406 if k not in msgdef['args']:
407 raise ValueError(1, 'Non existing argument [' + k + ']' +
408 ' used in call to: ' +
409 self.id_names[kwargs['_vl_msg_id']] + '()')
411 for k, v in vpp_iterator(msgdef['args']):
416 e = kwargs[v[0]] if v[0] in kwargs else v[0]
417 if e != len(kwargs[k]):
419 'Input list length mismatch: '
421 (k, e, len(kwargs[k]))))
424 size += v[1](self, True, buf, off + size,
428 kwargslen = kwargs[v[0]]
429 if kwargslen != len(kwargs[k]):
431 'Input list length mismatch:'
436 kwargslen = len(kwargs[k])
438 buf[off:off + kwargslen] = bytearray(kwargs[k])
443 v[1].pack_into(buf, off + size, i)
447 size = v(self, True, buf, off, kwargs[k])
449 if type(kwargs[k]) is str and v.size < len(kwargs[k]):
451 'Input list length mismatch: '
453 (k, v.size, len(kwargs[k])))
454 v.pack_into(buf, off, kwargs[k])
457 size = v.size if not type(v) is list else 0
459 return off + size - offset
461 def __getitem__(self, name):
462 if name in self.messages:
463 return self.messages[name]
466 def get_size(self, sizes, kwargs):
467 total_size = sizes[0]
469 if e in kwargs and type(kwargs[e]) is list:
470 total_size += len(kwargs[e]) * sizes[1][e]
473 def encode(self, msgdef, kwargs):
474 # Make suitably large buffer
475 size = self.get_size(msgdef['sizes'], kwargs)
476 buf = bytearray(size)
478 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
479 return buf[:offset + size]
481 def decode(self, msgdef, buf):
482 return self.__struct_type(False, msgdef, buf, 0, None)[1]
484 def __struct_type_decode(self, msgdef, buf, offset):
488 for k, v in vpp_iterator(msgdef['args']):
492 if callable(v[1]): # compound type
494 if v[0] in msgdef['args']: # vla
500 (s, l) = v[1](self, False, buf, off + size, None)
505 if type(v[0]) is int:
506 size = len(buf) - off
509 res.append(buf[off:off + size])
511 e = v[0] if type(v[0]) is int else res[v[2]]
513 e = (len(buf) - off) / v[1].size
518 lst.append(v[1].unpack_from(buf, off + size)[0])
523 (s, l) = v(self, False, buf, off, None)
527 res.append(v.unpack_from(buf, off)[0])
530 return off + size - offset, msgdef['return_tuple']._make(res)
532 def ret_tup(self, name):
533 if name in self.messages and 'return_tuple' in self.messages[name]:
534 return self.messages[name]['return_tuple']
537 def duplicate_check_ok(self, name, msgdef):
540 if type(c) is dict and 'crc' in c:
544 # We can get duplicates because of imports
545 if crc == self.messages[name]['crc']:
549 def add_message(self, name, msgdef, typeonly=False):
550 if name in self.messages:
552 if self.duplicate_check_ok(name, msgdef):
554 raise ValueError('Duplicate message name: ' + name)
556 args = collections.OrderedDict()
557 argtypes = collections.OrderedDict()
562 for i, f in enumerate(msgdef):
563 if type(f) is dict and 'crc' in f:
564 msg['crc'] = f['crc']
568 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
569 raise ValueError('Variable Length Array must be last: ' + name)
570 size, s = self.__struct(*f)
572 if type(s) == list and type(s[0]) == int and \
573 type(s[1]) == struct.Struct:
575 sizes[field_name] = size
577 sizes[field_name] = size
578 total_size += s[0] * size
580 sizes[field_name] = size
583 argtypes[field_name] = field_type
584 if len(f) == 4: # Find offset to # elements field
585 idx = list(args.keys()).index(f[3]) - i
586 args[field_name].append(idx)
587 fields.append(field_name)
588 msg['return_tuple'] = collections.namedtuple(name, fields,
590 self.messages[name] = msg
591 self.messages[name]['args'] = args
592 self.messages[name]['argtypes'] = argtypes
593 self.messages[name]['typeonly'] = typeonly
594 self.messages[name]['sizes'] = [total_size, sizes]
595 return self.messages[name]
597 def add_type(self, name, typedef):
598 return self.add_message('vl_api_' + name + '_t', typedef,
601 def make_function(self, name, i, msgdef, multipart, async):
604 return self._call_vpp_async(i, msgdef, **kwargs)
607 return self._call_vpp(i, msgdef, multipart, **kwargs)
608 args = self.messages[name]['args']
609 argtypes = self.messages[name]['argtypes']
610 f.__name__ = str(name)
611 f.__doc__ = ", ".join(["%s %s" %
612 (argtypes[k], k) for k in args.keys()])
617 if not hasattr(self, "_api"):
618 raise Exception("Not connected, api definitions not available")
621 def _register_functions(self, async=False):
622 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
623 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
625 for name, msgdef in vpp_iterator(self.messages):
626 if self.messages[name]['typeonly']:
628 crc = self.messages[name]['crc']
629 n = name + '_' + crc[2:]
630 i = vpp_api.vac_get_msg_index(n.encode())
632 self.id_msgdef[i] = msgdef
633 self.id_names[i] = name
634 multipart = True if name.find('_dump') > 0 else False
635 f = self.make_function(name, i, msgdef, multipart, async)
636 setattr(self._api, name, FuncWrapper(f))
639 'No such message type or failed CRC checksum: %s', n)
641 def _write_new_cffi(self, buf):
642 """Send a binary-packed message to VPP."""
643 if not self.connected:
644 raise IOError(1, 'Not connected')
645 return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
647 def _write_legacy_cffi(self, buf):
648 """Send a binary-packed message to VPP."""
649 if not self.connected:
650 raise IOError(1, 'Not connected')
651 return vpp_api.vac_write(bytes(buf), len(buf))
654 if not self.connected:
655 raise IOError(1, 'Not connected')
656 mem = ffi.new("char **")
657 size = ffi.new("int *")
658 rv = vpp_api.vac_read(mem, size, self.read_timeout)
660 raise IOError(rv, 'vac_read failed')
661 msg = bytes(ffi.buffer(mem[0], size[0]))
662 vpp_api.vac_free(mem[0])
665 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
667 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
668 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
670 raise IOError(2, 'Connect failed')
671 self.connected = True
673 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
674 self._register_functions(async=async)
676 # Initialise control ping
677 crc = self.messages['control_ping']['crc']
678 self.control_ping_index = vpp_api.vac_get_msg_index(
679 ('control_ping' + '_' + crc[2:]).encode())
680 self.control_ping_msgdef = self.messages['control_ping']
681 if self.async_thread:
682 self.event_thread = threading.Thread(
683 target=self.thread_msg_handler)
684 self.event_thread.daemon = True
685 self.event_thread.start()
688 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
691 name - the name of the client.
692 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
693 async - if true, messages are sent without waiting for a reply
694 rx_qlen - the length of the VPP message receive queue between
697 msg_handler = vac_callback_sync if not async else vac_callback_async
698 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
701 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
702 """Attach to VPP in synchronous mode. Application must poll for events.
704 name - the name of the client.
705 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
706 rx_qlen - the length of the VPP message receive queue between
710 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
713 def disconnect(self):
714 """Detach from VPP."""
715 rv = vpp_api.vac_disconnect()
716 self.connected = False
717 self.message_queue.put("terminate event thread")
720 def msg_handler_sync(self, msg):
721 """Process an incoming message from VPP in sync mode.
723 The message may be a reply or it may be an async notification.
725 r = self.decode_incoming_msg(msg)
729 # If we have a context, then use the context to find any
730 # request waiting for a reply
732 if hasattr(r, 'context') and r.context > 0:
736 # No context -> async notification that we feed to the callback
737 self.message_queue.put_nowait(r)
739 raise IOError(2, 'RPC reply message received in event handler')
741 def decode_incoming_msg(self, msg):
743 self.logger.warning('vpp_api.read failed')
746 i, ci = self.header.unpack_from(msg, 0)
747 if self.id_names[i] == 'rx_thread_exit':
751 # Decode message and returns a tuple.
753 msgdef = self.id_msgdef[i]
755 raise IOError(2, 'Reply message undefined')
757 r = self.decode(msgdef, msg)
761 def msg_handler_async(self, msg):
762 """Process a message from VPP in async mode.
764 In async mode, all messages are returned to the callback.
766 r = self.decode_incoming_msg(msg)
770 msgname = type(r).__name__
772 if self.event_callback:
773 self.event_callback(msgname, r)
775 def _control_ping(self, context):
776 """Send a ping command."""
777 self._call_vpp_async(self.control_ping_index,
778 self.control_ping_msgdef,
781 def _call_vpp(self, i, msgdef, multipart, **kwargs):
782 """Given a message, send the message and await a reply.
784 msgdef - the message packing definition
785 i - the message type index
786 multipart - True if the message returns multiple
788 context - context number - chosen at random if not
790 The remainder of the kwargs are the arguments to the API call.
792 The return value is the message or message array containing
793 the response. It will raise an IOError exception if there was
794 no response within the timeout window.
797 if 'context' not in kwargs:
798 context = self.get_context()
799 kwargs['context'] = context
801 context = kwargs['context']
802 kwargs['_vl_msg_id'] = i
803 b = self.encode(msgdef, kwargs)
805 vpp_api.vac_rx_suspend()
809 # Send a ping after the request - we use its response
810 # to detect that we have seen all results.
811 self._control_ping(context)
813 # Block until we get a reply.
818 raise IOError(2, 'VPP API client: read failed')
820 r = self.decode_incoming_msg(msg)
821 msgname = type(r).__name__
822 if context not in r or r.context == 0 or context != r.context:
823 self.message_queue.put_nowait(r)
829 if msgname == 'control_ping_reply':
834 vpp_api.vac_rx_resume()
838 def _call_vpp_async(self, i, msgdef, **kwargs):
839 """Given a message, send the message and await a reply.
841 msgdef - the message packing definition
842 i - the message type index
843 context - context number - chosen at random if not
845 The remainder of the kwargs are the arguments to the API call.
847 if 'context' not in kwargs:
848 context = self.get_context()
849 kwargs['context'] = context
851 context = kwargs['context']
852 kwargs['_vl_msg_id'] = i
853 b = self.encode(msgdef, kwargs)
857 def register_event_callback(self, callback):
858 """Register a callback for async messages.
860 This will be called for async notifications in sync mode,
861 and all messages in async mode. In sync mode, replies to
862 requests will not come here.
864 callback is a fn(msg_type_name, msg_type) that will be
865 called when a message comes in. While this function is
866 executing, note that (a) you are in a background thread and
867 may wish to use threading.Lock to protect your datastructures,
868 and (b) message processing from VPP will stop (so if you take
869 a long while about it you may provoke reply timeouts or cause
870 VPP to fill the RX buffer). Passing None will disable the
873 self.event_callback = callback
875 def thread_msg_handler(self):
876 """Python thread calling the user registerd message handler.
878 This is to emulate the old style event callback scheme. Modern
879 clients should provide their own thread to poll the event
883 r = self.message_queue.get()
884 if r == "terminate event thread":
886 msgname = type(r).__name__
887 if self.event_callback:
888 self.event_callback(msgname, r)
891 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4