Revert "VOM: fix cflags"
[vpp.git] / test / test_mpls.py
index 460a32d..aa5e674 100644 (file)
@@ -12,7 +12,7 @@ from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP, ICMP
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP, ICMP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
 from scapy.contrib.mpls import MPLS
 
 
 from scapy.contrib.mpls import MPLS
 
 
@@ -106,6 +106,7 @@ class TestMPLS(VppTestCase):
             ping=0,
             ip_itf=None,
             dst_ip=None,
             ping=0,
             ip_itf=None,
             dst_ip=None,
+            chksum=None,
             n=257):
         self.reset_packet_infos()
         pkts = []
             n=257):
         self.reset_packet_infos()
         pkts = []
@@ -133,6 +134,8 @@ class TestMPLS(VppTestCase):
                             dst=ip_itf.local_ip4) /
                      ICMP())
 
                             dst=ip_itf.local_ip4) /
                      ICMP())
 
+            if chksum:
+                p[IP].chksum = chksum
             info.data = p.copy()
             pkts.append(p)
         return pkts
             info.data = p.copy()
             pkts.append(p)
         return pkts
@@ -152,7 +155,7 @@ class TestMPLS(VppTestCase):
         return pkts
 
     def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl,
         return pkts
 
     def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl,
-                                   dst_ip=None):
+                                   dst_ip=None, hlim=64):
         if dst_ip is None:
             dst_ip = src_if.remote_ip6
         self.reset_packet_infos()
         if dst_ip is None:
             dst_ip = src_if.remote_ip6
         self.reset_packet_infos()
@@ -162,7 +165,7 @@ class TestMPLS(VppTestCase):
             payload = self.info_to_payload(info)
             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                  MPLS(label=mpls_label, ttl=mpls_ttl) /
             payload = self.info_to_payload(info)
             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                  MPLS(label=mpls_label, ttl=mpls_ttl) /
-                 IPv6(src=src_if.remote_ip6, dst=dst_ip) /
+                 IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) /
                  UDP(sport=1234, dport=1234) /
                  Raw(payload))
             info.data = p.copy()
                  UDP(sport=1234, dport=1234) /
                  Raw(payload))
             info.data = p.copy()
@@ -285,12 +288,31 @@ class TestMPLS(VppTestCase):
         except:
             raise
 
         except:
             raise
 
-    def send_and_assert_no_replies(self, intf, pkts, remark):
-        intf.add_stream(pkts)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        for i in self.pg_interfaces:
-            i.assert_nothing_captured(remark=remark)
+    def verify_capture_ip6_icmp(self, src_if, capture, sent):
+        try:
+            self.assertEqual(len(capture), len(sent))
+
+            for i in range(len(capture)):
+                tx = sent[i]
+                rx = capture[i]
+
+                # the rx'd packet has the MPLS label popped
+                eth = rx[Ether]
+                self.assertEqual(eth.type, 0x86DD)
+
+                tx_ip = tx[IPv6]
+                rx_ip = rx[IPv6]
+
+                self.assertEqual(rx_ip.dst, tx_ip.src)
+                # ICMP sourced from the interface's address
+                self.assertEqual(rx_ip.src, src_if.local_ip6)
+                # hop-limit reset to 255 for IMCP packet
+                self.assertEqual(rx_ip.hlim, 254)
+
+                icmp = rx[ICMPv6TimeExceeded]
+
+        except:
+            raise
 
     def test_swap(self):
         """ MPLS label swap tests """
 
     def test_swap(self):
         """ MPLS label swap tests """
@@ -360,6 +382,91 @@ class TestMPLS(VppTestCase):
         rx = self.pg0.get_capture()
         self.verify_capture_ip4(self.pg0, rx, tx)
 
         rx = self.pg0.get_capture()
         self.verify_capture_ip4(self.pg0, rx, tx)
 
