6 from scapy.layers.l2 import Ether
7 from scapy.layers.inet import ICMP, IP, TCP, UDP
8 from scapy.layers.ipsec import SecurityAssociation, ESP
10 from util import ppp, ppc
11 from template_ipsec import TemplateIpsec
12 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
13 from vpp_ip_route import VppIpRoute, VppRoutePath
14 from vpp_ip import DpoProto
15 from vpp_papi import VppEnum
18 class IPSecNATTestCase(TemplateIpsec):
23 public network | private network
24 --- encrypt --- plain ---
25 |pg0| <------- |VPP| <------ |pg1|
28 --- decrypt --- plain ---
29 |pg0| -------> |VPP| ------> |pg1|
43 super(IPSecNATTestCase, cls).setUpClass()
46 def tearDownClass(cls):
# Class-level teardown hook. It adds no work of its own: it only chains to the
# next tearDownClass in the MRO (TemplateIpsec is the declared base class).
47 super(IPSecNATTestCase, cls).tearDownClass()
50 super(IPSecNATTestCase, self).setUp()
51 self.tun_if = self.pg0
53 self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
54 self.tun_spd.add_vpp_config()
55 VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if).add_vpp_config()
58 self.config_esp_tun(p)
59 self.logger.info(self.vapi.ppcli("show ipsec all"))
61 d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
66 [VppRoutePath(self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=d)],
70 super(IPSecNATTestCase, self).tearDown()
72 def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
# Build the unencrypted test packets: one TCP, one UDP and one ICMP
# echo-request, each carried in Ether/IP using the MAC and IP addresses
# passed in by the caller.
# NOTE(review): the statements that collect and return these packets are not
# visible here; the caller passes the result to add_stream() and len(), so it
# is presumably a sequence of packets - confirm.
# TCP packet: source port is self.tcp_port_in, destination port is 20.
75 Ether(src=src_mac, dst=dst_mac)
76 / IP(src=src_ip, dst=dst_ip)
77 / TCP(sport=self.tcp_port_in, dport=20),
# UDP packet: source port is self.udp_port_in, destination port is 20.
79 Ether(src=src_mac, dst=dst_mac)
80 / IP(src=src_ip, dst=dst_ip)
81 / UDP(sport=self.udp_port_in, dport=20),
# ICMP echo-request identified by self.icmp_id_in.
83 Ether(src=src_mac, dst=dst_mac)
84 / IP(src=src_ip, dst=dst_ip)
85 / ICMP(id=self.icmp_id_in, type="echo-request"),
88 def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
# Build the packets injected on the tunnel side: TCP, UDP and ICMP
# echo-request inner packets, each under an Ether header with the given MACs.
# Unlike create_stream_plain, the well-known port 20 is the *source* port and
# the self.*_port_out values are the destination ports.
# NOTE(review): the lines between each Ether header and its inner IP packet
# are not visible; presumably they wrap the inner packet using `sa` - confirm.
# TCP: destination port self.tcp_port_out, source port 20.
91 Ether(src=src_mac, dst=dst_mac)
93 IP(src=src_ip, dst=dst_ip) / TCP(dport=self.tcp_port_out, sport=20)
# UDP: destination port self.udp_port_out, source port 20.
96 Ether(src=src_mac, dst=dst_mac)
98 IP(src=src_ip, dst=dst_ip) / UDP(dport=self.udp_port_out, sport=20)
# ICMP echo-request identified by self.icmp_id_out.
101 Ether(src=src_mac, dst=dst_mac)
103 IP(src=src_ip, dst=dst_ip)
104 / ICMP(id=self.icmp_id_out, type="echo-request")
108 def verify_capture_plain(self, capture):
# Validate every packet in `capture` as a decrypted (plain) packet: checksums,
# IP addresses, then protocol-specific fields depending on whether the packet
# carries TCP, UDP or neither.
# NOTE(review): the heads of most assertion calls and the surrounding error
# handling are not visible here; the comments below describe only the
# arguments and messages that are shown.
109 for packet in capture:
111 self.assert_packet_checksums_valid(packet)
# Source address is compared against the tunnel interface's remote IPv4.
114 self.tun_if.remote_ip4,
115 "decrypted packet source address",
120 "decrypted packet destination address",
# TCP branch: a UDP layer is not expected alongside TCP.
122 if packet.haslayer(TCP):
124 packet.haslayer(UDP),
125 "unexpected UDP header in decrypted packet",
130 "decrypted packet TCP destination port",
# UDP branch: when the UDP layer has a payload, look for a second UDP layer
# inside it (packet[UDP][1]); the message marks such a header as unexpected.
132 elif packet.haslayer(UDP):
133 if packet[UDP].payload:
135 packet[UDP][1].haslayer(UDP),
136 "unexpected UDP header in decrypted packet",
141 "decrypted packet UDP destination port",
# Remaining case: no UDP layer expected, and the ICMP id is compared with
# self.icmp_id_in.
145 packet.haslayer(UDP),
146 "unexpected UDP header in decrypted packet",
149 packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
# Log a pretty-printed dump of the offending packet at error level.
152 self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
155 def verify_capture_encrypted(self, capture, sa):
# Validate every packet in `capture` as an encrypted packet: UDP length field,
# checksums, presence of an ESP layer, and that `sa` can decrypt it into a
# packet whose checksums are also valid.
156 for packet in capture:
# Re-parse the packet from its raw bytes so the copy can be rebuilt
# independently of the captured object.
158 copy = packet.__class__(scapy.compat.raw(packet))
# NOTE(review): a statement between these two re-parses is not visible;
# presumably it clears a field on `copy` so that it is recomputed - confirm.
160 copy = packet.__class__(scapy.compat.raw(copy))
# The captured UDP length must equal the one obtained from the rebuilt copy.
161 self.assert_equal(packet[UDP].len, copy[UDP].len, "UDP header length")
162 self.assert_packet_checksums_valid(packet)
163 self.assertIn(ESP, packet[IP])
# Decrypt from the IP layer down using the supplied security association.
164 decrypt_pkt = sa.decrypt(packet[IP])
165 self.assert_packet_checksums_valid(decrypt_pkt)
169 "encrypted packet source address",
# Destination address is compared against the tunnel interface's remote IPv4.
173 self.tun_if.remote_ip4,
174 "encrypted packet destination address",
# Pretty-printed dump of the offending packet used for error reporting.
178 ppp("Unexpected or invalid encrypted packet:", packet)
182 def config_esp_tun(self, params):
# Configure ESP tunnel state from `params`.
# NOTE(review): the heads of the constructor/config calls are not visible
# here, only some of their arguments; the comments describe just what is shown.
# Unpack the fields of `params` into locals.
183 addr_type = params.addr_type
184 scapy_tun_sa_id = params.scapy_tun_sa_id
185 scapy_tun_spi = params.scapy_tun_spi
186 vpp_tun_sa_id = params.vpp_tun_sa_id
187 vpp_tun_spi = params.vpp_tun_spi
188 auth_algo_vpp_id = params.auth_algo_vpp_id
189 auth_key = params.auth_key
190 crypt_algo_vpp_id = params.crypt_algo_vpp_id
191 crypt_key = params.crypt_key
192 addr_any = params.addr_any
193 addr_bcast = params.addr_bcast
# SAD flag selecting UDP encapsulation, and a short alias for the SPD action
# enum used by the policies below.
194 flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
195 e = VppEnum.vl_api_ipsec_spd_action_t
# First argument group: ESP protocol with pg1's remote address listed before
# the tunnel interface's remote address.
205 self.vpp_esp_protocol,
206 self.pg1.remote_addr[addr_type],
207 self.tun_if.remote_addr[addr_type],
# Second argument group: same protocol with the two addresses swapped.
218 self.vpp_esp_protocol,
219 self.tun_if.remote_addr[addr_type],
220 self.pg1.remote_addr[addr_type],
# Two entries pin the remote port range to exactly 4500.
254 remote_port_start=4500,
255 remote_port_stop=4500,
266 remote_port_start=4500,
267 remote_port_stop=4500,
# PROTECT policy whose address arguments list the tunnel interface's remote
# address (twice) before pg1's remote address (twice).
274 self.tun_if.remote_addr[addr_type],
275 self.tun_if.remote_addr[addr_type],
276 self.pg1.remote_addr[addr_type],
277 self.pg1.remote_addr[addr_type],
280 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
# PROTECT policy with the address arguments in the opposite order.
287 self.pg1.remote_addr[addr_type],
288 self.pg1.remote_addr[addr_type],
289 self.tun_if.remote_addr[addr_type],
290 self.tun_if.remote_addr[addr_type],
292 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
296 def test_ipsec_nat_tun(self):
297 """IPSec/NAT tunnel test case"""
# NOTE(review): several statements and call arguments of this test are not
# visible here (including where `p` is bound); the comments describe only the
# lines that are shown.
# Scapy-side security association used below to check the packets captured on
# the tunnel interface. Its tunnel header runs from pg1's remote IPv4 to the
# tunnel interface's remote IPv4, with a UDP 4500 -> 4500 NAT-T header.
299 scapy_tun_sa = SecurityAssociation(
302 crypt_algo=p.crypt_algo,
303 crypt_key=p.crypt_key,
304 auth_algo=p.auth_algo,
306 tunnel_header=IP(src=self.pg1.remote_ip4, dst=self.tun_if.remote_ip4),
307 nat_t_header=UDP(sport=4500, dport=4500),
309 # in2out - from private network to public
310 pkts = self.create_stream_plain(
314 self.tun_if.remote_ip4,
# Inject the plain packets on pg1 and arm capture on all pg interfaces.
316 self.pg1.add_stream(pkts)
317 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet per injected packet is requested from the tunnel
# interface's capture, then verified as encrypted.
319 capture = self.tun_if.get_capture(len(pkts))
320 self.verify_capture_encrypted(capture, scapy_tun_sa)
# Second security association with the tunnel header direction reversed
# (tunnel interface's remote IPv4 -> pg1's remote IPv4), same NAT-T ports.
322 vpp_tun_sa = SecurityAssociation(
325 crypt_algo=p.crypt_algo,
326 crypt_key=p.crypt_key,
327 auth_algo=p.auth_algo,
329 tunnel_header=IP(src=self.tun_if.remote_ip4, dst=self.pg1.remote_ip4),
330 nat_t_header=UDP(sport=4500, dport=4500),
333 # out2in - from public network to private
# Frames are addressed from the tunnel interface's remote MAC to its local MAC.
334 pkts = self.create_stream_encrypted(
335 self.tun_if.remote_mac,
336 self.tun_if.local_mac,
337 self.tun_if.remote_ip4,
341 self.logger.info(ppc("Sending packets:", pkts))
# Inject on the tunnel interface and arm capture on all pg interfaces.
342 self.tun_if.add_stream(pkts)
343 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet per injected packet is requested from pg1's capture,
# then verified as plain.
345 capture = self.pg1.get_capture(len(pkts))
346 self.verify_capture_plain(capture)