4 from scapy.layers.ipsec import ESP
5 from scapy.layers.l2 import Ether, Raw, GRE
6 from scapy.layers.inet import IP, UDP
7 from framework import VppTestRunner, is_skip_aarch64_set, is_platform_aarch64
8 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
9 IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
10 from vpp_ipsec_tun_interface import VppIpsecTunInterface, \
11 VppIpsecGRETunInterface
12 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
13 from vpp_ipsec import VppIpsecSA
14 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
# Shared fixture for the IPv4 ESP tunnel-interface tests: creates one IPsec
# tunnel interface on pg0 and installs host routes to the remote test hosts.
18 class TemplateIpsec4TunIfEsp(TemplateIpsec):
19 """ IPsec tunnel interface tests """
# NOTE(review): numbered lines 20-23 are absent from this listing, so the
# method header (presumably setUp, given the super() call) is not visible.
24 super(TemplateIpsec4TunIfEsp, self).setUp()
26 self.tun_if = self.pg0
# NOTE(review): `p` is bound on a line that is not shown. p.crypt_key is
# passed twice in a row and p.auth_key follows; presumably these fill the
# local and remote key slots, so both directions of the tunnel share one
# key - confirm against VppIpsecTunInterface's signature.
30 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
31 p.scapy_tun_spi, p.crypt_algo_vpp_id,
32 p.crypt_key, p.crypt_key,
33 p.auth_algo_vpp_id, p.auth_key,
35 p.tun_if.add_vpp_config()
# Host routes (/32 for IPv4, /128 for IPv6) to the remote test hosts, each
# using the tunnel interface's remote address as the path.
# NOTE(review): 0xffffffff is presumably the "no interface index" sentinel
# for VppRoutePath - confirm.
40 VppIpRoute(self, p.remote_tun_if_host, 32,
41 [VppRoutePath(p.tun_if.remote_ip4,
42 0xffffffff)]).add_vpp_config()
43 VppIpRoute(self, p.remote_tun_if_host6, 128,
44 [VppRoutePath(p.tun_if.remote_ip6,
46 proto=DpoProto.DPO_PROTO_IP6)],
47 is_ip6=1).add_vpp_config()
# Runs the "show hardware" CLI command, then delegates to the parent
# tearDown.
# NOTE(review): numbered lines 48-50 are not shown, so whether the CLI call
# is guarded by a vpp_dead check cannot be told from this listing.
51 self.vapi.cli("show hardware")
52 super(TemplateIpsec4TunIfEsp, self).tearDown()
# IPv4 ESP tunnel-interface tests: combines the IPv4 tunnel fixture with the
# inherited IpsecTun4Tests cases and adds IPv6-over-IPv4 and fragmentation
# cases.
55 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
56 """ Ipsec ESP - TUN tests """
# Graph node names; presumably read by the inherited tests to find the
# encrypt/decrypt node counters - confirm in template_ipsec.
57 tun4_encrypt_node_name = "esp4-encrypt"
58 tun4_decrypt_node_name = "esp4-decrypt"
# One packet through verify_tun_64 with the AF_INET parameter set.
60 def test_tun_basic64(self):
61 """ ipsec 6o4 tunnel basic test """
62 self.verify_tun_64(self.params[socket.AF_INET], count=1)
# Same check with 257 packets (presumably chosen to exceed a single
# 256-packet frame - confirm).
64 def test_tun_burst64(self):
65 """ ipsec 6o4 tunnel burst test """
66 self.verify_tun_64(self.params[socket.AF_INET], count=257)
# NOTE(review): numbered lines 70-71, 73 and 77 are absent from this listing,
# so the binding of `p` and the MTU values passed to both
# sw_interface_set_mtu calls are not visible.
68 def test_tun_basic_frag44(self):
69 """ ipsec 4o4 tunnel frag basic test """
72 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
# One 1800-byte payload is sent and two packets are expected back (n_rx=2),
# i.e. the packet is expected to leave the tunnel fragmented.
74 self.verify_tun_44(self.params[socket.AF_INET],
75 count=1, payload_size=1800, n_rx=2)
# Second MTU change on the same interface; presumably restores the original
# value so later tests are unaffected - confirm.
76 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
# Runs the inherited IpsecTcpTests cases on the IPv4 ESP tunnel fixture; no
# members of its own are visible in this listing.
80 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
81 """ Ipsec ESP - TCP tests """
# Shared fixture for the IPv6 ESP tunnel-interface tests: creates one IPsec
# tunnel interface on pg0 with is_ip6=True and installs host routes to the
# remote test hosts.
85 class TemplateIpsec6TunIfEsp(TemplateIpsec):
86 """ IPsec tunnel interface tests """
# NOTE(review): numbered lines 87-90 are absent from this listing, so the
# method header (presumably setUp, given the super() call) is not visible.
91 super(TemplateIpsec6TunIfEsp, self).setUp()
93 self.tun_if = self.pg0
# NOTE(review): `p` is bound on a line that is not shown. p.crypt_key and
# p.auth_key are each passed twice in a row; presumably these fill the local
# and remote key slots, so both directions of the tunnel share one key -
# confirm against VppIpsecTunInterface's signature.
96 tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
97 p.scapy_tun_spi, p.crypt_algo_vpp_id,
98 p.crypt_key, p.crypt_key,
99 p.auth_algo_vpp_id, p.auth_key,
100 p.auth_key, is_ip6=True)
101 tun_if.add_vpp_config()
# Host routes (/128 for IPv6, /32 for IPv4) to the remote test hosts, each
# using the tunnel interface's remote address as the path.
# NOTE(review): 0xffffffff is presumably the "no interface index" sentinel
# for VppRoutePath - confirm.
106 VppIpRoute(self, p.remote_tun_if_host, 128,
107 [VppRoutePath(tun_if.remote_ip6,
109 proto=DpoProto.DPO_PROTO_IP6)],
110 is_ip6=1).add_vpp_config()
111 VppIpRoute(self, p.remote_tun_if_host4, 32,
112 [VppRoutePath(tun_if.remote_ip4,
113 0xffffffff)]).add_vpp_config()
# The "show hardware" CLI command is skipped when VPP is already dead.
# NOTE(review): indentation is lost in this listing; presumably only the CLI
# call sits under the `if` and the parent tearDown always runs - confirm.
116 if not self.vpp_dead:
117 self.vapi.cli("show hardware")
118 super(TemplateIpsec6TunIfEsp, self).tearDown()
# IPv6 ESP tunnel-interface tests: combines the IPv6 tunnel fixture with the
# inherited IpsecTun6Tests cases and adds IPv4-over-IPv6 cases.
121 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
122 """ Ipsec ESP - TUN tests """
# Graph node names; presumably read by the inherited tests to find the
# encrypt/decrypt node counters - confirm in template_ipsec.
123 tun6_encrypt_node_name = "esp6-encrypt"
124 tun6_decrypt_node_name = "esp6-decrypt"
# One packet through verify_tun_46 with the AF_INET6 parameter set.
126 def test_tun_basic46(self):
127 """ ipsec 4o6 tunnel basic test """
128 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
# Same check with 257 packets (presumably chosen to exceed a single
# 256-packet frame - confirm).
130 def test_tun_burst46(self):
131 """ ipsec 4o6 tunnel burst test """
132 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
# Several IPv4 ESP tunnel interfaces on pg0 at once, each with its own SPIs,
# SA ids and remote host, checked one after another.
135 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
136 """ IPsec IPv4 Multi Tunnel interface """
138 encryption_type = ESP
# Graph node names; presumably read by the inherited helpers to find the
# encrypt/decrypt node counters - confirm in template_ipsec.
139 tun4_encrypt_node_name = "esp4-encrypt"
140 tun4_decrypt_node_name = "esp4-decrypt"
# NOTE(review): numbered lines 141-142 are absent from this listing, so the
# method header (presumably setUp, given the super() call) is not visible.
143 super(TestIpsec4MultiTunIfEsp, self).setUp()
145 self.tun_if = self.pg0
# One parameter object per tunnel is collected here and reused by test_tun_44.
147 self.multi_params = []
# NOTE(review): the loop header that binds `ii` (numbered lines 148-149) is
# not shown. Each pass takes a shallow copy of the IPv4 parameters and
# offsets the remote host address, SA ids and SPIs by ii, so every tunnel
# gets distinct SPIs. No key field is reassigned in the visible lines, so
# the tunnels presumably all use the same crypt/auth keys; per-tunnel key
# separation is not exercised by this test.
150 p = copy.copy(self.ipv4_params)
152 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
153 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
154 p.scapy_tun_spi = p.scapy_tun_spi + ii
155 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
156 p.vpp_tun_spi = p.vpp_tun_spi + ii
158 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
159 p.scapy_tra_spi = p.scapy_tra_spi + ii
160 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
161 p.vpp_tra_spi = p.vpp_tra_spi + ii
163 config_tun_params(p, self.encryption_type, self.tun_if)
164 self.multi_params.append(p)
# NOTE(review): numbered lines 167-168 and 171 of this call are not shown.
# p.crypt_key is passed twice in a row; presumably as the local and the
# remote key, so both directions share one key - confirm against
# VppIpsecTunInterface's signature.
166 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
169 p.crypt_key, p.crypt_key,
170 p.auth_algo_vpp_id, p.auth_key,
172 p.tun_if.add_vpp_config()
174 p.tun_if.config_ip4()
# /32 host route to this tunnel's remote host, using the tunnel interface's
# remote address as the path.
176 VppIpRoute(self, p.remote_tun_if_host, 32,
177 [VppRoutePath(p.tun_if.remote_ip4,
178 0xffffffff)]).add_vpp_config()
# The "show hardware" CLI command is skipped when VPP is already dead.
# NOTE(review): indentation is lost in this listing; presumably only the CLI
# call sits under the `if` and the parent tearDown always runs - confirm.
181 if not self.vpp_dead:
182 self.vapi.cli("show hardware")
183 super(TestIpsec4MultiTunIfEsp, self).tearDown()
# Drives 127 packets through each configured tunnel and checks that the
# tunnel interface's rx and tx packet counters both read 127.
# NOTE(review): nesting is not recoverable from this listing; the counter
# checks presumably run per tunnel inside the loop - confirm.
185 def test_tun_44(self):
186 """Multiple IPSEC tunnel interfaces """
187 for p in self.multi_params:
188 self.verify_tun_44(p, count=127)
189 c = p.tun_if.get_rx_stats()
190 self.assertEqual(c['packets'], 127)
191 c = p.tun_if.get_tx_stats()
192 self.assertEqual(c['packets'], 127)
# Several IPv6 ESP tunnel interfaces on pg0 at once, each with its own SPIs,
# SA ids and remote host, checked one after another.
195 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
196 """ IPsec IPv6 Multi Tunnel interface """
198 encryption_type = ESP
# Graph node names; presumably read by the inherited helpers to find the
# encrypt/decrypt node counters - confirm in template_ipsec.
199 tun6_encrypt_node_name = "esp6-encrypt"
200 tun6_decrypt_node_name = "esp6-decrypt"
# NOTE(review): numbered lines 201-202 are absent from this listing, so the
# method header (presumably setUp, given the super() call) is not visible.
203 super(TestIpsec6MultiTunIfEsp, self).setUp()
205 self.tun_if = self.pg0
# One parameter object per tunnel is collected here and reused by test_tun_66.
207 self.multi_params = []
# NOTE(review): the loop header that binds `ii` (numbered lines 208-209) is
# not shown. Each pass takes a shallow copy of the IPv6 parameters and
# offsets the remote host address, SA ids and SPIs by ii, so every tunnel
# gets distinct SPIs. No key field is reassigned in the visible lines, so
# the tunnels presumably all use the same crypt/auth keys; per-tunnel key
# separation is not exercised by this test.
210 p = copy.copy(self.ipv6_params)
212 p.remote_tun_if_host = "1111::%d" % (ii + 1)
213 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
214 p.scapy_tun_spi = p.scapy_tun_spi + ii
215 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
216 p.vpp_tun_spi = p.vpp_tun_spi + ii
218 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
219 p.scapy_tra_spi = p.scapy_tra_spi + ii
220 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
221 p.vpp_tra_spi = p.vpp_tra_spi + ii
223 config_tun_params(p, self.encryption_type, self.tun_if)
224 self.multi_params.append(p)
# NOTE(review): numbered lines 227-228 of this call are not shown.
# p.crypt_key and p.auth_key are each passed twice in a row; presumably as
# the local and the remote key, so both directions share one key - confirm
# against VppIpsecTunInterface's signature.
226 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
229 p.crypt_key, p.crypt_key,
230 p.auth_algo_vpp_id, p.auth_key,
231 p.auth_key, is_ip6=True)
232 p.tun_if.add_vpp_config()
234 p.tun_if.config_ip6()
# /128 host route to this tunnel's remote host, using the tunnel interface's
# remote address as the path.
236 VppIpRoute(self, p.remote_tun_if_host, 128,
237 [VppRoutePath(p.tun_if.remote_ip6,
239 proto=DpoProto.DPO_PROTO_IP6)],
240 is_ip6=1).add_vpp_config()
# The "show hardware" CLI command is skipped when VPP is already dead.
# NOTE(review): indentation is lost in this listing; presumably only the CLI
# call sits under the `if` and the parent tearDown always runs - confirm.
243 if not self.vpp_dead:
244 self.vapi.cli("show hardware")
245 super(TestIpsec6MultiTunIfEsp, self).tearDown()
# Drives 127 packets through each configured tunnel and checks that the
# tunnel interface's rx and tx packet counters both read 127.
# NOTE(review): nesting is not recoverable from this listing; the counter
# checks presumably run per tunnel inside the loop - confirm.
247 def test_tun_66(self):
248 """Multiple IPSEC tunnel interfaces """
249 for p in self.multi_params:
250 self.verify_tun_66(p, count=127)
251 c = p.tun_if.get_rx_stats()
252 self.assertEqual(c['packets'], 127)
253 c = p.tun_if.get_tx_stats()
254 self.assertEqual(c['packets'], 127)
# Fixture for ESP-protected GRE tunnel tests: overrides the packet builders
# and verifiers so the payload is an Ethernet frame, and bridges the tunnel
# with pg1 in a bridge domain.
257 class TemplateIpsecGRETunIfEsp(TemplateIpsec):
258 """ IPsec GRE tunnel interface tests """
260 encryption_type = ESP
# Destination MAC of the inner (bridged) Ethernet frame; the verifiers below
# compare against it.
261 omac = "00:11:22:33:44:55"
# Builds `count` identical frames: outer Ethernet, then the sa.encrypt()
# output of an IPv4 packet from pg0's remote to pg0's local address that
# carries an inner Ethernet/IP/UDP/Raw frame.
# NOTE(review): numbered lines 264 and 268 are absent from this listing (the
# rest of the signature, including payload_size, and presumably the GRE
# header between the outer IP and the inner Ethernet). The src and dst
# parameters are not used in the visible lines.
263 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
265 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
266 sa.encrypt(IP(src=self.pg0.remote_ip4,
267 dst=self.pg0.local_ip4) /
269 Ether(dst=self.omac) /
270 IP(src="1.1.1.1", dst="1.1.1.2") /
271 UDP(sport=1144, dport=2233) /
272 Raw('X' * payload_size))
273 for i in range(count)]
# Builds `count` identical plaintext frames addressed to omac.
# NOTE(review): numbered line 276 (rest of the signature) is not shown; the
# sw_intf, src and dst parameters are not used in the visible lines.
275 def gen_pkts(self, sw_intf, src, dst, count=1,
277 return [Ether(dst=self.omac) /
278 IP(src="1.1.1.1", dst="1.1.1.2") /
279 UDP(sport=1144, dport=2233) /
280 Raw('X' * payload_size)
281 for i in range(count)]
# Plaintext side: a received frame must carry the fixed inner destination
# MAC and inner destination IP.
# NOTE(review): numbered line 284 (presumably the loop over rxs that binds
# `rx`) is not shown.
283 def verify_decrypted(self, p, rxs):
285 self.assert_equal(rx[Ether].dst, self.omac)
286 self.assert_equal(rx[IP].dst, "1.1.1.2")
# Ciphertext side: decrypts a received packet with the scapy SA, re-parses
# the payload as IP when scapy did not dissect one, validates checksums,
# checks the outer addresses are pg0 local -> pg0 remote and that a GRE layer
# is present, then checks the inner frame's MAC and IP destination.
# NOTE(review): numbered lines 289-290, 298, 303 and 305 onward are absent
# from this listing, so the loop/try headers, the binding of `e` and whatever
# follows the logging calls (presumably a re-raise) are not visible. `ppp`
# is not among the imports visible in this listing - confirm it is imported.
# On failure both the received and the decrypted packet are written to the
# debug log.
288 def verify_encrypted(self, p, sa, rxs):
291 pkt = sa.decrypt(rx[IP])
292 if not pkt.haslayer(IP):
293 pkt = IP(pkt[Raw].load)
294 self.assert_packet_checksums_valid(pkt)
295 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
296 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
297 self.assertTrue(pkt.haslayer(GRE))
299 self.assertEqual(e[Ether].dst, self.omac)
300 self.assertEqual(e[IP].dst, "1.1.1.2")
301 except (IndexError, AssertionError):
302 self.logger.debug(ppp("Unexpected packet:", rx))
304 self.logger.debug(ppp("Decrypted packet:", pkt))
# NOTE(review): numbered lines 305-309 are absent from this listing, so the
# method header (presumably setUp, given the super() call) is not visible.
310 super(TemplateIpsecGRETunIfEsp, self).setUp()
312 self.tun_if = self.pg0
# Bridge domain with id 1; the tunnel and pg1 are added to it further down.
316 bd1 = VppBridgeDomain(self, 1)
# tun_sa_out is built from the scapy-side SA id/SPI and tun_sa_in from the
# VPP-side ones; both use the same auth and crypt keys from `p`.
# NOTE(review): `p` is bound on a line that is not shown, and numbered lines
# 323-324 and 331-332 (remaining constructor arguments) are missing.
319 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
320 p.auth_algo_vpp_id, p.auth_key,
321 p.crypt_algo_vpp_id, p.crypt_key,
322 self.vpp_esp_protocol,
325 p.tun_sa_out.add_vpp_config()
327 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
328 p.auth_algo_vpp_id, p.auth_key,
329 p.crypt_algo_vpp_id, p.crypt_key,
330 self.vpp_esp_protocol,
333 p.tun_sa_in.add_vpp_config()
# NOTE(review): numbered lines 336-338 (remaining constructor arguments,
# presumably the two SAs and the remote endpoint) are not shown.
335 self.tun = VppIpsecGRETunInterface(self, self.pg0,
339 self.tun.add_vpp_config()
341 self.tun.config_ip4()
# /32 host route to the remote host, using the tunnel's remote address as the
# path.
343 VppIpRoute(self, p.remote_tun_if_host, 32,
344 [VppRoutePath(self.tun.remote_ip4,
345 0xffffffff)]).add_vpp_config()
# Both the tunnel and pg1 become ports of bridge domain bd1.
346 VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
347 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
# Removes the tunnel's IPv4 configuration before delegating to the parent
# tearDown.
# NOTE(review): indentation is lost in this listing; whether unconfig_ip4()
# is also skipped when VPP is dead cannot be told from here - confirm.
350 if not self.vpp_dead:
351 self.vapi.cli("show hardware")
352 self.tun.unconfig_ip4()
353 super(TemplateIpsecGRETunIfEsp, self).tearDown()
# Runs the inherited IpsecTun4Tests cases on the GRE-over-ESP fixture; the
# whole class is skipped when the aarch64 skip condition holds.
# NOTE(review): both names are used without being called, so this assumes
# they are booleans evaluated at import time; if they were functions the
# condition would always be truthy and the class would always be skipped -
# confirm in framework.
356 @unittest.skipIf(is_skip_aarch64_set and is_platform_aarch64,
357 "test doesn't work on aarch64")
358 class TestIpsecGRETunIfEsp1(TemplateIpsecGRETunIfEsp, IpsecTun4Tests):
359 """ Ipsec GRE ESP - TUN tests """
# Graph node names; presumably read by the inherited tests to find the
# encrypt/decrypt node counters - confirm in template_ipsec.
360 tun4_encrypt_node_name = "esp4-encrypt"
361 tun4_decrypt_node_name = "esp4-decrypt"
# When executed directly, run this module's tests through VppTestRunner.
363 if __name__ == '__main__':
364 unittest.main(testRunner=VppTestRunner)