VAPI: Ensure type definitions are generated in same order as .api file.
[vpp.git] / test / test_mpls.py
index b2226a7..aa5e674 100644 (file)
@@ -6,13 +6,13 @@ import socket
 from framework import VppTestCase, VppTestRunner
 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
     VppMplsIpBind, VppIpMRoute, VppMRoutePath, \
-    MRouteItfFlags, MRouteEntryFlags, DpoProto
+    MRouteItfFlags, MRouteEntryFlags, DpoProto, VppIpTable, VppMplsTable
 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP, ICMP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
 from scapy.contrib.mpls import MPLS
 
 
@@ -60,9 +60,23 @@ class TestMPLS(VppTestCase):
         # setup both interfaces
         # assign them different tables.
         table_id = 0
+        self.tables = []
+
+        tbl = VppMplsTable(self, 0)
+        tbl.add_vpp_config()
+        self.tables.append(tbl)
 
         for i in self.pg_interfaces:
             i.admin_up()
+
+            if table_id != 0:
+                tbl = VppIpTable(self, table_id)
+                tbl.add_vpp_config()
+                self.tables.append(tbl)
+                tbl = VppIpTable(self, table_id, is_ip6=1)
+                tbl.add_vpp_config()
+                self.tables.append(tbl)
+
             i.set_table_ip4(table_id)
             i.set_table_ip6(table_id)
             i.config_ip4()
@@ -73,12 +87,15 @@ class TestMPLS(VppTestCase):
             table_id += 1
 
     def tearDown(self):
-        super(TestMPLS, self).tearDown()
         for i in self.pg_interfaces:
             i.unconfig_ip4()
             i.unconfig_ip6()
             i.ip6_disable()
+            i.set_table_ip4(0)
+            i.set_table_ip6(0)
+            i.disable_mpls()
             i.admin_down()
+        super(TestMPLS, self).tearDown()
 
     # the default of 64 matches the IP packet TTL default
     def create_stream_labelled_ip4(
@@ -89,6 +106,7 @@ class TestMPLS(VppTestCase):
             ping=0,
             ip_itf=None,
             dst_ip=None,
+            chksum=None,
             n=257):
         self.reset_packet_infos()
         pkts = []
@@ -116,6 +134,8 @@ class TestMPLS(VppTestCase):
                             dst=ip_itf.local_ip4) /
                      ICMP())
 
+            if chksum:
+                p[IP].chksum = chksum
             info.data = p.copy()
             pkts.append(p)
         return pkts
@@ -135,7 +155,7 @@ class TestMPLS(VppTestCase):
         return pkts
 
     def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl,
-                                   dst_ip=None):
+                                   dst_ip=None, hlim=64):
         if dst_ip is None:
             dst_ip = src_if.remote_ip6
         self.reset_packet_infos()
@@ -145,7 +165,7 @@ class TestMPLS(VppTestCase):
             payload = self.info_to_payload(info)
             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                  MPLS(label=mpls_label, ttl=mpls_ttl) /
-                 IPv6(src=src_if.remote_ip6, dst=dst_ip) /
+                 IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) /
                  UDP(sport=1234, dport=1234) /
                  Raw(payload))
             info.data = p.copy()
@@ -268,12 +288,31 @@ class TestMPLS(VppTestCase):
         except:
             raise
 
-    def send_and_assert_no_replies(self, intf, pkts, remark):
-        intf.add_stream(pkts)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        for i in self.pg_interfaces:
-            i.assert_nothing_captured(remark=remark)
+    def verify_capture_ip6_icmp(self, src_if, capture, sent):
+        try:
+            self.assertEqual(len(capture), len(sent))
+
+            for i in range(len(capture)):
+                tx = sent[i]
+                rx = capture[i]
+
+                # the rx'd packet has the MPLS label popped
+                eth = rx[Ether]
+                self.assertEqual(eth.type, 0x86DD)
+
+                tx_ip = tx[IPv6]
+                rx_ip = rx[IPv6]
+
+                self.assertEqual(rx_ip.dst, tx_ip.src)
+                # ICMP sourced from the interface's address
+                self.assertEqual(rx_ip.src, src_if.local_ip6)
+                # hop-limit reset to 255 for IMCP packet
+                self.assertEqual(rx_ip.hlim, 254)
+
+                icmp = rx[ICMPv6TimeExceeded]
+
+        except:
+            raise
 
     def test_swap(self):
         """ MPLS label swap tests """
@@ -343,6 +382,91 @@ class TestMPLS(VppTestCase):
         rx = self.pg0.get_capture()
         self.verify_capture_ip4(self.pg0, rx, tx)
 
+        #
+        # disposed packets have an invalid IPv4 checkusm
+        #
+        tx = self.create_stream_labelled_ip4(self.pg0, [33],
+                                             dst_ip=self.pg0.remote_ip4,
+                                             n=65,
+                                             chksum=1)
+        self.send_and_assert_no_replies(self.pg0, tx, "Invalid Checksum")
+
+        #
+        # An MPLS xconnect - EOS label in IPv6 out
+        #
+        route_333_eos = VppMplsRoute(
+            self, 333, 1,
+            [VppRoutePath(self.pg0.remote_ip6,
+                          self.pg0.sw_if_index,
+                          labels=[],
+                          proto=DpoProto.DPO_PROTO_IP6)])
+        route_333_eos.add_vpp_config()
+
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [333], 64)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6(self.pg0, rx, tx)
+
+        #
+        # disposed packets have an TTL expired
+        #
+        tx = self.create_stream_labelled_ip6(self.pg0, [333], 64,
+                                             dst_ip=self.pg1.remote_ip6,
+                                             hlim=1)
+
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [333], 64,
+                                             dst_ip=self.pg1.remote_ip6,
+                                             hlim=0)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6_icmp(self.pg0, rx, tx)
+
+        #
+        # An MPLS xconnect - EOS label in IPv6 out w imp-null
+        #
+        route_334_eos = VppMplsRoute(
+            self, 334, 1,
+            [VppRoutePath(self.pg0.remote_ip6,
+                          self.pg0.sw_if_index,
+                          labels=[3],
+                          proto=DpoProto.DPO_PROTO_IP6)])
+        route_334_eos.add_vpp_config()
+
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [334], 64)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6(self.pg0, rx, tx)
+
+        #
+        # disposed packets have an TTL expired
+        #
+        self.vapi.cli("clear trace")
+        tx = self.create_stream_labelled_ip6(self.pg0, [334], 64,
+                                             dst_ip=self.pg1.remote_ip6,
+                                             hlim=0)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture()
+        self.verify_capture_ip6_icmp(self.pg0, rx, tx)
+
         #
         # An MPLS xconnect - non-EOS label in IP out - an invalid configuration
         # so this traffic should be dropped.
