3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
21 import multiprocessing as mp
32 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
33 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
34 from . macaddress import MACAddress, mac_pton, mac_ntop
36 logger = logging.getLogger(__name__)
38 if sys.version[0] == '2':
def metaclass(metaclass):
    """Class decorator that rebuilds the decorated class using *metaclass*.

    This gives Python 2 and Python 3 a single spelling for attaching a
    metaclass: the decorated class is re-created by calling
    ``metaclass(name, bases, namespace)`` on a copy of its namespace.

    :param metaclass: the metaclass to instantiate.
    :returns: a decorator that takes a class and returns the rebuilt class.
    """
    @functools.wraps(metaclass)
    def wrapper(cls):
        namespace = cls.__dict__.copy()

        # Slot descriptors belong to the class being replaced; passing them
        # along together with '__slots__' makes class creation fail.
        slots = namespace.get('__slots__')
        if slots is not None:
            if isinstance(slots, str):
                slots = [slots]
            for slot_name in slots:
                namespace.pop(slot_name, None)

        # Likewise the '__dict__' and '__weakref__' descriptors are bound to
        # the old class. Drop them so the new class gets its own; otherwise
        # instance.__dict__ on the rebuilt class raises TypeError.
        namespace.pop('__dict__', None)
        namespace.pop('__weakref__', None)

        # Python 3 keeps the qualified name outside the namespace.
        if hasattr(cls, '__qualname__'):
            namespace['__qualname__'] = cls.__qualname__

        return metaclass(cls.__name__, cls.__bases__, namespace)

    return wrapper
52 class VppEnumType(type):
53 def __getattr__(cls, name):
54 t = vpp_get_type(name)
58 @metaclass(VppEnumType)
59 class VppEnum(object):
def vpp_atexit(vpp_weakref):
    """Exit hook: disconnect from VPP if the client is still connected.

    vpp_weakref is called to obtain the client object. Nothing is done
    when it yields a falsy value or when the client's transport does not
    report itself as connected.
    """
    client = vpp_weakref()
    if not client:
        return
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
71 if sys.version[0] == '2':
def call_logger(msgdef, kwargs):
    """Render an outgoing API call as a single log line.

    :param msgdef: message definition; only its ``name`` is used.
    :param kwargs: dict of the arguments passed to the call.
    :returns: a string of the form ``Calling <name>(<key>:<value> ... )``.
    """
    # One join instead of growing the string with '+=' per argument.
    # Each argument keeps its trailing space so the format is unchanged.
    args = ''.join('{}:{} '.format(k, v) for k, v in kwargs.items())
    return 'Calling {}({})'.format(msgdef.name, args)
88 s = 'Return from {}'.format(r)
92 class VppApiDynamicMethodHolder(object):
class FuncWrapper(object):
    """Callable proxy around a generated API function.

    The wrapped function's __name__ and __doc__ are copied onto the
    proxy, and calls are forwarded to it with keyword arguments only.
    """

    def __init__(self, func):
        self._func = func
        for attr in ('__name__', '__doc__'):
            setattr(self, attr, getattr(func, attr))

    def __call__(self, **kwargs):
        target = self._func
        return target(**kwargs)
class VPPApiError(Exception):
    """Error raised by the API client, e.g. when no API directory can be
    located or the API definitions are requested before connecting."""
class VPPNotImplementedError(NotImplementedError):
    """NotImplementedError variant; caught (and logged) while message
    definitions are being built from the API files."""
class VPPIOError(IOError):
    """IOError variant raised for transport failures such as a failed
    connect or a failed read."""
class VPPRuntimeError(RuntimeError):
    """RuntimeError variant raised by the API client."""
class VPPValueError(ValueError):
    """ValueError variant raised for bad input such as unresolved type
    definitions, missing message definitions or unknown call arguments."""
