3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
29 if sys.version[0] == '2':
36 typedef void (*vac_callback_t)(unsigned char * data, int len);
37 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
38 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
40 int vac_disconnect(void);
41 int vac_read(char **data, int *l, unsigned short timeout);
42 int vac_write(char *data, int len);
43 void vac_free(void * msg);
45 int vac_get_msg_index(unsigned char * name);
46 int vac_msg_table_size(void);
47 int vac_msg_table_max_index(void);
49 void vac_rx_suspend (void);
50 void vac_rx_resume (void);
51 void vac_set_error_handler(vac_error_callback_t);
54 # ffi.dlopen raises OSError if the library cannot be loaded, so no explicit success check is needed.
55 vpp_api = ffi.dlopen('libvppapiclient.so')
59 """Clean up VPP connection on shutdown."""
61 self.logger.debug('Cleaning up VPP on exit')
68 if sys.version[0] == '2':
@ffi.callback("void(unsigned char *, int)")
def vac_callback_sync(data, len):
    """Receive callback used for sync-mode connections.

    Wraps the `len` bytes at `data` in a cffi buffer and hands the buffer to
    vpp_object.msg_handler_sync.
    """
    payload = ffi.buffer(data, len)
    vpp_object.msg_handler_sync(payload)
@ffi.callback("void(unsigned char *, int)")
def vac_callback_async(data, len):
    """Receive callback used for async-mode connections.

    Wraps the `len` bytes at `data` in a cffi buffer and hands the buffer to
    vpp_object.msg_handler_async.
    """
    payload = ffi.buffer(data, len)
    vpp_object.msg_handler_async(payload)
@ffi.callback("void(void *, unsigned char *, int)")
def vac_error_handler(arg, msg, msg_len):
    """Log an error string reported by the C client library.

    arg     - opaque pointer supplied by the C side (unused here)
    msg     - pointer to the error text
    msg_len - maximum number of bytes to read from msg

    The text is logged as a warning on vpp_object.logger.
    """
    text = ffi.string(msg, msg_len)
    # ffi.string returns bytes for a char pointer.  On Python 3 that would be
    # logged as a b'...' repr, so decode it first; on Python 2 bytes is str
    # and the value is passed through unchanged.
    if isinstance(text, bytes) and not isinstance(text, str):
        text = text.decode('utf-8', 'replace')
    vpp_object.logger.warning("VPP API client:: %s", text)
