X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_ipsec.py;h=b5cd922f127af8cbb8b08ddee346adbb486140ef;hb=4941afb4f96a20df7dc8b6688f7921a3d713b77d;hp=5d835104ec48169b2df976afe5381f7a4227f7ed;hpb=7b8b4652693a87233c9aea313959a9cede3df0f4;p=vpp.git diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 5d835104ec4..b5cd922f127 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -15,7 +15,8 @@ from scapy.layers.inet6 import ( ) -from framework import VppTestCase, VppTestRunner +from framework import VppTestCase +from asfframework import VppTestRunner from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200 from vpp_papi import VppEnum @@ -51,6 +52,8 @@ class IPsecIPv4Params: self.outer_flow_label = 0 self.inner_flow_label = 0x12345 + self.anti_replay_window_size = 64 + self.auth_algo_vpp_id = ( VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96 ) @@ -98,6 +101,8 @@ class IPsecIPv6Params: self.outer_flow_label = 0 self.inner_flow_label = 0x12345 + self.anti_replay_window_size = 64 + self.auth_algo_vpp_id = ( VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96 ) @@ -625,22 +630,34 @@ class IpsecTra4(object): def verify_tra_anti_replay(self): p = self.params[socket.AF_INET] esn_en = p.vpp_tra_sa.esn_en + anti_replay_window_size = p.anti_replay_window_size seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name replay_count = self.get_replay_counts(p) + initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay") hash_failed_count = self.get_hash_failed_counts(p) seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err( + "seq_cycled" + ) hash_err = "integ_error" if ESP == self.encryption_type: undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] undersize_count = self.statistics.get_err_counter(undersize_node_name) + initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err( + "runt" + ) # For AES-GCM an error in the hash is reported as a decryption failure if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"): hash_err = "decryption_failed" # In async mode, we don't report errors in the hash. if p.async_mode: hash_err = "" + else: + initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err( + hash_err + ) # # send packets with seq numbers 1->34 @@ -666,7 +683,7 @@ class IpsecTra4(object): self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) replay_count += len(pkts) self.assertEqual(self.get_replay_counts(p), replay_count) - err = p.tra_sa_in.get_err("replay") + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff self.assertEqual(err, replay_count) # @@ -684,29 +701,31 @@ class IpsecTra4(object): recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1) replay_count += 7 self.assertEqual(self.get_replay_counts(p), replay_count) - err = p.tra_sa_in.get_err("replay") + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff self.assertEqual(err, replay_count) # - # now move the window over to 257 (more than one byte) and into Case A + # now move the window over to anti_replay_window_size + 100 and into Case A # self.vapi.cli("clear error") pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=257, + seq_num=anti_replay_window_size + 100, ) recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if) + self.logger.info(self.vapi.ppcli("show ipsec sa 1")) + # replayed packets are dropped self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2) replay_count += 3 self.assertEqual(self.get_replay_counts(p), replay_count) - err = p.tra_sa_in.get_err("replay") + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff self.assertEqual(err, replay_count) - # the window size is 64 packets + # the window size is anti_replay_window_size packets # in window are still accepted pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac @@ -714,7 +733,6 @@ class IpsecTra4(object): IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), seq_num=200, ) - recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if) # a packet that does not decrypt does not move the window forward bogus_sa = SecurityAssociation( @@ -729,14 +747,14 @@ class IpsecTra4(object): src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / bogus_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=350, + seq_num=anti_replay_window_size + 200, ) self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) hash_failed_count += 17 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) if hash_err != "": - err = p.tra_sa_in.get_err(hash_err) + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff self.assertEqual(err, hash_failed_count) # a malformed 'runt' packet @@ -747,13 +765,13 @@ class IpsecTra4(object): src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / bogus_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=350, + seq_num=anti_replay_window_size + 200, ) self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) undersize_count += 17 self.assert_error_counter_equal(undersize_node_name, undersize_count) - err = p.tra_sa_in.get_err("runt") + err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff self.assertEqual(err, undersize_count) # which we can determine since this packet is still in the window @@ -784,21 +802,21 @@ class IpsecTra4(object): hash_failed_count += 17 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) if hash_err != "": - err = p.tra_sa_in.get_err(hash_err) + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff self.assertEqual(err, hash_failed_count) else: replay_count += 17 self.assertEqual(self.get_replay_counts(p), replay_count) - err = p.tra_sa_in.get_err("replay") + err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff self.assertEqual(err, replay_count) - # valid packet moves the window over to 258 + # valid packet moves the window over to anti_replay_window_size + 258 pkt = Ether( src=self.tra_if.remote_mac, dst=self.tra_if.local_mac ) / p.scapy_tra_sa.encrypt( IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=258, + seq_num=anti_replay_window_size + 258, ) rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) @@ -852,7 +870,7 @@ class IpsecTra4(object): decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) # - # A packet that has seq num between (2^32-64) and 5 is within + # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within # the window # p.scapy_tra_sa.seq_num = 0xFFFFFFFD @@ -880,64 +898,757 @@ class IpsecTra4(object): self.tra_if, [pkt], self.tra_if, timeout=0.2 ) - hash_failed_count += 1 - self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) - if hash_err != "": - err = p.tra_sa_in.get_err(hash_err) - self.assertEqual(err, hash_failed_count) - - # - # but if we move the window forward to case B, then we can wrap - # again - # - p.scapy_tra_sa.seq_num = 0x100000555 - pkt = Ether( - src=self.tra_if.remote_mac, dst=self.tra_if.local_mac - ) / p.scapy_tra_sa.encrypt( - IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=0x100000555, - ) - rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) - decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) + hash_failed_count += 1 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + if hash_err != "": + err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff + self.assertEqual(err, hash_failed_count) + + # + # but if we move the window forward to case B, then we can wrap + # again + # + p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555 + pkt = Ether( + src=self.tra_if.remote_mac, dst=self.tra_if.local_mac + ) / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=p.scapy_tra_sa.seq_num, + ) + rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) + decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) + + p.scapy_tra_sa.seq_num = 0x200000444 + pkt = Ether( + src=self.tra_if.remote_mac, dst=self.tra_if.local_mac + ) / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=0x200000444, + ) + rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) + decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) + + else: + # + # without ESN TX sequence numbers can't wrap and packets are + # dropped from here on out. + # + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + seq_cycle_count += len(pkts) + self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count) + err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff + self.assertEqual(err, seq_cycle_count) + + # move the security-associations seq number on to the last we used + self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id) + p.scapy_tra_sa.seq_num = 351 + p.vpp_tra_sa.seq_num = 351 + + def verify_tra_lost(self): + p = self.params[socket.AF_INET] + esn_en = p.vpp_tra_sa.esn_en + + # + # send packets with seq numbers 1->34 + # this means the window size is still in Case B (see RFC4303 + # Appendix A) + # + # for reasons i haven't investigated Scapy won't create a packet with + # seq_num=0 + # + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(1, 3) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) + + # skip a sequence number + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(4, 6) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) + + # the lost packet are counted untill we get up past the first + # sizeof(replay_window) packets + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(6, 100) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + self.assertEqual(p.tra_sa_in.get_err("lost"), 1) + + # lost of holes in the sequence + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(100, 200, 2) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(200, 300) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + self.assertEqual(p.tra_sa_in.get_err("lost"), 51) + + # a big hole in the seq number space + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(400, 500) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + self.assertEqual(p.tra_sa_in.get_err("lost"), 151) + + def verify_tra_basic4(self, count=1, payload_size=54): + """ipsec v4 transport basic test""" + self.vapi.cli("clear errors") + self.vapi.cli("clear ipsec sa") + try: + p = self.params[socket.AF_INET] + send_pkts = self.gen_encrypt_pkts( + p, + p.scapy_tra_sa, + self.tra_if, + src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4, + count=count, + payload_size=payload_size, + ) + recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) + for rx in recv_pkts: + self.assertEqual(len(rx) - len(Ether()), rx[IP].len) + self.assert_packet_checksums_valid(rx) + try: + decrypted = p.vpp_tra_sa.decrypt(rx[IP]) + self.assert_packet_checksums_valid(decrypted) + except: + self.logger.debug(ppp("Unexpected packet:", rx)) + raise + finally: + self.logger.info(self.vapi.ppcli("show error")) + self.logger.info(self.vapi.ppcli("show ipsec all")) + + pkts = p.tra_sa_in.get_stats()["packets"] + self.assertEqual( + pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts) + ) + pkts = p.tra_sa_out.get_stats()["packets"] + self.assertEqual( + pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts) + ) + self.assertEqual(p.tra_sa_out.get_err("lost"), 0) + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) + + self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count) + self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count) + + def _verify_tra_anti_replay_algorithm_esn(self): + def seq_num(seqh, seql): + return (seqh << 32) | (seql & 0xFFFF_FFFF) + + p = self.params[socket.AF_INET] + anti_replay_window_size = p.anti_replay_window_size + + seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name + replay_count = self.get_replay_counts(p) + hash_failed_count = self.get_hash_failed_counts(p) + seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + + if ESP == self.encryption_type: + undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] + undersize_count = self.statistics.get_err_counter(undersize_node_name) + + # reset the TX SA to avoid conflict with left configuration + self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0") + + """ + RFC 4303 Appendix A2. Case A + + |: new Th marker + a-i: possible seq num received + +: Bl, Tl, Bl', Tl' + [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1) + + Th - 1 Th Th + 1 + --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|-- + ========= ========= ========= + Bl- Tl- Bl Tl Bl+ Tl+ + + Case A implies Tl >= W - 1 + """ + + Th = 1 + Tl = anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + # move VPP's RX AR window to Case A + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case a: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5)) + ] + + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case b: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5)) + ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case c: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case d: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should fail + - post-crypto check: ... + """ + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5)) + ] + + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case e: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case f: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (the window stays Case A) + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case g: Seql < Bl + - pre-crypto check: algorithm predicts that the packet wrap the window + -> Seqh = Th + 1 + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (may set the window in Case B) + -> Seql is marked in the AR window + """ + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + # set the window in Case B (the minimum window size is 64 + # so we are sure to overlap) + for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # reset the VPP's RX AR window to Case A + Th = 1 + Tl = 2 * anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + # the AR will stay in Case A + for seq in range( + seq_num(Th + 1, anti_replay_window_size + 10), + seq_num(Th + 1, anti_replay_window_size + 20), + ) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case h: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: the wrap is not detected, should fail + - post-crypto check: ... + """ + Th += 1 + Tl = anti_replay_window_size + 20 + Bl = Tl - anti_replay_window_size + 1 + + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case i: Seql > Tl + - pre-crypto check: algorithm predicts that the packet does not wrap the window + -> Seqh = Th + - integrity check: the wrap is not detected, shoud fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15)) + ] + + # out-of-window packets fail integrity check + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + RFC 4303 Appendix A2. Case B + + Th - 1 Th Th + 1 + ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h--- + ========= =========== =========== + Tl- Bl Tl Bl+ Tl+ + + Case B implies Tl < W - 1 + """ + + # reset the VPP's RX AR window to Case B + Th = 2 + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = (Tl - anti_replay_window_size + 1) % (1 << 32) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case a: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 5), seq_num(Th, 10)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case b: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case c: Tl < Bl <= Seql + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th - 1 + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case d: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should pass + - post-crypto check: should pass + -> Seql is marked in the AR window + """ + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 15), seq_num(Th, 25)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + """ + case e: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> Seqh = Th + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: should pass + -> AR window shift (may set the window in Case A) + -> Seql is marked in the AR window + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15)) + ] + + # the window stays in Case B + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range( + seq_num(Th, Tl + anti_replay_window_size + 5), + seq_num(Th, Tl + anti_replay_window_size + 15), + ) + ] + + # the window moves to Case A + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # reset the VPP's RX AR window to Case B + Th = 2 + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = (Tl - anti_replay_window_size + 1) % (1 << 32) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Th, Tl) + + """ + case f: Tl < Bl <= Seql + - pre-crypto check: algorithm predicts that the packet is in the previous window + -> Seqh = Th - 1 + -> check for a replayed packet with Seql + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20)) + ] + + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + """ + case g: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is the window + -> Seqh = Th + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) + ) + for seq in range(seq_num(Th, 10), seq_num(Th, 15)) + ] + + self.send_and_expect(self.tra_if, pkts, self.tra_if) - p.scapy_tra_sa.seq_num = 0x200000444 - pkt = Ether( - src=self.tra_if.remote_mac, dst=self.tra_if.local_mac - ) / p.scapy_tra_sa.encrypt( - IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), - seq_num=0x200000444, + p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl) + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) ) - rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) - decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) + for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15)) + ] - else: - # - # without ESN TX sequence numbers can't wrap and packets are - # dropped from here on out. - # - self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) - seq_cycle_count += len(pkts) - self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count) - err = p.tra_sa_out.get_err("seq_cycled") - self.assertEqual(err, seq_cycle_count) + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) - # move the security-associations seq number on to the last we used - self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id) - p.scapy_tra_sa.seq_num = 351 - p.vpp_tra_sa.seq_num = 351 + # some packets are rejected by the pre-crypto check + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) - def verify_tra_lost(self): - p = self.params[socket.AF_INET] - esn_en = p.vpp_tra_sa.esn_en + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) - 5 + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) - # - # send packets with seq numbers 1->34 - # this means the window size is still in Case B (see RFC4303 - # Appendix A) - # - # for reasons i haven't investigated Scapy won't create a packet with - # seq_num=0 - # + """ + case h: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + -> Seqh = Th + - integrity check: should fail + - post-crypto check: ... + """ pkts = [ ( Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) @@ -946,13 +1657,62 @@ class IpsecTra4(object): seq_num=seq, ) ) - for seq in range(1, 3) + for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20)) ] - self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_in.get_err("lost"), 0) + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) - # skip a sequence number + # out-of-window packets fail integrity check + hash_failed_count += len(pkts) + self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) + + def _verify_tra_anti_replay_algorithm_no_esn(self): + def seq_num(seql): + return seql & 0xFFFF_FFFF + + p = self.params[socket.AF_INET] + anti_replay_window_size = p.anti_replay_window_size + + seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name + replay_count = self.get_replay_counts(p) + hash_failed_count = self.get_hash_failed_counts(p) + seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + + if ESP == self.encryption_type: + undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0] + undersize_count = self.statistics.get_err_counter(undersize_node_name) + + # reset the TX SA to avoid conflict with left configuration + self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0") + + """ + RFC 4303 Appendix A2. Case A + + a-c: possible seq num received + +: Bl, Tl + + |--a--+---b---+-c--| + ========= + Bl Tl + + No ESN implies Th = 0 + Case A implies Tl >= W - 1 + """ + + Tl = anti_replay_window_size + 40 + Bl = Tl - anti_replay_window_size + 1 + + # move VPP's RX AR window to Case A + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}") + p.scapy_tra_sa.seq_num = seq_num(Tl) + + """ + case a: Seql < Bl + - pre-crypto check: algorithm predicts that the packet is out of window + -> packet should be dropped + - integrity check: ... + - post-crypto check: ... + """ pkts = [ ( Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) @@ -961,14 +1721,22 @@ class IpsecTra4(object): seq_num=seq, ) ) - for seq in range(4, 6) + for seq in range(seq_num(Bl - 20), seq_num(Bl - 5)) ] - self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_in.get_err("lost"), 0) + # out-of-window packets + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + replay_count += len(pkts) + self.assertEqual(self.get_replay_counts(p), replay_count) - # the lost packet are counted untill we get up past the first - # sizeof(replay_window) packets + """ + case b: Bl <= Seql <= Tl + - pre-crypto check: algorithm predicts that the packet is in the window + -> check for a replayed packet with Seql + - integrity check: should pass + - post-crypto check: + -> check for a replayed packet with Seql + """ pkts = [ ( Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) @@ -977,13 +1745,10 @@ class IpsecTra4(object): seq_num=seq, ) ) - for seq in range(6, 100) + for seq in range(seq_num(Tl - 50), seq_num(Tl - 30)) ] self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_in.get_err("lost"), 1) - - # lost of holes in the sequence pkts = [ ( Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) @@ -992,10 +1757,22 @@ class IpsecTra4(object): seq_num=seq, ) ) - for seq in range(100, 200, 2) + for seq in range(seq_num(Tl - 35), seq_num(Tl - 30)) ] - self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50) + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) + + # replayed packets + replay_count += 5 + self.assertEqual(self.get_replay_counts(p), replay_count) + + """ + case c: Seql > Tl + - pre-crypto check: algorithm predicts that the packet will shift the window + - integrity check: should pass + - post-crypto check: should pass + -> AR window is shifted + """ pkts = [ ( Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) @@ -1004,13 +1781,34 @@ class IpsecTra4(object): seq_num=seq, ) ) - for seq in range(200, 300) + for seq in range(seq_num(Tl + 5), seq_num(Tl + 20)) ] + self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_in.get_err("lost"), 51) + """ + RFC 4303 Appendix A2. Case B - # a big hole in the seq number space + |-a-----+------b-----| + ========= + Tl + + Case B implies Tl < W - 1 + """ + + # reset the VPP's RX AR window to Case B + Tl = 30 # minimum window size of 64, we are sure to overlap + Bl = seq_num(Tl - anti_replay_window_size + 1) + + self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}") + + """ + case a: Seql <= Tl < Bl + - pre-crypto check: algorithm predicts that the packet is in the window + -> check for replayed packet + - integrity check: should fail + - post-crypto check: ... + """ pkts = [ ( Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) @@ -1019,54 +1817,36 @@ class IpsecTra4(object): seq_num=seq, ) ) - for seq in range(400, 500) + for seq in range(seq_num(5), seq_num(10)) ] - self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assertEqual(p.tra_sa_in.get_err("lost"), 151) + self.send_and_expect(self.tra_if, pkts, self.tra_if) - def verify_tra_basic4(self, count=1, payload_size=54): - """ipsec v4 transport basic test""" - self.vapi.cli("clear errors") - self.vapi.cli("clear ipsec sa") - try: - p = self.params[socket.AF_INET] - send_pkts = self.gen_encrypt_pkts( - p, - p.scapy_tra_sa, - self.tra_if, - src=self.tra_if.remote_ip4, - dst=self.tra_if.local_ip4, - count=count, - payload_size=payload_size, + """ + case b: Tl < Seql < Bl + - pre-crypto check: algorithm predicts that the packet will shift the window + - integrity check: should pass + - post-crypto check: should pass + -> AR window is shifted + """ + pkts = [ + ( + Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) + / p.scapy_tra_sa.encrypt( + IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(), + seq_num=seq, + ) ) - recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) - for rx in recv_pkts: - self.assertEqual(len(rx) - len(Ether()), rx[IP].len) - self.assert_packet_checksums_valid(rx) - try: - decrypted = p.vpp_tra_sa.decrypt(rx[IP]) - self.assert_packet_checksums_valid(decrypted) - except: - self.logger.debug(ppp("Unexpected packet:", rx)) - raise - finally: - self.logger.info(self.vapi.ppcli("show error")) - self.logger.info(self.vapi.ppcli("show ipsec all")) + for seq in range(seq_num(-50), seq_num(-20)) + ] - pkts = p.tra_sa_in.get_stats()["packets"] - self.assertEqual( - pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts) - ) - pkts = p.tra_sa_out.get_stats()["packets"] - self.assertEqual( - pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts) - ) - self.assertEqual(p.tra_sa_out.get_err("lost"), 0) - self.assertEqual(p.tra_sa_in.get_err("lost"), 0) + self.send_and_expect(self.tra_if, pkts, self.tra_if) - self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count) - self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count) + def verify_tra_anti_replay_algorithm(self): + if self.params[socket.AF_INET].vpp_tra_sa.esn_en: + self._verify_tra_anti_replay_algorithm_esn() + else: + self._verify_tra_anti_replay_algorithm_no_esn() class IpsecTra4Tests(IpsecTra4): @@ -1076,6 +1856,10 @@ class IpsecTra4Tests(IpsecTra4): """ipsec v4 transport anti-replay test""" self.verify_tra_anti_replay() + def test_tra_anti_replay_algorithm(self): + """ipsec v4 transport anti-replay algorithm test""" + self.verify_tra_anti_replay_algorithm() + def test_tra_lost(self): """ipsec v4 transport lost packet test""" self.verify_tra_lost() @@ -2023,20 +2807,48 @@ class IPSecIPv4Fwd(VppTestCase): self.logger.info(self.vapi.ppcli("show ipsec all")) return spdEntry - def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678): + def create_stream( + self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP" + ): packets = [] + # create SA + sa = SecurityAssociation( + ESP, + spi=1000, + crypt_algo="AES-CBC", + crypt_key=b"JPjyOWBeVEQiMe7h", + auth_algo="HMAC-SHA1-96", + auth_key=b"C91KUR9GYMm5GfkEvNjX", + tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4), + nat_t_header=UDP(sport=src_prt, dport=dst_prt), + ) for i in range(pkt_count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself - p = ( - Ether(dst=src_if.local_mac, src=src_if.remote_mac) - / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) - / UDP(sport=src_prt, dport=dst_prt) - / Raw(payload) - ) + p = [] + if proto == "UDP-ESP": + p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt( + IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / UDP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) + elif proto == "UDP": + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / UDP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) + elif proto == "TCP": + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / TCP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list @@ -2090,6 +2902,50 @@ class IPSecIPv4Fwd(VppTestCase): self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts) self.assert_equal(pkt_count, matched_pkts) + # Method verify_l3_l4_capture() will verify network and transport layer + # fields of the packet sa.encrypt() gives interface number garbadge. + # thus interface validation get failed (scapy bug?). However our intent + # is to verify IP layer and above and that is covered. + + def verify_l3_l4_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + for packet in capture: + try: + self.assert_packet_checksums_valid(packet) + self.assert_equal( + packet[IP].src, + src_if.remote_ip4, + "decrypted packet source address", + ) + self.assert_equal( + packet[IP].dst, + dst_if.remote_ip4, + "decrypted packet destination address", + ) + if packet.haslayer(TCP): + self.assertFalse( + packet.haslayer(UDP), + "unexpected UDP header in decrypted packet", + ) + elif packet.haslayer(UDP): + if packet[UDP].payload: + self.assertFalse( + packet[UDP][1].haslayer(UDP), + "unexpected UDP header in decrypted packet", + ) + else: + self.assertFalse( + packet.haslayer(UDP), + "unexpected UDP header in decrypted packet", + ) + self.assert_equal( + packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID" + ) + except Exception: + self.logger.error(ppp("Unexpected or invalid plain packet:", packet)) + raise + class SpdFlowCacheTemplate(IPSecIPv4Fwd): @classmethod @@ -2165,6 +3021,84 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd): self.logger.info("\ncrc32 NOT supported:\n" + cpu_info) return False + def create_stream( + cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP" + ): + packets = [] + packets = super(SpdFlowCacheTemplate, cls).create_stream( + src_if, dst_if, pkt_count, src_prt, dst_prt, proto + ) + return packets + + def verify_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + super(SpdFlowCacheTemplate, self).verify_l3_l4_capture( + src_if, dst_if, capture, tcp_port_in, udp_port_in + ) + + +class SpdFastPathTemplate(IPSecIPv4Fwd): + @classmethod + def setUpConstants(cls): + super(SpdFastPathTemplate, cls).setUpConstants() + # Override this method with required cmdline parameters e.g. + # cls.vpp_cmdline.extend(["ipsec", "{", + # "ipv4-outbound-spd-flow-cache on", + # "}"]) + # cls.logger.info("VPP modified cmdline is %s" % " " + # .join(cls.vpp_cmdline)) + + def setUp(self): + super(SpdFastPathTemplate, self).setUp() + + def tearDown(self): + super(SpdFastPathTemplate, self).tearDown() + + def create_stream( + cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP" + ): + packets = [] + packets = super(SpdFastPathTemplate, cls).create_stream( + src_if, dst_if, pkt_count, src_prt, dst_prt, proto + ) + return packets + + def verify_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + super(SpdFastPathTemplate, self).verify_l3_l4_capture( + src_if, dst_if, capture, tcp_port_in, udp_port_in + ) + + +class IpsecDefaultTemplate(IPSecIPv4Fwd): + @classmethod + def setUpConstants(cls): + super(IpsecDefaultTemplate, cls).setUpConstants() + + def setUp(self): + super(IpsecDefaultTemplate, self).setUp() + + def tearDown(self): + super(IpsecDefaultTemplate, self).tearDown() + + def create_stream( + cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP" + ): + packets = [] + packets = super(IpsecDefaultTemplate, cls).create_stream( + src_if, dst_if, pkt_count, src_prt, dst_prt, proto + ) + return packets + + def verify_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + super(IpsecDefaultTemplate, self).verify_l3_l4_capture( + src_if, dst_if, capture, tcp_port_in, udp_port_in + ) + class IPSecIPv6Fwd(VppTestCase): """Test IPSec by capturing and verifying IPv6 forwarded pkts"""