3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
29 if sys.version[0] == '2':
36 typedef void (*vac_callback_t)(unsigned char * data, int len);
37 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
38 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
40 int vac_disconnect(void);
41 int vac_read(char **data, int *l, unsigned short timeout);
42 int vac_write(char *data, int len);
43 void vac_free(void * msg);
45 int vac_get_msg_index(unsigned char * name);
46 int vac_msg_table_size(void);
47 int vac_msg_table_max_index(void);
49 void vac_rx_suspend (void);
50 void vac_rx_resume (void);
51 void vac_set_error_handler(vac_error_callback_t);
54 # Barfs on failure, no need to check success.
55 vpp_api = ffi.dlopen('libvppapiclient.so')
59 """Clean up VPP connection on shutdown."""
61 self.logger.debug('Cleaning up VPP on exit')
68 if sys.version[0] == '2':
@ffi.callback("void(unsigned char *, int)")
def vac_callback_sync(data, len):
    """C-side receive callback used when connecting in synchronous mode.

    Exposed to C through cffi with the signature
    ``void(unsigned char *, int)``.  The raw pointer and its length are
    wrapped in an ``ffi.buffer`` and forwarded to the
    ``msg_handler_sync`` method of the module-level ``vpp_object``.

    data - pointer to the received message bytes
    len - number of bytes available at ``data``

    NOTE(review): ``vpp_object`` is assigned outside this block; it must
    be set before the callback can fire.
    """
    # Resolve the handler first, then build the buffer view over the
    # C memory and hand it over.
    deliver = vpp_object.msg_handler_sync
    deliver(ffi.buffer(data, len))
@ffi.callback("void(unsigned char *, int)")
def vac_callback_async(data, len):
    """C-side receive callback used when connecting in asynchronous mode.

    Exposed to C through cffi with the signature
    ``void(unsigned char *, int)``.  The raw pointer and its length are
    wrapped in an ``ffi.buffer`` and forwarded to the
    ``msg_handler_async`` method of the module-level ``vpp_object``.

    data - pointer to the received message bytes
    len - number of bytes available at ``data``

    NOTE(review): ``vpp_object`` is assigned outside this block; it must
    be set before the callback can fire.
    """
    # Resolve the handler first, then build the buffer view over the
    # C memory and hand it over.
    deliver = vpp_object.msg_handler_async
    deliver(ffi.buffer(data, len))
@ffi.callback("void(void *, unsigned char *, int)")
def vac_error_handler(arg, msg, msg_len):
    """Log an error message reported by the VPP API client library.

    Exposed to C through cffi with the signature
    ``void(void *, unsigned char *, int)`` and emits the message as a
    warning on the logger of the module-level ``vpp_object``.

    arg - opaque pointer supplied by the C side (unused here)
    msg - pointer to the message bytes
    msg_len - maximum number of bytes to read from ``msg``
    """
    text = ffi.string(msg, msg_len)
    # ffi.string() yields bytes for (unsigned) char pointers.  On
    # Python 3 that would be logged as "b'...'", so turn it into text;
    # on Python 2 bytes *is* str and the value is left untouched.
    if not isinstance(text, str):
        text = text.decode('utf-8', 'replace')
    vpp_object.logger.warning("VPP API client:: %s", text)
93 class FuncWrapper(object):
94 def __init__(self, func):
96 self.__name__ = func.__name__
def __call__(self, **kwargs):
    """Invoke the wrapped function with the given keyword arguments.

    Only keyword arguments are accepted; they are passed through
    unchanged and the wrapped function's result is returned as is.

    NOTE(review): ``self._func`` is presumably stored by ``__init__``;
    that assignment is not visible in this view.
    """
    wrapped = self._func
    return wrapped(**kwargs)
