3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
31 from . vpp_format import VPPFormat
33 if sys.version[0] == '2':
39 class VppEnumType(type):
40 def __getattr__(cls, name):
41 t = vpp_get_type(name)
46 # class VppEnum(metaclass=VppEnumType):
48 class VppEnum(object):
49 __metaclass__ = VppEnumType
52 def vpp_atexit(vpp_weakref):
53 """Clean up VPP connection on shutdown."""
54 vpp_instance = vpp_weakref()
55 if vpp_instance and vpp_instance.transport.connected:
56 vpp_instance.logger.debug('Cleaning up VPP on exit')
57 vpp_instance.disconnect()
61 if sys.version[0] == '2':
67 class VppApiDynamicMethodHolder(object):
71 class FuncWrapper(object):
72 def __init__(self, func):
74 self.__name__ = func.__name__
76 def __call__(self, **kwargs):
77 return self._func(**kwargs)
80 class VPPApiError(Exception):
84 class VPPNotImplementedError(NotImplementedError):
88 class VPPIOError(IOError):
92 class VPPRuntimeError(RuntimeError):
96 class VPPValueError(ValueError):
103 This class provides the APIs to VPP. The APIs are loaded
104 from provided .api.json files and makes functions accordingly.
105 These functions are documented in the VPP .api files, as they
106 are dynamically created.
108 Additionally, VPP can send callback messages; this class
109 provides a means to register a callback function to receive
110 these messages in a background thread.
112 VPPApiError = VPPApiError
113 VPPRuntimeError = VPPRuntimeError
114 VPPValueError = VPPValueError
115 VPPNotImplementedError = VPPNotImplementedError
116 VPPIOError = VPPIOError
118 def process_json_file(self, apidef_file):
119 api = json.load(apidef_file)
121 for t in api['enums']:
122 t[0] = 'vl_api_' + t[0] + '_t'
123 types[t[0]] = {'type': 'enum', 'data': t}
124 for t in api['unions']:
125 t[0] = 'vl_api_' + t[0] + '_t'
126 types[t[0]] = {'type': 'union', 'data': t}
127 for t in api['types']:
128 t[0] = 'vl_api_' + t[0] + '_t'
129 types[t[0]] = {'type': 'type', 'data': t}
130 for t, v in api['aliases'].items():
131 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
136 for k, v in types.items():
138 if not vpp_get_type(k):
139 if v['type'] == 'enum':
141 VPPEnumType(t[0], t[1:])
144 elif v['type'] == 'union':
146 VPPUnionType(t[0], t[1:])
149 elif v['type'] == 'type':
154 elif v['type'] == 'alias':
159 if len(unresolved) == 0:
162 raise VPPValueError('Unresolved type definitions {}'
167 for m in api['messages']:
169 self.messages[m[0]] = VPPMessage(m[0], m[1:])
170 except VPPNotImplementedError:
171 self.logger.error('Not implemented error for {}'.format(m[0]))
173 def __init__(self, apifiles=None, testmode=False, async_thread=True,
174 logger=None, loglevel=None,
175 read_timeout=5, use_socket=False,
176 server_address='/run/vpp-api.sock'):
177 """Create a VPP API object.
179 apifiles is a list of files containing API
180 descriptions that will be loaded - methods will be
181 dynamically created reflecting these APIs. If not
182 provided this will load the API files from VPP's
183 default install location.
185 logger, if supplied, is the logging logger object to log to.
186 loglevel, if supplied, is the log level this logger is set
187 to report at (from the loglevels in the logging module).
190 logger = logging.getLogger(__name__)
191 if loglevel is not None:
192 logger.setLevel(loglevel)
198 self.header = VPPType('header', [['u16', 'msgid'],
199 ['u32', 'client_index']])
201 self.event_callback = None
202 self.message_queue = queue.Queue()
203 self.read_timeout = read_timeout
204 self.async_thread = async_thread
207 from . vpp_transport_socket import VppTransport
209 from . vpp_transport_shmem import VppTransport
212 # Pick up API definitions from default directory
214 apifiles = self.find_api_files()
216 # In test mode we don't care that we can't find the API files
220 raise VPPRuntimeError
222 for file in apifiles:
223 with open(file) as apidef_file:
224 self.process_json_file(apidef_file)
226 self.apifiles = apifiles
229 if len(self.messages) == 0 and not testmode:
230 raise VPPValueError(1, 'Missing JSON message definitions')
232 self.transport = VppTransport(self, read_timeout=read_timeout,
233 server_address=server_address)
234 # Make sure we allow VPP to clean up the message rings.
235 atexit.register(vpp_atexit, weakref.ref(self))
237 class ContextId(object):
238 """Thread-safe provider of unique context IDs."""
241 self.lock = threading.Lock()
244 """Get a new unique (or, at least, not recently used) context."""
248 get_context = ContextId()
250 def get_type(self, name):
251 return vpp_get_type(name)
254 def find_api_dir(cls):
255 """Attempt to find the best directory in which API definition
256 files may reside. If the value VPP_API_DIR exists in the environment
257 then it is first on the search list. If we're inside a recognized
258 location in a VPP source tree (src/scripts and src/vpp-api/python)
259 then entries from there to the likely locations in build-root are
260 added. Finally the location used by system packages is added.
262 :returns: A single directory name, or None if no such directory
267 if 'VPP_API_DIR' in os.environ:
268 dirs.append(os.environ['VPP_API_DIR'])
270 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
271 # in which case, plot a course to likely places in the src tree
272 import __main__ as main
273 if hasattr(main, '__file__'):
274 # get the path of the calling script
275 localdir = os.path.dirname(os.path.realpath(main.__file__))
277 # use cwd if there is no calling script
278 localdir = os.getcwd()
279 localdir_s = localdir.split(os.path.sep)
282 """Match dir against right-hand components of the script dir"""
283 d = dir.split('/') # param 'dir' assumes a / separator
285 return len(localdir_s) > length and localdir_s[-length:] == d
287 def sdir(srcdir, variant):
288 """Build a path from srcdir to the staged API files of
289 'variant' (typically '' or '_debug')"""
290 # Since 'core' and 'plugin' files are staged
291 # in separate directories, we target the parent dir.
292 return os.path.sep.join((
295 'install-vpp%s-native' % variant,
303 if dmatch('src/scripts'):
304 srcdir = os.path.sep.join(localdir_s[:-2])
305 elif dmatch('src/vpp-api/python'):
306 srcdir = os.path.sep.join(localdir_s[:-3])
308 # we're apparently running tests
309 srcdir = os.path.sep.join(localdir_s[:-1])
312 # we're in the source tree, try both the debug and release
314 dirs.append(sdir(srcdir, '_debug'))
315 dirs.append(sdir(srcdir, ''))
317 # Test for staged copies of the scripts
318 # For these, since we explicitly know if we're running a debug versus
319 # release variant, target only the relevant directory
320 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
321 srcdir = os.path.sep.join(localdir_s[:-4])
322 dirs.append(sdir(srcdir, '_debug'))
323 if dmatch('build-root/install-vpp-native/vpp/bin'):
324 srcdir = os.path.sep.join(localdir_s[:-4])
325 dirs.append(sdir(srcdir, ''))
327 # finally, try the location system packages typically install into
328 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
330 # check the directories for existance; first one wins
332 if os.path.isdir(dir):
338 def find_api_files(cls, api_dir=None, patterns='*'):
339 """Find API definition files from the given directory tree with the
340 given pattern. If no directory is given then find_api_dir() is used
341 to locate one. If no pattern is given then all definition files found
342 in the directory tree are used.
344 :param api_dir: A directory tree in which to locate API definition
345 files; subdirectories are descended into.
346 If this is None then find_api_dir() is called to discover it.
347 :param patterns: A list of patterns to use in each visited directory
348 when looking for files.
349 This can be a list/tuple object or a comma-separated string of
350 patterns. Each value in the list will have leading/trialing
352 The pattern specifies the first part of the filename, '.api.json'
354 The results are de-duplicated, thus overlapping patterns are fine.
355 If this is None it defaults to '*' meaning "all API files".
356 :returns: A list of file paths for the API files found.
359 api_dir = cls.find_api_dir()
361 raise VPPApiError("api_dir cannot be located")
363 if isinstance(patterns, list) or isinstance(patterns, tuple):
364 patterns = [p.strip() + '.api.json' for p in patterns]
366 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
369 for root, dirnames, files in os.walk(api_dir):
370 # iterate all given patterns and de-dup the result
371 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
372 for filename in files:
373 api_files.append(os.path.join(root, filename))
379 if not hasattr(self, "_api"):
380 raise VPPApiError("Not connected, api definitions not available")
383 def make_function(self, msg, i, multipart, do_async):
386 return self._call_vpp_async(i, msg, **kwargs)
389 return self._call_vpp(i, msg, multipart, **kwargs)
391 f.__name__ = str(msg.name)
392 f.__doc__ = ", ".join(["%s %s" %
393 (msg.fieldtypes[j], k)
394 for j, k in enumerate(msg.fields)])
397 def _register_functions(self, do_async=False):
398 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
399 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
400 self._api = VppApiDynamicMethodHolder()
401 for name, msg in vpp_iterator(self.messages):
402 n = name + '_' + msg.crc[2:]
403 i = self.transport.get_msg_index(n.encode())
405 self.id_msgdef[i] = msg
406 self.id_names[i] = name
407 # TODO: Fix multipart (use services)
408 multipart = True if name.find('_dump') > 0 else False
409 f = self.make_function(msg, i, multipart, do_async)
410 setattr(self._api, name, FuncWrapper(f))
413 'No such message type or failed CRC checksum: %s', n)
415 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
417 pfx = chroot_prefix.encode() if chroot_prefix else None
419 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
421 raise VPPIOError(2, 'Connect failed')
422 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
423 self._register_functions(do_async=do_async)
425 # Initialise control ping
426 crc = self.messages['control_ping'].crc
427 self.control_ping_index = self.transport.get_msg_index(
428 ('control_ping' + '_' + crc[2:]).encode())
429 self.control_ping_msgdef = self.messages['control_ping']
430 if self.async_thread:
431 self.event_thread = threading.Thread(
432 target=self.thread_msg_handler)
433 self.event_thread.daemon = True
434 self.event_thread.start()
437 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
440 name - the name of the client.
441 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
442 do_async - if true, messages are sent without waiting for a reply
443 rx_qlen - the length of the VPP message receive queue between
446 msg_handler = self.transport.get_callback(do_async)
447 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
450 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
451 """Attach to VPP in synchronous mode. Application must poll for events.
453 name - the name of the client.
454 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
455 rx_qlen - the length of the VPP message receive queue between
459 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
462 def disconnect(self):
463 """Detach from VPP."""
464 rv = self.transport.disconnect()
465 self.message_queue.put("terminate event thread")
468 def msg_handler_sync(self, msg):
469 """Process an incoming message from VPP in sync mode.
471 The message may be a reply or it may be an async notification.
473 r = self.decode_incoming_msg(msg)
477 # If we have a context, then use the context to find any
478 # request waiting for a reply
480 if hasattr(r, 'context') and r.context > 0:
484 # No context -> async notification that we feed to the callback
485 self.message_queue.put_nowait(r)
487 raise VPPIOError(2, 'RPC reply message received in event handler')
489 def decode_incoming_msg(self, msg):
491 self.logger.warning('vpp_api.read failed')
493 (i, ci), size = self.header.unpack(msg, 0)
494 if self.id_names[i] == 'rx_thread_exit':
498 # Decode message and returns a tuple.
500 msgobj = self.id_msgdef[i]
502 raise VPPIOError(2, 'Reply message undefined')
504 r, size = msgobj.unpack(msg)
507 def msg_handler_async(self, msg):
508 """Process a message from VPP in async mode.
510 In async mode, all messages are returned to the callback.
512 r = self.decode_incoming_msg(msg)
516 msgname = type(r).__name__
518 if self.event_callback:
519 self.event_callback(msgname, r)
521 def _control_ping(self, context):
522 """Send a ping command."""
523 self._call_vpp_async(self.control_ping_index,
524 self.control_ping_msgdef,
527 def validate_args(self, msg, kwargs):
528 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
530 raise VPPValueError('Invalid argument {} to {}'
531 .format(list(d), msg.name))
533 def _call_vpp(self, i, msg, multipart, **kwargs):
534 """Given a message, send the message and await a reply.
536 msgdef - the message packing definition
537 i - the message type index
538 multipart - True if the message returns multiple
540 context - context number - chosen at random if not
542 The remainder of the kwargs are the arguments to the API call.
544 The return value is the message or message array containing
545 the response. It will raise an IOError exception if there was
546 no response within the timeout window.
549 if 'context' not in kwargs:
550 context = self.get_context()
551 kwargs['context'] = context
553 context = kwargs['context']
554 kwargs['_vl_msg_id'] = i
557 if self.transport.socket_index:
558 kwargs['client_index'] = self.transport.socket_index
559 except AttributeError:
561 self.validate_args(msg, kwargs)
563 self.transport.suspend()
565 self.transport.write(b)
568 # Send a ping after the request - we use its response
569 # to detect that we have seen all results.
570 self._control_ping(context)
572 # Block until we get a reply.
575 msg = self.transport.read()
577 raise VPPIOError(2, 'VPP API client: read failed')
578 r = self.decode_incoming_msg(msg)
579 msgname = type(r).__name__
580 if context not in r or r.context == 0 or context != r.context:
581 # Message being queued
582 self.message_queue.put_nowait(r)
588 if msgname == 'control_ping_reply':
593 self.transport.resume()
597 def _call_vpp_async(self, i, msg, **kwargs):
598 """Given a message, send the message and await a reply.
600 msgdef - the message packing definition
601 i - the message type index
602 context - context number - chosen at random if not
604 The remainder of the kwargs are the arguments to the API call.
606 if 'context' not in kwargs:
607 context = self.get_context()
608 kwargs['context'] = context
610 context = kwargs['context']
612 if self.transport.socket_index:
613 kwargs['client_index'] = self.transport.socket_index
614 except AttributeError:
615 kwargs['client_index'] = 0
616 kwargs['_vl_msg_id'] = i
619 self.transport.write(b)
621 def register_event_callback(self, callback):
622 """Register a callback for async messages.
624 This will be called for async notifications in sync mode,
625 and all messages in async mode. In sync mode, replies to
626 requests will not come here.
628 callback is a fn(msg_type_name, msg_type) that will be
629 called when a message comes in. While this function is
630 executing, note that (a) you are in a background thread and
631 may wish to use threading.Lock to protect your datastructures,
632 and (b) message processing from VPP will stop (so if you take
633 a long while about it you may provoke reply timeouts or cause
634 VPP to fill the RX buffer). Passing None will disable the
637 self.event_callback = callback
639 def thread_msg_handler(self):
640 """Python thread calling the user registered message handler.
642 This is to emulate the old style event callback scheme. Modern
643 clients should provide their own thread to poll the event
647 r = self.message_queue.get()
648 if r == "terminate event thread":
650 msgname = type(r).__name__
651 if self.event_callback:
652 self.event_callback(msgname, r)
655 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4