5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
22 from vpp_papi import VppEnum
25 def config_tun_params(p, encryption_type, tun_if):
26 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
27 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
28 IPSEC_API_SAD_FLAG_USE_ESN))
29 crypt_key = mk_scapy_crypt_key(p)
30 p.tun_dst = tun_if.remote_ip
31 p.tun_src = tun_if.local_ip
32 p.scapy_tun_sa = SecurityAssociation(
33 encryption_type, spi=p.vpp_tun_spi,
34 crypt_algo=p.crypt_algo,
36 auth_algo=p.auth_algo, auth_key=p.auth_key,
37 tunnel_header=ip_class_by_addr_type[p.addr_type](
40 nat_t_header=p.nat_header,
42 p.vpp_tun_sa = SecurityAssociation(
43 encryption_type, spi=p.scapy_tun_spi,
44 crypt_algo=p.crypt_algo,
46 auth_algo=p.auth_algo, auth_key=p.auth_key,
47 tunnel_header=ip_class_by_addr_type[p.addr_type](
50 nat_t_header=p.nat_header,
54 def config_tra_params(p, encryption_type, tun_if):
55 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
56 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
57 IPSEC_API_SAD_FLAG_USE_ESN))
58 crypt_key = mk_scapy_crypt_key(p)
59 p.tun_dst = tun_if.remote_ip
60 p.tun_src = tun_if.local_ip
61 p.scapy_tun_sa = SecurityAssociation(
62 encryption_type, spi=p.vpp_tun_spi,
63 crypt_algo=p.crypt_algo,
65 auth_algo=p.auth_algo, auth_key=p.auth_key,
67 p.vpp_tun_sa = SecurityAssociation(
68 encryption_type, spi=p.scapy_tun_spi,
69 crypt_algo=p.crypt_algo,
71 auth_algo=p.auth_algo, auth_key=p.auth_key,
75 class TemplateIpsec4TunIfEsp(TemplateIpsec):
76 """ IPsec tunnel interface tests """
82 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
85 def tearDownClass(cls):
86 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
89 super(TemplateIpsec4TunIfEsp, self).setUp()
91 self.tun_if = self.pg0
95 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
96 p.scapy_tun_spi, p.crypt_algo_vpp_id,
97 p.crypt_key, p.crypt_key,
98 p.auth_algo_vpp_id, p.auth_key,
100 p.tun_if.add_vpp_config()
102 p.tun_if.config_ip4()
103 p.tun_if.config_ip6()
104 config_tun_params(p, self.encryption_type, p.tun_if)
106 r = VppIpRoute(self, p.remote_tun_if_host, 32,
107 [VppRoutePath(p.tun_if.remote_ip4,
110 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
111 [VppRoutePath(p.tun_if.remote_ip6,
113 proto=DpoProto.DPO_PROTO_IP6)])
117 super(TemplateIpsec4TunIfEsp, self).tearDown()
120 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
121 """ IPsec UDP tunnel interface tests """
123 tun4_encrypt_node_name = "esp4-encrypt-tun"
124 tun4_decrypt_node_name = "esp4-decrypt-tun"
125 encryption_type = ESP
129 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
132 def tearDownClass(cls):
133 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
136 super(TemplateIpsec4TunIfEspUdp, self).setUp()
138 self.tun_if = self.pg0
141 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
142 IPSEC_API_SAD_FLAG_UDP_ENCAP)
143 p.nat_header = UDP(sport=5454, dport=4500)
145 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
146 p.scapy_tun_spi, p.crypt_algo_vpp_id,
147 p.crypt_key, p.crypt_key,
148 p.auth_algo_vpp_id, p.auth_key,
149 p.auth_key, udp_encap=True)
150 p.tun_if.add_vpp_config()
152 p.tun_if.config_ip4()
153 p.tun_if.config_ip6()
154 config_tun_params(p, self.encryption_type, p.tun_if)
156 r = VppIpRoute(self, p.remote_tun_if_host, 32,
157 [VppRoutePath(p.tun_if.remote_ip4,
160 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
161 [VppRoutePath(p.tun_if.remote_ip6,
163 proto=DpoProto.DPO_PROTO_IP6)])
167 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
170 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
171 """ Ipsec ESP - TUN tests """
172 tun4_encrypt_node_name = "esp4-encrypt-tun"
173 tun4_decrypt_node_name = "esp4-decrypt-tun"
175 def test_tun_basic64(self):
176 """ ipsec 6o4 tunnel basic test """
177 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
179 self.verify_tun_64(self.params[socket.AF_INET], count=1)
181 def test_tun_burst64(self):
182 """ ipsec 6o4 tunnel basic test """
183 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
185 self.verify_tun_64(self.params[socket.AF_INET], count=257)
187 def test_tun_basic_frag44(self):
188 """ ipsec 4o4 tunnel frag basic test """
189 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
193 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
195 self.verify_tun_44(self.params[socket.AF_INET],
196 count=1, payload_size=1800, n_rx=2)
197 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
201 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
202 """ Ipsec ESP UDP tests """
204 tun4_input_node = "ipsec4-tun-input"
206 def test_keepalive(self):
207 """ IPSEC NAT Keepalive """
208 self.verify_keepalive(self.ipv4_params)
211 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
212 """ Ipsec ESP - TCP tests """
216 class TemplateIpsec6TunIfEsp(TemplateIpsec):
217 """ IPsec tunnel interface tests """
219 encryption_type = ESP
222 super(TemplateIpsec6TunIfEsp, self).setUp()
224 self.tun_if = self.pg0
227 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
228 p.scapy_tun_spi, p.crypt_algo_vpp_id,
229 p.crypt_key, p.crypt_key,
230 p.auth_algo_vpp_id, p.auth_key,
231 p.auth_key, is_ip6=True)
232 p.tun_if.add_vpp_config()
234 p.tun_if.config_ip6()
235 p.tun_if.config_ip4()
236 config_tun_params(p, self.encryption_type, p.tun_if)
238 r = VppIpRoute(self, p.remote_tun_if_host, 128,
239 [VppRoutePath(p.tun_if.remote_ip6,
241 proto=DpoProto.DPO_PROTO_IP6)])
243 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
244 [VppRoutePath(p.tun_if.remote_ip4,
249 super(TemplateIpsec6TunIfEsp, self).tearDown()
252 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
254 """ Ipsec ESP - TUN tests """
255 tun6_encrypt_node_name = "esp6-encrypt-tun"
256 tun6_decrypt_node_name = "esp6-decrypt-tun"
258 def test_tun_basic46(self):
259 """ ipsec 4o6 tunnel basic test """
260 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
261 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
263 def test_tun_burst46(self):
264 """ ipsec 4o6 tunnel burst test """
265 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
266 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
269 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
270 IpsecTun6HandoffTests):
271 """ Ipsec ESP 6 Handoff tests """
272 tun6_encrypt_node_name = "esp6-encrypt-tun"
273 tun6_decrypt_node_name = "esp6-decrypt-tun"
276 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
277 IpsecTun4HandoffTests):
278 """ Ipsec ESP 4 Handoff tests """
279 tun4_encrypt_node_name = "esp4-encrypt-tun"
280 tun4_decrypt_node_name = "esp4-decrypt-tun"
283 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
284 """ IPsec IPv4 Multi Tunnel interface """
286 encryption_type = ESP
287 tun4_encrypt_node_name = "esp4-encrypt-tun"
288 tun4_decrypt_node_name = "esp4-decrypt-tun"
291 super(TestIpsec4MultiTunIfEsp, self).setUp()
293 self.tun_if = self.pg0
295 self.multi_params = []
296 self.pg0.generate_remote_hosts(10)
297 self.pg0.configure_ipv4_neighbors()
300 p = copy.copy(self.ipv4_params)
302 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
303 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
304 p.scapy_tun_spi = p.scapy_tun_spi + ii
305 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
306 p.vpp_tun_spi = p.vpp_tun_spi + ii
308 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
309 p.scapy_tra_spi = p.scapy_tra_spi + ii
310 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
311 p.vpp_tra_spi = p.vpp_tra_spi + ii
312 p.tun_dst = self.pg0.remote_hosts[ii].ip4
314 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
317 p.crypt_key, p.crypt_key,
318 p.auth_algo_vpp_id, p.auth_key,
321 p.tun_if.add_vpp_config()
323 p.tun_if.config_ip4()
324 config_tun_params(p, self.encryption_type, p.tun_if)
325 self.multi_params.append(p)
327 VppIpRoute(self, p.remote_tun_if_host, 32,
328 [VppRoutePath(p.tun_if.remote_ip4,
329 0xffffffff)]).add_vpp_config()
332 super(TestIpsec4MultiTunIfEsp, self).tearDown()
334 def test_tun_44(self):
335 """Multiple IPSEC tunnel interfaces """
336 for p in self.multi_params:
337 self.verify_tun_44(p, count=127)
338 c = p.tun_if.get_rx_stats()
339 self.assertEqual(c['packets'], 127)
340 c = p.tun_if.get_tx_stats()
341 self.assertEqual(c['packets'], 127)
343 def test_tun_rr_44(self):
344 """ Round-robin packets acrros multiple interface """
346 for p in self.multi_params:
347 tx = tx + self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
348 src=p.remote_tun_if_host,
349 dst=self.pg1.remote_ip4)
350 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
352 for rx, p in zip(rxs, self.multi_params):
353 self.verify_decrypted(p, [rx])
356 for p in self.multi_params:
357 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
358 dst=p.remote_tun_if_host)
359 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
361 for rx, p in zip(rxs, self.multi_params):
362 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
365 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
366 """ IPsec IPv4 Tunnel interface all Algos """
368 encryption_type = ESP
369 tun4_encrypt_node_name = "esp4-encrypt-tun"
370 tun4_decrypt_node_name = "esp4-decrypt-tun"
372 def config_network(self, p):
374 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
377 p.crypt_key, p.crypt_key,
378 p.auth_algo_vpp_id, p.auth_key,
381 p.tun_if.add_vpp_config()
383 p.tun_if.config_ip4()
384 config_tun_params(p, self.encryption_type, p.tun_if)
385 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
386 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
388 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
389 [VppRoutePath(p.tun_if.remote_ip4,
391 p.route.add_vpp_config()
393 def unconfig_network(self, p):
394 p.tun_if.unconfig_ip4()
395 p.tun_if.remove_vpp_config()
396 p.route.remove_vpp_config()
399 super(TestIpsec4TunIfEspAll, self).setUp()
401 self.tun_if = self.pg0
404 super(TestIpsec4TunIfEspAll, self).tearDown()
408 # change the key and the SPI
410 p.crypt_key = b'X' + p.crypt_key[1:]
412 p.scapy_tun_sa_id += 1
415 p.tun_if.local_spi = p.vpp_tun_spi
416 p.tun_if.remote_spi = p.scapy_tun_spi
418 config_tun_params(p, self.encryption_type, p.tun_if)
420 p.tun_sa_in = VppIpsecSA(self,
427 self.vpp_esp_protocol,
430 p.tun_sa_out = VppIpsecSA(self,
437 self.vpp_esp_protocol,
440 p.tun_sa_in.add_vpp_config()
441 p.tun_sa_out.add_vpp_config()
443 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
444 sa_id=p.tun_sa_in.id,
446 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
447 sa_id=p.tun_sa_out.id,
449 self.logger.info(self.vapi.cli("sh ipsec sa"))
451 def test_tun_44(self):
452 """IPSEC tunnel all algos """
454 # foreach VPP crypto engine
455 engines = ["ia32", "ipsecmb", "openssl"]
457 # foreach crypto algorithm
458 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
459 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
460 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
461 IPSEC_API_INTEG_ALG_NONE),
462 'scapy-crypto': "AES-GCM",
463 'scapy-integ': "NULL",
464 'key': b"JPjyOWBeVEQiMe7h",
466 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
467 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
468 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
469 IPSEC_API_INTEG_ALG_NONE),
470 'scapy-crypto': "AES-GCM",
471 'scapy-integ': "NULL",
472 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
474 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
475 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
476 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
477 IPSEC_API_INTEG_ALG_NONE),
478 'scapy-crypto': "AES-GCM",
479 'scapy-integ': "NULL",
480 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
482 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
483 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
484 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
485 IPSEC_API_INTEG_ALG_SHA1_96),
486 'scapy-crypto': "AES-CBC",
487 'scapy-integ': "HMAC-SHA1-96",
489 'key': b"JPjyOWBeVEQiMe7h"},
490 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
491 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
492 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
493 IPSEC_API_INTEG_ALG_SHA1_96),
494 'scapy-crypto': "AES-CBC",
495 'scapy-integ': "HMAC-SHA1-96",
497 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
498 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
499 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
500 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
501 IPSEC_API_INTEG_ALG_SHA1_96),
502 'scapy-crypto': "AES-CBC",
503 'scapy-integ': "HMAC-SHA1-96",
505 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
506 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
507 IPSEC_API_CRYPTO_ALG_NONE),
508 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
509 IPSEC_API_INTEG_ALG_SHA1_96),
510 'scapy-crypto': "NULL",
511 'scapy-integ': "HMAC-SHA1-96",
513 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
515 for engine in engines:
516 self.vapi.cli("set crypto handler all %s" % engine)
519 # loop through each of the algorithms
522 # with self.subTest(algo=algo['scapy']):
524 p = copy.copy(self.ipv4_params)
525 p.auth_algo_vpp_id = algo['vpp-integ']
526 p.crypt_algo_vpp_id = algo['vpp-crypto']
527 p.crypt_algo = algo['scapy-crypto']
528 p.auth_algo = algo['scapy-integ']
529 p.crypt_key = algo['key']
530 p.salt = algo['salt']
532 self.config_network(p)
534 self.verify_tun_44(p, count=127)
535 c = p.tun_if.get_rx_stats()
536 self.assertEqual(c['packets'], 127)
537 c = p.tun_if.get_tx_stats()
538 self.assertEqual(c['packets'], 127)
544 self.verify_tun_44(p, count=127)
546 self.unconfig_network(p)
547 p.tun_sa_out.remove_vpp_config()
548 p.tun_sa_in.remove_vpp_config()
551 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
552 """ IPsec IPv4 Tunnel interface all Algos """
554 encryption_type = ESP
555 tun4_encrypt_node_name = "esp4-encrypt-tun"
556 tun4_decrypt_node_name = "esp4-decrypt-tun"
558 def config_network(self, p):
560 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
561 IPSEC_API_INTEG_ALG_NONE)
565 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
566 IPSEC_API_CRYPTO_ALG_NONE)
567 p.crypt_algo = 'NULL'
570 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
573 p.crypt_key, p.crypt_key,
574 p.auth_algo_vpp_id, p.auth_key,
577 p.tun_if.add_vpp_config()
579 p.tun_if.config_ip4()
580 config_tun_params(p, self.encryption_type, p.tun_if)
581 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
582 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
584 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
585 [VppRoutePath(p.tun_if.remote_ip4,
587 p.route.add_vpp_config()
589 def unconfig_network(self, p):
590 p.tun_if.unconfig_ip4()
591 p.tun_if.remove_vpp_config()
592 p.route.remove_vpp_config()
595 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
597 self.tun_if = self.pg0
600 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
602 def test_tun_44(self):
605 self.config_network(p)
607 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
608 dst=p.remote_tun_if_host)
609 self.send_and_assert_no_replies(self.pg1, tx)
611 self.unconfig_network(p)
614 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
615 """ IPsec IPv6 Multi Tunnel interface """
617 encryption_type = ESP
618 tun6_encrypt_node_name = "esp6-encrypt-tun"
619 tun6_decrypt_node_name = "esp6-decrypt-tun"
622 super(TestIpsec6MultiTunIfEsp, self).setUp()
624 self.tun_if = self.pg0
626 self.multi_params = []
627 self.pg0.generate_remote_hosts(10)
628 self.pg0.configure_ipv6_neighbors()
631 p = copy.copy(self.ipv6_params)
633 p.remote_tun_if_host = "1111::%d" % (ii + 1)
634 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
635 p.scapy_tun_spi = p.scapy_tun_spi + ii
636 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
637 p.vpp_tun_spi = p.vpp_tun_spi + ii
639 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
640 p.scapy_tra_spi = p.scapy_tra_spi + ii
641 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
642 p.vpp_tra_spi = p.vpp_tra_spi + ii
644 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
647 p.crypt_key, p.crypt_key,
648 p.auth_algo_vpp_id, p.auth_key,
649 p.auth_key, is_ip6=True,
650 dst=self.pg0.remote_hosts[ii].ip6)
651 p.tun_if.add_vpp_config()
653 p.tun_if.config_ip6()
654 config_tun_params(p, self.encryption_type, p.tun_if)
655 self.multi_params.append(p)
657 r = VppIpRoute(self, p.remote_tun_if_host, 128,
658 [VppRoutePath(p.tun_if.remote_ip6,
660 proto=DpoProto.DPO_PROTO_IP6)])
664 super(TestIpsec6MultiTunIfEsp, self).tearDown()
666 def test_tun_66(self):
667 """Multiple IPSEC tunnel interfaces """
668 for p in self.multi_params:
669 self.verify_tun_66(p, count=127)
670 c = p.tun_if.get_rx_stats()
671 self.assertEqual(c['packets'], 127)
672 c = p.tun_if.get_tx_stats()
673 self.assertEqual(c['packets'], 127)
676 class TestIpsecGreTebIfEsp(TemplateIpsec,
678 """ Ipsec GRE TEB ESP - TUN tests """
679 tun4_encrypt_node_name = "esp4-encrypt-tun"
680 tun4_decrypt_node_name = "esp4-decrypt-tun"
681 encryption_type = ESP
682 omac = "00:11:22:33:44:55"
684 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
686 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
687 sa.encrypt(IP(src=self.pg0.remote_ip4,
688 dst=self.pg0.local_ip4) /
690 Ether(dst=self.omac) /
691 IP(src="1.1.1.1", dst="1.1.1.2") /
692 UDP(sport=1144, dport=2233) /
693 Raw(b'X' * payload_size))
694 for i in range(count)]
696 def gen_pkts(self, sw_intf, src, dst, count=1,
698 return [Ether(dst=self.omac) /
699 IP(src="1.1.1.1", dst="1.1.1.2") /
700 UDP(sport=1144, dport=2233) /
701 Raw(b'X' * payload_size)
702 for i in range(count)]
704 def verify_decrypted(self, p, rxs):
706 self.assert_equal(rx[Ether].dst, self.omac)
707 self.assert_equal(rx[IP].dst, "1.1.1.2")
709 def verify_encrypted(self, p, sa, rxs):
712 pkt = sa.decrypt(rx[IP])
713 if not pkt.haslayer(IP):
714 pkt = IP(pkt[Raw].load)
715 self.assert_packet_checksums_valid(pkt)
716 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
717 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
718 self.assertTrue(pkt.haslayer(GRE))
720 self.assertEqual(e[Ether].dst, self.omac)
721 self.assertEqual(e[IP].dst, "1.1.1.2")
722 except (IndexError, AssertionError):
723 self.logger.debug(ppp("Unexpected packet:", rx))
725 self.logger.debug(ppp("Decrypted packet:", pkt))
731 super(TestIpsecGreTebIfEsp, self).setUp()
733 self.tun_if = self.pg0
737 bd1 = VppBridgeDomain(self, 1)
740 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
741 p.auth_algo_vpp_id, p.auth_key,
742 p.crypt_algo_vpp_id, p.crypt_key,
743 self.vpp_esp_protocol,
746 p.tun_sa_out.add_vpp_config()
748 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
749 p.auth_algo_vpp_id, p.auth_key,
750 p.crypt_algo_vpp_id, p.crypt_key,
751 self.vpp_esp_protocol,
754 p.tun_sa_in.add_vpp_config()
756 p.tun_if = VppGreInterface(self,
759 type=(VppEnum.vl_api_gre_tunnel_type_t.
760 GRE_API_TUNNEL_TYPE_TEB))
761 p.tun_if.add_vpp_config()
763 p.tun_protect = VppIpsecTunProtect(self,
768 p.tun_protect.add_vpp_config()
771 p.tun_if.config_ip4()
772 config_tun_params(p, self.encryption_type, p.tun_if)
774 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
775 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
777 self.vapi.cli("clear ipsec sa")
781 p.tun_if.unconfig_ip4()
782 super(TestIpsecGreTebIfEsp, self).tearDown()
785 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
787 """ Ipsec GRE TEB ESP - TUN tests """
788 tun4_encrypt_node_name = "esp4-encrypt-tun"
789 tun4_decrypt_node_name = "esp4-decrypt-tun"
790 encryption_type = ESP
791 omac = "00:11:22:33:44:55"
793 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
795 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
796 sa.encrypt(IP(src=self.pg0.remote_ip4,
797 dst=self.pg0.local_ip4) /
799 Ether(dst=self.omac) /
800 IP(src="1.1.1.1", dst="1.1.1.2") /
801 UDP(sport=1144, dport=2233) /
802 Raw(b'X' * payload_size))
803 for i in range(count)]
805 def gen_pkts(self, sw_intf, src, dst, count=1,
807 return [Ether(dst=self.omac) /
809 IP(src="1.1.1.1", dst="1.1.1.2") /
810 UDP(sport=1144, dport=2233) /
811 Raw(b'X' * payload_size)
812 for i in range(count)]
814 def verify_decrypted(self, p, rxs):
816 self.assert_equal(rx[Ether].dst, self.omac)
817 self.assert_equal(rx[Dot1Q].vlan, 11)
818 self.assert_equal(rx[IP].dst, "1.1.1.2")
820 def verify_encrypted(self, p, sa, rxs):
823 pkt = sa.decrypt(rx[IP])
824 if not pkt.haslayer(IP):
825 pkt = IP(pkt[Raw].load)
826 self.assert_packet_checksums_valid(pkt)
827 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
828 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
829 self.assertTrue(pkt.haslayer(GRE))
831 self.assertEqual(e[Ether].dst, self.omac)
832 self.assertFalse(e.haslayer(Dot1Q))
833 self.assertEqual(e[IP].dst, "1.1.1.2")
834 except (IndexError, AssertionError):
835 self.logger.debug(ppp("Unexpected packet:", rx))
837 self.logger.debug(ppp("Decrypted packet:", pkt))
843 super(TestIpsecGreTebVlanIfEsp, self).setUp()
845 self.tun_if = self.pg0
849 bd1 = VppBridgeDomain(self, 1)
852 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
853 self.vapi.l2_interface_vlan_tag_rewrite(
854 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
856 self.pg1_11.admin_up()
858 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
859 p.auth_algo_vpp_id, p.auth_key,
860 p.crypt_algo_vpp_id, p.crypt_key,
861 self.vpp_esp_protocol,
864 p.tun_sa_out.add_vpp_config()
866 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
867 p.auth_algo_vpp_id, p.auth_key,
868 p.crypt_algo_vpp_id, p.crypt_key,
869 self.vpp_esp_protocol,
872 p.tun_sa_in.add_vpp_config()
874 p.tun_if = VppGreInterface(self,
877 type=(VppEnum.vl_api_gre_tunnel_type_t.
878 GRE_API_TUNNEL_TYPE_TEB))
879 p.tun_if.add_vpp_config()
881 p.tun_protect = VppIpsecTunProtect(self,
886 p.tun_protect.add_vpp_config()
889 p.tun_if.config_ip4()
890 config_tun_params(p, self.encryption_type, p.tun_if)
892 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
893 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
895 self.vapi.cli("clear ipsec sa")
899 p.tun_if.unconfig_ip4()
900 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
901 self.pg1_11.admin_down()
902 self.pg1_11.remove_vpp_config()
905 class TestIpsecGreTebIfEspTra(TemplateIpsec,
907 """ Ipsec GRE TEB ESP - Tra tests """
908 tun4_encrypt_node_name = "esp4-encrypt-tun"
909 tun4_decrypt_node_name = "esp4-decrypt-tun"
910 encryption_type = ESP
911 omac = "00:11:22:33:44:55"
913 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
915 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
916 sa.encrypt(IP(src=self.pg0.remote_ip4,
917 dst=self.pg0.local_ip4) /
919 Ether(dst=self.omac) /
920 IP(src="1.1.1.1", dst="1.1.1.2") /
921 UDP(sport=1144, dport=2233) /
922 Raw(b'X' * payload_size))
923 for i in range(count)]
925 def gen_pkts(self, sw_intf, src, dst, count=1,
927 return [Ether(dst=self.omac) /
928 IP(src="1.1.1.1", dst="1.1.1.2") /
929 UDP(sport=1144, dport=2233) /
930 Raw(b'X' * payload_size)
931 for i in range(count)]
933 def verify_decrypted(self, p, rxs):
935 self.assert_equal(rx[Ether].dst, self.omac)
936 self.assert_equal(rx[IP].dst, "1.1.1.2")
938 def verify_encrypted(self, p, sa, rxs):
941 pkt = sa.decrypt(rx[IP])
942 if not pkt.haslayer(IP):
943 pkt = IP(pkt[Raw].load)
944 self.assert_packet_checksums_valid(pkt)
945 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
946 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
947 self.assertTrue(pkt.haslayer(GRE))
949 self.assertEqual(e[Ether].dst, self.omac)
950 self.assertEqual(e[IP].dst, "1.1.1.2")
951 except (IndexError, AssertionError):
952 self.logger.debug(ppp("Unexpected packet:", rx))
954 self.logger.debug(ppp("Decrypted packet:", pkt))
960 super(TestIpsecGreTebIfEspTra, self).setUp()
962 self.tun_if = self.pg0
966 bd1 = VppBridgeDomain(self, 1)
969 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
970 p.auth_algo_vpp_id, p.auth_key,
971 p.crypt_algo_vpp_id, p.crypt_key,
972 self.vpp_esp_protocol)
973 p.tun_sa_out.add_vpp_config()
975 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
976 p.auth_algo_vpp_id, p.auth_key,
977 p.crypt_algo_vpp_id, p.crypt_key,
978 self.vpp_esp_protocol)
979 p.tun_sa_in.add_vpp_config()
981 p.tun_if = VppGreInterface(self,
984 type=(VppEnum.vl_api_gre_tunnel_type_t.
985 GRE_API_TUNNEL_TYPE_TEB))
986 p.tun_if.add_vpp_config()
988 p.tun_protect = VppIpsecTunProtect(self,
993 p.tun_protect.add_vpp_config()
996 p.tun_if.config_ip4()
997 config_tra_params(p, self.encryption_type, p.tun_if)
999 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1000 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1002 self.vapi.cli("clear ipsec sa")
1005 p = self.ipv4_params
1006 p.tun_if.unconfig_ip4()
1007 super(TestIpsecGreTebIfEspTra, self).tearDown()
1010 class TestIpsecGreIfEsp(TemplateIpsec,
1012 """ Ipsec GRE ESP - TUN tests """
1013 tun4_encrypt_node_name = "esp4-encrypt-tun"
1014 tun4_decrypt_node_name = "esp4-decrypt-tun"
1015 encryption_type = ESP
1017 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1019 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1020 sa.encrypt(IP(src=self.pg0.remote_ip4,
1021 dst=self.pg0.local_ip4) /
1023 IP(src=self.pg1.local_ip4,
1024 dst=self.pg1.remote_ip4) /
1025 UDP(sport=1144, dport=2233) /
1026 Raw(b'X' * payload_size))
1027 for i in range(count)]
1029 def gen_pkts(self, sw_intf, src, dst, count=1,
1031 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1032 IP(src="1.1.1.1", dst="1.1.1.2") /
1033 UDP(sport=1144, dport=2233) /
1034 Raw(b'X' * payload_size)
1035 for i in range(count)]
1037 def verify_decrypted(self, p, rxs):
1039 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1040 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1042 def verify_encrypted(self, p, sa, rxs):
1045 pkt = sa.decrypt(rx[IP])
1046 if not pkt.haslayer(IP):
1047 pkt = IP(pkt[Raw].load)
1048 self.assert_packet_checksums_valid(pkt)
1049 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1050 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1051 self.assertTrue(pkt.haslayer(GRE))
1053 self.assertEqual(e[IP].dst, "1.1.1.2")
1054 except (IndexError, AssertionError):
1055 self.logger.debug(ppp("Unexpected packet:", rx))
1057 self.logger.debug(ppp("Decrypted packet:", pkt))
1063 super(TestIpsecGreIfEsp, self).setUp()
1065 self.tun_if = self.pg0
1067 p = self.ipv4_params
1069 bd1 = VppBridgeDomain(self, 1)
1070 bd1.add_vpp_config()
1072 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1073 p.auth_algo_vpp_id, p.auth_key,
1074 p.crypt_algo_vpp_id, p.crypt_key,
1075 self.vpp_esp_protocol,
1077 self.pg0.remote_ip4)
1078 p.tun_sa_out.add_vpp_config()
1080 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1081 p.auth_algo_vpp_id, p.auth_key,
1082 p.crypt_algo_vpp_id, p.crypt_key,
1083 self.vpp_esp_protocol,
1084 self.pg0.remote_ip4,
1086 p.tun_sa_in.add_vpp_config()
1088 p.tun_if = VppGreInterface(self,
1090 self.pg0.remote_ip4)
1091 p.tun_if.add_vpp_config()
1093 p.tun_protect = VppIpsecTunProtect(self,
1097 p.tun_protect.add_vpp_config()
1100 p.tun_if.config_ip4()
1101 config_tun_params(p, self.encryption_type, p.tun_if)
1103 VppIpRoute(self, "1.1.1.2", 32,
1104 [VppRoutePath(p.tun_if.remote_ip4,
1105 0xffffffff)]).add_vpp_config()
1108 p = self.ipv4_params
1109 p.tun_if.unconfig_ip4()
1110 super(TestIpsecGreIfEsp, self).tearDown()
1113 class TestIpsecGreIfEspTra(TemplateIpsec,
1115 """ Ipsec GRE ESP - TRA tests """
1116 tun4_encrypt_node_name = "esp4-encrypt-tun"
1117 tun4_decrypt_node_name = "esp4-decrypt-tun"
1118 encryption_type = ESP
1120 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1122 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1123 sa.encrypt(IP(src=self.pg0.remote_ip4,
1124 dst=self.pg0.local_ip4) /
1126 IP(src=self.pg1.local_ip4,
1127 dst=self.pg1.remote_ip4) /
1128 UDP(sport=1144, dport=2233) /
1129 Raw(b'X' * payload_size))
1130 for i in range(count)]
1132 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1134 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1135 sa.encrypt(IP(src=self.pg0.remote_ip4,
1136 dst=self.pg0.local_ip4) /
1138 UDP(sport=1144, dport=2233) /
1139 Raw(b'X' * payload_size))
1140 for i in range(count)]
1142 def gen_pkts(self, sw_intf, src, dst, count=1,
1144 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1145 IP(src="1.1.1.1", dst="1.1.1.2") /
1146 UDP(sport=1144, dport=2233) /
1147 Raw(b'X' * payload_size)
1148 for i in range(count)]
1150 def verify_decrypted(self, p, rxs):
1152 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1153 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1155 def verify_encrypted(self, p, sa, rxs):
1158 pkt = sa.decrypt(rx[IP])
1159 if not pkt.haslayer(IP):
1160 pkt = IP(pkt[Raw].load)
1161 self.assert_packet_checksums_valid(pkt)
1162 self.assertTrue(pkt.haslayer(GRE))
1164 self.assertEqual(e[IP].dst, "1.1.1.2")
1165 except (IndexError, AssertionError):
1166 self.logger.debug(ppp("Unexpected packet:", rx))
1168 self.logger.debug(ppp("Decrypted packet:", pkt))
1174 super(TestIpsecGreIfEspTra, self).setUp()
1176 self.tun_if = self.pg0
1178 p = self.ipv4_params
1180 bd1 = VppBridgeDomain(self, 1)
1181 bd1.add_vpp_config()
1183 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1184 p.auth_algo_vpp_id, p.auth_key,
1185 p.crypt_algo_vpp_id, p.crypt_key,
1186 self.vpp_esp_protocol)
1187 p.tun_sa_out.add_vpp_config()
1189 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1190 p.auth_algo_vpp_id, p.auth_key,
1191 p.crypt_algo_vpp_id, p.crypt_key,
1192 self.vpp_esp_protocol)
1193 p.tun_sa_in.add_vpp_config()
1195 p.tun_if = VppGreInterface(self,
1197 self.pg0.remote_ip4)
1198 p.tun_if.add_vpp_config()
1200 p.tun_protect = VppIpsecTunProtect(self,
1204 p.tun_protect.add_vpp_config()
1207 p.tun_if.config_ip4()
1208 config_tra_params(p, self.encryption_type, p.tun_if)
1210 VppIpRoute(self, "1.1.1.2", 32,
1211 [VppRoutePath(p.tun_if.remote_ip4,
1212 0xffffffff)]).add_vpp_config()
1215 p = self.ipv4_params
1216 p.tun_if.unconfig_ip4()
1217 super(TestIpsecGreIfEspTra, self).tearDown()
1219 def test_gre_non_ip(self):
1220 p = self.ipv4_params
1221 tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1222 src=p.remote_tun_if_host,
1223 dst=self.pg1.remote_ip6)
1224 self.send_and_assert_no_replies(self.tun_if, tx)
1225 node_name = ('/err/%s/unsupported payload' %
1226 self.tun4_decrypt_node_name)
1227 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1230 class TestIpsecGre6IfEspTra(TemplateIpsec,
1232 """ Ipsec GRE ESP - TRA tests """
1233 tun6_encrypt_node_name = "esp6-encrypt-tun"
1234 tun6_decrypt_node_name = "esp6-decrypt-tun"
1235 encryption_type = ESP
1237 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1239 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1240 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1241 dst=self.pg0.local_ip6) /
1243 IPv6(src=self.pg1.local_ip6,
1244 dst=self.pg1.remote_ip6) /
1245 UDP(sport=1144, dport=2233) /
1246 Raw(b'X' * payload_size))
1247 for i in range(count)]
1249 def gen_pkts6(self, sw_intf, src, dst, count=1,
1251 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1252 IPv6(src="1::1", dst="1::2") /
1253 UDP(sport=1144, dport=2233) /
1254 Raw(b'X' * payload_size)
1255 for i in range(count)]
1257 def verify_decrypted6(self, p, rxs):
1259 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1260 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1262 def verify_encrypted6(self, p, sa, rxs):
1265 pkt = sa.decrypt(rx[IPv6])
1266 if not pkt.haslayer(IPv6):
1267 pkt = IPv6(pkt[Raw].load)
1268 self.assert_packet_checksums_valid(pkt)
1269 self.assertTrue(pkt.haslayer(GRE))
1271 self.assertEqual(e[IPv6].dst, "1::2")
1272 except (IndexError, AssertionError):
1273 self.logger.debug(ppp("Unexpected packet:", rx))
1275 self.logger.debug(ppp("Decrypted packet:", pkt))
1281 super(TestIpsecGre6IfEspTra, self).setUp()
1283 self.tun_if = self.pg0
1285 p = self.ipv6_params
1287 bd1 = VppBridgeDomain(self, 1)
1288 bd1.add_vpp_config()
1290 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1291 p.auth_algo_vpp_id, p.auth_key,
1292 p.crypt_algo_vpp_id, p.crypt_key,
1293 self.vpp_esp_protocol)
1294 p.tun_sa_out.add_vpp_config()
1296 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1297 p.auth_algo_vpp_id, p.auth_key,
1298 p.crypt_algo_vpp_id, p.crypt_key,
1299 self.vpp_esp_protocol)
1300 p.tun_sa_in.add_vpp_config()
1302 p.tun_if = VppGreInterface(self,
1304 self.pg0.remote_ip6)
1305 p.tun_if.add_vpp_config()
1307 p.tun_protect = VppIpsecTunProtect(self,
1311 p.tun_protect.add_vpp_config()
1314 p.tun_if.config_ip6()
1315 config_tra_params(p, self.encryption_type, p.tun_if)
1317 r = VppIpRoute(self, "1::2", 128,
1318 [VppRoutePath(p.tun_if.remote_ip6,
1320 proto=DpoProto.DPO_PROTO_IP6)])
1324 p = self.ipv6_params
1325 p.tun_if.unconfig_ip6()
1326 super(TestIpsecGre6IfEspTra, self).tearDown()
1329 class TemplateIpsec4TunProtect(object):
1330 """ IPsec IPv4 Tunnel protect """
1332 encryption_type = ESP
1333 tun4_encrypt_node_name = "esp4-encrypt-tun"
1334 tun4_decrypt_node_name = "esp4-decrypt-tun"
1335 tun4_input_node = "ipsec4-tun-input"
1337 def config_sa_tra(self, p):
1338 config_tun_params(p, self.encryption_type, p.tun_if)
1340 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1341 p.auth_algo_vpp_id, p.auth_key,
1342 p.crypt_algo_vpp_id, p.crypt_key,
1343 self.vpp_esp_protocol,
1345 p.tun_sa_out.add_vpp_config()
1347 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1348 p.auth_algo_vpp_id, p.auth_key,
1349 p.crypt_algo_vpp_id, p.crypt_key,
1350 self.vpp_esp_protocol,
1352 p.tun_sa_in.add_vpp_config()
1354 def config_sa_tun(self, p):
1355 config_tun_params(p, self.encryption_type, p.tun_if)
1357 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1358 p.auth_algo_vpp_id, p.auth_key,
1359 p.crypt_algo_vpp_id, p.crypt_key,
1360 self.vpp_esp_protocol,
1361 self.tun_if.local_addr[p.addr_type],
1362 self.tun_if.remote_addr[p.addr_type],
1364 p.tun_sa_out.add_vpp_config()
1366 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1367 p.auth_algo_vpp_id, p.auth_key,
1368 p.crypt_algo_vpp_id, p.crypt_key,
1369 self.vpp_esp_protocol,
1370 self.tun_if.remote_addr[p.addr_type],
1371 self.tun_if.local_addr[p.addr_type],
1373 p.tun_sa_in.add_vpp_config()
1375 def config_protect(self, p):
1376 p.tun_protect = VppIpsecTunProtect(self,
1380 p.tun_protect.add_vpp_config()
1382 def config_network(self, p):
1383 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1385 self.pg0.remote_ip4)
1386 p.tun_if.add_vpp_config()
1388 p.tun_if.config_ip4()
1389 p.tun_if.config_ip6()
1391 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1392 [VppRoutePath(p.tun_if.remote_ip4,
1394 p.route.add_vpp_config()
1395 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1396 [VppRoutePath(p.tun_if.remote_ip6,
1398 proto=DpoProto.DPO_PROTO_IP6)])
1401 def unconfig_network(self, p):
1402 p.route.remove_vpp_config()
1403 p.tun_if.remove_vpp_config()
1405 def unconfig_protect(self, p):
1406 p.tun_protect.remove_vpp_config()
1408 def unconfig_sa(self, p):
1409 p.tun_sa_out.remove_vpp_config()
1410 p.tun_sa_in.remove_vpp_config()
1413 class TestIpsec4TunProtect(TemplateIpsec,
1414 TemplateIpsec4TunProtect,
1416 """ IPsec IPv4 Tunnel protect - transport mode"""
1419 super(TestIpsec4TunProtect, self).setUp()
1421 self.tun_if = self.pg0
1424 super(TestIpsec4TunProtect, self).tearDown()
1426 def test_tun_44(self):
1427 """IPSEC tunnel protect"""
1429 p = self.ipv4_params
1431 self.config_network(p)
1432 self.config_sa_tra(p)
1433 self.config_protect(p)
1435 self.verify_tun_44(p, count=127)
1436 c = p.tun_if.get_rx_stats()
1437 self.assertEqual(c['packets'], 127)
1438 c = p.tun_if.get_tx_stats()
1439 self.assertEqual(c['packets'], 127)
1441 self.vapi.cli("clear ipsec sa")
1442 self.verify_tun_64(p, count=127)
1443 c = p.tun_if.get_rx_stats()
1444 self.assertEqual(c['packets'], 254)
1445 c = p.tun_if.get_tx_stats()
1446 self.assertEqual(c['packets'], 254)
1448 # rekey - create new SAs and update the tunnel protection
1450 np.crypt_key = b'X' + p.crypt_key[1:]
1451 np.scapy_tun_spi += 100
1452 np.scapy_tun_sa_id += 1
1453 np.vpp_tun_spi += 100
1454 np.vpp_tun_sa_id += 1
1455 np.tun_if.local_spi = p.vpp_tun_spi
1456 np.tun_if.remote_spi = p.scapy_tun_spi
1458 self.config_sa_tra(np)
1459 self.config_protect(np)
1462 self.verify_tun_44(np, count=127)
1463 c = p.tun_if.get_rx_stats()
1464 self.assertEqual(c['packets'], 381)
1465 c = p.tun_if.get_tx_stats()
1466 self.assertEqual(c['packets'], 381)
1469 self.unconfig_protect(np)
1470 self.unconfig_sa(np)
1471 self.unconfig_network(p)
1474 class TestIpsec4TunProtectUdp(TemplateIpsec,
1475 TemplateIpsec4TunProtect,
1477 """ IPsec IPv4 Tunnel protect - transport mode"""
1480 super(TestIpsec4TunProtectUdp, self).setUp()
1482 self.tun_if = self.pg0
1484 p = self.ipv4_params
1485 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1486 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1487 p.nat_header = UDP(sport=5454, dport=4500)
1488 self.config_network(p)
1489 self.config_sa_tra(p)
1490 self.config_protect(p)
1493 p = self.ipv4_params
1494 self.unconfig_protect(p)
1496 self.unconfig_network(p)
1497 super(TestIpsec4TunProtectUdp, self).tearDown()
1499 def test_tun_44(self):
1500 """IPSEC UDP tunnel protect"""
1502 p = self.ipv4_params
1504 self.verify_tun_44(p, count=127)
1505 c = p.tun_if.get_rx_stats()
1506 self.assertEqual(c['packets'], 127)
1507 c = p.tun_if.get_tx_stats()
1508 self.assertEqual(c['packets'], 127)
1510 def test_keepalive(self):
1511 """ IPSEC NAT Keepalive """
1512 self.verify_keepalive(self.ipv4_params)
1515 class TestIpsec4TunProtectTun(TemplateIpsec,
1516 TemplateIpsec4TunProtect,
1518 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1520 encryption_type = ESP
1521 tun4_encrypt_node_name = "esp4-encrypt-tun"
1522 tun4_decrypt_node_name = "esp4-decrypt-tun"
1525 super(TestIpsec4TunProtectTun, self).setUp()
1527 self.tun_if = self.pg0
1530 super(TestIpsec4TunProtectTun, self).tearDown()
1532 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1534 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1535 sa.encrypt(IP(src=sw_intf.remote_ip4,
1536 dst=sw_intf.local_ip4) /
1537 IP(src=src, dst=dst) /
1538 UDP(sport=1144, dport=2233) /
1539 Raw(b'X' * payload_size))
1540 for i in range(count)]
1542 def gen_pkts(self, sw_intf, src, dst, count=1,
1544 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1545 IP(src=src, dst=dst) /
1546 UDP(sport=1144, dport=2233) /
1547 Raw(b'X' * payload_size)
1548 for i in range(count)]
1550 def verify_decrypted(self, p, rxs):
1552 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1553 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1554 self.assert_packet_checksums_valid(rx)
1556 def verify_encrypted(self, p, sa, rxs):
1559 pkt = sa.decrypt(rx[IP])
1560 if not pkt.haslayer(IP):
1561 pkt = IP(pkt[Raw].load)
1562 self.assert_packet_checksums_valid(pkt)
1563 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1564 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1565 inner = pkt[IP].payload
1566 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1568 except (IndexError, AssertionError):
1569 self.logger.debug(ppp("Unexpected packet:", rx))
1571 self.logger.debug(ppp("Decrypted packet:", pkt))
1576 def test_tun_44(self):
1577 """IPSEC tunnel protect """
1579 p = self.ipv4_params
1581 self.config_network(p)
1582 self.config_sa_tun(p)
1583 self.config_protect(p)
1585 self.verify_tun_44(p, count=127)
1587 c = p.tun_if.get_rx_stats()
1588 self.assertEqual(c['packets'], 127)
1589 c = p.tun_if.get_tx_stats()
1590 self.assertEqual(c['packets'], 127)
1592 # rekey - create new SAs and update the tunnel protection
1594 np.crypt_key = b'X' + p.crypt_key[1:]
1595 np.scapy_tun_spi += 100
1596 np.scapy_tun_sa_id += 1
1597 np.vpp_tun_spi += 100
1598 np.vpp_tun_sa_id += 1
1599 np.tun_if.local_spi = p.vpp_tun_spi
1600 np.tun_if.remote_spi = p.scapy_tun_spi
1602 self.config_sa_tun(np)
1603 self.config_protect(np)
1606 self.verify_tun_44(np, count=127)
1607 c = p.tun_if.get_rx_stats()
1608 self.assertEqual(c['packets'], 254)
1609 c = p.tun_if.get_tx_stats()
1610 self.assertEqual(c['packets'], 254)
1613 self.unconfig_protect(np)
1614 self.unconfig_sa(np)
1615 self.unconfig_network(p)
1618 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
1619 TemplateIpsec4TunProtect,
1621 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
1623 encryption_type = ESP
1624 tun4_encrypt_node_name = "esp4-encrypt-tun"
1625 tun4_decrypt_node_name = "esp4-decrypt-tun"
1628 super(TestIpsec4TunProtectTunDrop, self).setUp()
1630 self.tun_if = self.pg0
1633 super(TestIpsec4TunProtectTunDrop, self).tearDown()
1635 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1637 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1638 sa.encrypt(IP(src=sw_intf.remote_ip4,
1640 IP(src=src, dst=dst) /
1641 UDP(sport=1144, dport=2233) /
1642 Raw(b'X' * payload_size))
1643 for i in range(count)]
1645 def test_tun_drop_44(self):
1646 """IPSEC tunnel protect bogus tunnel header """
1648 p = self.ipv4_params
1650 self.config_network(p)
1651 self.config_sa_tun(p)
1652 self.config_protect(p)
1654 tx = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
1655 src=p.remote_tun_if_host,
1656 dst=self.pg1.remote_ip4,
1658 self.send_and_assert_no_replies(self.tun_if, tx)
1661 self.unconfig_protect(p)
1663 self.unconfig_network(p)
1666 class TemplateIpsec6TunProtect(object):
1667 """ IPsec IPv6 Tunnel protect """
1669 def config_sa_tra(self, p):
1670 config_tun_params(p, self.encryption_type, p.tun_if)
1672 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1673 p.auth_algo_vpp_id, p.auth_key,
1674 p.crypt_algo_vpp_id, p.crypt_key,
1675 self.vpp_esp_protocol)
1676 p.tun_sa_out.add_vpp_config()
1678 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1679 p.auth_algo_vpp_id, p.auth_key,
1680 p.crypt_algo_vpp_id, p.crypt_key,
1681 self.vpp_esp_protocol)
1682 p.tun_sa_in.add_vpp_config()
1684 def config_sa_tun(self, p):
1685 config_tun_params(p, self.encryption_type, p.tun_if)
1687 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1688 p.auth_algo_vpp_id, p.auth_key,
1689 p.crypt_algo_vpp_id, p.crypt_key,
1690 self.vpp_esp_protocol,
1691 self.tun_if.local_addr[p.addr_type],
1692 self.tun_if.remote_addr[p.addr_type])
1693 p.tun_sa_out.add_vpp_config()
1695 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1696 p.auth_algo_vpp_id, p.auth_key,
1697 p.crypt_algo_vpp_id, p.crypt_key,
1698 self.vpp_esp_protocol,
1699 self.tun_if.remote_addr[p.addr_type],
1700 self.tun_if.local_addr[p.addr_type])
1701 p.tun_sa_in.add_vpp_config()
1703 def config_protect(self, p):
1704 p.tun_protect = VppIpsecTunProtect(self,
1708 p.tun_protect.add_vpp_config()
1710 def config_network(self, p):
1711 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1713 self.pg0.remote_ip6)
1714 p.tun_if.add_vpp_config()
1716 p.tun_if.config_ip6()
1717 p.tun_if.config_ip4()
1719 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1720 [VppRoutePath(p.tun_if.remote_ip6,
1722 proto=DpoProto.DPO_PROTO_IP6)])
1723 p.route.add_vpp_config()
1724 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1725 [VppRoutePath(p.tun_if.remote_ip4,
1729 def unconfig_network(self, p):
1730 p.route.remove_vpp_config()
1731 p.tun_if.remove_vpp_config()
1733 def unconfig_protect(self, p):
1734 p.tun_protect.remove_vpp_config()
1736 def unconfig_sa(self, p):
1737 p.tun_sa_out.remove_vpp_config()
1738 p.tun_sa_in.remove_vpp_config()
1741 class TestIpsec6TunProtect(TemplateIpsec,
1742 TemplateIpsec6TunProtect,
1744 """ IPsec IPv6 Tunnel protect - transport mode"""
1746 encryption_type = ESP
1747 tun6_encrypt_node_name = "esp6-encrypt-tun"
1748 tun6_decrypt_node_name = "esp6-decrypt-tun"
1751 super(TestIpsec6TunProtect, self).setUp()
1753 self.tun_if = self.pg0
1756 super(TestIpsec6TunProtect, self).tearDown()
1758 def test_tun_66(self):
1759 """IPSEC tunnel protect 6o6"""
1761 p = self.ipv6_params
1763 self.config_network(p)
1764 self.config_sa_tra(p)
1765 self.config_protect(p)
1767 self.verify_tun_66(p, count=127)
1768 c = p.tun_if.get_rx_stats()
1769 self.assertEqual(c['packets'], 127)
1770 c = p.tun_if.get_tx_stats()
1771 self.assertEqual(c['packets'], 127)
1773 # rekey - create new SAs and update the tunnel protection
1775 np.crypt_key = b'X' + p.crypt_key[1:]
1776 np.scapy_tun_spi += 100
1777 np.scapy_tun_sa_id += 1
1778 np.vpp_tun_spi += 100
1779 np.vpp_tun_sa_id += 1
1780 np.tun_if.local_spi = p.vpp_tun_spi
1781 np.tun_if.remote_spi = p.scapy_tun_spi
1783 self.config_sa_tra(np)
1784 self.config_protect(np)
1787 self.verify_tun_66(np, count=127)
1788 c = p.tun_if.get_rx_stats()
1789 self.assertEqual(c['packets'], 254)
1790 c = p.tun_if.get_tx_stats()
1791 self.assertEqual(c['packets'], 254)
1793 # bounce the interface state
1794 p.tun_if.admin_down()
1795 self.verify_drop_tun_66(np, count=127)
1796 node = ('/err/ipsec6-tun-input/%s' %
1797 'ipsec packets received on disabled interface')
1798 self.assertEqual(127, self.statistics.get_err_counter(node))
1800 self.verify_tun_66(np, count=127)
1803 # 1) add two input SAs [old, new]
1804 # 2) swap output SA to [new]
1805 # 3) use only [new] input SA
1807 np3.crypt_key = b'Z' + p.crypt_key[1:]
1808 np3.scapy_tun_spi += 100
1809 np3.scapy_tun_sa_id += 1
1810 np3.vpp_tun_spi += 100
1811 np3.vpp_tun_sa_id += 1
1812 np3.tun_if.local_spi = p.vpp_tun_spi
1813 np3.tun_if.remote_spi = p.scapy_tun_spi
1815 self.config_sa_tra(np3)
1818 p.tun_protect.update_vpp_config(np.tun_sa_out,
1819 [np.tun_sa_in, np3.tun_sa_in])
1820 self.verify_tun_66(np, np, count=127)
1821 self.verify_tun_66(np3, np, count=127)
1824 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1825 [np.tun_sa_in, np3.tun_sa_in])
1826 self.verify_tun_66(np, np3, count=127)
1827 self.verify_tun_66(np3, np3, count=127)
1830 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1832 self.verify_tun_66(np3, np3, count=127)
1833 self.verify_drop_tun_66(np, count=127)
1835 c = p.tun_if.get_rx_stats()
1836 self.assertEqual(c['packets'], 127*9)
1837 c = p.tun_if.get_tx_stats()
1838 self.assertEqual(c['packets'], 127*8)
1839 self.unconfig_sa(np)
1842 self.unconfig_protect(np3)
1843 self.unconfig_sa(np3)
1844 self.unconfig_network(p)
1846 def test_tun_46(self):
1847 """IPSEC tunnel protect 4o6"""
1849 p = self.ipv6_params
1851 self.config_network(p)
1852 self.config_sa_tra(p)
1853 self.config_protect(p)
1855 self.verify_tun_46(p, count=127)
1856 c = p.tun_if.get_rx_stats()
1857 self.assertEqual(c['packets'], 127)
1858 c = p.tun_if.get_tx_stats()
1859 self.assertEqual(c['packets'], 127)
1862 self.unconfig_protect(p)
1864 self.unconfig_network(p)
1867 class TestIpsec6TunProtectTun(TemplateIpsec,
1868 TemplateIpsec6TunProtect,
1870 """ IPsec IPv6 Tunnel protect - tunnel mode"""
1872 encryption_type = ESP
1873 tun6_encrypt_node_name = "esp6-encrypt-tun"
1874 tun6_decrypt_node_name = "esp6-decrypt-tun"
1877 super(TestIpsec6TunProtectTun, self).setUp()
1879 self.tun_if = self.pg0
1882 super(TestIpsec6TunProtectTun, self).tearDown()
1884 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1886 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1887 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1888 dst=sw_intf.local_ip6) /
1889 IPv6(src=src, dst=dst) /
1890 UDP(sport=1166, dport=2233) /
1891 Raw(b'X' * payload_size))
1892 for i in range(count)]
1894 def gen_pkts6(self, sw_intf, src, dst, count=1,
1896 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1897 IPv6(src=src, dst=dst) /
1898 UDP(sport=1166, dport=2233) /
1899 Raw(b'X' * payload_size)
1900 for i in range(count)]
1902 def verify_decrypted6(self, p, rxs):
1904 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1905 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1906 self.assert_packet_checksums_valid(rx)
1908 def verify_encrypted6(self, p, sa, rxs):
1911 pkt = sa.decrypt(rx[IPv6])
1912 if not pkt.haslayer(IPv6):
1913 pkt = IPv6(pkt[Raw].load)
1914 self.assert_packet_checksums_valid(pkt)
1915 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1916 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1917 inner = pkt[IPv6].payload
1918 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
1920 except (IndexError, AssertionError):
1921 self.logger.debug(ppp("Unexpected packet:", rx))
1923 self.logger.debug(ppp("Decrypted packet:", pkt))
1928 def test_tun_66(self):
1929 """IPSEC tunnel protect """
1931 p = self.ipv6_params
1933 self.config_network(p)
1934 self.config_sa_tun(p)
1935 self.config_protect(p)
1937 self.verify_tun_66(p, count=127)
1939 c = p.tun_if.get_rx_stats()
1940 self.assertEqual(c['packets'], 127)
1941 c = p.tun_if.get_tx_stats()
1942 self.assertEqual(c['packets'], 127)
1944 # rekey - create new SAs and update the tunnel protection
1946 np.crypt_key = b'X' + p.crypt_key[1:]
1947 np.scapy_tun_spi += 100
1948 np.scapy_tun_sa_id += 1
1949 np.vpp_tun_spi += 100
1950 np.vpp_tun_sa_id += 1
1951 np.tun_if.local_spi = p.vpp_tun_spi
1952 np.tun_if.remote_spi = p.scapy_tun_spi
1954 self.config_sa_tun(np)
1955 self.config_protect(np)
1958 self.verify_tun_66(np, count=127)
1959 c = p.tun_if.get_rx_stats()
1960 self.assertEqual(c['packets'], 254)
1961 c = p.tun_if.get_tx_stats()
1962 self.assertEqual(c['packets'], 254)
1965 self.unconfig_protect(np)
1966 self.unconfig_sa(np)
1967 self.unconfig_network(p)
1970 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
1971 TemplateIpsec6TunProtect,
1973 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
1975 encryption_type = ESP
1976 tun6_encrypt_node_name = "esp6-encrypt-tun"
1977 tun6_decrypt_node_name = "esp6-decrypt-tun"
1980 super(TestIpsec6TunProtectTunDrop, self).setUp()
1982 self.tun_if = self.pg0
1985 super(TestIpsec6TunProtectTunDrop, self).tearDown()
1987 def gen_encrypt_pkts5(self, sa, sw_intf, src, dst, count=1,
1989 # the IP destination of the revelaed packet does not match
1990 # that assigned to the tunnel
1991 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1992 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1994 IPv6(src=src, dst=dst) /
1995 UDP(sport=1144, dport=2233) /
1996 Raw(b'X' * payload_size))
1997 for i in range(count)]
1999 def test_tun_drop_66(self):
2000 """IPSEC 6 tunnel protect bogus tunnel header """
2002 p = self.ipv6_params
2004 self.config_network(p)
2005 self.config_sa_tun(p)
2006 self.config_protect(p)
2008 tx = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
2009 src=p.remote_tun_if_host,
2010 dst=self.pg1.remote_ip6,
2012 self.send_and_assert_no_replies(self.tun_if, tx)
2014 self.unconfig_protect(p)
2016 self.unconfig_network(p)
2019 if __name__ == '__main__':
2020 unittest.main(testRunner=VppTestRunner)