+class TestICMPv6Echo(VppTestCase):
+ """ ICMPv6 Echo Test Case """
+
+ def setUp(self):
+ super(TestICMPv6Echo, self).setUp()
+
+ # create 1 pg interface
+ self.create_pg_interfaces(range(1))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestICMPv6Echo, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip6()
+ i.ip6_disable()
+ i.admin_down()
+
+ def test_icmpv6_echo(self):
+ """ VPP replies to ICMPv6 Echo Request
+
+ Test scenario:
+
+ - Receive ICMPv6 Echo Request message on pg0 interface.
+ - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
+ """
+
+ icmpv6_id = 0xb
+ icmpv6_seq = 5
+ icmpv6_data = '\x0a' * 18
+ p_echo_request = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6,
+ dst=self.pg0.local_ip6) /
+ ICMPv6EchoRequest(
+ id=icmpv6_id,
+ seq=icmpv6_seq,
+ data=icmpv6_data))
+
+ self.pg0.add_stream(p_echo_request)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture(1)
+ rx = rx[0]
+ ether = rx[Ether]
+ ipv6 = rx[IPv6]
+ icmpv6 = rx[ICMPv6EchoReply]
+
+ self.assertEqual(ether.src, self.pg0.local_mac)
+ self.assertEqual(ether.dst, self.pg0.remote_mac)
+
+ self.assertEqual(ipv6.src, self.pg0.local_ip6)
+ self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
+
+ self.assertEqual(
+ icmp6types[icmpv6.type], "Echo Reply")
+ self.assertEqual(icmpv6.id, icmpv6_id)
+ self.assertEqual(icmpv6.seq, icmpv6_seq)
+ self.assertEqual(icmpv6.data, icmpv6_data)
+
+
+class TestIPv6RD(TestIPv6ND):
+ """ IPv6 Router Discovery Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPv6RD, cls).setUpClass()
+
+ def setUp(self):
+ super(TestIPv6RD, self).setUp()
+
+ # create 2 pg interfaces
+ self.create_pg_interfaces(range(2))
+
+ self.interfaces = list(self.pg_interfaces)
+
+ # setup all interfaces
+ for i in self.interfaces:
+ i.admin_up()
+ i.config_ip6()
+
+ def tearDown(self):
+ for i in self.interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
+ super(TestIPv6RD, self).tearDown()
+
+ def test_rd_send_router_solicitation(self):
+ """ Verify router solicitation packets """
+
+ count = 2
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
+ mrc=count)
+ rx_list = self.pg1.get_capture(count, timeout=3)
+ self.assertEqual(len(rx_list), count)
+ for packet in rx_list:
+ self.assertEqual(packet.haslayer(IPv6), 1)
+ self.assertEqual(packet[IPv6].haslayer(
+ ICMPv6ND_RS), 1)
+ dst = ip6_normalize(packet[IPv6].dst)
+ dst2 = ip6_normalize("ff02::2")
+ self.assert_equal(dst, dst2)
+ src = ip6_normalize(packet[IPv6].src)
+ src2 = ip6_normalize(self.pg1.local_ip6_ll)
+ self.assert_equal(src, src2)
+ self.assertTrue(
+ bool(packet[ICMPv6ND_RS].haslayer(
+ ICMPv6NDOptSrcLLAddr)))
+ self.assert_equal(
+ packet[ICMPv6NDOptSrcLLAddr].lladdr,
+ self.pg1.local_mac)
+
+ def verify_prefix_info(self, reported_prefix, prefix_option):
+ prefix = socket.inet_pton(socket.AF_INET6,
+ prefix_option.getfieldval("prefix"))
+ self.assert_equal(reported_prefix.dst_address, prefix)
+ self.assert_equal(reported_prefix.dst_address_length,
+ prefix_option.getfieldval("prefixlen"))
+ L = prefix_option.getfieldval("L")
+ A = prefix_option.getfieldval("A")
+ option_flags = (L << 7) | (A << 6)
+ self.assert_equal(reported_prefix.flags, option_flags)
+ self.assert_equal(reported_prefix.valid_time,
+ prefix_option.getfieldval("validlifetime"))
+ self.assert_equal(reported_prefix.preferred_time,
+ prefix_option.getfieldval("preferredlifetime"))
+
+ def test_rd_receive_router_advertisement(self):
+ """ Verify events triggered by received RA packets """
+
+ self.vapi.want_ip6_ra_events()
+
+ prefix_info_1 = ICMPv6NDOptPrefixInfo(
+ prefix="1::2",
+ prefixlen=50,
+ validlifetime=200,
+ preferredlifetime=500,
+ L=1,
+ A=1,
+ )
+
+ prefix_info_2 = ICMPv6NDOptPrefixInfo(
+ prefix="7::4",
+ prefixlen=20,
+ validlifetime=70,
+ preferredlifetime=1000,
+ L=1,
+ A=0,
+ )
+
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(dst=self.pg1.local_ip6_ll,
+ src=mk_ll_addr(self.pg1.remote_mac)) /
+ ICMPv6ND_RA() /
+ prefix_info_1 /
+ prefix_info_2)
+ self.pg1.add_stream([p])
+ self.pg_start()
+
+ ev = self.vapi.wait_for_event(10, "ip6_ra_event")
+
+ self.assert_equal(ev.current_hop_limit, 0)
+ self.assert_equal(ev.flags, 8)
+ self.assert_equal(ev.router_lifetime_in_sec, 1800)
+ self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
+ self.assert_equal(
+ ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0)
+
+ self.assert_equal(ev.n_prefixes, 2)
+
+ self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
+ self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
+
+
+class TestIPv6RDControlPlane(TestIPv6ND):
+ """ IPv6 Router Discovery Control Plane Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPv6RDControlPlane, cls).setUpClass()
+
+ def setUp(self):
+ super(TestIPv6RDControlPlane, self).setUp()
+
+ # create 1 pg interface
+ self.create_pg_interfaces(range(1))
+
+ self.interfaces = list(self.pg_interfaces)
+
+ # setup all interfaces
+ for i in self.interfaces:
+ i.admin_up()
+ i.config_ip6()
+
+ def tearDown(self):
+ super(TestIPv6RDControlPlane, self).tearDown()
+
+ @staticmethod
+ def create_ra_packet(pg, routerlifetime=None):
+ src_ip = pg.remote_ip6_ll
+ dst_ip = pg.local_ip6
+ if routerlifetime is not None:
+ ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
+ else:
+ ra = ICMPv6ND_RA()
+ p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
+ IPv6(dst=dst_ip, src=src_ip) / ra)
+ return p
+
+ @staticmethod
+ def get_default_routes(fib):
+ list = []
+ for entry in fib:
+ if entry.address_length == 0:
+ for path in entry.path:
+ if path.sw_if_index != 0xFFFFFFFF:
+ defaut_route = {}
+ defaut_route['sw_if_index'] = path.sw_if_index
+ defaut_route['next_hop'] = path.next_hop
+ list.append(defaut_route)
+ return list
+
+ @staticmethod
+ def get_interface_addresses(fib, pg):
+ list = []
+ for entry in fib:
+ if entry.address_length == 128:
+ path = entry.path[0]
+ if path.sw_if_index == pg.sw_if_index:
+ list.append(entry.address)
+ return list
+
+ def test_all(self):
+ """ Test handling of SLAAC addresses and default routes """
+
+ fib = self.vapi.ip6_fib_dump()
+ default_routes = self.get_default_routes(fib)
+ initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
+ self.assertEqual(default_routes, [])
+ router_address = self.pg0.remote_ip6n_ll
+
+ self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
+
+ self.sleep(0.1)
+
+ # send RA
+ packet = (self.create_ra_packet(
+ self.pg0) / ICMPv6NDOptPrefixInfo(
+ prefix="1::",
+ prefixlen=64,
+ validlifetime=2,
+ preferredlifetime=2,
+ L=1,
+ A=1,
+ ) / ICMPv6NDOptPrefixInfo(
+ prefix="7::",
+ prefixlen=20,
+ validlifetime=1500,
+ preferredlifetime=1000,
+ L=1,
+ A=0,
+ ))
+ self.pg0.add_stream([packet])
+ self.pg_start()
+
+ self.sleep(0.1)
+
+ fib = self.vapi.ip6_fib_dump()
+
+ # check FIB for new address
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ new_addresses = addresses.difference(initial_addresses)
+ self.assertEqual(len(new_addresses), 1)
+ prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
+ self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
+
+ # check FIB for new default route
+ default_routes = self.get_default_routes(fib)
+ self.assertEqual(len(default_routes), 1)
+ dr = default_routes[0]
+ self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
+ self.assertEqual(dr['next_hop'], router_address)
+
+ # send RA to delete default route
+ packet = self.create_ra_packet(self.pg0, routerlifetime=0)
+ self.pg0.add_stream([packet])
+ self.pg_start()
+
+ self.sleep(0.1)
+
+ # check that default route is deleted
+ fib = self.vapi.ip6_fib_dump()
+ default_routes = self.get_default_routes(fib)
+ self.assertEqual(len(default_routes), 0)
+
+ self.sleep(0.1)
+
+ # send RA
+ packet = self.create_ra_packet(self.pg0)
+ self.pg0.add_stream([packet])
+ self.pg_start()
+
+ self.sleep(0.1)
+
+ # check FIB for new default route
+ fib = self.vapi.ip6_fib_dump()
+ default_routes = self.get_default_routes(fib)
+ self.assertEqual(len(default_routes), 1)
+ dr = default_routes[0]
+ self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
+ self.assertEqual(dr['next_hop'], router_address)
+
+ # send RA, updating router lifetime to 1s
+ packet = self.create_ra_packet(self.pg0, 1)
+ self.pg0.add_stream([packet])
+ self.pg_start()
+
+ self.sleep(0.1)
+
+ # check that default route still exists
+ fib = self.vapi.ip6_fib_dump()
+ default_routes = self.get_default_routes(fib)
+ self.assertEqual(len(default_routes), 1)
+ dr = default_routes[0]
+ self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
+ self.assertEqual(dr['next_hop'], router_address)
+
+ self.sleep(1)
+
+ # check that default route is deleted
+ fib = self.vapi.ip6_fib_dump()
+ default_routes = self.get_default_routes(fib)
+ self.assertEqual(len(default_routes), 0)
+
+ # check FIB still contains the SLAAC address
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ new_addresses = addresses.difference(initial_addresses)
+ self.assertEqual(len(new_addresses), 1)
+ prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
+ self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
+
+ self.sleep(1)
+
+ # check that SLAAC address is deleted
+ fib = self.vapi.ip6_fib_dump()
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ new_addresses = addresses.difference(initial_addresses)
+ self.assertEqual(len(new_addresses), 0)
+
+