6 from scapy.layers.ipsec import ESP
7 from scapy.layers.l2 import Ether, Raw, GRE
8 from scapy.layers.inet import IP, UDP
9 from framework import VppTestRunner, is_skip_aarch64_set, is_platform_aarch64
10 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
11 IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
12 from vpp_ipsec_tun_interface import VppIpsecTunInterface, \
13 VppIpsecGRETunInterface
14 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
15 from vpp_ipsec import VppIpsecSA
16 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
18 from vpp_papi import VppEnum
21 class TemplateIpsec4TunIfEsp(TemplateIpsec):
22 """ IPsec tunnel interface tests """
28 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
31 def tearDownClass(cls):
32 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
35 super(TemplateIpsec4TunIfEsp, self).setUp()
37 self.tun_if = self.pg0
41 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
42 p.scapy_tun_spi, p.crypt_algo_vpp_id,
43 p.crypt_key, p.crypt_key,
44 p.auth_algo_vpp_id, p.auth_key,
46 p.tun_if.add_vpp_config()
51 VppIpRoute(self, p.remote_tun_if_host, 32,
52 [VppRoutePath(p.tun_if.remote_ip4,
53 0xffffffff)]).add_vpp_config()
54 VppIpRoute(self, p.remote_tun_if_host6, 128,
55 [VppRoutePath(p.tun_if.remote_ip6,
57 proto=DpoProto.DPO_PROTO_IP6)],
58 is_ip6=1).add_vpp_config()
62 self.vapi.cli("show hardware")
63 super(TemplateIpsec4TunIfEsp, self).tearDown()
66 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
67 """ Ipsec ESP - TUN tests """
68 tun4_encrypt_node_name = "esp4-encrypt"
69 tun4_decrypt_node_name = "esp4-decrypt"
def test_tun_basic64(self):
    """ ipsec 6o4 tunnel basic test """
    # Single-packet run against the AF_INET parameter set.
    tun_params = self.params[socket.AF_INET]
    self.verify_tun_64(tun_params, count=1)
def test_tun_burst64(self):
    """ ipsec 6o4 tunnel burst test

    Runs verify_tun_64 against the AF_INET parameter set with count=257.
    """
    # Same check as test_tun_basic64, but with count=257 instead of 1.
    self.verify_tun_64(self.params[socket.AF_INET], count=257)
