3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 import sys, os, logging, collections, struct, json, threading, glob
24 typedef void (*vac_callback_t)(unsigned char * data, int len);
25 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
26 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
28 int vac_disconnect(void);
29 int vac_read(char **data, int *l, unsigned short timeout);
30 int vac_write(char *data, int len);
31 void vac_free(void * msg);
33 int vac_get_msg_index(unsigned char * name);
34 int vac_msg_table_size(void);
35 int vac_msg_table_max_index(void);
37 void vac_rx_suspend (void);
38 void vac_rx_resume (void);
39 void vac_set_error_handler(vac_error_callback_t);
42 # Barfs on failure, no need to check success.
43 vpp_api = ffi.dlopen('libvppapiclient.so')
46 """Clean up VPP connection on shutdown."""
48 self.logger.debug('Cleaning up VPP on exit')
53 @ffi.callback("void(unsigned char *, int)")
54 def vac_callback_sync(data, len):
55 vpp_object.msg_handler_sync(ffi.buffer(data, len))
56 @ffi.callback("void(unsigned char *, int)")
57 def vac_callback_async(data, len):
58 vpp_object.msg_handler_async(ffi.buffer(data, len))
59 @ffi.callback("void(void *, unsigned char *, int)")
60 def vac_error_handler(arg, msg, msg_len):
61 vpp_object.logger.warning("PNEUM: %s", ffi.string(msg, msg_len))
67 class FuncWrapper(object):
68 def __init__(self, func):
70 self.__name__ = func.__name__
72 def __call__(self, **kwargs):
73 return self._func(**kwargs)
79 This class provides the APIs to VPP. The APIs are loaded
80 from provided .api.json files and makes functions accordingly.
81 These functions are documented in the VPP .api files, as they
82 are dynamically created.
84 Additionally, VPP can send callback messages; this class
85 provides a means to register a callback function to receive
86 these messages in a background thread.
88 def __init__(self, apifiles = None, testmode = False, async_thread = True,
89 logger = logging.getLogger('vpp_papi'), loglevel = 'debug'):
90 """Create a VPP API object.
92 apifiles is a list of files containing API
93 descriptions that will be loaded - methods will be
94 dynamically created reflecting these APIs. If not
95 provided this will load the API files from VPP's
96 default install location.
101 logging.basicConfig(level=getattr(logging, loglevel.upper()))
106 self.buffersize = 10000
107 self.connected = False
108 self.header = struct.Struct('>HI')
110 self.event_callback = None
111 self.message_queue = Queue.Queue()
112 self.read_timeout = 0
113 self.vpp_api = vpp_api
115 self.event_thread = threading.Thread(target=self.thread_msg_handler)
116 self.event_thread.daemon = True
117 self.event_thread.start()
120 # Pick up API definitions from default directory
121 apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
123 for file in apifiles:
124 with open(file) as apidef_file:
125 api = json.load(apidef_file)
126 for t in api['types']:
127 self.add_type(t[0], t[1:])
129 for m in api['messages']:
130 self.add_message(m[0], m[1:])
131 self.apifiles = apifiles
134 if len(self.messages) == 0 and not testmode:
135 raise ValueError(1, 'Missing JSON message definitions')
137 # Make sure we allow VPP to clean up the message rings.
138 atexit.register(vpp_atexit, self)
140 # Register error handler
141 vpp_api.vac_set_error_handler(vac_error_handler)
143 class ContextId(object):
144 """Thread-safe provider of unique context IDs."""
147 self.lock = threading.Lock()
149 """Get a new unique (or, at least, not recently used) context."""
153 get_context = ContextId()
156 """Debug function: report current VPP API status to stdout."""
157 print('Connected') if self.connected else print('Not Connected')
158 print('Read API definitions from', ', '.join(self.apifiles))
160 def __struct (self, t, n = None, e = -1, vl = None):
161 """Create a packing structure for a message."""
162 base_types = { 'u8' : 'B',
173 if e > 0 and t == 'u8':
175 return struct.Struct('>' + str(e) + 's')
177 # Fixed array of base type
178 return [e, struct.Struct('>' + base_types[t])]
180 # Old style variable array
181 return [-1, struct.Struct('>' + base_types[t])]
183 # Variable length array
184 return [vl, struct.Struct('>s')] if t == 'u8' else \
185 [vl, struct.Struct('>' + base_types[t])]
187 return struct.Struct('>' + base_types[t])
189 if t in self.messages:
190 ### Return a list in case of array ###
192 return [e, lambda self, encode, buf, offset, args: (
193 self.__struct_type(encode, self.messages[t], buf, offset,
196 return [vl, lambda self, encode, buf, offset, args: (
197 self.__struct_type(encode, self.messages[t], buf, offset,
201 raise NotImplementedError(1, 'No support for compound types ' + t)
202 return lambda self, encode, buf, offset, args: (
203 self.__struct_type(encode, self.messages[t], buf, offset, args)
206 raise ValueError(1, 'Invalid message type: ' + t)
208 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
209 """Get a message packer or unpacker."""
211 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
213 return self.__struct_type_decode(msgdef, buf, offset)
215 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
220 if k not in msgdef['args']:
221 raise ValueError(1, 'Invalid field-name in message call ' + k)
223 for k,v in msgdef['args'].iteritems():
228 e = kwargs[v[0]] if v[0] in kwargs else v[0]
231 size += v[1](self, True, buf, off + size,
239 buf[off:off + l] = bytearray(kwargs[k])
244 v[1].pack_into(buf, off + size, i)
248 size = v(self, True, buf, off, kwargs[k])
250 v.pack_into(buf, off, kwargs[k])
253 size = v.size if not type(v) is list else 0
255 return off + size - offset
258 def __getitem__(self, name):
259 if name in self.messages:
260 return self.messages[name]
263 def encode(self, msgdef, kwargs):
264 # Make suitably large buffer
265 buf = bytearray(self.buffersize)
267 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
268 return buf[:offset + size]
270 def decode(self, msgdef, buf):
271 return self.__struct_type(False, msgdef, buf, 0, None)[1]
273 def __struct_type_decode(self, msgdef, buf, offset):
277 for k,v in msgdef['args'].iteritems():
281 if callable(v[1]): # compound type
283 if v[0] in msgdef['args']: # vla
289 (s,l) = v[1](self, False, buf, off + size, None)
294 if type(v[0]) is int:
295 size = len(buf) - off
298 res.append(buf[off:off + size])
300 e = v[0] if type(v[0]) is int else res[v[2]]
302 e = (len(buf) - off) / v[1].size
307 lst.append(v[1].unpack_from(buf, off + size)[0])
311 (s,l) = v(self, False, buf, off, None)
315 res.append(v.unpack_from(buf, off)[0])
318 return off + size - offset, msgdef['return_tuple']._make(res)
320 def ret_tup(self, name):
321 if name in self.messages and 'return_tuple' in self.messages[name]:
322 return self.messages[name]['return_tuple']
325 def add_message(self, name, msgdef, typeonly = False):
326 if name in self.messages:
327 raise ValueError('Duplicate message name: ' + name)
329 args = collections.OrderedDict()
330 argtypes = collections.OrderedDict()
333 for i, f in enumerate(msgdef):
334 if type(f) is dict and 'crc' in f:
335 msg['crc'] = f['crc']
339 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
340 raise ValueError('Variable Length Array must be last: ' + name)
341 args[field_name] = self.__struct(*f)
342 argtypes[field_name] = field_type
343 if len(f) == 4: # Find offset to # elements field
344 args[field_name].append(args.keys().index(f[3]) - i)
345 fields.append(field_name)
346 msg['return_tuple'] = collections.namedtuple(name, fields,
348 self.messages[name] = msg
349 self.messages[name]['args'] = args
350 self.messages[name]['argtypes'] = argtypes
351 self.messages[name]['typeonly'] = typeonly
352 return self.messages[name]
354 def add_type(self, name, typedef):
355 return self.add_message('vl_api_' + name + '_t', typedef, typeonly=True)
357 def make_function(self, name, i, msgdef, multipart, async):
359 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
361 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart, **kwargs))
362 args = self.messages[name]['args']
363 argtypes = self.messages[name]['argtypes']
364 f.__name__ = str(name)
365 f.__doc__ = ", ".join(["%s %s" % (argtypes[k], k) for k in args.keys()])
370 if not hasattr(self, "_api"):
371 raise Exception("Not connected, api definitions not available")
374 def _register_functions(self, async=False):
375 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
376 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
378 for name, msgdef in self.messages.iteritems():
379 if self.messages[name]['typeonly']: continue
380 crc = self.messages[name]['crc']
381 n = name + '_' + crc[2:]
382 i = vpp_api.vac_get_msg_index(bytes(n))
384 self.id_msgdef[i] = msgdef
385 self.id_names[i] = name
386 multipart = True if name.find('_dump') > 0 else False
387 f = self.make_function(name, i, msgdef, multipart, async)
388 setattr(self._api, name, FuncWrapper(f))
390 # old API stuff starts here - will be removed in 17.07
391 if hasattr(self, name):
393 3, "Conflicting name in JSON definition: `%s'" % name)
394 setattr(self, name, f)
395 # old API stuff ends here
397 self.logger.debug('No such message type or failed CRC checksum: %s', n)
399 def _write (self, buf):
400 """Send a binary-packed message to VPP."""
401 if not self.connected:
402 raise IOError(1, 'Not connected')
403 return vpp_api.vac_write(str(buf), len(buf))
406 if not self.connected:
407 raise IOError(1, 'Not connected')
408 mem = ffi.new("char **")
409 size = ffi.new("int *")
410 rv = vpp_api.vac_read(mem, size, self.read_timeout)
412 raise IOError(rv, 'vac_read filed')
413 msg = bytes(ffi.buffer(mem[0], size[0]))
414 vpp_api.vac_free(mem[0])
417 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, async):
418 rv = vpp_api.vac_connect(name, chroot_prefix, msg_handler, rx_qlen)
420 raise IOError(2, 'Connect failed')
421 self.connected = True
423 self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
424 self._register_functions(async=async)
426 # Initialise control ping
427 crc = self.messages['control_ping']['crc']
428 self.control_ping_index = \
429 vpp_api.vac_get_msg_index(
430 bytes('control_ping' + '_' + crc[2:]))
431 self.control_ping_msgdef = self.messages['control_ping']
433 def connect(self, name, chroot_prefix = ffi.NULL,
434 async = False, rx_qlen = 32):
437 name - the name of the client.
438 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
439 async - if true, messages are sent without waiting for a reply
440 rx_qlen - the length of the VPP message receive queue between
443 msg_handler = vac_callback_sync if not async \
444 else vac_callback_async
445 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
448 def connect_sync (self, name, chroot_prefix = ffi.NULL, rx_qlen = 32):
449 """Attach to VPP in synchronous mode. Application must poll for events.
451 name - the name of the client.
452 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
453 rx_qlen - the length of the VPP message receive queue between
457 return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
460 def disconnect(self):
461 """Detach from VPP."""
462 rv = vpp_api.vac_disconnect()
463 self.connected = False
466 def msg_handler_sync(self, msg):
467 """Process an incoming message from VPP in sync mode.
469 The message may be a reply or it may be an async notification.
471 r = self.decode_incoming_msg(msg)
475 # If we have a context, then use the context to find any
476 # request waiting for a reply
478 if hasattr(r, 'context') and r.context > 0:
481 msgname = type(r).__name__
484 # No context -> async notification that we feed to the callback
485 self.message_queue.put_nowait(r)
487 raise IOError(2, 'RPC reply message received in event handler')
489 def decode_incoming_msg(self, msg):
491 self.logger.warning('vpp_api.read failed')
494 i, ci = self.header.unpack_from(msg, 0)
495 if self.id_names[i] == 'rx_thread_exit':
499 # Decode message and returns a tuple.
501 msgdef = self.id_msgdef[i]
503 raise IOError(2, 'Reply message undefined')
505 r = self.decode(msgdef, msg)
509 def msg_handler_async(self, msg):
510 """Process a message from VPP in async mode.
512 In async mode, all messages are returned to the callback.
514 r = self.decode_incoming_msg(msg)
518 msgname = type(r).__name__
520 if self.event_callback:
521 self.event_callback(msgname, r)
523 def _control_ping(self, context):
524 """Send a ping command."""
525 self._call_vpp_async(self.control_ping_index,
526 self.control_ping_msgdef,
529 def _call_vpp(self, i, msgdef, multipart, **kwargs):
530 """Given a message, send the message and await a reply.
532 msgdef - the message packing definition
533 i - the message type index
534 multipart - True if the message returns multiple
536 context - context number - chosen at random if not
538 The remainder of the kwargs are the arguments to the API call.
540 The return value is the message or message array containing
541 the response. It will raise an IOError exception if there was
542 no response within the timeout window.
545 if not 'context' in kwargs:
546 context = self.get_context()
547 kwargs['context'] = context
549 context = kwargs['context']
550 kwargs['_vl_msg_id'] = i
551 b = self.encode(msgdef, kwargs)
553 vpp_api.vac_rx_suspend()
557 # Send a ping after the request - we use its response
558 # to detect that we have seen all results.
559 self._control_ping(context)
561 # Block until we get a reply.
566 print('PNEUM ERROR: OH MY GOD')
567 raise IOError(2, 'PNEUM read failed')
569 r = self.decode_incoming_msg(msg)
570 msgname = type(r).__name__
571 if not context in r or r.context == 0 or context != r.context:
572 self.message_queue.put_nowait(r)
578 if msgname == 'control_ping_reply':
583 vpp_api.vac_rx_resume()
587 def _call_vpp_async(self, i, msgdef, **kwargs):
588 """Given a message, send the message and await a reply.
590 msgdef - the message packing definition
591 i - the message type index
592 context - context number - chosen at random if not
594 The remainder of the kwargs are the arguments to the API call.
596 if not 'context' in kwargs:
597 context = self.get_context()
598 kwargs['context'] = context
600 context = kwargs['context']
601 kwargs['_vl_msg_id'] = i
602 b = self.encode(msgdef, kwargs)
606 def register_event_callback(self, callback):
607 """Register a callback for async messages.
609 This will be called for async notifications in sync mode,
610 and all messages in async mode. In sync mode, replies to
611 requests will not come here.
613 callback is a fn(msg_type_name, msg_type) that will be
614 called when a message comes in. While this function is
615 executing, note that (a) you are in a background thread and
616 may wish to use threading.Lock to protect your datastructures,
617 and (b) message processing from VPP will stop (so if you take
618 a long while about it you may provoke reply timeouts or cause
619 VPP to fill the RX buffer). Passing None will disable the
622 self.event_callback = callback
624 def thread_msg_handler(self):
625 """Python thread calling the user registerd message handler.
627 This is to emulate the old style event callback scheme. Modern
628 clients should provide their own thread to poll the event
632 r = self.message_queue.get()
633 msgname = type(r).__name__
634 if self.event_callback:
635 self.event_callback(msgname, r)