X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_snat.py;h=43c4269600fe3541691380c071e6ce9d7d52f515;hb=a5e614f76fda9efb7bd1577ec18f9c0407d95d03;hp=0eceaab256b4f6bf41e137bf940ea520dd4aaee5;hpb=6fa74c60ea5930864750d64005b213647f12fe08;p=vpp.git diff --git a/test/test_snat.py b/test/test_snat.py index 0eceaab256b..43c4269600f 100644 --- a/test/test_snat.py +++ b/test/test_snat.py @@ -7,7 +7,9 @@ import struct from framework import VppTestCase, VppTestRunner, running_extended_tests from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror -from scapy.layers.l2 import Ether, ARP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply +from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6 +from scapy.layers.l2 import Ether, ARP, GRE from scapy.data import IP_PROTOS from scapy.packet import bind_layers from util import ppp @@ -25,6 +27,107 @@ class MethodHolder(VppTestCase): def tearDown(self): super(MethodHolder, self).tearDown() + def check_ip_checksum(self, pkt): + """ + Check IP checksum of the packet + + :param pkt: Packet to check IP checksum + """ + new = pkt.__class__(str(pkt)) + del new['IP'].chksum + new = new.__class__(str(new)) + self.assertEqual(new['IP'].chksum, pkt['IP'].chksum) + + def check_tcp_checksum(self, pkt): + """ + Check TCP checksum in IP packet + + :param pkt: Packet to check TCP checksum + """ + new = pkt.__class__(str(pkt)) + del new['TCP'].chksum + new = new.__class__(str(new)) + self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum) + + def check_udp_checksum(self, pkt): + """ + Check UDP checksum in IP packet + + :param pkt: Packet to check UDP checksum + """ + new = pkt.__class__(str(pkt)) + del new['UDP'].chksum + new = new.__class__(str(new)) + self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum) + + def check_icmp_errror_embedded(self, pkt): + """ + Check ICMP error embeded packet checksum + + :param pkt: Packet to check ICMP error embeded packet checksum + """ + if pkt.haslayer(IPerror): + new = pkt.__class__(str(pkt)) + del new['IPerror'].chksum + new = new.__class__(str(new)) + self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum) + + if pkt.haslayer(TCPerror): + new = pkt.__class__(str(pkt)) + del new['TCPerror'].chksum + new = new.__class__(str(new)) + self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum) + + if pkt.haslayer(UDPerror): + if pkt['UDPerror'].chksum != 0: + new = pkt.__class__(str(pkt)) + del new['UDPerror'].chksum + new = new.__class__(str(new)) + self.assertEqual(new['UDPerror'].chksum, + pkt['UDPerror'].chksum) + + if pkt.haslayer(ICMPerror): + del new['ICMPerror'].chksum + new = new.__class__(str(new)) + self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum) + + def check_icmp_checksum(self, pkt): + """ + Check ICMP checksum in IPv4 packet + + :param pkt: Packet to check ICMP checksum + """ + new = pkt.__class__(str(pkt)) + del new['ICMP'].chksum + new = new.__class__(str(new)) + self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum) + if pkt.haslayer(IPerror): + self.check_icmp_errror_embedded(pkt) + + def check_icmpv6_checksum(self, pkt): + """ + Check ICMPv6 checksum in IPv4 packet + + :param pkt: Packet to check ICMPv6 checksum + """ + new = pkt.__class__(str(pkt)) + if pkt.haslayer(ICMPv6DestUnreach): + del new['ICMPv6DestUnreach'].cksum + new = new.__class__(str(new)) + self.assertEqual(new['ICMPv6DestUnreach'].cksum, + pkt['ICMPv6DestUnreach'].cksum) + self.check_icmp_errror_embedded(pkt) + if pkt.haslayer(ICMPv6EchoRequest): + del new['ICMPv6EchoRequest'].cksum + new = new.__class__(str(new)) + self.assertEqual(new['ICMPv6EchoRequest'].cksum, + pkt['ICMPv6EchoRequest'].cksum) + if pkt.haslayer(ICMPv6EchoReply): + del new['ICMPv6EchoReply'].cksum + new = new.__class__(str(new)) + self.assertEqual(new['ICMPv6EchoReply'].cksum, + pkt['ICMPv6EchoReply'].cksum) + def create_stream_in(self, in_if, out_if, ttl=64): """ Create packet stream for inside network @@ -37,13 +140,13 @@ class MethodHolder(VppTestCase): # TCP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - TCP(sport=self.tcp_port_in)) + TCP(sport=self.tcp_port_in, dport=20)) pkts.append(p) # UDP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - UDP(sport=self.udp_port_in)) + UDP(sport=self.udp_port_in, dport=20)) pkts.append(p) # ICMP @@ -54,6 +157,85 @@ class MethodHolder(VppTestCase): return pkts + def compose_ip6(self, ip4, pref, plen): + """ + Compose IPv4-embedded IPv6 addresses + + :param ip4: IPv4 address + :param pref: IPv6 prefix + :param plen: IPv6 prefix length + :returns: IPv4-embedded IPv6 addresses + """ + pref_n = list(socket.inet_pton(socket.AF_INET6, pref)) + ip4_n = list(socket.inet_pton(socket.AF_INET, ip4)) + if plen == 32: + pref_n[4] = ip4_n[0] + pref_n[5] = ip4_n[1] + pref_n[6] = ip4_n[2] + pref_n[7] = ip4_n[3] + elif plen == 40: + pref_n[5] = ip4_n[0] + pref_n[6] = ip4_n[1] + pref_n[7] = ip4_n[2] + pref_n[9] = ip4_n[3] + elif plen == 48: + pref_n[6] = ip4_n[0] + pref_n[7] = ip4_n[1] + pref_n[9] = ip4_n[2] + pref_n[10] = ip4_n[3] + elif plen == 56: + pref_n[7] = ip4_n[0] + pref_n[9] = ip4_n[1] + pref_n[10] = ip4_n[2] + pref_n[11] = ip4_n[3] + elif plen == 64: + pref_n[9] = ip4_n[0] + pref_n[10] = ip4_n[1] + pref_n[11] = ip4_n[2] + pref_n[12] = ip4_n[3] + elif plen == 96: + pref_n[12] = ip4_n[0] + pref_n[13] = ip4_n[1] + pref_n[14] = ip4_n[2] + pref_n[15] = ip4_n[3] + return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n)) + + def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0): + """ + Create IPv6 packet stream for inside network + + :param in_if: Inside interface + :param out_if: Outside interface + :param ttl: Hop Limit of generated packets + :param pref: NAT64 prefix + :param plen: NAT64 prefix length + """ + pkts = [] + if pref is None: + dst = ''.join(['64:ff9b::', out_if.remote_ip4]) + else: + dst = self.compose_ip6(out_if.remote_ip4, pref, plen) + + # TCP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) / + TCP(sport=self.tcp_port_in, dport=20)) + pkts.append(p) + + # UDP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) / + UDP(sport=self.udp_port_in, dport=20)) + pkts.append(p) + + # ICMP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) / + ICMPv6EchoRequest(id=self.icmp_id_in)) + pkts.append(p) + + return pkts + def create_stream_out(self, out_if, dst_ip=None, ttl=64): """ Create packet stream for outside network @@ -68,13 +250,13 @@ class MethodHolder(VppTestCase): # TCP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(dport=self.tcp_port_out)) + TCP(dport=self.tcp_port_out, sport=20)) pkts.append(p) # UDP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(dport=self.udp_port_out)) + UDP(dport=self.udp_port_out, sport=20)) pkts.append(p) # ICMP @@ -86,7 +268,7 @@ class MethodHolder(VppTestCase): return pkts def verify_capture_out(self, capture, nat_ip=None, same_port=False, - packet_num=3): + packet_num=3, dst_ip=None): """ Verify captured packets on outside network @@ -94,13 +276,17 @@ class MethodHolder(VppTestCase): :param nat_ip: Translated IP address (Default use global SNAT address) :param same_port: Sorce port number is not translated (Default False) :param packet_num: Expected number of packets (Default 3) + :param dst_ip: Destination IP address (Default do not verify) """ if nat_ip is None: nat_ip = self.snat_addr self.assertEqual(packet_num, len(capture)) for packet in capture: try: + self.check_ip_checksum(packet) self.assertEqual(packet[IP].src, nat_ip) + if dst_ip is not None: + self.assertEqual(packet[IP].dst, dst_ip) if packet.haslayer(TCP): if same_port: self.assertEqual(packet[TCP].sport, self.tcp_port_in) @@ -108,6 +294,7 @@ class MethodHolder(VppTestCase): self.assertNotEqual( packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport + self.check_tcp_checksum(packet) elif packet.haslayer(UDP): if same_port: self.assertEqual(packet[UDP].sport, self.udp_port_in) @@ -121,6 +308,7 @@ class MethodHolder(VppTestCase): else: self.assertNotEqual(packet[ICMP].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP].id + self.check_icmp_checksum(packet) except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) @@ -137,13 +325,45 @@ class MethodHolder(VppTestCase): self.assertEqual(packet_num, len(capture)) for packet in capture: try: + self.check_ip_checksum(packet) self.assertEqual(packet[IP].dst, in_if.remote_ip4) if packet.haslayer(TCP): self.assertEqual(packet[TCP].dport, self.tcp_port_in) + self.check_tcp_checksum(packet) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].dport, self.udp_port_in) else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) + self.check_icmp_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet " + "(inside network):", packet)) + raise + + def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3): + """ + Verify captured IPv6 packets on inside network + + :param capture: Captured packets + :param src_ip: Source IP + :param dst_ip: Destination IP address + :param packet_num: Expected number of packets (Default 3) + """ + self.assertEqual(packet_num, len(capture)) + for packet in capture: + try: + self.assertEqual(packet[IPv6].src, src_ip) + self.assertEqual(packet[IPv6].dst, dst_ip) + if packet.haslayer(TCP): + self.assertEqual(packet[TCP].dport, self.tcp_port_in) + self.check_tcp_checksum(packet) + elif packet.haslayer(UDP): + self.assertEqual(packet[UDP].dport, self.udp_port_in) + self.check_udp_checksum(packet) + else: + self.assertEqual(packet[ICMPv6EchoReply].id, + self.icmp_id_in) + self.check_icmpv6_checksum(packet) except: self.logger.error(ppp("Unexpected or invalid packet " "(inside network):", packet)) @@ -395,6 +615,12 @@ class TestSNAT(MethodHolder): intf.is_inside, is_add=0) + interfaces = self.vapi.snat_interface_output_feature_dump() + for intf in interfaces: + self.vapi.snat_interface_add_del_output_feature(intf.sw_if_index, + intf.is_inside, + is_add=0) + static_mappings = self.vapi.snat_static_mapping_dump() for sm in static_mappings: self.vapi.snat_add_static_mapping(sm.local_ip_address, @@ -1050,6 +1276,7 @@ class TestSNAT(MethodHolder): self.assertEqual(ip.dst, server.ip4) self.assertNotEqual(tcp.sport, host_in_port) self.assertEqual(tcp.dport, server_in_port) + self.check_tcp_checksum(p) host_out_port = tcp.sport except: self.logger.error(ppp("Unexpected or invalid packet:", p)) @@ -1071,6 +1298,7 @@ class TestSNAT(MethodHolder): self.assertEqual(ip.dst, host.ip4) self.assertEqual(tcp.sport, server_out_port) self.assertEqual(tcp.dport, host_in_port) + self.check_tcp_checksum(p) except: self.logger.error(ppp("Unexpected or invalid packet:"), p) raise @@ -1121,6 +1349,7 @@ class TestSNAT(MethodHolder): self.assertNotEqual(packet[TCP].sport, self.tcp_port_in) self.assertEqual(packet[TCP].dport, server_tcp_port) self.tcp_port_out = packet[TCP].sport + self.check_tcp_checksum(packet) elif packet.haslayer(UDP): self.assertNotEqual(packet[UDP].sport, self.udp_port_in) self.assertEqual(packet[UDP].dport, server_udp_port) @@ -1157,6 +1386,7 @@ class TestSNAT(MethodHolder): if packet.haslayer(TCP): self.assertEqual(packet[TCP].dport, self.tcp_port_in) self.assertEqual(packet[TCP].sport, server_tcp_port) + self.check_tcp_checksum(packet) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].dport, self.udp_port_in) self.assertEqual(packet[UDP].sport, server_udp_port) @@ -1192,6 +1422,7 @@ class TestSNAT(MethodHolder): self.assertEqual(packet[TCP].sport, self.tcp_port_in) self.assertEqual(packet[TCP].dport, server_tcp_port) self.tcp_port_out = packet[TCP].sport + self.check_tcp_checksum(packet) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].sport, self.udp_port_in) self.assertEqual(packet[UDP].dport, server_udp_port) @@ -1228,6 +1459,7 @@ class TestSNAT(MethodHolder): if packet.haslayer(TCP): self.assertEqual(packet[TCP].dport, self.tcp_port_in) self.assertEqual(packet[TCP].sport, server_tcp_port) + self.check_tcp_checksum(packet) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].dport, self.udp_port_in) self.assertEqual(packet[UDP].sport, server_udp_port) @@ -1658,119 +1890,483 @@ class TestSNAT(MethodHolder): capture = self.pg8.get_capture(len(pkts)) self.verify_capture_out(capture) - def tearDown(self): - super(TestSNAT, self).tearDown() - if not self.vpp_dead: - self.logger.info(self.vapi.cli("show snat verbose")) - self.clear_snat() - + def test_static_unknown_proto(self): + """ 1:1 NAT translate packet with unknown protocol """ + nat_ip = "10.0.0.10" + self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) -class TestDeterministicNAT(MethodHolder): - """ Deterministic NAT Test Cases """ + # in2out + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + GRE() / + IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) / + TCP(sport=1234, dport=1234)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg1.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IP].src, nat_ip) + self.assertEqual(packet[IP].dst, self.pg1.remote_ip4) + self.assertTrue(packet.haslayer(GRE)) + self.check_ip_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise - @classmethod - def setUpConstants(cls): - super(TestDeterministicNAT, cls).setUpConstants() - cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"]) + # out2in + p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP(src=self.pg1.remote_ip4, dst=nat_ip) / + GRE() / + IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) / + TCP(sport=1234, dport=1234)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg0.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IP].src, self.pg1.remote_ip4) + self.assertEqual(packet[IP].dst, self.pg0.remote_ip4) + self.assertTrue(packet.haslayer(GRE)) + self.check_ip_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise - @classmethod - def setUpClass(cls): - super(TestDeterministicNAT, cls).setUpClass() + def test_hairpinning_static_unknown_proto(self): + """ 1:1 NAT translate packet with unknown protocol - hairpinning """ - try: - cls.tcp_port_in = 6303 - cls.tcp_external_port = 6303 - cls.udp_port_in = 6304 - cls.udp_external_port = 6304 - cls.icmp_id_in = 6305 - cls.snat_addr = '10.0.0.3' + host = self.pg0.remote_hosts[0] + server = self.pg0.remote_hosts[1] - cls.create_pg_interfaces(range(3)) - cls.interfaces = list(cls.pg_interfaces) + host_nat_ip = "10.0.0.10" + server_nat_ip = "10.0.0.11" - for i in cls.interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() + self.snat_add_static_mapping(host.ip4, host_nat_ip) + self.snat_add_static_mapping(server.ip4, server_nat_ip) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) - cls.pg0.generate_remote_hosts(2) - cls.pg0.configure_ipv4_neighbors() + # host to server + p = (Ether(dst=self.pg0.local_mac, src=host.mac) / + IP(src=host.ip4, dst=server_nat_ip) / + GRE() / + IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) / + TCP(sport=1234, dport=1234)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg0.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IP].src, host_nat_ip) + self.assertEqual(packet[IP].dst, server.ip4) + self.assertTrue(packet.haslayer(GRE)) + self.check_ip_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise - except Exception: - super(TestDeterministicNAT, cls).tearDownClass() + # server to host + p = (Ether(dst=self.pg0.local_mac, src=server.mac) / + IP(src=server.ip4, dst=host_nat_ip) / + GRE() / + IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) / + TCP(sport=1234, dport=1234)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg0.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IP].src, server_nat_ip) + self.assertEqual(packet[IP].dst, host.ip4) + self.assertTrue(packet.haslayer(GRE)) + self.check_ip_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise - def create_stream_in(self, in_if, out_if, ttl=64): - """ - Create packet stream for inside network + def test_unknown_proto(self): + """ SNAT translate packet with unknown protocol """ + self.snat_add_address(self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) - :param in_if: Inside interface - :param out_if: Outside interface - :param ttl: TTL of generated packets - """ - pkts = [] - # TCP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) - pkts.append(p) + # in2out + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=20)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg1.get_capture(1) - # UDP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - UDP(sport=self.udp_port_in, dport=self.udp_external_port)) - pkts.append(p) + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + GRE() / + IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) / + TCP(sport=1234, dport=1234)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg1.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IP].src, self.snat_addr) + self.assertEqual(packet[IP].dst, self.pg1.remote_ip4) + self.assertTrue(packet.haslayer(GRE)) + self.check_ip_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise - # ICMP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - ICMP(id=self.icmp_id_in, type='echo-request')) - pkts.append(p) + # out2in + p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP(src=self.pg1.remote_ip4, dst=self.snat_addr) / + GRE() / + IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) / + TCP(sport=1234, dport=1234)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg0.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IP].src, self.pg1.remote_ip4) + self.assertEqual(packet[IP].dst, self.pg0.remote_ip4) + self.assertTrue(packet.haslayer(GRE)) + self.check_ip_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise - return pkts + def test_hairpinning_unknown_proto(self): + """ SNAT translate packet with unknown protocol - hairpinning """ + host = self.pg0.remote_hosts[0] + server = self.pg0.remote_hosts[1] + host_in_port = 1234 + host_out_port = 0 + server_in_port = 5678 + server_out_port = 8765 + server_nat_ip = "10.0.0.11" - def create_stream_out(self, out_if, dst_ip=None, ttl=64): - """ - Create packet stream for outside network + self.snat_add_address(self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) - :param out_if: Outside interface - :param dst_ip: Destination IP address (Default use global SNAT address) - :param ttl: TTL of generated packets - """ - if dst_ip is None: - dst_ip = self.snat_addr - pkts = [] - # TCP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)) - pkts.append(p) + # add static mapping for server + self.snat_add_static_mapping(server.ip4, server_nat_ip) - # UDP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(dport=self.udp_port_out, sport=self.udp_external_port)) - pkts.append(p) + # host to server + p = (Ether(src=host.mac, dst=self.pg0.local_mac) / + IP(src=host.ip4, dst=server_nat_ip) / + TCP(sport=host_in_port, dport=server_out_port)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) - # ICMP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=self.icmp_external_id, type='echo-reply')) - pkts.append(p) + p = (Ether(dst=self.pg0.local_mac, src=host.mac) / + IP(src=host.ip4, dst=server_nat_ip) / + GRE() / + IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) / + TCP(sport=1234, dport=1234)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg0.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IP].src, self.snat_addr) + self.assertEqual(packet[IP].dst, server.ip4) + self.assertTrue(packet.haslayer(GRE)) + self.check_ip_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise - return pkts + # server to host + p = (Ether(dst=self.pg0.local_mac, src=server.mac) / + IP(src=server.ip4, dst=self.snat_addr) / + GRE() / + IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) / + TCP(sport=1234, dport=1234)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg0.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IP].src, server_nat_ip) + self.assertEqual(packet[IP].dst, host.ip4) + self.assertTrue(packet.haslayer(GRE)) + self.check_ip_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise - def verify_capture_out(self, capture, nat_ip=None, packet_num=3): - """ - Verify captured packets on outside network + def test_output_feature(self): + """ S-NAT interface output feature (in2out postrouting) """ + self.snat_add_address(self.snat_addr) + self.vapi.snat_interface_add_del_output_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_output_feature(self.pg1.sw_if_index, + is_inside=0) - :param capture: Captured packets - :param nat_ip: Translated IP address (Default use global SNAT address) - :param same_port: Sorce port number is not translated (Default False) - :param packet_num: Expected number of packets (Default 3) - """ - if nat_ip is None: - nat_ip = self.snat_addr + # in2out + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture) + + # out2in + pkts = self.create_stream_out(self.pg1) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + + def test_output_feature_vrf_aware(self): + """ S-NAT interface output feature VRF aware (in2out postrouting) """ + nat_ip_vrf10 = "10.0.0.10" + nat_ip_vrf20 = "10.0.0.20" + + self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg3.remote_ip4n, + next_hop_sw_if_index=self.pg3.sw_if_index, + table_id=10) + self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg3.remote_ip4n, + next_hop_sw_if_index=self.pg3.sw_if_index, + table_id=20) + + self.snat_add_address(nat_ip_vrf10, vrf_id=10) + self.snat_add_address(nat_ip_vrf20, vrf_id=20) + self.vapi.snat_interface_add_del_output_feature(self.pg4.sw_if_index) + self.vapi.snat_interface_add_del_output_feature(self.pg6.sw_if_index) + self.vapi.snat_interface_add_del_output_feature(self.pg3.sw_if_index, + is_inside=0) + + # in2out VRF 10 + pkts = self.create_stream_in(self.pg4, self.pg3) + self.pg4.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg3.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=nat_ip_vrf10) + + # out2in VRF 10 + pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10) + self.pg3.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg4.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg4) + + # in2out VRF 20 + pkts = self.create_stream_in(self.pg6, self.pg3) + self.pg6.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg3.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=nat_ip_vrf20) + + # out2in VRF 20 + pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20) + self.pg3.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg6.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg6) + + def test_output_feature_hairpinning(self): + """ S-NAT interface output feature hairpinning (in2out postrouting) """ + host = self.pg0.remote_hosts[0] + server = self.pg0.remote_hosts[1] + host_in_port = 1234 + host_out_port = 0 + server_in_port = 5678 + server_out_port = 8765 + + self.snat_add_address(self.snat_addr) + self.vapi.snat_interface_add_del_output_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_output_feature(self.pg1.sw_if_index, + is_inside=0) + + # add static mapping for server + self.snat_add_static_mapping(server.ip4, self.snat_addr, + server_in_port, server_out_port, + proto=IP_PROTOS.tcp) + + # send packet from host to server + p = (Ether(src=host.mac, dst=self.pg0.local_mac) / + IP(src=host.ip4, dst=self.snat_addr) / + TCP(sport=host_in_port, dport=server_out_port)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.snat_addr) + self.assertEqual(ip.dst, server.ip4) + self.assertNotEqual(tcp.sport, host_in_port) + self.assertEqual(tcp.dport, server_in_port) + self.check_tcp_checksum(p) + host_out_port = tcp.sport + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # send reply from server to host + p = (Ether(src=server.mac, dst=self.pg0.local_mac) / + IP(src=server.ip4, dst=self.snat_addr) / + TCP(sport=server_in_port, dport=host_out_port)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.snat_addr) + self.assertEqual(ip.dst, host.ip4) + self.assertEqual(tcp.sport, server_out_port) + self.assertEqual(tcp.dport, host_in_port) + self.check_tcp_checksum(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:"), p) + raise + + def tearDown(self): + super(TestSNAT, self).tearDown() + if not self.vpp_dead: + self.logger.info(self.vapi.cli("show snat verbose")) + self.clear_snat() + + +class TestDeterministicNAT(MethodHolder): + """ Deterministic NAT Test Cases """ + + @classmethod + def setUpConstants(cls): + super(TestDeterministicNAT, cls).setUpConstants() + cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"]) + + @classmethod + def setUpClass(cls): + super(TestDeterministicNAT, cls).setUpClass() + + try: + cls.tcp_port_in = 6303 + cls.tcp_external_port = 6303 + cls.udp_port_in = 6304 + cls.udp_external_port = 6304 + cls.icmp_id_in = 6305 + cls.snat_addr = '10.0.0.3' + + cls.create_pg_interfaces(range(3)) + cls.interfaces = list(cls.pg_interfaces) + + for i in cls.interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + cls.pg0.generate_remote_hosts(2) + cls.pg0.configure_ipv4_neighbors() + + except Exception: + super(TestDeterministicNAT, cls).tearDownClass() + raise + + def create_stream_in(self, in_if, out_if, ttl=64): + """ + Create packet stream for inside network + + :param in_if: Inside interface + :param out_if: Outside interface + :param ttl: TTL of generated packets + """ + pkts = [] + # TCP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) + pkts.append(p) + + # UDP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + UDP(sport=self.udp_port_in, dport=self.udp_external_port)) + pkts.append(p) + + # ICMP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + ICMP(id=self.icmp_id_in, type='echo-request')) + pkts.append(p) + + return pkts + + def create_stream_out(self, out_if, dst_ip=None, ttl=64): + """ + Create packet stream for outside network + + :param out_if: Outside interface + :param dst_ip: Destination IP address (Default use global SNAT address) + :param ttl: TTL of generated packets + """ + if dst_ip is None: + dst_ip = self.snat_addr + pkts = [] + # TCP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / + TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)) + pkts.append(p) + + # UDP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / + UDP(dport=self.udp_port_out, sport=self.udp_external_port)) + pkts.append(p) + + # ICMP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / + ICMP(id=self.icmp_external_id, type='echo-reply')) + pkts.append(p) + + return pkts + + def verify_capture_out(self, capture, nat_ip=None, packet_num=3): + """ + Verify captured packets on outside network + + :param capture: Captured packets + :param nat_ip: Translated IP address (Default use global SNAT address) + :param same_port: Sorce port number is not translated (Default False) + :param packet_num: Expected number of packets (Default 3) + """ + if nat_ip is None: + nat_ip = self.snat_addr self.assertEqual(packet_num, len(capture)) for packet in capture: try: @@ -2213,6 +2809,7 @@ class TestDeterministicNAT(MethodHolder): dms = self.vapi.snat_det_map_dump() self.assertEqual(0, dms[0].ses_num) + @unittest.skipUnless(running_extended_tests(), "part of extended tests") def test_session_limit_per_user(self): """ CGNAT maximum 1000 sessions per user should be created """ self.vapi.snat_add_det_map(self.pg0.remote_ip4n, @@ -2266,6 +2863,7 @@ class TestDeterministicNAT(MethodHolder): # verify IPFIX logging self.vapi.cli("ipfix flush") # FIXME this should be an API call + sleep(1) capture = self.pg2.get_capture(2) ipfix = IPFIXDecoder() # first load template @@ -2305,5 +2903,809 @@ class TestDeterministicNAT(MethodHolder): self.logger.info(self.vapi.cli("show snat detail")) self.clear_snat() + +class TestNAT64(MethodHolder): + """ NAT64 Test Cases """ + + @classmethod + def setUpClass(cls): + super(TestNAT64, cls).setUpClass() + + try: + cls.tcp_port_in = 6303 + cls.tcp_port_out = 6303 + cls.udp_port_in = 6304 + cls.udp_port_out = 6304 + cls.icmp_id_in = 6305 + cls.icmp_id_out = 6305 + cls.nat_addr = '10.0.0.3' + cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr) + cls.vrf1_id = 10 + cls.vrf1_nat_addr = '10.0.10.3' + cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET, + cls.vrf1_nat_addr) + + cls.create_pg_interfaces(range(3)) + cls.ip6_interfaces = list(cls.pg_interfaces[0:1]) + cls.ip6_interfaces.append(cls.pg_interfaces[2]) + cls.ip4_interfaces = list(cls.pg_interfaces[1:2]) + + cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id) + + cls.pg0.generate_remote_hosts(2) + + for i in cls.ip6_interfaces: + i.admin_up() + i.config_ip6() + i.configure_ipv6_neighbors() + + for i in cls.ip4_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + except Exception: + super(TestNAT64, cls).tearDownClass() + raise + + def test_pool(self): + """ Add/delete address to NAT64 pool """ + nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4') + + self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr) + + addresses = self.vapi.nat64_pool_addr_dump() + self.assertEqual(len(addresses), 1) + self.assertEqual(addresses[0].address, nat_addr) + + self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0) + + addresses = self.vapi.nat64_pool_addr_dump() + self.assertEqual(len(addresses), 0) + + def test_interface(self): + """ Enable/disable NAT64 feature on the interface """ + self.vapi.nat64_add_del_interface(self.pg0.sw_if_index) + self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) + + interfaces = self.vapi.nat64_interface_dump() + self.assertEqual(len(interfaces), 2) + pg0_found = False + pg1_found = False + for intf in interfaces: + if intf.sw_if_index == self.pg0.sw_if_index: + self.assertEqual(intf.is_inside, 1) + pg0_found = True + elif intf.sw_if_index == self.pg1.sw_if_index: + self.assertEqual(intf.is_inside, 0) + pg1_found = True + self.assertTrue(pg0_found) + self.assertTrue(pg1_found) + + features = self.vapi.cli("show interface features pg0") + self.assertNotEqual(features.find('nat64-in2out'), -1) + features = self.vapi.cli("show interface features pg1") + self.assertNotEqual(features.find('nat64-out2in'), -1) + + self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0) + self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0) + + interfaces = self.vapi.nat64_interface_dump() + self.assertEqual(len(interfaces), 0) + + def test_static_bib(self): + """ Add/delete static BIB entry """ + in_addr = socket.inet_pton(socket.AF_INET6, + '2001:db8:85a3::8a2e:370:7334') + out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3') + in_port = 1234 + out_port = 5678 + proto = IP_PROTOS.tcp + + self.vapi.nat64_add_del_static_bib(in_addr, + out_addr, + in_port, + out_port, + proto) + bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp) + static_bib_num = 0 + for bibe in bib: + if bibe.is_static: + static_bib_num += 1 + self.assertEqual(bibe.i_addr, in_addr) + self.assertEqual(bibe.o_addr, out_addr) + self.assertEqual(bibe.i_port, in_port) + self.assertEqual(bibe.o_port, out_port) + self.assertEqual(static_bib_num, 1) + + self.vapi.nat64_add_del_static_bib(in_addr, + out_addr, + in_port, + out_port, + proto, + is_add=0) + bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp) + static_bib_num = 0 + for bibe in bib: + if bibe.is_static: + static_bib_num += 1 + self.assertEqual(static_bib_num, 0) + + def test_set_timeouts(self): + """ Set NAT64 timeouts """ + # verify default values + timeouts = self.vapi.nat64_get_timeouts() + self.assertEqual(timeouts.udp, 300) + self.assertEqual(timeouts.icmp, 60) + self.assertEqual(timeouts.tcp_trans, 240) + self.assertEqual(timeouts.tcp_est, 7440) + self.assertEqual(timeouts.tcp_incoming_syn, 6) + + # set and verify custom values + self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250, + tcp_est=7450, tcp_incoming_syn=10) + timeouts = self.vapi.nat64_get_timeouts() + self.assertEqual(timeouts.udp, 200) + self.assertEqual(timeouts.icmp, 30) + self.assertEqual(timeouts.tcp_trans, 250) + self.assertEqual(timeouts.tcp_est, 7450) + self.assertEqual(timeouts.tcp_incoming_syn, 10) + + def test_dynamic(self): + """ NAT64 dynamic translation test """ + self.tcp_port_in = 6303 + self.udp_port_in = 6304 + self.icmp_id_in = 6305 + + ses_num_start = self.nat64_get_ses_num() + + self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n, + self.nat_addr_n) + self.vapi.nat64_add_del_interface(self.pg0.sw_if_index) + self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) + + # in2out + pkts = self.create_stream_in_ip6(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=self.nat_addr, + dst_ip=self.pg1.remote_ip4) + + # out2in + pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4])) + self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6) + + # in2out + pkts = self.create_stream_in_ip6(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=self.nat_addr, + dst_ip=self.pg1.remote_ip4) + + # out2in + pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6) + + ses_num_end = self.nat64_get_ses_num() + + self.assertEqual(ses_num_end - ses_num_start, 3) + + # tenant with specific VRF + self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n, + self.vrf1_nat_addr_n, + vrf_id=self.vrf1_id) + self.vapi.nat64_add_del_interface(self.pg2.sw_if_index) + + pkts = self.create_stream_in_ip6(self.pg2, self.pg1) + self.pg2.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr, + dst_ip=self.pg1.remote_ip4) + + pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg2.get_capture(len(pkts)) + self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6) + + def test_static(self): + """ NAT64 static translation test """ + self.tcp_port_in = 60303 + self.udp_port_in = 60304 + self.icmp_id_in = 60305 + self.tcp_port_out = 60303 + self.udp_port_out = 60304 + self.icmp_id_out = 60305 + + ses_num_start = self.nat64_get_ses_num() + + self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n, + self.nat_addr_n) + self.vapi.nat64_add_del_interface(self.pg0.sw_if_index) + self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) + + self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n, + self.nat_addr_n, + self.tcp_port_in, + self.tcp_port_out, + IP_PROTOS.tcp) + self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n, + self.nat_addr_n, + self.udp_port_in, + self.udp_port_out, + IP_PROTOS.udp) + self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n, + self.nat_addr_n, + self.icmp_id_in, + self.icmp_id_out, + IP_PROTOS.icmp) + + # in2out + pkts = self.create_stream_in_ip6(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=self.nat_addr, + dst_ip=self.pg1.remote_ip4, same_port=True) + + # out2in + pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4])) + self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6) + + ses_num_end = self.nat64_get_ses_num() + + self.assertEqual(ses_num_end - ses_num_start, 3) + + @unittest.skipUnless(running_extended_tests(), "part of extended tests") + def test_session_timeout(self): + """ NAT64 session timeout """ + self.icmp_id_in = 1234 + self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n, + self.nat_addr_n) + self.vapi.nat64_add_del_interface(self.pg0.sw_if_index) + self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) + self.vapi.nat64_set_timeouts(icmp=5) + + pkts = self.create_stream_in_ip6(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + + ses_num_before_timeout = self.nat64_get_ses_num() + + sleep(15) + + # ICMP session after timeout + ses_num_after_timeout = self.nat64_get_ses_num() + self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout) + + def test_icmp_error(self): + """ NAT64 ICMP Error message translation """ + self.tcp_port_in = 6303 + self.udp_port_in = 6304 + self.icmp_id_in = 6305 + + ses_num_start = self.nat64_get_ses_num() + + self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n, + self.nat_addr_n) + self.vapi.nat64_add_del_interface(self.pg0.sw_if_index) + self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) + + # send some packets to create sessions + pkts = self.create_stream_in_ip6(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture_ip4 = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture_ip4, + nat_ip=self.nat_addr, + dst_ip=self.pg1.remote_ip4) + + pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture_ip6 = self.pg0.get_capture(len(pkts)) + ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4])) + self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src, + self.pg0.remote_ip6) + + # in2out + pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) / + ICMPv6DestUnreach(code=1) / + packet[IPv6] for packet in capture_ip6] + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + for packet in capture: + try: + self.assertEqual(packet[IP].src, self.nat_addr) + self.assertEqual(packet[IP].dst, self.pg1.remote_ip4) + self.assertEqual(packet[ICMP].type, 3) + self.assertEqual(packet[ICMP].code, 13) + inner = packet[IPerror] + self.assertEqual(inner.src, self.pg1.remote_ip4) + self.assertEqual(inner.dst, self.nat_addr) + self.check_icmp_checksum(packet) + if inner.haslayer(TCPerror): + self.assertEqual(inner[TCPerror].dport, self.tcp_port_out) + elif inner.haslayer(UDPerror): + self.assertEqual(inner[UDPerror].dport, self.udp_port_out) + else: + self.assertEqual(inner[ICMPerror].id, self.icmp_id_out) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + # out2in + pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + ICMP(type=3, code=13) / + packet[IP] for packet in capture_ip4] + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + for packet in capture: + try: + self.assertEqual(packet[IPv6].src, ip.src) + self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6) + icmp = packet[ICMPv6DestUnreach] + self.assertEqual(icmp.code, 1) + inner = icmp[IPerror6] + self.assertEqual(inner.src, self.pg0.remote_ip6) + self.assertEqual(inner.dst, ip.src) + self.check_icmpv6_checksum(packet) + if inner.haslayer(TCPerror): + self.assertEqual(inner[TCPerror].sport, self.tcp_port_in) + elif inner.haslayer(UDPerror): + self.assertEqual(inner[UDPerror].sport, self.udp_port_in) + else: + self.assertEqual(inner[ICMPv6EchoRequest].id, + self.icmp_id_in) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + def test_hairpinning(self): + """ NAT64 hairpinning """ + + client = self.pg0.remote_hosts[0] + server = self.pg0.remote_hosts[1] + server_tcp_in_port = 22 + server_tcp_out_port = 4022 + server_udp_in_port = 23 + server_udp_out_port = 4023 + client_tcp_in_port = 1234 + client_udp_in_port = 1235 + client_tcp_out_port = 0 + client_udp_out_port = 0 + ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr])) + nat_addr_ip6 = ip.src + + self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n, + self.nat_addr_n) + self.vapi.nat64_add_del_interface(self.pg0.sw_if_index) + self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) + + self.vapi.nat64_add_del_static_bib(server.ip6n, + self.nat_addr_n, + server_tcp_in_port, + server_tcp_out_port, + IP_PROTOS.tcp) + self.vapi.nat64_add_del_static_bib(server.ip6n, + self.nat_addr_n, + server_udp_in_port, + server_udp_out_port, + IP_PROTOS.udp) + + # client to server + pkts = [] + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=client.ip6, dst=nat_addr_ip6) / + TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)) + pkts.append(p) + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=client.ip6, dst=nat_addr_ip6) / + UDP(sport=client_udp_in_port, dport=server_udp_out_port)) + pkts.append(p) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + for packet in capture: + try: + self.assertEqual(packet[IPv6].src, nat_addr_ip6) + self.assertEqual(packet[IPv6].dst, server.ip6) + if packet.haslayer(TCP): + self.assertNotEqual(packet[TCP].sport, client_tcp_in_port) + self.assertEqual(packet[TCP].dport, server_tcp_in_port) + self.check_tcp_checksum(packet) + client_tcp_out_port = packet[TCP].sport + else: + self.assertNotEqual(packet[UDP].sport, client_udp_in_port) + self.assertEqual(packet[UDP].dport, server_udp_in_port) + self.check_udp_checksum(packet) + client_udp_out_port = packet[UDP].sport + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + # server to client + pkts = [] + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=server.ip6, dst=nat_addr_ip6) / + TCP(sport=server_tcp_in_port, dport=client_tcp_out_port)) + pkts.append(p) + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=server.ip6, dst=nat_addr_ip6) / + UDP(sport=server_udp_in_port, dport=client_udp_out_port)) + pkts.append(p) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + for packet in capture: + try: + self.assertEqual(packet[IPv6].src, nat_addr_ip6) + self.assertEqual(packet[IPv6].dst, client.ip6) + if packet.haslayer(TCP): + self.assertEqual(packet[TCP].sport, server_tcp_out_port) + self.assertEqual(packet[TCP].dport, client_tcp_in_port) + self.check_tcp_checksum(packet) + else: + self.assertEqual(packet[UDP].sport, server_udp_out_port) + self.assertEqual(packet[UDP].dport, client_udp_in_port) + self.check_udp_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + # ICMP error + pkts = [] + pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=client.ip6, dst=nat_addr_ip6) / + ICMPv6DestUnreach(code=1) / + packet[IPv6] for packet in capture] + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + for packet in capture: + try: + self.assertEqual(packet[IPv6].src, nat_addr_ip6) + self.assertEqual(packet[IPv6].dst, server.ip6) + icmp = packet[ICMPv6DestUnreach] + self.assertEqual(icmp.code, 1) + inner = icmp[IPerror6] + self.assertEqual(inner.src, server.ip6) + self.assertEqual(inner.dst, nat_addr_ip6) + self.check_icmpv6_checksum(packet) + if inner.haslayer(TCPerror): + self.assertEqual(inner[TCPerror].sport, server_tcp_in_port) + self.assertEqual(inner[TCPerror].dport, + client_tcp_out_port) + else: + self.assertEqual(inner[UDPerror].sport, server_udp_in_port) + self.assertEqual(inner[UDPerror].dport, + client_udp_out_port) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + def test_prefix(self): + """ NAT64 Network-Specific Prefix """ + + self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n, + self.nat_addr_n) + self.vapi.nat64_add_del_interface(self.pg0.sw_if_index) + self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) + self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n, + self.vrf1_nat_addr_n, + vrf_id=self.vrf1_id) + self.vapi.nat64_add_del_interface(self.pg2.sw_if_index) + + # Add global prefix + global_pref64 = "2001:db8::" + global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64) + global_pref64_len = 32 + self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len) + + prefix = self.vapi.nat64_prefix_dump() + self.assertEqual(len(prefix), 1) + self.assertEqual(prefix[0].prefix, global_pref64_n) + self.assertEqual(prefix[0].prefix_len, global_pref64_len) + self.assertEqual(prefix[0].vrf_id, 0) + + # Add tenant specific prefix + vrf1_pref64 = "2001:db8:122:300::" + vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64) + vrf1_pref64_len = 56 + self.vapi.nat64_add_del_prefix(vrf1_pref64_n, + vrf1_pref64_len, + vrf_id=self.vrf1_id) + prefix = self.vapi.nat64_prefix_dump() + self.assertEqual(len(prefix), 2) + + # Global prefix + pkts = self.create_stream_in_ip6(self.pg0, + self.pg1, + pref=global_pref64, + plen=global_pref64_len) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=self.nat_addr, + dst_ip=self.pg1.remote_ip4) + + pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + dst_ip = self.compose_ip6(self.pg1.remote_ip4, + global_pref64, + global_pref64_len) + self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6) + + # Tenant specific prefix + pkts = self.create_stream_in_ip6(self.pg2, + self.pg1, + pref=vrf1_pref64, + plen=vrf1_pref64_len) + self.pg2.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr, + dst_ip=self.pg1.remote_ip4) + + pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg2.get_capture(len(pkts)) + dst_ip = self.compose_ip6(self.pg1.remote_ip4, + vrf1_pref64, + vrf1_pref64_len) + self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6) + + def _test_unknown_proto(self): + """ NAT64 translate packet with unknown protocol """ + + self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n, + self.nat_addr_n) + self.vapi.nat64_add_del_interface(self.pg0.sw_if_index) + self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) + remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96) + + # in2out + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) / + TCP(sport=self.tcp_port_in, dport=20)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg1.get_capture(1) + + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) / + GRE() / + IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) / + TCP(sport=1234, dport=1234)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg1.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IP].src, self.nat_addr) + self.assertEqual(packet[IP].dst, self.pg1.remote_ip4) + self.assertTrue(packet.haslayer(GRE)) + self.check_ip_checksum(packet) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + # out2in + p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + GRE() / + IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) / + TCP(sport=1234, dport=1234)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg0.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IPv6].src, remote_ip6) + self.assertEqual(packet[IPv6].dst, self.pgi0.remote_ip6) + self.assertTrue(packet.haslayer(GRE)) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + def _test_hairpinning_unknown_proto(self): + """ NAT64 translate packet with unknown protocol - hairpinning """ + + client = self.pg0.remote_hosts[0] + server = self.pg0.remote_hosts[1] + server_tcp_in_port = 22 + server_tcp_out_port = 4022 + client_tcp_in_port = 1234 + client_udp_in_port = 1235 + nat_addr_ip6 = self.compose_ip6(self.nat_addr, '64:ff9b::', 96) + + self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n, + self.nat_addr_n) + self.vapi.nat64_add_del_interface(self.pg0.sw_if_index) + self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) + + self.vapi.nat64_add_del_static_bib(server.ip6n, + self.nat_addr_n, + server_tcp_in_port, + server_tcp_out_port, + IP_PROTOS.tcp) + + # client to server + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=client.ip6, dst=nat_addr_ip6) / + TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg0.get_capture(1) + + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=client.ip6, dst=nat_addr_ip6) / + GRE() / + IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) / + TCP(sport=1234, dport=1234)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg0.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IPv6].src, nat_addr_ip6) + self.assertEqual(packet[IPv6].dst, server.ip6) + self.assertTrue(packet.haslayer(GRE)) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + # server to client + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=server.ip6, dst=nat_addr_ip6) / + GRE() / + IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) / + TCP(sport=1234, dport=1234)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + p = self.pg0.get_capture(1) + packet = p[0] + try: + self.assertEqual(packet[IPv6].src, nat_addr_ip6) + self.assertEqual(packet[IPv6].dst, client.ip6) + self.assertTrue(packet.haslayer(GRE)) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + def nat64_get_ses_num(self): + """ + Return number of active NAT64 sessions. + """ + ses_num = 0 + st = self.vapi.nat64_st_dump(IP_PROTOS.tcp) + ses_num += len(st) + st = self.vapi.nat64_st_dump(IP_PROTOS.udp) + ses_num += len(st) + st = self.vapi.nat64_st_dump(IP_PROTOS.icmp) + ses_num += len(st) + return ses_num + + def clear_nat64(self): + """ + Clear NAT64 configuration. + """ + self.vapi.nat64_set_timeouts() + + interfaces = self.vapi.nat64_interface_dump() + for intf in interfaces: + self.vapi.nat64_add_del_interface(intf.sw_if_index, + intf.is_inside, + is_add=0) + + bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp) + for bibe in bib: + if bibe.is_static: + self.vapi.nat64_add_del_static_bib(bibe.i_addr, + bibe.o_addr, + bibe.i_port, + bibe.o_port, + bibe.proto, + bibe.vrf_id, + is_add=0) + + bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp) + for bibe in bib: + if bibe.is_static: + self.vapi.nat64_add_del_static_bib(bibe.i_addr, + bibe.o_addr, + bibe.i_port, + bibe.o_port, + bibe.proto, + bibe.vrf_id, + is_add=0) + + bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp) + for bibe in bib: + if bibe.is_static: + self.vapi.nat64_add_del_static_bib(bibe.i_addr, + bibe.o_addr, + bibe.i_port, + bibe.o_port, + bibe.proto, + bibe.vrf_id, + is_add=0) + + adresses = self.vapi.nat64_pool_addr_dump() + for addr in adresses: + self.vapi.nat64_add_del_pool_addr_range(addr.address, + addr.address, + vrf_id=addr.vrf_id, + is_add=0) + + prefixes = self.vapi.nat64_prefix_dump() + for prefix in prefixes: + self.vapi.nat64_add_del_prefix(prefix.prefix, + prefix.prefix_len, + vrf_id=prefix.vrf_id, + is_add=0) + + def tearDown(self): + super(TestNAT64, self).tearDown() + if not self.vpp_dead: + self.logger.info(self.vapi.cli("show nat64 pool")) + self.logger.info(self.vapi.cli("show nat64 interfaces")) + self.logger.info(self.vapi.cli("show nat64 prefix")) + self.logger.info(self.vapi.cli("show nat64 bib tcp")) + self.logger.info(self.vapi.cli("show nat64 bib udp")) + self.logger.info(self.vapi.cli("show nat64 bib icmp")) + self.logger.info(self.vapi.cli("show nat64 session table tcp")) + self.logger.info(self.vapi.cli("show nat64 session table udp")) + self.logger.info(self.vapi.cli("show nat64 session table icmp")) + self.clear_nat64() + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)