tests: Use errno value rather than a specific int
[vpp.git] / test / template_ipsec.py
index 2295b75..b5cd922 100644 (file)
@@ -15,7 +15,8 @@ from scapy.layers.inet6 import (
 )
 
 
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner
 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
 from vpp_papi import VppEnum
 
@@ -26,7 +27,6 @@ from os import popen
 
 
 class IPsecIPv4Params:
-
     addr_type = socket.AF_INET
     addr_any = "0.0.0.0"
     addr_bcast = "255.255.255.255"
@@ -52,6 +52,8 @@ class IPsecIPv4Params:
         self.outer_flow_label = 0
         self.inner_flow_label = 0x12345
 
+        self.anti_replay_window_size = 64
+
         self.auth_algo_vpp_id = (
             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
         )
@@ -74,7 +76,6 @@ class IPsecIPv4Params:
 
 
 class IPsecIPv6Params:
-
     addr_type = socket.AF_INET6
     addr_any = "0::0"
     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
@@ -100,6 +101,8 @@ class IPsecIPv6Params:
         self.outer_flow_label = 0
         self.inner_flow_label = 0x12345
 
+        self.anti_replay_window_size = 64
+
         self.auth_algo_vpp_id = (
             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
         )
@@ -122,7 +125,7 @@ class IPsecIPv6Params:
 
 
 def mk_scapy_crypt_key(p):
-    if p.crypt_algo in ("AES-GCM", "AES-CTR"):
+    if p.crypt_algo in ("AES-GCM", "AES-CTR", "AES-NULL-GMAC"):
         return p.crypt_key + struct.pack("!I", p.salt)
     else:
         return p.crypt_key
@@ -138,7 +141,7 @@ def config_tun_params(p, encryption_type, tun_if):
     crypt_key = mk_scapy_crypt_key(p)
     p.scapy_tun_sa = SecurityAssociation(
         encryption_type,
-        spi=p.vpp_tun_spi,
+        spi=p.scapy_tun_spi,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
         auth_algo=p.auth_algo,
@@ -149,7 +152,7 @@ def config_tun_params(p, encryption_type, tun_if):
     )
     p.vpp_tun_sa = SecurityAssociation(
         encryption_type,
-        spi=p.scapy_tun_spi,
+        spi=p.vpp_tun_spi,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
         auth_algo=p.auth_algo,
@@ -167,7 +170,7 @@ def config_tra_params(p, encryption_type):
     crypt_key = mk_scapy_crypt_key(p)
     p.scapy_tra_sa = SecurityAssociation(
         encryption_type,
-        spi=p.vpp_tra_spi,
+        spi=p.scapy_tra_spi,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
         auth_algo=p.auth_algo,
@@ -177,7 +180,7 @@ def config_tra_params(p, encryption_type):
     )
     p.vpp_tra_sa = SecurityAssociation(
         encryption_type,
-        spi=p.scapy_tra_spi,
+        spi=p.vpp_tra_spi,
         crypt_algo=p.crypt_algo,
         crypt_key=crypt_key,
         auth_algo=p.auth_algo,
@@ -330,27 +333,25 @@ class IpsecTra4(object):
     """verify methods for Transport v4"""
 
     def get_replay_counts(self, p):
-        replay_node_name = "/err/%s/SA replayed packet" % self.tra4_decrypt_node_name[0]
+        replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0]
         count = self.statistics.get_err_counter(replay_node_name)
 
         if p.async_mode:
             replay_post_node_name = (
-                "/err/%s/SA replayed packet" % self.tra4_decrypt_node_name[p.async_mode]
+                "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode]
             )
             count += self.statistics.get_err_counter(replay_post_node_name)
 
         return count
 
     def get_hash_failed_counts(self, p):
-        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
+        if ESP == self.encryption_type and p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
             hash_failed_node_name = (
-                "/err/%s/ESP decryption failed"
-                % self.tra4_decrypt_node_name[p.async_mode]
+                "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode]
             )
         else:
             hash_failed_node_name = (
-                "/err/%s/Integrity check failed"
-                % self.tra4_decrypt_node_name[p.async_mode]
+                "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode]
             )
         count = self.statistics.get_err_counter(hash_failed_node_name)
 
@@ -365,10 +366,7 @@ class IpsecTra4(object):
         esn_on = p.vpp_tra_sa.esn_en
         ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
 
-        seq_cycle_node_name = (
-            "/err/%s/sequence number cycled (packet dropped)"
-            % self.tra4_encrypt_node_name
-        )
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
         replay_count = self.get_replay_counts(p)
         hash_failed_count = self.get_hash_failed_counts(p)
         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
@@ -437,6 +435,34 @@ class IpsecTra4(object):
         ]
         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
