3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
29 if sys.version[0] == '2':
36 typedef void (*vac_callback_t)(unsigned char * data, int len);
37 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
38 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
40 int vac_disconnect(void);
41 int vac_read(char **data, int *l, unsigned short timeout);
42 int vac_write(char *data, int len);
43 void vac_free(void * msg);
45 int vac_get_msg_index(unsigned char * name);
46 int vac_msg_table_size(void);
47 int vac_msg_table_max_index(void);
49 void vac_rx_suspend (void);
50 void vac_rx_resume (void);
51 void vac_set_error_handler(vac_error_callback_t);
54 # Barfs on failure, no need to check success.
55 vpp_api = ffi.dlopen('libvppapiclient.so')
59 """Clean up VPP connection on shutdown."""
61 self.logger.debug('Cleaning up VPP on exit')
68 if sys.version[0] == '2':
@ffi.callback("void(unsigned char *, int)")
def vac_callback_sync(data, len):
    """C-level receive callback used when connected in synchronous mode.

    data - C pointer to the received message bytes
    len - number of bytes available at data

    The bytes are exposed as a buffer and handed to the msg_handler_sync
    method of the module-level vpp_object.
    """
    handler = vpp_object.msg_handler_sync
    handler(ffi.buffer(data, len))
@ffi.callback("void(unsigned char *, int)")
def vac_callback_async(data, len):
    """C-level receive callback used when connected in asynchronous mode.

    data - C pointer to the received message bytes
    len - number of bytes available at data

    The bytes are exposed as a buffer and handed to the msg_handler_async
    method of the module-level vpp_object.
    """
    handler = vpp_object.msg_handler_async
    handler(ffi.buffer(data, len))
@ffi.callback("void(void *, unsigned char *, int)")
def vac_error_handler(arg, msg, msg_len):
    """Error callback for the C client library; logs the message as a warning.

    arg - opaque pointer supplied by the C library (unused here)
    msg - C pointer to the error text
    msg_len - maximum number of bytes to read from msg
    """
    text = ffi.string(msg, msg_len)
    # ffi.string() yields a byte string; decode it so that Python 3 logs the
    # text itself rather than its b'...' repr.
    if isinstance(text, bytes):
        text = text.decode('utf-8', 'replace')
    vpp_object.logger.warning("VPP API client:: %s", text)
class FuncWrapper(object):
    """Callable wrapper around a generated API function.

    The wrapper accepts keyword arguments only and forwards them to the
    wrapped function, while exposing the function's name and docstring.
    """

    def __init__(self, func):
        """Remember func and mirror its __name__ and __doc__."""
        # __call__ relies on _func, so it must be stored here.
        self._func = func
        self.__name__ = func.__name__
        self.__doc__ = func.__doc__

    def __call__(self, **kwargs):
        """Invoke the wrapped function with the given keyword arguments."""
        return self._func(**kwargs)
