IP Types
"""
+import logging
from ipaddress import ip_address
+from socket import AF_INET, AF_INET6
+from vpp_papi import VppEnum
+from vpp_object import VppObject
+try:
+ text_type = unicode
+except NameError:
+ text_type = str
+_log = logging.getLogger(__name__)
-class IpAddressFamily:
- ADDRESS_IP4 = 0
- ADDRESS_IP6 = 1
+
+class DpoProto:
+ DPO_PROTO_IP4 = 0
+ DPO_PROTO_IP6 = 1
+ DPO_PROTO_MPLS = 2
+ DPO_PROTO_ETHERNET = 3
+ DPO_PROTO_BIER = 4
+ DPO_PROTO_NSH = 5
INVALID_INDEX = 0xffffffff
-def compare_ip_address(api_address, py_address):
- if 4 is py_address.version:
- if py_address.packed == api_address.ip4.address:
- return True
+def get_dpo_proto(addr):
+ if ip_address(addr).version == 6:
+ return DpoProto.DPO_PROTO_IP6
else:
- if py_address.packed == api_address.ip6.address:
- return True
- return False
+ return DpoProto.DPO_PROTO_IP4
class VppIpAddressUnion():
def __init__(self, addr):
self.addr = addr
- self.ip_addr = ip_address(unicode(self.addr))
+ self.ip_addr = ip_address(text_type(self.addr))
+
+ def encode(self):
+ if self.version == 6:
+ return {'ip6': self.ip_addr}
+ else:
+ return {'ip4': self.ip_addr}
@property
def version(self):
def address(self):
return self.addr
- def encode(self):
- if self.ip_addr.version is 6:
- return {
- 'ip6': {
- 'address': self.ip_addr.packed
- },
- }
+ @property
+ def length(self):
+ return self.ip_addr.max_prefixlen
+
+ @property
+ def bytes(self):
+ return self.ip_addr.packed
+
+ def __eq__(self, other):
+ if isinstance(other, self.__class__):
+ return self.ip_addr == other.ip_addr
+ elif hasattr(other, "ip4") and hasattr(other, "ip6"):
+ # vl_api_address_union_t
+ if 4 == self.version:
+ return self.ip_addr == other.ip4
+ else:
+ return self.ip_addr == other.ip6
else:
- return {
- 'ip4': {
- 'address': self.ip_addr.packed
- },
- }
+ raise Exception("Comparing VppIpAddressUnions:%s"
+ " with incomparable type: %s",
+ self, other)
+ def __ne__(self, other):
+ return not (self == other)
-class VppIpAddress():
- def __init__(self, addr):
- self.addr = VppIpAddressUnion(addr)
+ def __str__(self):
+ return str(self.ip_addr)
+
+
+class VppIpMPrefix():
+ def __init__(self, saddr, gaddr, glen):
+ self.saddr = saddr
+ self.gaddr = gaddr
+ self.glen = glen
+ if ip_address(self.saddr).version != \
+ ip_address(self.gaddr).version:
+ raise ValueError('Source and group addresses must be of the '
+ 'same address family.')
def encode(self):
- if self.addr.version is 6:
- return {
- 'af': IpAddressFamily.ADDRESS_IP6,
- 'un': self.addr.encode()
- }
- else:
- return {
- 'af': IpAddressFamily.ADDRESS_IP4,
- 'un': self.addr.encode()
- }
+ return {
+ 'af': ip_address(self.gaddr).vapi_af,
+ 'grp_address': {
+ ip_address(self.gaddr).vapi_af_name: self.gaddr
+ },
+ 'src_address': {
+ ip_address(self.saddr).vapi_af_name: self.saddr
+ },
+ 'grp_address_length': self.glen,
+ }
@property
- def address(self):
- return self.addr.address
+ def length(self):
+ return self.glen
+ @property
+ def version(self):
+ return ip_address(self.gaddr).version
-class VppIpPrefix():
- def __init__(self, addr, len):
- self.addr = VppIpAddress(addr)
- self.len = len
+ def __str__(self):
+ return "(%s,%s)/%d" % (self.saddr, self.gaddr, self.glen)
def __eq__(self, other):
- if self.addr == other.addr and self.len == other.len:
- return True
- return False
+ if isinstance(other, self.__class__):
+ return (self.glen == other.glen and
+ self.saddr == other.gaddr and
+ self.saddr == other.saddr)
+ elif (hasattr(other, "grp_address_length") and
+ hasattr(other, "grp_address") and
+ hasattr(other, "src_address")):
+ # vl_api_mprefix_t
+ if 4 == self.version:
+ return (self.glen == other.grp_address_length and
+ self.gaddr == str(other.grp_address.ip4) and
+ self.saddr == str(other.src_address.ip4))
+ else:
+ return (self.glen == other.grp_address_length and
+ self.gaddr == str(other.grp_address.ip6) and
+ self.saddr == str(other.src_address.ip6))
+ return NotImplemented
+
+
+class VppIpPuntPolicer(VppObject):
+ def __init__(self, test, policer_index, is_ip6=False):
+ self._test = test
+ self._policer_index = policer_index
+ self._is_ip6 = is_ip6
+
+ def add_vpp_config(self):
+ self._test.vapi.ip_punt_police(policer_index=self._policer_index,
+ is_ip6=self._is_ip6, is_add=True)
+
+ def remove_vpp_config(self):
+ self._test.vapi.ip_punt_police(policer_index=self._policer_index,
+ is_ip6=self._is_ip6, is_add=False)
+
+ def query_vpp_config(self):
+ NotImplemented
+
+
+class VppIpPuntRedirect(VppObject):
+ def __init__(self, test, rx_index, tx_index, nh_addr):
+ self._test = test
+ self._rx_index = rx_index
+ self._tx_index = tx_index
+ self._nh_addr = ip_address(nh_addr)
def encode(self):
- return {'address': self.addr.encode(),
- 'address_length': self.len}
+ return {"rx_sw_if_index": self._rx_index,
+ "tx_sw_if_index": self._tx_index, "nh": self._nh_addr}
- @property
- def address(self):
- return self.addr.address
+ def add_vpp_config(self):
+ self._test.vapi.ip_punt_redirect(punt=self.encode(), is_add=True)
+ self._test.registry.register(self, self._test.logger)
+ def remove_vpp_config(self):
+ self._test.vapi.ip_punt_redirect(punt=self.encode(), is_add=False)
-class VppIpMPrefix():
- def __init__(self, saddr, gaddr, len):
- self.saddr = saddr
- self.gaddr = gaddr
- self.len = len
- self.ip_saddr = ip_address(unicode(self.saddr))
- self.ip_gaddr = ip_address(unicode(self.gaddr))
+ def get_vpp_config(self):
+ is_ipv6 = True if self._nh_addr.version == 6 else False
+ return self._test.vapi.ip_punt_redirect_dump(
+ sw_if_index=self._rx_index, is_ipv6=is_ipv6)
- def encode(self):
+ def query_vpp_config(self):
+ if self.get_vpp_config():
+ return True
+ return False
- if 6 is self.ip_saddr.version:
- prefix = {
- 'af': IpAddressFamily.ADDRESS_IP6,
- 'grp_address': {
- 'ip6': {
- 'address': self.ip_gaddr.packed
- },
- },
- 'src_address': {
- 'ip6': {
- 'address': self.ip_saddr.packed
- },
- },
- 'grp_address_length': self.len,
- }
- else:
- prefix = {
- 'af': IpAddressFamily.ADDRESS_IP4,
- 'grp_address': {
- 'ip4': {
- 'address': self.ip_gaddr.packed
- },
- },
- 'src_address': {
- 'ip4': {
- 'address': self.ip_saddr.packed
- },
- },
- 'grp_address_length': self.len,
- }
- return prefix
+
+class VppIpPathMtu(VppObject):
+ def __init__(self, test, nh, pmtu, table_id=0):
+ self._test = test
+ self.nh = nh
+ self.pmtu = pmtu
+ self.table_id = table_id
+
+ def add_vpp_config(self):
+ self._test.vapi.ip_path_mtu_update(pmtu={'nh': self.nh,
+ 'table_id': self.table_id,
+ 'path_mtu': self.pmtu})
+ self._test.registry.register(self, self._test.logger)
+ return self
+
+ def modify(self, pmtu):
+ self.pmtu = pmtu
+ self._test.vapi.ip_path_mtu_update(pmtu={'nh': self.nh,
+ 'table_id': self.table_id,
+ 'path_mtu': self.pmtu})
+ return self
+
+ def remove_vpp_config(self):
+ self._test.vapi.ip_path_mtu_update(pmtu={'nh': self.nh,
+ 'table_id': self.table_id,
+ 'path_mtu': 0})
+
+ def query_vpp_config(self):
+ ds = list(self._test.vapi.vpp.details_iter(
+ self._test.vapi.ip_path_mtu_get))
+
+ for d in ds:
+ if self.nh == str(d.pmtu.nh) \
+ and self.table_id == d.pmtu.table_id \
+ and self.pmtu == d.pmtu.path_mtu:
+ return True
+ return False
+
+ def object_id(self):
+ return ("ip-path-mtu-%d-%s-%d" % (self.table_id,
+ self.nh,
+ self.pmtu))
+
+ def __str__(self):
+ return self.object_id()