X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_ipsec.py;h=d5f13322a59232ce09133d030cffa4cad49cedf9;hb=1b6b09bb514831050b23397c078933c2ace3ff36;hp=62271d33579617dfc46b449560a759297ec70a21;hpb=8d8150262b00435c365a43c8f859584901736aff;p=vpp.git diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 62271d33579..d5f13322a59 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -167,21 +167,21 @@ def config_tra_params(p, encryption_type): class TemplateIpsec(VppTestCase): """ - TRANSPORT MODE: + TRANSPORT MODE:: - ------ encrypt --- - |tra_if| <-------> |VPP| - ------ decrypt --- + ------ encrypt --- + |tra_if| <-------> |VPP| + ------ decrypt --- - TUNNEL MODE: + TUNNEL MODE:: - ------ encrypt --- plain --- - |tun_if| <------- |VPP| <------ |pg1| - ------ --- --- + ------ encrypt --- plain --- + |tun_if| <------- |VPP| <------ |pg1| + ------ --- --- - ------ decrypt --- plain --- - |tun_if| -------> |VPP| ------> |pg1| - ------ --- --- + ------ decrypt --- plain --- + |tun_if| -------> |VPP| ------> |pg1| + ------ --- --- """ tun_spd_id = 1 tra_spd_id = 2 @@ -324,12 +324,206 @@ class IpsecTra4(object): return count + def verify_hi_seq_num(self): + p = self.params[socket.AF_INET] + saf = VppEnum.vl_api_ipsec_sad_flags_t + esn_on = p.vpp_tra_sa.esn_en + ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY + + seq_cycle_node_name = \ + ('/err/%s/sequence number cycled (packet dropped)' % + self.tra4_encrypt_node_name) + replay_count = self.get_replay_counts(p) + hash_failed_count = self.get_hash_failed_counts(p) + seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) + + # a few packets so we get the rx seq number above the window size and + # thus can simulate a wrap with an out of window packet + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=seq)) + for seq in range(63, 80)] + recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # these 4 packets will all choose seq-num 0 to decrpyt since none + # are out of window when first checked. however, once #200 has + # decrypted it will move the window to 200 and has #81 is out of + # window. this packet should be dropped. + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=200)), + (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=81)), + (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=201)), + (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=202))] + + # if anti-replay is off then we won't drop #81 + n_rx = 3 if ar_on else 4 + self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx) + # this packet is one before the wrap + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=203))] + recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # move the window over half way to a wrap + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x80000001))] + recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # anti-replay will drop old packets, no anti-replay will not + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x44000001))] + + if ar_on: + self.send_and_assert_no_replies(self.tra_if, pkts) + else: + recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) + + if esn_on: + # + # validate wrapping the ESN + # + + # wrap scapy's TX SA SN + p.scapy_tra_sa.seq_num = 0x100000005 + + # send a packet that wraps the window for both AR and no AR + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x100000005))] + + rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if) + for rx in rxs: + decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) + + # move the window forward to half way to the next wrap + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x180000005))] + + rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # a packet less than 2^30 from the current position is: + # - AR: out of window and dropped + # - non-AR: accepted + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x170000005))] + + if ar_on: + self.send_and_assert_no_replies(self.tra_if, pkts) + else: + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + # a packet more than 2^30 from the current position is: + # - AR: out of window and dropped + # - non-AR: considered a wrap, but since it's not a wrap + # it won't decrpyt and so will be dropped + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x130000005))] + + self.send_and_assert_no_replies(self.tra_if, pkts) + + # a packet less than 2^30 from the current position and is a + # wrap; (the seq is currently at 0x180000005). + # - AR: out of window so considered a wrap, so accepted + # - non-AR: not considered a wrap, so won't decrypt + p.scapy_tra_sa.seq_num = 0x260000005 + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x260000005))] + if ar_on: + self.send_and_expect(self.tra_if, pkts, self.tra_if) + else: + self.send_and_assert_no_replies(self.tra_if, pkts) + + # + # window positions are different now for AR/non-AR + # move non-AR forward + # + if not ar_on: + # a packet more than 2^30 from the current position and is a + # wrap; (the seq is currently at 0x180000005). + # - AR: accepted + # - non-AR: not considered a wrap, so won't decrypt + + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x200000005)), + (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x200000006))] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + + pkts = [(Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=0x260000005))] + self.send_and_expect(self.tra_if, pkts, self.tra_if) + def verify_tra_anti_replay(self): p = self.params[socket.AF_INET] esn_en = p.vpp_tra_sa.esn_en - seq_cycle_node_name = ('/err/%s/sequence number cycled' % - self.tra4_encrypt_node_name) + seq_cycle_node_name = \ + ('/err/%s/sequence number cycled (packet dropped)' % + self.tra4_encrypt_node_name) replay_count = self.get_replay_counts(p) hash_failed_count = self.get_hash_failed_counts(p) seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name) @@ -358,7 +552,7 @@ class IpsecTra4(object): recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if) # replayed packets are dropped - self.send_and_assert_no_replies(self.tra_if, pkts) + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) replay_count += len(pkts) self.assertEqual(self.get_replay_counts(p), replay_count) @@ -392,7 +586,7 @@ class IpsecTra4(object): recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if) # replayed packets are dropped - self.send_and_assert_no_replies(self.tra_if, pkt * 3) + self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2) replay_count += 3 self.assertEqual(self.get_replay_counts(p), replay_count) @@ -419,7 +613,7 @@ class IpsecTra4(object): dst=self.tra_if.local_ip4) / ICMP(), seq_num=350)) - self.send_and_assert_no_replies(self.tra_if, pkt * 17) + self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) hash_failed_count += 17 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) @@ -435,7 +629,7 @@ class IpsecTra4(object): dst=self.tra_if.local_ip4) / ICMP(), seq_num=350)) - self.send_and_assert_no_replies(self.tra_if, pkt * 17) + self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) undersize_count += 17 self.assert_error_counter_equal(undersize_node_name, @@ -461,7 +655,7 @@ class IpsecTra4(object): dst=self.tra_if.local_ip4) / ICMP(), seq_num=17)) - self.send_and_assert_no_replies(self.tra_if, pkt * 17) + self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2) if esn_en: # an out of window error with ESN looks like a high sequence @@ -525,6 +719,7 @@ class IpsecTra4(object): ICMP(), seq_num=0x100000005)) rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if) + decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) # @@ -543,7 +738,7 @@ class IpsecTra4(object): # # While in case A we cannot wrap the high sequence number again - # becuase VPP will consider this packet to be one that moves the + # because VPP will consider this packet to be one that moves the # window forward # pkt = (Ether(src=self.tra_if.remote_mac, @@ -552,13 +747,14 @@ class IpsecTra4(object): dst=self.tra_if.local_ip4) / ICMP(), seq_num=0x200000999)) - self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if) + self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if, + timeout=0.2) hash_failed_count += 1 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count) # - # but if we move the wondow forward to case B, then we can wrap + # but if we move the window forward to case B, then we can wrap # again # p.scapy_tra_sa.seq_num = 0x100000555 @@ -586,7 +782,7 @@ class IpsecTra4(object): # without ESN TX sequence numbers can't wrap and packets are # dropped from here on out. # - self.send_and_assert_no_replies(self.tra_if, pkts) + self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2) seq_cycle_count += len(pkts) self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)