126 class VPPApiClient(object):
129 This class provides the APIs to VPP. The APIs are loaded
130 from provided .api.json files and makes functions accordingly.
131 These functions are documented in the VPP .api files, as they
132 are dynamically created.
134 Additionally, VPP can send callback messages; this class
135 provides a means to register a callback function to receive
136 these messages in a background thread.
139 VPPApiError = VPPApiError
140 VPPRuntimeError = VPPRuntimeError
141 VPPValueError = VPPValueError
142 VPPNotImplementedError = VPPNotImplementedError
143 VPPIOError = VPPIOError
145 def process_json_file(self, apidef_file):
146 api = json.load(apidef_file)
148 for t in api['enums']:
149 t[0] = 'vl_api_' + t[0] + '_t'
150 types[t[0]] = {'type': 'enum', 'data': t}
151 for t in api['unions']:
152 t[0] = 'vl_api_' + t[0] + '_t'
153 types[t[0]] = {'type': 'union', 'data': t}
154 for t in api['types']:
155 t[0] = 'vl_api_' + t[0] + '_t'
156 types[t[0]] = {'type': 'type', 'data': t}
157 for t, v in api['aliases'].items():
158 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
159 self.services.update(api['services'])
164 for k, v in types.items():
166 if not vpp_get_type(k):
167 if v['type'] == 'enum':
169 VPPEnumType(t[0], t[1:])
172 elif v['type'] == 'union':
174 VPPUnionType(t[0], t[1:])
177 elif v['type'] == 'type':
182 elif v['type'] == 'alias':
187 if len(unresolved) == 0:
190 raise VPPValueError('Unresolved type definitions {}'
195 for m in api['messages']:
197 self.messages[m[0]] = VPPMessage(m[0], m[1:])
198 except VPPNotImplementedError:
199 self.logger.error('Not implemented error for {}'.format(m[0]))
201 def __init__(self, apifiles=None, testmode=False, async_thread=True,
202 logger=None, loglevel=None,
203 read_timeout=5, use_socket=False,
204 server_address='/run/vpp-api.sock'):
205 """Create a VPP API object.
207 apifiles is a list of files containing API
208 descriptions that will be loaded - methods will be
209 dynamically created reflecting these APIs. If not
210 provided this will load the API files from VPP's
211 default install location.
213 logger, if supplied, is the logging logger object to log to.
214 loglevel, if supplied, is the log level this logger is set
215 to report at (from the loglevels in the logging module).
218 logger = logging.getLogger(__name__)
219 if loglevel is not None:
220 logger.setLevel(loglevel)
227 self.header = VPPType('header', [['u16', 'msgid'],
228 ['u32', 'client_index']])
230 self.event_callback = None
231 self.message_queue = queue.Queue()
232 self.read_timeout = read_timeout
233 self.async_thread = async_thread
234 self.event_thread = None
237 from . vpp_transport_socket import VppTransport
239 from . vpp_transport_shmem import VppTransport
242 # Pick up API definitions from default directory
244 apifiles = self.find_api_files()
246 # In test mode we don't care that we can't find the API files
250 raise VPPRuntimeError
252 for file in apifiles:
253 with open(file) as apidef_file:
254 self.process_json_file(apidef_file)
256 self.apifiles = apifiles
259 if len(self.messages) == 0 and not testmode:
260 raise VPPValueError(1, 'Missing JSON message definitions')
262 self.transport = VppTransport(self, read_timeout=read_timeout,
263 server_address=server_address)
264 # Make sure we allow VPP to clean up the message rings.
265 atexit.register(vpp_atexit, weakref.ref(self))
class ContextId(object):
    """Hands out context IDs from a counter shared between processes."""

    def __init__(self):
        self.lock = mp.Lock()
        self.context = mp.Value(ctypes.c_uint, 0)

    def __call__(self):
        """Return the next context ID.

        IDs are unique, or at least not recently used: the counter is a
        C unsigned int.
        """
        self.lock.acquire()
        try:
            self.context.value += 1
            return self.context.value
        finally:
            self.lock.release()


get_context = ContextId()
def get_type(self, name):
    """Look up a type by name.

    Thin wrapper that returns whatever vpp_get_type() returns for name.
    """
    return vpp_get_type(name)
284 def find_api_dir(cls):
285 """Attempt to find the best directory in which API definition
286 files may reside. If the value VPP_API_DIR exists in the environment
287 then it is first on the search list. If we're inside a recognized
288 location in a VPP source tree (src/scripts and src/vpp-api/python)
289 then entries from there to the likely locations in build-root are
290 added. Finally the location used by system packages is added.
292 :returns: A single directory name, or None if no such directory
295 dirs = [cls.apidir] if cls.apidir else []
297 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
298 # in which case, plot a course to likely places in the src tree
299 import __main__ as main
300 if hasattr(main, '__file__'):
301 # get the path of the calling script
302 localdir = os.path.dirname(os.path.realpath(main.__file__))
304 # use cwd if there is no calling script
305 localdir = os.getcwd()
306 localdir_s = localdir.split(os.path.sep)
309 """Match dir against right-hand components of the script dir"""
310 d = dir.split('/') # param 'dir' assumes a / separator
312 return len(localdir_s) > length and localdir_s[-length:] == d
314 def sdir(srcdir, variant):
315 """Build a path from srcdir to the staged API files of
316 'variant' (typically '' or '_debug')"""
317 # Since 'core' and 'plugin' files are staged
318 # in separate directories, we target the parent dir.
319 return os.path.sep.join((
322 'install-vpp%s-native' % variant,
330 if dmatch('src/scripts'):
331 srcdir = os.path.sep.join(localdir_s[:-2])
332 elif dmatch('src/vpp-api/python'):
333 srcdir = os.path.sep.join(localdir_s[:-3])
335 # we're apparently running tests
336 srcdir = os.path.sep.join(localdir_s[:-1])
339 # we're in the source tree, try both the debug and release
341 dirs.append(sdir(srcdir, '_debug'))
342 dirs.append(sdir(srcdir, ''))
344 # Test for staged copies of the scripts
345 # For these, since we explicitly know if we're running a debug versus
346 # release variant, target only the relevant directory
347 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
348 srcdir = os.path.sep.join(localdir_s[:-4])
349 dirs.append(sdir(srcdir, '_debug'))
350 if dmatch('build-root/install-vpp-native/vpp/bin'):
351 srcdir = os.path.sep.join(localdir_s[:-4])
352 dirs.append(sdir(srcdir, ''))
354 # finally, try the location system packages typically install into
355 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
357 # check the directories for existence; first one wins
359 if os.path.isdir(dir):
def find_api_files(cls, api_dir=None, patterns='*'):
    """Find API definition files in the given directory tree that match
    the given patterns.

    :param api_dir: A directory tree in which to locate API definition
        files; subdirectories are descended into.
        If this is None then find_api_dir() is called to discover it.
    :param patterns: The patterns to use in each visited directory when
        looking for files.
        This can be a list/tuple object or a comma-separated string of
        patterns. Each value has leading/trailing whitespace stripped.
        A pattern specifies the first part of the filename; '.api.json'
        is appended to it.
        The results are de-duplicated, thus overlapping patterns are fine.
        If this is None it defaults to '*' meaning "all API files".
    :returns: A list of file paths for the API files found. Within each
        directory the matches are listed in sorted order.
    :raises VPPApiError: if api_dir is None and no directory can be found.
    """
    if api_dir is None:
        api_dir = cls.find_api_dir()
        if api_dir is None:
            raise VPPApiError("api_dir cannot be located")

    # Honour the documented default; None used to fail on .split().
    if patterns is None:
        patterns = '*'
    if not isinstance(patterns, (list, tuple)):
        patterns = patterns.split(",")
    patterns = [p.strip() + '.api.json' for p in patterns]

    api_files = []
    for root, dirnames, files in os.walk(api_dir):
        # Collect the matches of every pattern, de-duplicated per directory.
        matched = set()
        for pattern in patterns:
            matched.update(fnmatch.filter(files, pattern))
        api_files.extend(os.path.join(root, filename)
                         for filename in sorted(matched))

    return api_files
406 if not hasattr(self, "_api"):
407 raise VPPApiError("Not connected, api definitions not available")
def make_function(self, msg, i, multipart, do_async):
    """Build the callable that sends one API message.

    msg - the message definition
    i - the message type index
    multipart - passed through to _call_vpp for synchronous calls
    do_async - if true the callable uses _call_vpp_async, otherwise
               _call_vpp

    The returned function is named after the message, and its docstring
    lists the message fields as "<type> <name>" pairs.
    """
    if not do_async:
        def f(**kwargs):
            return self._call_vpp(i, msg, multipart, **kwargs)
    else:
        def f(**kwargs):
            return self._call_vpp_async(i, msg, **kwargs)

    f.__name__ = str(msg.name)
    described = []
    for position, field in enumerate(msg.fields):
        described.append("%s %s" % (msg.fieldtypes[position], field))
    f.__doc__ = ", ".join(described)

    return f
426 def _register_functions(self, do_async=False):
427 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
428 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
429 self._api = VppApiDynamicMethodHolder()
430 for name, msg in vpp_iterator(self.messages):
431 n = name + '_' + msg.crc[2:]
432 i = self.transport.get_msg_index(n.encode('utf-8'))
434 self.id_msgdef[i] = msg
435 self.id_names[i] = name
437 # Create function for client side messages.
438 if name in self.services:
439 if 'stream' in self.services[name] and \
440 self.services[name]['stream']:
444 f = self.make_function(msg, i, multipart, do_async)
445 setattr(self._api, name, FuncWrapper(f))
448 'No such message type or failed CRC checksum: %s', n)
450 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
452 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
454 rv = self.transport.connect(name.encode('utf-8'), pfx,
455 msg_handler, rx_qlen)
457 raise VPPIOError(2, 'Connect failed')
458 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
459 self._register_functions(do_async=do_async)
461 # Initialise control ping
462 crc = self.messages['control_ping'].crc
463 self.control_ping_index = self.transport.get_msg_index(
464 ('control_ping' + '_' + crc[2:]).encode('utf-8'))
465 self.control_ping_msgdef = self.messages['control_ping']
466 if self.async_thread:
467 self.event_thread = threading.Thread(
468 target=self.thread_msg_handler)
469 self.event_thread.daemon = True
470 self.event_thread.start()
472 self.event_thread = None
475 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
478 name - the name of the client.
479 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
480 do_async - if true, messages are sent without waiting for a reply
481 rx_qlen - the length of the VPP message receive queue between
484 msg_handler = self.transport.get_callback(do_async)
485 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
488 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
489 """Attach to VPP in synchronous mode. Application must poll for events.
491 name - the name of the client.
492 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
493 rx_qlen - the length of the VPP message receive queue between
497 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
def disconnect(self):
    """Detach from VPP.

    Disconnects the transport and, when an event thread has been set up,
    queues the marker that tells it to stop. Returns the result of the
    transport's disconnect().
    """
    result = self.transport.disconnect()
    if self.event_thread is None:
        return result
    self.message_queue.put("terminate event thread")
    return result
507 def msg_handler_sync(self, msg):
508 """Process an incoming message from VPP in sync mode.
510 The message may be a reply or it may be an async notification.
512 r = self.decode_incoming_msg(msg)
516 # If we have a context, then use the context to find any
517 # request waiting for a reply
519 if hasattr(r, 'context') and r.context > 0:
523 # No context -> async notification that we feed to the callback
524 self.message_queue.put_nowait(r)
526 raise VPPIOError(2, 'RPC reply message received in event handler')
528 def has_context(self, msg):
532 header = VPPType('header_with_context', [['u16', 'msgid'],
533 ['u32', 'client_index'],
536 (i, ci, context), size = header.unpack(msg, 0)
537 if self.id_names[i] == 'rx_thread_exit':
541 # Decode message and returns a tuple.
543 msgobj = self.id_msgdef[i]
544 if 'context' in msgobj.field_by_name and context >= 0:
548 def decode_incoming_msg(self, msg, no_type_conversion=False):
550 self.logger.warning('vpp_api.read failed')
553 (i, ci), size = self.header.unpack(msg, 0)
554 if self.id_names[i] == 'rx_thread_exit':
558 # Decode message and returns a tuple.
560 msgobj = self.id_msgdef[i]
562 raise VPPIOError(2, 'Reply message undefined')
564 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
567 def msg_handler_async(self, msg):
568 """Process a message from VPP in async mode.
570 In async mode, all messages are returned to the callback.
572 r = self.decode_incoming_msg(msg)
576 msgname = type(r).__name__
578 if self.event_callback:
579 self.event_callback(msgname, r)
581 def _control_ping(self, context):
582 """Send a ping command."""
583 self._call_vpp_async(self.control_ping_index,
584 self.control_ping_msgdef,
def validate_args(self, msg, kwargs):
    """Reject keyword arguments that are not fields of the message.

    Raises VPPValueError naming the unknown arguments and the message.
    """
    unknown = set(kwargs.keys()).difference(msg.field_by_name.keys())
    if not unknown:
        return
    raise VPPValueError('Invalid argument {} to {}'
                        .format(list(unknown), msg.name))
593 def _call_vpp(self, i, msgdef, multipart, **kwargs):
594 """Given a message, send the message and await a reply.
596 msgdef - the message packing definition
597 i - the message type index
598 multipart - True if the message returns multiple
600 context - context number - chosen at random if not
602 The remainder of the kwargs are the arguments to the API call.
604 The return value is the message or message array containing
605 the response. It will raise an IOError exception if there was
606 no response within the timeout window.
609 if 'context' not in kwargs:
610 context = self.get_context()
611 kwargs['context'] = context
613 context = kwargs['context']
614 kwargs['_vl_msg_id'] = i
616 no_type_conversion = kwargs.pop('_no_type_conversion', False)
619 if self.transport.socket_index:
620 kwargs['client_index'] = self.transport.socket_index
621 except AttributeError:
623 self.validate_args(msgdef, kwargs)
625 logging.debug(call_logger(msgdef, kwargs))
627 b = msgdef.pack(kwargs)
628 self.transport.suspend()
630 self.transport.write(b)
633 # Send a ping after the request - we use its response
634 # to detect that we have seen all results.
635 self._control_ping(context)
637 # Block until we get a reply.
640 msg = self.transport.read()
642 raise VPPIOError(2, 'VPP API client: read failed')
643 r = self.decode_incoming_msg(msg, no_type_conversion)
644 msgname = type(r).__name__
645 if context not in r or r.context == 0 or context != r.context:
646 # Message being queued
647 self.message_queue.put_nowait(r)
653 if msgname == 'control_ping_reply':
658 self.transport.resume()
660 logger.debug(return_logger(rl))
663 def _call_vpp_async(self, i, msg, **kwargs):
664 """Given a message, send the message and await a reply.
666 msgdef - the message packing definition
667 i - the message type index
668 context - context number - chosen at random if not
670 The remainder of the kwargs are the arguments to the API call.
672 if 'context' not in kwargs:
673 context = self.get_context()
674 kwargs['context'] = context
676 context = kwargs['context']
678 if self.transport.socket_index:
679 kwargs['client_index'] = self.transport.socket_index
680 except AttributeError:
681 kwargs['client_index'] = 0
682 kwargs['_vl_msg_id'] = i
685 self.transport.write(b)
def register_event_callback(self, callback):
    """Register a callback for async messages.

    This will be called for async notifications in sync mode,
    and all messages in async mode. In sync mode, replies to
    requests will not come here.

    callback is a fn(msg_type_name, msg_type) that will be
    called when a message comes in. While this function is
    executing, note that (a) you are in a background thread and
    may wish to use threading.Lock to protect your datastructures,
    and (b) message processing from VPP will stop (so if you take
    a long while about it you may provoke reply timeouts or cause
    VPP to fill the RX buffer). Passing None will disable the
    callback.
    """
    self.event_callback = callback
def thread_msg_handler(self):
    """Body of the thread that feeds queued messages to the registered
    event callback.

    This emulates the old style event callback scheme; modern clients
    should provide their own thread to poll the event queue.

    Messages are taken from self.message_queue one at a time and passed
    to self.event_callback, when one is set, together with the name of
    the message's type. The loop ends when the termination marker string
    is taken from the queue.
    """
    next_message = self.message_queue.get
    while True:
        event = next_message()
        if event == "terminate event thread":
            return
        kind = type(event).__name__
        if not self.event_callback:
            continue
        self.event_callback(kind, event)
720 # Provide the old name for backward compatibility.
723 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4