X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ipsec_ah.py;fp=test%2Ftest_ipsec_ah.py;h=190bde78f560487d86ce290720eab3c6223aeb75;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=8f8b2bf15509185aa030be6ae3cb6055bad980de;hpb=f90348bcb4afd0af2611cefc43b17ef3042b511c;p=vpp.git diff --git a/test/test_ipsec_ah.py b/test/test_ipsec_ah.py index 8f8b2bf1550..190bde78f56 100644 --- a/test/test_ipsec_ah.py +++ b/test/test_ipsec_ah.py @@ -8,13 +8,23 @@ from scapy.layers.l2 import Ether from scapy.packet import Raw from framework import VppTestRunner -from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \ - config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \ - IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6, \ - IpsecTun6HandoffTests, IpsecTun4HandoffTests +from template_ipsec import ( + TemplateIpsec, + IpsecTra46Tests, + IpsecTun46Tests, + config_tun_params, + config_tra_params, + IPsecIPv4Params, + IPsecIPv6Params, + IpsecTra4, + IpsecTun4, + IpsecTra6, + IpsecTun6, + IpsecTun6HandoffTests, + IpsecTun4HandoffTests, +) from template_ipsec import IpsecTcpTests -from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\ - VppIpsecSpdItfBinding +from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_ip import DpoProto from vpp_papi import VppEnum @@ -41,6 +51,7 @@ class ConfigIpsecAH(TemplateIpsec): --- --- --- """ + encryption_type = AH net_objs = [] tra4_encrypt_node_name = "ah4-encrypt" @@ -79,13 +90,11 @@ class ConfigIpsecAH(TemplateIpsec): self.tun_spd.add_vpp_config() self.net_objs.append(self.tun_spd) - b = VppIpsecSpdItfBinding(self, self.tra_spd, - self.tra_if) + b = VppIpsecSpdItfBinding(self, self.tra_spd, self.tra_if) b.add_vpp_config() self.net_objs.append(b) - b = VppIpsecSpdItfBinding(self, self.tun_spd, - self.tun_if) + b = VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if) b.add_vpp_config() self.net_objs.append(b) @@ -97,10 +106,16 @@ class ConfigIpsecAH(TemplateIpsec): config_tun_params(p, self.encryption_type, self.tun_if) for p in params: d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4 - r = VppIpRoute(self, p.remote_tun_if_host, p.addr_len, - [VppRoutePath(self.tun_if.remote_addr[p.addr_type], - 0xffffffff, - proto=d)]) + r = VppIpRoute( + self, + p.remote_tun_if_host, + p.addr_len, + [ + VppRoutePath( + self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=d + ) + ], + ) r.add_vpp_config() self.net_objs.append(r) self.logger.info(self.vapi.ppcli("show ipsec all")) @@ -130,74 +145,116 @@ class ConfigIpsecAH(TemplateIpsec): params.outer_hop_limit = 253 params.outer_flow_label = 0x12345 - params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - self.tun_if.local_addr[addr_type], - self.tun_if.remote_addr[addr_type], - tun_flags=tun_flags, - flags=flags, - dscp=params.dscp) - - params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - self.tun_if.remote_addr[addr_type], - self.tun_if.local_addr[addr_type], - tun_flags=tun_flags, - flags=flags, - dscp=params.dscp) + params.tun_sa_in = VppIpsecSA( + self, + scapy_tun_sa_id, + scapy_tun_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + self.tun_if.local_addr[addr_type], + self.tun_if.remote_addr[addr_type], + tun_flags=tun_flags, + flags=flags, + dscp=params.dscp, + ) + + params.tun_sa_out = VppIpsecSA( + self, + vpp_tun_sa_id, + vpp_tun_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + self.tun_if.remote_addr[addr_type], + self.tun_if.local_addr[addr_type], + tun_flags=tun_flags, + flags=flags, + dscp=params.dscp, + ) objs.append(params.tun_sa_in) objs.append(params.tun_sa_out) - params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd, - vpp_tun_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH) - params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd, - vpp_tun_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH, - is_outbound=0) + params.spd_policy_in_any = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + ) + params.spd_policy_out_any = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + is_outbound=0, + ) objs.append(params.spd_policy_out_any) objs.append(params.spd_policy_in_any) - e1 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id, - remote_tun_if_host, - remote_tun_if_host, - self.pg1.remote_addr[addr_type], - self.pg1.remote_addr[addr_type], - 0, priority=10, - policy=e.IPSEC_API_SPD_ACTION_PROTECT, - is_outbound=0) - e2 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - self.pg1.remote_addr[addr_type], - self.pg1.remote_addr[addr_type], - remote_tun_if_host, - remote_tun_if_host, - 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, - priority=10) - e3 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id, - remote_tun_if_host, - remote_tun_if_host, - self.pg0.local_addr[addr_type], - self.pg0.local_addr[addr_type], - 0, priority=20, - policy=e.IPSEC_API_SPD_ACTION_PROTECT, - is_outbound=0) - e4 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - self.pg0.local_addr[addr_type], - self.pg0.local_addr[addr_type], - remote_tun_if_host, - remote_tun_if_host, - 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, - priority=20) + e1 = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + remote_tun_if_host, + remote_tun_if_host, + self.pg1.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + 0, + priority=10, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0, + ) + e2 = VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + self.pg1.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + remote_tun_if_host, + remote_tun_if_host, + 0, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=10, + ) + e3 = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + remote_tun_if_host, + remote_tun_if_host, + self.pg0.local_addr[addr_type], + self.pg0.local_addr[addr_type], + 0, + priority=20, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0, + ) + e4 = VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + self.pg0.local_addr[addr_type], + self.pg0.local_addr[addr_type], + remote_tun_if_host, + remote_tun_if_host, + 0, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=20, + ) objs = objs + [e1, e2, e3, e4] @@ -218,49 +275,92 @@ class ConfigIpsecAH(TemplateIpsec): crypt_key = params.crypt_key addr_any = params.addr_any addr_bcast = params.addr_bcast - flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY) + flags = params.flags | ( + VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY + ) e = VppEnum.vl_api_ipsec_spd_action_t objs = [] - params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - flags=flags) - params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - flags=flags) + params.tra_sa_in = VppIpsecSA( + self, + scapy_tra_sa_id, + scapy_tra_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + flags=flags, + ) + params.tra_sa_out = VppIpsecSA( + self, + vpp_tra_sa_id, + vpp_tra_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + flags=flags, + ) objs.append(params.tra_sa_in) objs.append(params.tra_sa_out) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH)) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH, - is_outbound=0)) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id, - self.tra_if.local_addr[addr_type], - self.tra_if.local_addr[addr_type], - self.tra_if.remote_addr[addr_type], - self.tra_if.remote_addr[addr_type], - 0, priority=10, - policy=e.IPSEC_API_SPD_ACTION_PROTECT, - is_outbound=0)) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id, - self.tra_if.local_addr[addr_type], - self.tra_if.local_addr[addr_type], - self.tra_if.remote_addr[addr_type], - self.tra_if.remote_addr[addr_type], - 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, - priority=10)) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + vpp_tra_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + ) + ) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + scapy_tra_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + is_outbound=0, + ) + ) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + vpp_tra_sa_id, + self.tra_if.local_addr[addr_type], + self.tra_if.local_addr[addr_type], + self.tra_if.remote_addr[addr_type], + self.tra_if.remote_addr[addr_type], + 0, + priority=10, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0, + ) + ) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + scapy_tra_sa_id, + self.tra_if.local_addr[addr_type], + self.tra_if.local_addr[addr_type], + self.tra_if.remote_addr[addr_type], + self.tra_if.remote_addr[addr_type], + 0, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=10, + ) + ) for o in objs: o.add_vpp_config() @@ -288,6 +388,7 @@ class TemplateIpsecAh(ConfigIpsecAH): --- --- --- """ + @classmethod def setUpClass(cls): super(TemplateIpsecAh, cls).setUpClass() @@ -306,26 +407,30 @@ class TemplateIpsecAh(ConfigIpsecAH): class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests): - """ Ipsec AH - TCP tests """ + """Ipsec AH - TCP tests""" + pass class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests): - """ Ipsec AH w/ SHA1 """ + """Ipsec AH w/ SHA1""" + pass class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests): - """ Ipsec AH - TUN encap tests """ + """Ipsec AH - TUN encap tests""" def setUp(self): self.ipv4_params = IPsecIPv4Params() self.ipv6_params = IPsecIPv6Params() - c = (VppEnum.vl_api_tunnel_encap_decap_flags_t. - TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP) - c1 = c | (VppEnum.vl_api_tunnel_encap_decap_flags_t. - TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN) + c = ( + VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP + ) + c1 = c | ( + VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN + ) self.ipv4_params.tun_flags = c self.ipv6_params.tun_flags = c1 @@ -334,19 +439,23 @@ class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests): def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy only DSCP - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IP(src=src, dst=dst, tos=5) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IP(src=src, dst=dst, tos=5) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy both - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IPv6(src=src, dst=dst, tc=5) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IPv6(src=src, dst=dst, tc=5) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def verify_encrypted(self, p, sa, rxs): # just check that only the DSCP is copied @@ -360,7 +469,7 @@ class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests): class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests): - """ Ipsec AH - TUN encap tests """ + """Ipsec AH - TUN encap tests""" def setUp(self): self.ipv4_params = IPsecIPv4Params() @@ -373,24 +482,28 @@ class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests): def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy only DSCP - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IP(src=src, dst=dst, tos=0) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IP(src=src, dst=dst, tos=0) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy both - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IPv6(src=src, dst=dst, tc=0) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IPv6(src=src, dst=dst, tc=0) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def verify_encrypted(self, p, sa, rxs): # just check that only the DSCP is copied for rx in rxs: - self.assertEqual(rx[IP].tos, 0xc) + self.assertEqual(rx[IP].tos, 0xC) def verify_encrypted6(self, p, sa, rxs): # just check that the DSCP & ECN are copied @@ -398,17 +511,14 @@ class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests): self.assertEqual(rx[IPv6].tc, 0x10) -class TestIpsecAhHandoff(TemplateIpsecAh, - IpsecTun6HandoffTests, - IpsecTun4HandoffTests): - """ Ipsec AH Handoff """ +class TestIpsecAhHandoff(TemplateIpsecAh, IpsecTun6HandoffTests, IpsecTun4HandoffTests): + """Ipsec AH Handoff""" + pass -class TestIpsecAhAll(ConfigIpsecAH, - IpsecTra4, IpsecTra6, - IpsecTun4, IpsecTun6): - """ Ipsec AH all Algos """ +class TestIpsecAhAll(ConfigIpsecAH, IpsecTra4, IpsecTra6, IpsecTun4, IpsecTun6): + """Ipsec AH all Algos""" def setUp(self): super(TestIpsecAhAll, self).setUp() @@ -421,21 +531,26 @@ class TestIpsecAhAll(ConfigIpsecAH, # foreach VPP crypto engine engines = ["ia32", "ipsecmb", "openssl"] - algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA1_96, - 'scapy': "HMAC-SHA1-96"}, - {'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA_256_128, - 'scapy': "SHA2-256-128"}, - {'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA_384_192, - 'scapy': "SHA2-384-192"}, - {'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA_512_256, - 'scapy': "SHA2-512-256"}] - - flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_ESN)] + algos = [ + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96, + "scapy": "HMAC-SHA1-96", + }, + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128, + "scapy": "SHA2-256-128", + }, + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_384_192, + "scapy": "SHA2-384-192", + }, + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256, + "scapy": "SHA2-512-256", + }, + ] + + flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)] # # loop through the VPP engines @@ -454,14 +569,14 @@ class TestIpsecAhAll(ConfigIpsecAH, self.ipv4_params = IPsecIPv4Params() self.ipv6_params = IPsecIPv6Params() - self.params = {self.ipv4_params.addr_type: - self.ipv4_params, - self.ipv6_params.addr_type: - self.ipv6_params} + self.params = { + self.ipv4_params.addr_type: self.ipv4_params, + self.ipv6_params.addr_type: self.ipv6_params, + } for _, p in self.params.items(): - p.auth_algo_vpp_id = algo['vpp'] - p.auth_algo = algo['scapy'] + p.auth_algo_vpp_id = algo["vpp"] + p.auth_algo = algo["scapy"] p.flags = p.flags | flag # @@ -484,5 +599,5 @@ class TestIpsecAhAll(ConfigIpsecAH, self.unconfig_network() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)