X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_ipsec.py;h=c438fbb7680c238ff1a4b4dac767f53fec98962c;hb=0e2f188f7;hp=ce188622a9e4517bddf2ae1bda70ec0010d37dec;hpb=93688d7341ada44755dc0432de3e3dbaaa8aa111;p=vpp.git diff --git a/test/template_ipsec.py b/test/template_ipsec.py index ce188622a9e..c438fbb7680 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -26,7 +26,6 @@ from os import popen class IPsecIPv4Params: - addr_type = socket.AF_INET addr_any = "0.0.0.0" addr_bcast = "255.255.255.255" @@ -52,6 +51,8 @@ class IPsecIPv4Params: self.outer_flow_label = 0 self.inner_flow_label = 0x12345 + self.anti_replay_window_size = 64 + self.auth_algo_vpp_id = ( VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96 ) @@ -74,7 +75,6 @@ class IPsecIPv4Params: class IPsecIPv6Params: - addr_type = socket.AF_INET6 addr_any = "0::0" addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff" @@ -100,6 +100,8 @@ class IPsecIPv6Params: self.outer_flow_label = 0 self.inner_flow_label = 0x12345 + self.anti_replay_window_size = 64 + self.auth_algo_vpp_id = ( VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96 ) @@ -122,7 +124,7 @@ class IPsecIPv6Params: def mk_scapy_crypt_key(p): - if p.crypt_algo in ("AES-GCM", "AES-CTR"): + if p.crypt_algo in ("AES-GCM", "AES-CTR", "AES-NULL-GMAC"): return p.crypt_key + struct.pack("!I", p.salt) else: return p.crypt_key @@ -138,7 +140,7 @@ def config_tun_params(p, encryption_type, tun_if): crypt_key = mk_scapy_crypt_key(p) p.scapy_tun_sa = SecurityAssociation( encryption_type, - spi=p.vpp_tun_spi, + spi=p.scapy_tun_spi, crypt_algo=p.crypt_algo, crypt_key=crypt_key, auth_algo=p.auth_algo, @@ -149,7 +151,7 @@ def config_tun_params(p, encryption_type, tun_if): ) p.vpp_tun_sa = SecurityAssociation( encryption_type, - spi=p.scapy_tun_spi, + spi=p.vpp_tun_spi, crypt_algo=p.crypt_algo, crypt_key=crypt_key, auth_algo=p.auth_algo, @@ -167,7 +169,7 @@ def config_tra_params(p, encryption_type): crypt_key = mk_scapy_crypt_key(p) p.scapy_tra_sa = SecurityAssociation( encryption_type, - spi=p.vpp_tra_spi, + spi=p.scapy_tra_spi, crypt_algo=p.crypt_algo, crypt_key=crypt_key, auth_algo=p.auth_algo, @@ -177,7 +179,7 @@ def config_tra_params(p, encryption_type): ) p.vpp_tra_sa = SecurityAssociation( encryption_type, - spi=p.scapy_tra_spi, + spi=p.vpp_tra_spi, crypt_algo=p.crypt_algo, crypt_key=crypt_key, auth_algo=p.auth_algo, @@ -342,7 +344,7 @@ class IpsecTra4(object): return count def get_hash_failed_counts(self, p): - if ESP == self.encryption_type and p.crypt_algo == "AES-GCM": + if ESP == self.encryption_type and p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"): hash_failed_node_name = ( "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode] ) @@ -432,6 +434,34 @@ class IpsecTra4(object): ] recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) + # a replayed packet, then an out of window, then a legit + # tests that a early failure on the batch doesn't affect subsequent packets. + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=203, + ) + ), + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=81, + ) + ), + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=204, + ) + ), + ] + n_rx = 1 if ar_on else 3 + recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx) + # move the window over half way to a wrap pkts = [ ( @@ -599,15 +629,34 @@ class IpsecTra4(object): def verify_tra_anti_replay(self): p = self.params[socket.AF_INET] esn_en = p.vpp_tra_sa.esn_en + anti_replay_window_size = p.anti_replay_window_size seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name replay_count = self.get_replay_counts(p) + initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay") hash_failed_count = self.get_hash_failed_counts(p) seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err( + "seq_cycled" + ) + hash_err = "integ_error" if ESP == self.encryption_type: undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] undersize_count = self.statistics.get_err_counter(undersize_node_name) + initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err( + "runt" + ) + # For AES-GCM an error in the hash is reported as a decryption failure + if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"): + hash_err = "decryption_failed" + # In async mode, we don't report errors in the hash. + if p.async_mode: + hash_err = "" + else: + initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err( + hash_err + ) # # send packets with seq numbers 1->34 @@ -633,6 +682,8 @@ class IpsecTra4(object): self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) replay_count += len(pkts) self.assertEqual(self.get_replay_counts(p), replay_count) + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff + self.assertEqual(err, replay_count) # # now send a batch of packets all with the same sequence number @@ -649,25 +700,31 @@ class IpsecTra4(object): recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1) replay_count += 7 self.assertEqual(self.get_replay_counts(p), replay_count) + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff + self.assertEqual(err, replay_count) # - # now move the window over to 257 (more than one byte) and into Case A + # now move the window over to anti_replay_window_size + 100 and into Case A # self.vapi.cli("clear error") pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=257, + seq_num=anti_replay_window_size + 100, ) recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if) + self.logger.info(self.vapi.ppcli("show ipsec sa 1")) + # replayed packets are dropped self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2) replay_count += 3 self.assertEqual(self.get_replay_counts(p), replay_count) + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff + self.assertEqual(err, replay_count) - # the window size is 64 packets + # the window size is anti_replay_window_size packets # in window are still accepted pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac @@ -675,12 +732,11 @@ class IpsecTra4(object): IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), seq_num=200, ) - recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if) # a packet that does not decrypt does not move the window forward bogus_sa = SecurityAssociation( self.encryption_type, - p.vpp_tra_spi, + p.scapy_tra_spi, crypt_algo=p.crypt_algo, crypt_key=mk_scapy_crypt_key(p)[::-1], auth_algo=p.auth_algo, @@ -690,27 +746,32 @@ class IpsecTra4(object): src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / bogus_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=350, + seq_num=anti_replay_window_size + 200, ) self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) hash_failed_count += 17 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + if hash_err != "": + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff + self.assertEqual(err, hash_failed_count) # a malformed 'runt' packet # created by a mis-constructed SA if ESP == self.encryption_type and p.crypt_algo != "NULL": - bogus_sa = SecurityAssociation(self.encryption_type, p.vpp_tra_spi) + bogus_sa = SecurityAssociation(self.encryption_type, p.scapy_tra_spi) pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / bogus_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=350, + seq_num=anti_replay_window_size + 200, ) self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) undersize_count += 17 self.assert_error_counter_equal(undersize_node_name, undersize_count) + err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff + self.assertEqual(err, undersize_count) # which we can determine since this packet is still in the window pkt = Ether( @@ -739,17 +800,22 @@ class IpsecTra4(object): # wrap. but since it isn't then the verify will fail. hash_failed_count += 17 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + if hash_err != "": + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff + self.assertEqual(err, hash_failed_count) else: replay_count += 17 self.assertEqual(self.get_replay_counts(p), replay_count) + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff + self.assertEqual(err, replay_count) - # valid packet moves the window over to 258 + # valid packet moves the window over to anti_replay_window_size + 258 pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=258, + seq_num=anti_replay_window_size + 258, ) rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) @@ -760,7 +826,7 @@ class IpsecTra4(object): # causes the TX seq number to wrap; unless we're using extened sequence # numbers. # - self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id) + self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.vpp_tra_sa_id) self.logger.info(self.vapi.ppcli("show ipsec sa 0")) self.logger.info(self.vapi.ppcli("show ipsec sa 1")) @@ -803,7 +869,7 @@ class IpsecTra4(object): decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) # - # A packet that has seq num between (2^32-64) and 5 is within + # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within # the window # p.scapy_tra_sa.seq_num = 0xFFFFFFFD @@ -833,17 +899,20 @@ class IpsecTra4(object): hash_failed_count += 1 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + if hash_err != "": + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff + self.assertEqual(err, hash_failed_count) # # but if we move the window forward to case B, then we can wrap # again # - p.scapy_tra_sa.seq_num = 0x100000555 + p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555 pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=0x100000555, + seq_num=p.scapy_tra_sa.seq_num, ) rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) @@ -866,6 +935,8 @@ class IpsecTra4(object): self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) seq_cycle_count += len(pkts) self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count) + err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff + self.assertEqual(err, seq_cycle_count) # move the security-associations seq number on to the last we used self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id) @@ -896,7 +967,7 @@ class IpsecTra4(object): ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_out.get_lost(), 0) + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) # skip a sequence number pkts = [ @@ -911,7 +982,7 @@ class IpsecTra4(object): ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_out.get_lost(), 0) + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) # the lost packet are counted untill we get up past the first # sizeof(replay_window) packets @@ -927,7 +998,7 @@ class IpsecTra4(object): ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_out.get_lost(), 1) + self.assertEqual(p.tra_sa_in.get_err("lost"), 1) # lost of holes in the sequence pkts = [ @@ -954,7 +1025,7 @@ class IpsecTra4(object): ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_out.get_lost(), 51) + self.assertEqual(p.tra_sa_in.get_err("lost"), 51) # a big hole in the seq number space pkts = [ @@ -969,7 +1040,7 @@ class IpsecTra4(object): ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_out.get_lost(), 151) + self.assertEqual(p.tra_sa_in.get_err("lost"), 151) def verify_tra_basic4(self, count=1, payload_size=54): """ipsec v4 transport basic test""" @@ -1008,167 +1079,933 @@ class IpsecTra4(object): self.assertEqual( pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts) ) - self.assertEqual(p.tra_sa_out.get_lost(), 0) - self.assertEqual(p.tra_sa_in.get_lost(), 0) + self.assertEqual(p.tra_sa_out.get_err("lost"), 0) + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count) self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count) + def _verify_tra_anti_replay_algorithm_esn(self): + def seq_num(seqh, seql): + return (seqh << 32) | (seql & 0xFFFF_FFFF) -class IpsecTra4Tests(IpsecTra4): - """UT test methods for Transport v4""" + p = self.params[socket.AF_INET] + anti_replay_window_size = p.anti_replay_window_size - def test_tra_anti_replay(self): - """ipsec v4 transport anti-replay test""" - self.verify_tra_anti_replay() + seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name + replay_count = self.get_replay_counts(p) + hash_failed_count = self.get_hash_failed_counts(p) + seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) - def test_tra_lost(self): - """ipsec v4 transport lost packet test""" - self.verify_tra_lost() + if ESP == self.encryption_type: + undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] + undersize_count = self.statistics.get_err_counter(undersize_node_name) - def test_tra_basic(self, count=1): - """ipsec v4 transport basic test""" - self.verify_tra_basic4(count=1) + # reset the TX SA to avoid conflict with left configuration + self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0") - def test_tra_burst(self): - """ipsec v4 transport burst test""" - self.verify_tra_basic4(count=257) + """ + RFC 4303 Appendix A2. Case A + |: new Th marker + a-i: possible seq num received + +: Bl, Tl, Bl', Tl' + [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1) -class IpsecTra6(object): - """verify methods for Transport v6""" + Th - 1 Th Th + 1 + --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|-- + ========= ========= ========= + Bl- Tl- Bl Tl Bl+ Tl+ - def verify_tra_basic6(self, count=1, payload_size=54): - self.vapi.cli("clear errors") - self.vapi.cli("clear ipsec sa") - try: - p = self.params[socket.AF_INET6] - send_pkts = self.gen_encrypt_pkts6( - p, - p.scapy_tra_sa, - self.tra_if, - src=self.tra_if.remote_ip6, - dst=self.tra_if.local_ip6, - count=count, - payload_size=payload_size, + Case A implies Tl >= W - 1 + """ + + Th = 1 + Tl = anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + # move VPP's RX AR window to Case A + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case a: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) ) - recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) - for rx in recv_pkts: - self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen) - try: - decrypted = p.vpp_tra_sa.decrypt(rx[IPv6]) - self.assert_packet_checksums_valid(decrypted) - except: - self.logger.debug(ppp("Unexpected packet:", rx)) - raise - finally: - self.logger.info(self.vapi.ppcli("show error")) - self.logger.info(self.vapi.ppcli("show ipsec all")) + for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5)) + ] - pkts = p.tra_sa_in.get_stats()["packets"] - self.assertEqual( - pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts) - ) - pkts = p.tra_sa_out.get_stats()["packets"] - self.assertEqual( - pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts) - ) - self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count) - self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count) + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) - def gen_encrypt_pkts_ext_hdrs6( - self, sa, sw_intf, src, dst, count=1, payload_size=54 - ): - return [ - Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) - / sa.encrypt( - IPv6(src=src, dst=dst) - / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size) + """ + case b: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) ) - for i in range(count) + for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5)) ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) - def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54): - return [ - Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) - / IPv6(src=src, dst=dst) - / IPv6ExtHdrHopByHop() - / IPv6ExtHdrFragment(id=2, offset=200) - / Raw(b"\xff" * 200) - for i in range(count) + p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5)) ] - def verify_tra_encrypted6(self, p, sa, rxs): - decrypted = [] - for rx in rxs: - self.assert_packet_checksums_valid(rx) - try: - decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6]) - decrypted.append(decrypt_pkt) - self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6) - self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6) - except: - self.logger.debug(ppp("Unexpected packet:", rx)) - try: - self.logger.debug(ppp("Decrypted packet:", decrypt_pkt)) - except: - pass - raise - return decrypted + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) - def verify_tra_66_ext_hdrs(self, p): - count = 63 + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) - # - # check we can decrypt with options - # - tx = self.gen_encrypt_pkts_ext_hdrs6( - p.scapy_tra_sa, - self.tra_if, - src=self.tra_if.remote_ip6, - dst=self.tra_if.local_ip6, - count=count, - ) - self.send_and_expect(self.tra_if, tx, self.tra_if) + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) - # - # injecting a packet from ourselves to be routed of box is a hack - # but it matches an outbout policy, alors je ne regrette rien - # + """ + case c: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20)) + ] - # one extension before ESP - tx = ( - Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) - / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6) - / IPv6ExtHdrFragment(id=2, offset=200) - / Raw(b"\xff" * 200) - ) + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) - rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) - dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) - for dc in dcs: - # for reasons i'm not going to investigate scapy does not - # created the correct headers after decrypt. but reparsing - # the ipv6 packet fixes it - dc = IPv6(raw(dc[IPv6])) - self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) + """ + case d: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should fail + - post-crypto check: ... + """ + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5)) + ] - # two extensions before ESP - tx = ( - Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) - / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6) - / IPv6ExtHdrHopByHop() - / IPv6ExtHdrFragment(id=2, offset=200) - / Raw(b"\xff" * 200) - ) + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) - rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) - dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + """ + case e: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30)) + ] - for dc in dcs: - dc = IPv6(raw(dc[IPv6])) - self.assertTrue(dc[IPv6ExtHdrHopByHop]) + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case f: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (the window stays Case A) + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case g: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (may set the window in Case B) + -> Seql is marked in the AR window + """ + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + # set the window in Case B (the minimum window size is 64 + # so we are sure to overlap) + for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # reset the VPP's RX AR window to Case A + Th = 1 + Tl = 2 * anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + # the AR will stay in Case A + for seq in range( + seq_num(Th + 1, anti_replay_window_size + 10), + seq_num(Th + 1, anti_replay_window_size + 20), + ) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case h: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: the wrap is not detected, should fail + - post-crypto check: ... + """ + Th += 1 + Tl = anti_replay_window_size + 20 + Bl = Tl - anti_replay_window_size + 1 + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case i: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: the wrap is not detected, shoud fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15)) + ] + + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + RFC 4303 Appendix A2. Case B + + Th - 1 Th Th + 1 + ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h--- + ========= =========== =========== + Tl- Bl Tl Bl+ Tl+ + + Case B implies Tl < W - 1 + """ + + # reset the VPP's RX AR window to Case B + Th = 2 + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = (Tl - anti_replay_window_size + 1) % (1 << 32) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case a: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 5), seq_num(Th, 10)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case b: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case c: Tl < Bl <= Seql + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th - 1 + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case d: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 15), seq_num(Th, 25)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case e: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (may set the window in Case A) + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15)) + ] + + # the window stays in Case B + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range( + seq_num(Th, Tl + anti_replay_window_size + 5), + seq_num(Th, Tl + anti_replay_window_size + 15), + ) + ] + + # the window moves to Case A + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # reset the VPP's RX AR window to Case B + Th = 2 + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = (Tl - anti_replay_window_size + 1) % (1 << 32) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case f: Tl < Bl <= Seql + - pre-crypto check: algorithm predicts that the packet is in the previous window + -> Seqh = Th - 1 + -> check for a replayed packet with Seql + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case g: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 10), seq_num(Th, 15)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case h: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + def _verify_tra_anti_replay_algorithm_no_esn(self): + def seq_num(seql): + return seql & 0xFFFF_FFFF + + p = self.params[socket.AF_INET] + anti_replay_window_size = p.anti_replay_window_size + + seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name + replay_count = self.get_replay_counts(p) + hash_failed_count = self.get_hash_failed_counts(p) + seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + + if ESP == self.encryption_type: + undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] + undersize_count = self.statistics.get_err_counter(undersize_node_name) + + # reset the TX SA to avoid conflict with left configuration + self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0") + + """ + RFC 4303 Appendix A2. Case A + + a-c: possible seq num received + +: Bl, Tl + + |--a--+---b---+-c--| + ========= + Bl Tl + + No ESN implies Th = 0 + Case A implies Tl >= W - 1 + """ + + Tl = anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + # move VPP's RX AR window to Case A + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Tl) + + """ + case a: Seql < Bl + - pre-crypto check: algorithm predicts that the packet is out of window + -> packet should be dropped + - integrity check: ... + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Bl - 20), seq_num(Bl - 5)) + ] + + # out-of-window packets + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + replay_count += len(pkts) + self.assertEqual(self.get_replay_counts(p), replay_count) + + """ + case b: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: + -> check for a replayed packet with Seql + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Tl - 50), seq_num(Tl - 30)) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Tl - 35), seq_num(Tl - 30)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # replayed packets + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + """ + case c: Seql > Tl + - pre-crypto check: algorithm predicts that the packet will shift the window + - integrity check: should pass + - post-crypto check: should pass + -> AR window is shifted + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Tl + 5), seq_num(Tl + 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + RFC 4303 Appendix A2. Case B + + |-a-----+------b-----| + ========= + Tl + + Case B implies Tl < W - 1 + """ + + # reset the VPP's RX AR window to Case B + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = seq_num(Tl - anti_replay_window_size + 1) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}") + + """ + case a: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(5), seq_num(10)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case b: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + - integrity check: should pass + - post-crypto check: should pass + -> AR window is shifted + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(-50), seq_num(-20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + def verify_tra_anti_replay_algorithm(self): + if self.params[socket.AF_INET].vpp_tra_sa.esn_en: + self._verify_tra_anti_replay_algorithm_esn() + else: + self._verify_tra_anti_replay_algorithm_no_esn() + + +class IpsecTra4Tests(IpsecTra4): + """UT test methods for Transport v4""" + + def test_tra_anti_replay(self): + """ipsec v4 transport anti-replay test""" + self.verify_tra_anti_replay() + + def test_tra_anti_replay_algorithm(self): + """ipsec v4 transport anti-replay algorithm test""" + self.verify_tra_anti_replay_algorithm() + + def test_tra_lost(self): + """ipsec v4 transport lost packet test""" + self.verify_tra_lost() + + def test_tra_basic(self, count=1): + """ipsec v4 transport basic test""" + self.verify_tra_basic4(count=1) + + def test_tra_burst(self): + """ipsec v4 transport burst test""" + self.verify_tra_basic4(count=257) + + +class IpsecTra6(object): + """verify methods for Transport v6""" + + def verify_tra_basic6(self, count=1, payload_size=54): + self.vapi.cli("clear errors") + self.vapi.cli("clear ipsec sa") + try: + p = self.params[socket.AF_INET6] + send_pkts = self.gen_encrypt_pkts6( + p, + p.scapy_tra_sa, + self.tra_if, + src=self.tra_if.remote_ip6, + dst=self.tra_if.local_ip6, + count=count, + payload_size=payload_size, + ) + recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) + for rx in recv_pkts: + self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen) + try: + decrypted = p.vpp_tra_sa.decrypt(rx[IPv6]) + self.assert_packet_checksums_valid(decrypted) + except: + self.logger.debug(ppp("Unexpected packet:", rx)) + raise + finally: + self.logger.info(self.vapi.ppcli("show error")) + self.logger.info(self.vapi.ppcli("show ipsec all")) + + pkts = p.tra_sa_in.get_stats()["packets"] + self.assertEqual( + pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts) + ) + pkts = p.tra_sa_out.get_stats()["packets"] + self.assertEqual( + pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts) + ) + self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count) + self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count) + + def gen_encrypt_pkts_ext_hdrs6( + self, sa, sw_intf, src, dst, count=1, payload_size=54 + ): + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / sa.encrypt( + IPv6(src=src, dst=dst) + / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size) + ) + for i in range(count) + ] + + def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54): + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IPv6(src=src, dst=dst) + / IPv6ExtHdrHopByHop() + / IPv6ExtHdrFragment(id=2, offset=200) + / Raw(b"\xff" * 200) + for i in range(count) + ] + + def verify_tra_encrypted6(self, p, sa, rxs): + decrypted = [] + for rx in rxs: + self.assert_packet_checksums_valid(rx) + try: + decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6]) + decrypted.append(decrypt_pkt) + self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6) + self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6) + except: + self.logger.debug(ppp("Unexpected packet:", rx)) + try: + self.logger.debug(ppp("Decrypted packet:", decrypt_pkt)) + except: + pass + raise + return decrypted + + def verify_tra_66_ext_hdrs(self, p): + count = 63 + + # + # check we can decrypt with options + # + tx = self.gen_encrypt_pkts_ext_hdrs6( + p.scapy_tra_sa, + self.tra_if, + src=self.tra_if.remote_ip6, + dst=self.tra_if.local_ip6, + count=count, + ) + self.send_and_expect(self.tra_if, tx, self.tra_if) + + # + # injecting a packet from ourselves to be routed of box is a hack + # but it matches an outbout policy, alors je ne regrette rien + # + + # one extension before ESP + tx = ( + Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) + / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6) + / IPv6ExtHdrFragment(id=2, offset=200) + / Raw(b"\xff" * 200) + ) + + rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) + dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + + for dc in dcs: + # for reasons i'm not going to investigate scapy does not + # created the correct headers after decrypt. but reparsing + # the ipv6 packet fixes it + dc = IPv6(raw(dc[IPv6])) + self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) + + # two extensions before ESP + tx = ( + Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) + / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6) + / IPv6ExtHdrHopByHop() + / IPv6ExtHdrFragment(id=2, offset=200) + / Raw(b"\xff" * 200) + ) + + rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) + dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + + for dc in dcs: + dc = IPv6(raw(dc[IPv6])) + self.assertTrue(dc[IPv6ExtHdrHopByHop]) self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) # two extensions before ESP, one after @@ -1263,7 +2100,7 @@ class IpsecTun4(object): decrypt_pkts = [] for rx in rxs: if p.nat_header: - self.assertEqual(rx[UDP].dport, 4500) + self.assertEqual(rx[UDP].dport, p.nat_header.dport) self.assert_packet_checksums_valid(rx) self.assertEqual(len(rx) - len(Ether()), rx[IP].len) try: @@ -1695,6 +2532,40 @@ class IpsecTun6(object): self.logger.info(self.vapi.ppcli("show ipsec all")) self.verify_counters6(p, p, count) + def verify_keepalive(self, p): + # the sizeof Raw is calculated to pad to the minimum ehternet + # frame size of 64 btyes + pkt = ( + Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) + / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6) + / UDP(sport=333, dport=4500) + / Raw(b"\xff") + / Padding(0 * 1) + ) + self.send_and_assert_no_replies(self.tun_if, pkt * 31) + self.assert_error_counter_equal( + "/err/%s/nat_keepalive" % self.tun6_input_node, 31 + ) + + pkt = ( + Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) + / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6) + / UDP(sport=333, dport=4500) + / Raw(b"\xfe") + ) + self.send_and_assert_no_replies(self.tun_if, pkt * 31) + self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31) + + pkt = ( + Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) + / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6) + / UDP(sport=333, dport=4500) + / Raw(b"\xfe") + / Padding(0 * 21) + ) + self.send_and_assert_no_replies(self.tun_if, pkt * 31) + self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62) + class IpsecTun6Tests(IpsecTun6): """UT test methods for Tunnel v6"""