ipsec: USE_EXTENDED_SEQ_NUM -> USE_ESN
[vpp.git] / test / template_ipsec.py
index 7888a67..40e787e 100644 (file)
@@ -1,7 +1,7 @@
 import unittest
 import socket
 
 import unittest
 import socket
 
-from scapy.layers.inet import IP, ICMP, TCP
+from scapy.layers.inet import IP, ICMP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation
 from scapy.layers.l2 import Ether, Raw
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
 from scapy.layers.ipsec import SecurityAssociation
 from scapy.layers.l2 import Ether, Raw
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
@@ -21,6 +21,7 @@ class IPsecIPv4Params(object):
 
     def __init__(self):
         self.remote_tun_if_host = '1.1.1.1'
 
     def __init__(self):
         self.remote_tun_if_host = '1.1.1.1'
+        self.remote_tun_if_host6 = '1111::1'
 
         self.scapy_tun_sa_id = 10
         self.scapy_tun_spi = 1001
 
         self.scapy_tun_sa_id = 10
         self.scapy_tun_spi = 1001
@@ -41,6 +42,8 @@ class IPsecIPv4Params(object):
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
         self.crypt_algo = 'AES-CBC'  # scapy name
         self.crypt_key = 'JPjyOWBeVEQiMe7h'
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
         self.crypt_algo = 'AES-CBC'  # scapy name
         self.crypt_key = 'JPjyOWBeVEQiMe7h'
+        self.flags = 0
+        self.nat_header = None
 
 
 class IPsecIPv6Params(object):
 
 
 class IPsecIPv6Params(object):
@@ -53,6 +56,7 @@ class IPsecIPv6Params(object):
 
     def __init__(self):
         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
 
     def __init__(self):
         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
+        self.remote_tun_if_host4 = '1.1.1.1'
 
         self.scapy_tun_sa_id = 50
         self.scapy_tun_spi = 3001
 
         self.scapy_tun_sa_id = 50
         self.scapy_tun_spi = 3001
@@ -73,6 +77,55 @@ class IPsecIPv6Params(object):
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_256)
         self.crypt_algo = 'AES-CBC'  # scapy name
         self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_256)
         self.crypt_algo = 'AES-CBC'  # scapy name
         self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
+        self.flags = 0
+        self.nat_header = None
+
+
+def config_tun_params(p, encryption_type, tun_if):
+    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
+    use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
+                              IPSEC_API_SAD_FLAG_USE_ESN))
+    p.scapy_tun_sa = SecurityAssociation(
+        encryption_type, spi=p.vpp_tun_spi,
+        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo, auth_key=p.auth_key,
+        tunnel_header=ip_class_by_addr_type[p.addr_type](
+            src=tun_if.remote_addr[p.addr_type],
+            dst=tun_if.local_addr[p.addr_type]),
+        nat_t_header=p.nat_header,
+        use_esn=use_esn)
+    p.vpp_tun_sa = SecurityAssociation(
+        encryption_type, spi=p.scapy_tun_spi,
+        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo, auth_key=p.auth_key,
+        tunnel_header=ip_class_by_addr_type[p.addr_type](
+            dst=tun_if.remote_addr[p.addr_type],
+            src=tun_if.local_addr[p.addr_type]),
+        nat_t_header=p.nat_header,
+        use_esn=use_esn)
+
+
+def config_tra_params(p, encryption_type):
+    use_esn = p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
+                         IPSEC_API_SAD_FLAG_USE_ESN)
+    p.scapy_tra_sa = SecurityAssociation(
+        encryption_type,
+        spi=p.vpp_tra_spi,
+        crypt_algo=p.crypt_algo,
+        crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo,
+        auth_key=p.auth_key,
+        nat_t_header=p.nat_header,
+        use_esn=use_esn)
+    p.vpp_tra_sa = SecurityAssociation(
+        encryption_type,
+        spi=p.scapy_tra_spi,
+        crypt_algo=p.crypt_algo,
+        crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo,
+        auth_key=p.auth_key,
+        nat_t_header=p.nat_header,
+        use_esn=use_esn)
 
 
 class TemplateIpsec(VppTestCase):
 
 
 class TemplateIpsec(VppTestCase):