+        # a replayed packet, then an out of window, then a legit
+        # tests that a early failure on the batch doesn't affect subsequent packets.
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=203,
+                )
+            ),
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=81,
+                )
+            ),
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=204,
+                )
+            ),
+        ]
+        n_rx = 1 if ar_on else 3
+        recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
+
         # move the window over half way to a wrap
         pkts = [
             (
@@ -604,20 +630,34 @@ class IpsecTra4(object):
     def verify_tra_anti_replay(self):
         p = self.params[socket.AF_INET]
         esn_en = p.vpp_tra_sa.esn_en
+        anti_replay_window_size = p.anti_replay_window_size
 
-        seq_cycle_node_name = (
-            "/err/%s/sequence number cycled (packet dropped)"
-            % self.tra4_encrypt_node_name
-        )
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
         replay_count = self.get_replay_counts(p)
+        initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay")
         hash_failed_count = self.get_hash_failed_counts(p)
         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+        initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err(
+            "seq_cycled"
+        )
+        hash_err = "integ_error"
 
         if ESP == self.encryption_type:
-            undersize_node_name = (
-                "/err/%s/undersized packet" % self.tra4_decrypt_node_name[0]
-            )
+            undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
             undersize_count = self.statistics.get_err_counter(undersize_node_name)
+            initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err(
+                "runt"
+            )
+            # For AES-GCM an error in the hash is reported as a decryption failure
+            if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
+                hash_err = "decryption_failed"
+        # In async mode, we don't report errors in the hash.
+        if p.async_mode:
+            hash_err = ""
+        else:
+            initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err(
+                hash_err
+            )
 
         #
         # send packets with seq numbers 1->34
@@ -643,6 +683,8 @@ class IpsecTra4(object):
         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
         replay_count += len(pkts)
         self.assertEqual(self.get_replay_counts(p), replay_count)
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
+        self.assertEqual(err, replay_count)
 
         #
         # now send a batch of packets all with the same sequence number
@@ -659,25 +701,31 @@ class IpsecTra4(object):
         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
         replay_count += 7
         self.assertEqual(self.get_replay_counts(p), replay_count)
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
+        self.assertEqual(err, replay_count)
 
         #
-        # now move the window over to 257 (more than one byte) and into Case A
+        # now move the window over to anti_replay_window_size + 100 and into Case A
         #
         self.vapi.cli("clear error")
         pkt = Ether(
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
         ) / p.scapy_tra_sa.encrypt(
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-            seq_num=257,
+            seq_num=anti_replay_window_size + 100,
         )
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
+        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
+
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
         replay_count += 3
         self.assertEqual(self.get_replay_counts(p), replay_count)
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
+        self.assertEqual(err, replay_count)
 
-        # the window size is 64 packets
+        # the window size is anti_replay_window_size packets
         # in window are still accepted
         pkt = Ether(
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
@@ -685,12 +733,11 @@ class IpsecTra4(object):
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
             seq_num=200,
         )
-        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
         # a packet that does not decrypt does not move the window forward
         bogus_sa = SecurityAssociation(
             self.encryption_type,
-            p.vpp_tra_spi,
+            p.scapy_tra_spi,
             crypt_algo=p.crypt_algo,
             crypt_key=mk_scapy_crypt_key(p)[::-1],
             auth_algo=p.auth_algo,
@@ -700,27 +747,32 @@ class IpsecTra4(object):
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
         ) / bogus_sa.encrypt(
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-            seq_num=350,
+            seq_num=anti_replay_window_size + 200,
         )
         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
         hash_failed_count += 17
         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+        if hash_err != "":
+            err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
+            self.assertEqual(err, hash_failed_count)
 
         # a malformed 'runt' packet
         #  created by a mis-constructed SA
         if ESP == self.encryption_type and p.crypt_algo != "NULL":
-            bogus_sa = SecurityAssociation(self.encryption_type, p.vpp_tra_spi)
+            bogus_sa = SecurityAssociation(self.encryption_type, p.scapy_tra_spi)
             pkt = Ether(
                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
             ) / bogus_sa.encrypt(
                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-                seq_num=350,
+                seq_num=anti_replay_window_size + 200,
             )
             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
             undersize_count += 17
             self.assert_error_counter_equal(undersize_node_name, undersize_count)
+            err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff
+            self.assertEqual(err, undersize_count)
 
         # which we can determine since this packet is still in the window
         pkt = Ether(
@@ -749,17 +801,22 @@ class IpsecTra4(object):
             # wrap. but since it isn't then the verify will fail.
             hash_failed_count += 17
             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+            if hash_err != "":
+                err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
+                self.assertEqual(err, hash_failed_count)
 
         else:
             replay_count += 17
             self.assertEqual(self.get_replay_counts(p), replay_count)
+            err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
+            self.assertEqual(err, replay_count)
 
-        # valid packet moves the window over to 258
+        # valid packet moves the window over to anti_replay_window_size + 258
         pkt = Ether(
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
         ) / p.scapy_tra_sa.encrypt(
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-            seq_num=258,
+            seq_num=anti_replay_window_size + 258,
         )
         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
@@ -770,7 +827,7 @@ class IpsecTra4(object):
         # causes the TX seq number to wrap; unless we're using extened sequence
         # numbers.
         #
-        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
+        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.vpp_tra_sa_id)
         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
 
@@ -813,7 +870,7 @@ class IpsecTra4(object):
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             #
-            # A packet that has seq num between (2^32-64) and 5 is within
+            # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within
             # the window
             #
             p.scapy_tra_sa.seq_num = 0xFFFFFFFD
@@ -843,57 +900,755 @@ class IpsecTra4(object):
 
             hash_failed_count += 1
             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+            if hash_err != "":
+                err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
+                self.assertEqual(err, hash_failed_count)
 
             #
             # but if we move the window forward to case B, then we can wrap
             # again
             #
-            p.scapy_tra_sa.seq_num = 0x100000555
+            p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555
+            pkt = Ether(
+                src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+            ) / p.scapy_tra_sa.encrypt(
+                IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                seq_num=p.scapy_tra_sa.seq_num,
+            )
+            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
+            p.scapy_tra_sa.seq_num = 0x200000444
             pkt = Ether(
                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
             ) / p.scapy_tra_sa.encrypt(
                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-                seq_num=0x100000555,
+                seq_num=0x200000444,
             )
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
-            p.scapy_tra_sa.seq_num = 0x200000444
-            pkt = Ether(
-                src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
-            ) / p.scapy_tra_sa.encrypt(
-                IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-                seq_num=0x200000444,
+        else:
+            #
+            # without ESN TX sequence numbers can't wrap and packets are
+            # dropped from here on out.
+            #
+            self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+            seq_cycle_count += len(pkts)
+            self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
+            err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff
+            self.assertEqual(err, seq_cycle_count)
+
+        # move the security-associations seq number on to the last we used
+        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
+        p.scapy_tra_sa.seq_num = 351
+        p.vpp_tra_sa.seq_num = 351
+
+    def verify_tra_lost(self):
+        p = self.params[socket.AF_INET]
+        esn_en = p.vpp_tra_sa.esn_en
+
+        #
+        # send packets with seq numbers 1->34
+        # this means the window size is still in Case B (see RFC4303
+        # Appendix A)
+        #
+        # for reasons i haven't investigated Scapy won't create a packet with
+        # seq_num=0
+        #
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(1, 3)
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+
+        # skip a sequence number
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(4, 6)
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+
+        # the lost packet are counted untill we get up past the first
+        # sizeof(replay_window) packets
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(6, 100)
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 1)
+
+        # lost of holes in the sequence
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(100, 200, 2)
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(200, 300)
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 51)
+
+        # a big hole in the seq number space
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(400, 500)
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 151)
+
+    def verify_tra_basic4(self, count=1, payload_size=54):
+        """ipsec v4 transport basic test"""
+        self.vapi.cli("clear errors")
+        self.vapi.cli("clear ipsec sa")
+        try:
+            p = self.params[socket.AF_INET]
+            send_pkts = self.gen_encrypt_pkts(
+                p,
+                p.scapy_tra_sa,
+                self.tra_if,
+                src=self.tra_if.remote_ip4,
+                dst=self.tra_if.local_ip4,
+                count=count,
+                payload_size=payload_size,
+            )
+            recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
+            for rx in recv_pkts:
+                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
+                self.assert_packet_checksums_valid(rx)
+                try:
+                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
+                    self.assert_packet_checksums_valid(decrypted)
+                except:
+                    self.logger.debug(ppp("Unexpected packet:", rx))
+                    raise
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
+
+        pkts = p.tra_sa_in.get_stats()["packets"]
+        self.assertEqual(
+            pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
+        )
+        pkts = p.tra_sa_out.get_stats()["packets"]
+        self.assertEqual(
+            pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
+        )
+        self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+
+        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
+
+    def _verify_tra_anti_replay_algorithm_esn(self):
+        def seq_num(seqh, seql):
+            return (seqh << 32) | (seql & 0xFFFF_FFFF)
+
+        p = self.params[socket.AF_INET]
+        anti_replay_window_size = p.anti_replay_window_size
+
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+        replay_count = self.get_replay_counts(p)
+        hash_failed_count = self.get_hash_failed_counts(p)
+        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+        if ESP == self.encryption_type:
+            undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+            undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+        # reset the TX SA to avoid conflict with left configuration
+        self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+        """
+        RFC 4303 Appendix A2. Case A
+
+        |: new Th marker
+        a-i: possible seq num received
+        +: Bl, Tl, Bl', Tl'
+        [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1)
+
+                Th - 1               Th                Th + 1
+        --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|--
+                =========          =========          =========
+                Bl-     Tl-        Bl      Tl         Bl+     Tl+
+
+        Case A implies Tl >= W - 1
+        """
+
+        Th = 1
+        Tl = anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        # move VPP's RX AR window to Case A
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+        case a: Seql < Bl
+            - pre-crypto check: algorithm predicts that the packet wrap the window
+                -> Seqh = Th + 1
+            - integrity check: should fail
+            - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case b: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5))
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case c: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case d: Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet wrap the window
+                    -> Seqh = Th + 1
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case e: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case f: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (the window stays Case A)
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case g: Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet wrap the window
+                    -> Seqh = Th + 1
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (may set the window in Case B)
+                    -> Seql is marked in the AR window
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            # set the window in Case B (the minimum window size is 64
+            # so we are sure to overlap)
+            for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        # reset the VPP's RX AR window to Case A
+        Th = 1
+        Tl = 2 * anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            # the AR will stay in Case A
+            for seq in range(
+                seq_num(Th + 1, anti_replay_window_size + 10),
+                seq_num(Th + 1, anti_replay_window_size + 20),
+            )
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case h: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: the wrap is not detected, should fail
+                - post-crypto check: ...
+        """
+        Th += 1
+        Tl = anti_replay_window_size + 20
+        Bl = Tl - anti_replay_window_size + 1
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case i: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: the wrap is not detected, shoud fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            RFC 4303 Appendix A2. Case B
+
+                        Th - 1               Th                Th + 1
+            ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h---
+            =========          ===========          ===========
+                    Tl-        Bl       Tl          Bl+       Tl+
+
+            Case B implies Tl < W - 1
+        """
+
+        # reset the VPP's RX AR window to Case B
+        Th = 2
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+            case a: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 5), seq_num(Th, 10))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case b: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case c: Tl < Bl <= Seql
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th - 1
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case d: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 15), seq_num(Th, 25))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case e: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (may set the window in Case A)
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15))
+        ]
+
+        # the window stays in Case B
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(
+                seq_num(Th, Tl + anti_replay_window_size + 5),
+                seq_num(Th, Tl + anti_replay_window_size + 15),
+            )
+        ]
+
+        # the window moves to Case A
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        # reset the VPP's RX AR window to Case B
+        Th = 2
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+            case f: Tl < Bl <= Seql
+                - pre-crypto check: algorithm predicts that the packet is in the previous window
+                    -> Seqh = Th - 1
+                    -> check for a replayed packet with Seql
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case g: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 10), seq_num(Th, 15))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
             )
