ipsec: ipsec-tun protect
[vpp.git] / test / test_ipsec_tun_if_esp.py
index 5ef0bdb..cc220d5 100644 (file)
@@ -5,13 +5,15 @@ import copy
 from scapy.layers.ipsec import ESP
 from scapy.layers.l2 import Ether, Raw, GRE
 from scapy.layers.inet import IP, UDP
 from scapy.layers.ipsec import ESP
 from scapy.layers.l2 import Ether, Raw, GRE
 from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
 from framework import VppTestRunner, is_skip_aarch64_set, is_platform_aarch64
 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
     IpsecTun4, IpsecTun6,  IpsecTcpTests,  config_tun_params
 from framework import VppTestRunner, is_skip_aarch64_set, is_platform_aarch64
 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
     IpsecTun4, IpsecTun6,  IpsecTcpTests,  config_tun_params
-from vpp_ipsec_tun_interface import VppIpsecTunInterface, \
-    VppIpsecGRETunInterface
+from vpp_ipsec_tun_interface import VppIpsecTunInterface
+from vpp_gre_interface import VppGreInterface
+from vpp_ipip_tun_interface import VppIpIpTunInterface
 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
-from vpp_ipsec import VppIpsecSA
+from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
 from util import ppp
 from vpp_papi import VppEnum
 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
 from util import ppp
 from vpp_papi import VppEnum
@@ -58,8 +60,6 @@ class TemplateIpsec4TunIfEsp(TemplateIpsec):
         r.add_vpp_config()
 
     def tearDown(self):
         r.add_vpp_config()
 
     def tearDown(self):
-        if not self.vpp_dead:
-            self.vapi.cli("show hardware")
         super(TemplateIpsec4TunIfEsp, self).tearDown()
 
 
         super(TemplateIpsec4TunIfEsp, self).tearDown()
 
 
@@ -131,8 +131,6 @@ class TemplateIpsec6TunIfEsp(TemplateIpsec):
         r.add_vpp_config()
 
     def tearDown(self):
         r.add_vpp_config()
 
     def tearDown(self):
-        if not self.vpp_dead:
-            self.vapi.cli("show hardware")
         super(TemplateIpsec6TunIfEsp, self).tearDown()
 
 
         super(TemplateIpsec6TunIfEsp, self).tearDown()
 
 
@@ -198,8 +196,6 @@ class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
                                      0xffffffff)]).add_vpp_config()
 
     def tearDown(self):
                                      0xffffffff)]).add_vpp_config()
 
     def tearDown(self):
-        if not self.vpp_dead:
-            self.vapi.cli("show hardware")
         super(TestIpsec4MultiTunIfEsp, self).tearDown()
 
     def test_tun_44(self):
         super(TestIpsec4MultiTunIfEsp, self).tearDown()
 
     def test_tun_44(self):
@@ -442,8 +438,6 @@ class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
             r.add_vpp_config()
 
     def tearDown(self):
             r.add_vpp_config()
 
     def tearDown(self):
-        if not self.vpp_dead:
-            self.vapi.cli("show hardware")
         super(TestIpsec6MultiTunIfEsp, self).tearDown()
 
     def test_tun_66(self):
         super(TestIpsec6MultiTunIfEsp, self).tearDown()
 
     def test_tun_66(self):
@@ -456,9 +450,13 @@ class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
             self.assertEqual(c['packets'], 127)
 
 
             self.assertEqual(c['packets'], 127)
 
 
-class TemplateIpsecGRETunIfEsp(TemplateIpsec):
-    """ IPsec GRE tunnel interface tests """
-
+@unittest.skipIf(is_skip_aarch64_set and is_platform_aarch64,
+                 "test doesn't work on aarch64")
+class TestIpsecGreTebIfEsp(TemplateIpsec,
+                           IpsecTun4Tests):
+    """ Ipsec GRE TEB ESP - TUN tests """
+    tun4_encrypt_node_name = "esp4-encrypt-tun"
+    tun4_decrypt_node_name = "esp4-decrypt-tun"
     encryption_type = ESP
     omac = "00:11:22:33:44:55"
 
     encryption_type = ESP
     omac = "00:11:22:33:44:55"
 
