3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
31 from . vpp_format import VPPFormat
33 if sys.version[0] == '2':
39 class VppEnumType(type):
40 def __getattr__(cls, name):
41 t = vpp_get_type(name)
46 # class VppEnum(metaclass=VppEnumType):
# Build VppEnum by calling the metaclass directly.  A class-body
# ``__metaclass__`` attribute is only honoured by Python 2; Python 3
# ignores it, so the VppEnumType.__getattr__ hook would never run and
# attribute access on VppEnum would fail.  Instantiating the metaclass
# yields the same class object on both interpreters.
# ``str(...)`` keeps the class name a native string on Python 2 as well.
VppEnum = VppEnumType(str('VppEnum'), (object,), {})
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client at interpreter exit.

    vpp_weakref is a callable (a weak reference) that yields the client
    object, or a falsy value when the client no longer exists.  Nothing
    happens unless the client is alive and its transport reports that
    it is connected.
    """
    client = vpp_weakref()
    if not client:
        # Referent already gone; nothing left to clean up.
        return
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
61 if sys.version[0] == '2':
67 class VppApiDynamicMethodHolder(object):
71 class FuncWrapper(object):
72 def __init__(self, func):
74 self.__name__ = func.__name__
76 def __call__(self, **kwargs):
77 return self._func(**kwargs)
80 class VPPApiError(Exception):
84 class VPPNotImplementedError(NotImplementedError):
88 class VPPIOError(IOError):
92 class VPPRuntimeError(RuntimeError):
96 class VPPValueError(ValueError):
This class provides the APIs to VPP. The APIs are loaded
from the provided .api.json files, with functions created accordingly.
105 These functions are documented in the VPP .api files, as they
106 are dynamically created.
108 Additionally, VPP can send callback messages; this class
109 provides a means to register a callback function to receive
110 these messages in a background thread.
112 VPPApiError = VPPApiError
113 VPPRuntimeError = VPPRuntimeError
114 VPPValueError = VPPValueError
115 VPPNotImplementedError = VPPNotImplementedError
116 VPPIOError = VPPIOError
118 def process_json_file(self, apidef_file):
119 api = json.load(apidef_file)
121 for t in api['enums']:
122 t[0] = 'vl_api_' + t[0] + '_t'
123 types[t[0]] = {'type': 'enum', 'data': t}
124 for t in api['unions']:
125 t[0] = 'vl_api_' + t[0] + '_t'
126 types[t[0]] = {'type': 'union', 'data': t}
127 for t in api['types']:
128 t[0] = 'vl_api_' + t[0] + '_t'
129 types[t[0]] = {'type': 'type', 'data': t}
130 for t, v in api['aliases'].items():
131 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
132 self.services.update(api['services'])
137 for k, v in types.items():
139 if not vpp_get_type(k):
140 if v['type'] == 'enum':
142 VPPEnumType(t[0], t[1:])
145 elif v['type'] == 'union':
147 VPPUnionType(t[0], t[1:])
150 elif v['type'] == 'type':
155 elif v['type'] == 'alias':
160 if len(unresolved) == 0:
163 raise VPPValueError('Unresolved type definitions {}'
168 for m in api['messages']:
170 self.messages[m[0]] = VPPMessage(m[0], m[1:])
171 except VPPNotImplementedError:
172 self.logger.error('Not implemented error for {}'.format(m[0]))
174 def __init__(self, apifiles=None, testmode=False, async_thread=True,
175 logger=None, loglevel=None,
176 read_timeout=5, use_socket=False,
177 server_address='/run/vpp-api.sock'):
178 """Create a VPP API object.
180 apifiles is a list of files containing API
181 descriptions that will be loaded - methods will be
182 dynamically created reflecting these APIs. If not
183 provided this will load the API files from VPP's
184 default install location.
186 logger, if supplied, is the logging logger object to log to.
187 loglevel, if supplied, is the log level this logger is set
188 to report at (from the loglevels in the logging module).
191 logger = logging.getLogger(__name__)
192 if loglevel is not None:
193 logger.setLevel(loglevel)
200 self.header = VPPType('header', [['u16', 'msgid'],
201 ['u32', 'client_index']])
203 self.event_callback = None
204 self.message_queue = queue.Queue()
205 self.read_timeout = read_timeout
206 self.async_thread = async_thread
209 from . vpp_transport_socket import VppTransport
211 from . vpp_transport_shmem import VppTransport
214 # Pick up API definitions from default directory
216 apifiles = self.find_api_files()
218 # In test mode we don't care that we can't find the API files
222 raise VPPRuntimeError
224 for file in apifiles:
225 with open(file) as apidef_file:
226 self.process_json_file(apidef_file)
228 self.apifiles = apifiles
231 if len(self.messages) == 0 and not testmode:
232 raise VPPValueError(1, 'Missing JSON message definitions')
234 self.transport = VppTransport(self, read_timeout=read_timeout,
235 server_address=server_address)
236 # Make sure we allow VPP to clean up the message rings.
237 atexit.register(vpp_atexit, weakref.ref(self))
239 class ContextId(object):
240 """Thread-safe provider of unique context IDs."""
243 self.lock = threading.Lock()
246 """Get a new unique (or, at least, not recently used) context."""
250 get_context = ContextId()
def get_type(self, name):
    """Look up the type registered under ``name``.

    Thin wrapper: forwards ``name`` to ``vpp_get_type`` and returns
    whatever that helper returns.
    """
    found = vpp_get_type(name)
    return found
