+ if esn_on:
+ #
+ # validate wrapping the ESN
+ #
+
+ # wrap scapy's TX SA SN
+ p.scapy_tra_sa.seq_num = 0x100000005
+
+ # send a packet that wraps the window for both AR and no AR
+ pkts = [(Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=0x100000005))]
+
+ rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
+ for rx in rxs:
+ decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
+ # move the window forward to half way to the next wrap
+ pkts = [(Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=0x180000005))]
+
+ rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # a packet less than 2^30 from the current position is:
+ # - AR: out of window and dropped
+ # - non-AR: accepted
+ pkts = [(Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=0x170000005))]
+
+ if ar_on:
+ self.send_and_assert_no_replies(self.tra_if, pkts)
+ else:
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ # a packet more than 2^30 from the current position is:
+ # - AR: out of window and dropped
+ # - non-AR: considered a wrap, but since it's not a wrap
+ # it won't decrpyt and so will be dropped
+ pkts = [(Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=0x130000005))]
+
+ self.send_and_assert_no_replies(self.tra_if, pkts)
+
+ # a packet less than 2^30 from the current position and is a
+ # wrap; (the seq is currently at 0x180000005).
+ # - AR: out of window so considered a wrap, so accepted
+ # - non-AR: not considered a wrap, so won't decrypt
+ p.scapy_tra_sa.seq_num = 0x260000005
+ pkts = [(Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=0x260000005))]
+ if ar_on:
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+ else:
+ self.send_and_assert_no_replies(self.tra_if, pkts)
+
+ #
+ # window positions are different now for AR/non-AR
+ # move non-AR forward
+ #
+ if not ar_on:
+ # a packet more than 2^30 from the current position and is a
+ # wrap; (the seq is currently at 0x180000005).
+ # - AR: accepted
+ # - non-AR: not considered a wrap, so won't decrypt
+
+ pkts = [(Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=0x200000005)),
+ (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=0x200000006))]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ pkts = [(Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=0x260000005))]
+ self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+ def verify_tra_anti_replay(self):
+ p = self.params[socket.AF_INET]
+ esn_en = p.vpp_tra_sa.esn_en
+
+ seq_cycle_node_name = \
+ ('/err/%s/sequence number cycled (packet dropped)' %
+ self.tra4_encrypt_node_name)
+ replay_count = self.get_replay_counts(p)
+ hash_failed_count = self.get_hash_failed_counts(p)
+ seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+ if ESP == self.encryption_type:
+ undersize_node_name = ('/err/%s/undersized packet' %
+ self.tra4_decrypt_node_name[0])
+ undersize_count = self.statistics.get_err_counter(
+ undersize_node_name)
+
+ #
+ # send packets with seq numbers 1->34
+ # this means the window size is still in Case B (see RFC4303
+ # Appendix A)
+ #
+ # for reasons i haven't investigated Scapy won't create a packet with
+ # seq_num=0
+ #
+ pkts = [(Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=seq))
+ for seq in range(1, 34)]
+ recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)