X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_ipsec.py;h=d6641c45dd1b7fdd8e968723c2cc7f019fee991c;hb=80f6fd53f;hp=e6f1d2488240dbebee1f74fd6d0b688c8fd43268;hpb=7f9b7f9f492d1748d8ba025b3a713058fdb1943d;p=vpp.git diff --git a/test/template_ipsec.py b/test/template_ipsec.py index e6f1d248824..d6641c45dd1 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -1,5 +1,6 @@ import unittest import socket +import struct from scapy.layers.inet import IP, ICMP, TCP, UDP from scapy.layers.ipsec import SecurityAssociation @@ -42,6 +43,7 @@ class IPsecIPv4Params(object): IPSEC_API_CRYPTO_ALG_AES_CBC_128) self.crypt_algo = 'AES-CBC' # scapy name self.crypt_key = 'JPjyOWBeVEQiMe7h' + self.salt = 0 self.flags = 0 self.nat_header = None @@ -74,9 +76,10 @@ class IPsecIPv6Params(object): self.auth_key = 'C91KUR9GYMm5GfkEvNjX' self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t. - IPSEC_API_CRYPTO_ALG_AES_CBC_256) + IPSEC_API_CRYPTO_ALG_AES_CBC_128) self.crypt_algo = 'AES-CBC' # scapy name - self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h' + self.crypt_key = 'JPjyOWBeVEQiMe7h' + self.salt = 0 self.flags = 0 self.nat_header = None @@ -85,9 +88,14 @@ def config_tun_params(p, encryption_type, tun_if): ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6} use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. IPSEC_API_SAD_FLAG_USE_ESN)) + if p.crypt_algo == "AES-GCM": + crypt_key = p.crypt_key + struct.pack("!I", p.salt) + else: + crypt_key = p.crypt_key p.scapy_tun_sa = SecurityAssociation( encryption_type, spi=p.vpp_tun_spi, - crypt_algo=p.crypt_algo, crypt_key=p.crypt_key, + crypt_algo=p.crypt_algo, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, tunnel_header=ip_class_by_addr_type[p.addr_type]( src=tun_if.remote_addr[p.addr_type], @@ -96,7 +104,8 @@ def config_tun_params(p, encryption_type, tun_if): use_esn=use_esn) p.vpp_tun_sa = SecurityAssociation( encryption_type, spi=p.scapy_tun_spi, - crypt_algo=p.crypt_algo, crypt_key=p.crypt_key, + crypt_algo=p.crypt_algo, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, tunnel_header=ip_class_by_addr_type[p.addr_type]( dst=tun_if.remote_addr[p.addr_type], @@ -106,13 +115,17 @@ def config_tun_params(p, encryption_type, tun_if): def config_tra_params(p, encryption_type): - use_esn = p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_ESN) + use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. + IPSEC_API_SAD_FLAG_USE_ESN)) + if p.crypt_algo == "AES-GCM": + crypt_key = p.crypt_key + struct.pack("!I", p.salt) + else: + crypt_key = p.crypt_key p.scapy_tra_sa = SecurityAssociation( encryption_type, spi=p.vpp_tra_spi, crypt_algo=p.crypt_algo, - crypt_key=p.crypt_key, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, nat_t_header=p.nat_header, @@ -121,7 +134,7 @@ def config_tra_params(p, encryption_type): encryption_type, spi=p.scapy_tra_spi, crypt_algo=p.crypt_algo, - crypt_key=p.crypt_key, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, nat_t_header=p.nat_header, @@ -146,6 +159,8 @@ class TemplateIpsec(VppTestCase): |tun_if| -------> |VPP| ------> |pg1| ------ --- --- """ + tun_spd_id = 1 + tra_spd_id = 2 def ipsec_select_backend(self): """ empty method to be overloaded when necessary """ @@ -159,48 +174,49 @@ class TemplateIpsec(VppTestCase): def tearDownClass(cls): super(TemplateIpsec, cls).tearDownClass() - def setUp(self): - super(TemplateIpsec, self).setUp() - def setup_params(self): self.ipv4_params = IPsecIPv4Params() self.ipv6_params = IPsecIPv6Params() self.params = {self.ipv4_params.addr_type: self.ipv4_params, self.ipv6_params.addr_type: self.ipv6_params} + def config_interfaces(self): + self.create_pg_interfaces(range(3)) + self.interfaces = list(self.pg_interfaces) + for i in self.interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + i.config_ip6() + i.resolve_ndp() + def setUp(self): super(TemplateIpsec, self).setUp() self.setup_params() - self.tun_spd_id = 1 - self.tra_spd_id = 2 - self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t. IPSEC_API_PROTO_ESP) self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t. IPSEC_API_PROTO_AH) - self.create_pg_interfaces(range(3)) - self.interfaces = list(self.pg_interfaces) - for i in self.interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - i.config_ip6() - i.resolve_ndp() - self.ipsec_select_backend() + self.config_interfaces() - def tearDown(self): - super(TemplateIpsec, self).tearDown() + self.ipsec_select_backend() + def unconfig_interfaces(self): for i in self.interfaces: i.admin_down() i.unconfig_ip4() i.unconfig_ip6() - if not self.vpp_dead: - self.vapi.cli("show hardware") + def tearDown(self): + super(TemplateIpsec, self).tearDown() + + self.unconfig_interfaces() + + def show_commands_at_teardown(self): + self.logger.info(self.vapi.cli("show hardware")) def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=54): @@ -229,9 +245,8 @@ class TemplateIpsec(VppTestCase): for i in range(count)] -class IpsecTcpTests(object): - def test_tcp_checksum(self): - """ verify checksum correctness for vpp generated packets """ +class IpsecTcp(object): + def verify_tcp_checksum(self): self.vapi.cli("test http server") p = self.params[socket.AF_INET] config_tun_params(p, self.encryption_type, self.tun_if) @@ -246,9 +261,15 @@ class IpsecTcpTests(object): self.assert_packet_checksums_valid(decrypted) -class IpsecTra4Tests(object): - def test_tra_anti_replay(self, count=1): - """ ipsec v4 transport anti-reply test """ +class IpsecTcpTests(IpsecTcp): + def test_tcp_checksum(self): + """ verify checksum correctness for vpp generated packets """ + self.verify_tcp_checksum() + + +class IpsecTra4(object): + """ verify methods for Transport v4 """ + def verify_tra_anti_replay(self, count=1): p = self.params[socket.AF_INET] use_esn = p.vpp_tra_sa.use_esn @@ -379,7 +400,7 @@ class IpsecTra4Tests(object): p.scapy_tra_sa.seq_num = 351 p.vpp_tra_sa.seq_num = 351 - def test_tra_basic(self, count=1): + def verify_tra_basic4(self, count=1): """ ipsec v4 transport basic test """ self.vapi.cli("clear errors") try: @@ -413,14 +434,25 @@ class IpsecTra4Tests(object): self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count) self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count) + +class IpsecTra4Tests(IpsecTra4): + """ UT test methods for Transport v4 """ + def test_tra_anti_replay(self): + """ ipsec v4 transport anti-reply test """ + self.verify_tra_anti_replay(count=1) + + def test_tra_basic(self, count=1): + """ ipsec v4 transport basic test """ + self.verify_tra_basic4(count=1) + def test_tra_burst(self): """ ipsec v4 transport burst test """ - self.test_tra_basic(count=257) + self.verify_tra_basic4(count=257) -class IpsecTra6Tests(object): - def test_tra_basic6(self, count=1): - """ ipsec v6 transport basic test """ +class IpsecTra6(object): + """ verify methods for Transport v6 """ + def verify_tra_basic6(self, count=1): self.vapi.cli("clear errors") try: p = self.params[socket.AF_INET6] @@ -452,17 +484,25 @@ class IpsecTra6Tests(object): self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count) self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count) + +class IpsecTra6Tests(IpsecTra6): + """ UT test methods for Transport v6 """ + def test_tra_basic6(self): + """ ipsec v6 transport basic test """ + self.verify_tra_basic6(count=1) + def test_tra_burst6(self): """ ipsec v6 transport burst test """ - self.test_tra_basic6(count=257) + self.verify_tra_basic6(count=257) class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests): + """ UT test methods for Transport v6 and v4""" pass class IpsecTun4(object): - + """ verify methods for Tunnel v4 """ def verify_counters(self, p, count): if (hasattr(p, "spd_policy_in_any")): pkts = p.spd_policy_in_any.get_stats()['packets'] @@ -576,7 +616,7 @@ class IpsecTun4(object): class IpsecTun4Tests(IpsecTun4): - + """ UT test methods for Tunnel v4 """ def test_tun_basic44(self): """ ipsec 4o4 tunnel basic test """ self.verify_tun_44(self.params[socket.AF_INET], count=1) @@ -587,7 +627,7 @@ class IpsecTun4Tests(IpsecTun4): class IpsecTun6(object): - + """ verify methods for Tunnel v6 """ def verify_counters(self, p, count): if (hasattr(p, "tun_sa_in")): pkts = p.tun_sa_in.get_stats()['packets'] @@ -681,6 +721,7 @@ class IpsecTun6(object): class IpsecTun6Tests(IpsecTun6): + """ UT test methods for Tunnel v6 """ def test_tun_basic66(self): """ ipsec 6o6 tunnel basic test """ @@ -692,6 +733,7 @@ class IpsecTun6Tests(IpsecTun6): class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests): + """ UT test methods for Tunnel v6 & v4 """ pass