3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
21 import multiprocessing as mp
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
33 if sys.version[0] == '2':
38 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
39 'VppEnum', 'VppEnumType',
40 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
def metaclass(metaclass):
    """Build a class decorator that re-creates a class under ``metaclass``.

    Lets one source tree attach a metaclass without using either the
    Python 2 or the Python 3 specific class syntax.

    :param metaclass: The metaclass to instantiate; called as
        ``metaclass(name, bases, namespace)``.
    :returns: A decorator taking a class and returning the rebuilt class.
    """
    @functools.wraps(metaclass)
    def wrapper(cls):
        namespace = cls.__dict__.copy()
        # These two descriptors are bound to the *original* class object.
        # Carrying them over would make instance.__dict__ lookups on the
        # rebuilt class fail, so let the new class create its own.
        namespace.pop('__dict__', None)
        namespace.pop('__weakref__', None)
        return metaclass(cls.__name__, cls.__bases__, namespace)

    return wrapper
52 class VppEnumType(type):
53 def __getattr__(cls, name):
54 t = vpp_get_type(name)
58 @metaclass(VppEnumType)
59 class VppEnum(object):
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client at interpreter shutdown.

    :param vpp_weakref: Zero-argument callable (a weak reference) that
        yields the client instance, or a false value once it is gone.
    """
    client = vpp_weakref()
    if not client:
        # The client was already garbage collected; nothing to clean up.
        return
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
71 if sys.version[0] == '2':
class VppApiDynamicMethodHolder(object):
    """Empty namespace object.

    The client attaches one wrapped callable per API message to an
    instance of this class with setattr().
    """
class FuncWrapper(object):
    """Keyword-only callable proxy around a generated API function.

    Copies the wrapped function's ``__name__`` and ``__doc__`` so the
    proxy can be introspected like the function itself.
    """

    def __init__(self, func):
        """Remember ``func`` and mirror its name and docstring."""
        self._func = func
        self.__name__ = func.__name__
        self.__doc__ = func.__doc__

    def __call__(self, **kwargs):
        """Invoke the wrapped function; only keyword arguments are accepted."""
        return self._func(**kwargs)

    def __repr__(self):
        """Show the wrapped function's name together with its docstring."""
        return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
class VPPApiError(Exception):
    """Generic API failure, e.g. the API definitions cannot be located."""


class VPPNotImplementedError(NotImplementedError):
    """A message definition uses something this client cannot handle."""


class VPPIOError(IOError):
    """Communication with VPP failed (connect, read or unexpected reply)."""


class VPPRuntimeError(RuntimeError):
    """The client could not be set up at run time."""


class VPPValueError(ValueError):
    """An invalid value was supplied (bad argument, unresolved type, ...)."""
