3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type
31 from . vpp_format import VPPFormat
33 if sys.version[0] == '2':
39 class VppEnumType(type):
40 def __getattr__(cls, name):
41 t = vpp_get_type(name)
46 # class VppEnum(metaclass=VppEnumType):
49 __metaclass__ = VppEnumType
def vpp_atexit(vpp_weakref):
    """Clean up the VPP connection on interpreter shutdown.

    vpp_weakref - a zero-argument callable (a weak reference to the VPP
                  object) that returns the instance, or None once the
                  instance no longer exists.

    Only an instance whose transport still reports itself as connected
    is disconnected; in every other case this is a no-op.
    """
    vpp_instance = vpp_weakref()
    # A dead weak reference yields None, so there is nothing to clean up.
    if vpp_instance and vpp_instance.transport.connected:
        vpp_instance.logger.debug('Cleaning up VPP on exit')
        vpp_instance.disconnect()
61 if sys.version[0] == '2':
67 class VppApiDynamicMethodHolder(object):
class FuncWrapper(object):
    """Callable proxy around a dynamically generated API function.

    The wrapper exposes the wrapped function's __name__ and __doc__ and
    forwards keyword-only calls to it.
    """

    def __init__(self, func):
        # __call__ dispatches through self._func, so it must be stored.
        self._func = func
        self.__name__ = func.__name__
        # Carry the generated signature text along with the name.
        self.__doc__ = func.__doc__

    def __call__(self, **kwargs):
        """Invoke the wrapped function with the given keyword arguments."""
        return self._func(**kwargs)
