X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ip6.py;h=1d52edb7721f7b0250f0fe0d03ccb2a762aaa52e;hb=eaea421e1fefedc47325f970475c5d48c899433c;hp=df2364213d477d69a5c88d0e5e624dd40244628a;hpb=a6fe463c1cb67a52f06e519a514a47a9b6af8057;p=vpp.git diff --git a/test/test_ip6.py b/test/test_ip6.py index df2364213d4..1d52edb7721 100644 --- a/test/test_ip6.py +++ b/test/test_ip6.py @@ -3,6 +3,7 @@ import socket import unittest +from parameterized import parameterized import scapy.layers.inet6 as inet6 from scapy.contrib.mpls import MPLS from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \ @@ -17,22 +18,22 @@ from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \ from six import moves from framework import VppTestCase, VppTestRunner -from util import ppp, ip6_normalize +from util import ppp, ip6_normalize, mk_ll_addr from vpp_ip import DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \ VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \ - VppMplsRoute, VppMplsTable, VppIpTable, VppIpAddress + VppMplsRoute, VppMplsTable, VppIpTable from vpp_neighbor import find_nbr, VppNeighbor from vpp_pg_interface import is_ipv6_misc from vpp_sub_interface import VppSubInterface, VppDot1QSubint +from ipaddress import IPv6Network, IPv4Network AF_INET6 = socket.AF_INET6 - -def mk_ll_addr(mac): - euid = in6_mactoifaceid(mac) - addr = "fe80::" + euid - return addr +try: + text_type = unicode +except NameError: + text_type = str class TestIPv6ND(VppTestCase): @@ -317,7 +318,7 @@ class TestIPv6(TestIPv6ND): try: ip = packet[IPv6] udp = packet[inet6.UDP] - payload_info = self.payload_to_info(str(packet[Raw])) + payload_info = self.payload_to_info(packet[Raw]) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug( @@ -444,7 +445,6 @@ class TestIPv6(TestIPv6ND): self.pg0.sw_if_index, self.pg0.remote_hosts[2].mac, self.pg0.remote_hosts[2].ip6, - af=AF_INET6, is_no_fib_entry=1) nd_entry.add_vpp_config() @@ -453,8 +453,7 @@ class TestIPv6(TestIPv6ND): # self.assertTrue(find_nbr(self, self.pg0.sw_if_index, - self.pg0._remote_hosts[2].ip6, - inet=AF_INET6)) + self.pg0._remote_hosts[2].ip6)) self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6, 128, @@ -482,8 +481,7 @@ class TestIPv6(TestIPv6ND): # self.assertTrue(find_nbr(self, self.pg0.sw_if_index, - self.pg0._remote_hosts[2].ip6_ll, - inet=AF_INET6)) + self.pg0._remote_hosts[2].ip6_ll)) self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6_ll, 128, @@ -510,8 +508,7 @@ class TestIPv6(TestIPv6ND): # self.assertTrue(find_nbr(self, self.pg0.sw_if_index, - self.pg0._remote_hosts[3].ip6_ll, - inet=AF_INET6)) + self.pg0._remote_hosts[3].ip6_ll)) self.assertFalse(find_route(self, self.pg0._remote_hosts[3].ip6_ll, 128, @@ -531,14 +528,12 @@ class TestIPv6(TestIPv6ND): ns_pg1 = VppNeighbor(self, self.pg1.sw_if_index, self.pg1.remote_hosts[1].mac, - self.pg1.remote_hosts[1].ip6, - af=AF_INET6) + self.pg1.remote_hosts[1].ip6) ns_pg1.add_vpp_config() ns_pg2 = VppNeighbor(self, self.pg2.sw_if_index, self.pg2.remote_mac, - self.pg1.remote_hosts[1].ip6, - af=AF_INET6) + self.pg1.remote_hosts[1].ip6) ns_pg2.add_vpp_config() # @@ -754,7 +749,7 @@ class TestIPv6(TestIPv6ND): # # Configure The RA to announce the links prefix # - self.pg0.ip6_ra_prefix(self.pg0.local_ip6n, + self.pg0.ip6_ra_prefix(self.pg0.local_ip6, self.pg0.local_ip6_prefix_len) # @@ -780,7 +775,7 @@ class TestIPv6(TestIPv6ND): # Change the prefix info to not off-link # L-flag is clear # - self.pg0.ip6_ra_prefix(self.pg0.local_ip6n, + self.pg0.ip6_ra_prefix(self.pg0.local_ip6, self.pg0.local_ip6_prefix_len, off_link=1) @@ -800,7 +795,7 @@ class TestIPv6(TestIPv6ND): # Change the prefix info to not off-link, no-autoconfig # L and A flag are clear in the advert # - self.pg0.ip6_ra_prefix(self.pg0.local_ip6n, + self.pg0.ip6_ra_prefix(self.pg0.local_ip6, self.pg0.local_ip6_prefix_len, off_link=1, no_autoconfig=1) @@ -821,7 +816,7 @@ class TestIPv6(TestIPv6ND): # Change the flag settings back to the defaults # L and A flag are set in the advert # - self.pg0.ip6_ra_prefix(self.pg0.local_ip6n, + self.pg0.ip6_ra_prefix(self.pg0.local_ip6, self.pg0.local_ip6_prefix_len) opt = ICMPv6NDOptPrefixInfo( @@ -840,7 +835,7 @@ class TestIPv6(TestIPv6ND): # Change the prefix info to not off-link, no-autoconfig # L and A flag are clear in the advert # - self.pg0.ip6_ra_prefix(self.pg0.local_ip6n, + self.pg0.ip6_ra_prefix(self.pg0.local_ip6, self.pg0.local_ip6_prefix_len, off_link=1, no_autoconfig=1) @@ -861,7 +856,7 @@ class TestIPv6(TestIPv6ND): # Use the reset to defults option to revert to defaults # L and A flag are clear in the advert # - self.pg0.ip6_ra_prefix(self.pg0.local_ip6n, + self.pg0.ip6_ra_prefix(self.pg0.local_ip6, self.pg0.local_ip6_prefix_len, use_default=1) @@ -880,7 +875,7 @@ class TestIPv6(TestIPv6ND): # # Advertise Another prefix. With no L-flag/A-flag # - self.pg0.ip6_ra_prefix(self.pg1.local_ip6n, + self.pg0.ip6_ra_prefix(self.pg1.local_ip6, self.pg1.local_ip6_prefix_len, off_link=1, no_autoconfig=1) @@ -910,7 +905,7 @@ class TestIPv6(TestIPv6ND): # Remove the first refix-info - expect the second is still in the # advert # - self.pg0.ip6_ra_prefix(self.pg0.local_ip6n, + self.pg0.ip6_ra_prefix(self.pg0.local_ip6, self.pg0.local_ip6_prefix_len, is_no=1) @@ -929,7 +924,7 @@ class TestIPv6(TestIPv6ND): # # Remove the second prefix-info - expect no prefix-info i nthe adverts # - self.pg0.ip6_ra_prefix(self.pg1.local_ip6n, + self.pg0.ip6_ra_prefix(self.pg1.local_ip6, self.pg1.local_ip6_prefix_len, is_no=1) @@ -1063,11 +1058,13 @@ class TestIPv6RD(TestIPv6ND): self.pg1.local_mac) def verify_prefix_info(self, reported_prefix, prefix_option): - prefix = socket.inet_pton(socket.AF_INET6, - prefix_option.getfieldval("prefix")) - self.assert_equal(reported_prefix.dst_address, prefix) - self.assert_equal(reported_prefix.dst_address_length, - prefix_option.getfieldval("prefixlen")) + prefix = IPv6Network( + text_type(prefix_option.getfieldval("prefix") + + "/" + + text_type(prefix_option.getfieldval("prefixlen"))), + strict=False) + self.assert_equal(reported_prefix.prefix.network_address, + prefix.network_address) L = prefix_option.getfieldval("L") A = prefix_option.getfieldval("A") option_flags = (L << 7) | (A << 6) @@ -1348,9 +1345,9 @@ class IPv6NDProxyTest(TestIPv6ND): # # Add proxy support for the host # - self.vapi.ip6_nd_proxy( - inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6), - self.pg1.sw_if_index) + self.vapi.ip6nd_proxy_add_del( + ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6), + sw_if_index=self.pg1.sw_if_index) # # try that NS again. this time we expect an NA back @@ -1365,8 +1362,7 @@ class IPv6NDProxyTest(TestIPv6ND): # self.assertTrue(find_nbr(self, self.pg1.sw_if_index, - self.pg0._remote_hosts[2].ip6, - inet=AF_INET6)) + self.pg0._remote_hosts[2].ip6)) # # ... and we can route traffic to it @@ -1416,9 +1412,9 @@ class IPv6NDProxyTest(TestIPv6ND): ICMPv6NDOptSrcLLAddr( lladdr=self.pg0._remote_hosts[2].mac)) - self.vapi.ip6_nd_proxy( - inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6), - self.pg2.sw_if_index) + self.vapi.ip6nd_proxy_add_del( + ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6), + sw_if_index=self.pg2.sw_if_index) self.send_and_expect_na(self.pg2, ns_pg2, "NS to proxy entry other interface", @@ -1427,8 +1423,7 @@ class IPv6NDProxyTest(TestIPv6ND): self.assertTrue(find_nbr(self, self.pg2.sw_if_index, - self.pg0._remote_hosts[3].ip6, - inet=AF_INET6)) + self.pg0._remote_hosts[3].ip6)) # # hosts can communicate. pg2->pg1 @@ -1457,23 +1452,19 @@ class IPv6NDProxyTest(TestIPv6ND): # # remove the proxy configs # - self.vapi.ip6_nd_proxy( - inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6), - self.pg1.sw_if_index, - is_del=1) - self.vapi.ip6_nd_proxy( - inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6), - self.pg2.sw_if_index, - is_del=1) + self.vapi.ip6nd_proxy_add_del( + ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6), + sw_if_index=self.pg1.sw_if_index, is_del=1) + self.vapi.ip6nd_proxy_add_del( + ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6), + sw_if_index=self.pg2.sw_if_index, is_del=1) self.assertFalse(find_nbr(self, self.pg2.sw_if_index, - self.pg0._remote_hosts[3].ip6, - inet=AF_INET6)) + self.pg0._remote_hosts[3].ip6)) self.assertFalse(find_nbr(self, self.pg1.sw_if_index, - self.pg0._remote_hosts[2].ip6, - inet=AF_INET6)) + self.pg0._remote_hosts[2].ip6)) # # no longer proxy-ing... @@ -1676,20 +1667,20 @@ class TestIP6LoadBalance(VppTestCase): i.disable_mpls() super(TestIP6LoadBalance, self).tearDown() - def send_and_expect_load_balancing(self, input, pkts, outputs): + def pg_send(self, input, pkts): self.vapi.cli("clear trace") input.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() + + def send_and_expect_load_balancing(self, input, pkts, outputs): + self.pg_send(input, pkts) for oo in outputs: rx = oo._get_capture(1) self.assertNotEqual(0, len(rx)) def send_and_expect_one_itf(self, input, pkts, itf): - self.vapi.cli("clear trace") - input.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() + self.pg_send(input, pkts) rx = itf.get_capture(len(pkts)) def test_ip6_load_balance(self): @@ -1949,7 +1940,7 @@ class TestIP6Punt(VppTestCase): # # Configure a punt redirect via pg1. # - nh_addr = VppIpAddress(self.pg1.remote_ip6).encode() + nh_addr = self.pg1.remote_ip6 self.vapi.ip_punt_redirect(self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr) @@ -1977,7 +1968,7 @@ class TestIP6Punt(VppTestCase): self.assertLess(len(rx), len(pkts)) # - # remove the poilcer. back to full rx + # remove the policer. back to full rx # self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1) self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0, @@ -2013,7 +2004,7 @@ class TestIP6Punt(VppTestCase): # # Configure a punt redirects # - nh_addr = VppIpAddress(self.pg3.remote_ip6).encode() + nh_addr = self.pg3.remote_ip6 self.vapi.ip_punt_redirect(self.pg0.sw_if_index, self.pg3.sw_if_index, nh_addr) @@ -2022,7 +2013,7 @@ class TestIP6Punt(VppTestCase): nh_addr) self.vapi.ip_punt_redirect(self.pg2.sw_if_index, self.pg3.sw_if_index, - VppIpAddress('0::0').encode()) + '0::0') # # Dump pg0 punt redirects @@ -2039,8 +2030,8 @@ class TestIP6Punt(VppTestCase): self.assertEqual(len(punts), 3) for p in punts: self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index) - self.assertNotEqual(punts[1].punt.nh.un.ip6, self.pg3.remote_ip6) - self.assertEqual(punts[2].punt.nh.un.ip6.address, '\x00'*16) + self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6) + self.assertEqual(str(punts[2].punt.nh), '::') class TestIPDeag(VppTestCase): @@ -2163,7 +2154,7 @@ class TestIPDeag(VppTestCase): class TestIP6Input(VppTestCase): - """ IPv6 Input Exceptions """ + """ IPv6 Input Exception Test Cases """ def setUp(self): super(TestIP6Input, self).setUp() @@ -2181,25 +2172,10 @@ class TestIP6Input(VppTestCase): i.unconfig_ip6() i.admin_down() - def test_ip_input(self): - """ IP6 Input Exceptions """ - - # - # bad version - this is dropped - # - p_version = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IPv6(src=self.pg0.remote_ip6, - dst=self.pg1.remote_ip6, - version=3) / - inet6.UDP(sport=1234, dport=1234) / - Raw('\xa5' * 100)) - - self.send_and_assert_no_replies(self.pg0, p_version * 65, - "funky version") - + def test_ip_input_icmp_reply(self): + """ IP6 Input Exception - Return ICMP (3,0) """ # - # hop limit - IMCP replies + # hop limit - ICMP replies # p_version = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / @@ -2212,9 +2188,45 @@ class TestIP6Input(VppTestCase): rx = self.send_and_expect(self.pg0, p_version * 65, self.pg0) rx = rx[0] icmp = rx[ICMPv6TimeExceeded] - self.assertEqual(icmp.type, 3) + # 0: "hop limit exceeded in transit", - self.assertEqual(icmp.code, 0) + self.assertEqual((icmp.type, icmp.code), (3, 0)) + + icmpv6_data = '\x0a' * 18 + all_0s = "::" + all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF" + + @parameterized.expand([ + # Name, src, dst, l4proto, msg, timeout + ("src='iface', dst='iface'", None, None, + inet6.UDP(sport=1234, dport=1234), "funky version", None), + ("src='All 0's', dst='iface'", all_0s, None, + ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1), + ("src='iface', dst='All 0's'", None, all_0s, + ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1), + ("src='All 1's', dst='iface'", all_1s, None, + ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1), + ("src='iface', dst='All 1's'", None, all_1s, + ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1), + ("src='All 1's', dst='All 1's'", all_1s, all_1s, + ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1), + + ]) + def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout): + + self._testMethodDoc = 'IPv6 Input Exception - %s' % name + + p_version = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IPv6(src=src or self.pg0.remote_ip6, + dst=dst or self.pg1.remote_ip6, + version=3) / + l4 / + Raw('\xa5' * 100)) + + self.send_and_assert_no_replies(self.pg0, p_version * 65, + remark=msg or "", + timeout=timeout) if __name__ == '__main__':