@@ -1008,6 +1132,14 @@ class TestMPLS(VppTestCase):
         rx = self.pg1.get_capture(257)
         self.verify_capture_ip4(self.pg1, rx, tx)
 
+        #
+        # disposed packets have an invalid IPv4 checkusm
+        #
+        tx = self.create_stream_labelled_ip4(self.pg0, [34],
+                                             dst_ip="232.1.1.1", n=65,
+                                             chksum=1)
+        self.send_and_assert_no_replies(self.pg0, tx, "Invalid Checksum")
+
         #
         # set the RPF-ID of the enrtry to not match the input packet's
         #
@@ -1074,6 +1206,19 @@ class TestMPLS(VppTestCase):
         rx = self.pg1.get_capture(257)
         self.verify_capture_ip6(self.pg1, rx, tx)
 
+        #
+        # disposed packets have hop-limit = 1
+        #
+        tx = self.create_stream_labelled_ip6(self.pg0, [34], 255,
+                                             dst_ip="ff01::1", hlim=1)
+        self.pg0.add_stream(tx)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg0.get_capture(257)
+        self.verify_capture_ip6_icmp(self.pg0, rx, tx)
+
         #
         # set the RPF-ID of the enrtry to not match the input packet's
         #
@@ -1092,6 +1237,9 @@ class TestMPLSDisabled(VppTestCase):
         # create 2 pg interfaces
         self.create_pg_interfaces(range(2))
 
+        self.tbl = VppMplsTable(self, 0)
+        self.tbl.add_vpp_config()
+
         # PG0 is MPLS enalbed
         self.pg0.admin_up()
         self.pg0.config_ip4()
@@ -1102,18 +1250,12 @@ class TestMPLSDisabled(VppTestCase):
         self.pg1.admin_up()
 
     def tearDown(self):
-        super(TestMPLSDisabled, self).tearDown()
         for i in self.pg_interfaces:
             i.unconfig_ip4()
             i.admin_down()
 
-    def send_and_assert_no_replies(self, intf, pkts, remark):
-        intf.add_stream(pkts)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        for i in self.pg_interfaces:
-            i.get_capture(0)
-            i.assert_nothing_captured(remark=remark)
+        self.pg0.disable_mpls()
+        super(TestMPLSDisabled, self).tearDown()
 
     def test_mpls_disabled(self):
         """ MPLS Disabled """
@@ -1174,6 +1316,13 @@ class TestMPLSPIC(VppTestCase):
         # create 2 pg interfaces
         self.create_pg_interfaces(range(4))
 
+        mpls_tbl = VppMplsTable(self, 0)
+        mpls_tbl.add_vpp_config()
+        tbl4 = VppIpTable(self, 1)
+        tbl4.add_vpp_config()
+        tbl6 = VppIpTable(self, 1, is_ip6=1)
+        tbl6.add_vpp_config()
+
         # core links
         self.pg0.admin_up()
         self.pg0.config_ip4()
@@ -1201,14 +1350,15 @@ class TestMPLSPIC(VppTestCase):
         self.pg3.resolve_ndp()
 
     def tearDown(self):
-        super(TestMPLSPIC, self).tearDown()
         self.pg0.disable_mpls()
+        self.pg1.disable_mpls()
         for i in self.pg_interfaces:
             i.unconfig_ip4()
             i.unconfig_ip6()
             i.set_table_ip4(0)
             i.set_table_ip6(0)
             i.admin_down()
+        super(TestMPLSPIC, self).tearDown()
 
     def test_mpls_ibgp_pic(self):
         """ MPLS iBGP PIC edge convergence
@@ -1534,24 +1684,30 @@ class TestMPLSL2(VppTestCase):
         # create 2 pg interfaces
         self.create_pg_interfaces(range(2))
 
+        # create the default MPLS table
+        self.tables = []
+        tbl = VppMplsTable(self, 0)
+        tbl.add_vpp_config()
+        self.tables.append(tbl)
+
         # use pg0 as the core facing interface
         self.pg0.admin_up()
         self.pg0.config_ip4()
         self.pg0.resolve_arp()
         self.pg0.enable_mpls()
 
-        # use the other 2 for customer facg L2 links
+        # use the other 2 for customer facing L2 links
         for i in self.pg_interfaces[1:]:
             i.admin_up()
 
     def tearDown(self):
-        super(TestMPLSL2, self).tearDown()
         for i in self.pg_interfaces[1:]:
             i.admin_down()
 
         self.pg0.disable_mpls()
         self.pg0.unconfig_ip4()
         self.pg0.admin_down()
+        super(TestMPLSL2, self).tearDown()
 
     def verify_capture_tunneled_ethernet(self, capture, sent, mpls_labels,
                                          ttl=255, top=None):