X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_ipsec.py;h=b5cd922f127af8cbb8b08ddee346adbb486140ef;hb=ccc17f0a70918ae0f6e94037101718786702295c;hp=1b1c9aaa25d1c9da1ed3b48f1cc02c43e81012fd;hpb=ab0bf0c0cbb66a807e4e8a04462ab56be12c4524;p=vpp.git diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 1b1c9aaa25d..b5cd922f127 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -15,7 +15,8 @@ from scapy.layers.inet6 import ( ) -from framework import VppTestCase, VppTestRunner +from framework import VppTestCase +from asfframework import VppTestRunner from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200 from vpp_papi import VppEnum @@ -26,7 +27,6 @@ from os import popen class IPsecIPv4Params: - addr_type = socket.AF_INET addr_any = "0.0.0.0" addr_bcast = "255.255.255.255" @@ -52,6 +52,8 @@ class IPsecIPv4Params: self.outer_flow_label = 0 self.inner_flow_label = 0x12345 + self.anti_replay_window_size = 64 + self.auth_algo_vpp_id = ( VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96 ) @@ -74,7 +76,6 @@ class IPsecIPv4Params: class IPsecIPv6Params: - addr_type = socket.AF_INET6 addr_any = "0::0" addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff" @@ -100,6 +101,8 @@ class IPsecIPv6Params: self.outer_flow_label = 0 self.inner_flow_label = 0x12345 + self.anti_replay_window_size = 64 + self.auth_algo_vpp_id = ( VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96 ) @@ -122,7 +125,7 @@ class IPsecIPv6Params: def mk_scapy_crypt_key(p): - if p.crypt_algo in ("AES-GCM", "AES-CTR"): + if p.crypt_algo in ("AES-GCM", "AES-CTR", "AES-NULL-GMAC"): return p.crypt_key + struct.pack("!I", p.salt) else: return p.crypt_key @@ -138,7 +141,7 @@ def config_tun_params(p, encryption_type, tun_if): crypt_key = mk_scapy_crypt_key(p) p.scapy_tun_sa = SecurityAssociation( encryption_type, - spi=p.vpp_tun_spi, + spi=p.scapy_tun_spi, crypt_algo=p.crypt_algo, crypt_key=crypt_key, auth_algo=p.auth_algo, @@ -149,7 +152,7 @@ def config_tun_params(p, encryption_type, tun_if): ) p.vpp_tun_sa = SecurityAssociation( encryption_type, - spi=p.scapy_tun_spi, + spi=p.vpp_tun_spi, crypt_algo=p.crypt_algo, crypt_key=crypt_key, auth_algo=p.auth_algo, @@ -167,7 +170,7 @@ def config_tra_params(p, encryption_type): crypt_key = mk_scapy_crypt_key(p) p.scapy_tra_sa = SecurityAssociation( encryption_type, - spi=p.vpp_tra_spi, + spi=p.scapy_tra_spi, crypt_algo=p.crypt_algo, crypt_key=crypt_key, auth_algo=p.auth_algo, @@ -177,7 +180,7 @@ def config_tra_params(p, encryption_type): ) p.vpp_tra_sa = SecurityAssociation( encryption_type, - spi=p.scapy_tra_spi, + spi=p.vpp_tra_spi, crypt_algo=p.crypt_algo, crypt_key=crypt_key, auth_algo=p.auth_algo, @@ -330,27 +333,25 @@ class IpsecTra4(object): """verify methods for Transport v4""" def get_replay_counts(self, p): - replay_node_name = "/err/%s/SA replayed packet" % self.tra4_decrypt_node_name[0] + replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0] count = self.statistics.get_err_counter(replay_node_name) if p.async_mode: replay_post_node_name = ( - "/err/%s/SA replayed packet" % self.tra4_decrypt_node_name[p.async_mode] + "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode] ) count += self.statistics.get_err_counter(replay_post_node_name) return count def get_hash_failed_counts(self, p): - if ESP == self.encryption_type and p.crypt_algo == "AES-GCM": + if ESP == self.encryption_type and p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"): hash_failed_node_name = ( - "/err/%s/ESP decryption failed" - % self.tra4_decrypt_node_name[p.async_mode] + "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode] ) else: hash_failed_node_name = ( - "/err/%s/Integrity check failed" - % self.tra4_decrypt_node_name[p.async_mode] + "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode] ) count = self.statistics.get_err_counter(hash_failed_node_name) @@ -365,10 +366,7 @@ class IpsecTra4(object): esn_on = p.vpp_tra_sa.esn_en ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY - seq_cycle_node_name = ( - "/err/%s/sequence number cycled (packet dropped)" - % self.tra4_encrypt_node_name - ) + seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name replay_count = self.get_replay_counts(p) hash_failed_count = self.get_hash_failed_counts(p) seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) @@ -437,6 +435,34 @@ class IpsecTra4(object): ] recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) + # a replayed packet, then an out of window, then a legit + # tests that a early failure on the batch doesn't affect subsequent packets. + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=203, + ) + ), + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=81, + ) + ), + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=204, + ) + ), + ] + n_rx = 1 if ar_on else 3 + recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx) + # move the window over half way to a wrap pkts = [ ( @@ -604,20 +630,34 @@ class IpsecTra4(object): def verify_tra_anti_replay(self): p = self.params[socket.AF_INET] esn_en = p.vpp_tra_sa.esn_en + anti_replay_window_size = p.anti_replay_window_size - seq_cycle_node_name = ( - "/err/%s/sequence number cycled (packet dropped)" - % self.tra4_encrypt_node_name - ) + seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name replay_count = self.get_replay_counts(p) + initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay") hash_failed_count = self.get_hash_failed_counts(p) seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err( + "seq_cycled" + ) + hash_err = "integ_error" if ESP == self.encryption_type: - undersize_node_name = ( - "/err/%s/undersized packet" % self.tra4_decrypt_node_name[0] - ) + undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] undersize_count = self.statistics.get_err_counter(undersize_node_name) + initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err( + "runt" + ) + # For AES-GCM an error in the hash is reported as a decryption failure + if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"): + hash_err = "decryption_failed" + # In async mode, we don't report errors in the hash. + if p.async_mode: + hash_err = "" + else: + initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err( + hash_err + ) # # send packets with seq numbers 1->34 @@ -643,6 +683,8 @@ class IpsecTra4(object): self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) replay_count += len(pkts) self.assertEqual(self.get_replay_counts(p), replay_count) + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff + self.assertEqual(err, replay_count) # # now send a batch of packets all with the same sequence number @@ -659,25 +701,31 @@ class IpsecTra4(object): recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1) replay_count += 7 self.assertEqual(self.get_replay_counts(p), replay_count) + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff + self.assertEqual(err, replay_count) # - # now move the window over to 257 (more than one byte) and into Case A + # now move the window over to anti_replay_window_size + 100 and into Case A # self.vapi.cli("clear error") pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=257, + seq_num=anti_replay_window_size + 100, ) recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if) + self.logger.info(self.vapi.ppcli("show ipsec sa 1")) + # replayed packets are dropped self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2) replay_count += 3 self.assertEqual(self.get_replay_counts(p), replay_count) + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff + self.assertEqual(err, replay_count) - # the window size is 64 packets + # the window size is anti_replay_window_size packets # in window are still accepted pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac @@ -685,12 +733,11 @@ class IpsecTra4(object): IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), seq_num=200, ) - recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if) # a packet that does not decrypt does not move the window forward bogus_sa = SecurityAssociation( self.encryption_type, - p.vpp_tra_spi, + p.scapy_tra_spi, crypt_algo=p.crypt_algo, crypt_key=mk_scapy_crypt_key(p)[::-1], auth_algo=p.auth_algo, @@ -700,27 +747,32 @@ class IpsecTra4(object): src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / bogus_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=350, + seq_num=anti_replay_window_size + 200, ) self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) hash_failed_count += 17 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + if hash_err != "": + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff + self.assertEqual(err, hash_failed_count) # a malformed 'runt' packet # created by a mis-constructed SA if ESP == self.encryption_type and p.crypt_algo != "NULL": - bogus_sa = SecurityAssociation(self.encryption_type, p.vpp_tra_spi) + bogus_sa = SecurityAssociation(self.encryption_type, p.scapy_tra_spi) pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / bogus_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=350, + seq_num=anti_replay_window_size + 200, ) self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) undersize_count += 17 self.assert_error_counter_equal(undersize_node_name, undersize_count) + err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff + self.assertEqual(err, undersize_count) # which we can determine since this packet is still in the window pkt = Ether( @@ -749,17 +801,22 @@ class IpsecTra4(object): # wrap. but since it isn't then the verify will fail. hash_failed_count += 17 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + if hash_err != "": + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff + self.assertEqual(err, hash_failed_count) else: replay_count += 17 self.assertEqual(self.get_replay_counts(p), replay_count) + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff + self.assertEqual(err, replay_count) - # valid packet moves the window over to 258 + # valid packet moves the window over to anti_replay_window_size + 258 pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=258, + seq_num=anti_replay_window_size + 258, ) rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) @@ -770,7 +827,7 @@ class IpsecTra4(object): # causes the TX seq number to wrap; unless we're using extened sequence # numbers. # - self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id) + self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.vpp_tra_sa_id) self.logger.info(self.vapi.ppcli("show ipsec sa 0")) self.logger.info(self.vapi.ppcli("show ipsec sa 1")) @@ -813,7 +870,7 @@ class IpsecTra4(object): decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) # - # A packet that has seq num between (2^32-64) and 5 is within + # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within # the window # p.scapy_tra_sa.seq_num = 0xFFFFFFFD @@ -843,17 +900,20 @@ class IpsecTra4(object): hash_failed_count += 1 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + if hash_err != "": + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff + self.assertEqual(err, hash_failed_count) # # but if we move the window forward to case B, then we can wrap # again # - p.scapy_tra_sa.seq_num = 0x100000555 + p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555 pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=0x100000555, + seq_num=p.scapy_tra_sa.seq_num, ) rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) @@ -876,6 +936,8 @@ class IpsecTra4(object): self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) seq_cycle_count += len(pkts) self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count) + err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff + self.assertEqual(err, seq_cycle_count) # move the security-associations seq number on to the last we used self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id) @@ -906,7 +968,7 @@ class IpsecTra4(object): ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_out.get_lost(), 0) + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) # skip a sequence number pkts = [ @@ -921,7 +983,7 @@ class IpsecTra4(object): ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_out.get_lost(), 0) + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) # the lost packet are counted untill we get up past the first # sizeof(replay_window) packets @@ -937,7 +999,7 @@ class IpsecTra4(object): ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_out.get_lost(), 1) + self.assertEqual(p.tra_sa_in.get_err("lost"), 1) # lost of holes in the sequence pkts = [ @@ -964,7 +1026,7 @@ class IpsecTra4(object): ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_out.get_lost(), 51) + self.assertEqual(p.tra_sa_in.get_err("lost"), 51) # a big hole in the seq number space pkts = [ @@ -979,7 +1041,7 @@ class IpsecTra4(object): ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_out.get_lost(), 151) + self.assertEqual(p.tra_sa_in.get_err("lost"), 151) def verify_tra_basic4(self, count=1, payload_size=54): """ipsec v4 transport basic test""" @@ -1018,166 +1080,932 @@ class IpsecTra4(object): self.assertEqual( pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts) ) - self.assertEqual(p.tra_sa_out.get_lost(), 0) - self.assertEqual(p.tra_sa_in.get_lost(), 0) + self.assertEqual(p.tra_sa_out.get_err("lost"), 0) + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count) self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count) + def _verify_tra_anti_replay_algorithm_esn(self): + def seq_num(seqh, seql): + return (seqh << 32) | (seql & 0xFFFF_FFFF) -class IpsecTra4Tests(IpsecTra4): - """UT test methods for Transport v4""" + p = self.params[socket.AF_INET] + anti_replay_window_size = p.anti_replay_window_size - def test_tra_anti_replay(self): - """ipsec v4 transport anti-replay test""" - self.verify_tra_anti_replay() + seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name + replay_count = self.get_replay_counts(p) + hash_failed_count = self.get_hash_failed_counts(p) + seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) - def test_tra_lost(self): - """ipsec v4 transport lost packet test""" - self.verify_tra_lost() + if ESP == self.encryption_type: + undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] + undersize_count = self.statistics.get_err_counter(undersize_node_name) - def test_tra_basic(self, count=1): - """ipsec v4 transport basic test""" - self.verify_tra_basic4(count=1) + # reset the TX SA to avoid conflict with left configuration + self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0") - def test_tra_burst(self): - """ipsec v4 transport burst test""" - self.verify_tra_basic4(count=257) + """ + RFC 4303 Appendix A2. Case A + |: new Th marker + a-i: possible seq num received + +: Bl, Tl, Bl', Tl' + [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1) -class IpsecTra6(object): - """verify methods for Transport v6""" + Th - 1 Th Th + 1 + --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|-- + ========= ========= ========= + Bl- Tl- Bl Tl Bl+ Tl+ - def verify_tra_basic6(self, count=1, payload_size=54): - self.vapi.cli("clear errors") - self.vapi.cli("clear ipsec sa") - try: - p = self.params[socket.AF_INET6] - send_pkts = self.gen_encrypt_pkts6( - p, - p.scapy_tra_sa, - self.tra_if, - src=self.tra_if.remote_ip6, - dst=self.tra_if.local_ip6, - count=count, - payload_size=payload_size, + Case A implies Tl >= W - 1 + """ + + Th = 1 + Tl = anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + # move VPP's RX AR window to Case A + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case a: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) ) - recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) - for rx in recv_pkts: - self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen) - try: - decrypted = p.vpp_tra_sa.decrypt(rx[IPv6]) - self.assert_packet_checksums_valid(decrypted) - except: - self.logger.debug(ppp("Unexpected packet:", rx)) - raise - finally: - self.logger.info(self.vapi.ppcli("show error")) - self.logger.info(self.vapi.ppcli("show ipsec all")) + for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5)) + ] - pkts = p.tra_sa_in.get_stats()["packets"] - self.assertEqual( - pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts) - ) - pkts = p.tra_sa_out.get_stats()["packets"] - self.assertEqual( - pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts) - ) - self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count) - self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count) + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) - def gen_encrypt_pkts_ext_hdrs6( - self, sa, sw_intf, src, dst, count=1, payload_size=54 - ): - return [ - Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) - / sa.encrypt( - IPv6(src=src, dst=dst) - / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size) + """ + case b: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) ) - for i in range(count) + for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5)) ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) - def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54): - return [ - Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) - / IPv6(src=src, dst=dst) - / IPv6ExtHdrHopByHop() - / IPv6ExtHdrFragment(id=2, offset=200) - / Raw(b"\xff" * 200) - for i in range(count) + p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5)) ] - def verify_tra_encrypted6(self, p, sa, rxs): - decrypted = [] - for rx in rxs: - self.assert_packet_checksums_valid(rx) - try: - decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6]) - decrypted.append(decrypt_pkt) - self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6) - self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6) - except: - self.logger.debug(ppp("Unexpected packet:", rx)) - try: - self.logger.debug(ppp("Decrypted packet:", decrypt_pkt)) - except: - pass - raise - return decrypted + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) - def verify_tra_66_ext_hdrs(self, p): - count = 63 + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) - # - # check we can decrypt with options - # - tx = self.gen_encrypt_pkts_ext_hdrs6( - p.scapy_tra_sa, - self.tra_if, - src=self.tra_if.remote_ip6, - dst=self.tra_if.local_ip6, - count=count, - ) - self.send_and_expect(self.tra_if, tx, self.tra_if) + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) - # - # injecting a packet from ourselves to be routed of box is a hack - # but it matches an outbout policy, alors je ne regrette rien - # + """ + case c: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20)) + ] - # one extension before ESP - tx = ( - Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) - / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6) - / IPv6ExtHdrFragment(id=2, offset=200) - / Raw(b"\xff" * 200) - ) + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) - rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) - dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) - for dc in dcs: - # for reasons i'm not going to investigate scapy does not - # created the correct headers after decrypt. but reparsing - # the ipv6 packet fixes it - dc = IPv6(raw(dc[IPv6])) - self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) + """ + case d: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should fail + - post-crypto check: ... + """ + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5)) + ] - # two extensions before ESP - tx = ( - Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) - / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6) - / IPv6ExtHdrHopByHop() - / IPv6ExtHdrFragment(id=2, offset=200) - / Raw(b"\xff" * 200) - ) + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) - rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) - dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + """ + case e: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30)) + ] - for dc in dcs: - dc = IPv6(raw(dc[IPv6])) + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case f: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (the window stays Case A) + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case g: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (may set the window in Case B) + -> Seql is marked in the AR window + """ + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + # set the window in Case B (the minimum window size is 64 + # so we are sure to overlap) + for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # reset the VPP's RX AR window to Case A + Th = 1 + Tl = 2 * anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + # the AR will stay in Case A + for seq in range( + seq_num(Th + 1, anti_replay_window_size + 10), + seq_num(Th + 1, anti_replay_window_size + 20), + ) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case h: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: the wrap is not detected, should fail + - post-crypto check: ... + """ + Th += 1 + Tl = anti_replay_window_size + 20 + Bl = Tl - anti_replay_window_size + 1 + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case i: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: the wrap is not detected, shoud fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15)) + ] + + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + RFC 4303 Appendix A2. Case B + + Th - 1 Th Th + 1 + ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h--- + ========= =========== =========== + Tl- Bl Tl Bl+ Tl+ + + Case B implies Tl < W - 1 + """ + + # reset the VPP's RX AR window to Case B + Th = 2 + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = (Tl - anti_replay_window_size + 1) % (1 << 32) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case a: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 5), seq_num(Th, 10)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case b: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case c: Tl < Bl <= Seql + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th - 1 + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case d: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 15), seq_num(Th, 25)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case e: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (may set the window in Case A) + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15)) + ] + + # the window stays in Case B + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range( + seq_num(Th, Tl + anti_replay_window_size + 5), + seq_num(Th, Tl + anti_replay_window_size + 15), + ) + ] + + # the window moves to Case A + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # reset the VPP's RX AR window to Case B + Th = 2 + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = (Tl - anti_replay_window_size + 1) % (1 << 32) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case f: Tl < Bl <= Seql + - pre-crypto check: algorithm predicts that the packet is in the previous window + -> Seqh = Th - 1 + -> check for a replayed packet with Seql + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case g: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 10), seq_num(Th, 15)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case h: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + def _verify_tra_anti_replay_algorithm_no_esn(self): + def seq_num(seql): + return seql & 0xFFFF_FFFF + + p = self.params[socket.AF_INET] + anti_replay_window_size = p.anti_replay_window_size + + seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name + replay_count = self.get_replay_counts(p) + hash_failed_count = self.get_hash_failed_counts(p) + seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + + if ESP == self.encryption_type: + undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] + undersize_count = self.statistics.get_err_counter(undersize_node_name) + + # reset the TX SA to avoid conflict with left configuration + self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0") + + """ + RFC 4303 Appendix A2. Case A + + a-c: possible seq num received + +: Bl, Tl + + |--a--+---b---+-c--| + ========= + Bl Tl + + No ESN implies Th = 0 + Case A implies Tl >= W - 1 + """ + + Tl = anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + # move VPP's RX AR window to Case A + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Tl) + + """ + case a: Seql < Bl + - pre-crypto check: algorithm predicts that the packet is out of window + -> packet should be dropped + - integrity check: ... + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Bl - 20), seq_num(Bl - 5)) + ] + + # out-of-window packets + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + replay_count += len(pkts) + self.assertEqual(self.get_replay_counts(p), replay_count) + + """ + case b: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: + -> check for a replayed packet with Seql + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Tl - 50), seq_num(Tl - 30)) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Tl - 35), seq_num(Tl - 30)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # replayed packets + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + """ + case c: Seql > Tl + - pre-crypto check: algorithm predicts that the packet will shift the window + - integrity check: should pass + - post-crypto check: should pass + -> AR window is shifted + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Tl + 5), seq_num(Tl + 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + RFC 4303 Appendix A2. Case B + + |-a-----+------b-----| + ========= + Tl + + Case B implies Tl < W - 1 + """ + + # reset the VPP's RX AR window to Case B + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = seq_num(Tl - anti_replay_window_size + 1) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}") + + """ + case a: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(5), seq_num(10)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case b: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + - integrity check: should pass + - post-crypto check: should pass + -> AR window is shifted + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(-50), seq_num(-20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + def verify_tra_anti_replay_algorithm(self): + if self.params[socket.AF_INET].vpp_tra_sa.esn_en: + self._verify_tra_anti_replay_algorithm_esn() + else: + self._verify_tra_anti_replay_algorithm_no_esn() + + +class IpsecTra4Tests(IpsecTra4): + """UT test methods for Transport v4""" + + def test_tra_anti_replay(self): + """ipsec v4 transport anti-replay test""" + self.verify_tra_anti_replay() + + def test_tra_anti_replay_algorithm(self): + """ipsec v4 transport anti-replay algorithm test""" + self.verify_tra_anti_replay_algorithm() + + def test_tra_lost(self): + """ipsec v4 transport lost packet test""" + self.verify_tra_lost() + + def test_tra_basic(self, count=1): + """ipsec v4 transport basic test""" + self.verify_tra_basic4(count=1) + + def test_tra_burst(self): + """ipsec v4 transport burst test""" + self.verify_tra_basic4(count=257) + + +class IpsecTra6(object): + """verify methods for Transport v6""" + + def verify_tra_basic6(self, count=1, payload_size=54): + self.vapi.cli("clear errors") + self.vapi.cli("clear ipsec sa") + try: + p = self.params[socket.AF_INET6] + send_pkts = self.gen_encrypt_pkts6( + p, + p.scapy_tra_sa, + self.tra_if, + src=self.tra_if.remote_ip6, + dst=self.tra_if.local_ip6, + count=count, + payload_size=payload_size, + ) + recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) + for rx in recv_pkts: + self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen) + try: + decrypted = p.vpp_tra_sa.decrypt(rx[IPv6]) + self.assert_packet_checksums_valid(decrypted) + except: + self.logger.debug(ppp("Unexpected packet:", rx)) + raise + finally: + self.logger.info(self.vapi.ppcli("show error")) + self.logger.info(self.vapi.ppcli("show ipsec all")) + + pkts = p.tra_sa_in.get_stats()["packets"] + self.assertEqual( + pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts) + ) + pkts = p.tra_sa_out.get_stats()["packets"] + self.assertEqual( + pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts) + ) + self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count) + self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count) + + def gen_encrypt_pkts_ext_hdrs6( + self, sa, sw_intf, src, dst, count=1, payload_size=54 + ): + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / sa.encrypt( + IPv6(src=src, dst=dst) + / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size) + ) + for i in range(count) + ] + + def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54): + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IPv6(src=src, dst=dst) + / IPv6ExtHdrHopByHop() + / IPv6ExtHdrFragment(id=2, offset=200) + / Raw(b"\xff" * 200) + for i in range(count) + ] + + def verify_tra_encrypted6(self, p, sa, rxs): + decrypted = [] + for rx in rxs: + self.assert_packet_checksums_valid(rx) + try: + decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6]) + decrypted.append(decrypt_pkt) + self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6) + self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6) + except: + self.logger.debug(ppp("Unexpected packet:", rx)) + try: + self.logger.debug(ppp("Decrypted packet:", decrypt_pkt)) + except: + pass + raise + return decrypted + + def verify_tra_66_ext_hdrs(self, p): + count = 63 + + # + # check we can decrypt with options + # + tx = self.gen_encrypt_pkts_ext_hdrs6( + p.scapy_tra_sa, + self.tra_if, + src=self.tra_if.remote_ip6, + dst=self.tra_if.local_ip6, + count=count, + ) + self.send_and_expect(self.tra_if, tx, self.tra_if) + + # + # injecting a packet from ourselves to be routed of box is a hack + # but it matches an outbout policy, alors je ne regrette rien + # + + # one extension before ESP + tx = ( + Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) + / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6) + / IPv6ExtHdrFragment(id=2, offset=200) + / Raw(b"\xff" * 200) + ) + + rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) + dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + + for dc in dcs: + # for reasons i'm not going to investigate scapy does not + # created the correct headers after decrypt. but reparsing + # the ipv6 packet fixes it + dc = IPv6(raw(dc[IPv6])) + self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) + + # two extensions before ESP + tx = ( + Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) + / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6) + / IPv6ExtHdrHopByHop() + / IPv6ExtHdrFragment(id=2, offset=200) + / Raw(b"\xff" * 200) + ) + + rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) + dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + + for dc in dcs: + dc = IPv6(raw(dc[IPv6])) self.assertTrue(dc[IPv6ExtHdrHopByHop]) self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) @@ -1273,7 +2101,7 @@ class IpsecTun4(object): decrypt_pkts = [] for rx in rxs: if p.nat_header: - self.assertEqual(rx[UDP].dport, 4500) + self.assertEqual(rx[UDP].dport, p.nat_header.dport) self.assert_packet_checksums_valid(rx) self.assertEqual(len(rx) - len(Ether()), rx[IP].len) try: @@ -1459,7 +2287,7 @@ class IpsecTun4(object): ) self.send_and_assert_no_replies(self.tun_if, pkt * 31) self.assert_error_counter_equal( - "/err/%s/NAT Keepalive" % self.tun4_input_node, 31 + "/err/%s/nat_keepalive" % self.tun4_input_node, 31 ) pkt = ( @@ -1469,7 +2297,7 @@ class IpsecTun4(object): / Raw(b"\xfe") ) self.send_and_assert_no_replies(self.tun_if, pkt * 31) - self.assert_error_counter_equal("/err/%s/Too Short" % self.tun4_input_node, 31) + self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31) pkt = ( Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) @@ -1479,7 +2307,7 @@ class IpsecTun4(object): / Padding(0 * 21) ) self.send_and_assert_no_replies(self.tun_if, pkt * 31) - self.assert_error_counter_equal("/err/%s/Too Short" % self.tun4_input_node, 62) + self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62) class IpsecTun4Tests(IpsecTun4): @@ -1705,6 +2533,40 @@ class IpsecTun6(object): self.logger.info(self.vapi.ppcli("show ipsec all")) self.verify_counters6(p, p, count) + def verify_keepalive(self, p): + # the sizeof Raw is calculated to pad to the minimum ehternet + # frame size of 64 btyes + pkt = ( + Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) + / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6) + / UDP(sport=333, dport=4500) + / Raw(b"\xff") + / Padding(0 * 1) + ) + self.send_and_assert_no_replies(self.tun_if, pkt * 31) + self.assert_error_counter_equal( + "/err/%s/nat_keepalive" % self.tun6_input_node, 31 + ) + + pkt = ( + Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) + / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6) + / UDP(sport=333, dport=4500) + / Raw(b"\xfe") + ) + self.send_and_assert_no_replies(self.tun_if, pkt * 31) + self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31) + + pkt = ( + Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) + / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6) + / UDP(sport=333, dport=4500) + / Raw(b"\xfe") + / Padding(0 * 21) + ) + self.send_and_assert_no_replies(self.tun_if, pkt * 31) + self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62) + class IpsecTun6Tests(IpsecTun6): """UT test methods for Tunnel v6""" @@ -1945,20 +2807,48 @@ class IPSecIPv4Fwd(VppTestCase): self.logger.info(self.vapi.ppcli("show ipsec all")) return spdEntry - def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678): + def create_stream( + self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP" + ): packets = [] + # create SA + sa = SecurityAssociation( + ESP, + spi=1000, + crypt_algo="AES-CBC", + crypt_key=b"JPjyOWBeVEQiMe7h", + auth_algo="HMAC-SHA1-96", + auth_key=b"C91KUR9GYMm5GfkEvNjX", + tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4), + nat_t_header=UDP(sport=src_prt, dport=dst_prt), + ) for i in range(pkt_count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself - p = ( - Ether(dst=src_if.local_mac, src=src_if.remote_mac) - / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) - / UDP(sport=src_prt, dport=dst_prt) - / Raw(payload) - ) + p = [] + if proto == "UDP-ESP": + p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt( + IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / UDP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) + elif proto == "UDP": + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / UDP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) + elif proto == "TCP": + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / TCP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list @@ -2012,6 +2902,50 @@ class IPSecIPv4Fwd(VppTestCase): self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts) self.assert_equal(pkt_count, matched_pkts) + # Method verify_l3_l4_capture() will verify network and transport layer + # fields of the packet sa.encrypt() gives interface number garbadge. + # thus interface validation get failed (scapy bug?). However our intent + # is to verify IP layer and above and that is covered. + + def verify_l3_l4_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + for packet in capture: + try: + self.assert_packet_checksums_valid(packet) + self.assert_equal( + packet[IP].src, + src_if.remote_ip4, + "decrypted packet source address", + ) + self.assert_equal( + packet[IP].dst, + dst_if.remote_ip4, + "decrypted packet destination address", + ) + if packet.haslayer(TCP): + self.assertFalse( + packet.haslayer(UDP), + "unexpected UDP header in decrypted packet", + ) + elif packet.haslayer(UDP): + if packet[UDP].payload: + self.assertFalse( + packet[UDP][1].haslayer(UDP), + "unexpected UDP header in decrypted packet", + ) + else: + self.assertFalse( + packet.haslayer(UDP), + "unexpected UDP header in decrypted packet", + ) + self.assert_equal( + packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID" + ) + except Exception: + self.logger.error(ppp("Unexpected or invalid plain packet:", packet)) + raise + class SpdFlowCacheTemplate(IPSecIPv4Fwd): @classmethod @@ -2087,6 +3021,279 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd): self.logger.info("\ncrc32 NOT supported:\n" + cpu_info) return False + def create_stream( + cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP" + ): + packets = [] + packets = super(SpdFlowCacheTemplate, cls).create_stream( + src_if, dst_if, pkt_count, src_prt, dst_prt, proto + ) + return packets + + def verify_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + super(SpdFlowCacheTemplate, self).verify_l3_l4_capture( + src_if, dst_if, capture, tcp_port_in, udp_port_in + ) + + +class SpdFastPathTemplate(IPSecIPv4Fwd): + @classmethod + def setUpConstants(cls): + super(SpdFastPathTemplate, cls).setUpConstants() + # Override this method with required cmdline parameters e.g. + # cls.vpp_cmdline.extend(["ipsec", "{", + # "ipv4-outbound-spd-flow-cache on", + # "}"]) + # cls.logger.info("VPP modified cmdline is %s" % " " + # .join(cls.vpp_cmdline)) + + def setUp(self): + super(SpdFastPathTemplate, self).setUp() + + def tearDown(self): + super(SpdFastPathTemplate, self).tearDown() + + def create_stream( + cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP" + ): + packets = [] + packets = super(SpdFastPathTemplate, cls).create_stream( + src_if, dst_if, pkt_count, src_prt, dst_prt, proto + ) + return packets + + def verify_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + super(SpdFastPathTemplate, self).verify_l3_l4_capture( + src_if, dst_if, capture, tcp_port_in, udp_port_in + ) + + +class IpsecDefaultTemplate(IPSecIPv4Fwd): + @classmethod + def setUpConstants(cls): + super(IpsecDefaultTemplate, cls).setUpConstants() + + def setUp(self): + super(IpsecDefaultTemplate, self).setUp() + + def tearDown(self): + super(IpsecDefaultTemplate, self).tearDown() + + def create_stream( + cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP" + ): + packets = [] + packets = super(IpsecDefaultTemplate, cls).create_stream( + src_if, dst_if, pkt_count, src_prt, dst_prt, proto + ) + return packets + + def verify_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + super(IpsecDefaultTemplate, self).verify_l3_l4_capture( + src_if, dst_if, capture, tcp_port_in, udp_port_in + ) + + +class IPSecIPv6Fwd(VppTestCase): + """Test IPSec by capturing and verifying IPv6 forwarded pkts""" + + @classmethod + def setUpConstants(cls): + super(IPSecIPv6Fwd, cls).setUpConstants() + + def setUp(self): + super(IPSecIPv6Fwd, self).setUp() + # store SPD objects so we can remove configs on tear down + self.spd_objs = [] + self.spd_policies = [] + + def tearDown(self): + # remove SPD policies + for obj in self.spd_policies: + obj.remove_vpp_config() + self.spd_policies = [] + # remove SPD items (interface bindings first, then SPD) + for obj in reversed(self.spd_objs): + obj.remove_vpp_config() + self.spd_objs = [] + # close down pg intfs + for pg in self.pg_interfaces: + pg.unconfig_ip6() + pg.admin_down() + super(IPSecIPv6Fwd, self).tearDown() + + def create_interfaces(self, num_ifs=2): + # create interfaces pg0 ... pg + self.create_pg_interfaces(range(num_ifs)) + for pg in self.pg_interfaces: + # put the interface up + pg.admin_up() + # configure IPv6 address on the interface + pg.config_ip6() + pg.resolve_ndp() + self.logger.info(self.vapi.ppcli("show int addr")) + + def spd_create_and_intf_add(self, spd_id, pg_list): + spd = VppIpsecSpd(self, spd_id) + spd.add_vpp_config() + self.spd_objs.append(spd) + for pg in pg_list: + spdItf = VppIpsecSpdItfBinding(self, spd, pg) + spdItf.add_vpp_config() + self.spd_objs.append(spdItf) + + def get_policy(self, policy_type): + e = VppEnum.vl_api_ipsec_spd_action_t + if policy_type == "protect": + return e.IPSEC_API_SPD_ACTION_PROTECT + elif policy_type == "bypass": + return e.IPSEC_API_SPD_ACTION_BYPASS + elif policy_type == "discard": + return e.IPSEC_API_SPD_ACTION_DISCARD + else: + raise Exception("Invalid policy type: %s", policy_type) + + def spd_add_rem_policy( + self, + spd_id, + src_if, + dst_if, + proto, + is_out, + priority, + policy_type, + remove=False, + all_ips=False, + ip_range=False, + local_ip_start=ip_address("0::0"), + local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), + remote_ip_start=ip_address("0::0"), + remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), + remote_port_start=0, + remote_port_stop=65535, + local_port_start=0, + local_port_stop=65535, + ): + spd = VppIpsecSpd(self, spd_id) + + if all_ips: + src_range_low = ip_address("0::0") + src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff") + dst_range_low = ip_address("0::0") + dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff") + + elif ip_range: + src_range_low = local_ip_start + src_range_high = local_ip_stop + dst_range_low = remote_ip_start + dst_range_high = remote_ip_stop + + else: + src_range_low = src_if.remote_ip6 + src_range_high = src_if.remote_ip6 + dst_range_low = dst_if.remote_ip6 + dst_range_high = dst_if.remote_ip6 + + spdEntry = VppIpsecSpdEntry( + self, + spd, + 0, + src_range_low, + src_range_high, + dst_range_low, + dst_range_high, + proto, + priority=priority, + policy=self.get_policy(policy_type), + is_outbound=is_out, + remote_port_start=remote_port_start, + remote_port_stop=remote_port_stop, + local_port_start=local_port_start, + local_port_stop=local_port_stop, + ) + + if remove is False: + spdEntry.add_vpp_config() + self.spd_policies.append(spdEntry) + else: + spdEntry.remove_vpp_config() + self.spd_policies.remove(spdEntry) + self.logger.info(self.vapi.ppcli("show ipsec all")) + return spdEntry + + def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678): + packets = [] + for i in range(pkt_count): + # create packet info stored in the test case instance + info = self.create_packet_info(src_if, dst_if) + # convert the info into packet payload + payload = self.info_to_payload(info) + # create the packet itself + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) + / UDP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) + # store a copy of the packet in the packet info + info.data = p.copy() + # append the packet to the list + packets.append(p) + # return the created packet list + return packets + + def verify_capture(self, src_if, dst_if, capture): + packet_info = None + for packet in capture: + try: + ip = packet[IPv6] + udp = packet[UDP] + # convert the payload to packet info object + payload_info = self.payload_to_info(packet) + # make sure the indexes match + self.assert_equal( + payload_info.src, src_if.sw_if_index, "source sw_if_index" + ) + self.assert_equal( + payload_info.dst, dst_if.sw_if_index, "destination sw_if_index" + ) + packet_info = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + # make sure we didn't run out of saved packets + self.assertIsNotNone(packet_info) + self.assert_equal( + payload_info.index, packet_info.index, "packet info index" + ) + saved_packet = packet_info.data # fetch the saved packet + # assert the values match + self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address") + # ... more assertions here + self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port") + except Exception as e: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + remaining_packet = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + self.assertIsNone( + remaining_packet, + "Interface %s: Packet expected from interface " + "%s didn't arrive" % (dst_if.name, src_if.name), + ) + + def verify_policy_match(self, pkt_count, spdEntry): + self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats())) + matched_pkts = spdEntry.get_stats().get("packets") + self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts) + self.assert_equal(pkt_count, matched_pkts) + if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)