3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
21 import multiprocessing as mp
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
33 if sys.version[0] == '2':
38 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
39 'VppEnum', 'VppEnumType',
40 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
44 def metaclass(metaclass):
45 @functools.wraps(metaclass)
47 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
52 class VppEnumType(type):
53 def __getattr__(cls, name):
54 t = vpp_get_type(name)
58 @metaclass(VppEnumType)
59 class VppEnum(object):
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client when the interpreter exits.

    :param vpp_weakref: zero-argument callable (a weak reference to the
        client) returning the client instance, or a falsy value when the
        client no longer exists.
    """
    client = vpp_weakref()
    # Nothing to tear down if the client is gone or not connected.
    if not client or not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
71 if sys.version[0] == '2':
79 class VppApiDynamicMethodHolder(object):
83 class FuncWrapper(object):
84 def __init__(self, func):
86 self.__name__ = func.__name__
87 self.__doc__ = func.__doc__
89 def __call__(self, **kwargs):
90 return self._func(**kwargs)
93 return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
96 class VPPApiError(Exception):
100 class VPPNotImplementedError(NotImplementedError):
104 class VPPIOError(IOError):
108 class VPPRuntimeError(RuntimeError):
112 class VPPValueError(ValueError):
115 class VPPApiJSONFiles(object):
117 def find_api_dir(cls, dirs):
118 """Attempt to find the best directory in which API definition
119 files may reside. If the value VPP_API_DIR exists in the environment
120 then it is first on the search list. If we're inside a recognized
121 location in a VPP source tree (src/scripts and src/vpp-api/python)
122 then entries from there to the likely locations in build-root are
123 added. Finally the location used by system packages is added.
125 :returns: A single directory name, or None if no such directory
129 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
130 # in which case, plot a course to likely places in the src tree
131 import __main__ as main
132 if hasattr(main, '__file__'):
133 # get the path of the calling script
134 localdir = os.path.dirname(os.path.realpath(main.__file__))
136 # use cwd if there is no calling script
137 localdir = os.getcwd()
138 localdir_s = localdir.split(os.path.sep)
141 """Match dir against right-hand components of the script dir"""
142 d = dir.split('/') # param 'dir' assumes a / separator
144 return len(localdir_s) > length and localdir_s[-length:] == d
146 def sdir(srcdir, variant):
147 """Build a path from srcdir to the staged API files of
148 'variant' (typically '' or '_debug')"""
149 # Since 'core' and 'plugin' files are staged
150 # in separate directories, we target the parent dir.
151 return os.path.sep.join((
154 'install-vpp%s-native' % variant,
162 if dmatch('src/scripts'):
163 srcdir = os.path.sep.join(localdir_s[:-2])
164 elif dmatch('src/vpp-api/python'):
165 srcdir = os.path.sep.join(localdir_s[:-3])
167 # we're apparently running tests
168 srcdir = os.path.sep.join(localdir_s[:-1])
171 # we're in the source tree, try both the debug and release
173 dirs.append(sdir(srcdir, '_debug'))
174 dirs.append(sdir(srcdir, ''))
176 # Test for staged copies of the scripts
177 # For these, since we explicitly know if we're running a debug versus
178 # release variant, target only the relevant directory
179 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
180 srcdir = os.path.sep.join(localdir_s[:-4])
181 dirs.append(sdir(srcdir, '_debug'))
182 if dmatch('build-root/install-vpp-native/vpp/bin'):
183 srcdir = os.path.sep.join(localdir_s[:-4])
184 dirs.append(sdir(srcdir, ''))
186 # finally, try the location system packages typically install into
187 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
189 # check the directories for existence; first one wins
191 if os.path.isdir(dir):
197 def find_api_files(cls, api_dir=None, patterns='*'):
198 """Find API definition files from the given directory tree with the
199 given pattern. If no directory is given then find_api_dir() is used
200 to locate one. If no pattern is given then all definition files found
201 in the directory tree are used.
203 :param api_dir: A directory tree in which to locate API definition
204 files; subdirectories are descended into.
205 If this is None then find_api_dir() is called to discover it.
206 :param patterns: A list of patterns to use in each visited directory
207 when looking for files.
208 This can be a list/tuple object or a comma-separated string of
209 patterns. Each value in the list will have leading/trailing
211 The pattern specifies the first part of the filename, '.api.json'
213 The results are de-duplicated, thus overlapping patterns are fine.
214 If this is None it defaults to '*' meaning "all API files".
215 :returns: A list of file paths for the API files found.
218 api_dir = cls.find_api_dir([])
220 raise VPPApiError("api_dir cannot be located")
222 if isinstance(patterns, list) or isinstance(patterns, tuple):
223 patterns = [p.strip() + '.api.json' for p in patterns]
225 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
228 for root, dirnames, files in os.walk(api_dir):
229 # iterate all given patterns and de-dup the result
230 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
231 for filename in files:
232 api_files.append(os.path.join(root, filename))
237 def process_json_file(self, apidef_file):
238 api = json.load(apidef_file)
242 for t in api['enums']:
243 t[0] = 'vl_api_' + t[0] + '_t'
244 types[t[0]] = {'type': 'enum', 'data': t}
245 for t in api['unions']:
246 t[0] = 'vl_api_' + t[0] + '_t'
247 types[t[0]] = {'type': 'union', 'data': t}
248 for t in api['types']:
249 t[0] = 'vl_api_' + t[0] + '_t'
250 types[t[0]] = {'type': 'type', 'data': t}
251 for t, v in api['aliases'].items():
252 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
253 services.update(api['services'])
258 for k, v in types.items():
260 if not vpp_get_type(k):
261 if v['type'] == 'enum':
263 VPPEnumType(t[0], t[1:])
266 elif v['type'] == 'union':
268 VPPUnionType(t[0], t[1:])
271 elif v['type'] == 'type':
276 elif v['type'] == 'alias':
281 if len(unresolved) == 0:
284 raise VPPValueError('Unresolved type definitions {}'
289 for m in api['messages']:
291 messages[m[0]] = VPPMessage(m[0], m[1:])
292 except VPPNotImplementedError:
294 self.logger.error('Not implemented error for {}'.format(m[0]))
295 return messages, services
297 class VPPApiClient(object):
300 This class provides the APIs to VPP. The APIs are loaded
301 from provided .api.json files and makes functions accordingly.
302 These functions are documented in the VPP .api files, as they
303 are dynamically created.
305 Additionally, VPP can send callback messages; this class
306 provides a means to register a callback function to receive
307 these messages in a background thread.
310 VPPApiError = VPPApiError
311 VPPRuntimeError = VPPRuntimeError
312 VPPValueError = VPPValueError
313 VPPNotImplementedError = VPPNotImplementedError
314 VPPIOError = VPPIOError
317 def __init__(self, apifiles=None, testmode=False, async_thread=True,
318 logger=None, loglevel=None,
319 read_timeout=5, use_socket=False,
320 server_address='/run/vpp/api.sock'):
321 """Create a VPP API object.
323 apifiles is a list of files containing API
324 descriptions that will be loaded - methods will be
325 dynamically created reflecting these APIs. If not
326 provided this will load the API files from VPP's
327 default install location.
329 logger, if supplied, is the logging logger object to log to.
330 loglevel, if supplied, is the log level this logger is set
331 to report at (from the loglevels in the logging module).
334 logger = logging.getLogger(
335 "{}.{}".format(__name__, self.__class__.__name__))
336 if loglevel is not None:
337 logger.setLevel(loglevel)
344 self.header = VPPType('header', [['u16', 'msgid'],
345 ['u32', 'client_index']])
347 self.event_callback = None
348 self.message_queue = queue.Queue()
349 self.read_timeout = read_timeout
350 self.async_thread = async_thread
351 self.event_thread = None
352 self.testmode = testmode
353 self.use_socket = use_socket
354 self.server_address = server_address
355 self._apifiles = apifiles
358 from . vpp_transport_socket import VppTransport
360 from . vpp_transport_shmem import VppTransport
363 # Pick up API definitions from default directory
365 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
367 # In test mode we don't care that we can't find the API files
371 raise VPPRuntimeError
373 for file in apifiles:
374 with open(file) as apidef_file:
375 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
376 self.messages.update(m)
377 self.services.update(s)
379 self.apifiles = apifiles
382 if len(self.messages) == 0 and not testmode:
383 raise VPPValueError(1, 'Missing JSON message definitions')
385 self.transport = VppTransport(self, read_timeout=read_timeout,
386 server_address=server_address)
387 # Make sure we allow VPP to clean up the message rings.
388 atexit.register(vpp_atexit, weakref.ref(self))
def get_function(self, name):
    """Return the API callable registered under ``name``.

    The lookup is a plain attribute fetch on ``self._api``, so an
    AttributeError propagates when ``_api`` has not been created yet or
    holds no attribute called ``name``.
    """
    holder = self._api
    return getattr(holder, name)
394 class ContextId(object):
395 """Multiprocessing-safe provider of unique context IDs."""
397 self.context = mp.Value(ctypes.c_uint, 0)
398 self.lock = mp.Lock()
401 """Get a new unique (or, at least, not recently used) context."""
403 self.context.value += 1
404 return self.context.value
405 get_context = ContextId()
def get_type(self, name):
    """Return whatever ``vpp_get_type`` yields for the type ``name``.

    Thin instance-level convenience wrapper; no instance state is read.
    """
    vpp_type = vpp_get_type(name)
    return vpp_type
412 if not hasattr(self, "_api"):
413 raise VPPApiError("Not connected, api definitions not available")
416 def make_function(self, msg, i, multipart, do_async):
419 return self._call_vpp_async(i, msg, **kwargs)
422 return self._call_vpp(i, msg, multipart, **kwargs)
424 f.__name__ = str(msg.name)
425 f.__doc__ = ", ".join(["%s %s" %
426 (msg.fieldtypes[j], k)
427 for j, k in enumerate(msg.fields)])
432 def _register_functions(self, do_async=False):
433 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
434 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
435 self._api = VppApiDynamicMethodHolder()
436 for name, msg in vpp_iterator(self.messages):
437 n = name + '_' + msg.crc[2:]
438 i = self.transport.get_msg_index(n.encode('utf-8'))
440 self.id_msgdef[i] = msg
441 self.id_names[i] = name
443 # Create function for client side messages.
444 if name in self.services:
445 if 'stream' in self.services[name] and \
446 self.services[name]['stream']:
450 f = self.make_function(msg, i, multipart, do_async)
451 setattr(self._api, name, FuncWrapper(f))
454 'No such message type or failed CRC checksum: %s', n)
456 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
458 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
460 rv = self.transport.connect(name.encode('utf-8'), pfx,
461 msg_handler, rx_qlen)
463 raise VPPIOError(2, 'Connect failed')
464 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
465 self._register_functions(do_async=do_async)
467 # Initialise control ping
468 crc = self.messages['control_ping'].crc
469 self.control_ping_index = self.transport.get_msg_index(
470 ('control_ping' + '_' + crc[2:]).encode('utf-8'))
471 self.control_ping_msgdef = self.messages['control_ping']
472 if self.async_thread:
473 self.event_thread = threading.Thread(
474 target=self.thread_msg_handler)
475 self.event_thread.daemon = True
476 self.event_thread.start()
478 self.event_thread = None
481 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
484 name - the name of the client.
485 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
486 do_async - if true, messages are sent without waiting for a reply
487 rx_qlen - the length of the VPP message receive queue between
490 msg_handler = self.transport.get_callback(do_async)
491 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
494 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
495 """Attach to VPP in synchronous mode. Application must poll for events.
497 name - the name of the client.
498 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
499 rx_qlen - the length of the VPP message receive queue between
503 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
506 def disconnect(self):
507 """Detach from VPP."""
508 rv = self.transport.disconnect()
509 if self.event_thread is not None:
510 self.message_queue.put("terminate event thread")
513 def msg_handler_sync(self, msg):
514 """Process an incoming message from VPP in sync mode.
516 The message may be a reply or it may be an async notification.
518 r = self.decode_incoming_msg(msg)
522 # If we have a context, then use the context to find any
523 # request waiting for a reply
525 if hasattr(r, 'context') and r.context > 0:
529 # No context -> async notification that we feed to the callback
530 self.message_queue.put_nowait(r)
532 raise VPPIOError(2, 'RPC reply message received in event handler')
534 def has_context(self, msg):
538 header = VPPType('header_with_context', [['u16', 'msgid'],
539 ['u32', 'client_index'],
542 (i, ci, context), size = header.unpack(msg, 0)
543 if self.id_names[i] == 'rx_thread_exit':
547 # Decode message and returns a tuple.
549 msgobj = self.id_msgdef[i]
550 if 'context' in msgobj.field_by_name and context >= 0:
554 def decode_incoming_msg(self, msg, no_type_conversion=False):
556 self.logger.warning('vpp_api.read failed')
559 (i, ci), size = self.header.unpack(msg, 0)
560 if self.id_names[i] == 'rx_thread_exit':
564 # Decode message and returns a tuple.
566 msgobj = self.id_msgdef[i]
568 raise VPPIOError(2, 'Reply message undefined')
570 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
573 def msg_handler_async(self, msg):
574 """Process a message from VPP in async mode.
576 In async mode, all messages are returned to the callback.
578 r = self.decode_incoming_msg(msg)
582 msgname = type(r).__name__
584 if self.event_callback:
585 self.event_callback(msgname, r)
587 def _control_ping(self, context):
588 """Send a ping command."""
589 self._call_vpp_async(self.control_ping_index,
590 self.control_ping_msgdef,
593 def validate_args(self, msg, kwargs):
594 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
596 raise VPPValueError('Invalid argument {} to {}'
597 .format(list(d), msg.name))
599 def _call_vpp(self, i, msgdef, multipart, **kwargs):
600 """Given a message, send the message and await a reply.
602 msgdef - the message packing definition
603 i - the message type index
604 multipart - True if the message returns multiple
606 context - context number - chosen at random if not
608 The remainder of the kwargs are the arguments to the API call.
610 The return value is the message or message array containing
611 the response. It will raise an IOError exception if there was
612 no response within the timeout window.
615 if 'context' not in kwargs:
616 context = self.get_context()
617 kwargs['context'] = context
619 context = kwargs['context']
620 kwargs['_vl_msg_id'] = i
622 no_type_conversion = kwargs.pop('_no_type_conversion', False)
625 if self.transport.socket_index:
626 kwargs['client_index'] = self.transport.socket_index
627 except AttributeError:
629 self.validate_args(msgdef, kwargs)
631 s = 'Calling {}({})'.format(msgdef.name,
632 ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
635 b = msgdef.pack(kwargs)
636 self.transport.suspend()
638 self.transport.write(b)
641 # Send a ping after the request - we use its response
642 # to detect that we have seen all results.
643 self._control_ping(context)
645 # Block until we get a reply.
648 msg = self.transport.read()
650 raise VPPIOError(2, 'VPP API client: read failed')
651 r = self.decode_incoming_msg(msg, no_type_conversion)
652 msgname = type(r).__name__
653 if context not in r or r.context == 0 or context != r.context:
654 # Message being queued
655 self.message_queue.put_nowait(r)
661 if msgname == 'control_ping_reply':
666 self.transport.resume()
668 self.logger.debug('Return from {!r}'.format(r))
671 def _call_vpp_async(self, i, msg, **kwargs):
672 """Given a message, send the message without waiting for a reply.
674 msg - the message packing definition
675 i - the message type index
676 context - context number - chosen at random if not
678 The remainder of the kwargs are the arguments to the API call.
680 if 'context' not in kwargs:
681 context = self.get_context()
682 kwargs['context'] = context
684 context = kwargs['context']
686 if self.transport.socket_index:
687 kwargs['client_index'] = self.transport.socket_index
688 except AttributeError:
689 kwargs['client_index'] = 0
690 kwargs['_vl_msg_id'] = i
693 self.transport.write(b)
def register_event_callback(self, callback):
    """Install the handler invoked for asynchronous messages.

    In sync mode the handler receives async notifications only; replies
    to requests are not delivered to it. In async mode it receives
    every message.

    callback is a fn(msg_type_name, msg_type) called whenever a message
    arrives. While it runs, note that (a) it executes in a background
    thread, so shared data structures may need a threading.Lock, and
    (b) message processing from VPP is stalled, so a slow handler may
    provoke reply timeouts or let VPP fill the RX buffer. Passing None
    disables the callback.
    """
    self.event_callback = callback
713 def thread_msg_handler(self):
714 """Python thread calling the user registered message handler.
716 This is to emulate the old style event callback scheme. Modern
717 clients should provide their own thread to poll the event
721 r = self.message_queue.get()
722 if r == "terminate event thread":
724 msgname = type(r).__name__
725 if self.event_callback:
726 self.event_callback(msgname, r)
729 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
730 "logger=%s, read_timeout=%s, use_socket=%s, " \
731 "server_address='%s'>" % (
732 self._apifiles, self.testmode, self.async_thread,
733 self.logger, self.read_timeout, self.use_socket,
737 # Provide the old name for backward compatibility.
740 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4