X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ip6.py;h=ea669b708a6294b86ce97ad4fc779612819e5898;hb=da505f608e0919c45089dc80f9e3e16330a6551a;hp=92bb350d1b995a66aa726434c1de5347b48fedac;hpb=f62ae1288a776527c7f7ba3951531fbd07bc63da;p=vpp.git diff --git a/test/test_ip6.py b/test/test_ip6.py index 92bb350d1b9..ea669b708a6 100644 --- a/test/test_ip6.py +++ b/test/test_ip6.py @@ -2,14 +2,24 @@ import unittest import socket -from logging import * from framework import VppTestCase, VppTestRunner -from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint +from vpp_sub_interface import VppSubInterface, VppDot1QSubint from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q -from scapy.layers.inet6 import ICMPv6ND_NS, IPv6, UDP +from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \ + ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation +from util import ppp +from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \ + in6_mactoifaceid +from scapy.utils import inet_pton, inet_ntop + + +def mk_ll_addr(mac): + euid = in6_mactoifaceid(mac) + addr = "fe80::" + euid + return addr class TestIPv6(VppTestCase): @@ -20,6 +30,26 @@ class TestIPv6(VppTestCase): super(TestIPv6, cls).setUpClass() def setUp(self): + """ + Perform test setup before test case. + + **Config:** + - create 3 pg interfaces + - untagged pg0 interface + - Dot1Q subinterface on pg1 + - Dot1AD subinterface on pg2 + - setup interfaces: + - put it into UP state + - set IPv6 addresses + - resolve neighbor address using NDP + - configure 200 fib entries + + :ivar list interfaces: pg interfaces and subinterfaces. + :ivar dict flows: IPv4 packet flows in test. + :ivar list pg_if_packet_sizes: packet sizes in test. + + *TODO:* Create AD sub interface + """ super(TestIPv6, self).setUp() # create 3 pg interfaces @@ -28,8 +58,9 @@ class TestIPv6(VppTestCase): # create 2 subinterfaces for p1 and pg2 self.sub_interfaces = [ VppDot1QSubint(self, self.pg1, 100), - VppDot1QSubint(self, self.pg2, 200)] + VppDot1QSubint(self, self.pg2, 200) # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400) + ] # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc. self.flows = dict() @@ -50,16 +81,32 @@ class TestIPv6(VppTestCase): i.config_ip6() i.resolve_ndp() - # config 2M FIB enries + # config 2M FIB entries self.config_fib_entries(200) def tearDown(self): + """Run standard test teardown and log ``show ip6 neighbors``.""" + for i in self.sub_interfaces: + i.unconfig_ip6() + i.ip6_disable() + i.admin_down() + i.remove_vpp_config() + super(TestIPv6, self).tearDown() if not self.vpp_dead: - info(self.vapi.cli("show ip6 neighbors")) + self.logger.info(self.vapi.cli("show ip6 neighbors")) # info(self.vapi.cli("show ip6 fib")) # many entries def config_fib_entries(self, count): + """For each interface add to the FIB table *count* routes to + "fd02::1/128" destination with interface's local address as next-hop + address. + + :param int count: Number of FIB entries. + + - *TODO:* check if the next-hop address shouldn't be remote address + instead of local address. + """ n_int = len(self.interfaces) percent = 0 counter = 0.0 @@ -70,18 +117,22 @@ class TestIPv6(VppTestCase): for j in range(count / n_int): self.vapi.ip_add_del_route( dest_addr, dest_addr_len, next_hop_address, is_ipv6=1) - counter = counter + 1 + counter += 1 if counter / count * 100 > percent: - info("Configure %d FIB entries .. %d%% done" % - (count, percent)) - percent = percent + 1 + self.logger.info("Configure %d FIB entries .. %d%% done" % + (count, percent)) + percent += 1 def create_stream(self, src_if, packet_sizes): + """Create input packet stream for defined interface. + + :param VppInterface src_if: Interface to create packet stream for. + :param list packet_sizes: Required packet sizes. + """ pkts = [] for i in range(0, 257): dst_if = self.flows[src_if][i % 2] - info = self.create_packet_info( - src_if.sw_if_index, dst_if.sw_if_index) + info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) / @@ -96,7 +147,13 @@ class TestIPv6(VppTestCase): return pkts def verify_capture(self, dst_if, capture): - info("Verifying capture on interface %s" % dst_if.name) + """Verify captured input packet stream for defined interface. + + :param VppInterface dst_if: Interface to verify captured packet stream + for. + :param list capture: Captured packet stream. + """ + self.logger.info("Verifying capture on interface %s" % dst_if.name) last_info = dict() for i in self.interfaces: last_info[i.sw_if_index] = None @@ -115,8 +172,9 @@ class TestIPv6(VppTestCase): payload_info = self.payload_to_info(str(packet[Raw])) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) - debug("Got packet on port %s: src=%u (id=%u)" % - (dst_if.name, payload_info.src, packet_index)) + self.logger.debug( + "Got packet on port %s: src=%u (id=%u)" % + (dst_if.name, payload_info.src, packet_index)) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_sw_if_index, last_info[payload_info.src]) @@ -130,19 +188,23 @@ class TestIPv6(VppTestCase): self.assertEqual(udp.sport, saved_packet[UDP].sport) self.assertEqual(udp.dport, saved_packet[UDP].dport) except: - error("Unexpected or invalid packet:") - error(packet.show()) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise for i in self.interfaces: remaining_packet = self.get_next_packet_info_for_interface2( i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]) - self.assertTrue( - remaining_packet is None, - "Interface %s: Packet expected from interface %s didn't arrive" % - (dst_if.name, i.name)) + self.assertTrue(remaining_packet is None, + "Interface %s: Packet expected from interface %s " + "didn't arrive" % (dst_if.name, i.name)) def test_fib(self): - """ IPv6 FIB test """ + """ IPv6 FIB test + + Test scenario: + - Create IPv6 stream for pg0 interface + - Create IPv6 tagged streams for pg1's and pg2's subinterface. + - Send and verify received packets on each interface. + """ pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) @@ -161,6 +223,227 @@ class TestIPv6(VppTestCase): pkts = i.parent.get_capture() self.verify_capture(i, pkts) + def send_and_assert_no_replies(self, intf, pkts, remark): + intf.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + intf.assert_nothing_captured(remark=remark) + + def test_ns(self): + """ IPv6 Neighbour Solicitation Exceptions + + Test scenario: + - Send an NS Sourced from an address not covered by the link sub-net + - Send an NS to an mcast address the router has not joined + - Send NS for a target address the router does not onn. + """ + + # + # An NS from a non link source address + # + nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.pg0.local_ip6)) + d = inet_ntop(socket.AF_INET6, nsma) + + p = (Ether(dst=in6_getnsmac(nsma)) / + IPv6(dst=d, src="2002::2") / + ICMPv6ND_NS(tgt=self.pg0.local_ip6) / + ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)) + pkts = [p] + + self.send_and_assert_no_replies( + self.pg0, pkts, + "No response to NS source by address not on sub-net") + + # + # An NS for sent to a solicited mcast group the router is + # not a member of FAILS + # + if 0: + nsma = in6_getnsma(inet_pton(socket.AF_INET6, "fd::ffff")) + d = inet_ntop(socket.AF_INET6, nsma) + + p = (Ether(dst=in6_getnsmac(nsma)) / + IPv6(dst=d, src=self.pg0.remote_ip6) / + ICMPv6ND_NS(tgt=self.pg0.local_ip6) / + ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)) + pkts = [p] + + self.send_and_assert_no_replies( + self.pg0, pkts, + "No response to NS sent to unjoined mcast address") + + # + # An NS whose target address is one the router does not own + # + nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.pg0.local_ip6)) + d = inet_ntop(socket.AF_INET6, nsma) + + p = (Ether(dst=in6_getnsmac(nsma)) / + IPv6(dst=d, src=self.pg0.remote_ip6) / + ICMPv6ND_NS(tgt="fd::ffff") / + ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)) + pkts = [p] + + self.send_and_assert_no_replies(self.pg0, pkts, + "No response to NS for unknown target") + + def send_and_expect_ra(self, intf, pkts, remark, src_ip=None): + if not src_ip: + src_ip = intf.remote_ip6 + intf.add_stream(pkts) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + rx = intf.get_capture(1) + + self.assertEqual(len(rx), 1) + rx = rx[0] + + # the rx'd RA should be addressed to the sender's source + self.assertTrue(rx.haslayer(ICMPv6ND_RA)) + self.assertEqual(in6_ptop(rx[IPv6].dst), + in6_ptop(src_ip)) + + # and come from the router's link local + self.assertTrue(in6_islladdr(rx[IPv6].src)) + self.assertEqual(in6_ptop(rx[IPv6].src), + in6_ptop(mk_ll_addr(intf.local_mac))) + + def test_rs(self): + """ IPv6 Router Solicitation Exceptions + + Test scenario: + """ + + # + # Before we begin change the IPv6 RA responses to use the unicast + # address - that way we will not confuse them with the periodic + # RAs which go to the mcast address + # + self.pg0.ip6_ra_config(send_unicast=1) + + # + # An RS from a link source address + # - expect an RA in return + # + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) / + ICMPv6ND_RS()) + pkts = [p] + self.send_and_expect_ra(self.pg0, pkts, "Genuine RS") + + # + # For the next RS sent the RA should be rate limited + # + self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited") + + # + # When we reconfiure the IPv6 RA config, we reset the RA rate limiting, + # so we need to do this before each test below so as not to drop + # packets for rate limiting reasons. Test this works here. + # + self.pg0.ip6_ra_config(send_unicast=1) + self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS") + + # + # An RS sent from a non-link local source + # + self.pg0.ip6_ra_config(send_unicast=1) + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(dst=self.pg0.local_ip6, src="2002::ffff") / + ICMPv6ND_RS()) + pkts = [p] + self.send_and_assert_no_replies(self.pg0, pkts, + "RS from non-link source") + + # + # Source an RS from a link local address + # + self.pg0.ip6_ra_config(send_unicast=1) + ll = mk_ll_addr(self.pg0.remote_mac) + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(dst=self.pg0.local_ip6, src=ll) / + ICMPv6ND_RS()) + pkts = [p] + self.send_and_expect_ra( + self.pg0, pkts, "RS sourced from link-local", src_ip=ll) + + # + # Source from the unspecified address ::. This happens when the RS + # is sent before the host has a configured address/sub-net, + # i.e. auto-config. Since the sender has no IP address, the reply + # comes back mcast - so the capture needs to not filter this. + # If we happen to pick up the periodic RA at this point then so be it, + # it's not an error. + # + self.pg0.ip6_ra_config(send_unicast=1) + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(dst=self.pg0.local_ip6, src="::") / + ICMPv6ND_RS()) + pkts = [p] + + self.pg0.add_stream(pkts) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1, filter_out_fn=None) + found = 0 + for rx in capture: + if (rx.haslayer(ICMPv6ND_RA)): + # and come from the router's link local + self.assertTrue(in6_islladdr(rx[IPv6].src)) + self.assertEqual(in6_ptop(rx[IPv6].src), + in6_ptop(mk_ll_addr(self.pg0.local_mac))) + # sent to the all hosts mcast + self.assertEqual(in6_ptop(rx[IPv6].dst), "ff02::1") + + found = 1 + self.assertTrue(found) + + @unittest.skip("Unsupported") + def test_mrs(self): + """ IPv6 Multicast Router Solicitation Exceptions + + Test scenario: + """ + + # + # An RS from a link source address + # - expect an RA in return + # + nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.pg0.local_ip6)) + d = inet_ntop(socket.AF_INET6, nsma) + + p = (Ether(dst=getmacbyip6("ff02::2")) / + IPv6(dst=d, src=self.pg0.remote_ip6) / + ICMPv6MRD_Solicitation()) + pkts = [p] + + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg0.assert_nothing_captured( + remark="No response to NS source by address not on sub-net") + + # + # An RS from a non link source address + # + nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.pg0.local_ip6)) + d = inet_ntop(socket.AF_INET6, nsma) + + p = (Ether(dst=getmacbyip6("ff02::2")) / + IPv6(dst=d, src="2002::2") / + ICMPv6MRD_Solicitation()) + pkts = [p] + + self.send_and_assert_no_replies(self.pg0, pkts, + "RA rate limited") + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg0.assert_nothing_captured( + remark="No response to NS source by address not on sub-net") + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)