-            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
-            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+            for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15))
+        ]
 
-        else:
-            #
-            # without ESN TX sequence numbers can't wrap and packets are
-            # dropped from here on out.
-            #
-            self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
-            seq_cycle_count += len(pkts)
-            self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
 
-        # move the security-associations seq number on to the last we used
-        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
-        p.scapy_tra_sa.seq_num = 351
-        p.vpp_tra_sa.seq_num = 351
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
 
-    def verify_tra_lost(self):
-        p = self.params[socket.AF_INET]
-        esn_en = p.vpp_tra_sa.esn_en
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
 
-        #
-        # send packets with seq numbers 1->34
-        # this means the window size is still in Case B (see RFC4303
-        # Appendix A)
-        #
-        # for reasons i haven't investigated Scapy won't create a packet with
-        # seq_num=0
-        #
+        """
+            case h: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
         pkts = [
             (
                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
@@ -902,13 +1657,62 @@ class IpsecTra4(object):
                     seq_num=seq,
                 )
             )
-            for seq in range(1, 3)
+            for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20))
         ]
-        self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assertEqual(p.tra_sa_out.get_lost(), 0)
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
 
-        # skip a sequence number
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+    def _verify_tra_anti_replay_algorithm_no_esn(self):
+        def seq_num(seql):
+            return seql & 0xFFFF_FFFF
+
+        p = self.params[socket.AF_INET]
+        anti_replay_window_size = p.anti_replay_window_size
+
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+        replay_count = self.get_replay_counts(p)
+        hash_failed_count = self.get_hash_failed_counts(p)
+        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+        if ESP == self.encryption_type:
+            undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+            undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+        # reset the TX SA to avoid conflict with left configuration
+        self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+        """
+        RFC 4303 Appendix A2. Case A
+
+        a-c: possible seq num received
+        +: Bl, Tl
+
+        |--a--+---b---+-c--|
+              =========
+              Bl      Tl
+
+        No ESN implies Th = 0
+        Case A implies Tl >= W - 1
+        """
+
+        Tl = anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        # move VPP's RX AR window to Case A
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Tl)
+
+        """
+        case a: Seql < Bl
+            - pre-crypto check: algorithm predicts that the packet is out of window
+                -> packet should be dropped
+            - integrity check: ...
+            - post-crypto check: ...
+        """
         pkts = [
             (
                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
@@ -917,14 +1721,22 @@ class IpsecTra4(object):
                     seq_num=seq,
                 )
             )
-            for seq in range(4, 6)
+            for seq in range(seq_num(Bl - 20), seq_num(Bl - 5))
         ]
-        self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assertEqual(p.tra_sa_out.get_lost(), 0)
+        # out-of-window packets
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        replay_count += len(pkts)
+        self.assertEqual(self.get_replay_counts(p), replay_count)
 
-        # the lost packet are counted untill we get up past the first
-        # sizeof(replay_window) packets
+        """
+            case b: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check:
+                    -> check for a replayed packet with Seql
+        """
         pkts = [
             (
                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
@@ -933,13 +1745,10 @@ class IpsecTra4(object):
                     seq_num=seq,
                 )
             )
-            for seq in range(6, 100)
+            for seq in range(seq_num(Tl - 50), seq_num(Tl - 30))
         ]
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assertEqual(p.tra_sa_out.get_lost(), 1)
-
-        # lost of holes in the sequence
         pkts = [
             (
                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
@@ -948,10 +1757,22 @@ class IpsecTra4(object):
                     seq_num=seq,
                 )
             )
-            for seq in range(100, 200, 2)
+            for seq in range(seq_num(Tl - 35), seq_num(Tl - 30))
         ]
-        self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
 
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # replayed packets
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        """
+            case c: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window is shifted
+        """
         pkts = [
             (
                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
@@ -960,13 +1781,34 @@ class IpsecTra4(object):
                     seq_num=seq,
                 )
             )
-            for seq in range(200, 300)
+            for seq in range(seq_num(Tl + 5), seq_num(Tl + 20))
         ]
+
         self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assertEqual(p.tra_sa_out.get_lost(), 51)
+        """
+            RFC 4303 Appendix A2. Case B
 
