3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
30 if sys.version[0] == '2':
37 typedef void (*vac_callback_t)(unsigned char * data, int len);
38 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
39 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
41 int vac_disconnect(void);
42 int vac_read(char **data, int *l, unsigned short timeout);
43 int vac_write(char *data, int len);
44 void vac_free(void * msg);
46 int vac_get_msg_index(unsigned char * name);
47 int vac_msg_table_size(void);
48 int vac_msg_table_max_index(void);
50 void vac_rx_suspend (void);
51 void vac_rx_resume (void);
52 void vac_set_error_handler(vac_error_callback_t);
55 # Barfs on failure, no need to check success.
56 vpp_api = ffi.dlopen('libvppapiclient.so')
59 """Clean up VPP connection on shutdown."""
61 self.logger.debug('Cleaning up VPP on exit')
68 if sys.version[0] == '2':
74 @ffi.callback("void(unsigned char *, int)")
75 def vac_callback_sync(data, len):
76 vpp_object.msg_handler_sync(ffi.buffer(data, len))
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_async(data, len):
81 vpp_object.msg_handler_async(ffi.buffer(data, len))
84 @ffi.callback("void(void *, unsigned char *, int)")
85 def vac_error_handler(arg, msg, msg_len):
86 vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
93 class FuncWrapper(object):
94 def __init__(self, func):
96 self.__name__ = func.__name__
98 def __call__(self, **kwargs):
99 return self._func(**kwargs)
105 This class provides the APIs to VPP. The APIs are loaded
106 from provided .api.json files and makes functions accordingly.
107 These functions are documented in the VPP .api files, as they
108 are dynamically created.
110 Additionally, VPP can send callback messages; this class
111 provides a means to register a callback function to receive
112 these messages in a background thread.
114 def __init__(self, apifiles=None, testmode=False, async_thread=True,
115 logger=logging.getLogger('vpp_papi'), loglevel='debug'):
116 """Create a VPP API object.
118 apifiles is a list of files containing API
119 descriptions that will be loaded - methods will be
120 dynamically created reflecting these APIs. If not
121 provided this will load the API files from VPP's
122 default install location.
127 logging.basicConfig(level=getattr(logging, loglevel.upper()))
132 self.connected = False
133 self.header = struct.Struct('>HI')
135 self.event_callback = None
136 self.message_queue = queue.Queue()
137 self.read_timeout = 0
138 self.vpp_api = vpp_api
140 self.event_thread = threading.Thread(
141 target=self.thread_msg_handler)
142 self.event_thread.daemon = True
143 self.event_thread.start()
146 # Pick up API definitions from default directory
147 apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
149 for file in apifiles:
150 with open(file) as apidef_file:
151 api = json.load(apidef_file)
152 for t in api['types']:
153 self.add_type(t[0], t[1:])
155 for m in api['messages']:
156 self.add_message(m[0], m[1:])
157 self.apifiles = apifiles
160 if len(self.messages) == 0 and not testmode:
161 raise ValueError(1, 'Missing JSON message definitions')
163 # Make sure we allow VPP to clean up the message rings.
164 atexit.register(vpp_atexit, self)
166 # Register error handler
167 vpp_api.vac_set_error_handler(vac_error_handler)
169 # Support legacy CFFI
170 # from_buffer supported from 1.8.0
171 (major, minor, patch) = [int(s) for s in cffi.__version__.split('.', 3)]
172 if major >= 1 and minor >= 8:
173 self._write = self._write_new_cffi
175 self._write = self._write_legacy_cffi
177 class ContextId(object):
178 """Thread-safe provider of unique context IDs."""
181 self.lock = threading.Lock()
184 """Get a new unique (or, at least, not recently used) context."""
188 get_context = ContextId()
191 """Debug function: report current VPP API status to stdout."""
192 print('Connected') if self.connected else print('Not Connected')
193 print('Read API definitions from', ', '.join(self.apifiles))
195 def __struct(self, t, n=None, e=-1, vl=None):
196 """Create a packing structure for a message."""
197 base_types = {'u8': 'B',
207 if e > 0 and t == 'u8':
209 s = struct.Struct('>' + str(e) + 's')
212 # Fixed array of base type
213 s = struct.Struct('>' + base_types[t])
214 return s.size, [e, s]
216 # Old style variable array
217 s = struct.Struct('>' + base_types[t])
218 return s.size, [-1, s]
220 # Variable length array
222 s = struct.Struct('>s')
223 return s.size, [vl, s]
225 s = struct.Struct('>' + base_types[t])
226 return s.size, [vl, s]
228 s = struct.Struct('>' + base_types[t])
231 if t in self.messages:
232 size = self.messages[t]['sizes'][0]
234 # Return a list in case of array
236 return size, [e, lambda self, encode, buf, offset, args: (
237 self.__struct_type(encode, self.messages[t], buf, offset,
240 return size, [vl, lambda self, encode, buf, offset, args: (
241 self.__struct_type(encode, self.messages[t], buf, offset,
245 raise NotImplementedError(1,
246 'No support for compound types ' + t)
247 return size, lambda self, encode, buf, offset, args: (
248 self.__struct_type(encode, self.messages[t], buf, offset, args)
251 raise ValueError(1, 'Invalid message type: ' + t)
253 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
254 """Get a message packer or unpacker."""
256 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
258 return self.__struct_type_decode(msgdef, buf, offset)
260 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
265 if k not in msgdef['args']:
266 raise ValueError(1,'Non existing argument [' + k + ']' + \
267 ' used in call to: ' + \
268 self.id_names[kwargs['_vl_msg_id']] + '()' )
270 for k, v in vpp_iterator(msgdef['args']):
275 e = kwargs[v[0]] if v[0] in kwargs else v[0]
276 if e != len(kwargs[k]):
277 raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, e, len(kwargs[k]))))
280 size += v[1](self, True, buf, off + size,
285 if l != len(kwargs[k]):
286 raise ValueError(1, 'Input list length mistmatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
290 buf[off:off + l] = bytearray(kwargs[k])
295 v[1].pack_into(buf, off + size, i)
299 size = v(self, True, buf, off, kwargs[k])
301 if type(kwargs[k]) is str and v.size < len(kwargs[k]):
302 raise ValueError(1, 'Input list length mistmatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
303 v.pack_into(buf, off, kwargs[k])
306 size = v.size if not type(v) is list else 0
308 return off + size - offset
310 def __getitem__(self, name):
311 if name in self.messages:
312 return self.messages[name]
315 def get_size(self, sizes, kwargs):
316 total_size = sizes[0]
318 if e in kwargs and type(kwargs[e]) is list:
319 total_size += len(kwargs[e]) * sizes[1][e]
322 def encode(self, msgdef, kwargs):
323 # Make suitably large buffer
324 size = self.get_size(msgdef['sizes'], kwargs)
325 buf = bytearray(size)
327 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
328 return buf[:offset + size]
330 def decode(self, msgdef, buf):
331 return self.__struct_type(False, msgdef, buf, 0, None)[1]
333 def __struct_type_decode(self, msgdef, buf, offset):
337 for k, v in vpp_iterator(msgdef['args']):
341 if callable(v[1]): # compound type
343 if v[0] in msgdef['args']: # vla
349 (s, l) = v[1](self, False, buf, off + size, None)
354 if type(v[0]) is int:
355 size = len(buf) - off
358 res.append(buf[off:off + size])
360 e = v[0] if type(v[0]) is int else res[v[2]]
362 e = (len(buf) - off) / v[1].size
367 lst.append(v[1].unpack_from(buf, off + size)[0])
372 (s, l) = v(self, False, buf, off, None)
376 res.append(v.unpack_from(buf, off)[0])
379 return off + size - offset, msgdef['return_tuple']._make(res)
381 def ret_tup(self, name):
382 if name in self.messages and 'return_tuple' in self.messages[name]:
383 return self.messages[name]['return_tuple']
386 def add_message(self, name, msgdef, typeonly=False):
387 if name in self.messages:
388 raise ValueError('Duplicate message name: ' + name)
390 args = collections.OrderedDict()
391 argtypes = collections.OrderedDict()
396 for i, f in enumerate(msgdef):
397 if type(f) is dict and 'crc' in f:
398 msg['crc'] = f['crc']
402 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
403 raise ValueError('Variable Length Array must be last: ' + name)
404 size, s = self.__struct(*f)
406 if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
408 sizes[field_name] = size
410 sizes[field_name] = size
411 total_size += s[0] * size
413 sizes[field_name] = size
416 argtypes[field_name] = field_type
417 if len(f) == 4: # Find offset to # elements field
418 idx = list(args.keys()).index(f[3]) - i
419 args[field_name].append(idx)
420 fields.append(field_name)
421 msg['return_tuple'] = collections.namedtuple(name, fields,
423 self.messages[name] = msg
424 self.messages[name]['args'] = args
425 self.messages[name]['argtypes'] = argtypes
426 self.messages[name]['typeonly'] = typeonly
427 self.messages[name]['sizes'] = [total_size, sizes]
428 return self.messages[name]
430 def add_type(self, name, typedef):
431 return self.add_message('vl_api_' + name + '_t', typedef,
434 def make_function(self, name, i, msgdef, multipart, async):
436 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
438 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
440 args = self.messages[name]['args']
441 argtypes = self.messages[name]['argtypes']
442 f.__name__ = str(name)
443 f.__doc__ = ", ".join(["%s %s" %
444 (argtypes[k], k) for k in args.keys()])
449 if not hasattr(self, "_api"):
450 raise Exception("Not connected, api definitions not available")
453 def _register_functions(self, async=False):
454 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
455 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
457 for name, msgdef in vpp_iterator(self.messages):
458 if self.messages[name]['typeonly']:
460 crc = self.messages[name]['crc']
461 n = name + '_' + crc[2:]
462 i = vpp_api.vac_get_msg_index(n.encode())
464 self.id_msgdef[i] = msgdef
465 self.id_names[i] = name
466 multipart = True if name.find('_dump') > 0 else False
467 f = self.make_function(name, i, msgdef, multipart, async)
468 setattr(self._api, name, FuncWrapper(f))
470 # old API stuff starts here - will be removed in 17.07
471 if hasattr(self, name):
473 3, "Conflicting name in JSON definition: `%s'" % name)
474 setattr(self, name, f)
475 # old API stuff ends here
478 'No such message type or failed CRC checksum: %s', n)
480 def _write_new_cffi(self, buf):
481 """Send a binary-packed message to VPP."""
482 if not self.connected:
483 raise IOError(1, 'Not connected')
484 return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
486 def _write_legacy_cffi(self, buf):
487 """Send a binary-packed message to VPP."""
488 if not self.connected:
489 raise IOError(1, 'Not connected')
490 return vpp_api.vac_write(str(buf), len(buf))
493 if not self.connected:
494 raise IOError(1, 'Not connected')
495 mem = ffi.new("char **")
496 size = ffi.new("int *")
497 rv = vpp_api.vac_read(mem, size, self.read_timeout)
499 raise IOError(rv, 'vac_read failed')
500 msg = bytes(ffi.buffer(mem[0], size[0]))
501 vpp_api.vac_free(mem[0])
504 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
506 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
507 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
509 raise IOError(2, 'Connect failed')
510 self.connected = True
512 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
513 self._register_functions(async=async)
515 # Initialise control ping
516 crc = self.messages['control_ping']['crc']
517 self.control_ping_index = vpp_api.vac_get_msg_index(
518 ('control_ping' + '_' + crc[2:]).encode())
519 self.control_ping_msgdef = self.messages['control_ping']
522 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
525 name - the name of the client.
526 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
527 async - if true, messages are sent without waiting for a reply
528 rx_qlen - the length of the VPP message receive queue between
531 msg_handler = vac_callback_sync if not async else vac_callback_async
532 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
535 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
536 """Attach to VPP in synchronous mode. Application must poll for events.
538 name - the name of the client.
539 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
540 rx_qlen - the length of the VPP message receive queue between
544 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
547 def disconnect(self):
548 """Detach from VPP."""
549 rv = vpp_api.vac_disconnect()
550 self.connected = False
553 def msg_handler_sync(self, msg):
554 """Process an incoming message from VPP in sync mode.
556 The message may be a reply or it may be an async notification.
558 r = self.decode_incoming_msg(msg)
562 # If we have a context, then use the context to find any
563 # request waiting for a reply
565 if hasattr(r, 'context') and r.context > 0:
568 msgname = type(r).__name__
571 # No context -> async notification that we feed to the callback
572 self.message_queue.put_nowait(r)
574 raise IOError(2, 'RPC reply message received in event handler')
576 def decode_incoming_msg(self, msg):
578 self.logger.warning('vpp_api.read failed')
581 i, ci = self.header.unpack_from(msg, 0)
582 if self.id_names[i] == 'rx_thread_exit':
586 # Decode message and returns a tuple.
588 msgdef = self.id_msgdef[i]
590 raise IOError(2, 'Reply message undefined')
592 r = self.decode(msgdef, msg)
596 def msg_handler_async(self, msg):
597 """Process a message from VPP in async mode.
599 In async mode, all messages are returned to the callback.
601 r = self.decode_incoming_msg(msg)
605 msgname = type(r).__name__
607 if self.event_callback:
608 self.event_callback(msgname, r)
610 def _control_ping(self, context):
611 """Send a ping command."""
612 self._call_vpp_async(self.control_ping_index,
613 self.control_ping_msgdef,
616 def _call_vpp(self, i, msgdef, multipart, **kwargs):
617 """Given a message, send the message and await a reply.
619 msgdef - the message packing definition
620 i - the message type index
621 multipart - True if the message returns multiple
623 context - context number - chosen at random if not
625 The remainder of the kwargs are the arguments to the API call.
627 The return value is the message or message array containing
628 the response. It will raise an IOError exception if there was
629 no response within the timeout window.
632 if 'context' not in kwargs:
633 context = self.get_context()
634 kwargs['context'] = context
636 context = kwargs['context']
637 kwargs['_vl_msg_id'] = i
638 b = self.encode(msgdef, kwargs)
640 vpp_api.vac_rx_suspend()
644 # Send a ping after the request - we use its response
645 # to detect that we have seen all results.
646 self._control_ping(context)
648 # Block until we get a reply.
653 raise IOError(2, 'VPP API client: read failed')
655 r = self.decode_incoming_msg(msg)
656 msgname = type(r).__name__
657 if context not in r or r.context == 0 or context != r.context:
658 self.message_queue.put_nowait(r)
664 if msgname == 'control_ping_reply':
669 vpp_api.vac_rx_resume()
673 def _call_vpp_async(self, i, msgdef, **kwargs):
674 """Given a message, send the message and await a reply.
676 msgdef - the message packing definition
677 i - the message type index
678 context - context number - chosen at random if not
680 The remainder of the kwargs are the arguments to the API call.
682 if 'context' not in kwargs:
683 context = self.get_context()
684 kwargs['context'] = context
686 context = kwargs['context']
687 kwargs['_vl_msg_id'] = i
688 b = self.encode(msgdef, kwargs)
692 def register_event_callback(self, callback):
693 """Register a callback for async messages.
695 This will be called for async notifications in sync mode,
696 and all messages in async mode. In sync mode, replies to
697 requests will not come here.
699 callback is a fn(msg_type_name, msg_type) that will be
700 called when a message comes in. While this function is
701 executing, note that (a) you are in a background thread and
702 may wish to use threading.Lock to protect your datastructures,
703 and (b) message processing from VPP will stop (so if you take
704 a long while about it you may provoke reply timeouts or cause
705 VPP to fill the RX buffer). Passing None will disable the
708 self.event_callback = callback
710 def thread_msg_handler(self):
711 """Python thread calling the user registerd message handler.
713 This is to emulate the old style event callback scheme. Modern
714 clients should provide their own thread to poll the event
718 r = self.message_queue.get()
719 msgname = type(r).__name__
720 if self.event_callback:
721 self.event_callback(msgname, r)