npt66: icmp6 alg to handle icmp6 error messages 68/39668/7
authorOle Troan <otroan@employees.org>
Thu, 12 Oct 2023 16:54:55 +0000 (18:54 +0200)
committerOle Tr�an <otroan@employees.org>
Mon, 16 Oct 2023 10:59:22 +0000 (10:59 +0000)
Support rewriting the inner packet for ICMP6 error messages.

Type: feature
Change-Id: I7e11f53626037075a23310f1cb7e673b0cb52843
Signed-off-by: Ole Troan <otroan@employees.org>
src/plugins/npt66/npt66_node.c
test/test_npt66.py

index ebe3359..f74f914 100644 (file)
@@ -159,6 +159,69 @@ done:
   return rv;
 }
 
+static int
+npt66_icmp6_translate (vlib_buffer_t *b, ip6_header_t *outer_ip,
+                      icmp46_header_t *icmp, npt66_binding_t *binding,
+                      int dir)
+{
+  ip6_header_t *ip = (ip6_header_t *) (icmp + 2);
+  int rv = 0;
+  vlib_main_t *vm = vlib_get_main ();
+
+  if (clib_net_to_host_u16 (outer_ip->payload_length) <
+      sizeof (icmp46_header_t) + 4 + sizeof (ip6_header_t))
+    {
+      clib_warning ("ICMP6 payload too short");
+      return -1;
+    }
+
+  // Validate checksums
+  int bogus_length;
+  u16 sum16;
+  sum16 = ip6_tcp_udp_icmp_compute_checksum (vm, b, outer_ip, &bogus_length);
+  if (sum16 != 0 && sum16 != 0xffff)
+    {
+      clib_warning ("ICMP6 checksum failed");
+      return -1;
+    }
+  if (dir == VLIB_RX)
+    {
+      if (!ip6_prefix_cmp (ip->src_address, binding->external,
+                          binding->external_plen))
+       {
+         clib_warning (
+           "npt66_icmp6_translate: src address is not internal (%U -> %U)",
+           format_ip6_address, &ip->src_address, format_ip6_address,
+           &ip->dst_address);
+         goto done;
+       }
+      ip->src_address = ip6_prefix_copy (ip->src_address, binding->internal,
+                                        binding->internal_plen);
+      /* Checksum neutrality */
+      rv = npt66_adjust_checksum (binding->internal_plen, true, binding->delta,
+                                 &ip->src_address);
+    }
+  else
+    {
+      if (!ip6_prefix_cmp (ip->dst_address, binding->external,
+                          binding->external_plen))
+       {
+         clib_warning (
+           "npt66_icmp6_translate: dst address is not external (%U -> %U)",
+           format_ip6_address, &ip->src_address, format_ip6_address,
+           &ip->dst_address);
+         goto done;
+       }
+      ip->dst_address = ip6_prefix_copy (ip->dst_address, binding->internal,
+                                        binding->internal_plen);
+      rv = npt66_adjust_checksum (binding->internal_plen, false,
+                                 binding->delta, &ip->dst_address);
+    }
+done:
+
+  return rv;
+}
+
 /*
  * Lookup the packet tuple in the flow cache, given the lookup mask.
  * If a binding is found, rewrite the packet according to instructions,
@@ -194,8 +257,20 @@ npt66_node_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
 
       /* By default pass packet to next node in the feature chain */
       vnet_feature_next_u16 (next, b[0]);
+      int rv;
+      icmp46_header_t *icmp = (icmp46_header_t *) (ip + 1);
+      if (ip->protocol == IP_PROTOCOL_ICMP6 && icmp->type < 128)
+       {
+         rv = npt66_icmp6_translate (b[0], ip, icmp, binding, dir);
+         if (rv < 0)
+           {
+             clib_warning ("ICMP6 npt66_translate failed");
+             *next = NPT66_NEXT_DROP;
+             goto next;
+           }
+       }
+      rv = npt66_translate (ip, binding, dir);
 
