3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 import sys, os, logging, collections, struct, json, threading, glob
21 logging.basicConfig(level=logging.DEBUG)
def eprint(*args, **kwargs):
    """Write a diagnostic message to standard error.

    Takes the same positional and keyword arguments as print(); the
    destination stream is fixed to sys.stderr by this function.
    """
    stderr_stream = sys.stderr
    print(*args, file=stderr_stream, **kwargs)
29 """Clean up VPP connection on shutdown."""
31 eprint ('Cleaning up VPP on exit')
class FuncWrapper(object):
    """Callable wrapper around a dynamically created API function.

    The wrapper keeps a reference to the wrapped function, mirrors its
    __name__ and __doc__, and forwards keyword-argument calls to it.
    """

    def __init__(self, func):
        """Wrap func, a callable that accepts keyword arguments."""
        # __call__ dispatches through this reference; without it every
        # invocation of the wrapper fails with AttributeError.
        self._func = func
        self.__name__ = func.__name__
        # Keep the generated argument summary visible on the wrapper.
        self.__doc__ = func.__doc__

    def __call__(self, **kwargs):
        """Invoke the wrapped function with the given keyword arguments."""
        return self._func(**kwargs)
51 This class provides the APIs to VPP. The APIs are loaded
52 from the provided .api.json files and functions are created accordingly.
53 These functions are documented in the VPP .api files, as they
54 are dynamically created.
56 Additionally, VPP can send callback messages; this class
57 provides a means to register a callback function to receive
58 these messages in a background thread.
60 def __init__(self, apifiles = None, testmode = False, async_thread = True):
61 """Create a VPP API object.
63 apifiles is a list of files containing API
64 descriptions that will be loaded - methods will be
65 dynamically created reflecting these APIs. If not
66 provided this will load the API files from VPP's
67 default install location.
72 self.buffersize = 10000
73 self.connected = False
74 self.header = struct.Struct('>HI')
76 self.event_callback = None
77 self.message_queue = Queue.Queue()
79 self.vpp_api = vpp_api
81 self.event_thread = threading.Thread(target=self.thread_msg_handler)
82 self.event_thread.daemon = True
83 self.event_thread.start()
86 # Pick up API definitions from default directory
87 apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
90 with open(file) as apidef_file:
91 api = json.load(apidef_file)
92 for t in api['types']:
93 self.add_type(t[0], t[1:])
95 for m in api['messages']:
96 self.add_message(m[0], m[1:])
97 self.apifiles = apifiles
100 if len(self.messages) == 0 and not testmode:
101 raise ValueError(1, 'Missing JSON message definitions')
103 # Make sure we allow VPP to clean up the message rings.
104 atexit.register(vpp_atexit, self)
106 class ContextId(object):
107 """Thread-safe provider of unique context IDs."""
110 self.lock = threading.Lock()
112 """Get a new unique (or, at least, not recently used) context."""
116 get_context = ContextId()
119 """Debug function: report current VPP API status to stdout."""
120 print('Connected') if self.connected else print('Not Connected')
121 print('Read API definitions from', ', '.join(self.apifiles))
123 def __struct (self, t, n = None, e = -1, vl = None):
124 """Create a packing structure for a message."""
125 base_types = { 'u8' : 'B',
136 if e > 0 and t == 'u8':
138 return struct.Struct('>' + str(e) + 's')
140 # Fixed array of base type
141 return [e, struct.Struct('>' + base_types[t])]
143 # Old style variable array
144 return [-1, struct.Struct('>' + base_types[t])]
146 # Variable length array
147 return [vl, struct.Struct('>s')] if t == 'u8' else \
148 [vl, struct.Struct('>' + base_types[t])]
150 return struct.Struct('>' + base_types[t])
152 if t in self.messages:
153 ### Return a list in case of array ###
155 return [e, lambda self, encode, buf, offset, args: (
156 self.__struct_type(encode, self.messages[t], buf, offset,
159 return [vl, lambda self, encode, buf, offset, args: (
160 self.__struct_type(encode, self.messages[t], buf, offset,
164 raise NotImplementedError(1, 'No support for compound types ' + t)
165 return lambda self, encode, buf, offset, args: (
166 self.__struct_type(encode, self.messages[t], buf, offset, args)
169 raise ValueError(1, 'Invalid message type: ' + t)
def __struct_type(self, encode, msgdef, buf, offset, kwargs):
    """Pack or unpack one message, depending on the direction flag.

    encode - True to pack kwargs into buf, False to unpack from buf
    msgdef - the message definition to pack or unpack with
    buf    - the buffer that is written to (encode) or read from (decode)
    offset - position in buf at which the message starts
    kwargs - field values to pack; not used when decoding

    Returns whatever the selected encode or decode helper returns.
    """
    # The flag must select the direction; otherwise the decode path can
    # never be reached.
    if encode:
        return self.__struct_type_encode(msgdef, buf, offset, kwargs)
    return self.__struct_type_decode(msgdef, buf, offset)
178 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
183 if k not in msgdef['args']:
184 raise ValueError(1, 'Invalid field-name in message call ' + k)
186 for k,v in msgdef['args'].iteritems():
191 e = kwargs[v[0]] if v[0] in kwargs else v[0]
194 size += v[1](self, True, buf, off + size,
202 buf[off:off + l] = bytearray(kwargs[k])
207 v[1].pack_into(buf, off + size, i)
211 size = v(self, True, buf, off, kwargs[k])
213 v.pack_into(buf, off, kwargs[k])
216 size = v.size if not type(v) is list else 0
218 return off + size - offset
def __getitem__(self, name):
    """Return the stored definition for message `name`, or None if unknown."""
    known = self.messages
    if name not in known:
        return None
    return known[name]
def encode(self, msgdef, kwargs):
    """Pack kwargs into a new buffer according to msgdef.

    A scratch bytearray of self.buffersize bytes is allocated, the
    message is packed into it starting at offset 0, and only the part
    that was used is returned.
    """
    # Packing starts at the beginning of the fresh buffer; decode()
    # likewise reads from offset 0.
    offset = 0
    # Make suitably large buffer
    buf = bytearray(self.buffersize)
    size = self.__struct_type(True, msgdef, buf, offset, kwargs)
    return buf[:offset + size]
def decode(self, msgdef, buf):
    """Unpack a raw message buffer using msgdef.

    The type decoder is run from offset 0 and returns a pair; only the
    second element of that pair (the decoded value) is handed back.
    """
    unpacked = self.__struct_type(False, msgdef, buf, 0, None)
    return unpacked[1]
236 def __struct_type_decode(self, msgdef, buf, offset):
240 for k,v in msgdef['args'].iteritems():
244 if callable(v[1]): # compound type
246 if v[0] in msgdef['args']: # vla
252 (s,l) = v[1](self, False, buf, off + size, None)
257 if type(v[0]) is int:
258 size = len(buf) - off
261 res.append(buf[off:off + size])
263 e = v[0] if type(v[0]) is int else res[v[2]]
265 e = (len(buf) - off) / v[1].size
270 lst.append(v[1].unpack_from(buf, off + size)[0])
274 (s,l) = v(self, False, buf, off, None)
278 res.append(v.unpack_from(buf, off)[0])
281 return off + size - offset, msgdef['return_tuple']._make(res)
def ret_tup(self, name):
    """Return the 'return_tuple' entry recorded for message `name`.

    Gives None when the message is unknown or has no such entry.
    """
    if name not in self.messages:
        return None
    entry = self.messages[name]
    if 'return_tuple' not in entry:
        return None
    return entry['return_tuple']
288 def add_message(self, name, msgdef):
289 if name in self.messages:
290 raise ValueError('Duplicate message name: ' + name)
292 args = collections.OrderedDict()
293 argtypes = collections.OrderedDict()
296 for i, f in enumerate(msgdef):
297 if type(f) is dict and 'crc' in f:
298 msg['crc'] = f['crc']
302 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
303 raise ValueError('Variable Length Array must be last: ' + name)
304 args[field_name] = self.__struct(*f)
305 argtypes[field_name] = field_type
306 if len(f) == 4: # Find offset to # elements field
307 args[field_name].append(args.keys().index(f[3]) - i)
308 fields.append(field_name)
309 msg['return_tuple'] = collections.namedtuple(name, fields,
311 self.messages[name] = msg
312 self.messages[name]['args'] = args
313 self.messages[name]['argtypes'] = argtypes
314 return self.messages[name]
def add_type(self, name, typedef):
    """Register a type definition as a message named 'vl_api_<name>_t'.

    Returns the result of add_message() for that derived name.
    """
    message_name = 'vl_api_' + name + '_t'
    return self.add_message(message_name, typedef)
319 def make_function(self, name, i, msgdef, multipart, async):
321 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
323 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart, **kwargs))
324 args = self.messages[name]['args']
325 argtypes = self.messages[name]['argtypes']
326 f.__name__ = str(name)
327 f.__doc__ = ", ".join(["%s %s" % (argtypes[k], k) for k in args.keys()])
332 if not hasattr(self, "_api"):
333 raise Exception("Not connected, api definitions not available")
336 def _register_functions(self, async=False):
337 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
338 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
340 for name, msgdef in self.messages.iteritems():
341 if name in self.vpp_dictionary:
342 if self.messages[name]['crc'] != self.vpp_dictionary[name]['crc']:
343 raise ValueError(3, 'Failed CRC checksum ' + name +
344 ' ' + self.messages[name]['crc'] +
345 ' ' + self.vpp_dictionary[name]['crc'])
346 i = self.vpp_dictionary[name]['id']
347 self.id_msgdef[i] = msgdef
348 self.id_names[i] = name
349 multipart = True if name.find('_dump') > 0 else False
350 f = self.make_function(name, i, msgdef, multipart, async)
351 setattr(self._api, name, FuncWrapper(f))
353 # old API stuff starts here - will be removed in 17.07
354 if hasattr(self, name):
356 3, "Conflicting name in JSON definition: `%s'" % name)
357 setattr(self, name, f)
358 # old API stuff ends here
def _write(self, buf):
    """Hand a binary-packed message to VPP.

    buf - the packed message; it is converted with str() before being
          passed to vpp_api.write().

    Returns the result of vpp_api.write().  Raises IOError when this
    object is not connected.
    """
    if self.connected:
        return vpp_api.write(str(buf))
    raise IOError(1, 'Not connected')
367 if not self.connected:
368 raise IOError(1, 'Not connected')
370 return vpp_api.read(self.read_timeout)
372 def _load_dictionary(self):
373 self.vpp_dictionary = {}
374 self.vpp_dictionary_maxid = 0
375 d = vpp_api.msg_table()
378 raise IOError(3, 'Cannot get VPP API dictionary')
380 name, crc = n.rsplit('_', 1)
382 self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
383 self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
385 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, async):
386 rv = vpp_api.connect(name, msg_handler, chroot_prefix, rx_qlen)
388 raise IOError(2, 'Connect failed')
389 self.connected = True
391 self._load_dictionary()
392 self._register_functions(async=async)
394 # Initialise control ping
395 self.control_ping_index = self.vpp_dictionary['control_ping']['id']
396 self.control_ping_msgdef = self.messages['control_ping']
398 def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
401 name - the name of the client.
402 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
403 async - if true, messages are sent without waiting for a reply
404 rx_qlen - the length of the VPP message receive queue between
407 msg_handler = self.msg_handler_sync if not async \
408 else self.msg_handler_async
409 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
412 def connect_sync (self, name, chroot_prefix = None, rx_qlen = 32):
413 """Attach to VPP in synchronous mode. Application must poll for events.
415 name - the name of the client.
416 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
417 rx_qlen - the length of the VPP message receive queue between
421 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
def disconnect(self):
    """Detach from VPP.

    Marks this object as no longer connected and returns the status
    value reported by vpp_api.disconnect(), so callers can check it.
    """
    rv = vpp_api.disconnect()
    self.connected = False
    # Pass the status on instead of discarding it.
    return rv
