3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
30 from vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
32 if sys.version[0] == '2':
39 typedef void (*vac_callback_t)(unsigned char * data, int len);
40 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
41 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
43 int vac_disconnect(void);
44 int vac_read(char **data, int *l, unsigned short timeout);
45 int vac_write(char *data, int len);
46 void vac_free(void * msg);
48 int vac_get_msg_index(unsigned char * name);
49 int vac_msg_table_size(void);
50 int vac_msg_table_max_index(void);
52 void vac_rx_suspend (void);
53 void vac_rx_resume (void);
54 void vac_set_error_handler(vac_error_callback_t);
57 # ffi.dlopen() raises on failure, so there is no need to check for success.
58 vpp_api = ffi.dlopen('libvppapiclient.so')
def vpp_atexit(vpp_weakref):
    """Clean up the VPP connection on interpreter shutdown.

    This is registered with ``atexit`` together with a weak reference to
    the VPP object, so the exit hook does not itself keep the object alive.

    :param vpp_weakref: zero-argument callable (normally a ``weakref.ref``)
        that returns the VPP object, or None once it has been collected.
    """
    vpp_instance = vpp_weakref()
    # The referent may already have been garbage-collected by the time the
    # exit hooks run; in that case there is nothing left to disconnect.
    if vpp_instance is None:
        return
    if vpp_instance.connected:
        vpp_instance.logger.debug('Cleaning up VPP on exit')
        vpp_instance.disconnect()
73 if sys.version[0] == '2':
@ffi.callback("void(unsigned char *, int)")
def vac_callback_sync(data, len):
    """Receive callback handed to the client library in sync mode.

    Wraps the raw message in a buffer of ``len`` bytes and passes it to
    the sync message handler of the module-level VPP object.
    """
    payload = ffi.buffer(data, len)
    vpp_object.msg_handler_sync(payload)
@ffi.callback("void(unsigned char *, int)")
def vac_callback_async(data, len):
    """Receive callback handed to the client library in async mode.

    Wraps the raw message in a buffer of ``len`` bytes and passes it to
    the async message handler of the module-level VPP object.
    """
    payload = ffi.buffer(data, len)
    vpp_object.msg_handler_async(payload)
@ffi.callback("void(void *, unsigned char *, int)")
def vac_error_handler(arg, msg, msg_len):
    """Error callback: log the client library's message as a warning.

    ``arg`` is accepted to match the C callback signature but is unused.
    """
    text = ffi.string(msg, msg_len)
    vpp_object.logger.warning("VPP API client:: %s", text)
class FuncWrapper(object):
    """Callable wrapper around a dynamically generated API function.

    The wrapper exposes the wrapped function's ``__name__`` and
    ``__doc__`` and only accepts keyword arguments when called.
    """

    def __init__(self, func):
        # Keep the wrapped callable; __call__ delegates to it.
        self._func = func
        self.__name__ = func.__name__
        self.__doc__ = func.__doc__

    def __call__(self, **kwargs):
        return self._func(**kwargs)
