X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ipsec_nat.py;h=07670d71b03682936306972607379c25344dedb5;hb=refs%2Fchanges%2F96%2F12296%2F42;hp=7e6e1d4d91227d91555419894d6e2c63e721ed85;hpb=be16020c5034bc69df25a8ecd7081aec9898d93c;p=vpp.git diff --git a/test/test_ipsec_nat.py b/test/test_ipsec_nat.py index 7e6e1d4d912..07670d71b03 100644 --- a/test/test_ipsec_nat.py +++ b/test/test_ipsec_nat.py @@ -2,14 +2,21 @@ import socket +import scapy.compat from scapy.layers.l2 import Ether from scapy.layers.inet import ICMP, IP, TCP, UDP from scapy.layers.ipsec import SecurityAssociation, ESP + from util import ppp, ppc from template_ipsec import TemplateIpsec +from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\ + VppIpsecSpdItfBinding +from vpp_ip_route import VppIpRoute, VppRoutePath +from vpp_ip import DpoProto +from vpp_papi import VppEnum -class TemplateIPSecNAT(TemplateIpsec): +class IPSecNATTestCase(TemplateIpsec): """ IPSec/NAT TUNNEL MODE: @@ -33,18 +40,33 @@ class TemplateIPSecNAT(TemplateIpsec): @classmethod def setUpClass(cls): - super(TemplateIPSecNAT, cls).setUpClass() - cls.tun_if = cls.pg0 - cls.vapi.ipsec_spd_add_del(cls.tun_spd_id) - cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id, - cls.tun_if.sw_if_index) - p = cls.ipv4_params - cls.config_esp_tun(p) - cls.logger.info(cls.vapi.ppcli("show ipsec")) - src = socket.inet_pton(p.addr_type, p.remote_tun_if_host) - cls.vapi.ip_add_del_route(src, p.addr_len, - cls.tun_if.remote_addr_n[p.addr_type], - is_ipv6=p.is_ipv6) + super(IPSecNATTestCase, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(IPSecNATTestCase, cls).tearDownClass() + + def setUp(self): + super(IPSecNATTestCase, self).setUp() + self.tun_if = self.pg0 + + self.tun_spd = VppIpsecSpd(self, self.tun_spd_id) + self.tun_spd.add_vpp_config() + VppIpsecSpdItfBinding(self, self.tun_spd, + self.tun_if).add_vpp_config() + + p = self.ipv4_params + self.config_esp_tun(p) + self.logger.info(self.vapi.ppcli("show ipsec all")) + + d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4 + VppIpRoute(self, p.remote_tun_if_host, p.addr_len, + [VppRoutePath(self.tun_if.remote_addr[p.addr_type], + 0xffffffff, + proto=d)]).add_vpp_config() + + def tearDown(self): + super(IPSecNATTestCase, self).tearDown() def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip): return [ @@ -113,9 +135,9 @@ class TemplateIPSecNAT(TemplateIpsec): def verify_capture_encrypted(self, capture, sa): for packet in capture: try: - copy = packet.__class__(str(packet)) + copy = packet.__class__(scapy.compat.raw(packet)) del copy[UDP].len - copy = packet.__class__(str(copy)) + copy = packet.__class__(scapy.compat.raw(copy)) self.assert_equal(packet[UDP].len, copy[UDP].len, "UDP header length") self.assert_packet_checksums_valid(packet) @@ -131,8 +153,7 @@ class TemplateIPSecNAT(TemplateIpsec): ppp("Unexpected or invalid encrypted packet:", packet)) raise - @classmethod - def config_esp_tun(cls, params): + def config_esp_tun(self, params): addr_type = params.addr_type scapy_tun_sa_id = params.scapy_tun_sa_id scapy_tun_spi = params.scapy_tun_spi @@ -144,50 +165,62 @@ class TemplateIPSecNAT(TemplateIpsec): crypt_key = params.crypt_key addr_any = params.addr_any addr_bcast = params.addr_bcast - cls.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_esp_protocol, - cls.pg1.remote_addr_n[addr_type], - cls.tun_if.remote_addr_n[addr_type], - udp_encap=1) - cls.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_esp_protocol, - cls.tun_if.remote_addr_n[addr_type], - cls.pg1.remote_addr_n[addr_type], - udp_encap=1) - l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) - l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, - protocol=socket.IPPROTO_ESP) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, is_outbound=0, - protocol=socket.IPPROTO_ESP) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, remote_port_start=4500, - remote_port_stop=4500, - protocol=socket.IPPROTO_UDP) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, remote_port_start=4500, - remote_port_stop=4500, - protocol=socket.IPPROTO_UDP, - is_outbound=0) - l_startaddr = l_stopaddr = cls.tun_if.remote_addr_n[addr_type] - r_startaddr = r_stopaddr = cls.pg1.remote_addr_n[addr_type] - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, priority=10, policy=3, - is_outbound=0) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - r_startaddr, r_stopaddr, l_startaddr, - l_stopaddr, priority=10, policy=3) + flags = (VppEnum.vl_api_ipsec_sad_flags_t. + IPSEC_API_SAD_FLAG_UDP_ENCAP) + e = VppEnum.vl_api_ipsec_spd_action_t + + VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, + self.pg1.remote_addr[addr_type], + self.tun_if.remote_addr[addr_type], + flags=flags).add_vpp_config() + VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, + self.tun_if.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + flags=flags).add_vpp_config() + + VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, + addr_any, addr_bcast, + addr_any, addr_bcast, + socket.IPPROTO_ESP).add_vpp_config() + VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, + addr_any, addr_bcast, + addr_any, addr_bcast, + socket.IPPROTO_ESP, + is_outbound=0).add_vpp_config() + VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, + addr_any, addr_bcast, + addr_any, addr_bcast, + socket.IPPROTO_UDP, + remote_port_start=4500, + remote_port_stop=4500).add_vpp_config() + VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, + addr_any, addr_bcast, + addr_any, addr_bcast, + socket.IPPROTO_UDP, + remote_port_start=4500, + remote_port_stop=4500, + is_outbound=0).add_vpp_config() + VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id, + self.tun_if.remote_addr[addr_type], + self.tun_if.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + 0, priority=10, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0).add_vpp_config() + VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, + self.pg1.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + self.tun_if.remote_addr[addr_type], + self.tun_if.remote_addr[addr_type], + 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=10).add_vpp_config() def test_ipsec_nat_tun(self): """ IPSec/NAT tunnel test case """ @@ -236,8 +269,3 @@ class TemplateIPSecNAT(TemplateIpsec): self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_plain(capture) - - -class IPSecNAT(TemplateIPSecNAT): - """ IPSec/NAT """ - pass