105 This class provides the APIs to VPP. The APIs are loaded
106 from provided .api.json files and makes functions accordingly.
107 These functions are documented in the VPP .api files, as they
108 are dynamically created.
110 Additionally, VPP can send callback messages; this class
111 provides a means to register a callback function to receive
112 these messages in a background thread.
114 def __init__(self, apifiles=None, testmode=False, async_thread=True,
115 logger=logging.getLogger('vpp_papi'), loglevel='debug'):
116 """Create a VPP API object.
118 apifiles is a list of files containing API
119 descriptions that will be loaded - methods will be
120 dynamically created reflecting these APIs. If not
121 provided this will load the API files from VPP's
122 default install location.
127 logging.basicConfig(level=getattr(logging, loglevel.upper()))
132 self.buffersize = 10000
133 self.connected = False
134 self.header = struct.Struct('>HI')
136 self.event_callback = None
137 self.message_queue = queue.Queue()
138 self.read_timeout = 0
139 self.vpp_api = vpp_api
141 self.event_thread = threading.Thread(
142 target=self.thread_msg_handler)
143 self.event_thread.daemon = True
144 self.event_thread.start()
147 # Pick up API definitions from default directory
148 apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
150 for file in apifiles:
151 with open(file) as apidef_file:
152 api = json.load(apidef_file)
153 for t in api['types']:
154 self.add_type(t[0], t[1:])
156 for m in api['messages']:
157 self.add_message(m[0], m[1:])
158 self.apifiles = apifiles
161 if len(self.messages) == 0 and not testmode:
162 raise ValueError(1, 'Missing JSON message definitions')
164 # Make sure we allow VPP to clean up the message rings.
165 atexit.register(vpp_atexit, self)
167 # Register error handler
168 vpp_api.vac_set_error_handler(vac_error_handler)
170 class ContextId(object):
171 """Thread-safe provider of unique context IDs."""
174 self.lock = threading.Lock()
177 """Get a new unique (or, at least, not recently used) context."""
181 get_context = ContextId()
184 """Debug function: report current VPP API status to stdout."""
185 print('Connected') if self.connected else print('Not Connected')
186 print('Read API definitions from', ', '.join(self.apifiles))
188 def __struct(self, t, n=None, e=-1, vl=None):
189 """Create a packing structure for a message."""
190 base_types = {'u8': 'B',
200 if e > 0 and t == 'u8':
202 return struct.Struct('>' + str(e) + 's')
204 # Fixed array of base type
205 return [e, struct.Struct('>' + base_types[t])]
207 # Old style variable array
208 return [-1, struct.Struct('>' + base_types[t])]
210 # Variable length array
211 return [vl, struct.Struct('>s')] if t == 'u8' else \
212 [vl, struct.Struct('>' + base_types[t])]
214 return struct.Struct('>' + base_types[t])
216 if t in self.messages:
217 # Return a list in case of array
219 return [e, lambda self, encode, buf, offset, args: (
220 self.__struct_type(encode, self.messages[t], buf, offset,
223 return [vl, lambda self, encode, buf, offset, args: (
224 self.__struct_type(encode, self.messages[t], buf, offset,
228 raise NotImplementedError(1,
229 'No support for compound types ' + t)
230 return lambda self, encode, buf, offset, args: (
231 self.__struct_type(encode, self.messages[t], buf, offset, args)
234 raise ValueError(1, 'Invalid message type: ' + t)
236 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
237 """Get a message packer or unpacker."""
239 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
241 return self.__struct_type_decode(msgdef, buf, offset)
243 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
248 if k not in msgdef['args']:
249 raise ValueError(1, 'Invalid field-name in message call ' + k)
251 for k, v in vpp_iterator(msgdef['args']):
256 e = kwargs[v[0]] if v[0] in kwargs else v[0]
259 size += v[1](self, True, buf, off + size,
267 buf[off:off + l] = bytearray(kwargs[k])
272 v[1].pack_into(buf, off + size, i)
276 size = v(self, True, buf, off, kwargs[k])
278 v.pack_into(buf, off, kwargs[k])
281 size = v.size if not type(v) is list else 0
283 return off + size - offset
285 def __getitem__(self, name):
286 if name in self.messages:
287 return self.messages[name]
290 def encode(self, msgdef, kwargs):
291 # Make suitably large buffer
292 buf = bytearray(self.buffersize)
294 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
295 return buf[:offset + size]
def decode(self, msgdef, buf):
    """Unpack ``buf`` according to ``msgdef`` and return the decoded value.

    msgdef - the message packing definition
    buf - the binary message, read starting at offset 0

    The unpacker is run in decode mode (``encode=False``) with no
    keyword arguments.  It returns a sequence, of which only the
    element at index 1 is handed back to the caller.
    """
    unpacked = self.__struct_type(False, msgdef, buf, 0, None)
    return unpacked[1]
300 def __struct_type_decode(self, msgdef, buf, offset):
304 for k, v in vpp_iterator(msgdef['args']):
308 if callable(v[1]): # compound type
310 if v[0] in msgdef['args']: # vla
316 (s, l) = v[1](self, False, buf, off + size, None)
321 if type(v[0]) is int:
322 size = len(buf) - off
325 res.append(buf[off:off + size])
327 e = v[0] if type(v[0]) is int else res[v[2]]
329 e = (len(buf) - off) / v[1].size
334 lst.append(v[1].unpack_from(buf, off + size)[0])
338 (s, l) = v(self, False, buf, off, None)
342 res.append(v.unpack_from(buf, off)[0])
345 return off + size - offset, msgdef['return_tuple']._make(res)
347 def ret_tup(self, name):
348 if name in self.messages and 'return_tuple' in self.messages[name]:
349 return self.messages[name]['return_tuple']
352 def add_message(self, name, msgdef, typeonly=False):
353 if name in self.messages:
354 raise ValueError('Duplicate message name: ' + name)
356 args = collections.OrderedDict()
357 argtypes = collections.OrderedDict()
360 for i, f in enumerate(msgdef):
361 if type(f) is dict and 'crc' in f:
362 msg['crc'] = f['crc']
366 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
367 raise ValueError('Variable Length Array must be last: ' + name)
368 args[field_name] = self.__struct(*f)
369 argtypes[field_name] = field_type
370 if len(f) == 4: # Find offset to # elements field
371 idx = list(args.keys()).index(f[3]) - i
372 args[field_name].append(idx)
373 fields.append(field_name)
374 msg['return_tuple'] = collections.namedtuple(name, fields,
376 self.messages[name] = msg
377 self.messages[name]['args'] = args
378 self.messages[name]['argtypes'] = argtypes
379 self.messages[name]['typeonly'] = typeonly
380 return self.messages[name]
382 def add_type(self, name, typedef):
383 return self.add_message('vl_api_' + name + '_t', typedef,
386 def make_function(self, name, i, msgdef, multipart, async):
388 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
390 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
392 args = self.messages[name]['args']
393 argtypes = self.messages[name]['argtypes']
394 f.__name__ = str(name)
395 f.__doc__ = ", ".join(["%s %s" %
396 (argtypes[k], k) for k in args.keys()])
401 if not hasattr(self, "_api"):
402 raise Exception("Not connected, api definitions not available")
405 def _register_functions(self, async=False):
406 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
407 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
409 for name, msgdef in vpp_iterator(self.messages):
410 if self.messages[name]['typeonly']:
412 crc = self.messages[name]['crc']
413 n = name + '_' + crc[2:]
414 i = vpp_api.vac_get_msg_index(n.encode())
416 self.id_msgdef[i] = msgdef
417 self.id_names[i] = name
418 multipart = True if name.find('_dump') > 0 else False
419 f = self.make_function(name, i, msgdef, multipart, async)
420 setattr(self._api, name, FuncWrapper(f))
422 # old API stuff starts here - will be removed in 17.07
423 if hasattr(self, name):
425 3, "Conflicting name in JSON definition: `%s'" % name)
426 setattr(self, name, f)
427 # old API stuff ends here
430 'No such message type or failed CRC checksum: %s', n)
def _write(self, buf):
    """Hand a binary-packed message to the VPP client library.

    buf - the packed message; it is exposed to C via
          ``ffi.from_buffer`` and sent with its full length

    Returns whatever ``vpp_api.vac_write`` returns.  Raises
    IOError(1, 'Not connected') when ``self.connected`` is false.
    """
    if self.connected:
        send = vpp_api.vac_write
        return send(ffi.from_buffer(buf), len(buf))
    raise IOError(1, 'Not connected')
439 if not self.connected:
440 raise IOError(1, 'Not connected')
441 mem = ffi.new("char **")
442 size = ffi.new("int *")
443 rv = vpp_api.vac_read(mem, size, self.read_timeout)
445 raise IOError(rv, 'vac_read filed')
446 msg = bytes(ffi.buffer(mem[0], size[0]))
447 vpp_api.vac_free(mem[0])
450 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
452 rv = vpp_api.vac_connect(name.encode(), chroot_prefix.encode(),
453 msg_handler, rx_qlen)
455 raise IOError(2, 'Connect failed')
456 self.connected = True
458 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
459 self._register_functions(async=async)
461 # Initialise control ping
462 crc = self.messages['control_ping']['crc']
463 self.control_ping_index = vpp_api.vac_get_msg_index(
464 ('control_ping' + '_' + crc[2:]).encode())
465 self.control_ping_msgdef = self.messages['control_ping']
468 def connect(self, name, chroot_prefix=ffi.NULL, async=False, rx_qlen=32):
471 name - the name of the client.
472 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
473 async - if true, messages are sent without waiting for a reply
474 rx_qlen - the length of the VPP message receive queue between
477 msg_handler = vac_callback_sync if not async else vac_callback_async
478 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
481 def connect_sync(self, name, chroot_prefix=ffi.NULL, rx_qlen=32):
482 """Attach to VPP in synchronous mode. Application must poll for events.
484 name - the name of the client.
485 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
486 rx_qlen - the length of the VPP message receive queue between
490 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
493 def disconnect(self):
494 """Detach from VPP."""
495 rv = vpp_api.vac_disconnect()
496 self.connected = False
499 def msg_handler_sync(self, msg):
500 """Process an incoming message from VPP in sync mode.
502 The message may be a reply or it may be an async notification.
504 r = self.decode_incoming_msg(msg)
508 # If we have a context, then use the context to find any
509 # request waiting for a reply
511 if hasattr(r, 'context') and r.context > 0:
514 msgname = type(r).__name__
517 # No context -> async notification that we feed to the callback
518 self.message_queue.put_nowait(r)
520 raise IOError(2, 'RPC reply message received in event handler')
522 def decode_incoming_msg(self, msg):
524 self.logger.warning('vpp_api.read failed')
527 i, ci = self.header.unpack_from(msg, 0)
528 if self.id_names[i] == 'rx_thread_exit':
532 # Decode message and returns a tuple.
534 msgdef = self.id_msgdef[i]
536 raise IOError(2, 'Reply message undefined')
538 r = self.decode(msgdef, msg)
542 def msg_handler_async(self, msg):
543 """Process a message from VPP in async mode.
545 In async mode, all messages are returned to the callback.
547 r = self.decode_incoming_msg(msg)
551 msgname = type(r).__name__
553 if self.event_callback:
554 self.event_callback(msgname, r)
556 def _control_ping(self, context):
557 """Send a ping command."""
558 self._call_vpp_async(self.control_ping_index,
559 self.control_ping_msgdef,
562 def _call_vpp(self, i, msgdef, multipart, **kwargs):
563 """Given a message, send the message and await a reply.
565 msgdef - the message packing definition
566 i - the message type index
567 multipart - True if the message returns multiple
569 context - context number - chosen at random if not
571 The remainder of the kwargs are the arguments to the API call.
573 The return value is the message or message array containing
574 the response. It will raise an IOError exception if there was
575 no response within the timeout window.
578 if 'context' not in kwargs:
579 context = self.get_context()
580 kwargs['context'] = context
582 context = kwargs['context']
583 kwargs['_vl_msg_id'] = i
584 b = self.encode(msgdef, kwargs)
586 vpp_api.vac_rx_suspend()
590 # Send a ping after the request - we use its response
591 # to detect that we have seen all results.
592 self._control_ping(context)
594 # Block until we get a reply.
599 raise IOError(2, 'VPP API client: read failed')
601 r = self.decode_incoming_msg(msg)
602 msgname = type(r).__name__
603 if context not in r or r.context == 0 or context != r.context:
604 self.message_queue.put_nowait(r)
610 if msgname == 'control_ping_reply':
615 vpp_api.vac_rx_resume()
619 def _call_vpp_async(self, i, msgdef, **kwargs):
620 """Given a message, send the message and await a reply.
622 msgdef - the message packing definition
623 i - the message type index
624 context - context number - chosen at random if not
626 The remainder of the kwargs are the arguments to the API call.
628 if 'context' not in kwargs:
629 context = self.get_context()
630 kwargs['context'] = context
632 context = kwargs['context']
633 kwargs['_vl_msg_id'] = i
634 b = self.encode(msgdef, kwargs)
638 def register_event_callback(self, callback):
639 """Register a callback for async messages.
641 This will be called for async notifications in sync mode,
642 and all messages in async mode. In sync mode, replies to
643 requests will not come here.
645 callback is a fn(msg_type_name, msg_type) that will be
646 called when a message comes in. While this function is
647 executing, note that (a) you are in a background thread and
648 may wish to use threading.Lock to protect your datastructures,
649 and (b) message processing from VPP will stop (so if you take
650 a long while about it you may provoke reply timeouts or cause
651 VPP to fill the RX buffer). Passing None will disable the
654 self.event_callback = callback
656 def thread_msg_handler(self):
657 """Python thread calling the user registerd message handler.
659 This is to emulate the old style event callback scheme. Modern
660 clients should provide their own thread to poll the event
664 r = self.message_queue.get()
665 msgname = type(r).__name__
666 if self.event_callback:
667 self.event_callback(msgname, r)