vpp_papi: Fix vapi.disconnect exception.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import sys
20 import os
21 import logging
22 import collections
23 import struct
24 import functools
25 import json
26 import threading
27 import fnmatch
28 import weakref
29 import atexit
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
32 from . macaddress import MACAddress, mac_pton, mac_ntop
33
34 logger = logging.getLogger(__name__)
35
36 if sys.version[0] == '2':
37     import Queue as queue
38 else:
39     import queue as queue
40
41
42 def metaclass(metaclass):
43     @functools.wraps(metaclass)
44     def wrapper(cls):
45         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
46
47     return wrapper
48
49
50 class VppEnumType(type):
51     def __getattr__(cls, name):
52         t = vpp_get_type(name)
53         return t.enum
54
55
56 @metaclass(VppEnumType)
57 class VppEnum(object):
58     pass
59
60
61 def vpp_atexit(vpp_weakref):
62     """Clean up VPP connection on shutdown."""
63     vpp_instance = vpp_weakref()
64     if vpp_instance and vpp_instance.transport.connected:
65         vpp_instance.logger.debug('Cleaning up VPP on exit')
66         vpp_instance.disconnect()
67
68
69 if sys.version[0] == '2':
70     def vpp_iterator(d):
71         return d.iteritems()
72 else:
73     def vpp_iterator(d):
74         return d.items()
75
76
77 def call_logger(msgdef, kwargs):
78     s = 'Calling {}('.format(msgdef.name)
79     for k, v in kwargs.items():
80         s += '{}:{} '.format(k, v)
81     s += ')'
82     return s
83
84
85 def return_logger(r):
86     s = 'Return from {}'.format(r)
87     return s
88
89
90 class VppApiDynamicMethodHolder(object):
91     pass
92
93
94 class FuncWrapper(object):
95     def __init__(self, func):
96         self._func = func
97         self.__name__ = func.__name__
98         self.__doc__ = func.__doc__
99
100     def __call__(self, **kwargs):
101         return self._func(**kwargs)
102
103
104 class VPPApiError(Exception):
105     pass
106
107
108 class VPPNotImplementedError(NotImplementedError):
109     pass
110
111
112 class VPPIOError(IOError):
113     pass
114
115
116 class VPPRuntimeError(RuntimeError):
117     pass
118
119
120 class VPPValueError(ValueError):
121     pass
122
123
124 class VPPApiClient(object):
125     """VPP interface.
126
127     This class provides the APIs to VPP.  The APIs are loaded
128     from provided .api.json files and makes functions accordingly.
129     These functions are documented in the VPP .api files, as they
130     are dynamically created.
131
132     Additionally, VPP can send callback messages; this class
133     provides a means to register a callback function to receive
134     these messages in a background thread.
135     """
136     apidir = None
137     VPPApiError = VPPApiError
138     VPPRuntimeError = VPPRuntimeError
139     VPPValueError = VPPValueError
140     VPPNotImplementedError = VPPNotImplementedError
141     VPPIOError = VPPIOError
142
143     def process_json_file(self, apidef_file):
144         api = json.load(apidef_file)
145         types = {}
146         for t in api['enums']:
147             t[0] = 'vl_api_' + t[0] + '_t'
148             types[t[0]] = {'type': 'enum', 'data': t}
149         for t in api['unions']:
150             t[0] = 'vl_api_' + t[0] + '_t'
151             types[t[0]] = {'type': 'union', 'data': t}
152         for t in api['types']:
153             t[0] = 'vl_api_' + t[0] + '_t'
154             types[t[0]] = {'type': 'type', 'data': t}
155         for t, v in api['aliases'].items():
156             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
157         self.services.update(api['services'])
158
159         i = 0
160         while True:
161             unresolved = {}
162             for k, v in types.items():
163                 t = v['data']
164                 if not vpp_get_type(k):
165                     if v['type'] == 'enum':
166                         try:
167                             VPPEnumType(t[0], t[1:])
168                         except ValueError:
169                             unresolved[k] = v
170                     elif v['type'] == 'union':
171                         try:
172                             VPPUnionType(t[0], t[1:])
173                         except ValueError:
174                             unresolved[k] = v
175                     elif v['type'] == 'type':
176                         try:
177                             VPPType(t[0], t[1:])
178                         except ValueError:
179                             unresolved[k] = v
180                     elif v['type'] == 'alias':
181                         try:
182                             VPPTypeAlias(k, t)
183                         except ValueError:
184                             unresolved[k] = v
185             if len(unresolved) == 0:
186                 break
187             if i > 3:
188                 raise VPPValueError('Unresolved type definitions {}'
189                                     .format(unresolved))
190             types = unresolved
191             i += 1
192
193         for m in api['messages']:
194             try:
195                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
196             except VPPNotImplementedError:
197                 self.logger.error('Not implemented error for {}'.format(m[0]))
198
199     def __init__(self, apifiles=None, testmode=False, async_thread=True,
200                  logger=None, loglevel=None,
201                  read_timeout=5, use_socket=False,
202                  server_address='/run/vpp-api.sock'):
203         """Create a VPP API object.
204
205         apifiles is a list of files containing API
206         descriptions that will be loaded - methods will be
207         dynamically created reflecting these APIs.  If not
208         provided this will load the API files from VPP's
209         default install location.
210
211         logger, if supplied, is the logging logger object to log to.
212         loglevel, if supplied, is the log level this logger is set
213         to report at (from the loglevels in the logging module).
214         """
215         if logger is None:
216             logger = logging.getLogger(__name__)
217             if loglevel is not None:
218                 logger.setLevel(loglevel)
219         self.logger = logger
220
221         self.messages = {}
222         self.services = {}
223         self.id_names = []
224         self.id_msgdef = []
225         self.header = VPPType('header', [['u16', 'msgid'],
226                                          ['u32', 'client_index']])
227         self.apifiles = []
228         self.event_callback = None
229         self.message_queue = queue.Queue()
230         self.read_timeout = read_timeout
231         self.async_thread = async_thread
232         self.event_thread = None
233
234         if use_socket:
235             from . vpp_transport_socket import VppTransport
236         else:
237             from . vpp_transport_shmem import VppTransport
238
239         if not apifiles:
240             # Pick up API definitions from default directory
241             try:
242                 apifiles = self.find_api_files()
243             except RuntimeError:
244                 # In test mode we don't care that we can't find the API files
245                 if testmode:
246                     apifiles = []
247                 else:
248                     raise VPPRuntimeError
249
250         for file in apifiles:
251             with open(file) as apidef_file:
252                 self.process_json_file(apidef_file)
253
254         self.apifiles = apifiles
255
256         # Basic sanity check
257         if len(self.messages) == 0 and not testmode:
258             raise VPPValueError(1, 'Missing JSON message definitions')
259
260         self.transport = VppTransport(self, read_timeout=read_timeout,
261                                       server_address=server_address)
262         # Make sure we allow VPP to clean up the message rings.
263         atexit.register(vpp_atexit, weakref.ref(self))
264
265     class ContextId(object):
266         """Thread-safe provider of unique context IDs."""
267         def __init__(self):
268             self.context = 0
269             self.lock = threading.Lock()
270
271         def __call__(self):
272             """Get a new unique (or, at least, not recently used) context."""
273             with self.lock:
274                 self.context += 1
275                 return self.context
276     get_context = ContextId()
277
278     def get_type(self, name):
279         return vpp_get_type(name)
280
281     @classmethod
282     def find_api_dir(cls):
283         """Attempt to find the best directory in which API definition
284         files may reside. If the value VPP_API_DIR exists in the environment
285         then it is first on the search list. If we're inside a recognized
286         location in a VPP source tree (src/scripts and src/vpp-api/python)
287         then entries from there to the likely locations in build-root are
288         added. Finally the location used by system packages is added.
289
290         :returns: A single directory name, or None if no such directory
291             could be found.
292         """
293         dirs = [cls.apidir] if cls.apidir else []
294
295         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
296         # in which case, plot a course to likely places in the src tree
297         import __main__ as main
298         if hasattr(main, '__file__'):
299             # get the path of the calling script
300             localdir = os.path.dirname(os.path.realpath(main.__file__))
301         else:
302             # use cwd if there is no calling script
303             localdir = os.getcwd()
304         localdir_s = localdir.split(os.path.sep)
305
306         def dmatch(dir):
307             """Match dir against right-hand components of the script dir"""
308             d = dir.split('/')  # param 'dir' assumes a / separator
309             length = len(d)
310             return len(localdir_s) > length and localdir_s[-length:] == d
311
312         def sdir(srcdir, variant):
313             """Build a path from srcdir to the staged API files of
314             'variant'  (typically '' or '_debug')"""
315             # Since 'core' and 'plugin' files are staged
316             # in separate directories, we target the parent dir.
317             return os.path.sep.join((
318                 srcdir,
319                 'build-root',
320                 'install-vpp%s-native' % variant,
321                 'vpp',
322                 'share',
323                 'vpp',
324                 'api',
325             ))
326
327         srcdir = None
328         if dmatch('src/scripts'):
329             srcdir = os.path.sep.join(localdir_s[:-2])
330         elif dmatch('src/vpp-api/python'):
331             srcdir = os.path.sep.join(localdir_s[:-3])
332         elif dmatch('test'):
333             # we're apparently running tests
334             srcdir = os.path.sep.join(localdir_s[:-1])
335
336         if srcdir:
337             # we're in the source tree, try both the debug and release
338             # variants.
339             dirs.append(sdir(srcdir, '_debug'))
340             dirs.append(sdir(srcdir, ''))
341
342         # Test for staged copies of the scripts
343         # For these, since we explicitly know if we're running a debug versus
344         # release variant, target only the relevant directory
345         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
346             srcdir = os.path.sep.join(localdir_s[:-4])
347             dirs.append(sdir(srcdir, '_debug'))
348         if dmatch('build-root/install-vpp-native/vpp/bin'):
349             srcdir = os.path.sep.join(localdir_s[:-4])
350             dirs.append(sdir(srcdir, ''))
351
352         # finally, try the location system packages typically install into
353         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
354
355         # check the directories for existence; first one wins
356         for dir in dirs:
357             if os.path.isdir(dir):
358                 return dir
359
360         return None
361
362     @classmethod
363     def find_api_files(cls, api_dir=None, patterns='*'):
364         """Find API definition files from the given directory tree with the
365         given pattern. If no directory is given then find_api_dir() is used
366         to locate one. If no pattern is given then all definition files found
367         in the directory tree are used.
368
369         :param api_dir: A directory tree in which to locate API definition
370             files; subdirectories are descended into.
371             If this is None then find_api_dir() is called to discover it.
372         :param patterns: A list of patterns to use in each visited directory
373             when looking for files.
374             This can be a list/tuple object or a comma-separated string of
375             patterns. Each value in the list will have leading/trialing
376             whitespace stripped.
377             The pattern specifies the first part of the filename, '.api.json'
378             is appended.
379             The results are de-duplicated, thus overlapping patterns are fine.
380             If this is None it defaults to '*' meaning "all API files".
381         :returns: A list of file paths for the API files found.
382         """
383         if api_dir is None:
384             api_dir = cls.find_api_dir()
385             if api_dir is None:
386                 raise VPPApiError("api_dir cannot be located")
387
388         if isinstance(patterns, list) or isinstance(patterns, tuple):
389             patterns = [p.strip() + '.api.json' for p in patterns]
390         else:
391             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
392
393         api_files = []
394         for root, dirnames, files in os.walk(api_dir):
395             # iterate all given patterns and de-dup the result
396             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
397             for filename in files:
398                 api_files.append(os.path.join(root, filename))
399
400         return api_files
401
402     @property
403     def api(self):
404         if not hasattr(self, "_api"):
405             raise VPPApiError("Not connected, api definitions not available")
406         return self._api
407
408     def make_function(self, msg, i, multipart, do_async):
409         if (do_async):
410             def f(**kwargs):
411                 return self._call_vpp_async(i, msg, **kwargs)
412         else:
413             def f(**kwargs):
414                 return self._call_vpp(i, msg, multipart, **kwargs)
415
416         f.__name__ = str(msg.name)
417         f.__doc__ = ", ".join(["%s %s" %
418                                (msg.fieldtypes[j], k)
419                                for j, k in enumerate(msg.fields)])
420         f.msg = msg
421
422         return f
423
424     def _register_functions(self, do_async=False):
425         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
426         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
427         self._api = VppApiDynamicMethodHolder()
428         for name, msg in vpp_iterator(self.messages):
429             n = name + '_' + msg.crc[2:]
430             i = self.transport.get_msg_index(n.encode('utf-8'))
431             if i > 0:
432                 self.id_msgdef[i] = msg
433                 self.id_names[i] = name
434
435                 # Create function for client side messages.
436                 if name in self.services:
437                     if 'stream' in self.services[name] and \
438                        self.services[name]['stream']:
439                         multipart = True
440                     else:
441                         multipart = False
442                     f = self.make_function(msg, i, multipart, do_async)
443                     setattr(self._api, name, FuncWrapper(f))
444             else:
445                 self.logger.debug(
446                     'No such message type or failed CRC checksum: %s', n)
447
448     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
449                          do_async):
450         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
451
452         rv = self.transport.connect(name.encode('utf-8'), pfx,
453                                     msg_handler, rx_qlen)
454         if rv != 0:
455             raise VPPIOError(2, 'Connect failed')
456         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
457         self._register_functions(do_async=do_async)
458
459         # Initialise control ping
460         crc = self.messages['control_ping'].crc
461         self.control_ping_index = self.transport.get_msg_index(
462             ('control_ping' + '_' + crc[2:]).encode('utf-8'))
463         self.control_ping_msgdef = self.messages['control_ping']
464         if self.async_thread:
465             self.event_thread = threading.Thread(
466                 target=self.thread_msg_handler)
467             self.event_thread.daemon = True
468             self.event_thread.start()
469         else:
470             self.event_thread = None
471         return rv
472
473     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
474         """Attach to VPP.
475
476         name - the name of the client.
477         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
478         do_async - if true, messages are sent without waiting for a reply
479         rx_qlen - the length of the VPP message receive queue between
480         client and server.
481         """
482         msg_handler = self.transport.get_callback(do_async)
483         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
484                                      do_async)
485
486     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
487         """Attach to VPP in synchronous mode. Application must poll for events.
488
489         name - the name of the client.
490         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
491         rx_qlen - the length of the VPP message receive queue between
492         client and server.
493         """
494
495         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
496                                      do_async=False)
497
498     def disconnect(self):
499         """Detach from VPP."""
500         rv = self.transport.disconnect()
501         if self.event_thread is not None:
502             self.message_queue.put("terminate event thread")
503         return rv
504
505     def msg_handler_sync(self, msg):
506         """Process an incoming message from VPP in sync mode.
507
508         The message may be a reply or it may be an async notification.
509         """
510         r = self.decode_incoming_msg(msg)
511         if r is None:
512             return
513
514         # If we have a context, then use the context to find any
515         # request waiting for a reply
516         context = 0
517         if hasattr(r, 'context') and r.context > 0:
518             context = r.context
519
520         if context == 0:
521             # No context -> async notification that we feed to the callback
522             self.message_queue.put_nowait(r)
523         else:
524             raise VPPIOError(2, 'RPC reply message received in event handler')
525
526     def has_context(self, msg):
527         if len(msg) < 10:
528             return False
529
530         header = VPPType('header_with_context', [['u16', 'msgid'],
531                                                  ['u32', 'client_index'],
532                                                  ['u32', 'context']])
533
534         (i, ci, context), size = header.unpack(msg, 0)
535         if self.id_names[i] == 'rx_thread_exit':
536             return
537
538         #
539         # Decode message and returns a tuple.
540         #
541         msgobj = self.id_msgdef[i]
542         if 'context' in msgobj.field_by_name and context >= 0:
543             return True
544         return False
545
546     def decode_incoming_msg(self, msg, no_type_conversion=False):
547         if not msg:
548             self.logger.warning('vpp_api.read failed')
549             return
550
551         (i, ci), size = self.header.unpack(msg, 0)
552         if self.id_names[i] == 'rx_thread_exit':
553             return
554
555         #
556         # Decode message and returns a tuple.
557         #
558         msgobj = self.id_msgdef[i]
559         if not msgobj:
560             raise VPPIOError(2, 'Reply message undefined')
561
562         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
563         return r
564
565     def msg_handler_async(self, msg):
566         """Process a message from VPP in async mode.
567
568         In async mode, all messages are returned to the callback.
569         """
570         r = self.decode_incoming_msg(msg)
571         if r is None:
572             return
573
574         msgname = type(r).__name__
575
576         if self.event_callback:
577             self.event_callback(msgname, r)
578
579     def _control_ping(self, context):
580         """Send a ping command."""
581         self._call_vpp_async(self.control_ping_index,
582                              self.control_ping_msgdef,
583                              context=context)
584
585     def validate_args(self, msg, kwargs):
586         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
587         if d:
588             raise VPPValueError('Invalid argument {} to {}'
589                                 .format(list(d), msg.name))
590
591     def _call_vpp(self, i, msgdef, multipart, **kwargs):
592         """Given a message, send the message and await a reply.
593
594         msgdef - the message packing definition
595         i - the message type index
596         multipart - True if the message returns multiple
597         messages in return.
598         context - context number - chosen at random if not
599         supplied.
600         The remainder of the kwargs are the arguments to the API call.
601
602         The return value is the message or message array containing
603         the response.  It will raise an IOError exception if there was
604         no response within the timeout window.
605         """
606
607         if 'context' not in kwargs:
608             context = self.get_context()
609             kwargs['context'] = context
610         else:
611             context = kwargs['context']
612         kwargs['_vl_msg_id'] = i
613
614         no_type_conversion = kwargs.pop('_no_type_conversion', False)
615
616         try:
617             if self.transport.socket_index:
618                 kwargs['client_index'] = self.transport.socket_index
619         except AttributeError:
620             pass
621         self.validate_args(msgdef, kwargs)
622
623         logging.debug(call_logger(msgdef, kwargs))
624
625         b = msgdef.pack(kwargs)
626         self.transport.suspend()
627
628         self.transport.write(b)
629
630         if multipart:
631             # Send a ping after the request - we use its response
632             # to detect that we have seen all results.
633             self._control_ping(context)
634
635         # Block until we get a reply.
636         rl = []
637         while (True):
638             msg = self.transport.read()
639             if not msg:
640                 raise VPPIOError(2, 'VPP API client: read failed')
641             r = self.decode_incoming_msg(msg, no_type_conversion)
642             msgname = type(r).__name__
643             if context not in r or r.context == 0 or context != r.context:
644                 # Message being queued
645                 self.message_queue.put_nowait(r)
646                 continue
647
648             if not multipart:
649                 rl = r
650                 break
651             if msgname == 'control_ping_reply':
652                 break
653
654             rl.append(r)
655
656         self.transport.resume()
657
658         logger.debug(return_logger(rl))
659         return rl
660
661     def _call_vpp_async(self, i, msg, **kwargs):
662         """Given a message, send the message and await a reply.
663
664         msgdef - the message packing definition
665         i - the message type index
666         context - context number - chosen at random if not
667         supplied.
668         The remainder of the kwargs are the arguments to the API call.
669         """
670         if 'context' not in kwargs:
671             context = self.get_context()
672             kwargs['context'] = context
673         else:
674             context = kwargs['context']
675         try:
676             if self.transport.socket_index:
677                 kwargs['client_index'] = self.transport.socket_index
678         except AttributeError:
679             kwargs['client_index'] = 0
680         kwargs['_vl_msg_id'] = i
681         b = msg.pack(kwargs)
682
683         self.transport.write(b)
684
685     def register_event_callback(self, callback):
686         """Register a callback for async messages.
687
688         This will be called for async notifications in sync mode,
689         and all messages in async mode.  In sync mode, replies to
690         requests will not come here.
691
692         callback is a fn(msg_type_name, msg_type) that will be
693         called when a message comes in.  While this function is
694         executing, note that (a) you are in a background thread and
695         may wish to use threading.Lock to protect your datastructures,
696         and (b) message processing from VPP will stop (so if you take
697         a long while about it you may provoke reply timeouts or cause
698         VPP to fill the RX buffer).  Passing None will disable the
699         callback.
700         """
701         self.event_callback = callback
702
703     def thread_msg_handler(self):
704         """Python thread calling the user registered message handler.
705
706         This is to emulate the old style event callback scheme. Modern
707         clients should provide their own thread to poll the event
708         queue.
709         """
710         while True:
711             r = self.message_queue.get()
712             if r == "terminate event thread":
713                 break
714             msgname = type(r).__name__
715             if self.event_callback:
716                 self.event_callback(msgname, r)
717
718 # Provide the old name for backward compatibility.
719 VPP = VPPApiClient
720
721 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4