3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
31 from . vpp_format import VPPFormat
33 if sys.version[0] == '2':
39 class VppEnumType(type):
40 def __getattr__(cls, name):
41 t = vpp_get_type(name)
46 # class VppEnum(metaclass=VppEnumType):
# Namespace class whose failed attribute lookups are served by the
# metaclass hook VppEnumType.__getattr__.
#
# The metaclass is applied by calling it directly rather than through a
# class-body ``__metaclass__`` attribute: Python 3 silently ignores that
# attribute, so the class would be built by plain ``type`` and the lookup
# hook would never run.  Calling the metaclass builds the same class object
# on both Python 2 and Python 3.  str() keeps the class name a native
# string on either interpreter.
VppEnum = VppEnumType(str('VppEnum'), (object,), {
    '__doc__': 'Attribute-style access point backed by VppEnumType.',
})
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client at interpreter shutdown.

    vpp_weakref is called with no arguments and is expected to yield the
    client object, or a false value when the client no longer exists.
    Nothing happens unless the client exists and its transport reports
    that it is connected.
    """
    client = vpp_weakref()
    # The referent may already be gone by the time exit handlers run.
    if not client:
        return
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
61 if sys.version[0] == '2':
67 class VppApiDynamicMethodHolder(object):
71 class FuncWrapper(object):
72 def __init__(self, func):
74 self.__name__ = func.__name__
76 def __call__(self, **kwargs):
77 return self._func(**kwargs)
83 This class provides the APIs to VPP. The APIs are loaded
84 from provided .api.json files and functions are created accordingly.
85 These functions are documented in the VPP .api files, as they
86 are dynamically created.
88 Additionally, VPP can send callback messages; this class
89 provides a means to register a callback function to receive
90 these messages in a background thread.
93 def process_json_file(self, apidef_file):
94 api = json.load(apidef_file)
96 for t in api['enums']:
97 t[0] = 'vl_api_' + t[0] + '_t'
98 types[t[0]] = {'type': 'enum', 'data': t}
99 for t in api['unions']:
100 t[0] = 'vl_api_' + t[0] + '_t'
101 types[t[0]] = {'type': 'union', 'data': t}
102 for t in api['types']:
103 t[0] = 'vl_api_' + t[0] + '_t'
104 types[t[0]] = {'type': 'type', 'data': t}
105 for t, v in api['aliases'].items():
106 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
111 for k, v in types.items():
113 if not vpp_get_type(k):
114 if v['type'] == 'enum':
116 VPPEnumType(t[0], t[1:])
119 elif v['type'] == 'union':
121 VPPUnionType(t[0], t[1:])
124 elif v['type'] == 'type':
129 elif v['type'] == 'alias':
134 if len(unresolved) == 0:
137 raise ValueError('Unresolved type definitions {}'
142 for m in api['messages']:
144 self.messages[m[0]] = VPPMessage(m[0], m[1:])
145 except NotImplementedError:
146 self.logger.error('Not implemented error for {}'.format(m[0]))
148 def __init__(self, apifiles=None, testmode=False, async_thread=True,
149 logger=None, loglevel=None,
150 read_timeout=5, use_socket=False,
151 server_address='/run/vpp-api.sock'):
152 """Create a VPP API object.
154 apifiles is a list of files containing API
155 descriptions that will be loaded - methods will be
156 dynamically created reflecting these APIs. If not
157 provided this will load the API files from VPP's
158 default install location.
160 logger, if supplied, is the logging logger object to log to.
161 loglevel, if supplied, is the log level this logger is set
162 to report at (from the loglevels in the logging module).
165 logger = logging.getLogger(__name__)
166 if loglevel is not None:
167 logger.setLevel(loglevel)
173 self.header = VPPType('header', [['u16', 'msgid'],
174 ['u32', 'client_index']])
176 self.event_callback = None
177 self.message_queue = queue.Queue()
178 self.read_timeout = read_timeout
179 self.async_thread = async_thread
182 from . vpp_transport_socket import VppTransport
184 from . vpp_transport_shmem import VppTransport
187 # Pick up API definitions from default directory
189 apifiles = self.find_api_files()
191 # In test mode we don't care that we can't find the API files
197 for file in apifiles:
198 with open(file) as apidef_file:
199 self.process_json_file(apidef_file)
201 self.apifiles = apifiles
204 if len(self.messages) == 0 and not testmode:
205 raise ValueError(1, 'Missing JSON message definitions')
207 self.transport = VppTransport(self, read_timeout=read_timeout,
208 server_address=server_address)
209 # Make sure we allow VPP to clean up the message rings.
210 atexit.register(vpp_atexit, weakref.ref(self))
212 class ContextId(object):
213 """Thread-safe provider of unique context IDs."""
216 self.lock = threading.Lock()
219 """Get a new unique (or, at least, not recently used) context."""
223 get_context = ContextId()
def get_type(self, name):
    """Return whatever vpp_get_type() yields for the given type name.

    Instance-level convenience wrapper; the lookup result is passed back
    to the caller unchanged.
    """
    resolved = vpp_get_type(name)
    return resolved