-        # a big hole in the seq number space
+            |-a-----+------b-----|
+            =========
+                    Tl
+
+            Case B implies Tl < W - 1
+        """
+
+        # reset the VPP's RX AR window to Case B
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = seq_num(Tl - anti_replay_window_size + 1)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+
+        """
+        case a: Seql <= Tl < Bl
+            - pre-crypto check: algorithm predicts that the packet is in the window
+                -> check for replayed packet
+            - integrity check: should fail
+            - post-crypto check: ...
+        """
         pkts = [
             (
                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
@@ -975,54 +1817,36 @@ class IpsecTra4(object):
                     seq_num=seq,
                 )
             )
-            for seq in range(400, 500)
+            for seq in range(seq_num(5), seq_num(10))
         ]
-        self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assertEqual(p.tra_sa_out.get_lost(), 151)
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-    def verify_tra_basic4(self, count=1, payload_size=54):
-        """ipsec v4 transport basic test"""
-        self.vapi.cli("clear errors")
-        self.vapi.cli("clear ipsec sa")
-        try:
-            p = self.params[socket.AF_INET]
-            send_pkts = self.gen_encrypt_pkts(
-                p,
-                p.scapy_tra_sa,
-                self.tra_if,
-                src=self.tra_if.remote_ip4,
-                dst=self.tra_if.local_ip4,
-                count=count,
-                payload_size=payload_size,
+        """
+            case b: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window is shifted
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
             )
