# See the License for the specific language governing permissions and
# limitations under the License.
#
+import datetime
+from socket import inet_pton, AF_INET6, AF_INET
+import socket
+import ipaddress
+from . import macaddress
-from socket import inet_pton, inet_ntop, AF_INET6, AF_INET
-
-
-class VPPFormat(object):
- @staticmethod
- def format_vl_api_ip6_prefix_t(args):
- prefix, len = args.split('/')
- return {'prefix': {'address': inet_pton(AF_INET6, prefix)},
- 'len': int(len)}
-
- @staticmethod
- def unformat_vl_api_ip6_prefix_t(args):
- return "{}/{}".format(inet_ntop(AF_INET6, args.prefix.address),
- args.len)
-
- @staticmethod
- def format_vl_api_ip4_prefix_t(args):
- prefix, len = args.split('/')
- return {'prefix': {'address': inet_pton(AF_INET, prefix)},
- 'len': int(len)}
-
- @staticmethod
- def unformat_vl_api_ip4_prefix_t(args):
- return "{}/{}".format(inet_ntop(AF_INET, args.prefix.address),
- args.len)
-
- @staticmethod
- def format_vl_api_ip6_address_t(args):
- return {'address': inet_pton(AF_INET6, args)}
-
- @staticmethod
- def format_vl_api_ip4_address_t(args):
- return {'address': inet_pton(AF_INET, args)}
-
- @staticmethod
- def format_vl_api_address_t(args):
- try:
- return {'un': {'ip6': {'address': inet_pton(AF_INET6, args)}},
- 'af': int(1)}
- except Exception as e:
- return {'un': {'ip4': {'address': inet_pton(AF_INET, args)}},
- 'af': int(0)}
-
- @staticmethod
- def unformat_vl_api_address_t(arg):
- if arg.af == 1:
- return inet_ntop(AF_INET6, arg.un.ip6.address)
- if arg.af == 0:
- return inet_ntop(AF_INET, arg.un.ip4.address)
- raise
-
- @staticmethod
- def format_vl_api_prefix_t(args):
- prefix, len = args.split('/')
- return {'address': VPPFormat.format_vl_api_address_t(prefix),
- 'address_length': int(len)}
-
- @staticmethod
- def unformat_vl_api_prefix_t(arg):
- if arg.address.af == 1:
- return "{}/{}".format(inet_ntop(AF_INET6,
- arg.address.un.ip6.address),
- arg.address_length)
- if arg.address.af == 0:
- return "{}/{}".format(inet_ntop(AF_INET,
- arg.address.un.ip4.address),
- arg.address_length)
- raise
-
- @staticmethod
- def format_u8(args):
- try:
- return int(args)
- except Exception as e:
- return args.encode()
-
- @staticmethod
- def format(typename, args):
- try:
- return getattr(VPPFormat, 'format_' + typename)(args)
- except AttributeError:
- # Default
- return (int(args))
-
- @staticmethod
- def unformat_bytes(args):
- try:
- return args.decode('utf-8')
- except Exception as e:
- return args
-
- @staticmethod
- def unformat_list(args):
- s = '['
- for f in args:
- t = type(f).__name__
- if type(f) is int:
- s2 = str(f)
- else:
- s2 = VPPFormat.unformat_t(t, f)
- s += '{} '.format(s2)
- return s[:-1] + ']'
-
- @staticmethod
- def unformat(args):
- s = ''
- return VPPFormat.unformat_t(type(args).__name__, args)
- '''
- for i, f in enumerate(args):
- print('F', f)
- t = type(f).__name__
- if type(f) is int:
- s2 = str(f)
- else:
- s2 = VPPFormat.unformat_t(t, f)
- s += '{} {} '.format(args._fields[i], s2)
- return s[:-1]
- '''
-
- @staticmethod
- def unformat_t(typename, args):
- try:
- return getattr(VPPFormat, 'unformat_' + typename)(args)
- except AttributeError:
- # Type without explicit override
- return VPPFormat.unformat(args)
-
- # Default handling
- return args
+try:
+ text_type = unicode
+except NameError:
+ text_type = str
+
+# Copies from vl_api_address_t definition
+ADDRESS_IP4 = 0
+ADDRESS_IP6 = 1
+
+#
+# Type conversion for input arguments and return values
+#
+
+
+def format_vl_api_address_t(args):
+ try:
+ return {'un': {'ip6': inet_pton(AF_INET6, args)},
+ 'af': ADDRESS_IP6}
+ # PY2: raises socket.error
+ # PY3: raises OSError
+ except (socket.error, OSError):
+ return {'un': {'ip4': inet_pton(AF_INET, args)},
+ 'af': ADDRESS_IP4}
+
+
+def format_vl_api_prefix_t(args):
+ if isinstance(args, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
+ return {'address': format_vl_api_address_t(
+ text_type(args.network_address)),
+ 'len': int(args.prefixlen)}
+ p, length = args.split('/')
+ return {'address': format_vl_api_address_t(p),
+ 'len': int(length)}
+
+
+def format_vl_api_ip6_prefix_t(args):
+ if isinstance(args, ipaddress.IPv6Network):
+ return {'address': args.network_address.packed,
+ 'len': int(args.prefixlen)}
+ p, length = args.split('/')
+ return {'address': inet_pton(AF_INET6, p),
+ 'len': int(length)}
+
+
+def format_vl_api_ip4_prefix_t(args):
+ if isinstance(args, ipaddress.IPv4Network):
+ return {'address': args.network_address.packed,
+ 'len': int(args.prefixlen)}
+ p, length = args.split('/')
+ return {'address': inet_pton(AF_INET, p),
+ 'len': int(length)}
+
+
+conversion_table = {
+ 'vl_api_ip6_address_t':
+ {
+ 'IPv6Address': lambda o: o.packed,
+ 'str': lambda s: inet_pton(AF_INET6, s)
+ },
+ 'vl_api_ip4_address_t':
+ {
+ 'IPv4Address': lambda o: o.packed,
+ 'str': lambda s: inet_pton(AF_INET, s)
+ },
+ 'vl_api_ip6_prefix_t':
+ {
+ 'IPv6Network': lambda o: {'address': o.network_address.packed,
+ 'len': o.prefixlen},
+ 'str': lambda s: format_vl_api_ip6_prefix_t(s)
+ },
+ 'vl_api_ip4_prefix_t':
+ {
+ 'IPv4Network': lambda o: {'address': o.network_address.packed,
+ 'len': o.prefixlen},
+ 'str': lambda s: format_vl_api_ip4_prefix_t(s)
+ },
+ 'vl_api_address_t':
+ {
+ 'IPv4Address': lambda o: {'af': ADDRESS_IP4, 'un': {'ip4': o.packed}},
+ 'IPv6Address': lambda o: {'af': ADDRESS_IP6, 'un': {'ip6': o.packed}},
+ 'str': lambda s: format_vl_api_address_t(s)
+ },
+ 'vl_api_prefix_t':
+ {
+ 'IPv4Network': lambda o: {'address':
+ {'af': ADDRESS_IP4, 'un':
+ {'ip4': o.network_address.packed}},
+ 'len': o.prefixlen},
+ 'IPv6Network': lambda o: {'address':
+ {'af': ADDRESS_IP6, 'un':
+ {'ip6': o.network_address.packed}},
+ 'len': o.prefixlen},
+ 'str': lambda s: format_vl_api_prefix_t(s)
+ },
+ 'vl_api_mac_address_t':
+ {
+ 'MACAddress': lambda o: o.packed,
+ 'str': lambda s: macaddress.mac_pton(s)
+ },
+ 'vl_api_timestamp_t':
+ {
+ 'datetime.datetime': lambda o:
+ (o - datetime.datetime(1970, 1, 1)).total_seconds()
+ }
+}
+
+
+def unformat_api_address_t(o):
+ if o.af == 1:
+ return ipaddress.IPv6Address(o.un.ip6)
+ if o.af == 0:
+ return ipaddress.IPv4Address(o.un.ip4)
+
+
+def unformat_api_prefix_t(o):
+ if isinstance(o.address, ipaddress.IPv4Address):
+ return ipaddress.IPv4Network((o.address, o.len), False)
+ if isinstance(o.address, ipaddress.IPv6Address):
+ return ipaddress.IPv6Network((o.address, o.len), False)
+
+
+conversion_unpacker_table = {
+ 'vl_api_ip6_address_t': lambda o: ipaddress.IPv6Address(o),
+ 'vl_api_ip6_prefix_t': lambda o: ipaddress.IPv6Network((o.address, o.len)),
+ 'vl_api_ip4_address_t': lambda o: ipaddress.IPv4Address(o),
+ 'vl_api_ip4_prefix_t': lambda o: ipaddress.IPv4Network((o.address, o.len)),
+ 'vl_api_address_t': lambda o: unformat_api_address_t(o),
+ 'vl_api_prefix_t': lambda o: unformat_api_prefix_t(o),
+ 'vl_api_mac_address_t': lambda o: macaddress.MACAddress(o),
+ 'vl_api_timestamp_t': lambda o: datetime.datetime.fromtimestamp(o),
+ 'vl_api_timedelta_t': lambda o: datetime.timedelta(seconds=o),
+}