VOM: mroutes
[vpp.git] / test / vpp_ip.py
index 912d843..fe985fb 100644 (file)
@@ -2,26 +2,25 @@
   IP Types
 
 """
+import logging
 
 from ipaddress import ip_address
+from socket import AF_INET, AF_INET6
+from vpp_papi import VppEnum
 
+_log = logging.getLogger(__name__)
 
-class IpAddressFamily:
-    ADDRESS_IP4 = 0
-    ADDRESS_IP6 = 1
 
-
-INVALID_INDEX = 0xffffffff
+class DpoProto:
+    DPO_PROTO_IP4 = 0
+    DPO_PROTO_IP6 = 1
+    DPO_PROTO_MPLS = 2
+    DPO_PROTO_ETHERNET = 3
+    DPO_PROTO_BIER = 4
+    DPO_PROTO_NSH = 5
 
 
-def compare_ip_address(api_address, py_address):
-    if 4 is py_address.version:
-        if py_address.packed == api_address.ip4.address:
-            return True
-    else:
-        if py_address.packed == api_address.ip6.address:
-            return True
-    return False
+INVALID_INDEX = 0xffffffff
 
 
 class VppIpAddressUnion():
@@ -29,6 +28,12 @@ class VppIpAddressUnion():
         self.addr = addr
         self.ip_addr = ip_address(unicode(self.addr))
 
+    def encode(self):
+        if self.version == 6:
+            return {'ip6': self.ip_addr.packed}
+        else:
+            return {'ip4': self.ip_addr.packed}
+
     @property
     def version(self):
         return self.ip_addr.version
@@ -37,19 +42,28 @@ class VppIpAddressUnion():
     def address(self):
         return self.addr
 
-    def encode(self):
-        if self.ip_addr.version is 6:
-            return {
-                'ip6': {
-                    'address': self.ip_addr.packed
-                },
-            }
+    @property
+    def length(self):
+        return self.ip_addr.max_prefixlen
+
+    @property
+    def bytes(self):
+        return self.ip_addr.packed
+
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return self.ip_addr == other.ip_addr
+        elif hasattr(other, "ip4") and hasattr(other, "ip6"):
+            # vl_api_address_union_t
+            if 4 == self.version:
+                return self.ip_addr.packed == other.ip4
+            else:
+                return self.ip_addr.packed == other.ip6
         else:
-            return {
-                'ip4': {
-                    'address': self.ip_addr.packed
-                },
-            }
+            _log.error("Comparing VppIpAddressUnions:%s"
+                       " with incomparable type: %s",
+                       self, other)
+            return NotImplemented
 
 
 class VppIpAddress():
@@ -57,32 +71,84 @@ class VppIpAddress():
         self.addr = VppIpAddressUnion(addr)
 
     def encode(self):
-        if self.addr.version is 6:
+        if self.addr.version == 6:
             return {
-                'af': IpAddressFamily.ADDRESS_IP6,
+                'af': VppEnum.vl_api_address_family_t.ADDRESS_IP6,
                 'un': self.addr.encode()
             }
         else:
             return {
-                'af': IpAddressFamily.ADDRESS_IP4,
+                'af': VppEnum.vl_api_address_family_t.ADDRESS_IP4,
                 'un': self.addr.encode()
             }
 
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return self.addr == other.addr
+        elif hasattr(other, "af") and hasattr(other, "un"):
+            # a vp_api_address_t
+            if 4 == self.version:
+                return other.af == \
+                    VppEnum.vl_api_address_family_t.ADDRESS_IP4 and \
+                    other.un == self.addr
+            else:
+                return other.af == \
+                    VppEnum.vl_api_address_family_t.ADDRESS_IP6 and \
+                    other.un == self.addr
+        else:
+            _log.error(
+                "Comparing VppIpAddress:<%s> %s with incomparable "
+                "type: <%s> %s",
+                self.__class__.__name__, self,
+                other.__class__.__name__, other)
+            return NotImplemented
+
+    def __ne__(self, other):
+        return not (self == other)
+
+    def __str__(self):
+        return self.address
+
+    @property
+    def bytes(self):
+        return self.addr.bytes
+
     @property
     def address(self):
         return self.addr.address
 
+    @property
+    def length(self):
+        return self.addr.length
+
+    @property
+    def version(self):
+        return self.addr.version
+
+    @property
+    def is_ip6(self):
+        return (self.version == 6)
+
+    @property
+    def af(self):
+        if self.version == 6:
+            return AF_INET6
+        else:
+            return AF_INET
+
+    @property
+    def dpo_proto(self):
+        if self.version == 6:
+            return DpoProto.DPO_PROTO_IP6
+        else:
+            return DpoProto.DPO_PROTO_IP4
+
 
 class VppIpPrefix():
     def __init__(self, addr, len):
         self.addr = VppIpAddress(addr)
         self.len = len
 
-    def __eq__(self, other):
-        if self.addr == other.addr and self.len == other.len:
-            return True
-        return False
-
     def encode(self):
         return {'address': self.addr.encode(),
                 'address_length': self.len}
@@ -91,6 +157,34 @@ class VppIpPrefix():
     def address(self):
         return self.addr.address
 
+    @property
+    def bytes(self):
+        return self.addr.bytes
+
+    @property
+    def length(self):
+        return self.len
+
+    @property
+    def is_ip6(self):
+        return self.addr.is_ip6
+
+    def __str__(self):
+        return "%s/%d" % (self.address, self.length)
+
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return (self.len == other.len and self.addr == other.addr)
+        elif hasattr(other, "address") and hasattr(other, "address_length"):
+            # vl_api_prefix_t
+            return self.len == other.address_length and \
+                   self.addr == other.address
+        else:
+            _log.error(
+                "Comparing VppIpPrefix:%s with incomparable type: %s" %
+                (self, other))
+            return NotImplemented
+
 
 class VppIpMPrefix():
     def __init__(self, saddr, gaddr, len):
@@ -99,37 +193,23 @@ class VppIpMPrefix():
         self.len = len
         self.ip_saddr = ip_address(unicode(self.saddr))
         self.ip_gaddr = ip_address(unicode(self.gaddr))
+        if self.ip_saddr.version != self.ip_gaddr.version:
+            raise ValueError('Source and group addresses must be of the '
+                             'same address family.')
 
     def encode(self):
-
-        if 6 is self.ip_saddr.version:
+        if 6 == self.ip_saddr.version:
             prefix = {
-                'af': IpAddressFamily.ADDRESS_IP6,
-                'grp_address': {
-                    'ip6': {
-                        'address': self.ip_gaddr.packed
-                    },
-                },
-                'src_address': {
-                    'ip6': {
-                        'address': self.ip_saddr.packed
-                    },
-                },
+                'af': VppEnum.vl_api_address_family_t.ADDRESS_IP6,
+                'grp_address': {'ip6': self.ip_gaddr.packed},
+                'src_address': {'ip6': self.ip_saddr.packed},
                 'grp_address_length': self.len,
             }
         else:
             prefix = {
-                'af': IpAddressFamily.ADDRESS_IP4,
-                'grp_address': {
-                    'ip4': {
-                        'address': self.ip_gaddr.packed
-                    },
-                },
-                'src_address': {
-                    'ip4': {
-                        'address': self.ip_saddr.packed
-                    },
-                },
+                'af': VppEnum.vl_api_address_family_t.ADDRESS_IP4,
+                'grp_address': {'ip4': self.ip_gaddr.packed},
+                'src_address': {'ip4': self.ip_saddr.packed},
                 'grp_address_length': self.len,
             }
         return prefix