@@ -98,14 +151,16 @@ class TemplateIpsec(VppTestCase):
         """ empty method to be overloaded when necessary """
         pass
 
         """ empty method to be overloaded when necessary """
         pass
 
-    def setUp(self):
-        super(TemplateIpsec, self).setUp()
-
+    def setup_params(self):
         self.ipv4_params = IPsecIPv4Params()
         self.ipv6_params = IPsecIPv6Params()
         self.params = {self.ipv4_params.addr_type: self.ipv4_params,
                        self.ipv6_params.addr_type: self.ipv6_params}
 
         self.ipv4_params = IPsecIPv4Params()
         self.ipv6_params = IPsecIPv6Params()
         self.params = {self.ipv4_params.addr_type: self.ipv4_params,
                        self.ipv6_params.addr_type: self.ipv6_params}
 
+    def setUp(self):
+        super(TemplateIpsec, self).setUp()
+
+        self.setup_params()
         self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\
                        "XXXXXXXXXXXXXXXXXXXXX"
 
         self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\
                        "XXXXXXXXXXXXXXXXXXXXX"
 
@@ -160,60 +215,29 @@ class TemplateIpsec(VppTestCase):
                 ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
                 for i in range(count)]
 
                 ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
                 for i in range(count)]
 
-    def configure_sa_tun(self, params):
-        ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
-        scapy_tun_sa = SecurityAssociation(
-            self.encryption_type, spi=params.vpp_tun_spi,
-            crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo, auth_key=params.auth_key,
-            tunnel_header=ip_class_by_addr_type[params.addr_type](
-                src=self.tun_if.remote_addr[params.addr_type],
-                dst=self.tun_if.local_addr[params.addr_type]))
-        vpp_tun_sa = SecurityAssociation(
-            self.encryption_type, spi=params.scapy_tun_spi,
-            crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo, auth_key=params.auth_key,
-            tunnel_header=ip_class_by_addr_type[params.addr_type](
-                dst=self.tun_if.remote_addr[params.addr_type],
-                src=self.tun_if.local_addr[params.addr_type]))
-        return vpp_tun_sa, scapy_tun_sa
-
-    def configure_sa_tra(self, params):
-        params.scapy_tra_sa = SecurityAssociation(self.encryption_type,
-                                                  spi=params.vpp_tra_spi,
-                                                  crypt_algo=params.crypt_algo,
-                                                  crypt_key=params.crypt_key,
-                                                  auth_algo=params.auth_algo,
-                                                  auth_key=params.auth_key)
-        params.vpp_tra_sa = SecurityAssociation(self.encryption_type,
-                                                spi=params.scapy_tra_spi,
-                                                crypt_algo=params.crypt_algo,
-                                                crypt_key=params.crypt_key,
-                                                auth_algo=params.auth_algo,
-                                                auth_key=params.auth_key)
-
 
 class IpsecTcpTests(object):
     def test_tcp_checksum(self):
         """ verify checksum correctness for vpp generated packets """
         self.vapi.cli("test http server")
         p = self.params[socket.AF_INET]
 
 class IpsecTcpTests(object):
     def test_tcp_checksum(self):
         """ verify checksum correctness for vpp generated packets """
         self.vapi.cli("test http server")
         p = self.params[socket.AF_INET]
-        vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
+        config_tun_params(p, self.encryption_type, self.tun_if)
         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
-                scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
-                                        dst=self.tun_if.local_ip4) /
-                                     TCP(flags='S', dport=80)))
+                p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
+                                          dst=self.tun_if.local_ip4) /
+                                       TCP(flags='S', dport=80)))
         self.logger.debug(ppp("Sending packet:", send))
         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
         recv = recv[0]
         self.logger.debug(ppp("Sending packet:", send))
         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
         recv = recv[0]