This class provides the APIs to VPP. The APIs are loaded
from the provided .api.json files, and functions are created accordingly.
107 These functions are documented in the VPP .api files, as they
108 are dynamically created.
110 Additionally, VPP can send callback messages; this class
111 provides a means to register a callback function to receive
112 these messages in a background thread.
114 def __init__(self, apifiles=None, testmode=False, async_thread=True,
115 logger=logging.getLogger('vpp_papi'), loglevel='debug'):
116 """Create a VPP API object.
118 apifiles is a list of files containing API
119 descriptions that will be loaded - methods will be
120 dynamically created reflecting these APIs. If not
121 provided this will load the API files from VPP's
122 default install location.
127 logging.basicConfig(level=getattr(logging, loglevel.upper()))
132 self.buffersize = 10000
133 self.connected = False
134 self.header = struct.Struct('>HI')
136 self.event_callback = None
137 self.message_queue = queue.Queue()
138 self.read_timeout = 0
139 self.vpp_api = vpp_api
141 self.event_thread = threading.Thread(
142 target=self.thread_msg_handler)
143 self.event_thread.daemon = True
144 self.event_thread.start()
147 # Pick up API definitions from default directory
148 apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
150 for file in apifiles:
151 with open(file) as apidef_file:
152 api = json.load(apidef_file)
153 for t in api['types']:
154 self.add_type(t[0], t[1:])
156 for m in api['messages']:
157 self.add_message(m[0], m[1:])
158 self.apifiles = apifiles
161 if len(self.messages) == 0 and not testmode:
162 raise ValueError(1, 'Missing JSON message definitions')
164 # Make sure we allow VPP to clean up the message rings.
165 atexit.register(vpp_atexit, self)
167 # Register error handler
168 vpp_api.vac_set_error_handler(vac_error_handler)
170 class ContextId(object):
171 """Thread-safe provider of unique context IDs."""
174 self.lock = threading.Lock()
177 """Get a new unique (or, at least, not recently used) context."""
181 get_context = ContextId()
184 """Debug function: report current VPP API status to stdout."""
185 print('Connected') if self.connected else print('Not Connected')
186 print('Read API definitions from', ', '.join(self.apifiles))
188 def __struct(self, t, n=None, e=-1, vl=None):
189 """Create a packing structure for a message."""
190 base_types = {'u8': 'B',
200 if e > 0 and t == 'u8':
202 return struct.Struct('>' + str(e) + 's')
204 # Fixed array of base type
205 return [e, struct.Struct('>' + base_types[t])]
207 # Old style variable array
208 return [-1, struct.Struct('>' + base_types[t])]
210 # Variable length array
211 return [vl, struct.Struct('>s')] if t == 'u8' else \
212 [vl, struct.Struct('>' + base_types[t])]
214 return struct.Struct('>' + base_types[t])
216 if t in self.messages:
217 # Return a list in case of array
219 return [e, lambda self, encode, buf, offset, args: (
220 self.__struct_type(encode, self.messages[t], buf, offset,
223 return [vl, lambda self, encode, buf, offset, args: (
224 self.__struct_type(encode, self.messages[t], buf, offset,
228 raise NotImplementedError(1,
229 'No support for compound types ' + t)
230 return lambda self, encode, buf, offset, args: (
231 self.__struct_type(encode, self.messages[t], buf, offset, args)
234 raise ValueError(1, 'Invalid message type: ' + t)
def __struct_type(self, encode, msgdef, buf, offset, kwargs):
    """Pack or unpack one message (or compound type) at offset in buf.

    encode - True to pack kwargs into buf, False to unpack from buf
    msgdef - the message packing definition
    buf - buffer to write to (encode) or read from (decode)
    offset - position in buf where the message starts
    kwargs - values to pack; ignored when decoding

    Returns whatever the selected encode/decode helper returns.
    """
    if encode:
        return self.__struct_type_encode(msgdef, buf, offset, kwargs)
    return self.__struct_type_decode(msgdef, buf, offset)
243 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
248 if k not in msgdef['args']:
249 raise ValueError(1,'Non existing argument [' + k + ']' + \
250 ' used in call to: ' + \
251 self.id_names[kwargs['_vl_msg_id']] + '()' )
254 for k, v in vpp_iterator(msgdef['args']):
259 e = kwargs[v[0]] if v[0] in kwargs else v[0]
262 size += v[1](self, True, buf, off + size,
270 buf[off:off + l] = bytearray(kwargs[k])
275 v[1].pack_into(buf, off + size, i)
279 size = v(self, True, buf, off, kwargs[k])
281 v.pack_into(buf, off, kwargs[k])
284 size = v.size if not type(v) is list else 0
286 return off + size - offset
def __getitem__(self, name):
    """Look up a message definition by name.

    Returns the entry stored in self.messages, or None when no message
    of that name is known.
    """
    return self.messages[name] if name in self.messages else None
def encode(self, msgdef, kwargs):
    """Pack kwargs into a binary message according to msgdef.

    msgdef - the message packing definition
    kwargs - dictionary of field values to pack

    A scratch buffer of self.buffersize bytes is filled from the start
    and only the bytes actually used are returned (as a bytearray).
    """
    # Packing always begins at the start of the freshly allocated buffer.
    offset = 0
    # Make suitably large buffer
    buf = bytearray(self.buffersize)
    size = self.__struct_type(True, msgdef, buf, offset, kwargs)
    return buf[:offset + size]
def decode(self, msgdef, buf):
    """Unpack the message held in buf according to msgdef.

    The unpacker is run from offset 0 and yields a pair; only its second
    element (the decoded message) is returned.
    """
    unpacked = self.__struct_type(False, msgdef, buf, 0, None)
    return unpacked[1]
303 def __struct_type_decode(self, msgdef, buf, offset):
307 for k, v in vpp_iterator(msgdef['args']):
311 if callable(v[1]): # compound type
313 if v[0] in msgdef['args']: # vla
319 (s, l) = v[1](self, False, buf, off + size, None)
324 if type(v[0]) is int:
325 size = len(buf) - off
328 res.append(buf[off:off + size])
330 e = v[0] if type(v[0]) is int else res[v[2]]
332 e = (len(buf) - off) / v[1].size
337 lst.append(v[1].unpack_from(buf, off + size)[0])
341 (s, l) = v(self, False, buf, off, None)
345 res.append(v.unpack_from(buf, off)[0])
348 return off + size - offset, msgdef['return_tuple']._make(res)
def ret_tup(self, name):
    """Return the 'return_tuple' entry registered for message name.

    Yields None when the message is unknown or has no 'return_tuple'
    entry.
    """
    if name not in self.messages:
        return None
    entry = self.messages[name]
    return entry['return_tuple'] if 'return_tuple' in entry else None
355 def add_message(self, name, msgdef, typeonly=False):
356 if name in self.messages:
357 raise ValueError('Duplicate message name: ' + name)
359 args = collections.OrderedDict()
360 argtypes = collections.OrderedDict()
363 for i, f in enumerate(msgdef):
364 if type(f) is dict and 'crc' in f:
365 msg['crc'] = f['crc']
369 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
370 raise ValueError('Variable Length Array must be last: ' + name)
371 args[field_name] = self.__struct(*f)
372 argtypes[field_name] = field_type
373 if len(f) == 4: # Find offset to # elements field
374 idx = list(args.keys()).index(f[3]) - i
375 args[field_name].append(idx)
376 fields.append(field_name)
377 msg['return_tuple'] = collections.namedtuple(name, fields,
379 self.messages[name] = msg
380 self.messages[name]['args'] = args
381 self.messages[name]['argtypes'] = argtypes
382 self.messages[name]['typeonly'] = typeonly
383 return self.messages[name]
def add_type(self, name, typedef):
    """Register a type definition.

    name - bare type name; it is stored as 'vl_api_<name>_t'
    typedef - the type's field definitions

    The definition is registered through add_message() and flagged as
    type-only. Returns whatever add_message() returns.
    """
    return self.add_message('vl_api_' + name + '_t', typedef,
                            typeonly=True)
def make_function(self, name, i, msgdef, multipart, do_async=False, **legacy):
    """Build the Python callable for one API message.

    name - the message name; becomes the callable's __name__
    i - the message type index
    msgdef - the message packing definition
    multipart - passed through to _call_vpp for synchronous calls
    do_async - if true the callable uses _call_vpp_async, otherwise
               _call_vpp

    The callable's __doc__ lists "<type> <field>" for every argument of
    the message. Returns the callable.
    """
    # 'async' is a reserved word from Python 3.7 on, so it can no longer be
    # a parameter name; keep accepting it as a keyword for older callers.
    do_async = legacy.pop('async', do_async)
    if legacy:
        raise TypeError('make_function() got unexpected keyword arguments: '
                        + ', '.join(sorted(legacy)))

    if do_async:
        def f(**kwargs):
            return self._call_vpp_async(i, msgdef, **kwargs)
    else:
        def f(**kwargs):
            return self._call_vpp(i, msgdef, multipart, **kwargs)

    args = self.messages[name]['args']
    argtypes = self.messages[name]['argtypes']
    f.__name__ = str(name)
    f.__doc__ = ", ".join("%s %s" % (argtypes[k], k) for k in args)
    return f
404 if not hasattr(self, "_api"):
405 raise Exception("Not connected, api definitions not available")
408 def _register_functions(self, async=False):
409 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
410 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
412 for name, msgdef in vpp_iterator(self.messages):
413 if self.messages[name]['typeonly']:
415 crc = self.messages[name]['crc']
416 n = name + '_' + crc[2:]
417 i = vpp_api.vac_get_msg_index(n.encode())
419 self.id_msgdef[i] = msgdef
420 self.id_names[i] = name
421 multipart = True if name.find('_dump') > 0 else False
422 f = self.make_function(name, i, msgdef, multipart, async)
423 setattr(self._api, name, FuncWrapper(f))
425 # old API stuff starts here - will be removed in 17.07
426 if hasattr(self, name):
428 3, "Conflicting name in JSON definition: `%s'" % name)
429 setattr(self, name, f)
430 # old API stuff ends here
433 'No such message type or failed CRC checksum: %s', n)
def _write(self, buf):
    """Hand a binary-packed message to the C client for sending to VPP.

    buf - the packed message bytes

    Returns the result of vac_write(). Raises IOError if this object is
    not connected.
    """
    if self.connected:
        return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
    raise IOError(1, 'Not connected')
442 if not self.connected:
443 raise IOError(1, 'Not connected')
444 mem = ffi.new("char **")
445 size = ffi.new("int *")
446 rv = vpp_api.vac_read(mem, size, self.read_timeout)
448 raise IOError(rv, 'vac_read failed')
449 msg = bytes(ffi.buffer(mem[0], size[0]))
450 vpp_api.vac_free(mem[0])
453 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
455 pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
456 rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
458 raise IOError(2, 'Connect failed')
459 self.connected = True
461 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
462 self._register_functions(async=async)
464 # Initialise control ping
465 crc = self.messages['control_ping']['crc']
466 self.control_ping_index = vpp_api.vac_get_msg_index(
467 ('control_ping' + '_' + crc[2:]).encode())
468 self.control_ping_msgdef = self.messages['control_ping']
471 def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
474 name - the name of the client.
475 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
476 async - if true, messages are sent without waiting for a reply
477 rx_qlen - the length of the VPP message receive queue between
480 msg_handler = vac_callback_sync if not async else vac_callback_async
481 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
484 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
485 """Attach to VPP in synchronous mode. Application must poll for events.
487 name - the name of the client.
488 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
489 rx_qlen - the length of the VPP message receive queue between
493 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
def disconnect(self):
    """Detach from VPP.

    Marks this object as no longer connected and returns the status
    code reported by vac_disconnect().
    """
    rv = vpp_api.vac_disconnect()
    self.connected = False
    return rv
