tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / template_ipsec.py
index 9927cc7..b5cd922 100644 (file)
@@ -6,23 +6,27 @@ from scapy.layers.inet import IP, ICMP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from scapy.layers.l2 import Ether
 from scapy.packet import raw, Raw
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from scapy.layers.l2 import Ether
 from scapy.packet import raw, Raw
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
-    IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
+from scapy.layers.inet6 import (
+    IPv6,
+    ICMPv6EchoRequest,
+    IPv6ExtHdrHopByHop,
+    IPv6ExtHdrFragment,
+    IPv6ExtHdrDestOpt,
+)
 
 
 
 
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner
 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
 from vpp_papi import VppEnum
 
 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
 from vpp_papi import VppEnum
 
-from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, \
-    VppIpsecSpdItfBinding
+from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
 from ipaddress import ip_address
 from re import search
 from os import popen
 
 
 class IPsecIPv4Params:
 from ipaddress import ip_address
 from re import search
 from os import popen
 
 
 class IPsecIPv4Params:
-
     addr_type = socket.AF_INET
     addr_any = "0.0.0.0"
     addr_bcast = "255.255.255.255"
     addr_type = socket.AF_INET
     addr_any = "0.0.0.0"
     addr_bcast = "255.255.255.255"
@@ -30,8 +34,8 @@ class IPsecIPv4Params:
     is_ipv6 = 0
 
     def __init__(self):
     is_ipv6 = 0
 
     def __init__(self):
-        self.remote_tun_if_host = '1.1.1.1'
-        self.remote_tun_if_host6 = '1111::1'
+        self.remote_tun_if_host = "1.1.1.1"
+        self.remote_tun_if_host6 = "1111::1"
 
         self.scapy_tun_sa_id = 100
         self.scapy_tun_spi = 1000
 
         self.scapy_tun_sa_id = 100
         self.scapy_tun_spi = 1000
@@ -48,26 +52,30 @@ class IPsecIPv4Params:
         self.outer_flow_label = 0
         self.inner_flow_label = 0x12345
 
         self.outer_flow_label = 0
         self.inner_flow_label = 0x12345
 
-        self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
-                                 IPSEC_API_INTEG_ALG_SHA1_96)
-        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
-        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
+        self.anti_replay_window_size = 64
+
+        self.auth_algo_vpp_id = (
+            VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
+        )
+        self.auth_algo = "HMAC-SHA1-96"  # scapy name
+        self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
 
 
-        self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
-                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128)
-        self.crypt_algo = 'AES-CBC'  # scapy name
-        self.crypt_key = b'JPjyOWBeVEQiMe7h'
+        self.crypt_algo_vpp_id = (
+            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
+        )
+        self.crypt_algo = "AES-CBC"  # scapy name
+        self.crypt_key = b"JPjyOWBeVEQiMe7h"
         self.salt = 0
         self.flags = 0
         self.nat_header = None
         self.salt = 0
         self.flags = 0
         self.nat_header = None
-        self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
-                          TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
+        self.tun_flags = (
+            VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+        )
         self.dscp = 0
         self.async_mode = False
 
 
 class IPsecIPv6Params:
         self.dscp = 0
         self.async_mode = False
 
 
 class IPsecIPv6Params:
-
     addr_type = socket.AF_INET6
     addr_any = "0::0"
     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
     addr_type = socket.AF_INET6
     addr_any = "0::0"
     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
@@ -75,8 +83,8 @@ class IPsecIPv6Params:
     is_ipv6 = 1
 
     def __init__(self):
     is_ipv6 = 1
 
     def __init__(self):
-        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
-        self.remote_tun_if_host4 = '1.1.1.1'
+        self.remote_tun_if_host = "1111:1111:1111:1111:1111:1111:1111:1111"
+        self.remote_tun_if_host4 = "1.1.1.1"
 
         self.scapy_tun_sa_id = 500
         self.scapy_tun_spi = 3001
 
         self.scapy_tun_sa_id = 500
         self.scapy_tun_spi = 3001
@@ -93,26 +101,31 @@ class IPsecIPv6Params:
         self.outer_flow_label = 0
         self.inner_flow_label = 0x12345
 
         self.outer_flow_label = 0
         self.inner_flow_label = 0x12345
 
-        self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
-                                 IPSEC_API_INTEG_ALG_SHA1_96)
-        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
-        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
+        self.anti_replay_window_size = 64
+
+        self.auth_algo_vpp_id = (
+            VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
+        )
+        self.auth_algo = "HMAC-SHA1-96"  # scapy name
+        self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
 
 
-        self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
-                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128)
-        self.crypt_algo = 'AES-CBC'  # scapy name
-        self.crypt_key = b'JPjyOWBeVEQiMe7h'
+        self.crypt_algo_vpp_id = (
+            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
+        )
+        self.crypt_algo = "AES-CBC"  # scapy name
+        self.crypt_key = b"JPjyOWBeVEQiMe7h"
         self.salt = 0
         self.flags = 0
         self.nat_header = None
         self.salt = 0
         self.flags = 0
         self.nat_header = None
-        self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
-                          TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
+        self.tun_flags = (
+            VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+        )
         self.dscp = 0
         self.async_mode = False
 
 
 def mk_scapy_crypt_key(p):
         self.dscp = 0
         self.async_mode = False
 
 
 def mk_scapy_crypt_key(p):
-    if p.crypt_algo in ("AES-GCM", "AES-CTR"):
+    if p.crypt_algo in ("AES-GCM", "AES-CTR", "AES-NULL-GMAC"):
         return p.crypt_key + struct.pack("!I", p.salt)
     else:
         return p.crypt_key
         return p.crypt_key + struct.pack("!I", p.salt)
     else:
         return p.crypt_key
@@ -120,55 +133,61 @@ def mk_scapy_crypt_key(p):
 
 def config_tun_params(p, encryption_type, tun_if):
     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
 
 def config_tun_params(p, encryption_type, tun_if):
     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
-    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
-                             IPSEC_API_SAD_FLAG_USE_ESN))
+    esn_en = bool(
+        p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
+    )
     p.tun_dst = tun_if.remote_addr[p.addr_type]
     p.tun_src = tun_if.local_addr[p.addr_type]
     crypt_key = mk_scapy_crypt_key(p)
     p.scapy_tun_sa = SecurityAssociation(
     p.tun_dst = tun_if.remote_addr[p.addr_type]
     p.tun_src = tun_if.local_addr[p.addr_type]
     crypt_key = mk_scapy_crypt_key(p)
     p.scapy_tun_sa = SecurityAssociation(
-        encryption_type, spi=p.vpp_tun_spi,
+        encryption_type,
+        spi=p.scapy_tun_spi,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
-        auth_algo=p.auth_algo, auth_key=p.auth_key,
-        tunnel_header=ip_class_by_addr_type[p.addr_type](
-            src=p.tun_dst,
-            dst=p.tun_src),
+        auth_algo=p.auth_algo,
+        auth_key=p.auth_key,
+        tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
         nat_t_header=p.nat_header,
         nat_t_header=p.nat_header,
-        esn_en=esn_en)
+        esn_en=esn_en,
+    )
     p.vpp_tun_sa = SecurityAssociation(
     p.vpp_tun_sa = SecurityAssociation(
-        encryption_type, spi=p.scapy_tun_spi,
+        encryption_type,
+        spi=p.vpp_tun_spi,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
-        auth_algo=p.auth_algo, auth_key=p.auth_key,
-        tunnel_header=ip_class_by_addr_type[p.addr_type](
-            dst=p.tun_dst,
-            src=p.tun_src),
+        auth_algo=p.auth_algo,
+        auth_key=p.auth_key,
+        tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
         nat_t_header=p.nat_header,
         nat_t_header=p.nat_header,
-        esn_en=esn_en)
+        esn_en=esn_en,
+    )
 
 
 def config_tra_params(p, encryption_type):
 
 
 def config_tra_params(p, encryption_type):
-    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
-                             IPSEC_API_SAD_FLAG_USE_ESN))
+    esn_en = bool(
+        p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
+    )
     crypt_key = mk_scapy_crypt_key(p)
     p.scapy_tra_sa = SecurityAssociation(
         encryption_type,
     crypt_key = mk_scapy_crypt_key(p)
     p.scapy_tra_sa = SecurityAssociation(
         encryption_type,
-        spi=p.vpp_tra_spi,
+        spi=p.scapy_tra_spi,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
         auth_algo=p.auth_algo,
         auth_key=p.auth_key,
         nat_t_header=p.nat_header,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
         auth_algo=p.auth_algo,
         auth_key=p.auth_key,
         nat_t_header=p.nat_header,
-        esn_en=esn_en)
+        esn_en=esn_en,
+    )
     p.vpp_tra_sa = SecurityAssociation(
         encryption_type,
     p.vpp_tra_sa = SecurityAssociation(
         encryption_type,
-        spi=p.scapy_tra_spi,
+        spi=p.vpp_tra_spi,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
         auth_algo=p.auth_algo,
         auth_key=p.auth_key,
         nat_t_header=p.nat_header,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
         auth_algo=p.auth_algo,
         auth_key=p.auth_key,
         nat_t_header=p.nat_header,
-        esn_en=esn_en)
+        esn_en=esn_en,
+    )
 
 
 class TemplateIpsec(VppTestCase):
 
 
 class TemplateIpsec(VppTestCase):
@@ -189,11 +208,12 @@ class TemplateIpsec(VppTestCase):
         |tun_if| ------->  |VPP| ------> |pg1|
          ------             ---           ---
     """
         |tun_if| ------->  |VPP| ------> |pg1|
          ------             ---           ---
     """
+
     tun_spd_id = 1
     tra_spd_id = 2
 
     def ipsec_select_backend(self):
     tun_spd_id = 1
     tra_spd_id = 2
 
     def ipsec_select_backend(self):
-        """ empty method to be overloaded when necessary """
+        """empty method to be overloaded when necessary"""
         pass
 
     @classmethod
         pass
 
     @classmethod
@@ -205,12 +225,14 @@ class TemplateIpsec(VppTestCase):
         super(TemplateIpsec, cls).tearDownClass()
 
     def setup_params(self):
         super(TemplateIpsec, cls).tearDownClass()
 
     def setup_params(self):
-        if not hasattr(self, 'ipv4_params'):
+        if not hasattr(self, "ipv4_params"):
             self.ipv4_params = IPsecIPv4Params()
             self.ipv4_params = IPsecIPv4Params()
-        if not hasattr(self, 'ipv6_params'):
+        if not hasattr(self, "ipv6_params"):
             self.ipv6_params = IPsecIPv6Params()
             self.ipv6_params = IPsecIPv6Params()
-        self.params = {self.ipv4_params.addr_type: self.ipv4_params,
-                       self.ipv6_params.addr_type: self.ipv6_params}
+        self.params = {
+            self.ipv4_params.addr_type: self.ipv4_params,
+            self.ipv6_params.addr_type: self.ipv6_params,
+        }
 
     def config_interfaces(self):
         self.create_pg_interfaces(range(3))
 
     def config_interfaces(self):
         self.create_pg_interfaces(range(3))
@@ -227,10 +249,8 @@ class TemplateIpsec(VppTestCase):
 
         self.setup_params()
 
 
         self.setup_params()
 
-        self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
-                                 IPSEC_API_PROTO_ESP)
-        self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
-                                IPSEC_API_PROTO_AH)
+        self.vpp_esp_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP
+        self.vpp_ah_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_AH
 
         self.config_interfaces()
 
 
         self.config_interfaces()
 
@@ -250,44 +270,52 @@ class TemplateIpsec(VppTestCase):
     def show_commands_at_teardown(self):
         self.logger.info(self.vapi.cli("show hardware"))
 
     def show_commands_at_teardown(self):
         self.logger.info(self.vapi.cli("show hardware"))
 
-    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
-                         payload_size=54):
-        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                sa.encrypt(IP(src=src, dst=dst) /
-                           ICMP() / Raw(b'X' * payload_size))
-                for i in range(count)]
-
-    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
-                          payload_size=54):
-        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                sa.encrypt(IPv6(src=src, dst=dst,
-                                hlim=p.inner_hop_limit,
-                                fl=p.inner_flow_label) /
-                           ICMPv6EchoRequest(id=0, seq=1,
-                                             data='X' * payload_size))
-                for i in range(count)]
+    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
+        return [
+            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+            / sa.encrypt(IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size))
+            for i in range(count)
+        ]
+
+    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
+        return [
+            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+            / sa.encrypt(
+                IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
+                / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
+            )
+            for i in range(count)
+        ]
 
     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
 
     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
-        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
-                for i in range(count)]
+        return [
+            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+            / IP(src=src, dst=dst)
+            / ICMP()
+            / Raw(b"X" * payload_size)
+            for i in range(count)
+        ]
 
     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
 
     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
-        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                IPv6(src=src, dst=dst,
-                     hlim=p.inner_hop_limit, fl=p.inner_flow_label) /
-                ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
-                for i in range(count)]
+        return [
+            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+            / IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
+            / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
+            for i in range(count)
+        ]
 
 
 class IpsecTcp(object):
     def verify_tcp_checksum(self):
 
 
 class IpsecTcp(object):
     def verify_tcp_checksum(self):
