UNIFORM = 1
+def ip_to_dpo_proto(addr):
+ if addr.version is 6:
+ return DpoProto.DPO_PROTO_IP6
+ else:
+ return DpoProto.DPO_PROTO_IP4
+
+
def find_route(test, ip_addr, len, table_id=0, inet=AF_INET):
if inet == AF_INET:
s = 4
return False
+def fib_interface_ip_prefix(test, address, length, sw_if_index):
+ vp = VppIpPrefix(address, length)
+ addrs = test.vapi.ip_address_dump(sw_if_index, is_ipv6=vp.is_ip6)
+
+ if vp.is_ip6:
+ n = 16
+ else:
+ n = 4
+
+ for a in addrs:
+ if a.prefix_length == length and \
+ a.sw_if_index == sw_if_index and \
+ a.ip[:n] == vp.bytes:
+ return True
+ return False
+
+
class VppIpTable(VppObject):
def __init__(self,
is_add=0)
def query_vpp_config(self):
+ if self.table_id == 0:
+ # the default table always exists
+ return False
# find the default route
return find_route(self._test,
"::" if self.is_ip6 else "0.0.0.0",
self.table_id))
+class VppIpInterfaceAddress(VppObject):
+
+ def __init__(self, test, intf, addr, len):
+ self._test = test
+ self.intf = intf
+ self.prefix = VppIpPrefix(addr, len)
+
+ def add_vpp_config(self):
+ self._test.vapi.sw_interface_add_del_address(
+ self.intf.sw_if_index,
+ self.prefix.bytes,
+ self.prefix.length,
+ is_add=1,
+ is_ipv6=self.prefix.is_ip6)
+ self._test.registry.register(self, self._test.logger)
+
+ def remove_vpp_config(self):
+ self._test.vapi.sw_interface_add_del_address(
+ self.intf.sw_if_index,
+ self.prefix.bytes,
+ self.prefix.length,
+ is_add=0,
+ is_ipv6=self.prefix.is_ip6)
+
+ def query_vpp_config(self):
+ return fib_interface_ip_prefix(self._test,
+ self.prefix.address,
+ self.prefix.length,
+ self.intf.sw_if_index)
+
+ def __str__(self):
+ return self.object_id()
+
+ def object_id(self):
+ return "interface-ip-%s-%s" % (self.intf, self.prefix)
+
+
+class VppIpInterfaceBind(VppObject):
+
+ def __init__(self, test, intf, table):
+ self._test = test
+ self.intf = intf
+ self.table = table
+
+ def add_vpp_config(self):
+ if self.table.is_ip6:
+ self.intf.set_table_ip6(self.table.table_id)
+ else:
+ self.intf.set_table_ip4(self.table.table_id)
+ self._test.registry.register(self, self._test.logger)
+
+ def remove_vpp_config(self):
+ if 0 == self.table.table_id:
+ return
+ if self.table.is_ip6:
+ self.intf.set_table_ip6(0)
+ else:
+ self.intf.set_table_ip4(0)
+
+ def query_vpp_config(self):
+ if 0 == self.table.table_id:
+ return False
+ return self._test.vapi.sw_interface_get_table(
+ self.intf.sw_if_index,
+ self.table.is_ip6).vrf_id == self.table.table_id
+
+ def __str__(self):
+ return self.object_id()
+
+ def object_id(self):
+ return "interface-bind-%s-%s" % (self.intf, self.table)
+
+
class VppMplsLabel(object):
def __init__(self, value, mode=MplsLspMode.PIPE, ttl=64, exp=0):
self.value = value