-#!/usr/bin/env python
+#!/usr/bin/env python3
-import unittest
import socket
+import unittest
+
+from parameterized import parameterized
+import scapy.compat
+import scapy.layers.inet6 as inet6
+from scapy.contrib.mpls import MPLS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
+ ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
+ ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
+ ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, IPv6ExtHdrHopByHop
+from scapy.layers.l2 import Ether, Dot1Q
+from scapy.packet import Raw
+from scapy.utils import inet_pton, inet_ntop
+from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
+ in6_mactoifaceid
+from six import moves
from framework import VppTestCase, VppTestRunner
-from util import ppp, ip6_normalize
-from vpp_sub_interface import VppSubInterface, VppDot1QSubint
-from vpp_pg_interface import is_ipv6_misc
+from util import ppp, ip6_normalize, mk_ll_addr
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
- VppMplsRoute, VppMplsTable, VppIpTable, VppIpAddress
+ VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, FibPathProto, \
+ VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump
from vpp_neighbor import find_nbr, VppNeighbor
-
-from scapy.packet import Raw
-from scapy.layers.l2 import Ether, Dot1Q
-import scapy.layers.inet6 as inet6
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
- ICMPv6ND_RA, getmacbyip6, ICMPv6MRD_Solicitation, \
- ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
- ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
- ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply
-from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
- in6_mactoifaceid, in6_ismaddr
-from scapy.utils import inet_pton, inet_ntop
-from scapy.contrib.mpls import MPLS
+from vpp_pg_interface import is_ipv6_misc
+from vpp_sub_interface import VppSubInterface, VppDot1QSubint
+from ipaddress import IPv6Network, IPv6Address
AF_INET6 = socket.AF_INET6
+try:
+ text_type = unicode
+except NameError:
+ text_type = str
-def mk_ll_addr(mac):
- euid = in6_mactoifaceid(mac)
- addr = "fe80::" + euid
- return addr
+NUM_PKTS = 67
class TestIPv6ND(VppTestCase):
def setUpClass(cls):
super(TestIPv6, cls).setUpClass()
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv6, cls).tearDownClass()
+
def setUp(self):
"""
Perform test setup before test case.
i.config_ip6()
i.resolve_ndp()
- # config 2M FIB entries
- self.config_fib_entries(200)
-
def tearDown(self):
"""Run standard test teardown and log ``show ip6 neighbors``."""
for i in self.interfaces:
self.logger.info(self.vapi.cli("show ip6 neighbors"))
# info(self.vapi.cli("show ip6 fib")) # many entries
- def config_fib_entries(self, count):
- """For each interface add to the FIB table *count* routes to
- "fd02::1/128" destination with interface's local address as next-hop
- address.
-
- :param int count: Number of FIB entries.
-
- - *TODO:* check if the next-hop address shouldn't be remote address
- instead of local address.
- """
- n_int = len(self.interfaces)
- percent = 0
- counter = 0.0
- dest_addr = inet_pton(AF_INET6, "fd02::1")
- dest_addr_len = 128
- for i in self.interfaces:
- next_hop_address = i.local_ip6n
- for j in range(count / n_int):
- self.vapi.ip_add_del_route(
- dest_addr, dest_addr_len, next_hop_address, is_ipv6=1)
- counter += 1
- if counter / count * 100 > percent:
- self.logger.info("Configure %d FIB entries .. %d%% done" %
- (count, percent))
- percent += 1
-
def modify_packet(self, src_if, packet_size, pkt):
"""Add load, set destination IP and extend packet to required packet
size for defined interface.
:param int packet_size: Required packet size.
:param Scapy pkt: Packet to be modified.
"""
- dst_if_idx = packet_size / 10 % 2
+ dst_if_idx = int(packet_size / 10 % 2)
dst_if = self.flows[src_if][dst_if_idx]
info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(info)
inet6.UDP(sport=1234, dport=1234))
pkts = [self.modify_packet(src_if, i, pkt_tmpl)
- for i in xrange(self.pg_if_packet_sizes[0],
- self.pg_if_packet_sizes[1], 10)]
+ for i in moves.range(self.pg_if_packet_sizes[0],
+ self.pg_if_packet_sizes[1], 10)]
pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
- for i in xrange(self.pg_if_packet_sizes[1] + hdr_ext,
- self.pg_if_packet_sizes[2] + hdr_ext, 50)]
+ for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
+ self.pg_if_packet_sizes[2] + hdr_ext,
+ 50)]
pkts.extend(pkts_b)
return pkts
try:
ip = packet[IPv6]
udp = packet[inet6.UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
"Interface %s: Packet expected from interface %s "
"didn't arrive" % (dst_if.name, i.name))
+ def test_next_header_anomaly(self):
+ """ IPv6 next header anomaly test
+
+ Test scenario:
+ - ipv6 next header field = Fragment Header (44)
+ - next header is ICMPv6 Echo Request
+ - wait for reassembly
+ """
+ pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44) /
+ ICMPv6EchoRequest())
+
+ self.pg0.add_stream(pkt)
+ self.pg_start()
+
+ # wait for reassembly
+ self.sleep(10)
+
def test_fib(self):
""" IPv6 FIB test
self.pg0.sw_if_index,
self.pg0.remote_hosts[2].mac,
self.pg0.remote_hosts[2].ip6,
- af=AF_INET6,
is_no_fib_entry=1)
nd_entry.add_vpp_config()
#
self.assertTrue(find_nbr(self,
self.pg0.sw_if_index,
- self.pg0._remote_hosts[2].ip6,
- inet=AF_INET6))
+ self.pg0._remote_hosts[2].ip6))
self.assertFalse(find_route(self,
self.pg0._remote_hosts[2].ip6,
- 128,
- inet=AF_INET6))
+ 128))
#
# send an NS from a link local address to the interface's global
#
self.assertTrue(find_nbr(self,
self.pg0.sw_if_index,
- self.pg0._remote_hosts[2].ip6_ll,
- inet=AF_INET6))
+ self.pg0._remote_hosts[2].ip6_ll))
self.assertFalse(find_route(self,
self.pg0._remote_hosts[2].ip6_ll,
- 128,
- inet=AF_INET6))
+ 128))
#
# An NS to the router's own Link-local
#
self.assertTrue(find_nbr(self,
self.pg0.sw_if_index,
- self.pg0._remote_hosts[3].ip6_ll,
- inet=AF_INET6))
+ self.pg0._remote_hosts[3].ip6_ll))
self.assertFalse(find_route(self,
self.pg0._remote_hosts[3].ip6_ll,
- 128,
- inet=AF_INET6))
+ 128))
def test_ns_duplicates(self):
""" ND Duplicates"""
ns_pg1 = VppNeighbor(self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[1].mac,
- self.pg1.remote_hosts[1].ip6,
- af=AF_INET6)
+ self.pg1.remote_hosts[1].ip6)
ns_pg1.add_vpp_config()
ns_pg2 = VppNeighbor(self,
self.pg2.sw_if_index,
self.pg2.remote_mac,
- self.pg1.remote_hosts[1].ip6,
- af=AF_INET6)
+ self.pg1.remote_hosts[1].ip6)
ns_pg2.add_vpp_config()
#
#
# remove the duplicate on pg1
- # packet stream shoud generate NSs out of pg1
+ # packet stream should generate NSs out of pg1
#
ns_pg1.remove_vpp_config()
# the options are nested in the scapy packet in way that i cannot
# decipher how to decode. this 1st layer of option always returns
# nested classes, so a direct obj1=obj2 comparison always fails.
- # however, the getlayer(.., 2) does give one instnace.
- # so we cheat here and construct a new opt instnace for comparison
+ # however, the getlayer(.., 2) does give one instance.
+ # so we cheat here and construct a new opt instance for comparison
rd = ICMPv6NDOptPrefixInfo(
prefixlen=raos.prefixlen,
prefix=raos.prefix,
rd = rx.getlayer(
ICMPv6NDOptPrefixInfo, ii + 2)
else:
- self.assertEqual(pi_opt, raos)
+ self.assertEqual(pi_opt, raos, 'Expected: %s, received: %s'
+ % (pi_opt.show(dump=True),
+ raos.show(dump=True)))
def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
filter_out_fn=is_ipv6_misc,
self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
#
- # When we reconfiure the IPv6 RA config, we reset the RA rate limiting,
+ # When we reconfigure the IPv6 RA config,
+ # we reset the RA rate limiting,
# so we need to do this before each test below so as not to drop
# packets for rate limiting reasons. Test this works here.
#
#
# Configure The RA to announce the links prefix
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
- self.pg0.local_ip6_prefix_len)
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len))
#
# RAs should now contain the prefix information option
# Change the prefix info to not off-link
# L-flag is clear
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
- self.pg0.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len),
off_link=1)
opt = ICMPv6NDOptPrefixInfo(
# Change the prefix info to not off-link, no-autoconfig
# L and A flag are clear in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
- self.pg0.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len),
off_link=1,
no_autoconfig=1)
# Change the flag settings back to the defaults
# L and A flag are set in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
- self.pg0.local_ip6_prefix_len)
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len))
opt = ICMPv6NDOptPrefixInfo(
prefixlen=self.pg0.local_ip6_prefix_len,
# Change the prefix info to not off-link, no-autoconfig
# L and A flag are clear in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
- self.pg0.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len),
off_link=1,
no_autoconfig=1)
opt=opt)
#
- # Use the reset to defults option to revert to defaults
+ # Use the reset to defaults option to revert to defaults
# L and A flag are clear in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
- self.pg0.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len),
use_default=1)
opt = ICMPv6NDOptPrefixInfo(
#
# Advertise Another prefix. With no L-flag/A-flag
#
- self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
- self.pg1.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
+ self.pg1.local_ip6_prefix_len),
off_link=1,
no_autoconfig=1)
opt=opt)
#
- # Remove the first refix-info - expect the second is still in the
+ # Remove the first prefix-info - expect the second is still in the
# advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
- self.pg0.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len),
is_no=1)
opt = ICMPv6NDOptPrefixInfo(
opt=opt)
#
- # Remove the second prefix-info - expect no prefix-info i nthe adverts
+ # Remove the second prefix-info - expect no prefix-info in the adverts
#
- self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
- self.pg1.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
+ self.pg1.local_ip6_prefix_len),
is_no=1)
self.pg0.ip6_ra_config(send_unicast=1)
self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
+class TestIPv6IfAddrRoute(VppTestCase):
+ """ IPv6 Interface Addr Route Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPv6IfAddrRoute, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv6IfAddrRoute, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPv6IfAddrRoute, self).setUp()
+
+ # create 1 pg interface
+ self.create_pg_interfaces(range(1))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestIPv6IfAddrRoute, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_ipv6_ifaddrs_same_prefix(self):
+ """ IPv6 Interface Addresses Same Prefix test
+
+ Test scenario:
+
+ - Verify no route in FIB for prefix 2001:10::/64
+ - Configure IPv4 address 2001:10::10/64 on an interface
+ - Verify route in FIB for prefix 2001:10::/64
+ - Configure IPv4 address 2001:10::20/64 on an interface
+ - Delete 2001:10::10/64 from interface
+ - Verify route in FIB for prefix 2001:10::/64
+ - Delete 2001:10::20/64 from interface
+ - Verify no route in FIB for prefix 2001:10::/64
+ """
+
+ addr1 = "2001:10::10"
+ addr2 = "2001:10::20"
+
+ if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
+ if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertFalse(find_route(self, addr1, 128))
+ self.assertFalse(find_route(self, addr2, 128))
+
+ # configure first address, verify route present
+ if_addr1.add_vpp_config()
+ self.assertTrue(if_addr1.query_vpp_config())
+ self.assertTrue(find_route(self, addr1, 128))
+ self.assertFalse(find_route(self, addr2, 128))
+
+ # configure second address, delete first, verify route not removed
+ if_addr2.add_vpp_config()
+ if_addr1.remove_vpp_config()
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertTrue(if_addr2.query_vpp_config())
+ self.assertFalse(find_route(self, addr1, 128))
+ self.assertTrue(find_route(self, addr2, 128))
+
+ # delete second address, verify route removed
+ if_addr2.remove_vpp_config()
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertFalse(find_route(self, addr1, 128))
+ self.assertFalse(find_route(self, addr2, 128))
+
+
class TestICMPv6Echo(VppTestCase):
""" ICMPv6 Echo Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestICMPv6Echo, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestICMPv6Echo, cls).tearDownClass()
+
def setUp(self):
super(TestICMPv6Echo, self).setUp()
icmpv6_id = 0xb
icmpv6_seq = 5
- icmpv6_data = '\x0a' * 18
+ icmpv6_data = b'\x0a' * 18
p_echo_request = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6,
def setUpClass(cls):
super(TestIPv6RD, cls).setUpClass()
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv6RD, cls).tearDownClass()
+
def setUp(self):
super(TestIPv6RD, self).setUp()
self.pg1.local_mac)
def verify_prefix_info(self, reported_prefix, prefix_option):
- prefix = socket.inet_pton(socket.AF_INET6,
- prefix_option.getfieldval("prefix"))
- self.assert_equal(reported_prefix.dst_address, prefix)
- self.assert_equal(reported_prefix.dst_address_length,
- prefix_option.getfieldval("prefixlen"))
+ prefix = IPv6Network(
+ text_type(prefix_option.getfieldval("prefix") +
+ "/" +
+ text_type(prefix_option.getfieldval("prefixlen"))),
+ strict=False)
+ self.assert_equal(reported_prefix.prefix.network_address,
+ prefix.network_address)
L = prefix_option.getfieldval("L")
A = prefix_option.getfieldval("A")
option_flags = (L << 7) | (A << 6)
def setUpClass(cls):
super(TestIPv6RDControlPlane, cls).setUpClass()
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv6RDControlPlane, cls).tearDownClass()
+
def setUp(self):
super(TestIPv6RDControlPlane, self).setUp()
def get_default_routes(fib):
list = []
for entry in fib:
- if entry.address_length == 0:
- for path in entry.path:
+ if entry.route.prefix.prefixlen == 0:
+ for path in entry.route.paths:
if path.sw_if_index != 0xFFFFFFFF:
defaut_route = {}
defaut_route['sw_if_index'] = path.sw_if_index
- defaut_route['next_hop'] = path.next_hop
+ defaut_route['next_hop'] = path.nh.address.ip6
list.append(defaut_route)
return list
def get_interface_addresses(fib, pg):
list = []
for entry in fib:
- if entry.address_length == 128:
- path = entry.path[0]
+ if entry.route.prefix.prefixlen == 128:
+ path = entry.route.paths[0]
if path.sw_if_index == pg.sw_if_index:
- list.append(entry.address)
+ list.append(str(entry.route.prefix.network_address))
return list
def test_all(self):
""" Test handling of SLAAC addresses and default routes """
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
default_routes = self.get_default_routes(fib)
initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
self.assertEqual(default_routes, [])
- router_address = self.pg0.remote_ip6n_ll
+ router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))
self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
# check FIB for new address
addresses = set(self.get_interface_addresses(fib, self.pg0))
new_addresses = addresses.difference(initial_addresses)
self.assertEqual(len(new_addresses), 1)
- prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
- self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
+ prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
+ strict=False)
+ self.assertEqual(prefix, IPv6Network(text_type('1::/20')))
# check FIB for new default route
default_routes = self.get_default_routes(fib)
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# check that default route is deleted
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
default_routes = self.get_default_routes(fib)
self.assertEqual(len(default_routes), 0)
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# send RA
packet = self.create_ra_packet(self.pg0)
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# check FIB for new default route
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
default_routes = self.get_default_routes(fib)
self.assertEqual(len(default_routes), 1)
dr = default_routes[0]
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# check that default route still exists
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
default_routes = self.get_default_routes(fib)
self.assertEqual(len(default_routes), 1)
dr = default_routes[0]
self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
self.assertEqual(dr['next_hop'], router_address)
- self.sleep(1)
+ self.sleep_on_vpp_time(1)
# check that default route is deleted
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
default_routes = self.get_default_routes(fib)
self.assertEqual(len(default_routes), 0)
# check FIB still contains the SLAAC address
addresses = set(self.get_interface_addresses(fib, self.pg0))
new_addresses = addresses.difference(initial_addresses)
+
self.assertEqual(len(new_addresses), 1)
- prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
- self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
+ prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
+ strict=False)
+ self.assertEqual(prefix, IPv6Network(text_type('1::/20')))
- self.sleep(1)
+ self.sleep_on_vpp_time(1)
# check that SLAAC address is deleted
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
addresses = set(self.get_interface_addresses(fib, self.pg0))
new_addresses = addresses.difference(initial_addresses)
self.assertEqual(len(new_addresses), 0)
class IPv6NDProxyTest(TestIPv6ND):
""" IPv6 ND ProxyTest Case """
+ @classmethod
+ def setUpClass(cls):
+ super(IPv6NDProxyTest, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(IPv6NDProxyTest, cls).tearDownClass()
+
def setUp(self):
super(IPv6NDProxyTest, self).setUp()
#
# Add proxy support for the host
#
- self.vapi.ip6_nd_proxy(
- inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
- self.pg1.sw_if_index)
+ self.vapi.ip6nd_proxy_add_del(
+ ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
+ sw_if_index=self.pg1.sw_if_index)
#
# try that NS again. this time we expect an NA back
#
self.assertTrue(find_nbr(self,
self.pg1.sw_if_index,
- self.pg0._remote_hosts[2].ip6,
- inet=AF_INET6))
+ self.pg0._remote_hosts[2].ip6))
#
# ... and we can route traffic to it
IPv6(dst=self.pg0._remote_hosts[2].ip6,
src=self.pg0.remote_ip6) /
inet6.UDP(sport=10000, dport=20000) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg0.add_stream(t)
self.pg_enable_capture(self.pg_interfaces)
ICMPv6NDOptSrcLLAddr(
lladdr=self.pg0._remote_hosts[2].mac))
- self.vapi.ip6_nd_proxy(
- inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
- self.pg2.sw_if_index)
+ self.vapi.ip6nd_proxy_add_del(
+ ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
+ sw_if_index=self.pg2.sw_if_index)
self.send_and_expect_na(self.pg2, ns_pg2,
"NS to proxy entry other interface",
self.assertTrue(find_nbr(self,
self.pg2.sw_if_index,
- self.pg0._remote_hosts[3].ip6,
- inet=AF_INET6))
+ self.pg0._remote_hosts[3].ip6))
#
# hosts can communicate. pg2->pg1
IPv6(dst=self.pg0._remote_hosts[2].ip6,
src=self.pg0._remote_hosts[3].ip6) /
inet6.UDP(sport=10000, dport=20000) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg2.add_stream(t2)
self.pg_enable_capture(self.pg_interfaces)
#
# remove the proxy configs
#
- self.vapi.ip6_nd_proxy(
- inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
- self.pg1.sw_if_index,
- is_del=1)
- self.vapi.ip6_nd_proxy(
- inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
- self.pg2.sw_if_index,
- is_del=1)
+ self.vapi.ip6nd_proxy_add_del(
+ ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
+ sw_if_index=self.pg1.sw_if_index, is_del=1)
+ self.vapi.ip6nd_proxy_add_del(
+ ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
+ sw_if_index=self.pg2.sw_if_index, is_del=1)
self.assertFalse(find_nbr(self,
self.pg2.sw_if_index,
- self.pg0._remote_hosts[3].ip6,
- inet=AF_INET6))
+ self.pg0._remote_hosts[3].ip6))
self.assertFalse(find_nbr(self,
self.pg1.sw_if_index,
- self.pg0._remote_hosts[2].ip6,
- inet=AF_INET6))
+ self.pg0._remote_hosts[2].ip6))
#
# no longer proxy-ing...
class TestIPNull(VppTestCase):
""" IPv6 routes via NULL """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPNull, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPNull, cls).tearDownClass()
+
def setUp(self):
super(TestIPNull, self).setUp()
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
#
# A route via IP NULL that will reply with ICMP unreachables
#
- ip_unreach = VppIpRoute(self, "2001::", 64, [], is_unreach=1, is_ip6=1)
+ ip_unreach = VppIpRoute(
+ self, "2001::", 64,
+ [VppRoutePath("::", 0xffffffff,
+ type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH)])
ip_unreach.add_vpp_config()
self.pg0.add_stream(p)
#
# A route via IP NULL that will reply with ICMP prohibited
#
- ip_prohibit = VppIpRoute(self, "2001::1", 128, [],
- is_prohibit=1, is_ip6=1)
+ ip_prohibit = VppIpRoute(
+ self, "2001::1", 128,
+ [VppRoutePath("::", 0xffffffff,
+ type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT)])
ip_prohibit.add_vpp_config()
self.pg0.add_stream(p)
class TestIPDisabled(VppTestCase):
""" IPv6 disabled """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPDisabled, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPDisabled, cls).tearDownClass()
+
def setUp(self):
super(TestIPDisabled, self).setUp()
# create 2 pg interfaces
self.create_pg_interfaces(range(2))
- # PG0 is IP enalbed
+ # PG0 is IP enabled
self.pg0.admin_up()
self.pg0.config_ip6()
self.pg0.resolve_ndp()
[VppMRoutePath(self.pg1.sw_if_index,
MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
VppMRoutePath(self.pg0.sw_if_index,
- MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)],
- is_ip6=1)
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)])
route_ff_01.add_vpp_config()
pu = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pm = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
IPv6(src="2001::1", dst="ffef::1") /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
#
# PG1 does not forward IP traffic
class TestIP6LoadBalance(VppTestCase):
""" IPv6 Load-Balancing """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIP6LoadBalance, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIP6LoadBalance, cls).tearDownClass()
+
def setUp(self):
super(TestIP6LoadBalance, self).setUp()
i.disable_mpls()
super(TestIP6LoadBalance, self).tearDown()
- def send_and_expect_load_balancing(self, input, pkts, outputs):
+ def pg_send(self, input, pkts):
self.vapi.cli("clear trace")
input.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+
+ def send_and_expect_load_balancing(self, input, pkts, outputs):
+ self.pg_send(input, pkts)
for oo in outputs:
rx = oo._get_capture(1)
self.assertNotEqual(0, len(rx))
def send_and_expect_one_itf(self, input, pkts, itf):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ self.pg_send(input, pkts)
rx = itf.get_capture(len(pkts))
def test_ip6_load_balance(self):
src_ip_pkts = []
src_mpls_pkts = []
- for ii in range(65):
+ for ii in range(NUM_PKTS):
port_ip_hdr = (
IPv6(dst="3000::1", src="3000:1::1") /
inet6.UDP(sport=1234, dport=1234 + ii) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
port_ip_hdr))
src_ip_hdr = (
IPv6(dst="3000::1", src="3000:1::%d" % ii) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
src_ip_hdr))
src_ip_hdr))
#
- # A route for the IP pacekts
+ # A route for the IP packets
#
route_3000_1 = VppIpRoute(self, "3000::1", 128,
[VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6),
+ self.pg1.sw_if_index),
VppRoutePath(self.pg2.remote_ip6,
- self.pg2.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg2.sw_if_index)])
route_3000_1.add_vpp_config()
#
route_67 = VppMplsRoute(self, 67, 0,
[VppRoutePath(self.pg1.remote_ip6,
self.pg1.sw_if_index,
- labels=[67],
- proto=DpoProto.DPO_PROTO_IP6),
+ labels=[67]),
VppRoutePath(self.pg2.remote_ip6,
self.pg2.sw_if_index,
- labels=[67],
- proto=DpoProto.DPO_PROTO_IP6)])
+ labels=[67])])
route_67.add_vpp_config()
#
# src,dst
# We are not going to ensure equal amounts of packets across each link,
# since the hash algorithm is statistical and therefore this can never
- # be guaranteed. But wuth 64 different packets we do expect some
+ # be guaranteed. But with 64 different packets we do expect some
# balancing. So instead just ensure there is traffic on each link.
#
self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
#
# The packets with Entropy label in should not load-balance,
- # since the Entorpy value is fixed.
+ # since the Entropy value is fixed.
#
self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)
# - now only the stream with differing source address will
# load-balance
#
- self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=0, dport=0)
+ self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=0, dport=0,
+ is_ipv6=1)
self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
[self.pg1, self.pg2])
#
# change the flow hash config back to defaults
#
- self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=1, dport=1)
+ self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1,
+ is_ipv6=1)
#
# Recursive prefixes
src="4000:1::1") /
inet6.UDP(sport=1234,
dport=1234 + ii) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
src_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(dst="4000::1",
src="4000:1::%d" % ii) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
route_3000_2 = VppIpRoute(self, "3000::2", 128,
[VppRoutePath(self.pg3.remote_ip6,
- self.pg3.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6),
+ self.pg3.sw_if_index),
VppRoutePath(self.pg4.remote_ip6,
- self.pg4.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg4.sw_if_index)])
route_3000_2.add_vpp_config()
route_4000_1 = VppIpRoute(self, "4000::1", 128,
[VppRoutePath("3000::1",
- 0xffffffff,
- proto=DpoProto.DPO_PROTO_IP6),
+ 0xffffffff),
VppRoutePath("3000::2",
- 0xffffffff,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ 0xffffffff)])
route_4000_1.add_vpp_config()
#
src="6000:1::1") /
inet6.UDP(sport=1234,
dport=1234 + ii) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
route_5000_2 = VppIpRoute(self, "5000::2", 128,
[VppRoutePath(self.pg3.remote_ip6,
- self.pg3.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg3.sw_if_index)])
route_5000_2.add_vpp_config()
route_6000_1 = VppIpRoute(self, "6000::1", 128,
[VppRoutePath("5000::2",
- 0xffffffff,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ 0xffffffff)])
route_6000_1.add_vpp_config()
#
class TestIP6Punt(VppTestCase):
""" IPv6 Punt Police/Redirect """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIP6Punt, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIP6Punt, cls).tearDownClass()
+
def setUp(self):
super(TestIP6Punt, self).setUp()
IPv6(src=self.pg0.remote_ip6,
dst=self.pg0.local_ip6) /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pkts = p * 1025
#
# Configure a punt redirect via pg1.
#
- nh_addr = VppIpAddress(self.pg1.remote_ip6).encode()
+ nh_addr = self.pg1.remote_ip6
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg1.sw_if_index,
nh_addr)
#
# add a policer
#
- policer = self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
+ policer = self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
rate_type=1)
self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
self.pg_start()
#
- # the number of packet recieved should be greater than 0,
+ # the number of packet received should be greater than 0,
# but not equal to the number sent, since some were policed
#
rx = self.pg1._get_capture(1)
self.assertLess(len(rx), len(pkts))
#
- # remove the poilcer. back to full rx
+ # remove the policer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
- self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
+ self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
rate_type=1, is_add=0)
self.send_and_expect(self.pg0, pkts, self.pg1)
#
# Configure a punt redirects
#
- nh_addr = VppIpAddress(self.pg3.remote_ip6).encode()
+ nh_addr = self.pg3.remote_ip6
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg3.sw_if_index,
nh_addr)
nh_addr)
self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
self.pg3.sw_if_index,
- VppIpAddress('0::0').encode())
+ '0::0')
#
# Dump pg0 punt redirects
self.assertEqual(len(punts), 3)
for p in punts:
self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
- self.assertNotEqual(punts[1].punt.nh.un.ip6, self.pg3.remote_ip6)
- self.assertEqual(punts[2].punt.nh.un.ip6.address, '\x00'*16)
+ self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
+ self.assertEqual(str(punts[2].punt.nh), '::')
class TestIPDeag(VppTestCase):
""" IPv6 Deaggregate Routes """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPDeag, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPDeag, cls).tearDownClass()
+
def setUp(self):
super(TestIPDeag, self).setUp()
route_to_dst = VppIpRoute(self, "1::1", 128,
[VppRoutePath("::",
0xffffffff,
- nh_table_id=1,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
- route_to_src = VppIpRoute(self, "1::2", 128,
- [VppRoutePath("::",
- 0xffffffff,
- nh_table_id=2,
- is_source_lookup=1,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ nh_table_id=1)])
+ route_to_src = VppIpRoute(
+ self, "1::2", 128,
+ [VppRoutePath("::",
+ 0xffffffff,
+ nh_table_id=2,
+ type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP)])
+
route_to_dst.add_vpp_config()
route_to_src.add_vpp_config()
dst=self.pg0.local_mac) /
IPv6(src="5::5", dst="1::1") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
p_src = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src="2::2", dst="1::2") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pkts_dst = p_dst * 257
pkts_src = p_src * 257
#
route_in_dst = VppIpRoute(self, "1::1", 128,
[VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1,
+ self.pg1.sw_if_index)],
table_id=1)
route_in_dst.add_vpp_config()
#
route_in_src = VppIpRoute(self, "2::2", 128,
[VppRoutePath(self.pg2.remote_ip6,
- self.pg2.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1,
+ self.pg2.sw_if_index)],
table_id=2)
route_in_src.add_vpp_config()
self.send_and_expect(self.pg0, pkts_src, self.pg2)
#
route_loop = VppIpRoute(self, "3::3", 128,
[VppRoutePath("::",
- 0xffffffff,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ 0xffffffff)])
route_loop.add_vpp_config()
p_l = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src="3::4", dst="3::3") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.send_and_assert_no_replies(self.pg0, p_l * 257,
"IP lookup loop")
class TestIP6Input(VppTestCase):
- """ IPv6 Input Exceptions """
+ """ IPv6 Input Exception Test Cases """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIP6Input, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIP6Input, cls).tearDownClass()
def setUp(self):
super(TestIP6Input, self).setUp()
i.unconfig_ip6()
i.admin_down()
- def test_ip_input(self):
- """ IP6 Input Exceptions """
-
- #
- # bad version - this is dropped
- #
- p_version = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IPv6(src=self.pg0.remote_ip6,
- dst=self.pg1.remote_ip6,
- version=3) /
- inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
-
- self.send_and_assert_no_replies(self.pg0, p_version * 65,
- "funky version")
-
+ def test_ip_input_icmp_reply(self):
+ """ IP6 Input Exception - Return ICMP (3,0) """
#
- # hop limit - IMCP replies
+ # hop limit - ICMP replies
#
p_version = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
dst=self.pg1.remote_ip6,
hlim=1) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
- rx = self.send_and_expect(self.pg0, p_version * 65, self.pg0)
+ rx = self.send_and_expect(self.pg0, p_version * NUM_PKTS, self.pg0)
rx = rx[0]
icmp = rx[ICMPv6TimeExceeded]
- self.assertEqual(icmp.type, 3)
+
# 0: "hop limit exceeded in transit",
- self.assertEqual(icmp.code, 0)
+ self.assertEqual((icmp.type, icmp.code), (3, 0))
+
+ icmpv6_data = '\x0a' * 18
+ all_0s = "::"
+ all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"
+
+ @parameterized.expand([
+ # Name, src, dst, l4proto, msg, timeout
+ ("src='iface', dst='iface'", None, None,
+ inet6.UDP(sport=1234, dport=1234), "funky version", None),
+ ("src='All 0's', dst='iface'", all_0s, None,
+ ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
+ ("src='iface', dst='All 0's'", None, all_0s,
+ ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
+ ("src='All 1's', dst='iface'", all_1s, None,
+ ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
+ ("src='iface', dst='All 1's'", None, all_1s,
+ ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
+ ("src='All 1's', dst='All 1's'", all_1s, all_1s,
+ ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
+
+ ])
+ def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
+
+ self._testMethodDoc = 'IPv6 Input Exception - %s' % name
+
+ p_version = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src=src or self.pg0.remote_ip6,
+ dst=dst or self.pg1.remote_ip6,
+ version=3) /
+ l4 /
+ Raw(b'\xa5' * 100))
+
+ self.send_and_assert_no_replies(self.pg0, p_version * NUM_PKTS,
+ remark=msg or "",
+ timeout=timeout)
+
+ def test_hop_by_hop(self):
+ """ Hop-by-hop header test """
+
+ p = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
+ IPv6ExtHdrHopByHop() /
+ inet6.UDP(sport=1234, dport=1234) /
+ Raw(b'\xa5' * 100))
+
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+
+class TestIPReplace(VppTestCase):
+ """ IPv6 Table Replace """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPReplace, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPReplace, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPReplace, self).setUp()
+
+ self.create_pg_interfaces(range(4))
+
+ table_id = 1
+ self.tables = []
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_arp()
+ i.generate_remote_hosts(2)
+ self.tables.append(VppIpTable(self, table_id,
+ True).add_vpp_config())
+ table_id += 1
+
+ def tearDown(self):
+ super(TestIPReplace, self).tearDown()
+ for i in self.pg_interfaces:
+ i.admin_down()
+ i.unconfig_ip4()
+
+ def test_replace(self):
+ """ IP Table Replace """
+
+ N_ROUTES = 20
+ links = [self.pg0, self.pg1, self.pg2, self.pg3]
+ routes = [[], [], [], []]
+
+ # the sizes of 'empty' tables
+ for t in self.tables:
+ self.assertEqual(len(t.dump()), 2)
+ self.assertEqual(len(t.mdump()), 5)
+
+ # load up the tables with some routes
+ for ii, t in enumerate(self.tables):
+ for jj in range(1, N_ROUTES):
+ uni = VppIpRoute(
+ self, "2001::%d" % jj if jj != 0 else "2001::", 128,
+ [VppRoutePath(links[ii].remote_hosts[0].ip6,
+ links[ii].sw_if_index),
+ VppRoutePath(links[ii].remote_hosts[1].ip6,
+ links[ii].sw_if_index)],
+ table_id=t.table_id).add_vpp_config()
+ multi = VppIpMRoute(
+ self, "::",
+ "ff:2001::%d" % jj, 128,
+ MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
+ [VppMRoutePath(self.pg0.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg1.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg2.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg3.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)],
+ table_id=t.table_id).add_vpp_config()
+ routes[ii].append({'uni': uni,
+ 'multi': multi})
+
+ #
+ # replace the tables a few times
+ #
+ for kk in range(3):
+ # replace each table
+ for t in self.tables:
+ t.replace_begin()
+
+ # all the routes are still there
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for r in routes[ii]:
+ self.assertTrue(find_route_in_dump(dump, r['uni'], t))
+ self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
+
+ # redownload the even numbered routes
+ for ii, t in enumerate(self.tables):
+ for jj in range(0, N_ROUTES, 2):
+ routes[ii][jj]['uni'].add_vpp_config()
+ routes[ii][jj]['multi'].add_vpp_config()
+
+ # signal each table converged
+ for t in self.tables:
+ t.replace_end()
+
+ # we should find the even routes, but not the odd
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for jj in range(0, N_ROUTES, 2):
+ self.assertTrue(find_route_in_dump(
+ dump, routes[ii][jj]['uni'], t))
+ self.assertTrue(find_mroute_in_dump(
+ mdump, routes[ii][jj]['multi'], t))
+ for jj in range(1, N_ROUTES - 1, 2):
+ self.assertFalse(find_route_in_dump(
+ dump, routes[ii][jj]['uni'], t))
+ self.assertFalse(find_mroute_in_dump(
+ mdump, routes[ii][jj]['multi'], t))
+
+ # reload all the routes
+ for ii, t in enumerate(self.tables):
+ for r in routes[ii]:
+ r['uni'].add_vpp_config()
+ r['multi'].add_vpp_config()
+
+ # all the routes are still there
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for r in routes[ii]:
+ self.assertTrue(find_route_in_dump(dump, r['uni'], t))
+ self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
+
+ #
+ # finally flush the tables for good measure
+ #
+ for t in self.tables:
+ t.flush()
+ self.assertEqual(len(t.dump()), 2)
+ self.assertEqual(len(t.mdump()), 5)
if __name__ == '__main__':