+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case f: Seql > Tl
+ - pre-crypto check: algorithm predicts that the packet does not wrap the window
+ -> Seqh = Th
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window shift (the window stays Case A)
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case g: Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet wrap the window
+ -> Seqh = Th + 1
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window shift (may set the window in Case B)
+ -> Seql is marked in the AR window
+ """
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ # set the window in Case B (the minimum window size is 64
+ # so we are sure to overlap)
+ for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # reset the VPP's RX AR window to Case A
+ Th = 1
+ Tl = 2 * anti_replay_window_size + 40
+ Bl = Tl - anti_replay_window_size + 1
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ # the AR will stay in Case A
+ for seq in range(
+ seq_num(Th + 1, anti_replay_window_size + 10),
+ seq_num(Th + 1, anti_replay_window_size + 20),
+ )
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case h: Bl <= Seql <= Tl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for a replayed packet with Seql
+ - integrity check: the wrap is not detected, should fail
+ - post-crypto check: ...
+ """
+ Th += 1
+ Tl = anti_replay_window_size + 20
+ Bl = Tl - anti_replay_window_size + 1
+
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # some packets are rejected by the pre-crypto check
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts) - 5
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case i: Seql > Tl
+ - pre-crypto check: algorithm predicts that the packet does not wrap the window
+ -> Seqh = Th
+ - integrity check: the wrap is not detected, shoud fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15))
+ ]
+
+ # out-of-window packets fail integrity check
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ RFC 4303 Appendix A2. Case B
+
+ Th - 1 Th Th + 1
+ ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h---
+ ========= =========== ===========
+ Tl- Bl Tl Bl+ Tl+
+
+ Case B implies Tl < W - 1
+ """
+
+ # reset the VPP's RX AR window to Case B
+ Th = 2
+ Tl = 30 # minimum window size of 64, we are sure to overlap
+ Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+ """
+ case a: Seql <= Tl < Bl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for replayed packet
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, 5), seq_num(Th, 10))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # some packets are rejected by the pre-crypto check
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts) - 5
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case b: Tl < Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet will shift the window
+ -> Seqh = Th
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case c: Tl < Bl <= Seql
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th - 1
+ -> check for a replayed packet with Seql
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case d: Seql <= Tl < Bl
+ - pre-crypto check: algorithm predicts that the packet is the window
+ -> Seqh = Th
+ -> check for replayed packet
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> Seql is marked in the AR window
+ """
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, 15), seq_num(Th, 25))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case e: Tl < Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> Seqh = Th
+ -> check for a replayed packet with Seql
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window shift (may set the window in Case A)
+ -> Seql is marked in the AR window
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15))
+ ]
+
+ # the window stays in Case B
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(
+ seq_num(Th, Tl + anti_replay_window_size + 5),
+ seq_num(Th, Tl + anti_replay_window_size + 15),
+ )
+ ]
+
+ # the window moves to Case A
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # reset the VPP's RX AR window to Case B
+ Th = 2
+ Tl = 30 # minimum window size of 64, we are sure to overlap
+ Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+ p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+ """
+ case f: Tl < Bl <= Seql
+ - pre-crypto check: algorithm predicts that the packet is in the previous window
+ -> Seqh = Th - 1
+ -> check for a replayed packet with Seql
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case g: Seql <= Tl < Bl
+ - pre-crypto check: algorithm predicts that the packet is the window
+ -> Seqh = Th
+ -> check for replayed packet
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th, 10), seq_num(Th, 15))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # some packets are rejected by the pre-crypto check
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts) - 5
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ """
+ case h: Tl < Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet will shift the window
+ -> Seqh = Th
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # out-of-window packets fail integrity check
+ hash_failed_count += len(pkts)
+ self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+ def _verify_tra_anti_replay_algorithm_no_esn(self):
+ def seq_num(seql):
+ return seql & 0xFFFF_FFFF
+
+ p = self.params[socket.AF_INET]
+ anti_replay_window_size = p.anti_replay_window_size
+
+ seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+ replay_count = self.get_replay_counts(p)
+ hash_failed_count = self.get_hash_failed_counts(p)
+ seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+ if ESP == self.encryption_type:
+ undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+ undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+ # reset the TX SA to avoid conflict with left configuration
+ self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+ """
+ RFC 4303 Appendix A2. Case A
+
+ a-c: possible seq num received
+ +: Bl, Tl
+
+ |--a--+---b---+-c--|
+ =========
+ Bl Tl
+
+ No ESN implies Th = 0
+ Case A implies Tl >= W - 1
+ """
+
+ Tl = anti_replay_window_size + 40
+ Bl = Tl - anti_replay_window_size + 1
+
+ # move VPP's RX AR window to Case A
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+ p.scapy_tra_sa.seq_num = seq_num(Tl)
+
+ """
+ case a: Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet is out of window
+ -> packet should be dropped
+ - integrity check: ...
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Bl - 20), seq_num(Bl - 5))
+ ]
+
+ # out-of-window packets
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ replay_count += len(pkts)
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ """
+ case b: Bl <= Seql <= Tl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> check for a replayed packet with Seql
+ - integrity check: should pass
+ - post-crypto check:
+ -> check for a replayed packet with Seql
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Tl - 50), seq_num(Tl - 30))
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Tl - 35), seq_num(Tl - 30))
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+ # replayed packets
+ replay_count += 5
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ """
+ case c: Seql > Tl
+ - pre-crypto check: algorithm predicts that the packet will shift the window
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window is shifted
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(Tl + 5), seq_num(Tl + 20))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ RFC 4303 Appendix A2. Case B
+
+ |-a-----+------b-----|
+ =========
+ Tl
+
+ Case B implies Tl < W - 1
+ """
+
+ # reset the VPP's RX AR window to Case B
+ Tl = 30 # minimum window size of 64, we are sure to overlap
+ Bl = seq_num(Tl - anti_replay_window_size + 1)
+
+ self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+
+ """
+ case a: Seql <= Tl < Bl
+ - pre-crypto check: algorithm predicts that the packet is in the window
+ -> check for replayed packet
+ - integrity check: should fail
+ - post-crypto check: ...
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(5), seq_num(10))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ """
+ case b: Tl < Seql < Bl
+ - pre-crypto check: algorithm predicts that the packet will shift the window
+ - integrity check: should pass
+ - post-crypto check: should pass
+ -> AR window is shifted
+ """
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(seq_num(-50), seq_num(-20))
+ ]
+
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ def verify_tra_anti_replay_algorithm(self):
+ if self.params[socket.AF_INET].vpp_tra_sa.esn_en:
+ self._verify_tra_anti_replay_algorithm_esn()
+ else:
+ self._verify_tra_anti_replay_algorithm_no_esn()
+
+
+class IpsecTra4Tests(IpsecTra4):
+ """UT test methods for Transport v4"""
+
+ def test_tra_anti_replay(self):
+ """ipsec v4 transport anti-replay test"""
+ self.verify_tra_anti_replay()
+
+ def test_tra_anti_replay_algorithm(self):
+ """ipsec v4 transport anti-replay algorithm test"""
+ self.verify_tra_anti_replay_algorithm()
+
+ def test_tra_lost(self):
+ """ipsec v4 transport lost packet test"""
+ self.verify_tra_lost()
+
+ def test_tra_basic(self, count=1):
+ """ipsec v4 transport basic test"""
+ self.verify_tra_basic4(count=1)
+
+ def test_tra_burst(self):
+ """ipsec v4 transport burst test"""
+ self.verify_tra_basic4(count=257)
+
+
+class IpsecTra6(object):
+ """verify methods for Transport v6"""
+
+ def verify_tra_basic6(self, count=1, payload_size=54):
+ self.vapi.cli("clear errors")
+ self.vapi.cli("clear ipsec sa")
+ try:
+ p = self.params[socket.AF_INET6]
+ send_pkts = self.gen_encrypt_pkts6(
+ p,
+ p.scapy_tra_sa,
+ self.tra_if,
+ src=self.tra_if.remote_ip6,
+ dst=self.tra_if.local_ip6,
+ count=count,
+ payload_size=payload_size,
+ )
+ recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
+ for rx in recv_pkts:
+ self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
+ try:
+ decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
+ self.assert_packet_checksums_valid(decrypted)
+ except:
+ self.logger.debug(ppp("Unexpected packet:", rx))
+ raise
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
+
+ pkts = p.tra_sa_in.get_stats()["packets"]
+ self.assertEqual(
+ pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
+ )
+ pkts = p.tra_sa_out.get_stats()["packets"]
+ self.assertEqual(
+ pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
+ )
+ self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
+ self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
+
+ def gen_encrypt_pkts_ext_hdrs6(
+ self, sa, sw_intf, src, dst, count=1, payload_size=54
+ ):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IPv6(src=src, dst=dst)
+ / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IPv6(src=src, dst=dst)
+ / IPv6ExtHdrHopByHop()
+ / IPv6ExtHdrFragment(id=2, offset=200)
+ / Raw(b"\xff" * 200)
+ for i in range(count)
+ ]
+
+ def verify_tra_encrypted6(self, p, sa, rxs):
+ decrypted = []
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ try:
+ decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
+ decrypted.append(decrypt_pkt)
+ self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
+ self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
+ except:
+ self.logger.debug(ppp("Unexpected packet:", rx))
+ try:
+ self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
+ except:
+ pass
+ raise
+ return decrypted
+
+ def verify_tra_66_ext_hdrs(self, p):
+ count = 63
+
+ #
+ # check we can decrypt with options
+ #
+ tx = self.gen_encrypt_pkts_ext_hdrs6(
+ p.scapy_tra_sa,
+ self.tra_if,
+ src=self.tra_if.remote_ip6,
+ dst=self.tra_if.local_ip6,
+ count=count,
+ )
+ self.send_and_expect(self.tra_if, tx, self.tra_if)
+
+ #
+ # injecting a packet from ourselves to be routed of box is a hack
+ # but it matches an outbout policy, alors je ne regrette rien
+ #
+
+ # one extension before ESP
+ tx = (
+ Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
+ / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
+ / IPv6ExtHdrFragment(id=2, offset=200)
+ / Raw(b"\xff" * 200)
+ )
+
+ rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
+ dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
+
+ for dc in dcs:
+ # for reasons i'm not going to investigate scapy does not
+ # created the correct headers after decrypt. but reparsing
+ # the ipv6 packet fixes it
+ dc = IPv6(raw(dc[IPv6]))
+ self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
+
+ # two extensions before ESP
+ tx = (
+ Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
+ / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
+ / IPv6ExtHdrHopByHop()
+ / IPv6ExtHdrFragment(id=2, offset=200)
+ / Raw(b"\xff" * 200)
+ )
+
+ rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
+ dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
+
+ for dc in dcs:
+ dc = IPv6(raw(dc[IPv6]))