@@ -509,7 +507,7 @@ class TemplateIpsecGRETunIfEsp(TemplateIpsec):
                 raise
 
     def setUp(self):
                 raise
 
     def setUp(self):
-        super(TemplateIpsecGRETunIfEsp, self).setUp()
+        super(TestIpsecGreTebIfEsp, self).setUp()
 
         self.tun_if = self.pg0
 
 
         self.tun_if = self.pg0
 
@@ -534,33 +532,634 @@ class TemplateIpsecGRETunIfEsp(TemplateIpsec):
                                  self.pg0.local_ip4)
         p.tun_sa_in.add_vpp_config()
 
                                  self.pg0.local_ip4)
         p.tun_sa_in.add_vpp_config()
 
-        self.tun = VppIpsecGRETunInterface(self, self.pg0,
-                                           p.tun_sa_out.id,
-                                           p.tun_sa_in.id)
-
+        self.tun = VppGreInterface(self,
+                                   self.pg0.local_ip4,
+                                   self.pg0.remote_ip4,
+                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
+                                         GRE_API_TUNNEL_TYPE_TEB))
         self.tun.add_vpp_config()
         self.tun.add_vpp_config()
+
+        p.tun_protect = VppIpsecTunProtect(self,
+                                           self.tun,
+                                           p.tun_sa_out,
+                                           [p.tun_sa_in])
+
+        p.tun_protect.add_vpp_config()
+
         self.tun.admin_up()
         self.tun.config_ip4()
 
         self.tun.admin_up()
         self.tun.config_ip4()
 
-        VppIpRoute(self, p.remote_tun_if_host, 32,
-                   [VppRoutePath(self.tun.remote_ip4,
-                                 0xffffffff)]).add_vpp_config()
         VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
 
         VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
 
+        self.vapi.cli("clear ipsec sa")
+
     def tearDown(self):
     def tearDown(self):
-        if not self.vpp_dead:
-            self.vapi.cli("show hardware")
         self.tun.unconfig_ip4()
         self.tun.unconfig_ip4()
-        super(TemplateIpsecGRETunIfEsp, self).tearDown()
+        super(TestIpsecGreTebIfEsp, self).tearDown()
 
 
 
 
-@unittest.skipIf(is_skip_aarch64_set and is_platform_aarch64,
-                 "test doesn't work on aarch64")
-class TestIpsecGRETunIfEsp1(TemplateIpsecGRETunIfEsp, IpsecTun4Tests):
+class TestIpsecGreIfEsp(TemplateIpsec,
+                        IpsecTun4Tests):
     """ Ipsec GRE ESP - TUN tests """
     """ Ipsec GRE ESP - TUN tests """