-        decrypted = vpp_tun_sa.decrypt(recv[IP])
+        decrypted = p.vpp_tun_sa.decrypt(recv[IP])
         self.assert_packet_checksums_valid(decrypted)
 
 
         self.assert_packet_checksums_valid(decrypted)
 
 
-class IpsecTraTests(object):
+class IpsecTra4Tests(object):
     def test_tra_anti_replay(self, count=1):
         """ ipsec v4 transport anti-reply test """
         p = self.params[socket.AF_INET]
     def test_tra_anti_replay(self, count=1):
         """ ipsec v4 transport anti-reply test """
         p = self.params[socket.AF_INET]
+        use_esn = p.vpp_tra_sa.use_esn
 
         # fire in a packet with seq number 1
         pkt = (Ether(src=self.tra_if.remote_mac,
 
         # fire in a packet with seq number 1
         pkt = (Ether(src=self.tra_if.remote_mac,
@@ -233,6 +257,11 @@ class IpsecTraTests(object):
                                       seq_num=235))
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
                                       seq_num=235))
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
+        # replayed packets are dropped
+        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
+        self.assert_packet_counter_equal(
+            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)
+
         # the window size is 64 packets
         # in window are still accepted
         pkt = (Ether(src=self.tra_if.remote_mac,
         # the window size is 64 packets
         # in window are still accepted
         pkt = (Ether(src=self.tra_if.remote_mac,
@@ -243,18 +272,6 @@ class IpsecTraTests(object):
                                       seq_num=172))
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
                                       seq_num=172))
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
-        # out of window are dropped
-        pkt = (Ether(src=self.tra_if.remote_mac,
-                     dst=self.tra_if.local_mac) /
-               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                         dst=self.tra_if.local_ip4) /
-                                      ICMP(),
-                                      seq_num=17))
-        self.send_and_assert_no_replies(self.tra_if, pkt * 17)
-
-        self.assert_packet_counter_equal(
-            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 17)
-
         # a packet that does not decrypt does not move the window forward
         bogus_sa = SecurityAssociation(self.encryption_type,
                                        p.vpp_tra_spi)
         # a packet that does not decrypt does not move the window forward
         bogus_sa = SecurityAssociation(self.encryption_type,
                                        p.vpp_tra_spi)
@@ -278,7 +295,74 @@ class IpsecTraTests(object):
                                       seq_num=234))
         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
                                       seq_num=234))
         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
+        # out of window are dropped
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=17))
+        self.send_and_assert_no_replies(self.tra_if, pkt * 17)
+
+        if use_esn:
+            # an out of window error with ESN looks like a high sequence
+            # wrap. but since it isn't then the verify will fail.
+            self.assert_packet_counter_equal(
+                '/err/%s/Integrity check failed' %
+                self.tra4_decrypt_node_name, 34)
+
+        else:
+            self.assert_packet_counter_equal(
+                '/err/%s/SA replayed packet' %
+                self.tra4_decrypt_node_name, 20)
+
+        # valid packet moves the window over to 236
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=236))
+        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+        decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
+        # move VPP's SA to just before the seq-number wrap
+        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
+
+        # then fire in a packet that VPP should drop because it causes the
+        # seq number to wrap  unless we're using extended.
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=237))
+
+        if use_esn:
+            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+            # in order to decrpyt the high order number needs to wrap
+            p.vpp_tra_sa.seq_num = 0x100000000
+            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
+            # send packets with high bits set
+            p.scapy_tra_sa.seq_num = 0x100000005
+            pkt = (Ether(src=self.tra_if.remote_mac,
+                         dst=self.tra_if.local_mac) /
+                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                             dst=self.tra_if.local_ip4) /
+                                          ICMP(),
+                                          seq_num=0x100000005))
+            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+            # in order to decrpyt the high order number needs to wrap
+            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+        else:
+            self.send_and_assert_no_replies(self.tra_if, [pkt])
+            self.assert_packet_counter_equal(
+                '/err/%s/sequence number cycled' %
+                self.tra4_encrypt_node_name, 1)
+
         # move the security-associations seq number on to the last we used
         # move the security-associations seq number on to the last we used