-            recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
-            for rx in recv_pkts:
-                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
-                self.assert_packet_checksums_valid(rx)
-                try:
-                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
-                    self.assert_packet_checksums_valid(decrypted)
-                except:
-                    self.logger.debug(ppp("Unexpected packet:", rx))
-                    raise
-        finally:
-            self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec all"))
+            for seq in range(seq_num(-50), seq_num(-20))
+        ]
 
-        pkts = p.tra_sa_in.get_stats()["packets"]
-        self.assertEqual(
-            pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
-        )
-        pkts = p.tra_sa_out.get_stats()["packets"]
-        self.assertEqual(
-            pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
-        )
-        self.assertEqual(p.tra_sa_out.get_lost(), 0)
-        self.assertEqual(p.tra_sa_in.get_lost(), 0)
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
 
-        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
-        self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
+    def verify_tra_anti_replay_algorithm(self):
+        if self.params[socket.AF_INET].vpp_tra_sa.esn_en:
+            self._verify_tra_anti_replay_algorithm_esn()
+        else:
+            self._verify_tra_anti_replay_algorithm_no_esn()
 
 
 class IpsecTra4Tests(IpsecTra4):
@@ -1032,6 +1856,10 @@ class IpsecTra4Tests(IpsecTra4):
         """ipsec v4 transport anti-replay test"""
         self.verify_tra_anti_replay()
 
