3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
32 from . macaddress import MACAddress, mac_pton, mac_ntop
34 logger = logging.getLogger(__name__)
36 if sys.version[0] == '2':
42 def metaclass(metaclass):
43 @functools.wraps(metaclass)
45 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
50 class VppEnumType(type):
51 def __getattr__(cls, name):
52 t = vpp_get_type(name)
56 @metaclass(VppEnumType)
57 class VppEnum(object):
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client at interpreter shutdown.

    vpp_weakref is a zero-argument callable (a weak reference when
    registered through atexit) that yields the client object, or a
    falsy value once the client no longer exists.
    """
    client = vpp_weakref()
    # Nothing to clean up when the referent has already gone away.
    if not client:
        return
    # Only an attached transport needs to be detached.
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
69 if sys.version[0] == '2':
77 def call_logger(msgdef, kwargs):
78 s = 'Calling {}('.format(msgdef.name)
79 for k, v in kwargs.items():
80 s += '{}:{} '.format(k, v)
86 s = 'Return from {}'.format(r)
90 class VppApiDynamicMethodHolder(object):
94 class FuncWrapper(object):
95 def __init__(self, func):
97 self.__name__ = func.__name__
99 def __call__(self, **kwargs):
100 return self._func(**kwargs)
103 class VPPApiError(Exception):
107 class VPPNotImplementedError(NotImplementedError):
111 class VPPIOError(IOError):
115 class VPPRuntimeError(RuntimeError):
119 class VPPValueError(ValueError):
126 This class provides the APIs to VPP. The APIs are loaded
127 from provided .api.json files and makes functions accordingly.
128 These functions are documented in the VPP .api files, as they
129 are dynamically created.
131 Additionally, VPP can send callback messages; this class
132 provides a means to register a callback function to receive
133 these messages in a background thread.
# Re-bind each exception class under its own name in the current scope.
# NOTE(review): these assignments appear to sit inside a class body, which
# would make the exceptions reachable as attributes of that class - confirm
# against the full file.
VPPApiError = VPPApiError
VPPRuntimeError = VPPRuntimeError
VPPValueError = VPPValueError
VPPNotImplementedError = VPPNotImplementedError
VPPIOError = VPPIOError
141 def process_json_file(self, apidef_file):
142 api = json.load(apidef_file)
144 for t in api['enums']:
145 t[0] = 'vl_api_' + t[0] + '_t'
146 types[t[0]] = {'type': 'enum', 'data': t}
147 for t in api['unions']:
148 t[0] = 'vl_api_' + t[0] + '_t'
149 types[t[0]] = {'type': 'union', 'data': t}
150 for t in api['types']:
151 t[0] = 'vl_api_' + t[0] + '_t'
152 types[t[0]] = {'type': 'type', 'data': t}
153 for t, v in api['aliases'].items():
154 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
155 self.services.update(api['services'])
160 for k, v in types.items():
162 if not vpp_get_type(k):
163 if v['type'] == 'enum':
165 VPPEnumType(t[0], t[1:])
168 elif v['type'] == 'union':
170 VPPUnionType(t[0], t[1:])
173 elif v['type'] == 'type':
178 elif v['type'] == 'alias':
183 if len(unresolved) == 0:
186 raise VPPValueError('Unresolved type definitions {}'
191 for m in api['messages']:
193 self.messages[m[0]] = VPPMessage(m[0], m[1:])
194 except VPPNotImplementedError:
195 self.logger.error('Not implemented error for {}'.format(m[0]))
197 def __init__(self, apifiles=None, testmode=False, async_thread=True,
198 logger=None, loglevel=None,
199 read_timeout=5, use_socket=False,
200 server_address='/run/vpp-api.sock'):
201 """Create a VPP API object.
203 apifiles is a list of files containing API
204 descriptions that will be loaded - methods will be
205 dynamically created reflecting these APIs. If not
206 provided this will load the API files from VPP's
207 default install location.
209 logger, if supplied, is the logging logger object to log to.
210 loglevel, if supplied, is the log level this logger is set
211 to report at (from the loglevels in the logging module).
214 logger = logging.getLogger(__name__)
215 if loglevel is not None:
216 logger.setLevel(loglevel)
223 self.header = VPPType('header', [['u16', 'msgid'],
224 ['u32', 'client_index']])
226 self.event_callback = None
227 self.message_queue = queue.Queue()
228 self.read_timeout = read_timeout
229 self.async_thread = async_thread
232 from . vpp_transport_socket import VppTransport
234 from . vpp_transport_shmem import VppTransport
237 # Pick up API definitions from default directory
239 apifiles = self.find_api_files()
241 # In test mode we don't care that we can't find the API files
245 raise VPPRuntimeError
247 for file in apifiles:
248 with open(file) as apidef_file:
249 self.process_json_file(apidef_file)
251 self.apifiles = apifiles
254 if len(self.messages) == 0 and not testmode:
255 raise VPPValueError(1, 'Missing JSON message definitions')
257 self.transport = VppTransport(self, read_timeout=read_timeout,
258 server_address=server_address)
259 # Make sure we allow VPP to clean up the message rings.
260 atexit.register(vpp_atexit, weakref.ref(self))
262 class ContextId(object):
263 """Thread-safe provider of unique context IDs."""
266 self.lock = threading.Lock()
269 """Get a new unique (or, at least, not recently used) context."""
273 get_context = ContextId()
def get_type(self, name):
    """Look up a type by name through vpp_get_type() and return the result."""
    found = vpp_get_type(name)
    return found
