PAPI: Add MACAddress object wrapper for vl_api_mac_address_t
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_format.py
index 908606a..fec0667 100644 (file)
 
 from socket import inet_pton, inet_ntop, AF_INET6, AF_INET
 import socket
+import ipaddress
+from . import macaddress
 
+# Copies from vl_api_address_t definition
+ADDRESS_IP4 = 0
+ADDRESS_IP6 = 1
 
-class VPPFormatError(Exception):
-    pass
-
-
-class VPPFormat(object):
-    VPPFormatError = VPPFormatError
-
-    @staticmethod
-    def format_vl_api_ip6_prefix_t(args):
-        prefix, len = args.split('/')
-        return {'prefix': {'address': inet_pton(AF_INET6, prefix)},
-                'len': int(len)}
-
-    @staticmethod
-    def unformat_vl_api_ip6_prefix_t(args):
-        return "{}/{}".format(inet_ntop(AF_INET6, args.prefix),
-                              args.len)
-
-    @staticmethod
-    def format_vl_api_ip4_prefix_t(args):
-        prefix, len = args.split('/')
-        return {'prefix': {'address': inet_pton(AF_INET, prefix)},
-                'len': int(len)}
-
-    @staticmethod
-    def unformat_vl_api_ip4_prefix_t(args):
-        return "{}/{}".format(inet_ntop(AF_INET, args.prefix),
-                              args.len)
-
-    @staticmethod
-    def format_vl_api_ip6_address_t(args):
-        return {'address': inet_pton(AF_INET6, args)}
-
-    @staticmethod
-    def format_vl_api_ip4_address_t(args):
-        return {'address': inet_pton(AF_INET, args)}
-
-    @staticmethod
-    def format_vl_api_address_t(args):
-        try:
-            return {'un': {'ip6': inet_pton(AF_INET6, args)},
-                    'af': int(1)}
-        except socket.error as e:
-            return {'un': {'ip4': inet_pton(AF_INET, args)},
-                    'af': int(0)}
-
-    @staticmethod
-    def unformat_vl_api_address_t(arg):
-        if arg.af == 1:
-            return inet_ntop(AF_INET6, arg.un.ip6)
-        if arg.af == 0:
-            return inet_ntop(AF_INET, arg.un.ip4)
-        raise VPPFormatError
-
-    @staticmethod
-    def format_vl_api_prefix_t(args):
-        prefix, len = args.split('/')
-        return {'address': VPPFormat.format_vl_api_address_t(prefix),
-                'address_length': int(len)}
-
-    @staticmethod
-    def unformat_vl_api_prefix_t(arg):
-        if arg.address.af == 1:
-            return "{}/{}".format(inet_ntop(AF_INET6,
-                                            arg.address.un.ip6),
-                                  arg.address_length)
-        if arg.address.af == 0:
-            return "{}/{}".format(inet_ntop(AF_INET,
-                                            arg.address.un.ip4),
-                                  arg.address_length)
-        raise VPPFormatError
-
-    @staticmethod
-    def format_u8(args):
-        try:
-            return int(args)
-        except Exception as e:
-            return args.encode()
-
-    @staticmethod
-    def format(typename, args):
-        try:
-            return getattr(VPPFormat, 'format_' + typename)(args)
-        except AttributeError:
-            # Default
-            return (int(args))
-
-    @staticmethod
-    def unformat_bytes(args):
-        try:
-            return args.decode('utf-8')
-        except ValueError as e:
-            return args
-
-    @staticmethod
-    def unformat_list(args):
-        s = '['
-        for f in args:
-            t = type(f).__name__
-            if type(f) is int:
-                s2 = str(f)
-            else:
-                s2 = VPPFormat.unformat_t(t, f)
-            s += '{} '.format(s2)
-        return s[:-1] + ']'
-
-    @staticmethod
-    def unformat(args):
-        s = ''
-        return VPPFormat.unformat_t(type(args).__name__, args)
-        '''
-        for i, f in enumerate(args):
-            print('F', f)
-            t = type(f).__name__
-            if type(f) is int:
-                s2 = str(f)
-            else:
-                s2 = VPPFormat.unformat_t(t, f)
-            s += '{} {} '.format(args._fields[i], s2)
-        return s[:-1]
-        '''
-
-    @staticmethod
-    def unformat_t(typename, args):
-        try:
-            return getattr(VPPFormat, 'unformat_' + typename)(args)
-        except AttributeError:
-            # Type without explicit override
-            return VPPFormat.unformat(args)
-
-        # Default handling
-        return args
+#
+# Type conversion for input arguments and return values
+#
+
+
+def format_vl_api_address_t(args):
+    try:
+        return {'un': {'ip6': inet_pton(AF_INET6, args)},
+                'af': ADDRESS_IP6}
+    except socket.error as e:
+        return {'un': {'ip4': inet_pton(AF_INET, args)},
+                'af': ADDRESS_IP4}
+
+
+def format_vl_api_prefix_t(args):
+    p, length = args.split('/')
+    return {'address': format_vl_api_address_t(p),
+            'address_length': int(length)}
+
+
+def format_vl_api_ip6_prefix_t(args):
+    p, length = args.split('/')
+    return {'prefix': inet_pton(AF_INET6, p),
+            'len': int(length)}
+
+
+def format_vl_api_ip4_prefix_t(args):
+    p, length = args.split('/')
+    return {'prefix': inet_pton(AF_INET, p),
+            'len': int(length)}
+
+
+conversion_table = {
+    'vl_api_ip6_address_t':
+    {
+        'IPv6Address': lambda o: o.packed,
+        'str': lambda s: inet_pton(AF_INET6, s)
+    },
+    'vl_api_ip4_address_t':
+    {
+        'IPv4Address': lambda o: o.packed,
+        'str': lambda s: inet_pton(AF_INET, s)
+    },
+    'vl_api_ip6_prefix_t':
+    {
+        'IPv6Network': lambda o: {'prefix': o.network_address.packed,
+                                  'len': o.prefixlen},
+        'str': lambda s: format_vl_api_ip6_prefix_t(s)
+    },
+    'vl_api_ip4_prefix_t':
+    {
+        'IPv4Network': lambda o: {'prefix': o.network_address.packed,
+                                  'len': o.prefixlen},
+        'str': lambda s: format_vl_api_ip4_prefix_t(s)
+    },
+    'vl_api_address_t':
+    {
+        'IPv4Address': lambda o: {'af': ADDRESS_IP4, 'un': {'ip4': o.packed}},
+        'IPv6Address': lambda o: {'af': ADDRESS_IP6, 'un': {'ip6': o.packed}},
+        'str': lambda s: format_vl_api_address_t(s)
+    },
+    'vl_api_prefix_t':
+    {
+        'IPv4Network': lambda o: {'prefix':
+                                  {'af': ADDRESS_IP4, 'un':
+                                   {'ip4': o.network_address.packed}},
+                                  'len': o.prefixlen},
+        'IPv6Network': lambda o: {'prefix':
+                                  {'af': ADDRESS_IP6, 'un':
+                                   {'ip6': o.network_address.packed}},
+                                  'len': o.prefixlen},
+        'str': lambda s: format_vl_api_prefix_t(s)
+    },
+    'vl_api_mac_address_t':
+    {
+        'MACAddress': lambda o: o.packed,
+        'str': lambda s: macaddress.mac_pton(s)
+    },
+}
+
+
+def unformat_api_address_t(o):
+    if o.af == 1:
+        return ipaddress.IPv6Address(o.un.ip6)
+    if o.af == 0:
+        return ipaddress.IPv4Address(o.un.ip4)
+
+
+def unformat_api_prefix_t(o):
+    if isinstance(o.address, ipaddress.IPv4Address):
+        return ipaddress.IPv4Network((o.address, o.address_length), False)
+    if isinstance(o.address, ipaddress.IPv6Address):
+        return ipaddress.IPv6Network((o.address, o.address_length), False)
+
+
+conversion_unpacker_table = {
+    'vl_api_ip6_address_t': lambda o: ipaddress.IPv6Address(o),
+    'vl_api_ip6_prefix_t': lambda o: ipaddress.IPv6Network((o.prefix, o.len)),
+    'vl_api_ip4_address_t': lambda o: ipaddress.IPv4Address(o),
+    'vl_api_ip4_prefix_t': lambda o: ipaddress.IPv4Network((o.prefix, o.len)),
+    'vl_api_address_t': lambda o: unformat_api_address_t(o),
+    'vl_api_prefix_t': lambda o: unformat_api_prefix_t(o),
+    'vl_api_mac_address_t': lambda o: macaddress.MACAddress(o),
+}