X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_ipsec.py;h=bcfc0d9253a1fd5aaaeafce76901e52d1b4359df;hb=4a56f4e48f39f9e0560833115f85270c0c04b57f;hp=25cff7f2edf161e7d2b5c106c69ad44ae9185f46;hpb=6afaae156a9ab9de79474367d8873407f3b12a71;p=vpp.git diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 25cff7f2edf..bcfc0d9253a 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -4,11 +4,12 @@ import struct from scapy.layers.inet import IP, ICMP, TCP, UDP from scapy.layers.ipsec import SecurityAssociation, ESP -from scapy.layers.l2 import Ether, Raw +from scapy.layers.l2 import Ether +from scapy.packet import Raw from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from framework import VppTestCase, VppTestRunner -from util import ppp, reassemble4 +from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200 from vpp_papi import VppEnum @@ -37,12 +38,12 @@ class IPsecIPv4Params(object): self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t. IPSEC_API_INTEG_ALG_SHA1_96) self.auth_algo = 'HMAC-SHA1-96' # scapy name - self.auth_key = 'C91KUR9GYMm5GfkEvNjX' + self.auth_key = b'C91KUR9GYMm5GfkEvNjX' self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t. IPSEC_API_CRYPTO_ALG_AES_CBC_128) self.crypt_algo = 'AES-CBC' # scapy name - self.crypt_key = 'JPjyOWBeVEQiMe7h' + self.crypt_key = b'JPjyOWBeVEQiMe7h' self.salt = 0 self.flags = 0 self.nat_header = None @@ -73,18 +74,18 @@ class IPsecIPv6Params(object): self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t. IPSEC_API_INTEG_ALG_SHA1_96) self.auth_algo = 'HMAC-SHA1-96' # scapy name - self.auth_key = 'C91KUR9GYMm5GfkEvNjX' + self.auth_key = b'C91KUR9GYMm5GfkEvNjX' self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t. IPSEC_API_CRYPTO_ALG_AES_CBC_128) self.crypt_algo = 'AES-CBC' # scapy name - self.crypt_key = 'JPjyOWBeVEQiMe7h' + self.crypt_key = b'JPjyOWBeVEQiMe7h' self.salt = 0 self.flags = 0 self.nat_header = None -def mk_scapy_crpyt_key(p): +def mk_scapy_crypt_key(p): if p.crypt_algo == "AES-GCM": return p.crypt_key + struct.pack("!I", p.salt) else: @@ -93,9 +94,9 @@ def mk_scapy_crpyt_key(p): def config_tun_params(p, encryption_type, tun_if): ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6} - use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_ESN)) - crypt_key = mk_scapy_crpyt_key(p) + esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. + IPSEC_API_SAD_FLAG_USE_ESN)) + crypt_key = mk_scapy_crypt_key(p) p.scapy_tun_sa = SecurityAssociation( encryption_type, spi=p.vpp_tun_spi, crypt_algo=p.crypt_algo, @@ -105,7 +106,7 @@ def config_tun_params(p, encryption_type, tun_if): src=tun_if.remote_addr[p.addr_type], dst=tun_if.local_addr[p.addr_type]), nat_t_header=p.nat_header, - use_esn=use_esn) + esn_en=esn_en) p.vpp_tun_sa = SecurityAssociation( encryption_type, spi=p.scapy_tun_spi, crypt_algo=p.crypt_algo, @@ -115,13 +116,13 @@ def config_tun_params(p, encryption_type, tun_if): dst=tun_if.remote_addr[p.addr_type], src=tun_if.local_addr[p.addr_type]), nat_t_header=p.nat_header, - use_esn=use_esn) + esn_en=esn_en) def config_tra_params(p, encryption_type): - use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_ESN)) - crypt_key = mk_scapy_crpyt_key(p) + esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. + IPSEC_API_SAD_FLAG_USE_ESN)) + crypt_key = mk_scapy_crypt_key(p) p.scapy_tra_sa = SecurityAssociation( encryption_type, spi=p.vpp_tra_spi, @@ -130,7 +131,7 @@ def config_tra_params(p, encryption_type): auth_algo=p.auth_algo, auth_key=p.auth_key, nat_t_header=p.nat_header, - use_esn=use_esn) + esn_en=esn_en) p.vpp_tra_sa = SecurityAssociation( encryption_type, spi=p.scapy_tra_spi, @@ -139,7 +140,7 @@ def config_tra_params(p, encryption_type): auth_algo=p.auth_algo, auth_key=p.auth_key, nat_t_header=p.nat_header, - use_esn=use_esn) + esn_en=esn_en) class TemplateIpsec(VppTestCase): @@ -223,7 +224,7 @@ class TemplateIpsec(VppTestCase): payload_size=54): return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / sa.encrypt(IP(src=src, dst=dst) / - ICMP() / Raw('X' * payload_size)) + ICMP() / Raw(b'X' * payload_size)) for i in range(count)] def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1, @@ -236,7 +237,7 @@ class TemplateIpsec(VppTestCase): def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54): return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size) + IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size) for i in range(count)] def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54): @@ -250,7 +251,6 @@ class IpsecTcp(object): def verify_tcp_checksum(self): self.vapi.cli("test http server") p = self.params[socket.AF_INET] - config_tun_params(p, self.encryption_type, self.tun_if) send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) / p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) / @@ -272,7 +272,7 @@ class IpsecTra4(object): """ verify methods for Transport v4 """ def verify_tra_anti_replay(self): p = self.params[socket.AF_INET] - use_esn = p.vpp_tra_sa.use_esn + esn_en = p.vpp_tra_sa.esn_en seq_cycle_node_name = ('/err/%s/sequence number cycled' % self.tra4_encrypt_node_name) @@ -317,6 +317,21 @@ class IpsecTra4(object): replay_count += len(pkts) self.assert_error_counter_equal(replay_node_name, replay_count) + # + # now send a batch of packets all with the same sequence number + # the first packet in the batch is legitimate, the rest bogus + # + pkts = (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=35)) + recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, + self.tra_if, n_rx=1) + replay_count += 7 + self.assert_error_counter_equal(replay_node_name, replay_count) + # # now move the window over to 257 (more than one byte) and into Case A # @@ -347,7 +362,7 @@ class IpsecTra4(object): bogus_sa = SecurityAssociation(self.encryption_type, p.vpp_tra_spi, crypt_algo=p.crypt_algo, - crypt_key=mk_scapy_crpyt_key(p)[::-1], + crypt_key=mk_scapy_crypt_key(p)[::-1], auth_algo=p.auth_algo, auth_key=p.auth_key[::-1]) pkt = (Ether(src=self.tra_if.remote_mac, @@ -364,7 +379,7 @@ class IpsecTra4(object): # a malformed 'runt' packet # created by a mis-constructed SA - if (ESP == self.encryption_type): + if (ESP == self.encryption_type and p.crypt_algo != "NULL"): bogus_sa = SecurityAssociation(self.encryption_type, p.vpp_tra_spi) pkt = (Ether(src=self.tra_if.remote_mac, @@ -401,7 +416,7 @@ class IpsecTra4(object): seq_num=17)) self.send_and_assert_no_replies(self.tra_if, pkt * 17) - if use_esn: + if esn_en: # an out of window error with ESN looks like a high sequence # wrap. but since it isn't then the verify will fail. hash_failed_count += 17 @@ -441,7 +456,7 @@ class IpsecTra4(object): seq_num=seq)) for seq in range(259, 280)] - if use_esn: + if esn_en: rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if) # @@ -645,21 +660,21 @@ class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests): class IpsecTun4(object): """ verify methods for Tunnel v4 """ - def verify_counters4(self, p, count, n_frags=None): + def verify_counters4(self, p, count, n_frags=None, worker=None): if not n_frags: n_frags = count if (hasattr(p, "spd_policy_in_any")): - pkts = p.spd_policy_in_any.get_stats()['packets'] + pkts = p.spd_policy_in_any.get_stats(worker)['packets'] self.assertEqual(pkts, count, "incorrect SPD any policy: expected %d != %d" % (count, pkts)) if (hasattr(p, "tun_sa_in")): - pkts = p.tun_sa_in.get_stats()['packets'] + pkts = p.tun_sa_in.get_stats(worker)['packets'] self.assertEqual(pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)) - pkts = p.tun_sa_out.get_stats()['packets'] + pkts = p.tun_sa_out.get_stats(worker)['packets'] self.assertEqual(pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)) @@ -703,7 +718,6 @@ class IpsecTun4(object): if not n_rx: n_rx = count try: - config_tun_params(p, self.encryption_type, self.tun_if) send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if, src=p.remote_tun_if_host, dst=self.pg1.remote_ip4, @@ -724,10 +738,39 @@ class IpsecTun4(object): self.verify_counters4(p, count, n_rx) + def verify_tun_reass_44(self, p): + self.vapi.cli("clear errors") + self.vapi.ip_reassembly_enable_disable( + sw_if_index=self.tun_if.sw_if_index, enable_ip4=True) + + try: + send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip4, + payload_size=1900, + count=1) + send_pkts = fragment_rfc791(send_pkts[0], 1400) + recv_pkts = self.send_and_expect(self.tun_if, send_pkts, + self.pg1, n_rx=1) + self.verify_decrypted(p, recv_pkts) + + send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, + dst=p.remote_tun_if_host, count=1) + recv_pkts = self.send_and_expect(self.pg1, send_pkts, + self.tun_if) + self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts) + + finally: + self.logger.info(self.vapi.ppcli("show error")) + self.logger.info(self.vapi.ppcli("show ipsec all")) + + self.verify_counters4(p, 1, 1) + self.vapi.ip_reassembly_enable_disable( + sw_if_index=self.tun_if.sw_if_index, enable_ip4=False) + def verify_tun_64(self, p, count=1): self.vapi.cli("clear errors") try: - config_tun_params(p, self.encryption_type, self.tun_if) send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if, src=p.remote_tun_if_host6, dst=self.pg1.remote_ip6, @@ -766,7 +809,7 @@ class IpsecTun4(object): pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) / UDP(sport=333, dport=4500) / - Raw(0xff)) + Raw(b'\xff')) self.send_and_assert_no_replies(self.tun_if, pkt*31) self.assert_error_counter_equal( '/err/%s/NAT Keepalive' % self.tun4_input_node, 31) @@ -774,7 +817,7 @@ class IpsecTun4(object): pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) / UDP(sport=333, dport=4500) / - Raw(0xfe)) + Raw(b'\xfe')) self.send_and_assert_no_replies(self.tun_if, pkt*31) self.assert_error_counter_equal( '/err/%s/Too Short' % self.tun4_input_node, 31) @@ -786,6 +829,10 @@ class IpsecTun4Tests(IpsecTun4): """ ipsec 4o4 tunnel basic test """ self.verify_tun_44(self.params[socket.AF_INET], count=1) + def test_tun_reass_basic44(self): + """ ipsec 4o4 tunnel basic reassembly test """ + self.verify_tun_reass_44(self.params[socket.AF_INET]) + def test_tun_burst44(self): """ ipsec 4o4 tunnel burst test """ self.verify_tun_44(self.params[socket.AF_INET], count=257) @@ -793,14 +840,14 @@ class IpsecTun4Tests(IpsecTun4): class IpsecTun6(object): """ verify methods for Tunnel v6 """ - def verify_counters6(self, p_in, p_out, count): + def verify_counters6(self, p_in, p_out, count, worker=None): if (hasattr(p_in, "tun_sa_in")): - pkts = p_in.tun_sa_in.get_stats()['packets'] + pkts = p_in.tun_sa_in.get_stats(worker)['packets'] self.assertEqual(pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)) if (hasattr(p_out, "tun_sa_out")): - pkts = p_out.tun_sa_out.get_stats()['packets'] + pkts = p_out.tun_sa_out.get_stats(worker)['packets'] self.assertEqual(pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)) @@ -837,7 +884,6 @@ class IpsecTun6(object): self.vapi.cli("clear errors") self.vapi.cli("clear ipsec sa") - config_tun_params(p_in, self.encryption_type, self.tun_if) send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if, src=p_in.remote_tun_if_host, dst=self.pg1.remote_ip6, @@ -851,8 +897,6 @@ class IpsecTun6(object): if not p_out: p_out = p_in try: - config_tun_params(p_in, self.encryption_type, self.tun_if) - config_tun_params(p_out, self.encryption_type, self.tun_if) send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if, src=p_in.remote_tun_if_host, dst=self.pg1.remote_ip6, @@ -864,8 +908,7 @@ class IpsecTun6(object): dst=p_out.remote_tun_if_host, count=count, payload_size=payload_size) - recv_pkts = self.send_and_expect(self.pg1, send_pkts, - self.tun_if) + recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if) self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts) finally: @@ -873,11 +916,40 @@ class IpsecTun6(object): self.logger.info(self.vapi.ppcli("show ipsec all")) self.verify_counters6(p_in, p_out, count) + def verify_tun_reass_66(self, p): + self.vapi.cli("clear errors") + self.vapi.ip_reassembly_enable_disable( + sw_if_index=self.tun_if.sw_if_index, enable_ip6=True) + + try: + send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip6, + count=1, + payload_size=1900) + send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger) + recv_pkts = self.send_and_expect(self.tun_if, send_pkts, + self.pg1, n_rx=1) + self.verify_decrypted6(p, recv_pkts) + + send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6, + dst=p.remote_tun_if_host, + count=1, + payload_size=64) + recv_pkts = self.send_and_expect(self.pg1, send_pkts, + self.tun_if) + self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts) + finally: + self.logger.info(self.vapi.ppcli("show error")) + self.logger.info(self.vapi.ppcli("show ipsec all")) + self.verify_counters6(p, p, 1) + self.vapi.ip_reassembly_enable_disable( + sw_if_index=self.tun_if.sw_if_index, enable_ip6=False) + def verify_tun_46(self, p, count=1): """ ipsec 4o6 tunnel basic test """ self.vapi.cli("clear errors") try: - config_tun_params(p, self.encryption_type, self.tun_if) send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if, src=p.remote_tun_if_host4, dst=self.pg1.remote_ip4, @@ -920,11 +992,77 @@ class IpsecTun6Tests(IpsecTun6): """ ipsec 6o6 tunnel basic test """ self.verify_tun_66(self.params[socket.AF_INET6], count=1) + def test_tun_reass_basic66(self): + """ ipsec 6o6 tunnel basic reassembly test """ + self.verify_tun_reass_66(self.params[socket.AF_INET6]) + def test_tun_burst66(self): """ ipsec 6o6 tunnel burst test """ self.verify_tun_66(self.params[socket.AF_INET6], count=257) +class IpsecTun6HandoffTests(IpsecTun6): + """ UT test methods for Tunnel v6 with multiple workers """ + worker_config = "workers 2" + + def test_tun_handoff_66(self): + """ ipsec 6o6 tunnel worker hand-off test """ + N_PKTS = 15 + p = self.params[socket.AF_INET6] + + # inject alternately on worker 0 and 1. all counts on the SA + # should be against worker 0 + for worker in [0, 1, 0, 1]: + send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip6, + count=N_PKTS) + recv_pkts = self.send_and_expect(self.tun_if, send_pkts, + self.pg1, worker=worker) + self.verify_decrypted6(p, recv_pkts) + + send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6, + dst=p.remote_tun_if_host, + count=N_PKTS) + recv_pkts = self.send_and_expect(self.pg1, send_pkts, + self.tun_if, worker=worker) + self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts) + + # all counts against the first worker that was used + self.verify_counters6(p, p, 4*N_PKTS, worker=0) + + +class IpsecTun4HandoffTests(IpsecTun4): + """ UT test methods for Tunnel v4 with multiple workers """ + worker_config = "workers 2" + + def test_tun_handooff_44(self): + """ ipsec 4o4 tunnel worker hand-off test """ + N_PKTS = 15 + p = self.params[socket.AF_INET] + + # inject alternately on worker 0 and 1. all counts on the SA + # should be against worker 0 + for worker in [0, 1, 0, 1]: + send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip4, + count=N_PKTS) + recv_pkts = self.send_and_expect(self.tun_if, send_pkts, + self.pg1, worker=worker) + self.verify_decrypted(p, recv_pkts) + + send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, + dst=p.remote_tun_if_host, + count=N_PKTS) + recv_pkts = self.send_and_expect(self.pg1, send_pkts, + self.tun_if, worker=worker) + self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts) + + # all counts against the first worker that was used + self.verify_counters4(p, 4*N_PKTS, worker=0) + + class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests): """ UT test methods for Tunnel v6 & v4 """ pass