3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type
32 if sys.version[0] == '2':
38 class VppEnumType(type):
39 def __getattr__(cls, name):
40 t = vpp_get_type(name)
45 # class VppEnum(metaclass=VppEnumType):
48 __metaclass__ = VppEnumType
51 def vpp_atexit(vpp_weakref):
52 """Clean up VPP connection on shutdown."""
53 vpp_instance = vpp_weakref()
54 if vpp_instance and vpp_instance.transport.connected:
55 vpp_instance.logger.debug('Cleaning up VPP on exit')
56 vpp_instance.disconnect()
60 if sys.version[0] == '2':
66 class VppApiDynamicMethodHolder(object):
70 class FuncWrapper(object):
71 def __init__(self, func):
73 self.__name__ = func.__name__
75 def __call__(self, **kwargs):
76 return self._func(**kwargs)
82 This class provides the APIs to VPP. The APIs are loaded
83 from provided .api.json files and makes functions accordingly.
84 These functions are documented in the VPP .api files, as they
85 are dynamically created.
87 Additionally, VPP can send callback messages; this class
88 provides a means to register a callback function to receive
89 these messages in a background thread.
92 def process_json_file(self, apidef_file):
93 api = json.load(apidef_file)
95 for t in api['enums']:
96 t[0] = 'vl_api_' + t[0] + '_t'
97 types[t[0]] = {'type': 'enum', 'data': t}
98 for t in api['unions']:
99 t[0] = 'vl_api_' + t[0] + '_t'
100 types[t[0]] = {'type': 'union', 'data': t}
101 for t in api['types']:
102 t[0] = 'vl_api_' + t[0] + '_t'
103 types[t[0]] = {'type': 'type', 'data': t}
108 for k, v in types.items():
110 if not vpp_get_type(t[0]):
111 if v['type'] == 'enum':
113 VPPEnumType(t[0], t[1:])
116 elif v['type'] == 'union':
118 VPPUnionType(t[0], t[1:])
121 elif v['type'] == 'type':
126 if len(unresolved) == 0:
129 raise ValueError('Unresolved type definitions {}'
134 for m in api['messages']:
136 self.messages[m[0]] = VPPMessage(m[0], m[1:])
137 except NotImplementedError:
138 self.logger.error('Not implemented error for {}'.format(m[0]))
140 def __init__(self, apifiles=None, testmode=False, async_thread=True,
141 logger=logging.getLogger('vpp_papi'), loglevel='debug',
142 read_timeout=5, use_socket=False,
143 server_address='/run/vpp-api.sock'):
144 """Create a VPP API object.
146 apifiles is a list of files containing API
147 descriptions that will be loaded - methods will be
148 dynamically created reflecting these APIs. If not
149 provided this will load the API files from VPP's
150 default install location.
152 logger, if supplied, is the logging logger object to log to.
153 loglevel, if supplied, is the log level this logger is set
154 to report at (from the loglevels in the logging module).
157 logger = logging.getLogger(__name__)
158 if loglevel is not None:
159 logger.setLevel(loglevel)
165 self.header = VPPType('header', [['u16', 'msgid'],
166 ['u32', 'client_index']])
168 self.event_callback = None
169 self.message_queue = queue.Queue()
170 self.read_timeout = read_timeout
171 self.async_thread = async_thread
174 from . vpp_transport_socket import VppTransport
176 from . vpp_transport_shmem import VppTransport
179 # Pick up API definitions from default directory
181 apifiles = self.find_api_files()
183 # In test mode we don't care that we can't find the API files
189 for file in apifiles:
190 with open(file) as apidef_file:
191 self.process_json_file(apidef_file)
193 self.apifiles = apifiles
196 if len(self.messages) == 0 and not testmode:
197 raise ValueError(1, 'Missing JSON message definitions')
199 self.transport = VppTransport(self, read_timeout=read_timeout,
200 server_address=server_address)
201 # Make sure we allow VPP to clean up the message rings.
202 atexit.register(vpp_atexit, weakref.ref(self))
204 class ContextId(object):
205 """Thread-safe provider of unique context IDs."""
208 self.lock = threading.Lock()
211 """Get a new unique (or, at least, not recently used) context."""
215 get_context = ContextId()
217 def get_type(self, name):
218 return vpp_get_type(name)
221 def find_api_dir(cls):
222 """Attempt to find the best directory in which API definition
223 files may reside. If the value VPP_API_DIR exists in the environment
224 then it is first on the search list. If we're inside a recognized
225 location in a VPP source tree (src/scripts and src/vpp-api/python)
226 then entries from there to the likely locations in build-root are
227 added. Finally the location used by system packages is added.
229 :returns: A single directory name, or None if no such directory
234 if 'VPP_API_DIR' in os.environ:
235 dirs.append(os.environ['VPP_API_DIR'])
237 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
238 # in which case, plot a course to likely places in the src tree
239 import __main__ as main
240 if hasattr(main, '__file__'):
241 # get the path of the calling script
242 localdir = os.path.dirname(os.path.realpath(main.__file__))
244 # use cwd if there is no calling script
245 localdir = os.getcwd()
246 localdir_s = localdir.split(os.path.sep)
249 """Match dir against right-hand components of the script dir"""
250 d = dir.split('/') # param 'dir' assumes a / separator
252 return len(localdir_s) > length and localdir_s[-length:] == d
254 def sdir(srcdir, variant):
255 """Build a path from srcdir to the staged API files of
256 'variant' (typically '' or '_debug')"""
257 # Since 'core' and 'plugin' files are staged
258 # in separate directories, we target the parent dir.
259 return os.path.sep.join((
262 'install-vpp%s-native' % variant,
270 if dmatch('src/scripts'):
271 srcdir = os.path.sep.join(localdir_s[:-2])
272 elif dmatch('src/vpp-api/python'):
273 srcdir = os.path.sep.join(localdir_s[:-3])
275 # we're apparently running tests
276 srcdir = os.path.sep.join(localdir_s[:-1])
279 # we're in the source tree, try both the debug and release
281 dirs.append(sdir(srcdir, '_debug'))
282 dirs.append(sdir(srcdir, ''))
284 # Test for staged copies of the scripts
285 # For these, since we explicitly know if we're running a debug versus
286 # release variant, target only the relevant directory
287 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
288 srcdir = os.path.sep.join(localdir_s[:-4])
289 dirs.append(sdir(srcdir, '_debug'))
290 if dmatch('build-root/install-vpp-native/vpp/bin'):
291 srcdir = os.path.sep.join(localdir_s[:-4])
292 dirs.append(sdir(srcdir, ''))
294 # finally, try the location system packages typically install into
295 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
297 # check the directories for existance; first one wins
299 if os.path.isdir(dir):
305 def find_api_files(cls, api_dir=None, patterns='*'):
306 """Find API definition files from the given directory tree with the
307 given pattern. If no directory is given then find_api_dir() is used
308 to locate one. If no pattern is given then all definition files found
309 in the directory tree are used.
311 :param api_dir: A directory tree in which to locate API definition
312 files; subdirectories are descended into.
313 If this is None then find_api_dir() is called to discover it.
314 :param patterns: A list of patterns to use in each visited directory
315 when looking for files.
316 This can be a list/tuple object or a comma-separated string of
317 patterns. Each value in the list will have leading/trialing
319 The pattern specifies the first part of the filename, '.api.json'
321 The results are de-duplicated, thus overlapping patterns are fine.
322 If this is None it defaults to '*' meaning "all API files".
323 :returns: A list of file paths for the API files found.
326 api_dir = cls.find_api_dir()
328 raise RuntimeError("api_dir cannot be located")
330 if isinstance(patterns, list) or isinstance(patterns, tuple):
331 patterns = [p.strip() + '.api.json' for p in patterns]
333 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
336 for root, dirnames, files in os.walk(api_dir):
337 # iterate all given patterns and de-dup the result
338 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
339 for filename in files:
340 api_files.append(os.path.join(root, filename))
346 if not hasattr(self, "_api"):
347 raise Exception("Not connected, api definitions not available")
350 def make_function(self, msg, i, multipart, do_async):
353 return self._call_vpp_async(i, msg, **kwargs)
356 return self._call_vpp(i, msg, multipart, **kwargs)
358 f.__name__ = str(msg.name)
359 f.__doc__ = ", ".join(["%s %s" %
360 (msg.fieldtypes[j], k)
361 for j, k in enumerate(msg.fields)])
364 def _register_functions(self, do_async=False):
365 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
366 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
367 self._api = VppApiDynamicMethodHolder()
368 for name, msg in vpp_iterator(self.messages):
369 n = name + '_' + msg.crc[2:]
370 i = self.transport.get_msg_index(n.encode())
372 self.id_msgdef[i] = msg
373 self.id_names[i] = name
374 # TODO: Fix multipart (use services)
375 multipart = True if name.find('_dump') > 0 else False
376 f = self.make_function(msg, i, multipart, do_async)
377 setattr(self._api, name, FuncWrapper(f))
380 'No such message type or failed CRC checksum: %s', n)
382 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
384 pfx = chroot_prefix.encode() if chroot_prefix else None
386 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
388 raise IOError(2, 'Connect failed')
389 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
390 self._register_functions(do_async=do_async)
392 # Initialise control ping
393 crc = self.messages['control_ping'].crc
394 self.control_ping_index = self.transport.get_msg_index(
395 ('control_ping' + '_' + crc[2:]).encode())
396 self.control_ping_msgdef = self.messages['control_ping']
397 if self.async_thread:
398 self.event_thread = threading.Thread(
399 target=self.thread_msg_handler)
400 self.event_thread.daemon = True
401 self.event_thread.start()
404 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
407 name - the name of the client.
408 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
409 do_async - if true, messages are sent without waiting for a reply
410 rx_qlen - the length of the VPP message receive queue between
413 msg_handler = self.transport.get_callback(do_async)
414 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
417 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
418 """Attach to VPP in synchronous mode. Application must poll for events.
420 name - the name of the client.
421 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
422 rx_qlen - the length of the VPP message receive queue between
426 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
429 def disconnect(self):
430 """Detach from VPP."""
431 rv = self.transport.disconnect()
432 self.message_queue.put("terminate event thread")
435 def msg_handler_sync(self, msg):
436 """Process an incoming message from VPP in sync mode.
438 The message may be a reply or it may be an async notification.
440 r = self.decode_incoming_msg(msg)
444 # If we have a context, then use the context to find any
445 # request waiting for a reply
447 if hasattr(r, 'context') and r.context > 0:
451 # No context -> async notification that we feed to the callback
452 self.message_queue.put_nowait(r)
454 raise IOError(2, 'RPC reply message received in event handler')
456 def decode_incoming_msg(self, msg):
458 self.logger.warning('vpp_api.read failed')
460 (i, ci), size = self.header.unpack(msg, 0)
461 if self.id_names[i] == 'rx_thread_exit':
465 # Decode message and returns a tuple.
467 msgobj = self.id_msgdef[i]
469 raise IOError(2, 'Reply message undefined')
471 r, size = msgobj.unpack(msg)
474 def msg_handler_async(self, msg):
475 """Process a message from VPP in async mode.
477 In async mode, all messages are returned to the callback.
479 r = self.decode_incoming_msg(msg)
483 msgname = type(r).__name__
485 if self.event_callback:
486 self.event_callback(msgname, r)
488 def _control_ping(self, context):
489 """Send a ping command."""
490 self._call_vpp_async(self.control_ping_index,
491 self.control_ping_msgdef,
494 def validate_args(self, msg, kwargs):
495 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
497 raise ValueError('Invalid argument {} to {}'
498 .format(list(d), msg.name))
500 def _call_vpp(self, i, msg, multipart, **kwargs):
501 """Given a message, send the message and await a reply.
503 msgdef - the message packing definition
504 i - the message type index
505 multipart - True if the message returns multiple
507 context - context number - chosen at random if not
509 The remainder of the kwargs are the arguments to the API call.
511 The return value is the message or message array containing
512 the response. It will raise an IOError exception if there was
513 no response within the timeout window.
516 if 'context' not in kwargs:
517 context = self.get_context()
518 kwargs['context'] = context
520 context = kwargs['context']
521 kwargs['_vl_msg_id'] = i
524 if self.transport.socket_index:
525 kwargs['client_index'] = self.transport.socket_index
526 except AttributeError:
528 self.validate_args(msg, kwargs)
530 self.transport.suspend()
532 self.transport.write(b)
535 # Send a ping after the request - we use its response
536 # to detect that we have seen all results.
537 self._control_ping(context)
539 # Block until we get a reply.
542 msg = self.transport.read()
544 raise IOError(2, 'VPP API client: read failed')
545 r = self.decode_incoming_msg(msg)
546 msgname = type(r).__name__
547 if context not in r or r.context == 0 or context != r.context:
548 # Message being queued
549 self.message_queue.put_nowait(r)
555 if msgname == 'control_ping_reply':
560 self.transport.resume()
564 def _call_vpp_async(self, i, msg, **kwargs):
565 """Given a message, send the message and await a reply.
567 msgdef - the message packing definition
568 i - the message type index
569 context - context number - chosen at random if not
571 The remainder of the kwargs are the arguments to the API call.
573 if 'context' not in kwargs:
574 context = self.get_context()
575 kwargs['context'] = context
577 context = kwargs['context']
579 if self.transport.socket_index:
580 kwargs['client_index'] = self.transport.socket_index
581 except AttributeError:
582 kwargs['client_index'] = 0
583 kwargs['_vl_msg_id'] = i
586 self.transport.write(b)
588 def register_event_callback(self, callback):
589 """Register a callback for async messages.
591 This will be called for async notifications in sync mode,
592 and all messages in async mode. In sync mode, replies to
593 requests will not come here.
595 callback is a fn(msg_type_name, msg_type) that will be
596 called when a message comes in. While this function is
597 executing, note that (a) you are in a background thread and
598 may wish to use threading.Lock to protect your datastructures,
599 and (b) message processing from VPP will stop (so if you take
600 a long while about it you may provoke reply timeouts or cause
601 VPP to fill the RX buffer). Passing None will disable the
604 self.event_callback = callback
606 def thread_msg_handler(self):
607 """Python thread calling the user registered message handler.
609 This is to emulate the old style event callback scheme. Modern
610 clients should provide their own thread to poll the event
614 r = self.message_queue.get()
615 if r == "terminate event thread":
617 msgname = type(r).__name__
618 if self.event_callback:
619 self.event_callback(msgname, r)
622 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4