+    def test_tra_anti_replay_algorithm(self):
+        """ipsec v4 transport anti-replay algorithm test"""
+        self.verify_tra_anti_replay_algorithm()
+
     def test_tra_lost(self):
         """ipsec v4 transport lost packet test"""
         self.verify_tra_lost()
@@ -1273,7 +2101,7 @@ class IpsecTun4(object):
         decrypt_pkts = []
         for rx in rxs:
             if p.nat_header:
-                self.assertEqual(rx[UDP].dport, 4500)
+                self.assertEqual(rx[UDP].dport, p.nat_header.dport)
             self.assert_packet_checksums_valid(rx)
             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
             try:
@@ -1459,7 +2287,7 @@ class IpsecTun4(object):
         )
         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
         self.assert_error_counter_equal(
-            "/err/%s/NAT Keepalive" % self.tun4_input_node, 31
+            "/err/%s/nat_keepalive" % self.tun4_input_node, 31
         )
 
         pkt = (
@@ -1469,7 +2297,7 @@ class IpsecTun4(object):
             / Raw(b"\xfe")
         )
         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
-        self.assert_error_counter_equal("/err/%s/Too Short" % self.tun4_input_node, 31)
+        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31)
 
         pkt = (
             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
@@ -1479,7 +2307,7 @@ class IpsecTun4(object):
             / Padding(0 * 21)
         )
         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