# Callable wrapper around a generated API function.  It copies the wrapped
# function's __name__ and forwards keyword arguments unchanged to self._func.
93 class FuncWrapper(object):
94 def __init__(self, func):
96 self.__name__ = func.__name__
# Only keyword arguments are accepted; a positional call raises TypeError.
98 def __call__(self, **kwargs):
99 return self._func(**kwargs)
105 This class provides the APIs to VPP. The APIs are loaded
106 from provided .api.json files and makes functions accordingly.
107 These functions are documented in the VPP .api files, as they
108 are dynamically created.
110 Additionally, VPP can send callback messages; this class
111 provides a means to register a callback function to receive
112 these messages in a background thread.
# Build the client object: configure logging, create the event queue and the
# event thread, load type/message definitions from the .api.json files, then
# register the exit hook and the C-side error handler.
# Raises ValueError when no message definitions were loaded and testmode is
# false.  An unknown loglevel name raises AttributeError through getattr.
# NOTE(review): the `logger` default is evaluated once, when the def statement
# runs; every instance that omits it shares logging.getLogger('vpp_papi').
# NOTE(review): the event thread is presumably started only when async_thread
# is true - confirm against the guard around the Thread creation.
114 def __init__(self, apifiles=None, testmode=False, async_thread=True,
115 logger=logging.getLogger('vpp_papi'), loglevel='debug'):
116 """Create a VPP API object.
118 apifiles is a list of files containing API
119 descriptions that will be loaded - methods will be
120 dynamically created reflecting these APIs. If not
121 provided this will load the API files from VPP's
122 default install location.
127 logging.basicConfig(level=getattr(logging, loglevel.upper()))
132 self.connected = False
# Big-endian unsigned short followed by unsigned int; decode_incoming_msg
# unpacks this from the start of every received message.
133 self.header = struct.Struct('>HI')
135 self.event_callback = None
# Receives decoded messages from msg_handler_sync and _call_vpp that are not
# the awaited reply; drained by thread_msg_handler.
136 self.message_queue = queue.Queue()
# Passed to vac_read as its timeout argument.
137 self.read_timeout = 0
138 self.vpp_api = vpp_api
# Daemon thread, so a blocked queue.get() never keeps the interpreter alive.
140 self.event_thread = threading.Thread(
141 target=self.thread_msg_handler)
142 self.event_thread.daemon = True
143 self.event_thread.start()
146 # Pick up API definitions from default directory
147 apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
# Each JSON file carries 'types' and 'messages' lists; the first element of an
# entry is its name and the remainder is the field definition.
149 for file in apifiles:
150 with open(file) as apidef_file:
151 api = json.load(apidef_file)
152 for t in api['types']:
153 self.add_type(t[0], t[1:])
155 for m in api['messages']:
156 self.add_message(m[0], m[1:])
157 self.apifiles = apifiles
# testmode allows construction without any message definitions.
160 if len(self.messages) == 0 and not testmode:
161 raise ValueError(1, 'Missing JSON message definitions')
163 # Make sure we allow VPP to clean up the message rings.
164 atexit.register(vpp_atexit, self)
166 # Register error handler
167 vpp_api.vac_set_error_handler(vac_error_handler)
# Callable helper that hands out context IDs.  The get_context instance below
# is what _call_vpp and _call_vpp_async call when the caller supplies no
# 'context' argument.
169 class ContextId(object):
170 """Thread-safe provider of unique context IDs."""
# Presumably guards the counter update performed in __call__ - TODO confirm.
173 self.lock = threading.Lock()
176 """Get a new unique (or, at least, not recently used) context."""
# Class-level instance: shared by every object of the enclosing class.
180 get_context = ContextId()
183 """Debug function: report current VPP API status to stdout."""
184 print('Connected') if self.connected else print('Not Connected')
185 print('Read API definitions from', ', '.join(self.apifiles))
# Build the packing descriptor for one field of type `t`.
# Returns (size, packer), where packer is one of:
#   - a struct.Struct for a scalar base type,
#   - a two-element list [count_or_length_field, struct.Struct] for arrays,
#   - a callable, or a [count, callable] list, that recurses through
#     __struct_type into a compound type already stored in self.messages.
# Every format string uses '>' (big-endian, standard sizes).
# Parameters: t is the type name, e a fixed element count, vl the name of the
# field that carries a variable array's length.  n appears to be the field
# name - TODO confirm.
# Raises ValueError for an unknown type name and NotImplementedError for an
# unsupported compound-type layout.
187 def __struct(self, t, n=None, e=-1, vl=None):
188 """Create a packing structure for a message."""
189 base_types = {'u8': 'B',
# A fixed-size u8 array is packed as one byte string of length e.
199 if e > 0 and t == 'u8':
201 s = struct.Struct('>' + str(e) + 's')
204 # Fixed array of base type
205 s = struct.Struct('>' + base_types[t])
206 return s.size, [e, s]
208 # Old style variable array
209 s = struct.Struct('>' + base_types[t])
# -1 presumably tells the decoder to derive the element count from the
# remaining buffer length - TODO confirm.
210 return s.size, [-1, s]
212 # Variable length array
214 s = struct.Struct('>s')
215 return s.size, [vl, s]
217 s = struct.Struct('>' + base_types[t])
218 return s.size, [vl, s]
220 s = struct.Struct('>' + base_types[t])
# Compound type: its fixed size is taken from the stored 'sizes' entry.
223 if t in self.messages:
224 size = self.messages[t]['sizes'][0]
226 # Return a list in case of array
228 return size, [e, lambda self, encode, buf, offset, args: (
229 self.__struct_type(encode, self.messages[t], buf, offset,
232 return size, [vl, lambda self, encode, buf, offset, args: (
233 self.__struct_type(encode, self.messages[t], buf, offset,
237 raise NotImplementedError(1,
238 'No support for compound types ' + t)
239 return size, lambda self, encode, buf, offset, args: (
240 self.__struct_type(encode, self.messages[t], buf, offset, args)
243 raise ValueError(1, 'Invalid message type: ' + t)
# Dispatch helper: hands off to __struct_type_encode (with kwargs) or to
# __struct_type_decode, presumably selected by the `encode` flag.
# The return value is whatever the chosen helper returns.
245 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
246 """Get a message packer or unpacker."""
248 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
250 return self.__struct_type_decode(msgdef, buf, offset)
# Pack the values in `kwargs` into `buf`, starting at `offset`, following the
# field order of msgdef['args'].  Returns off + size - offset, i.e. the number
# of bytes covered.
# Raises ValueError for an argument name that is not in msgdef['args'] and for
# list/length mismatches.
# NOTE(review): the unknown-argument message reads kwargs['_vl_msg_id'].  When
# this runs for a nested compound type, the dict passed in is the nested value
# (see the lambdas built in __struct), which presumably lacks that key, so a
# KeyError would mask the intended ValueError - confirm.
# NOTE(review): 'mistmatch' in two messages below is a typo for 'mismatch';
# left untouched here because the text is emitted at runtime.
252 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
257 if k not in msgdef['args']:
258 raise ValueError(1,'Non existing argument [' + k + ']' + \
259 ' used in call to: ' + \
260 self.id_names[kwargs['_vl_msg_id']] + '()' )
262 for k, v in vpp_iterator(msgdef['args']):
# Expected element count: the value of the length field when v[0] names a
# supplied argument, otherwise v[0] itself.
267 e = kwargs[v[0]] if v[0] in kwargs else v[0]
268 if e != len(kwargs[k]):
269 raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, e, len(kwargs[k]))))
272 size += v[1](self, True, buf, off + size,
277 if l != len(kwargs[k]):
278 raise ValueError(1, 'Input list length mistmatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
282 buf[off:off + l] = bytearray(kwargs[k])
287 v[1].pack_into(buf, off + size, i)
# Callable packer: recurse into a compound type with the nested value.
291 size = v(self, True, buf, off, kwargs[k])
# Exact-type test: str subclasses (and bytes on Python 3) skip this guard.
293 if type(kwargs[k]) is str and v.size < len(kwargs[k]):
294 raise ValueError(1, 'Input list length mistmatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
295 v.pack_into(buf, off, kwargs[k])
298 size = v.size if not type(v) is list else 0
300 return off + size - offset
# Subscript access (obj[name]) to the stored definition of a message or type.
# Returns self.messages[name] when the name is registered.
302 def __getitem__(self, name):
303 if name in self.messages:
304 return self.messages[name]
# Compute the buffer size needed to encode `kwargs`.
# `sizes` is the [total_size, per_field_sizes] pair stored by add_message.
# Each argument supplied as a list adds len(list) * per-element size on top
# of the fixed total.
# NOTE(review): only exact `list` instances count (type(...) is list); tuples
# and other sequences add nothing here - confirm that is intended.
307 def get_size(self, sizes, kwargs):
308 total_size = sizes[0]
310 if e in kwargs and type(kwargs[e]) is list:
311 total_size += len(kwargs[e]) * sizes[1][e]
# Serialise `kwargs` according to `msgdef`.
# Allocates a bytearray sized by get_size, lets __struct_type pack into it and
# returns the bytearray trimmed to the byte count the packer reported.
314 def encode(self, msgdef, kwargs):
315 # Make suitably large buffer
316 size = self.get_size(msgdef['sizes'], kwargs)
317 buf = bytearray(size)
# `size` is rebound here to the number of bytes actually written.
319 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
320 return buf[:offset + size]
def decode(self, msgdef, buf):
    """Decode `buf` according to `msgdef` and return the decoded message.

    The unpacker is started at offset 0 and returns a pair; only its second
    element (the decoded value) is handed back to the caller.
    """
    unpacked = self.__struct_type(False, msgdef, buf, 0, None)
    return unpacked[1]
# Unpack one message or compound value from `buf`, starting at `offset`.
# Returns (bytes consumed, decoded tuple); the tuple is built with
# msgdef['return_tuple']._make from the values decoded field by field.
325 def __struct_type_decode(self, msgdef, buf, offset):
329 for k, v in vpp_iterator(msgdef['args']):
333 if callable(v[1]): # compound type
335 if v[0] in msgdef['args']: # vla
# Nested decode returns (size, value) for each element.
341 (s, l) = v[1](self, False, buf, off + size, None)
346 if type(v[0]) is int:
# Take everything that remains in the buffer as one byte slice.
347 size = len(buf) - off
350 res.append(buf[off:off + size])
# Element count: the literal when v[0] is an int, otherwise the value of the
# already-decoded length field; v[2] is the relative index stored by
# add_message.
352 e = v[0] if type(v[0]) is int else res[v[2]]
# NOTE(review): '/' is true division on Python 3 and yields a float.  If `e`
# is then used as an element count (e.g. in range()), this raises TypeError;
# '//' looks intended - confirm.
354 e = (len(buf) - off) / v[1].size
359 lst.append(v[1].unpack_from(buf, off + size)[0])
364 (s, l) = v(self, False, buf, off, None)
368 res.append(v.unpack_from(buf, off)[0])
371 return off + size - offset, msgdef['return_tuple']._make(res)
# Return the namedtuple class stored as 'return_tuple' for message `name`,
# when the message is registered and has one.
373 def ret_tup(self, name):
374 if name in self.messages and 'return_tuple' in self.messages[name]:
375 return self.messages[name]['return_tuple']
# Register message (or type) `name` from its JSON field list `msgdef`.
# Returns the stored definition dict, which carries 'args', 'argtypes',
# 'typeonly', 'sizes', 'return_tuple' and, when the definition has one, 'crc'.
# Raises ValueError for a duplicate name or a misplaced variable-length array.
378 def add_message(self, name, msgdef, typeonly=False):
379 if name in self.messages:
380 raise ValueError('Duplicate message name: ' + name)
# Ordered, because field order defines the wire layout.
382 args = collections.OrderedDict()
383 argtypes = collections.OrderedDict()
388 for i, f in enumerate(msgdef):
# A dict entry containing 'crc' carries the message checksum, not a field.
389 if type(f) is dict and 'crc' in f:
390 msg['crc'] = f['crc']
# A three-element field with f[2] == 0 is a variable-length array; it is only
# accepted at index len(msgdef) - 2 (presumably just before the crc entry).
394 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
395 raise ValueError('Variable Length Array must be last: ' + name)
396 size, s = self.__struct(*f)
398 if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
400 sizes[field_name] = size
402 sizes[field_name] = size
403 total_size += s[0] * size
405 sizes[field_name] = size
408 argtypes[field_name] = field_type
# f[3] names the field holding this array's length.  Its position is stored
# relative to the current field, so the decoder can index back into the
# values decoded so far.
409 if len(f) == 4: # Find offset to # elements field
410 idx = list(args.keys()).index(f[3]) - i
411 args[field_name].append(idx)
412 fields.append(field_name)
413 msg['return_tuple'] = collections.namedtuple(name, fields,
415 self.messages[name] = msg
416 self.messages[name]['args'] = args
417 self.messages[name]['argtypes'] = argtypes
418 self.messages[name]['typeonly'] = typeonly
# [fixed total size, per-field element sizes]; consumed by get_size.
419 self.messages[name]['sizes'] = [total_size, sizes]
420 return self.messages[name]
# Register a type definition through add_message under the name
# 'vl_api_<name>_t', and return what add_message returns.
422 def add_type(self, name, typedef):
423 return self.add_message('vl_api_' + name + '_t', typedef,
# Build the Python callable for message `name`: either a sender based on
# _call_vpp_async or a request/reply wrapper based on _call_vpp.  __name__ is
# set to the message name and __doc__ lists "<type> <field>" per argument.
# NOTE(review): `async` is a reserved keyword from Python 3.7 on, so this
# signature is a SyntaxError there.  A rename must be coordinated with
# _register_functions, connect_internal and connect, which use the same name.
426 def make_function(self, name, i, msgdef, multipart, async):
428 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
430 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
432 args = self.messages[name]['args']
433 argtypes = self.messages[name]['argtypes']
434 f.__name__ = str(name)
435 f.__doc__ = ", ".join(["%s %s" %
436 (argtypes[k], k) for k in args.keys()])
441 if not hasattr(self, "_api"):
442 raise Exception("Not connected, api definitions not available")
# Resolve every non-type message against VPP's message table and expose it as
# a callable on self._api, plus directly on self for the legacy API.
# id_names and id_msgdef are indexed by the numeric message id returned by
# vac_get_msg_index.
# NOTE(review): `async` as a parameter name is a SyntaxError on Python 3.7+.
445 def _register_functions(self, async=False):
446 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
447 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
449 for name, msgdef in vpp_iterator(self.messages):
450 if self.messages[name]['typeonly']:
452 crc = self.messages[name]['crc']
# Lookup key is "<name>_<crc minus its first two characters>" (the dropped
# characters are presumably a '0x' prefix).
453 n = name + '_' + crc[2:]
454 i = vpp_api.vac_get_msg_index(n.encode())
456 self.id_msgdef[i] = msgdef
457 self.id_names[i] = name
# Multipart when '_dump' occurs anywhere after the first character of name.
458 multipart = True if name.find('_dump') > 0 else False
459 f = self.make_function(name, i, msgdef, multipart, async)
460 setattr(self._api, name, FuncWrapper(f))
462 # old API stuff starts here - will be removed in 17.07
463 if hasattr(self, name):
465 3, "Conflicting name in JSON definition: `%s'" % name)
466 setattr(self, name, f)
467 # old API stuff ends here
470 'No such message type or failed CRC checksum: %s', n)
def _write(self, buf):
    """Hand one binary-packed message to VPP.

    buf - the packed message; it is passed to vac_write through
          ffi.from_buffer together with its length.

    Returns the value vac_write returns.
    Raises IOError(1, 'Not connected') when the client is not connected.
    """
    if self.connected:
        return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
    raise IOError(1, 'Not connected')
479 if not self.connected:
480 raise IOError(1, 'Not connected')
481 mem = ffi.new("char **")
482 size = ffi.new("int *")
483 rv = vpp_api.vac_read(mem, size, self.read_timeout)
485 raise IOError(rv, 'vac_read failed')
486 msg = bytes(ffi.buffer(mem[0], size[0]))
487 vpp_api.vac_free(mem[0])
# Shared connect path: attach through vac_connect, mark the object connected,
# read the highest message index, register the API functions and resolve the
# control_ping message index and definition.
# Raises IOError(2, 'Connect failed'), presumably when vac_connect reports a
# non-zero status - TODO confirm the exact test on rv.
490 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
# The C side receives NULL when no chroot prefix is given.
492 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
493 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
495 raise IOError(2, 'Connect failed')
496 self.connected = True
# Sizes the id_names / id_msgdef tables built by _register_functions.
498 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
499 self._register_functions(async=async)
501 # Initialise control ping
502 crc = self.messages['control_ping']['crc']
503 self.control_ping_index = vpp_api.vac_get_msg_index(
504 ('control_ping' + '_' + crc[2:]).encode())
505 self.control_ping_msgdef = self.messages['control_ping']
# Attach to VPP with a Python receive callback installed, then delegate to
# connect_internal and return its result.
# NOTE(review): `async` as a parameter name is a SyntaxError on Python 3.7+.
508 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
511 name - the name of the client.
512 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
513 async - if true, messages are sent without waiting for a reply
514 rx_qlen - the length of the VPP message receive queue between
# Pick the module-level cffi callback that matches the requested mode.
517 msg_handler = vac_callback_sync if not async else vac_callback_async
518 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
# Attach to VPP without a Python receive callback: ffi.NULL is passed as the
# message handler.  Returns what connect_internal returns.
521 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
522 """Attach to VPP in synchronous mode. Application must poll for events.
524 name - the name of the client.
525 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
526 rx_qlen - the length of the VPP message receive queue between
530 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
# Calls vac_disconnect and clears the connected flag.  The flag is cleared
# whatever value vac_disconnect returns.
533 def disconnect(self):
534 """Detach from VPP."""
535 rv = vpp_api.vac_disconnect()
536 self.connected = False
# Reached through vac_callback_sync for sync-mode connections.
# Decodes the raw message and queues it on message_queue for
# thread_msg_handler.
# NOTE(review): a message with a positive context apparently ends in the
# IOError at the bottom - confirm against the full control flow.
539 def msg_handler_sync(self, msg):
540 """Process an incoming message from VPP in sync mode.
542 The message may be a reply or it may be an async notification.
544 r = self.decode_incoming_msg(msg)
548 # If we have a context, then use the context to find any
549 # request waiting for a reply
551 if hasattr(r, 'context') and r.context > 0:
554 msgname = type(r).__name__
557 # No context -> async notification that we feed to the callback
558 self.message_queue.put_nowait(r)
560 raise IOError(2, 'RPC reply message received in event handler')
# Decode one raw message received from VPP.
# The header gives the numeric message id, which selects the definition in
# id_msgdef; the whole message is then decoded with it.
# Raises IOError(2, 'Reply message undefined'), presumably when no definition
# is registered for the id - TODO confirm.
562 def decode_incoming_msg(self, msg):
564 self.logger.warning('vpp_api.read failed')
# i is the message id; ci is the second header field.
567 i, ci = self.header.unpack_from(msg, 0)
568 if self.id_names[i] == 'rx_thread_exit':
572 # Decode message and returns a tuple.
574 msgdef = self.id_msgdef[i]
576 raise IOError(2, 'Reply message undefined')
578 r = self.decode(msgdef, msg)
# Reached through vac_callback_async for async-mode connections.
# Decodes the message and, when a callback is registered, calls it with the
# decoded tuple's type name and the tuple itself.
582 def msg_handler_async(self, msg):
583 """Process a message from VPP in async mode.
585 In async mode, all messages are returned to the callback.
587 r = self.decode_incoming_msg(msg)
591 msgname = type(r).__name__
593 if self.event_callback:
594 self.event_callback(msgname, r)
# Sends a control_ping through _call_vpp_async, using the index and definition
# resolved in connect_internal.  _call_vpp uses the matching
# 'control_ping_reply' to detect the end of a reply sequence.
596 def _control_ping(self, context):
597 """Send a ping command."""
598 self._call_vpp_async(self.control_ping_index,
599 self.control_ping_msgdef,
# Blocking request/reply path used by the generated API functions.
# NOTE(review): IOError can be raised between vac_rx_suspend and
# vac_rx_resume.  Unless the resume sits in a finally clause, an exception
# leaves rx suspended - confirm.
602 def _call_vpp(self, i, msgdef, multipart, **kwargs):
603 """Given a message, send the message and await a reply.
605 msgdef - the message packing definition
606 i - the message type index
607 multipart - True if the message returns multiple
609 context - context number - chosen at random if not
611 The remainder of the kwargs are the arguments to the API call.
613 The return value is the message or message array containing
614 the response. It will raise an IOError exception if there was
615 no response within the timeout window.
# Use the caller's context when given, otherwise draw one from get_context.
618 if 'context' not in kwargs:
619 context = self.get_context()
620 kwargs['context'] = context
622 context = kwargs['context']
# The encoder reads this key when it reports an unknown argument.
623 kwargs['_vl_msg_id'] = i
624 b = self.encode(msgdef, kwargs)
626 vpp_api.vac_rx_suspend()
630 # Send a ping after the request - we use its response
631 # to detect that we have seen all results.
632 self._control_ping(context)
634 # Block until we get a reply.
639 raise IOError(2, 'VPP API client: read failed')
641 r = self.decode_incoming_msg(msg)
642 msgname = type(r).__name__
# NOTE(review): `context not in r` tests whether the context value occurs
# among the decoded tuple's values, not whether r has a 'context' field.  A
# message without that field but with an equal value elsewhere would reach
# r.context and raise AttributeError - confirm the intended test.
643 if context not in r or r.context == 0 or context != r.context:
# Not the awaited reply: hand it to the event queue instead.
644 self.message_queue.put_nowait(r)
650 if msgname == 'control_ping_reply':
655 vpp_api.vac_rx_resume()
# Non-blocking send path used by async-mode API functions and _control_ping.
# NOTE(review): the docstring's "await a reply" looks copied from _call_vpp;
# the connect() documentation says async messages are sent without waiting
# for a reply - confirm and correct the summary line.
659 def _call_vpp_async(self, i, msgdef, **kwargs):
660 """Given a message, send the message and await a reply.
662 msgdef - the message packing definition
663 i - the message type index
664 context - context number - chosen at random if not
666 The remainder of the kwargs are the arguments to the API call.
# Use the caller's context when given, otherwise draw one from get_context.
668 if 'context' not in kwargs:
669 context = self.get_context()
670 kwargs['context'] = context
672 context = kwargs['context']
673 kwargs['_vl_msg_id'] = i
674 b = self.encode(msgdef, kwargs)
# Stores `callback` in self.event_callback, which msg_handler_async and
# thread_msg_handler read each time they have a message to deliver.
678 def register_event_callback(self, callback):
679 """Register a callback for async messages.
681 This will be called for async notifications in sync mode,
682 and all messages in async mode. In sync mode, replies to
683 requests will not come here.
685 callback is a fn(msg_type_name, msg_type) that will be
686 called when a message comes in. While this function is
687 executing, note that (a) you are in a background thread and
688 may wish to use threading.Lock to protect your datastructures,
689 and (b) message processing from VPP will stop (so if you take
690 a long while about it you may provoke reply timeouts or cause
691 VPP to fill the RX buffer). Passing None will disable the
694 self.event_callback = callback
# Target of the event thread created in __init__.
# Blocks on message_queue.get() with no timeout and passes each message to
# the registered callback as (type name, message).
# NOTE(review): a message dequeued while no callback is registered appears
# to be discarded - confirm.
696 def thread_msg_handler(self):
697 """Python thread calling the user registered message handler.
699 This is to emulate the old style event callback scheme. Modern
700 clients should provide their own thread to poll the event
704 r = self.message_queue.get()
705 msgname = type(r).__name__
706 if self.event_callback:
707 self.event_callback(msgname, r)