NAT64: ICMP error support
[vpp.git] / test / test_snat.py
index 64fa305..c6344a9 100644 (file)
@@ -8,6 +8,7 @@ from framework import VppTestCase, VppTestRunner, running_extended_tests
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
 from scapy.layers.l2 import Ether, ARP
 from scapy.data import IP_PROTOS
 from scapy.packet import bind_layers
@@ -2635,6 +2636,95 @@ class TestNAT64(MethodHolder):
         ses_num_after_timeout = self.nat64_get_ses_num()
         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
 
+    def test_icmp_error(self):
+        """ NAT64 ICMP Error message translation """
+        self.tcp_port_in = 6303
+        self.udp_port_in = 6304
+        self.icmp_id_in = 6305
+
+        ses_num_start = self.nat64_get_ses_num()
+
+        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+                                                self.nat_addr_n)
+        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+        # send some packets to create sessions
+        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture_ip4 = self.pg1.get_capture(len(pkts))
+        self.verify_capture_out(capture_ip4, packet_num=3,
+                                nat_ip=self.nat_addr,
+                                dst_ip=self.pg1.remote_ip4)
+
+        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture_ip6 = self.pg0.get_capture(len(pkts))
+        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+        self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
+                                   self.pg0.remote_ip6)
+
+        # in2out
+        pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+                IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
+                ICMPv6DestUnreach(code=1) /
+                packet[IPv6] for packet in capture_ip6]
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IP].src, self.nat_addr)
+                self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
+                self.assertEqual(packet[ICMP].type, 3)
+                self.assertEqual(packet[ICMP].code, 13)
+                inner = packet[IPerror]
+                self.assertEqual(inner.src, self.pg1.remote_ip4)
+                self.assertEqual(inner.dst, self.nat_addr)
+                if inner.haslayer(TCPerror):
+                    self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
+                elif inner.haslayer(UDPerror):
+                    self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
+                else:
+                    self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
+        # out2in
+        pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+                IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+                ICMP(type=3, code=13) /
+                packet[IP] for packet in capture_ip4]
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for packet in capture:
+            try:
+                self.assertEqual(packet[IPv6].src, ip.src)
+                self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
+                icmp = packet[ICMPv6DestUnreach]
+                self.assertEqual(icmp.code, 1)
+                inner = icmp[IPerror6]
+                self.assertEqual(inner.src, self.pg0.remote_ip6)
+                self.assertEqual(inner.dst, ip.src)
+                if inner.haslayer(TCPerror):
+                    self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
+                elif inner.haslayer(UDPerror):
+                    self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
+                else:
+                    self.assertEqual(inner[ICMPv6EchoRequest].id,
+                                     self.icmp_id_in)
+            except:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+
     def nat64_get_ses_num(self):
         """
         Return number of active NAT64 sessions.