IPSEC: Pass the algorithm salt (used in GCM) over the API
[vpp.git] / test / template_ipsec.py
index df4c2bc..d6641c4 100644 (file)
@@ -1,5 +1,6 @@
 import unittest
 import socket
+import struct
 
 from scapy.layers.inet import IP, ICMP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation
@@ -7,7 +8,7 @@ from scapy.layers.l2 import Ether, Raw
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
 
 from framework import VppTestCase, VppTestRunner
-from util import ppp
+from util import ppp, reassemble4
 from vpp_papi import VppEnum
 
 
@@ -21,6 +22,7 @@ class IPsecIPv4Params(object):
 
     def __init__(self):
         self.remote_tun_if_host = '1.1.1.1'
+        self.remote_tun_if_host6 = '1111::1'
 
         self.scapy_tun_sa_id = 10
         self.scapy_tun_spi = 1001
@@ -41,6 +43,7 @@ class IPsecIPv4Params(object):
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
         self.crypt_algo = 'AES-CBC'  # scapy name
         self.crypt_key = 'JPjyOWBeVEQiMe7h'
+        self.salt = 0
         self.flags = 0
         self.nat_header = None
 
@@ -55,6 +58,7 @@ class IPsecIPv6Params(object):
 
     def __init__(self):
         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
+        self.remote_tun_if_host4 = '1.1.1.1'
 
         self.scapy_tun_sa_id = 50
         self.scapy_tun_spi = 3001
@@ -67,14 +71,15 @@ class IPsecIPv6Params(object):
         self.vpp_tra_spi = 4000
 
         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
-                                 IPSEC_API_INTEG_ALG_SHA_256_128)
-        self.auth_algo = 'SHA2-256-128'  # scapy name
+                                 IPSEC_API_INTEG_ALG_SHA1_96)
+        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
         self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
 
         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
-                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256)
+                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128)
         self.crypt_algo = 'AES-CBC'  # scapy name
-        self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
+        self.crypt_key = 'JPjyOWBeVEQiMe7h'
+        self.salt = 0
         self.flags = 0
         self.nat_header = None
 
@@ -82,10 +87,15 @@ class IPsecIPv6Params(object):
 def config_tun_params(p, encryption_type, tun_if):
     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
     use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
