npt66: icmp6 alg to handle icmp6 error messages
[vpp.git] / test / test_npt66.py
index 44a9e87..c867621 100644 (file)
@@ -4,7 +4,7 @@ import unittest
 import ipaddress
 from framework import VppTestCase, VppTestRunner
 
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 
@@ -33,7 +33,7 @@ class TestNPT66(VppTestCase):
             i.admin_down()
         super(TestNPT66, self).tearDown()
 
-    def send_and_verify(self, internal):
+    def send_and_verify(self, internal, reply_icmp_error=False):
         sendif = self.pg0
         recvif = self.pg1
         local_mac = self.pg0.local_mac
@@ -47,30 +47,57 @@ class TestNPT66(VppTestCase):
             / ICMPv6EchoRequest()
             / Raw(b"Request")
         )
+        # print('Sending packet')
+        # p.show2()
         rxs = self.send_and_expect(sendif, p, recvif)
         for rx in rxs:
+            # print('Received packet')
+            # rx.show2()
             original_cksum = rx[ICMPv6EchoRequest].cksum
             del rx[ICMPv6EchoRequest].cksum
             rx = rx.__class__(bytes(rx))
             self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum)
 
             # Generate a replies
-            reply = (
-                Ether(dst=rx[Ether].src, src=local_mac)
-                / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
-                / ICMPv6EchoRequest()
-                / Raw(b"Reply")
-            )
-
-            replies = self.send_and_expect(recvif, reply, sendif)
-            for r in replies:
-                self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
-                original_cksum = r[ICMPv6EchoRequest].cksum
-                del r[ICMPv6EchoRequest].cksum
-                r = r.__class__(bytes(r))
-                self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
-
-    def do_test(self, internal, external):
+            if reply_icmp_error:
+                # print('Generating an ICMP error message')
+                reply = (
+                    Ether(dst=rx[Ether].src, src=local_mac)
+                    / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
+                    / ICMPv6DestUnreach()
+                    / rx[IPv6]
+                )
+                # print('Sending ICMP error message reply')
+                # reply.show2()
+                replies = self.send_and_expect(recvif, reply, sendif)
+                for r in replies:
+                    # print('Received ICMP error message reply on the other side')
+                    # r.show2()
+                    self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
+                    original_cksum = r[ICMPv6EchoRequest].cksum
+                    del r[ICMPv6EchoRequest].cksum
+                    r = r.__class__(bytes(r))
+                    self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
+
+            else:
+                reply = (
+                    Ether(dst=rx[Ether].src, src=local_mac)
+                    / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
+                    / ICMPv6EchoRequest()
+                    / Raw(b"Reply")
+                )
+
+                replies = self.send_and_expect(recvif, reply, sendif)
+                for r in replies:
+                    r.show2()
+                    self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
+                    original_cksum = r[ICMPv6EchoRequest].cksum
+                    del r[ICMPv6EchoRequest].cksum
+                    r = r.__class__(bytes(r))
+                    self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
+
+    def do_test(self, internal, external, reply_icmp_error=False):
+        """Add NPT66 binding and send packet"""
         self.vapi.npt66_binding_add_del(
             sw_if_index=self.pg1.sw_if_index,
             internal=internal,
@@ -80,7 +107,7 @@ class TestNPT66(VppTestCase):
         ## TODO use route api
         self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")
 
-        self.send_and_verify(internal)
+        self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
 
         self.vapi.npt66_binding_add_del(
             sw_if_index=self.pg1.sw_if_index,
@@ -97,6 +124,17 @@ class TestNPT66(VppTestCase):
         self.do_test("fc00:1234::/32", "2001:db8:1::/32")
         self.do_test("fc00:1234::/63", "2001:db8:1::/56")
 
+    def test_npt66_icmp6(self):
+        """Send and receive a packet through NPT66"""
+
+        # Test ICMP6 error packets
+        self.do_test(
+            "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
+        )
+        self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
+        self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
+        self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
+
 
 if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)