+ hash_failed_count += 1
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+ if hash_err != "":
+ err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
+ self.assertEqual(err, hash_failed_count)
+
+ #
+ # but if we move the window forward to case B, then we can wrap
+ # again
+ #
+ p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555
+ pkt = Ether(
+ src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+ ) / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=p.scapy_tra_sa.seq_num,
+ )
+ rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+ decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
+ p.scapy_tra_sa.seq_num = 0x200000444
+ pkt = Ether(
+ src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+ ) / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=0x200000444,
+ )
+ rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+ decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
+ else:
+ #
+ # without ESN TX sequence numbers can't wrap and packets are
+ # dropped from here on out.
+ #
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ seq_cycle_count += len(pkts)
+ self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
+ err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff
+ self.assertEqual(err, seq_cycle_count)
+
+ # move the security-associations seq number on to the last we used
+ self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
+ p.scapy_tra_sa.seq_num = 351
+ p.vpp_tra_sa.seq_num = 351
+
+ def verify_tra_lost(self):
+ p = self.params[socket.AF_INET]
+ esn_en = p.vpp_tra_sa.esn_en
+
+ #
+ # send packets with seq numbers 1->34
+ # this means the window size is still in Case B (see RFC4303
+ # Appendix A)
+ #
+ # for reasons i haven't investigated Scapy won't create a packet with
+ # seq_num=0
+ #
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(1, 3)
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+
+ # skip a sequence number
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(4, 6)
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+
+ # the lost packet are counted untill we get up past the first
+ # sizeof(replay_window) packets
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(6, 100)
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ self.assertEqual(p.tra_sa_in.get_err("lost"), 1)
+
+ # lost of holes in the sequence
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(100, 200, 2)
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(200, 300)
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ self.assertEqual(p.tra_sa_in.get_err("lost"), 51)
+
+ # a big hole in the seq number space
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(400, 500)
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ self.assertEqual(p.tra_sa_in.get_err("lost"), 151)
+
+ def verify_tra_basic4(self, count=1, payload_size=54):
+ """ipsec v4 transport basic test"""
+ self.vapi.cli("clear errors")
+ self.vapi.cli("clear ipsec sa")
+ try:
+ p = self.params[socket.AF_INET]
+ send_pkts = self.gen_encrypt_pkts(
+ p,
+ p.scapy_tra_sa,
+ self.tra_if,
+ src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4,
+ count=count,
+ payload_size=payload_size,
+ )
+ recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
+ for rx in recv_pkts:
+ self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
+ self.assert_packet_checksums_valid(rx)
+ try:
+ decrypted = p.vpp_tra_sa.decrypt(rx[IP])
+ self.assert_packet_checksums_valid(decrypted)
+ except:
+ self.logger.debug(ppp("Unexpected packet:", rx))
+ raise
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
+
+ pkts = p.tra_sa_in.get_stats()["packets"]
+ self.assertEqual(
+ pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
+ )
+ pkts = p.tra_sa_out.get_stats()["packets"]
+ self.assertEqual(
+ pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
+ )
+ self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
+ self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+
+ self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
+ self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
+
+ def _verify_tra_anti_replay_algorithm_esn(self):
+ def seq_num(seqh, seql):
+ return (seqh << 32) | (seql & 0xFFFF_FFFF)
+
+ p = self.params[socket.AF_INET]
+ anti_replay_window_size = p.anti_replay_window_size
+
+ seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+ replay_count = self.get_replay_counts(p)
+ hash_failed_count = self.get_hash_failed_counts(p)
+ seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+ if ESP == self.encryption_type:
+ undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+ undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+ # reset the TX SA to avoid conflict with left configuration
+ self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+ """
+ RFC 4303 Appendix A2. Case A
+
+ |: new Th marker
+ a-i: possible seq num received
+ +: Bl, Tl, Bl', Tl'
+ [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1)
+
+ Th - 1 Th Th + 1
+ --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|--
+ ========= ========= =========
+ Bl- Tl- Bl Tl Bl+ Tl+
+
+ Case A implies Tl >= W - 1
+ """
+
+ Th = 1
+ Tl = anti_replay_window_size + 40
+ Bl = Tl - anti_replay_window_size + 1
+
+ # move VPP's RX AR window to Case A
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+ """
+ case a: Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet wrap the window
+ -> Seqh = Th + 1
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5))
+ ]
+
+ # out-of-window packets fail integrity check
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case b: Bl <= Seql <= Tl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for a replayed packet with Seql
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5))
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # some packets are rejected by the pre-crypto check
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts) - 5
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case c: Seql > Tl
+ - pre-crypto check: algorithm predicts that the packet does not wrap the window
+ -> Seqh = Th
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case d: Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet wrap the window
+ -> Seqh = Th + 1
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5))
+ ]
+
+ # out-of-window packets fail integrity check
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case e: Bl <= Seql <= Tl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for a replayed packet with Seql
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case f: Seql > Tl
+ - pre-crypto check: algorithm predicts that the packet does not wrap the window
+ -> Seqh = Th
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window shift (the window stays Case A)
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case g: Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet wrap the window
+ -> Seqh = Th + 1
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window shift (may set the window in Case B)
+ -> Seql is marked in the AR window
+ """
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ # set the window in Case B (the minimum window size is 64
+ # so we are sure to overlap)
+ for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # reset the VPP's RX AR window to Case A
+ Th = 1
+ Tl = 2 * anti_replay_window_size + 40
+ Bl = Tl - anti_replay_window_size + 1
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ # the AR will stay in Case A
+ for seq in range(
+ seq_num(Th + 1, anti_replay_window_size + 10),
+ seq_num(Th + 1, anti_replay_window_size + 20),
+ )
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case h: Bl <= Seql <= Tl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for a replayed packet with Seql
+ - integrity check: the wrap is not detected, should fail
+ - post-crypto check: ...
+ """
+ Th += 1
+ Tl = anti_replay_window_size + 20
+ Bl = Tl - anti_replay_window_size + 1
+
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # some packets are rejected by the pre-crypto check
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts) - 5
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case i: Seql > Tl
+ - pre-crypto check: algorithm predicts that the packet does not wrap the window
+ -> Seqh = Th
+ - integrity check: the wrap is not detected, shoud fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15))
+ ]
+
+ # out-of-window packets fail integrity check
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ RFC 4303 Appendix A2. Case B
+
+ Th - 1 Th Th + 1
+ ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h---
+ ========= =========== ===========
+ Tl- Bl Tl Bl+ Tl+
+
+ Case B implies Tl < W - 1
+ """
+
+ # reset the VPP's RX AR window to Case B
+ Th = 2
+ Tl = 30 # minimum window size of 64, we are sure to overlap
+ Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+ """
+ case a: Seql <= Tl < Bl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for replayed packet
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, 5), seq_num(Th, 10))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # some packets are rejected by the pre-crypto check
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts) - 5
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case b: Tl < Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet will shift the window
+ -> Seqh = Th
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case c: Tl < Bl <= Seql
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th - 1
+ -> check for a replayed packet with Seql
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case d: Seql <= Tl < Bl
+ - pre-crypto check: algorithm predicts that the packet is the window
+ -> Seqh = Th
+ -> check for replayed packet
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> Seql is marked in the AR window
+ """
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, 15), seq_num(Th, 25))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case e: Tl < Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for a replayed packet with Seql
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window shift (may set the window in Case A)
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15))
+ ]
+
+ # the window stays in Case B
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(
+ seq_num(Th, Tl + anti_replay_window_size + 5),
+ seq_num(Th, Tl + anti_replay_window_size + 15),
+ )
+ ]
+
+ # the window moves to Case A
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # reset the VPP's RX AR window to Case B
+ Th = 2
+ Tl = 30 # minimum window size of 64, we are sure to overlap
+ Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+ """
+ case f: Tl < Bl <= Seql
+ - pre-crypto check: algorithm predicts that the packet is in the previous window
+ -> Seqh = Th - 1
+ -> check for a replayed packet with Seql
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)