279 def find_api_dir(cls):
280 """Attempt to find the best directory in which API definition
281 files may reside. If the value VPP_API_DIR exists in the environment
282 then it is first on the search list. If we're inside a recognized
283 location in a VPP source tree (src/scripts and src/vpp-api/python)
284 then entries from there to the likely locations in build-root are
285 added. Finally the location used by system packages is added.
287 :returns: A single directory name, or None if no such directory
292 if 'VPP_API_DIR' in os.environ:
293 dirs.append(os.environ['VPP_API_DIR'])
295 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
296 # in which case, plot a course to likely places in the src tree
297 import __main__ as main
298 if hasattr(main, '__file__'):
299 # get the path of the calling script
300 localdir = os.path.dirname(os.path.realpath(main.__file__))
302 # use cwd if there is no calling script
303 localdir = os.getcwd()
304 localdir_s = localdir.split(os.path.sep)
307 """Match dir against right-hand components of the script dir"""
308 d = dir.split('/') # param 'dir' assumes a / separator
310 return len(localdir_s) > length and localdir_s[-length:] == d
312 def sdir(srcdir, variant):
313 """Build a path from srcdir to the staged API files of
314 'variant' (typically '' or '_debug')"""
315 # Since 'core' and 'plugin' files are staged
316 # in separate directories, we target the parent dir.
317 return os.path.sep.join((
320 'install-vpp%s-native' % variant,
328 if dmatch('src/scripts'):
329 srcdir = os.path.sep.join(localdir_s[:-2])
330 elif dmatch('src/vpp-api/python'):
331 srcdir = os.path.sep.join(localdir_s[:-3])
333 # we're apparently running tests
334 srcdir = os.path.sep.join(localdir_s[:-1])
337 # we're in the source tree, try both the debug and release
339 dirs.append(sdir(srcdir, '_debug'))
340 dirs.append(sdir(srcdir, ''))
342 # Test for staged copies of the scripts
343 # For these, since we explicitly know if we're running a debug versus
344 # release variant, target only the relevant directory
345 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
346 srcdir = os.path.sep.join(localdir_s[:-4])
347 dirs.append(sdir(srcdir, '_debug'))
348 if dmatch('build-root/install-vpp-native/vpp/bin'):
349 srcdir = os.path.sep.join(localdir_s[:-4])
350 dirs.append(sdir(srcdir, ''))
352 # finally, try the location system packages typically install into
353 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
355 # check the directories for existence; first one wins
357 if os.path.isdir(dir):
363 def find_api_files(cls, api_dir=None, patterns='*'):
364 """Find API definition files from the given directory tree with the
365 given pattern. If no directory is given then find_api_dir() is used
366 to locate one. If no pattern is given then all definition files found
367 in the directory tree are used.
369 :param api_dir: A directory tree in which to locate API definition
370 files; subdirectories are descended into.
371 If this is None then find_api_dir() is called to discover it.
372 :param patterns: A list of patterns to use in each visited directory
373 when looking for files.
374 This can be a list/tuple object or a comma-separated string of
375 patterns. Each value in the list will have leading/trailing
377 The pattern specifies the first part of the filename, '.api.json'
379 The results are de-duplicated, thus overlapping patterns are fine.
380 If this is None it defaults to '*' meaning "all API files".
381 :returns: A list of file paths for the API files found.
384 api_dir = cls.find_api_dir()
386 raise VPPApiError("api_dir cannot be located")
388 if isinstance(patterns, list) or isinstance(patterns, tuple):
389 patterns = [p.strip() + '.api.json' for p in patterns]
391 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
394 for root, dirnames, files in os.walk(api_dir):
395 # iterate all given patterns and de-dup the result
396 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
397 for filename in files:
398 api_files.append(os.path.join(root, filename))
404 if not hasattr(self, "_api"):
405 raise VPPApiError("Not connected, api definitions not available")
408 def make_function(self, msg, i, multipart, do_async):
411 return self._call_vpp_async(i, msg, **kwargs)
414 return self._call_vpp(i, msg, multipart, **kwargs)
416 f.__name__ = str(msg.name)
417 f.__doc__ = ", ".join(["%s %s" %
418 (msg.fieldtypes[j], k)
419 for j, k in enumerate(msg.fields)])
422 def _register_functions(self, do_async=False):
423 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
424 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
425 self._api = VppApiDynamicMethodHolder()
426 for name, msg in vpp_iterator(self.messages):
427 n = name + '_' + msg.crc[2:]
428 i = self.transport.get_msg_index(n.encode())
430 self.id_msgdef[i] = msg
431 self.id_names[i] = name
433 # Create function for client side messages.
434 if name in self.services:
435 if 'stream' in self.services[name] and \
436 self.services[name]['stream']:
440 f = self.make_function(msg, i, multipart, do_async)
441 setattr(self._api, name, FuncWrapper(f))
444 'No such message type or failed CRC checksum: %s', n)
446 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
448 pfx = chroot_prefix.encode() if chroot_prefix else None
450 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
452 raise VPPIOError(2, 'Connect failed')
453 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
454 self._register_functions(do_async=do_async)
456 # Initialise control ping
457 crc = self.messages['control_ping'].crc
458 self.control_ping_index = self.transport.get_msg_index(
459 ('control_ping' + '_' + crc[2:]).encode())
460 self.control_ping_msgdef = self.messages['control_ping']
461 if self.async_thread:
462 self.event_thread = threading.Thread(
463 target=self.thread_msg_handler)
464 self.event_thread.daemon = True
465 self.event_thread.start()
468 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
471 name - the name of the client.
472 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
473 do_async - if true, messages are sent without waiting for a reply
474 rx_qlen - the length of the VPP message receive queue between
477 msg_handler = self.transport.get_callback(do_async)
478 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
481 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
482 """Attach to VPP in synchronous mode. Application must poll for events.
484 name - the name of the client.
485 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
486 rx_qlen - the length of the VPP message receive queue between
490 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
493 def disconnect(self):
494 """Detach from VPP."""
495 rv = self.transport.disconnect()
496 self.message_queue.put("terminate event thread")
499 def msg_handler_sync(self, msg):
500 """Process an incoming message from VPP in sync mode.
502 The message may be a reply or it may be an async notification.
504 r = self.decode_incoming_msg(msg)
508 # If we have a context, then use the context to find any
509 # request waiting for a reply
511 if hasattr(r, 'context') and r.context > 0:
515 # No context -> async notification that we feed to the callback
516 self.message_queue.put_nowait(r)
518 raise VPPIOError(2, 'RPC reply message received in event handler')
520 def has_context(self, msg):
524 header = VPPType('header_with_context', [['u16', 'msgid'],
525 ['u32', 'client_index'],
528 (i, ci, context), size = header.unpack(msg, 0)
529 if self.id_names[i] == 'rx_thread_exit':
533 # Decode message and returns a tuple.
535 msgobj = self.id_msgdef[i]
536 if 'context' in msgobj.field_by_name and context >= 0:
540 def decode_incoming_msg(self, msg, no_type_conversion=False):
542 self.logger.warning('vpp_api.read failed')
545 (i, ci), size = self.header.unpack(msg, 0)
546 if self.id_names[i] == 'rx_thread_exit':
550 # Decode message and returns a tuple.
552 msgobj = self.id_msgdef[i]
554 raise VPPIOError(2, 'Reply message undefined')
556 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
559 def msg_handler_async(self, msg):
560 """Process a message from VPP in async mode.
562 In async mode, all messages are returned to the callback.
564 r = self.decode_incoming_msg(msg)
568 msgname = type(r).__name__
570 if self.event_callback:
571 self.event_callback(msgname, r)
573 def _control_ping(self, context):
574 """Send a ping command."""
575 self._call_vpp_async(self.control_ping_index,
576 self.control_ping_msgdef,
579 def validate_args(self, msg, kwargs):
580 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
582 raise VPPValueError('Invalid argument {} to {}'
583 .format(list(d), msg.name))
585 def _call_vpp(self, i, msgdef, multipart, **kwargs):
586 """Given a message, send the message and await a reply.
588 msgdef - the message packing definition
589 i - the message type index
590 multipart - True if the message returns multiple
592 context - context number - chosen at random if not
594 The remainder of the kwargs are the arguments to the API call.
596 The return value is the message or message array containing
597 the response. It will raise an IOError exception if there was
598 no response within the timeout window.
601 if 'context' not in kwargs:
602 context = self.get_context()
603 kwargs['context'] = context
605 context = kwargs['context']
606 kwargs['_vl_msg_id'] = i
608 no_type_conversion = kwargs.pop('_no_type_conversion', False)
611 if self.transport.socket_index:
612 kwargs['client_index'] = self.transport.socket_index
613 except AttributeError:
615 self.validate_args(msgdef, kwargs)
617 logging.debug(call_logger(msgdef, kwargs))
619 b = msgdef.pack(kwargs)
620 self.transport.suspend()
622 self.transport.write(b)
625 # Send a ping after the request - we use its response
626 # to detect that we have seen all results.
627 self._control_ping(context)
629 # Block until we get a reply.
632 msg = self.transport.read()
634 raise VPPIOError(2, 'VPP API client: read failed')
635 r = self.decode_incoming_msg(msg, no_type_conversion)
636 msgname = type(r).__name__
637 if context not in r or r.context == 0 or context != r.context:
638 # Message being queued
639 self.message_queue.put_nowait(r)
645 if msgname == 'control_ping_reply':
650 self.transport.resume()
652 logger.debug(return_logger(rl))
655 def _call_vpp_async(self, i, msg, **kwargs):
656 """Given a message, send the message and await a reply.
658 msgdef - the message packing definition
659 i - the message type index
660 context - context number - chosen at random if not
662 The remainder of the kwargs are the arguments to the API call.
664 if 'context' not in kwargs:
665 context = self.get_context()
666 kwargs['context'] = context
668 context = kwargs['context']
670 if self.transport.socket_index:
671 kwargs['client_index'] = self.transport.socket_index
672 except AttributeError:
673 kwargs['client_index'] = 0
674 kwargs['_vl_msg_id'] = i
677 self.transport.write(b)
def register_event_callback(self, callback):
    """Install the handler invoked for asynchronous messages.

    In sync mode the handler receives async notifications only;
    replies to requests are not delivered to it. In async mode it
    receives every message.

    callback is a fn(msg_type_name, msg_type) called whenever such a
    message arrives. It runs in a background thread, so protect any
    shared data structures (for example with threading.Lock).
    Message processing from VPP stops while it executes; a slow
    callback may provoke reply timeouts or cause VPP to fill the RX
    buffer. Passing None disables the callback.
    """
    self.event_callback = callback
697 def thread_msg_handler(self):
698 """Python thread calling the user registered message handler.
700 This is to emulate the old style event callback scheme. Modern
701 clients should provide their own thread to poll the event
705 r = self.message_queue.get()
706 if r == "terminate event thread":
708 msgname = type(r).__name__
709 if self.event_callback:
710 self.event_callback(msgname, r)
713 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4