5 from scapy.layers.ipsec import ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from framework import VppTestRunner, is_skip_aarch64_set, is_platform_aarch64
9 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
10 IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
11 from vpp_ipsec_tun_interface import VppIpsecTunInterface, \
12 VppIpsecGRETunInterface
13 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
14 from vpp_ipsec import VppIpsecSA
15 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
17 from vpp_papi import VppEnum
20 class TemplateIpsec4TunIfEsp(TemplateIpsec):
21 """ IPsec tunnel interface tests """
27 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
30 def tearDownClass(cls):
31 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
34 super(TemplateIpsec4TunIfEsp, self).setUp()
36 self.tun_if = self.pg0
40 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
41 p.scapy_tun_spi, p.crypt_algo_vpp_id,
42 p.crypt_key, p.crypt_key,
43 p.auth_algo_vpp_id, p.auth_key,
45 p.tun_if.add_vpp_config()
50 VppIpRoute(self, p.remote_tun_if_host, 32,
51 [VppRoutePath(p.tun_if.remote_ip4,
52 0xffffffff)]).add_vpp_config()
53 VppIpRoute(self, p.remote_tun_if_host6, 128,
54 [VppRoutePath(p.tun_if.remote_ip6,
56 proto=DpoProto.DPO_PROTO_IP6)],
57 is_ip6=1).add_vpp_config()
61 self.vapi.cli("show hardware")
62 super(TemplateIpsec4TunIfEsp, self).tearDown()
65 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
66 """ Ipsec ESP - TUN tests """
67 tun4_encrypt_node_name = "esp4-encrypt"
68 tun4_decrypt_node_name = "esp4-decrypt"
70 def test_tun_basic64(self):
71 """ ipsec 6o4 tunnel basic test """
72 self.verify_tun_64(self.params[socket.AF_INET], count=1)
74 def test_tun_burst64(self):
75 """ ipsec 6o4 tunnel basic test """
76 self.verify_tun_64(self.params[socket.AF_INET], count=257)
78 def test_tun_basic_frag44(self):
79 """ ipsec 4o4 tunnel frag basic test """
82 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
84 self.verify_tun_44(self.params[socket.AF_INET],
85 count=1, payload_size=1800, n_rx=2)
86 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
90 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
91 """ Ipsec ESP - TCP tests """
95 class TemplateIpsec6TunIfEsp(TemplateIpsec):
96 """ IPsec tunnel interface tests """
101 super(TemplateIpsec6TunIfEsp, self).setUp()
103 self.tun_if = self.pg0
106 tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
107 p.scapy_tun_spi, p.crypt_algo_vpp_id,
108 p.crypt_key, p.crypt_key,
109 p.auth_algo_vpp_id, p.auth_key,
110 p.auth_key, is_ip6=True)
111 tun_if.add_vpp_config()
116 VppIpRoute(self, p.remote_tun_if_host, 128,
117 [VppRoutePath(tun_if.remote_ip6,
119 proto=DpoProto.DPO_PROTO_IP6)],
120 is_ip6=1).add_vpp_config()
121 VppIpRoute(self, p.remote_tun_if_host4, 32,
122 [VppRoutePath(tun_if.remote_ip4,
123 0xffffffff)]).add_vpp_config()
126 if not self.vpp_dead:
127 self.vapi.cli("show hardware")
128 super(TemplateIpsec6TunIfEsp, self).tearDown()
131 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
132 """ Ipsec ESP - TUN tests """
133 tun6_encrypt_node_name = "esp6-encrypt"
134 tun6_decrypt_node_name = "esp6-decrypt"
136 def test_tun_basic46(self):
137 """ ipsec 4o6 tunnel basic test """
138 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
140 def test_tun_burst46(self):
141 """ ipsec 4o6 tunnel burst test """
142 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
145 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
146 """ IPsec IPv4 Multi Tunnel interface """
148 encryption_type = ESP
149 tun4_encrypt_node_name = "esp4-encrypt"
150 tun4_decrypt_node_name = "esp4-decrypt"
153 super(TestIpsec4MultiTunIfEsp, self).setUp()
155 self.tun_if = self.pg0
157 self.multi_params = []
160 p = copy.copy(self.ipv4_params)
162 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
163 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
164 p.scapy_tun_spi = p.scapy_tun_spi + ii
165 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
166 p.vpp_tun_spi = p.vpp_tun_spi + ii
168 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
169 p.scapy_tra_spi = p.scapy_tra_spi + ii
170 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
171 p.vpp_tra_spi = p.vpp_tra_spi + ii
173 config_tun_params(p, self.encryption_type, self.tun_if)
174 self.multi_params.append(p)
176 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
179 p.crypt_key, p.crypt_key,
180 p.auth_algo_vpp_id, p.auth_key,
182 p.tun_if.add_vpp_config()
184 p.tun_if.config_ip4()
186 VppIpRoute(self, p.remote_tun_if_host, 32,
187 [VppRoutePath(p.tun_if.remote_ip4,
188 0xffffffff)]).add_vpp_config()
191 if not self.vpp_dead:
192 self.vapi.cli("show hardware")
193 super(TestIpsec4MultiTunIfEsp, self).tearDown()
195 def test_tun_44(self):
196 """Multiple IPSEC tunnel interfaces """
197 for p in self.multi_params:
198 self.verify_tun_44(p, count=127)
199 c = p.tun_if.get_rx_stats()
200 self.assertEqual(c['packets'], 127)
201 c = p.tun_if.get_tx_stats()
202 self.assertEqual(c['packets'], 127)
205 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
206 """ IPsec IPv4 Tunnel interface all Algos """
208 encryption_type = ESP
209 tun4_encrypt_node_name = "esp4-encrypt"
210 tun4_decrypt_node_name = "esp4-decrypt"
212 def config_network(self, p):
213 config_tun_params(p, self.encryption_type, self.tun_if)
215 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
218 p.crypt_key, p.crypt_key,
219 p.auth_algo_vpp_id, p.auth_key,
222 p.tun_if.add_vpp_config()
224 p.tun_if.config_ip4()
225 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
226 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
228 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
229 [VppRoutePath(p.tun_if.remote_ip4,
231 p.route.add_vpp_config()
233 def unconfig_network(self, p):
234 p.tun_if.unconfig_ip4()
235 p.tun_if.remove_vpp_config()
236 p.route.remove_vpp_config()
239 super(TestIpsec4TunIfEspAll, self).setUp()
241 self.tun_if = self.pg0
244 super(TestIpsec4TunIfEspAll, self).tearDown()
246 def test_tun_44(self):
247 """IPSEC tunnel all algos """
249 # foreach VPP crypto engine
250 engines = ["ia32", "ipsecmb", "openssl"]
252 # foreach crypto algorithm
253 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
254 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
255 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
256 IPSEC_API_INTEG_ALG_NONE),
257 'scapy-crypto': "AES-GCM",
258 'scapy-integ': "NULL",
259 'key': "JPjyOWBeVEQiMe7h",
261 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
262 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
263 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
264 IPSEC_API_INTEG_ALG_NONE),
265 'scapy-crypto': "AES-GCM",
266 'scapy-integ': "NULL",
267 'key': "JPjyOWBeVEQiMe7hJPjyOWBe",
269 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
270 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
271 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
272 IPSEC_API_INTEG_ALG_NONE),
273 'scapy-crypto': "AES-GCM",
274 'scapy-integ': "NULL",
275 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
277 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
278 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
279 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
280 IPSEC_API_INTEG_ALG_SHA1_96),
281 'scapy-crypto': "AES-CBC",
282 'scapy-integ': "HMAC-SHA1-96",
284 'key': "JPjyOWBeVEQiMe7h"},
285 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
286 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
287 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
288 IPSEC_API_INTEG_ALG_SHA1_96),
289 'scapy-crypto': "AES-CBC",
290 'scapy-integ': "HMAC-SHA1-96",
292 'key': "JPjyOWBeVEQiMe7hJPjyOWBe"},
293 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
294 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
295 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
296 IPSEC_API_INTEG_ALG_SHA1_96),
297 'scapy-crypto': "AES-CBC",
298 'scapy-integ': "HMAC-SHA1-96",
300 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
302 for engine in engines:
303 self.vapi.cli("set crypto handler all %s" % engine)
306 # loop through each of the algorithms
309 # with self.subTest(algo=algo['scapy']):
311 p = copy.copy(self.ipv4_params)
312 p.auth_algo_vpp_id = algo['vpp-integ']
313 p.crypt_algo_vpp_id = algo['vpp-crypto']
314 p.crypt_algo = algo['scapy-crypto']
315 p.auth_algo = algo['scapy-integ']
316 p.crypt_key = algo['key']
317 p.salt = algo['salt']
319 self.config_network(p)
321 self.verify_tun_44(p, count=127)
322 c = p.tun_if.get_rx_stats()
323 self.assertEqual(c['packets'], 127)
324 c = p.tun_if.get_tx_stats()
325 self.assertEqual(c['packets'], 127)
327 self.unconfig_network(p)
330 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
331 """ IPsec IPv6 Multi Tunnel interface """
333 encryption_type = ESP
334 tun6_encrypt_node_name = "esp6-encrypt"
335 tun6_decrypt_node_name = "esp6-decrypt"
338 super(TestIpsec6MultiTunIfEsp, self).setUp()
340 self.tun_if = self.pg0
342 self.multi_params = []
345 p = copy.copy(self.ipv6_params)
347 p.remote_tun_if_host = "1111::%d" % (ii + 1)
348 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
349 p.scapy_tun_spi = p.scapy_tun_spi + ii
350 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
351 p.vpp_tun_spi = p.vpp_tun_spi + ii
353 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
354 p.scapy_tra_spi = p.scapy_tra_spi + ii
355 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
356 p.vpp_tra_spi = p.vpp_tra_spi + ii
358 config_tun_params(p, self.encryption_type, self.tun_if)
359 self.multi_params.append(p)
361 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
364 p.crypt_key, p.crypt_key,
365 p.auth_algo_vpp_id, p.auth_key,
366 p.auth_key, is_ip6=True)
367 p.tun_if.add_vpp_config()
369 p.tun_if.config_ip6()
371 VppIpRoute(self, p.remote_tun_if_host, 128,
372 [VppRoutePath(p.tun_if.remote_ip6,
374 proto=DpoProto.DPO_PROTO_IP6)],
375 is_ip6=1).add_vpp_config()
378 if not self.vpp_dead:
379 self.vapi.cli("show hardware")
380 super(TestIpsec6MultiTunIfEsp, self).tearDown()
382 def test_tun_66(self):
383 """Multiple IPSEC tunnel interfaces """
384 for p in self.multi_params:
385 self.verify_tun_66(p, count=127)
386 c = p.tun_if.get_rx_stats()
387 self.assertEqual(c['packets'], 127)
388 c = p.tun_if.get_tx_stats()
389 self.assertEqual(c['packets'], 127)
392 class TemplateIpsecGRETunIfEsp(TemplateIpsec):
393 """ IPsec GRE tunnel interface tests """
395 encryption_type = ESP
396 omac = "00:11:22:33:44:55"
398 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
400 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
401 sa.encrypt(IP(src=self.pg0.remote_ip4,
402 dst=self.pg0.local_ip4) /
404 Ether(dst=self.omac) /
405 IP(src="1.1.1.1", dst="1.1.1.2") /
406 UDP(sport=1144, dport=2233) /
407 Raw('X' * payload_size))
408 for i in range(count)]
410 def gen_pkts(self, sw_intf, src, dst, count=1,
412 return [Ether(dst=self.omac) /
413 IP(src="1.1.1.1", dst="1.1.1.2") /
414 UDP(sport=1144, dport=2233) /
415 Raw('X' * payload_size)
416 for i in range(count)]
418 def verify_decrypted(self, p, rxs):
420 self.assert_equal(rx[Ether].dst, self.omac)
421 self.assert_equal(rx[IP].dst, "1.1.1.2")
423 def verify_encrypted(self, p, sa, rxs):
426 pkt = sa.decrypt(rx[IP])
427 if not pkt.haslayer(IP):
428 pkt = IP(pkt[Raw].load)
429 self.assert_packet_checksums_valid(pkt)
430 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
431 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
432 self.assertTrue(pkt.haslayer(GRE))
434 self.assertEqual(e[Ether].dst, self.omac)
435 self.assertEqual(e[IP].dst, "1.1.1.2")
436 except (IndexError, AssertionError):
437 self.logger.debug(ppp("Unexpected packet:", rx))
439 self.logger.debug(ppp("Decrypted packet:", pkt))
445 super(TemplateIpsecGRETunIfEsp, self).setUp()
447 self.tun_if = self.pg0
451 bd1 = VppBridgeDomain(self, 1)
454 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
455 p.auth_algo_vpp_id, p.auth_key,
456 p.crypt_algo_vpp_id, p.crypt_key,
457 self.vpp_esp_protocol,
460 p.tun_sa_out.add_vpp_config()
462 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
463 p.auth_algo_vpp_id, p.auth_key,
464 p.crypt_algo_vpp_id, p.crypt_key,
465 self.vpp_esp_protocol,
468 p.tun_sa_in.add_vpp_config()
470 self.tun = VppIpsecGRETunInterface(self, self.pg0,
474 self.tun.add_vpp_config()
476 self.tun.config_ip4()
478 VppIpRoute(self, p.remote_tun_if_host, 32,
479 [VppRoutePath(self.tun.remote_ip4,
480 0xffffffff)]).add_vpp_config()
481 VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
482 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
485 if not self.vpp_dead:
486 self.vapi.cli("show hardware")
487 self.tun.unconfig_ip4()
488 super(TemplateIpsecGRETunIfEsp, self).tearDown()
491 @unittest.skipIf(is_skip_aarch64_set and is_platform_aarch64,
492 "test doesn't work on aarch64")
493 class TestIpsecGRETunIfEsp1(TemplateIpsecGRETunIfEsp, IpsecTun4Tests):
494 """ Ipsec GRE ESP - TUN tests """
495 tun4_encrypt_node_name = "esp4-encrypt"
496 tun4_decrypt_node_name = "esp4-decrypt"
498 if __name__ == '__main__':
499 unittest.main(testRunner=VppTestRunner)