115 class VPPApiJSONFiles(object):
117 def find_api_dir(cls, dirs):
118 """Attempt to find the best directory in which API definition
119 files may reside. If the value VPP_API_DIR exists in the environment
120 then it is first on the search list. If we're inside a recognized
121 location in a VPP source tree (src/scripts and src/vpp-api/python)
122 then entries from there to the likely locations in build-root are
123 added. Finally the location used by system packages is added.
125 :returns: A single directory name, or None if no such directory
129 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
130 # in which case, plot a course to likely places in the src tree
131 import __main__ as main
132 if hasattr(main, '__file__'):
133 # get the path of the calling script
134 localdir = os.path.dirname(os.path.realpath(main.__file__))
136 # use cwd if there is no calling script
137 localdir = os.getcwd()
138 localdir_s = localdir.split(os.path.sep)
141 """Match dir against right-hand components of the script dir"""
142 d = dir.split('/') # param 'dir' assumes a / separator
144 return len(localdir_s) > length and localdir_s[-length:] == d
146 def sdir(srcdir, variant):
147 """Build a path from srcdir to the staged API files of
148 'variant' (typically '' or '_debug')"""
149 # Since 'core' and 'plugin' files are staged
150 # in separate directories, we target the parent dir.
151 return os.path.sep.join((
154 'install-vpp%s-native' % variant,
162 if dmatch('src/scripts'):
163 srcdir = os.path.sep.join(localdir_s[:-2])
164 elif dmatch('src/vpp-api/python'):
165 srcdir = os.path.sep.join(localdir_s[:-3])
167 # we're apparently running tests
168 srcdir = os.path.sep.join(localdir_s[:-1])
171 # we're in the source tree, try both the debug and release
173 dirs.append(sdir(srcdir, '_debug'))
174 dirs.append(sdir(srcdir, ''))
176 # Test for staged copies of the scripts
177 # For these, since we explicitly know if we're running a debug versus
178 # release variant, target only the relevant directory
179 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
180 srcdir = os.path.sep.join(localdir_s[:-4])
181 dirs.append(sdir(srcdir, '_debug'))
182 if dmatch('build-root/install-vpp-native/vpp/bin'):
183 srcdir = os.path.sep.join(localdir_s[:-4])
184 dirs.append(sdir(srcdir, ''))
186 # finally, try the location system packages typically install into
187 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
189 # check the directories for existence; first one wins
191 if os.path.isdir(dir):
197 def find_api_files(cls, api_dir=None, patterns='*'):
198 """Find API definition files from the given directory tree with the
199 given pattern. If no directory is given then find_api_dir() is used
200 to locate one. If no pattern is given then all definition files found
201 in the directory tree are used.
203 :param api_dir: A directory tree in which to locate API definition
204 files; subdirectories are descended into.
205 If this is None then find_api_dir() is called to discover it.
206 :param patterns: A list of patterns to use in each visited directory
207 when looking for files.
208 This can be a list/tuple object or a comma-separated string of
209 patterns. Each value in the list will have leading/trailing
211 The pattern specifies the first part of the filename, '.api.json'
213 The results are de-duplicated, thus overlapping patterns are fine.
214 If this is None it defaults to '*' meaning "all API files".
215 :returns: A list of file paths for the API files found.
218 api_dir = cls.find_api_dir([])
220 raise VPPApiError("api_dir cannot be located")
222 if isinstance(patterns, list) or isinstance(patterns, tuple):
223 patterns = [p.strip() + '.api.json' for p in patterns]
225 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
228 for root, dirnames, files in os.walk(api_dir):
229 # iterate all given patterns and de-dup the result
230 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
231 for filename in files:
232 api_files.append(os.path.join(root, filename))
237 def process_json_file(self, apidef_file):
238 api = json.load(apidef_file)
242 for t in api['enums']:
243 t[0] = 'vl_api_' + t[0] + '_t'
244 types[t[0]] = {'type': 'enum', 'data': t}
245 for t in api['unions']:
246 t[0] = 'vl_api_' + t[0] + '_t'
247 types[t[0]] = {'type': 'union', 'data': t}
248 for t in api['types']:
249 t[0] = 'vl_api_' + t[0] + '_t'
250 types[t[0]] = {'type': 'type', 'data': t}
251 for t, v in api['aliases'].items():
252 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
253 services.update(api['services'])
258 for k, v in types.items():
260 if not vpp_get_type(k):
261 if v['type'] == 'enum':
263 VPPEnumType(t[0], t[1:])
266 elif v['type'] == 'union':
268 VPPUnionType(t[0], t[1:])
271 elif v['type'] == 'type':
276 elif v['type'] == 'alias':
281 if len(unresolved) == 0:
284 raise VPPValueError('Unresolved type definitions {}'
289 for m in api['messages']:
291 messages[m[0]] = VPPMessage(m[0], m[1:])
292 except VPPNotImplementedError:
294 self.logger.error('Not implemented error for {}'.format(m[0]))
295 return messages, services
297 class VPPApiClient(object):
300 This class provides the APIs to VPP. The APIs are loaded
301 from provided .api.json files and makes functions accordingly.
302 These functions are documented in the VPP .api files, as they
303 are dynamically created.
305 Additionally, VPP can send callback messages; this class
306 provides a means to register a callback function to receive
307 these messages in a background thread.
310 VPPApiError = VPPApiError
311 VPPRuntimeError = VPPRuntimeError
312 VPPValueError = VPPValueError
313 VPPNotImplementedError = VPPNotImplementedError
314 VPPIOError = VPPIOError
317 def __init__(self, apifiles=None, testmode=False, async_thread=True,
318 logger=None, loglevel=None,
319 read_timeout=5, use_socket=False,
320 server_address='/run/vpp/api.sock'):
321 """Create a VPP API object.
323 apifiles is a list of files containing API
324 descriptions that will be loaded - methods will be
325 dynamically created reflecting these APIs. If not
326 provided this will load the API files from VPP's
327 default install location.
329 logger, if supplied, is the logging logger object to log to.
330 loglevel, if supplied, is the log level this logger is set
331 to report at (from the loglevels in the logging module).
334 logger = logging.getLogger(
335 "{}.{}".format(__name__, self.__class__.__name__))
336 if loglevel is not None:
337 logger.setLevel(loglevel)
344 self.header = VPPType('header', [['u16', 'msgid'],
345 ['u32', 'client_index']])
347 self.event_callback = None
348 self.message_queue = queue.Queue()
349 self.read_timeout = read_timeout
350 self.async_thread = async_thread
351 self.event_thread = None
352 self.testmode = testmode
353 self.use_socket = use_socket
354 self.server_address = server_address
355 self._apifiles = apifiles
358 from . vpp_transport_socket import VppTransport
360 from . vpp_transport_shmem import VppTransport
363 # Pick up API definitions from default directory
365 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
367 # In test mode we don't care that we can't find the API files
371 raise VPPRuntimeError
373 for file in apifiles:
374 with open(file) as apidef_file:
375 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
376 self.messages.update(m)
377 self.services.update(s)
379 self.apifiles = apifiles
382 if len(self.messages) == 0 and not testmode:
383 raise VPPValueError(1, 'Missing JSON message definitions')
385 self.transport = VppTransport(self, read_timeout=read_timeout,
386 server_address=server_address)
387 # Make sure we allow VPP to clean up the message rings.
388 atexit.register(vpp_atexit, weakref.ref(self))
def get_function(self, name):
    """Look up the API call registered under ``name``.

    :param name: Attribute name to fetch from ``self._api``.
    :returns: Whatever ``self._api`` holds under that name.
    :raises AttributeError: If ``self._api`` has no such attribute.
    """
    holder = self._api
    return getattr(holder, name)
394 class ContextId(object):
395 """Multiprocessing-safe provider of unique context IDs."""
397 self.context = mp.Value(ctypes.c_uint, 0)
398 self.lock = mp.Lock()
401 """Get a new unique (or, at least, not recently used) context."""
403 self.context.value += 1
404 return self.context.value
405 get_context = ContextId()
def get_type(self, name):
    """Return whatever vpp_get_type() yields for the type called ``name``."""
    found = vpp_get_type(name)
    return found
412 if not hasattr(self, "_api"):
413 raise VPPApiError("Not connected, api definitions not available")
416 def make_function(self, msg, i, multipart, do_async):
419 return self._call_vpp_async(i, msg, **kwargs)
422 return self._call_vpp(i, msg, multipart, **kwargs)
424 f.__name__ = str(msg.name)
425 f.__doc__ = ", ".join(["%s %s" %
426 (msg.fieldtypes[j], k)
427 for j, k in enumerate(msg.fields)])
432 def _register_functions(self, do_async=False):
433 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
434 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
435 self._api = VppApiDynamicMethodHolder()
436 for name, msg in vpp_iterator(self.messages):
437 n = name + '_' + msg.crc[2:]
438 i = self.transport.get_msg_index(n)
440 self.id_msgdef[i] = msg
441 self.id_names[i] = name
443 # Create function for client side messages.
444 if name in self.services:
445 if 'stream' in self.services[name] and \
446 self.services[name]['stream']:
450 f = self.make_function(msg, i, multipart, do_async)
451 setattr(self._api, name, FuncWrapper(f))
454 'No such message type or failed CRC checksum: %s', n)
456 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
458 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
460 rv = self.transport.connect(name, pfx,
461 msg_handler, rx_qlen)
463 raise VPPIOError(2, 'Connect failed')
464 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
465 self._register_functions(do_async=do_async)
467 # Initialise control ping
468 crc = self.messages['control_ping'].crc
469 self.control_ping_index = self.transport.get_msg_index(
470 ('control_ping' + '_' + crc[2:]))
471 self.control_ping_msgdef = self.messages['control_ping']
472 if self.async_thread:
473 self.event_thread = threading.Thread(
474 target=self.thread_msg_handler)
475 self.event_thread.daemon = True
476 self.event_thread.start()
478 self.event_thread = None
481 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
484 name - the name of the client.
485 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
486 do_async - if true, messages are sent without waiting for a reply
487 rx_qlen - the length of the VPP message receive queue between
490 msg_handler = self.transport.get_callback(do_async)
491 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
494 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
495 """Attach to VPP in synchronous mode. Application must poll for events.
497 name - the name of the client.
498 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
499 rx_qlen - the length of the VPP message receive queue between
503 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
def disconnect(self):
    """Detach from VPP.

    Disconnects the transport and, if an event thread was started, queues
    the sentinel string that thread_msg_handler() compares against.

    :returns: The value returned by the transport's disconnect().
    """
    rv = self.transport.disconnect()
    if self.event_thread is not None:
        self.message_queue.put("terminate event thread")
    return rv
513 def msg_handler_sync(self, msg):
514 """Process an incoming message from VPP in sync mode.
516 The message may be a reply or it may be an async notification.
518 r = self.decode_incoming_msg(msg)
522 # If we have a context, then use the context to find any
523 # request waiting for a reply
525 if hasattr(r, 'context') and r.context > 0:
529 # No context -> async notification that we feed to the callback
530 self.message_queue.put_nowait(r)
532 raise VPPIOError(2, 'RPC reply message received in event handler')
534 def has_context(self, msg):
538 header = VPPType('header_with_context', [['u16', 'msgid'],
539 ['u32', 'client_index'],
542 (i, ci, context), size = header.unpack(msg, 0)
543 if self.id_names[i] == 'rx_thread_exit':
547 # Decode message and returns a tuple.
549 msgobj = self.id_msgdef[i]
550 if 'context' in msgobj.field_by_name and context >= 0:
554 def decode_incoming_msg(self, msg, no_type_conversion=False):
556 self.logger.warning('vpp_api.read failed')
559 (i, ci), size = self.header.unpack(msg, 0)
560 if self.id_names[i] == 'rx_thread_exit':
564 # Decode message and returns a tuple.
566 msgobj = self.id_msgdef[i]
568 raise VPPIOError(2, 'Reply message undefined')
570 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
573 def msg_handler_async(self, msg):
574 """Process a message from VPP in async mode.
576 In async mode, all messages are returned to the callback.
578 r = self.decode_incoming_msg(msg)
582 msgname = type(r).__name__
584 if self.event_callback:
585 self.event_callback(msgname, r)
587 def _control_ping(self, context):
588 """Send a ping command."""
589 self._call_vpp_async(self.control_ping_index,
590 self.control_ping_msgdef,
def validate_args(self, msg, kwargs):
    """Reject keyword arguments that are not fields of ``msg``.

    :param msg: Message definition; its ``field_by_name`` keys are the
        accepted argument names and ``name`` is used in the error text.
    :param kwargs: Mapping of the arguments supplied by the caller.
    :raises VPPValueError: If ``kwargs`` holds a name unknown to ``msg``.
    """
    unknown = set(kwargs) - set(msg.field_by_name.keys())
    # Only complain when something is actually left over.
    if unknown:
        raise VPPValueError('Invalid argument {} to {}'
                            .format(list(unknown), msg.name))
599 def _call_vpp(self, i, msgdef, multipart, **kwargs):
600 """Given a message, send the message and await a reply.
602 msgdef - the message packing definition
603 i - the message type index
604 multipart - True if the message returns multiple
606 context - context number - chosen at random if not
608 The remainder of the kwargs are the arguments to the API call.
610 The return value is the message or message array containing
611 the response. It will raise an IOError exception if there was
612 no response within the timeout window.
615 if 'context' not in kwargs:
616 context = self.get_context()
617 kwargs['context'] = context
619 context = kwargs['context']
620 kwargs['_vl_msg_id'] = i
622 no_type_conversion = kwargs.pop('_no_type_conversion', False)
625 if self.transport.socket_index:
626 kwargs['client_index'] = self.transport.socket_index
627 except AttributeError:
629 self.validate_args(msgdef, kwargs)
631 s = 'Calling {}({})'.format(msgdef.name,
632 ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
635 b = msgdef.pack(kwargs)
636 self.transport.suspend()
638 self.transport.write(b)
641 # Send a ping after the request - we use its response
642 # to detect that we have seen all results.
643 self._control_ping(context)
645 # Block until we get a reply.
648 r = self.read_blocking(no_type_conversion)
650 raise VPPIOError(2, 'VPP API client: read failed')
651 msgname = type(r).__name__
652 if context not in r or r.context == 0 or context != r.context:
653 # Message being queued
654 self.message_queue.put_nowait(r)
660 if msgname == 'control_ping_reply':
665 self.transport.resume()
667 s = 'Return value: {!r}'.format(r)
673 def _call_vpp_async(self, i, msg, **kwargs):
674 """Given a message, send the message and return the context.
676 msgdef - the message packing definition
677 i - the message type index
678 context - context number - chosen at random if not
680 The remainder of the kwargs are the arguments to the API call.
682 The reply message(s) will be delivered later to the registered callback.
683 The returned context will help with assigning which call
684 the reply belongs to.
686 if 'context' not in kwargs:
687 context = self.get_context()
688 kwargs['context'] = context
690 context = kwargs['context']
692 if self.transport.socket_index:
693 kwargs['client_index'] = self.transport.socket_index
694 except AttributeError:
695 kwargs['client_index'] = 0
696 kwargs['_vl_msg_id'] = i
699 self.transport.write(b)
702 def read_blocking(self, no_type_conversion=False):
703 """Get next received message from transport within timeout, decoded.
705 Note that notifications have context zero
706 and are not put into receive queue (at least for socket transport),
707 use async_thread with registered callback for processing them.
709 If no message appears in the queue within timeout, return None.
711 Optionally, type conversion can be skipped,
712 as some of conversions are into less precise types.
714 When r is the return value of this, the caller can get message name as:
715 msgname = type(r).__name__
716 and context number (type long) as:
719 :param no_type_conversion: If false, type conversions are applied.
720 :type no_type_conversion: bool
721 :returns: Decoded message, or None if no message (within timeout).
722 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
724 msg = self.transport.read()
727 return self.decode_incoming_msg(msg, no_type_conversion)
def register_event_callback(self, callback):
    """Register a callback for async messages.

    This will be called for async notifications in sync mode,
    and all messages in async mode.  In sync mode, replies to
    requests will not come here.

    callback is a fn(msg_type_name, msg_type) that will be
    called when a message comes in.  While this function is
    executing, note that (a) you are in a background thread and
    may wish to use threading.Lock to protect your datastructures,
    and (b) message processing from VPP will stop (so if you take
    a long while about it you may provoke reply timeouts or cause
    VPP to fill the RX buffer).  Passing None will disable the
    callback.
    """
    self.event_callback = callback
def thread_msg_handler(self):
    """Python thread calling the user registered message handler.

    This is to emulate the old style event callback scheme. Modern
    clients should provide their own thread to poll the event
    queue.

    Blocks on ``self.message_queue`` and hands every item to
    ``self.event_callback`` (when one is set) together with the name of
    the item's type.  Returns once the terminate sentinel is dequeued.
    """
    while True:
        r = self.message_queue.get()
        if r == "terminate event thread":
            # Sentinel queued by disconnect(): stop servicing the queue.
            break
        msgname = type(r).__name__
        if self.event_callback:
            self.event_callback(msgname, r)
762 def validate_message_table(self, namecrctable):
763 """Take a dictionary of name_crc message names
764 and returns an array of missing messages"""
767 for name_crc in namecrctable:
768 i = self.transport.get_msg_index(name_crc)
770 missing_table.append(name_crc)
def dump_message_table(self):
    """Return the transport's API message table (a name_crc dictionary)."""
    table = self.transport.message_table
    return table
def dump_message_table_filtered(self, msglist):
    """Return VPPs API message table as name_crc dictionary,
    filtered by message name list.

    An entry is kept when its key starts with one of the names in
    ``msglist`` or with the ``'reply'`` name that ``self.services``
    records for such a name.

    :param msglist: Iterable of message names (list, tuple, ...).
    :returns: A new dictionary holding the matching table entries.
    :raises KeyError: If a name has no entry in ``self.services`` or the
        entry has no ``'reply'`` key.
    """
    names = tuple(msglist)
    replies = tuple(self.services[n]['reply'] for n in names)
    # str.startswith() accepts a tuple of prefixes, so a single pass over
    # the table replaces one full scan per requested name.
    prefixes = names + replies
    return {k: v for k, v in self.transport.message_table.items()
            if k.startswith(prefixes)}
791 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
792 "logger=%s, read_timeout=%s, use_socket=%s, " \
793 "server_address='%s'>" % (
794 self._apifiles, self.testmode, self.async_thread,
795 self.logger, self.read_timeout, self.use_socket,
799 # Provide the old name for backward compatibility.
802 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4