object abstractions for representing IP routes in VPP
"""
-from vpp_object import *
+from vpp_object import VppObject
from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
-from vpp_ip import *
+from vpp_ip import DpoProto, VppIpPrefix
# from vnet/vnet/mpls/mpls_types.h
MPLS_IETF_MAX_LABEL = 0xfffff
def ip_to_dpo_proto(addr):
- if addr.version is 6:
+ if addr.version == 6:
return DpoProto.DPO_PROTO_IP6
else:
return DpoProto.DPO_PROTO_IP4
route_addr = inet_pton(inet, ip_addr)
for e in routes:
if route_addr == e.address[:s] \
- and len == e.address_length \
- and table_id == e.table_id:
+ and len == e.address_length \
+ and table_id == e.table_id:
return True
return False
saddr = inet_pton(inet, src_addr)
for e in routes:
if gaddr == e.grp_address[:s] \
- and grp_addr_len == e.address_length \
- and saddr == e.src_address[:s] \
- and table_id == e.table_id:
+ and grp_addr_len == e.address_length \
+ and saddr == e.src_address[:s] \
+ and table_id == e.table_id:
return True
return False
dump = test.vapi.mpls_fib_dump()
for e in dump:
if label == e.label \
- and eos_bit == e.eos_bit \
- and table_id == e.table_id:
+ and eos_bit == e.eos_bit \
+ and table_id == e.table_id:
if not paths:
return True
else:
for a in addrs:
if a.prefix_length == length and \
- a.sw_if_index == sw_if_index and \
- a.ip[:n] == vp.bytes:
+ a.sw_if_index == sw_if_index and \
+ a.ip[:n] == vp.bytes:
return True
return False
self.is_ip6 = is_ip6
def add_vpp_config(self):
- self._test.vapi.ip_table_add_del(
- self.table_id,
- is_ipv6=self.is_ip6,
- is_add=1)
+ self._test.vapi.ip_table_add_del(is_ipv6=self.is_ip6, is_add=1,
+ table_id=self.table_id)
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
- self._test.vapi.ip_table_add_del(
- self.table_id,
- is_ipv6=self.is_ip6,
- is_add=0)
+ self._test.vapi.ip_table_add_del(is_ipv6=self.is_ip6, is_add=0,
+ table_id=self.table_id)
def query_vpp_config(self):
if self.table_id == 0:
def add_vpp_config(self):
self._test.vapi.sw_interface_add_del_address(
- self.intf.sw_if_index,
- self.prefix.bytes,
- self.prefix.length,
- is_add=1,
- is_ipv6=self.prefix.is_ip6)
+ sw_if_index=self.intf.sw_if_index, address=self.prefix.bytes,
+ address_length=self.prefix.length, is_ipv6=self.prefix.is_ip6,
+ is_add=1)
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
self._test.vapi.sw_interface_add_del_address(
- self.intf.sw_if_index,
- self.prefix.bytes,
- self.prefix.length,
- is_add=0,
- is_ipv6=self.prefix.is_ip6)
+ sw_if_index=self.intf.sw_if_index, address=self.prefix.bytes,
+ address_length=self.prefix.length, is_ipv6=self.prefix.is_ip6,
+ is_add=0)
def query_vpp_config(self):
return fib_interface_ip_prefix(self._test,
def encode(self):
return {'next_hop': self.nh_addr,
'weight': 1,
- 'afi': 0,
'preference': 0,
'table_id': self.nh_table_id,
'next_hop_id': self.next_hop_id,
def add_vpp_config(self):
if self.is_unreach or self.is_prohibit or self.is_drop:
r = self._test.vapi.ip_add_del_route(
- self.dest_addr,
- self.dest_addr_len,
- inet_pton(AF_INET6, "::"),
- 0xffffffff,
- is_local=self.is_local,
+ dst_address=self.dest_addr,
+ dst_address_length=self.dest_addr_len,
+ next_hop_address=inet_pton(
+ AF_INET6, "::"),
+ next_hop_sw_if_index=0xffffffff,
+ table_id=self.table_id,
+ is_drop=self.is_drop,
is_unreach=self.is_unreach,
is_prohibit=self.is_prohibit,
- is_drop=self.is_drop,
- table_id=self.table_id,
- is_ipv6=self.is_ip6)
+ is_ipv6=self.is_ip6,
+ is_local=self.is_local)
else:
for path in self.paths:
lstack = path.encode_labels()
r = self._test.vapi.ip_add_del_route(
- self.dest_addr,
- self.dest_addr_len,
- path.nh_addr,
- path.nh_itf,
- table_id=self.table_id,
- next_hop_out_label_stack=lstack,
+ dst_address=self.dest_addr,
+ dst_address_length=self.dest_addr_len,
+ next_hop_address=path.nh_addr,
+ next_hop_sw_if_index=path.nh_itf, table_id=self.table_id,
+ next_hop_table_id=path.nh_table_id,
next_hop_n_out_labels=len(lstack),
+ next_hop_out_label_stack=lstack,
next_hop_via_label=path.nh_via_label,
- next_hop_table_id=path.nh_table_id,
next_hop_id=path.next_hop_id,
- is_ipv6=self.is_ip6,
- is_dvr=path.is_dvr,
- is_local=self.is_local,
is_resolve_host=path.is_resolve_host,
is_resolve_attached=path.is_resolve_attached,
- is_source_lookup=path.is_source_lookup,
- is_udp_encap=path.is_udp_encap,
- is_multipath=1 if len(self.paths) > 1 else 0)
+ is_ipv6=self.is_ip6, is_local=self.is_local,
+ is_multipath=1 if len(self.paths) > 1 else 0,
+ is_dvr=path.is_dvr, is_udp_encap=path.is_udp_encap,
+ is_source_lookup=path.is_source_lookup)
self.stats_index = r.stats_index
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
if self.is_unreach or self.is_prohibit or self.is_drop:
self._test.vapi.ip_add_del_route(
- self.dest_addr,
- self.dest_addr_len,
- inet_pton(AF_INET6, "::"),
- 0xffffffff,
- is_local=self.is_local,
+ dst_address=self.dest_addr,
+ dst_address_length=self.dest_addr_len,
+ next_hop_address=inet_pton(
+ AF_INET6, "::"),
+ next_hop_sw_if_index=0xffffffff,
+ table_id=self.table_id, is_add=0,
is_unreach=self.is_unreach,
is_prohibit=self.is_prohibit,
- is_add=0,
- table_id=self.table_id,
- is_ipv6=self.is_ip6)
+ is_ipv6=self.is_ip6,
+ is_local=self.is_local)
else:
for path in self.paths:
self._test.vapi.ip_add_del_route(
- self.dest_addr,
- self.dest_addr_len,
- path.nh_addr,
- path.nh_itf,
+ dst_address=self.dest_addr,
+ dst_address_length=self.dest_addr_len,
+ next_hop_address=path.nh_addr,
+ next_hop_sw_if_index=path.nh_itf,
table_id=self.table_id,
next_hop_table_id=path.nh_table_id,
next_hop_via_label=path.nh_via_label,
next_hop_id=path.next_hop_id,
- is_add=0,
- is_udp_encap=path.is_udp_encap,
- is_ipv6=self.is_ip6,
- is_dvr=path.is_dvr)
+ is_add=0, is_ipv6=self.is_ip6,
+ is_dvr=path.is_dvr,
+ is_udp_encap=path.is_udp_encap)
def query_vpp_config(self):
return find_route(self._test,
dump = self._test.vapi.mpls_fib_dump()
for e in dump:
if self.local_label == e.label \
- and self.table_id == e.table_id:
+ and self.table_id == e.table_id:
return True
return False
lstack = path.encode_labels()
r = self._test.vapi.mpls_route_add_del(
- self.local_label,
- self.eos_bit,
- path.proto,
- path.nh_addr,
- path.nh_itf,
- is_multicast=self.is_multicast,
- is_multipath=is_multipath,
- table_id=self.table_id,
- is_interface_rx=path.is_interface_rx,
- is_rpf_id=path.is_rpf_id,
- next_hop_out_label_stack=lstack,
- next_hop_n_out_labels=len(lstack),
- next_hop_via_label=path.nh_via_label,
- next_hop_table_id=path.nh_table_id)
+ mr_label=self.local_label,
+ mr_eos=self.eos_bit,
+ mr_next_hop_proto=path.proto,
+ mr_next_hop=path.nh_addr,
+ mr_next_hop_sw_if_index=path.nh_itf,
+ mr_table_id=self.table_id,
+ mr_next_hop_table_id=path.nh_table_id,
+ mr_next_hop_n_out_labels=len(
+ lstack),
+ mr_next_hop_out_label_stack=lstack,
+ mr_next_hop_via_label=path.nh_via_label,
+ mr_is_interface_rx=path.is_interface_rx,
+ mr_is_rpf_id=path.is_rpf_id,
+ mr_is_multicast=self.is_multicast,
+ mr_is_multipath=is_multipath)
self.stats_index = r.stats_index
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
for path in self.paths:
- self._test.vapi.mpls_route_add_del(self.local_label,
- self.eos_bit,
- path.proto,
- path.nh_addr,
- path.nh_itf,
- is_rpf_id=path.is_rpf_id,
- table_id=self.table_id,
- is_add=0)
+ self._test.vapi.mpls_route_add_del(
+ mr_label=self.local_label,
+ mr_eos=self.eos_bit,
+ mr_next_hop_proto=path.proto,
+ mr_next_hop=path.nh_addr,
+ mr_next_hop_sw_if_index=path.nh_itf,
+ mr_table_id=self.table_id,
+ mr_is_rpf_id=path.is_rpf_id,
+ mr_is_add=0)
def query_vpp_config(self):
return find_mpls_route(self._test, self.table_id,
return ("%d:%s/%d"
% (self.table_id,
self.local_label,
- 20+self.eos_bit))
+ 20 + self.eos_bit))
def get_stats_to(self):
c = self._test.statistics.get_counter("/net/route/to")