X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_nat.py;h=f0614da2f3cd7fc5bc74a34346b56174c55684ea;hb=5f22499e8836066a268dcb3f4d16cfcd0244b2a2;hp=7194704046d647619b0db7b0780dcc825d04d850;hpb=a431ad1c486ad0fd9ca35e14c527fe7611965fc2;p=vpp.git diff --git a/test/test_nat.py b/test/test_nat.py index 7194704046d..f0614da2f3c 100644 --- a/test/test_nat.py +++ b/test/test_nat.py @@ -989,6 +989,8 @@ class TestNAT44(MethodHolder): if self.pg7.has_ip4_config: self.pg7.unconfig_ip4() + self.vapi.nat44_forwarding_enable_disable(0) + interfaces = self.vapi.nat44_interface_addr_dump() for intf in interfaces: self.vapi.nat44_add_interface_addr(intf.sw_if_index, @@ -1027,6 +1029,8 @@ class TestNAT44(MethodHolder): vrf_id=sm.vrf_id, protocol=sm.protocol, twice_nat=sm.twice_nat, + out2in_only=sm.out2in_only, + tag=sm.tag, is_add=0) lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump() @@ -1037,6 +1041,8 @@ class TestNAT44(MethodHolder): lb_sm.protocol, vrf_id=lb_sm.vrf_id, twice_nat=lb_sm.twice_nat, + out2in_only=lb_sm.out2in_only, + tag=lb_sm.tag, is_add=0, local_num=0, locals=[]) @@ -1065,7 +1071,7 @@ class TestNAT44(MethodHolder): def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0', local_port=0, external_port=0, vrf_id=0, is_add=1, external_sw_if_index=0xFFFFFFFF, - proto=0, twice_nat=0): + proto=0, twice_nat=0, out2in_only=0, tag=""): """ Add/delete NAT44 static mapping @@ -1078,6 +1084,8 @@ class TestNAT44(MethodHolder): :param external_sw_if_index: External interface instead of IP address :param proto: IP protocol (Mandatory if port specified) :param twice_nat: 1 if translate external host address and port + :param out2in_only: if 1 rule is matching only out2in direction + :param tag: Opaque string tag """ addr_only = 1 if local_port and external_port: @@ -1094,6 +1102,8 @@ class TestNAT44(MethodHolder): vrf_id, proto, twice_nat, + out2in_only, + tag, is_add) def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0): @@ -1370,6 +1380,9 @@ class TestNAT44(MethodHolder): self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, is_inside=0) + sm = self.vapi.nat44_static_mapping_dump() + self.assertEqual(len(sm), 1) + self.assertEqual((sm[0].tag).split('\0', 1)[0], '') # in2out pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1394,11 +1407,15 @@ class TestNAT44(MethodHolder): self.tcp_port_out = 6303 self.udp_port_out = 6304 self.icmp_id_out = 6305 + tag = "testTAG" - self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip) + self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag) self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, is_inside=0) + sm = self.vapi.nat44_static_mapping_dump() + self.assertEqual(len(sm), 1) + self.assertEqual((sm[0].tag).split('\0', 1)[0], tag) # out2in pkts = self.create_stream_out(self.pg1, nat_ip) @@ -1490,6 +1507,102 @@ class TestNAT44(MethodHolder): capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture) + def test_static_with_port_out2(self): + """ 1:1 NAPT symmetrical rule """ + + external_port = 80 + local_port = 8080 + + self.vapi.nat44_forwarding_enable_disable(1) + self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, + local_port, external_port, + proto=IP_PROTOS.tcp, out2in_only=1) + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + # from client to service + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + TCP(sport=12345, dport=external_port)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + server = None + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg0.remote_ip4) + self.assertEqual(tcp.dport, local_port) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from service back to client + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=local_port, dport=12345)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.nat_addr) + self.assertEqual(tcp.sport, external_port) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from client to server (no translation) + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) / + TCP(sport=12346, dport=local_port)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + server = None + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg0.remote_ip4) + self.assertEqual(tcp.dport, local_port) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from service back to client (no translation) + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=local_port, dport=12346)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.pg0.remote_ip4) + self.assertEqual(tcp.sport, local_port) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + def test_static_vrf_aware(self): """ 1:1 NAT VRF awareness """ @@ -1644,6 +1757,118 @@ class TestNAT44(MethodHolder): server2_n += 1 self.assertTrue(server1_n > server2_n) + def test_static_lb_2(self): + """ NAT44 local service load balancing (asymmetrical rule) """ + external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr) + external_port = 80 + local_port = 8080 + server1 = self.pg0.remote_hosts[0] + server2 = self.pg0.remote_hosts[1] + + locals = [{'addr': server1.ip4n, + 'port': local_port, + 'probability': 70}, + {'addr': server2.ip4n, + 'port': local_port, + 'probability': 30}] + + self.vapi.nat44_forwarding_enable_disable(1) + self.vapi.nat44_add_del_lb_static_mapping(external_addr_n, + external_port, + IP_PROTOS.tcp, + out2in_only=1, + local_num=len(locals), + locals=locals) + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + # from client to service + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + TCP(sport=12345, dport=external_port)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + server = None + try: + ip = p[IP] + tcp = p[TCP] + self.assertIn(ip.dst, [server1.ip4, server2.ip4]) + if ip.dst == server1.ip4: + server = server1 + else: + server = server2 + self.assertEqual(tcp.dport, local_port) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from service back to client + p = (Ether(src=server.mac, dst=self.pg0.local_mac) / + IP(src=server.ip4, dst=self.pg1.remote_ip4) / + TCP(sport=local_port, dport=12345)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.nat_addr) + self.assertEqual(tcp.sport, external_port) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from client to server (no translation) + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=server1.ip4) / + TCP(sport=12346, dport=local_port)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + server = None + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, server1.ip4) + self.assertEqual(tcp.dport, local_port) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from service back to client (no translation) + p = (Ether(src=server1.mac, dst=self.pg0.local_mac) / + IP(src=server1.ip4, dst=self.pg1.remote_ip4) / + TCP(sport=local_port, dport=12346)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, server1.ip4) + self.assertEqual(tcp.sport, local_port) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + def test_multiple_inside_interfaces(self): """ NAT44 multiple non-overlapping address space inside interfaces """ @@ -1908,7 +2133,7 @@ class TestNAT44(MethodHolder): self.assertEqual(tcp.dport, host_in_port) self.check_tcp_checksum(p) except: - self.logger.error(ppp("Unexpected or invalid packet:"), p) + self.logger.error(ppp("Unexpected or invalid packet:", p)) raise def test_hairpinning2(self): @@ -2124,16 +2349,20 @@ class TestNAT44(MethodHolder): def test_interface_addr_static_mapping(self): """ Static mapping with addresses from interface """ + tag = "testTAG" + self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index) self.nat44_add_static_mapping( '1.2.3.4', - external_sw_if_index=self.pg7.sw_if_index) + external_sw_if_index=self.pg7.sw_if_index, + tag=tag) # static mappings with external interface static_mappings = self.vapi.nat44_static_mapping_dump() self.assertEqual(1, len(static_mappings)) self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index) + self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag) # configure interface address and check static mappings self.pg7.config_ip4() @@ -2142,6 +2371,7 @@ class TestNAT44(MethodHolder): self.assertEqual(static_mappings[0].external_ip_address[0:4], self.pg7.local_ip4n) self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index) + self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag) # remove interface address and check static mappings self.pg7.unconfig_ip4() @@ -2967,7 +3197,7 @@ class TestNAT44(MethodHolder): self.assertEqual(tcp.dport, host_in_port) self.check_tcp_checksum(p) except: - self.logger.error(ppp("Unexpected or invalid packet:"), p) + self.logger.error(ppp("Unexpected or invalid packet:", p)) raise def test_one_armed_nat44(self): @@ -3026,6 +3256,70 @@ class TestNAT44(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise + def test_one_armed_nat44_static(self): + """ One armed NAT44 and 1:1 NAPT symmetrical rule """ + remote_host = self.pg9.remote_hosts[0] + local_host = self.pg9.remote_hosts[1] + external_port = 80 + local_port = 8080 + eh_port_in = 0 + + self.vapi.nat44_forwarding_enable_disable(1) + self.nat44_add_address(self.nat_addr, twice_nat=1) + self.nat44_add_static_mapping(local_host.ip4, self.nat_addr, + local_port, external_port, + proto=IP_PROTOS.tcp, out2in_only=1, + twice_nat=1) + self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index, + is_inside=0) + + # from client to service + p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) / + IP(src=remote_host.ip4, dst=self.nat_addr) / + TCP(sport=12345, dport=external_port)) + self.pg9.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg9.get_capture(1) + p = capture[0] + server = None + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, local_host.ip4) + self.assertEqual(ip.src, self.nat_addr) + self.assertEqual(tcp.dport, local_port) + self.assertNotEqual(tcp.sport, 12345) + eh_port_in = tcp.sport + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from service back to client + p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) / + IP(src=local_host.ip4, dst=self.nat_addr) / + TCP(sport=local_port, dport=eh_port_in)) + self.pg9.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg9.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.nat_addr) + self.assertEqual(ip.dst, remote_host.ip4) + self.assertEqual(tcp.sport, external_port) + self.assertEqual(tcp.dport, 12345) + self.check_tcp_checksum(p) + self.check_ip_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + def test_del_session(self): """ Delete NAT44 session """ self.nat44_add_address(self.nat_addr) @@ -3444,7 +3738,11 @@ class TestNAT44(MethodHolder): def tearDown(self): super(TestNAT44, self).tearDown() if not self.vpp_dead: - self.logger.info(self.vapi.cli("show nat44 verbose")) + self.logger.info(self.vapi.cli("show nat44 addresses")) + self.logger.info(self.vapi.cli("show nat44 interfaces")) + self.logger.info(self.vapi.cli("show nat44 static mappings")) + self.logger.info(self.vapi.cli("show nat44 interface address")) + self.logger.info(self.vapi.cli("show nat44 sessions detail")) self.logger.info(self.vapi.cli("show nat virtual-reassembly")) self.vapi.cli("nat addr-port-assignment-alg default") self.clear_nat44() @@ -4208,7 +4506,13 @@ class TestDeterministicNAT(MethodHolder): def tearDown(self): super(TestDeterministicNAT, self).tearDown() if not self.vpp_dead: - self.logger.info(self.vapi.cli("show nat44 detail")) + self.logger.info(self.vapi.cli("show nat44 interfaces")) + self.logger.info( + self.vapi.cli("show nat44 deterministic mappings")) + self.logger.info( + self.vapi.cli("show nat44 deterministic timeouts")) + self.logger.info( + self.vapi.cli("show nat44 deterministic sessions")) self.clear_nat_det() @@ -5608,5 +5912,109 @@ class TestDSlite(MethodHolder): self.vapi.cli("show dslite aftr-tunnel-endpoint-address")) self.logger.info(self.vapi.cli("show dslite sessions")) + +class TestDSliteCE(MethodHolder): + """ DS-Lite CE Test Cases """ + + @classmethod + def setUpConstants(cls): + super(TestDSliteCE, cls).setUpConstants() + cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"]) + + @classmethod + def setUpClass(cls): + super(TestDSliteCE, cls).setUpClass() + + try: + cls.create_pg_interfaces(range(2)) + cls.pg0.admin_up() + cls.pg0.config_ip4() + cls.pg0.resolve_arp() + cls.pg1.admin_up() + cls.pg1.config_ip6() + cls.pg1.generate_remote_hosts(1) + cls.pg1.configure_ipv6_neighbors() + + except Exception: + super(TestDSliteCE, cls).tearDownClass() + raise + + def test_dslite_ce(self): + """ Test DS-Lite CE """ + + b4_ip4 = '192.0.0.2' + b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4) + b4_ip6 = '2001:db8:62aa::375e:f4c1:1' + b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6) + self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n) + + aftr_ip4 = '192.0.0.1' + aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4) + aftr_ip6 = '2001:db8:85a3::8a2e:370:1' + aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6) + self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n) + + self.vapi.ip_add_del_route(dst_address=aftr_ip6_n, + dst_address_length=128, + next_hop_address=self.pg1.remote_ip6n, + next_hop_sw_if_index=self.pg1.sw_if_index, + is_ipv6=1) + + # UDP encapsulation + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) / + UDP(sport=10000, dport=20000)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + capture = capture[0] + self.assertEqual(capture[IPv6].src, b4_ip6) + self.assertEqual(capture[IPv6].dst, aftr_ip6) + self.assertEqual(capture[IP].src, self.pg0.remote_ip4) + self.assertEqual(capture[IP].dst, self.pg1.remote_ip4) + self.assertEqual(capture[UDP].sport, 10000) + self.assertEqual(capture[UDP].dport, 20000) + self.check_ip_checksum(capture) + + # UDP decapsulation + p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IPv6(dst=b4_ip6, src=aftr_ip6) / + IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) / + UDP(sport=20000, dport=10000)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + capture = capture[0] + self.assertFalse(capture.haslayer(IPv6)) + self.assertEqual(capture[IP].src, self.pg1.remote_ip4) + self.assertEqual(capture[IP].dst, self.pg0.remote_ip4) + self.assertEqual(capture[UDP].sport, 20000) + self.assertEqual(capture[UDP].dport, 10000) + self.check_ip_checksum(capture) + + # ping DS-Lite B4 tunnel endpoint address + p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) / + ICMPv6EchoRequest()) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + self.assertEqual(1, len(capture)) + capture = capture[0] + self.assertEqual(capture[IPv6].src, b4_ip6) + self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6) + self.assertTrue(capture.haslayer(ICMPv6EchoReply)) + + def tearDown(self): + super(TestDSliteCE, self).tearDown() + if not self.vpp_dead: + self.logger.info( + self.vapi.cli("show dslite aftr-tunnel-endpoint-address")) + self.logger.info( + self.vapi.cli("show dslite b4-tunnel-endpoint-address")) + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)