3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
31 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
32 from . vpp_serializer import VPPMessage
34 if sys.version[0] == '2':
41 typedef void (*vac_callback_t)(unsigned char * data, int len);
42 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
43 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
45 int vac_disconnect(void);
46 int vac_read(char **data, int *l, unsigned short timeout);
47 int vac_write(char *data, int len);
48 void vac_free(void * msg);
50 int vac_get_msg_index(unsigned char * name);
51 int vac_msg_table_size(void);
52 int vac_msg_table_max_index(void);
54 void vac_rx_suspend (void);
55 void vac_rx_resume (void);
56 void vac_set_error_handler(vac_error_callback_t);
59 # dlopen raises OSError on failure, so no need to check success.
60 vpp_api = ffi.dlopen('libvppapiclient.so')
def vpp_atexit(vpp_weakref):
    """Clean up VPP connection on shutdown.

    This is registered with atexit together with a weak reference to the
    VPP object, so the registration itself does not keep the object alive.

    vpp_weakref - a callable (normally a weakref.ref) returning the VPP
    instance, or None if that instance no longer exists.
    """
    vpp_instance = vpp_weakref()
    # A dead weak reference yields None: the VPP object has already been
    # collected, so there is nothing left to disconnect.
    if vpp_instance is None:
        return
    if vpp_instance.connected:
        vpp_instance.logger.debug('Cleaning up VPP on exit')
        vpp_instance.disconnect()
75 if sys.version[0] == '2':
@ffi.callback("void(unsigned char *, int)")
def vac_callback_sync(data, length):
    """Receive callback handed to the VPP API client for sync mode.

    Wraps the raw message memory in a cffi buffer and passes it to the
    module-level vpp_object's msg_handler_sync().

    data - pointer to the message bytes
    length - number of bytes in the message
    """
    payload = ffi.buffer(data, length)
    vpp_object.msg_handler_sync(payload)
@ffi.callback("void(unsigned char *, int)")
def vac_callback_async(data, length):
    """Receive callback handed to the VPP API client for async mode.

    Wraps the raw message memory in a cffi buffer and passes it to the
    module-level vpp_object's msg_handler_async().

    data - pointer to the message bytes
    length - number of bytes in the message
    """
    payload = ffi.buffer(data, length)
    vpp_object.msg_handler_async(payload)
@ffi.callback("void(void *, unsigned char *, int)")
def vac_error_handler(arg, msg, msg_len):
    """Log an error message reported by the VPP API client library.

    arg - opaque pointer supplied by the library; unused here
    msg - pointer to the error text
    msg_len - maximum number of bytes to read from msg
    """
    text = ffi.string(msg, msg_len)
    vpp_object.logger.warning("VPP API client:: %s", text)
100 class FuncWrapper(object):
101 def __init__(self, func):
103 self.__name__ = func.__name__
105 def __call__(self, **kwargs):
106 return self._func(**kwargs)
112 This class provides the APIs to VPP. The APIs are loaded
113 from the provided .api.json files and functions are created accordingly.
114 These functions are documented in the VPP .api files, as they
115 are dynamically created.
117 Additionally, VPP can send callback messages; this class
118 provides a means to register a callback function to receive
119 these messages in a background thread.
122 def process_json_file(self, apidef_file):
123 api = json.load(apidef_file)
125 for t in api['enums']:
126 t[0] = 'vl_api_' + t[0] + '_t'
127 types[t[0]] = {'type': 'enum', 'data': t}
128 for t in api['unions']:
129 t[0] = 'vl_api_' + t[0] + '_t'
130 types[t[0]] = {'type': 'union', 'data': t}
131 for t in api['types']:
132 t[0] = 'vl_api_' + t[0] + '_t'
133 types[t[0]] = {'type': 'type', 'data': t}
138 for k, v in types.items():
140 if v['type'] == 'enum':
142 VPPEnumType(t[0], t[1:])
145 elif v['type'] == 'union':
147 VPPUnionType(t[0], t[1:])
150 elif v['type'] == 'type':
155 if len(unresolved) == 0:
158 raise ValueError('Unresolved type definitions {}'
163 for m in api['messages']:
165 self.messages[m[0]] = VPPMessage(m[0], m[1:])
166 except NotImplementedError:
167 self.logger.error('Not implemented error for {}'.format(m[0]))
169 def __init__(self, apifiles=None, testmode=False, async_thread=True,
170 logger=logging.getLogger('vpp_papi'), loglevel='debug',
172 """Create a VPP API object.
174 apifiles is a list of files containing API
175 descriptions that will be loaded - methods will be
176 dynamically created reflecting these APIs. If not
177 provided this will load the API files from VPP's
178 default install location.
180 logger, if supplied, is the logging logger object to log to.
181 loglevel, if supplied, is the log level this logger is set
182 to report at (from the loglevels in the logging module).
188 logger = logging.getLogger(__name__)
189 if loglevel is not None:
190 logger.setLevel(loglevel)
196 self.connected = False
197 self.header = VPPType('header', [['u16', 'msgid'],
198 ['u32', 'client_index']])
200 self.event_callback = None
201 self.message_queue = queue.Queue()
202 self.read_timeout = read_timeout
203 self.vpp_api = vpp_api
204 self.async_thread = async_thread
207 # Pick up API definitions from default directory
209 apifiles = self.find_api_files()
211 # In test mode we don't care that we can't find the API files
217 for file in apifiles:
218 with open(file) as apidef_file:
219 self.process_json_file(apidef_file)
221 self.apifiles = apifiles
224 if len(self.messages) == 0 and not testmode:
225 raise ValueError(1, 'Missing JSON message definitions')
227 # Make sure we allow VPP to clean up the message rings.
228 atexit.register(vpp_atexit, weakref.ref(self))
230 # Register error handler
232 vpp_api.vac_set_error_handler(vac_error_handler)
234 # Support legacy CFFI
235 # from_buffer supported from 1.8.0
236 (major, minor, patch) = [int(s) for s in
237 cffi.__version__.split('.', 3)]
238 if major >= 1 and minor >= 8:
239 self._write = self._write_new_cffi
241 self._write = self._write_legacy_cffi
243 class ContextId(object):
244 """Thread-safe provider of unique context IDs."""
247 self.lock = threading.Lock()
250 """Get a new unique (or, at least, not recently used) context."""
254 get_context = ContextId()
257 def find_api_dir(cls):
258 """Attempt to find the best directory in which API definition
259 files may reside. If the value VPP_API_DIR exists in the environment
260 then it is first on the search list. If we're inside a recognized
261 location in a VPP source tree (src/scripts and src/vpp-api/python)
262 then entries from there to the likely locations in build-root are
263 added. Finally the location used by system packages is added.
265 :returns: A single directory name, or None if no such directory
270 if 'VPP_API_DIR' in os.environ:
271 dirs.append(os.environ['VPP_API_DIR'])
273 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
274 # in which case, plot a course to likely places in the src tree
275 import __main__ as main
276 if hasattr(main, '__file__'):
277 # get the path of the calling script
278 localdir = os.path.dirname(os.path.realpath(main.__file__))
280 # use cwd if there is no calling script
281 localdir = os.getcwd()
282 localdir_s = localdir.split(os.path.sep)
285 """Match dir against right-hand components of the script dir"""
286 d = dir.split('/') # param 'dir' assumes a / separator
288 return len(localdir_s) > length and localdir_s[-length:] == d
290 def sdir(srcdir, variant):
291 """Build a path from srcdir to the staged API files of
292 'variant' (typically '' or '_debug')"""
293 # Since 'core' and 'plugin' files are staged
294 # in separate directories, we target the parent dir.
295 return os.path.sep.join((
298 'install-vpp%s-native' % variant,
306 if dmatch('src/scripts'):
307 srcdir = os.path.sep.join(localdir_s[:-2])
308 elif dmatch('src/vpp-api/python'):
309 srcdir = os.path.sep.join(localdir_s[:-3])
311 # we're apparently running tests
312 srcdir = os.path.sep.join(localdir_s[:-1])
315 # we're in the source tree, try both the debug and release
317 dirs.append(sdir(srcdir, '_debug'))
318 dirs.append(sdir(srcdir, ''))
320 # Test for staged copies of the scripts
321 # For these, since we explicitly know if we're running a debug versus
322 # release variant, target only the relevant directory
323 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
324 srcdir = os.path.sep.join(localdir_s[:-4])
325 dirs.append(sdir(srcdir, '_debug'))
326 if dmatch('build-root/install-vpp-native/vpp/bin'):
327 srcdir = os.path.sep.join(localdir_s[:-4])
328 dirs.append(sdir(srcdir, ''))
330 # finally, try the location system packages typically install into
331 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
333 # check the directories for existence; first one wins
335 if os.path.isdir(dir):
341 def find_api_files(cls, api_dir=None, patterns='*'):
342 """Find API definition files from the given directory tree with the
343 given pattern. If no directory is given then find_api_dir() is used
344 to locate one. If no pattern is given then all definition files found
345 in the directory tree are used.
347 :param api_dir: A directory tree in which to locate API definition
348 files; subdirectories are descended into.
349 If this is None then find_api_dir() is called to discover it.
350 :param patterns: A list of patterns to use in each visited directory
351 when looking for files.
352 This can be a list/tuple object or a comma-separated string of
353 patterns. Each value in the list will have leading/trailing
355 The pattern specifies the first part of the filename, '.api.json'
357 The results are de-duplicated, thus overlapping patterns are fine.
358 If this is None it defaults to '*' meaning "all API files".
359 :returns: A list of file paths for the API files found.
362 api_dir = cls.find_api_dir()
364 raise RuntimeError("api_dir cannot be located")
366 if isinstance(patterns, list) or isinstance(patterns, tuple):
367 patterns = [p.strip() + '.api.json' for p in patterns]
369 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
372 for root, dirnames, files in os.walk(api_dir):
373 # iterate all given patterns and de-dup the result
374 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
375 for filename in files:
376 api_files.append(os.path.join(root, filename))
381 """Debug function: report current VPP API status to stdout."""
382 print('Connected') if self.connected else print('Not Connected')
383 print('Read API definitions from', ', '.join(self.apifiles))
387 if not hasattr(self, "_api"):
388 raise Exception("Not connected, api definitions not available")
391 def make_function(self, msg, i, multipart, async):
394 return self._call_vpp_async(i, msg, **kwargs)
397 return self._call_vpp(i, msg, multipart, **kwargs)
399 f.__name__ = str(msg.name)
400 f.__doc__ = ", ".join(["%s %s" %
401 (msg.fieldtypes[j], k)
402 for j, k in enumerate(msg.fields)])
405 def _register_functions(self, async=False):
406 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
407 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
409 for name, msg in vpp_iterator(self.messages):
410 n = name + '_' + msg.crc[2:]
411 i = vpp_api.vac_get_msg_index(n.encode())
413 self.id_msgdef[i] = msg
414 self.id_names[i] = name
415 # TODO: Fix multipart (use services)
416 multipart = True if name.find('_dump') > 0 else False
417 f = self.make_function(msg, i, multipart, async)
418 setattr(self._api, name, FuncWrapper(f))
421 'No such message type or failed CRC checksum: %s', n)
423 def _write_new_cffi(self, buf):
424 """Send a binary-packed message to VPP."""
425 if not self.connected:
426 raise IOError(1, 'Not connected')
427 return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
429 def _write_legacy_cffi(self, buf):
430 """Send a binary-packed message to VPP."""
431 if not self.connected:
432 raise IOError(1, 'Not connected')
433 return vpp_api.vac_write(bytes(buf), len(buf))
436 if not self.connected:
437 raise IOError(1, 'Not connected')
438 mem = ffi.new("char **")
439 size = ffi.new("int *")
440 rv = vpp_api.vac_read(mem, size, self.read_timeout)
442 raise IOError(rv, 'vac_read failed')
443 msg = bytes(ffi.buffer(mem[0], size[0]))
444 vpp_api.vac_free(mem[0])
447 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
449 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
450 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
452 raise IOError(2, 'Connect failed')
453 self.connected = True
454 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
455 self._register_functions(async=async)
457 # Initialise control ping
458 crc = self.messages['control_ping'].crc
459 self.control_ping_index = vpp_api.vac_get_msg_index(
460 ('control_ping' + '_' + crc[2:]).encode())
461 self.control_ping_msgdef = self.messages['control_ping']
462 if self.async_thread:
463 self.event_thread = threading.Thread(
464 target=self.thread_msg_handler)
465 self.event_thread.daemon = True
466 self.event_thread.start()
469 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
472 name - the name of the client.
473 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
474 async - if true, messages are sent without waiting for a reply
475 rx_qlen - the length of the VPP message receive queue between
478 msg_handler = vac_callback_sync if not async else vac_callback_async
479 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
482 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
483 """Attach to VPP in synchronous mode. Application must poll for events.
485 name - the name of the client.
486 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
487 rx_qlen - the length of the VPP message receive queue between
491 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
494 def disconnect(self):
495 """Detach from VPP."""
496 rv = vpp_api.vac_disconnect()
497 self.connected = False
498 self.message_queue.put("terminate event thread")
501 def msg_handler_sync(self, msg):
502 """Process an incoming message from VPP in sync mode.
504 The message may be a reply or it may be an async notification.
506 r = self.decode_incoming_msg(msg)
510 # If we have a context, then use the context to find any
511 # request waiting for a reply
513 if hasattr(r, 'context') and r.context > 0:
517 # No context -> async notification that we feed to the callback
518 self.message_queue.put_nowait(r)
520 raise IOError(2, 'RPC reply message received in event handler')
522 def decode_incoming_msg(self, msg):
524 self.logger.warning('vpp_api.read failed')
527 i, ci = self.header.unpack(msg, 0)
528 if self.id_names[i] == 'rx_thread_exit':
532 # Decode message and returns a tuple.
534 msgobj = self.id_msgdef[i]
536 raise IOError(2, 'Reply message undefined')
538 r = msgobj.unpack(msg)
542 def msg_handler_async(self, msg):
543 """Process a message from VPP in async mode.
545 In async mode, all messages are returned to the callback.
547 r = self.decode_incoming_msg(msg)
551 msgname = type(r).__name__
553 if self.event_callback:
554 self.event_callback(msgname, r)
556 def _control_ping(self, context):
557 """Send a ping command."""
558 self._call_vpp_async(self.control_ping_index,
559 self.control_ping_msgdef,
562 def validate_args(self, msg, kwargs):
563 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
565 raise ValueError('Invalid argument {} to {}'
566 .format(list(d), msg.name))
568 def _call_vpp(self, i, msg, multipart, **kwargs):
569 """Given a message, send the message and await a reply.
571 msg - the message packing definition
572 i - the message type index
573 multipart - True if the message returns multiple
575 context - context number - taken from get_context() if not
577 The remainder of the kwargs are the arguments to the API call.
579 The return value is the message or message array containing
580 the response. It will raise an IOError exception if there was
581 no response within the timeout window.
584 if 'context' not in kwargs:
585 context = self.get_context()
586 kwargs['context'] = context
588 context = kwargs['context']
589 kwargs['_vl_msg_id'] = i
591 self.validate_args(msg, kwargs)
593 vpp_api.vac_rx_suspend()
597 # Send a ping after the request - we use its response
598 # to detect that we have seen all results.
599 self._control_ping(context)
601 # Block until we get a reply.
606 raise IOError(2, 'VPP API client: read failed')
607 r = self.decode_incoming_msg(msg)
608 msgname = type(r).__name__
609 if context not in r or r.context == 0 or context != r.context:
610 self.message_queue.put_nowait(r)
616 if msgname == 'control_ping_reply':
621 vpp_api.vac_rx_resume()
625 def _call_vpp_async(self, i, msg, **kwargs):
626 """Given a message, send the message without waiting for a reply.
628 msg - the message packing definition
629 i - the message type index
630 context - context number - taken from get_context() if not
632 The remainder of the kwargs are the arguments to the API call.
634 if 'context' not in kwargs:
635 context = self.get_context()
636 kwargs['context'] = context
638 context = kwargs['context']
639 kwargs['client_index'] = 0
640 kwargs['_vl_msg_id'] = i
645 def register_event_callback(self, callback):
646 """Register a callback for async messages.
648 This will be called for async notifications in sync mode,
649 and all messages in async mode. In sync mode, replies to
650 requests will not come here.
652 callback is a fn(msg_type_name, msg_type) that will be
653 called when a message comes in. While this function is
654 executing, note that (a) you are in a background thread and
655 may wish to use threading.Lock to protect your datastructures,
656 and (b) message processing from VPP will stop (so if you take
657 a long while about it you may provoke reply timeouts or cause
658 VPP to fill the RX buffer). Passing None will disable the
661 self.event_callback = callback
663 def thread_msg_handler(self):
664 """Python thread calling the user registered message handler.
666 This is to emulate the old style event callback scheme. Modern
667 clients should provide their own thread to poll the event
671 r = self.message_queue.get()
672 if r == "terminate event thread":
674 msgname = type(r).__name__
675 if self.event_callback:
676 self.event_callback(msgname, r)
679 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4