430 def msg_handler_sync(self, msg):
431 """Process an incoming message from VPP in sync mode.
433 The message may be a reply or it may be an async notification.
435 r = self.decode_incoming_msg(msg)
439 # If we have a context, then use the context to find any
440 # request waiting for a reply
442 if hasattr(r, 'context') and r.context > 0:
445 msgname = type(r).__name__
448 # No context -> async notification that we feed to the callback
449 self.message_queue.put_nowait(r)
451 raise IOError(2, 'RPC reply message received in event handler')
453 def decode_incoming_msg(self, msg):
455 eprint('vpp_api.read failed')
458 i, ci = self.header.unpack_from(msg, 0)
459 if self.id_names[i] == 'rx_thread_exit':
463 # Decode message and returns a tuple.
465 msgdef = self.id_msgdef[i]
467 raise IOError(2, 'Reply message undefined')
469 r = self.decode(msgdef, msg)
473 def msg_handler_async(self, msg):
474 """Process a message from VPP in async mode.
476 In async mode, all messages are returned to the callback.
478 r = self.decode_incoming_msg(msg)
482 msgname = type(r).__name__
484 if self.event_callback:
485 self.event_callback(msgname, r)
487 def _control_ping(self, context):
488 """Send a ping command."""
489 self._call_vpp_async(self.control_ping_index,
490 self.control_ping_msgdef,
493 def _call_vpp(self, i, msgdef, multipart, **kwargs):
494 """Given a message, send the message and await a reply.
496 msgdef - the message packing definition
497 i - the message type index
498 multipart - True if the message returns multiple
500 context - context number - chosen at random if not
502 The remainder of the kwargs are the arguments to the API call.
504 The return value is the message or message array containing
505 the response. It will raise an IOError exception if there was
506 no response within the timeout window.
509 if not 'context' in kwargs:
510 context = self.get_context()
511 kwargs['context'] = context
513 context = kwargs['context']
514 kwargs['_vl_msg_id'] = i
515 b = self.encode(msgdef, kwargs)
521 # Send a ping after the request - we use its response
522 # to detect that we have seen all results.
523 self._control_ping(context)
525 # Block until we get a reply.
530 print('PNEUM ERROR: OH MY GOD')
531 raise IOError(2, 'PNEUM read failed')
533 r = self.decode_incoming_msg(msg)
534 msgname = type(r).__name__
535 if not context in r or r.context == 0 or context != r.context:
536 self.message_queue.put_nowait(r)
542 if msgname == 'control_ping_reply':
551 def _call_vpp_async(self, i, msgdef, **kwargs):
552 """Given a message, send the message and await a reply.
554 msgdef - the message packing definition
555 i - the message type index
556 context - context number - chosen at random if not
558 The remainder of the kwargs are the arguments to the API call.
560 if not 'context' in kwargs:
561 context = self.get_context()
562 kwargs['context'] = context
564 context = kwargs['context']
565 kwargs['_vl_msg_id'] = i
566 b = self.encode(msgdef, kwargs)
def register_event_callback(self, callback):
    """Register a callback for async messages.

    This will be called for async notifications in sync mode,
    and all messages in async mode.  In sync mode, replies to
    requests will not come here.

    callback is a fn(msg_type_name, msg_type) that will be
    called when a message comes in.  While this function is
    executing, note that (a) you are in a background thread and
    may wish to use threading.Lock to protect your datastructures,
    and (b) message processing from VPP will stop (so if you take
    a long while about it you may provoke reply timeouts or cause
    VPP to fill the RX buffer).  Passing None will disable the
    callback.
    """
    self.event_callback = callback
588 def thread_msg_handler(self):
589 """Python thread calling the user registered message handler.
591 This is to emulate the old style event callback scheme. Modern
592 clients should provide their own thread to poll the event
596 r = self.message_queue.get()
597 msgname = type(r).__name__
598 if self.event_callback:
599 self.event_callback(msgname, r)