from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
from vpp_ip import DpoProto, INVALID_INDEX, VppIpAddressUnion, \
VppIpMPrefix
-from ipaddress import ip_address, IPv4Network, IPv6Network
+from ipaddress import ip_network, ip_address, IPv4Network, IPv6Network
# from vnet/vnet/mpls/mpls_types.h
MPLS_IETF_MAX_LABEL = 0xfffff
def address_proto(ip_addr):
- if ip_addr.ip_addr.version is 4:
+ if ip_addr.ip_addr.version == 4:
return FibPathProto.FIB_PATH_NH_PROTO_IP4
else:
return FibPathProto.FIB_PATH_NH_PROTO_IP6
-def find_route(test, addr, len, table_id=0):
+def find_route(test, addr, len, table_id=0, sw_if_index=None):
prefix = mk_network(addr, len)
- if 4 is prefix.version:
+ if 4 == prefix.version:
routes = test.vapi.ip_route_dump(table_id, False)
else:
routes = test.vapi.ip_route_dump(table_id, True)
for e in routes:
if table_id == e.route.table_id \
and str(e.route.prefix) == str(prefix):
+ if not sw_if_index:
+ return True
+ else:
+ # should be only one path if the user is looking
+ # for the interface the route is reachable through
+ if e.route.n_paths != 1:
+ return False
+ else:
+ return (e.route.paths[0].sw_if_index == sw_if_index)
+
+ return False
+
+
+def find_route_in_dump(dump, route, table):
+ for r in dump:
+ if table.table_id == r.route.table_id \
+ and route.prefix == r.route.prefix:
+ if len(route.paths) == r.route.n_paths:
+ return True
+ return False
+
+
+def find_mroute_in_dump(dump, route, table):
+ for r in dump:
+ if table.table_id == r.route.table_id \
+ and route.prefix == r.route.prefix:
return True
return False
text_type(grp_addr),
grp_addr_len)
- if 4 is ip_mprefix.version:
+ if 4 == ip_mprefix.version:
routes = test.vapi.ip_mroute_dump(table_id, False)
else:
routes = test.vapi.ip_mroute_dump(table_id, True)
self.is_ip6 = is_ip6
def add_vpp_config(self):
- self._test.vapi.ip_table_add_del(is_ipv6=self.is_ip6, is_add=1,
- table_id=self.table_id)
+ self._test.vapi.ip_table_add_del(is_add=1,
+ table={'is_ip6': self.is_ip6,
+ 'table_id': self.table_id})
self._test.registry.register(self, self._test.logger)
+ return self
def remove_vpp_config(self):
- self._test.vapi.ip_table_add_del(is_ipv6=self.is_ip6, is_add=0,
- table_id=self.table_id)
+ self._test.vapi.ip_table_add_del(is_add=0,
+ table={'is_ip6': self.is_ip6,
+ 'table_id': self.table_id})
+
+ def replace_begin(self):
+ self._test.vapi.ip_table_replace_begin(
+ table={'is_ip6': self.is_ip6,
+ 'table_id': self.table_id})
+
+ def replace_end(self):
+ self._test.vapi.ip_table_replace_end(
+ table={'is_ip6': self.is_ip6,
+ 'table_id': self.table_id})
+
+ def flush(self):
+ self._test.vapi.ip_table_flush(table={'is_ip6': self.is_ip6,
+ 'table_id': self.table_id})
+
+ def dump(self):
+ return self._test.vapi.ip_route_dump(self.table_id, self.is_ip6)
+
+ def mdump(self):
+ return self._test.vapi.ip_mroute_dump(self.table_id, self.is_ip6)
def query_vpp_config(self):
if self.table_id == 0:
class VppIpInterfaceAddress(VppObject):
- def __init__(self, test, intf, addr, len):
+ def __init__(self, test, intf, addr, len, bind=None):
self._test = test
self.intf = intf
self.addr = addr
self.len = len
self.prefix = "%s/%d" % (addr, len)
+ self.host_len = ip_network(self.prefix, strict=False).max_prefixlen
+ self.table_id = 0
+ if bind:
+ self.table_id = bind.table.table_id
def add_vpp_config(self):
self._test.vapi.sw_interface_add_del_address(
sw_if_index=self.intf.sw_if_index, prefix=self.prefix,
is_add=1)
self._test.registry.register(self, self._test.logger)
+ return self
def remove_vpp_config(self):
self._test.vapi.sw_interface_add_del_address(
is_add=0)
def query_vpp_config(self):
- return fib_interface_ip_prefix(self._test,
- self.addr,
- self.len,
- self.intf.sw_if_index)
+ # search for the IP address mapping and the two expected
+ # FIB entries
+ v = ip_address(self.addr).version
+
+ if ((v == 4 and self.len < 31) or (v == 6 and self.len < 127)):
+ return (fib_interface_ip_prefix(self._test,
+ self.addr,
+ self.len,
+ self.intf.sw_if_index) &
+ find_route(self._test,
+ self.addr,
+ self.len,
+ table_id=self.table_id,
+ sw_if_index=self.intf.sw_if_index) &
+ find_route(self._test,
+ self.addr,
+ self.host_len,
+ table_id=self.table_id,
+ sw_if_index=self.intf.sw_if_index))
+ else:
+ return (fib_interface_ip_prefix(self._test,
+ self.addr,
+ self.len,
+ self.intf.sw_if_index) &
+ find_route(self._test,
+ self.addr,
+ self.host_len,
+ table_id=self.table_id,
+ sw_if_index=self.intf.sw_if_index))
+
+ def object_id(self):
+ return "interface-ip-%s-%d-%s" % (self.intf,
+ self.table_id,
+ self.prefix)
+
+
+class VppIp6LinkLocalAddress(VppObject):
+
+ def __init__(self, test, intf, addr):
+ self._test = test
+ self.intf = intf
+ self.addr = addr
+
+ def add_vpp_config(self):
+ self._test.vapi.sw_interface_ip6_set_link_local_address(
+ sw_if_index=self.intf.sw_if_index, ip=self.addr)
+ self._test.registry.register(self, self._test.logger)
+ return self
+
+ def remove_vpp_config(self):
+ # link locals can't be removed, only changed
+ pass
+
+ def query_vpp_config(self):
+ # no API to query
+ return False
def object_id(self):
- return "interface-ip-%s-%s" % (self.intf, self.prefix)
+ return "ip6-link-local-%s-%s" % (self.intf, self.addr)
class VppIpInterfaceBind(VppObject):
else:
self.intf.set_table_ip4(self.table.table_id)
self._test.registry.register(self, self._test.logger)
+ return self
def remove_vpp_config(self):
if 0 == self.table.table_id:
self.stats_index = r.stats_index
if self.register:
self._test.registry.register(self, self._test.logger)
+ return self
def remove_vpp_config(self):
# there's no need to issue different deletes for modified routes
for path in self.paths:
self.encoded_paths.append(path.encode())
+ def encode(self, paths=None):
+ _paths = self.encoded_paths if paths is None else paths
+ return {'table_id': self.table_id,
+ 'entry_flags': self.e_flags,
+ 'rpf_id': self.rpf_id,
+ 'prefix': self.prefix.encode(),
+ 'n_paths': len(_paths),
+ 'paths': _paths,
+ }
+
def add_vpp_config(self):
- r = self._test.vapi.ip_mroute_add_del(self.table_id,
- self.prefix.encode(),
- self.e_flags,
- self.rpf_id,
- self.encoded_paths,
+ r = self._test.vapi.ip_mroute_add_del(route=self.encode(),
+ is_multipath=1,
is_add=1)
self.stats_index = r.stats_index
self._test.registry.register(self, self._test.logger)
+ return self
def remove_vpp_config(self):
- self._test.vapi.ip_mroute_add_del(self.table_id,
- self.prefix.encode(),
- self.e_flags,
- self.rpf_id,
- self.encoded_paths,
+ self._test.vapi.ip_mroute_add_del(route=self.encode(),
+ is_multipath=1,
is_add=0)
def update_entry_flags(self, flags):
self.e_flags = flags
- self._test.vapi.ip_mroute_add_del(self.table_id,
- self.prefix.encode(),
- self.e_flags,
- self.rpf_id,
- [],
+ self._test.vapi.ip_mroute_add_del(route=self.encode(paths=[]),
+ is_multipath=1,
is_add=1)
def update_rpf_id(self, rpf_id):
self.rpf_id = rpf_id
- self._test.vapi.ip_mroute_add_del(self.table_id,
- self.prefix.encode(),
- self.e_flags,
- self.rpf_id,
- [],
+ self._test.vapi.ip_mroute_add_del(route=self.encode(paths=[]),
+ is_multipath=1,
is_add=1)
def update_path_flags(self, itf, flags):
for p in range(len(self.paths)):
if self.paths[p].nh_itf == itf:
self.paths[p].nh_i_flags = flags
- self.encoded_paths[p] = self.paths[p].encode()
- break
-
- self._test.vapi.ip_mroute_add_del(self.table_id,
- self.prefix.encode(),
- self.e_flags,
- self.rpf_id,
- [self.encoded_paths[p]],
- is_add=1,
- is_multipath=0)
+ self.encoded_paths[p] = self.paths[p].encode()
+ break
+
+ self._test.vapi.ip_mroute_add_del(
+ route=self.encode(
+ paths=[self.encoded_paths[p]]),
+ is_add=1,
+ is_multipath=0)
def query_vpp_config(self):
return find_mroute(self._test,