-    tun4_encrypt_node_name = "esp4-encrypt"
-    tun4_decrypt_node_name = "esp4-decrypt"
+    tun4_encrypt_node_name = "esp4-encrypt-tun"
+    tun4_decrypt_node_name = "esp4-decrypt-tun"
+    encryption_type = ESP
+
+    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
+                         payload_size=100):
+        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+                sa.encrypt(IP(src=self.pg0.remote_ip4,
+                              dst=self.pg0.local_ip4) /
+                           GRE() /
+                           IP(src=self.pg1.local_ip4,
+                              dst=self.pg1.remote_ip4) /
+                           UDP(sport=1144, dport=2233) /
+                           Raw('X' * payload_size))
+                for i in range(count)]
+
+    def gen_pkts(self, sw_intf, src, dst, count=1,
+                 payload_size=100):
+        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+                IP(src="1.1.1.1", dst="1.1.1.2") /
+                UDP(sport=1144, dport=2233) /
+                Raw('X' * payload_size)
+                for i in range(count)]
+
+    def verify_decrypted(self, p, rxs):
+        for rx in rxs:
+            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
+            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
+
+    def verify_encrypted(self, p, sa, rxs):
+        for rx in rxs:
+            try:
+                pkt = sa.decrypt(rx[IP])
+                if not pkt.haslayer(IP):
+                    pkt = IP(pkt[Raw].load)
+                self.assert_packet_checksums_valid(pkt)
+                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
+                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
+                self.assertTrue(pkt.haslayer(GRE))
+                e = pkt[GRE]
+                self.assertEqual(e[IP].dst, "1.1.1.2")
+            except (IndexError, AssertionError):
+                self.logger.debug(ppp("Unexpected packet:", rx))
+                try:
+                    self.logger.debug(ppp("Decrypted packet:", pkt))
+                except:
+                    pass
+                raise
+
+    def setUp(self):
+        super(TestIpsecGreIfEsp, self).setUp()
+
+        self.tun_if = self.pg0
+
+        p = self.ipv4_params
+
+        bd1 = VppBridgeDomain(self, 1)
+        bd1.add_vpp_config()
+
+        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
+                                  p.auth_algo_vpp_id, p.auth_key,
+                                  p.crypt_algo_vpp_id, p.crypt_key,
+                                  self.vpp_esp_protocol,
+                                  self.pg0.local_ip4,
+                                  self.pg0.remote_ip4)
+        p.tun_sa_out.add_vpp_config()
+
+        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
+                                 p.auth_algo_vpp_id, p.auth_key,
+                                 p.crypt_algo_vpp_id, p.crypt_key,
+                                 self.vpp_esp_protocol,
+                                 self.pg0.remote_ip4,
+                                 self.pg0.local_ip4)
+        p.tun_sa_in.add_vpp_config()
+
+        self.tun = VppGreInterface(self,
+                                   self.pg0.local_ip4,
+                                   self.pg0.remote_ip4)
+        self.tun.add_vpp_config()
+
+        p.tun_protect = VppIpsecTunProtect(self,
+                                           self.tun,
+                                           p.tun_sa_out,
+                                           [p.tun_sa_in])
+        p.tun_protect.add_vpp_config()
+
+        self.tun.admin_up()
+        self.tun.config_ip4()
+
+        VppIpRoute(self, "1.1.1.2", 32,
+                   [VppRoutePath(self.tun.remote_ip4,
+                                 0xffffffff)]).add_vpp_config()
+
+    def tearDown(self):
+        self.tun.unconfig_ip4()
+        super(TestIpsecGreIfEsp, self).tearDown()
+
+
+class TemplateIpsec4TunProtect(object):
+    """ IPsec IPv4 Tunnel protect """
+
+    def config_sa_tra(self, p):
+        config_tun_params(p, self.encryption_type, self.tun_if)
+
+        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
+                                  p.auth_algo_vpp_id, p.auth_key,
+                                  p.crypt_algo_vpp_id, p.crypt_key,
+                                  self.vpp_esp_protocol)
+        p.tun_sa_out.add_vpp_config()
+
+        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
+                                 p.auth_algo_vpp_id, p.auth_key,
+                                 p.crypt_algo_vpp_id, p.crypt_key,
+                                 self.vpp_esp_protocol)
+        p.tun_sa_in.add_vpp_config()
+
+    def config_sa_tun(self, p):
+        config_tun_params(p, self.encryption_type, self.tun_if)
+
+        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
+                                  p.auth_algo_vpp_id, p.auth_key,
+                                  p.crypt_algo_vpp_id, p.crypt_key,
+                                  self.vpp_esp_protocol,
+                                  self.tun_if.remote_addr[p.addr_type],
+                                  self.tun_if.local_addr[p.addr_type])
+        p.tun_sa_out.add_vpp_config()
+
+        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
+                                 p.auth_algo_vpp_id, p.auth_key,
+                                 p.crypt_algo_vpp_id, p.crypt_key,
+                                 self.vpp_esp_protocol,
+                                 self.tun_if.remote_addr[p.addr_type],
+                                 self.tun_if.local_addr[p.addr_type])
+        p.tun_sa_in.add_vpp_config()
+
+    def config_protect(self, p):
+        p.tun_protect = VppIpsecTunProtect(self,
+                                           p.tun_if,
+                                           p.tun_sa_out,
+                                           [p.tun_sa_in])
+        p.tun_protect.add_vpp_config()
+
+    def config_network(self, p):
+        p.tun_if = VppIpIpTunInterface(self, self.pg0,
+                                       self.pg0.local_ip4,
+                                       self.pg0.remote_ip4)
+        p.tun_if.add_vpp_config()
+        p.tun_if.admin_up()
+        p.tun_if.config_ip4()
+
+        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
+                             [VppRoutePath(p.tun_if.remote_ip4,
+                                           0xffffffff)])
+        p.route.add_vpp_config()
+
+    def unconfig_network(self, p):
+        p.route.remove_vpp_config()
+        p.tun_if.remove_vpp_config()
+
+    def unconfig_protect(self, p):
+        p.tun_protect.remove_vpp_config()
+
+    def unconfig_sa(self, p):
+        p.tun_sa_out.remove_vpp_config()
+        p.tun_sa_in.remove_vpp_config()
+
+
+class TestIpsec4TunProtect(TemplateIpsec,
+                           TemplateIpsec4TunProtect,
+                           IpsecTun4):
+    """ IPsec IPv4 Tunnel protect - transport mode"""
+
+    encryption_type = ESP
+    tun4_encrypt_node_name = "esp4-encrypt-tun"
+    tun4_decrypt_node_name = "esp4-decrypt-tun"
+
+    def setUp(self):
+        super(TestIpsec4TunProtect, self).setUp()
+
+        self.tun_if = self.pg0
+
+    def tearDown(self):
+        super(TestIpsec4TunProtect, self).tearDown()
+
+    def test_tun_44(self):
+        """IPSEC tunnel protect"""
+
+        p = self.ipv4_params
+
+        self.config_network(p)
+        self.config_sa_tra(p)
+        self.config_protect(p)
+
+        self.verify_tun_44(p, count=127)
+        c = p.tun_if.get_rx_stats()
+        self.assertEqual(c['packets'], 127)
+        c = p.tun_if.get_tx_stats()
+        self.assertEqual(c['packets'], 127)
+
+        # rekey - create new SAs and update the tunnel protection
+        np = copy.copy(p)
+        np.crypt_key = 'X' + p.crypt_key[1:]
+        np.scapy_tun_spi += 100
+        np.scapy_tun_sa_id += 1
+        np.vpp_tun_spi += 100
+        np.vpp_tun_sa_id += 1
+        np.tun_if.local_spi = p.vpp_tun_spi
+        np.tun_if.remote_spi = p.scapy_tun_spi
+
+        self.config_sa_tra(np)
+        self.config_protect(np)
+        self.unconfig_sa(p)
+
+        self.verify_tun_44(np, count=127)
+        c = p.tun_if.get_rx_stats()
+        self.assertEqual(c['packets'], 254)
+        c = p.tun_if.get_tx_stats()
+        self.assertEqual(c['packets'], 254)
+
+        # teardown
+        self.unconfig_protect(np)
+        self.unconfig_sa(np)
+        self.unconfig_network(p)
+
+
+class TestIpsec4TunProtectTun(TemplateIpsec,
+                              TemplateIpsec4TunProtect,
+                              IpsecTun4):
+    """ IPsec IPv4 Tunnel protect - tunnel mode"""
+
+    encryption_type = ESP
+    tun4_encrypt_node_name = "esp4-encrypt-tun"
+    tun4_decrypt_node_name = "esp4-decrypt-tun"
+
+    def setUp(self):
+        super(TestIpsec4TunProtectTun, self).setUp()
+
+        self.tun_if = self.pg0
+
+    def tearDown(self):
+        super(TestIpsec4TunProtectTun, self).tearDown()
+
+    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
+                         payload_size=100):
+        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+                sa.encrypt(IP(src=sw_intf.remote_ip4,
+                              dst=sw_intf.local_ip4) /
+                           IP(src=src, dst=dst) /
+                           UDP(sport=1144, dport=2233) /
+                           Raw('X' * payload_size))
+                for i in range(count)]
+
+    def gen_pkts(self, sw_intf, src, dst, count=1,
+                 payload_size=100):
+        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+                IP(src=src, dst=dst) /
+                UDP(sport=1144, dport=2233) /
+                Raw('X' * payload_size)
+                for i in range(count)]
+
+    def verify_decrypted(self, p, rxs):
+        for rx in rxs:
+            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
+            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
+            self.assert_packet_checksums_valid(rx)
+
+    def verify_encrypted(self, p, sa, rxs):
+        for rx in rxs:
+            try:
+                pkt = sa.decrypt(rx[IP])
+                if not pkt.haslayer(IP):
+                    pkt = IP(pkt[Raw].load)
+                self.assert_packet_checksums_valid(pkt)
+                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
+                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
+                inner = pkt[IP].payload
+                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
+
+            except (IndexError, AssertionError):
+                self.logger.debug(ppp("Unexpected packet:", rx))
+                try:
+                    self.logger.debug(ppp("Decrypted packet:", pkt))
+                except:
+                    pass
+                raise
+
+    def test_tun_44(self):
+        """IPSEC tunnel protect """
+
+        p = self.ipv4_params
+
+        self.config_network(p)
+        self.config_sa_tun(p)
+        self.config_protect(p)
+
+        self.verify_tun_44(p, count=127)
+
+        c = p.tun_if.get_rx_stats()
+        self.assertEqual(c['packets'], 127)
+        c = p.tun_if.get_tx_stats()
+        self.assertEqual(c['packets'], 127)
+
+        # rekey - create new SAs and update the tunnel protection
+        np = copy.copy(p)
+        np.crypt_key = 'X' + p.crypt_key[1:]
+        np.scapy_tun_spi += 100
+        np.scapy_tun_sa_id += 1
+        np.vpp_tun_spi += 100
+        np.vpp_tun_sa_id += 1
+        np.tun_if.local_spi = p.vpp_tun_spi
+        np.tun_if.remote_spi = p.scapy_tun_spi
+
+        self.config_sa_tun(np)
+        self.config_protect(np)
+        self.unconfig_sa(p)
+
+        self.verify_tun_44(np, count=127)
+        c = p.tun_if.get_rx_stats()
+        self.assertEqual(c['packets'], 254)
+        c = p.tun_if.get_tx_stats()
+        self.assertEqual(c['packets'], 254)
+
+        # teardown
+        self.unconfig_protect(np)
+        self.unconfig_sa(np)
+        self.unconfig_network(p)
+
+
+class TemplateIpsec6TunProtect(object):
+    """ IPsec IPv6 Tunnel protect """
+
+    def config_sa_tra(self, p):
+        config_tun_params(p, self.encryption_type, self.tun_if)
+
+        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
+                                  p.auth_algo_vpp_id, p.auth_key,
+                                  p.crypt_algo_vpp_id, p.crypt_key,
+                                  self.vpp_esp_protocol)
+        p.tun_sa_out.add_vpp_config()
+
+        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
+                                 p.auth_algo_vpp_id, p.auth_key,
+                                 p.crypt_algo_vpp_id, p.crypt_key,
+                                 self.vpp_esp_protocol)
+        p.tun_sa_in.add_vpp_config()
+
+    def config_sa_tun(self, p):
+        config_tun_params(p, self.encryption_type, self.tun_if)
+
+        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
+                                  p.auth_algo_vpp_id, p.auth_key,
+                                  p.crypt_algo_vpp_id, p.crypt_key,
+                                  self.vpp_esp_protocol,
+                                  self.tun_if.remote_addr[p.addr_type],
+                                  self.tun_if.local_addr[p.addr_type])
+        p.tun_sa_out.add_vpp_config()
+
+        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
+                                 p.auth_algo_vpp_id, p.auth_key,
+                                 p.crypt_algo_vpp_id, p.crypt_key,
+                                 self.vpp_esp_protocol,
+                                 self.tun_if.remote_addr[p.addr_type],
+                                 self.tun_if.local_addr[p.addr_type])
+        p.tun_sa_in.add_vpp_config()
+
+    def config_protect(self, p):
+        p.tun_protect = VppIpsecTunProtect(self,
+                                           p.tun_if,
+                                           p.tun_sa_out,
+                                           [p.tun_sa_in])
+        p.tun_protect.add_vpp_config()
+
+    def config_network(self, p):
+        p.tun_if = VppIpIpTunInterface(self, self.pg0,
+                                       self.pg0.local_ip6,
+                                       self.pg0.remote_ip6)
+        p.tun_if.add_vpp_config()
+        p.tun_if.admin_up()
+        p.tun_if.config_ip6()
+
+        p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
+                             [VppRoutePath(p.tun_if.remote_ip6,
+                                           0xffffffff,
+                                           proto=DpoProto.DPO_PROTO_IP6)],
+                             is_ip6=1)
+        p.route.add_vpp_config()
+
+    def unconfig_network(self, p):
+        p.route.remove_vpp_config()
+        p.tun_if.remove_vpp_config()
+
+    def unconfig_protect(self, p):
+        p.tun_protect.remove_vpp_config()
+
+    def unconfig_sa(self, p):
+        p.tun_sa_out.remove_vpp_config()
+        p.tun_sa_in.remove_vpp_config()
+
+
+class TestIpsec6TunProtect(TemplateIpsec,
+                           TemplateIpsec6TunProtect,
+                           IpsecTun6):
+    """ IPsec IPv6 Tunnel protect - transport mode"""
+
+    encryption_type = ESP
+    tun6_encrypt_node_name = "esp6-encrypt-tun"
+    tun6_decrypt_node_name = "esp6-decrypt-tun"
+
+    def setUp(self):
+        super(TestIpsec6TunProtect, self).setUp()
+
+        self.tun_if = self.pg0
+
+    def tearDown(self):
+        super(TestIpsec6TunProtect, self).tearDown()
+
+    def test_tun_66(self):
+        """IPSEC tunnel protect"""
+
+        p = self.ipv6_params
+
+        self.config_network(p)
+        self.config_sa_tra(p)
+        self.config_protect(p)
+
+        self.verify_tun_66(p, count=127)
+        c = p.tun_if.get_rx_stats()
+        self.assertEqual(c['packets'], 127)
+        c = p.tun_if.get_tx_stats()
+        self.assertEqual(c['packets'], 127)
+
+        # rekey - create new SAs and update the tunnel protection
+        np = copy.copy(p)
+        np.crypt_key = 'X' + p.crypt_key[1:]
+        np.scapy_tun_spi += 100
+        np.scapy_tun_sa_id += 1
+        np.vpp_tun_spi += 100
+        np.vpp_tun_sa_id += 1
+        np.tun_if.local_spi = p.vpp_tun_spi
+        np.tun_if.remote_spi = p.scapy_tun_spi
+
+        self.config_sa_tra(np)
+        self.config_protect(np)
+        self.unconfig_sa(p)
+
+        self.verify_tun_66(np, count=127)
+        c = p.tun_if.get_rx_stats()
+        self.assertEqual(c['packets'], 254)
+        c = p.tun_if.get_tx_stats()
+        self.assertEqual(c['packets'], 254)
+
+        # 3 phase rekey
+        #  1) add two input SAs [old, new]
+        #  2) swap output SA to [new]
+        #  3) use only [new] input SA
+        np3 = copy.copy(np)
+        np3.crypt_key = 'Z' + p.crypt_key[1:]
+        np3.scapy_tun_spi += 100
+        np3.scapy_tun_sa_id += 1
+        np3.vpp_tun_spi += 100
+        np3.vpp_tun_sa_id += 1
+        np3.tun_if.local_spi = p.vpp_tun_spi
+        np3.tun_if.remote_spi = p.scapy_tun_spi
+
+        self.config_sa_tra(np3)
+
+        # step 1;
+        p.tun_protect.update_vpp_config(np.tun_sa_out,
+                                        [np.tun_sa_in, np3.tun_sa_in])
+        self.verify_tun_66(np, np, count=127)
+        self.verify_tun_66(np3, np, count=127)
+
+        # step 2;
+        p.tun_protect.update_vpp_config(np3.tun_sa_out,
+                                        [np.tun_sa_in, np3.tun_sa_in])
+        self.verify_tun_66(np, np3, count=127)
+        self.verify_tun_66(np3, np3, count=127)
+
+        # step 1;
+        p.tun_protect.update_vpp_config(np3.tun_sa_out,
+                                        [np3.tun_sa_in])
+        self.verify_tun_66(np3, np3, count=127)
+        self.verify_drop_tun_66(np, count=127)
+
+        c = p.tun_if.get_rx_stats()
+        self.assertEqual(c['packets'], 127*7)
+        c = p.tun_if.get_tx_stats()
+        self.assertEqual(c['packets'], 127*7)
+        self.unconfig_sa(np)
+
+        # teardown
+        self.unconfig_protect(np3)
+        self.unconfig_sa(np3)
+        self.unconfig_network(p)
+
+
+class TestIpsec6TunProtectTun(TemplateIpsec,
+                              TemplateIpsec6TunProtect,
+                              IpsecTun6):
+    """ IPsec IPv6 Tunnel protect - tunnel mode"""
+
+    encryption_type = ESP
+    tun6_encrypt_node_name = "esp6-encrypt-tun"
+    tun6_decrypt_node_name = "esp6-decrypt-tun"
+
+    def setUp(self):
+        super(TestIpsec6TunProtectTun, self).setUp()
+
+        self.tun_if = self.pg0
+
+    def tearDown(self):
+        super(TestIpsec6TunProtectTun, self).tearDown()
+
+    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
+                          payload_size=100):
+        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
+                                dst=sw_intf.local_ip6) /
+                           IPv6(src=src, dst=dst) /
+                           UDP(sport=1166, dport=2233) /
+                           Raw('X' * payload_size))
+                for i in range(count)]
+
+    def gen_pkts6(self, sw_intf, src, dst, count=1,
+                  payload_size=100):
+        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+                IPv6(src=src, dst=dst) /
+                UDP(sport=1166, dport=2233) /
+                Raw('X' * payload_size)
+                for i in range(count)]
+
+    def verify_decrypted6(self, p, rxs):
+        for rx in rxs:
+            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
+            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
+            self.assert_packet_checksums_valid(rx)
+
+    def verify_encrypted6(self, p, sa, rxs):
+        for rx in rxs:
+            try:
+                pkt = sa.decrypt(rx[IPv6])
+                if not pkt.haslayer(IPv6):
+                    pkt = IPv6(pkt[Raw].load)
+                self.assert_packet_checksums_valid(pkt)
+                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
+                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
+                inner = pkt[IPv6].payload
+                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
+
+            except (IndexError, AssertionError):
+                self.logger.debug(ppp("Unexpected packet:", rx))
+                try:
+                    self.logger.debug(ppp("Decrypted packet:", pkt))
+                except:
+                    pass
+                raise
+
+    def test_tun_66(self):
+        """IPSEC tunnel protect """
+
+        p = self.ipv6_params
+
+        self.config_network(p)
+        self.config_sa_tun(p)
+        self.config_protect(p)
+
+        self.verify_tun_66(p, count=127)
+
+        c = p.tun_if.get_rx_stats()
+        self.assertEqual(c['packets'], 127)
+        c = p.tun_if.get_tx_stats()
+        self.assertEqual(c['packets'], 127)
+
+        # rekey - create new SAs and update the tunnel protection
+        np = copy.copy(p)
+        np.crypt_key = 'X' + p.crypt_key[1:]
+        np.scapy_tun_spi += 100
+        np.scapy_tun_sa_id += 1
+        np.vpp_tun_spi += 100
+        np.vpp_tun_sa_id += 1
+        np.tun_if.local_spi = p.vpp_tun_spi
+        np.tun_if.remote_spi = p.scapy_tun_spi
+
+        self.config_sa_tun(np)
+        self.config_protect(np)
+        self.unconfig_sa(p)
+
+        self.verify_tun_66(np, count=127)
+        c = p.tun_if.get_rx_stats()
+        self.assertEqual(c['packets'], 254)
+        c = p.tun_if.get_tx_stats()
+        self.assertEqual(c['packets'], 254)
+
+        # teardown
+        self.unconfig_protect(np)
+        self.unconfig_sa(np)
+        self.unconfig_network(p)
+
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)