5 from scapy.layers.l2 import Ether
6 from scapy.layers.inet import ICMP, IP, TCP, UDP
7 from scapy.layers.ipsec import SecurityAssociation, ESP
8 from util import ppp, ppc
9 from template_ipsec import TemplateIpsec
# Test case class for IPsec tunnelling in a NAT setup, derived from
# TemplateIpsec. The lines that follow the class header are what remains of
# the class docstring diagram: pg0 sits on the public side and carries the
# encrypted traffic, pg1 sits on the private side and carries plain traffic,
# with VPP in between (encrypt towards pg0, decrypt towards pg1).
# NOTE(review): the docstring delimiters and several diagram rows are absent
# from this extract (embedded numbering jumps 12 -> 24 and 26 -> 29); restore
# them from the full file before relying on this text.
12 class IPSecNATTestCase(TemplateIpsec):
24 public network | private network
25 --- encrypt --- plain ---
26 |pg0| <------- |VPP| <------ |pg1|
29 --- decrypt --- plain ---
30 |pg0| -------> |VPP| ------> |pg1|
# Class-level setup body: runs the parent class setup, fixes the TCP/UDP
# ports and ICMP id used to build and check test packets, logs the output of
# "show ipsec", and installs a /32 route for the remote tunnel host.
# NOTE(review): the enclosing `def` line is not visible in this extract
# (presumably a classmethod named setUpClass, given the super() call) -
# confirm against the full file.
# NOTE(review): embedded lines 41 and 43-44 are missing. `icmp_id_in`, which
# other methods read, is not assigned in the visible lines, and no call to
# config_esp_tun is visible either.
# NOTE(review): `socket` is used below but its import is not visible here.
36 super(IPSecNATTestCase, cls).setUpClass()
# The "in" and "out" port values are identical in the visible setup.
37 cls.tcp_port_in = 6303
38 cls.tcp_port_out = 6303
39 cls.udp_port_in = 6304
40 cls.udp_port_out = 6304
42 cls.icmp_id_out = 6305
45 cls.logger.info(cls.vapi.ppcli("show ipsec"))
# Packed binary IPv4 form of the remote tunnel host address.
46 client = socket.inet_pton(socket.AF_INET, cls.remote_tun_if_host)
# Presumably (prefix, prefix length, next hop) - TODO confirm the argument
# order against ip_add_del_route.
47 cls.vapi.ip_add_del_route(client, 32, cls.pg0.remote_ip4n)
def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
    """Build the plain (unencrypted) test packets.

    Three packets are produced, all sharing the same Ethernet and IP
    addressing: a TCP packet, a UDP packet and an ICMP echo-request.

    :param src_mac: source MAC address for the Ethernet header.
    :param dst_mac: destination MAC address for the Ethernet header.
    :param src_ip: source address for the IP header.
    :param dst_ip: destination address for the IP header.
    :returns: list of three scapy packets in TCP, UDP, ICMP order.
    """
    # NOTE(review): the sequence wrapper around the three packets was missing
    # from the extracted source and is reconstructed here as a list; callers
    # only take len() of the result and hand it to add_stream().
    return [
        # TCP: source port is the configured "in" port, destination port 20
        (Ether(src=src_mac, dst=dst_mac) /
         IP(src=src_ip, dst=dst_ip) /
         TCP(sport=self.tcp_port_in, dport=20)),
        # UDP: source port is the configured "in" port, destination port 20
        (Ether(src=src_mac, dst=dst_mac) /
         IP(src=src_ip, dst=dst_ip) /
         UDP(sport=self.udp_port_in, dport=20)),
        # ICMP echo-request carrying the configured "in" identifier
        (Ether(src=src_mac, dst=dst_mac) /
         IP(src=src_ip, dst=dst_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request')),
    ]
def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
    """Build the encrypted test packets.

    For each of TCP, UDP and ICMP echo-request an inner IP packet is built,
    passed through ``sa.encrypt()``, and the result is placed on top of an
    Ethernet header.

    :param src_mac: source MAC address for the Ethernet header.
    :param dst_mac: destination MAC address for the Ethernet header.
    :param src_ip: source address of the inner IP header.
    :param dst_ip: destination address of the inner IP header.
    :param sa: object providing ``encrypt(packet)``; the test in this class
        passes a scapy SecurityAssociation.
    :returns: list of three scapy packets in TCP, UDP, ICMP order.
    """
    # NOTE(review): the sequence wrapper around the three packets was missing
    # from the extracted source and is reconstructed here as a list; callers
    # only take len() of the result and hand it to add_stream()/ppc().
    return [
        # TCP: destination port is the configured "out" port, source port 20
        (Ether(src=src_mac, dst=dst_mac) /
         sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                    TCP(dport=self.tcp_port_out, sport=20))),
        # UDP: destination port is the configured "out" port, source port 20
        (Ether(src=src_mac, dst=dst_mac) /
         sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                    UDP(dport=self.udp_port_out, sport=20))),
        # ICMP echo-request carrying the configured "out" identifier
        (Ether(src=src_mac, dst=dst_mac) /
         sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                    ICMP(id=self.icmp_id_out, type='echo-request'))),
    ]
