X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_nat.py;h=4ced0af46e0f394cce8eacc8fa6e25545d60eafc;hb=cba6936c45bc3265ae695e8266bdefc65e7a5116;hp=1f87bffab577f555a8d6de3f57945a6ba8c26850;hpb=b932d26ea48ba8aa7677dc3b6ffd5d4729176c8f;p=vpp.git diff --git a/test/test_nat.py b/test/test_nat.py index 1f87bffab57..4ced0af46e0 100644 --- a/test/test_nat.py +++ b/test/test_nat.py @@ -134,30 +134,34 @@ class MethodHolder(VppTestCase): self.assertEqual(new['ICMPv6EchoReply'].cksum, pkt['ICMPv6EchoReply'].cksum) - def create_stream_in(self, in_if, out_if, ttl=64): + def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64): """ Create packet stream for inside network :param in_if: Inside interface :param out_if: Outside interface + :param dst_ip: Destination address :param ttl: TTL of generated packets """ + if dst_ip is None: + dst_ip = out_if.remote_ip4 + pkts = [] # TCP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(sport=self.tcp_port_in, dport=20)) pkts.append(p) # UDP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(sport=self.udp_port_in, dport=20)) pkts.append(p) # ICMP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=self.icmp_id_in, type='echo-request')) pkts.append(p) @@ -206,6 +210,48 @@ class MethodHolder(VppTestCase): pref_n[15] = ip4_n[3] return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n)) + def extract_ip4(self, ip6, plen): + """ + Extract IPv4 address embedded in IPv6 addresses + + :param ip6: IPv6 address + :param plen: IPv6 prefix length + :returns: extracted IPv4 address + """ + ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6)) + ip4_n = [None] * 4 + if plen == 32: + ip4_n[0] = ip6_n[4] + ip4_n[1] = ip6_n[5] + ip4_n[2] = ip6_n[6] + ip4_n[3] = ip6_n[7] + elif plen == 40: + ip4_n[0] = ip6_n[5] + ip4_n[1] = ip6_n[6] + ip4_n[2] = ip6_n[7] + ip4_n[3] = ip6_n[9] + elif plen == 48: + ip4_n[0] = ip6_n[6] + ip4_n[1] = ip6_n[7] + ip4_n[2] = ip6_n[9] + ip4_n[3] = ip6_n[10] + elif plen == 56: + ip4_n[0] = ip6_n[7] + ip4_n[1] = ip6_n[9] + ip4_n[2] = ip6_n[10] + ip4_n[3] = ip6_n[11] + elif plen == 64: + ip4_n[0] = ip6_n[9] + ip4_n[1] = ip6_n[10] + ip4_n[2] = ip6_n[11] + ip4_n[3] = ip6_n[12] + elif plen == 96: + ip4_n[0] = ip6_n[12] + ip4_n[1] = ip6_n[13] + ip4_n[2] = ip6_n[14] + ip4_n[3] = ip6_n[15] + return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n)) + def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0): """ Create IPv6 packet stream for inside network @@ -242,39 +288,79 @@ class MethodHolder(VppTestCase): return pkts - def create_stream_out(self, out_if, dst_ip=None, ttl=64): + def create_stream_out(self, out_if, dst_ip=None, ttl=64, + use_inside_ports=False): """ Create packet stream for outside network :param out_if: Outside interface :param dst_ip: Destination IP address (Default use global NAT address) :param ttl: TTL of generated packets + :param use_inside_ports: Use inside NAT ports as destination ports + instead of outside ports """ if dst_ip is None: dst_ip = self.nat_addr + if not use_inside_ports: + tcp_port = self.tcp_port_out + udp_port = self.udp_port_out + icmp_id = self.icmp_id_out + else: + tcp_port = self.tcp_port_in + udp_port = self.udp_port_in + icmp_id = self.icmp_id_in pkts = [] # TCP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(dport=self.tcp_port_out, sport=20)) + TCP(dport=tcp_port, sport=20)) pkts.append(p) # UDP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(dport=self.udp_port_out, sport=20)) + UDP(dport=udp_port, sport=20)) pkts.append(p) # ICMP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=self.icmp_id_out, type='echo-reply')) + ICMP(id=icmp_id, type='echo-reply')) + pkts.append(p) + + return pkts + + def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64): + """ + Create packet stream for outside network + + :param out_if: Outside interface + :param dst_ip: Destination IP address (Default use global NAT address) + :param hl: HL of generated packets + """ + pkts = [] + # TCP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IPv6(src=src_ip, dst=dst_ip, hlim=hl) / + TCP(dport=self.tcp_port_out, sport=20)) + pkts.append(p) + + # UDP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IPv6(src=src_ip, dst=dst_ip, hlim=hl) / + UDP(dport=self.udp_port_out, sport=20)) + pkts.append(p) + + # ICMP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IPv6(src=src_ip, dst=dst_ip, hlim=hl) / + ICMPv6EchoReply(id=self.icmp_id_out)) pkts.append(p) return pkts def verify_capture_out(self, capture, nat_ip=None, same_port=False, - packet_num=3, dst_ip=None): + packet_num=3, dst_ip=None, is_ip6=False): """ Verify captured packets on outside network @@ -283,16 +369,24 @@ class MethodHolder(VppTestCase): :param same_port: Sorce port number is not translated (Default False) :param packet_num: Expected number of packets (Default 3) :param dst_ip: Destination IP address (Default do not verify) + :param is_ip6: If L3 protocol is IPv6 (Default False) """ + if is_ip6: + IP46 = IPv6 + ICMP46 = ICMPv6EchoRequest + else: + IP46 = IP + ICMP46 = ICMP if nat_ip is None: nat_ip = self.nat_addr self.assertEqual(packet_num, len(capture)) for packet in capture: try: - self.check_ip_checksum(packet) - self.assertEqual(packet[IP].src, nat_ip) + if not is_ip6: + self.check_ip_checksum(packet) + self.assertEqual(packet[IP46].src, nat_ip) if dst_ip is not None: - self.assertEqual(packet[IP].dst, dst_ip) + self.assertEqual(packet[IP46].dst, dst_ip) if packet.haslayer(TCP): if same_port: self.assertEqual(packet[TCP].sport, self.tcp_port_in) @@ -310,16 +404,33 @@ class MethodHolder(VppTestCase): self.udp_port_out = packet[UDP].sport else: if same_port: - self.assertEqual(packet[ICMP].id, self.icmp_id_in) + self.assertEqual(packet[ICMP46].id, self.icmp_id_in) else: - self.assertNotEqual(packet[ICMP].id, self.icmp_id_in) - self.icmp_id_out = packet[ICMP].id - self.check_icmp_checksum(packet) + self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in) + self.icmp_id_out = packet[ICMP46].id + if is_ip6: + self.check_icmpv6_checksum(packet) + else: + self.check_icmp_checksum(packet) except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) raise + def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, + packet_num=3, dst_ip=None): + """ + Verify captured packets on outside network + + :param capture: Captured packets + :param nat_ip: Translated IP address + :param same_port: Sorce port number is not translated (Default False) + :param packet_num: Expected number of packets (Default 3) + :param dst_ip: Destination IP address (Default do not verify) + """ + return self.verify_capture_out(capture, nat_ip, same_port, packet_num, + dst_ip, True) + def verify_capture_in(self, capture, in_if, packet_num=3): """ Verify captured packets on inside network @@ -654,6 +765,7 @@ class TestNAT44(MethodHolder): cls.icmp_id_in = 6305 cls.icmp_id_out = 6305 cls.nat_addr = '10.0.0.3' + cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr) cls.ipfix_src_port = 4739 cls.ipfix_domain_id = 1 @@ -1044,6 +1156,66 @@ class TestNAT44(MethodHolder): self.verify_capture_out(capture, same_port=True, packet_num=1) self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp) + def test_forwarding(self): + """ NAT44 forwarding test """ + + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + self.vapi.nat44_forwarding_enable_disable(1) + + real_ip = self.pg0.remote_ip4n + alias_ip = self.nat_addr_n + self.vapi.nat44_add_del_static_mapping(local_ip=real_ip, + external_ip=alias_ip) + + try: + # in2out - static mapping match + + pkts = self.create_stream_out(self.pg1) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, same_port=True) + + # in2out - no static mapping match + + host0 = self.pg0.remote_hosts[0] + self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1] + try: + pkts = self.create_stream_out(self.pg1, + dst_ip=self.pg0.remote_ip4, + use_inside_ports=True) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, + same_port=True) + finally: + self.pg0.remote_hosts[0] = host0 + + finally: + self.vapi.nat44_forwarding_enable_disable(0) + self.vapi.nat44_add_del_static_mapping(local_ip=real_ip, + external_ip=alias_ip, + is_add=0) + def test_static_in(self): """ 1:1 NAT initialized from inside network """ @@ -3033,6 +3205,123 @@ class TestNAT44(MethodHolder): self.clear_nat44() +class TestNAT44Out2InDPO(MethodHolder): + """ NAT44 Test Cases using out2in DPO """ + + @classmethod + def setUpConstants(cls): + super(TestNAT44Out2InDPO, cls).setUpConstants() + cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"]) + + @classmethod + def setUpClass(cls): + super(TestNAT44Out2InDPO, cls).setUpClass() + + try: + cls.tcp_port_in = 6303 + cls.tcp_port_out = 6303 + cls.udp_port_in = 6304 + cls.udp_port_out = 6304 + cls.icmp_id_in = 6305 + cls.icmp_id_out = 6305 + cls.nat_addr = '10.0.0.3' + cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr) + cls.dst_ip4 = '192.168.70.1' + + cls.create_pg_interfaces(range(2)) + + cls.pg0.admin_up() + cls.pg0.config_ip4() + cls.pg0.resolve_arp() + + cls.pg1.admin_up() + cls.pg1.config_ip6() + cls.pg1.resolve_ndp() + + cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16, + dst_address_length=0, + next_hop_address=cls.pg1.remote_ip6n, + next_hop_sw_if_index=cls.pg1.sw_if_index) + + except Exception: + super(TestNAT44Out2InDPO, cls).tearDownClass() + raise + + def configure_xlat(self): + self.dst_ip6_pfx = '1:2:3::' + self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, + self.dst_ip6_pfx) + self.dst_ip6_pfx_len = 96 + self.src_ip6_pfx = '4:5:6::' + self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, + self.src_ip6_pfx) + self.src_ip6_pfx_len = 96 + self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len, + self.src_ip6_pfx_n, self.src_ip6_pfx_len, + '\x00\x00\x00\x00', 0, is_translation=1, + is_rfc6052=1) + + def test_464xlat_ce(self): + """ Test 464XLAT CE with NAT44 """ + + self.configure_xlat() + + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n) + + out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx, + self.dst_ip6_pfx_len) + out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx, + self.src_ip6_pfx_len) + + try: + pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, + dst_ip=out_src_ip6) + + pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, + out_dst_ip6) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + finally: + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index, + is_add=0) + self.vapi.nat44_add_del_address_range(self.nat_addr_n, + self.nat_addr_n, is_add=0) + + def test_464xlat_ce_no_nat(self): + """ Test 464XLAT CE without NAT44 """ + + self.configure_xlat() + + out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx, + self.dst_ip6_pfx_len) + out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx, + self.src_ip6_pfx_len) + + pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6, + nat_ip=out_dst_ip6, same_port=True) + + pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + + class TestDeterministicNAT(MethodHolder): """ Deterministic NAT Test Cases """