-        self.vapi.cli("test http server")
+        # start http cli server listener on http://0.0.0.0:80
+        self.vapi.cli("http cli server")
         p = self.params[socket.AF_INET]
         p = self.params[socket.AF_INET]
-        send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
-                p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
-                                          dst=self.tun_if.local_ip4) /
-                                       TCP(flags='S', dport=80)))
+        send = Ether(
+            src=self.tun_if.remote_mac, dst=self.tun_if.local_mac
+        ) / p.scapy_tun_sa.encrypt(
+            IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
+            / TCP(flags="S", dport=80)
+        )
         self.logger.debug(ppp("Sending packet:", send))
         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
         recv = recv[0]
         self.logger.debug(ppp("Sending packet:", send))
         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
         recv = recv[0]
@@ -297,36 +325,38 @@ class IpsecTcp(object):
 
 class IpsecTcpTests(IpsecTcp):
     def test_tcp_checksum(self):
 
 class IpsecTcpTests(IpsecTcp):
     def test_tcp_checksum(self):
-        """ verify checksum correctness for vpp generated packets """
+        """verify checksum correctness for vpp generated packets"""
         self.verify_tcp_checksum()
 
 
 class IpsecTra4(object):
         self.verify_tcp_checksum()
 
 
 class IpsecTra4(object):
-    """ verify methods for Transport v4 """
+    """verify methods for Transport v4"""
+
     def get_replay_counts(self, p):
     def get_replay_counts(self, p):
-        replay_node_name = ('/err/%s/SA replayed packet' %
-                            self.tra4_decrypt_node_name[0])
+        replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0]
         count = self.statistics.get_err_counter(replay_node_name)
 
         if p.async_mode:
         count = self.statistics.get_err_counter(replay_node_name)
 
         if p.async_mode:
-            replay_post_node_name = ('/err/%s/SA replayed packet' %
-                                     self.tra4_decrypt_node_name[p.async_mode])
+            replay_post_node_name = (
+                "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode]
+            )
             count += self.statistics.get_err_counter(replay_post_node_name)
 
         return count
 
     def get_hash_failed_counts(self, p):
             count += self.statistics.get_err_counter(replay_post_node_name)
 
         return count
 
     def get_hash_failed_counts(self, p):
-        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
-            hash_failed_node_name = ('/err/%s/ESP decryption failed' %
-                                     self.tra4_decrypt_node_name[p.async_mode])
+        if ESP == self.encryption_type and p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
+            hash_failed_node_name = (
+                "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode]
+            )
         else:
         else:
-            hash_failed_node_name = ('/err/%s/Integrity check failed' %
-                                     self.tra4_decrypt_node_name[p.async_mode])
+            hash_failed_node_name = (
+                "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode]
+            )
         count = self.statistics.get_err_counter(hash_failed_node_name)
 
         if p.async_mode:
         count = self.statistics.get_err_counter(hash_failed_node_name)
 
         if p.async_mode:
-            count += self.statistics.get_err_counter(
-                '/err/crypto-dispatch/bad-hmac')
+            count += self.statistics.get_err_counter("/err/crypto-dispatch/bad-hmac")
 
         return count
 
 
         return count
 
@@ -336,81 +366,125 @@ class IpsecTra4(object):
         esn_on = p.vpp_tra_sa.esn_en
         ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
 
         esn_on = p.vpp_tra_sa.esn_en
         ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
 
-        seq_cycle_node_name = \
-            ('/err/%s/sequence number cycled (packet dropped)' %
-             self.tra4_encrypt_node_name)
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
         replay_count = self.get_replay_counts(p)
         hash_failed_count = self.get_hash_failed_counts(p)
         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
 
         # a few packets so we get the rx seq number above the window size and
         # thus can simulate a wrap with an out of window packet
         replay_count = self.get_replay_counts(p)
         hash_failed_count = self.get_hash_failed_counts(p)
         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
 
         # a few packets so we get the rx seq number above the window size and
         # thus can simulate a wrap with an out of window packet
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=seq))
-                for seq in range(63, 80)]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(63, 80)
+        ]
         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         # these 4 packets will all choose seq-num 0 to decrpyt since none
         # are out of window when first checked. however, once #200 has
         # decrypted it will move the window to 200 and has #81 is out of
         # window. this packet should be dropped.
         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         # these 4 packets will all choose seq-num 0 to decrpyt since none
         # are out of window when first checked. however, once #200 has
         # decrypted it will move the window to 200 and has #81 is out of
         # window. this packet should be dropped.
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=200)),
-                (Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=81)),
-                (Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=201)),
-                (Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=202))]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=200,
+                )
+            ),
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=81,
+                )
+            ),
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=201,
+                )
+            ),
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=202,
+                )
+            ),
+        ]
 
         # if anti-replay is off then we won't drop #81
         n_rx = 3 if ar_on else 4
         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
         # this packet is one before the wrap
 
         # if anti-replay is off then we won't drop #81
         n_rx = 3 if ar_on else 4
         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
         # this packet is one before the wrap
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=203))]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=203,
+                )
+            )
+        ]
         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
+        # a replayed packet, then an out of window, then a legit
+        # tests that a early failure on the batch doesn't affect subsequent packets.
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=203,
+                )
+            ),
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=81,
+                )
+            ),
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=204,
+                )
+            ),
+        ]
+        n_rx = 1 if ar_on else 3
+        recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
+
         # move the window over half way to a wrap
         # move the window over half way to a wrap
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=0x80000001))]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=0x80000001,
+                )
+            )
+        ]
         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         # anti-replay will drop old packets, no anti-replay will not
         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         # anti-replay will drop old packets, no anti-replay will not
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=0x44000001))]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=0x44000001,
+                )
+            )
+        ]
 
         if ar_on:
             self.send_and_assert_no_replies(self.tra_if, pkts)
 
         if ar_on:
             self.send_and_assert_no_replies(self.tra_if, pkts)
@@ -426,36 +500,48 @@ class IpsecTra4(object):
             p.scapy_tra_sa.seq_num = 0x100000005
 
             # send a packet that wraps the window for both AR and no AR
             p.scapy_tra_sa.seq_num = 0x100000005
 
             # send a packet that wraps the window for both AR and no AR
-            pkts = [(Ether(src=self.tra_if.remote_mac,
-                           dst=self.tra_if.local_mac) /
-                     p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                               dst=self.tra_if.local_ip4) /
-                                            ICMP(),
-                                            seq_num=0x100000005))]
+            pkts = [
+                (
+                    Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                    / p.scapy_tra_sa.encrypt(
+                        IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+                        / ICMP(),
+                        seq_num=0x100000005,
+                    )
+                )
+            ]
 
             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
             for rx in rxs:
                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             # move the window forward to half way to the next wrap
 
             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
             for rx in rxs:
                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             # move the window forward to half way to the next wrap
-            pkts = [(Ether(src=self.tra_if.remote_mac,
-                           dst=self.tra_if.local_mac) /
-                     p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                               dst=self.tra_if.local_ip4) /
-                                            ICMP(),
-                                            seq_num=0x180000005))]
+            pkts = [
+                (
+                    Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                    / p.scapy_tra_sa.encrypt(
+                        IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+                        / ICMP(),
+                        seq_num=0x180000005,
+                    )
+                )
+            ]
 
             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
             # a packet less than 2^30 from the current position is:
             #  - AR: out of window and dropped
             #  - non-AR: accepted
 
             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
             # a packet less than 2^30 from the current position is:
             #  - AR: out of window and dropped
             #  - non-AR: accepted
-            pkts = [(Ether(src=self.tra_if.remote_mac,
-                           dst=self.tra_if.local_mac) /
-                     p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                               dst=self.tra_if.local_ip4) /
-                                            ICMP(),
-                                            seq_num=0x170000005))]
+            pkts = [
+                (
+                    Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                    / p.scapy_tra_sa.encrypt(
+                        IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+                        / ICMP(),
+                        seq_num=0x170000005,
+                    )
+                )
+            ]
 
             if ar_on:
                 self.send_and_assert_no_replies(self.tra_if, pkts)
 
             if ar_on:
                 self.send_and_assert_no_replies(self.tra_if, pkts)
@@ -466,12 +552,16 @@ class IpsecTra4(object):
             #  - AR: out of window and dropped
             #  - non-AR: considered a wrap, but since it's not a wrap
             #    it won't decrpyt and so will be dropped
             #  - AR: out of window and dropped
             #  - non-AR: considered a wrap, but since it's not a wrap
             #    it won't decrpyt and so will be dropped
-            pkts = [(Ether(src=self.tra_if.remote_mac,
-                           dst=self.tra_if.local_mac) /
-                     p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                               dst=self.tra_if.local_ip4) /
-                                            ICMP(),
-                                            seq_num=0x130000005))]
+            pkts = [
+                (
+                    Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                    / p.scapy_tra_sa.encrypt(
+                        IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+                        / ICMP(),
+                        seq_num=0x130000005,
+                    )
+                )
+            ]
 
             self.send_and_assert_no_replies(self.tra_if, pkts)
 
 
             self.send_and_assert_no_replies(self.tra_if, pkts)
 
@@ -480,12 +570,16 @@ class IpsecTra4(object):
             #  - AR: out of window so considered a wrap, so accepted
             #  - non-AR: not considered a wrap, so won't decrypt
             p.scapy_tra_sa.seq_num = 0x260000005
             #  - AR: out of window so considered a wrap, so accepted
             #  - non-AR: not considered a wrap, so won't decrypt
             p.scapy_tra_sa.seq_num = 0x260000005
-            pkts = [(Ether(src=self.tra_if.remote_mac,
-                           dst=self.tra_if.local_mac) /
-                     p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                               dst=self.tra_if.local_ip4) /
-                                            ICMP(),
-                                            seq_num=0x260000005))]
+            pkts = [
+                (
+                    Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                    / p.scapy_tra_sa.encrypt(
+                        IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+                        / ICMP(),
+                        seq_num=0x260000005,
+                    )
+                )
+            ]
             if ar_on:
                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
             else:
             if ar_on:
                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
             else:
@@ -501,44 +595,69 @@ class IpsecTra4(object):
                 #  - AR: accepted
                 #  - non-AR: not considered a wrap, so won't decrypt
 
                 #  - AR: accepted
                 #  - non-AR: not considered a wrap, so won't decrypt
 
-                pkts = [(Ether(src=self.tra_if.remote_mac,
-                               dst=self.tra_if.local_mac) /
-                         p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                                   dst=self.tra_if.local_ip4) /
-                                                ICMP(),
-                                                seq_num=0x200000005)),
-                        (Ether(src=self.tra_if.remote_mac,
-                               dst=self.tra_if.local_mac) /
-                         p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                                   dst=self.tra_if.local_ip4) /
-                                                ICMP(),
-                                                seq_num=0x200000006))]
+                pkts = [
+                    (
+                        Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                        / p.scapy_tra_sa.encrypt(
+                            IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+                            / ICMP(),
+                            seq_num=0x200000005,
+                        )
+                    ),
+                    (
+                        Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                        / p.scapy_tra_sa.encrypt(
+                            IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+                            / ICMP(),
+                            seq_num=0x200000006,
+                        )
+                    ),
+                ]
                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-                pkts = [(Ether(src=self.tra_if.remote_mac,
-                               dst=self.tra_if.local_mac) /
-                         p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                                   dst=self.tra_if.local_ip4) /
-                                                ICMP(),
-                                                seq_num=0x260000005))]
+                pkts = [
+                    (
+                        Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                        / p.scapy_tra_sa.encrypt(
+                            IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+                            / ICMP(),
+                            seq_num=0x260000005,
+                        )
+                    )
+                ]
                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
     def verify_tra_anti_replay(self):
         p = self.params[socket.AF_INET]
         esn_en = p.vpp_tra_sa.esn_en
                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
     def verify_tra_anti_replay(self):
         p = self.params[socket.AF_INET]
         esn_en = p.vpp_tra_sa.esn_en
+        anti_replay_window_size = p.anti_replay_window_size
 
 
-        seq_cycle_node_name = \
-            ('/err/%s/sequence number cycled (packet dropped)' %
-             self.tra4_encrypt_node_name)
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
         replay_count = self.get_replay_counts(p)
         replay_count = self.get_replay_counts(p)
+        initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay")
         hash_failed_count = self.get_hash_failed_counts(p)
         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
         hash_failed_count = self.get_hash_failed_counts(p)
         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+        initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err(
+            "seq_cycled"
+        )
+        hash_err = "integ_error"
 
         if ESP == self.encryption_type:
 
         if ESP == self.encryption_type:
-            undersize_node_name = ('/err/%s/undersized packet' %
-                                   self.tra4_decrypt_node_name[0])
-            undersize_count = self.statistics.get_err_counter(
-                undersize_node_name)
+            undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+            undersize_count = self.statistics.get_err_counter(undersize_node_name)
+            initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err(
+                "runt"
+            )
+            # For AES-GCM an error in the hash is reported as a decryption failure
+            if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
+                hash_err = "decryption_failed"
+        # In async mode, we don't report errors in the hash.
+        if p.async_mode:
+            hash_err = ""
+        else:
+            initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err(
+                hash_err
+            )
 
         #
         # send packets with seq numbers 1->34
 
         #
         # send packets with seq numbers 1->34
