Fix PAPI async response
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys, os, logging, collections, struct, json, threading, glob
19 import atexit
20
21 logging.basicConfig(level=logging.DEBUG)
22 import vpp_api
23
24 def eprint(*args, **kwargs):
25     """Print critical diagnostics to stderr."""
26     print(*args, file=sys.stderr, **kwargs)
27
28 def vpp_atexit(self):
29     """Clean up VPP connection on shutdown."""
30     if self.connected:
31         eprint ('Cleaning up VPP on exit')
32         self.disconnect()
33
34 class VPP():
35     """VPP interface.
36
37     This class provides the APIs to VPP.  The APIs are loaded
38     from provided .api.json files and makes functions accordingly.
39     These functions are documented in the VPP .api files, as they
40     are dynamically created.
41
42     Additionally, VPP can send callback messages; this class
43     provides a means to register a callback function to receive
44     these messages in a background thread.
45     """
46     def __init__(self, apifiles = None, testmode = False):
47         """Create a VPP API object.
48
49         apifiles is a list of files containing API
50         descriptions that will be loaded - methods will be
51         dynamically created reflecting these APIs.  If not
52         provided this will load the API files from VPP's
53         default install location.
54         """
55         self.messages = {}
56         self.id_names = []
57         self.id_msgdef = []
58         self.buffersize = 10000
59         self.connected = False
60         self.header = struct.Struct('>HI')
61         self.results_lock = threading.Lock()
62         self.results = {}
63         self.timeout = 5
64         self.apifiles = []
65
66         if not apifiles:
67             # Pick up API definitions from default directory
68             apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
69
70         for file in apifiles:
71             with open(file) as apidef_file:
72                 api = json.load(apidef_file)
73                 for t in api['types']:
74                     self.add_type(t[0], t[1:])
75
76                 for m in api['messages']:
77                     self.add_message(m[0], m[1:])
78         self.apifiles = apifiles
79
80         # Basic sanity check
81         if len(self.messages) == 0 and not testmode:
82             raise ValueError(1, 'Missing JSON message definitions')
83
84         # Make sure we allow VPP to clean up the message rings.
85         atexit.register(vpp_atexit, self)
86
87     class ContextId(object):
88         """Thread-safe provider of unique context IDs."""
89         def __init__(self):
90             self.context = 0
91             self.lock = threading.Lock()
92         def __call__(self):
93             """Get a new unique (or, at least, not recently used) context."""
94             with self.lock:
95                 self.context += 1
96                 return self.context
97     get_context = ContextId()
98
99     def status(self):
100         """Debug function: report current VPP API status to stdout."""
101         print('Connected') if self.connected else print('Not Connected')
102         print('Read API definitions from', ', '.join(self.apifiles))
103
104     def __struct (self, t, n = None, e = -1, vl = None):
105         """Create a packing structure for a message."""
106         base_types = { 'u8' : 'B',
107                        'u16' : 'H',
108                        'u32' : 'I',
109                        'i32' : 'i',
110                        'u64' : 'Q',
111                        'f64' : 'd',
112                        }
113         pack = None
114         if t in base_types:
115             pack = base_types[t]
116             if not vl:
117                 if e > 0 and t == 'u8':
118                     # Fixed byte array
119                     return struct.Struct('>' + str(e) + 's')
120                 if e > 0:
121                     # Fixed array of base type
122                     return [e, struct.Struct('>' + base_types[t])]
123                 elif e == 0:
124                     # Old style variable array
125                     return [-1, struct.Struct('>' + base_types[t])]
126             else:
127                 # Variable length array
128                 return [vl, struct.Struct('>s')] if t == 'u8' else \
129                     [vl, struct.Struct('>' + base_types[t])]
130
131             return struct.Struct('>' + base_types[t])
132
133         if t in self.messages:
134             ### Return a list in case of array ###
135             if e > 0 and not vl:
136                 return [e, lambda self, encode, buf, offset, args: (
137                     self.__struct_type(encode, self.messages[t], buf, offset,
138                                        args))]
139             if vl:
140                 return [vl, lambda self, encode, buf, offset, args: (
141                     self.__struct_type(encode, self.messages[t], buf, offset,
142                                        args))]
143             elif e == 0:
144                 # Old style VLA
145                 raise NotImplementedError(1, 'No support for compound types ' + t)
146             return lambda self, encode, buf, offset, args: (
147                 self.__struct_type(encode, self.messages[t], buf, offset, args)
148             )
149
150         raise ValueError(1, 'Invalid message type: ' + t)
151
152     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
153         """Get a message packer or unpacker."""
154         if encode:
155             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
156         else:
157             return self.__struct_type_decode(msgdef, buf, offset)
158
159     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
160         off = offset
161         size = 0
162
163         for k in kwargs:
164             if k not in msgdef['args']:
165                 raise ValueError(1, 'Invalid field-name in message call ' + k)
166
167         for k,v in msgdef['args'].iteritems():
168             off += size
169             if k in kwargs:
170                 if type(v) is list:
171                     if callable(v[1]):
172                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
173                         size = 0
174                         for i in range(e):
175                             size += v[1](self, True, buf, off + size,
176                                          kwargs[k][i])
177                     else:
178                         if v[0] in kwargs:
179                             l = kwargs[v[0]]
180                         else:
181                             l = len(kwargs[k])
182                         if v[1].size == 1:
183                             buf[off:off + l] = bytearray(kwargs[k])
184                             size = l
185                         else:
186                             size = 0
187                             for i in kwargs[k]:
188                                 v[1].pack_into(buf, off + size, i)
189                                 size += v[1].size
190                 else:
191                     if callable(v):
192                         size = v(self, True, buf, off, kwargs[k])
193                     else:
194                         v.pack_into(buf, off, kwargs[k])
195                         size = v.size
196             else:
197                 size = v.size if not type(v) is list else 0
198
199         return off + size - offset
200
201
202     def __getitem__(self, name):
203         if name in self.messages:
204             return self.messages[name]
205         return None
206
207     def encode(self, msgdef, kwargs):
208         # Make suitably large buffer
209         buf = bytearray(self.buffersize)
210         offset = 0
211         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
212         return buf[:offset + size]
213
214     def decode(self, msgdef, buf):
215         return self.__struct_type(False, msgdef, buf, 0, None)[1]
216
217     def __struct_type_decode(self, msgdef, buf, offset):
218         res = []
219         off = offset
220         size = 0
221         for k,v in msgdef['args'].iteritems():
222             off += size
223             if type(v) is list:
224                 lst = []
225                 if callable(v[1]): # compound type
226                     size = 0
227                     if v[0] in msgdef['args']: # vla
228                         e = res[v[2]]
229                     else: # fixed array
230                         e = v[0]
231                     res.append(lst)
232                     for i in range(e):
233                         (s,l) = v[1](self, False, buf, off + size, None)
234                         lst.append(l)
235                         size += s
236                     continue
237                 if v[1].size == 1:
238                     if type(v[0]) is int:
239                         size = len(buf) - off
240                     else:
241                         size = res[v[2]]
242                     res.append(buf[off:off + size])
243                 else:
244                     e = v[0] if type(v[0]) is int else res[v[2]]
245                     if e == -1:
246                         e = (len(buf) - off) / v[1].size
247                     lst = []
248                     res.append(lst)
249                     size = 0
250                     for i in range(e):
251                         lst.append(v[1].unpack_from(buf, off + size)[0])
252                         size += v[1].size
253             else:
254                 if callable(v):
255                     (s,l) = v(self, False, buf, off, None)
256                     res.append(l)
257                     size += s
258                 else:
259                     res.append(v.unpack_from(buf, off)[0])
260                     size = v.size
261
262         return off + size - offset, msgdef['return_tuple']._make(res)
263
264     def ret_tup(self, name):
265         if name in self.messages and 'return_tuple' in self.messages[name]:
266             return self.messages[name]['return_tuple']
267         return None
268
269     def add_message(self, name, msgdef):
270         if name in self.messages:
271             raise ValueError('Duplicate message name: ' + name)
272
273         args = collections.OrderedDict()
274         argtypes = collections.OrderedDict()
275         fields = []
276         msg = {}
277         for i, f in enumerate(msgdef):
278             if type(f) is dict and 'crc' in f:
279                 msg['crc'] = f['crc']
280                 continue
281             field_type = f[0]
282             field_name = f[1]
283             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
284                 raise ValueError('Variable Length Array must be last: ' + name)
285             args[field_name] = self.__struct(*f)
286             argtypes[field_name] = field_type
287             if len(f) == 4: # Find offset to # elements field
288                 args[field_name].append(args.keys().index(f[3]) - i)
289             fields.append(field_name)
290         msg['return_tuple'] = collections.namedtuple(name, fields,
291                                                      rename = True)
292         self.messages[name] = msg
293         self.messages[name]['args'] = args
294         self.messages[name]['argtypes'] = argtypes
295         return self.messages[name]
296
297     def add_type(self, name, typedef):
298         return self.add_message('vl_api_' + name + '_t', typedef)
299
300     def make_function(self, name, i, msgdef, multipart, async):
301         if (async):
302             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
303         else:
304             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart, **kwargs))
305         args = self.messages[name]['args']
306         argtypes = self.messages[name]['argtypes']
307         f.__name__ = str(name)
308         f.__doc__ = ", ".join(["%s %s" % (argtypes[k], k) for k in args.keys()])
309         return f
310
311     def _register_functions(self, async=False):
312         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
313         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
314         for name, msgdef in self.messages.iteritems():
315             if name in self.vpp_dictionary:
316                 if self.messages[name]['crc'] != self.vpp_dictionary[name]['crc']:
317                     raise ValueError(3, 'Failed CRC checksum ' + name +
318                                      ' ' + self.messages[name]['crc'] +
319                                      ' ' + self.vpp_dictionary[name]['crc'])
320                 i = self.vpp_dictionary[name]['id']
321                 self.id_msgdef[i] = msgdef
322                 self.id_names[i] = name
323                 multipart = True if name.find('_dump') > 0 else False
324                 setattr(self, name, self.make_function(name, i, msgdef, multipart, async))
325
326     def _write (self, buf):
327         """Send a binary-packed message to VPP."""
328         if not self.connected:
329             raise IOError(1, 'Not connected')
330         return vpp_api.write(str(buf))
331
332     def _load_dictionary(self):
333         self.vpp_dictionary = {}
334         self.vpp_dictionary_maxid = 0
335         d = vpp_api.msg_table()
336
337         if not d:
338             raise IOError(3, 'Cannot get VPP API dictionary')
339         for i,n in d:
340             name, crc =  n.rsplit('_', 1)
341             crc = '0x' + crc
342             self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
343             self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
344
345     def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
346         """Attach to VPP.
347
348         name - the name of the client.
349         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
350         async - if true, messages are sent without waiting for a reply
351         rx_qlen - the length of the VPP message receive queue between
352         client and server.
353         """
354         msg_handler = self.msg_handler_sync if not async else self.msg_handler_async
355         if chroot_prefix is not None:
356             rv = vpp_api.connect(name, msg_handler, rx_qlen, chroot_prefix)
357         else:
358             rv = vpp_api.connect(name, msg_handler, rx_qlen)
359
360         if rv != 0:
361             raise IOError(2, 'Connect failed')
362         self.connected = True
363
364         self._load_dictionary()
365         self._register_functions(async=async)
366
367         # Initialise control ping
368         self.control_ping_index = self.vpp_dictionary['control_ping']['id']
369         self.control_ping_msgdef = self.messages['control_ping']
370
371     def disconnect(self):
372         """Detach from VPP."""
373         rv = vpp_api.disconnect()
374         self.connected = False
375         return rv
376
377     def results_wait(self, context):
378         """In a sync call, wait for the reply
379
380         The context ID is used to pair reply to request.
381         """
382
383         # Results is filled by the background callback.  It will
384         # raise the event when the context receives a response.
385         # Given there are two threads we have to be careful with the
386         # use of results and the structures under it, hence the lock.
387         with self.results_lock:
388             result = self.results[context]
389             ev = result['e']
390
391         timed_out = not ev.wait(self.timeout)
392
393         if timed_out:
394            raise IOError(3, 'Waiting for reply timed out')
395         else:
396             with self.results_lock:
397                 result = self.results[context]
398                 del self.results[context]
399                 return result['r']
400
401     def results_prepare(self, context, multi=False):
402         """Prep for receiving a result in response to a request msg
403
404         context - unique context number sent in request and
405         returned in reply or replies
406         multi - true if we expect multiple messages from this
407         reply.
408         """
409
410         # The event is used to indicate that all results are in
411         new_result = {
412             'e': threading.Event(),
413         }
414         if multi:
415             # Make it clear to the BG thread it's going to see several
416             # messages; messages are stored in a results array
417             new_result['m'] = True
418             new_result['r'] = []
419
420         new_result['e'].clear()
421
422         # Put the prepped result structure into results, at which point
423         # the bg thread can also access it (hence the thread lock)
424         with self.results_lock:
425             self.results[context] = new_result
426
427     def msg_handler_sync(self, msg):
428         """Process an incoming message from VPP in sync mode.
429
430         The message may be a reply or it may be an async notification.
431         """
432         r = self.decode_incoming_msg(msg)
433         if r is None:
434             return
435
436         # If we have a context, then use the context to find any
437         # request waiting for a reply
438         context = 0
439         if hasattr(r, 'context') and r.context > 0:
440             context = r.context
441
442         msgname = type(r).__name__
443
444         if context == 0:
445             # No context -> async notification that we feed to the callback
446             if self.event_callback:
447                 self.event_callback(msgname, r)
448         else:
449             # Context -> use the results structure (carefully) to find
450             # who we're responding to and return the message to that
451             # thread
452             with self.results_lock:
453                 if context not in self.results:
454                     eprint('Not expecting results for this context', context, r)
455                 else:
456                     result = self.results[context]
457
458                     #
459                     # Collect results until control ping
460                     #
461
462                     if msgname == 'control_ping_reply':
463                         # End of a multipart
464                         result['e'].set()
465                     elif 'm' in self.results[context]:
466                         # One element in a multipart
467                         result['r'].append(r)
468                     else:
469                         # All of a single result
470                         result['r'] = r
471                         result['e'].set()
472
473     def decode_incoming_msg(self, msg):
474         if not msg:
475             eprint('vpp_api.read failed')
476             return
477
478         i, ci = self.header.unpack_from(msg, 0)
479         if self.id_names[i] == 'rx_thread_exit':
480             return
481
482         #
483         # Decode message and returns a tuple.
484         #
485         msgdef = self.id_msgdef[i]
486         if not msgdef:
487             raise IOError(2, 'Reply message undefined')
488
489         r = self.decode(msgdef, msg)
490
491         return r
492
493     def msg_handler_async(self, msg):
494         """Process a message from VPP in async mode.
495
496         In async mode, all messages are returned to the callback.
497         """
498         r = self.decode_incoming_msg(msg)
499         if r is None:
500             return
501
502         msgname = type(r).__name__
503
504         if self.event_callback:
505             self.event_callback(msgname, r)
506
507     def _control_ping(self, context):
508         """Send a ping command."""
509         self._call_vpp_async(self.control_ping_index,
510                              self.control_ping_msgdef,
511                              context=context)
512
513     def _call_vpp(self, i, msgdef, multipart, **kwargs):
514         """Given a message, send the message and await a reply.
515
516         msgdef - the message packing definition
517         i - the message type index
518         multipart - True if the message returns multiple
519         messages in return.
520         context - context number - chosen at random if not
521         supplied.
522         The remainder of the kwargs are the arguments to the API call.
523
524         The return value is the message or message array containing
525         the response.  It will raise an IOError exception if there was
526         no response within the timeout window.
527         """
528
529         # We need a context if not supplied, in order to get the
530         # response
531         context = kwargs.get('context', self.get_context())
532         kwargs['context'] = context
533
534         # Set up to receive a response
535         self.results_prepare(context, multi=multipart)
536
537         # Output the message
538         self._call_vpp_async(i, msgdef, **kwargs)
539
540         if multipart:
541             # Send a ping after the request - we use its response
542             # to detect that we have seen all results.
543             self._control_ping(context)
544
545         # Block until we get a reply.
546         r = self.results_wait(context)
547
548         return r
549
550     def _call_vpp_async(self, i, msgdef, **kwargs):
551         """Given a message, send the message and await a reply.
552
553         msgdef - the message packing definition
554         i - the message type index
555         context - context number - chosen at random if not
556         supplied.
557         The remainder of the kwargs are the arguments to the API call.
558         """
559         if not 'context' in kwargs:
560             context = self.get_context()
561             kwargs['context'] = context
562         else:
563             context = kwargs['context']
564         kwargs['_vl_msg_id'] = i
565         b = self.encode(msgdef, kwargs)
566
567         self._write(b)
568
569     def register_event_callback(self, callback):
570         """Register a callback for async messages.
571
572         This will be called for async notifications in sync mode,
573         and all messages in async mode.  In sync mode, replies to
574         requests will not come here.
575
576         callback is a fn(msg_type_name, msg_type) that will be
577         called when a message comes in.  While this function is
578         executing, note that (a) you are in a background thread and
579         may wish to use threading.Lock to protect your datastructures,
580         and (b) message processing from VPP will stop (so if you take
581         a long while about it you may provoke reply timeouts or cause
582         VPP to fill the RX buffer).  Passing None will disable the
583         callback.
584         """
585         self.event_callback = callback