3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
32 from . vpp_format import verify_enum_hint
33 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
34 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36 if sys.version[0] == '2':
41 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
42 'VppEnum', 'VppEnumType',
43 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
47 def metaclass(metaclass):
48 @functools.wraps(metaclass)
50 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
55 class VppEnumType(type):
56 def __getattr__(cls, name):
57 t = vpp_get_type(name)
61 @metaclass(VppEnumType)
62 class VppEnum(object):
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client during interpreter shutdown.

    :param vpp_weakref: Zero-argument callable (a weak reference to the
        client) that yields the client instance, or a falsy value when
        the instance is no longer available.
    """
    client = vpp_weakref()
    # Nothing to clean up when the client is gone ...
    if not client:
        return
    # ... or when its transport is not connected.
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
74 if sys.version[0] == '2':
82 def add_convenience_methods():
83 # provide convenience methods to IP[46]Address.vapi_af
85 if 6 == self._version:
86 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
87 if 4 == self._version:
88 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
89 raise ValueError("Invalid _version.")
91 def _vapi_af_name(self):
92 if 6 == self._version:
94 if 4 == self._version:
96 raise ValueError("Invalid _version.")
98 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
99 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
102 class VppApiDynamicMethodHolder(object):
106 class FuncWrapper(object):
107 def __init__(self, func):
109 self.__name__ = func.__name__
110 self.__doc__ = func.__doc__
112 def __call__(self, **kwargs):
113 return self._func(**kwargs)
116 return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
119 class VPPApiError(Exception):
123 class VPPNotImplementedError(NotImplementedError):
127 class VPPIOError(IOError):
131 class VPPRuntimeError(RuntimeError):
135 class VPPValueError(ValueError):
139 class VPPApiJSONFiles(object):
141 def find_api_dir(cls, dirs):
142 """Attempt to find the best directory in which API definition
143 files may reside. If the value VPP_API_DIR exists in the environment
144 then it is first on the search list. If we're inside a recognized
145 location in a VPP source tree (src/scripts and src/vpp-api/python)
146 then entries from there to the likely locations in build-root are
147 added. Finally the location used by system packages is added.
149 :returns: A single directory name, or None if no such directory
153 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
154 # in which case, plot a course to likely places in the src tree
155 import __main__ as main
156 if hasattr(main, '__file__'):
157 # get the path of the calling script
158 localdir = os.path.dirname(os.path.realpath(main.__file__))
160 # use cwd if there is no calling script
161 localdir = os.getcwd()
162 localdir_s = localdir.split(os.path.sep)
165 """Match dir against right-hand components of the script dir"""
166 d = dir.split('/') # param 'dir' assumes a / separator
168 return len(localdir_s) > length and localdir_s[-length:] == d
170 def sdir(srcdir, variant):
171 """Build a path from srcdir to the staged API files of
172 'variant' (typically '' or '_debug')"""
173 # Since 'core' and 'plugin' files are staged
174 # in separate directories, we target the parent dir.
175 return os.path.sep.join((
178 'install-vpp%s-native' % variant,
186 if dmatch('src/scripts'):
187 srcdir = os.path.sep.join(localdir_s[:-2])
188 elif dmatch('src/vpp-api/python'):
189 srcdir = os.path.sep.join(localdir_s[:-3])
191 # we're apparently running tests
192 srcdir = os.path.sep.join(localdir_s[:-1])
195 # we're in the source tree, try both the debug and release
197 dirs.append(sdir(srcdir, '_debug'))
198 dirs.append(sdir(srcdir, ''))
200 # Test for staged copies of the scripts
201 # For these, since we explicitly know if we're running a debug versus
202 # release variant, target only the relevant directory
203 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
204 srcdir = os.path.sep.join(localdir_s[:-4])
205 dirs.append(sdir(srcdir, '_debug'))
206 if dmatch('build-root/install-vpp-native/vpp/bin'):
207 srcdir = os.path.sep.join(localdir_s[:-4])
208 dirs.append(sdir(srcdir, ''))
210 # finally, try the location system packages typically install into
211 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
213 # check the directories for existence; first one wins
215 if os.path.isdir(dir):
221 def find_api_files(cls, api_dir=None, patterns='*'):
222 """Find API definition files from the given directory tree with the
223 given pattern. If no directory is given then find_api_dir() is used
224 to locate one. If no pattern is given then all definition files found
225 in the directory tree are used.
227 :param api_dir: A directory tree in which to locate API definition
228 files; subdirectories are descended into.
229 If this is None then find_api_dir() is called to discover it.
230 :param patterns: A list of patterns to use in each visited directory
231 when looking for files.
232 This can be a list/tuple object or a comma-separated string of
233 patterns. Each value in the list will have leading/trailing
235 The pattern specifies the first part of the filename, '.api.json'
237 The results are de-duplicated, thus overlapping patterns are fine.
238 If this is None it defaults to '*' meaning "all API files".
239 :returns: A list of file paths for the API files found.
242 api_dir = cls.find_api_dir([])
244 raise VPPApiError("api_dir cannot be located")
246 if isinstance(patterns, list) or isinstance(patterns, tuple):
247 patterns = [p.strip() + '.api.json' for p in patterns]
249 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
252 for root, dirnames, files in os.walk(api_dir):
253 # iterate all given patterns and de-dup the result
254 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
255 for filename in files:
256 api_files.append(os.path.join(root, filename))
261 def process_json_file(self, apidef_file):
262 api = json.load(apidef_file)
266 for t in api['enums']:
267 t[0] = 'vl_api_' + t[0] + '_t'
268 types[t[0]] = {'type': 'enum', 'data': t}
269 for t in api['unions']:
270 t[0] = 'vl_api_' + t[0] + '_t'
271 types[t[0]] = {'type': 'union', 'data': t}
272 for t in api['types']:
273 t[0] = 'vl_api_' + t[0] + '_t'
274 types[t[0]] = {'type': 'type', 'data': t}
275 for t, v in api['aliases'].items():
276 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
277 services.update(api['services'])
282 for k, v in types.items():
284 if not vpp_get_type(k):
285 if v['type'] == 'enum':
287 VPPEnumType(t[0], t[1:])
290 elif v['type'] == 'union':
292 VPPUnionType(t[0], t[1:])
295 elif v['type'] == 'type':
300 elif v['type'] == 'alias':
305 if len(unresolved) == 0:
308 raise VPPValueError('Unresolved type definitions {}'
313 for m in api['messages']:
315 messages[m[0]] = VPPMessage(m[0], m[1:])
316 except VPPNotImplementedError:
318 self.logger.error('Not implemented error for {}'.format(m[0]))
319 return messages, services
322 class VPPApiClient(object):
325 This class provides the APIs to VPP. The APIs are loaded
326 from provided .api.json files, and functions are created accordingly.
327 These functions are documented in the VPP .api files, as they
328 are dynamically created.
330 Additionally, VPP can send callback messages; this class
331 provides a means to register a callback function to receive
332 these messages in a background thread.
335 VPPApiError = VPPApiError
336 VPPRuntimeError = VPPRuntimeError
337 VPPValueError = VPPValueError
338 VPPNotImplementedError = VPPNotImplementedError
339 VPPIOError = VPPIOError
342 def __init__(self, apifiles=None, testmode=False, async_thread=True,
343 logger=None, loglevel=None,
344 read_timeout=5, use_socket=False,
345 server_address='/run/vpp/api.sock'):
346 """Create a VPP API object.
348 apifiles is a list of files containing API
349 descriptions that will be loaded - methods will be
350 dynamically created reflecting these APIs. If not
351 provided this will load the API files from VPP's
352 default install location.
354 logger, if supplied, is the logging logger object to log to.
355 loglevel, if supplied, is the log level this logger is set
356 to report at (from the loglevels in the logging module).
359 logger = logging.getLogger(
360 "{}.{}".format(__name__, self.__class__.__name__))
361 if loglevel is not None:
362 logger.setLevel(loglevel)
369 self.header = VPPType('header', [['u16', 'msgid'],
370 ['u32', 'client_index']])
372 self.event_callback = None
373 self.message_queue = queue.Queue()
374 self.read_timeout = read_timeout
375 self.async_thread = async_thread
376 self.event_thread = None
377 self.testmode = testmode
378 self.use_socket = use_socket
379 self.server_address = server_address
380 self._apifiles = apifiles
384 from . vpp_transport_socket import VppTransport
386 from . vpp_transport_shmem import VppTransport
389 # Pick up API definitions from default directory
391 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
393 # In test mode we don't care that we can't find the API files
397 raise VPPRuntimeError
399 for file in apifiles:
400 with open(file) as apidef_file:
401 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
402 self.messages.update(m)
403 self.services.update(s)
405 self.apifiles = apifiles
408 if len(self.messages) == 0 and not testmode:
409 raise VPPValueError(1, 'Missing JSON message definitions')
410 if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
411 raise VPPRuntimeError("Invalid address family hints. "
414 self.transport = VppTransport(self, read_timeout=read_timeout,
415 server_address=server_address)
416 # Make sure we allow VPP to clean up the message rings.
417 atexit.register(vpp_atexit, weakref.ref(self))
419 add_convenience_methods()
def get_function(self, name):
    """Fetch the attribute called *name* from ``self._api``.

    :param name: Attribute name to look up on ``self._api``.
    :returns: Whatever ``getattr`` finds under that name.
    :raises AttributeError: If ``self._api`` or the requested attribute
        does not exist.
    """
    api_holder = self._api
    return getattr(api_holder, name)
424 class ContextId(object):
425 """Multiprocessing-safe provider of unique context IDs."""
427 self.context = mp.Value(ctypes.c_uint, 0)
428 self.lock = mp.Lock()
431 """Get a new unique (or, at least, not recently used) context."""
433 self.context.value += 1
434 return self.context.value
435 get_context = ContextId()
def get_type(self, name):
    """Return the result of ``vpp_get_type(name)``.

    Thin instance-level wrapper; no instance state is read.

    :param name: Type name handed unchanged to ``vpp_get_type``.
    """
    resolved = vpp_get_type(name)
    return resolved
442 if not hasattr(self, "_api"):
443 raise VPPApiError("Not connected, api definitions not available")
446 def make_function(self, msg, i, multipart, do_async):
449 return self._call_vpp_async(i, msg, **kwargs)
452 return self._call_vpp(i, msg, multipart, **kwargs)
454 f.__name__ = str(msg.name)
455 f.__doc__ = ", ".join(["%s %s" %
456 (msg.fieldtypes[j], k)
457 for j, k in enumerate(msg.fields)])
462 def _register_functions(self, do_async=False):
463 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
464 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
465 self._api = VppApiDynamicMethodHolder()
466 for name, msg in vpp_iterator(self.messages):
467 n = name + '_' + msg.crc[2:]
468 i = self.transport.get_msg_index(n)
470 self.id_msgdef[i] = msg
471 self.id_names[i] = name
473 # Create function for client side messages.
474 if name in self.services:
475 if 'stream' in self.services[name] and \
476 self.services[name]['stream']:
480 f = self.make_function(msg, i, multipart, do_async)
481 setattr(self._api, name, FuncWrapper(f))
484 'No such message type or failed CRC checksum: %s', n)
486 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
488 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
490 rv = self.transport.connect(name, pfx,
491 msg_handler, rx_qlen)
493 raise VPPIOError(2, 'Connect failed')
494 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
495 self._register_functions(do_async=do_async)
497 # Initialise control ping
498 crc = self.messages['control_ping'].crc
499 self.control_ping_index = self.transport.get_msg_index(
500 ('control_ping' + '_' + crc[2:]))
501 self.control_ping_msgdef = self.messages['control_ping']
502 if self.async_thread:
503 self.event_thread = threading.Thread(
504 target=self.thread_msg_handler)
505 self.event_thread.daemon = True
506 self.event_thread.start()
508 self.event_thread = None
511 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
514 name - the name of the client.
515 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
516 do_async - if true, messages are sent without waiting for a reply
517 rx_qlen - the length of the VPP message receive queue between
520 msg_handler = self.transport.get_callback(do_async)
521 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
524 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
525 """Attach to VPP in synchronous mode. Application must poll for events.
527 name - the name of the client.
528 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
529 rx_qlen - the length of the VPP message receive queue between
533 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
536 def disconnect(self):
537 """Detach from VPP."""
538 rv = self.transport.disconnect()
539 if self.event_thread is not None:
540 self.message_queue.put("terminate event thread")
543 def msg_handler_sync(self, msg):
544 """Process an incoming message from VPP in sync mode.
546 The message may be a reply or it may be an async notification.
548 r = self.decode_incoming_msg(msg)
552 # If we have a context, then use the context to find any
553 # request waiting for a reply
555 if hasattr(r, 'context') and r.context > 0:
559 # No context -> async notification that we feed to the callback
560 self.message_queue.put_nowait(r)
562 raise VPPIOError(2, 'RPC reply message received in event handler')
564 def has_context(self, msg):
568 header = VPPType('header_with_context', [['u16', 'msgid'],
569 ['u32', 'client_index'],
572 (i, ci, context), size = header.unpack(msg, 0)
573 if self.id_names[i] == 'rx_thread_exit':
577 # Decode message and returns a tuple.
579 msgobj = self.id_msgdef[i]
580 if 'context' in msgobj.field_by_name and context >= 0:
584 def decode_incoming_msg(self, msg, no_type_conversion=False):
586 self.logger.warning('vpp_api.read failed')
589 (i, ci), size = self.header.unpack(msg, 0)
590 if self.id_names[i] == 'rx_thread_exit':
594 # Decode message and returns a tuple.
596 msgobj = self.id_msgdef[i]
598 raise VPPIOError(2, 'Reply message undefined')
600 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
603 def msg_handler_async(self, msg):
604 """Process a message from VPP in async mode.
606 In async mode, all messages are returned to the callback.
608 r = self.decode_incoming_msg(msg)
612 msgname = type(r).__name__
614 if self.event_callback:
615 self.event_callback(msgname, r)
617 def _control_ping(self, context):
618 """Send a ping command."""
619 self._call_vpp_async(self.control_ping_index,
620 self.control_ping_msgdef,
def validate_args(self, msg, kwargs):
    """Reject keyword arguments that are not fields of *msg*.

    :param msg: Message definition; the keys of its ``field_by_name``
        mapping are the accepted argument names, and ``msg.name`` is
        used in the error text.
    :param kwargs: Mapping of caller-supplied arguments.
    :raises VPPValueError: If *kwargs* contains a key that is not in
        ``msg.field_by_name``.
    """
    unknown = set(kwargs.keys()).difference(msg.field_by_name.keys())
    if not unknown:
        return
    raise VPPValueError('Invalid argument {} to {}'
                        .format(list(unknown), msg.name))
629 def _add_stat(self, name, ms):
630 if not name in self.stats:
631 self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
633 if ms > self.stats[name]['max']:
634 self.stats[name]['max'] = ms
635 self.stats[name]['count'] += 1
636 n = self.stats[name]['count']
637 self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
640 s = '\n=== API PAPI STATISTICS ===\n'
641 s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
642 for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
643 s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
644 n[1]['avg'], n[1]['max'])
647 def _call_vpp(self, i, msgdef, multipart, **kwargs):
648 """Given a message, send the message and await a reply.
650 msgdef - the message packing definition
651 i - the message type index
652 multipart - True if the message returns multiple
654 context - context number - chosen at random if not
656 The remainder of the kwargs are the arguments to the API call.
658 The return value is the message or message array containing
659 the response. It will raise an IOError exception if there was
660 no response within the timeout window.
663 if 'context' not in kwargs:
664 context = self.get_context()
665 kwargs['context'] = context
667 context = kwargs['context']
668 kwargs['_vl_msg_id'] = i
670 no_type_conversion = kwargs.pop('_no_type_conversion', False)
671 timeout = kwargs.pop('_timeout', None)
674 if self.transport.socket_index:
675 kwargs['client_index'] = self.transport.socket_index
676 except AttributeError:
678 self.validate_args(msgdef, kwargs)
680 s = 'Calling {}({})'.format(msgdef.name,
681 ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
684 b = msgdef.pack(kwargs)
685 self.transport.suspend()
687 self.transport.write(b)
690 # Send a ping after the request - we use its response
691 # to detect that we have seen all results.
692 self._control_ping(context)
694 # Block until we get a reply.
697 r = self.read_blocking(no_type_conversion, timeout)
699 raise VPPIOError(2, 'VPP API client: read failed')
700 msgname = type(r).__name__
701 if context not in r or r.context == 0 or context != r.context:
702 # Message being queued
703 self.message_queue.put_nowait(r)
709 if msgname == 'control_ping_reply':
714 self.transport.resume()
716 s = 'Return value: {!r}'.format(r)
721 self._add_stat(msgdef.name, (te - ts) * 1000)
724 def _call_vpp_async(self, i, msg, **kwargs):
725 """Given a message, send the message and return the context.
727 msgdef - the message packing definition
728 i - the message type index
729 context - context number - chosen at random if not
731 The remainder of the kwargs are the arguments to the API call.
733 The reply message(s) will be delivered later to the registered callback.
734 The returned context will help with assigning which call
735 the reply belongs to.
737 if 'context' not in kwargs:
738 context = self.get_context()
739 kwargs['context'] = context
741 context = kwargs['context']
743 if self.transport.socket_index:
744 kwargs['client_index'] = self.transport.socket_index
745 except AttributeError:
746 kwargs['client_index'] = 0
747 kwargs['_vl_msg_id'] = i
750 self.transport.write(b)
753 def read_blocking(self, no_type_conversion=False, timeout=None):
754 """Get next received message from transport within timeout, decoded.
756 Note that notifications have context zero
757 and are not put into receive queue (at least for socket transport),
758 use async_thread with registered callback for processing them.
760 If no message appears in the queue within timeout, return None.
762 Optionally, type conversion can be skipped,
763 as some of conversions are into less precise types.
765 When r is the return value of this, the caller can get message name as:
766 msgname = type(r).__name__
767 and context number (type long) as:
770 :param no_type_conversion: If false, type conversions are applied.
771 :type no_type_conversion: bool
772 :returns: Decoded message, or None if no message (within timeout).
773 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
774 :raises VppTransportShmemIOError if timed out.
776 msg = self.transport.read(timeout=timeout)
779 return self.decode_incoming_msg(msg, no_type_conversion)
def register_event_callback(self, callback):
    """Install the handler used for asynchronously delivered messages.

    In sync mode the handler receives async notifications only; replies
    to requests are not routed to it. In async mode it receives every
    message.

    The handler is invoked as ``callback(msg_type_name, msg)``. While it
    runs you are in a background thread (protect shared data, e.g. with
    ``threading.Lock``) and message processing from VPP is stalled, so a
    slow handler may provoke reply timeouts or let VPP fill the RX
    buffer.

    :param callback: Callable taking ``(msg_type_name, msg)``, or
        ``None`` to disable the callback.
    """
    self.event_callback = callback
799 def thread_msg_handler(self):
800 """Python thread calling the user registered message handler.
802 This is to emulate the old style event callback scheme. Modern
803 clients should provide their own thread to poll the event
807 r = self.message_queue.get()
808 if r == "terminate event thread":
810 msgname = type(r).__name__
811 if self.event_callback:
812 self.event_callback(msgname, r)
814 def validate_message_table(self, namecrctable):
815 """Take a dictionary of name_crc message names
816 and return an array of missing messages"""
819 for name_crc in namecrctable:
820 i = self.transport.get_msg_index(name_crc)
822 missing_table.append(name_crc)
def dump_message_table(self):
    """Return VPP's API message table as held by the transport.

    :returns: ``self.transport.message_table`` itself (not a copy); a
        dictionary keyed by name_crc message names.
    """
    table = self.transport.message_table
    return table
def dump_message_table_filtered(self, msglist):
    """Return the part of VPP's API message table relevant to *msglist*.

    For every message name in *msglist* the name of its reply, taken
    from ``self.services[name]['reply']``, is added to the selection.
    An entry of ``self.transport.message_table`` is kept when its key
    starts with one of the selected names.

    :param msglist: Iterable of message names (list, tuple, set,
        generator, ...). Every name must be a key of ``self.services``.
    :returns: A new dict holding the matching message table entries.
    :raises KeyError: If a name has no entry in ``self.services`` or the
        entry has no ``'reply'`` key.
    """
    # Materialise once: msglist may be a one-shot iterator or a non-list
    # sequence, neither of which survives "iterate, then `+` a list".
    requests = list(msglist)
    replies = [self.services[n]['reply'] for n in requests]
    table = self.transport.message_table
    return {key: value
            for name in requests + replies
            for key, value in table.items()
            if key.startswith(name)}
843 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
844 "logger=%s, read_timeout=%s, use_socket=%s, " \
845 "server_address='%s'>" % (
846 self._apifiles, self.testmode, self.async_thread,
847 self.logger, self.read_timeout, self.use_socket,
851 # Provide the old name for backward compatibility.
854 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4