@@ -548,19 +667,24 @@ class IpsecTra4(object):
         # for reasons i haven't investigated Scapy won't create a packet with
         # seq_num=0
         #
         # for reasons i haven't investigated Scapy won't create a packet with
         # seq_num=0
         #
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=seq))
-                for seq in range(1, 34)]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(1, 34)
+        ]
         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
         replay_count += len(pkts)
         self.assertEqual(self.get_replay_counts(p), replay_count)
         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
         replay_count += len(pkts)
         self.assertEqual(self.get_replay_counts(p), replay_count)
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
+        self.assertEqual(err, replay_count)
 
         #
         # now send a batch of packets all with the same sequence number
 
         #
         # now send a batch of packets all with the same sequence number
@@ -568,86 +692,95 @@ class IpsecTra4(object):
         #
         self.vapi.cli("clear error")
         self.vapi.cli("clear node counters")
         #
         self.vapi.cli("clear error")
         self.vapi.cli("clear node counters")
-        pkts = (Ether(src=self.tra_if.remote_mac,
-                      dst=self.tra_if.local_mac) /
-                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                          dst=self.tra_if.local_ip4) /
-                                       ICMP(),
-                                       seq_num=35))
-        recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
-                                         self.tra_if, n_rx=1)
+        pkts = Ether(
+            src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+        ) / p.scapy_tra_sa.encrypt(
+            IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+            seq_num=35,
+        )
+        recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
         replay_count += 7
         self.assertEqual(self.get_replay_counts(p), replay_count)
         replay_count += 7
         self.assertEqual(self.get_replay_counts(p), replay_count)
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
+        self.assertEqual(err, replay_count)
 
         #
 
         #
-        # now move the window over to 257 (more than one byte) and into Case A
+        # now move the window over to anti_replay_window_size + 100 and into Case A
         #
         self.vapi.cli("clear error")
         #
         self.vapi.cli("clear error")
-        pkt = (Ether(src=self.tra_if.remote_mac,
-                     dst=self.tra_if.local_mac) /
-               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                         dst=self.tra_if.local_ip4) /
-                                      ICMP(),
-                                      seq_num=257))
+        pkt = Ether(
+            src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+        ) / p.scapy_tra_sa.encrypt(
+            IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+            seq_num=anti_replay_window_size + 100,
+        )
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
+        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
+
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
         replay_count += 3
         self.assertEqual(self.get_replay_counts(p), replay_count)
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
         replay_count += 3
         self.assertEqual(self.get_replay_counts(p), replay_count)
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
+        self.assertEqual(err, replay_count)
 
 
-        # the window size is 64 packets
+        # the window size is anti_replay_window_size packets
         # in window are still accepted
         # in window are still accepted
-        pkt = (Ether(src=self.tra_if.remote_mac,
-                     dst=self.tra_if.local_mac) /
-               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                         dst=self.tra_if.local_ip4) /
-                                      ICMP(),
-                                      seq_num=200))
-        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+        pkt = Ether(
+            src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+        ) / p.scapy_tra_sa.encrypt(
+            IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+            seq_num=200,
+        )
 
         # a packet that does not decrypt does not move the window forward
 
         # a packet that does not decrypt does not move the window forward
-        bogus_sa = SecurityAssociation(self.encryption_type,
-                                       p.vpp_tra_spi,
-                                       crypt_algo=p.crypt_algo,
-                                       crypt_key=mk_scapy_crypt_key(p)[::-1],
-                                       auth_algo=p.auth_algo,
-                                       auth_key=p.auth_key[::-1])
-        pkt = (Ether(src=self.tra_if.remote_mac,
-                     dst=self.tra_if.local_mac) /
-               bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                   dst=self.tra_if.local_ip4) /
-                                ICMP(),
-                                seq_num=350))
+        bogus_sa = SecurityAssociation(
+            self.encryption_type,
+            p.scapy_tra_spi,
+            crypt_algo=p.crypt_algo,
+            crypt_key=mk_scapy_crypt_key(p)[::-1],
+            auth_algo=p.auth_algo,
+            auth_key=p.auth_key[::-1],
+        )
+        pkt = Ether(
+            src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+        ) / bogus_sa.encrypt(
+            IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+            seq_num=anti_replay_window_size + 200,
+        )
         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
         hash_failed_count += 17
         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
         hash_failed_count += 17
         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+        if hash_err != "":
+            err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
+            self.assertEqual(err, hash_failed_count)
 
         # a malformed 'runt' packet
         #  created by a mis-constructed SA
 
         # a malformed 'runt' packet
         #  created by a mis-constructed SA
-        if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
-            bogus_sa = SecurityAssociation(self.encryption_type,
-                                           p.vpp_tra_spi)
-            pkt = (Ether(src=self.tra_if.remote_mac,
-                         dst=self.tra_if.local_mac) /
-                   bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                       dst=self.tra_if.local_ip4) /
-                                    ICMP(),
-                                    seq_num=350))
+        if ESP == self.encryption_type and p.crypt_algo != "NULL":
+            bogus_sa = SecurityAssociation(self.encryption_type, p.scapy_tra_spi)
+            pkt = Ether(
+                src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+            ) / bogus_sa.encrypt(
+                IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                seq_num=anti_replay_window_size + 200,
+            )
             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
             undersize_count += 17
             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
             undersize_count += 17
-            self.assert_error_counter_equal(undersize_node_name,
-                                            undersize_count)
+            self.assert_error_counter_equal(undersize_node_name, undersize_count)
+            err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff
+            self.assertEqual(err, undersize_count)
 
         # which we can determine since this packet is still in the window
 
         # which we can determine since this packet is still in the window
-        pkt = (Ether(src=self.tra_if.remote_mac,
-                     dst=self.tra_if.local_mac) /
-               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                         dst=self.tra_if.local_ip4) /
-                                      ICMP(),
-                                      seq_num=234))
+        pkt = Ether(
+            src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+        ) / p.scapy_tra_sa.encrypt(
+            IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+            seq_num=234,
+        )
         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
         #
         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
         #
@@ -655,12 +788,12 @@ class IpsecTra4(object):
         #  this is Case B. So VPP will consider this to be a high seq num wrap
         #  and so the decrypt attempt will fail
         #
         #  this is Case B. So VPP will consider this to be a high seq num wrap
         #  and so the decrypt attempt will fail
         #
-        pkt = (Ether(src=self.tra_if.remote_mac,
-                     dst=self.tra_if.local_mac) /
-               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                         dst=self.tra_if.local_ip4) /
-                                      ICMP(),
-                                      seq_num=17))
+        pkt = Ether(
+            src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+        ) / p.scapy_tra_sa.encrypt(
+            IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+            seq_num=17,
+        )
         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
         if esn_en:
         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
         if esn_en:
@@ -668,18 +801,23 @@ class IpsecTra4(object):
             # wrap. but since it isn't then the verify will fail.
             hash_failed_count += 17
             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
             # wrap. but since it isn't then the verify will fail.
             hash_failed_count += 17
             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+            if hash_err != "":
+                err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
+                self.assertEqual(err, hash_failed_count)
 
         else:
             replay_count += 17
             self.assertEqual(self.get_replay_counts(p), replay_count)
 
         else:
             replay_count += 17
             self.assertEqual(self.get_replay_counts(p), replay_count)
-
-        # valid packet moves the window over to 258
-        pkt = (Ether(src=self.tra_if.remote_mac,
-                     dst=self.tra_if.local_mac) /
-               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                         dst=self.tra_if.local_ip4) /
-                                      ICMP(),
-                                      seq_num=258))
+            err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
+            self.assertEqual(err, replay_count)
+
+        # valid packet moves the window over to anti_replay_window_size + 258
+        pkt = Ether(
+            src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+        ) / p.scapy_tra_sa.encrypt(
+            IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+            seq_num=anti_replay_window_size + 258,
+        )
         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
@@ -689,17 +827,20 @@ class IpsecTra4(object):
         # causes the TX seq number to wrap; unless we're using extened sequence
         # numbers.
         #
         # causes the TX seq number to wrap; unless we're using extened sequence
         # numbers.
         #
-        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
+        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.vpp_tra_sa_id)
         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
 
         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
 
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=seq))
-                for seq in range(259, 280)]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(259, 280)
+        ]
 
         if esn_en:
             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         if esn_en:
             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
@@ -718,27 +859,27 @@ class IpsecTra4(object):
             # The low seq num we set it to will place VPP's RX window in Case A
             #
             p.scapy_tra_sa.seq_num = 0x100000005
             # The low seq num we set it to will place VPP's RX window in Case A
             #
             p.scapy_tra_sa.seq_num = 0x100000005
-            pkt = (Ether(src=self.tra_if.remote_mac,
-                         dst=self.tra_if.local_mac) /
-                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                             dst=self.tra_if.local_ip4) /
-                                          ICMP(),
-                                          seq_num=0x100000005))
+            pkt = Ether(
+                src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+            ) / p.scapy_tra_sa.encrypt(
+                IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                seq_num=0x100000005,
+            )
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             #
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             #
-            # A packet that has seq num between (2^32-64) and 5 is within
+            # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within
             # the window
             #
             # the window
             #
-            p.scapy_tra_sa.seq_num = 0xfffffffd
-            pkt = (Ether(src=self.tra_if.remote_mac,
-                         dst=self.tra_if.local_mac) /
-                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                             dst=self.tra_if.local_ip4) /
-                                          ICMP(),
-                                          seq_num=0xfffffffd))
+            p.scapy_tra_sa.seq_num = 0xFFFFFFFD
+            pkt = Ether(
+                src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+            ) / p.scapy_tra_sa.encrypt(
+                IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                seq_num=0xFFFFFFFD,
+            )
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
@@ -747,39 +888,43 @@ class IpsecTra4(object):
             # because VPP will consider this packet to be one that moves the
             # window forward
             #
             # because VPP will consider this packet to be one that moves the
             # window forward
             #
-            pkt = (Ether(src=self.tra_if.remote_mac,
-                         dst=self.tra_if.local_mac) /
-                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                             dst=self.tra_if.local_ip4) /
-                                          ICMP(),
-                                          seq_num=0x200000999))
-            self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if,
-                                            timeout=0.2)
+            pkt = Ether(
+                src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+            ) / p.scapy_tra_sa.encrypt(
+                IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                seq_num=0x200000999,
+            )
+            self.send_and_assert_no_replies(
+                self.tra_if, [pkt], self.tra_if, timeout=0.2
+            )
 
             hash_failed_count += 1
             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
 
             hash_failed_count += 1
             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+            if hash_err != "":
+                err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
+                self.assertEqual(err, hash_failed_count)
 
             #
             # but if we move the window forward to case B, then we can wrap
             # again
             #
 
             #
             # but if we move the window forward to case B, then we can wrap
             # again
             #
-            p.scapy_tra_sa.seq_num = 0x100000555
-            pkt = (Ether(src=self.tra_if.remote_mac,
-                         dst=self.tra_if.local_mac) /
-                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                             dst=self.tra_if.local_ip4) /
-                                          ICMP(),
-                                          seq_num=0x100000555))
+            p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555
+            pkt = Ether(
+                src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+            ) / p.scapy_tra_sa.encrypt(
+                IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                seq_num=p.scapy_tra_sa.seq_num,
+            )
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             p.scapy_tra_sa.seq_num = 0x200000444
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             p.scapy_tra_sa.seq_num = 0x200000444
-            pkt = (Ether(src=self.tra_if.remote_mac,
-                         dst=self.tra_if.local_mac) /
-                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                             dst=self.tra_if.local_ip4) /
-                                          ICMP(),
-                                          seq_num=0x200000444))
+            pkt = Ether(
+                src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+            ) / p.scapy_tra_sa.encrypt(
+                IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                seq_num=0x200000444,
+            )
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
@@ -790,8 +935,9 @@ class IpsecTra4(object):
             #
             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
             seq_cycle_count += len(pkts)
             #
             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
             seq_cycle_count += len(pkts)
-            self.assert_error_counter_equal(seq_cycle_node_name,
-                                            seq_cycle_count)
+            self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
+            err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff
+            self.assertEqual(err, seq_cycle_count)
 
         # move the security-associations seq number on to the last we used
         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
 
         # move the security-associations seq number on to the last we used
         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
@@ -810,88 +956,109 @@ class IpsecTra4(object):
         # for reasons i haven't investigated Scapy won't create a packet with
         # seq_num=0
         #
         # for reasons i haven't investigated Scapy won't create a packet with
         # seq_num=0
         #
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=seq))
-                for seq in range(1, 3)]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(1, 3)
+        ]
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assertEqual(p.tra_sa_out.get_lost(), 0)
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
 
         # skip a sequence number
 
         # skip a sequence number
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=seq))
-                for seq in range(4, 6)]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(4, 6)
+        ]
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assertEqual(p.tra_sa_out.get_lost(), 0)
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
 
         # the lost packet are counted untill we get up past the first
         # sizeof(replay_window) packets
 
         # the lost packet are counted untill we get up past the first
         # sizeof(replay_window) packets
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=seq))
-                for seq in range(6, 100)]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(6, 100)
+        ]
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assertEqual(p.tra_sa_out.get_lost(), 1)
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 1)
 
         # lost of holes in the sequence
 
         # lost of holes in the sequence
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=seq))
-                for seq in range(100, 200, 2)]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(100, 200, 2)
+        ]
         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
 
         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
 
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=seq))
-                for seq in range(200, 300)]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(200, 300)
+        ]
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assertEqual(p.tra_sa_out.get_lost(), 51)
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 51)
 
         # a big hole in the seq number space
 
         # a big hole in the seq number space
