papi: avoid IOError on disconnect
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import sys
20 import os
21 import logging
22 import collections
23 import struct
24 import functools
25 import json
26 import threading
27 import fnmatch
28 import weakref
29 import atexit
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
32 from . macaddress import MACAddress, mac_pton, mac_ntop
33
34 logger = logging.getLogger(__name__)
35
36 if sys.version[0] == '2':
37     import Queue as queue
38 else:
39     import queue as queue
40
41
42 def metaclass(metaclass):
43     @functools.wraps(metaclass)
44     def wrapper(cls):
45         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
46
47     return wrapper
48
49
50 class VppEnumType(type):
51     def __getattr__(cls, name):
52         t = vpp_get_type(name)
53         return t.enum
54
55
56 @metaclass(VppEnumType)
57 class VppEnum(object):
58     pass
59
60
61 def vpp_atexit(vpp_weakref):
62     """Clean up VPP connection on shutdown."""
63     vpp_instance = vpp_weakref()
64     if vpp_instance and vpp_instance.transport.connected:
65         vpp_instance.logger.debug('Cleaning up VPP on exit')
66         vpp_instance.disconnect()
67
68
69 if sys.version[0] == '2':
70     def vpp_iterator(d):
71         return d.iteritems()
72 else:
73     def vpp_iterator(d):
74         return d.items()
75
76
77 def call_logger(msgdef, kwargs):
78     s = 'Calling {}('.format(msgdef.name)
79     for k, v in kwargs.items():
80         s += '{}:{} '.format(k, v)
81     s += ')'
82     return s
83
84
85 def return_logger(r):
86     s = 'Return from {}'.format(r)
87     return s
88
89
90 class VppApiDynamicMethodHolder(object):
91     pass
92
93
94 class FuncWrapper(object):
95     def __init__(self, func):
96         self._func = func
97         self.__name__ = func.__name__
98         self.__doc__ = func.__doc__
99
100     def __call__(self, **kwargs):
101         return self._func(**kwargs)
102
103
104 class VPPApiError(Exception):
105     pass
106
107
108 class VPPNotImplementedError(NotImplementedError):
109     pass
110
111
112 class VPPIOError(IOError):
113     pass
114
115
116 class VPPRuntimeError(RuntimeError):
117     pass
118
119
120 class VPPValueError(ValueError):
121     pass
122
123
124 class VPPApiClient(object):
125     """VPP interface.
126
127     This class provides the APIs to VPP.  The APIs are loaded
128     from provided .api.json files and makes functions accordingly.
129     These functions are documented in the VPP .api files, as they
130     are dynamically created.
131
132     Additionally, VPP can send callback messages; this class
133     provides a means to register a callback function to receive
134     these messages in a background thread.
135     """
136     apidir = None
137     VPPApiError = VPPApiError
138     VPPRuntimeError = VPPRuntimeError
139     VPPValueError = VPPValueError
140     VPPNotImplementedError = VPPNotImplementedError
141     VPPIOError = VPPIOError
142
143     def process_json_file(self, apidef_file):
144         api = json.load(apidef_file)
145         types = {}
146         for t in api['enums']:
147             t[0] = 'vl_api_' + t[0] + '_t'
148             types[t[0]] = {'type': 'enum', 'data': t}
149         for t in api['unions']:
150             t[0] = 'vl_api_' + t[0] + '_t'
151             types[t[0]] = {'type': 'union', 'data': t}
152         for t in api['types']:
153             t[0] = 'vl_api_' + t[0] + '_t'
154             types[t[0]] = {'type': 'type', 'data': t}
155         for t, v in api['aliases'].items():
156             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
157         self.services.update(api['services'])
158
159         i = 0
160         while True:
161             unresolved = {}
162             for k, v in types.items():
163                 t = v['data']
164                 if not vpp_get_type(k):
165                     if v['type'] == 'enum':
166                         try:
167                             VPPEnumType(t[0], t[1:])
168                         except ValueError:
169                             unresolved[k] = v
170                     elif v['type'] == 'union':
171                         try:
172                             VPPUnionType(t[0], t[1:])
173                         except ValueError:
174                             unresolved[k] = v
175                     elif v['type'] == 'type':
176                         try:
177                             VPPType(t[0], t[1:])
178                         except ValueError:
179                             unresolved[k] = v
180                     elif v['type'] == 'alias':
181                         try:
182                             VPPTypeAlias(k, t)
183                         except ValueError:
184                             unresolved[k] = v
185             if len(unresolved) == 0:
186                 break
187             if i > 3:
188                 raise VPPValueError('Unresolved type definitions {}'
189                                     .format(unresolved))
190             types = unresolved
191             i += 1
192
193         for m in api['messages']:
194             try:
195                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
196             except VPPNotImplementedError:
197                 self.logger.error('Not implemented error for {}'.format(m[0]))
198
199     def __init__(self, apifiles=None, testmode=False, async_thread=True,
200                  logger=None, loglevel=None,
201                  read_timeout=5, use_socket=False,
202                  server_address='/run/vpp-api.sock'):
203         """Create a VPP API object.
204
205         apifiles is a list of files containing API
206         descriptions that will be loaded - methods will be
207         dynamically created reflecting these APIs.  If not
208         provided this will load the API files from VPP's
209         default install location.
210
211         logger, if supplied, is the logging logger object to log to.
212         loglevel, if supplied, is the log level this logger is set
213         to report at (from the loglevels in the logging module).
214         """
215         if logger is None:
216             logger = logging.getLogger(__name__)
217             if loglevel is not None:
218                 logger.setLevel(loglevel)
219         self.logger = logger
220
221         self.messages = {}
222         self.services = {}
223         self.id_names = []
224         self.id_msgdef = []
225         self.header = VPPType('header', [['u16', 'msgid'],
226                                          ['u32', 'client_index']])
227         self.apifiles = []
228         self.event_callback = None
229         self.message_queue = queue.Queue()
230         self.read_timeout = read_timeout
231         self.async_thread = async_thread
232
233         if use_socket:
234             from . vpp_transport_socket import VppTransport
235         else:
236             from . vpp_transport_shmem import VppTransport
237
238         if not apifiles:
239             # Pick up API definitions from default directory
240             try:
241                 apifiles = self.find_api_files()
242             except RuntimeError:
243                 # In test mode we don't care that we can't find the API files
244                 if testmode:
245                     apifiles = []
246                 else:
247                     raise VPPRuntimeError
248
249         for file in apifiles:
250             with open(file) as apidef_file:
251                 self.process_json_file(apidef_file)
252
253         self.apifiles = apifiles
254
255         # Basic sanity check
256         if len(self.messages) == 0 and not testmode:
257             raise VPPValueError(1, 'Missing JSON message definitions')
258
259         self.transport = VppTransport(self, read_timeout=read_timeout,
260                                       server_address=server_address)
261         # Make sure we allow VPP to clean up the message rings.
262         atexit.register(vpp_atexit, weakref.ref(self))
263
264     class ContextId(object):
265         """Thread-safe provider of unique context IDs."""
266         def __init__(self):
267             self.context = 0
268             self.lock = threading.Lock()
269
270         def __call__(self):
271             """Get a new unique (or, at least, not recently used) context."""
272             with self.lock:
273                 self.context += 1
274                 return self.context
275     get_context = ContextId()
276
277     def get_type(self, name):
278         return vpp_get_type(name)
279
280     @classmethod
281     def find_api_dir(cls):
282         """Attempt to find the best directory in which API definition
283         files may reside. If the value VPP_API_DIR exists in the environment
284         then it is first on the search list. If we're inside a recognized
285         location in a VPP source tree (src/scripts and src/vpp-api/python)
286         then entries from there to the likely locations in build-root are
287         added. Finally the location used by system packages is added.
288
289         :returns: A single directory name, or None if no such directory
290             could be found.
291         """
292         dirs = [cls.apidir] if cls.apidir else []
293
294         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
295         # in which case, plot a course to likely places in the src tree
296         import __main__ as main
297         if hasattr(main, '__file__'):
298             # get the path of the calling script
299             localdir = os.path.dirname(os.path.realpath(main.__file__))
300         else:
301             # use cwd if there is no calling script
302             localdir = os.getcwd()
303         localdir_s = localdir.split(os.path.sep)
304
305         def dmatch(dir):
306             """Match dir against right-hand components of the script dir"""
307             d = dir.split('/')  # param 'dir' assumes a / separator
308             length = len(d)
309             return len(localdir_s) > length and localdir_s[-length:] == d
310
311         def sdir(srcdir, variant):
312             """Build a path from srcdir to the staged API files of
313             'variant'  (typically '' or '_debug')"""
314             # Since 'core' and 'plugin' files are staged
315             # in separate directories, we target the parent dir.
316             return os.path.sep.join((
317                 srcdir,
318                 'build-root',
319                 'install-vpp%s-native' % variant,
320                 'vpp',
321                 'share',
322                 'vpp',
323                 'api',
324             ))
325
326         srcdir = None
327         if dmatch('src/scripts'):
328             srcdir = os.path.sep.join(localdir_s[:-2])
329         elif dmatch('src/vpp-api/python'):
330             srcdir = os.path.sep.join(localdir_s[:-3])
331         elif dmatch('test'):
332             # we're apparently running tests
333             srcdir = os.path.sep.join(localdir_s[:-1])
334
335         if srcdir:
336             # we're in the source tree, try both the debug and release
337             # variants.
338             dirs.append(sdir(srcdir, '_debug'))
339             dirs.append(sdir(srcdir, ''))
340
341         # Test for staged copies of the scripts
342         # For these, since we explicitly know if we're running a debug versus
343         # release variant, target only the relevant directory
344         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
345             srcdir = os.path.sep.join(localdir_s[:-4])
346             dirs.append(sdir(srcdir, '_debug'))
347         if dmatch('build-root/install-vpp-native/vpp/bin'):
348             srcdir = os.path.sep.join(localdir_s[:-4])
349             dirs.append(sdir(srcdir, ''))
350
351         # finally, try the location system packages typically install into
352         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
353
354         # check the directories for existence; first one wins
355         for dir in dirs:
356             if os.path.isdir(dir):
357                 return dir
358
359         return None
360
361     @classmethod
362     def find_api_files(cls, api_dir=None, patterns='*'):
363         """Find API definition files from the given directory tree with the
364         given pattern. If no directory is given then find_api_dir() is used
365         to locate one. If no pattern is given then all definition files found
366         in the directory tree are used.
367
368         :param api_dir: A directory tree in which to locate API definition
369             files; subdirectories are descended into.
370             If this is None then find_api_dir() is called to discover it.
371         :param patterns: A list of patterns to use in each visited directory
372             when looking for files.
373             This can be a list/tuple object or a comma-separated string of
374             patterns. Each value in the list will have leading/trialing
375             whitespace stripped.
376             The pattern specifies the first part of the filename, '.api.json'
377             is appended.
378             The results are de-duplicated, thus overlapping patterns are fine.
379             If this is None it defaults to '*' meaning "all API files".
380         :returns: A list of file paths for the API files found.
381         """
382         if api_dir is None:
383             api_dir = cls.find_api_dir()
384             if api_dir is None:
385                 raise VPPApiError("api_dir cannot be located")
386
387         if isinstance(patterns, list) or isinstance(patterns, tuple):
388             patterns = [p.strip() + '.api.json' for p in patterns]
389         else:
390             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
391
392         api_files = []
393         for root, dirnames, files in os.walk(api_dir):
394             # iterate all given patterns and de-dup the result
395             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
396             for filename in files:
397                 api_files.append(os.path.join(root, filename))
398
399         return api_files
400
401     @property
402     def api(self):
403         if not hasattr(self, "_api"):
404             raise VPPApiError("Not connected, api definitions not available")
405         return self._api
406
407     def make_function(self, msg, i, multipart, do_async):
408         if (do_async):
409             def f(**kwargs):
410                 return self._call_vpp_async(i, msg, **kwargs)
411         else:
412             def f(**kwargs):
413                 return self._call_vpp(i, msg, multipart, **kwargs)
414
415         f.__name__ = str(msg.name)
416         f.__doc__ = ", ".join(["%s %s" %
417                                (msg.fieldtypes[j], k)
418                                for j, k in enumerate(msg.fields)])
419         f.msg = msg
420
421         return f
422
423     def _register_functions(self, do_async=False):
424         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
425         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
426         self._api = VppApiDynamicMethodHolder()
427         for name, msg in vpp_iterator(self.messages):
428             n = name + '_' + msg.crc[2:]
429             i = self.transport.get_msg_index(n.encode('utf-8'))
430             if i > 0:
431                 self.id_msgdef[i] = msg
432                 self.id_names[i] = name
433
434                 # Create function for client side messages.
435                 if name in self.services:
436                     if 'stream' in self.services[name] and \
437                        self.services[name]['stream']:
438                         multipart = True
439                     else:
440                         multipart = False
441                     f = self.make_function(msg, i, multipart, do_async)
442                     setattr(self._api, name, FuncWrapper(f))
443             else:
444                 self.logger.debug(
445                     'No such message type or failed CRC checksum: %s', n)
446
447     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
448                          do_async):
449         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
450
451         rv = self.transport.connect(name.encode('utf-8'), pfx,
452                                     msg_handler, rx_qlen)
453         if rv != 0:
454             raise VPPIOError(2, 'Connect failed')
455         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
456         self._register_functions(do_async=do_async)
457
458         # Initialise control ping
459         crc = self.messages['control_ping'].crc
460         self.control_ping_index = self.transport.get_msg_index(
461             ('control_ping' + '_' + crc[2:]).encode('utf-8'))
462         self.control_ping_msgdef = self.messages['control_ping']
463         if self.async_thread:
464             self.event_thread = threading.Thread(
465                 target=self.thread_msg_handler)
466             self.event_thread.daemon = True
467             self.event_thread.start()
468         else:
469             self.event_thread = None
470         return rv
471
472     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
473         """Attach to VPP.
474
475         name - the name of the client.
476         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
477         do_async - if true, messages are sent without waiting for a reply
478         rx_qlen - the length of the VPP message receive queue between
479         client and server.
480         """
481         msg_handler = self.transport.get_callback(do_async)
482         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
483                                      do_async)
484
485     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
486         """Attach to VPP in synchronous mode. Application must poll for events.
487
488         name - the name of the client.
489         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
490         rx_qlen - the length of the VPP message receive queue between
491         client and server.
492         """
493
494         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
495                                      do_async=False)
496
497     def disconnect(self):
498         """Detach from VPP."""
499         rv = self.transport.disconnect()
500         if self.event_thread is not None:
501             self.message_queue.put("terminate event thread")
502         return rv
503
504     def msg_handler_sync(self, msg):
505         """Process an incoming message from VPP in sync mode.
506
507         The message may be a reply or it may be an async notification.
508         """
509         r = self.decode_incoming_msg(msg)
510         if r is None:
511             return
512
513         # If we have a context, then use the context to find any
514         # request waiting for a reply
515         context = 0
516         if hasattr(r, 'context') and r.context > 0:
517             context = r.context
518
519         if context == 0:
520             # No context -> async notification that we feed to the callback
521             self.message_queue.put_nowait(r)
522         else:
523             raise VPPIOError(2, 'RPC reply message received in event handler')
524
525     def has_context(self, msg):
526         if len(msg) < 10:
527             return False
528
529         header = VPPType('header_with_context', [['u16', 'msgid'],
530                                                  ['u32', 'client_index'],
531                                                  ['u32', 'context']])
532
533         (i, ci, context), size = header.unpack(msg, 0)
534         if self.id_names[i] == 'rx_thread_exit':
535             return
536
537         #
538         # Decode message and returns a tuple.
539         #
540         msgobj = self.id_msgdef[i]
541         if 'context' in msgobj.field_by_name and context >= 0:
542             return True
543         return False
544
545     def decode_incoming_msg(self, msg, no_type_conversion=False):
546         if not msg:
547             self.logger.warning('vpp_api.read failed')
548             return
549
550         (i, ci), size = self.header.unpack(msg, 0)
551         if self.id_names[i] == 'rx_thread_exit':
552             return
553
554         #
555         # Decode message and returns a tuple.
556         #
557         msgobj = self.id_msgdef[i]
558         if not msgobj:
559             raise VPPIOError(2, 'Reply message undefined')
560
561         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
562         return r
563
564     def msg_handler_async(self, msg):
565         """Process a message from VPP in async mode.
566
567         In async mode, all messages are returned to the callback.
568         """
569         r = self.decode_incoming_msg(msg)
570         if r is None:
571             return
572
573         msgname = type(r).__name__
574
575         if self.event_callback:
576             self.event_callback(msgname, r)
577
578     def _control_ping(self, context):
579         """Send a ping command."""
580         self._call_vpp_async(self.control_ping_index,
581                              self.control_ping_msgdef,
582                              context=context)
583
584     def validate_args(self, msg, kwargs):
585         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
586         if d:
587             raise VPPValueError('Invalid argument {} to {}'
588                                 .format(list(d), msg.name))
589
590     def _call_vpp(self, i, msgdef, multipart, **kwargs):
591         """Given a message, send the message and await a reply.
592
593         msgdef - the message packing definition
594         i - the message type index
595         multipart - True if the message returns multiple
596         messages in return.
597         context - context number - chosen at random if not
598         supplied.
599         The remainder of the kwargs are the arguments to the API call.
600
601         The return value is the message or message array containing
602         the response.  It will raise an IOError exception if there was
603         no response within the timeout window.
604         """
605
606         if 'context' not in kwargs:
607             context = self.get_context()
608             kwargs['context'] = context
609         else:
610             context = kwargs['context']
611         kwargs['_vl_msg_id'] = i
612
613         no_type_conversion = kwargs.pop('_no_type_conversion', False)
614
615         try:
616             if self.transport.socket_index:
617                 kwargs['client_index'] = self.transport.socket_index
618         except AttributeError:
619             pass
620         self.validate_args(msgdef, kwargs)
621
622         logging.debug(call_logger(msgdef, kwargs))
623
624         b = msgdef.pack(kwargs)
625         self.transport.suspend()
626
627         self.transport.write(b)
628
629         if multipart:
630             # Send a ping after the request - we use its response
631             # to detect that we have seen all results.
632             self._control_ping(context)
633
634         # Block until we get a reply.
635         rl = []
636         while (True):
637             msg = self.transport.read()
638             if not msg:
639                 raise VPPIOError(2, 'VPP API client: read failed')
640             r = self.decode_incoming_msg(msg, no_type_conversion)
641             msgname = type(r).__name__
642             if context not in r or r.context == 0 or context != r.context:
643                 # Message being queued
644                 self.message_queue.put_nowait(r)
645                 continue
646
647             if not multipart:
648                 rl = r
649                 break
650             if msgname == 'control_ping_reply':
651                 break
652
653             rl.append(r)
654
655         self.transport.resume()
656
657         logger.debug(return_logger(rl))
658         return rl
659
660     def _call_vpp_async(self, i, msg, **kwargs):
661         """Given a message, send the message and await a reply.
662
663         msgdef - the message packing definition
664         i - the message type index
665         context - context number - chosen at random if not
666         supplied.
667         The remainder of the kwargs are the arguments to the API call.
668         """
669         if 'context' not in kwargs:
670             context = self.get_context()
671             kwargs['context'] = context
672         else:
673             context = kwargs['context']
674         try:
675             if self.transport.socket_index:
676                 kwargs['client_index'] = self.transport.socket_index
677         except AttributeError:
678             kwargs['client_index'] = 0
679         kwargs['_vl_msg_id'] = i
680         b = msg.pack(kwargs)
681
682         self.transport.write(b)
683
684     def register_event_callback(self, callback):
685         """Register a callback for async messages.
686
687         This will be called for async notifications in sync mode,
688         and all messages in async mode.  In sync mode, replies to
689         requests will not come here.
690
691         callback is a fn(msg_type_name, msg_type) that will be
692         called when a message comes in.  While this function is
693         executing, note that (a) you are in a background thread and
694         may wish to use threading.Lock to protect your datastructures,
695         and (b) message processing from VPP will stop (so if you take
696         a long while about it you may provoke reply timeouts or cause
697         VPP to fill the RX buffer).  Passing None will disable the
698         callback.
699         """
700         self.event_callback = callback
701
702     def thread_msg_handler(self):
703         """Python thread calling the user registered message handler.
704
705         This is to emulate the old style event callback scheme. Modern
706         clients should provide their own thread to poll the event
707         queue.
708         """
709         while True:
710             r = self.message_queue.get()
711             if r == "terminate event thread":
712                 break
713             msgname = type(r).__name__
714             if self.event_callback:
715                 self.event_callback(msgname, r)
716
717 # Provide the old name for backward compatibility.
718 VPP = VPPApiClient
719
720 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4