npt66: icmp6 alg to handle icmp6 error messages
[vpp.git] / test / test_npt66.py
index 5173c62..c867621 100644 (file)
@@ -4,7 +4,7 @@ import unittest
 import ipaddress
 from framework import VppTestCase, VppTestRunner
 
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 
@@ -12,6 +12,10 @@ from scapy.packet import Raw
 class TestNPT66(VppTestCase):
     """NPTv6 Test Case"""
 
+    extra_vpp_plugin_config = [
+        "plugin npt66_plugin.so {enable}",
+    ]
+
     def setUp(self):
         super(TestNPT66, self).setUp()
 
@@ -29,46 +33,81 @@ class TestNPT66(VppTestCase):
             i.admin_down()
         super(TestNPT66, self).tearDown()
 
-    def send_and_verify(self, in2out, internal, external):
-        if in2out:
-            sendif = self.pg0
-            recvif = self.pg1
-            local_mac = self.pg0.local_mac
-            remote_mac = self.pg0.remote_mac
-            src = ipaddress.ip_interface(internal).ip + 1
-            dst = self.pg1.remote_ip6
-        else:
-            sendif = self.pg1
-            recvif = self.pg0
-            local_mac = self.pg1.local_mac
-            remote_mac = self.pg1.remote_mac
-            src = self.pg1.remote_ip6
-            dst = ipaddress.ip_interface(external).ip + 1
+    def send_and_verify(self, internal, reply_icmp_error=False):
+        sendif = self.pg0
+        recvif = self.pg1
+        local_mac = self.pg0.local_mac
+        remote_mac = self.pg0.remote_mac
+        src = ipaddress.ip_interface(internal).ip + 1
+        dst = self.pg1.remote_ip6
 
         p = (
             Ether(dst=local_mac, src=remote_mac)
             / IPv6(src=src, dst=dst)
             / ICMPv6EchoRequest()
+            / Raw(b"Request")
         )
+        # print('Sending packet')
+        # p.show2()
         rxs = self.send_and_expect(sendif, p, recvif)
         for rx in rxs:
-            rx.show2()
+            # print('Received packet')
+            # rx.show2()
             original_cksum = rx[ICMPv6EchoRequest].cksum
             del rx[ICMPv6EchoRequest].cksum
             rx = rx.__class__(bytes(rx))
             self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum)
 
-    def do_test(self, internal, external):
+            # Generate a replies
+            if reply_icmp_error:
+                # print('Generating an ICMP error message')
+                reply = (
+                    Ether(dst=rx[Ether].src, src=local_mac)
+                    / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
+                    / ICMPv6DestUnreach()
+                    / rx[IPv6]
+                )
+                # print('Sending ICMP error message reply')
+                # reply.show2()
+                replies = self.send_and_expect(recvif, reply, sendif)
+                for r in replies:
+                    # print('Received ICMP error message reply on the other side')
+                    # r.show2()
+                    self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
+                    original_cksum = r[ICMPv6EchoRequest].cksum
+                    del r[ICMPv6EchoRequest].cksum
+                    r = r.__class__(bytes(r))
+                    self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
+
+            else:
+                reply = (
+                    Ether(dst=rx[Ether].src, src=local_mac)
+                    / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
+                    / ICMPv6EchoRequest()
+                    / Raw(b"Reply")
+                )
+
+                replies = self.send_and_expect(recvif, reply, sendif)
+                for r in replies:
+                    r.show2()
+                    self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
+                    original_cksum = r[ICMPv6EchoRequest].cksum
+                    del r[ICMPv6EchoRequest].cksum
+                    r = r.__class__(bytes(r))
+                    self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
+
+    def do_test(self, internal, external, reply_icmp_error=False):
+        """Add NPT66 binding and send packet"""
         self.vapi.npt66_binding_add_del(
             sw_if_index=self.pg1.sw_if_index,
             internal=internal,
             external=external,
             is_add=True,
         )
+        ## TODO use route api
         self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")
 
-        self.send_and_verify(True, internal, external)
-        self.send_and_verify(False, internal, external)
+        self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
 
         self.vapi.npt66_binding_add_del(
             sw_if_index=self.pg1.sw_if_index,
@@ -80,10 +119,22 @@ class TestNPT66(VppTestCase):
     def test_npt66_simple(self):
         """Send and receive a packet through NPT66"""
 
+        self.do_test("fd00:0000:0000::/48", "2001:4650:c3ed::/48")
         self.do_test("fc00:1::/48", "2001:db8:1::/48")
         self.do_test("fc00:1234::/32", "2001:db8:1::/32")
         self.do_test("fc00:1234::/63", "2001:db8:1::/56")
 
+    def test_npt66_icmp6(self):
+        """Send and receive a packet through NPT66"""
+
+        # Test ICMP6 error packets
+        self.do_test(
+            "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
+        )
+        self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
+        self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
+        self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
+
 
 if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)