-        pkts = [(Ether(src=self.tra_if.remote_mac,
-                       dst=self.tra_if.local_mac) /
-                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                           dst=self.tra_if.local_ip4) /
-                                        ICMP(),
-                                        seq_num=seq))
-                for seq in range(400, 500)]
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(400, 500)
+        ]
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assertEqual(p.tra_sa_out.get_lost(), 151)
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 151)
 
     def verify_tra_basic4(self, count=1, payload_size=54):
 
     def verify_tra_basic4(self, count=1, payload_size=54):
-        """ ipsec v4 transport basic test """
+        """ipsec v4 transport basic test"""
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
         try:
             p = self.params[socket.AF_INET]
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
         try:
             p = self.params[socket.AF_INET]
-            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
-                                              src=self.tra_if.remote_ip4,
-                                              dst=self.tra_if.local_ip4,
-                                              count=count,
-                                              payload_size=payload_size)
-            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
-                                             self.tra_if)
+            send_pkts = self.gen_encrypt_pkts(
+                p,
+                p.scapy_tra_sa,
+                self.tra_if,
+                src=self.tra_if.remote_ip4,
+                dst=self.tra_if.local_ip4,
+                count=count,
+                payload_size=payload_size,
+            )
+            recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
             for rx in recv_pkts:
                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
                 self.assert_packet_checksums_valid(rx)
             for rx in recv_pkts:
                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
                 self.assert_packet_checksums_valid(rx)
