+ seq_cycle_node_name = (
+ "/err/%s/sequence number cycled (packet dropped)"
+ % self.tra4_encrypt_node_name
+ )
+ replay_count = self.get_replay_counts(p)
+ hash_failed_count = self.get_hash_failed_counts(p)
+ seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+ # a few packets so we get the rx seq number above the window size and
+ # thus can simulate a wrap with an out of window packet
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(63, 80)
+ ]
+ recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # these 4 packets will all choose seq-num 0 to decrpyt since none
+ # are out of window when first checked. however, once #200 has
+ # decrypted it will move the window to 200 and has #81 is out of
+ # window. this packet should be dropped.
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=200,
+ )
+ ),
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=81,
+ )
+ ),
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=201,
+ )
+ ),
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=202,
+ )
+ ),
+ ]
+
+ # if anti-replay is off then we won't drop #81
+ n_rx = 3 if ar_on else 4
+ self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
+ # this packet is one before the wrap
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=203,
+ )
+ )
+ ]
+ recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # move the window over half way to a wrap
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=0x80000001,
+ )
+ )
+ ]
+ recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # anti-replay will drop old packets, no anti-replay will not
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=0x44000001,
+ )
+ )
+ ]
+
+ if ar_on:
+ self.send_and_assert_no_replies(self.tra_if, pkts)
+ else:
+ recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ if esn_on:
+ #
+ # validate wrapping the ESN
+ #
+
+ # wrap scapy's TX SA SN
+ p.scapy_tra_sa.seq_num = 0x100000005
+
+ # send a packet that wraps the window for both AR and no AR
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+ / ICMP(),
+ seq_num=0x100000005,
+ )
+ )
+ ]
+
+ rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
+ for rx in rxs:
+ decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
+ # move the window forward to half way to the next wrap
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+ / ICMP(),
+ seq_num=0x180000005,
+ )
+ )
+ ]
+
+ rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # a packet less than 2^30 from the current position is:
+ # - AR: out of window and dropped
+ # - non-AR: accepted
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+ / ICMP(),
+ seq_num=0x170000005,
+ )
+ )
+ ]
+
+ if ar_on:
+ self.send_and_assert_no_replies(self.tra_if, pkts)
+ else:
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # a packet more than 2^30 from the current position is:
+ # - AR: out of window and dropped
+ # - non-AR: considered a wrap, but since it's not a wrap
+ # it won't decrpyt and so will be dropped
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+ / ICMP(),
+ seq_num=0x130000005,
+ )
+ )
+ ]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts)
+
+ # a packet less than 2^30 from the current position and is a
+ # wrap; (the seq is currently at 0x180000005).
+ # - AR: out of window so considered a wrap, so accepted
+ # - non-AR: not considered a wrap, so won't decrypt
+ p.scapy_tra_sa.seq_num = 0x260000005
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+ / ICMP(),
+ seq_num=0x260000005,
+ )
+ )
+ ]
+ if ar_on:
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+ else:
+ self.send_and_assert_no_replies(self.tra_if, pkts)
+
+ #
+ # window positions are different now for AR/non-AR
+ # move non-AR forward
+ #
+ if not ar_on:
+ # a packet more than 2^30 from the current position and is a
+ # wrap; (the seq is currently at 0x180000005).
+ # - AR: accepted
+ # - non-AR: not considered a wrap, so won't decrypt
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+ / ICMP(),
+ seq_num=0x200000005,
+ )
+ ),
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+ / ICMP(),
+ seq_num=0x200000006,
+ )
+ ),
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
+ / ICMP(),
+ seq_num=0x260000005,
+ )
+ )
+ ]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ def verify_tra_anti_replay(self):
+ p = self.params[socket.AF_INET]
+ esn_en = p.vpp_tra_sa.esn_en
+
+ seq_cycle_node_name = (
+ "/err/%s/sequence number cycled (packet dropped)"
+ % self.tra4_encrypt_node_name
+ )
+ replay_count = self.get_replay_counts(p)
+ hash_failed_count = self.get_hash_failed_counts(p)
+ seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+ if ESP == self.encryption_type:
+ undersize_node_name = (
+ "/err/%s/undersized packet" % self.tra4_decrypt_node_name[0]
+ )
+ undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+ #
+ # send packets with seq numbers 1->34
+ # this means the window size is still in Case B (see RFC4303
+ # Appendix A)
+ #
+ # for reasons i haven't investigated Scapy won't create a packet with
+ # seq_num=0
+ #
+ pkts = [
+ (
+ Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+ / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=seq,
+ )
+ )
+ for seq in range(1, 34)
+ ]
+ recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # replayed packets are dropped
+ self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+ replay_count += len(pkts)
+ self.assertEqual(self.get_replay_counts(p), replay_count)
+
+ #
+ # now send a batch of packets all with the same sequence number
+ # the first packet in the batch is legitimate, the rest bogus
+ #
+ self.vapi.cli("clear error")
+ self.vapi.cli("clear node counters")
+ pkts = Ether(
+ src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
+ ) / p.scapy_tra_sa.encrypt(
+ IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+ seq_num=35,
+ )
+ recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
+ replay_count += 7
+ self.assertEqual(self.get_replay_counts(p), replay_count)