-      int rv = npt66_translate (ip, binding, dir);
       if (rv < 0)
        {
          vlib_node_increment_counter (vm, node->node_index,
index 44a9e87..c867621 100644 (file)
@@ -4,7 +4,7 @@ import unittest
 import ipaddress
 from framework import VppTestCase, VppTestRunner
 
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 
@@ -33,7 +33,7 @@ class TestNPT66(VppTestCase):
             i.admin_down()
         super(TestNPT66, self).tearDown()
 
-    def send_and_verify(self, internal):
+    def send_and_verify(self, internal, reply_icmp_error=False):
         sendif = self.pg0
         recvif = self.pg1
         local_mac = self.pg0.local_mac
@@ -47,30 +47,57 @@ class TestNPT66(VppTestCase):
             / ICMPv6EchoRequest()
             / Raw(b"Request")
         )
+        # print('Sending packet')
+        # p.show2()
         rxs = self.send_and_expect(sendif, p, recvif)
         for rx in rxs:
+            # print('Received packet')
+            # rx.show2()
             original_cksum = rx[ICMPv6EchoRequest].cksum
             del rx[ICMPv6EchoRequest].cksum
             rx = rx.__class__(bytes(rx))
             self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum)
 
             # Generate a replies
-            reply = (
-                Ether(dst=rx[Ether].src, src=local_mac)
-                / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
-                / ICMPv6EchoRequest()
-                / Raw(b"Reply")
-            )
-
-            replies = self.send_and_expect(recvif, reply, sendif)
-            for r in replies:
-                self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
-                original_cksum = r[ICMPv6EchoRequest].cksum
-                del r[ICMPv6EchoRequest].cksum
-                r = r.__class__(bytes(r))
-                self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
-
-    def do_test(self, internal, external):
+            if reply_icmp_error:
+                # print('Generating an ICMP error message')
+                reply = (
+                    Ether(dst=rx[Ether].src, src=local_mac)
+                    / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
+                    / ICMPv6DestUnreach()
+                    / rx[IPv6]
+                )
+                # print('Sending ICMP error message reply')
+                # reply.show2()
+                replies = self.send_and_expect(recvif, reply, sendif)
+                for r in replies:
+                    # print('Received ICMP error message reply on the other side')
+                    # r.show2()
+                    self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
+                    original_cksum = r[ICMPv6EchoRequest].cksum
+                    del r[ICMPv6EchoRequest].cksum
+                    r = r.__class__(bytes(r))
+                    self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
+
+            else:
+                reply = (
+                    Ether(dst=rx[Ether].src, src=local_mac)
+                    / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
+                    / ICMPv6EchoRequest()
+                    / Raw(b"Reply")
+                )
+
+                replies = self.send_and_expect(recvif, reply, sendif)
+                for r in replies:
+                    r.show2()
+                    self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
+                    original_cksum = r[ICMPv6EchoRequest].cksum
+                    del r[ICMPv6EchoRequest].cksum
+                    r = r.__class__(bytes(r))
+                    self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
+
+    def do_test(self, internal, external, reply_icmp_error=False):
+        """Add NPT66 binding and send packet"""
         self.vapi.npt66_binding_add_del(
             sw_if_index=self.pg1.sw_if_index,
             internal=internal,
@@ -80,7 +107,7 @@ class TestNPT66(VppTestCase):
         ## TODO use route api
         self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")
 
-        self.send_and_verify(internal)
+        self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
 
         self.vapi.npt66_binding_add_del(
             sw_if_index=self.pg1.sw_if_index,
@@ -97,6 +124,17 @@ class TestNPT66(VppTestCase):
         self.do_test("fc00:1234::/32", "2001:db8:1::/32")
         self.do_test("fc00:1234::/63", "2001:db8:1::/56")
 
+    def test_npt66_icmp6(self):
+        """Send and receive a packet through NPT66"""
+
+        # Test ICMP6 error packets
+        self.do_test(
+            "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
+        )
+        self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
+        self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
+        self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
+
 
 if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)