X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_nat.py;h=1f87bffab577f555a8d6de3f57945a6ba8c26850;hb=b932d26ea48ba8aa7677dc3b6ffd5d4729176c8f;hp=3c002bb8eca4940396de8d7f26875a0ba75cc6ef;hpb=efcd1e9e1d7dda4e4ea3db5750925cd8f6894f4d;p=vpp.git diff --git a/test/test_nat.py b/test/test_nat.py index 3c002bb8eca..1f87bffab57 100644 --- a/test/test_nat.py +++ b/test/test_nat.py @@ -704,8 +704,6 @@ class TestNAT44(MethodHolder): cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2" cls.pg9.resolve_arp() - random.seed() - except Exception: super(TestNAT44, cls).tearDownClass() raise @@ -739,7 +737,9 @@ class TestNAT44(MethodHolder): interfaces = self.vapi.nat44_interface_addr_dump() for intf in interfaces: - self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0) + self.vapi.nat44_add_interface_addr(intf.sw_if_index, + twice_nat=intf.twice_nat, + is_add=0) self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port, domain_id=self.ipfix_domain_id) @@ -772,6 +772,7 @@ class TestNAT44(MethodHolder): addr_only=sm.addr_only, vrf_id=sm.vrf_id, protocol=sm.protocol, + twice_nat=sm.twice_nat, is_add=0) lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump() @@ -780,15 +781,28 @@ class TestNAT44(MethodHolder): lb_sm.external_addr, lb_sm.external_port, lb_sm.protocol, - lb_sm.vrf_id, + vrf_id=lb_sm.vrf_id, + twice_nat=lb_sm.twice_nat, is_add=0, local_num=0, locals=[]) + identity_mappings = self.vapi.nat44_identity_mapping_dump() + for id_m in identity_mappings: + self.vapi.nat44_add_del_identity_mapping( + addr_only=id_m.addr_only, + ip=id_m.ip_address, + port=id_m.port, + sw_if_index=id_m.sw_if_index, + vrf_id=id_m.vrf_id, + protocol=id_m.protocol, + is_add=0) + adresses = self.vapi.nat44_address_dump() for addr in adresses: self.vapi.nat44_add_del_address_range(addr.ip_address, addr.ip_address, + twice_nat=addr.twice_nat, is_add=0) self.vapi.nat_set_reass() @@ -797,7 +811,7 @@ class TestNAT44(MethodHolder): def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0', local_port=0, external_port=0, vrf_id=0, is_add=1, external_sw_if_index=0xFFFFFFFF, - proto=0): + proto=0, twice_nat=0): """ Add/delete NAT44 static mapping @@ -809,6 +823,7 @@ class TestNAT44(MethodHolder): :param is_add: 1 if add, 0 if delete (Default add) :param external_sw_if_index: External interface instead of IP address :param proto: IP protocol (Mandatory if port specified) + :param twice_nat: 1 if translate external host address and port """ addr_only = 1 if local_port and external_port: @@ -824,18 +839,21 @@ class TestNAT44(MethodHolder): addr_only, vrf_id, proto, + twice_nat, is_add) - def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF): + def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0): """ Add/delete NAT44 address :param ip: IP address :param is_add: 1 if add, 0 if delete (Default add) + :param twice_nat: twice NAT address for extenal hosts """ nat_addr = socket.inet_pton(socket.AF_INET, ip) self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add, - vrf_id=vrf_id) + vrf_id=vrf_id, + twice_nat=twice_nat) def test_dynamic(self): """ NAT44 dynamic translation test """ @@ -1192,6 +1210,35 @@ class TestNAT44(MethodHolder): self.pg_start() self.pg3.assert_nothing_captured() + def test_identity_nat(self): + """ Identity NAT """ + + self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n) + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) / + TCP(sport=12345, dport=56789)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg0.remote_ip4) + self.assertEqual(ip.src, self.pg1.remote_ip4) + self.assertEqual(tcp.dport, 56789) + self.assertEqual(tcp.sport, 12345) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + def test_static_lb(self): """ NAT44 local service load balancing """ external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr) @@ -1787,6 +1834,38 @@ class TestNAT44(MethodHolder): static_mappings = self.vapi.nat44_static_mapping_dump() self.assertEqual(0, len(static_mappings)) + def test_interface_addr_identity_nat(self): + """ Identity NAT with addresses from interface """ + + port = 53053 + self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index) + self.vapi.nat44_add_del_identity_mapping( + sw_if_index=self.pg7.sw_if_index, + port=port, + protocol=IP_PROTOS.tcp, + addr_only=0) + + # identity mappings with external interface + identity_mappings = self.vapi.nat44_identity_mapping_dump() + self.assertEqual(1, len(identity_mappings)) + self.assertEqual(self.pg7.sw_if_index, + identity_mappings[0].sw_if_index) + + # configure interface address and check identity mappings + self.pg7.config_ip4() + identity_mappings = self.vapi.nat44_identity_mapping_dump() + self.assertEqual(1, len(identity_mappings)) + self.assertEqual(identity_mappings[0].ip_address, + self.pg7.local_ip4n) + self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index) + self.assertEqual(port, identity_mappings[0].port) + self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol) + + # remove interface address and check identity mappings + self.pg7.unconfig_ip4() + identity_mappings = self.vapi.nat44_identity_mapping_dump() + self.assertEqual(0, len(identity_mappings)) + def test_ipfix_nat44_sess(self): """ IPFIX logging NAT44 session created/delted """ self.ipfix_domain_id = 10 @@ -1986,7 +2065,7 @@ class TestNAT44(MethodHolder): nat_ip2 = "10.0.0.11" self.nat44_add_address(nat_ip1) - self.nat44_add_address(nat_ip2) + self.nat44_add_address(nat_ip2, vrf_id=99) self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index) self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index, @@ -2756,11 +2835,201 @@ class TestNAT44(MethodHolder): self.assertEqual(p[TCP].dport, self.tcp_port_in) self.assertEqual(data, p[Raw].load) + def test_port_restricted(self): + """ Port restricted NAT44 (MAP-E CE) """ + self.nat44_add_address(self.nat_addr) + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 " + "psid-offset 6 psid-len 6") + + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=4567, dport=22)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg1.remote_ip4) + self.assertEqual(ip.src, self.nat_addr) + self.assertEqual(tcp.dport, 22) + self.assertNotEqual(tcp.sport, 4567) + self.assertEqual((tcp.sport >> 6) & 63, 10) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + def test_twice_nat(self): + """ Twice NAT44 """ + twice_nat_addr = '10.0.1.3' + port_in = 8080 + port_out = 80 + eh_port_out = 4567 + eh_port_in = 0 + self.nat44_add_address(self.nat_addr) + self.nat44_add_address(twice_nat_addr, twice_nat=1) + self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, + port_in, port_out, proto=IP_PROTOS.tcp, + twice_nat=1) + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + TCP(sport=eh_port_out, dport=port_out)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg0.remote_ip4) + self.assertEqual(ip.src, twice_nat_addr) + self.assertEqual(tcp.dport, port_in) + self.assertNotEqual(tcp.sport, eh_port_out) + eh_port_in = tcp.sport + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) / + TCP(sport=port_in, dport=eh_port_in)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg1.remote_ip4) + self.assertEqual(ip.src, self.nat_addr) + self.assertEqual(tcp.dport, eh_port_out) + self.assertEqual(tcp.sport, port_out) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + def test_twice_nat_lb(self): + """ Twice NAT44 local service load balancing """ + external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr) + twice_nat_addr = '10.0.1.3' + local_port = 8080 + external_port = 80 + eh_port_out = 4567 + eh_port_in = 0 + server1 = self.pg0.remote_hosts[0] + server2 = self.pg0.remote_hosts[1] + + locals = [{'addr': server1.ip4n, + 'port': local_port, + 'probability': 50}, + {'addr': server2.ip4n, + 'port': local_port, + 'probability': 50}] + + self.nat44_add_address(self.nat_addr) + self.nat44_add_address(twice_nat_addr, twice_nat=1) + + self.vapi.nat44_add_del_lb_static_mapping(external_addr_n, + external_port, + IP_PROTOS.tcp, + twice_nat=1, + local_num=len(locals), + locals=locals) + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + TCP(sport=eh_port_out, dport=external_port)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + server = None + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, twice_nat_addr) + self.assertIn(ip.dst, [server1.ip4, server2.ip4]) + if ip.dst == server1.ip4: + server = server1 + else: + server = server2 + self.assertNotEqual(tcp.sport, eh_port_out) + eh_port_in = tcp.sport + self.assertEqual(tcp.dport, local_port) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + p = (Ether(src=server.mac, dst=self.pg0.local_mac) / + IP(src=server.ip4, dst=twice_nat_addr) / + TCP(sport=local_port, dport=eh_port_in)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.nat_addr) + self.assertEqual(ip.dst, self.pg1.remote_ip4) + self.assertEqual(tcp.sport, external_port) + self.assertEqual(tcp.dport, eh_port_out) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + def test_twice_nat_interface_addr(self): + """ Acquire twice NAT44 addresses from interface """ + self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1) + + # no address in NAT pool + adresses = self.vapi.nat44_address_dump() + self.assertEqual(0, len(adresses)) + + # configure interface address and check NAT address pool + self.pg7.config_ip4() + adresses = self.vapi.nat44_address_dump() + self.assertEqual(1, len(adresses)) + self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n) + self.assertEqual(adresses[0].twice_nat, 1) + + # remove interface address and check NAT address pool + self.pg7.unconfig_ip4() + adresses = self.vapi.nat44_address_dump() + self.assertEqual(0, len(adresses)) + def tearDown(self): super(TestNAT44, self).tearDown() if not self.vpp_dead: self.logger.info(self.vapi.cli("show nat44 verbose")) self.logger.info(self.vapi.cli("show nat virtual-reassembly")) + self.vapi.cli("nat addr-port-assignment-alg default") self.clear_nat44() @@ -3428,7 +3697,7 @@ class TestNAT64(MethodHolder): cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET, cls.vrf1_nat_addr) - cls.create_pg_interfaces(range(4)) + cls.create_pg_interfaces(range(5)) cls.ip6_interfaces = list(cls.pg_interfaces[0:1]) cls.ip6_interfaces.append(cls.pg_interfaces[2]) cls.ip4_interfaces = list(cls.pg_interfaces[1:2]) @@ -4337,6 +4606,25 @@ class TestNAT64(MethodHolder): self.assertEqual(p[TCP].dport, self.tcp_port_in) self.assertEqual(data, p[Raw].load) + def test_interface_addr(self): + """ Acquire NAT64 pool addresses from interface """ + self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index) + + # no address in NAT64 pool + adresses = self.vapi.nat44_address_dump() + self.assertEqual(0, len(adresses)) + + # configure interface address and check NAT64 address pool + self.pg4.config_ip4() + addresses = self.vapi.nat64_pool_addr_dump() + self.assertEqual(len(addresses), 1) + self.assertEqual(addresses[0].address, self.pg4.local_ip4n) + + # remove interface address and check NAT64 address pool + self.pg4.unconfig_ip4() + addresses = self.vapi.nat64_pool_addr_dump() + self.assertEqual(0, len(adresses)) + def nat64_get_ses_num(self): """ Return number of active NAT64 sessions. @@ -4557,6 +4845,20 @@ class TestDSlite(MethodHolder): self.check_ip_checksum(capture) self.check_icmp_checksum(capture) + # ping DS-Lite AFTR tunnel endpoint address + p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) / + ICMPv6EchoRequest()) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + self.assertEqual(1, len(capture)) + capture = capture[0] + self.assertEqual(capture[IPv6].src, aftr_ip6) + self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6) + self.assertTrue(capture.haslayer(ICMPv6EchoReply)) + def tearDown(self): super(TestDSlite, self).tearDown() if not self.vpp_dead: