3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage
32 if sys.version[0] == '2':
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client when the interpreter exits.

    vpp_weakref is a zero-argument callable (a weak reference) that
    yields the VPP instance, or a false value once the instance is gone.
    """
    client = vpp_weakref()
    if not client:
        # The instance no longer exists; there is nothing to clean up.
        return
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
47 if sys.version[0] == '2':
class VppApiDynamicMethodHolder(object):
    """Empty attribute container.

    Instances carry no behaviour of their own; the generated API
    functions are attached to an instance with setattr().
    """
class FuncWrapper(object):
    """Callable proxy around a generated API function.

    The wrapper exposes the wrapped function's __name__ and __doc__ and
    forwards keyword-only calls to it.
    """

    def __init__(self, func):
        # __call__ delegates to this attribute, so it must be stored.
        self._func = func
        self.__name__ = func.__name__
        # Keep the generated signature summary visible through the wrapper.
        self.__doc__ = func.__doc__

    def __call__(self, **kwargs):
        """Invoke the wrapped function with the given keyword arguments."""
        return self._func(**kwargs)
69 This class provides the APIs to VPP. The APIs are loaded
70 from the provided .api.json files and functions are created accordingly.
71 These functions are documented in the VPP .api files, as they
72 are dynamically created.
74 Additionally, VPP can send callback messages; this class
75 provides a means to register a callback function to receive
76 these messages in a background thread.
79 def process_json_file(self, apidef_file):
80 api = json.load(apidef_file)
82 for t in api['enums']:
83 t[0] = 'vl_api_' + t[0] + '_t'
84 types[t[0]] = {'type': 'enum', 'data': t}
85 for t in api['unions']:
86 t[0] = 'vl_api_' + t[0] + '_t'
87 types[t[0]] = {'type': 'union', 'data': t}
88 for t in api['types']:
89 t[0] = 'vl_api_' + t[0] + '_t'
90 types[t[0]] = {'type': 'type', 'data': t}
95 for k, v in types.items():
97 if v['type'] == 'enum':
99 VPPEnumType(t[0], t[1:])
102 elif v['type'] == 'union':
104 VPPUnionType(t[0], t[1:])
107 elif v['type'] == 'type':
112 if len(unresolved) == 0:
115 raise ValueError('Unresolved type definitions {}'
120 for m in api['messages']:
122 self.messages[m[0]] = VPPMessage(m[0], m[1:])
123 except NotImplementedError:
124 self.logger.error('Not implemented error for {}'.format(m[0]))
126 def __init__(self, apifiles=None, testmode=False, async_thread=True,
127 logger=logging.getLogger('vpp_papi'), loglevel='debug',
128 read_timeout=5, use_socket=False,
129 server_address='/run/vpp-api.sock'):
130 """Create a VPP API object.
132 apifiles is a list of files containing API
133 descriptions that will be loaded - methods will be
134 dynamically created reflecting these APIs. If not
135 provided this will load the API files from VPP's
136 default install location.
138 logger, if supplied, is the logging logger object to log to.
139 loglevel, if supplied, is the log level this logger is set
140 to report at (from the loglevels in the logging module).
143 logger = logging.getLogger(__name__)
144 if loglevel is not None:
145 logger.setLevel(loglevel)
151 self.header = VPPType('header', [['u16', 'msgid'],
152 ['u32', 'client_index']])
154 self.event_callback = None
155 self.message_queue = queue.Queue()
156 self.read_timeout = read_timeout
157 self.async_thread = async_thread
160 from . vpp_transport_socket import VppTransport
162 from . vpp_transport_shmem import VppTransport
165 # Pick up API definitions from default directory
167 apifiles = self.find_api_files()
169 # In test mode we don't care that we can't find the API files
175 for file in apifiles:
176 with open(file) as apidef_file:
177 self.process_json_file(apidef_file)
179 self.apifiles = apifiles
182 if len(self.messages) == 0 and not testmode:
183 raise ValueError(1, 'Missing JSON message definitions')
185 self.transport = VppTransport(self, read_timeout=read_timeout,
186 server_address=server_address)
187 # Make sure we allow VPP to clean up the message rings.
188 atexit.register(vpp_atexit, weakref.ref(self))
190 class ContextId(object):
191 """Thread-safe provider of unique context IDs."""
194 self.lock = threading.Lock()
197 """Get a new unique (or, at least, not recently used) context."""
201 get_context = ContextId()
204 def find_api_dir(cls):
205 """Attempt to find the best directory in which API definition
206 files may reside. If the value VPP_API_DIR exists in the environment
207 then it is first on the search list. If we're inside a recognized
208 location in a VPP source tree (src/scripts and src/vpp-api/python)
209 then entries from there to the likely locations in build-root are
210 added. Finally the location used by system packages is added.
212 :returns: A single directory name, or None if no such directory
217 if 'VPP_API_DIR' in os.environ:
218 dirs.append(os.environ['VPP_API_DIR'])
220 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
221 # in which case, plot a course to likely places in the src tree
222 import __main__ as main
223 if hasattr(main, '__file__'):
224 # get the path of the calling script
225 localdir = os.path.dirname(os.path.realpath(main.__file__))
227 # use cwd if there is no calling script
228 localdir = os.getcwd()
229 localdir_s = localdir.split(os.path.sep)
232 """Match dir against right-hand components of the script dir"""
233 d = dir.split('/') # param 'dir' assumes a / separator
235 return len(localdir_s) > length and localdir_s[-length:] == d
237 def sdir(srcdir, variant):
238 """Build a path from srcdir to the staged API files of
239 'variant' (typically '' or '_debug')"""
240 # Since 'core' and 'plugin' files are staged
241 # in separate directories, we target the parent dir.
242 return os.path.sep.join((
245 'install-vpp%s-native' % variant,
253 if dmatch('src/scripts'):
254 srcdir = os.path.sep.join(localdir_s[:-2])
255 elif dmatch('src/vpp-api/python'):
256 srcdir = os.path.sep.join(localdir_s[:-3])
258 # we're apparently running tests
259 srcdir = os.path.sep.join(localdir_s[:-1])
262 # we're in the source tree, try both the debug and release
264 dirs.append(sdir(srcdir, '_debug'))
265 dirs.append(sdir(srcdir, ''))
267 # Test for staged copies of the scripts
268 # For these, since we explicitly know if we're running a debug versus
269 # release variant, target only the relevant directory
270 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
271 srcdir = os.path.sep.join(localdir_s[:-4])
272 dirs.append(sdir(srcdir, '_debug'))
273 if dmatch('build-root/install-vpp-native/vpp/bin'):
274 srcdir = os.path.sep.join(localdir_s[:-4])
275 dirs.append(sdir(srcdir, ''))
277 # finally, try the location system packages typically install into
278 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
280 # check the directories for existence; first one wins
282 if os.path.isdir(dir):
def find_api_files(cls, api_dir=None, patterns='*'):
    """Find API definition files from the given directory tree with the
    given pattern. If no directory is given then find_api_dir() is used
    to locate one. If no pattern is given then all definition files found
    in the directory tree are used.

    :param api_dir: A directory tree in which to locate API definition
        files; subdirectories are descended into.
        If this is None then find_api_dir() is called to discover it.
    :param patterns: A list of patterns to use in each visited directory
        when looking for files.
        This can be a list/tuple object or a comma-separated string of
        patterns. Each value in the list will have leading/trailing
        whitespace stripped.
        The pattern specifies the first part of the filename; '.api.json'
        is appended to it.
        The results are de-duplicated, thus overlapping patterns are fine.
        If this is None it defaults to '*' meaning "all API files".
    :returns: A list of file paths for the API files found.
    :raises RuntimeError: If no directory was given and none was found.
    """
    if api_dir is None:
        api_dir = cls.find_api_dir()
        if api_dir is None:
            raise RuntimeError("api_dir cannot be located")

    # Honour the documented contract: None means "all API files".
    if patterns is None:
        patterns = '*'

    if isinstance(patterns, (list, tuple)):
        patterns = [p.strip() + '.api.json' for p in patterns]
    else:
        patterns = [p.strip() + '.api.json' for p in patterns.split(",")]

    api_files = []
    for root, dirnames, files in os.walk(api_dir):
        # Collect matches for every pattern, de-duplicated per directory.
        matched = set()
        for p in patterns:
            matched.update(fnmatch.filter(files, p))
        # Sorted so the result order within a directory is reproducible.
        for filename in sorted(matched):
            api_files.append(os.path.join(root, filename))

    return api_files
329 if not hasattr(self, "_api"):
330 raise Exception("Not connected, api definitions not available")
def make_function(self, msg, i, multipart, do_async):
    """Build the Python callable that issues one API message.

    msg - the message definition; its name, fields and fieldtypes are
          used to label and document the generated function
    i - the message type index passed on to the call helper
    multipart - forwarded to _call_vpp for blocking calls
    do_async - if true the function sends via _call_vpp_async,
               otherwise it uses the blocking _call_vpp

    Returns the generated function, which accepts keyword arguments only.
    """
    if do_async:
        def f(**kwargs):
            return self._call_vpp_async(i, msg, **kwargs)
    else:
        def f(**kwargs):
            return self._call_vpp(i, msg, multipart, **kwargs)

    f.__name__ = str(msg.name)
    # One "<type> <field>" entry per message field, as a signature summary.
    f.__doc__ = ", ".join("%s %s" % (msg.fieldtypes[j], k)
                          for j, k in enumerate(msg.fields))
    return f
347 def _register_functions(self, do_async=False):
348 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
349 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
350 self._api = VppApiDynamicMethodHolder()
351 for name, msg in vpp_iterator(self.messages):
352 n = name + '_' + msg.crc[2:]
353 i = self.transport.get_msg_index(n.encode())
355 self.id_msgdef[i] = msg
356 self.id_names[i] = name
357 # TODO: Fix multipart (use services)
358 multipart = True if name.find('_dump') > 0 else False
359 f = self.make_function(msg, i, multipart, do_async)
360 setattr(self._api, name, FuncWrapper(f))
363 'No such message type or failed CRC checksum: %s', n)
365 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
367 pfx = chroot_prefix.encode() if chroot_prefix else None
369 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
371 raise IOError(2, 'Connect failed')
372 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
373 self._register_functions(do_async=do_async)
375 # Initialise control ping
376 crc = self.messages['control_ping'].crc
377 self.control_ping_index = self.transport.get_msg_index(
378 ('control_ping' + '_' + crc[2:]).encode())
379 self.control_ping_msgdef = self.messages['control_ping']
380 if self.async_thread:
381 self.event_thread = threading.Thread(
382 target=self.thread_msg_handler)
383 self.event_thread.daemon = True
384 self.event_thread.start()
def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
    """Attach to VPP.

    name - the name of the client.
    chroot_prefix - if VPP is chroot'ed, the prefix of the jail
    do_async - if true, messages are sent without waiting for a reply
    rx_qlen - the length of the VPP message receive queue

    Returns whatever connect_internal() returns.
    """
    # The transport supplies the receive callback matching the chosen mode.
    msg_handler = self.transport.get_callback(do_async)
    return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
                                 do_async)
def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
    """Attach to VPP in synchronous mode. Application must poll for events.

    name - the name of the client.
    chroot_prefix - if VPP is chroot'ed, the prefix of the jail
    rx_qlen - the length of the VPP message receive queue

    Returns whatever connect_internal() returns.
    """
    # No message handler is installed: the application polls for events.
    return self.connect_internal(name, None, chroot_prefix, rx_qlen,
                                 do_async=False)
def disconnect(self):
    """Detach from VPP.

    Returns the value reported by the transport's disconnect().
    """
    rv = self.transport.disconnect()
    # Sentinel understood by thread_msg_handler: tells it to stop.
    self.message_queue.put("terminate event thread")
    return rv
def msg_handler_sync(self, msg):
    """Process an incoming message from VPP in sync mode.

    The message may be a reply or it may be an async notification.
    Notifications are queued for the event thread; a reply arriving
    here is an error.

    Raises IOError if the decoded message carries a non-zero context.
    """
    r = self.decode_incoming_msg(msg)
    if r is None:
        # Nothing was decoded, so there is nothing to dispatch.
        return

    # A non-zero context marks a reply to a pending request; replies
    # must not arrive through the event handler.
    if hasattr(r, 'context') and r.context > 0:
        raise IOError(2, 'RPC reply message received in event handler')

    # No context -> async notification that we feed to the callback
    self.message_queue.put_nowait(r)
def decode_incoming_msg(self, msg):
    """Decode a raw message received from VPP.

    msg - the raw message buffer as delivered by the transport

    Returns the unpacked message object, or None when there is nothing
    to decode (empty read, or the 'rx_thread_exit' message).
    Raises IOError if no definition is registered for the message id.
    """
    if not msg:
        self.logger.warning('vpp_api.read failed')
        return None

    # The header carries the message id used to select the definition.
    (i, _ci), _size = self.header.unpack(msg, 0)
    if self.id_names[i] == 'rx_thread_exit':
        return None

    # Decode the message with the definition registered for its id.
    msgobj = self.id_msgdef[i]
    if not msgobj:
        raise IOError(2, 'Reply message undefined')

    r, _size = msgobj.unpack(msg)
    return r
def msg_handler_async(self, msg):
    """Process a message from VPP in async mode.

    In async mode, all messages are returned to the callback, which is
    invoked as callback(message_type_name, message).
    """
    r = self.decode_incoming_msg(msg)
    if r is None:
        # Nothing was decoded; do not hand 'NoneType' to the callback.
        return

    msgname = type(r).__name__

    if self.event_callback:
        self.event_callback(msgname, r)
471 def _control_ping(self, context):
472 """Send a ping command."""
473 self._call_vpp_async(self.control_ping_index,
474 self.control_ping_msgdef,
def validate_args(self, msg, kwargs):
    """Reject keyword arguments that are not fields of the message.

    msg - the message definition; field_by_name lists its valid fields
    kwargs - the dictionary of arguments supplied by the caller

    Raises ValueError naming the unknown arguments; returns None when
    every argument is a known field.
    """
    unknown = set(kwargs.keys()) - set(msg.field_by_name.keys())
    if unknown:
        # Sorted so the error text is reproducible.
        raise ValueError('Invalid argument {} to {}'
                         .format(sorted(unknown), msg.name))
483 def _call_vpp(self, i, msg, multipart, **kwargs):
484 """Given a message, send the message and await a reply.
486 msgdef - the message packing definition
487 i - the message type index
488 multipart - True if the message returns multiple
490 context - context number - chosen at random if not
492 The remainder of the kwargs are the arguments to the API call.
494 The return value is the message or message array containing
495 the response. It will raise an IOError exception if there was
496 no response within the timeout window.
499 if 'context' not in kwargs:
500 context = self.get_context()
501 kwargs['context'] = context
503 context = kwargs['context']
504 kwargs['_vl_msg_id'] = i
507 if self.transport.socket_index:
508 kwargs['client_index'] = self.transport.socket_index
509 except AttributeError:
511 self.validate_args(msg, kwargs)
513 self.transport.suspend()
515 self.transport.write(b)
518 # Send a ping after the request - we use its response
519 # to detect that we have seen all results.
520 self._control_ping(context)
522 # Block until we get a reply.
525 msg = self.transport.read()
527 raise IOError(2, 'VPP API client: read failed')
528 r = self.decode_incoming_msg(msg)
529 msgname = type(r).__name__
530 if context not in r or r.context == 0 or context != r.context:
531 # Message being queued
532 self.message_queue.put_nowait(r)
538 if msgname == 'control_ping_reply':
543 self.transport.resume()
547 def _call_vpp_async(self, i, msg, **kwargs):
548 """Given a message, send the message and await a reply.
550 msgdef - the message packing definition
551 i - the message type index
552 context - context number - chosen at random if not
554 The remainder of the kwargs are the arguments to the API call.
556 if 'context' not in kwargs:
557 context = self.get_context()
558 kwargs['context'] = context
560 context = kwargs['context']
562 if self.transport.socket_index:
563 kwargs['client_index'] = self.transport.socket_index
564 except AttributeError:
565 kwargs['client_index'] = 0
566 kwargs['_vl_msg_id'] = i
569 self.transport.write(b)
def register_event_callback(self, callback):
    """Install the function that receives async messages.

    In sync mode the callback gets async notifications only (replies
    to requests do not come here); in async mode it gets all messages.

    callback is a fn(msg_type_name, msg_type). It runs in a background
    thread, so protect shared data structures (e.g. with
    threading.Lock), and keep it short: message processing from VPP
    stops while it executes, which may provoke reply timeouts or let
    VPP fill the RX buffer. Passing None disables the callback.
    """
    self.event_callback = callback
def thread_msg_handler(self):
    """Python thread calling the user registered message handler.

    This is to emulate the old style event callback scheme. Modern
    clients should provide their own thread to poll the event queue.

    Blocks on the message queue and passes each message to the
    registered callback until the termination sentinel is received.
    """
    while True:
        r = self.message_queue.get()
        if r == "terminate event thread":
            # Sentinel queued by disconnect(): stop the thread.
            break
        msgname = type(r).__name__
        if self.event_callback:
            self.event_callback(msgname, r)
605 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4