3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 import sys, os, logging, collections, struct, json, threading, glob
21 logging.basicConfig(level=logging.DEBUG)
24 def eprint(*args, **kwargs):
25 """Print critical diagnostics to stderr."""
26 print(*args, file=sys.stderr, **kwargs)
29 """Clean up VPP connection on shutdown."""
31 eprint ('Cleaning up VPP on exit')
37 This class provides the APIs to VPP. The APIs are loaded
38 from provided .api.json files and makes functions accordingly.
39 These functions are documented in the VPP .api files, as they
40 are dynamically created.
42 Additionally, VPP can send callback messages; this class
43 provides a means to register a callback function to receive
44 these messages in a background thread.
46 def __init__(self, apifiles = None, testmode = False):
47 """Create a VPP API object.
49 apifiles is a list of files containing API
50 descriptions that will be loaded - methods will be
51 dynamically created reflecting these APIs. If not
52 provided this will load the API files from VPP's
53 default install location.
58 self.buffersize = 10000
59 self.connected = False
60 self.header = struct.Struct('>HI')
61 self.results_lock = threading.Lock()
65 self.event_callback = None
68 # Pick up API definitions from default directory
69 apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
72 with open(file) as apidef_file:
73 api = json.load(apidef_file)
74 for t in api['types']:
75 self.add_type(t[0], t[1:])
77 for m in api['messages']:
78 self.add_message(m[0], m[1:])
79 self.apifiles = apifiles
82 if len(self.messages) == 0 and not testmode:
83 raise ValueError(1, 'Missing JSON message definitions')
85 # Make sure we allow VPP to clean up the message rings.
86 atexit.register(vpp_atexit, self)
88 class ContextId(object):
89 """Thread-safe provider of unique context IDs."""
92 self.lock = threading.Lock()
94 """Get a new unique (or, at least, not recently used) context."""
98 get_context = ContextId()
101 """Debug function: report current VPP API status to stdout."""
102 print('Connected') if self.connected else print('Not Connected')
103 print('Read API definitions from', ', '.join(self.apifiles))
105 def __struct (self, t, n = None, e = -1, vl = None):
106 """Create a packing structure for a message."""
107 base_types = { 'u8' : 'B',
118 if e > 0 and t == 'u8':
120 return struct.Struct('>' + str(e) + 's')
122 # Fixed array of base type
123 return [e, struct.Struct('>' + base_types[t])]
125 # Old style variable array
126 return [-1, struct.Struct('>' + base_types[t])]
128 # Variable length array
129 return [vl, struct.Struct('>s')] if t == 'u8' else \
130 [vl, struct.Struct('>' + base_types[t])]
132 return struct.Struct('>' + base_types[t])
134 if t in self.messages:
135 ### Return a list in case of array ###
137 return [e, lambda self, encode, buf, offset, args: (
138 self.__struct_type(encode, self.messages[t], buf, offset,
141 return [vl, lambda self, encode, buf, offset, args: (
142 self.__struct_type(encode, self.messages[t], buf, offset,
146 raise NotImplementedError(1, 'No support for compound types ' + t)
147 return lambda self, encode, buf, offset, args: (
148 self.__struct_type(encode, self.messages[t], buf, offset, args)
151 raise ValueError(1, 'Invalid message type: ' + t)
153 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
154 """Get a message packer or unpacker."""
156 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
158 return self.__struct_type_decode(msgdef, buf, offset)
160 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
165 if k not in msgdef['args']:
166 raise ValueError(1, 'Invalid field-name in message call ' + k)
168 for k,v in msgdef['args'].iteritems():
173 e = kwargs[v[0]] if v[0] in kwargs else v[0]
176 size += v[1](self, True, buf, off + size,
184 buf[off:off + l] = bytearray(kwargs[k])
189 v[1].pack_into(buf, off + size, i)
193 size = v(self, True, buf, off, kwargs[k])
195 v.pack_into(buf, off, kwargs[k])
198 size = v.size if not type(v) is list else 0
200 return off + size - offset
203 def __getitem__(self, name):
204 if name in self.messages:
205 return self.messages[name]
208 def encode(self, msgdef, kwargs):
209 # Make suitably large buffer
210 buf = bytearray(self.buffersize)
212 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
213 return buf[:offset + size]
215 def decode(self, msgdef, buf):
216 return self.__struct_type(False, msgdef, buf, 0, None)[1]
218 def __struct_type_decode(self, msgdef, buf, offset):
222 for k,v in msgdef['args'].iteritems():
226 if callable(v[1]): # compound type
228 if v[0] in msgdef['args']: # vla
234 (s,l) = v[1](self, False, buf, off + size, None)
239 if type(v[0]) is int:
240 size = len(buf) - off
243 res.append(buf[off:off + size])
245 e = v[0] if type(v[0]) is int else res[v[2]]
247 e = (len(buf) - off) / v[1].size
252 lst.append(v[1].unpack_from(buf, off + size)[0])
256 (s,l) = v(self, False, buf, off, None)
260 res.append(v.unpack_from(buf, off)[0])
263 return off + size - offset, msgdef['return_tuple']._make(res)
265 def ret_tup(self, name):
266 if name in self.messages and 'return_tuple' in self.messages[name]:
267 return self.messages[name]['return_tuple']
270 def add_message(self, name, msgdef):
271 if name in self.messages:
272 raise ValueError('Duplicate message name: ' + name)
274 args = collections.OrderedDict()
275 argtypes = collections.OrderedDict()
278 for i, f in enumerate(msgdef):
279 if type(f) is dict and 'crc' in f:
280 msg['crc'] = f['crc']
284 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
285 raise ValueError('Variable Length Array must be last: ' + name)
286 args[field_name] = self.__struct(*f)
287 argtypes[field_name] = field_type
288 if len(f) == 4: # Find offset to # elements field
289 args[field_name].append(args.keys().index(f[3]) - i)
290 fields.append(field_name)
291 msg['return_tuple'] = collections.namedtuple(name, fields,
293 self.messages[name] = msg
294 self.messages[name]['args'] = args
295 self.messages[name]['argtypes'] = argtypes
296 return self.messages[name]
298 def add_type(self, name, typedef):
299 return self.add_message('vl_api_' + name + '_t', typedef)
301 def make_function(self, name, i, msgdef, multipart, async):
303 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
305 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart, **kwargs))
306 args = self.messages[name]['args']
307 argtypes = self.messages[name]['argtypes']
308 f.__name__ = str(name)
309 f.__doc__ = ", ".join(["%s %s" % (argtypes[k], k) for k in args.keys()])
312 def _register_functions(self, async=False):
313 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
314 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
315 for name, msgdef in self.messages.iteritems():
316 if name in self.vpp_dictionary:
317 if self.messages[name]['crc'] != self.vpp_dictionary[name]['crc']:
318 raise ValueError(3, 'Failed CRC checksum ' + name +
319 ' ' + self.messages[name]['crc'] +
320 ' ' + self.vpp_dictionary[name]['crc'])
321 i = self.vpp_dictionary[name]['id']
322 self.id_msgdef[i] = msgdef
323 self.id_names[i] = name
324 multipart = True if name.find('_dump') > 0 else False
325 setattr(self, name, self.make_function(name, i, msgdef, multipart, async))
327 def _write (self, buf):
328 """Send a binary-packed message to VPP."""
329 if not self.connected:
330 raise IOError(1, 'Not connected')
331 return vpp_api.write(str(buf))
333 def _load_dictionary(self):
334 self.vpp_dictionary = {}
335 self.vpp_dictionary_maxid = 0
336 d = vpp_api.msg_table()
339 raise IOError(3, 'Cannot get VPP API dictionary')
341 name, crc = n.rsplit('_', 1)
343 self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
344 self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
346 def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
349 name - the name of the client.
350 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
351 async - if true, messages are sent without waiting for a reply
352 rx_qlen - the length of the VPP message receive queue between
355 msg_handler = self.msg_handler_sync if not async else self.msg_handler_async
356 if chroot_prefix is not None:
357 rv = vpp_api.connect(name, msg_handler, rx_qlen, chroot_prefix)
359 rv = vpp_api.connect(name, msg_handler, rx_qlen)
362 raise IOError(2, 'Connect failed')
363 self.connected = True
365 self._load_dictionary()
366 self._register_functions(async=async)
368 # Initialise control ping
369 self.control_ping_index = self.vpp_dictionary['control_ping']['id']
370 self.control_ping_msgdef = self.messages['control_ping']
372 def disconnect(self):
373 """Detach from VPP."""
374 rv = vpp_api.disconnect()
375 self.connected = False
378 def results_wait(self, context):
379 """In a sync call, wait for the reply
381 The context ID is used to pair reply to request.
384 # Results is filled by the background callback. It will
385 # raise the event when the context receives a response.
386 # Given there are two threads we have to be careful with the
387 # use of results and the structures under it, hence the lock.
388 with self.results_lock:
389 result = self.results[context]
392 timed_out = not ev.wait(self.timeout)
395 raise IOError(3, 'Waiting for reply timed out')
397 with self.results_lock:
398 result = self.results[context]
399 del self.results[context]
402 def results_prepare(self, context, multi=False):
403 """Prep for receiving a result in response to a request msg
405 context - unique context number sent in request and
406 returned in reply or replies
407 multi - true if we expect multiple messages from this
411 # The event is used to indicate that all results are in
413 'e': threading.Event(),
416 # Make it clear to the BG thread it's going to see several
417 # messages; messages are stored in a results array
418 new_result['m'] = True
421 new_result['e'].clear()
423 # Put the prepped result structure into results, at which point
424 # the bg thread can also access it (hence the thread lock)
425 with self.results_lock:
426 self.results[context] = new_result
428 def msg_handler_sync(self, msg):
429 """Process an incoming message from VPP in sync mode.
431 The message may be a reply or it may be an async notification.
433 r = self.decode_incoming_msg(msg)
437 # If we have a context, then use the context to find any
438 # request waiting for a reply
440 if hasattr(r, 'context') and r.context > 0:
443 msgname = type(r).__name__
446 # No context -> async notification that we feed to the callback
447 if self.event_callback:
448 self.event_callback(msgname, r)
450 # Context -> use the results structure (carefully) to find
451 # who we're responding to and return the message to that
453 with self.results_lock:
454 if context not in self.results:
455 eprint('Not expecting results for this context', context, r)
457 result = self.results[context]
460 # Collect results until control ping
463 if msgname == 'control_ping_reply':
466 elif 'm' in self.results[context]:
467 # One element in a multipart
468 result['r'].append(r)
470 # All of a single result
474 def decode_incoming_msg(self, msg):
476 eprint('vpp_api.read failed')
479 i, ci = self.header.unpack_from(msg, 0)
480 if self.id_names[i] == 'rx_thread_exit':
484 # Decode message and returns a tuple.
486 msgdef = self.id_msgdef[i]
488 raise IOError(2, 'Reply message undefined')
490 r = self.decode(msgdef, msg)
494 def msg_handler_async(self, msg):
495 """Process a message from VPP in async mode.
497 In async mode, all messages are returned to the callback.
499 r = self.decode_incoming_msg(msg)
503 msgname = type(r).__name__
505 if self.event_callback:
506 self.event_callback(msgname, r)
508 def _control_ping(self, context):
509 """Send a ping command."""
510 self._call_vpp_async(self.control_ping_index,
511 self.control_ping_msgdef,
514 def _call_vpp(self, i, msgdef, multipart, **kwargs):
515 """Given a message, send the message and await a reply.
517 msgdef - the message packing definition
518 i - the message type index
519 multipart - True if the message returns multiple
521 context - context number - chosen at random if not
523 The remainder of the kwargs are the arguments to the API call.
525 The return value is the message or message array containing
526 the response. It will raise an IOError exception if there was
527 no response within the timeout window.
530 # We need a context if not supplied, in order to get the
532 context = kwargs.get('context', self.get_context())
533 kwargs['context'] = context
535 # Set up to receive a response
536 self.results_prepare(context, multi=multipart)
539 self._call_vpp_async(i, msgdef, **kwargs)
542 # Send a ping after the request - we use its response
543 # to detect that we have seen all results.
544 self._control_ping(context)
546 # Block until we get a reply.
547 r = self.results_wait(context)
551 def _call_vpp_async(self, i, msgdef, **kwargs):
552 """Given a message, send the message and await a reply.
554 msgdef - the message packing definition
555 i - the message type index
556 context - context number - chosen at random if not
558 The remainder of the kwargs are the arguments to the API call.
560 if not 'context' in kwargs:
561 context = self.get_context()
562 kwargs['context'] = context
564 context = kwargs['context']
565 kwargs['_vl_msg_id'] = i
566 b = self.encode(msgdef, kwargs)
570 def register_event_callback(self, callback):
571 """Register a callback for async messages.
573 This will be called for async notifications in sync mode,
574 and all messages in async mode. In sync mode, replies to
575 requests will not come here.
577 callback is a fn(msg_type_name, msg_type) that will be
578 called when a message comes in. While this function is
579 executing, note that (a) you are in a background thread and
580 may wish to use threading.Lock to protect your datastructures,
581 and (b) message processing from VPP will stop (so if you take
582 a long while about it you may provoke reply timeouts or cause
583 VPP to fill the RX buffer). Passing None will disable the
586 self.event_callback = callback