256 def find_api_dir(cls):
257 """Attempt to find the best directory in which API definition
258 files may reside. If the value VPP_API_DIR exists in the environment
259 then it is first on the search list. If we're inside a recognized
260 location in a VPP source tree (src/scripts and src/vpp-api/python)
261 then entries from there to the likely locations in build-root are
262 added. Finally the location used by system packages is added.
264 :returns: A single directory name, or None if no such directory
269 if 'VPP_API_DIR' in os.environ:
270 dirs.append(os.environ['VPP_API_DIR'])
272 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
273 # in which case, plot a course to likely places in the src tree
274 import __main__ as main
275 if hasattr(main, '__file__'):
276 # get the path of the calling script
277 localdir = os.path.dirname(os.path.realpath(main.__file__))
279 # use cwd if there is no calling script
280 localdir = os.getcwd()
281 localdir_s = localdir.split(os.path.sep)
284 """Match dir against right-hand components of the script dir"""
285 d = dir.split('/') # param 'dir' assumes a / separator
287 return len(localdir_s) > length and localdir_s[-length:] == d
289 def sdir(srcdir, variant):
290 """Build a path from srcdir to the staged API files of
291 'variant' (typically '' or '_debug')"""
292 # Since 'core' and 'plugin' files are staged
293 # in separate directories, we target the parent dir.
294 return os.path.sep.join((
297 'install-vpp%s-native' % variant,
305 if dmatch('src/scripts'):
306 srcdir = os.path.sep.join(localdir_s[:-2])
307 elif dmatch('src/vpp-api/python'):
308 srcdir = os.path.sep.join(localdir_s[:-3])
310 # we're apparently running tests
311 srcdir = os.path.sep.join(localdir_s[:-1])
314 # we're in the source tree, try both the debug and release
316 dirs.append(sdir(srcdir, '_debug'))
317 dirs.append(sdir(srcdir, ''))
319 # Test for staged copies of the scripts
320 # For these, since we explicitly know if we're running a debug versus
321 # release variant, target only the relevant directory
322 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
323 srcdir = os.path.sep.join(localdir_s[:-4])
324 dirs.append(sdir(srcdir, '_debug'))
325 if dmatch('build-root/install-vpp-native/vpp/bin'):
326 srcdir = os.path.sep.join(localdir_s[:-4])
327 dirs.append(sdir(srcdir, ''))
329 # finally, try the location system packages typically install into
330 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
# check the directories for existence; first one wins
334 if os.path.isdir(dir):
340 def find_api_files(cls, api_dir=None, patterns='*'):
341 """Find API definition files from the given directory tree with the
342 given pattern. If no directory is given then find_api_dir() is used
343 to locate one. If no pattern is given then all definition files found
344 in the directory tree are used.
346 :param api_dir: A directory tree in which to locate API definition
347 files; subdirectories are descended into.
348 If this is None then find_api_dir() is called to discover it.
349 :param patterns: A list of patterns to use in each visited directory
350 when looking for files.
351 This can be a list/tuple object or a comma-separated string of
patterns. Each value in the list will have leading/trailing
354 The pattern specifies the first part of the filename, '.api.json'
356 The results are de-duplicated, thus overlapping patterns are fine.
357 If this is None it defaults to '*' meaning "all API files".
358 :returns: A list of file paths for the API files found.
361 api_dir = cls.find_api_dir()
363 raise VPPApiError("api_dir cannot be located")
365 if isinstance(patterns, list) or isinstance(patterns, tuple):
366 patterns = [p.strip() + '.api.json' for p in patterns]
368 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
371 for root, dirnames, files in os.walk(api_dir):
372 # iterate all given patterns and de-dup the result
373 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
374 for filename in files:
375 api_files.append(os.path.join(root, filename))
381 if not hasattr(self, "_api"):
382 raise VPPApiError("Not connected, api definitions not available")
385 def make_function(self, msg, i, multipart, do_async):
388 return self._call_vpp_async(i, msg, **kwargs)
391 return self._call_vpp(i, msg, multipart, **kwargs)
393 f.__name__ = str(msg.name)
394 f.__doc__ = ", ".join(["%s %s" %
395 (msg.fieldtypes[j], k)
396 for j, k in enumerate(msg.fields)])
399 def _register_functions(self, do_async=False):
400 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
401 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
402 self._api = VppApiDynamicMethodHolder()
403 for name, msg in vpp_iterator(self.messages):
404 n = name + '_' + msg.crc[2:]
405 i = self.transport.get_msg_index(n.encode())
407 self.id_msgdef[i] = msg
408 self.id_names[i] = name
410 # Create function for client side messages.
411 if name in self.services:
412 if 'stream' in self.services[name] and self.services[name]['stream']:
416 f = self.make_function(msg, i, multipart, do_async)
417 setattr(self._api, name, FuncWrapper(f))
420 'No such message type or failed CRC checksum: %s', n)
422 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
424 pfx = chroot_prefix.encode() if chroot_prefix else None
426 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
428 raise VPPIOError(2, 'Connect failed')
429 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
430 self._register_functions(do_async=do_async)
432 # Initialise control ping
433 crc = self.messages['control_ping'].crc
434 self.control_ping_index = self.transport.get_msg_index(
435 ('control_ping' + '_' + crc[2:]).encode())
436 self.control_ping_msgdef = self.messages['control_ping']
437 if self.async_thread:
438 self.event_thread = threading.Thread(
439 target=self.thread_msg_handler)
440 self.event_thread.daemon = True
441 self.event_thread.start()
444 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
447 name - the name of the client.
448 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
449 do_async - if true, messages are sent without waiting for a reply
450 rx_qlen - the length of the VPP message receive queue between
453 msg_handler = self.transport.get_callback(do_async)
454 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
457 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
458 """Attach to VPP in synchronous mode. Application must poll for events.
460 name - the name of the client.
461 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
462 rx_qlen - the length of the VPP message receive queue between
466 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
469 def disconnect(self):
470 """Detach from VPP."""
471 rv = self.transport.disconnect()
472 self.message_queue.put("terminate event thread")
475 def msg_handler_sync(self, msg):
476 """Process an incoming message from VPP in sync mode.
478 The message may be a reply or it may be an async notification.
480 r = self.decode_incoming_msg(msg)
484 # If we have a context, then use the context to find any
485 # request waiting for a reply
487 if hasattr(r, 'context') and r.context > 0:
491 # No context -> async notification that we feed to the callback
492 self.message_queue.put_nowait(r)
494 raise VPPIOError(2, 'RPC reply message received in event handler')
496 def decode_incoming_msg(self, msg):
498 self.logger.warning('vpp_api.read failed')
500 (i, ci), size = self.header.unpack(msg, 0)
501 if self.id_names[i] == 'rx_thread_exit':
505 # Decode message and returns a tuple.
507 msgobj = self.id_msgdef[i]
509 raise VPPIOError(2, 'Reply message undefined')
511 r, size = msgobj.unpack(msg)
514 def msg_handler_async(self, msg):
515 """Process a message from VPP in async mode.
517 In async mode, all messages are returned to the callback.
519 r = self.decode_incoming_msg(msg)
523 msgname = type(r).__name__
525 if self.event_callback:
526 self.event_callback(msgname, r)
528 def _control_ping(self, context):
529 """Send a ping command."""
530 self._call_vpp_async(self.control_ping_index,
531 self.control_ping_msgdef,
534 def validate_args(self, msg, kwargs):
535 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
537 raise VPPValueError('Invalid argument {} to {}'
538 .format(list(d), msg.name))
540 def _call_vpp(self, i, msg, multipart, **kwargs):
541 """Given a message, send the message and await a reply.
543 msgdef - the message packing definition
544 i - the message type index
545 multipart - True if the message returns multiple
547 context - context number - chosen at random if not
549 The remainder of the kwargs are the arguments to the API call.
551 The return value is the message or message array containing
552 the response. It will raise an IOError exception if there was
553 no response within the timeout window.
556 if 'context' not in kwargs:
557 context = self.get_context()
558 kwargs['context'] = context
560 context = kwargs['context']
561 kwargs['_vl_msg_id'] = i
564 if self.transport.socket_index:
565 kwargs['client_index'] = self.transport.socket_index
566 except AttributeError:
568 self.validate_args(msg, kwargs)
570 self.transport.suspend()
572 self.transport.write(b)
575 # Send a ping after the request - we use its response
576 # to detect that we have seen all results.
577 self._control_ping(context)
579 # Block until we get a reply.
582 msg = self.transport.read()
584 raise VPPIOError(2, 'VPP API client: read failed')
585 r = self.decode_incoming_msg(msg)
586 msgname = type(r).__name__
587 if context not in r or r.context == 0 or context != r.context:
588 # Message being queued
589 self.message_queue.put_nowait(r)
595 if msgname == 'control_ping_reply':
600 self.transport.resume()
604 def _call_vpp_async(self, i, msg, **kwargs):
"""Given a message, send the message without waiting for a reply.
607 msgdef - the message packing definition
608 i - the message type index
609 context - context number - chosen at random if not
611 The remainder of the kwargs are the arguments to the API call.
613 if 'context' not in kwargs:
614 context = self.get_context()
615 kwargs['context'] = context
617 context = kwargs['context']
619 if self.transport.socket_index:
620 kwargs['client_index'] = self.transport.socket_index
621 except AttributeError:
622 kwargs['client_index'] = 0
623 kwargs['_vl_msg_id'] = i
626 self.transport.write(b)
628 def register_event_callback(self, callback):
629 """Register a callback for async messages.
631 This will be called for async notifications in sync mode,
632 and all messages in async mode. In sync mode, replies to
633 requests will not come here.
635 callback is a fn(msg_type_name, msg_type) that will be
636 called when a message comes in. While this function is
637 executing, note that (a) you are in a background thread and
638 may wish to use threading.Lock to protect your datastructures,
639 and (b) message processing from VPP will stop (so if you take
640 a long while about it you may provoke reply timeouts or cause
641 VPP to fill the RX buffer). Passing None will disable the
644 self.event_callback = callback
646 def thread_msg_handler(self):
647 """Python thread calling the user registered message handler.
649 This is to emulate the old style event callback scheme. Modern
650 clients should provide their own thread to poll the event
654 r = self.message_queue.get()
655 if r == "terminate event thread":
657 msgname = type(r).__name__
658 if self.event_callback:
659 self.event_callback(msgname, r)
662 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4