5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
23 from vpp_papi import VppEnum
26 def config_tun_params(p, encryption_type, tun_if):
27 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
28 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
29 IPSEC_API_SAD_FLAG_USE_ESN))
30 crypt_key = mk_scapy_crypt_key(p)
31 p.tun_dst = tun_if.remote_ip
32 p.tun_src = tun_if.local_ip
33 p.scapy_tun_sa = SecurityAssociation(
34 encryption_type, spi=p.vpp_tun_spi,
35 crypt_algo=p.crypt_algo,
37 auth_algo=p.auth_algo, auth_key=p.auth_key,
38 tunnel_header=ip_class_by_addr_type[p.addr_type](
41 nat_t_header=p.nat_header,
43 p.vpp_tun_sa = SecurityAssociation(
44 encryption_type, spi=p.scapy_tun_spi,
45 crypt_algo=p.crypt_algo,
47 auth_algo=p.auth_algo, auth_key=p.auth_key,
48 tunnel_header=ip_class_by_addr_type[p.addr_type](
51 nat_t_header=p.nat_header,
55 def config_tra_params(p, encryption_type, tun_if):
56 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
57 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
58 IPSEC_API_SAD_FLAG_USE_ESN))
59 crypt_key = mk_scapy_crypt_key(p)
60 p.tun_dst = tun_if.remote_ip
61 p.tun_src = tun_if.local_ip
62 p.scapy_tun_sa = SecurityAssociation(
63 encryption_type, spi=p.vpp_tun_spi,
64 crypt_algo=p.crypt_algo,
66 auth_algo=p.auth_algo, auth_key=p.auth_key,
68 p.vpp_tun_sa = SecurityAssociation(
69 encryption_type, spi=p.scapy_tun_spi,
70 crypt_algo=p.crypt_algo,
72 auth_algo=p.auth_algo, auth_key=p.auth_key,
76 class TemplateIpsec4TunIfEsp(TemplateIpsec):
77 """ IPsec tunnel interface tests """
83 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
def tearDownClass(cls):
    """ Class-level teardown; nothing local to undo, so defer upward """
    # Explicit two-argument super() is kept to match the rest of the file.
    parent = super(TemplateIpsec4TunIfEsp, cls)
    parent.tearDownClass()
90 super(TemplateIpsec4TunIfEsp, self).setUp()
92 self.tun_if = self.pg0
96 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
97 p.scapy_tun_spi, p.crypt_algo_vpp_id,
98 p.crypt_key, p.crypt_key,
99 p.auth_algo_vpp_id, p.auth_key,
101 p.tun_if.add_vpp_config()
103 p.tun_if.config_ip4()
104 p.tun_if.config_ip6()
105 config_tun_params(p, self.encryption_type, p.tun_if)
107 r = VppIpRoute(self, p.remote_tun_if_host, 32,
108 [VppRoutePath(p.tun_if.remote_ip4,
111 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
112 [VppRoutePath(p.tun_if.remote_ip6,
114 proto=DpoProto.DPO_PROTO_IP6)])
118 super(TemplateIpsec4TunIfEsp, self).tearDown()
121 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
122 """ IPsec UDP tunnel interface tests """
124 tun4_encrypt_node_name = "esp4-encrypt-tun"
125 tun4_decrypt_node_name = "esp4-decrypt-tun"
126 encryption_type = ESP
130 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
def tearDownClass(cls):
    """ Class-level teardown; nothing local to undo, so defer upward """
    # Explicit two-argument super() is kept to match the rest of the file.
    parent = super(TemplateIpsec4TunIfEspUdp, cls)
    parent.tearDownClass()
136 def verify_encrypted(self, p, sa, rxs):
139 # ensure the UDP ports are correct before we decrypt
141 self.assertTrue(rx.haslayer(UDP))
142 self.assert_equal(rx[UDP].sport, 4500)
143 self.assert_equal(rx[UDP].dport, 4500)
145 pkt = sa.decrypt(rx[IP])
146 if not pkt.haslayer(IP):
147 pkt = IP(pkt[Raw].load)
149 self.assert_packet_checksums_valid(pkt)
150 self.assert_equal(pkt[IP].dst, "1.1.1.1")
151 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
152 except (IndexError, AssertionError):
153 self.logger.debug(ppp("Unexpected packet:", rx))
155 self.logger.debug(ppp("Decrypted packet:", pkt))
161 super(TemplateIpsec4TunIfEspUdp, self).setUp()
164 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
165 IPSEC_API_SAD_FLAG_UDP_ENCAP)
166 p.nat_header = UDP(sport=5454, dport=4500)
168 def config_network(self):
170 self.tun_if = self.pg0
172 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
173 p.scapy_tun_spi, p.crypt_algo_vpp_id,
174 p.crypt_key, p.crypt_key,
175 p.auth_algo_vpp_id, p.auth_key,
176 p.auth_key, udp_encap=True)
177 p.tun_if.add_vpp_config()
179 p.tun_if.config_ip4()
180 p.tun_if.config_ip6()
181 config_tun_params(p, self.encryption_type, p.tun_if)
183 r = VppIpRoute(self, p.remote_tun_if_host, 32,
184 [VppRoutePath(p.tun_if.remote_ip4,
187 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
188 [VppRoutePath(p.tun_if.remote_ip6,
190 proto=DpoProto.DPO_PROTO_IP6)])
194 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
197 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
198 """ Ipsec ESP - TUN tests """
199 tun4_encrypt_node_name = "esp4-encrypt-tun"
200 tun4_decrypt_node_name = "esp4-decrypt-tun"
202 def test_tun_basic64(self):
203 """ ipsec 6o4 tunnel basic test """
204 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
206 self.verify_tun_64(self.params[socket.AF_INET], count=1)
208 def test_tun_burst64(self):
209 """ ipsec 6o4 tunnel burst test """
210 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
212 self.verify_tun_64(self.params[socket.AF_INET], count=257)
214 def test_tun_basic_frag44(self):
215 """ ipsec 4o4 tunnel frag basic test """
216 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
220 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
222 self.verify_tun_44(self.params[socket.AF_INET],
223 count=1, payload_size=1800, n_rx=2)
224 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
228 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
229 """ Ipsec ESP UDP tests """
231 tun4_input_node = "ipsec4-tun-input"
234 super(TemplateIpsec4TunIfEspUdp, self).setUp()
235 self.config_network()
237 def test_keepalive(self):
238 """ IPSEC NAT Keepalive """
239 self.verify_keepalive(self.ipv4_params)
242 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
243 """ Ipsec ESP UDP GCM tests """
245 tun4_input_node = "ipsec4-tun-input"
248 super(TemplateIpsec4TunIfEspUdp, self).setUp()
250 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
251 IPSEC_API_INTEG_ALG_NONE)
252 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
253 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
254 p.crypt_algo = "AES-GCM"
256 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
258 self.config_network()
261 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
262 """ Ipsec ESP - TCP tests """
266 class TemplateIpsec6TunIfEsp(TemplateIpsec):
267 """ IPsec tunnel interface tests """
269 encryption_type = ESP
272 super(TemplateIpsec6TunIfEsp, self).setUp()
274 self.tun_if = self.pg0
277 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
278 p.scapy_tun_spi, p.crypt_algo_vpp_id,
279 p.crypt_key, p.crypt_key,
280 p.auth_algo_vpp_id, p.auth_key,
281 p.auth_key, is_ip6=True)
282 p.tun_if.add_vpp_config()
284 p.tun_if.config_ip6()
285 p.tun_if.config_ip4()
286 config_tun_params(p, self.encryption_type, p.tun_if)
288 r = VppIpRoute(self, p.remote_tun_if_host, 128,
289 [VppRoutePath(p.tun_if.remote_ip6,
291 proto=DpoProto.DPO_PROTO_IP6)])
293 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
294 [VppRoutePath(p.tun_if.remote_ip4,
299 super(TemplateIpsec6TunIfEsp, self).tearDown()
302 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
304 """ Ipsec ESP - TUN tests """
305 tun6_encrypt_node_name = "esp6-encrypt-tun"
306 tun6_decrypt_node_name = "esp6-decrypt-tun"
308 def test_tun_basic46(self):
309 """ ipsec 4o6 tunnel basic test """
310 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
311 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
313 def test_tun_burst46(self):
314 """ ipsec 4o6 tunnel burst test """
315 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
316 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
                                IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # Node names for the IPv6 ESP tunnel path. Presumably read by the
    # inherited handoff tests (imported from template_ipsec) - confirm there.
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
                                IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # Node names for the IPv4 ESP tunnel path. Presumably read by the
    # inherited handoff tests (imported from template_ipsec) - confirm there.
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
333 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
334 """ IPsec IPv4 Multi Tunnel interface """
336 encryption_type = ESP
337 tun4_encrypt_node_name = "esp4-encrypt-tun"
338 tun4_decrypt_node_name = "esp4-decrypt-tun"
341 super(TestIpsec4MultiTunIfEsp, self).setUp()
343 self.tun_if = self.pg0
345 self.multi_params = []
346 self.pg0.generate_remote_hosts(10)
347 self.pg0.configure_ipv4_neighbors()
350 p = copy.copy(self.ipv4_params)
352 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
353 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
354 p.scapy_tun_spi = p.scapy_tun_spi + ii
355 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
356 p.vpp_tun_spi = p.vpp_tun_spi + ii
358 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
359 p.scapy_tra_spi = p.scapy_tra_spi + ii
360 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
361 p.vpp_tra_spi = p.vpp_tra_spi + ii
362 p.tun_dst = self.pg0.remote_hosts[ii].ip4
364 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
367 p.crypt_key, p.crypt_key,
368 p.auth_algo_vpp_id, p.auth_key,
371 p.tun_if.add_vpp_config()
373 p.tun_if.config_ip4()
374 config_tun_params(p, self.encryption_type, p.tun_if)
375 self.multi_params.append(p)
377 VppIpRoute(self, p.remote_tun_if_host, 32,
378 [VppRoutePath(p.tun_if.remote_ip4,
379 0xffffffff)]).add_vpp_config()
382 super(TestIpsec4MultiTunIfEsp, self).tearDown()
384 def test_tun_44(self):
385 """Multiple IPSEC tunnel interfaces """
386 for p in self.multi_params:
387 self.verify_tun_44(p, count=127)
388 c = p.tun_if.get_rx_stats()
389 self.assertEqual(c['packets'], 127)
390 c = p.tun_if.get_tx_stats()
391 self.assertEqual(c['packets'], 127)
393 def test_tun_rr_44(self):
394 """ Round-robin packets across multiple interfaces """
396 for p in self.multi_params:
397 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
398 src=p.remote_tun_if_host,
399 dst=self.pg1.remote_ip4)
400 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
402 for rx, p in zip(rxs, self.multi_params):
403 self.verify_decrypted(p, [rx])
406 for p in self.multi_params:
407 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
408 dst=p.remote_tun_if_host)
409 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
411 for rx, p in zip(rxs, self.multi_params):
412 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
415 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
416 """ IPsec IPv4 Tunnel interface all Algos """
418 encryption_type = ESP
419 tun4_encrypt_node_name = "esp4-encrypt-tun"
420 tun4_decrypt_node_name = "esp4-decrypt-tun"
422 def config_network(self, p):
424 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
427 p.crypt_key, p.crypt_key,
428 p.auth_algo_vpp_id, p.auth_key,
431 p.tun_if.add_vpp_config()
433 p.tun_if.config_ip4()
434 config_tun_params(p, self.encryption_type, p.tun_if)
435 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
436 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
438 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
439 [VppRoutePath(p.tun_if.remote_ip4,
441 p.route.add_vpp_config()
def unconfig_network(self, p):
    """ Remove what config_network created for parameter set p """
    tunnel = p.tun_if
    # Strip the IPv4 addressing first, then delete the tunnel interface.
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()
    # The host route through the tunnel is withdrawn last.
    p.route.remove_vpp_config()
449 super(TestIpsec4TunIfEspAll, self).setUp()
451 self.tun_if = self.pg0
454 super(TestIpsec4TunIfEspAll, self).tearDown()
458 # change the key and the SPI
460 p.crypt_key = b'X' + p.crypt_key[1:]
462 p.scapy_tun_sa_id += 1
465 p.tun_if.local_spi = p.vpp_tun_spi
466 p.tun_if.remote_spi = p.scapy_tun_spi
468 config_tun_params(p, self.encryption_type, p.tun_if)
470 p.tun_sa_in = VppIpsecSA(self,
477 self.vpp_esp_protocol,
480 p.tun_sa_out = VppIpsecSA(self,
487 self.vpp_esp_protocol,
490 p.tun_sa_in.add_vpp_config()
491 p.tun_sa_out.add_vpp_config()
493 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
494 sa_id=p.tun_sa_in.id,
496 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
497 sa_id=p.tun_sa_out.id,
499 self.logger.info(self.vapi.cli("sh ipsec sa"))
501 def test_tun_44(self):
502 """IPSEC tunnel all algos """
504 # foreach VPP crypto engine
505 engines = ["ia32", "ipsecmb", "openssl"]
507 # foreach crypto algorithm
508 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
509 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
510 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
511 IPSEC_API_INTEG_ALG_NONE),
512 'scapy-crypto': "AES-GCM",
513 'scapy-integ': "NULL",
514 'key': b"JPjyOWBeVEQiMe7h",
516 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
517 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
518 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
519 IPSEC_API_INTEG_ALG_NONE),
520 'scapy-crypto': "AES-GCM",
521 'scapy-integ': "NULL",
522 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
524 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
525 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
526 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
527 IPSEC_API_INTEG_ALG_NONE),
528 'scapy-crypto': "AES-GCM",
529 'scapy-integ': "NULL",
530 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
532 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
533 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
534 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
535 IPSEC_API_INTEG_ALG_SHA1_96),
536 'scapy-crypto': "AES-CBC",
537 'scapy-integ': "HMAC-SHA1-96",
539 'key': b"JPjyOWBeVEQiMe7h"},
540 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
541 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
542 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
543 IPSEC_API_INTEG_ALG_SHA1_96),
544 'scapy-crypto': "AES-CBC",
545 'scapy-integ': "HMAC-SHA1-96",
547 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
548 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
549 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
550 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
551 IPSEC_API_INTEG_ALG_SHA1_96),
552 'scapy-crypto': "AES-CBC",
553 'scapy-integ': "HMAC-SHA1-96",
555 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
556 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
557 IPSEC_API_CRYPTO_ALG_NONE),
558 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
559 IPSEC_API_INTEG_ALG_SHA1_96),
560 'scapy-crypto': "NULL",
561 'scapy-integ': "HMAC-SHA1-96",
563 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
565 for engine in engines:
566 self.vapi.cli("set crypto handler all %s" % engine)
569 # loop through each of the algorithms
572 # with self.subTest(algo=algo['scapy']):
574 p = copy.copy(self.ipv4_params)
575 p.auth_algo_vpp_id = algo['vpp-integ']
576 p.crypt_algo_vpp_id = algo['vpp-crypto']
577 p.crypt_algo = algo['scapy-crypto']
578 p.auth_algo = algo['scapy-integ']
579 p.crypt_key = algo['key']
580 p.salt = algo['salt']
582 self.config_network(p)
584 self.verify_tun_44(p, count=127)
585 c = p.tun_if.get_rx_stats()
586 self.assertEqual(c['packets'], 127)
587 c = p.tun_if.get_tx_stats()
588 self.assertEqual(c['packets'], 127)
594 self.verify_tun_44(p, count=127)
596 self.unconfig_network(p)
597 p.tun_sa_out.remove_vpp_config()
598 p.tun_sa_in.remove_vpp_config()
601 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
602 """ IPsec IPv4 Tunnel interface No Algos """
604 encryption_type = ESP
605 tun4_encrypt_node_name = "esp4-encrypt-tun"
606 tun4_decrypt_node_name = "esp4-decrypt-tun"
608 def config_network(self, p):
610 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
611 IPSEC_API_INTEG_ALG_NONE)
615 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
616 IPSEC_API_CRYPTO_ALG_NONE)
617 p.crypt_algo = 'NULL'
620 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
623 p.crypt_key, p.crypt_key,
624 p.auth_algo_vpp_id, p.auth_key,
627 p.tun_if.add_vpp_config()
629 p.tun_if.config_ip4()
630 config_tun_params(p, self.encryption_type, p.tun_if)
631 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
632 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
634 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
635 [VppRoutePath(p.tun_if.remote_ip4,
637 p.route.add_vpp_config()
def unconfig_network(self, p):
    """ Remove what config_network created for parameter set p """
    tunnel = p.tun_if
    # Strip the IPv4 addressing first, then delete the tunnel interface.
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()
    # The host route through the tunnel is withdrawn last.
    p.route.remove_vpp_config()
645 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
647 self.tun_if = self.pg0
650 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
652 def test_tun_44(self):
655 self.config_network(p)
657 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
658 dst=p.remote_tun_if_host)
659 self.send_and_assert_no_replies(self.pg1, tx)
661 self.unconfig_network(p)
664 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
665 """ IPsec IPv6 Multi Tunnel interface """
667 encryption_type = ESP
668 tun6_encrypt_node_name = "esp6-encrypt-tun"
669 tun6_decrypt_node_name = "esp6-decrypt-tun"
672 super(TestIpsec6MultiTunIfEsp, self).setUp()
674 self.tun_if = self.pg0
676 self.multi_params = []
677 self.pg0.generate_remote_hosts(10)
678 self.pg0.configure_ipv6_neighbors()
681 p = copy.copy(self.ipv6_params)
683 p.remote_tun_if_host = "1111::%d" % (ii + 1)
684 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
685 p.scapy_tun_spi = p.scapy_tun_spi + ii
686 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
687 p.vpp_tun_spi = p.vpp_tun_spi + ii
689 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
690 p.scapy_tra_spi = p.scapy_tra_spi + ii
691 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
692 p.vpp_tra_spi = p.vpp_tra_spi + ii
694 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
697 p.crypt_key, p.crypt_key,
698 p.auth_algo_vpp_id, p.auth_key,
699 p.auth_key, is_ip6=True,
700 dst=self.pg0.remote_hosts[ii].ip6)
701 p.tun_if.add_vpp_config()
703 p.tun_if.config_ip6()
704 config_tun_params(p, self.encryption_type, p.tun_if)
705 self.multi_params.append(p)
707 r = VppIpRoute(self, p.remote_tun_if_host, 128,
708 [VppRoutePath(p.tun_if.remote_ip6,
710 proto=DpoProto.DPO_PROTO_IP6)])
714 super(TestIpsec6MultiTunIfEsp, self).tearDown()
716 def test_tun_66(self):
717 """Multiple IPSEC tunnel interfaces """
718 for p in self.multi_params:
719 self.verify_tun_66(p, count=127)
720 c = p.tun_if.get_rx_stats()
721 self.assertEqual(c['packets'], 127)
722 c = p.tun_if.get_tx_stats()
723 self.assertEqual(c['packets'], 127)
726 class TestIpsecGreTebIfEsp(TemplateIpsec,
728 """ Ipsec GRE TEB ESP - TUN tests """
729 tun4_encrypt_node_name = "esp4-encrypt-tun"
730 tun4_decrypt_node_name = "esp4-decrypt-tun"
731 encryption_type = ESP
732 omac = "00:11:22:33:44:55"
734 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
736 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
737 sa.encrypt(IP(src=self.pg0.remote_ip4,
738 dst=self.pg0.local_ip4) /
740 Ether(dst=self.omac) /
741 IP(src="1.1.1.1", dst="1.1.1.2") /
742 UDP(sport=1144, dport=2233) /
743 Raw(b'X' * payload_size))
744 for i in range(count)]
746 def gen_pkts(self, sw_intf, src, dst, count=1,
748 return [Ether(dst=self.omac) /
749 IP(src="1.1.1.1", dst="1.1.1.2") /
750 UDP(sport=1144, dport=2233) /
751 Raw(b'X' * payload_size)
752 for i in range(count)]
754 def verify_decrypted(self, p, rxs):
756 self.assert_equal(rx[Ether].dst, self.omac)
757 self.assert_equal(rx[IP].dst, "1.1.1.2")
759 def verify_encrypted(self, p, sa, rxs):
762 pkt = sa.decrypt(rx[IP])
763 if not pkt.haslayer(IP):
764 pkt = IP(pkt[Raw].load)
765 self.assert_packet_checksums_valid(pkt)
766 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
767 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
768 self.assertTrue(pkt.haslayer(GRE))
770 self.assertEqual(e[Ether].dst, self.omac)
771 self.assertEqual(e[IP].dst, "1.1.1.2")
772 except (IndexError, AssertionError):
773 self.logger.debug(ppp("Unexpected packet:", rx))
775 self.logger.debug(ppp("Decrypted packet:", pkt))
781 super(TestIpsecGreTebIfEsp, self).setUp()
783 self.tun_if = self.pg0
787 bd1 = VppBridgeDomain(self, 1)
790 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
791 p.auth_algo_vpp_id, p.auth_key,
792 p.crypt_algo_vpp_id, p.crypt_key,
793 self.vpp_esp_protocol,
796 p.tun_sa_out.add_vpp_config()
798 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
799 p.auth_algo_vpp_id, p.auth_key,
800 p.crypt_algo_vpp_id, p.crypt_key,
801 self.vpp_esp_protocol,
804 p.tun_sa_in.add_vpp_config()
806 p.tun_if = VppGreInterface(self,
809 type=(VppEnum.vl_api_gre_tunnel_type_t.
810 GRE_API_TUNNEL_TYPE_TEB))
811 p.tun_if.add_vpp_config()
813 p.tun_protect = VppIpsecTunProtect(self,
818 p.tun_protect.add_vpp_config()
821 p.tun_if.config_ip4()
822 config_tun_params(p, self.encryption_type, p.tun_if)
824 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
825 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
827 self.vapi.cli("clear ipsec sa")
828 self.vapi.cli("sh adj")
829 self.vapi.cli("sh ipsec tun")
833 p.tun_if.unconfig_ip4()
834 super(TestIpsecGreTebIfEsp, self).tearDown()
837 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
839 """ Ipsec GRE TEB ESP - TUN tests """
840 tun4_encrypt_node_name = "esp4-encrypt-tun"
841 tun4_decrypt_node_name = "esp4-decrypt-tun"
842 encryption_type = ESP
843 omac = "00:11:22:33:44:55"
845 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
847 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
848 sa.encrypt(IP(src=self.pg0.remote_ip4,
849 dst=self.pg0.local_ip4) /
851 Ether(dst=self.omac) /
852 IP(src="1.1.1.1", dst="1.1.1.2") /
853 UDP(sport=1144, dport=2233) /
854 Raw(b'X' * payload_size))
855 for i in range(count)]
857 def gen_pkts(self, sw_intf, src, dst, count=1,
859 return [Ether(dst=self.omac) /
861 IP(src="1.1.1.1", dst="1.1.1.2") /
862 UDP(sport=1144, dport=2233) /
863 Raw(b'X' * payload_size)
864 for i in range(count)]
866 def verify_decrypted(self, p, rxs):
868 self.assert_equal(rx[Ether].dst, self.omac)
869 self.assert_equal(rx[Dot1Q].vlan, 11)
870 self.assert_equal(rx[IP].dst, "1.1.1.2")
872 def verify_encrypted(self, p, sa, rxs):
875 pkt = sa.decrypt(rx[IP])
876 if not pkt.haslayer(IP):
877 pkt = IP(pkt[Raw].load)
878 self.assert_packet_checksums_valid(pkt)
879 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
880 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
881 self.assertTrue(pkt.haslayer(GRE))
883 self.assertEqual(e[Ether].dst, self.omac)
884 self.assertFalse(e.haslayer(Dot1Q))
885 self.assertEqual(e[IP].dst, "1.1.1.2")
886 except (IndexError, AssertionError):
887 self.logger.debug(ppp("Unexpected packet:", rx))
889 self.logger.debug(ppp("Decrypted packet:", pkt))
895 super(TestIpsecGreTebVlanIfEsp, self).setUp()
897 self.tun_if = self.pg0
901 bd1 = VppBridgeDomain(self, 1)
904 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
905 self.vapi.l2_interface_vlan_tag_rewrite(
906 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
908 self.pg1_11.admin_up()
910 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
911 p.auth_algo_vpp_id, p.auth_key,
912 p.crypt_algo_vpp_id, p.crypt_key,
913 self.vpp_esp_protocol,
916 p.tun_sa_out.add_vpp_config()
918 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
919 p.auth_algo_vpp_id, p.auth_key,
920 p.crypt_algo_vpp_id, p.crypt_key,
921 self.vpp_esp_protocol,
924 p.tun_sa_in.add_vpp_config()
926 p.tun_if = VppGreInterface(self,
929 type=(VppEnum.vl_api_gre_tunnel_type_t.
930 GRE_API_TUNNEL_TYPE_TEB))
931 p.tun_if.add_vpp_config()
933 p.tun_protect = VppIpsecTunProtect(self,
938 p.tun_protect.add_vpp_config()
941 p.tun_if.config_ip4()
942 config_tun_params(p, self.encryption_type, p.tun_if)
944 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
945 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
947 self.vapi.cli("clear ipsec sa")
951 p.tun_if.unconfig_ip4()
952 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
953 self.pg1_11.admin_down()
954 self.pg1_11.remove_vpp_config()
957 class TestIpsecGreTebIfEspTra(TemplateIpsec,
959 """ Ipsec GRE TEB ESP - Tra tests """
960 tun4_encrypt_node_name = "esp4-encrypt-tun"
961 tun4_decrypt_node_name = "esp4-decrypt-tun"
962 encryption_type = ESP
963 omac = "00:11:22:33:44:55"
965 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
967 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
968 sa.encrypt(IP(src=self.pg0.remote_ip4,
969 dst=self.pg0.local_ip4) /
971 Ether(dst=self.omac) /
972 IP(src="1.1.1.1", dst="1.1.1.2") /
973 UDP(sport=1144, dport=2233) /
974 Raw(b'X' * payload_size))
975 for i in range(count)]
977 def gen_pkts(self, sw_intf, src, dst, count=1,
979 return [Ether(dst=self.omac) /
980 IP(src="1.1.1.1", dst="1.1.1.2") /
981 UDP(sport=1144, dport=2233) /
982 Raw(b'X' * payload_size)
983 for i in range(count)]
985 def verify_decrypted(self, p, rxs):
987 self.assert_equal(rx[Ether].dst, self.omac)
988 self.assert_equal(rx[IP].dst, "1.1.1.2")
990 def verify_encrypted(self, p, sa, rxs):
993 pkt = sa.decrypt(rx[IP])
994 if not pkt.haslayer(IP):
995 pkt = IP(pkt[Raw].load)
996 self.assert_packet_checksums_valid(pkt)
997 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
998 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
999 self.assertTrue(pkt.haslayer(GRE))
1001 self.assertEqual(e[Ether].dst, self.omac)
1002 self.assertEqual(e[IP].dst, "1.1.1.2")
1003 except (IndexError, AssertionError):
1004 self.logger.debug(ppp("Unexpected packet:", rx))
1006 self.logger.debug(ppp("Decrypted packet:", pkt))
1012 super(TestIpsecGreTebIfEspTra, self).setUp()
1014 self.tun_if = self.pg0
1016 p = self.ipv4_params
1018 bd1 = VppBridgeDomain(self, 1)
1019 bd1.add_vpp_config()
1021 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1022 p.auth_algo_vpp_id, p.auth_key,
1023 p.crypt_algo_vpp_id, p.crypt_key,
1024 self.vpp_esp_protocol)
1025 p.tun_sa_out.add_vpp_config()
1027 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1028 p.auth_algo_vpp_id, p.auth_key,
1029 p.crypt_algo_vpp_id, p.crypt_key,
1030 self.vpp_esp_protocol)
1031 p.tun_sa_in.add_vpp_config()
1033 p.tun_if = VppGreInterface(self,
1035 self.pg0.remote_ip4,
1036 type=(VppEnum.vl_api_gre_tunnel_type_t.
1037 GRE_API_TUNNEL_TYPE_TEB))
1038 p.tun_if.add_vpp_config()
1040 p.tun_protect = VppIpsecTunProtect(self,
1045 p.tun_protect.add_vpp_config()
1048 p.tun_if.config_ip4()
1049 config_tra_params(p, self.encryption_type, p.tun_if)
1051 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1052 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1054 self.vapi.cli("clear ipsec sa")
1057 p = self.ipv4_params
1058 p.tun_if.unconfig_ip4()
1059 super(TestIpsecGreTebIfEspTra, self).tearDown()
1062 class TestIpsecGreIfEsp(TemplateIpsec,
1064 """ Ipsec GRE ESP - TUN tests """
1065 tun4_encrypt_node_name = "esp4-encrypt-tun"
1066 tun4_decrypt_node_name = "esp4-decrypt-tun"
1067 encryption_type = ESP
1069 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1071 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1072 sa.encrypt(IP(src=self.pg0.remote_ip4,
1073 dst=self.pg0.local_ip4) /
1075 IP(src=self.pg1.local_ip4,
1076 dst=self.pg1.remote_ip4) /
1077 UDP(sport=1144, dport=2233) /
1078 Raw(b'X' * payload_size))
1079 for i in range(count)]
1081 def gen_pkts(self, sw_intf, src, dst, count=1,
1083 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1084 IP(src="1.1.1.1", dst="1.1.1.2") /
1085 UDP(sport=1144, dport=2233) /
1086 Raw(b'X' * payload_size)
1087 for i in range(count)]
1089 def verify_decrypted(self, p, rxs):
1091 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1092 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1094 def verify_encrypted(self, p, sa, rxs):
1097 pkt = sa.decrypt(rx[IP])
1098 if not pkt.haslayer(IP):
1099 pkt = IP(pkt[Raw].load)
1100 self.assert_packet_checksums_valid(pkt)
1101 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1102 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1103 self.assertTrue(pkt.haslayer(GRE))
1105 self.assertEqual(e[IP].dst, "1.1.1.2")
1106 except (IndexError, AssertionError):
1107 self.logger.debug(ppp("Unexpected packet:", rx))
1109 self.logger.debug(ppp("Decrypted packet:", pkt))
1115 super(TestIpsecGreIfEsp, self).setUp()
1117 self.tun_if = self.pg0
1119 p = self.ipv4_params
1121 bd1 = VppBridgeDomain(self, 1)
1122 bd1.add_vpp_config()
1124 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1125 p.auth_algo_vpp_id, p.auth_key,
1126 p.crypt_algo_vpp_id, p.crypt_key,
1127 self.vpp_esp_protocol,
1129 self.pg0.remote_ip4)
1130 p.tun_sa_out.add_vpp_config()
1132 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1133 p.auth_algo_vpp_id, p.auth_key,
1134 p.crypt_algo_vpp_id, p.crypt_key,
1135 self.vpp_esp_protocol,
1136 self.pg0.remote_ip4,
1138 p.tun_sa_in.add_vpp_config()
1140 p.tun_if = VppGreInterface(self,
1142 self.pg0.remote_ip4)
1143 p.tun_if.add_vpp_config()
1145 p.tun_protect = VppIpsecTunProtect(self,
1149 p.tun_protect.add_vpp_config()
1152 p.tun_if.config_ip4()
1153 config_tun_params(p, self.encryption_type, p.tun_if)
1155 VppIpRoute(self, "1.1.1.2", 32,
1156 [VppRoutePath(p.tun_if.remote_ip4,
1157 0xffffffff)]).add_vpp_config()
1160 p = self.ipv4_params
1161 p.tun_if.unconfig_ip4()
1162 super(TestIpsecGreIfEsp, self).tearDown()
1165 class TestIpsecGreIfEspTra(TemplateIpsec,
1167 """ Ipsec GRE ESP - TRA tests """
1168 tun4_encrypt_node_name = "esp4-encrypt-tun"
1169 tun4_decrypt_node_name = "esp4-decrypt-tun"
1170 encryption_type = ESP
1172 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1174 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1175 sa.encrypt(IP(src=self.pg0.remote_ip4,
1176 dst=self.pg0.local_ip4) /
1178 IP(src=self.pg1.local_ip4,
1179 dst=self.pg1.remote_ip4) /
1180 UDP(sport=1144, dport=2233) /
1181 Raw(b'X' * payload_size))
1182 for i in range(count)]
1184 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1186 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1187 sa.encrypt(IP(src=self.pg0.remote_ip4,
1188 dst=self.pg0.local_ip4) /
1190 UDP(sport=1144, dport=2233) /
1191 Raw(b'X' * payload_size))
1192 for i in range(count)]
1194 def gen_pkts(self, sw_intf, src, dst, count=1,
1196 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1197 IP(src="1.1.1.1", dst="1.1.1.2") /
1198 UDP(sport=1144, dport=2233) /
1199 Raw(b'X' * payload_size)
1200 for i in range(count)]
1202 def verify_decrypted(self, p, rxs):
1204 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1205 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1207 def verify_encrypted(self, p, sa, rxs):
1210 pkt = sa.decrypt(rx[IP])
1211 if not pkt.haslayer(IP):
1212 pkt = IP(pkt[Raw].load)
1213 self.assert_packet_checksums_valid(pkt)
1214 self.assertTrue(pkt.haslayer(GRE))
1216 self.assertEqual(e[IP].dst, "1.1.1.2")
1217 except (IndexError, AssertionError):
1218 self.logger.debug(ppp("Unexpected packet:", rx))
1220 self.logger.debug(ppp("Decrypted packet:", pkt))
1226 super(TestIpsecGreIfEspTra, self).setUp()
1228 self.tun_if = self.pg0
1230 p = self.ipv4_params
1232 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1233 p.auth_algo_vpp_id, p.auth_key,
1234 p.crypt_algo_vpp_id, p.crypt_key,
1235 self.vpp_esp_protocol)
1236 p.tun_sa_out.add_vpp_config()
1238 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1239 p.auth_algo_vpp_id, p.auth_key,
1240 p.crypt_algo_vpp_id, p.crypt_key,
1241 self.vpp_esp_protocol)
1242 p.tun_sa_in.add_vpp_config()
1244 p.tun_if = VppGreInterface(self,
1246 self.pg0.remote_ip4)
1247 p.tun_if.add_vpp_config()
1249 p.tun_protect = VppIpsecTunProtect(self,
1253 p.tun_protect.add_vpp_config()
1256 p.tun_if.config_ip4()
1257 config_tra_params(p, self.encryption_type, p.tun_if)
1259 VppIpRoute(self, "1.1.1.2", 32,
1260 [VppRoutePath(p.tun_if.remote_ip4,
1261 0xffffffff)]).add_vpp_config()
1264 p = self.ipv4_params
1265 p.tun_if.unconfig_ip4()
1266 super(TestIpsecGreIfEspTra, self).tearDown()
def test_gre_non_ip(self):
    # Send the encrypted packets produced by gen_encrypt_non_ip_pkts into
    # the tunnel interface; nothing may come back out.
    params = self.ipv4_params
    encrypted = self.gen_encrypt_non_ip_pkts(
        params.scapy_tun_sa,
        self.tun_if,
        src=params.remote_tun_if_host,
        dst=self.pg1.remote_ip6)
    self.send_and_assert_no_replies(self.tun_if, encrypted)

    # The decrypt node must have accounted for exactly one drop under its
    # "unsupported payload" error counter.
    counter_path = '/err/%s/unsupported payload' % self.tun4_decrypt_node_name
    dropped = self.statistics.get_err_counter(counter_path)
    self.assertEqual(1, dropped)
1279 class TestIpsecGre6IfEspTra(TemplateIpsec,
1281 """ Ipsec GRE ESP - TRA tests """
1282 tun6_encrypt_node_name = "esp6-encrypt-tun"
1283 tun6_decrypt_node_name = "esp6-decrypt-tun"
1284 encryption_type = ESP
1286 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1288 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1289 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1290 dst=self.pg0.local_ip6) /
1292 IPv6(src=self.pg1.local_ip6,
1293 dst=self.pg1.remote_ip6) /
1294 UDP(sport=1144, dport=2233) /
1295 Raw(b'X' * payload_size))
1296 for i in range(count)]
1298 def gen_pkts6(self, sw_intf, src, dst, count=1,
1300 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1301 IPv6(src="1::1", dst="1::2") /
1302 UDP(sport=1144, dport=2233) /
1303 Raw(b'X' * payload_size)
1304 for i in range(count)]
1306 def verify_decrypted6(self, p, rxs):
1308 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1309 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1311 def verify_encrypted6(self, p, sa, rxs):
1314 pkt = sa.decrypt(rx[IPv6])
1315 if not pkt.haslayer(IPv6):
1316 pkt = IPv6(pkt[Raw].load)
1317 self.assert_packet_checksums_valid(pkt)
1318 self.assertTrue(pkt.haslayer(GRE))
1320 self.assertEqual(e[IPv6].dst, "1::2")
1321 except (IndexError, AssertionError):
1322 self.logger.debug(ppp("Unexpected packet:", rx))
1324 self.logger.debug(ppp("Decrypted packet:", pkt))
1330 super(TestIpsecGre6IfEspTra, self).setUp()
1332 self.tun_if = self.pg0
1334 p = self.ipv6_params
1336 bd1 = VppBridgeDomain(self, 1)
1337 bd1.add_vpp_config()
1339 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1340 p.auth_algo_vpp_id, p.auth_key,
1341 p.crypt_algo_vpp_id, p.crypt_key,
1342 self.vpp_esp_protocol)
1343 p.tun_sa_out.add_vpp_config()
1345 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1346 p.auth_algo_vpp_id, p.auth_key,
1347 p.crypt_algo_vpp_id, p.crypt_key,
1348 self.vpp_esp_protocol)
1349 p.tun_sa_in.add_vpp_config()
1351 p.tun_if = VppGreInterface(self,
1353 self.pg0.remote_ip6)
1354 p.tun_if.add_vpp_config()
1356 p.tun_protect = VppIpsecTunProtect(self,
1360 p.tun_protect.add_vpp_config()
1363 p.tun_if.config_ip6()
1364 config_tra_params(p, self.encryption_type, p.tun_if)
1366 r = VppIpRoute(self, "1::2", 128,
1367 [VppRoutePath(p.tun_if.remote_ip6,
1369 proto=DpoProto.DPO_PROTO_IP6)])
1373 p = self.ipv6_params
1374 p.tun_if.unconfig_ip6()
1375 super(TestIpsecGre6IfEspTra, self).tearDown()
1378 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1379 """ Ipsec mGRE ESP v4 TRA tests """
1380 tun4_encrypt_node_name = "esp4-encrypt-tun"
1381 tun4_decrypt_node_name = "esp4-decrypt-tun"
1382 encryption_type = ESP
1384 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1386 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1387 sa.encrypt(IP(src=p.tun_dst,
1388 dst=self.pg0.local_ip4) /
1390 IP(src=self.pg1.local_ip4,
1391 dst=self.pg1.remote_ip4) /
1392 UDP(sport=1144, dport=2233) /
1393 Raw(b'X' * payload_size))
1394 for i in range(count)]
1396 def gen_pkts(self, sw_intf, src, dst, count=1,
1398 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1399 IP(src="1.1.1.1", dst=dst) /
1400 UDP(sport=1144, dport=2233) /
1401 Raw(b'X' * payload_size)
1402 for i in range(count)]
1404 def verify_decrypted(self, p, rxs):
1406 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1407 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1409 def verify_encrypted(self, p, sa, rxs):
1412 pkt = sa.decrypt(rx[IP])
1413 if not pkt.haslayer(IP):
1414 pkt = IP(pkt[Raw].load)
1415 self.assert_packet_checksums_valid(pkt)
1416 self.assertTrue(pkt.haslayer(GRE))
1418 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1419 except (IndexError, AssertionError):
1420 self.logger.debug(ppp("Unexpected packet:", rx))
1422 self.logger.debug(ppp("Decrypted packet:", pkt))
1428 super(TestIpsecMGreIfEspTra4, self).setUp()
1431 self.tun_if = self.pg0
1432 p = self.ipv4_params
1433 p.tun_if = VppGreInterface(self,
1436 mode=(VppEnum.vl_api_tunnel_mode_t.
1437 TUNNEL_API_MODE_MP))
1438 p.tun_if.add_vpp_config()
1440 p.tun_if.config_ip4()
1441 p.tun_if.generate_remote_hosts(N_NHS)
1442 self.pg0.generate_remote_hosts(N_NHS)
1443 self.pg0.configure_ipv4_neighbors()
1445 # set up some SAs for several next-hops on the interface
1446 self.multi_params = []
1448 for ii in range(N_NHS):
1449 p = copy.copy(self.ipv4_params)
1451 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1452 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1453 p.scapy_tun_spi = p.scapy_tun_spi + ii
1454 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1455 p.vpp_tun_spi = p.vpp_tun_spi + ii
1457 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1458 p.scapy_tra_spi = p.scapy_tra_spi + ii
1459 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1460 p.vpp_tra_spi = p.vpp_tra_spi + ii
1461 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1462 p.auth_algo_vpp_id, p.auth_key,
1463 p.crypt_algo_vpp_id, p.crypt_key,
1464 self.vpp_esp_protocol)
1465 p.tun_sa_out.add_vpp_config()
1467 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1468 p.auth_algo_vpp_id, p.auth_key,
1469 p.crypt_algo_vpp_id, p.crypt_key,
1470 self.vpp_esp_protocol)
1471 p.tun_sa_in.add_vpp_config()
1473 p.tun_protect = VppIpsecTunProtect(
1478 nh=p.tun_if.remote_hosts[ii].ip4)
1479 p.tun_protect.add_vpp_config()
1480 config_tra_params(p, self.encryption_type, p.tun_if)
1481 self.multi_params.append(p)
1483 VppIpRoute(self, p.remote_tun_if_host, 32,
1484 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1485 p.tun_if.sw_if_index)]).add_vpp_config()
1487 # in this v4 variant add the teibs after the protect
1488 p.teib = VppTeib(self, p.tun_if,
1489 p.tun_if.remote_hosts[ii].ip4,
1490 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1491 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1492 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1495 p = self.ipv4_params
1496 p.tun_if.unconfig_ip4()
1497 super(TestIpsecMGreIfEspTra4, self).tearDown()
1499 def test_tun_44(self):
1502 for p in self.multi_params:
1503 self.verify_tun_44(p, count=N_PKTS)
1504 p.teib.remove_vpp_config()
1505 self.verify_tun_dropped_44(p, count=N_PKTS)
1506 p.teib.add_vpp_config()
1507 self.verify_tun_44(p, count=N_PKTS)
1510 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1511 """ Ipsec mGRE ESP v6 TRA tests """
1512 tun6_encrypt_node_name = "esp6-encrypt-tun"
1513 tun6_decrypt_node_name = "esp6-decrypt-tun"
1514 encryption_type = ESP
1516 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1518 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1519 sa.encrypt(IPv6(src=p.tun_dst,
1520 dst=self.pg0.local_ip6) /
1522 IPv6(src=self.pg1.local_ip6,
1523 dst=self.pg1.remote_ip6) /
1524 UDP(sport=1144, dport=2233) /
1525 Raw(b'X' * payload_size))
1526 for i in range(count)]
1528 def gen_pkts6(self, sw_intf, src, dst, count=1,
1530 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1531 IPv6(src="1::1", dst=dst) /
1532 UDP(sport=1144, dport=2233) /
1533 Raw(b'X' * payload_size)
1534 for i in range(count)]
1536 def verify_decrypted6(self, p, rxs):
1538 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1539 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1541 def verify_encrypted6(self, p, sa, rxs):
1544 pkt = sa.decrypt(rx[IPv6])
1545 if not pkt.haslayer(IPv6):
1546 pkt = IPv6(pkt[Raw].load)
1547 self.assert_packet_checksums_valid(pkt)
1548 self.assertTrue(pkt.haslayer(GRE))
1550 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1551 except (IndexError, AssertionError):
1552 self.logger.debug(ppp("Unexpected packet:", rx))
1554 self.logger.debug(ppp("Decrypted packet:", pkt))
1560 super(TestIpsecMGreIfEspTra6, self).setUp()
1562 self.vapi.cli("set logging class ipsec level debug")
1565 self.tun_if = self.pg0
1566 p = self.ipv6_params
1567 p.tun_if = VppGreInterface(self,
1570 mode=(VppEnum.vl_api_tunnel_mode_t.
1571 TUNNEL_API_MODE_MP))
1572 p.tun_if.add_vpp_config()
1574 p.tun_if.config_ip6()
1575 p.tun_if.generate_remote_hosts(N_NHS)
1576 self.pg0.generate_remote_hosts(N_NHS)
1577 self.pg0.configure_ipv6_neighbors()
1579 # set up some SAs for several next-hops on the interface
1580 self.multi_params = []
1582 for ii in range(N_NHS):
1583 p = copy.copy(self.ipv6_params)
1585 p.remote_tun_if_host = "1::%d" % (ii + 1)
1586 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1587 p.scapy_tun_spi = p.scapy_tun_spi + ii
1588 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1589 p.vpp_tun_spi = p.vpp_tun_spi + ii
1591 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1592 p.scapy_tra_spi = p.scapy_tra_spi + ii
1593 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1594 p.vpp_tra_spi = p.vpp_tra_spi + ii
1595 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1596 p.auth_algo_vpp_id, p.auth_key,
1597 p.crypt_algo_vpp_id, p.crypt_key,
1598 self.vpp_esp_protocol)
1599 p.tun_sa_out.add_vpp_config()
1601 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1602 p.auth_algo_vpp_id, p.auth_key,
1603 p.crypt_algo_vpp_id, p.crypt_key,
1604 self.vpp_esp_protocol)
1605 p.tun_sa_in.add_vpp_config()
1607 # in this v6 variant add the teibs first then the protection
1608 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1609 VppTeib(self, p.tun_if,
1610 p.tun_if.remote_hosts[ii].ip6,
1611 p.tun_dst).add_vpp_config()
1613 p.tun_protect = VppIpsecTunProtect(
1618 nh=p.tun_if.remote_hosts[ii].ip6)
1619 p.tun_protect.add_vpp_config()
1620 config_tra_params(p, self.encryption_type, p.tun_if)
1621 self.multi_params.append(p)
1623 VppIpRoute(self, p.remote_tun_if_host, 128,
1624 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1625 p.tun_if.sw_if_index)]).add_vpp_config()
1626 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1628 self.logger.info(self.vapi.cli("sh log"))
1629 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1630 self.logger.info(self.vapi.cli("sh adj 41"))
1633 p = self.ipv6_params
1634 p.tun_if.unconfig_ip6()
1635 super(TestIpsecMGreIfEspTra6, self).tearDown()
1637 def test_tun_66(self):
1639 for p in self.multi_params:
1640 self.verify_tun_66(p, count=63)
1643 class TemplateIpsec4TunProtect(object):
1644 """ IPsec IPv4 Tunnel protect """
1646 encryption_type = ESP
1647 tun4_encrypt_node_name = "esp4-encrypt-tun"
1648 tun4_decrypt_node_name = "esp4-decrypt-tun"
1649 tun4_input_node = "ipsec4-tun-input"
1651 def config_sa_tra(self, p):
1652 config_tun_params(p, self.encryption_type, p.tun_if)
1654 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1655 p.auth_algo_vpp_id, p.auth_key,
1656 p.crypt_algo_vpp_id, p.crypt_key,
1657 self.vpp_esp_protocol,
1659 p.tun_sa_out.add_vpp_config()
1661 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1662 p.auth_algo_vpp_id, p.auth_key,
1663 p.crypt_algo_vpp_id, p.crypt_key,
1664 self.vpp_esp_protocol,
1666 p.tun_sa_in.add_vpp_config()
1668 def config_sa_tun(self, p):
1669 config_tun_params(p, self.encryption_type, p.tun_if)
1671 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1672 p.auth_algo_vpp_id, p.auth_key,
1673 p.crypt_algo_vpp_id, p.crypt_key,
1674 self.vpp_esp_protocol,
1675 self.tun_if.local_addr[p.addr_type],
1676 self.tun_if.remote_addr[p.addr_type],
1678 p.tun_sa_out.add_vpp_config()
1680 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1681 p.auth_algo_vpp_id, p.auth_key,
1682 p.crypt_algo_vpp_id, p.crypt_key,
1683 self.vpp_esp_protocol,
1684 self.tun_if.remote_addr[p.addr_type],
1685 self.tun_if.local_addr[p.addr_type],
1687 p.tun_sa_in.add_vpp_config()
1689 def config_protect(self, p):
1690 p.tun_protect = VppIpsecTunProtect(self,
1694 p.tun_protect.add_vpp_config()
1696 def config_network(self, p):
1697 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1699 self.pg0.remote_ip4)
1700 p.tun_if.add_vpp_config()
1702 p.tun_if.config_ip4()
1703 p.tun_if.config_ip6()
1705 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1706 [VppRoutePath(p.tun_if.remote_ip4,
1708 p.route.add_vpp_config()
1709 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1710 [VppRoutePath(p.tun_if.remote_ip6,
1712 proto=DpoProto.DPO_PROTO_IP6)])
def unconfig_network(self, p):
    """Remove the route in ``p.route``, then the tunnel in ``p.tun_if``."""
    # Looked up one at a time so the route is fully removed before the
    # tunnel interface attribute is even touched.
    for attr in ("route", "tun_if"):
        getattr(p, attr).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel protection object stored in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove the SA in ``p.tun_sa_out``, then the one in ``p.tun_sa_in``."""
    for attr in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr).remove_vpp_config()
1727 class TestIpsec4TunProtect(TemplateIpsec,
1728 TemplateIpsec4TunProtect,
1730 """ IPsec IPv4 Tunnel protect - transport mode"""
1733 super(TestIpsec4TunProtect, self).setUp()
1735 self.tun_if = self.pg0
1738 super(TestIpsec4TunProtect, self).tearDown()
1740 def test_tun_44(self):
1741 """IPSEC tunnel protect"""
1743 p = self.ipv4_params
1745 self.config_network(p)
1746 self.config_sa_tra(p)
1747 self.config_protect(p)
1749 self.verify_tun_44(p, count=127)
1750 c = p.tun_if.get_rx_stats()
1751 self.assertEqual(c['packets'], 127)
1752 c = p.tun_if.get_tx_stats()
1753 self.assertEqual(c['packets'], 127)
1755 self.vapi.cli("clear ipsec sa")
1756 self.verify_tun_64(p, count=127)
1757 c = p.tun_if.get_rx_stats()
1758 self.assertEqual(c['packets'], 254)
1759 c = p.tun_if.get_tx_stats()
1760 self.assertEqual(c['packets'], 254)
1762 # rekey - create new SAs and update the tunnel protection
1764 np.crypt_key = b'X' + p.crypt_key[1:]
1765 np.scapy_tun_spi += 100
1766 np.scapy_tun_sa_id += 1
1767 np.vpp_tun_spi += 100
1768 np.vpp_tun_sa_id += 1
1769 np.tun_if.local_spi = p.vpp_tun_spi
1770 np.tun_if.remote_spi = p.scapy_tun_spi
1772 self.config_sa_tra(np)
1773 self.config_protect(np)
1776 self.verify_tun_44(np, count=127)
1777 c = p.tun_if.get_rx_stats()
1778 self.assertEqual(c['packets'], 381)
1779 c = p.tun_if.get_tx_stats()
1780 self.assertEqual(c['packets'], 381)
1783 self.unconfig_protect(np)
1784 self.unconfig_sa(np)
1785 self.unconfig_network(p)
1788 class TestIpsec4TunProtectUdp(TemplateIpsec,
1789 TemplateIpsec4TunProtect,
1791 """ IPsec IPv4 Tunnel protect - transport mode"""
1794 super(TestIpsec4TunProtectUdp, self).setUp()
1796 self.tun_if = self.pg0
1798 p = self.ipv4_params
1799 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1800 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1801 p.nat_header = UDP(sport=5454, dport=4500)
1802 self.config_network(p)
1803 self.config_sa_tra(p)
1804 self.config_protect(p)
1807 p = self.ipv4_params
1808 self.unconfig_protect(p)
1810 self.unconfig_network(p)
1811 super(TestIpsec4TunProtectUdp, self).tearDown()
def test_tun_44(self):
    """IPSEC UDP tunnel protect

    Runs ``verify_tun_44`` with 127 packets, then checks that the tunnel
    interface's rx and tx statistics each report 127 packets.
    """
    params = self.ipv4_params
    expected = 127

    self.verify_tun_44(params, count=expected)

    rx_stats = params.tun_if.get_rx_stats()
    self.assertEqual(rx_stats['packets'], expected)
    tx_stats = params.tun_if.get_tx_stats()
    self.assertEqual(tx_stats['packets'], expected)
def test_keepalive(self):
    """ IPSEC NAT Keepalive

    Hands the IPv4 parameter set to ``verify_keepalive``.
    """
    params = self.ipv4_params
    self.verify_keepalive(params)
1829 class TestIpsec4TunProtectTun(TemplateIpsec,
1830 TemplateIpsec4TunProtect,
1832 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1834 encryption_type = ESP
1835 tun4_encrypt_node_name = "esp4-encrypt-tun"
1836 tun4_decrypt_node_name = "esp4-decrypt-tun"
1839 super(TestIpsec4TunProtectTun, self).setUp()
1841 self.tun_if = self.pg0
1844 super(TestIpsec4TunProtectTun, self).tearDown()
1846 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1848 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1849 sa.encrypt(IP(src=sw_intf.remote_ip4,
1850 dst=sw_intf.local_ip4) /
1851 IP(src=src, dst=dst) /
1852 UDP(sport=1144, dport=2233) /
1853 Raw(b'X' * payload_size))
1854 for i in range(count)]
1856 def gen_pkts(self, sw_intf, src, dst, count=1,
1858 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1859 IP(src=src, dst=dst) /
1860 UDP(sport=1144, dport=2233) /
1861 Raw(b'X' * payload_size)
1862 for i in range(count)]
1864 def verify_decrypted(self, p, rxs):
1866 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1867 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1868 self.assert_packet_checksums_valid(rx)
1870 def verify_encrypted(self, p, sa, rxs):
1873 pkt = sa.decrypt(rx[IP])
1874 if not pkt.haslayer(IP):
1875 pkt = IP(pkt[Raw].load)
1876 self.assert_packet_checksums_valid(pkt)
1877 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1878 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1879 inner = pkt[IP].payload
1880 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1882 except (IndexError, AssertionError):
1883 self.logger.debug(ppp("Unexpected packet:", rx))
1885 self.logger.debug(ppp("Decrypted packet:", pkt))
1890 def test_tun_44(self):
1891 """IPSEC tunnel protect """
1893 p = self.ipv4_params
1895 self.config_network(p)
1896 self.config_sa_tun(p)
1897 self.config_protect(p)
1899 self.verify_tun_44(p, count=127)
1901 c = p.tun_if.get_rx_stats()
1902 self.assertEqual(c['packets'], 127)
1903 c = p.tun_if.get_tx_stats()
1904 self.assertEqual(c['packets'], 127)
1906 # rekey - create new SAs and update the tunnel protection
1908 np.crypt_key = b'X' + p.crypt_key[1:]
1909 np.scapy_tun_spi += 100
1910 np.scapy_tun_sa_id += 1
1911 np.vpp_tun_spi += 100
1912 np.vpp_tun_sa_id += 1
1913 np.tun_if.local_spi = p.vpp_tun_spi
1914 np.tun_if.remote_spi = p.scapy_tun_spi
1916 self.config_sa_tun(np)
1917 self.config_protect(np)
1920 self.verify_tun_44(np, count=127)
1921 c = p.tun_if.get_rx_stats()
1922 self.assertEqual(c['packets'], 254)
1923 c = p.tun_if.get_tx_stats()
1924 self.assertEqual(c['packets'], 254)
1927 self.unconfig_protect(np)
1928 self.unconfig_sa(np)
1929 self.unconfig_network(p)
1932 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
1933 TemplateIpsec4TunProtect,
1935 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
1937 encryption_type = ESP
1938 tun4_encrypt_node_name = "esp4-encrypt-tun"
1939 tun4_decrypt_node_name = "esp4-decrypt-tun"
1942 super(TestIpsec4TunProtectTunDrop, self).setUp()
1944 self.tun_if = self.pg0
1947 super(TestIpsec4TunProtectTunDrop, self).tearDown()
1949 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1951 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1952 sa.encrypt(IP(src=sw_intf.remote_ip4,
1954 IP(src=src, dst=dst) /
1955 UDP(sport=1144, dport=2233) /
1956 Raw(b'X' * payload_size))
1957 for i in range(count)]
1959 def test_tun_drop_44(self):
1960 """IPSEC tunnel protect bogus tunnel header """
1962 p = self.ipv4_params
1964 self.config_network(p)
1965 self.config_sa_tun(p)
1966 self.config_protect(p)
1968 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
1969 src=p.remote_tun_if_host,
1970 dst=self.pg1.remote_ip4,
1972 self.send_and_assert_no_replies(self.tun_if, tx)
1975 self.unconfig_protect(p)
1977 self.unconfig_network(p)
1980 class TemplateIpsec6TunProtect(object):
1981 """ IPsec IPv6 Tunnel protect """
def config_sa_tra(self, p):
    """Create and install the outbound and inbound SAs for ``p``.

    ``config_tun_params`` is called first with this class's encryption
    type and ``p.tun_if``.  Each SA is then built from ``p``'s
    authentication/crypto settings and ``self.vpp_esp_protocol``, with no
    tunnel endpoint addresses passed to ``VppIpsecSA``, stored on ``p``
    and added to VPP.  The outbound SA uses the scapy-side id/SPI, the
    inbound SA the VPP-side id/SPI.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    # (attribute on p that receives the SA, SA-id attribute, SPI attribute)
    sa_layout = (
        ("tun_sa_out", "scapy_tun_sa_id", "scapy_tun_spi"),
        ("tun_sa_in", "vpp_tun_sa_id", "vpp_tun_spi"),
    )
    for sa_attr, id_attr, spi_attr in sa_layout:
        setattr(p, sa_attr,
                VppIpsecSA(self, getattr(p, id_attr), getattr(p, spi_attr),
                           p.auth_algo_vpp_id, p.auth_key,
                           p.crypt_algo_vpp_id, p.crypt_key,
                           self.vpp_esp_protocol))
        getattr(p, sa_attr).add_vpp_config()
def config_sa_tun(self, p):
    """Create and install outbound/inbound SAs carrying tunnel endpoints.

    ``config_tun_params`` is called first.  Both SAs are built from ``p``'s
    authentication/crypto settings and ``self.vpp_esp_protocol``; in
    addition each receives the addresses of ``self.tun_if`` for
    ``p.addr_type``: local then remote for the outbound SA, remote then
    local for the inbound SA.  Each SA is stored on ``p`` and added to VPP.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    endpoints = self.tun_if
    family = p.addr_type

    # Outbound: scapy-side id/SPI, addressed local -> remote.
    p.tun_sa_out = VppIpsecSA(
        self,
        p.scapy_tun_sa_id,
        p.scapy_tun_spi,
        p.auth_algo_vpp_id,
        p.auth_key,
        p.crypt_algo_vpp_id,
        p.crypt_key,
        self.vpp_esp_protocol,
        endpoints.local_addr[family],
        endpoints.remote_addr[family])
    p.tun_sa_out.add_vpp_config()

    # Inbound: VPP-side id/SPI, addressed remote -> local.
    p.tun_sa_in = VppIpsecSA(
        self,
        p.vpp_tun_sa_id,
        p.vpp_tun_spi,
        p.auth_algo_vpp_id,
        p.auth_key,
        p.crypt_algo_vpp_id,
        p.crypt_key,
        self.vpp_esp_protocol,
        endpoints.remote_addr[family],
        endpoints.local_addr[family])
    p.tun_sa_in.add_vpp_config()
2017 def config_protect(self, p):
2018 p.tun_protect = VppIpsecTunProtect(self,
2022 p.tun_protect.add_vpp_config()
2024 def config_network(self, p):
2025 p.tun_if = VppIpIpTunInterface(self, self.pg0,
2027 self.pg0.remote_ip6)
2028 p.tun_if.add_vpp_config()
2030 p.tun_if.config_ip6()
2031 p.tun_if.config_ip4()
2033 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2034 [VppRoutePath(p.tun_if.remote_ip6,
2036 proto=DpoProto.DPO_PROTO_IP6)])
2037 p.route.add_vpp_config()
2038 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2039 [VppRoutePath(p.tun_if.remote_ip4,
def unconfig_network(self, p):
    """Undo the route kept in ``p.route`` and then the ``p.tun_if`` tunnel."""
    installed_route = p.route
    installed_route.remove_vpp_config()
    tunnel = p.tun_if
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Withdraw the protection object held in ``p.tun_protect``."""
    getattr(p, "tun_protect").remove_vpp_config()
def unconfig_sa(self, p):
    """Remove ``p.tun_sa_out`` first and ``p.tun_sa_in`` second."""
    outbound = p.tun_sa_out
    outbound.remove_vpp_config()
    inbound = p.tun_sa_in
    inbound.remove_vpp_config()
2055 class TestIpsec6TunProtect(TemplateIpsec,
2056 TemplateIpsec6TunProtect,
2058 """ IPsec IPv6 Tunnel protect - transport mode"""
2060 encryption_type = ESP
2061 tun6_encrypt_node_name = "esp6-encrypt-tun"
2062 tun6_decrypt_node_name = "esp6-decrypt-tun"
2065 super(TestIpsec6TunProtect, self).setUp()
2067 self.tun_if = self.pg0
2070 super(TestIpsec6TunProtect, self).tearDown()
2072 def test_tun_66(self):
2073 """IPSEC tunnel protect 6o6"""
2075 p = self.ipv6_params
2077 self.config_network(p)
2078 self.config_sa_tra(p)
2079 self.config_protect(p)
2081 self.verify_tun_66(p, count=127)
2082 c = p.tun_if.get_rx_stats()
2083 self.assertEqual(c['packets'], 127)
2084 c = p.tun_if.get_tx_stats()
2085 self.assertEqual(c['packets'], 127)
2087 # rekey - create new SAs and update the tunnel protection
2089 np.crypt_key = b'X' + p.crypt_key[1:]
2090 np.scapy_tun_spi += 100
2091 np.scapy_tun_sa_id += 1
2092 np.vpp_tun_spi += 100
2093 np.vpp_tun_sa_id += 1
2094 np.tun_if.local_spi = p.vpp_tun_spi
2095 np.tun_if.remote_spi = p.scapy_tun_spi
2097 self.config_sa_tra(np)
2098 self.config_protect(np)
2101 self.verify_tun_66(np, count=127)
2102 c = p.tun_if.get_rx_stats()
2103 self.assertEqual(c['packets'], 254)
2104 c = p.tun_if.get_tx_stats()
2105 self.assertEqual(c['packets'], 254)
2107 # bounce the interface state
2108 p.tun_if.admin_down()
2109 self.verify_drop_tun_66(np, count=127)
2110 node = ('/err/ipsec6-tun-input/%s' %
2111 'ipsec packets received on disabled interface')
2112 self.assertEqual(127, self.statistics.get_err_counter(node))
2114 self.verify_tun_66(np, count=127)
2117 # 1) add two input SAs [old, new]
2118 # 2) swap output SA to [new]
2119 # 3) use only [new] input SA
2121 np3.crypt_key = b'Z' + p.crypt_key[1:]
2122 np3.scapy_tun_spi += 100
2123 np3.scapy_tun_sa_id += 1
2124 np3.vpp_tun_spi += 100
2125 np3.vpp_tun_sa_id += 1
2126 np3.tun_if.local_spi = p.vpp_tun_spi
2127 np3.tun_if.remote_spi = p.scapy_tun_spi
2129 self.config_sa_tra(np3)
2132 p.tun_protect.update_vpp_config(np.tun_sa_out,
2133 [np.tun_sa_in, np3.tun_sa_in])
2134 self.verify_tun_66(np, np, count=127)
2135 self.verify_tun_66(np3, np, count=127)
2138 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2139 [np.tun_sa_in, np3.tun_sa_in])
2140 self.verify_tun_66(np, np3, count=127)
2141 self.verify_tun_66(np3, np3, count=127)
2144 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2146 self.verify_tun_66(np3, np3, count=127)
2147 self.verify_drop_tun_66(np, count=127)
2149 c = p.tun_if.get_rx_stats()
2150 self.assertEqual(c['packets'], 127*9)
2151 c = p.tun_if.get_tx_stats()
2152 self.assertEqual(c['packets'], 127*8)
2153 self.unconfig_sa(np)
2156 self.unconfig_protect(np3)
2157 self.unconfig_sa(np3)
2158 self.unconfig_network(p)
2160 def test_tun_46(self):
2161 """IPSEC tunnel protect 4o6"""
2163 p = self.ipv6_params
2165 self.config_network(p)
2166 self.config_sa_tra(p)
2167 self.config_protect(p)
2169 self.verify_tun_46(p, count=127)
2170 c = p.tun_if.get_rx_stats()
2171 self.assertEqual(c['packets'], 127)
2172 c = p.tun_if.get_tx_stats()
2173 self.assertEqual(c['packets'], 127)
2176 self.unconfig_protect(p)
2178 self.unconfig_network(p)
2181 class TestIpsec6TunProtectTun(TemplateIpsec,
2182 TemplateIpsec6TunProtect,
2184 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2186 encryption_type = ESP
2187 tun6_encrypt_node_name = "esp6-encrypt-tun"
2188 tun6_decrypt_node_name = "esp6-decrypt-tun"
2191 super(TestIpsec6TunProtectTun, self).setUp()
2193 self.tun_if = self.pg0
2196 super(TestIpsec6TunProtectTun, self).tearDown()
2198 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2200 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2201 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2202 dst=sw_intf.local_ip6) /
2203 IPv6(src=src, dst=dst) /
2204 UDP(sport=1166, dport=2233) /
2205 Raw(b'X' * payload_size))
2206 for i in range(count)]
2208 def gen_pkts6(self, sw_intf, src, dst, count=1,
2210 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2211 IPv6(src=src, dst=dst) /
2212 UDP(sport=1166, dport=2233) /
2213 Raw(b'X' * payload_size)
2214 for i in range(count)]
2216 def verify_decrypted6(self, p, rxs):
2218 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2219 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2220 self.assert_packet_checksums_valid(rx)
2222 def verify_encrypted6(self, p, sa, rxs):
2225 pkt = sa.decrypt(rx[IPv6])
2226 if not pkt.haslayer(IPv6):
2227 pkt = IPv6(pkt[Raw].load)
2228 self.assert_packet_checksums_valid(pkt)
2229 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2230 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2231 inner = pkt[IPv6].payload
2232 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2234 except (IndexError, AssertionError):
2235 self.logger.debug(ppp("Unexpected packet:", rx))
2237 self.logger.debug(ppp("Decrypted packet:", pkt))
2242 def test_tun_66(self):
2243 """IPSEC tunnel protect """
2245 p = self.ipv6_params
2247 self.config_network(p)
2248 self.config_sa_tun(p)
2249 self.config_protect(p)
2251 self.verify_tun_66(p, count=127)
2253 c = p.tun_if.get_rx_stats()
2254 self.assertEqual(c['packets'], 127)
2255 c = p.tun_if.get_tx_stats()
2256 self.assertEqual(c['packets'], 127)
2258 # rekey - create new SAs and update the tunnel protection
2260 np.crypt_key = b'X' + p.crypt_key[1:]
2261 np.scapy_tun_spi += 100
2262 np.scapy_tun_sa_id += 1
2263 np.vpp_tun_spi += 100
2264 np.vpp_tun_sa_id += 1
2265 np.tun_if.local_spi = p.vpp_tun_spi
2266 np.tun_if.remote_spi = p.scapy_tun_spi
2268 self.config_sa_tun(np)
2269 self.config_protect(np)
2272 self.verify_tun_66(np, count=127)
2273 c = p.tun_if.get_rx_stats()
2274 self.assertEqual(c['packets'], 254)
2275 c = p.tun_if.get_tx_stats()
2276 self.assertEqual(c['packets'], 254)
2279 self.unconfig_protect(np)
2280 self.unconfig_sa(np)
2281 self.unconfig_network(p)
2284 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2285 TemplateIpsec6TunProtect,
2287 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2289 encryption_type = ESP
2290 tun6_encrypt_node_name = "esp6-encrypt-tun"
2291 tun6_decrypt_node_name = "esp6-decrypt-tun"
2294 super(TestIpsec6TunProtectTunDrop, self).setUp()
2296 self.tun_if = self.pg0
2299 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2301 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2303 # the IP destination of the revealed packet does not match
2304 # that assigned to the tunnel
2305 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2306 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2308 IPv6(src=src, dst=dst) /
2309 UDP(sport=1144, dport=2233) /
2310 Raw(b'X' * payload_size))
2311 for i in range(count)]
2313 def test_tun_drop_66(self):
2314 """IPSEC 6 tunnel protect bogus tunnel header """
2316 p = self.ipv6_params
2318 self.config_network(p)
2319 self.config_sa_tun(p)
2320 self.config_protect(p)
2322 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2323 src=p.remote_tun_if_host,
2324 dst=self.pg1.remote_ip6,
2326 self.send_and_assert_no_replies(self.tun_if, tx)
2328 self.unconfig_protect(p)
2330 self.unconfig_network(p)
# When executed directly, run this module's test cases through unittest
# using the VppTestRunner imported from framework.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)