5 from scapy.layers.ipsec import ESP
6 from scapy.layers.l2 import Ether, Raw, GRE
7 from scapy.layers.inet import IP, UDP
8 from framework import VppTestRunner, is_skip_aarch64_set, is_platform_aarch64
9 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
10 IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
11 from vpp_ipsec_tun_interface import VppIpsecTunInterface, \
12 VppIpsecGRETunInterface
13 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
14 from vpp_ipsec import VppIpsecSA
15 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
17 from vpp_papi import VppEnum
20 class TemplateIpsec4TunIfEsp(TemplateIpsec):
21 """ IPsec tunnel interface tests """
27 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
30 def tearDownClass(cls):
31 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
34 super(TemplateIpsec4TunIfEsp, self).setUp()
36 self.tun_if = self.pg0
40 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
41 p.scapy_tun_spi, p.crypt_algo_vpp_id,
42 p.crypt_key, p.crypt_key,
43 p.auth_algo_vpp_id, p.auth_key,
45 p.tun_if.add_vpp_config()
50 VppIpRoute(self, p.remote_tun_if_host, 32,
51 [VppRoutePath(p.tun_if.remote_ip4,
52 0xffffffff)]).add_vpp_config()
53 VppIpRoute(self, p.remote_tun_if_host6, 128,
54 [VppRoutePath(p.tun_if.remote_ip6,
56 proto=DpoProto.DPO_PROTO_IP6)],
57 is_ip6=1).add_vpp_config()
61 self.vapi.cli("show hardware")
62 super(TemplateIpsec4TunIfEsp, self).tearDown()
65 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
66 """ Ipsec ESP - TUN tests """
67 tun4_encrypt_node_name = "esp4-encrypt"
68 tun4_decrypt_node_name = "esp4-decrypt"
70 def test_tun_basic64(self):
71 """ ipsec 6o4 tunnel basic test """
72 self.verify_tun_64(self.params[socket.AF_INET], count=1)  # count=1 is the minimal case; test_tun_burst64 repeats the same check with count=257
74 def test_tun_burst64(self):
75 """ ipsec 6o4 tunnel burst test """
76 self.verify_tun_64(self.params[socket.AF_INET], count=257)  # burst variant: same call as test_tun_basic64 but with count=257 instead of 1
78 def test_tun_basic_frag44(self):
79 """ ipsec 4o4 tunnel frag basic test """
82 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
84 self.verify_tun_44(self.params[socket.AF_INET],
85 count=1, payload_size=1800, n_rx=2)
86 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
90 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
91 """ Ipsec ESP - TCP tests """
95 class TemplateIpsec6TunIfEsp(TemplateIpsec):
96 """ IPsec tunnel interface tests """
101 super(TemplateIpsec6TunIfEsp, self).setUp()
103 self.tun_if = self.pg0
106 tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
107 p.scapy_tun_spi, p.crypt_algo_vpp_id,
108 p.crypt_key, p.crypt_key,
109 p.auth_algo_vpp_id, p.auth_key,
110 p.auth_key, is_ip6=True)
111 tun_if.add_vpp_config()
116 VppIpRoute(self, p.remote_tun_if_host, 128,
117 [VppRoutePath(tun_if.remote_ip6,
119 proto=DpoProto.DPO_PROTO_IP6)],
120 is_ip6=1).add_vpp_config()
121 VppIpRoute(self, p.remote_tun_if_host4, 32,
122 [VppRoutePath(tun_if.remote_ip4,
123 0xffffffff)]).add_vpp_config()
126 if not self.vpp_dead:
127 self.vapi.cli("show hardware")
128 super(TemplateIpsec6TunIfEsp, self).tearDown()
131 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
132 """ Ipsec ESP - TUN tests """
133 tun6_encrypt_node_name = "esp6-encrypt"
134 tun6_decrypt_node_name = "esp6-decrypt"
136 def test_tun_basic46(self):
137 """ ipsec 4o6 tunnel basic test """
138 self.verify_tun_46(self.params[socket.AF_INET6], count=1)  # uses the params entry keyed by socket.AF_INET6; count=1 is the minimal case
140 def test_tun_burst46(self):
141 """ ipsec 4o6 tunnel burst test """
142 self.verify_tun_46(self.params[socket.AF_INET6], count=257)  # burst variant: same call as test_tun_basic46 but with count=257 instead of 1
145 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
146 """ IPsec IPv4 Multi Tunnel interface """
148 encryption_type = ESP
149 tun4_encrypt_node_name = "esp4-encrypt"
150 tun4_decrypt_node_name = "esp4-decrypt"
153 super(TestIpsec4MultiTunIfEsp, self).setUp()
155 self.tun_if = self.pg0
157 self.multi_params = []
160 p = copy.copy(self.ipv4_params)
162 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
163 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
164 p.scapy_tun_spi = p.scapy_tun_spi + ii
165 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
166 p.vpp_tun_spi = p.vpp_tun_spi + ii
168 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
169 p.scapy_tra_spi = p.scapy_tra_spi + ii
170 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
171 p.vpp_tra_spi = p.vpp_tra_spi + ii
173 config_tun_params(p, self.encryption_type, self.tun_if)
174 self.multi_params.append(p)
176 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
179 p.crypt_key, p.crypt_key,
180 p.auth_algo_vpp_id, p.auth_key,
182 p.tun_if.add_vpp_config()
184 p.tun_if.config_ip4()
186 VppIpRoute(self, p.remote_tun_if_host, 32,
187 [VppRoutePath(p.tun_if.remote_ip4,
188 0xffffffff)]).add_vpp_config()
191 if not self.vpp_dead:
192 self.vapi.cli("show hardware")
193 super(TestIpsec4MultiTunIfEsp, self).tearDown()
195 def test_tun_44(self):
196 """Multiple IPSEC tunnel interfaces """
197 for p in self.multi_params:  # one pass per parameter set stored in self.multi_params
198 self.verify_tun_44(p, count=127)
199 c = p.tun_if.get_rx_stats()
200 self.assertEqual(c['packets'], 127)  # this tunnel's rx packet counter must equal the count passed to verify_tun_44
201 c = p.tun_if.get_tx_stats()
202 self.assertEqual(c['packets'], 127)  # and so must its tx packet counter
205 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
206 """ IPsec IPv4 Tunnel interface all Algos """
208 encryption_type = ESP
209 tun4_encrypt_node_name = "esp4-encrypt"
210 tun4_decrypt_node_name = "esp4-decrypt"
212 def config_network(self, p):
213 config_tun_params(p, self.encryption_type, self.tun_if)
215 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
218 p.crypt_key, p.crypt_key,
219 p.auth_algo_vpp_id, p.auth_key,
222 p.tun_if.add_vpp_config()
224 p.tun_if.config_ip4()
225 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
226 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
228 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
229 [VppRoutePath(p.tun_if.remote_ip4,
231 p.route.add_vpp_config()
233 def unconfig_network(self, p):  # removes the objects config_network stored on p (p.tun_if and p.route)
234 p.tun_if.unconfig_ip4()  # counterpart of the p.tun_if.config_ip4() call in config_network
235 p.tun_if.remove_vpp_config()
236 p.route.remove_vpp_config()  # p.route is the VppIpRoute created in config_network
239 super(TestIpsec4TunIfEspAll, self).setUp()
241 self.tun_if = self.pg0
244 super(TestIpsec4TunIfEspAll, self).tearDown()
248 # change the key and the SPI
250 p.crypt_key = 'X' + p.crypt_key[1:]
252 p.scapy_tun_sa_id += 1
255 p.tun_if.local_spi = p.vpp_tun_spi
256 p.tun_if.remote_spi = p.scapy_tun_spi
258 config_tun_params(p, self.encryption_type, self.tun_if)
260 p.tun_sa_in = VppIpsecSA(self,
267 self.vpp_esp_protocol,
268 self.tun_if.local_addr[p.addr_type],
269 self.tun_if.remote_addr[p.addr_type],
272 p.tun_sa_out = VppIpsecSA(self,
279 self.vpp_esp_protocol,
280 self.tun_if.remote_addr[p.addr_type],
281 self.tun_if.local_addr[p.addr_type],
284 p.tun_sa_in.add_vpp_config()
285 p.tun_sa_out.add_vpp_config()
287 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
288 sa_id=p.tun_sa_in.id,
290 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
291 sa_id=p.tun_sa_out.id,
293 self.logger.info(self.vapi.cli("sh ipsec sa"))
295 def test_tun_44(self):
296 """IPSEC tunnel all algos """
298 # foreach VPP crypto engine
299 engines = ["ia32", "ipsecmb", "openssl"]
301 # foreach crypto algorithm
302 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
303 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
304 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
305 IPSEC_API_INTEG_ALG_NONE),
306 'scapy-crypto': "AES-GCM",
307 'scapy-integ': "NULL",
308 'key': "JPjyOWBeVEQiMe7h",
310 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
311 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
312 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
313 IPSEC_API_INTEG_ALG_NONE),
314 'scapy-crypto': "AES-GCM",
315 'scapy-integ': "NULL",
316 'key': "JPjyOWBeVEQiMe7hJPjyOWBe",
318 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
319 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
320 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
321 IPSEC_API_INTEG_ALG_NONE),
322 'scapy-crypto': "AES-GCM",
323 'scapy-integ': "NULL",
324 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
326 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
327 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
328 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
329 IPSEC_API_INTEG_ALG_SHA1_96),
330 'scapy-crypto': "AES-CBC",
331 'scapy-integ': "HMAC-SHA1-96",
333 'key': "JPjyOWBeVEQiMe7h"},
334 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
335 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
336 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
337 IPSEC_API_INTEG_ALG_SHA1_96),
338 'scapy-crypto': "AES-CBC",
339 'scapy-integ': "HMAC-SHA1-96",
341 'key': "JPjyOWBeVEQiMe7hJPjyOWBe"},
342 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
343 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
344 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
345 IPSEC_API_INTEG_ALG_SHA1_96),
346 'scapy-crypto': "AES-CBC",
347 'scapy-integ': "HMAC-SHA1-96",
349 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
351 for engine in engines:
352 self.vapi.cli("set crypto handler all %s" % engine)
355 # loop through each of the algorithms
358 # with self.subTest(algo=algo['scapy']):
360 p = copy.copy(self.ipv4_params)
361 p.auth_algo_vpp_id = algo['vpp-integ']
362 p.crypt_algo_vpp_id = algo['vpp-crypto']
363 p.crypt_algo = algo['scapy-crypto']
364 p.auth_algo = algo['scapy-integ']
365 p.crypt_key = algo['key']
366 p.salt = algo['salt']
368 self.config_network(p)
370 self.verify_tun_44(p, count=127)
371 c = p.tun_if.get_rx_stats()
372 self.assertEqual(c['packets'], 127)
373 c = p.tun_if.get_tx_stats()
374 self.assertEqual(c['packets'], 127)
380 self.verify_tun_44(p, count=127)
382 self.unconfig_network(p)
383 p.tun_sa_out.remove_vpp_config()
384 p.tun_sa_in.remove_vpp_config()
387 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
388 """ IPsec IPv6 Multi Tunnel interface """
390 encryption_type = ESP
391 tun6_encrypt_node_name = "esp6-encrypt"
392 tun6_decrypt_node_name = "esp6-decrypt"
395 super(TestIpsec6MultiTunIfEsp, self).setUp()
397 self.tun_if = self.pg0
399 self.multi_params = []
402 p = copy.copy(self.ipv6_params)
404 p.remote_tun_if_host = "1111::%d" % (ii + 1)
405 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
406 p.scapy_tun_spi = p.scapy_tun_spi + ii
407 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
408 p.vpp_tun_spi = p.vpp_tun_spi + ii
410 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
411 p.scapy_tra_spi = p.scapy_tra_spi + ii
412 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
413 p.vpp_tra_spi = p.vpp_tra_spi + ii
415 config_tun_params(p, self.encryption_type, self.tun_if)
416 self.multi_params.append(p)
418 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
421 p.crypt_key, p.crypt_key,
422 p.auth_algo_vpp_id, p.auth_key,
423 p.auth_key, is_ip6=True)
424 p.tun_if.add_vpp_config()
426 p.tun_if.config_ip6()
428 VppIpRoute(self, p.remote_tun_if_host, 128,
429 [VppRoutePath(p.tun_if.remote_ip6,
431 proto=DpoProto.DPO_PROTO_IP6)],
432 is_ip6=1).add_vpp_config()
435 if not self.vpp_dead:
436 self.vapi.cli("show hardware")
437 super(TestIpsec6MultiTunIfEsp, self).tearDown()
439 def test_tun_66(self):
440 """Multiple IPSEC tunnel interfaces """
441 for p in self.multi_params:  # one pass per parameter set stored in self.multi_params
442 self.verify_tun_66(p, count=127)
443 c = p.tun_if.get_rx_stats()
444 self.assertEqual(c['packets'], 127)  # this tunnel's rx packet counter must equal the count passed to verify_tun_66
445 c = p.tun_if.get_tx_stats()
446 self.assertEqual(c['packets'], 127)  # and so must its tx packet counter
449 class TemplateIpsecGRETunIfEsp(TemplateIpsec):
450 """ IPsec GRE tunnel interface tests """
452 encryption_type = ESP
453 omac = "00:11:22:33:44:55"
455 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
457 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
458 sa.encrypt(IP(src=self.pg0.remote_ip4,
459 dst=self.pg0.local_ip4) /
461 Ether(dst=self.omac) /
462 IP(src="1.1.1.1", dst="1.1.1.2") /
463 UDP(sport=1144, dport=2233) /
464 Raw('X' * payload_size))
465 for i in range(count)]
467 def gen_pkts(self, sw_intf, src, dst, count=1,
469 return [Ether(dst=self.omac) /
470 IP(src="1.1.1.1", dst="1.1.1.2") /
471 UDP(sport=1144, dport=2233) /
472 Raw('X' * payload_size)
473 for i in range(count)]
475 def verify_decrypted(self, p, rxs):
477 self.assert_equal(rx[Ether].dst, self.omac)
478 self.assert_equal(rx[IP].dst, "1.1.1.2")
480 def verify_encrypted(self, p, sa, rxs):
483 pkt = sa.decrypt(rx[IP])
484 if not pkt.haslayer(IP):
485 pkt = IP(pkt[Raw].load)
486 self.assert_packet_checksums_valid(pkt)
487 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
488 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
489 self.assertTrue(pkt.haslayer(GRE))
491 self.assertEqual(e[Ether].dst, self.omac)
492 self.assertEqual(e[IP].dst, "1.1.1.2")
493 except (IndexError, AssertionError):
494 self.logger.debug(ppp("Unexpected packet:", rx))
496 self.logger.debug(ppp("Decrypted packet:", pkt))
502 super(TemplateIpsecGRETunIfEsp, self).setUp()
504 self.tun_if = self.pg0
508 bd1 = VppBridgeDomain(self, 1)
511 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
512 p.auth_algo_vpp_id, p.auth_key,
513 p.crypt_algo_vpp_id, p.crypt_key,
514 self.vpp_esp_protocol,
517 p.tun_sa_out.add_vpp_config()
519 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
520 p.auth_algo_vpp_id, p.auth_key,
521 p.crypt_algo_vpp_id, p.crypt_key,
522 self.vpp_esp_protocol,
525 p.tun_sa_in.add_vpp_config()
527 self.tun = VppIpsecGRETunInterface(self, self.pg0,
531 self.tun.add_vpp_config()
533 self.tun.config_ip4()
535 VppIpRoute(self, p.remote_tun_if_host, 32,
536 [VppRoutePath(self.tun.remote_ip4,
537 0xffffffff)]).add_vpp_config()
538 VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
539 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
542 if not self.vpp_dead:
543 self.vapi.cli("show hardware")
544 self.tun.unconfig_ip4()
545 super(TemplateIpsecGRETunIfEsp, self).tearDown()
548 @unittest.skipIf(is_skip_aarch64_set and is_platform_aarch64,  # class is skipped only when both framework flags are truthy
549 "test doesn't work on aarch64")
550 class TestIpsecGRETunIfEsp1(TemplateIpsecGRETunIfEsp, IpsecTun4Tests):
551 """ Ipsec GRE ESP - TUN tests """
552 tun4_encrypt_node_name = "esp4-encrypt"  # same node names as TestIpsec4TunIfEsp1 sets for the non-GRE tunnel tests
553 tun4_decrypt_node_name = "esp4-decrypt"
555 if __name__ == '__main__':  # only when executed as a script, not when imported
556 unittest.main(testRunner=VppTestRunner)  # run with the VppTestRunner imported from framework rather than unittest's default runner