-        self.assert_error_counter_equal("/err/%s/Too Short" % self.tun4_input_node, 62)
+        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62)
 
 
 class IpsecTun4Tests(IpsecTun4):
@@ -1705,6 +2533,40 @@ class IpsecTun6(object):
             self.logger.info(self.vapi.ppcli("show ipsec all"))
         self.verify_counters6(p, p, count)
 
+    def verify_keepalive(self, p):
+        # the sizeof Raw is calculated to pad to the minimum ehternet
+        # frame size of 64 btyes
+        pkt = (
+            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
+            / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
+            / UDP(sport=333, dport=4500)
+            / Raw(b"\xff")
+            / Padding(0 * 1)
+        )
+        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
+        self.assert_error_counter_equal(
+            "/err/%s/nat_keepalive" % self.tun6_input_node, 31
+        )
+
+        pkt = (
+            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
+            / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
+            / UDP(sport=333, dport=4500)
+            / Raw(b"\xfe")
+        )
+        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
+        self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31)
+
+        pkt = (
+            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
+            / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
+            / UDP(sport=333, dport=4500)
+            / Raw(b"\xfe")
+            / Padding(0 * 21)
+        )
+        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
+        self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62)
+
 
 class IpsecTun6Tests(IpsecTun6):
     """UT test methods for Tunnel v6"""
@@ -1945,20 +2807,48 @@ class IPSecIPv4Fwd(VppTestCase):
         self.logger.info(self.vapi.ppcli("show ipsec all"))
         return spdEntry
 
-    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
+    def create_stream(
+        self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP"
+    ):
         packets = []
+        # create SA
+        sa = SecurityAssociation(
+            ESP,
+            spi=1000,
+            crypt_algo="AES-CBC",
+            crypt_key=b"JPjyOWBeVEQiMe7h",
+            auth_algo="HMAC-SHA1-96",
+            auth_key=b"C91KUR9GYMm5GfkEvNjX",
+            tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+            nat_t_header=UDP(sport=src_prt, dport=dst_prt),
+        )
         for i in range(pkt_count):
             # create packet info stored in the test case instance
             info = self.create_packet_info(src_if, dst_if)
             # convert the info into packet payload
             payload = self.info_to_payload(info)
             # create the packet itself