+        #
+        # disposed packets have an invalid IPv4 checkusm
+        #
+        tx = self.create_stream_labelled_ip4(self.pg0, [33],
+                                             dst_ip=self.pg0.remote_ip4,
+                                             n=65,
+                                             chksum=1)
+        self.send_and_assert_no_replies(self.pg0, tx, "Invalid Checksum")
+
+        #
+        # An MPLS xconnect - EOS label in IPv6 out
+        #
+        route_333_eos = VppMplsRoute(
+            self, 333, 1,
+            [VppRoutePath(self.pg0.remote_ip6,
+                          self.pg0.sw_if_index,
+                          labels=[],
+                          proto=DpoProto.DPO_PROTO_IP6)])
+        route_333_eos.add_vpp_config()
+
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [333], 64)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6(self.pg0, rx, tx)
+
+        #
+        # disposed packets have an TTL expired
+        #
+        tx = self.create_stream_labelled_ip6(self.pg0, [333], 64,
+                                             dst_ip=self.pg1.remote_ip6,
+                                             hlim=1)
+
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [333], 64,
+                                             dst_ip=self.pg1.remote_ip6,
+                                             hlim=0)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6_icmp(self.pg0, rx, tx)
+
+        #
+        # An MPLS xconnect - EOS label in IPv6 out w imp-null
+        #
+        route_334_eos = VppMplsRoute(
+            self, 334, 1,
+            [VppRoutePath(self.pg0.remote_ip6,
+                          self.pg0.sw_if_index,
+                          labels=[3],
+                          proto=DpoProto.DPO_PROTO_IP6)])
+        route_334_eos.add_vpp_config()
+
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [334], 64)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6(self.pg0, rx, tx)
+
+        #
+        # disposed packets have an TTL expired
+        #
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [334], 64,
+                                             dst_ip=self.pg1.remote_ip6,
+                                             hlim=0)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6_icmp(self.pg0, rx, tx)
+
         #
         # An MPLS xconnect - non-EOS label in IP out - an invalid configuration
         # so this traffic should be dropped.
         #
         # An MPLS xconnect - non-EOS label in IP out - an invalid configuration
         # so this traffic should be dropped.
@@ -1025,6 +1132,14 @@ class TestMPLS(VppTestCase):
         rx = self.pg1.get_capture(257)
         self.verify_capture_ip4(self.pg1, rx, tx)
 
         rx = self.pg1.get_capture(257)
         self.verify_capture_ip4(self.pg1, rx, tx)
 
+        #
+        # disposed packets have an invalid IPv4 checkusm
+        #
+        tx = self.create_stream_labelled_ip4(self.pg0, [34],
+                                             dst_ip="232.1.1.1", n=65,
+                                             chksum=1)
+        self.send_and_assert_no_replies(self.pg0, tx, "Invalid Checksum")
+
         #
         # set the RPF-ID of the enrtry to not match the input packet's
         #
         #
         # set the RPF-ID of the enrtry to not match the input packet's
         #
@@ -1091,6 +1206,19 @@ class TestMPLS(VppTestCase):
         rx = self.pg1.get_capture(257)
         self.verify_capture_ip6(self.pg1, rx, tx)
 
         rx = self.pg1.get_capture(257)
         self.verify_capture_ip6(self.pg1, rx, tx)
 
+        #
+        # disposed packets have hop-limit = 1
+        #
+        tx = self.create_stream_labelled_ip6(self.pg0, [34], 255,
+                                             dst_ip="ff01::1", hlim=1)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture(257)
+        self.verify_capture_ip6_icmp(self.pg0, rx, tx)
+
         #
         # set the RPF-ID of the enrtry to not match the input packet's
         #
         #
         # set the RPF-ID of the enrtry to not match the input packet's
         #
@@ -1129,14 +1257,6 @@ class TestMPLSDisabled(VppTestCase):
         self.pg0.disable_mpls()
         super(TestMPLSDisabled, self).tearDown()
 
         self.pg0.disable_mpls()
         super(TestMPLSDisabled, self).tearDown()
 
-    def send_and_assert_no_replies(self, intf, pkts, remark):
-        intf.add_stream(pkts)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        for i in self.pg_interfaces:
-            i.get_capture(0)
-            i.assert_nothing_captured(remark=remark)
-
     def test_mpls_disabled(self):
         """ MPLS Disabled """
 
     def test_mpls_disabled(self):
         """ MPLS Disabled """