5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
23 from vpp_papi import VppEnum
26 def config_tun_params(p, encryption_type, tun_if):
27 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
28 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
29 IPSEC_API_SAD_FLAG_USE_ESN))
30 crypt_key = mk_scapy_crypt_key(p)
31 p.tun_dst = tun_if.remote_ip
32 p.tun_src = tun_if.local_ip
33 p.scapy_tun_sa = SecurityAssociation(
34 encryption_type, spi=p.vpp_tun_spi,
35 crypt_algo=p.crypt_algo,
37 auth_algo=p.auth_algo, auth_key=p.auth_key,
38 tunnel_header=ip_class_by_addr_type[p.addr_type](
41 nat_t_header=p.nat_header,
43 p.vpp_tun_sa = SecurityAssociation(
44 encryption_type, spi=p.scapy_tun_spi,
45 crypt_algo=p.crypt_algo,
47 auth_algo=p.auth_algo, auth_key=p.auth_key,
48 tunnel_header=ip_class_by_addr_type[p.addr_type](
51 nat_t_header=p.nat_header,
55 def config_tra_params(p, encryption_type, tun_if):
56 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
57 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
58 IPSEC_API_SAD_FLAG_USE_ESN))
59 crypt_key = mk_scapy_crypt_key(p)
60 p.tun_dst = tun_if.remote_ip
61 p.tun_src = tun_if.local_ip
62 p.scapy_tun_sa = SecurityAssociation(
63 encryption_type, spi=p.vpp_tun_spi,
64 crypt_algo=p.crypt_algo,
66 auth_algo=p.auth_algo, auth_key=p.auth_key,
68 p.vpp_tun_sa = SecurityAssociation(
69 encryption_type, spi=p.scapy_tun_spi,
70 crypt_algo=p.crypt_algo,
72 auth_algo=p.auth_algo, auth_key=p.auth_key,
76 class TemplateIpsec4TunIfEsp(TemplateIpsec):
77 """ IPsec tunnel interface tests """
83 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
86 def tearDownClass(cls):
87 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
90 super(TemplateIpsec4TunIfEsp, self).setUp()
92 self.tun_if = self.pg0
96 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
97 p.scapy_tun_spi, p.crypt_algo_vpp_id,
98 p.crypt_key, p.crypt_key,
99 p.auth_algo_vpp_id, p.auth_key,
101 p.tun_if.add_vpp_config()
103 p.tun_if.config_ip4()
104 p.tun_if.config_ip6()
105 config_tun_params(p, self.encryption_type, p.tun_if)
107 r = VppIpRoute(self, p.remote_tun_if_host, 32,
108 [VppRoutePath(p.tun_if.remote_ip4,
111 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
112 [VppRoutePath(p.tun_if.remote_ip6,
114 proto=DpoProto.DPO_PROTO_IP6)])
118 super(TemplateIpsec4TunIfEsp, self).tearDown()
121 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
122 """ IPsec UDP tunnel interface tests """
124 tun4_encrypt_node_name = "esp4-encrypt-tun"
125 tun4_decrypt_node_name = "esp4-decrypt-tun"
126 encryption_type = ESP
130 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
133 def tearDownClass(cls):
134 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
137 super(TemplateIpsec4TunIfEspUdp, self).setUp()
139 self.tun_if = self.pg0
142 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
143 IPSEC_API_SAD_FLAG_UDP_ENCAP)
144 p.nat_header = UDP(sport=5454, dport=4500)
146 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
147 p.scapy_tun_spi, p.crypt_algo_vpp_id,
148 p.crypt_key, p.crypt_key,
149 p.auth_algo_vpp_id, p.auth_key,
150 p.auth_key, udp_encap=True)
151 p.tun_if.add_vpp_config()
153 p.tun_if.config_ip4()
154 p.tun_if.config_ip6()
155 config_tun_params(p, self.encryption_type, p.tun_if)
157 r = VppIpRoute(self, p.remote_tun_if_host, 32,
158 [VppRoutePath(p.tun_if.remote_ip4,
161 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
162 [VppRoutePath(p.tun_if.remote_ip6,
164 proto=DpoProto.DPO_PROTO_IP6)])
168 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
171 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
172 """ Ipsec ESP - TUN tests """
173 tun4_encrypt_node_name = "esp4-encrypt-tun"
174 tun4_decrypt_node_name = "esp4-decrypt-tun"
176 def test_tun_basic64(self):
177 """ ipsec 6o4 tunnel basic test """
178 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
180 self.verify_tun_64(self.params[socket.AF_INET], count=1)
182 def test_tun_burst64(self):
183 """ ipsec 6o4 tunnel basic test """
184 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
186 self.verify_tun_64(self.params[socket.AF_INET], count=257)
188 def test_tun_basic_frag44(self):
189 """ ipsec 4o4 tunnel frag basic test """
190 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
194 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
196 self.verify_tun_44(self.params[socket.AF_INET],
197 count=1, payload_size=1800, n_rx=2)
198 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
202 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
203 """ Ipsec ESP UDP tests """
205 tun4_input_node = "ipsec4-tun-input"
207 def test_keepalive(self):
208 """ IPSEC NAT Keepalive """
209 self.verify_keepalive(self.ipv4_params)
212 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
213 """ Ipsec ESP - TCP tests """
217 class TemplateIpsec6TunIfEsp(TemplateIpsec):
218 """ IPsec tunnel interface tests """
220 encryption_type = ESP
223 super(TemplateIpsec6TunIfEsp, self).setUp()
225 self.tun_if = self.pg0
228 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
229 p.scapy_tun_spi, p.crypt_algo_vpp_id,
230 p.crypt_key, p.crypt_key,
231 p.auth_algo_vpp_id, p.auth_key,
232 p.auth_key, is_ip6=True)
233 p.tun_if.add_vpp_config()
235 p.tun_if.config_ip6()
236 p.tun_if.config_ip4()
237 config_tun_params(p, self.encryption_type, p.tun_if)
239 r = VppIpRoute(self, p.remote_tun_if_host, 128,
240 [VppRoutePath(p.tun_if.remote_ip6,
242 proto=DpoProto.DPO_PROTO_IP6)])
244 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
245 [VppRoutePath(p.tun_if.remote_ip4,
250 super(TemplateIpsec6TunIfEsp, self).tearDown()
253 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
255 """ Ipsec ESP - TUN tests """
256 tun6_encrypt_node_name = "esp6-encrypt-tun"
257 tun6_decrypt_node_name = "esp6-decrypt-tun"
259 def test_tun_basic46(self):
260 """ ipsec 4o6 tunnel basic test """
261 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
262 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
264 def test_tun_burst46(self):
265 """ ipsec 4o6 tunnel burst test """
266 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
267 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
270 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
271 IpsecTun6HandoffTests):
272 """ Ipsec ESP 6 Handoff tests """
273 tun6_encrypt_node_name = "esp6-encrypt-tun"
274 tun6_decrypt_node_name = "esp6-decrypt-tun"
277 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
278 IpsecTun4HandoffTests):
279 """ Ipsec ESP 4 Handoff tests """
280 tun4_encrypt_node_name = "esp4-encrypt-tun"
281 tun4_decrypt_node_name = "esp4-decrypt-tun"
284 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
285 """ IPsec IPv4 Multi Tunnel interface """
287 encryption_type = ESP
288 tun4_encrypt_node_name = "esp4-encrypt-tun"
289 tun4_decrypt_node_name = "esp4-decrypt-tun"
292 super(TestIpsec4MultiTunIfEsp, self).setUp()
294 self.tun_if = self.pg0
296 self.multi_params = []
297 self.pg0.generate_remote_hosts(10)
298 self.pg0.configure_ipv4_neighbors()
301 p = copy.copy(self.ipv4_params)
303 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
304 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
305 p.scapy_tun_spi = p.scapy_tun_spi + ii
306 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
307 p.vpp_tun_spi = p.vpp_tun_spi + ii
309 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
310 p.scapy_tra_spi = p.scapy_tra_spi + ii
311 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
312 p.vpp_tra_spi = p.vpp_tra_spi + ii
313 p.tun_dst = self.pg0.remote_hosts[ii].ip4
315 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
318 p.crypt_key, p.crypt_key,
319 p.auth_algo_vpp_id, p.auth_key,
322 p.tun_if.add_vpp_config()
324 p.tun_if.config_ip4()
325 config_tun_params(p, self.encryption_type, p.tun_if)
326 self.multi_params.append(p)
328 VppIpRoute(self, p.remote_tun_if_host, 32,
329 [VppRoutePath(p.tun_if.remote_ip4,
330 0xffffffff)]).add_vpp_config()
333 super(TestIpsec4MultiTunIfEsp, self).tearDown()
335 def test_tun_44(self):
336 """Multiple IPSEC tunnel interfaces """
337 for p in self.multi_params:
338 self.verify_tun_44(p, count=127)
339 c = p.tun_if.get_rx_stats()
340 self.assertEqual(c['packets'], 127)
341 c = p.tun_if.get_tx_stats()
342 self.assertEqual(c['packets'], 127)
344 def test_tun_rr_44(self):
345 """ Round-robin packets acrros multiple interface """
347 for p in self.multi_params:
348 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
349 src=p.remote_tun_if_host,
350 dst=self.pg1.remote_ip4)
351 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
353 for rx, p in zip(rxs, self.multi_params):
354 self.verify_decrypted(p, [rx])
357 for p in self.multi_params:
358 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
359 dst=p.remote_tun_if_host)
360 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
362 for rx, p in zip(rxs, self.multi_params):
363 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
366 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
367 """ IPsec IPv4 Tunnel interface all Algos """
369 encryption_type = ESP
370 tun4_encrypt_node_name = "esp4-encrypt-tun"
371 tun4_decrypt_node_name = "esp4-decrypt-tun"
373 def config_network(self, p):
375 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
378 p.crypt_key, p.crypt_key,
379 p.auth_algo_vpp_id, p.auth_key,
382 p.tun_if.add_vpp_config()
384 p.tun_if.config_ip4()
385 config_tun_params(p, self.encryption_type, p.tun_if)
386 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
387 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
389 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
390 [VppRoutePath(p.tun_if.remote_ip4,
392 p.route.add_vpp_config()
394 def unconfig_network(self, p):
395 p.tun_if.unconfig_ip4()
396 p.tun_if.remove_vpp_config()
397 p.route.remove_vpp_config()
400 super(TestIpsec4TunIfEspAll, self).setUp()
402 self.tun_if = self.pg0
405 super(TestIpsec4TunIfEspAll, self).tearDown()
409 # change the key and the SPI
411 p.crypt_key = b'X' + p.crypt_key[1:]
413 p.scapy_tun_sa_id += 1
416 p.tun_if.local_spi = p.vpp_tun_spi
417 p.tun_if.remote_spi = p.scapy_tun_spi
419 config_tun_params(p, self.encryption_type, p.tun_if)
421 p.tun_sa_in = VppIpsecSA(self,
428 self.vpp_esp_protocol,
431 p.tun_sa_out = VppIpsecSA(self,
438 self.vpp_esp_protocol,
441 p.tun_sa_in.add_vpp_config()
442 p.tun_sa_out.add_vpp_config()
444 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
445 sa_id=p.tun_sa_in.id,
447 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
448 sa_id=p.tun_sa_out.id,
450 self.logger.info(self.vapi.cli("sh ipsec sa"))
452 def test_tun_44(self):
453 """IPSEC tunnel all algos """
455 # foreach VPP crypto engine
456 engines = ["ia32", "ipsecmb", "openssl"]
458 # foreach crypto algorithm
459 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
460 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
461 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
462 IPSEC_API_INTEG_ALG_NONE),
463 'scapy-crypto': "AES-GCM",
464 'scapy-integ': "NULL",
465 'key': b"JPjyOWBeVEQiMe7h",
467 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
468 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
469 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
470 IPSEC_API_INTEG_ALG_NONE),
471 'scapy-crypto': "AES-GCM",
472 'scapy-integ': "NULL",
473 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
475 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
476 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
477 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
478 IPSEC_API_INTEG_ALG_NONE),
479 'scapy-crypto': "AES-GCM",
480 'scapy-integ': "NULL",
481 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
483 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
484 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
485 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
486 IPSEC_API_INTEG_ALG_SHA1_96),
487 'scapy-crypto': "AES-CBC",
488 'scapy-integ': "HMAC-SHA1-96",
490 'key': b"JPjyOWBeVEQiMe7h"},
491 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
492 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
493 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
494 IPSEC_API_INTEG_ALG_SHA1_96),
495 'scapy-crypto': "AES-CBC",
496 'scapy-integ': "HMAC-SHA1-96",
498 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
499 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
500 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
501 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
502 IPSEC_API_INTEG_ALG_SHA1_96),
503 'scapy-crypto': "AES-CBC",
504 'scapy-integ': "HMAC-SHA1-96",
506 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
507 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
508 IPSEC_API_CRYPTO_ALG_NONE),
509 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
510 IPSEC_API_INTEG_ALG_SHA1_96),
511 'scapy-crypto': "NULL",
512 'scapy-integ': "HMAC-SHA1-96",
514 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
516 for engine in engines:
517 self.vapi.cli("set crypto handler all %s" % engine)
520 # loop through each of the algorithms
523 # with self.subTest(algo=algo['scapy']):
525 p = copy.copy(self.ipv4_params)
526 p.auth_algo_vpp_id = algo['vpp-integ']
527 p.crypt_algo_vpp_id = algo['vpp-crypto']
528 p.crypt_algo = algo['scapy-crypto']
529 p.auth_algo = algo['scapy-integ']
530 p.crypt_key = algo['key']
531 p.salt = algo['salt']
533 self.config_network(p)
535 self.verify_tun_44(p, count=127)
536 c = p.tun_if.get_rx_stats()
537 self.assertEqual(c['packets'], 127)
538 c = p.tun_if.get_tx_stats()
539 self.assertEqual(c['packets'], 127)
545 self.verify_tun_44(p, count=127)
547 self.unconfig_network(p)
548 p.tun_sa_out.remove_vpp_config()
549 p.tun_sa_in.remove_vpp_config()
552 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
553 """ IPsec IPv4 Tunnel interface all Algos """
555 encryption_type = ESP
556 tun4_encrypt_node_name = "esp4-encrypt-tun"
557 tun4_decrypt_node_name = "esp4-decrypt-tun"
559 def config_network(self, p):
561 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
562 IPSEC_API_INTEG_ALG_NONE)
566 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
567 IPSEC_API_CRYPTO_ALG_NONE)
568 p.crypt_algo = 'NULL'
571 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
574 p.crypt_key, p.crypt_key,
575 p.auth_algo_vpp_id, p.auth_key,
578 p.tun_if.add_vpp_config()
580 p.tun_if.config_ip4()
581 config_tun_params(p, self.encryption_type, p.tun_if)
582 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
583 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
585 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
586 [VppRoutePath(p.tun_if.remote_ip4,
588 p.route.add_vpp_config()
590 def unconfig_network(self, p):
591 p.tun_if.unconfig_ip4()
592 p.tun_if.remove_vpp_config()
593 p.route.remove_vpp_config()
596 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
598 self.tun_if = self.pg0
601 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
603 def test_tun_44(self):
606 self.config_network(p)
608 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
609 dst=p.remote_tun_if_host)
610 self.send_and_assert_no_replies(self.pg1, tx)
612 self.unconfig_network(p)
615 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
616 """ IPsec IPv6 Multi Tunnel interface """
618 encryption_type = ESP
619 tun6_encrypt_node_name = "esp6-encrypt-tun"
620 tun6_decrypt_node_name = "esp6-decrypt-tun"
623 super(TestIpsec6MultiTunIfEsp, self).setUp()
625 self.tun_if = self.pg0
627 self.multi_params = []
628 self.pg0.generate_remote_hosts(10)
629 self.pg0.configure_ipv6_neighbors()
632 p = copy.copy(self.ipv6_params)
634 p.remote_tun_if_host = "1111::%d" % (ii + 1)
635 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
636 p.scapy_tun_spi = p.scapy_tun_spi + ii
637 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
638 p.vpp_tun_spi = p.vpp_tun_spi + ii
640 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
641 p.scapy_tra_spi = p.scapy_tra_spi + ii
642 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
643 p.vpp_tra_spi = p.vpp_tra_spi + ii
645 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
648 p.crypt_key, p.crypt_key,
649 p.auth_algo_vpp_id, p.auth_key,
650 p.auth_key, is_ip6=True,
651 dst=self.pg0.remote_hosts[ii].ip6)
652 p.tun_if.add_vpp_config()
654 p.tun_if.config_ip6()
655 config_tun_params(p, self.encryption_type, p.tun_if)
656 self.multi_params.append(p)
658 r = VppIpRoute(self, p.remote_tun_if_host, 128,
659 [VppRoutePath(p.tun_if.remote_ip6,
661 proto=DpoProto.DPO_PROTO_IP6)])
665 super(TestIpsec6MultiTunIfEsp, self).tearDown()
667 def test_tun_66(self):
668 """Multiple IPSEC tunnel interfaces """
669 for p in self.multi_params:
670 self.verify_tun_66(p, count=127)
671 c = p.tun_if.get_rx_stats()
672 self.assertEqual(c['packets'], 127)
673 c = p.tun_if.get_tx_stats()
674 self.assertEqual(c['packets'], 127)
677 class TestIpsecGreTebIfEsp(TemplateIpsec,
679 """ Ipsec GRE TEB ESP - TUN tests """
680 tun4_encrypt_node_name = "esp4-encrypt-tun"
681 tun4_decrypt_node_name = "esp4-decrypt-tun"
682 encryption_type = ESP
683 omac = "00:11:22:33:44:55"
685 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
687 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
688 sa.encrypt(IP(src=self.pg0.remote_ip4,
689 dst=self.pg0.local_ip4) /
691 Ether(dst=self.omac) /
692 IP(src="1.1.1.1", dst="1.1.1.2") /
693 UDP(sport=1144, dport=2233) /
694 Raw(b'X' * payload_size))
695 for i in range(count)]
697 def gen_pkts(self, sw_intf, src, dst, count=1,
699 return [Ether(dst=self.omac) /
700 IP(src="1.1.1.1", dst="1.1.1.2") /
701 UDP(sport=1144, dport=2233) /
702 Raw(b'X' * payload_size)
703 for i in range(count)]
705 def verify_decrypted(self, p, rxs):
707 self.assert_equal(rx[Ether].dst, self.omac)
708 self.assert_equal(rx[IP].dst, "1.1.1.2")
710 def verify_encrypted(self, p, sa, rxs):
713 pkt = sa.decrypt(rx[IP])
714 if not pkt.haslayer(IP):
715 pkt = IP(pkt[Raw].load)
716 self.assert_packet_checksums_valid(pkt)
717 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
718 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
719 self.assertTrue(pkt.haslayer(GRE))
721 self.assertEqual(e[Ether].dst, self.omac)
722 self.assertEqual(e[IP].dst, "1.1.1.2")
723 except (IndexError, AssertionError):
724 self.logger.debug(ppp("Unexpected packet:", rx))
726 self.logger.debug(ppp("Decrypted packet:", pkt))
732 super(TestIpsecGreTebIfEsp, self).setUp()
734 self.tun_if = self.pg0
738 bd1 = VppBridgeDomain(self, 1)
741 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
742 p.auth_algo_vpp_id, p.auth_key,
743 p.crypt_algo_vpp_id, p.crypt_key,
744 self.vpp_esp_protocol,
747 p.tun_sa_out.add_vpp_config()
749 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
750 p.auth_algo_vpp_id, p.auth_key,
751 p.crypt_algo_vpp_id, p.crypt_key,
752 self.vpp_esp_protocol,
755 p.tun_sa_in.add_vpp_config()
757 p.tun_if = VppGreInterface(self,
760 type=(VppEnum.vl_api_gre_tunnel_type_t.
761 GRE_API_TUNNEL_TYPE_TEB))
762 p.tun_if.add_vpp_config()
764 p.tun_protect = VppIpsecTunProtect(self,
769 p.tun_protect.add_vpp_config()
772 p.tun_if.config_ip4()
773 config_tun_params(p, self.encryption_type, p.tun_if)
775 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
776 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
778 self.vapi.cli("clear ipsec sa")
779 self.vapi.cli("sh adj")
780 self.vapi.cli("sh ipsec tun")
784 p.tun_if.unconfig_ip4()
785 super(TestIpsecGreTebIfEsp, self).tearDown()
788 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
790 """ Ipsec GRE TEB ESP - TUN tests """
791 tun4_encrypt_node_name = "esp4-encrypt-tun"
792 tun4_decrypt_node_name = "esp4-decrypt-tun"
793 encryption_type = ESP
794 omac = "00:11:22:33:44:55"
796 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
798 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
799 sa.encrypt(IP(src=self.pg0.remote_ip4,
800 dst=self.pg0.local_ip4) /
802 Ether(dst=self.omac) /
803 IP(src="1.1.1.1", dst="1.1.1.2") /
804 UDP(sport=1144, dport=2233) /
805 Raw(b'X' * payload_size))
806 for i in range(count)]
808 def gen_pkts(self, sw_intf, src, dst, count=1,
810 return [Ether(dst=self.omac) /
812 IP(src="1.1.1.1", dst="1.1.1.2") /
813 UDP(sport=1144, dport=2233) /
814 Raw(b'X' * payload_size)
815 for i in range(count)]
817 def verify_decrypted(self, p, rxs):
819 self.assert_equal(rx[Ether].dst, self.omac)
820 self.assert_equal(rx[Dot1Q].vlan, 11)
821 self.assert_equal(rx[IP].dst, "1.1.1.2")
823 def verify_encrypted(self, p, sa, rxs):
826 pkt = sa.decrypt(rx[IP])
827 if not pkt.haslayer(IP):
828 pkt = IP(pkt[Raw].load)
829 self.assert_packet_checksums_valid(pkt)
830 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
831 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
832 self.assertTrue(pkt.haslayer(GRE))
834 self.assertEqual(e[Ether].dst, self.omac)
835 self.assertFalse(e.haslayer(Dot1Q))
836 self.assertEqual(e[IP].dst, "1.1.1.2")
837 except (IndexError, AssertionError):
838 self.logger.debug(ppp("Unexpected packet:", rx))
840 self.logger.debug(ppp("Decrypted packet:", pkt))
846 super(TestIpsecGreTebVlanIfEsp, self).setUp()
848 self.tun_if = self.pg0
852 bd1 = VppBridgeDomain(self, 1)
855 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
856 self.vapi.l2_interface_vlan_tag_rewrite(
857 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
859 self.pg1_11.admin_up()
861 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
862 p.auth_algo_vpp_id, p.auth_key,
863 p.crypt_algo_vpp_id, p.crypt_key,
864 self.vpp_esp_protocol,
867 p.tun_sa_out.add_vpp_config()
869 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
870 p.auth_algo_vpp_id, p.auth_key,
871 p.crypt_algo_vpp_id, p.crypt_key,
872 self.vpp_esp_protocol,
875 p.tun_sa_in.add_vpp_config()
877 p.tun_if = VppGreInterface(self,
880 type=(VppEnum.vl_api_gre_tunnel_type_t.
881 GRE_API_TUNNEL_TYPE_TEB))
882 p.tun_if.add_vpp_config()
884 p.tun_protect = VppIpsecTunProtect(self,
889 p.tun_protect.add_vpp_config()
892 p.tun_if.config_ip4()
893 config_tun_params(p, self.encryption_type, p.tun_if)
895 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
896 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
898 self.vapi.cli("clear ipsec sa")
902 p.tun_if.unconfig_ip4()
903 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
904 self.pg1_11.admin_down()
905 self.pg1_11.remove_vpp_config()
908 class TestIpsecGreTebIfEspTra(TemplateIpsec,
910 """ Ipsec GRE TEB ESP - Tra tests """
911 tun4_encrypt_node_name = "esp4-encrypt-tun"
912 tun4_decrypt_node_name = "esp4-decrypt-tun"
913 encryption_type = ESP
914 omac = "00:11:22:33:44:55"
916 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
918 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
919 sa.encrypt(IP(src=self.pg0.remote_ip4,
920 dst=self.pg0.local_ip4) /
922 Ether(dst=self.omac) /
923 IP(src="1.1.1.1", dst="1.1.1.2") /
924 UDP(sport=1144, dport=2233) /
925 Raw(b'X' * payload_size))
926 for i in range(count)]
928 def gen_pkts(self, sw_intf, src, dst, count=1,
930 return [Ether(dst=self.omac) /
931 IP(src="1.1.1.1", dst="1.1.1.2") /
932 UDP(sport=1144, dport=2233) /
933 Raw(b'X' * payload_size)
934 for i in range(count)]
936 def verify_decrypted(self, p, rxs):
938 self.assert_equal(rx[Ether].dst, self.omac)
939 self.assert_equal(rx[IP].dst, "1.1.1.2")
941 def verify_encrypted(self, p, sa, rxs):
944 pkt = sa.decrypt(rx[IP])
945 if not pkt.haslayer(IP):
946 pkt = IP(pkt[Raw].load)
947 self.assert_packet_checksums_valid(pkt)
948 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
949 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
950 self.assertTrue(pkt.haslayer(GRE))
952 self.assertEqual(e[Ether].dst, self.omac)
953 self.assertEqual(e[IP].dst, "1.1.1.2")
954 except (IndexError, AssertionError):
955 self.logger.debug(ppp("Unexpected packet:", rx))
957 self.logger.debug(ppp("Decrypted packet:", pkt))
963 super(TestIpsecGreTebIfEspTra, self).setUp()
965 self.tun_if = self.pg0
969 bd1 = VppBridgeDomain(self, 1)
972 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
973 p.auth_algo_vpp_id, p.auth_key,
974 p.crypt_algo_vpp_id, p.crypt_key,
975 self.vpp_esp_protocol)
976 p.tun_sa_out.add_vpp_config()
978 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
979 p.auth_algo_vpp_id, p.auth_key,
980 p.crypt_algo_vpp_id, p.crypt_key,
981 self.vpp_esp_protocol)
982 p.tun_sa_in.add_vpp_config()
984 p.tun_if = VppGreInterface(self,
987 type=(VppEnum.vl_api_gre_tunnel_type_t.
988 GRE_API_TUNNEL_TYPE_TEB))
989 p.tun_if.add_vpp_config()
991 p.tun_protect = VppIpsecTunProtect(self,
996 p.tun_protect.add_vpp_config()
999 p.tun_if.config_ip4()
1000 config_tra_params(p, self.encryption_type, p.tun_if)
1002 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1003 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1005 self.vapi.cli("clear ipsec sa")
1008 p = self.ipv4_params
1009 p.tun_if.unconfig_ip4()
1010 super(TestIpsecGreTebIfEspTra, self).tearDown()
1013 class TestIpsecGreIfEsp(TemplateIpsec,
1015 """ Ipsec GRE ESP - TUN tests """
1016 tun4_encrypt_node_name = "esp4-encrypt-tun"
1017 tun4_decrypt_node_name = "esp4-decrypt-tun"
1018 encryption_type = ESP
1020 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1022 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1023 sa.encrypt(IP(src=self.pg0.remote_ip4,
1024 dst=self.pg0.local_ip4) /
1026 IP(src=self.pg1.local_ip4,
1027 dst=self.pg1.remote_ip4) /
1028 UDP(sport=1144, dport=2233) /
1029 Raw(b'X' * payload_size))
1030 for i in range(count)]
1032 def gen_pkts(self, sw_intf, src, dst, count=1,
1034 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1035 IP(src="1.1.1.1", dst="1.1.1.2") /
1036 UDP(sport=1144, dport=2233) /
1037 Raw(b'X' * payload_size)
1038 for i in range(count)]
1040 def verify_decrypted(self, p, rxs):
1042 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1043 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1045 def verify_encrypted(self, p, sa, rxs):
1048 pkt = sa.decrypt(rx[IP])
1049 if not pkt.haslayer(IP):
1050 pkt = IP(pkt[Raw].load)
1051 self.assert_packet_checksums_valid(pkt)
1052 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1053 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1054 self.assertTrue(pkt.haslayer(GRE))
1056 self.assertEqual(e[IP].dst, "1.1.1.2")
1057 except (IndexError, AssertionError):
1058 self.logger.debug(ppp("Unexpected packet:", rx))
1060 self.logger.debug(ppp("Decrypted packet:", pkt))
1066 super(TestIpsecGreIfEsp, self).setUp()
1068 self.tun_if = self.pg0
1070 p = self.ipv4_params
1072 bd1 = VppBridgeDomain(self, 1)
1073 bd1.add_vpp_config()
1075 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1076 p.auth_algo_vpp_id, p.auth_key,
1077 p.crypt_algo_vpp_id, p.crypt_key,
1078 self.vpp_esp_protocol,
1080 self.pg0.remote_ip4)
1081 p.tun_sa_out.add_vpp_config()
1083 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1084 p.auth_algo_vpp_id, p.auth_key,
1085 p.crypt_algo_vpp_id, p.crypt_key,
1086 self.vpp_esp_protocol,
1087 self.pg0.remote_ip4,
1089 p.tun_sa_in.add_vpp_config()
1091 p.tun_if = VppGreInterface(self,
1093 self.pg0.remote_ip4)
1094 p.tun_if.add_vpp_config()
1096 p.tun_protect = VppIpsecTunProtect(self,
1100 p.tun_protect.add_vpp_config()
1103 p.tun_if.config_ip4()
1104 config_tun_params(p, self.encryption_type, p.tun_if)
1106 VppIpRoute(self, "1.1.1.2", 32,
1107 [VppRoutePath(p.tun_if.remote_ip4,
1108 0xffffffff)]).add_vpp_config()
1111 p = self.ipv4_params
1112 p.tun_if.unconfig_ip4()
1113 super(TestIpsecGreIfEsp, self).tearDown()
1116 class TestIpsecGreIfEspTra(TemplateIpsec,
1118 """ Ipsec GRE ESP - TRA tests """
1119 tun4_encrypt_node_name = "esp4-encrypt-tun"
1120 tun4_decrypt_node_name = "esp4-decrypt-tun"
1121 encryption_type = ESP
1123 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1125 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1126 sa.encrypt(IP(src=self.pg0.remote_ip4,
1127 dst=self.pg0.local_ip4) /
1129 IP(src=self.pg1.local_ip4,
1130 dst=self.pg1.remote_ip4) /
1131 UDP(sport=1144, dport=2233) /
1132 Raw(b'X' * payload_size))
1133 for i in range(count)]
1135 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1137 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1138 sa.encrypt(IP(src=self.pg0.remote_ip4,
1139 dst=self.pg0.local_ip4) /
1141 UDP(sport=1144, dport=2233) /
1142 Raw(b'X' * payload_size))
1143 for i in range(count)]
1145 def gen_pkts(self, sw_intf, src, dst, count=1,
1147 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1148 IP(src="1.1.1.1", dst="1.1.1.2") /
1149 UDP(sport=1144, dport=2233) /
1150 Raw(b'X' * payload_size)
1151 for i in range(count)]
1153 def verify_decrypted(self, p, rxs):
1155 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1156 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1158 def verify_encrypted(self, p, sa, rxs):
1161 pkt = sa.decrypt(rx[IP])
1162 if not pkt.haslayer(IP):
1163 pkt = IP(pkt[Raw].load)
1164 self.assert_packet_checksums_valid(pkt)
1165 self.assertTrue(pkt.haslayer(GRE))
1167 self.assertEqual(e[IP].dst, "1.1.1.2")
1168 except (IndexError, AssertionError):
1169 self.logger.debug(ppp("Unexpected packet:", rx))
1171 self.logger.debug(ppp("Decrypted packet:", pkt))
1177 super(TestIpsecGreIfEspTra, self).setUp()
1179 self.tun_if = self.pg0
1181 p = self.ipv4_params
1183 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1184 p.auth_algo_vpp_id, p.auth_key,
1185 p.crypt_algo_vpp_id, p.crypt_key,
1186 self.vpp_esp_protocol)
1187 p.tun_sa_out.add_vpp_config()
1189 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1190 p.auth_algo_vpp_id, p.auth_key,
1191 p.crypt_algo_vpp_id, p.crypt_key,
1192 self.vpp_esp_protocol)
1193 p.tun_sa_in.add_vpp_config()
1195 p.tun_if = VppGreInterface(self,
1197 self.pg0.remote_ip4)
1198 p.tun_if.add_vpp_config()
1200 p.tun_protect = VppIpsecTunProtect(self,
1204 p.tun_protect.add_vpp_config()
1207 p.tun_if.config_ip4()
1208 config_tra_params(p, self.encryption_type, p.tun_if)
1210 VppIpRoute(self, "1.1.1.2", 32,
1211 [VppRoutePath(p.tun_if.remote_ip4,
1212 0xffffffff)]).add_vpp_config()
1215 p = self.ipv4_params
1216 p.tun_if.unconfig_ip4()
1217 super(TestIpsecGreIfEspTra, self).tearDown()
1219 def test_gre_non_ip(self):
1220 p = self.ipv4_params
1221 tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1222 src=p.remote_tun_if_host,
1223 dst=self.pg1.remote_ip6)
1224 self.send_and_assert_no_replies(self.tun_if, tx)
1225 node_name = ('/err/%s/unsupported payload' %
1226 self.tun4_decrypt_node_name)
1227 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1230 class TestIpsecGre6IfEspTra(TemplateIpsec,
1232 """ Ipsec GRE ESP - TRA tests """
1233 tun6_encrypt_node_name = "esp6-encrypt-tun"
1234 tun6_decrypt_node_name = "esp6-decrypt-tun"
1235 encryption_type = ESP
1237 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1239 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1240 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1241 dst=self.pg0.local_ip6) /
1243 IPv6(src=self.pg1.local_ip6,
1244 dst=self.pg1.remote_ip6) /
1245 UDP(sport=1144, dport=2233) /
1246 Raw(b'X' * payload_size))
1247 for i in range(count)]
1249 def gen_pkts6(self, sw_intf, src, dst, count=1,
1251 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1252 IPv6(src="1::1", dst="1::2") /
1253 UDP(sport=1144, dport=2233) /
1254 Raw(b'X' * payload_size)
1255 for i in range(count)]
1257 def verify_decrypted6(self, p, rxs):
1259 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1260 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1262 def verify_encrypted6(self, p, sa, rxs):
1265 pkt = sa.decrypt(rx[IPv6])
1266 if not pkt.haslayer(IPv6):
1267 pkt = IPv6(pkt[Raw].load)
1268 self.assert_packet_checksums_valid(pkt)
1269 self.assertTrue(pkt.haslayer(GRE))
1271 self.assertEqual(e[IPv6].dst, "1::2")
1272 except (IndexError, AssertionError):
1273 self.logger.debug(ppp("Unexpected packet:", rx))
1275 self.logger.debug(ppp("Decrypted packet:", pkt))
1281 super(TestIpsecGre6IfEspTra, self).setUp()
1283 self.tun_if = self.pg0
1285 p = self.ipv6_params
1287 bd1 = VppBridgeDomain(self, 1)
1288 bd1.add_vpp_config()
1290 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1291 p.auth_algo_vpp_id, p.auth_key,
1292 p.crypt_algo_vpp_id, p.crypt_key,
1293 self.vpp_esp_protocol)
1294 p.tun_sa_out.add_vpp_config()
1296 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1297 p.auth_algo_vpp_id, p.auth_key,
1298 p.crypt_algo_vpp_id, p.crypt_key,
1299 self.vpp_esp_protocol)
1300 p.tun_sa_in.add_vpp_config()
1302 p.tun_if = VppGreInterface(self,
1304 self.pg0.remote_ip6)
1305 p.tun_if.add_vpp_config()
1307 p.tun_protect = VppIpsecTunProtect(self,
1311 p.tun_protect.add_vpp_config()
1314 p.tun_if.config_ip6()
1315 config_tra_params(p, self.encryption_type, p.tun_if)
1317 r = VppIpRoute(self, "1::2", 128,
1318 [VppRoutePath(p.tun_if.remote_ip6,
1320 proto=DpoProto.DPO_PROTO_IP6)])
1324 p = self.ipv6_params
1325 p.tun_if.unconfig_ip6()
1326 super(TestIpsecGre6IfEspTra, self).tearDown()
1329 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1330 """ Ipsec mGRE ESP v4 TRA tests """
1331 tun4_encrypt_node_name = "esp4-encrypt-tun"
1332 tun4_decrypt_node_name = "esp4-decrypt-tun"
1333 encryption_type = ESP
1335 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1337 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1338 sa.encrypt(IP(src=p.tun_dst,
1339 dst=self.pg0.local_ip4) /
1341 IP(src=self.pg1.local_ip4,
1342 dst=self.pg1.remote_ip4) /
1343 UDP(sport=1144, dport=2233) /
1344 Raw(b'X' * payload_size))
1345 for i in range(count)]
1347 def gen_pkts(self, sw_intf, src, dst, count=1,
1349 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1350 IP(src="1.1.1.1", dst=dst) /
1351 UDP(sport=1144, dport=2233) /
1352 Raw(b'X' * payload_size)
1353 for i in range(count)]
1355 def verify_decrypted(self, p, rxs):
1357 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1358 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1360 def verify_encrypted(self, p, sa, rxs):
1363 pkt = sa.decrypt(rx[IP])
1364 if not pkt.haslayer(IP):
1365 pkt = IP(pkt[Raw].load)
1366 self.assert_packet_checksums_valid(pkt)
1367 self.assertTrue(pkt.haslayer(GRE))
1369 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1370 except (IndexError, AssertionError):
1371 self.logger.debug(ppp("Unexpected packet:", rx))
1373 self.logger.debug(ppp("Decrypted packet:", pkt))
1379 super(TestIpsecMGreIfEspTra4, self).setUp()
1382 self.tun_if = self.pg0
1383 p = self.ipv4_params
1384 p.tun_if = VppGreInterface(self,
1387 mode=(VppEnum.vl_api_tunnel_mode_t.
1388 TUNNEL_API_MODE_MP))
1389 p.tun_if.add_vpp_config()
1391 p.tun_if.config_ip4()
1392 p.tun_if.generate_remote_hosts(N_NHS)
1393 self.pg0.generate_remote_hosts(N_NHS)
1394 self.pg0.configure_ipv4_neighbors()
1396 # setup some SAs for several next-hops on the interface
1397 self.multi_params = []
1399 for ii in range(N_NHS):
1400 p = copy.copy(self.ipv4_params)
1402 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1403 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1404 p.scapy_tun_spi = p.scapy_tun_spi + ii
1405 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1406 p.vpp_tun_spi = p.vpp_tun_spi + ii
1408 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1409 p.scapy_tra_spi = p.scapy_tra_spi + ii
1410 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1411 p.vpp_tra_spi = p.vpp_tra_spi + ii
1412 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1413 p.auth_algo_vpp_id, p.auth_key,
1414 p.crypt_algo_vpp_id, p.crypt_key,
1415 self.vpp_esp_protocol)
1416 p.tun_sa_out.add_vpp_config()
1418 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1419 p.auth_algo_vpp_id, p.auth_key,
1420 p.crypt_algo_vpp_id, p.crypt_key,
1421 self.vpp_esp_protocol)
1422 p.tun_sa_in.add_vpp_config()
1424 p.tun_protect = VppIpsecTunProtect(
1429 nh=p.tun_if.remote_hosts[ii].ip4)
1430 p.tun_protect.add_vpp_config()
1431 config_tra_params(p, self.encryption_type, p.tun_if)
1432 self.multi_params.append(p)
1434 VppIpRoute(self, p.remote_tun_if_host, 32,
1435 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1436 p.tun_if.sw_if_index)]).add_vpp_config()
1438 # in this v4 variant add the teibs after the protect
1439 p.teib = VppTeib(self, p.tun_if,
1440 p.tun_if.remote_hosts[ii].ip4,
1441 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1442 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1443 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1446 p = self.ipv4_params
1447 p.tun_if.unconfig_ip4()
1448 super(TestIpsecMGreIfEspTra4, self).tearDown()
1450 def test_tun_44(self):
1453 for p in self.multi_params:
1454 self.verify_tun_44(p, count=N_PKTS)
1455 p.teib.remove_vpp_config()
1456 self.verify_tun_dropped_44(p, count=N_PKTS)
1457 p.teib.add_vpp_config()
1458 self.verify_tun_44(p, count=N_PKTS)
1461 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1462 """ Ipsec mGRE ESP v6 TRA tests """
1463 tun6_encrypt_node_name = "esp6-encrypt-tun"
1464 tun6_decrypt_node_name = "esp6-decrypt-tun"
1465 encryption_type = ESP
1467 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1469 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1470 sa.encrypt(IPv6(src=p.tun_dst,
1471 dst=self.pg0.local_ip6) /
1473 IPv6(src=self.pg1.local_ip6,
1474 dst=self.pg1.remote_ip6) /
1475 UDP(sport=1144, dport=2233) /
1476 Raw(b'X' * payload_size))
1477 for i in range(count)]
1479 def gen_pkts6(self, sw_intf, src, dst, count=1,
1481 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1482 IPv6(src="1::1", dst=dst) /
1483 UDP(sport=1144, dport=2233) /
1484 Raw(b'X' * payload_size)
1485 for i in range(count)]
1487 def verify_decrypted6(self, p, rxs):
1489 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1490 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1492 def verify_encrypted6(self, p, sa, rxs):
1495 pkt = sa.decrypt(rx[IPv6])
1496 if not pkt.haslayer(IPv6):
1497 pkt = IPv6(pkt[Raw].load)
1498 self.assert_packet_checksums_valid(pkt)
1499 self.assertTrue(pkt.haslayer(GRE))
1501 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1502 except (IndexError, AssertionError):
1503 self.logger.debug(ppp("Unexpected packet:", rx))
1505 self.logger.debug(ppp("Decrypted packet:", pkt))
1511 super(TestIpsecMGreIfEspTra6, self).setUp()
1513 self.vapi.cli("set logging class ipsec level debug")
1516 self.tun_if = self.pg0
1517 p = self.ipv6_params
1518 p.tun_if = VppGreInterface(self,
1521 mode=(VppEnum.vl_api_tunnel_mode_t.
1522 TUNNEL_API_MODE_MP))
1523 p.tun_if.add_vpp_config()
1525 p.tun_if.config_ip6()
1526 p.tun_if.generate_remote_hosts(N_NHS)
1527 self.pg0.generate_remote_hosts(N_NHS)
1528 self.pg0.configure_ipv6_neighbors()
1530 # setup some SAs for several next-hops on the interface
1531 self.multi_params = []
1533 for ii in range(N_NHS):
1534 p = copy.copy(self.ipv6_params)
1536 p.remote_tun_if_host = "1::%d" % (ii + 1)
1537 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1538 p.scapy_tun_spi = p.scapy_tun_spi + ii
1539 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1540 p.vpp_tun_spi = p.vpp_tun_spi + ii
1542 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1543 p.scapy_tra_spi = p.scapy_tra_spi + ii
1544 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1545 p.vpp_tra_spi = p.vpp_tra_spi + ii
1546 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1547 p.auth_algo_vpp_id, p.auth_key,
1548 p.crypt_algo_vpp_id, p.crypt_key,
1549 self.vpp_esp_protocol)
1550 p.tun_sa_out.add_vpp_config()
1552 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1553 p.auth_algo_vpp_id, p.auth_key,
1554 p.crypt_algo_vpp_id, p.crypt_key,
1555 self.vpp_esp_protocol)
1556 p.tun_sa_in.add_vpp_config()
1558 # in this v6 variant add the teibs first then the protection
1559 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1560 VppTeib(self, p.tun_if,
1561 p.tun_if.remote_hosts[ii].ip6,
1562 p.tun_dst).add_vpp_config()
1564 p.tun_protect = VppIpsecTunProtect(
1569 nh=p.tun_if.remote_hosts[ii].ip6)
1570 p.tun_protect.add_vpp_config()
1571 config_tra_params(p, self.encryption_type, p.tun_if)
1572 self.multi_params.append(p)
1574 VppIpRoute(self, p.remote_tun_if_host, 128,
1575 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1576 p.tun_if.sw_if_index)]).add_vpp_config()
1577 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1579 self.logger.info(self.vapi.cli("sh log"))
1580 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1581 self.logger.info(self.vapi.cli("sh adj 41"))
1584 p = self.ipv6_params
1585 p.tun_if.unconfig_ip6()
1586 super(TestIpsecMGreIfEspTra6, self).tearDown()
1588 def test_tun_66(self):
1590 for p in self.multi_params:
1591 self.verify_tun_66(p, count=63)
1594 class TemplateIpsec4TunProtect(object):
1595 """ IPsec IPv4 Tunnel protect """
1597 encryption_type = ESP
1598 tun4_encrypt_node_name = "esp4-encrypt-tun"
1599 tun4_decrypt_node_name = "esp4-decrypt-tun"
1600 tun4_input_node = "ipsec4-tun-input"
1602 def config_sa_tra(self, p):
1603 config_tun_params(p, self.encryption_type, p.tun_if)
1605 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1606 p.auth_algo_vpp_id, p.auth_key,
1607 p.crypt_algo_vpp_id, p.crypt_key,
1608 self.vpp_esp_protocol,
1610 p.tun_sa_out.add_vpp_config()
1612 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1613 p.auth_algo_vpp_id, p.auth_key,
1614 p.crypt_algo_vpp_id, p.crypt_key,
1615 self.vpp_esp_protocol,
1617 p.tun_sa_in.add_vpp_config()
1619 def config_sa_tun(self, p):
1620 config_tun_params(p, self.encryption_type, p.tun_if)
1622 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1623 p.auth_algo_vpp_id, p.auth_key,
1624 p.crypt_algo_vpp_id, p.crypt_key,
1625 self.vpp_esp_protocol,
1626 self.tun_if.local_addr[p.addr_type],
1627 self.tun_if.remote_addr[p.addr_type],
1629 p.tun_sa_out.add_vpp_config()
1631 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1632 p.auth_algo_vpp_id, p.auth_key,
1633 p.crypt_algo_vpp_id, p.crypt_key,
1634 self.vpp_esp_protocol,
1635 self.tun_if.remote_addr[p.addr_type],
1636 self.tun_if.local_addr[p.addr_type],
1638 p.tun_sa_in.add_vpp_config()
1640 def config_protect(self, p):
1641 p.tun_protect = VppIpsecTunProtect(self,
1645 p.tun_protect.add_vpp_config()
1647 def config_network(self, p):
1648 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1650 self.pg0.remote_ip4)
1651 p.tun_if.add_vpp_config()
1653 p.tun_if.config_ip4()
1654 p.tun_if.config_ip6()
1656 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1657 [VppRoutePath(p.tun_if.remote_ip4,
1659 p.route.add_vpp_config()
1660 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1661 [VppRoutePath(p.tun_if.remote_ip6,
1663 proto=DpoProto.DPO_PROTO_IP6)])
1666 def unconfig_network(self, p):
1667 p.route.remove_vpp_config()
1668 p.tun_if.remove_vpp_config()
1670 def unconfig_protect(self, p):
1671 p.tun_protect.remove_vpp_config()
1673 def unconfig_sa(self, p):
1674 p.tun_sa_out.remove_vpp_config()
1675 p.tun_sa_in.remove_vpp_config()
1678 class TestIpsec4TunProtect(TemplateIpsec,
1679 TemplateIpsec4TunProtect,
1681 """ IPsec IPv4 Tunnel protect - transport mode"""
1684 super(TestIpsec4TunProtect, self).setUp()
1686 self.tun_if = self.pg0
1689 super(TestIpsec4TunProtect, self).tearDown()
1691 def test_tun_44(self):
1692 """IPSEC tunnel protect"""
1694 p = self.ipv4_params
1696 self.config_network(p)
1697 self.config_sa_tra(p)
1698 self.config_protect(p)
1700 self.verify_tun_44(p, count=127)
1701 c = p.tun_if.get_rx_stats()
1702 self.assertEqual(c['packets'], 127)
1703 c = p.tun_if.get_tx_stats()
1704 self.assertEqual(c['packets'], 127)
1706 self.vapi.cli("clear ipsec sa")
1707 self.verify_tun_64(p, count=127)
1708 c = p.tun_if.get_rx_stats()
1709 self.assertEqual(c['packets'], 254)
1710 c = p.tun_if.get_tx_stats()
1711 self.assertEqual(c['packets'], 254)
1713 # rekey - create new SAs and update the tunnel protection
1715 np.crypt_key = b'X' + p.crypt_key[1:]
1716 np.scapy_tun_spi += 100
1717 np.scapy_tun_sa_id += 1
1718 np.vpp_tun_spi += 100
1719 np.vpp_tun_sa_id += 1
1720 np.tun_if.local_spi = p.vpp_tun_spi
1721 np.tun_if.remote_spi = p.scapy_tun_spi
1723 self.config_sa_tra(np)
1724 self.config_protect(np)
1727 self.verify_tun_44(np, count=127)
1728 c = p.tun_if.get_rx_stats()
1729 self.assertEqual(c['packets'], 381)
1730 c = p.tun_if.get_tx_stats()
1731 self.assertEqual(c['packets'], 381)
1734 self.unconfig_protect(np)
1735 self.unconfig_sa(np)
1736 self.unconfig_network(p)
1739 class TestIpsec4TunProtectUdp(TemplateIpsec,
1740 TemplateIpsec4TunProtect,
1742 """ IPsec IPv4 Tunnel protect - transport mode"""
1745 super(TestIpsec4TunProtectUdp, self).setUp()
1747 self.tun_if = self.pg0
1749 p = self.ipv4_params
1750 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1751 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1752 p.nat_header = UDP(sport=5454, dport=4500)
1753 self.config_network(p)
1754 self.config_sa_tra(p)
1755 self.config_protect(p)
1758 p = self.ipv4_params
1759 self.unconfig_protect(p)
1761 self.unconfig_network(p)
1762 super(TestIpsec4TunProtectUdp, self).tearDown()
1764 def test_tun_44(self):
1765 """IPSEC UDP tunnel protect"""
1767 p = self.ipv4_params
1769 self.verify_tun_44(p, count=127)
1770 c = p.tun_if.get_rx_stats()
1771 self.assertEqual(c['packets'], 127)
1772 c = p.tun_if.get_tx_stats()
1773 self.assertEqual(c['packets'], 127)
1775 def test_keepalive(self):
1776 """ IPSEC NAT Keepalive """
1777 self.verify_keepalive(self.ipv4_params)
1780 class TestIpsec4TunProtectTun(TemplateIpsec,
1781 TemplateIpsec4TunProtect,
1783 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1785 encryption_type = ESP
1786 tun4_encrypt_node_name = "esp4-encrypt-tun"
1787 tun4_decrypt_node_name = "esp4-decrypt-tun"
1790 super(TestIpsec4TunProtectTun, self).setUp()
1792 self.tun_if = self.pg0
1795 super(TestIpsec4TunProtectTun, self).tearDown()
1797 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1799 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1800 sa.encrypt(IP(src=sw_intf.remote_ip4,
1801 dst=sw_intf.local_ip4) /
1802 IP(src=src, dst=dst) /
1803 UDP(sport=1144, dport=2233) /
1804 Raw(b'X' * payload_size))
1805 for i in range(count)]
1807 def gen_pkts(self, sw_intf, src, dst, count=1,
1809 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1810 IP(src=src, dst=dst) /
1811 UDP(sport=1144, dport=2233) /
1812 Raw(b'X' * payload_size)
1813 for i in range(count)]
1815 def verify_decrypted(self, p, rxs):
1817 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1818 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1819 self.assert_packet_checksums_valid(rx)
1821 def verify_encrypted(self, p, sa, rxs):
1824 pkt = sa.decrypt(rx[IP])
1825 if not pkt.haslayer(IP):
1826 pkt = IP(pkt[Raw].load)
1827 self.assert_packet_checksums_valid(pkt)
1828 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1829 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1830 inner = pkt[IP].payload
1831 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1833 except (IndexError, AssertionError):
1834 self.logger.debug(ppp("Unexpected packet:", rx))
1836 self.logger.debug(ppp("Decrypted packet:", pkt))
1841 def test_tun_44(self):
1842 """IPSEC tunnel protect """
1844 p = self.ipv4_params
1846 self.config_network(p)
1847 self.config_sa_tun(p)
1848 self.config_protect(p)
1850 self.verify_tun_44(p, count=127)
1852 c = p.tun_if.get_rx_stats()
1853 self.assertEqual(c['packets'], 127)
1854 c = p.tun_if.get_tx_stats()
1855 self.assertEqual(c['packets'], 127)
1857 # rekey - create new SAs and update the tunnel protection
1859 np.crypt_key = b'X' + p.crypt_key[1:]
1860 np.scapy_tun_spi += 100
1861 np.scapy_tun_sa_id += 1
1862 np.vpp_tun_spi += 100
1863 np.vpp_tun_sa_id += 1
1864 np.tun_if.local_spi = p.vpp_tun_spi
1865 np.tun_if.remote_spi = p.scapy_tun_spi
1867 self.config_sa_tun(np)
1868 self.config_protect(np)
1871 self.verify_tun_44(np, count=127)
1872 c = p.tun_if.get_rx_stats()
1873 self.assertEqual(c['packets'], 254)
1874 c = p.tun_if.get_tx_stats()
1875 self.assertEqual(c['packets'], 254)
1878 self.unconfig_protect(np)
1879 self.unconfig_sa(np)
1880 self.unconfig_network(p)
1883 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
1884 TemplateIpsec4TunProtect,
1886 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
1888 encryption_type = ESP
1889 tun4_encrypt_node_name = "esp4-encrypt-tun"
1890 tun4_decrypt_node_name = "esp4-decrypt-tun"
1893 super(TestIpsec4TunProtectTunDrop, self).setUp()
1895 self.tun_if = self.pg0
1898 super(TestIpsec4TunProtectTunDrop, self).tearDown()
1900 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1902 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1903 sa.encrypt(IP(src=sw_intf.remote_ip4,
1905 IP(src=src, dst=dst) /
1906 UDP(sport=1144, dport=2233) /
1907 Raw(b'X' * payload_size))
1908 for i in range(count)]
1910 def test_tun_drop_44(self):
1911 """IPSEC tunnel protect bogus tunnel header """
1913 p = self.ipv4_params
1915 self.config_network(p)
1916 self.config_sa_tun(p)
1917 self.config_protect(p)
1919 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
1920 src=p.remote_tun_if_host,
1921 dst=self.pg1.remote_ip4,
1923 self.send_and_assert_no_replies(self.tun_if, tx)
1926 self.unconfig_protect(p)
1928 self.unconfig_network(p)
1931 class TemplateIpsec6TunProtect(object):
1932 """ IPsec IPv6 Tunnel protect """
1934 def config_sa_tra(self, p):
1935 config_tun_params(p, self.encryption_type, p.tun_if)
1937 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1938 p.auth_algo_vpp_id, p.auth_key,
1939 p.crypt_algo_vpp_id, p.crypt_key,
1940 self.vpp_esp_protocol)
1941 p.tun_sa_out.add_vpp_config()
1943 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1944 p.auth_algo_vpp_id, p.auth_key,
1945 p.crypt_algo_vpp_id, p.crypt_key,
1946 self.vpp_esp_protocol)
1947 p.tun_sa_in.add_vpp_config()
1949 def config_sa_tun(self, p):
1950 config_tun_params(p, self.encryption_type, p.tun_if)
1952 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1953 p.auth_algo_vpp_id, p.auth_key,
1954 p.crypt_algo_vpp_id, p.crypt_key,
1955 self.vpp_esp_protocol,
1956 self.tun_if.local_addr[p.addr_type],
1957 self.tun_if.remote_addr[p.addr_type])
1958 p.tun_sa_out.add_vpp_config()
1960 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1961 p.auth_algo_vpp_id, p.auth_key,
1962 p.crypt_algo_vpp_id, p.crypt_key,
1963 self.vpp_esp_protocol,
1964 self.tun_if.remote_addr[p.addr_type],
1965 self.tun_if.local_addr[p.addr_type])
1966 p.tun_sa_in.add_vpp_config()
1968 def config_protect(self, p):
1969 p.tun_protect = VppIpsecTunProtect(self,
1973 p.tun_protect.add_vpp_config()
1975 def config_network(self, p):
1976 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1978 self.pg0.remote_ip6)
1979 p.tun_if.add_vpp_config()
1981 p.tun_if.config_ip6()
1982 p.tun_if.config_ip4()
1984 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1985 [VppRoutePath(p.tun_if.remote_ip6,
1987 proto=DpoProto.DPO_PROTO_IP6)])
1988 p.route.add_vpp_config()
1989 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1990 [VppRoutePath(p.tun_if.remote_ip4,
1994 def unconfig_network(self, p):
1995 p.route.remove_vpp_config()
1996 p.tun_if.remove_vpp_config()
1998 def unconfig_protect(self, p):
1999 p.tun_protect.remove_vpp_config()
2001 def unconfig_sa(self, p):
2002 p.tun_sa_out.remove_vpp_config()
2003 p.tun_sa_in.remove_vpp_config()
2006 class TestIpsec6TunProtect(TemplateIpsec,
2007 TemplateIpsec6TunProtect,
2009 """ IPsec IPv6 Tunnel protect - transport mode"""
2011 encryption_type = ESP
2012 tun6_encrypt_node_name = "esp6-encrypt-tun"
2013 tun6_decrypt_node_name = "esp6-decrypt-tun"
2016 super(TestIpsec6TunProtect, self).setUp()
2018 self.tun_if = self.pg0
2021 super(TestIpsec6TunProtect, self).tearDown()
2023 def test_tun_66(self):
2024 """IPSEC tunnel protect 6o6"""
2026 p = self.ipv6_params
2028 self.config_network(p)
2029 self.config_sa_tra(p)
2030 self.config_protect(p)
2032 self.verify_tun_66(p, count=127)
2033 c = p.tun_if.get_rx_stats()
2034 self.assertEqual(c['packets'], 127)
2035 c = p.tun_if.get_tx_stats()
2036 self.assertEqual(c['packets'], 127)
2038 # rekey - create new SAs and update the tunnel protection
2040 np.crypt_key = b'X' + p.crypt_key[1:]
2041 np.scapy_tun_spi += 100
2042 np.scapy_tun_sa_id += 1
2043 np.vpp_tun_spi += 100
2044 np.vpp_tun_sa_id += 1
2045 np.tun_if.local_spi = p.vpp_tun_spi
2046 np.tun_if.remote_spi = p.scapy_tun_spi
2048 self.config_sa_tra(np)
2049 self.config_protect(np)
2052 self.verify_tun_66(np, count=127)
2053 c = p.tun_if.get_rx_stats()
2054 self.assertEqual(c['packets'], 254)
2055 c = p.tun_if.get_tx_stats()
2056 self.assertEqual(c['packets'], 254)
2058 # bounce the interface state
2059 p.tun_if.admin_down()
2060 self.verify_drop_tun_66(np, count=127)
2061 node = ('/err/ipsec6-tun-input/%s' %
2062 'ipsec packets received on disabled interface')
2063 self.assertEqual(127, self.statistics.get_err_counter(node))
2065 self.verify_tun_66(np, count=127)
2068 # 1) add two input SAs [old, new]
2069 # 2) swap output SA to [new]
2070 # 3) use only [new] input SA
2072 np3.crypt_key = b'Z' + p.crypt_key[1:]
2073 np3.scapy_tun_spi += 100
2074 np3.scapy_tun_sa_id += 1
2075 np3.vpp_tun_spi += 100
2076 np3.vpp_tun_sa_id += 1
2077 np3.tun_if.local_spi = p.vpp_tun_spi
2078 np3.tun_if.remote_spi = p.scapy_tun_spi
2080 self.config_sa_tra(np3)
2083 p.tun_protect.update_vpp_config(np.tun_sa_out,
2084 [np.tun_sa_in, np3.tun_sa_in])
2085 self.verify_tun_66(np, np, count=127)
2086 self.verify_tun_66(np3, np, count=127)
2089 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2090 [np.tun_sa_in, np3.tun_sa_in])
2091 self.verify_tun_66(np, np3, count=127)
2092 self.verify_tun_66(np3, np3, count=127)
2095 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2097 self.verify_tun_66(np3, np3, count=127)
2098 self.verify_drop_tun_66(np, count=127)
2100 c = p.tun_if.get_rx_stats()
2101 self.assertEqual(c['packets'], 127*9)
2102 c = p.tun_if.get_tx_stats()
2103 self.assertEqual(c['packets'], 127*8)
2104 self.unconfig_sa(np)
2107 self.unconfig_protect(np3)
2108 self.unconfig_sa(np3)
2109 self.unconfig_network(p)
2111 def test_tun_46(self):
2112 """IPSEC tunnel protect 4o6"""
2114 p = self.ipv6_params
2116 self.config_network(p)
2117 self.config_sa_tra(p)
2118 self.config_protect(p)
2120 self.verify_tun_46(p, count=127)
2121 c = p.tun_if.get_rx_stats()
2122 self.assertEqual(c['packets'], 127)
2123 c = p.tun_if.get_tx_stats()
2124 self.assertEqual(c['packets'], 127)
2127 self.unconfig_protect(p)
2129 self.unconfig_network(p)
2132 class TestIpsec6TunProtectTun(TemplateIpsec,
2133 TemplateIpsec6TunProtect,
2135 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2137 encryption_type = ESP
2138 tun6_encrypt_node_name = "esp6-encrypt-tun"
2139 tun6_decrypt_node_name = "esp6-decrypt-tun"
2142 super(TestIpsec6TunProtectTun, self).setUp()
2144 self.tun_if = self.pg0
2147 super(TestIpsec6TunProtectTun, self).tearDown()
2149 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2151 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2152 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2153 dst=sw_intf.local_ip6) /
2154 IPv6(src=src, dst=dst) /
2155 UDP(sport=1166, dport=2233) /
2156 Raw(b'X' * payload_size))
2157 for i in range(count)]
2159 def gen_pkts6(self, sw_intf, src, dst, count=1,
2161 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2162 IPv6(src=src, dst=dst) /
2163 UDP(sport=1166, dport=2233) /
2164 Raw(b'X' * payload_size)
2165 for i in range(count)]
2167 def verify_decrypted6(self, p, rxs):
2169 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2170 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2171 self.assert_packet_checksums_valid(rx)
2173 def verify_encrypted6(self, p, sa, rxs):
2176 pkt = sa.decrypt(rx[IPv6])
2177 if not pkt.haslayer(IPv6):
2178 pkt = IPv6(pkt[Raw].load)
2179 self.assert_packet_checksums_valid(pkt)
2180 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2181 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2182 inner = pkt[IPv6].payload
2183 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2185 except (IndexError, AssertionError):
2186 self.logger.debug(ppp("Unexpected packet:", rx))
2188 self.logger.debug(ppp("Decrypted packet:", pkt))
2193 def test_tun_66(self):
2194 """IPSEC tunnel protect """
2196 p = self.ipv6_params
2198 self.config_network(p)
2199 self.config_sa_tun(p)
2200 self.config_protect(p)
2202 self.verify_tun_66(p, count=127)
2204 c = p.tun_if.get_rx_stats()
2205 self.assertEqual(c['packets'], 127)
2206 c = p.tun_if.get_tx_stats()
2207 self.assertEqual(c['packets'], 127)
2209 # rekey - create new SAs and update the tunnel protection
2211 np.crypt_key = b'X' + p.crypt_key[1:]
2212 np.scapy_tun_spi += 100
2213 np.scapy_tun_sa_id += 1
2214 np.vpp_tun_spi += 100
2215 np.vpp_tun_sa_id += 1
2216 np.tun_if.local_spi = p.vpp_tun_spi
2217 np.tun_if.remote_spi = p.scapy_tun_spi
2219 self.config_sa_tun(np)
2220 self.config_protect(np)
2223 self.verify_tun_66(np, count=127)
2224 c = p.tun_if.get_rx_stats()
2225 self.assertEqual(c['packets'], 254)
2226 c = p.tun_if.get_tx_stats()
2227 self.assertEqual(c['packets'], 254)
2230 self.unconfig_protect(np)
2231 self.unconfig_sa(np)
2232 self.unconfig_network(p)
2235 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2236 TemplateIpsec6TunProtect,
2238 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2240 encryption_type = ESP
2241 tun6_encrypt_node_name = "esp6-encrypt-tun"
2242 tun6_decrypt_node_name = "esp6-decrypt-tun"
2245 super(TestIpsec6TunProtectTunDrop, self).setUp()
2247 self.tun_if = self.pg0
2250 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2252 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2254 # the IP destination of the revelaed packet does not match
2255 # that assigned to the tunnel
2256 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2257 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2259 IPv6(src=src, dst=dst) /
2260 UDP(sport=1144, dport=2233) /
2261 Raw(b'X' * payload_size))
2262 for i in range(count)]
2264 def test_tun_drop_66(self):
2265 """IPSEC 6 tunnel protect bogus tunnel header """
2267 p = self.ipv6_params
2269 self.config_network(p)
2270 self.config_sa_tun(p)
2271 self.config_protect(p)
2273 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2274 src=p.remote_tun_if_host,
2275 dst=self.pg1.remote_ip6,
2277 self.send_and_assert_no_replies(self.tun_if, tx)
2279 self.unconfig_protect(p)
2281 self.unconfig_network(p)
2284 if __name__ == '__main__':
2285 unittest.main(testRunner=VppTestRunner)