3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 import sys, os, logging, collections, struct, json, threading, glob
21 logging.basicConfig(level=logging.DEBUG)
24 def eprint(*args, **kwargs):
25 """Print critical diagnostics to stderr."""
26 print(*args, file=sys.stderr, **kwargs)
29 """Clean up VPP connection on shutdown."""
31 eprint ('Cleaning up VPP on exit')
39 class FuncWrapper(object):
40 def __init__(self, func):
42 self.__name__ = func.__name__
44 def __call__(self, **kwargs):
45 return self._func(**kwargs)
51 This class provides the APIs to VPP. The APIs are loaded
52 from provided .api.json files and makes functions accordingly.
53 These functions are documented in the VPP .api files, as they
54 are dynamically created.
56 Additionally, VPP can send callback messages; this class
57 provides a means to register a callback function to receive
58 these messages in a background thread.
60 def __init__(self, apifiles = None, testmode = False):
61 """Create a VPP API object.
63 apifiles is a list of files containing API
64 descriptions that will be loaded - methods will be
65 dynamically created reflecting these APIs. If not
66 provided this will load the API files from VPP's
67 default install location.
72 self.buffersize = 10000
73 self.connected = False
74 self.header = struct.Struct('>HI')
75 self.results_lock = threading.Lock()
79 self.event_callback = None
82 # Pick up API definitions from default directory
83 apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
86 with open(file) as apidef_file:
87 api = json.load(apidef_file)
88 for t in api['types']:
89 self.add_type(t[0], t[1:])
91 for m in api['messages']:
92 self.add_message(m[0], m[1:])
93 self.apifiles = apifiles
96 if len(self.messages) == 0 and not testmode:
97 raise ValueError(1, 'Missing JSON message definitions')
99 # Make sure we allow VPP to clean up the message rings.
100 atexit.register(vpp_atexit, self)
102 class ContextId(object):
103 """Thread-safe provider of unique context IDs."""
106 self.lock = threading.Lock()
108 """Get a new unique (or, at least, not recently used) context."""
112 get_context = ContextId()
115 """Debug function: report current VPP API status to stdout."""
116 print('Connected') if self.connected else print('Not Connected')
117 print('Read API definitions from', ', '.join(self.apifiles))
119 def __struct (self, t, n = None, e = -1, vl = None):
120 """Create a packing structure for a message."""
121 base_types = { 'u8' : 'B',
132 if e > 0 and t == 'u8':
134 return struct.Struct('>' + str(e) + 's')
136 # Fixed array of base type
137 return [e, struct.Struct('>' + base_types[t])]
139 # Old style variable array
140 return [-1, struct.Struct('>' + base_types[t])]
142 # Variable length array
143 return [vl, struct.Struct('>s')] if t == 'u8' else \
144 [vl, struct.Struct('>' + base_types[t])]
146 return struct.Struct('>' + base_types[t])
148 if t in self.messages:
149 ### Return a list in case of array ###
151 return [e, lambda self, encode, buf, offset, args: (
152 self.__struct_type(encode, self.messages[t], buf, offset,
155 return [vl, lambda self, encode, buf, offset, args: (
156 self.__struct_type(encode, self.messages[t], buf, offset,
160 raise NotImplementedError(1, 'No support for compound types ' + t)
161 return lambda self, encode, buf, offset, args: (
162 self.__struct_type(encode, self.messages[t], buf, offset, args)
165 raise ValueError(1, 'Invalid message type: ' + t)
167 def __struct_type(self, encode, msgdef, buf, offset, kwargs):
168 """Get a message packer or unpacker."""
170 return self.__struct_type_encode(msgdef, buf, offset, kwargs)
172 return self.__struct_type_decode(msgdef, buf, offset)
174 def __struct_type_encode(self, msgdef, buf, offset, kwargs):
179 if k not in msgdef['args']:
180 raise ValueError(1, 'Invalid field-name in message call ' + k)
182 for k,v in msgdef['args'].iteritems():
187 e = kwargs[v[0]] if v[0] in kwargs else v[0]
190 size += v[1](self, True, buf, off + size,
198 buf[off:off + l] = bytearray(kwargs[k])
203 v[1].pack_into(buf, off + size, i)
207 size = v(self, True, buf, off, kwargs[k])
209 v.pack_into(buf, off, kwargs[k])
212 size = v.size if not type(v) is list else 0
214 return off + size - offset
217 def __getitem__(self, name):
218 if name in self.messages:
219 return self.messages[name]
222 def encode(self, msgdef, kwargs):
223 # Make suitably large buffer
224 buf = bytearray(self.buffersize)
226 size = self.__struct_type(True, msgdef, buf, offset, kwargs)
227 return buf[:offset + size]
229 def decode(self, msgdef, buf):
230 return self.__struct_type(False, msgdef, buf, 0, None)[1]
232 def __struct_type_decode(self, msgdef, buf, offset):
236 for k,v in msgdef['args'].iteritems():
240 if callable(v[1]): # compound type
242 if v[0] in msgdef['args']: # vla
248 (s,l) = v[1](self, False, buf, off + size, None)
253 if type(v[0]) is int:
254 size = len(buf) - off
257 res.append(buf[off:off + size])
259 e = v[0] if type(v[0]) is int else res[v[2]]
261 e = (len(buf) - off) / v[1].size
266 lst.append(v[1].unpack_from(buf, off + size)[0])
270 (s,l) = v(self, False, buf, off, None)
274 res.append(v.unpack_from(buf, off)[0])
277 return off + size - offset, msgdef['return_tuple']._make(res)
279 def ret_tup(self, name):
280 if name in self.messages and 'return_tuple' in self.messages[name]:
281 return self.messages[name]['return_tuple']
284 def add_message(self, name, msgdef):
285 if name in self.messages:
286 raise ValueError('Duplicate message name: ' + name)
288 args = collections.OrderedDict()
289 argtypes = collections.OrderedDict()
292 for i, f in enumerate(msgdef):
293 if type(f) is dict and 'crc' in f:
294 msg['crc'] = f['crc']
298 if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
299 raise ValueError('Variable Length Array must be last: ' + name)
300 args[field_name] = self.__struct(*f)
301 argtypes[field_name] = field_type
302 if len(f) == 4: # Find offset to # elements field
303 args[field_name].append(args.keys().index(f[3]) - i)
304 fields.append(field_name)
305 msg['return_tuple'] = collections.namedtuple(name, fields,
307 self.messages[name] = msg
308 self.messages[name]['args'] = args
309 self.messages[name]['argtypes'] = argtypes
310 return self.messages[name]
312 def add_type(self, name, typedef):
313 return self.add_message('vl_api_' + name + '_t', typedef)
315 def make_function(self, name, i, msgdef, multipart, async):
317 f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
319 f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart, **kwargs))
320 args = self.messages[name]['args']
321 argtypes = self.messages[name]['argtypes']
322 f.__name__ = str(name)
323 f.__doc__ = ", ".join(["%s %s" % (argtypes[k], k) for k in args.keys()])
328 if not hasattr(self, "_api"):
329 raise Exception("Not connected, api definitions not available")
332 def _register_functions(self, async=False):
333 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
334 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
336 for name, msgdef in self.messages.iteritems():
337 if name in self.vpp_dictionary:
338 if self.messages[name]['crc'] != self.vpp_dictionary[name]['crc']:
339 raise ValueError(3, 'Failed CRC checksum ' + name +
340 ' ' + self.messages[name]['crc'] +
341 ' ' + self.vpp_dictionary[name]['crc'])
342 i = self.vpp_dictionary[name]['id']
343 self.id_msgdef[i] = msgdef
344 self.id_names[i] = name
345 multipart = True if name.find('_dump') > 0 else False
346 f = self.make_function(name, i, msgdef, multipart, async)
347 setattr(self._api, name, FuncWrapper(f))
349 # olf API stuff starts here - will be removed in 17.07
350 if hasattr(self, name):
352 3, "Conflicting name in JSON definition: `%s'" % name)
353 setattr(self, name, f)
354 # old API stuff ends here
356 def _write (self, buf):
357 """Send a binary-packed message to VPP."""
358 if not self.connected:
359 raise IOError(1, 'Not connected')
360 return vpp_api.write(str(buf))
362 def _load_dictionary(self):
363 self.vpp_dictionary = {}
364 self.vpp_dictionary_maxid = 0
365 d = vpp_api.msg_table()
368 raise IOError(3, 'Cannot get VPP API dictionary')
370 name, crc = n.rsplit('_', 1)
372 self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
373 self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
375 def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
378 name - the name of the client.
379 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
380 async - if true, messages are sent without waiting for a reply
381 rx_qlen - the length of the VPP message receive queue between
384 msg_handler = self.msg_handler_sync if not async else self.msg_handler_async
385 if chroot_prefix is not None:
386 rv = vpp_api.connect(name, msg_handler, rx_qlen, chroot_prefix)
388 rv = vpp_api.connect(name, msg_handler, rx_qlen)
391 raise IOError(2, 'Connect failed')
392 self.connected = True
394 self._load_dictionary()
395 self._register_functions(async=async)
397 # Initialise control ping
398 self.control_ping_index = self.vpp_dictionary['control_ping']['id']
399 self.control_ping_msgdef = self.messages['control_ping']
401 def disconnect(self):
402 """Detach from VPP."""
403 rv = vpp_api.disconnect()
404 self.connected = False
407 def results_wait(self, context):
408 """In a sync call, wait for the reply
410 The context ID is used to pair reply to request.
413 # Results is filled by the background callback. It will
414 # raise the event when the context receives a response.
415 # Given there are two threads we have to be careful with the
416 # use of results and the structures under it, hence the lock.
417 with self.results_lock:
418 result = self.results[context]
421 timed_out = not ev.wait(self.timeout)
424 raise IOError(3, 'Waiting for reply timed out')
426 with self.results_lock:
427 result = self.results[context]
428 del self.results[context]
431 def results_prepare(self, context, multi=False):
432 """Prep for receiving a result in response to a request msg
434 context - unique context number sent in request and
435 returned in reply or replies
436 multi - true if we expect multiple messages from this
440 # The event is used to indicate that all results are in
442 'e': threading.Event(),
445 # Make it clear to the BG thread it's going to see several
446 # messages; messages are stored in a results array
447 new_result['m'] = True
450 new_result['e'].clear()
452 # Put the prepped result structure into results, at which point
453 # the bg thread can also access it (hence the thread lock)
454 with self.results_lock:
455 self.results[context] = new_result
457 def msg_handler_sync(self, msg):
458 """Process an incoming message from VPP in sync mode.
460 The message may be a reply or it may be an async notification.
462 r = self.decode_incoming_msg(msg)
466 # If we have a context, then use the context to find any
467 # request waiting for a reply
469 if hasattr(r, 'context') and r.context > 0:
472 msgname = type(r).__name__
475 # No context -> async notification that we feed to the callback
476 if self.event_callback:
477 self.event_callback(msgname, r)
479 # Context -> use the results structure (carefully) to find
480 # who we're responding to and return the message to that
482 with self.results_lock:
483 if context not in self.results:
484 eprint('Not expecting results for this context', context, r)
486 result = self.results[context]
489 # Collect results until control ping
492 if msgname == 'control_ping_reply':
495 elif 'm' in self.results[context]:
496 # One element in a multipart
497 result['r'].append(r)
499 # All of a single result
503 def decode_incoming_msg(self, msg):
505 eprint('vpp_api.read failed')
508 i, ci = self.header.unpack_from(msg, 0)
509 if self.id_names[i] == 'rx_thread_exit':
513 # Decode message and returns a tuple.
515 msgdef = self.id_msgdef[i]
517 raise IOError(2, 'Reply message undefined')
519 r = self.decode(msgdef, msg)
523 def msg_handler_async(self, msg):
524 """Process a message from VPP in async mode.
526 In async mode, all messages are returned to the callback.
528 r = self.decode_incoming_msg(msg)
532 msgname = type(r).__name__
534 if self.event_callback:
535 self.event_callback(msgname, r)
537 def _control_ping(self, context):
538 """Send a ping command."""
539 self._call_vpp_async(self.control_ping_index,
540 self.control_ping_msgdef,
543 def _call_vpp(self, i, msgdef, multipart, **kwargs):
544 """Given a message, send the message and await a reply.
546 msgdef - the message packing definition
547 i - the message type index
548 multipart - True if the message returns multiple
550 context - context number - chosen at random if not
552 The remainder of the kwargs are the arguments to the API call.
554 The return value is the message or message array containing
555 the response. It will raise an IOError exception if there was
556 no response within the timeout window.
559 # We need a context if not supplied, in order to get the
561 context = kwargs.get('context', self.get_context())
562 kwargs['context'] = context
564 # Set up to receive a response
565 self.results_prepare(context, multi=multipart)
568 self._call_vpp_async(i, msgdef, **kwargs)
571 # Send a ping after the request - we use its response
572 # to detect that we have seen all results.
573 self._control_ping(context)
575 # Block until we get a reply.
576 r = self.results_wait(context)
580 def _call_vpp_async(self, i, msgdef, **kwargs):
581 """Given a message, send the message and await a reply.
583 msgdef - the message packing definition
584 i - the message type index
585 context - context number - chosen at random if not
587 The remainder of the kwargs are the arguments to the API call.
589 if not 'context' in kwargs:
590 context = self.get_context()
591 kwargs['context'] = context
593 context = kwargs['context']
594 kwargs['_vl_msg_id'] = i
595 b = self.encode(msgdef, kwargs)
599 def register_event_callback(self, callback):
600 """Register a callback for async messages.
602 This will be called for async notifications in sync mode,
603 and all messages in async mode. In sync mode, replies to
604 requests will not come here.
606 callback is a fn(msg_type_name, msg_type) that will be
607 called when a message comes in. While this function is
608 executing, note that (a) you are in a background thread and
609 may wish to use threading.Lock to protect your datastructures,
610 and (b) message processing from VPP will stop (so if you take
611 a long while about it you may provoke reply timeouts or cause
612 VPP to fill the RX buffer). Passing None will disable the
615 self.event_callback = callback