3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
29 if sys.version[0] == '2':
36 typedef void (*vac_callback_t)(unsigned char * data, int len);
37 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
38 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
40 int vac_disconnect(void);
41 int vac_read(char **data, int *l, unsigned short timeout);
42 int vac_write(char *data, int len);
43 void vac_free(void * msg);
45 int vac_get_msg_index(unsigned char * name);
46 int vac_msg_table_size(void);
47 int vac_msg_table_max_index(void);
49 void vac_rx_suspend (void);
50 void vac_rx_resume (void);
51 void vac_set_error_handler(vac_error_callback_t);
54 # Barfs on failure, no need to check success.
55 vpp_api = ffi.dlopen('libvppapiclient.so')
59 """Clean up VPP connection on shutdown."""
61 self.logger.debug('Cleaning up VPP on exit')
68 if sys.version[0] == '2':
74 @ffi.callback("void(unsigned char *, int)")
75 def vac_callback_sync(data, len):
76 vpp_object.msg_handler_sync(ffi.buffer(data, len))
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_async(data, len):
81 vpp_object.msg_handler_async(ffi.buffer(data, len))
84 @ffi.callback("void(void *, unsigned char *, int)")
85 def vac_error_handler(arg, msg, msg_len):
86 vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
93 class FuncWrapper(object):
94 def __init__(self, func):
96 self.__name__ = func.__name__
98 def __call__(self, **kwargs):
99 return self._func(**kwargs)
105 This class provides the APIs to VPP. The APIs are loaded
106 from provided .api.json files and makes functions accordingly.
107 These functions are documented in the VPP .api files, as they
108 are dynamically created.
110 Additionally, VPP can send callback messages; this class
111 provides a means to register a callback function to receive
112 these messages in a background thread.
114 def __init__(self, apifiles=None, testmode=False, async_thread=True,
115 logger=logging.getLogger('vpp_papi'), loglevel='debug'):
116 """Create a VPP API object.
118 apifiles is a list of files containing API
119 descriptions that will be loaded - methods will be
120 dynamically created reflecting these APIs. If not
121 provided this will load the API files from VPP's
122 default install location.
127 logging.basicConfig(level=getattr(logging, loglevel.upper()))
132 self.connected = False
133 self.header = struct.Struct('>HI')
135 self.event_callback = None
136 self.message_queue = queue.Queue()
137 self.read_timeout = 0
138 self.vpp_api = vpp_api
140 self.event_thread = threading.Thread(
141 target=self.thread_msg_handler)
142 self.event_thread.daemon = True
143 self.event_thread.start()
146 # Pick up API definitions from default directory
147 apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
149 for file in apifiles:
150 with open(file) as apidef_file:
151 api = json.load(apidef_file)
152 for t in api['types']:
153 self.add_type(t[0], t[1:])
155 for m in api['messages']:
156 self.add_message(m[0], m[1:])
157 self.apifiles = apifiles
160 if len(self.messages) == 0 and not testmode:
161 raise ValueError(1, 'Missing JSON message definitions')
163 # Make sure we allow VPP to clean up the message rings.
164 atexit.register(vpp_atexit, self)
166 # Register error handler
167 vpp_api.vac_set_error_handler(vac_error_handler)
169 class ContextId(object):
170 """Thread-safe provider of unique context IDs."""
173 self.lock = threading.Lock()
176 """Get a new unique (or, at least, not recently used) context."""
180 get_context = ContextId()
183 """Debug function: report current VPP API status to stdout."""
184 print('Connected') if self.connected else print('Not Connected')
185 print('Read API definitions from', ', '.join(self.apifiles))
187 def __struct(self, t, n=None, e=-1, vl=None):
188 """Create a packing structure for a message."""
189 base_types = {'u8': 'B',
199 if e > 0 and t == 'u8':
201 s = struct.Struct('>' + str(e) + 's')
204 # Fixed array of base type
205 s = struct.Struct('>' + base_types[t])
206 return s.size, [e, s]
208 # Old style variable array
209 s = struct.Struct('>' + base_types[t])
210 return s.size, [-1, s]
212 # Variable length array
214 s = struct.Struct('>s')
215 return s.size, [vl, s]
217 s = struct.Struct('>' + base_types[t])
218 return s.size, [vl, s]
220 s = struct.Struct('>' + base_types[t])
223 if t in self.messages:
224 size = self.messages[t]['sizes'][0]
226 # Return a list in case of array
228 return size, [e, lambda self, encode, buf, offset, args: (
229 self.__struct_type(encode, self.messages[t], buf, offset,
232 return size, [vl, lambda self, encode, buf, offset, args: (
233 self.__struct_type(encode, self.messages[t], buf, offset,
237 raise NotImplementedError(1,
238 'No support for compound types ' + t)
239 return size, lambda self, encode, buf, offset, args: (
240 self.__struct_type(encode, self.messages[t], buf, offset, args)
243 raise ValueError(1, 'Invalid message type: ' + t)
245 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
246 """Get a message packer or unpacker."""
248 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
250 return self.__struct_type_decode(msgdef, buf, offset)
252 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
257 if k not in msgdef['args']:
258 raise ValueError(1,'Non existing argument [' + k + ']' + \
259 ' used in call to: ' + \
260 self.id_names[kwargs['_vl_msg_id']] + '()' )
262 for k, v in vpp_iterator(msgdef['args']):
267 e = kwargs[v[0]] if v[0] in kwargs else v[0]
268 if e != len(kwargs[k]):
269 raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, e, len(kwargs[k]))))
272 size += v[1](self, True, buf, off + size,
277 if l != len(kwargs[k]):
278 raise ValueError(1, 'Input list length mistmatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
282 buf[off:off + l] = bytearray(kwargs[k])
287 v[1].pack_into(buf, off + size, i)
291 size = v(self, True, buf, off, kwargs[k])
293 if type(kwargs[k]) is str and v.size < len(kwargs[k]):
294 raise ValueError(1, 'Input list length mistmatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
295 v.pack_into(buf, off, kwargs[k])
298 size = v.size if not type(v) is list else 0
300 return off + size - offset
302 def __getitem__(self, name):
303 if name in self.messages:
304 return self.messages[name]
307 def get_size(self, sizes, kwargs):
308 total_size = sizes[0]
310 if e in kwargs and type(kwargs[e]) is list:
311 total_size += len(kwargs[e]) * sizes[1][e]
314 def encode(self, msgdef, kwargs):
315 # Make suitably large buffer
316 size = self.get_size(msgdef['sizes'], kwargs)
317 buf = bytearray(size)
319 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
320 return buf[:offset + size]
322 def decode(self, msgdef, buf):
323 return self.__struct_type(False, msgdef, buf, 0, None)[1]
325 def __struct_type_decode(self, msgdef, buf, offset):
329 for k, v in vpp_iterator(msgdef['args']):
333 if callable(v[1]): # compound type
335 if v[0] in msgdef['args']: # vla
341 (s, l) = v[1](self, False, buf, off + size, None)
346 if type(v[0]) is int:
347 size = len(buf) - off
350 res.append(buf[off:off + size])
352 e = v[0] if type(v[0]) is int else res[v[2]]
354 e = (len(buf) - off) / v[1].size
359 lst.append(v[1].unpack_from(buf, off + size)[0])
363 (s, l) = v(self, False, buf, off, None)
367 res.append(v.unpack_from(buf, off)[0])
370 return off + size - offset, msgdef['return_tuple']._make(res)
372 def ret_tup(self, name):
373 if name in self.messages and 'return_tuple' in self.messages[name]:
374 return self.messages[name]['return_tuple']
377 def add_message(self, name, msgdef, typeonly=False):
378 if name in self.messages:
379 raise ValueError('Duplicate message name: ' + name)
381 args = collections.OrderedDict()
382 argtypes = collections.OrderedDict()
387 for i, f in enumerate(msgdef):
388 if type(f) is dict and 'crc' in f:
389 msg['crc'] = f['crc']
393 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
394 raise ValueError('Variable Length Array must be last: ' + name)
395 size, s = self.__struct(*f)
397 if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
399 sizes[field_name] = size
401 sizes[field_name] = size
402 total_size += s[0] * size
404 sizes[field_name] = size
407 argtypes[field_name] = field_type
408 if len(f) == 4: # Find offset to # elements field
409 idx = list(args.keys()).index(f[3]) - i
410 args[field_name].append(idx)
411 fields.append(field_name)
412 msg['return_tuple'] = collections.namedtuple(name, fields,
414 self.messages[name] = msg
415 self.messages[name]['args'] = args
416 self.messages[name]['argtypes'] = argtypes
417 self.messages[name]['typeonly'] = typeonly
418 self.messages[name]['sizes'] = [total_size, sizes]
419 return self.messages[name]
421 def add_type(self, name, typedef):
422 return self.add_message('vl_api_' + name + '_t', typedef,
425 def make_function(self, name, i, msgdef, multipart, async):
427 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
429 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
431 args = self.messages[name]['args']
432 argtypes = self.messages[name]['argtypes']
433 f.__name__ = str(name)
434 f.__doc__ = ", ".join(["%s %s" %
435 (argtypes[k], k) for k in args.keys()])
440 if not hasattr(self, "_api"):
441 raise Exception("Not connected, api definitions not available")
444 def _register_functions(self, async=False):
445 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
446 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
448 for name, msgdef in vpp_iterator(self.messages):
449 if self.messages[name]['typeonly']:
451 crc = self.messages[name]['crc']
452 n = name + '_' + crc[2:]
453 i = vpp_api.vac_get_msg_index(n.encode())
455 self.id_msgdef[i] = msgdef
456 self.id_names[i] = name
457 multipart = True if name.find('_dump') > 0 else False
458 f = self.make_function(name, i, msgdef, multipart, async)
459 setattr(self._api, name, FuncWrapper(f))
461 # old API stuff starts here - will be removed in 17.07
462 if hasattr(self, name):
464 3, "Conflicting name in JSON definition: `%s'" % name)
465 setattr(self, name, f)
466 # old API stuff ends here
469 'No such message type or failed CRC checksum: %s', n)
471 def _write(self, buf):
472 """Send a binary-packed message to VPP."""
473 if not self.connected:
474 raise IOError(1, 'Not connected')
475 return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
478 if not self.connected:
479 raise IOError(1, 'Not connected')
480 mem = ffi.new("char **")
481 size = ffi.new("int *")
482 rv = vpp_api.vac_read(mem, size, self.read_timeout)
484 raise IOError(rv, 'vac_read failed')
485 msg = bytes(ffi.buffer(mem[0], size[0]))
486 vpp_api.vac_free(mem[0])
489 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
491 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
492 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
494 raise IOError(2, 'Connect failed')
495 self.connected = True
497 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
498 self._register_functions(async=async)
500 # Initialise control ping
501 crc = self.messages['control_ping']['crc']
502 self.control_ping_index = vpp_api.vac_get_msg_index(
503 ('control_ping' + '_' + crc[2:]).encode())
504 self.control_ping_msgdef = self.messages['control_ping']
507 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
510 name - the name of the client.
511 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
512 async - if true, messages are sent without waiting for a reply
513 rx_qlen - the length of the VPP message receive queue between
516 msg_handler = vac_callback_sync if not async else vac_callback_async
517 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
520 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
521 """Attach to VPP in synchronous mode. Application must poll for events.
523 name - the name of the client.
524 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
525 rx_qlen - the length of the VPP message receive queue between
529 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
532 def disconnect(self):
533 """Detach from VPP."""
534 rv = vpp_api.vac_disconnect()
535 self.connected = False
538 def msg_handler_sync(self, msg):
539 """Process an incoming message from VPP in sync mode.
541 The message may be a reply or it may be an async notification.
543 r = self.decode_incoming_msg(msg)
547 # If we have a context, then use the context to find any
548 # request waiting for a reply
550 if hasattr(r, 'context') and r.context > 0:
553 msgname = type(r).__name__
556 # No context -> async notification that we feed to the callback
557 self.message_queue.put_nowait(r)
559 raise IOError(2, 'RPC reply message received in event handler')
561 def decode_incoming_msg(self, msg):
563 self.logger.warning('vpp_api.read failed')
566 i, ci = self.header.unpack_from(msg, 0)
567 if self.id_names[i] == 'rx_thread_exit':
571 # Decode message and returns a tuple.
573 msgdef = self.id_msgdef[i]
575 raise IOError(2, 'Reply message undefined')
577 r = self.decode(msgdef, msg)
581 def msg_handler_async(self, msg):
582 """Process a message from VPP in async mode.
584 In async mode, all messages are returned to the callback.
586 r = self.decode_incoming_msg(msg)
590 msgname = type(r).__name__
592 if self.event_callback:
593 self.event_callback(msgname, r)
595 def _control_ping(self, context):
596 """Send a ping command."""
597 self._call_vpp_async(self.control_ping_index,
598 self.control_ping_msgdef,
601 def _call_vpp(self, i, msgdef, multipart, **kwargs):
602 """Given a message, send the message and await a reply.
604 msgdef - the message packing definition
605 i - the message type index
606 multipart - True if the message returns multiple
608 context - context number - chosen at random if not
610 The remainder of the kwargs are the arguments to the API call.
612 The return value is the message or message array containing
613 the response. It will raise an IOError exception if there was
614 no response within the timeout window.
617 if 'context' not in kwargs:
618 context = self.get_context()
619 kwargs['context'] = context
621 context = kwargs['context']
622 kwargs['_vl_msg_id'] = i
623 b = self.encode(msgdef, kwargs)
625 vpp_api.vac_rx_suspend()
629 # Send a ping after the request - we use its response
630 # to detect that we have seen all results.
631 self._control_ping(context)
633 # Block until we get a reply.
638 raise IOError(2, 'VPP API client: read failed')
640 r = self.decode_incoming_msg(msg)
641 msgname = type(r).__name__
642 if context not in r or r.context == 0 or context != r.context:
643 self.message_queue.put_nowait(r)
649 if msgname == 'control_ping_reply':
654 vpp_api.vac_rx_resume()
658 def _call_vpp_async(self, i, msgdef, **kwargs):
659 """Given a message, send the message and await a reply.
661 msgdef - the message packing definition
662 i - the message type index
663 context - context number - chosen at random if not
665 The remainder of the kwargs are the arguments to the API call.
667 if 'context' not in kwargs:
668 context = self.get_context()
669 kwargs['context'] = context
671 context = kwargs['context']
672 kwargs['_vl_msg_id'] = i
673 b = self.encode(msgdef, kwargs)
677 def register_event_callback(self, callback):
678 """Register a callback for async messages.
680 This will be called for async notifications in sync mode,
681 and all messages in async mode. In sync mode, replies to
682 requests will not come here.
684 callback is a fn(msg_type_name, msg_type) that will be
685 called when a message comes in. While this function is
686 executing, note that (a) you are in a background thread and
687 may wish to use threading.Lock to protect your datastructures,
688 and (b) message processing from VPP will stop (so if you take
689 a long while about it you may provoke reply timeouts or cause
690 VPP to fill the RX buffer). Passing None will disable the
693 self.event_callback = callback
695 def thread_msg_handler(self):
696 """Python thread calling the user registerd message handler.
698 This is to emulate the old style event callback scheme. Modern
699 clients should provide their own thread to poll the event
703 r = self.message_queue.get()
704 msgname = type(r).__name__
705 if self.event_callback:
706 self.event_callback(msgname, r)