# See the License for the specific language governing permissions and
# limitations under the License.
#
+import datetime
+from socket import inet_pton, AF_INET6, AF_INET
+import socket
+import ipaddress
+from . import macaddress
-from socket import inet_pton, inet_ntop, AF_INET6, AF_INET
-
-
-class VPPFormat:
- @staticmethod
- def format_vl_api_ip6_prefix_t(args):
- prefix, len = args.split('/')
- return {'prefix': {'address': inet_pton(AF_INET6, prefix)},
- 'len': int(len)}
-
- @staticmethod
- def unformat_vl_api_ip6_prefix_t(args):
- return "{}/{}".format(inet_ntop(AF_INET6, args.prefix.address),
- args.len)
-
- @staticmethod
- def format_vl_api_ip4_prefix_t(args):
- prefix, len = args.split('/')
- return {'prefix': {'address': inet_pton(AF_INET, prefix)},
- 'len': int(len)}
-
- @staticmethod
- def unformat_vl_api_ip4_prefix_t(args):
- return "{}/{}".format(inet_ntop(AF_INET, args.prefix.address),
- args.len)
-
- @staticmethod
- def format_vl_api_ip6_address_t(args):
- return {'address': inet_pton(AF_INET6, args)}
-
- @staticmethod
- def format_vl_api_ip4_address_t(args):
- return {'address': inet_pton(AF_INET, args)}
-
- @staticmethod
- def format_vl_api_address_t(args):
- try:
- return {'un': {'ip6': {'address': inet_pton(AF_INET6, args)}},
- 'af': int(1)}
- except Exception as e:
- return {'un': {'ip4': {'address': inet_pton(AF_INET, args)}},
- 'af': int(0)}
-
- @staticmethod
- def unformat_vl_api_address_t(arg):
- if arg.af == 1:
- return inet_ntop(AF_INET6, arg.un.ip6.address)
- if arg.af == 0:
- return inet_ntop(AF_INET, arg.un.ip4.address)
- raise
-
- @staticmethod
- def format_vl_api_prefix_t(args):
- prefix, len = args.split('/')
- return {'address': VPPFormat.format_vl_api_address_t(prefix),
- 'address_length': int(len)}
-
- @staticmethod
- def unformat_vl_api_prefix_t(arg):
- if arg.address.af == 1:
- return "{}/{}".format(inet_ntop(AF_INET6,
- arg.address.un.ip6.address),
- arg.address_length)
- if arg.address.af == 0:
- return "{}/{}".format(inet_ntop(AF_INET,
- arg.address.un.ip4.address),
- arg.address_length)
- raise
-
- @staticmethod
- def format_u8(args):
- try:
- return int(args)
- except Exception as e:
- return args.encode()
-
- @staticmethod
- def format(typename, args):
- try:
- return getattr(VPPFormat, 'format_' + typename)(args)
- except AttributeError:
- # Default
- return (int(args))
-
- @staticmethod
- def unformat_bytes(args):
- try:
- return args.decode('utf-8')
- except Exception as e:
- return args
-
- @staticmethod
- def unformat_list(args):
- s = '['
- for f in args:
- t = type(f).__name__
- if type(f) is int:
- s2 = str(f)
- else:
- s2 = VPPFormat.unformat_t(t, f)
- s += '{} '.format(s2)
- return s[:-1] + ']'
-
- @staticmethod
- def unformat(args):
- s = ''
- return VPPFormat.unformat_t(type(args).__name__, args)
- '''
- for i, f in enumerate(args):
- print('F', f)
- t = type(f).__name__
- if type(f) is int:
- s2 = str(f)
- else:
- s2 = VPPFormat.unformat_t(t, f)
- s += '{} {} '.format(args._fields[i], s2)
- return s[:-1]
- '''
-
- @staticmethod
- def unformat_t(typename, args):
- try:
- return getattr(VPPFormat, 'unformat_' + typename)(args)
- except AttributeError:
- # Type without explicit override
- return VPPFormat.unformat(args)
-
- # Default handling
- return args
+
+# Copies from vl_api_address_t definition
+ADDRESS_IP4 = 0
+ADDRESS_IP6 = 1
+
+
+def verify_enum_hint(e):
+ return (e.ADDRESS_IP4.value == ADDRESS_IP4) and (e.ADDRESS_IP6.value == ADDRESS_IP6)
+
+
+#
+# Type conversion for input arguments and return values
+#
+
+
+def format_vl_api_address_t(args):
+ try:
+ return {"un": {"ip6": inet_pton(AF_INET6, args)}, "af": ADDRESS_IP6}
+ # PY2: raises socket.error
+ # PY3: raises OSError
+ except (socket.error, OSError):
+ return {"un": {"ip4": inet_pton(AF_INET, args)}, "af": ADDRESS_IP4}
+
+
+def format_vl_api_prefix_t(args):
+ if isinstance(args, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
+ return {
+ "address": format_vl_api_address_t(str(args.network_address)),
+ "len": int(args.prefixlen),
+ }
+ p, length = args.split("/")
+ return {"address": format_vl_api_address_t(p), "len": int(length)}
+
+
+def format_vl_api_address_with_prefix_t(args):
+ if isinstance(args, (ipaddress.IPv4Interface, ipaddress.IPv6Interface)):
+ return {
+ "address": format_vl_api_address_t(str(args.network_address)),
+ "len": int(args.prefixlen),
+ }
+ p, length = args.split("/")
+ return {"address": format_vl_api_address_t(p), "len": int(length)}
+
+
+def format_vl_api_ip6_prefix_t(args):
+ if isinstance(args, ipaddress.IPv6Network):
+ return {"address": args.network_address.packed, "len": int(args.prefixlen)}
+ p, length = args.split("/")
+ return {"address": inet_pton(AF_INET6, p), "len": int(length)}
+
+
+def format_vl_api_ip6_address_with_prefix_t(args):
+ if isinstance(args, ipaddress.IPv6Interface):
+ return {"address": args.network_address.packed, "len": int(args.prefixlen)}
+ p, length = args.split("/")
+ return {"address": inet_pton(AF_INET6, p), "len": int(length)}
+
+
+def format_vl_api_ip4_prefix_t(args):
+ if isinstance(args, ipaddress.IPv4Network):
+ return {"address": args.network_address.packed, "len": int(args.prefixlen)}
+ p, length = args.split("/")
+ return {"address": inet_pton(AF_INET, p), "len": int(length)}
+
+
+def format_vl_api_ip4_address_with_prefix_t(args):
+ if isinstance(args, ipaddress.IPv4Interface):
+ return {"address": args.network_address.packed, "len": int(args.prefixlen)}
+ p, length = args.split("/")
+ return {"address": inet_pton(AF_INET, p), "len": int(length)}
+
+
+conversion_table = {
+ "vl_api_ip6_address_t": {
+ "IPv6Address": lambda o: o.packed,
+ "str": lambda s: inet_pton(AF_INET6, s),
+ },
+ "vl_api_ip4_address_t": {
+ "IPv4Address": lambda o: o.packed,
+ "str": lambda s: inet_pton(AF_INET, s),
+ },
+ "vl_api_ip6_prefix_t": {
+ "IPv6Network": lambda o: {
+ "address": o.network_address.packed,
+ "len": o.prefixlen,
+ },
+ "str": lambda s: format_vl_api_ip6_prefix_t(s),
+ },
+ "vl_api_ip4_prefix_t": {
+ "IPv4Network": lambda o: {
+ "address": o.network_address.packed,
+ "len": o.prefixlen,
+ },
+ "str": lambda s: format_vl_api_ip4_prefix_t(s),
+ },
+ "vl_api_address_t": {
+ "IPv4Address": lambda o: {"af": ADDRESS_IP4, "un": {"ip4": o.packed}},
+ "IPv6Address": lambda o: {"af": ADDRESS_IP6, "un": {"ip6": o.packed}},
+ "str": lambda s: format_vl_api_address_t(s),
+ },
+ "vl_api_prefix_t": {
+ "IPv4Network": lambda o: {
+ "address": {"af": ADDRESS_IP4, "un": {"ip4": o.network_address.packed}},
+ "len": o.prefixlen,
+ },
+ "IPv6Network": lambda o: {
+ "address": {"af": ADDRESS_IP6, "un": {"ip6": o.network_address.packed}},
+ "len": o.prefixlen,
+ },
+ "str": lambda s: format_vl_api_prefix_t(s),
+ },
+ "vl_api_address_with_prefix_t": {
+ "IPv4Interface": lambda o: {
+ "address": {"af": ADDRESS_IP4, "un": {"ip4": o.packed}},
+ "len": o.network.prefixlen,
+ },
+ "IPv6Interface": lambda o: {
+ "address": {"af": ADDRESS_IP6, "un": {"ip6": o.packed}},
+ "len": o.network.prefixlen,
+ },
+ "str": lambda s: format_vl_api_address_with_prefix_t(s),
+ },
+ "vl_api_ip4_address_with_prefix_t": {
+ "IPv4Interface": lambda o: {"address": o.packed, "len": o.network.prefixlen},
+ "str": lambda s: format_vl_api_ip4_address_with_prefix_t(s),
+ },
+ "vl_api_ip6_address_with_prefix_t": {
+ "IPv6Interface": lambda o: {"address": o.packed, "len": o.network.prefixlen},
+ "str": lambda s: format_vl_api_ip6_address_with_prefix_t(s),
+ },
+ "vl_api_mac_address_t": {
+ "MACAddress": lambda o: o.packed,
+ "str": lambda s: macaddress.mac_pton(s),
+ },
+ "vl_api_timestamp_t": {
+ "datetime.datetime": lambda o: (
+ o - datetime.datetime(1970, 1, 1)
+ ).total_seconds()
+ },
+}
+
+
+def unformat_api_address_t(o):
+ if o.af == 1:
+ return ipaddress.IPv6Address(o.un.ip6)
+ if o.af == 0:
+ return ipaddress.IPv4Address(o.un.ip4)
+ return None
+
+
+def unformat_api_prefix_t(o):
+ if o.address.af == 1:
+ return ipaddress.IPv6Network((o.address.un.ip6, o.len), False)
+ if o.address.af == 0:
+ return ipaddress.IPv4Network((o.address.un.ip4, o.len), False)
+ return None
+
+ if isinstance(o.address, ipaddress.IPv4Address):
+ return ipaddress.IPv4Network((o.address, o.len), False)
+ if isinstance(o.address, ipaddress.IPv6Address):
+ return ipaddress.IPv6Network((o.address, o.len), False)
+ raise ValueError("Unknown instance {}", format(o))
+
+
+def unformat_api_address_with_prefix_t(o):
+ if o.address.af == 1:
+ return ipaddress.IPv6Interface((o.address.un.ip6, o.len))
+ if o.address.af == 0:
+ return ipaddress.IPv4Interface((o.address.un.ip4, o.len))
+ return None
+
+
+def unformat_api_ip4_address_with_prefix_t(o):
+ return ipaddress.IPv4Interface((o.address, o.len))
+
+
+def unformat_api_ip6_address_with_prefix_t(o):
+ return ipaddress.IPv6Interface((o.address, o.len))
+
+
+conversion_unpacker_table = {
+ "vl_api_ip6_address_t": lambda o: ipaddress.IPv6Address(o),
+ "vl_api_ip6_prefix_t": lambda o: ipaddress.IPv6Network((o.address, o.len)),
+ "vl_api_ip4_address_t": lambda o: ipaddress.IPv4Address(o),
+ "vl_api_ip4_prefix_t": lambda o: ipaddress.IPv4Network((o.address, o.len)),
+ "vl_api_address_t": lambda o: unformat_api_address_t(o),
+ "vl_api_prefix_t": lambda o: unformat_api_prefix_t(o),
+ "vl_api_address_with_prefix_t": lambda o: unformat_api_address_with_prefix_t(o),
+ "vl_api_ip4_address_with_prefix_t": lambda o: unformat_api_ip4_address_with_prefix_t(
+ o
+ ),
+ "vl_api_ip6_address_with_prefix_t": lambda o: unformat_api_ip6_address_with_prefix_t(
+ o
+ ),
+ "vl_api_mac_address_t": lambda o: macaddress.MACAddress(o),
+ "vl_api_timestamp_t": lambda o: datetime.datetime.fromtimestamp(o),
+ "vl_api_timedelta_t": lambda o: datetime.timedelta(seconds=o),
+}