# Check every packet of a capture that is expected to be plain (decrypted):
# checksums must be valid, the IP source must be the tunnel interface's remote
# address and the IP destination pg1's remote address; then, depending on the
# transport layer found, the TCP destination port, the UDP destination port
# or the ICMP id must match the configured "in" value.
# NOTE(review): this extract is missing interior lines (embedded numbers 83,
# 90-91, 97, 102-103, 108-109 and 111 are skipped). The dangling message
# strings and the trailing ppp(...) argument suggest assertion calls and an
# error-reporting wrapper stood on those lines - confirm against the full
# file before editing the logic.
81 def verify_capture_plain(self, capture):
82 for packet in capture:
84 self.assert_packet_checksums_valid(packet)
# Outer addressing expected after decryption: tunnel peer -> pg1 host.
85 self.assert_equal(packet[IP].src, self.tun_if.remote_ip4,
86 "decrypted packet source address")
87 self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
88 "decrypted packet destination address")
# TCP case.
89 if packet.haslayer(TCP):
92 "unexpected UDP header in decrypted packet")
93 self.assert_equal(packet[TCP].dport, self.tcp_port_in,
94 "decrypted packet TCP destination port")
# UDP case: when the UDP layer has a payload, that payload is inspected for
# a further UDP layer.
95 elif packet.haslayer(UDP):
96 if packet[UDP].payload:
98 packet[UDP][1].haslayer(UDP),
99 "unexpected UDP header in decrypted packet")
100 self.assert_equal(packet[UDP].dport, self.udp_port_in,
101 "decrypted packet UDP destination port")
104 packet.haslayer(UDP),
105 "unexpected UDP header in decrypted packet")
106 self.assert_equal(packet[ICMP].id, self.icmp_id_in,
107 "decrypted packet ICMP ID")
110 ppp("Unexpected or invalid plain packet:", packet))
# Check every packet of a capture that is expected to be ESP-encrypted: the
# IP layer must contain ESP, and after decrypting with `sa` the inner IP
# source must be pg1's remote address and the inner IP destination the tunnel
# interface's remote address.
# NOTE(review): this extract is missing interior lines (embedded numbers 115,
# 126, 128-129 and 131 are skipped). The trailing ppp(...) argument suggests
# an error-reporting wrapper stood there - confirm against the full file.
113 def verify_capture_encrypted(self, capture, sa):
114 for packet in capture:
116 self.assertIn(ESP, packet[IP])
117 decrypt_pkt = sa.decrypt(packet[IP])
118 self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
119 "encrypted packet source address")
120 self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
121 "encrypted packet destination address")
# Disabled code: it would copy the source port / ICMP id seen in the decrypted
# packet into the *_out attributes. Left commented out in the original.
122 # if decrypt_pkt.haslayer(TCP):
123 # self.tcp_port_out = decrypt_pkt[TCP].sport
124 # elif decrypt_pkt.haslayer(UDP):
125 # self.udp_port_out = decrypt_pkt[UDP].sport
127 # self.icmp_id_out = decrypt_pkt[ICMP].id
130 ppp("Unexpected or invalid encrypted packet:", packet))
# Configure the ESP tunnel on the VPP side through cls.vapi:
#   1. add two SAD entries (scapy_tun_sa_id and vpp_tun_sa_id) sharing the
#      same auth/crypto algorithms and keys;
#   2. create the SPD tun_spd_id and attach it to the tunnel interface;
#   3. add SPD entries matching protocol ESP;
#   4. add SPD entries matching protocol UDP with remote port 4500;
#   5. add priority-10, policy=3 entries between the tunnel interface's
#      remote address and pg1's remote address, one per SA.
# NOTE(review): takes `cls`, so presumably decorated as a classmethod on a
# line missing from this extract (embedded number 133 is skipped) - confirm.
# NOTE(review): several argument lines are missing (embedded numbers 136,
# 140, 143, 148, 153, 155, 158, 174 and 180 are skipped), so the calls below
# are incomplete as shown. Do not infer the full argument lists from here.
# NOTE(review): the meaning of policy=3 is not shown in this file - look it up
# in the ipsec_spd_add_del_entry API before changing it.
134 def config_esp_tun(cls):
135 cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tun_sa_id,
137 cls.auth_algo_vpp_id, cls.auth_key,
138 cls.crypt_algo_vpp_id,
139 cls.crypt_key, cls.vpp_esp_protocol,
141 cls.tun_if.remote_ip4n)
142 cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tun_sa_id,
144 cls.auth_algo_vpp_id, cls.auth_key,
145 cls.crypt_algo_vpp_id,
146 cls.crypt_key, cls.vpp_esp_protocol,
147 cls.tun_if.remote_ip4n,
149 cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
150 cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
151 cls.tun_if.sw_if_index)
# Local and remote ranges share one start and one stop address; the address
# literals themselves are on lines missing from this extract.
152 l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
154 l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
156 cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
157 l_startaddr, l_stopaddr, r_startaddr,
159 protocol=socket.IPPROTO_ESP)
160 cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
161 l_startaddr, l_stopaddr, r_startaddr,
162 r_stopaddr, is_outbound=0,
163 protocol=socket.IPPROTO_ESP)
164 cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
165 l_startaddr, l_stopaddr, r_startaddr,
166 r_stopaddr, remote_port_start=4500,
167 remote_port_stop=4500,
168 protocol=socket.IPPROTO_UDP)
169 cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
170 l_startaddr, l_stopaddr, r_startaddr,
171 r_stopaddr, remote_port_start=4500,
172 remote_port_stop=4500,
173 protocol=socket.IPPROTO_UDP,
# From here the ranges are single addresses: local = tunnel peer,
# remote = pg1 host.
175 l_startaddr = l_stopaddr = cls.tun_if.remote_ip4n
176 r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
177 cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
178 l_startaddr, l_stopaddr, r_startaddr,
179 r_stopaddr, priority=10, policy=3,
# Same policy for the other SA, with the local and remote ranges swapped.
181 cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
182 r_startaddr, r_stopaddr, l_startaddr,
183 l_stopaddr, priority=10, policy=3)
# End-to-end tunnel test in two directions:
#   in2out: plain packets are injected on pg1, the capture is read from the
#           tunnel interface and checked with verify_capture_encrypted;
#   out2in: packets encrypted with a second SA are injected on the tunnel
#           interface, the capture is read from pg1 and checked with
#           verify_capture_plain.
# NOTE(review): this extract is missing interior lines (embedded numbers 193,
# 196-198, 205, 208, 215, 218-221 and 229 are skipped). Part of each
# SecurityAssociation(...) argument list is therefore not visible, and no
# call that starts packet generation appears between pg_enable_capture() and
# get_capture() - presumably it sits on the skipped lines; confirm against
# the full file.
185 def test_ipsec_nat_tun(self):
186 """ IPSec/NAT tunnel test case """
# SA used to decrypt what VPP sends out of the tunnel interface; the visible
# addressing runs from the pg1 host to the tunnel peer.
187 scapy_tun_sa = SecurityAssociation(ESP,
188 spi=self.scapy_tun_spi,
189 crypt_algo=self.crypt_algo,
190 crypt_key=self.crypt_key,
191 auth_algo=self.auth_algo,
192 auth_key=self.auth_key,
194 src=self.pg1.remote_ip4,
195 dst=self.tun_if.remote_ip4),
199 # in2out - from private network to public
200 pkts = self.create_stream_plain(
201 self.pg1.remote_mac, self.pg1.local_mac,
202 self.pg1.remote_ip4, self.tun_if.remote_ip4)
203 self.pg1.add_stream(pkts)
204 self.pg_enable_capture(self.pg_interfaces)
# Exactly one captured packet is expected per packet sent.
206 capture = self.tun_if.get_capture(len(pkts))
207 self.verify_capture_encrypted(capture, scapy_tun_sa)
# SA used to encrypt the packets sent towards VPP; the visible addressing
# runs from the tunnel peer to the pg1 host.
209 vpp_tun_sa = SecurityAssociation(ESP,
210 spi=self.vpp_tun_spi,
211 crypt_algo=self.crypt_algo,
212 crypt_key=self.crypt_key,
213 auth_algo=self.auth_algo,
214 auth_key=self.auth_key,
216 src=self.tun_if.remote_ip4,
217 dst=self.pg1.remote_ip4),
222 # out2in - from public network to private
223 pkts = self.create_stream_encrypted(
224 self.tun_if.remote_mac, self.tun_if.local_mac,
225 self.tun_if.remote_ip4, self.pg1.remote_ip4, vpp_tun_sa)
226 self.logger.info(ppc("Sending packets:", pkts))
227 self.tun_if.add_stream(pkts)
228 self.pg_enable_capture(self.pg_interfaces)
# Exactly one captured packet is expected per packet sent.
230 capture = self.pg1.get_capture(len(pkts))
231 self.verify_capture_plain(capture)