+        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
         p.scapy_tra_sa.seq_num = 351
         p.vpp_tra_sa.seq_num = 351
 
         p.scapy_tra_sa.seq_num = 351
         p.vpp_tra_sa.seq_num = 351
 
@@ -304,6 +388,15 @@ class IpsecTraTests(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
+        pkts = p.tra_sa_in.get_stats()['packets']
+        self.assertEqual(pkts, count,
+                         "incorrect SA in counts: expected %d != %d" %
+                         (count, pkts))
+        pkts = p.tra_sa_out.get_stats()['packets']
+        self.assertEqual(pkts, count,
+                         "incorrect SA out counts: expected %d != %d" %
+                         (count, pkts))
+
         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
 
         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
 
@@ -311,6 +404,8 @@ class IpsecTraTests(object):
         """ ipsec v4 transport burst test """
         self.test_tra_basic(count=257)
 
         """ ipsec v4 transport burst test """
         self.test_tra_basic(count=257)
 
+
+class IpsecTra6Tests(object):
     def test_tra_basic6(self, count=1):
         """ ipsec v6 transport basic test """
         self.vapi.cli("clear errors")
     def test_tra_basic6(self, count=1):
         """ ipsec v6 transport basic test """
         self.vapi.cli("clear errors")
@@ -333,6 +428,14 @@ class IpsecTraTests(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
+        pkts = p.tra_sa_in.get_stats()['packets']
+        self.assertEqual(pkts, count,
+                         "incorrect SA in counts: expected %d != %d" %
+                         (count, pkts))
+        pkts = p.tra_sa_out.get_stats()['packets']
+        self.assertEqual(pkts, count,
+                         "incorrect SA out counts: expected %d != %d" %
+                         (count, pkts))
         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
 
         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
 
@@ -341,14 +444,37 @@ class IpsecTraTests(object):
         self.test_tra_basic6(count=257)
 
 
         self.test_tra_basic6(count=257)
 
 
-class IpsecTun4Tests(object):
-    def test_tun_basic44(self, count=1):
-        """ ipsec 4o4 tunnel basic test """
+class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
+    pass
+
+
+class IpsecTun4(object):
+
+    def verify_counters(self, p, count):
+        if (hasattr(p, "spd_policy_in_any")):
+            pkts = p.spd_policy_in_any.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SPD any policy: expected %d != %d" %
+                             (count, pkts))
+
+        if (hasattr(p, "tun_sa_in")):
+            pkts = p.tun_sa_in.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA in counts: expected %d != %d" %
+                             (count, pkts))
+            pkts = p.tun_sa_out.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA out counts: expected %d != %d" %
+                             (count, pkts))
+
+        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+
+    def verify_tun_44(self, p, count=1):
         self.vapi.cli("clear errors")
         try:
         self.vapi.cli("clear errors")
         try:
-            p = self.params[socket.AF_INET]
-            vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
-            send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
                                               count=count)
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
                                               count=count)
@@ -362,7 +488,7 @@ class IpsecTun4Tests(object):
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
-                    decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP])
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                     if not decrypt_pkt.haslayer(IP):
                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                     if not decrypt_pkt.haslayer(IP):
                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