83 This class provides the APIs to VPP. The APIs are loaded
84 from provided .api.json files and makes functions accordingly.
85 These functions are documented in the VPP .api files, as they
86 are dynamically created.
88 Additionally, VPP can send callback messages; this class
89 provides a means to register a callback function to receive
90 these messages in a background thread.
93 def process_json_file(self, apidef_file):
94 api = json.load(apidef_file)
96 for t in api['enums']:
97 t[0] = 'vl_api_' + t[0] + '_t'
98 types[t[0]] = {'type': 'enum', 'data': t}
99 for t in api['unions']:
100 t[0] = 'vl_api_' + t[0] + '_t'
101 types[t[0]] = {'type': 'union', 'data': t}
102 for t in api['types']:
103 t[0] = 'vl_api_' + t[0] + '_t'
104 types[t[0]] = {'type': 'type', 'data': t}
109 for k, v in types.items():
111 if not vpp_get_type(t[0]):
112 if v['type'] == 'enum':
114 VPPEnumType(t[0], t[1:])
117 elif v['type'] == 'union':
119 VPPUnionType(t[0], t[1:])
122 elif v['type'] == 'type':
127 if len(unresolved) == 0:
130 raise ValueError('Unresolved type definitions {}'
135 for m in api['messages']:
137 self.messages[m[0]] = VPPMessage(m[0], m[1:])
138 except NotImplementedError:
139 self.logger.error('Not implemented error for {}'.format(m[0]))
141 def __init__(self, apifiles=None, testmode=False, async_thread=True,
142 logger=None, loglevel=None,
143 read_timeout=5, use_socket=False,
144 server_address='/run/vpp-api.sock'):
145 """Create a VPP API object.
147 apifiles is a list of files containing API
148 descriptions that will be loaded - methods will be
149 dynamically created reflecting these APIs. If not
150 provided this will load the API files from VPP's
151 default install location.
153 logger, if supplied, is the logging logger object to log to.
154 loglevel, if supplied, is the log level this logger is set
155 to report at (from the loglevels in the logging module).
158 logger = logging.getLogger(__name__)
159 if loglevel is not None:
160 logger.setLevel(loglevel)
166 self.header = VPPType('header', [['u16', 'msgid'],
167 ['u32', 'client_index']])
169 self.event_callback = None
170 self.message_queue = queue.Queue()
171 self.read_timeout = read_timeout
172 self.async_thread = async_thread
175 from . vpp_transport_socket import VppTransport
177 from . vpp_transport_shmem import VppTransport
180 # Pick up API definitions from default directory
182 apifiles = self.find_api_files()
184 # In test mode we don't care that we can't find the API files
190 for file in apifiles:
191 with open(file) as apidef_file:
192 self.process_json_file(apidef_file)
194 self.apifiles = apifiles
197 if len(self.messages) == 0 and not testmode:
198 raise ValueError(1, 'Missing JSON message definitions')
200 self.transport = VppTransport(self, read_timeout=read_timeout,
201 server_address=server_address)
202 # Make sure we allow VPP to clean up the message rings.
203 atexit.register(vpp_atexit, weakref.ref(self))
205 class ContextId(object):
206 """Thread-safe provider of unique context IDs."""
209 self.lock = threading.Lock()
212 """Get a new unique (or, at least, not recently used) context."""
216 get_context = ContextId()
def get_type(self, name):
    """Look up a VPP type by name.

    Thin wrapper that forwards name to vpp_get_type() and returns its
    result unchanged.
    """
    return vpp_get_type(name)
222 def find_api_dir(cls):
223 """Attempt to find the best directory in which API definition
224 files may reside. If the value VPP_API_DIR exists in the environment
225 then it is first on the search list. If we're inside a recognized
226 location in a VPP source tree (src/scripts and src/vpp-api/python)
227 then entries from there to the likely locations in build-root are
228 added. Finally the location used by system packages is added.
230 :returns: A single directory name, or None if no such directory
235 if 'VPP_API_DIR' in os.environ:
236 dirs.append(os.environ['VPP_API_DIR'])
238 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
239 # in which case, plot a course to likely places in the src tree
240 import __main__ as main
241 if hasattr(main, '__file__'):
242 # get the path of the calling script
243 localdir = os.path.dirname(os.path.realpath(main.__file__))
245 # use cwd if there is no calling script
246 localdir = os.getcwd()
247 localdir_s = localdir.split(os.path.sep)
250 """Match dir against right-hand components of the script dir"""
251 d = dir.split('/') # param 'dir' assumes a / separator
253 return len(localdir_s) > length and localdir_s[-length:] == d
255 def sdir(srcdir, variant):
256 """Build a path from srcdir to the staged API files of
257 'variant' (typically '' or '_debug')"""
258 # Since 'core' and 'plugin' files are staged
259 # in separate directories, we target the parent dir.
260 return os.path.sep.join((
263 'install-vpp%s-native' % variant,
271 if dmatch('src/scripts'):
272 srcdir = os.path.sep.join(localdir_s[:-2])
273 elif dmatch('src/vpp-api/python'):
274 srcdir = os.path.sep.join(localdir_s[:-3])
276 # we're apparently running tests
277 srcdir = os.path.sep.join(localdir_s[:-1])
280 # we're in the source tree, try both the debug and release
282 dirs.append(sdir(srcdir, '_debug'))
283 dirs.append(sdir(srcdir, ''))
285 # Test for staged copies of the scripts
286 # For these, since we explicitly know if we're running a debug versus
287 # release variant, target only the relevant directory
288 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
289 srcdir = os.path.sep.join(localdir_s[:-4])
290 dirs.append(sdir(srcdir, '_debug'))
291 if dmatch('build-root/install-vpp-native/vpp/bin'):
292 srcdir = os.path.sep.join(localdir_s[:-4])
293 dirs.append(sdir(srcdir, ''))
295 # finally, try the location system packages typically install into
296 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
# check the directories for existence; first one wins
300 if os.path.isdir(dir):
306 def find_api_files(cls, api_dir=None, patterns='*'):
307 """Find API definition files from the given directory tree with the
308 given pattern. If no directory is given then find_api_dir() is used
309 to locate one. If no pattern is given then all definition files found
310 in the directory tree are used.
312 :param api_dir: A directory tree in which to locate API definition
313 files; subdirectories are descended into.
314 If this is None then find_api_dir() is called to discover it.
315 :param patterns: A list of patterns to use in each visited directory
316 when looking for files.
317 This can be a list/tuple object or a comma-separated string of
patterns. Each value in the list will have leading/trailing
320 The pattern specifies the first part of the filename, '.api.json'
322 The results are de-duplicated, thus overlapping patterns are fine.
323 If this is None it defaults to '*' meaning "all API files".
324 :returns: A list of file paths for the API files found.
327 api_dir = cls.find_api_dir()
329 raise RuntimeError("api_dir cannot be located")
331 if isinstance(patterns, list) or isinstance(patterns, tuple):
332 patterns = [p.strip() + '.api.json' for p in patterns]
334 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
337 for root, dirnames, files in os.walk(api_dir):
338 # iterate all given patterns and de-dup the result
339 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
340 for filename in files:
341 api_files.append(os.path.join(root, filename))
347 if not hasattr(self, "_api"):
348 raise Exception("Not connected, api definitions not available")
def make_function(self, msg, i, multipart, do_async):
    """Build the Python callable that sends one API message.

    msg - the message definition; its name, fields and fieldtypes are
          used to label the generated function
    i - the message type index
    multipart - passed through to _call_vpp() for synchronous calls
    do_async - if true, the callable sends via _call_vpp_async();
               otherwise it sends via _call_vpp()

    Returns the generated function, which accepts keyword arguments only.
    """
    if do_async:
        def f(**kwargs):
            return self._call_vpp_async(i, msg, **kwargs)
    else:
        def f(**kwargs):
            return self._call_vpp(i, msg, multipart, **kwargs)

    f.__name__ = str(msg.name)
    # Human-readable "type name" listing of the message fields.
    f.__doc__ = ", ".join("%s %s" % (msg.fieldtypes[j], k)
                          for j, k in enumerate(msg.fields))
    return f
365 def _register_functions(self, do_async=False):
366 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
367 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
368 self._api = VppApiDynamicMethodHolder()
369 for name, msg in vpp_iterator(self.messages):
370 n = name + '_' + msg.crc[2:]
371 i = self.transport.get_msg_index(n.encode())
373 self.id_msgdef[i] = msg
374 self.id_names[i] = name
375 # TODO: Fix multipart (use services)
376 multipart = True if name.find('_dump') > 0 else False
377 f = self.make_function(msg, i, multipart, do_async)
378 setattr(self._api, name, FuncWrapper(f))
381 'No such message type or failed CRC checksum: %s', n)
383 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
385 pfx = chroot_prefix.encode() if chroot_prefix else None
387 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
389 raise IOError(2, 'Connect failed')
390 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
391 self._register_functions(do_async=do_async)
393 # Initialise control ping
394 crc = self.messages['control_ping'].crc
395 self.control_ping_index = self.transport.get_msg_index(
396 ('control_ping' + '_' + crc[2:]).encode())
397 self.control_ping_msgdef = self.messages['control_ping']
398 if self.async_thread:
399 self.event_thread = threading.Thread(
400 target=self.thread_msg_handler)
401 self.event_thread.daemon = True
402 self.event_thread.start()
def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
    """Attach to VPP.

    name - the name of the client.
    chroot_prefix - if VPP is chroot'ed, the prefix of the jail
    do_async - if true, messages are sent without waiting for a reply
    rx_qlen - the length of the VPP message receive queue

    Returns whatever connect_internal() returns.
    """
    # The transport supplies the receive callback matching the mode.
    msg_handler = self.transport.get_callback(do_async)
    return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
                                 do_async)
def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
    """Attach to VPP in synchronous mode. Application must poll for events.

    name - the name of the client.
    chroot_prefix - if VPP is chroot'ed, the prefix of the jail
    rx_qlen - the length of the VPP message receive queue

    Returns whatever connect_internal() returns.
    """
    # No message handler is installed: the application polls instead.
    return self.connect_internal(name, None, chroot_prefix, rx_qlen,
                                 do_async=False)
def disconnect(self):
    """Detach from VPP.

    Disconnects the transport, then queues the sentinel string that
    thread_msg_handler() compares against.

    Returns the value returned by the transport's disconnect().
    """
    rv = self.transport.disconnect()
    self.message_queue.put("terminate event thread")
    return rv
436 def msg_handler_sync(self, msg):
437 """Process an incoming message from VPP in sync mode.
439 The message may be a reply or it may be an async notification.
441 r = self.decode_incoming_msg(msg)
445 # If we have a context, then use the context to find any
446 # request waiting for a reply
448 if hasattr(r, 'context') and r.context > 0:
452 # No context -> async notification that we feed to the callback
453 self.message_queue.put_nowait(r)
455 raise IOError(2, 'RPC reply message received in event handler')
457 def decode_incoming_msg(self, msg):
459 self.logger.warning('vpp_api.read failed')
461 (i, ci), size = self.header.unpack(msg, 0)
462 if self.id_names[i] == 'rx_thread_exit':
466 # Decode message and returns a tuple.
468 msgobj = self.id_msgdef[i]
470 raise IOError(2, 'Reply message undefined')
472 r, size = msgobj.unpack(msg)
475 def msg_handler_async(self, msg):
476 """Process a message from VPP in async mode.
478 In async mode, all messages are returned to the callback.
480 r = self.decode_incoming_msg(msg)
484 msgname = type(r).__name__
486 if self.event_callback:
487 self.event_callback(msgname, r)
489 def _control_ping(self, context):
490 """Send a ping command."""
491 self._call_vpp_async(self.control_ping_index,
492 self.control_ping_msgdef,
def validate_args(self, msg, kwargs):
    """Reject keyword arguments that are not fields of the message.

    msg - the message definition; its field_by_name mapping lists the
          accepted argument names
    kwargs - dict of the arguments supplied by the caller

    Raises ValueError naming every unknown argument. Returns None when
    all arguments are known.
    """
    unknown = set(kwargs) - set(msg.field_by_name)
    if unknown:
        # Sorted so the error text does not depend on set ordering.
        raise ValueError('Invalid argument {} to {}'
                         .format(sorted(unknown), msg.name))
501 def _call_vpp(self, i, msg, multipart, **kwargs):
502 """Given a message, send the message and await a reply.
504 msgdef - the message packing definition
505 i - the message type index
506 multipart - True if the message returns multiple
508 context - context number - chosen at random if not
510 The remainder of the kwargs are the arguments to the API call.
512 The return value is the message or message array containing
513 the response. It will raise an IOError exception if there was
514 no response within the timeout window.
517 if 'context' not in kwargs:
518 context = self.get_context()
519 kwargs['context'] = context
521 context = kwargs['context']
522 kwargs['_vl_msg_id'] = i
525 if self.transport.socket_index:
526 kwargs['client_index'] = self.transport.socket_index
527 except AttributeError:
529 self.validate_args(msg, kwargs)
531 self.transport.suspend()
533 self.transport.write(b)
536 # Send a ping after the request - we use its response
537 # to detect that we have seen all results.
538 self._control_ping(context)
540 # Block until we get a reply.
543 msg = self.transport.read()
545 raise IOError(2, 'VPP API client: read failed')
546 r = self.decode_incoming_msg(msg)
547 msgname = type(r).__name__
548 if context not in r or r.context == 0 or context != r.context:
549 # Message being queued
550 self.message_queue.put_nowait(r)
556 if msgname == 'control_ping_reply':
561 self.transport.resume()
565 def _call_vpp_async(self, i, msg, **kwargs):
566 """Given a message, send the message and await a reply.
568 msgdef - the message packing definition
569 i - the message type index
570 context - context number - chosen at random if not
572 The remainder of the kwargs are the arguments to the API call.
574 if 'context' not in kwargs:
575 context = self.get_context()
576 kwargs['context'] = context
578 context = kwargs['context']
580 if self.transport.socket_index:
581 kwargs['client_index'] = self.transport.socket_index
582 except AttributeError:
583 kwargs['client_index'] = 0
584 kwargs['_vl_msg_id'] = i
587 self.transport.write(b)
def register_event_callback(self, callback):
    """Register a callback for async messages.

    This will be called for async notifications in sync mode,
    and all messages in async mode. In sync mode, replies to
    requests will not come here.

    callback is a fn(msg_type_name, msg_type) that will be
    called when a message comes in. While this function is
    executing, note that (a) you are in a background thread and
    may wish to use threading.Lock to protect your datastructures,
    and (b) message processing from VPP will stop (so if you take
    a long while about it you may provoke reply timeouts or cause
    VPP to fill the RX buffer). Passing None will disable the
    callback.
    """
    self.event_callback = callback
607 def thread_msg_handler(self):
608 """Python thread calling the user registered message handler.
610 This is to emulate the old style event callback scheme. Modern
611 clients should provide their own thread to poll the event
615 r = self.message_queue.get()
616 if r == "terminate event thread":
618 msgname = type(r).__name__
619 if self.event_callback:
620 self.event_callback(msgname, r)
623 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4