+ self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
+
+ def _verify_tra_anti_replay_algorithm_esn(self):
+ def seq_num(seqh, seql):
+ return (seqh << 32) | (seql & 0xFFFF_FFFF)
+
+ p = self.params[socket.AF_INET]
+ anti_replay_window_size = p.anti_replay_window_size
+
+ seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+ replay_count = self.get_replay_counts(p)
+ hash_failed_count = self.get_hash_failed_counts(p)
+ seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+ if ESP == self.encryption_type:
+ undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+ undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+ # reset the TX SA to avoid conflict with left configuration
+ self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+ """
+ RFC 4303 Appendix A2. Case A
+
+ |: new Th marker
+ a-i: possible seq num received
+ +: Bl, Tl, Bl', Tl'
+ [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1)
+
+ Th - 1 Th Th + 1
+ --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|--
+ ========= ========= =========
+ Bl- Tl- Bl Tl Bl+ Tl+
+
+ Case A implies Tl >= W - 1
+ """
+
+ Th = 1
+ Tl = anti_replay_window_size + 40
+ Bl = Tl - anti_replay_window_size + 1
+
+ # move VPP's RX AR window to Case A
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+ """
+ case a: Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet wrap the window
+ -> Seqh = Th + 1
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5))
+ ]
+
+ # out-of-window packets fail integrity check
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case b: Bl <= Seql <= Tl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for a replayed packet with Seql
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5))
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # some packets are rejected by the pre-crypto check
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts) - 5
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case c: Seql > Tl
+ - pre-crypto check: algorithm predicts that the packet does not wrap the window
+ -> Seqh = Th
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case d: Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet wrap the window
+ -> Seqh = Th + 1
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5))
+ ]
+
+ # out-of-window packets fail integrity check
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case e: Bl <= Seql <= Tl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for a replayed packet with Seql
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case f: Seql > Tl
+ - pre-crypto check: algorithm predicts that the packet does not wrap the window
+ -> Seqh = Th
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window shift (the window stays Case A)
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case g: Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet wrap the window
+ -> Seqh = Th + 1
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window shift (may set the window in Case B)
+ -> Seql is marked in the AR window
+ """
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ # set the window in Case B (the minimum window size is 64
+ # so we are sure to overlap)
+ for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # reset the VPP's RX AR window to Case A
+ Th = 1
+ Tl = 2 * anti_replay_window_size + 40
+ Bl = Tl - anti_replay_window_size + 1
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ # the AR will stay in Case A
+ for seq in range(
+ seq_num(Th + 1, anti_replay_window_size + 10),
+ seq_num(Th + 1, anti_replay_window_size + 20),
+ )
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case h: Bl <= Seql <= Tl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for a replayed packet with Seql
+ - integrity check: the wrap is not detected, should fail
+ - post-crypto check: ...
+ """
+ Th += 1
+ Tl = anti_replay_window_size + 20
+ Bl = Tl - anti_replay_window_size + 1
+
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # some packets are rejected by the pre-crypto check
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts) - 5
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case i: Seql > Tl
+ - pre-crypto check: algorithm predicts that the packet does not wrap the window
+ -> Seqh = Th
+ - integrity check: the wrap is not detected, shoud fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15))
+ ]
+
+ # out-of-window packets fail integrity check
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ RFC 4303 Appendix A2. Case B
+
+ Th - 1 Th Th + 1
+ ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h---
+ ========= =========== ===========
+ Tl- Bl Tl Bl+ Tl+
+
+ Case B implies Tl < W - 1
+ """
+
+ # reset the VPP's RX AR window to Case B
+ Th = 2
+ Tl = 30 # minimum window size of 64, we are sure to overlap
+ Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+ """
+ case a: Seql <= Tl < Bl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for replayed packet
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, 5), seq_num(Th, 10))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # some packets are rejected by the pre-crypto check
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts) - 5
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case b: Tl < Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet will shift the window
+ -> Seqh = Th
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case c: Tl < Bl <= Seql
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th - 1
+ -> check for a replayed packet with Seql
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case d: Seql <= Tl < Bl
+ - pre-crypto check: algorithm predicts that the packet is the window
+ -> Seqh = Th
+ -> check for replayed packet
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> Seql is marked in the AR window
+ """
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, 15), seq_num(Th, 25))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case e: Tl < Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for a replayed packet with Seql
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window shift (may set the window in Case A)
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15))
+ ]
+
+ # the window stays in Case B
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(
+ seq_num(Th, Tl + anti_replay_window_size + 5),
+ seq_num(Th, Tl + anti_replay_window_size + 15),
+ )
+ ]
+
+ # the window moves to Case A
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # reset the VPP's RX AR window to Case B
+ Th = 2
+ Tl = 30 # minimum window size of 64, we are sure to overlap
+ Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+ """
+ case f: Tl < Bl <= Seql
+ - pre-crypto check: algorithm predicts that the packet is in the previous window
+ -> Seqh = Th - 1
+ -> check for a replayed packet with Seql
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case g: Seql <= Tl < Bl
+ - pre-crypto check: algorithm predicts that the packet is the window
+ -> Seqh = Th
+ -> check for replayed packet
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, 10), seq_num(Th, 15))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # some packets are rejected by the pre-crypto check
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts) - 5
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case h: Tl < Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet will shift the window
+ -> Seqh = Th
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ def _verify_tra_anti_replay_algorithm_no_esn(self):
+ def seq_num(seql):
+ return seql & 0xFFFF_FFFF
+
+ p = self.params[socket.AF_INET]
+ anti_replay_window_size = p.anti_replay_window_size
+
+ seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+ replay_count = self.get_replay_counts(p)
+ hash_failed_count = self.get_hash_failed_counts(p)
+ seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+ if ESP == self.encryption_type:
+ undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+ undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+ # reset the TX SA to avoid conflict with left configuration
+ self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+ """
+ RFC 4303 Appendix A2. Case A
+
+ a-c: possible seq num received
+ +: Bl, Tl
+
+ |--a--+---b---+-c--|
+ =========
+ Bl Tl
+
+ No ESN implies Th = 0
+ Case A implies Tl >= W - 1
+ """
+
+ Tl = anti_replay_window_size + 40
+ Bl = Tl - anti_replay_window_size + 1
+
+ # move VPP's RX AR window to Case A
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+ p.scapy_tra_sa.seq_num = seq_num(Tl)
+
+ """
+ case a: Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet is out of window
+ -> packet should be dropped
+ - integrity check: ...
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Bl - 20), seq_num(Bl - 5))
+ ]
+
+ # out-of-window packets
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ replay_count += len(pkts)
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ """
+ case b: Bl <= Seql <= Tl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> check for a replayed packet with Seql
+ - integrity check: should pass
+ - post-crypto check:
+ -> check for a replayed packet with Seql
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Tl - 50), seq_num(Tl - 30))
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Tl - 35), seq_num(Tl - 30))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # replayed packets
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ """
+ case c: Seql > Tl
+ - pre-crypto check: algorithm predicts that the packet will shift the window
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window is shifted
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Tl + 5), seq_num(Tl + 20))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ RFC 4303 Appendix A2. Case B
+
+ |-a-----+------b-----|
+ =========
+ Tl
+
+ Case B implies Tl < W - 1
+ """
+
+ # reset the VPP's RX AR window to Case B
+ Tl = 30 # minimum window size of 64, we are sure to overlap
+ Bl = seq_num(Tl - anti_replay_window_size + 1)
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+
+ """
+ case a: Seql <= Tl < Bl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> check for replayed packet
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(5), seq_num(10))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case b: Tl < Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet will shift the window
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window is shifted
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(-50), seq_num(-20))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ def verify_tra_anti_replay_algorithm(self):
+ if self.params[socket.AF_INET].vpp_tra_sa.esn_en:
+ self._verify_tra_anti_replay_algorithm_esn()
+ else:
+ self._verify_tra_anti_replay_algorithm_no_esn()