-            p = (
-                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
-                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
-                / UDP(sport=src_prt, dport=dst_prt)
-                / Raw(payload)
-            )
+            p = []
+            if proto == "UDP-ESP":
+                p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt(
+                    IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / UDP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
+            elif proto == "UDP":
+                p = (
+                    Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                    / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / UDP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
+            elif proto == "TCP":
+                p = (
+                    Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                    / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / TCP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
             # store a copy of the packet in the packet info
             info.data = p.copy()
             # append the packet to the list
@@ -2012,6 +2902,50 @@ class IPSecIPv4Fwd(VppTestCase):
         self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
         self.assert_equal(pkt_count, matched_pkts)
 
+    # Method verify_l3_l4_capture() will verify network and transport layer
+    # fields of the packet sa.encrypt() gives interface number garbadge.
+    # thus interface validation get failed (scapy bug?). However our intent
+    # is to verify IP layer and above and that is covered.
+
+    def verify_l3_l4_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        for packet in capture:
+            try:
+                self.assert_packet_checksums_valid(packet)
+                self.assert_equal(
+                    packet[IP].src,
+                    src_if.remote_ip4,
+                    "decrypted packet source address",
+                )
+                self.assert_equal(
+                    packet[IP].dst,
+                    dst_if.remote_ip4,
+                    "decrypted packet destination address",
+                )
+                if packet.haslayer(TCP):
+                    self.assertFalse(
+                        packet.haslayer(UDP),
+                        "unexpected UDP header in decrypted packet",
+                    )
+                elif packet.haslayer(UDP):
+                    if packet[UDP].payload:
+                        self.assertFalse(
+                            packet[UDP][1].haslayer(UDP),
+                            "unexpected UDP header in decrypted packet",
+                        )
+                else:
+                    self.assertFalse(
+                        packet.haslayer(UDP),
+                        "unexpected UDP header in decrypted packet",
+                    )
+                    self.assert_equal(
+                        packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
+                    )
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
+                raise
+
 
 class SpdFlowCacheTemplate(IPSecIPv4Fwd):
     @classmethod
@@ -2087,6 +3021,84 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd):
             self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
             return False
 
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(SpdFlowCacheTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(SpdFlowCacheTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
+
+class SpdFastPathTemplate(IPSecIPv4Fwd):
+    @classmethod
+    def setUpConstants(cls):
+        super(SpdFastPathTemplate, cls).setUpConstants()
+        # Override this method with required cmdline parameters e.g.
+        # cls.vpp_cmdline.extend(["ipsec", "{",
+        #                         "ipv4-outbound-spd-flow-cache on",
+        #                         "}"])
+        # cls.logger.info("VPP modified cmdline is %s" % " "
+        #                 .join(cls.vpp_cmdline))
+
+    def setUp(self):
+        super(SpdFastPathTemplate, self).setUp()
+
+    def tearDown(self):
+        super(SpdFastPathTemplate, self).tearDown()
+
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(SpdFastPathTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(SpdFastPathTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
+
+class IpsecDefaultTemplate(IPSecIPv4Fwd):
+    @classmethod
+    def setUpConstants(cls):
+        super(IpsecDefaultTemplate, cls).setUpConstants()
+
+    def setUp(self):
+        super(IpsecDefaultTemplate, self).setUp()
+
+    def tearDown(self):
+        super(IpsecDefaultTemplate, self).tearDown()
+
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(IpsecDefaultTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(IpsecDefaultTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
 
 class IPSecIPv6Fwd(VppTestCase):
     """Test IPSec by capturing and verifying IPv6 forwarded pkts"""