Python API: event_callback not initialised.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys, os, logging, collections, struct, json, threading, glob
19 import atexit
20
21 logging.basicConfig(level=logging.DEBUG)
22 import vpp_api
23
24 def eprint(*args, **kwargs):
25     """Print critical diagnostics to stderr."""
26     print(*args, file=sys.stderr, **kwargs)
27
28 def vpp_atexit(self):
29     """Clean up VPP connection on shutdown."""
30     if self.connected:
31         eprint ('Cleaning up VPP on exit')
32         self.disconnect()
33
34 class VPP():
35     """VPP interface.
36
37     This class provides the APIs to VPP.  The APIs are loaded
38     from provided .api.json files and makes functions accordingly.
39     These functions are documented in the VPP .api files, as they
40     are dynamically created.
41
42     Additionally, VPP can send callback messages; this class
43     provides a means to register a callback function to receive
44     these messages in a background thread.
45     """
46     def __init__(self, apifiles = None, testmode = False):
47         """Create a VPP API object.
48
49         apifiles is a list of files containing API
50         descriptions that will be loaded - methods will be
51         dynamically created reflecting these APIs.  If not
52         provided this will load the API files from VPP's
53         default install location.
54         """
55         self.messages = {}
56         self.id_names = []
57         self.id_msgdef = []
58         self.buffersize = 10000
59         self.connected = False
60         self.header = struct.Struct('>HI')
61         self.results_lock = threading.Lock()
62         self.results = {}
63         self.timeout = 5
64         self.apifiles = []
65         self.event_callback = None
66
67         if not apifiles:
68             # Pick up API definitions from default directory
69             apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
70
71         for file in apifiles:
72             with open(file) as apidef_file:
73                 api = json.load(apidef_file)
74                 for t in api['types']:
75                     self.add_type(t[0], t[1:])
76
77                 for m in api['messages']:
78                     self.add_message(m[0], m[1:])
79         self.apifiles = apifiles
80
81         # Basic sanity check
82         if len(self.messages) == 0 and not testmode:
83             raise ValueError(1, 'Missing JSON message definitions')
84
85         # Make sure we allow VPP to clean up the message rings.
86         atexit.register(vpp_atexit, self)
87
88     class ContextId(object):
89         """Thread-safe provider of unique context IDs."""
90         def __init__(self):
91             self.context = 0
92             self.lock = threading.Lock()
93         def __call__(self):
94             """Get a new unique (or, at least, not recently used) context."""
95             with self.lock:
96                 self.context += 1
97                 return self.context
98     get_context = ContextId()
99
100     def status(self):
101         """Debug function: report current VPP API status to stdout."""
102         print('Connected') if self.connected else print('Not Connected')
103         print('Read API definitions from', ', '.join(self.apifiles))
104
105     def __struct (self, t, n = None, e = -1, vl = None):
106         """Create a packing structure for a message."""
107         base_types = { 'u8' : 'B',
108                        'u16' : 'H',
109                        'u32' : 'I',
110                        'i32' : 'i',
111                        'u64' : 'Q',
112                        'f64' : 'd',
113                        }
114         pack = None
115         if t in base_types:
116             pack = base_types[t]
117             if not vl:
118                 if e > 0 and t == 'u8':
119                     # Fixed byte array
120                     return struct.Struct('>' + str(e) + 's')
121                 if e > 0:
122                     # Fixed array of base type
123                     return [e, struct.Struct('>' + base_types[t])]
124                 elif e == 0:
125                     # Old style variable array
126                     return [-1, struct.Struct('>' + base_types[t])]
127             else:
128                 # Variable length array
129                 return [vl, struct.Struct('>s')] if t == 'u8' else \
130                     [vl, struct.Struct('>' + base_types[t])]
131
132             return struct.Struct('>' + base_types[t])
133
134         if t in self.messages:
135             ### Return a list in case of array ###
136             if e > 0 and not vl:
137                 return [e, lambda self, encode, buf, offset, args: (
138                     self.__struct_type(encode, self.messages[t], buf, offset,
139                                        args))]
140             if vl:
141                 return [vl, lambda self, encode, buf, offset, args: (
142                     self.__struct_type(encode, self.messages[t], buf, offset,
143                                        args))]
144             elif e == 0:
145                 # Old style VLA
146                 raise NotImplementedError(1, 'No support for compound types ' + t)
147             return lambda self, encode, buf, offset, args: (
148                 self.__struct_type(encode, self.messages[t], buf, offset, args)
149             )
150
151         raise ValueError(1, 'Invalid message type: ' + t)
152
153     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
154         """Get a message packer or unpacker."""
155         if encode:
156             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
157         else:
158             return self.__struct_type_decode(msgdef, buf, offset)
159
160     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
161         off = offset
162         size = 0
163
164         for k in kwargs:
165             if k not in msgdef['args']:
166                 raise ValueError(1, 'Invalid field-name in message call ' + k)
167
168         for k,v in msgdef['args'].iteritems():
169             off += size
170             if k in kwargs:
171                 if type(v) is list:
172                     if callable(v[1]):
173                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
174                         size = 0
175                         for i in range(e):
176                             size += v[1](self, True, buf, off + size,
177                                          kwargs[k][i])
178                     else:
179                         if v[0] in kwargs:
180                             l = kwargs[v[0]]
181                         else:
182                             l = len(kwargs[k])
183                         if v[1].size == 1:
184                             buf[off:off + l] = bytearray(kwargs[k])
185                             size = l
186                         else:
187                             size = 0
188                             for i in kwargs[k]:
189                                 v[1].pack_into(buf, off + size, i)
190                                 size += v[1].size
191                 else:
192                     if callable(v):
193                         size = v(self, True, buf, off, kwargs[k])
194                     else:
195                         v.pack_into(buf, off, kwargs[k])
196                         size = v.size
197             else:
198                 size = v.size if not type(v) is list else 0
199
200         return off + size - offset
201
202
203     def __getitem__(self, name):
204         if name in self.messages:
205             return self.messages[name]
206         return None
207
208     def encode(self, msgdef, kwargs):
209         # Make suitably large buffer
210         buf = bytearray(self.buffersize)
211         offset = 0
212         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
213         return buf[:offset + size]
214
215     def decode(self, msgdef, buf):
216         return self.__struct_type(False, msgdef, buf, 0, None)[1]
217
218     def __struct_type_decode(self, msgdef, buf, offset):
219         res = []
220         off = offset
221         size = 0
222         for k,v in msgdef['args'].iteritems():
223             off += size
224             if type(v) is list:
225                 lst = []
226                 if callable(v[1]): # compound type
227                     size = 0
228                     if v[0] in msgdef['args']: # vla
229                         e = res[v[2]]
230                     else: # fixed array
231                         e = v[0]
232                     res.append(lst)
233                     for i in range(e):
234                         (s,l) = v[1](self, False, buf, off + size, None)
235                         lst.append(l)
236                         size += s
237                     continue
238                 if v[1].size == 1:
239                     if type(v[0]) is int:
240                         size = len(buf) - off
241                     else:
242                         size = res[v[2]]
243                     res.append(buf[off:off + size])
244                 else:
245                     e = v[0] if type(v[0]) is int else res[v[2]]
246                     if e == -1:
247                         e = (len(buf) - off) / v[1].size
248                     lst = []
249                     res.append(lst)
250                     size = 0
251                     for i in range(e):
252                         lst.append(v[1].unpack_from(buf, off + size)[0])
253                         size += v[1].size
254             else:
255                 if callable(v):
256                     (s,l) = v(self, False, buf, off, None)
257                     res.append(l)
258                     size += s
259                 else:
260                     res.append(v.unpack_from(buf, off)[0])
261                     size = v.size
262
263         return off + size - offset, msgdef['return_tuple']._make(res)
264
265     def ret_tup(self, name):
266         if name in self.messages and 'return_tuple' in self.messages[name]:
267             return self.messages[name]['return_tuple']
268         return None
269
270     def add_message(self, name, msgdef):
271         if name in self.messages:
272             raise ValueError('Duplicate message name: ' + name)
273
274         args = collections.OrderedDict()
275         argtypes = collections.OrderedDict()
276         fields = []
277         msg = {}
278         for i, f in enumerate(msgdef):
279             if type(f) is dict and 'crc' in f:
280                 msg['crc'] = f['crc']
281                 continue
282             field_type = f[0]
283             field_name = f[1]
284             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
285                 raise ValueError('Variable Length Array must be last: ' + name)
286             args[field_name] = self.__struct(*f)
287             argtypes[field_name] = field_type
288             if len(f) == 4: # Find offset to # elements field
289                 args[field_name].append(args.keys().index(f[3]) - i)
290             fields.append(field_name)
291         msg['return_tuple'] = collections.namedtuple(name, fields,
292                                                      rename = True)
293         self.messages[name] = msg
294         self.messages[name]['args'] = args
295         self.messages[name]['argtypes'] = argtypes
296         return self.messages[name]
297
298     def add_type(self, name, typedef):
299         return self.add_message('vl_api_' + name + '_t', typedef)
300
301     def make_function(self, name, i, msgdef, multipart, async):
302         if (async):
303             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
304         else:
305             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart, **kwargs))
306         args = self.messages[name]['args']
307         argtypes = self.messages[name]['argtypes']
308         f.__name__ = str(name)
309         f.__doc__ = ", ".join(["%s %s" % (argtypes[k], k) for k in args.keys()])
310         return f
311
312     def _register_functions(self, async=False):
313         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
314         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
315         for name, msgdef in self.messages.iteritems():
316             if name in self.vpp_dictionary:
317                 if self.messages[name]['crc'] != self.vpp_dictionary[name]['crc']:
318                     raise ValueError(3, 'Failed CRC checksum ' + name +
319                                      ' ' + self.messages[name]['crc'] +
320                                      ' ' + self.vpp_dictionary[name]['crc'])
321                 i = self.vpp_dictionary[name]['id']
322                 self.id_msgdef[i] = msgdef
323                 self.id_names[i] = name
324                 multipart = True if name.find('_dump') > 0 else False
325                 setattr(self, name, self.make_function(name, i, msgdef, multipart, async))
326
327     def _write (self, buf):
328         """Send a binary-packed message to VPP."""
329         if not self.connected:
330             raise IOError(1, 'Not connected')
331         return vpp_api.write(str(buf))
332
333     def _load_dictionary(self):
334         self.vpp_dictionary = {}
335         self.vpp_dictionary_maxid = 0
336         d = vpp_api.msg_table()
337
338         if not d:
339             raise IOError(3, 'Cannot get VPP API dictionary')
340         for i,n in d:
341             name, crc =  n.rsplit('_', 1)
342             crc = '0x' + crc
343             self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
344             self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
345
346     def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
347         """Attach to VPP.
348
349         name - the name of the client.
350         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
351         async - if true, messages are sent without waiting for a reply
352         rx_qlen - the length of the VPP message receive queue between
353         client and server.
354         """
355         msg_handler = self.msg_handler_sync if not async else self.msg_handler_async
356         if chroot_prefix is not None:
357             rv = vpp_api.connect(name, msg_handler, rx_qlen, chroot_prefix)
358         else:
359             rv = vpp_api.connect(name, msg_handler, rx_qlen)
360
361         if rv != 0:
362             raise IOError(2, 'Connect failed')
363         self.connected = True
364
365         self._load_dictionary()
366         self._register_functions(async=async)
367
368         # Initialise control ping
369         self.control_ping_index = self.vpp_dictionary['control_ping']['id']
370         self.control_ping_msgdef = self.messages['control_ping']
371
372     def disconnect(self):
373         """Detach from VPP."""
374         rv = vpp_api.disconnect()
375         self.connected = False
376         return rv
377
378     def results_wait(self, context):
379         """In a sync call, wait for the reply
380
381         The context ID is used to pair reply to request.
382         """
383
384         # Results is filled by the background callback.  It will
385         # raise the event when the context receives a response.
386         # Given there are two threads we have to be careful with the
387         # use of results and the structures under it, hence the lock.
388         with self.results_lock:
389             result = self.results[context]
390             ev = result['e']
391
392         timed_out = not ev.wait(self.timeout)
393
394         if timed_out:
395            raise IOError(3, 'Waiting for reply timed out')
396         else:
397             with self.results_lock:
398                 result = self.results[context]
399                 del self.results[context]
400                 return result['r']
401
402     def results_prepare(self, context, multi=False):
403         """Prep for receiving a result in response to a request msg
404
405         context - unique context number sent in request and
406         returned in reply or replies
407         multi - true if we expect multiple messages from this
408         reply.
409         """
410
411         # The event is used to indicate that all results are in
412         new_result = {
413             'e': threading.Event(),
414         }
415         if multi:
416             # Make it clear to the BG thread it's going to see several
417             # messages; messages are stored in a results array
418             new_result['m'] = True
419             new_result['r'] = []
420
421         new_result['e'].clear()
422
423         # Put the prepped result structure into results, at which point
424         # the bg thread can also access it (hence the thread lock)
425         with self.results_lock:
426             self.results[context] = new_result
427
428     def msg_handler_sync(self, msg):
429         """Process an incoming message from VPP in sync mode.
430
431         The message may be a reply or it may be an async notification.
432         """
433         r = self.decode_incoming_msg(msg)
434         if r is None:
435             return
436
437         # If we have a context, then use the context to find any
438         # request waiting for a reply
439         context = 0
440         if hasattr(r, 'context') and r.context > 0:
441             context = r.context
442
443         msgname = type(r).__name__
444
445         if context == 0:
446             # No context -> async notification that we feed to the callback
447             if self.event_callback:
448                 self.event_callback(msgname, r)
449         else:
450             # Context -> use the results structure (carefully) to find
451             # who we're responding to and return the message to that
452             # thread
453             with self.results_lock:
454                 if context not in self.results:
455                     eprint('Not expecting results for this context', context, r)
456                 else:
457                     result = self.results[context]
458
459                     #
460                     # Collect results until control ping
461                     #
462
463                     if msgname == 'control_ping_reply':
464                         # End of a multipart
465                         result['e'].set()
466                     elif 'm' in self.results[context]:
467                         # One element in a multipart
468                         result['r'].append(r)
469                     else:
470                         # All of a single result
471                         result['r'] = r
472                         result['e'].set()
473
474     def decode_incoming_msg(self, msg):
475         if not msg:
476             eprint('vpp_api.read failed')
477             return
478
479         i, ci = self.header.unpack_from(msg, 0)
480         if self.id_names[i] == 'rx_thread_exit':
481             return
482
483         #
484         # Decode message and returns a tuple.
485         #
486         msgdef = self.id_msgdef[i]
487         if not msgdef:
488             raise IOError(2, 'Reply message undefined')
489
490         r = self.decode(msgdef, msg)
491
492         return r
493
494     def msg_handler_async(self, msg):
495         """Process a message from VPP in async mode.
496
497         In async mode, all messages are returned to the callback.
498         """
499         r = self.decode_incoming_msg(msg)
500         if r is None:
501             return
502
503         msgname = type(r).__name__
504
505         if self.event_callback:
506             self.event_callback(msgname, r)
507
508     def _control_ping(self, context):
509         """Send a ping command."""
510         self._call_vpp_async(self.control_ping_index,
511                              self.control_ping_msgdef,
512                              context=context)
513
514     def _call_vpp(self, i, msgdef, multipart, **kwargs):
515         """Given a message, send the message and await a reply.
516
517         msgdef - the message packing definition
518         i - the message type index
519         multipart - True if the message returns multiple
520         messages in return.
521         context - context number - chosen at random if not
522         supplied.
523         The remainder of the kwargs are the arguments to the API call.
524
525         The return value is the message or message array containing
526         the response.  It will raise an IOError exception if there was
527         no response within the timeout window.
528         """
529
530         # We need a context if not supplied, in order to get the
531         # response
532         context = kwargs.get('context', self.get_context())
533         kwargs['context'] = context
534
535         # Set up to receive a response
536         self.results_prepare(context, multi=multipart)
537
538         # Output the message
539         self._call_vpp_async(i, msgdef, **kwargs)
540
541         if multipart:
542             # Send a ping after the request - we use its response
543             # to detect that we have seen all results.
544             self._control_ping(context)
545
546         # Block until we get a reply.
547         r = self.results_wait(context)
548
549         return r
550
551     def _call_vpp_async(self, i, msgdef, **kwargs):
552         """Given a message, send the message and await a reply.
553
554         msgdef - the message packing definition
555         i - the message type index
556         context - context number - chosen at random if not
557         supplied.
558         The remainder of the kwargs are the arguments to the API call.
559         """
560         if not 'context' in kwargs:
561             context = self.get_context()
562             kwargs['context'] = context
563         else:
564             context = kwargs['context']
565         kwargs['_vl_msg_id'] = i
566         b = self.encode(msgdef, kwargs)
567
568         self._write(b)
569
570     def register_event_callback(self, callback):
571         """Register a callback for async messages.
572
573         This will be called for async notifications in sync mode,
574         and all messages in async mode.  In sync mode, replies to
575         requests will not come here.
576
577         callback is a fn(msg_type_name, msg_type) that will be
578         called when a message comes in.  While this function is
579         executing, note that (a) you are in a background thread and
580         may wish to use threading.Lock to protect your datastructures,
581         and (b) message processing from VPP will stop (so if you take
582         a long while about it you may provoke reply timeouts or cause
583         VPP to fill the RX buffer).  Passing None will disable the
584         callback.
585         """
586         self.event_callback = callback