107 class VPPMessage(VPPType):
113 This class provides the APIs to VPP. The APIs are loaded
114 from provided .api.json files, and functions are created accordingly.
115 These functions are documented in the VPP .api files, as they
116 are dynamically created.
118 Additionally, VPP can send callback messages; this class
119 provides a means to register a callback function to receive
120 these messages in a background thread.
123 def process_json_file(self, apidef_file):
124 api = json.load(apidef_file)
126 for t in api['enums']:
127 t[0] = 'vl_api_' + t[0] + '_t'
128 types[t[0]] = {'type': 'enum', 'data': t}
129 for t in api['unions']:
130 t[0] = 'vl_api_' + t[0] + '_t'
131 types[t[0]] = {'type': 'union', 'data': t}
132 for t in api['types']:
133 t[0] = 'vl_api_' + t[0] + '_t'
134 types[t[0]] = {'type': 'type', 'data': t}
139 for k, v in types.items():
141 if v['type'] == 'enum':
143 VPPEnumType(t[0], t[1:])
146 elif v['type'] == 'union':
148 VPPUnionType(t[0], t[1:])
151 elif v['type'] == 'type':
156 if len(unresolved) == 0:
159 raise ValueError('Unresolved type definitions {}'
164 for m in api['messages']:
166 self.messages[m[0]] = VPPMessage(m[0], m[1:])
167 except NotImplementedError:
168 self.logger.error('Not implemented error for {}'.format(m[0]))
170 def __init__(self, apifiles=None, testmode=False, async_thread=True,
171 logger=logging.getLogger('vpp_papi'), loglevel='debug',
173 """Create a VPP API object.
175 apifiles is a list of files containing API
176 descriptions that will be loaded - methods will be
177 dynamically created reflecting these APIs. If not
178 provided this will load the API files from VPP's
179 default install location.
181 logger, if supplied, is the logging logger object to log to.
182 loglevel, if supplied, is the log level this logger is set
183 to report at (from the loglevels in the logging module).
189 logger = logging.getLogger(__name__)
190 if loglevel is not None:
191 logger.setLevel(loglevel)
197 self.connected = False
198 self.header = VPPType('header', [['u16', 'msgid'],
199 ['u32', 'client_index']])
201 self.event_callback = None
202 self.message_queue = queue.Queue()
203 self.read_timeout = read_timeout
204 self.vpp_api = vpp_api
205 self.async_thread = async_thread
208 # Pick up API definitions from default directory
210 apifiles = self.find_api_files()
212 # In test mode we don't care that we can't find the API files
218 for file in apifiles:
219 with open(file) as apidef_file:
220 self.process_json_file(apidef_file)
222 self.apifiles = apifiles
225 if len(self.messages) == 0 and not testmode:
226 raise ValueError(1, 'Missing JSON message definitions')
228 # Make sure we allow VPP to clean up the message rings.
229 atexit.register(vpp_atexit, weakref.ref(self))
231 # Register error handler
233 vpp_api.vac_set_error_handler(vac_error_handler)
235 # Support legacy CFFI
236 # from_buffer supported from 1.8.0
237 (major, minor, patch) = [int(s) for s in
238 cffi.__version__.split('.', 3)]
239 if major >= 1 and minor >= 8:
240 self._write = self._write_new_cffi
242 self._write = self._write_legacy_cffi
244 class ContextId(object):
245 """Thread-safe provider of unique context IDs."""
248 self.lock = threading.Lock()
251 """Get a new unique (or, at least, not recently used) context."""
255 get_context = ContextId()
258 def find_api_dir(cls):
259 """Attempt to find the best directory in which API definition
260 files may reside. If the value VPP_API_DIR exists in the environment
261 then it is first on the search list. If we're inside a recognized
262 location in a VPP source tree (src/scripts and src/vpp-api/python)
263 then entries from there to the likely locations in build-root are
264 added. Finally the location used by system packages is added.
266 :returns: A single directory name, or None if no such directory
271 if 'VPP_API_DIR' in os.environ:
272 dirs.append(os.environ['VPP_API_DIR'])
274 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
275 # in which case, plot a course to likely places in the src tree
276 import __main__ as main
277 if hasattr(main, '__file__'):
278 # get the path of the calling script
279 localdir = os.path.dirname(os.path.realpath(main.__file__))
281 # use cwd if there is no calling script
282 localdir = os.getcwd()
283 localdir_s = localdir.split(os.path.sep)
286 """Match dir against right-hand components of the script dir"""
287 d = dir.split('/') # param 'dir' assumes a / separator
289 return len(localdir_s) > length and localdir_s[-length:] == d
291 def sdir(srcdir, variant):
292 """Build a path from srcdir to the staged API files of
293 'variant' (typically '' or '_debug')"""
294 # Since 'core' and 'plugin' files are staged
295 # in separate directories, we target the parent dir.
296 return os.path.sep.join((
299 'install-vpp%s-native' % variant,
307 if dmatch('src/scripts'):
308 srcdir = os.path.sep.join(localdir_s[:-2])
309 elif dmatch('src/vpp-api/python'):
310 srcdir = os.path.sep.join(localdir_s[:-3])
312 # we're apparently running tests
313 srcdir = os.path.sep.join(localdir_s[:-1])
316 # we're in the source tree, try both the debug and release
318 dirs.append(sdir(srcdir, '_debug'))
319 dirs.append(sdir(srcdir, ''))
321 # Test for staged copies of the scripts
322 # For these, since we explicitly know if we're running a debug versus
323 # release variant, target only the relevant directory
324 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
325 srcdir = os.path.sep.join(localdir_s[:-4])
326 dirs.append(sdir(srcdir, '_debug'))
327 if dmatch('build-root/install-vpp-native/vpp/bin'):
328 srcdir = os.path.sep.join(localdir_s[:-4])
329 dirs.append(sdir(srcdir, ''))
331 # finally, try the location system packages typically install into
332 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
334 # check the directories for existence; first one wins
336 if os.path.isdir(dir):
342 def find_api_files(cls, api_dir=None, patterns='*'):
343 """Find API definition files from the given directory tree with the
344 given pattern. If no directory is given then find_api_dir() is used
345 to locate one. If no pattern is given then all definition files found
346 in the directory tree are used.
348 :param api_dir: A directory tree in which to locate API definition
349 files; subdirectories are descended into.
350 If this is None then find_api_dir() is called to discover it.
351 :param patterns: A list of patterns to use in each visited directory
352 when looking for files.
353 This can be a list/tuple object or a comma-separated string of
354 patterns. Each value in the list will have leading/trailing
356 The pattern specifies the first part of the filename, '.api.json'
358 The results are de-duplicated, thus overlapping patterns are fine.
359 If this is None it defaults to '*' meaning "all API files".
360 :returns: A list of file paths for the API files found.
363 api_dir = cls.find_api_dir()
365 raise RuntimeError("api_dir cannot be located")
367 if isinstance(patterns, list) or isinstance(patterns, tuple):
368 patterns = [p.strip() + '.api.json' for p in patterns]
370 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
373 for root, dirnames, files in os.walk(api_dir):
374 # iterate all given patterns and de-dup the result
375 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
376 for filename in files:
377 api_files.append(os.path.join(root, filename))
382 """Debug function: report current VPP API status to stdout."""
383 print('Connected') if self.connected else print('Not Connected')
384 print('Read API definitions from', ', '.join(self.apifiles))
388 if not hasattr(self, "_api"):
389 raise Exception("Not connected, api definitions not available")
392 def make_function(self, msg, i, multipart, async):
395 return self._call_vpp_async(i, msg, **kwargs)
398 return self._call_vpp(i, msg, multipart, **kwargs)
400 f.__name__ = str(msg.name)
401 f.__doc__ = ", ".join(["%s %s" %
402 (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)])
405 def _register_functions(self, async=False):
406 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
407 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
409 for name, msg in vpp_iterator(self.messages):
410 n = name + '_' + msg.crc[2:]
411 i = vpp_api.vac_get_msg_index(n.encode())
413 self.id_msgdef[i] = msg
414 self.id_names[i] = name
415 # TODO: Fix multipart (use services)
416 multipart = True if name.find('_dump') > 0 else False
417 f = self.make_function(msg, i, multipart, async)
418 setattr(self._api, name, FuncWrapper(f))
421 'No such message type or failed CRC checksum: %s', n)
423 def _write_new_cffi(self, buf):
424 """Send a binary-packed message to VPP."""
425 if not self.connected:
426 raise IOError(1, 'Not connected')
427 return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
429 def _write_legacy_cffi(self, buf):
430 """Send a binary-packed message to VPP."""
431 if not self.connected:
432 raise IOError(1, 'Not connected')
433 return vpp_api.vac_write(bytes(buf), len(buf))
436 if not self.connected:
437 raise IOError(1, 'Not connected')
438 mem = ffi.new("char **")
439 size = ffi.new("int *")
440 rv = vpp_api.vac_read(mem, size, self.read_timeout)
442 raise IOError(rv, 'vac_read failed')
443 msg = bytes(ffi.buffer(mem[0], size[0]))
444 vpp_api.vac_free(mem[0])
447 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
449 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
450 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
452 raise IOError(2, 'Connect failed')
453 self.connected = True
454 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
455 self._register_functions(async=async)
457 # Initialise control ping
458 crc = self.messages['control_ping'].crc
459 self.control_ping_index = vpp_api.vac_get_msg_index(
460 ('control_ping' + '_' + crc[2:]).encode())
461 self.control_ping_msgdef = self.messages['control_ping']
462 if self.async_thread:
463 self.event_thread = threading.Thread(
464 target=self.thread_msg_handler)
465 self.event_thread.daemon = True
466 self.event_thread.start()
469 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
472 name - the name of the client.
473 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
474 async - if true, messages are sent without waiting for a reply
475 rx_qlen - the length of the VPP message receive queue between
478 msg_handler = vac_callback_sync if not async else vac_callback_async
479 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
482 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
483 """Attach to VPP in synchronous mode. Application must poll for events.
485 name - the name of the client.
486 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
487 rx_qlen - the length of the VPP message receive queue between
491 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
def disconnect(self):
    """Detach from VPP.

    Marks the object as disconnected and queues the sentinel string that
    thread_msg_handler compares incoming queue items against.

    :returns: the status code reported by ``vac_disconnect``.
    """
    rv = vpp_api.vac_disconnect()
    self.connected = False
    self.message_queue.put("terminate event thread")
    # Propagate the library's status instead of silently dropping it.
    return rv
501 def msg_handler_sync(self, msg):
502 """Process an incoming message from VPP in sync mode.
504 The message may be a reply or it may be an async notification.
506 r = self.decode_incoming_msg(msg)
510 # If we have a context, then use the context to find any
511 # request waiting for a reply
513 if hasattr(r, 'context') and r.context > 0:
517 # No context -> async notification that we feed to the callback
518 self.message_queue.put_nowait(r)
520 raise IOError(2, 'RPC reply message received in event handler')
522 def decode_incoming_msg(self, msg):
524 self.logger.warning('vpp_api.read failed')
527 i, ci = self.header.unpack(msg, 0)
528 if self.id_names[i] == 'rx_thread_exit':
532 # Decode message and returns a tuple.
534 msgobj = self.id_msgdef[i]
536 raise IOError(2, 'Reply message undefined')
538 r = msgobj.unpack(msg)
542 def msg_handler_async(self, msg):
543 """Process a message from VPP in async mode.
545 In async mode, all messages are returned to the callback.
547 r = self.decode_incoming_msg(msg)
551 msgname = type(r).__name__
553 if self.event_callback:
554 self.event_callback(msgname, r)
556 def _control_ping(self, context):
557 """Send a ping command."""
558 self._call_vpp_async(self.control_ping_index,
559 self.control_ping_msgdef,
def validate_args(self, msg, kwargs):
    """Check that every supplied argument names a field of the message.

    :param msg: message definition; its ``field_by_name`` mapping gives
        the accepted names and ``name`` is used in the error text.
    :param kwargs: dict of arguments supplied by the caller.
    :raises ValueError: if kwargs contains names that are not fields
        of the message.
    """
    d = set(kwargs.keys()) - set(msg.field_by_name.keys())
    # Only complain when there really are unknown argument names.
    if d:
        raise ValueError('Invalid argument {} to {}'.format(list(d), msg.name))
567 def _call_vpp(self, i, msg, multipart, **kwargs):
568 """Given a message, send the message and await a reply.
570 msgdef - the message packing definition
571 i - the message type index
572 multipart - True if the message returns multiple
574 context - context number - chosen at random if not
576 The remainder of the kwargs are the arguments to the API call.
578 The return value is the message or message array containing
579 the response. It will raise an IOError exception if there was
580 no response within the timeout window.
583 if 'context' not in kwargs:
584 context = self.get_context()
585 kwargs['context'] = context
587 context = kwargs['context']
588 kwargs['_vl_msg_id'] = i
590 self.validate_args(msg, kwargs)
592 vpp_api.vac_rx_suspend()
596 # Send a ping after the request - we use its response
597 # to detect that we have seen all results.
598 self._control_ping(context)
600 # Block until we get a reply.
605 raise IOError(2, 'VPP API client: read failed')
606 r = self.decode_incoming_msg(msg)
607 msgname = type(r).__name__
608 if context not in r or r.context == 0 or context != r.context:
609 self.message_queue.put_nowait(r)
615 if msgname == 'control_ping_reply':
620 vpp_api.vac_rx_resume()
624 def _call_vpp_async(self, i, msg, **kwargs):
625 """Given a message, send the message and await a reply.
627 msgdef - the message packing definition
628 i - the message type index
629 context - context number - chosen at random if not
631 The remainder of the kwargs are the arguments to the API call.
633 if 'context' not in kwargs:
634 context = self.get_context()
635 kwargs['context'] = context
637 context = kwargs['context']
638 kwargs['client_index'] = 0
639 kwargs['_vl_msg_id'] = i
def register_event_callback(self, callback):
    """Register a callback for async messages.

    This will be called for async notifications in sync mode,
    and all messages in async mode. In sync mode, replies to
    requests will not come here.

    callback is a fn(msg_type_name, msg_type) that will be
    called when a message comes in. While this function is
    executing, note that (a) you are in a background thread and
    may wish to use threading.Lock to protect your datastructures,
    and (b) message processing from VPP will stop (so if you take
    a long while about it you may provoke reply timeouts or cause
    VPP to fill the RX buffer). Passing None will disable the
    callback.
    """
    self.event_callback = callback
def thread_msg_handler(self):
    """Python thread calling the user registered message handler.

    This is to emulate the old style event callback scheme. Modern
    clients should provide their own thread to poll the event
    queue.

    Items are taken from the message queue one at a time and passed to
    the registered event callback, if any, together with their type
    name. The loop ends when the terminate sentinel string is read.
    """
    while True:
        r = self.message_queue.get()
        # Sentinel string queued by disconnect(): stop servicing the queue.
        if r == "terminate event thread":
            break
        msgname = type(r).__name__
        if self.event_callback:
            self.event_callback(msgname, r)
678 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4