79 def test_tun_basic_frag44(self):
80 """ ipsec 4o4 tunnel frag basic test """
83 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
85 self.verify_tun_44(self.params[socket.AF_INET],
86 count=1, payload_size=1800, n_rx=2)
87 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
91 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
92 """ Ipsec ESP - TCP tests """
96 class TemplateIpsec6TunIfEsp(TemplateIpsec):
97 """ IPsec tunnel interface tests """
102 super(TemplateIpsec6TunIfEsp, self).setUp()
104 self.tun_if = self.pg0
107 tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
108 p.scapy_tun_spi, p.crypt_algo_vpp_id,
109 p.crypt_key, p.crypt_key,
110 p.auth_algo_vpp_id, p.auth_key,
111 p.auth_key, is_ip6=True)
112 tun_if.add_vpp_config()
117 VppIpRoute(self, p.remote_tun_if_host, 128,
118 [VppRoutePath(tun_if.remote_ip6,
120 proto=DpoProto.DPO_PROTO_IP6)],
121 is_ip6=1).add_vpp_config()
122 VppIpRoute(self, p.remote_tun_if_host4, 32,
123 [VppRoutePath(tun_if.remote_ip4,
124 0xffffffff)]).add_vpp_config()
127 if not self.vpp_dead:
128 self.vapi.cli("show hardware")
129 super(TemplateIpsec6TunIfEsp, self).tearDown()
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """

    # Names of the IPv6 ESP encrypt/decrypt nodes; presumably read by the
    # IpsecTun6Tests mixin -- confirm in template_ipsec.
    tun6_encrypt_node_name = "esp6-encrypt"
    tun6_decrypt_node_name = "esp6-decrypt"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        # Single-packet run against the AF_INET6 parameter set.
        tun_params = self.params[socket.AF_INET6]
        self.verify_tun_46(tun_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        # Same check as the basic test, with count=257.
        tun_params = self.params[socket.AF_INET6]
        self.verify_tun_46(tun_params, count=257)
146 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
147 """ IPsec IPv4 Multi Tunnel interface """
149 encryption_type = ESP
150 tun4_encrypt_node_name = "esp4-encrypt"
151 tun4_decrypt_node_name = "esp4-decrypt"
154 super(TestIpsec4MultiTunIfEsp, self).setUp()
156 self.tun_if = self.pg0
158 self.multi_params = []
161 p = copy.copy(self.ipv4_params)
163 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
164 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
165 p.scapy_tun_spi = p.scapy_tun_spi + ii
166 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
167 p.vpp_tun_spi = p.vpp_tun_spi + ii
169 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
170 p.scapy_tra_spi = p.scapy_tra_spi + ii
171 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
172 p.vpp_tra_spi = p.vpp_tra_spi + ii
174 config_tun_params(p, self.encryption_type, self.tun_if)
175 self.multi_params.append(p)
177 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
180 p.crypt_key, p.crypt_key,
181 p.auth_algo_vpp_id, p.auth_key,
183 p.tun_if.add_vpp_config()
185 p.tun_if.config_ip4()
187 VppIpRoute(self, p.remote_tun_if_host, 32,
188 [VppRoutePath(p.tun_if.remote_ip4,
189 0xffffffff)]).add_vpp_config()
192 if not self.vpp_dead:
193 self.vapi.cli("show hardware")
194 super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
    """Multiple IPSEC tunnel interfaces """
    # One value drives both the count handed to verify_tun_44 and the
    # counter value asserted afterwards.
    n_pkts = 127
    for p in self.multi_params:
        self.verify_tun_44(p, count=n_pkts)
        # Check the tunnel interface's rx counter first, then its tx
        # counter; each must report exactly n_pkts packets.
        for read_stats in (p.tun_if.get_rx_stats, p.tun_if.get_tx_stats):
            self.assertEqual(read_stats()['packets'], n_pkts)
206 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
207 """ IPsec IPv4 Tunnel interface all Algos """
209 encryption_type = ESP
210 tun4_encrypt_node_name = "esp4-encrypt"
211 tun4_decrypt_node_name = "esp4-decrypt"
213 def config_network(self, p):
214 config_tun_params(p, self.encryption_type, self.tun_if)
216 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
219 p.crypt_key, p.crypt_key,
220 p.auth_algo_vpp_id, p.auth_key,
222 p.tun_if.add_vpp_config()
224 p.tun_if.config_ip4()
225 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
226 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
228 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
229 [VppRoutePath(p.tun_if.remote_ip4,
231 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Undo config_network() for the parameter set *p*.

    Removes the IPv4 configuration from p.tun_if, then the tunnel
    interface itself, and finally the route stored in p.route.
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()
    # The route is removed last, after the tunnel interface is gone.
    p.route.remove_vpp_config()
239 super(TestIpsec4TunIfEspAll, self).setUp()
241 self.tun_if = self.pg0
244 super(TestIpsec4TunIfEspAll, self).tearDown()
246 def test_tun_44(self):
247 """IPSEC tunnel all algos """
249 # foreach VPP crypto engine
250 engines = ["ia32", "ipsecmb", "openssl"]
252 # foreach crypto algorithm
253 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
254 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
255 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
256 IPSEC_API_INTEG_ALG_NONE),
257 'scapy-crypto': "AES-GCM",
258 'scapy-integ': "NULL",
259 'key': "JPjyOWBeVEQiMe7h",
260 'salt': struct.pack("!L", 0)},
261 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
262 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
263 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
264 IPSEC_API_INTEG_ALG_NONE),
265 'scapy-crypto': "AES-GCM",
266 'scapy-integ': "NULL",
267 'key': "JPjyOWBeVEQiMe7hJPjyOWBe",
268 'salt': struct.pack("!L", 0)},
269 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
270 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
271 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
272 IPSEC_API_INTEG_ALG_NONE),
273 'scapy-crypto': "AES-GCM",
274 'scapy-integ': "NULL",
275 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
276 'salt': struct.pack("!L", 0)},
277 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
278 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
279 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
280 IPSEC_API_INTEG_ALG_SHA1_96),
281 'scapy-crypto': "AES-CBC",
282 'scapy-integ': "HMAC-SHA1-96",
284 'key': "JPjyOWBeVEQiMe7h"},
285 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
286 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
287 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
288 IPSEC_API_INTEG_ALG_SHA1_96),
289 'scapy-crypto': "AES-CBC",
290 'scapy-integ': "HMAC-SHA1-96",
292 'key': "JPjyOWBeVEQiMe7hJPjyOWBe"},
293 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
294 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
295 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
296 IPSEC_API_INTEG_ALG_SHA1_96),
297 'scapy-crypto': "AES-CBC",
298 'scapy-integ': "HMAC-SHA1-96",
300 'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
302 for engine in engines:
303 self.vapi.cli("set crypto handler all %s" % engine)
306 # loop through each of the algorithms
309 # with self.subTest(algo=algo['scapy']):
311 p = copy.copy(self.ipv4_params)
312 p.auth_algo_vpp_id = algo['vpp-integ']
313 p.crypt_algo_vpp_id = algo['vpp-crypto']
314 p.crypt_algo = algo['scapy-crypto']
315 p.auth_algo = algo['scapy-integ']
316 p.crypt_key = algo['key']
317 p.crypt_salt = algo['salt']
319 self.config_network(p)
321 self.verify_tun_44(p, count=127)
322 c = p.tun_if.get_rx_stats()
323 self.assertEqual(c['packets'], 127)
324 c = p.tun_if.get_tx_stats()
325 self.assertEqual(c['packets'], 127)
327 self.unconfig_network(p)
330 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
331 """ IPsec IPv6 Multi Tunnel interface """
333 encryption_type = ESP
334 tun6_encrypt_node_name = "esp6-encrypt"
335 tun6_decrypt_node_name = "esp6-decrypt"
338 super(TestIpsec6MultiTunIfEsp, self).setUp()
340 self.tun_if = self.pg0
342 self.multi_params = []
345 p = copy.copy(self.ipv6_params)
347 p.remote_tun_if_host = "1111::%d" % (ii + 1)
348 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
349 p.scapy_tun_spi = p.scapy_tun_spi + ii
350 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
351 p.vpp_tun_spi = p.vpp_tun_spi + ii
353 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
354 p.scapy_tra_spi = p.scapy_tra_spi + ii
355 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
356 p.vpp_tra_spi = p.vpp_tra_spi + ii
358 config_tun_params(p, self.encryption_type, self.tun_if)
359 self.multi_params.append(p)
361 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
364 p.crypt_key, p.crypt_key,
365 p.auth_algo_vpp_id, p.auth_key,
366 p.auth_key, is_ip6=True)
367 p.tun_if.add_vpp_config()
369 p.tun_if.config_ip6()
371 VppIpRoute(self, p.remote_tun_if_host, 128,
372 [VppRoutePath(p.tun_if.remote_ip6,
374 proto=DpoProto.DPO_PROTO_IP6)],
375 is_ip6=1).add_vpp_config()
378 if not self.vpp_dead:
379 self.vapi.cli("show hardware")
380 super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
    """Multiple IPSEC tunnel interfaces """
    # One value drives both the count handed to verify_tun_66 and the
    # counter value asserted afterwards.
    n_pkts = 127
    for p in self.multi_params:
        self.verify_tun_66(p, count=n_pkts)
        # Check the tunnel interface's rx counter first, then its tx
        # counter; each must report exactly n_pkts packets.
        for read_stats in (p.tun_if.get_rx_stats, p.tun_if.get_tx_stats):
            self.assertEqual(read_stats()['packets'], n_pkts)
392 class TemplateIpsecGRETunIfEsp(TemplateIpsec):
393 """ IPsec GRE tunnel interface tests """
395 encryption_type = ESP
396 omac = "00:11:22:33:44:55"
398 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
400 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
401 sa.encrypt(IP(src=self.pg0.remote_ip4,
402 dst=self.pg0.local_ip4) /
404 Ether(dst=self.omac) /
405 IP(src="1.1.1.1", dst="1.1.1.2") /
406 UDP(sport=1144, dport=2233) /
407 Raw('X' * payload_size))
408 for i in range(count)]
410 def gen_pkts(self, sw_intf, src, dst, count=1,
412 return [Ether(dst=self.omac) /
413 IP(src="1.1.1.1", dst="1.1.1.2") /
414 UDP(sport=1144, dport=2233) /
415 Raw('X' * payload_size)
416 for i in range(count)]
418 def verify_decrypted(self, p, rxs):
420 self.assert_equal(rx[Ether].dst, self.omac)
421 self.assert_equal(rx[IP].dst, "1.1.1.2")
423 def verify_encrypted(self, p, sa, rxs):
426 pkt = sa.decrypt(rx[IP])
427 if not pkt.haslayer(IP):
428 pkt = IP(pkt[Raw].load)
429 self.assert_packet_checksums_valid(pkt)
430 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
431 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
432 self.assertTrue(pkt.haslayer(GRE))
434 self.assertEqual(e[Ether].dst, self.omac)
435 self.assertEqual(e[IP].dst, "1.1.1.2")
436 except (IndexError, AssertionError):
437 self.logger.debug(ppp("Unexpected packet:", rx))
439 self.logger.debug(ppp("Decrypted packet:", pkt))
445 super(TemplateIpsecGRETunIfEsp, self).setUp()
447 self.tun_if = self.pg0
451 bd1 = VppBridgeDomain(self, 1)
454 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
455 p.auth_algo_vpp_id, p.auth_key,
456 p.crypt_algo_vpp_id, p.crypt_key,
457 self.vpp_esp_protocol,
460 p.tun_sa_out.add_vpp_config()
462 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
463 p.auth_algo_vpp_id, p.auth_key,
464 p.crypt_algo_vpp_id, p.crypt_key,
465 self.vpp_esp_protocol,
468 p.tun_sa_in.add_vpp_config()
470 self.tun = VppIpsecGRETunInterface(self, self.pg0,
474 self.tun.add_vpp_config()
476 self.tun.config_ip4()
478 VppIpRoute(self, p.remote_tun_if_host, 32,
479 [VppRoutePath(self.tun.remote_ip4,
480 0xffffffff)]).add_vpp_config()
481 VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
482 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
485 if not self.vpp_dead:
486 self.vapi.cli("show hardware")
487 self.tun.unconfig_ip4()
488 super(TemplateIpsecGRETunIfEsp, self).tearDown()
# Skipped when both aarch64 flags imported from framework are truthy.
@unittest.skipIf(
    is_skip_aarch64_set and is_platform_aarch64,
    "test doesn't work on aarch64",
)
class TestIpsecGRETunIfEsp1(TemplateIpsecGRETunIfEsp, IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """

    # Names of the IPv4 ESP encrypt/decrypt nodes; presumably read by the
    # IpsecTun4Tests mixin -- confirm in template_ipsec.
    tun4_encrypt_node_name = "esp4-encrypt"
    tun4_decrypt_node_name = "esp4-decrypt"
if __name__ == '__main__':
    # Run as a script: hand this module's test cases to VppTestRunner.
    unittest.main(testRunner=VppTestRunner)