X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ipsec_nat.py;h=7e6e1d4d91227d91555419894d6e2c63e721ed85;hb=be16020c5034bc69df25a8ecd7081aec9898d93c;hp=6d9dc109add11c205cab6a6fe6ba93d96c83ce49;hpb=beaded5e0cbcd507fa4dca4f71712bd4e6911e69;p=vpp.git diff --git a/test/test_ipsec_nat.py b/test/test_ipsec_nat.py index 6d9dc109add..7e6e1d4d912 100644 --- a/test/test_ipsec_nat.py +++ b/test/test_ipsec_nat.py @@ -9,15 +9,8 @@ from util import ppp, ppc from template_ipsec import TemplateIpsec -class IPSecNATTestCase(TemplateIpsec): +class TemplateIPSecNAT(TemplateIpsec): """ IPSec/NAT - - TRANSPORT MODE: - - --- encrypt --- - |pg2| <-------> |VPP| - --- decrypt --- - TUNNEL MODE: @@ -31,20 +24,27 @@ class IPSecNATTestCase(TemplateIpsec): --- --- --- """ + tcp_port_in = 6303 + tcp_port_out = 6303 + udp_port_in = 6304 + udp_port_out = 6304 + icmp_id_in = 6305 + icmp_id_out = 6305 + @classmethod def setUpClass(cls): - super(IPSecNATTestCase, cls).setUpClass() - cls.tcp_port_in = 6303 - cls.tcp_port_out = 6303 - cls.udp_port_in = 6304 - cls.udp_port_out = 6304 - cls.icmp_id_in = 6305 - cls.icmp_id_out = 6305 + super(TemplateIPSecNAT, cls).setUpClass() cls.tun_if = cls.pg0 - cls.config_esp_tun() + cls.vapi.ipsec_spd_add_del(cls.tun_spd_id) + cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id, + cls.tun_if.sw_if_index) + p = cls.ipv4_params + cls.config_esp_tun(p) cls.logger.info(cls.vapi.ppcli("show ipsec")) - client = socket.inet_pton(socket.AF_INET, cls.remote_tun_if_host) - cls.vapi.ip_add_del_route(client, 32, cls.pg0.remote_ip4n) + src = socket.inet_pton(p.addr_type, p.remote_tun_if_host) + cls.vapi.ip_add_del_route(src, p.addr_len, + cls.tun_if.remote_addr_n[p.addr_type], + is_ipv6=p.is_ipv6) def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip): return [ 
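Review bot: `setUpClass` binds `p = cls.ipv4_params` and `test_ipsec_nat_tun` (later hunk) does the same, so the new `TemplateIPSecNAT` is parameterised in name only — a subclass cannot select a different address family without overriding both methods. Consider resolving the parameter set once through a class attribute the subclass can override (for example a constructed `params_attr = 'ipv4_params'` read as `p = getattr(cls, cls.params_attr)`), or at least a docstring line stating that subclasses are expected to override `ipv4_params`. The `remote_addr_n[p.addr_type]` lookup suggests the interfaces already carry per-family addresses, which is what makes the remaining IPv4 pin surprising.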
@@ -113,83 +113,90 @@ class IPSecNATTestCase(TemplateIpsec): def verify_capture_encrypted(self, capture, sa): for packet in capture: try: + copy = packet.__class__(str(packet)) + del copy[UDP].len + copy = packet.__class__(str(copy)) + self.assert_equal(packet[UDP].len, copy[UDP].len, + "UDP header length") + self.assert_packet_checksums_valid(packet) self.assertIn(ESP, packet[IP]) decrypt_pkt = sa.decrypt(packet[IP]) + self.assert_packet_checksums_valid(decrypt_pkt) self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4, "encrypted packet source address") self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4, "encrypted packet destination address") - # if decrypt_pkt.haslayer(TCP): - # self.tcp_port_out = decrypt_pkt[TCP].sport - # elif decrypt_pkt.haslayer(UDP): - # self.udp_port_out = decrypt_pkt[UDP].sport - # else: - # self.icmp_id_out = decrypt_pkt[ICMP].id except Exception: self.logger.error( ppp("Unexpected or invalid encrypted packet:", packet)) raise @classmethod - def config_esp_tun(cls): - cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tun_sa_id, - cls.scapy_tun_spi, - cls.auth_algo_vpp_id, cls.auth_key, - cls.crypt_algo_vpp_id, - cls.crypt_key, cls.vpp_esp_protocol, - cls.pg1.remote_ip4n, - cls.tun_if.remote_ip4n) - cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tun_sa_id, - cls.vpp_tun_spi, - cls.auth_algo_vpp_id, cls.auth_key, - cls.crypt_algo_vpp_id, - cls.crypt_key, cls.vpp_esp_protocol, - cls.tun_if.remote_ip4n, - cls.pg1.remote_ip4n) - cls.vapi.ipsec_spd_add_del(cls.tun_spd_id) - cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id, - cls.tun_if.sw_if_index) - l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET, - "0.0.0.0") - l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET, - "255.255.255.255") - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id, + def config_esp_tun(cls, params): + addr_type = params.addr_type + scapy_tun_sa_id = params.scapy_tun_sa_id + scapy_tun_spi = params.scapy_tun_spi + vpp_tun_sa_id = 
params.vpp_tun_sa_id + vpp_tun_spi = params.vpp_tun_spi + auth_algo_vpp_id = params.auth_algo_vpp_id + auth_key = params.auth_key + crypt_algo_vpp_id = params.crypt_algo_vpp_id + crypt_key = params.crypt_key + addr_any = params.addr_any + addr_bcast = params.addr_bcast + cls.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + cls.vpp_esp_protocol, + cls.pg1.remote_addr_n[addr_type], + cls.tun_if.remote_addr_n[addr_type], + udp_encap=1) + cls.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + cls.vpp_esp_protocol, + cls.tun_if.remote_addr_n[addr_type], + cls.pg1.remote_addr_n[addr_type], + udp_encap=1) + l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) + l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) + cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, l_startaddr, l_stopaddr, r_startaddr, r_stopaddr, protocol=socket.IPPROTO_ESP) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id, + cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, l_startaddr, l_stopaddr, r_startaddr, r_stopaddr, is_outbound=0, protocol=socket.IPPROTO_ESP) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id, + cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, l_startaddr, l_stopaddr, r_startaddr, r_stopaddr, remote_port_start=4500, remote_port_stop=4500, protocol=socket.IPPROTO_UDP) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id, + cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, l_startaddr, l_stopaddr, r_startaddr, r_stopaddr, remote_port_start=4500, remote_port_stop=4500, protocol=socket.IPPROTO_UDP, is_outbound=0) - l_startaddr = l_stopaddr = cls.tun_if.remote_ip4n - r_startaddr = r_stopaddr = cls.pg1.remote_ip4n - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id, + l_startaddr 
= l_stopaddr = cls.tun_if.remote_addr_n[addr_type] + r_startaddr = r_stopaddr = cls.pg1.remote_addr_n[addr_type] + cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id, l_startaddr, l_stopaddr, r_startaddr, r_stopaddr, priority=10, policy=3, is_outbound=0) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id, + cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, r_startaddr, r_stopaddr, l_startaddr, l_stopaddr, priority=10, policy=3) def test_ipsec_nat_tun(self): """ IPSec/NAT tunnel test case """ - scapy_tun_sa = SecurityAssociation(ESP, - spi=self.scapy_tun_spi, - crypt_algo=self.crypt_algo, - crypt_key=self.crypt_key, - auth_algo=self.auth_algo, - auth_key=self.auth_key, + p = self.ipv4_params + scapy_tun_sa = SecurityAssociation(ESP, spi=p.scapy_tun_spi, + crypt_algo=p.crypt_algo, + crypt_key=p.crypt_key, + auth_algo=p.auth_algo, + auth_key=p.auth_key, tunnel_header=IP( src=self.pg1.remote_ip4, dst=self.tun_if.remote_ip4), @@ -207,11 +214,11 @@ class IPSecNATTestCase(TemplateIpsec): self.verify_capture_encrypted(capture, scapy_tun_sa) vpp_tun_sa = SecurityAssociation(ESP, - spi=self.vpp_tun_spi, - crypt_algo=self.crypt_algo, - crypt_key=self.crypt_key, - auth_algo=self.auth_algo, - auth_key=self.auth_key, + spi=p.vpp_tun_spi, + crypt_algo=p.crypt_algo, + crypt_key=p.crypt_key, + auth_algo=p.auth_algo, + auth_key=p.auth_key, tunnel_header=IP( src=self.tun_if.remote_ip4, dst=self.pg1.remote_ip4), @@ -229,3 +236,8 @@ class IPSecNATTestCase(TemplateIpsec): self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_plain(capture) + + +class IPSecNAT(TemplateIPSecNAT): + """ IPSec/NAT """ + pass
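Review bot: `assert_packet_checksums_valid(packet)` is now applied to the whole NAT-T packet, and RFC 3948 permits (and recommends) a zero UDP checksum on UDP-encapsulated ESP; unless that helper already treats a zero UDP checksum as valid, the check will reject conformant output, so this is worth confirming before relying on it. The UDP-length comparison built via `copy = packet.__class__(str(copy))` is a good addition; a short comment such as `# rebuild without UDP.len so scapy recomputes it, then compare with what VPP wrote` would make its purpose obvious. In `config_esp_tun` the outbound ESP and UDP/4500 entries now pass `scapy_tun_sa_id` where the old code passed `cls.vpp_tun_sa_id`; presumably these are bypass policies where the SA id is ignored, but a one-line comment saying so would stop the swap from being read as a behavioural fix. `test_ipsec_nat_tun` continues past the end of this hunk.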
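Review bot: `IPSecNAT` adds nothing beyond a docstring, while `TemplateIPSecNAT` still defines `test_ipsec_nat_tun` and inherits the TestCase machinery, so unless the test runner filters `Template*` classes by name the same test will be collected and run twice. If the framework relies on that naming convention, a comment on the template (`# not run directly; instantiated via IPSecNAT below`) would document the dependency; the trailing `pass` after the docstring is also redundant and can be dropped.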