+ def test_mld(self):
+ """ MLD Report """
+ #
+ # test one MLD is sent after applying an IPv6 Address on an interface
+ #
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ subitf = VppDot1QSubint(self, self.pg1, 99)
+
+ subitf.admin_up()
+ subitf.config_ip6()
+
+ rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
+
+ #
+ # hunt for the MLD on vlan 99
+ #
+ for rx in rxs:
+ # make sure ipv6 packets with hop by hop options have
+ # correct checksums
+ self.assert_packet_checksums_valid(rx)
+ if rx.haslayer(IPv6ExtHdrHopByHop) and \
+ rx.haslayer(Dot1Q) and \
+ rx[Dot1Q].vlan == 99:
+ mld = rx[ICMPv6MLReport2]
+
+ self.assertEqual(mld.records_number, 4)
+
+
+class TestIPv6IfAddrRoute(VppTestCase):
+ """ IPv6 Interface Addr Route Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPv6IfAddrRoute, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv6IfAddrRoute, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPv6IfAddrRoute, self).setUp()
+
+ # create 1 pg interface
+ self.create_pg_interfaces(range(1))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestIPv6IfAddrRoute, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_ipv6_ifaddrs_same_prefix(self):
+ """ IPv6 Interface Addresses Same Prefix test
+
+ Test scenario:
+
+ - Verify no route in FIB for prefix 2001:10::/64
+ - Configure IPv4 address 2001:10::10/64 on an interface
+ - Verify route in FIB for prefix 2001:10::/64
+ - Configure IPv4 address 2001:10::20/64 on an interface
+ - Delete 2001:10::10/64 from interface
+ - Verify route in FIB for prefix 2001:10::/64
+ - Delete 2001:10::20/64 from interface
+ - Verify no route in FIB for prefix 2001:10::/64
+ """
+
+ addr1 = "2001:10::10"
+ addr2 = "2001:10::20"
+
+ if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
+ if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertFalse(find_route(self, addr1, 128))
+ self.assertFalse(find_route(self, addr2, 128))
+
+ # configure first address, verify route present
+ if_addr1.add_vpp_config()
+ self.assertTrue(if_addr1.query_vpp_config())
+ self.assertTrue(find_route(self, addr1, 128))
+ self.assertFalse(find_route(self, addr2, 128))
+
+ # configure second address, delete first, verify route not removed
+ if_addr2.add_vpp_config()
+ if_addr1.remove_vpp_config()
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertTrue(if_addr2.query_vpp_config())
+ self.assertFalse(find_route(self, addr1, 128))
+ self.assertTrue(find_route(self, addr2, 128))
+
+ # delete second address, verify route removed
+ if_addr2.remove_vpp_config()
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertFalse(find_route(self, addr1, 128))
+ self.assertFalse(find_route(self, addr2, 128))
+
+
+class TestICMPv6Echo(VppTestCase):
+ """ ICMPv6 Echo Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestICMPv6Echo, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestICMPv6Echo, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestICMPv6Echo, self).setUp()
+
+ # create 1 pg interface
+ self.create_pg_interfaces(range(1))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestICMPv6Echo, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_icmpv6_echo(self):
+ """ VPP replies to ICMPv6 Echo Request
+
+ Test scenario:
+
+ - Receive ICMPv6 Echo Request message on pg0 interface.
+ - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
+ """
+
+ icmpv6_id = 0xb
+ icmpv6_seq = 5
+ icmpv6_data = b'\x0a' * 18
+ p_echo_request = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6,
+ dst=self.pg0.local_ip6) /
+ ICMPv6EchoRequest(
+ id=icmpv6_id,
+ seq=icmpv6_seq,
+ data=icmpv6_data))
+
+ self.pg0.add_stream(p_echo_request)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture(1)
+ rx = rx[0]
+ ether = rx[Ether]
+ ipv6 = rx[IPv6]
+ icmpv6 = rx[ICMPv6EchoReply]
+
+ self.assertEqual(ether.src, self.pg0.local_mac)
+ self.assertEqual(ether.dst, self.pg0.remote_mac)
+
+ self.assertEqual(ipv6.src, self.pg0.local_ip6)
+ self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
+
+ self.assertEqual(
+ icmp6types[icmpv6.type], "Echo Reply")
+ self.assertEqual(icmpv6.id, icmpv6_id)
+ self.assertEqual(icmpv6.seq, icmpv6_seq)
+ self.assertEqual(icmpv6.data, icmpv6_data)
+
+
+class TestIPv6RD(TestIPv6ND):
+ """ IPv6 Router Discovery Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPv6RD, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv6RD, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPv6RD, self).setUp()
+
+ # create 2 pg interfaces
+ self.create_pg_interfaces(range(2))
+
+ self.interfaces = list(self.pg_interfaces)
+
+ # setup all interfaces
+ for i in self.interfaces:
+ i.admin_up()
+ i.config_ip6()
+
+ def tearDown(self):
+ for i in self.interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
+ super(TestIPv6RD, self).tearDown()
+
+ def test_rd_send_router_solicitation(self):
+ """ Verify router solicitation packets """
+
+ count = 2
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
+ mrc=count)
+ rx_list = self.pg1.get_capture(count, timeout=3)
+ self.assertEqual(len(rx_list), count)
+ for packet in rx_list:
+ self.assertEqual(packet.haslayer(IPv6), 1)
+ self.assertEqual(packet[IPv6].haslayer(
+ ICMPv6ND_RS), 1)
+ dst = ip6_normalize(packet[IPv6].dst)
+ dst2 = ip6_normalize("ff02::2")
+ self.assert_equal(dst, dst2)
+ src = ip6_normalize(packet[IPv6].src)
+ src2 = ip6_normalize(self.pg1.local_ip6_ll)
+ self.assert_equal(src, src2)
+ self.assertTrue(
+ bool(packet[ICMPv6ND_RS].haslayer(
+ ICMPv6NDOptSrcLLAddr)))
+ self.assert_equal(
+ packet[ICMPv6NDOptSrcLLAddr].lladdr,
+ self.pg1.local_mac)
+
+ def verify_prefix_info(self, reported_prefix, prefix_option):
+ prefix = IPv6Network(
+ text_type(prefix_option.getfieldval("prefix") +
+ "/" +
+ text_type(prefix_option.getfieldval("prefixlen"))),
+ strict=False)
+ self.assert_equal(reported_prefix.prefix.network_address,
+ prefix.network_address)
+ L = prefix_option.getfieldval("L")
+ A = prefix_option.getfieldval("A")
+ option_flags = (L << 7) | (A << 6)
+ self.assert_equal(reported_prefix.flags, option_flags)
+ self.assert_equal(reported_prefix.valid_time,
+ prefix_option.getfieldval("validlifetime"))
+ self.assert_equal(reported_prefix.preferred_time,
+ prefix_option.getfieldval("preferredlifetime"))
+
+ def test_rd_receive_router_advertisement(self):
+ """ Verify events triggered by received RA packets """
+
+ self.vapi.want_ip6_ra_events(enable=1)
+
+ prefix_info_1 = ICMPv6NDOptPrefixInfo(
+ prefix="1::2",
+ prefixlen=50,
+ validlifetime=200,
+ preferredlifetime=500,
+ L=1,
+ A=1,
+ )
+
+ prefix_info_2 = ICMPv6NDOptPrefixInfo(
+ prefix="7::4",
+ prefixlen=20,
+ validlifetime=70,
+ preferredlifetime=1000,
+ L=1,
+ A=0,
+ )
+
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(dst=self.pg1.local_ip6_ll,
+ src=mk_ll_addr(self.pg1.remote_mac)) /
+ ICMPv6ND_RA() /
+ prefix_info_1 /
+ prefix_info_2)
+ self.pg1.add_stream([p])
+ self.pg_start()
+
+ ev = self.vapi.wait_for_event(10, "ip6_ra_event")
+
+ self.assert_equal(ev.current_hop_limit, 0)
+ self.assert_equal(ev.flags, 8)
+ self.assert_equal(ev.router_lifetime_in_sec, 1800)
+ self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
+ self.assert_equal(
+ ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0)
+
+ self.assert_equal(ev.n_prefixes, 2)
+
+ self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
+ self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
+
+
+class TestIPv6RDControlPlane(TestIPv6ND):
+ """ IPv6 Router Discovery Control Plane Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPv6RDControlPlane, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv6RDControlPlane, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPv6RDControlPlane, self).setUp()
+
+ # create 1 pg interface
+ self.create_pg_interfaces(range(1))
+
+ self.interfaces = list(self.pg_interfaces)
+
+ # setup all interfaces
+ for i in self.interfaces:
+ i.admin_up()
+ i.config_ip6()
+
+ def tearDown(self):
+ super(TestIPv6RDControlPlane, self).tearDown()
+
+ @staticmethod
+ def create_ra_packet(pg, routerlifetime=None):
+ src_ip = pg.remote_ip6_ll
+ dst_ip = pg.local_ip6
+ if routerlifetime is not None:
+ ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
+ else:
+ ra = ICMPv6ND_RA()
+ p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
+ IPv6(dst=dst_ip, src=src_ip) / ra)
+ return p
+
+ @staticmethod
+ def get_default_routes(fib):
+ list = []
+ for entry in fib:
+ if entry.route.prefix.prefixlen == 0:
+ for path in entry.route.paths:
+ if path.sw_if_index != 0xFFFFFFFF:
+ defaut_route = {}
+ defaut_route['sw_if_index'] = path.sw_if_index
+ defaut_route['next_hop'] = path.nh.address.ip6
+ list.append(defaut_route)
+ return list
+
+ @staticmethod
+ def get_interface_addresses(fib, pg):
+ list = []
+ for entry in fib:
+ if entry.route.prefix.prefixlen == 128:
+ path = entry.route.paths[0]
+ if path.sw_if_index == pg.sw_if_index:
+ list.append(str(entry.route.prefix.network_address))
+ return list
+
+ def wait_for_no_default_route(self, n_tries=50, s_time=1):
+ while (n_tries):
+ fib = self.vapi.ip_route_dump(0, True)
+ default_routes = self.get_default_routes(fib)
+ if 0 == len(default_routes):
+ return True
+ n_tries = n_tries - 1
+ self.sleep(s_time)
+
+ return False
+
+ def test_all(self):
+ """ Test handling of SLAAC addresses and default routes """
+
+ fib = self.vapi.ip_route_dump(0, True)
+ default_routes = self.get_default_routes(fib)
+ initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
+ self.assertEqual(default_routes, [])
+ router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))
+
+ self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
+
+ self.sleep(0.1)
+
+ # send RA
+ packet = (self.create_ra_packet(
+ self.pg0) / ICMPv6NDOptPrefixInfo(
+ prefix="1::",
+ prefixlen=64,
+ validlifetime=2,
+ preferredlifetime=2,
+ L=1,
+ A=1,
+ ) / ICMPv6NDOptPrefixInfo(
+ prefix="7::",
+ prefixlen=20,
+ validlifetime=1500,
+ preferredlifetime=1000,
+ L=1,
+ A=0,
+ ))
+ self.pg0.add_stream([packet])
+ self.pg_start()
+
+ self.sleep_on_vpp_time(0.1)
+
+ fib = self.vapi.ip_route_dump(0, True)
+
+ # check FIB for new address
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ new_addresses = addresses.difference(initial_addresses)
+ self.assertEqual(len(new_addresses), 1)
+ prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
+ strict=False)
+ self.assertEqual(prefix, IPv6Network(text_type('1::/20')))
+
+ # check FIB for new default route
+ default_routes = self.get_default_routes(fib)
+ self.assertEqual(len(default_routes), 1)
+ dr = default_routes[0]
+ self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
+ self.assertEqual(dr['next_hop'], router_address)
+
+ # send RA to delete default route
+ packet = self.create_ra_packet(self.pg0, routerlifetime=0)
+ self.pg0.add_stream([packet])
+ self.pg_start()
+
+ self.sleep_on_vpp_time(0.1)
+
+ # check that default route is deleted
+ fib = self.vapi.ip_route_dump(0, True)
+ default_routes = self.get_default_routes(fib)
+ self.assertEqual(len(default_routes), 0)
+
+ self.sleep_on_vpp_time(0.1)
+
+ # send RA
+ packet = self.create_ra_packet(self.pg0)
+ self.pg0.add_stream([packet])
+ self.pg_start()
+
+ self.sleep_on_vpp_time(0.1)
+
+ # check FIB for new default route
+ fib = self.vapi.ip_route_dump(0, True)
+ default_routes = self.get_default_routes(fib)
+ self.assertEqual(len(default_routes), 1)
+ dr = default_routes[0]
+ self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
+ self.assertEqual(dr['next_hop'], router_address)
+
+ # send RA, updating router lifetime to 1s
+ packet = self.create_ra_packet(self.pg0, 1)
+ self.pg0.add_stream([packet])
+ self.pg_start()
+
+ self.sleep_on_vpp_time(0.1)
+
+ # check that default route still exists
+ fib = self.vapi.ip_route_dump(0, True)
+ default_routes = self.get_default_routes(fib)
+ self.assertEqual(len(default_routes), 1)
+ dr = default_routes[0]
+ self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
+ self.assertEqual(dr['next_hop'], router_address)
+
+ self.sleep_on_vpp_time(1)
+
+ # check that default route is deleted
+ self.assertTrue(self.wait_for_no_default_route())
+
+ # check FIB still contains the SLAAC address
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ new_addresses = addresses.difference(initial_addresses)
+
+ self.assertEqual(len(new_addresses), 1)
+ prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
+ strict=False)
+ self.assertEqual(prefix, IPv6Network(text_type('1::/20')))
+
+ self.sleep_on_vpp_time(1)
+
+ # check that SLAAC address is deleted
+ fib = self.vapi.ip_route_dump(0, True)
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ new_addresses = addresses.difference(initial_addresses)
+ self.assertEqual(len(new_addresses), 0)
+