-                              IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM))
+                              IPSEC_API_SAD_FLAG_USE_ESN))
+    if p.crypt_algo == "AES-GCM":
+        crypt_key = p.crypt_key + struct.pack("!I", p.salt)
+    else:
+        crypt_key = p.crypt_key
     p.scapy_tun_sa = SecurityAssociation(
         encryption_type, spi=p.vpp_tun_spi,
-        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        crypt_algo=p.crypt_algo,
+        crypt_key=crypt_key,
         auth_algo=p.auth_algo, auth_key=p.auth_key,
         tunnel_header=ip_class_by_addr_type[p.addr_type](
             src=tun_if.remote_addr[p.addr_type],
@@ -94,7 +104,8 @@ def config_tun_params(p, encryption_type, tun_if):
         use_esn=use_esn)
     p.vpp_tun_sa = SecurityAssociation(
         encryption_type, spi=p.scapy_tun_spi,
-        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        crypt_algo=p.crypt_algo,
+        crypt_key=crypt_key,
         auth_algo=p.auth_algo, auth_key=p.auth_key,
         tunnel_header=ip_class_by_addr_type[p.addr_type](
             dst=tun_if.remote_addr[p.addr_type],
@@ -104,13 +115,17 @@ def config_tun_params(p, encryption_type, tun_if):
 
 
 def config_tra_params(p, encryption_type):
-    use_esn = p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
-                         IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM)
+    use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
+                              IPSEC_API_SAD_FLAG_USE_ESN))
+    if p.crypt_algo == "AES-GCM":
+        crypt_key = p.crypt_key + struct.pack("!I", p.salt)
+    else:
+        crypt_key = p.crypt_key
     p.scapy_tra_sa = SecurityAssociation(
         encryption_type,
         spi=p.vpp_tra_spi,
         crypt_algo=p.crypt_algo,
-        crypt_key=p.crypt_key,
+        crypt_key=crypt_key,
         auth_algo=p.auth_algo,
         auth_key=p.auth_key,
         nat_t_header=p.nat_header,
@@ -119,7 +134,7 @@ def config_tra_params(p, encryption_type):
         encryption_type,
         spi=p.scapy_tra_spi,
         crypt_algo=p.crypt_algo,
-        crypt_key=p.crypt_key,
+        crypt_key=crypt_key,
         auth_algo=p.auth_algo,
         auth_key=p.auth_key,
         nat_t_header=p.nat_header,
@@ -144,97 +159,94 @@ class TemplateIpsec(VppTestCase):
     |tun_if| ------->  |VPP| ------> |pg1|
      ------             ---           ---
     """
+    tun_spd_id = 1
+    tra_spd_id = 2
 
     def ipsec_select_backend(self):
         """ empty method to be overloaded when necessary """
         pass
 
+    @classmethod
+    def setUpClass(cls):
+        super(TemplateIpsec, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TemplateIpsec, cls).tearDownClass()
+
     def setup_params(self):
         self.ipv4_params = IPsecIPv4Params()
         self.ipv6_params = IPsecIPv6Params()
         self.params = {self.ipv4_params.addr_type: self.ipv4_params,
                        self.ipv6_params.addr_type: self.ipv6_params}
 
+    def config_interfaces(self):
+        self.create_pg_interfaces(range(3))
+        self.interfaces = list(self.pg_interfaces)
+        for i in self.interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+            i.config_ip6()
+            i.resolve_ndp()
+
     def setUp(self):
         super(TemplateIpsec, self).setUp()
 
         self.setup_params()
-        self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\
-                       "XXXXXXXXXXXXXXXXXXXXX"
-
-        self.tun_spd_id = 1
-        self.tra_spd_id = 2
 
         self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
                                  IPSEC_API_PROTO_ESP)
         self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
                                 IPSEC_API_PROTO_AH)
 
-        self.create_pg_interfaces(range(3))
-        self.interfaces = list(self.pg_interfaces)
-        for i in self.interfaces:
-            i.admin_up()
-            i.config_ip4()
-            i.resolve_arp()
-            i.config_ip6()
-            i.resolve_ndp()
-        self.ipsec_select_backend()
+        self.config_interfaces()
 
-    def tearDown(self):
-        super(TemplateIpsec, self).tearDown()
+        self.ipsec_select_backend()
 
+    def unconfig_interfaces(self):
         for i in self.interfaces:
             i.admin_down()
             i.unconfig_ip4()
             i.unconfig_ip6()
 
-        if not self.vpp_dead:
-            self.vapi.cli("show hardware")
+    def tearDown(self):
+        super(TemplateIpsec, self).tearDown()
 
-    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
+        self.unconfig_interfaces()
+
+    def show_commands_at_teardown(self):
+        self.logger.info(self.vapi.cli("show hardware"))
+
+    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
+                         payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload)
+                sa.encrypt(IP(src=src, dst=dst) /
+                           ICMP() / Raw('X' * payload_size))
                 for i in range(count)]
 
-    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1):
+    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
+                          payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                 sa.encrypt(IPv6(src=src, dst=dst) /
-                           ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
+                           ICMPv6EchoRequest(id=0, seq=1,
+                                             data='X' * payload_size))
                 for i in range(count)]
 
-    def gen_pkts(self, sw_intf, src, dst, count=1):
+    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                IP(src=src, dst=dst) / ICMP() / self.payload
+                IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
                 for i in range(count)]
 
-    def gen_pkts6(self, sw_intf, src, dst, count=1):
+    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                 IPv6(src=src, dst=dst) /
-                ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
+                ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
                 for i in range(count)]
 
-    def configure_sa_tra(self, params):
-        params.scapy_tra_sa = SecurityAssociation(
-            self.encryption_type,
-            spi=params.vpp_tra_spi,
-            crypt_algo=params.crypt_algo,
-            crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo,
-            auth_key=params.auth_key,
-            nat_t_header=params.nat_header)
-        params.vpp_tra_sa = SecurityAssociation(
-            self.encryption_type,
-            spi=params.scapy_tra_spi,
-            crypt_algo=params.crypt_algo,
-            crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo,
-            auth_key=params.auth_key,
-            nat_t_header=params.nat_header)
-
-
-class IpsecTcpTests(object):
-    def test_tcp_checksum(self):
-        """ verify checksum correctness for vpp generated packets """
+
+class IpsecTcp(object):
+    def verify_tcp_checksum(self):
         self.vapi.cli("test http server")
         p = self.params[socket.AF_INET]
         config_tun_params(p, self.encryption_type, self.tun_if)
@@ -249,9 +261,15 @@ class IpsecTcpTests(object):
         self.assert_packet_checksums_valid(decrypted)
 
 
-class IpsecTra4Tests(object):
-    def test_tra_anti_replay(self, count=1):
-        """ ipsec v4 transport anti-reply test """
+class IpsecTcpTests(IpsecTcp):
+    def test_tcp_checksum(self):
+        """ verify checksum correctness for vpp generated packets """
+        self.verify_tcp_checksum()
+
+
+class IpsecTra4(object):
+    """ verify methods for Transport v4 """
+    def verify_tra_anti_replay(self, count=1):
         p = self.params[socket.AF_INET]
         use_esn = p.vpp_tra_sa.use_esn
 
@@ -382,7 +400,7 @@ class IpsecTra4Tests(object):
         p.scapy_tra_sa.seq_num = 351
         p.vpp_tra_sa.seq_num = 351
 
-    def test_tra_basic(self, count=1):
+    def verify_tra_basic4(self, count=1):
         """ ipsec v4 transport basic test """
         self.vapi.cli("clear errors")
         try:
@@ -416,14 +434,25 @@ class IpsecTra4Tests(object):
         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
 
+
+class IpsecTra4Tests(IpsecTra4):
+    """ UT test methods for Transport v4 """
+    def test_tra_anti_replay(self):
+        """ ipsec v4 transport anti-reply test """
+        self.verify_tra_anti_replay(count=1)
+
+    def test_tra_basic(self, count=1):
+        """ ipsec v4 transport basic test """
+        self.verify_tra_basic4(count=1)
+
     def test_tra_burst(self):
         """ ipsec v4 transport burst test """
-        self.test_tra_basic(count=257)
+        self.verify_tra_basic4(count=257)
 
 
-class IpsecTra6Tests(object):
-    def test_tra_basic6(self, count=1):
-        """ ipsec v6 transport basic test """
+class IpsecTra6(object):
+    """ verify methods for Transport v6 """
+    def verify_tra_basic6(self, count=1):
         self.vapi.cli("clear errors")
         try:
             p = self.params[socket.AF_INET6]
@@ -455,19 +484,76 @@ class IpsecTra6Tests(object):
         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
 
+
+class IpsecTra6Tests(IpsecTra6):
+    """ UT test methods for Transport v6 """
+    def test_tra_basic6(self):
+        """ ipsec v6 transport basic test """
+        self.verify_tra_basic6(count=1)
+
     def test_tra_burst6(self):
         """ ipsec v6 transport burst test """
-        self.test_tra_basic6(count=257)
+        self.verify_tra_basic6(count=257)
 
 
 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
+    """ UT test methods for Transport v6 and v4"""
     pass
 
 
 class IpsecTun4(object):
+    """ verify methods for Tunnel v4 """
+    def verify_counters(self, p, count):
+        if (hasattr(p, "spd_policy_in_any")):
+            pkts = p.spd_policy_in_any.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SPD any policy: expected %d != %d" %
+                             (count, pkts))
 
-    def verify_tun_44(self, p, count=1):
+        if (hasattr(p, "tun_sa_in")):
+            pkts = p.tun_sa_in.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA in counts: expected %d != %d" %
+                             (count, pkts))
+            pkts = p.tun_sa_out.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA out counts: expected %d != %d" %
+                             (count, pkts))
+
+        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+
+    def verify_decrypted(self, p, rxs):
+        for rx in rxs:
+            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
+            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
+            self.assert_packet_checksums_valid(rx)
+
+    def verify_encrypted(self, p, sa, rxs):
+        decrypt_pkts = []
+        for rx in rxs:
+            try:
+                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
+                if not decrypt_pkt.haslayer(IP):
+                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
+                decrypt_pkts.append(decrypt_pkt)
+                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
+                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
+            except:
+                self.logger.debug(ppp("Unexpected packet:", rx))
+                try:
+                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
+                except:
+                    pass
+                raise
+        pkts = reassemble4(decrypt_pkts)
+        for pkt in pkts:
+            self.assert_packet_checksums_valid(pkt)
+
+    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
         self.vapi.cli("clear errors")
+        if not n_rx:
+            n_rx = count
         try:
             config_tun_params(p, self.encryption_type, self.tun_if)
             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
@@ -475,23 +561,47 @@ class IpsecTun4(object):
                                               dst=self.pg1.remote_ip4,
                                               count=count)
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+            self.verify_decrypted(p, recv_pkts)
+
+            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
+                                      dst=p.remote_tun_if_host, count=count,
+                                      payload_size=payload_size)
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
+                                             self.tun_if, n_rx)
+            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
+
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec"))
+
+        self.verify_counters(p, count)
+
+    def verify_tun_64(self, p, count=1):
+        self.vapi.cli("clear errors")
+        try:
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+                                               src=p.remote_tun_if_host6,
+                                               dst=self.pg1.remote_ip6,
+                                               count=count)
+            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             for recv_pkt in recv_pkts:
-                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
-                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
+                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
+                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                 self.assert_packet_checksums_valid(recv_pkt)
-            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
-                                      dst=p.remote_tun_if_host, count=count)
+            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+                                       dst=p.remote_tun_if_host6, count=count)
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
-                    if not decrypt_pkt.haslayer(IP):
-                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
-                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
-                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
+                    if not decrypt_pkt.haslayer(IPv6):
+                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
+                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
+                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                     self.assert_packet_checksums_valid(decrypt_pkt)
                 except:
-                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
+                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                     try:
                         self.logger.debug(
                             ppp("Decrypted packet:", decrypt_pkt))
@@ -502,28 +612,11 @@ class IpsecTun4(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
-        if (hasattr(p, "spd_policy_in_any")):
-            pkts = p.spd_policy_in_any.get_stats()['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SPD any policy: expected %d != %d" %
-                             (count, pkts))
-
-        if (hasattr(p, "tun_sa_in")):
-            pkts = p.tun_sa_in.get_stats()['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA in counts: expected %d != %d" %
-                             (count, pkts))
-            pkts = p.tun_sa_out.get_stats()['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA out counts: expected %d != %d" %
-                             (count, pkts))
-
-        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
-        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+        self.verify_counters(p, count)
 
 
 class IpsecTun4Tests(IpsecTun4):
-
+    """ UT test methods for Tunnel v4 """
     def test_tun_basic44(self):
         """ ipsec 4o4 tunnel basic test """
         self.verify_tun_44(self.params[socket.AF_INET], count=1)
@@ -534,6 +627,19 @@ class IpsecTun4Tests(IpsecTun4):
 
 
 class IpsecTun6(object):
+    """ verify methods for Tunnel v6 """
+    def verify_counters(self, p, count):
+        if (hasattr(p, "tun_sa_in")):
+            pkts = p.tun_sa_in.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA in counts: expected %d != %d" %
+                             (count, pkts))
+            pkts = p.tun_sa_out.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA out counts: expected %d != %d" %
+                             (count, pkts))
+        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
 
     def verify_tun_66(self, p, count=1):
         """ ipsec 6o6 tunnel basic test """
@@ -572,21 +678,50 @@ class IpsecTun6(object):
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
+        self.verify_counters(p, count)
 
-        if (hasattr(p, "tun_sa_in")):
-            pkts = p.tun_sa_in.get_stats()['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA in counts: expected %d != %d" %
-                             (count, pkts))
-            pkts = p.tun_sa_out.get_stats()['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA out counts: expected %d != %d" %
-                             (count, pkts))
-        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
-        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+    def verify_tun_46(self, p, count=1):
+        """ ipsec 4o6 tunnel basic test """
+        self.vapi.cli("clear errors")
+        try:
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+                                              src=p.remote_tun_if_host4,
+                                              dst=self.pg1.remote_ip4,
+                                              count=count)
+            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+            for recv_pkt in recv_pkts:
+                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
+                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
+                self.assert_packet_checksums_valid(recv_pkt)
+            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
+                                      dst=p.remote_tun_if_host4,
+                                      count=count)
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
+            for recv_pkt in recv_pkts:
+                try:
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
+                    if not decrypt_pkt.haslayer(IP):
+                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
+                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
+                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
+                    self.assert_packet_checksums_valid(decrypt_pkt)
+                except:
+                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
+                    try:
+                        self.logger.debug(ppp("Decrypted packet:",
+                                              decrypt_pkt))
+                    except:
+                        pass
+                    raise
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec"))
+        self.verify_counters(p, count)
 
 
 class IpsecTun6Tests(IpsecTun6):
+    """ UT test methods for Tunnel v6 """
 
     def test_tun_basic66(self):
         """ ipsec 6o6 tunnel basic test """
@@ -598,6 +733,7 @@ class IpsecTun6Tests(IpsecTun6):
 
 
 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
+    """ UT test methods for Tunnel v6 & v4 """
     pass