@@ -380,22 +506,79 @@ class IpsecTun4Tests(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
-        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
-        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+        self.verify_counters(p, count)
+
+    def verify_tun_64(self, p, count=1):
+        self.vapi.cli("clear errors")
+        try:
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+                                               src=p.remote_tun_if_host6,
+                                               dst=self.pg1.remote_ip6,
+                                               count=count)
+            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+            for recv_pkt in recv_pkts:
+                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
+                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
+                self.assert_packet_checksums_valid(recv_pkt)
+            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+                                       dst=p.remote_tun_if_host6, count=count)
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
+            for recv_pkt in recv_pkts:
+                try:
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
+                    if not decrypt_pkt.haslayer(IPv6):
+                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
+                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
+                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
+                    self.assert_packet_checksums_valid(decrypt_pkt)
+                except:
+                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
+                    try:
+                        self.logger.debug(
+                            ppp("Decrypted packet:", decrypt_pkt))
+                    except:
+                        pass
+                    raise
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec"))
+
+        self.verify_counters(p, count)
+
+
+class IpsecTun4Tests(IpsecTun4):
+
+    def test_tun_basic44(self):
+        """ ipsec 4o4 tunnel basic test """
+        self.verify_tun_44(self.params[socket.AF_INET], count=1)
 
     def test_tun_burst44(self):
         """ ipsec 4o4 tunnel burst test """
 
     def test_tun_burst44(self):
         """ ipsec 4o4 tunnel burst test """
-        self.test_tun_basic44(count=257)
+        self.verify_tun_44(self.params[socket.AF_INET], count=257)
+
 
 
+class IpsecTun6(object):
 
 
-class IpsecTun6Tests(object):
-    def test_tun_basic66(self, count=1):
+    def verify_counters(self, p, count):
+        if (hasattr(p, "tun_sa_in")):
+            pkts = p.tun_sa_in.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA in counts: expected %d != %d" %
+                             (count, pkts))
+            pkts = p.tun_sa_out.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA out counts: expected %d != %d" %
+                             (count, pkts))
+        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+
+    def verify_tun_66(self, p, count=1):
         """ ipsec 6o6 tunnel basic test """
         self.vapi.cli("clear errors")
         try:
         """ ipsec 6o6 tunnel basic test """
         self.vapi.cli("clear errors")
         try:
-            p = self.params[socket.AF_INET6]
-            vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
-            send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=count)
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=count)
@@ -410,7 +593,7 @@ class IpsecTun6Tests(object):
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
-                    decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6])
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                     if not decrypt_pkt.haslayer(IPv6):
                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                     if not decrypt_pkt.haslayer(IPv6):
                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
@@ -427,16 +610,60 @@ class IpsecTun6Tests(object):
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
+        self.verify_counters(p, count)
 
 
-        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
-        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+    def verify_tun_46(self, p, count=1):
+        """ ipsec 4o6 tunnel basic test """
+        self.vapi.cli("clear errors")
+        try:
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+                                              src=p.remote_tun_if_host4,
+                                              dst=self.pg1.remote_ip4,
+                                              count=count)
+            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+            for recv_pkt in recv_pkts:
+                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
+                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
+                self.assert_packet_checksums_valid(recv_pkt)
+            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
+                                      dst=p.remote_tun_if_host4,
+                                      count=count)
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
+            for recv_pkt in recv_pkts:
+                try:
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
+                    if not decrypt_pkt.haslayer(IP):
+                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
+                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
+                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
+                    self.assert_packet_checksums_valid(decrypt_pkt)
+                except:
+                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
+                    try:
+                        self.logger.debug(ppp("Decrypted packet:",
+                                              decrypt_pkt))
+                    except:
+                        pass
+                    raise
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec"))
+        self.verify_counters(p, count)
+
+
+class IpsecTun6Tests(IpsecTun6):
+
+    def test_tun_basic66(self):
+        """ ipsec 6o6 tunnel basic test """
+        self.verify_tun_66(self.params[socket.AF_INET6], count=1)
 
     def test_tun_burst66(self):
         """ ipsec 6o6 tunnel burst test """
 
     def test_tun_burst66(self):
         """ ipsec 6o6 tunnel burst test """
-        self.test_tun_basic66(count=257)
+        self.verify_tun_66(self.params[socket.AF_INET6], count=257)
 
 
 
 
-class IpsecTunTests(IpsecTun4Tests, IpsecTun6Tests):
+class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
     pass
 
 
     pass