crypto crypto-openssl: support hashing operations
[vpp.git] / test / test_ipip.py
index 8f18c07..83395e0 100644 (file)
@@ -3,10 +3,12 @@
 
 import unittest
 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, IPv6ExtHdrFragment, Raw
+from scapy.contrib.mpls import MPLS
 from scapy.all import fragment, fragment6, RandShort, defragment6
 from framework import VppTestCase, VppTestRunner
 from vpp_ip import DpoProto
-from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, FibPathProto
+from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, FibPathProto, \
+    VppMplsLabel, VppMplsRoute, VppMplsTable
 from vpp_ipip_tun_interface import VppIpIpTunInterface
 from vpp_teib import VppTeib
 from vpp_papi import VppEnum
@@ -1167,5 +1169,151 @@ class TestIPIP6(VppTestCase):
         return 'x' * len
 
 
+class TestMPLS(VppTestCase):
+    """ MPLS Test Case """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestMPLS, cls).setUpClass()
+        cls.create_pg_interfaces(range(2))
+        cls.interfaces = list(cls.pg_interfaces)
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestMPLS, cls).tearDownClass()
+
+    def setUp(self):
+        super(TestMPLS, self).setUp()
+        for i in self.interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.config_ip6()
+            i.disable_ipv6_ra()
+            i.resolve_arp()
+            i.resolve_ndp()
+
+    def tearDown(self):
+        super(TestMPLS, self).tearDown()
+
+        for i in self.pg_interfaces:
+            i.unconfig_ip4()
+            i.unconfig_ip6()
+            i.admin_down()
+
+    def test_mpls(self):
+        """ MPLS over ip{6,4} test """
+
+        tbl = VppMplsTable(self, 0)
+        tbl.add_vpp_config()
+
+        self.p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+        self.p_payload = UDP(sport=1234, dport=1234) / Raw(b'X' * 100)
+        f = FibPathProto
+
+        # IPv4 transport
+        tun4 = VppIpIpTunInterface(
+            self,
+            self.pg1,
+            self.pg1.local_ip4,
+            self.pg1.remote_ip4).add_vpp_config()
+        tun4.admin_up()
+        tun4.config_ip4()
+        tun4.enable_mpls()
+
+        # IPv6 transport
+        tun6 = VppIpIpTunInterface(
+            self,
+            self.pg1,
+            self.pg1.local_ip6,
+            self.pg1.remote_ip6).add_vpp_config()
+        tun6.admin_up()
+        tun6.config_ip6()
+        tun6.enable_mpls()
+
+        # ip routes into the tunnels with output labels
+        r4 = VppIpRoute(self, "1.1.1.1", 32,
+                        [VppRoutePath(
+                            tun4.remote_ip4,
+                            tun4.sw_if_index,
+                            labels=[VppMplsLabel(44)])]).add_vpp_config()
+        r6 = VppIpRoute(self, "1::1", 128,
+                        [VppRoutePath(
+                            tun6.remote_ip6,
+                            tun6.sw_if_index,
+                            labels=[VppMplsLabel(66)])]).add_vpp_config()
+
+        # deag MPLS routes from the tunnel
+        r4 = VppMplsRoute(self, 44, 1,
+                          [VppRoutePath(
+                              self.pg0.remote_ip4,
+                              self.pg0.sw_if_index)]).add_vpp_config()
+        r6 = VppMplsRoute(self, 66, 1,
+                          [VppRoutePath(
+                              self.pg0.remote_ip6,
+                              self.pg0.sw_if_index)],
+                          eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
+
+        #
+        # Tunnel Encap
+        #
+        p4 = (self.p_ether / IP(src="2.2.2.2", dst="1.1.1.1") / self.p_payload)
+
+        rxs = self.send_and_expect(self.pg0, p4 * N_PACKETS, self.pg1)
+
+        for rx in rxs:
+            self.assertEqual(rx[IP].src, self.pg1.local_ip4)
+            self.assertEqual(rx[IP].dst, self.pg1.remote_ip4)
+            self.assertEqual(rx[MPLS].label, 44)
+            inner = rx[MPLS].payload
+            self.assertEqual(inner.src, "2.2.2.2")
+            self.assertEqual(inner.dst, "1.1.1.1")
+
+        p6 = (self.p_ether / IPv6(src="2::2", dst="1::1") / self.p_payload)
+
+        rxs = self.send_and_expect(self.pg0, p6 * N_PACKETS, self.pg1)
+
+        for rx in rxs:
+            self.assertEqual(rx[IPv6].src, self.pg1.local_ip6)
+            self.assertEqual(rx[IPv6].dst, self.pg1.remote_ip6)
+            self.assertEqual(rx[MPLS].label, 66)
+            inner = rx[MPLS].payload
+            self.assertEqual(inner.src, "2::2")
+            self.assertEqual(inner.dst, "1::1")
+
+        #
+        # Tunnel Decap
+        #
+        p4 = (self.p_ether /
+              IP(src=self.pg1.remote_ip4,
+                 dst=self.pg1.local_ip4) /
+              MPLS(label=44, ttl=4) /
+              IP(src="1.1.1.1",
+                 dst="2.2.2.2") /
+              self.p_payload)
+
+        rxs = self.send_and_expect(self.pg1, p4 * N_PACKETS, self.pg0)
+
+        for rx in rxs:
+            self.assertEqual(rx[IP].src, "1.1.1.1")
+            self.assertEqual(rx[IP].dst, "2.2.2.2")
+
+        p6 = (self.p_ether /
+              IPv6(src=self.pg1.remote_ip6,
+                   dst=self.pg1.local_ip6) /
+              MPLS(label=66, ttl=4) /
+              IPv6(src="1::1",
+                   dst="2::2") /
+              self.p_payload)
+
+        rxs = self.send_and_expect(self.pg1, p6 * N_PACKETS, self.pg0)
+
+        for rx in rxs:
+            self.assertEqual(rx[IPv6].src, "1::1")
+            self.assertEqual(rx[IPv6].dst, "2::2")
+
+        tun4.disable_mpls()
+        tun6.disable_mpls()
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)