229 def find_api_dir(cls):
230 """Attempt to find the best directory in which API definition
231 files may reside. If the value VPP_API_DIR exists in the environment
232 then it is first on the search list. If we're inside a recognized
233 location in a VPP source tree (src/scripts and src/vpp-api/python)
234 then entries from there to the likely locations in build-root are
235 added. Finally the location used by system packages is added.
237 :returns: A single directory name, or None if no such directory
242 if 'VPP_API_DIR' in os.environ:
243 dirs.append(os.environ['VPP_API_DIR'])
245 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
246 # in which case, plot a course to likely places in the src tree
247 import __main__ as main
248 if hasattr(main, '__file__'):
249 # get the path of the calling script
250 localdir = os.path.dirname(os.path.realpath(main.__file__))
252 # use cwd if there is no calling script
253 localdir = os.getcwd()
254 localdir_s = localdir.split(os.path.sep)
257 """Match dir against right-hand components of the script dir"""
258 d = dir.split('/') # param 'dir' assumes a / separator
260 return len(localdir_s) > length and localdir_s[-length:] == d
262 def sdir(srcdir, variant):
263 """Build a path from srcdir to the staged API files of
264 'variant' (typically '' or '_debug')"""
265 # Since 'core' and 'plugin' files are staged
266 # in separate directories, we target the parent dir.
267 return os.path.sep.join((
270 'install-vpp%s-native' % variant,
278 if dmatch('src/scripts'):
279 srcdir = os.path.sep.join(localdir_s[:-2])
280 elif dmatch('src/vpp-api/python'):
281 srcdir = os.path.sep.join(localdir_s[:-3])
283 # we're apparently running tests
284 srcdir = os.path.sep.join(localdir_s[:-1])
287 # we're in the source tree, try both the debug and release
289 dirs.append(sdir(srcdir, '_debug'))
290 dirs.append(sdir(srcdir, ''))
292 # Test for staged copies of the scripts
293 # For these, since we explicitly know if we're running a debug versus
294 # release variant, target only the relevant directory
295 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
296 srcdir = os.path.sep.join(localdir_s[:-4])
297 dirs.append(sdir(srcdir, '_debug'))
298 if dmatch('build-root/install-vpp-native/vpp/bin'):
299 srcdir = os.path.sep.join(localdir_s[:-4])
300 dirs.append(sdir(srcdir, ''))
302 # finally, try the location system packages typically install into
303 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
305 # check the directories for existence; first one wins
307 if os.path.isdir(dir):
313 def find_api_files(cls, api_dir=None, patterns='*'):
314 """Find API definition files from the given directory tree with the
315 given pattern. If no directory is given then find_api_dir() is used
316 to locate one. If no pattern is given then all definition files found
317 in the directory tree are used.
319 :param api_dir: A directory tree in which to locate API definition
320 files; subdirectories are descended into.
321 If this is None then find_api_dir() is called to discover it.
322 :param patterns: A list of patterns to use in each visited directory
323 when looking for files.
324 This can be a list/tuple object or a comma-separated string of
325 patterns. Each value in the list will have leading/trailing
327 The pattern specifies the first part of the filename, '.api.json'
329 The results are de-duplicated, thus overlapping patterns are fine.
330 If this is None it defaults to '*' meaning "all API files".
331 :returns: A list of file paths for the API files found.
334 api_dir = cls.find_api_dir()
336 raise RuntimeError("api_dir cannot be located")
338 if isinstance(patterns, list) or isinstance(patterns, tuple):
339 patterns = [p.strip() + '.api.json' for p in patterns]
341 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
344 for root, dirnames, files in os.walk(api_dir):
345 # iterate all given patterns and de-dup the result
346 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
347 for filename in files:
348 api_files.append(os.path.join(root, filename))
354 if not hasattr(self, "_api"):
355 raise Exception("Not connected, api definitions not available")
358 def make_function(self, msg, i, multipart, do_async):
361 return self._call_vpp_async(i, msg, **kwargs)
364 return self._call_vpp(i, msg, multipart, **kwargs)
366 f.__name__ = str(msg.name)
367 f.__doc__ = ", ".join(["%s %s" %
368 (msg.fieldtypes[j], k)
369 for j, k in enumerate(msg.fields)])
372 def _register_functions(self, do_async=False):
373 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
374 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
375 self._api = VppApiDynamicMethodHolder()
376 for name, msg in vpp_iterator(self.messages):
377 n = name + '_' + msg.crc[2:]
378 i = self.transport.get_msg_index(n.encode())
380 self.id_msgdef[i] = msg
381 self.id_names[i] = name
382 # TODO: Fix multipart (use services)
383 multipart = True if name.find('_dump') > 0 else False
384 f = self.make_function(msg, i, multipart, do_async)
385 setattr(self._api, name, FuncWrapper(f))
388 'No such message type or failed CRC checksum: %s', n)
390 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
392 pfx = chroot_prefix.encode() if chroot_prefix else None
394 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
396 raise IOError(2, 'Connect failed')
397 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
398 self._register_functions(do_async=do_async)
400 # Initialise control ping
401 crc = self.messages['control_ping'].crc
402 self.control_ping_index = self.transport.get_msg_index(
403 ('control_ping' + '_' + crc[2:]).encode())
404 self.control_ping_msgdef = self.messages['control_ping']
405 if self.async_thread:
406 self.event_thread = threading.Thread(
407 target=self.thread_msg_handler)
408 self.event_thread.daemon = True
409 self.event_thread.start()
412 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
415 name - the name of the client.
416 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
417 do_async - if true, messages are sent without waiting for a reply
418 rx_qlen - the length of the VPP message receive queue between
421 msg_handler = self.transport.get_callback(do_async)
422 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
425 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
426 """Attach to VPP in synchronous mode. Application must poll for events.
428 name - the name of the client.
429 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
430 rx_qlen - the length of the VPP message receive queue between
434 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
437 def disconnect(self):
438 """Detach from VPP."""
439 rv = self.transport.disconnect()
440 self.message_queue.put("terminate event thread")
443 def msg_handler_sync(self, msg):
444 """Process an incoming message from VPP in sync mode.
446 The message may be a reply or it may be an async notification.
448 r = self.decode_incoming_msg(msg)
452 # If we have a context, then use the context to find any
453 # request waiting for a reply
455 if hasattr(r, 'context') and r.context > 0:
459 # No context -> async notification that we feed to the callback
460 self.message_queue.put_nowait(r)
462 raise IOError(2, 'RPC reply message received in event handler')
464 def decode_incoming_msg(self, msg):
466 self.logger.warning('vpp_api.read failed')
468 (i, ci), size = self.header.unpack(msg, 0)
469 if self.id_names[i] == 'rx_thread_exit':
473 # Decode message and returns a tuple.
475 msgobj = self.id_msgdef[i]
477 raise IOError(2, 'Reply message undefined')
479 r, size = msgobj.unpack(msg)
482 def msg_handler_async(self, msg):
483 """Process a message from VPP in async mode.
485 In async mode, all messages are returned to the callback.
487 r = self.decode_incoming_msg(msg)
491 msgname = type(r).__name__
493 if self.event_callback:
494 self.event_callback(msgname, r)
496 def _control_ping(self, context):
497 """Send a ping command."""
498 self._call_vpp_async(self.control_ping_index,
499 self.control_ping_msgdef,
502 def validate_args(self, msg, kwargs):
503 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
505 raise ValueError('Invalid argument {} to {}'
506 .format(list(d), msg.name))
508 def _call_vpp(self, i, msg, multipart, **kwargs):
509 """Given a message, send the message and await a reply.
511 msgdef - the message packing definition
512 i - the message type index
513 multipart - True if the message returns multiple
515 context - context number - chosen at random if not
517 The remainder of the kwargs are the arguments to the API call.
519 The return value is the message or message array containing
520 the response. It will raise an IOError exception if there was
521 no response within the timeout window.
524 if 'context' not in kwargs:
525 context = self.get_context()
526 kwargs['context'] = context
528 context = kwargs['context']
529 kwargs['_vl_msg_id'] = i
532 if self.transport.socket_index:
533 kwargs['client_index'] = self.transport.socket_index
534 except AttributeError:
536 self.validate_args(msg, kwargs)
538 self.transport.suspend()
540 self.transport.write(b)
543 # Send a ping after the request - we use its response
544 # to detect that we have seen all results.
545 self._control_ping(context)
547 # Block until we get a reply.
550 msg = self.transport.read()
552 raise IOError(2, 'VPP API client: read failed')
553 r = self.decode_incoming_msg(msg)
554 msgname = type(r).__name__
555 if context not in r or r.context == 0 or context != r.context:
556 # Message being queued
557 self.message_queue.put_nowait(r)
563 if msgname == 'control_ping_reply':
568 self.transport.resume()
572 def _call_vpp_async(self, i, msg, **kwargs):
573 """Given a message, send the message without waiting for a reply.
575 msgdef - the message packing definition
576 i - the message type index
577 context - context number - chosen at random if not
579 The remainder of the kwargs are the arguments to the API call.
581 if 'context' not in kwargs:
582 context = self.get_context()
583 kwargs['context'] = context
585 context = kwargs['context']
587 if self.transport.socket_index:
588 kwargs['client_index'] = self.transport.socket_index
589 except AttributeError:
590 kwargs['client_index'] = 0
591 kwargs['_vl_msg_id'] = i
594 self.transport.write(b)
def register_event_callback(self, callback):
    """Install the handler that receives asynchronous messages.

    In sync mode only async notifications are delivered to the handler;
    replies to requests are not.  In async mode every message is
    delivered.

    callback is a fn(msg_type_name, msg_type) invoked for each incoming
    message.  Two things to keep in mind while it runs:

    (a) it executes in a background thread, so shared data structures
        may need protecting, e.g. with threading.Lock;
    (b) message processing from VPP is stalled until it returns, so a
        slow handler may provoke reply timeouts or let VPP fill the RX
        buffer.

    Passing None disables the callback.
    """
    self.event_callback = callback
614 def thread_msg_handler(self):
615 """Python thread calling the user registered message handler.
617 This is to emulate the old style event callback scheme. Modern
618 clients should provide their own thread to poll the event
622 r = self.message_queue.get()
623 if r == "terminate event thread":
625 msgname = type(r).__name__
626 if self.event_callback:
627 self.event_callback(msgname, r)
630 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4