NAT: Buufer overflow for memcpy()
[vpp.git] / test / test_mpls.py
index 460a32d..9590519 100644 (file)
@@ -12,7 +12,7 @@ from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP, ICMP
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP, ICMP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
 from scapy.contrib.mpls import MPLS
 
 
 from scapy.contrib.mpls import MPLS
 
 
@@ -106,6 +106,7 @@ class TestMPLS(VppTestCase):
             ping=0,
             ip_itf=None,
             dst_ip=None,
             ping=0,
             ip_itf=None,
             dst_ip=None,
+            chksum=None,
             n=257):
         self.reset_packet_infos()
         pkts = []
             n=257):
         self.reset_packet_infos()
         pkts = []
@@ -133,6 +134,8 @@ class TestMPLS(VppTestCase):
                             dst=ip_itf.local_ip4) /
                      ICMP())
 
                             dst=ip_itf.local_ip4) /
                      ICMP())
 
+            if chksum:
+                p[IP].chksum = chksum
             info.data = p.copy()
             pkts.append(p)
         return pkts
             info.data = p.copy()
             pkts.append(p)
         return pkts
@@ -152,7 +155,7 @@ class TestMPLS(VppTestCase):
         return pkts
 
     def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl,
         return pkts
 
     def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl,
-                                   dst_ip=None):
+                                   dst_ip=None, hlim=64):
         if dst_ip is None:
             dst_ip = src_if.remote_ip6
         self.reset_packet_infos()
         if dst_ip is None:
             dst_ip = src_if.remote_ip6
         self.reset_packet_infos()
@@ -162,7 +165,7 @@ class TestMPLS(VppTestCase):
             payload = self.info_to_payload(info)
             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                  MPLS(label=mpls_label, ttl=mpls_ttl) /
             payload = self.info_to_payload(info)
             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                  MPLS(label=mpls_label, ttl=mpls_ttl) /
-                 IPv6(src=src_if.remote_ip6, dst=dst_ip) /
+                 IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) /
                  UDP(sport=1234, dport=1234) /
                  Raw(payload))
             info.data = p.copy()
                  UDP(sport=1234, dport=1234) /
                  Raw(payload))
             info.data = p.copy()
@@ -285,6 +288,32 @@ class TestMPLS(VppTestCase):
         except:
             raise
 
         except:
             raise
 
+    def verify_capture_ip6_icmp(self, src_if, capture, sent):
+        try:
+            self.assertEqual(len(capture), len(sent))
+
+            for i in range(len(capture)):
+                tx = sent[i]
+                rx = capture[i]
+
+                # the rx'd packet has the MPLS label popped
+                eth = rx[Ether]
+                self.assertEqual(eth.type, 0x86DD)
+
+                tx_ip = tx[IPv6]
+                rx_ip = rx[IPv6]
+
+                self.assertEqual(rx_ip.dst, tx_ip.src)
+                # ICMP sourced from the interface's address
+                self.assertEqual(rx_ip.src, src_if.local_ip6)
+                # hop-limit reset to 255 for IMCP packet
+                self.assertEqual(rx_ip.hlim, 254)
+
+                icmp = rx[ICMPv6TimeExceeded]
+
+        except:
+            raise
+
     def send_and_assert_no_replies(self, intf, pkts, remark):
         intf.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
     def send_and_assert_no_replies(self, intf, pkts, remark):
         intf.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
@@ -360,6 +389,91 @@ class TestMPLS(VppTestCase):
         rx = self.pg0.get_capture()
         self.verify_capture_ip4(self.pg0, rx, tx)
 
         rx = self.pg0.get_capture()
         self.verify_capture_ip4(self.pg0, rx, tx)
 
+        #
+        # disposed packets have an invalid IPv4 checkusm
+        #
+        tx = self.create_stream_labelled_ip4(self.pg0, [33],
+                                             dst_ip=self.pg0.remote_ip4,
+                                             n=65,
+                                             chksum=1)
+        self.send_and_assert_no_replies(self.pg0, tx, "Invalid Checksum")
+
+        #
+        # An MPLS xconnect - EOS label in IPv6 out
+        #
+        route_333_eos = VppMplsRoute(
+            self, 333, 1,
+            [VppRoutePath(self.pg0.remote_ip6,
+                          self.pg0.sw_if_index,
+                          labels=[],
+                          proto=DpoProto.DPO_PROTO_IP6)])
+        route_333_eos.add_vpp_config()
+
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [333], 64)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6(self.pg0, rx, tx)
+
+        #
+        # disposed packets have an TTL expired
+        #
+        tx = self.create_stream_labelled_ip6(self.pg0, [333], 64,
+                                             dst_ip=self.pg1.remote_ip6,
+                                             hlim=1)
+
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [333], 64,
+                                             dst_ip=self.pg1.remote_ip6,
+                                             hlim=0)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6_icmp(self.pg0, rx, tx)
+
+        #
+        # An MPLS xconnect - EOS label in IPv6 out w imp-null
+        #
+        route_334_eos = VppMplsRoute(
+            self, 334, 1,
+            [VppRoutePath(self.pg0.remote_ip6,
+                          self.pg0.sw_if_index,
+                          labels=[3],
+                          proto=DpoProto.DPO_PROTO_IP6)])
+        route_334_eos.add_vpp_config()
+
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [334], 64)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6(self.pg0, rx, tx)
+
+        #
+        # disposed packets have an TTL expired
+        #
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [334], 64,
+                                             dst_ip=self.pg1.remote_ip6,
+                                             hlim=0)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6_icmp(self.pg0, rx, tx)
+
         #
         # An MPLS xconnect - non-EOS label in IP out - an invalid configuration
         # so this traffic should be dropped.
         #
         # An MPLS xconnect - non-EOS label in IP out - an invalid configuration
         # so this traffic should be dropped.
@@ -1025,6 +1139,14 @@ class TestMPLS(VppTestCase):
         rx = self.pg1.get_capture(257)
         self.verify_capture_ip4(self.pg1, rx, tx)
 
         rx = self.pg1.get_capture(257)
         self.verify_capture_ip4(self.pg1, rx, tx)
 
+        #
+        # disposed packets have an invalid IPv4 checkusm
+        #
+        tx = self.create_stream_labelled_ip4(self.pg0, [34],
+                                             dst_ip="232.1.1.1", n=65,
+                                             chksum=1)
+        self.send_and_assert_no_replies(self.pg0, tx, "Invalid Checksum")
+
         #
         # set the RPF-ID of the enrtry to not match the input packet's
         #
         #
         # set the RPF-ID of the enrtry to not match the input packet's
         #
@@ -1091,6 +1213,13 @@ class TestMPLS(VppTestCase):
         rx = self.pg1.get_capture(257)
         self.verify_capture_ip6(self.pg1, rx, tx)
 
         rx = self.pg1.get_capture(257)
         self.verify_capture_ip6(self.pg1, rx, tx)
 
+        #
+        # disposed packets have hop-limit = 1
+        #
+        tx = self.create_stream_labelled_ip6(self.pg0, [34], 255,
+                                             dst_ip="ff01::1", hlim=1)
+        self.send_and_assert_no_replies(self.pg0, tx, "Hop Limt Expired")
+
         #
         # set the RPF-ID of the enrtry to not match the input packet's
         #
         #
         # set the RPF-ID of the enrtry to not match the input packet's
         #