ipsec: Support MPLS over IPSec[46] interface
[vpp.git] / test / test_ipsec_tun_if_esp.py
index 7e36d13..5bcd9dd 100644 (file)
@@ -7,13 +7,15 @@ from scapy.layers.l2 import Ether, GRE, Dot1Q
 from scapy.packet import Raw
 from scapy.layers.inet import IP, UDP
 from scapy.layers.inet6 import IPv6
 from scapy.packet import Raw
 from scapy.layers.inet import IP, UDP
 from scapy.layers.inet6 import IPv6
+from scapy.contrib.mpls import MPLS
 from framework import VppTestRunner
 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
 from vpp_gre_interface import VppGreInterface
 from vpp_ipip_tun_interface import VppIpIpTunInterface
 from framework import VppTestRunner
 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
 from vpp_gre_interface import VppGreInterface
 from vpp_ipip_tun_interface import VppIpIpTunInterface
-from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
+from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
+    VppMplsTable, VppMplsRoute, FibPathProto
 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
@@ -2570,6 +2572,82 @@ class TestIpsecItf4(TemplateIpsec,
         self.unconfig_network(p)
 
 
         self.unconfig_network(p)
 
 
+class TestIpsecItf4MPLS(TemplateIpsec,
+                        TemplateIpsecItf4,
+                        IpsecTun4):
+    """ IPsec Interface MPLSoIPv4 """
+
+    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
+
+    def setUp(self):
+        super(TestIpsecItf4MPLS, self).setUp()
+
+        self.tun_if = self.pg0
+
+    def tearDown(self):
+        super(TestIpsecItf4MPLS, self).tearDown()
+
+    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
+                         payload_size=100):
+        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+                sa.encrypt(MPLS(label=44, ttl=3) /
+                           IP(src=src, dst=dst) /
+                           UDP(sport=1166, dport=2233) /
+                           Raw(b'X' * payload_size))
+                for i in range(count)]
+
+    def verify_encrypted(self, p, sa, rxs):
+        for rx in rxs:
+            try:
+                pkt = sa.decrypt(rx[IP])
+                if not pkt.haslayer(IP):
+                    pkt = IP(pkt[Raw].load)
+                self.assert_packet_checksums_valid(pkt)
+                self.assert_equal(pkt[MPLS].label, 44)
+                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
+            except (IndexError, AssertionError):
+                self.logger.debug(ppp("Unexpected packet:", rx))
+                try:
+                    self.logger.debug(ppp("Decrypted packet:", pkt))
+                except:
+                    pass
+                raise
+
+    def test_tun_mpls_o_ip4(self):
+        """IPSEC interface MPLS over IPv4"""
+
+        n_pkts = 127
+        p = self.ipv4_params
+        f = FibPathProto
+
+        tbl = VppMplsTable(self, 0)
+        tbl.add_vpp_config()
+
+        self.config_network(p)
+        # deag MPLS routes from the tunnel
+        r4 = VppMplsRoute(self, 44, 1,
+                          [VppRoutePath(
+                              self.pg1.remote_ip4,
+                              self.pg1.sw_if_index)]).add_vpp_config()
+        p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
+                                     p.tun_if.sw_if_index,
+                                     labels=[VppMplsLabel(44)])])
+        p.tun_if.enable_mpls()
+
+        self.config_sa_tun(p,
+                           self.pg0.local_ip4,
+                           self.pg0.remote_ip4)
+        self.config_protect(p)
+
+        self.verify_tun_44(p, count=n_pkts)
+
+        # cleanup
+        p.tun_if.disable_mpls()
+        self.unconfig_protect(p)
+        self.unconfig_sa(p)
+        self.unconfig_network(p)
+
+
 class TemplateIpsecItf6(object):
     """ IPsec Interface IPv6 """
 
 class TemplateIpsecItf6(object):
     """ IPsec Interface IPv6 """
 
@@ -2848,5 +2926,82 @@ class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
             self.verify_tun_44(p, count=N_PKTS)
 
 
             self.verify_tun_44(p, count=N_PKTS)
 
 
+class TestIpsecItf6MPLS(TemplateIpsec,
+                        TemplateIpsecItf6,
+                        IpsecTun6):
+    """ IPsec Interface MPLSoIPv6 """
+
+    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
+
+    def setUp(self):
+        super(TestIpsecItf6MPLS, self).setUp()
+
+        self.tun_if = self.pg0
+
+    def tearDown(self):
+        super(TestIpsecItf6MPLS, self).tearDown()
+
+    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
+                          payload_size=100):
+        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+                sa.encrypt(MPLS(label=66, ttl=3) /
+                           IPv6(src=src, dst=dst) /
+                           UDP(sport=1166, dport=2233) /
+                           Raw(b'X' * payload_size))
+                for i in range(count)]
+
+    def verify_encrypted6(self, p, sa, rxs):
+        for rx in rxs:
+            try:
+                pkt = sa.decrypt(rx[IPv6])
+                if not pkt.haslayer(IPv6):
+                    pkt = IP(pkt[Raw].load)
+                self.assert_packet_checksums_valid(pkt)
+                self.assert_equal(pkt[MPLS].label, 66)
+                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
+            except (IndexError, AssertionError):
+                self.logger.debug(ppp("Unexpected packet:", rx))
+                try:
+                    self.logger.debug(ppp("Decrypted packet:", pkt))
+                except:
+                    pass
+                raise
+
+    def test_tun_mpls_o_ip6(self):
+        """IPSEC interface MPLS over IPv6"""
+
+        n_pkts = 127
+        p = self.ipv6_params
+        f = FibPathProto
+
+        tbl = VppMplsTable(self, 0)
+        tbl.add_vpp_config()
+
+        self.config_network(p)
+        # deag MPLS routes from the tunnel
+        r6 = VppMplsRoute(self, 66, 1,
+                          [VppRoutePath(
+                              self.pg1.remote_ip6,
+                              self.pg1.sw_if_index)],
+                          eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
+        p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
+                                     p.tun_if.sw_if_index,
+                                     labels=[VppMplsLabel(66)])])
+        p.tun_if.enable_mpls()
+
+        self.config_sa_tun(p,
+                           self.pg0.local_ip6,
+                           self.pg0.remote_ip6)
+        self.config_protect(p)
+
+        self.verify_tun_66(p, count=n_pkts)
+
+        # cleanup
+        p.tun_if.disable_mpls()
+        self.unconfig_protect(p)
+        self.unconfig_sa(p)
+        self.unconfig_network(p)
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)