502 def msg_handler_sync(self, msg):
503 """Process an incoming message from VPP in sync mode.
505 The message may be a reply or it may be an async notification.
507 r = self.decode_incoming_msg(msg)
511 # If we have a context, then use the context to find any
512 # request waiting for a reply
514 if hasattr(r, 'context') and r.context > 0:
517 msgname = type(r).__name__
520 # No context -> async notification that we feed to the callback
521 self.message_queue.put_nowait(r)
523 raise IOError(2, 'RPC reply message received in event handler')
525 def decode_incoming_msg(self, msg):
527 self.logger.warning('vpp_api.read failed')
530 i, ci = self.header.unpack_from(msg, 0)
531 if self.id_names[i] == 'rx_thread_exit':
535 # Decode message and returns a tuple.
537 msgdef = self.id_msgdef[i]
539 raise IOError(2, 'Reply message undefined')
541 r = self.decode(msgdef, msg)
def msg_handler_async(self, msg):
    """Process a message from VPP in async mode.

    In async mode, all messages are returned to the callback.

    msg - the raw message bytes

    The registered event callback, if any, is called with the message
    type name and the decoded message.
    """
    r = self.decode_incoming_msg(msg)
    if not r:
        # Nothing was decoded, so there is nothing to deliver.
        return

    msgname = type(r).__name__

    if self.event_callback:
        self.event_callback(msgname, r)
def _control_ping(self, context):
    """Send a ping command.

    context - context number to place in the ping request

    The ping is sent through _call_vpp_async using the control-ping
    index and definition stored on this object.
    """
    self._call_vpp_async(self.control_ping_index,
                         self.control_ping_msgdef,
                         context=context)
565 def _call_vpp(self, i, msgdef, multipart, **kwargs):
566 """Given a message, send the message and await a reply.
568 msgdef - the message packing definition
569 i - the message type index
570 multipart - True if the message returns multiple
572 context - context number - chosen at random if not
574 The remainder of the kwargs are the arguments to the API call.
576 The return value is the message or message array containing
577 the response. It will raise an IOError exception if there was
578 no response within the timeout window.
581 if 'context' not in kwargs:
582 context = self.get_context()
583 kwargs['context'] = context
585 context = kwargs['context']
586 kwargs['_vl_msg_id'] = i
587 b = self.encode(msgdef, kwargs)
589 vpp_api.vac_rx_suspend()
593 # Send a ping after the request - we use its response
594 # to detect that we have seen all results.
595 self._control_ping(context)
597 # Block until we get a reply.
602 raise IOError(2, 'VPP API client: read failed')
604 r = self.decode_incoming_msg(msg)
605 msgname = type(r).__name__
606 if context not in r or r.context == 0 or context != r.context:
607 self.message_queue.put_nowait(r)
613 if msgname == 'control_ping_reply':
618 vpp_api.vac_rx_resume()
def _call_vpp_async(self, i, msgdef, **kwargs):
    """Given a message, send the message without waiting for a reply.

    msgdef - the message packing definition
    i - the message type index
    context - context number - taken from get_context() if not
    supplied
    The remainder of the kwargs are the arguments to the API call.
    """
    if 'context' not in kwargs:
        kwargs['context'] = self.get_context()
    kwargs['_vl_msg_id'] = i
    b = self.encode(msgdef, kwargs)

    # The encoded request must actually be handed to VPP.
    self._write(b)
def register_event_callback(self, callback):
    """Register a callback for async messages.

    This will be called for async notifications in sync mode,
    and all messages in async mode. In sync mode, replies to
    requests will not come here.

    callback is a fn(msg_type_name, msg_type) that will be
    called when a message comes in. While this function is
    executing, note that (a) you are in a background thread and
    may wish to use threading.Lock to protect your datastructures,
    and (b) message processing from VPP will stop (so if you take
    a long while about it you may provoke reply timeouts or cause
    VPP to fill the RX buffer). Passing None will disable the
    callback.
    """
    self.event_callback = callback
def thread_msg_handler(self):
    """Python thread calling the user registered message handler.

    This is to emulate the old style event callback scheme. Modern
    clients should provide their own thread to poll the event
    queue.

    Runs forever: each message taken from self.message_queue is passed,
    together with its type name, to the registered event callback (if
    one is set).
    """
    while True:
        r = self.message_queue.get()
        msgname = type(r).__name__
        if self.event_callback:
            self.event_callback(msgname, r)