3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 import sys, os, logging, collections, struct, json, threading, glob
21 logging.basicConfig(level=logging.DEBUG)
24 def eprint(*args, **kwargs):
25 """Print critical diagnostics to stderr."""
26 print(*args, file=sys.stderr, **kwargs)
29 """Clean up VPP connection on shutdown."""
31 eprint ('Cleaning up VPP on exit')
37 This class provides the APIs to VPP. The APIs are loaded
38 from provided .api.json files and makes functions accordingly.
39 These functions are documented in the VPP .api files, as they
40 are dynamically created.
42 Additionally, VPP can send callback messages; this class
43 provides a means to register a callback function to receive
44 these messages in a background thread.
46 def __init__(self, apifiles = None, testmode = False):
47 """Create a VPP API object.
49 apifiles is a list of files containing API
50 descriptions that will be loaded - methods will be
51 dynamically created reflecting these APIs. If not
52 provided this will load the API files from VPP's
53 default install location.
58 self.buffersize = 10000
59 self.connected = False
60 self.header = struct.Struct('>HI')
61 self.results_lock = threading.Lock()
67 # Pick up API definitions from default directory
68 apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
71 with open(file) as apidef_file:
72 api = json.load(apidef_file)
73 for t in api['types']:
74 self.add_type(t[0], t[1:])
76 for m in api['messages']:
77 self.add_message(m[0], m[1:])
78 self.apifiles = apifiles
81 if len(self.messages) == 0 and not testmode:
82 raise ValueError(1, 'Missing JSON message definitions')
84 # Make sure we allow VPP to clean up the message rings.
85 atexit.register(vpp_atexit, self)
87 class ContextId(object):
88 """Thread-safe provider of unique context IDs."""
91 self.lock = threading.Lock()
93 """Get a new unique (or, at least, not recently used) context."""
97 get_context = ContextId()
100 """Debug function: report current VPP API status to stdout."""
101 print('Connected') if self.connected else print('Not Connected')
102 print('Read API definitions from', ', '.join(self.apifiles))
104 def __struct (self, t, n = None, e = -1, vl = None):
105 """Create a packing structure for a message."""
106 base_types = { 'u8' : 'B',
117 if e > 0 and t == 'u8':
119 return struct.Struct('>' + str(e) + 's')
121 # Fixed array of base type
122 return [e, struct.Struct('>' + base_types[t])]
124 # Old style variable array
125 return [-1, struct.Struct('>' + base_types[t])]
127 # Variable length array
128 return [vl, struct.Struct('>s')] if t == 'u8' else \
129 [vl, struct.Struct('>' + base_types[t])]
131 return struct.Struct('>' + base_types[t])
133 if t in self.messages:
134 ### Return a list in case of array ###
136 return [e, lambda self, encode, buf, offset, args: (
137 self.__struct_type(encode, self.messages[t], buf, offset,
140 return [vl, lambda self, encode, buf, offset, args: (
141 self.__struct_type(encode, self.messages[t], buf, offset,
145 raise NotImplementedError(1, 'No support for compound types ' + t)
146 return lambda self, encode, buf, offset, args: (
147 self.__struct_type(encode, self.messages[t], buf, offset, args)
150 raise ValueError(1, 'Invalid message type: ' + t)
152 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
153 """Get a message packer or unpacker."""
155 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
157 return self.__struct_type_decode(msgdef, buf, offset)
159 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
164 if k not in msgdef['args']:
165 raise ValueError(1, 'Invalid field-name in message call ' + k)
167 for k,v in msgdef['args'].iteritems():
172 e = kwargs[v[0]] if v[0] in kwargs else v[0]
175 size += v[1](self, True, buf, off + size,
183 buf[off:off + l] = bytearray(kwargs[k])
188 v[1].pack_into(buf, off + size, i)
192 size = v(self, True, buf, off, kwargs[k])
194 v.pack_into(buf, off, kwargs[k])
197 size = v.size if not type(v) is list else 0
199 return off + size - offset
202 def __getitem__(self, name):
203 if name in self.messages:
204 return self.messages[name]
207 def encode(self, msgdef, kwargs):
208 # Make suitably large buffer
209 buf = bytearray(self.buffersize)
211 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
212 return buf[:offset + size]
214 def decode(self, msgdef, buf):
215 return self.__struct_type(False, msgdef, buf, 0, None)[1]
217 def __struct_type_decode(self, msgdef, buf, offset):
221 for k,v in msgdef['args'].iteritems():
225 if callable(v[1]): # compound type
227 if v[0] in msgdef['args']: # vla
233 (s,l) = v[1](self, False, buf, off + size, None)
238 if type(v[0]) is int:
239 size = len(buf) - off
242 res.append(buf[off:off + size])
244 e = v[0] if type(v[0]) is int else res[v[2]]
246 e = (len(buf) - off) / v[1].size
251 lst.append(v[1].unpack_from(buf, off + size)[0])
255 (s,l) = v(self, False, buf, off, None)
259 res.append(v.unpack_from(buf, off)[0])
262 return off + size - offset, msgdef['return_tuple']._make(res)
264 def ret_tup(self, name):
265 if name in self.messages and 'return_tuple' in self.messages[name]:
266 return self.messages[name]['return_tuple']
269 def add_message(self, name, msgdef):
270 if name in self.messages:
271 raise ValueError('Duplicate message name: ' + name)
273 args = collections.OrderedDict()
274 argtypes = collections.OrderedDict()
277 for i, f in enumerate(msgdef):
278 if type(f) is dict and 'crc' in f:
279 msg['crc'] = f['crc']
283 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
284 raise ValueError('Variable Length Array must be last: ' + name)
285 args[field_name] = self.__struct(*f)
286 argtypes[field_name] = field_type
287 if len(f) == 4: # Find offset to # elements field
288 args[field_name].append(args.keys().index(f[3]) - i)
289 fields.append(field_name)
290 msg['return_tuple'] = collections.namedtuple(name, fields,
292 self.messages[name] = msg
293 self.messages[name]['args'] = args
294 self.messages[name]['argtypes'] = argtypes
295 return self.messages[name]
297 def add_type(self, name, typedef):
298 return self.add_message('vl_api_' + name + '_t', typedef)
300 def make_function(self, name, i, msgdef, multipart, async):
302 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
304 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart, **kwargs))
305 args = self.messages[name]['args']
306 argtypes = self.messages[name]['argtypes']
307 f.__name__ = str(name)
308 f.__doc__ = ", ".join(["%s %s" % (argtypes[k], k) for k in args.keys()])
311 def _register_functions(self, async=False):
312 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
313 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
314 for name, msgdef in self.messages.iteritems():
315 if name in self.vpp_dictionary:
316 if self.messages[name]['crc'] != self.vpp_dictionary[name]['crc']:
317 raise ValueError(3, 'Failed CRC checksum ' + name +
318 ' ' + self.messages[name]['crc'] +
319 ' ' + self.vpp_dictionary[name]['crc'])
320 i = self.vpp_dictionary[name]['id']
321 self.id_msgdef[i] = msgdef
322 self.id_names[i] = name
323 multipart = True if name.find('_dump') > 0 else False
324 setattr(self, name, self.make_function(name, i, msgdef, multipart, async))
326 def _write (self, buf):
327 """Send a binary-packed message to VPP."""
328 if not self.connected:
329 raise IOError(1, 'Not connected')
330 return vpp_api.write(str(buf))
332 def _load_dictionary(self):
333 self.vpp_dictionary = {}
334 self.vpp_dictionary_maxid = 0
335 d = vpp_api.msg_table()
338 raise IOError(3, 'Cannot get VPP API dictionary')
340 name, crc = n.rsplit('_', 1)
342 self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
343 self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
345 def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
348 name - the name of the client.
349 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
350 async - if true, messages are sent without waiting for a reply
351 rx_qlen - the length of the VPP message receive queue between
354 msg_handler = self.msg_handler_sync if not async else self.msg_handler_async
355 if chroot_prefix is not None:
356 rv = vpp_api.connect(name, msg_handler, rx_qlen, chroot_prefix)
358 rv = vpp_api.connect(name, msg_handler, rx_qlen)
361 raise IOError(2, 'Connect failed')
362 self.connected = True
364 self._load_dictionary()
365 self._register_functions(async=async)
367 # Initialise control ping
368 self.control_ping_index = self.vpp_dictionary['control_ping']['id']
369 self.control_ping_msgdef = self.messages['control_ping']
371 def disconnect(self):
372 """Detach from VPP."""
373 rv = vpp_api.disconnect()
374 self.connected = False
377 def results_wait(self, context):
378 """In a sync call, wait for the reply
380 The context ID is used to pair reply to request.
383 # Results is filled by the background callback. It will
384 # raise the event when the context receives a response.
385 # Given there are two threads we have to be careful with the
386 # use of results and the structures under it, hence the lock.
387 with self.results_lock:
388 result = self.results[context]
391 timed_out = not ev.wait(self.timeout)
394 raise IOError(3, 'Waiting for reply timed out')
396 with self.results_lock:
397 result = self.results[context]
398 del self.results[context]
401 def results_prepare(self, context, multi=False):
402 """Prep for receiving a result in response to a request msg
404 context - unique context number sent in request and
405 returned in reply or replies
406 multi - true if we expect multiple messages from this
410 # The event is used to indicate that all results are in
412 'e': threading.Event(),
415 # Make it clear to the BG thread it's going to see several
416 # messages; messages are stored in a results array
417 new_result['m'] = True
420 new_result['e'].clear()
422 # Put the prepped result structure into results, at which point
423 # the bg thread can also access it (hence the thread lock)
424 with self.results_lock:
425 self.results[context] = new_result
427 def msg_handler_sync(self, msg):
428 """Process an incoming message from VPP in sync mode.
430 The message may be a reply or it may be an async notification.
432 r = self.decode_incoming_msg(msg)
436 # If we have a context, then use the context to find any
437 # request waiting for a reply
439 if hasattr(r, 'context') and r.context > 0:
442 msgname = type(r).__name__
445 # No context -> async notification that we feed to the callback
446 if self.event_callback:
447 self.event_callback(msgname, r)
449 # Context -> use the results structure (carefully) to find
450 # who we're responding to and return the message to that
452 with self.results_lock:
453 if context not in self.results:
454 eprint('Not expecting results for this context', context, r)
456 result = self.results[context]
459 # Collect results until control ping
462 if msgname == 'control_ping_reply':
465 elif 'm' in self.results[context]:
466 # One element in a multipart
467 result['r'].append(r)
469 # All of a single result
473 def decode_incoming_msg(self, msg):
475 eprint('vpp_api.read failed')
478 i, ci = self.header.unpack_from(msg, 0)
479 if self.id_names[i] == 'rx_thread_exit':
483 # Decode message and returns a tuple.
485 msgdef = self.id_msgdef[i]
487 raise IOError(2, 'Reply message undefined')
489 r = self.decode(msgdef, msg)
493 def msg_handler_async(self, msg):
494 """Process a message from VPP in async mode.
496 In async mode, all messages are returned to the callback.
498 r = self.decode_incoming_msg(msg)
502 msgname = type(r).__name__
504 if self.event_callback:
505 self.event_callback(msgname, msg)
507 def _control_ping(self, context):
508 """Send a ping command."""
509 self._call_vpp_async(self.control_ping_index,
510 self.control_ping_msgdef,
513 def _call_vpp(self, i, msgdef, multipart, **kwargs):
514 """Given a message, send the message and await a reply.
516 msgdef - the message packing definition
517 i - the message type index
518 multipart - True if the message returns multiple
520 context - context number - chosen at random if not
522 The remainder of the kwargs are the arguments to the API call.
524 The return value is the message or message array containing
525 the response. It will raise an IOError exception if there was
526 no response within the timeout window.
529 # We need a context if not supplied, in order to get the
531 context = kwargs.get('context', self.get_context())
532 kwargs['context'] = context
534 # Set up to receive a response
535 self.results_prepare(context, multi=multipart)
538 self._call_vpp_async(i, msgdef, **kwargs)
541 # Send a ping after the request - we use its response
542 # to detect that we have seen all results.
543 self._control_ping(context)
545 # Block until we get a reply.
546 r = self.results_wait(context)
550 def _call_vpp_async(self, i, msgdef, **kwargs):
551 """Given a message, send the message and await a reply.
553 msgdef - the message packing definition
554 i - the message type index
555 context - context number - chosen at random if not
557 The remainder of the kwargs are the arguments to the API call.
559 if not 'context' in kwargs:
560 context = self.get_context()
561 kwargs['context'] = context
563 context = kwargs['context']
564 kwargs['_vl_msg_id'] = i
565 b = self.encode(msgdef, kwargs)
569 def register_event_callback(self, callback):
570 """Register a callback for async messages.
572 This will be called for async notifications in sync mode,
573 and all messages in async mode. In sync mode, replies to
574 requests will not come here.
576 callback is a fn(msg_type_name, msg_type) that will be
577 called when a message comes in. While this function is
578 executing, note that (a) you are in a background thread and
579 may wish to use threading.Lock to protect your datastructures,
580 and (b) message processing from VPP will stop (so if you take
581 a long while about it you may provoke reply timeouts or cause
582 VPP to fill the RX buffer). Passing None will disable the
585 self.event_callback = callback