X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_ipsec.py;h=c623d6a4d9be9126cd4721e145ff5b62bcfec2a9;hb=17e5d8031f385c8ede56c59bd50957f0ea6cd2ce;hp=36e8da6635c30aeaafe05158967e265fc132e6fa;hpb=1091c4aa9bd96055e7a94d368bd6abf0c9f1b73d;p=vpp.git diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 36e8da6635c..c623d6a4d9b 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -1,8 +1,9 @@ import unittest import socket +import struct from scapy.layers.inet import IP, ICMP, TCP, UDP -from scapy.layers.ipsec import SecurityAssociation +from scapy.layers.ipsec import SecurityAssociation, ESP from scapy.layers.l2 import Ether, Raw from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest @@ -42,6 +43,7 @@ class IPsecIPv4Params(object): IPSEC_API_CRYPTO_ALG_AES_CBC_128) self.crypt_algo = 'AES-CBC' # scapy name self.crypt_key = 'JPjyOWBeVEQiMe7h' + self.salt = 0 self.flags = 0 self.nat_header = None @@ -74,9 +76,10 @@ class IPsecIPv6Params(object): self.auth_key = 'C91KUR9GYMm5GfkEvNjX' self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t. - IPSEC_API_CRYPTO_ALG_AES_CBC_256) + IPSEC_API_CRYPTO_ALG_AES_CBC_128) self.crypt_algo = 'AES-CBC' # scapy name - self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h' + self.crypt_key = 'JPjyOWBeVEQiMe7h' + self.salt = 0 self.flags = 0 self.nat_header = None @@ -85,9 +88,14 @@ def config_tun_params(p, encryption_type, tun_if): ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6} use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. IPSEC_API_SAD_FLAG_USE_ESN)) + if p.crypt_algo == "AES-GCM": + crypt_key = p.crypt_key + struct.pack("!I", p.salt) + else: + crypt_key = p.crypt_key p.scapy_tun_sa = SecurityAssociation( encryption_type, spi=p.vpp_tun_spi, - crypt_algo=p.crypt_algo, crypt_key=p.crypt_key, + crypt_algo=p.crypt_algo, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, tunnel_header=ip_class_by_addr_type[p.addr_type]( src=tun_if.remote_addr[p.addr_type], @@ -96,7 +104,8 @@ def config_tun_params(p, encryption_type, tun_if): use_esn=use_esn) p.vpp_tun_sa = SecurityAssociation( encryption_type, spi=p.scapy_tun_spi, - crypt_algo=p.crypt_algo, crypt_key=p.crypt_key, + crypt_algo=p.crypt_algo, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, tunnel_header=ip_class_by_addr_type[p.addr_type]( dst=tun_if.remote_addr[p.addr_type], @@ -106,13 +115,17 @@ def config_tun_params(p, encryption_type, tun_if): def config_tra_params(p, encryption_type): - use_esn = p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_ESN) + use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. + IPSEC_API_SAD_FLAG_USE_ESN)) + if p.crypt_algo == "AES-GCM": + crypt_key = p.crypt_key + struct.pack("!I", p.salt) + else: + crypt_key = p.crypt_key p.scapy_tra_sa = SecurityAssociation( encryption_type, spi=p.vpp_tra_spi, crypt_algo=p.crypt_algo, - crypt_key=p.crypt_key, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, nat_t_header=p.nat_header, @@ -121,7 +134,7 @@ def config_tra_params(p, encryption_type): encryption_type, spi=p.scapy_tra_spi, crypt_algo=p.crypt_algo, - crypt_key=p.crypt_key, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, nat_t_header=p.nat_header, @@ -146,50 +159,64 @@ class TemplateIpsec(VppTestCase): |tun_if| -------> |VPP| ------> |pg1| ------ --- --- """ + tun_spd_id = 1 + tra_spd_id = 2 def ipsec_select_backend(self): """ empty method to be overloaded when necessary """ pass + @classmethod + def setUpClass(cls): + super(TemplateIpsec, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TemplateIpsec, cls).tearDownClass() + def setup_params(self): self.ipv4_params = IPsecIPv4Params() self.ipv6_params = IPsecIPv6Params() self.params = {self.ipv4_params.addr_type: self.ipv4_params, self.ipv6_params.addr_type: self.ipv6_params} + def config_interfaces(self): + self.create_pg_interfaces(range(3)) + self.interfaces = list(self.pg_interfaces) + for i in self.interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + i.config_ip6() + i.resolve_ndp() + def setUp(self): super(TemplateIpsec, self).setUp() self.setup_params() - self.tun_spd_id = 1 - self.tra_spd_id = 2 - self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t. IPSEC_API_PROTO_ESP) self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t. IPSEC_API_PROTO_AH) - self.create_pg_interfaces(range(3)) - self.interfaces = list(self.pg_interfaces) - for i in self.interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - i.config_ip6() - i.resolve_ndp() - self.ipsec_select_backend() + self.config_interfaces() - def tearDown(self): - super(TemplateIpsec, self).tearDown() + self.ipsec_select_backend() + def unconfig_interfaces(self): for i in self.interfaces: i.admin_down() i.unconfig_ip4() i.unconfig_ip6() - if not self.vpp_dead: - self.vapi.cli("show hardware") + def tearDown(self): + super(TemplateIpsec, self).tearDown() + + self.unconfig_interfaces() + + def show_commands_at_teardown(self): + self.logger.info(self.vapi.cli("show hardware")) def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=54): @@ -218,9 +245,8 @@ class TemplateIpsec(VppTestCase): for i in range(count)] -class IpsecTcpTests(object): - def test_tcp_checksum(self): - """ verify checksum correctness for vpp generated packets """ +class IpsecTcp(object): + def verify_tcp_checksum(self): self.vapi.cli("test http server") p = self.params[socket.AF_INET] config_tun_params(p, self.encryption_type, self.tun_if) @@ -235,9 +261,15 @@ class IpsecTcpTests(object): self.assert_packet_checksums_valid(decrypted) -class IpsecTra4Tests(object): - def test_tra_anti_replay(self, count=1): - """ ipsec v4 transport anti-reply test """ +class IpsecTcpTests(IpsecTcp): + def test_tcp_checksum(self): + """ verify checksum correctness for vpp generated packets """ + self.verify_tcp_checksum() + + +class IpsecTra4(object): + """ verify methods for Transport v4 """ + def verify_tra_anti_replay(self, count=1): p = self.params[socket.AF_INET] use_esn = p.vpp_tra_sa.use_esn @@ -276,7 +308,11 @@ class IpsecTra4Tests(object): # a packet that does not decrypt does not move the window forward bogus_sa = SecurityAssociation(self.encryption_type, - p.vpp_tra_spi) + p.vpp_tra_spi, + crypt_algo=p.crypt_algo, + crypt_key=p.crypt_key[::-1], + auth_algo=p.auth_algo, + auth_key=p.auth_key[::-1]) pkt = (Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) / bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4, @@ -288,6 +324,22 @@ class IpsecTra4Tests(object): self.assert_packet_counter_equal( '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17) + # a malformed 'runt' packet + # created by a mis-constructed SA + if (ESP == self.encryption_type): + bogus_sa = SecurityAssociation(self.encryption_type, + p.vpp_tra_spi) + pkt = (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=350)) + self.send_and_assert_no_replies(self.tra_if, pkt * 17) + + self.assert_packet_counter_equal( + '/err/%s/undersized packet' % self.tra4_decrypt_node_name, 17) + # which we can determine since this packet is still in the window pkt = (Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) / @@ -368,7 +420,7 @@ class IpsecTra4Tests(object): p.scapy_tra_sa.seq_num = 351 p.vpp_tra_sa.seq_num = 351 - def test_tra_basic(self, count=1): + def verify_tra_basic4(self, count=1): """ ipsec v4 transport basic test """ self.vapi.cli("clear errors") try: @@ -380,6 +432,8 @@ class IpsecTra4Tests(object): recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) for rx in recv_pkts: + self.assertEqual(len(rx) - len(Ether()), rx[IP].len) + self.assert_packet_checksums_valid(rx) try: decrypted = p.vpp_tra_sa.decrypt(rx[IP]) self.assert_packet_checksums_valid(decrypted) @@ -402,14 +456,25 @@ class IpsecTra4Tests(object): self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count) self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count) + +class IpsecTra4Tests(IpsecTra4): + """ UT test methods for Transport v4 """ + def test_tra_anti_replay(self): + """ ipsec v4 transport anti-reply test """ + self.verify_tra_anti_replay(count=1) + + def test_tra_basic(self, count=1): + """ ipsec v4 transport basic test """ + self.verify_tra_basic4(count=1) + def test_tra_burst(self): """ ipsec v4 transport burst test """ - self.test_tra_basic(count=257) + self.verify_tra_basic4(count=257) -class IpsecTra6Tests(object): - def test_tra_basic6(self, count=1): - """ ipsec v6 transport basic test """ +class IpsecTra6(object): + """ verify methods for Transport v6 """ + def verify_tra_basic6(self, count=1): self.vapi.cli("clear errors") try: p = self.params[socket.AF_INET6] @@ -420,6 +485,8 @@ class IpsecTra6Tests(object): recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) for rx in recv_pkts: + self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), + rx[IPv6].plen) try: decrypted = p.vpp_tra_sa.decrypt(rx[IPv6]) self.assert_packet_checksums_valid(decrypted) @@ -441,17 +508,25 @@ class IpsecTra6Tests(object): self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count) self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count) + +class IpsecTra6Tests(IpsecTra6): + """ UT test methods for Transport v6 """ + def test_tra_basic6(self): + """ ipsec v6 transport basic test """ + self.verify_tra_basic6(count=1) + def test_tra_burst6(self): """ ipsec v6 transport burst test """ - self.test_tra_basic6(count=257) + self.verify_tra_basic6(count=257) class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests): + """ UT test methods for Transport v6 and v4""" pass class IpsecTun4(object): - + """ verify methods for Tunnel v4 """ def verify_counters(self, p, count): if (hasattr(p, "spd_policy_in_any")): pkts = p.spd_policy_in_any.get_stats()['packets'] @@ -481,6 +556,8 @@ class IpsecTun4(object): def verify_encrypted(self, p, sa, rxs): decrypt_pkts = [] for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(len(rx) - len(Ether()), rx[IP].len) try: decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP]) if not decrypt_pkt.haslayer(IP): @@ -565,7 +642,7 @@ class IpsecTun4(object): class IpsecTun4Tests(IpsecTun4): - + """ UT test methods for Tunnel v4 """ def test_tun_basic44(self): """ ipsec 4o4 tunnel basic test """ self.verify_tun_44(self.params[socket.AF_INET], count=1) @@ -576,7 +653,7 @@ class IpsecTun4Tests(IpsecTun4): class IpsecTun6(object): - + """ verify methods for Tunnel v6 """ def verify_counters(self, p, count): if (hasattr(p, "tun_sa_in")): pkts = p.tun_sa_in.get_stats()['packets'] @@ -609,6 +686,8 @@ class IpsecTun6(object): count=count) recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if) for recv_pkt in recv_pkts: + self.assertEqual(len(recv_pkt) - len(Ether()) - len(IPv6()), + recv_pkt[IPv6].plen) try: decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6]) if not decrypt_pkt.haslayer(IPv6): @@ -670,6 +749,7 @@ class IpsecTun6(object): class IpsecTun6Tests(IpsecTun6): + """ UT test methods for Tunnel v6 """ def test_tun_basic66(self): """ ipsec 6o6 tunnel basic test """ @@ -681,6 +761,7 @@ class IpsecTun6Tests(IpsecTun6): class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests): + """ UT test methods for Tunnel v6 & v4 """ pass