@@ -905,57 +1072,827 @@ class IpsecTra4(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec all"))
 
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec all"))
 
-        pkts = p.tra_sa_in.get_stats()['packets']
-        self.assertEqual(pkts, count,
-                         "incorrect SA in counts: expected %d != %d" %
-                         (count, pkts))
-        pkts = p.tra_sa_out.get_stats()['packets']
-        self.assertEqual(pkts, count,
-                         "incorrect SA out counts: expected %d != %d" %
-                         (count, pkts))
-        self.assertEqual(p.tra_sa_out.get_lost(), 0)
-        self.assertEqual(p.tra_sa_in.get_lost(), 0)
+        pkts = p.tra_sa_in.get_stats()["packets"]
+        self.assertEqual(
+            pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
+        )
+        pkts = p.tra_sa_out.get_stats()["packets"]
+        self.assertEqual(
+            pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
+        )
+        self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
 
         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
 
 
         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
 
+    def _verify_tra_anti_replay_algorithm_esn(self):
+        def seq_num(seqh, seql):
+            return (seqh << 32) | (seql & 0xFFFF_FFFF)
+
+        p = self.params[socket.AF_INET]
+        anti_replay_window_size = p.anti_replay_window_size
+
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+        replay_count = self.get_replay_counts(p)
+        hash_failed_count = self.get_hash_failed_counts(p)
+        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+        if ESP == self.encryption_type:
+            undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+            undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+        # reset the TX SA to avoid conflict with left configuration
+        self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+        """
+        RFC 4303 Appendix A2. Case A
+
+        |: new Th marker
+        a-i: possible seq num received
+        +: Bl, Tl, Bl', Tl'
+        [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1)
+
+                Th - 1               Th                Th + 1
+        --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|--
+                =========          =========          =========
+                Bl-     Tl-        Bl      Tl         Bl+     Tl+
+
+        Case A implies Tl >= W - 1
+        """
+
+        Th = 1
+        Tl = anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        # move VPP's RX AR window to Case A
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+        case a: Seql < Bl
+            - pre-crypto check: algorithm predicts that the packet wrap the window
+                -> Seqh = Th + 1
+            - integrity check: should fail
+            - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case b: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5))
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case c: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case d: Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet wrap the window
+                    -> Seqh = Th + 1
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case e: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case f: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (the window stays Case A)
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case g: Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet wrap the window
+                    -> Seqh = Th + 1
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (may set the window in Case B)
+                    -> Seql is marked in the AR window
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            # set the window in Case B (the minimum window size is 64
+            # so we are sure to overlap)
+            for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        # reset the VPP's RX AR window to Case A
+        Th = 1
+        Tl = 2 * anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            # the AR will stay in Case A
+            for seq in range(
+                seq_num(Th + 1, anti_replay_window_size + 10),
+                seq_num(Th + 1, anti_replay_window_size + 20),
+            )
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case h: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: the wrap is not detected, should fail
+                - post-crypto check: ...
+        """
+        Th += 1
+        Tl = anti_replay_window_size + 20
+        Bl = Tl - anti_replay_window_size + 1
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case i: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: the wrap is not detected, shoud fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            RFC 4303 Appendix A2. Case B
+
+                        Th - 1               Th                Th + 1
+            ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h---
+            =========          ===========          ===========
+                    Tl-        Bl       Tl          Bl+       Tl+
+
+            Case B implies Tl < W - 1
+        """
+
+        # reset the VPP's RX AR window to Case B
+        Th = 2
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+            case a: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 5), seq_num(Th, 10))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case b: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case c: Tl < Bl <= Seql
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th - 1
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case d: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 15), seq_num(Th, 25))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case e: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (may set the window in Case A)
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15))
+        ]
+
+        # the window stays in Case B
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(
+                seq_num(Th, Tl + anti_replay_window_size + 5),
+                seq_num(Th, Tl + anti_replay_window_size + 15),
+            )
+        ]
+
+        # the window moves to Case A
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        # reset the VPP's RX AR window to Case B
+        Th = 2
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+            case f: Tl < Bl <= Seql
+                - pre-crypto check: algorithm predicts that the packet is in the previous window
+                    -> Seqh = Th - 1
+                    -> check for a replayed packet with Seql
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case g: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 10), seq_num(Th, 15))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case h: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+    def _verify_tra_anti_replay_algorithm_no_esn(self):
+        def seq_num(seql):
+            return seql & 0xFFFF_FFFF
+
+        p = self.params[socket.AF_INET]
+        anti_replay_window_size = p.anti_replay_window_size
+
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+        replay_count = self.get_replay_counts(p)
+        hash_failed_count = self.get_hash_failed_counts(p)
+        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+        if ESP == self.encryption_type:
+            undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+            undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+        # reset the TX SA to avoid conflict with left configuration
+        self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+        """
+        RFC 4303 Appendix A2. Case A
+
+        a-c: possible seq num received
+        +: Bl, Tl
+
+        |--a--+---b---+-c--|
+              =========
+              Bl      Tl
+
+        No ESN implies Th = 0
+        Case A implies Tl >= W - 1
+        """
+
+        Tl = anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        # move VPP's RX AR window to Case A
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Tl)
+
+        """
+        case a: Seql < Bl
+            - pre-crypto check: algorithm predicts that the packet is out of window
+                -> packet should be dropped
+            - integrity check: ...
+            - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Bl - 20), seq_num(Bl - 5))
+        ]
+
+        # out-of-window packets
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        replay_count += len(pkts)
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        """
+            case b: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check:
+                    -> check for a replayed packet with Seql
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Tl - 50), seq_num(Tl - 30))
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Tl - 35), seq_num(Tl - 30))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # replayed packets
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        """
+            case c: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window is shifted
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Tl + 5), seq_num(Tl + 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            RFC 4303 Appendix A2. Case B
+
+            |-a-----+------b-----|
+            =========
+                    Tl
+
+            Case B implies Tl < W - 1
+        """
+
+        # reset the VPP's RX AR window to Case B
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = seq_num(Tl - anti_replay_window_size + 1)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+
+        """
+        case a: Seql <= Tl < Bl
+            - pre-crypto check: algorithm predicts that the packet is in the window
+                -> check for replayed packet
+            - integrity check: should fail
+            - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(5), seq_num(10))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case b: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window is shifted
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(-50), seq_num(-20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+    def verify_tra_anti_replay_algorithm(self):
+        if self.params[socket.AF_INET].vpp_tra_sa.esn_en:
+            self._verify_tra_anti_replay_algorithm_esn()
+        else:
+            self._verify_tra_anti_replay_algorithm_no_esn()
+
 
 class IpsecTra4Tests(IpsecTra4):
 
 class IpsecTra4Tests(IpsecTra4):
-    """ UT test methods for Transport v4 """
+    """UT test methods for Transport v4"""
+
     def test_tra_anti_replay(self):
     def test_tra_anti_replay(self):
-        """ ipsec v4 transport anti-replay test """
+        """ipsec v4 transport anti-replay test"""
         self.verify_tra_anti_replay()
 
         self.verify_tra_anti_replay()
 
+    def test_tra_anti_replay_algorithm(self):
+        """ipsec v4 transport anti-replay algorithm test"""
+        self.verify_tra_anti_replay_algorithm()
+
     def test_tra_lost(self):
     def test_tra_lost(self):
-        """ ipsec v4 transport lost packet test """
+        """ipsec v4 transport lost packet test"""
         self.verify_tra_lost()
 
     def test_tra_basic(self, count=1):
         self.verify_tra_lost()
 
     def test_tra_basic(self, count=1):
-        """ ipsec v4 transport basic test """
+        """ipsec v4 transport basic test"""
         self.verify_tra_basic4(count=1)
 
     def test_tra_burst(self):
         self.verify_tra_basic4(count=1)
 
     def test_tra_burst(self):
-        """ ipsec v4 transport burst test """
+        """ipsec v4 transport burst test"""
         self.verify_tra_basic4(count=257)
 
 
 class IpsecTra6(object):
         self.verify_tra_basic4(count=257)
 
 
 class IpsecTra6(object):
-    """ verify methods for Transport v6 """
+    """verify methods for Transport v6"""
+
     def verify_tra_basic6(self, count=1, payload_size=54):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
         try:
             p = self.params[socket.AF_INET6]
     def verify_tra_basic6(self, count=1, payload_size=54):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
         try:
             p = self.params[socket.AF_INET6]
-            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
-                                               src=self.tra_if.remote_ip6,
-                                               dst=self.tra_if.local_ip6,
-                                               count=count,
-                                               payload_size=payload_size)
-            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
-                                             self.tra_if)
+            send_pkts = self.gen_encrypt_pkts6(
+                p,
+                p.scapy_tra_sa,
+                self.tra_if,
+                src=self.tra_if.remote_ip6,
+                dst=self.tra_if.local_ip6,
+                count=count,
+                payload_size=payload_size,
+            )
+            recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
             for rx in recv_pkts:
             for rx in recv_pkts:
-                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
-                                 rx[IPv6].plen)
+                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
                 try:
                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                     self.assert_packet_checksums_valid(decrypted)
                 try:
                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                     self.assert_packet_checksums_valid(decrypted)
@@ -966,32 +1903,38 @@ class IpsecTra6(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec all"))
 
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec all"))
 
-        pkts = p.tra_sa_in.get_stats()['packets']
-        self.assertEqual(pkts, count,
-                         "incorrect SA in counts: expected %d != %d" %
-                         (count, pkts))
-        pkts = p.tra_sa_out.get_stats()['packets']
-        self.assertEqual(pkts, count,
-                         "incorrect SA out counts: expected %d != %d" %
-                         (count, pkts))
+        pkts = p.tra_sa_in.get_stats()["packets"]
+        self.assertEqual(
+            pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
+        )
+        pkts = p.tra_sa_out.get_stats()["packets"]
+        self.assertEqual(
+            pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
+        )
         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
 
         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
 
-    def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
-                                   payload_size=54):
-        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                sa.encrypt(IPv6(src=src, dst=dst) /
-                           ICMPv6EchoRequest(id=0, seq=1,
-                                             data='X' * payload_size))
-                for i in range(count)]
+    def gen_encrypt_pkts_ext_hdrs6(
+        self, sa, sw_intf, src, dst, count=1, payload_size=54
+    ):
+        return [
+            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+            / sa.encrypt(
+                IPv6(src=src, dst=dst)
+                / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
+            )
+            for i in range(count)
+        ]
 
     def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
 
     def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
-        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                IPv6(src=src, dst=dst) /
-                IPv6ExtHdrHopByHop() /
-                IPv6ExtHdrFragment(id=2, offset=200) /
-                Raw(b'\xff' * 200)
-                for i in range(count)]
+        return [
+            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+            / IPv6(src=src, dst=dst)
+            / IPv6ExtHdrHopByHop()
+            / IPv6ExtHdrFragment(id=2, offset=200)
+            / Raw(b"\xff" * 200)
+            for i in range(count)
+        ]
 
     def verify_tra_encrypted6(self, p, sa, rxs):
         decrypted = []
 
     def verify_tra_encrypted6(self, p, sa, rxs):
         decrypted = []
@@ -1017,10 +1960,13 @@ class IpsecTra6(object):
         #
         # check we can decrypt with options
         #
         #
         # check we can decrypt with options
         #
-        tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
-                                             src=self.tra_if.remote_ip6,
-                                             dst=self.tra_if.local_ip6,
-                                             count=count)
+        tx = self.gen_encrypt_pkts_ext_hdrs6(
+            p.scapy_tra_sa,
+            self.tra_if,
+            src=self.tra_if.remote_ip6,
+            dst=self.tra_if.local_ip6,
+            count=count,
+        )
         self.send_and_expect(self.tra_if, tx, self.tra_if)
 
         #
         self.send_and_expect(self.tra_if, tx, self.tra_if)
 
         #
@@ -1029,11 +1975,12 @@ class IpsecTra6(object):
         #
 
         # one extension before ESP
         #
 
         # one extension before ESP
-        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
-              IPv6(src=self.tra_if.local_ip6,
-                   dst=self.tra_if.remote_ip6) /
-              IPv6ExtHdrFragment(id=2, offset=200) /
-              Raw(b'\xff' * 200))
+        tx = (
+            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
+            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
+            / IPv6ExtHdrFragment(id=2, offset=200)
+            / Raw(b"\xff" * 200)
+        )
 
         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
 
         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
@@ -1046,12 +1993,13 @@ class IpsecTra6(object):
             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
 
         # two extensions before ESP
             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
 
         # two extensions before ESP
-        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
-              IPv6(src=self.tra_if.local_ip6,
-                   dst=self.tra_if.remote_ip6) /
-              IPv6ExtHdrHopByHop() /
-              IPv6ExtHdrFragment(id=2, offset=200) /
-              Raw(b'\xff' * 200))
+        tx = (
+            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
+            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
+            / IPv6ExtHdrHopByHop()
+            / IPv6ExtHdrFragment(id=2, offset=200)
+            / Raw(b"\xff" * 200)
+        )
 
         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
 
         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
@@ -1062,13 +2010,14 @@ class IpsecTra6(object):
             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
 
         # two extensions before ESP, one after
             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
 
         # two extensions before ESP, one after
-        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
-              IPv6(src=self.tra_if.local_ip6,
-                   dst=self.tra_if.remote_ip6) /
-              IPv6ExtHdrHopByHop() /
-              IPv6ExtHdrFragment(id=2, offset=200) /
-              IPv6ExtHdrDestOpt() /
-              Raw(b'\xff' * 200))
+        tx = (
+            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
+            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
+            / IPv6ExtHdrHopByHop()
+            / IPv6ExtHdrFragment(id=2, offset=200)
+            / IPv6ExtHdrDestOpt()
+            / Raw(b"\xff" * 200)
+        )
 
         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
 
         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
@@ -1081,47 +2030,54 @@ class IpsecTra6(object):
 
 
 class IpsecTra6Tests(IpsecTra6):
 
 
 class IpsecTra6Tests(IpsecTra6):
-    """ UT test methods for Transport v6 """
+    """UT test methods for Transport v6"""
+
     def test_tra_basic6(self):
     def test_tra_basic6(self):
-        """ ipsec v6 transport basic test """
+        """ipsec v6 transport basic test"""
         self.verify_tra_basic6(count=1)
 
     def test_tra_burst6(self):
         self.verify_tra_basic6(count=1)
 
     def test_tra_burst6(self):
-        """ ipsec v6 transport burst test """
+        """ipsec v6 transport burst test"""
         self.verify_tra_basic6(count=257)
 
 
 class IpsecTra6ExtTests(IpsecTra6):
     def test_tra_ext_hdrs_66(self):
         self.verify_tra_basic6(count=257)
 
 
 class IpsecTra6ExtTests(IpsecTra6):
     def test_tra_ext_hdrs_66(self):
-        """ ipsec 6o6 tra extension headers test """
+        """ipsec 6o6 tra extension headers test"""
         self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
 
 
 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
         self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
 
 
 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
-    """ UT test methods for Transport v6 and v4"""
+    """UT test methods for Transport v6 and v4"""
+
     pass
 
 
 class IpsecTun4(object):
     pass
 
 
 class IpsecTun4(object):
-    """ verify methods for Tunnel v4 """
+    """verify methods for Tunnel v4"""
+
     def verify_counters4(self, p, count, n_frags=None, worker=None):
         if not n_frags:
             n_frags = count
     def verify_counters4(self, p, count, n_frags=None, worker=None):
         if not n_frags:
             n_frags = count
-        if (hasattr(p, "spd_policy_in_any")):
-            pkts = p.spd_policy_in_any.get_stats(worker)['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SPD any policy: expected %d != %d" %
-                             (count, pkts))
-
-        if (hasattr(p, "tun_sa_in")):
-            pkts = p.tun_sa_in.get_stats(worker)['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA in counts: expected %d != %d" %
-                             (count, pkts))
-            pkts = p.tun_sa_out.get_stats(worker)['packets']
-            self.assertEqual(pkts, n_frags,
-                             "incorrect SA out counts: expected %d != %d" %
-                             (count, pkts))
+        if hasattr(p, "spd_policy_in_any"):
+            pkts = p.spd_policy_in_any.get_stats(worker)["packets"]
+            self.assertEqual(
+                pkts,
+                count,
+                "incorrect SPD any policy: expected %d != %d" % (count, pkts),
+            )
+
+        if hasattr(p, "tun_sa_in"):
+            pkts = p.tun_sa_in.get_stats(worker)["packets"]
+            self.assertEqual(
+                pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
+            )
+            pkts = p.tun_sa_out.get_stats(worker)["packets"]
+            self.assertEqual(
+                pkts,
+                n_frags,
+                "incorrect SA out counts: expected %d != %d" % (count, pkts),
+            )
 
         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
         self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)
 
         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
         self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)
@@ -1145,7 +2101,7 @@ class IpsecTun4(object):
         decrypt_pkts = []
         for rx in rxs:
             if p.nat_header:
         decrypt_pkts = []
         for rx in rxs:
             if p.nat_header:
-                self.assertEqual(rx[UDP].dport, 4500)
+                self.assertEqual(rx[UDP].dport, p.nat_header.dport)
             self.assert_packet_checksums_valid(rx)
             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
             try:
             self.assert_packet_checksums_valid(rx)
             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
             try:
@@ -1176,19 +2132,26 @@ class IpsecTun4(object):
         if not n_rx:
             n_rx = count
         try:
         if not n_rx:
             n_rx = count
         try:
-            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
-                                              src=p.remote_tun_if_host,
-                                              dst=self.pg1.remote_ip4,
-                                              count=count,
-                                              payload_size=payload_size)
+            send_pkts = self.gen_encrypt_pkts(
+                p,
+                p.scapy_tun_sa,
+                self.tun_if,
+                src=p.remote_tun_if_host,
+                dst=self.pg1.remote_ip4,
+                count=count,
+                payload_size=payload_size,
+            )
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             self.verify_decrypted(p, recv_pkts)
 
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             self.verify_decrypted(p, recv_pkts)
 
-            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
-                                      dst=p.remote_tun_if_host, count=count,
-                                      payload_size=payload_size)
-            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
-                                             self.tun_if, n_rx)
+            send_pkts = self.gen_pkts(
+                self.pg1,
+                src=self.pg1.remote_ip4,
+                dst=p.remote_tun_if_host,
+                count=count,
+                payload_size=payload_size,
+            )
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if, n_rx)
             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
 
             for rx in recv_pkts:
             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
 
             for rx in recv_pkts:
@@ -1208,15 +2171,23 @@ class IpsecTun4(object):
         if not n_rx:
             n_rx = count
         try:
         if not n_rx:
             n_rx = count
         try:
-            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
-                                              src=p.remote_tun_if_host,
-                                              dst=self.pg1.remote_ip4,
-                                              count=count)
+            send_pkts = self.gen_encrypt_pkts(
+                p,
+                p.scapy_tun_sa,
+                self.tun_if,
+                src=p.remote_tun_if_host,
+                dst=self.pg1.remote_ip4,
+                count=count,
+            )
             self.send_and_assert_no_replies(self.tun_if, send_pkts)
 
             self.send_and_assert_no_replies(self.tun_if, send_pkts)
 
-            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
-                                      dst=p.remote_tun_if_host, count=count,
-                                      payload_size=payload_size)
+            send_pkts = self.gen_pkts(
+                self.pg1,
+                src=self.pg1.remote_ip4,
+                dst=p.remote_tun_if_host,
+                count=count,
+                payload_size=payload_size,
+            )
             self.send_and_assert_no_replies(self.pg1, send_pkts)
 
         finally:
             self.send_and_assert_no_replies(self.pg1, send_pkts)
 
         finally:
@@ -1226,23 +2197,27 @@ class IpsecTun4(object):
     def verify_tun_reass_44(self, p):
         self.vapi.cli("clear errors")
         self.vapi.ip_reassembly_enable_disable(
     def verify_tun_reass_44(self, p):
         self.vapi.cli("clear errors")
         self.vapi.ip_reassembly_enable_disable(
-            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
+            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True
+        )
 
         try:
 
         try:
-            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
-                                              src=p.remote_tun_if_host,
-                                              dst=self.pg1.remote_ip4,
-                                              payload_size=1900,
-                                              count=1)
+            send_pkts = self.gen_encrypt_pkts(
+                p,
+                p.scapy_tun_sa,
+                self.tun_if,
+                src=p.remote_tun_if_host,
+                dst=self.pg1.remote_ip4,
+                payload_size=1900,
+                count=1,
+            )
             send_pkts = fragment_rfc791(send_pkts[0], 1400)
             send_pkts = fragment_rfc791(send_pkts[0], 1400)
-            recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
-                                             self.pg1, n_rx=1)
+            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
             self.verify_decrypted(p, recv_pkts)
 
             self.verify_decrypted(p, recv_pkts)
 
-            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
-                                      dst=p.remote_tun_if_host, count=1)
-            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
-                                             self.tun_if)
+            send_pkts = self.gen_pkts(
+                self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=1
+            )
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
 
         finally:
             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
 
         finally:
@@ -1251,23 +2226,33 @@ class IpsecTun4(object):
 
         self.verify_counters4(p, 1, 1)
         self.vapi.ip_reassembly_enable_disable(
 
         self.verify_counters4(p, 1, 1)
         self.vapi.ip_reassembly_enable_disable(
-            sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
+            sw_if_index=self.tun_if.sw_if_index, enable_ip4=False
+        )
 
     def verify_tun_64(self, p, count=1):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
         try:
 
     def verify_tun_64(self, p, count=1):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
         try:
-            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
-                                               src=p.remote_tun_if_host6,
-                                               dst=self.pg1.remote_ip6,
-                                               count=count)
+            send_pkts = self.gen_encrypt_pkts6(
+                p,
+                p.scapy_tun_sa,
+                self.tun_if,
+                src=p.remote_tun_if_host6,
+                dst=self.pg1.remote_ip6,
+                count=count,
+            )
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             for recv_pkt in recv_pkts:
                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                 self.assert_packet_checksums_valid(recv_pkt)
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             for recv_pkt in recv_pkts:
                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                 self.assert_packet_checksums_valid(recv_pkt)
-            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
-                                       dst=p.remote_tun_if_host6, count=count)
+            send_pkts = self.gen_pkts6(
+                p,
+                self.pg1,
+                src=self.pg1.remote_ip6,
+                dst=p.remote_tun_if_host6,
+                count=count,
+            )
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
@@ -1280,8 +2265,7 @@ class IpsecTun4(object):
                 except:
                     self.logger.error(ppp("Unexpected packet:", recv_pkt))
                     try:
                 except:
                     self.logger.error(ppp("Unexpected packet:", recv_pkt))
                     try:
-                        self.logger.debug(
-                            ppp("Decrypted packet:", decrypt_pkt))
+                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                     except:
                         pass
                     raise
                     except:
                         pass
                     raise
@@ -1292,27 +2276,45 @@ class IpsecTun4(object):
         self.verify_counters4(p, count)
 
     def verify_keepalive(self, p):
         self.verify_counters4(p, count)
 
     def verify_keepalive(self, p):
-        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
-               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
-               UDP(sport=333, dport=4500) /
-               Raw(b'\xff'))
-        self.send_and_assert_no_replies(self.tun_if, pkt*31)
-        self.assert_error_counter_equal(
-            '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
-
-        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
-               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
-               UDP(sport=333, dport=4500) /
-               Raw(b'\xfe'))
-        self.send_and_assert_no_replies(self.tun_if, pkt*31)
+        # the sizeof Raw is calculated to pad to the minimum ehternet
+        # frame size of 64 btyes
+        pkt = (
+            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
+            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
+            / UDP(sport=333, dport=4500)
+            / Raw(b"\xff")
+            / Padding(0 * 21)
+        )
+        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
         self.assert_error_counter_equal(
         self.assert_error_counter_equal(
-            '/err/%s/Too Short' % self.tun4_input_node, 31)
+            "/err/%s/nat_keepalive" % self.tun4_input_node, 31
+        )
+
+        pkt = (
+            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
+            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
+            / UDP(sport=333, dport=4500)
+            / Raw(b"\xfe")
+        )
+        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
+        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31)
+
+        pkt = (
+            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
+            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
+            / UDP(sport=333, dport=4500)
+            / Raw(b"\xfe")
+            / Padding(0 * 21)
+        )
+        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
+        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62)
 
 
 class IpsecTun4Tests(IpsecTun4):
 
 
 class IpsecTun4Tests(IpsecTun4):
-    """ UT test methods for Tunnel v4 """
+    """UT test methods for Tunnel v4"""
+
     def test_tun_basic44(self):
     def test_tun_basic44(self):
-        """ ipsec 4o4 tunnel basic test """
+        """ipsec 4o4 tunnel basic test"""
         self.verify_tun_44(self.params[socket.AF_INET], count=1)
         self.tun_if.admin_down()
         self.tun_if.resolve_arp()
         self.verify_tun_44(self.params[socket.AF_INET], count=1)
         self.tun_if.admin_down()
         self.tun_if.resolve_arp()
@@ -1320,27 +2322,30 @@ class IpsecTun4Tests(IpsecTun4):
         self.verify_tun_44(self.params[socket.AF_INET], count=1)
 
     def test_tun_reass_basic44(self):
         self.verify_tun_44(self.params[socket.AF_INET], count=1)
 
     def test_tun_reass_basic44(self):
-        """ ipsec 4o4 tunnel basic reassembly test """
+        """ipsec 4o4 tunnel basic reassembly test"""
         self.verify_tun_reass_44(self.params[socket.AF_INET])
 
     def test_tun_burst44(self):
         self.verify_tun_reass_44(self.params[socket.AF_INET])
 
     def test_tun_burst44(self):
-        """ ipsec 4o4 tunnel burst test """
+        """ipsec 4o4 tunnel burst test"""
         self.verify_tun_44(self.params[socket.AF_INET], count=127)
 
 
 class IpsecTun6(object):
         self.verify_tun_44(self.params[socket.AF_INET], count=127)
 
 
 class IpsecTun6(object):
-    """ verify methods for Tunnel v6 """
+    """verify methods for Tunnel v6"""
+
     def verify_counters6(self, p_in, p_out, count, worker=None):
     def verify_counters6(self, p_in, p_out, count, worker=None):
-        if (hasattr(p_in, "tun_sa_in")):
-            pkts = p_in.tun_sa_in.get_stats(worker)['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA in counts: expected %d != %d" %
-                             (count, pkts))
-        if (hasattr(p_out, "tun_sa_out")):
-            pkts = p_out.tun_sa_out.get_stats(worker)['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA out counts: expected %d != %d" %
-                             (count, pkts))
+        if hasattr(p_in, "tun_sa_in"):
+            pkts = p_in.tun_sa_in.get_stats(worker)["packets"]
+            self.assertEqual(
+                pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
+            )
+        if hasattr(p_out, "tun_sa_out"):
+            pkts = p_out.tun_sa_out.get_stats(worker)["packets"]
+            self.assertEqual(
+                pkts,
+                count,
+                "incorrect SA out counts: expected %d != %d" % (count, pkts),
+            )
         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)
 
         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)
 
@@ -1353,8 +2358,7 @@ class IpsecTun6(object):
     def verify_encrypted6(self, p, sa, rxs):
         for rx in rxs:
             self.assert_packet_checksums_valid(rx)
     def verify_encrypted6(self, p, sa, rxs):
         for rx in rxs:
             self.assert_packet_checksums_valid(rx)
-            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
-                             rx[IPv6].plen)
+            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
             self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
             if p.outer_flow_label:
                 self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
             self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
             if p.outer_flow_label:
                 self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
@@ -1379,9 +2383,14 @@ class IpsecTun6(object):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
-        send_pkts = self.gen_pkts6(p_in, self.pg1, src=self.pg1.remote_ip6,
-                                   dst=p_in.remote_tun_if_host, count=count,
-                                   payload_size=payload_size)
+        send_pkts = self.gen_pkts6(
+            p_in,
+            self.pg1,
+            src=self.pg1.remote_ip6,
+            dst=p_in.remote_tun_if_host,
+            count=count,
+            payload_size=payload_size,
+        )
         self.send_and_assert_no_replies(self.tun_if, send_pkts)
         self.logger.info(self.vapi.cli("sh punt stats"))
 
         self.send_and_assert_no_replies(self.tun_if, send_pkts)
         self.logger.info(self.vapi.cli("sh punt stats"))
 
@@ -1389,18 +2398,19 @@ class IpsecTun6(object):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
-        send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
-                                           self.tun_if,
-                                           src=p_in.remote_tun_if_host,
-                                           dst=self.pg1.remote_ip6,
-                                           count=count)
+        send_pkts = self.gen_encrypt_pkts6(
+            p_in,
+            p_in.scapy_tun_sa,
+            self.tun_if,
+            src=p_in.remote_tun_if_host,
+            dst=self.pg1.remote_ip6,
+            count=count,
+        )
         self.send_and_assert_no_replies(self.tun_if, send_pkts)
 
     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
         self.send_and_assert_no_replies(self.tun_if, send_pkts)
 
     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
-        self.verify_drop_tun_tx_66(p_in, count=count,
-                                   payload_size=payload_size)
-        self.verify_drop_tun_rx_66(p_in, count=count,
-                                   payload_size=payload_size)
+        self.verify_drop_tun_tx_66(p_in, count=count, payload_size=payload_size)
+        self.verify_drop_tun_rx_66(p_in, count=count, payload_size=payload_size)
 
     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
         self.vapi.cli("clear errors")
 
     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
         self.vapi.cli("clear errors")
@@ -1408,19 +2418,26 @@ class IpsecTun6(object):
         if not p_out:
             p_out = p_in
         try:
         if not p_out:
             p_out = p_in
         try:
-            send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
-                                               self.tun_if,
-                                               src=p_in.remote_tun_if_host,
-                                               dst=self.pg1.remote_ip6,
-                                               count=count,
-                                               payload_size=payload_size)
+            send_pkts = self.gen_encrypt_pkts6(
+                p_in,
+                p_in.scapy_tun_sa,
+                self.tun_if,
+                src=p_in.remote_tun_if_host,
+                dst=self.pg1.remote_ip6,
+                count=count,
+                payload_size=payload_size,
+            )
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             self.verify_decrypted6(p_in, recv_pkts)
 
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             self.verify_decrypted6(p_in, recv_pkts)
 
-            send_pkts = self.gen_pkts6(p_in, self.pg1, src=self.pg1.remote_ip6,
-                                       dst=p_out.remote_tun_if_host,
-                                       count=count,
-                                       payload_size=payload_size)
+            send_pkts = self.gen_pkts6(
+                p_in,
+                self.pg1,
+                src=self.pg1.remote_ip6,
+                dst=p_out.remote_tun_if_host,
+                count=count,
+                payload_size=payload_size,
+            )
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
 
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
 
@@ -1436,50 +2453,65 @@ class IpsecTun6(object):
     def verify_tun_reass_66(self, p):
         self.vapi.cli("clear errors")
         self.vapi.ip_reassembly_enable_disable(
     def verify_tun_reass_66(self, p):
         self.vapi.cli("clear errors")
         self.vapi.ip_reassembly_enable_disable(
-            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
+            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True
+        )
 
         try:
 
         try:
-            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
-                                               src=p.remote_tun_if_host,
-                                               dst=self.pg1.remote_ip6,
-                                               count=1,
-                                               payload_size=1850)
+            send_pkts = self.gen_encrypt_pkts6(
+                p,
+                p.scapy_tun_sa,
+                self.tun_if,
+                src=p.remote_tun_if_host,
+                dst=self.pg1.remote_ip6,
+                count=1,
+                payload_size=1850,
+            )
             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
-            recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
-                                             self.pg1, n_rx=1)
+            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
             self.verify_decrypted6(p, recv_pkts)
 
             self.verify_decrypted6(p, recv_pkts)
 
-            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
-                                       dst=p.remote_tun_if_host,
-                                       count=1,
-                                       payload_size=64)
-            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
-                                             self.tun_if)
+            send_pkts = self.gen_pkts6(
+                p,
+                self.pg1,
+                src=self.pg1.remote_ip6,
+                dst=p.remote_tun_if_host,
+                count=1,
+                payload_size=64,
+            )
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec all"))
         self.verify_counters6(p, p, 1)
         self.vapi.ip_reassembly_enable_disable(
             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec all"))
         self.verify_counters6(p, p, 1)
         self.vapi.ip_reassembly_enable_disable(
-            sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)
+            sw_if_index=self.tun_if.sw_if_index, enable_ip6=False
+        )
 
     def verify_tun_46(self, p, count=1):
 
     def verify_tun_46(self, p, count=1):
-        """ ipsec 4o6 tunnel basic test """
+        """ipsec 4o6 tunnel basic test"""
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
         try:
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
         try:
-            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
-                                              src=p.remote_tun_if_host4,
-                                              dst=self.pg1.remote_ip4,
-                                              count=count)
+            send_pkts = self.gen_encrypt_pkts(
+                p,
+                p.scapy_tun_sa,
+                self.tun_if,
+                src=p.remote_tun_if_host4,
+                dst=self.pg1.remote_ip4,
+                count=count,
+            )
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             for recv_pkt in recv_pkts:
                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                 self.assert_packet_checksums_valid(recv_pkt)
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             for recv_pkt in recv_pkts:
                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                 self.assert_packet_checksums_valid(recv_pkt)
-            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
-                                      dst=p.remote_tun_if_host4,
-                                      count=count)
+            send_pkts = self.gen_pkts(
+                self.pg1,
+                src=self.pg1.remote_ip4,
+                dst=p.remote_tun_if_host4,
+                count=count,
+            )
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
@@ -1492,8 +2524,7 @@ class IpsecTun6(object):
                 except:
                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                     try:
                 except:
                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                     try:
-                        self.logger.debug(ppp("Decrypted packet:",
-                                              decrypt_pkt))
+                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                     except:
                         pass
                     raise
                     except:
                         pass
                     raise
@@ -1502,29 +2533,64 @@ class IpsecTun6(object):
             self.logger.info(self.vapi.ppcli("show ipsec all"))
         self.verify_counters6(p, p, count)
 
             self.logger.info(self.vapi.ppcli("show ipsec all"))
         self.verify_counters6(p, p, count)
 
+    def verify_keepalive(self, p):
+        # the sizeof Raw is calculated to pad to the minimum ehternet
+        # frame size of 64 btyes
+        pkt = (
+            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
+            / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
+            / UDP(sport=333, dport=4500)
+            / Raw(b"\xff")
+            / Padding(0 * 1)
+        )
+        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
+        self.assert_error_counter_equal(
+            "/err/%s/nat_keepalive" % self.tun6_input_node, 31
+        )
+
+        pkt = (
+            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
+            / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
+            / UDP(sport=333, dport=4500)
+            / Raw(b"\xfe")
+        )
+        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
+        self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31)
+
+        pkt = (
+            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
+            / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
+            / UDP(sport=333, dport=4500)
+            / Raw(b"\xfe")
+            / Padding(0 * 21)
+        )
+        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
+        self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62)
+
 
 class IpsecTun6Tests(IpsecTun6):
 
 class IpsecTun6Tests(IpsecTun6):
-    """ UT test methods for Tunnel v6 """
+    """UT test methods for Tunnel v6"""
 
     def test_tun_basic66(self):
 
     def test_tun_basic66(self):
-        """ ipsec 6o6 tunnel basic test """
+        """ipsec 6o6 tunnel basic test"""
         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
 
     def test_tun_reass_basic66(self):
         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
 
     def test_tun_reass_basic66(self):
-        """ ipsec 6o6 tunnel basic reassembly test """
+        """ipsec 6o6 tunnel basic reassembly test"""
         self.verify_tun_reass_66(self.params[socket.AF_INET6])
 
     def test_tun_burst66(self):
         self.verify_tun_reass_66(self.params[socket.AF_INET6])
 
     def test_tun_burst66(self):
-        """ ipsec 6o6 tunnel burst test """
+        """ipsec 6o6 tunnel burst test"""
         self.verify_tun_66(self.params[socket.AF_INET6], count=257)
 
 
 class IpsecTun6HandoffTests(IpsecTun6):
         self.verify_tun_66(self.params[socket.AF_INET6], count=257)
 
 
 class IpsecTun6HandoffTests(IpsecTun6):
-    """ UT test methods for Tunnel v6 with multiple workers """
+    """UT test methods for Tunnel v6 with multiple workers"""
+
     vpp_worker_count = 2
 
     def test_tun_handoff_66(self):
     vpp_worker_count = 2
 
     def test_tun_handoff_66(self):
-        """ ipsec 6o6 tunnel worker hand-off test """
+        """ipsec 6o6 tunnel worker hand-off test"""
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
@@ -1534,31 +2600,42 @@ class IpsecTun6HandoffTests(IpsecTun6):
         # inject alternately on worker 0 and 1. all counts on the SA
         # should be against worker 0
         for worker in [0, 1, 0, 1]:
         # inject alternately on worker 0 and 1. all counts on the SA
         # should be against worker 0
         for worker in [0, 1, 0, 1]:
-            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
-                                               src=p.remote_tun_if_host,
-                                               dst=self.pg1.remote_ip6,
-                                               count=N_PKTS)
-            recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
-                                             self.pg1, worker=worker)
+            send_pkts = self.gen_encrypt_pkts6(
+                p,
+                p.scapy_tun_sa,
+                self.tun_if,
+                src=p.remote_tun_if_host,
+                dst=self.pg1.remote_ip6,
+                count=N_PKTS,
+            )
+            recv_pkts = self.send_and_expect(
+                self.tun_if, send_pkts, self.pg1, worker=worker
+            )
             self.verify_decrypted6(p, recv_pkts)
 
             self.verify_decrypted6(p, recv_pkts)
 
-            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
-                                       dst=p.remote_tun_if_host,
-                                       count=N_PKTS)
-            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
-                                             self.tun_if, worker=worker)
+            send_pkts = self.gen_pkts6(
+                p,
+                self.pg1,
+                src=self.pg1.remote_ip6,
+                dst=p.remote_tun_if_host,
+                count=N_PKTS,
+            )
+            recv_pkts = self.send_and_expect(
+                self.pg1, send_pkts, self.tun_if, worker=worker
+            )
             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
 
         # all counts against the first worker that was used
             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
 
         # all counts against the first worker that was used
-        self.verify_counters6(p, p, 4*N_PKTS, worker=0)
+        self.verify_counters6(p, p, 4 * N_PKTS, worker=0)
 
 
 class IpsecTun4HandoffTests(IpsecTun4):
 
 
 class IpsecTun4HandoffTests(IpsecTun4):
-    """ UT test methods for Tunnel v4 with multiple workers """
+    """UT test methods for Tunnel v4 with multiple workers"""
+
     vpp_worker_count = 2
 
     def test_tun_handooff_44(self):
     vpp_worker_count = 2
 
     def test_tun_handooff_44(self):
-        """ ipsec 4o4 tunnel worker hand-off test """
+        """ipsec 4o4 tunnel worker hand-off test"""
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
@@ -1568,43 +2645,49 @@ class IpsecTun4HandoffTests(IpsecTun4):
         # inject alternately on worker 0 and 1. all counts on the SA
         # should be against worker 0
         for worker in [0, 1, 0, 1]:
         # inject alternately on worker 0 and 1. all counts on the SA
         # should be against worker 0
         for worker in [0, 1, 0, 1]:
-            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
-                                              src=p.remote_tun_if_host,
-                                              dst=self.pg1.remote_ip4,
-                                              count=N_PKTS)
-            recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
-                                             self.pg1, worker=worker)
+            send_pkts = self.gen_encrypt_pkts(
+                p,
+                p.scapy_tun_sa,
+                self.tun_if,
+                src=p.remote_tun_if_host,
+                dst=self.pg1.remote_ip4,
+                count=N_PKTS,
+            )
+            recv_pkts = self.send_and_expect(
+                self.tun_if, send_pkts, self.pg1, worker=worker
+            )
             self.verify_decrypted(p, recv_pkts)
 
             self.verify_decrypted(p, recv_pkts)
 
-            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
-                                      dst=p.remote_tun_if_host,
-                                      count=N_PKTS)
-            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
-                                             self.tun_if, worker=worker)
+            send_pkts = self.gen_pkts(
+                self.pg1,
+                src=self.pg1.remote_ip4,
+                dst=p.remote_tun_if_host,
+                count=N_PKTS,
+            )
+            recv_pkts = self.send_and_expect(
+                self.pg1, send_pkts, self.tun_if, worker=worker
+            )
             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
 
         # all counts against the first worker that was used
             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
 
         # all counts against the first worker that was used
-        self.verify_counters4(p, 4*N_PKTS, worker=0)
+        self.verify_counters4(p, 4 * N_PKTS, worker=0)
 
 
 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
 
 
 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
-    """ UT test methods for Tunnel v6 & v4 """
+    """UT test methods for Tunnel v6 & v4"""
+
     pass
 
 
     pass
 
 
-class SpdFlowCacheTemplate(VppTestCase):
+class IPSecIPv4Fwd(VppTestCase):
+    """Test IPSec by capturing and verifying IPv4 forwarded pkts"""
+
     @classmethod
     def setUpConstants(cls):
     @classmethod
     def setUpConstants(cls):
-        super(SpdFlowCacheTemplate, cls).setUpConstants()
-        # Override this method with required cmdline parameters e.g.
-        # cls.vpp_cmdline.extend(["ipsec", "{",
-        #                         "ipv4-outbound-spd-flow-cache on",
-        #                         "}"])
-        # cls.logger.info("VPP modified cmdline is %s" % " "
-        #                 .join(cls.vpp_cmdline))
+        super(IPSecIPv4Fwd, cls).setUpConstants()
 
     def setUp(self):
 
     def setUp(self):
-        super(SpdFlowCacheTemplate, self).setUp()
+        super(IPSecIPv4Fwd, self).setUp()
         # store SPD objects so we can remove configs on tear down
         self.spd_objs = []
         self.spd_policies = []
         # store SPD objects so we can remove configs on tear down
         self.spd_objs = []
         self.spd_policies = []
@@ -1622,7 +2705,7 @@ class SpdFlowCacheTemplate(VppTestCase):
         for pg in self.pg_interfaces:
             pg.unconfig_ip4()
             pg.admin_down()
         for pg in self.pg_interfaces:
             pg.unconfig_ip4()
             pg.admin_down()
-        super(SpdFlowCacheTemplate, self).tearDown()
+        super(IPSecIPv4Fwd, self).tearDown()
 
     def create_interfaces(self, num_ifs=2):
         # create interfaces pg0 ... pg<num_ifs>
 
     def create_interfaces(self, num_ifs=2):
         # create interfaces pg0 ... pg<num_ifs>
@@ -1656,9 +2739,27 @@ class SpdFlowCacheTemplate(VppTestCase):
         else:
             raise Exception("Invalid policy type: %s", policy_type)
 
         else:
             raise Exception("Invalid policy type: %s", policy_type)
 
-    def spd_add_rem_policy(self, spd_id, src_if, dst_if,
-                           proto, is_out, priority, policy_type,
-                           remove=False, all_ips=False):
+    def spd_add_rem_policy(
+        self,
+        spd_id,
+        src_if,
+        dst_if,
+        proto,
+        is_out,
+        priority,
+        policy_type,
+        remove=False,
+        all_ips=False,
+        ip_range=False,
+        local_ip_start=ip_address("0.0.0.0"),
+        local_ip_stop=ip_address("255.255.255.255"),
+        remote_ip_start=ip_address("0.0.0.0"),
+        remote_ip_stop=ip_address("255.255.255.255"),
+        remote_port_start=0,
+        remote_port_stop=65535,
+        local_port_start=0,
+        local_port_stop=65535,
+    ):
         spd = VppIpsecSpd(self, spd_id)
 
         if all_ips:
         spd = VppIpsecSpd(self, spd_id)
 
         if all_ips:
@@ -1666,23 +2767,38 @@ class SpdFlowCacheTemplate(VppTestCase):
             src_range_high = ip_address("255.255.255.255")
             dst_range_low = ip_address("0.0.0.0")
             dst_range_high = ip_address("255.255.255.255")
             src_range_high = ip_address("255.255.255.255")
             dst_range_low = ip_address("0.0.0.0")
             dst_range_high = ip_address("255.255.255.255")
+
+        elif ip_range:
+            src_range_low = local_ip_start
+            src_range_high = local_ip_stop
+            dst_range_low = remote_ip_start
+            dst_range_high = remote_ip_stop
+
         else:
             src_range_low = src_if.remote_ip4
             src_range_high = src_if.remote_ip4
             dst_range_low = dst_if.remote_ip4
             dst_range_high = dst_if.remote_ip4
 
         else:
             src_range_low = src_if.remote_ip4
             src_range_high = src_if.remote_ip4
             dst_range_low = dst_if.remote_ip4
             dst_range_high = dst_if.remote_ip4
 
-        spdEntry = VppIpsecSpdEntry(self, spd, 0,
-                                    src_range_low,
-                                    src_range_high,
-                                    dst_range_low,
-                                    dst_range_high,
-                                    proto,
-                                    priority=priority,
-                                    policy=self.get_policy(policy_type),
-                                    is_outbound=is_out)
-
-        if(remove is False):
+        spdEntry = VppIpsecSpdEntry(
+            self,
+            spd,
+            0,
+            src_range_low,
+            src_range_high,
+            dst_range_low,
+            dst_range_high,
+            proto,
+            priority=priority,
+            policy=self.get_policy(policy_type),
+            is_outbound=is_out,
+            remote_port_start=remote_port_start,
+            remote_port_stop=remote_port_stop,
+            local_port_start=local_port_start,
+            local_port_stop=local_port_stop,
+        )
+
+        if remove is False:
             spdEntry.add_vpp_config()
             self.spd_policies.append(spdEntry)
         else:
             spdEntry.add_vpp_config()
             self.spd_policies.append(spdEntry)
         else:
@@ -1691,19 +2807,48 @@ class SpdFlowCacheTemplate(VppTestCase):
         self.logger.info(self.vapi.ppcli("show ipsec all"))
         return spdEntry
 
         self.logger.info(self.vapi.ppcli("show ipsec all"))
         return spdEntry
 
-    def create_stream(self, src_if, dst_if, pkt_count,
-                      src_prt=1234, dst_prt=5678):
+    def create_stream(
+        self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP"
+    ):
         packets = []
         packets = []
+        # create SA
+        sa = SecurityAssociation(
+            ESP,
+            spi=1000,
+            crypt_algo="AES-CBC",
+            crypt_key=b"JPjyOWBeVEQiMe7h",
+            auth_algo="HMAC-SHA1-96",
+            auth_key=b"C91KUR9GYMm5GfkEvNjX",
+            tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+            nat_t_header=UDP(sport=src_prt, dport=dst_prt),
+        )
         for i in range(pkt_count):
             # create packet info stored in the test case instance
             info = self.create_packet_info(src_if, dst_if)
             # convert the info into packet payload
             payload = self.info_to_payload(info)
             # create the packet itself
         for i in range(pkt_count):
             # create packet info stored in the test case instance
             info = self.create_packet_info(src_if, dst_if)
             # convert the info into packet payload
             payload = self.info_to_payload(info)
             # create the packet itself
-            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
-                 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
-                 UDP(sport=src_prt, dport=dst_prt) /
-                 Raw(payload))
+            p = []
+            if proto == "UDP-ESP":
+                p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt(
+                    IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / UDP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
+            elif proto == "UDP":
+                p = (
+                    Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                    / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / UDP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
+            elif proto == "TCP":
+                p = (
+                    Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                    / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / TCP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
             # store a copy of the packet in the packet info
             info.data = p.copy()
             # append the packet to the list
             # store a copy of the packet in the packet info
             info.data = p.copy()
             # append the packet to the list
@@ -1720,72 +2865,152 @@ class SpdFlowCacheTemplate(VppTestCase):
                 # convert the payload to packet info object
                 payload_info = self.payload_to_info(packet)
                 # make sure the indexes match
                 # convert the payload to packet info object
                 payload_info = self.payload_to_info(packet)
                 # make sure the indexes match
-                self.assert_equal(payload_info.src, src_if.sw_if_index,
-                                  "source sw_if_index")
-                self.assert_equal(payload_info.dst, dst_if.sw_if_index,
-                                  "destination sw_if_index")
+                self.assert_equal(
+                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
+                )
+                self.assert_equal(
+                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
+                )
                 packet_info = self.get_next_packet_info_for_interface2(
                 packet_info = self.get_next_packet_info_for_interface2(
-                                src_if.sw_if_index,
-                                dst_if.sw_if_index,
-                                packet_info)
+                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
+                )
                 # make sure we didn't run out of saved packets
                 self.assertIsNotNone(packet_info)
                 # make sure we didn't run out of saved packets
                 self.assertIsNotNone(packet_info)
-                self.assert_equal(payload_info.index, packet_info.index,
-                                  "packet info index")
+                self.assert_equal(
+                    payload_info.index, packet_info.index, "packet info index"
+                )
                 saved_packet = packet_info.data  # fetch the saved packet
                 # assert the values match
                 saved_packet = packet_info.data  # fetch the saved packet
                 # assert the values match
-                self.assert_equal(ip.src, saved_packet[IP].src,
-                                  "IP source address")
+                self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
                 # ... more assertions here
                 # ... more assertions here
-                self.assert_equal(udp.sport, saved_packet[UDP].sport,
-                                  "UDP source port")
+                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
             except Exception as e:
             except Exception as e:
-                self.logger.error(ppp("Unexpected or invalid packet:",
-                                  packet))
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                 raise
         remaining_packet = self.get_next_packet_info_for_interface2(
                 raise
         remaining_packet = self.get_next_packet_info_for_interface2(
-                src_if.sw_if_index,
-                dst_if.sw_if_index,
-                packet_info)
-        self.assertIsNone(remaining_packet,
-                          "Interface %s: Packet expected from interface "
-                          "%s didn't arrive" % (dst_if.name, src_if.name))
+            src_if.sw_if_index, dst_if.sw_if_index, packet_info
+        )
+        self.assertIsNone(
+            remaining_packet,
+            "Interface %s: Packet expected from interface "
+            "%s didn't arrive" % (dst_if.name, src_if.name),
+        )
 
     def verify_policy_match(self, pkt_count, spdEntry):
 
     def verify_policy_match(self, pkt_count, spdEntry):
-        self.logger.info(
-            "XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
-        matched_pkts = spdEntry.get_stats().get('packets')
-        self.logger.info(
-            "Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
+        self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
+        matched_pkts = spdEntry.get_stats().get("packets")
+        self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
         self.assert_equal(pkt_count, matched_pkts)
 
         self.assert_equal(pkt_count, matched_pkts)
 
-    def get_spd_flow_cache_entries(self):
-        """ 'show ipsec spd' output:
-        ip4-outbound-spd-flow-cache-entries: 0
+    # Method verify_l3_l4_capture() will verify network and transport layer
+    # fields of the packet sa.encrypt() gives interface number garbadge.
+    # thus interface validation get failed (scapy bug?). However our intent
+    # is to verify IP layer and above and that is covered.
+
+    def verify_l3_l4_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        for packet in capture:
+            try:
+                self.assert_packet_checksums_valid(packet)
+                self.assert_equal(
+                    packet[IP].src,
+                    src_if.remote_ip4,
+                    "decrypted packet source address",
+                )
+                self.assert_equal(
+                    packet[IP].dst,
+                    dst_if.remote_ip4,
+                    "decrypted packet destination address",
+                )
+                if packet.haslayer(TCP):
+                    self.assertFalse(
+                        packet.haslayer(UDP),
+                        "unexpected UDP header in decrypted packet",
+                    )
+                elif packet.haslayer(UDP):
+                    if packet[UDP].payload:
+                        self.assertFalse(
+                            packet[UDP][1].haslayer(UDP),
+                            "unexpected UDP header in decrypted packet",
+                        )
+                else:
+                    self.assertFalse(
+                        packet.haslayer(UDP),
+                        "unexpected UDP header in decrypted packet",
+                    )
+                    self.assert_equal(
+                        packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
+                    )
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
+                raise
+
+
+class SpdFlowCacheTemplate(IPSecIPv4Fwd):
+    @classmethod
+    def setUpConstants(cls):
+        super(SpdFlowCacheTemplate, cls).setUpConstants()
+        # Override this method with required cmdline parameters e.g.
+        # cls.vpp_cmdline.extend(["ipsec", "{",
+        #                         "ipv4-outbound-spd-flow-cache on",
+        #                         "}"])
+        # cls.logger.info("VPP modified cmdline is %s" % " "
+        #                 .join(cls.vpp_cmdline))
+
+    def setUp(self):
+        super(SpdFlowCacheTemplate, self).setUp()
+
+    def tearDown(self):
+        super(SpdFlowCacheTemplate, self).tearDown()
+
+    def get_spd_flow_cache_entries(self, outbound):
+        """'show ipsec spd' output:
+        ipv4-inbound-spd-flow-cache-entries: 0
+        ipv4-outbound-spd-flow-cache-entries: 0
         """
         show_ipsec_reply = self.vapi.cli("show ipsec spd")
         # match the relevant section of 'show ipsec spd' output
         """
         show_ipsec_reply = self.vapi.cli("show ipsec spd")
         # match the relevant section of 'show ipsec spd' output
-        regex_match = re.search(
-            'ip4-outbound-spd-flow-cache-entries: (.*)',
-            show_ipsec_reply, re.DOTALL)
+        if outbound:
+            regex_match = re.search(
+                "ipv4-outbound-spd-flow-cache-entries: (.*)",
+                show_ipsec_reply,
+                re.DOTALL,
+            )
+        else:
+            regex_match = re.search(
+                "ipv4-inbound-spd-flow-cache-entries: (.*)", show_ipsec_reply, re.DOTALL
+            )
         if regex_match is None:
         if regex_match is None:
-            raise Exception("Unable to find spd flow cache entries \
-                in \'show ipsec spd\' CLI output - regex failed to match")
+            raise Exception(
+                "Unable to find spd flow cache entries \
+                in 'show ipsec spd' CLI output - regex failed to match"
+            )
         else:
             try:
                 num_entries = int(regex_match.group(1))
             except ValueError:
         else:
             try:
                 num_entries = int(regex_match.group(1))
             except ValueError:
-                raise Exception("Unable to get spd flow cache entries \
-                from \'show ipsec spd\' string: %s", regex_match.group(0))
+                raise Exception(
+                    "Unable to get spd flow cache entries \
+                from 'show ipsec spd' string: %s",
+                    regex_match.group(0),
+                )
             self.logger.info("%s", regex_match.group(0))
         return num_entries
 
     def verify_num_outbound_flow_cache_entries(self, expected_elements):
             self.logger.info("%s", regex_match.group(0))
         return num_entries
 
     def verify_num_outbound_flow_cache_entries(self, expected_elements):
-        self.assertEqual(self.get_spd_flow_cache_entries(), expected_elements)
+        self.assertEqual(
+            self.get_spd_flow_cache_entries(outbound=True), expected_elements
+        )
+
+    def verify_num_inbound_flow_cache_entries(self, expected_elements):
+        self.assertEqual(
+            self.get_spd_flow_cache_entries(outbound=False), expected_elements
+        )
 
     def crc32_supported(self):
         # lscpu is part of util-linux package, available on all Linux Distros
 
     def crc32_supported(self):
         # lscpu is part of util-linux package, available on all Linux Distros
-        stream = os.popen('lscpu')
+        stream = os.popen("lscpu")
         cpu_info = stream.read()
         # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
         # see vppinfra/crc32.h
         cpu_info = stream.read()
         # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
         # see vppinfra/crc32.h
@@ -1796,6 +3021,279 @@ class SpdFlowCacheTemplate(VppTestCase):
             self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
             return False
 
             self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
             return False
 
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(SpdFlowCacheTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(SpdFlowCacheTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
+
+class SpdFastPathTemplate(IPSecIPv4Fwd):
+    @classmethod
+    def setUpConstants(cls):
+        super(SpdFastPathTemplate, cls).setUpConstants()
+        # Override this method with required cmdline parameters e.g.
+        # cls.vpp_cmdline.extend(["ipsec", "{",
+        #                         "ipv4-outbound-spd-flow-cache on",
+        #                         "}"])
+        # cls.logger.info("VPP modified cmdline is %s" % " "
+        #                 .join(cls.vpp_cmdline))
+
+    def setUp(self):
+        super(SpdFastPathTemplate, self).setUp()
+
+    def tearDown(self):
+        super(SpdFastPathTemplate, self).tearDown()
+
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(SpdFastPathTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(SpdFastPathTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
+
+class IpsecDefaultTemplate(IPSecIPv4Fwd):
+    @classmethod
+    def setUpConstants(cls):
+        super(IpsecDefaultTemplate, cls).setUpConstants()
+
+    def setUp(self):
+        super(IpsecDefaultTemplate, self).setUp()
+
+    def tearDown(self):
+        super(IpsecDefaultTemplate, self).tearDown()
+
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(IpsecDefaultTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(IpsecDefaultTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
+
+class IPSecIPv6Fwd(VppTestCase):
+    """Test IPSec by capturing and verifying IPv6 forwarded pkts"""
+
+    @classmethod
+    def setUpConstants(cls):
+        super(IPSecIPv6Fwd, cls).setUpConstants()
+
+    def setUp(self):
+        super(IPSecIPv6Fwd, self).setUp()
+        # store SPD objects so we can remove configs on tear down
+        self.spd_objs = []
+        self.spd_policies = []
+
+    def tearDown(self):
+        # remove SPD policies
+        for obj in self.spd_policies:
+            obj.remove_vpp_config()
+        self.spd_policies = []
+        # remove SPD items (interface bindings first, then SPD)
+        for obj in reversed(self.spd_objs):
+            obj.remove_vpp_config()
+        self.spd_objs = []
+        # close down pg intfs
+        for pg in self.pg_interfaces:
+            pg.unconfig_ip6()
+            pg.admin_down()
+        super(IPSecIPv6Fwd, self).tearDown()
+
+    def create_interfaces(self, num_ifs=2):
+        # create interfaces pg0 ... pg<num_ifs>
+        self.create_pg_interfaces(range(num_ifs))
+        for pg in self.pg_interfaces:
+            # put the interface up
+            pg.admin_up()
+            # configure IPv6 address on the interface
+            pg.config_ip6()
+            pg.resolve_ndp()
+        self.logger.info(self.vapi.ppcli("show int addr"))
+
+    def spd_create_and_intf_add(self, spd_id, pg_list):
+        spd = VppIpsecSpd(self, spd_id)
+        spd.add_vpp_config()
+        self.spd_objs.append(spd)
+        for pg in pg_list:
+            spdItf = VppIpsecSpdItfBinding(self, spd, pg)
+            spdItf.add_vpp_config()
+            self.spd_objs.append(spdItf)
+
+    def get_policy(self, policy_type):
+        e = VppEnum.vl_api_ipsec_spd_action_t
+        if policy_type == "protect":
+            return e.IPSEC_API_SPD_ACTION_PROTECT
+        elif policy_type == "bypass":
+            return e.IPSEC_API_SPD_ACTION_BYPASS
+        elif policy_type == "discard":
+            return e.IPSEC_API_SPD_ACTION_DISCARD
+        else:
+            raise Exception("Invalid policy type: %s", policy_type)
+
+    def spd_add_rem_policy(
+        self,
+        spd_id,
+        src_if,
+        dst_if,
+        proto,
+        is_out,
+        priority,
+        policy_type,
+        remove=False,
+        all_ips=False,
+        ip_range=False,
+        local_ip_start=ip_address("0::0"),
+        local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
+        remote_ip_start=ip_address("0::0"),
+        remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
+        remote_port_start=0,
+        remote_port_stop=65535,
+        local_port_start=0,
+        local_port_stop=65535,
+    ):
+        spd = VppIpsecSpd(self, spd_id)
+
+        if all_ips:
+            src_range_low = ip_address("0::0")
+            src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
+            dst_range_low = ip_address("0::0")
+            dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
+
+        elif ip_range:
+            src_range_low = local_ip_start
+            src_range_high = local_ip_stop
+            dst_range_low = remote_ip_start
+            dst_range_high = remote_ip_stop
+
+        else:
+            src_range_low = src_if.remote_ip6
+            src_range_high = src_if.remote_ip6
+            dst_range_low = dst_if.remote_ip6
+            dst_range_high = dst_if.remote_ip6
+
+        spdEntry = VppIpsecSpdEntry(
+            self,
+            spd,
+            0,
+            src_range_low,
+            src_range_high,
+            dst_range_low,
+            dst_range_high,
+            proto,
+            priority=priority,
+            policy=self.get_policy(policy_type),
+            is_outbound=is_out,
+            remote_port_start=remote_port_start,
+            remote_port_stop=remote_port_stop,
+            local_port_start=local_port_start,
+            local_port_stop=local_port_stop,
+        )
+
+        if remove is False:
+            spdEntry.add_vpp_config()
+            self.spd_policies.append(spdEntry)
+        else:
+            spdEntry.remove_vpp_config()
+            self.spd_policies.remove(spdEntry)
+        self.logger.info(self.vapi.ppcli("show ipsec all"))
+        return spdEntry
+
+    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
+        packets = []
+        for i in range(pkt_count):
+            # create packet info stored in the test case instance
+            info = self.create_packet_info(src_if, dst_if)
+            # convert the info into packet payload
+            payload = self.info_to_payload(info)
+            # create the packet itself
+            p = (
+                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
+                / UDP(sport=src_prt, dport=dst_prt)
+                / Raw(payload)
+            )
+            # store a copy of the packet in the packet info
+            info.data = p.copy()
+            # append the packet to the list
+            packets.append(p)
+        # return the created packet list
+        return packets
+
+    def verify_capture(self, src_if, dst_if, capture):
+        packet_info = None
+        for packet in capture:
+            try:
+                ip = packet[IPv6]
+                udp = packet[UDP]
+                # convert the payload to packet info object
+                payload_info = self.payload_to_info(packet)
+                # make sure the indexes match
+                self.assert_equal(
+                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
+                )
+                self.assert_equal(
+                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
+                )
+                packet_info = self.get_next_packet_info_for_interface2(
+                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
+                )
+                # make sure we didn't run out of saved packets
+                self.assertIsNotNone(packet_info)
+                self.assert_equal(
+                    payload_info.index, packet_info.index, "packet info index"
+                )
+                saved_packet = packet_info.data  # fetch the saved packet
+                # assert the values match
+                self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
+                # ... more assertions here
+                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
+            except Exception as e:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        remaining_packet = self.get_next_packet_info_for_interface2(
+            src_if.sw_if_index, dst_if.sw_if_index, packet_info
+        )
+        self.assertIsNone(
+            remaining_packet,
+            "Interface %s: Packet expected from interface "
+            "%s didn't arrive" % (dst_if.name, src_if.name),
+        )
+
+    def verify_policy_match(self, pkt_count, spdEntry):
+        self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
+        matched_pkts = spdEntry.get_stats().get("packets")
+        self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
+        self.assert_equal(pkt_count, matched_pkts)
+
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)
     unittest.main(testRunner=VppTestRunner)