5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
23 from vpp_papi import VppEnum
26 def config_tun_params(p, encryption_type, tun_if):
27 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
28 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
29 IPSEC_API_SAD_FLAG_USE_ESN))
30 crypt_key = mk_scapy_crypt_key(p)
31 p.tun_dst = tun_if.remote_ip
32 p.tun_src = tun_if.local_ip
33 p.scapy_tun_sa = SecurityAssociation(
34 encryption_type, spi=p.vpp_tun_spi,
35 crypt_algo=p.crypt_algo,
37 auth_algo=p.auth_algo, auth_key=p.auth_key,
38 tunnel_header=ip_class_by_addr_type[p.addr_type](
41 nat_t_header=p.nat_header,
43 p.vpp_tun_sa = SecurityAssociation(
44 encryption_type, spi=p.scapy_tun_spi,
45 crypt_algo=p.crypt_algo,
47 auth_algo=p.auth_algo, auth_key=p.auth_key,
48 tunnel_header=ip_class_by_addr_type[p.addr_type](
51 nat_t_header=p.nat_header,
55 def config_tra_params(p, encryption_type, tun_if):
56 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
57 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
58 IPSEC_API_SAD_FLAG_USE_ESN))
59 crypt_key = mk_scapy_crypt_key(p)
60 p.tun_dst = tun_if.remote_ip
61 p.tun_src = tun_if.local_ip
62 p.scapy_tun_sa = SecurityAssociation(
63 encryption_type, spi=p.vpp_tun_spi,
64 crypt_algo=p.crypt_algo,
66 auth_algo=p.auth_algo, auth_key=p.auth_key,
68 p.vpp_tun_sa = SecurityAssociation(
69 encryption_type, spi=p.scapy_tun_spi,
70 crypt_algo=p.crypt_algo,
72 auth_algo=p.auth_algo, auth_key=p.auth_key,
76 class TemplateIpsec4TunIfEsp(TemplateIpsec):
77 """ IPsec tunnel interface tests """
83 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
def tearDownClass(cls):
    """ Class-level teardown; only delegates to the parent class. """
    # Explicit two-argument super() keeps the call independent of the
    # implicit __class__ cell and matches the style used across this file.
    super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
90 super(TemplateIpsec4TunIfEsp, self).setUp()
92 self.tun_if = self.pg0
96 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
97 p.scapy_tun_spi, p.crypt_algo_vpp_id,
98 p.crypt_key, p.crypt_key,
99 p.auth_algo_vpp_id, p.auth_key,
101 p.tun_if.add_vpp_config()
103 p.tun_if.config_ip4()
104 p.tun_if.config_ip6()
105 config_tun_params(p, self.encryption_type, p.tun_if)
107 r = VppIpRoute(self, p.remote_tun_if_host, 32,
108 [VppRoutePath(p.tun_if.remote_ip4,
111 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
112 [VppRoutePath(p.tun_if.remote_ip6,
114 proto=DpoProto.DPO_PROTO_IP6)])
118 super(TemplateIpsec4TunIfEsp, self).tearDown()
121 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
122 """ IPsec UDP tunnel interface tests """
124 tun4_encrypt_node_name = "esp4-encrypt-tun"
125 tun4_decrypt_node_name = "esp4-decrypt-tun"
126 encryption_type = ESP
130 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
def tearDownClass(cls):
    """ Class-level teardown; only delegates to the parent class. """
    # Explicit two-argument super() keeps the call independent of the
    # implicit __class__ cell and matches the style used across this file.
    super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
136 def verify_encrypted(self, p, sa, rxs):
139 # ensure the UDP ports are correct before we decrypt
141 self.assertTrue(rx.haslayer(UDP))
142 self.assert_equal(rx[UDP].sport, 4500)
143 self.assert_equal(rx[UDP].dport, 4500)
145 pkt = sa.decrypt(rx[IP])
146 if not pkt.haslayer(IP):
147 pkt = IP(pkt[Raw].load)
149 self.assert_packet_checksums_valid(pkt)
150 self.assert_equal(pkt[IP].dst, "1.1.1.1")
151 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
152 except (IndexError, AssertionError):
153 self.logger.debug(ppp("Unexpected packet:", rx))
155 self.logger.debug(ppp("Decrypted packet:", pkt))
161 super(TemplateIpsec4TunIfEspUdp, self).setUp()
164 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
165 IPSEC_API_SAD_FLAG_UDP_ENCAP)
166 p.nat_header = UDP(sport=5454, dport=4500)
168 def config_network(self):
170 self.tun_if = self.pg0
172 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
173 p.scapy_tun_spi, p.crypt_algo_vpp_id,
174 p.crypt_key, p.crypt_key,
175 p.auth_algo_vpp_id, p.auth_key,
176 p.auth_key, udp_encap=True)
177 p.tun_if.add_vpp_config()
179 p.tun_if.config_ip4()
180 p.tun_if.config_ip6()
181 config_tun_params(p, self.encryption_type, p.tun_if)
183 r = VppIpRoute(self, p.remote_tun_if_host, 32,
184 [VppRoutePath(p.tun_if.remote_ip4,
187 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
188 [VppRoutePath(p.tun_if.remote_ip6,
190 proto=DpoProto.DPO_PROTO_IP6)])
194 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
197 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
198 """ Ipsec ESP - TUN tests """
199 tun4_encrypt_node_name = "esp4-encrypt-tun"
200 tun4_decrypt_node_name = "esp4-decrypt-tun"
202 def test_tun_basic64(self):
203 """ ipsec 6o4 tunnel basic test """
204 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
206 self.verify_tun_64(self.params[socket.AF_INET], count=1)
208 def test_tun_burst64(self):
209 """ ipsec 6o4 tunnel basic test """
210 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
212 self.verify_tun_64(self.params[socket.AF_INET], count=257)
214 def test_tun_basic_frag44(self):
215 """ ipsec 4o4 tunnel frag basic test """
216 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
220 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
222 self.verify_tun_44(self.params[socket.AF_INET],
223 count=1, payload_size=1800, n_rx=2)
224 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
228 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
229 """ Ipsec ESP UDP tests """
231 tun4_input_node = "ipsec4-tun-input"
234 super(TemplateIpsec4TunIfEspUdp, self).setUp()
235 self.config_network()
237 def test_keepalive(self):
238 """ IPSEC NAT Keepalive """
239 self.verify_keepalive(self.ipv4_params)
242 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
243 """ Ipsec ESP UDP GCM tests """
245 tun4_input_node = "ipsec4-tun-input"
248 super(TemplateIpsec4TunIfEspUdp, self).setUp()
250 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
251 IPSEC_API_INTEG_ALG_NONE)
252 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
253 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
254 p.crypt_algo = "AES-GCM"
256 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
258 self.config_network()
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # No members are declared in the visible lines: this class only
    # combines the IPv4 tunnel-interface template with the IpsecTcpTests
    # mixin imported from template_ipsec.
266 class TemplateIpsec6TunIfEsp(TemplateIpsec):
267 """ IPsec tunnel interface tests """
269 encryption_type = ESP
272 super(TemplateIpsec6TunIfEsp, self).setUp()
274 self.tun_if = self.pg0
277 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
278 p.scapy_tun_spi, p.crypt_algo_vpp_id,
279 p.crypt_key, p.crypt_key,
280 p.auth_algo_vpp_id, p.auth_key,
281 p.auth_key, is_ip6=True)
282 p.tun_if.add_vpp_config()
284 p.tun_if.config_ip6()
285 p.tun_if.config_ip4()
286 config_tun_params(p, self.encryption_type, p.tun_if)
288 r = VppIpRoute(self, p.remote_tun_if_host, 128,
289 [VppRoutePath(p.tun_if.remote_ip6,
291 proto=DpoProto.DPO_PROTO_IP6)])
293 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
294 [VppRoutePath(p.tun_if.remote_ip4,
299 super(TemplateIpsec6TunIfEsp, self).tearDown()
302 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
304 """ Ipsec ESP - TUN tests """
305 tun6_encrypt_node_name = "esp6-encrypt-tun"
306 tun6_decrypt_node_name = "esp6-decrypt-tun"
308 def test_tun_basic46(self):
309 """ ipsec 4o6 tunnel basic test """
310 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
311 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
313 def test_tun_burst46(self):
314 """ ipsec 4o6 tunnel burst test """
315 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
316 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
                                IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # Defines no tests of its own: it combines the IPv6 tunnel-interface
    # template with the IpsecTun6HandoffTests mixin from template_ipsec.
    # The two strings below are node names; how the inherited tests use
    # them is not visible in this file (presumably for per-node counter
    # checks -- confirm in template_ipsec).
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
                                IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # Defines no tests of its own: it combines the IPv4 tunnel-interface
    # template with the IpsecTun4HandoffTests mixin from template_ipsec.
    # The two strings below are node names; how the inherited tests use
    # them is not visible in this file (presumably for per-node counter
    # checks -- confirm in template_ipsec).
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
333 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
334 """ IPsec IPv4 Multi Tunnel interface """
336 encryption_type = ESP
337 tun4_encrypt_node_name = "esp4-encrypt-tun"
338 tun4_decrypt_node_name = "esp4-decrypt-tun"
341 super(TestIpsec4MultiTunIfEsp, self).setUp()
343 self.tun_if = self.pg0
345 self.multi_params = []
346 self.pg0.generate_remote_hosts(10)
347 self.pg0.configure_ipv4_neighbors()
350 p = copy.copy(self.ipv4_params)
352 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
353 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
354 p.scapy_tun_spi = p.scapy_tun_spi + ii
355 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
356 p.vpp_tun_spi = p.vpp_tun_spi + ii
358 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
359 p.scapy_tra_spi = p.scapy_tra_spi + ii
360 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
361 p.vpp_tra_spi = p.vpp_tra_spi + ii
362 p.tun_dst = self.pg0.remote_hosts[ii].ip4
364 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
367 p.crypt_key, p.crypt_key,
368 p.auth_algo_vpp_id, p.auth_key,
371 p.tun_if.add_vpp_config()
373 p.tun_if.config_ip4()
374 config_tun_params(p, self.encryption_type, p.tun_if)
375 self.multi_params.append(p)
377 VppIpRoute(self, p.remote_tun_if_host, 32,
378 [VppRoutePath(p.tun_if.remote_ip4,
379 0xffffffff)]).add_vpp_config()
382 super(TestIpsec4MultiTunIfEsp, self).tearDown()
384 def test_tun_44(self):
385 """Multiple IPSEC tunnel interfaces """
386 for p in self.multi_params:
387 self.verify_tun_44(p, count=127)
388 c = p.tun_if.get_rx_stats()
389 self.assertEqual(c['packets'], 127)
390 c = p.tun_if.get_tx_stats()
391 self.assertEqual(c['packets'], 127)
393 def test_tun_rr_44(self):
394 """ Round-robin packets acrros multiple interface """
396 for p in self.multi_params:
397 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
398 src=p.remote_tun_if_host,
399 dst=self.pg1.remote_ip4)
400 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
402 for rx, p in zip(rxs, self.multi_params):
403 self.verify_decrypted(p, [rx])
406 for p in self.multi_params:
407 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
408 dst=p.remote_tun_if_host)
409 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
411 for rx, p in zip(rxs, self.multi_params):
412 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
415 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
416 """ IPsec IPv4 Tunnel interface all Algos """
418 encryption_type = ESP
419 tun4_encrypt_node_name = "esp4-encrypt-tun"
420 tun4_decrypt_node_name = "esp4-decrypt-tun"
422 def config_network(self, p):
424 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
427 p.crypt_key, p.crypt_key,
428 p.auth_algo_vpp_id, p.auth_key,
431 p.tun_if.add_vpp_config()
433 p.tun_if.config_ip4()
434 config_tun_params(p, self.encryption_type, p.tun_if)
435 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
436 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
438 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
439 [VppRoutePath(p.tun_if.remote_ip4,
441 p.route.add_vpp_config()
def unconfig_network(self, p):
    """ Remove the tunnel interface and route held by ``p``.

    Order: unconfigure IPv4 on the tunnel interface, remove the tunnel
    interface from VPP, then remove the route.

    :param p: parameter object carrying ``tun_if`` and ``route``
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()

    p.route.remove_vpp_config()
449 super(TestIpsec4TunIfEspAll, self).setUp()
451 self.tun_if = self.pg0
454 super(TestIpsec4TunIfEspAll, self).tearDown()
458 # change the key and the SPI
460 p.crypt_key = b'X' + p.crypt_key[1:]
462 p.scapy_tun_sa_id += 1
465 p.tun_if.local_spi = p.vpp_tun_spi
466 p.tun_if.remote_spi = p.scapy_tun_spi
468 config_tun_params(p, self.encryption_type, p.tun_if)
470 p.tun_sa_in = VppIpsecSA(self,
477 self.vpp_esp_protocol,
480 p.tun_sa_out = VppIpsecSA(self,
487 self.vpp_esp_protocol,
490 p.tun_sa_in.add_vpp_config()
491 p.tun_sa_out.add_vpp_config()
493 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
494 sa_id=p.tun_sa_in.id,
496 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
497 sa_id=p.tun_sa_out.id,
499 self.logger.info(self.vapi.cli("sh ipsec sa"))
501 def test_tun_44(self):
502 """IPSEC tunnel all algos """
504 # foreach VPP crypto engine
505 engines = ["ia32", "ipsecmb", "openssl"]
507 # foreach crypto algorithm
508 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
509 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
510 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
511 IPSEC_API_INTEG_ALG_NONE),
512 'scapy-crypto': "AES-GCM",
513 'scapy-integ': "NULL",
514 'key': b"JPjyOWBeVEQiMe7h",
516 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
517 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
518 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
519 IPSEC_API_INTEG_ALG_NONE),
520 'scapy-crypto': "AES-GCM",
521 'scapy-integ': "NULL",
522 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
524 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
525 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
526 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
527 IPSEC_API_INTEG_ALG_NONE),
528 'scapy-crypto': "AES-GCM",
529 'scapy-integ': "NULL",
530 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
532 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
533 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
534 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
535 IPSEC_API_INTEG_ALG_SHA1_96),
536 'scapy-crypto': "AES-CBC",
537 'scapy-integ': "HMAC-SHA1-96",
539 'key': b"JPjyOWBeVEQiMe7h"},
540 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
541 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
542 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
543 IPSEC_API_INTEG_ALG_SHA1_96),
544 'scapy-crypto': "AES-CBC",
545 'scapy-integ': "HMAC-SHA1-96",
547 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
548 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
549 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
550 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
551 IPSEC_API_INTEG_ALG_SHA1_96),
552 'scapy-crypto': "AES-CBC",
553 'scapy-integ': "HMAC-SHA1-96",
555 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
556 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
557 IPSEC_API_CRYPTO_ALG_NONE),
558 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
559 IPSEC_API_INTEG_ALG_SHA1_96),
560 'scapy-crypto': "NULL",
561 'scapy-integ': "HMAC-SHA1-96",
563 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
565 for engine in engines:
566 self.vapi.cli("set crypto handler all %s" % engine)
569 # loop through each of the algorithms
572 # with self.subTest(algo=algo['scapy']):
574 p = copy.copy(self.ipv4_params)
575 p.auth_algo_vpp_id = algo['vpp-integ']
576 p.crypt_algo_vpp_id = algo['vpp-crypto']
577 p.crypt_algo = algo['scapy-crypto']
578 p.auth_algo = algo['scapy-integ']
579 p.crypt_key = algo['key']
580 p.salt = algo['salt']
582 self.config_network(p)
584 self.verify_tun_44(p, count=127)
585 c = p.tun_if.get_rx_stats()
586 self.assertEqual(c['packets'], 127)
587 c = p.tun_if.get_tx_stats()
588 self.assertEqual(c['packets'], 127)
594 self.verify_tun_44(p, count=127)
596 self.unconfig_network(p)
597 p.tun_sa_out.remove_vpp_config()
598 p.tun_sa_in.remove_vpp_config()
601 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
602 """ IPsec IPv4 Tunnel interface no Algos """
604 encryption_type = ESP
605 tun4_encrypt_node_name = "esp4-encrypt-tun"
606 tun4_decrypt_node_name = "esp4-decrypt-tun"
608 def config_network(self, p):
610 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
611 IPSEC_API_INTEG_ALG_NONE)
615 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
616 IPSEC_API_CRYPTO_ALG_NONE)
617 p.crypt_algo = 'NULL'
620 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
623 p.crypt_key, p.crypt_key,
624 p.auth_algo_vpp_id, p.auth_key,
627 p.tun_if.add_vpp_config()
629 p.tun_if.config_ip4()
630 config_tun_params(p, self.encryption_type, p.tun_if)
631 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
632 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
634 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
635 [VppRoutePath(p.tun_if.remote_ip4,
637 p.route.add_vpp_config()
def unconfig_network(self, p):
    """ Remove the tunnel interface and route held by ``p``.

    Order: unconfigure IPv4 on the tunnel interface, remove the tunnel
    interface from VPP, then remove the route.

    :param p: parameter object carrying ``tun_if`` and ``route``
    """
    tunnel = p.tun_if
    tunnel.unconfig_ip4()
    tunnel.remove_vpp_config()

    p.route.remove_vpp_config()
645 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
647 self.tun_if = self.pg0
650 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
652 def test_tun_44(self):
653 """ IPSec SA with NULL algos """
656 self.config_network(p)
658 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
659 dst=p.remote_tun_if_host)
660 self.send_and_assert_no_replies(self.pg1, tx)
662 self.unconfig_network(p)
665 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
666 """ IPsec IPv6 Multi Tunnel interface """
668 encryption_type = ESP
669 tun6_encrypt_node_name = "esp6-encrypt-tun"
670 tun6_decrypt_node_name = "esp6-decrypt-tun"
673 super(TestIpsec6MultiTunIfEsp, self).setUp()
675 self.tun_if = self.pg0
677 self.multi_params = []
678 self.pg0.generate_remote_hosts(10)
679 self.pg0.configure_ipv6_neighbors()
682 p = copy.copy(self.ipv6_params)
684 p.remote_tun_if_host = "1111::%d" % (ii + 1)
685 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
686 p.scapy_tun_spi = p.scapy_tun_spi + ii
687 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
688 p.vpp_tun_spi = p.vpp_tun_spi + ii
690 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
691 p.scapy_tra_spi = p.scapy_tra_spi + ii
692 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
693 p.vpp_tra_spi = p.vpp_tra_spi + ii
695 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
698 p.crypt_key, p.crypt_key,
699 p.auth_algo_vpp_id, p.auth_key,
700 p.auth_key, is_ip6=True,
701 dst=self.pg0.remote_hosts[ii].ip6)
702 p.tun_if.add_vpp_config()
704 p.tun_if.config_ip6()
705 config_tun_params(p, self.encryption_type, p.tun_if)
706 self.multi_params.append(p)
708 r = VppIpRoute(self, p.remote_tun_if_host, 128,
709 [VppRoutePath(p.tun_if.remote_ip6,
711 proto=DpoProto.DPO_PROTO_IP6)])
715 super(TestIpsec6MultiTunIfEsp, self).tearDown()
717 def test_tun_66(self):
718 """Multiple IPSEC tunnel interfaces """
719 for p in self.multi_params:
720 self.verify_tun_66(p, count=127)
721 c = p.tun_if.get_rx_stats()
722 self.assertEqual(c['packets'], 127)
723 c = p.tun_if.get_tx_stats()
724 self.assertEqual(c['packets'], 127)
727 class TestIpsecGreTebIfEsp(TemplateIpsec,
729 """ Ipsec GRE TEB ESP - TUN tests """
730 tun4_encrypt_node_name = "esp4-encrypt-tun"
731 tun4_decrypt_node_name = "esp4-decrypt-tun"
732 encryption_type = ESP
733 omac = "00:11:22:33:44:55"
735 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
737 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
738 sa.encrypt(IP(src=self.pg0.remote_ip4,
739 dst=self.pg0.local_ip4) /
741 Ether(dst=self.omac) /
742 IP(src="1.1.1.1", dst="1.1.1.2") /
743 UDP(sport=1144, dport=2233) /
744 Raw(b'X' * payload_size))
745 for i in range(count)]
747 def gen_pkts(self, sw_intf, src, dst, count=1,
749 return [Ether(dst=self.omac) /
750 IP(src="1.1.1.1", dst="1.1.1.2") /
751 UDP(sport=1144, dport=2233) /
752 Raw(b'X' * payload_size)
753 for i in range(count)]
755 def verify_decrypted(self, p, rxs):
757 self.assert_equal(rx[Ether].dst, self.omac)
758 self.assert_equal(rx[IP].dst, "1.1.1.2")
760 def verify_encrypted(self, p, sa, rxs):
763 pkt = sa.decrypt(rx[IP])
764 if not pkt.haslayer(IP):
765 pkt = IP(pkt[Raw].load)
766 self.assert_packet_checksums_valid(pkt)
767 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
768 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
769 self.assertTrue(pkt.haslayer(GRE))
771 self.assertEqual(e[Ether].dst, self.omac)
772 self.assertEqual(e[IP].dst, "1.1.1.2")
773 except (IndexError, AssertionError):
774 self.logger.debug(ppp("Unexpected packet:", rx))
776 self.logger.debug(ppp("Decrypted packet:", pkt))
782 super(TestIpsecGreTebIfEsp, self).setUp()
784 self.tun_if = self.pg0
788 bd1 = VppBridgeDomain(self, 1)
791 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
792 p.auth_algo_vpp_id, p.auth_key,
793 p.crypt_algo_vpp_id, p.crypt_key,
794 self.vpp_esp_protocol,
797 p.tun_sa_out.add_vpp_config()
799 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
800 p.auth_algo_vpp_id, p.auth_key,
801 p.crypt_algo_vpp_id, p.crypt_key,
802 self.vpp_esp_protocol,
805 p.tun_sa_in.add_vpp_config()
807 p.tun_if = VppGreInterface(self,
810 type=(VppEnum.vl_api_gre_tunnel_type_t.
811 GRE_API_TUNNEL_TYPE_TEB))
812 p.tun_if.add_vpp_config()
814 p.tun_protect = VppIpsecTunProtect(self,
819 p.tun_protect.add_vpp_config()
822 p.tun_if.config_ip4()
823 config_tun_params(p, self.encryption_type, p.tun_if)
825 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
826 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
828 self.vapi.cli("clear ipsec sa")
829 self.vapi.cli("sh adj")
830 self.vapi.cli("sh ipsec tun")
834 p.tun_if.unconfig_ip4()
835 super(TestIpsecGreTebIfEsp, self).tearDown()
838 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
840 """ Ipsec GRE TEB ESP - TUN tests """
841 tun4_encrypt_node_name = "esp4-encrypt-tun"
842 tun4_decrypt_node_name = "esp4-decrypt-tun"
843 encryption_type = ESP
844 omac = "00:11:22:33:44:55"
846 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
848 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
849 sa.encrypt(IP(src=self.pg0.remote_ip4,
850 dst=self.pg0.local_ip4) /
852 Ether(dst=self.omac) /
853 IP(src="1.1.1.1", dst="1.1.1.2") /
854 UDP(sport=1144, dport=2233) /
855 Raw(b'X' * payload_size))
856 for i in range(count)]
858 def gen_pkts(self, sw_intf, src, dst, count=1,
860 return [Ether(dst=self.omac) /
862 IP(src="1.1.1.1", dst="1.1.1.2") /
863 UDP(sport=1144, dport=2233) /
864 Raw(b'X' * payload_size)
865 for i in range(count)]
867 def verify_decrypted(self, p, rxs):
869 self.assert_equal(rx[Ether].dst, self.omac)
870 self.assert_equal(rx[Dot1Q].vlan, 11)
871 self.assert_equal(rx[IP].dst, "1.1.1.2")
873 def verify_encrypted(self, p, sa, rxs):
876 pkt = sa.decrypt(rx[IP])
877 if not pkt.haslayer(IP):
878 pkt = IP(pkt[Raw].load)
879 self.assert_packet_checksums_valid(pkt)
880 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
881 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
882 self.assertTrue(pkt.haslayer(GRE))
884 self.assertEqual(e[Ether].dst, self.omac)
885 self.assertFalse(e.haslayer(Dot1Q))
886 self.assertEqual(e[IP].dst, "1.1.1.2")
887 except (IndexError, AssertionError):
888 self.logger.debug(ppp("Unexpected packet:", rx))
890 self.logger.debug(ppp("Decrypted packet:", pkt))
896 super(TestIpsecGreTebVlanIfEsp, self).setUp()
898 self.tun_if = self.pg0
902 bd1 = VppBridgeDomain(self, 1)
905 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
906 self.vapi.l2_interface_vlan_tag_rewrite(
907 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
909 self.pg1_11.admin_up()
911 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
912 p.auth_algo_vpp_id, p.auth_key,
913 p.crypt_algo_vpp_id, p.crypt_key,
914 self.vpp_esp_protocol,
917 p.tun_sa_out.add_vpp_config()
919 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
920 p.auth_algo_vpp_id, p.auth_key,
921 p.crypt_algo_vpp_id, p.crypt_key,
922 self.vpp_esp_protocol,
925 p.tun_sa_in.add_vpp_config()
927 p.tun_if = VppGreInterface(self,
930 type=(VppEnum.vl_api_gre_tunnel_type_t.
931 GRE_API_TUNNEL_TYPE_TEB))
932 p.tun_if.add_vpp_config()
934 p.tun_protect = VppIpsecTunProtect(self,
939 p.tun_protect.add_vpp_config()
942 p.tun_if.config_ip4()
943 config_tun_params(p, self.encryption_type, p.tun_if)
945 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
946 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
948 self.vapi.cli("clear ipsec sa")
952 p.tun_if.unconfig_ip4()
953 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
954 self.pg1_11.admin_down()
955 self.pg1_11.remove_vpp_config()
958 class TestIpsecGreTebIfEspTra(TemplateIpsec,
960 """ Ipsec GRE TEB ESP - Tra tests """
961 tun4_encrypt_node_name = "esp4-encrypt-tun"
962 tun4_decrypt_node_name = "esp4-decrypt-tun"
963 encryption_type = ESP
964 omac = "00:11:22:33:44:55"
966 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
968 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
969 sa.encrypt(IP(src=self.pg0.remote_ip4,
970 dst=self.pg0.local_ip4) /
972 Ether(dst=self.omac) /
973 IP(src="1.1.1.1", dst="1.1.1.2") /
974 UDP(sport=1144, dport=2233) /
975 Raw(b'X' * payload_size))
976 for i in range(count)]
978 def gen_pkts(self, sw_intf, src, dst, count=1,
980 return [Ether(dst=self.omac) /
981 IP(src="1.1.1.1", dst="1.1.1.2") /
982 UDP(sport=1144, dport=2233) /
983 Raw(b'X' * payload_size)
984 for i in range(count)]
986 def verify_decrypted(self, p, rxs):
988 self.assert_equal(rx[Ether].dst, self.omac)
989 self.assert_equal(rx[IP].dst, "1.1.1.2")
991 def verify_encrypted(self, p, sa, rxs):
994 pkt = sa.decrypt(rx[IP])
995 if not pkt.haslayer(IP):
996 pkt = IP(pkt[Raw].load)
997 self.assert_packet_checksums_valid(pkt)
998 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
999 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1000 self.assertTrue(pkt.haslayer(GRE))
1002 self.assertEqual(e[Ether].dst, self.omac)
1003 self.assertEqual(e[IP].dst, "1.1.1.2")
1004 except (IndexError, AssertionError):
1005 self.logger.debug(ppp("Unexpected packet:", rx))
1007 self.logger.debug(ppp("Decrypted packet:", pkt))
1013 super(TestIpsecGreTebIfEspTra, self).setUp()
1015 self.tun_if = self.pg0
1017 p = self.ipv4_params
1019 bd1 = VppBridgeDomain(self, 1)
1020 bd1.add_vpp_config()
1022 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1023 p.auth_algo_vpp_id, p.auth_key,
1024 p.crypt_algo_vpp_id, p.crypt_key,
1025 self.vpp_esp_protocol)
1026 p.tun_sa_out.add_vpp_config()
1028 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1029 p.auth_algo_vpp_id, p.auth_key,
1030 p.crypt_algo_vpp_id, p.crypt_key,
1031 self.vpp_esp_protocol)
1032 p.tun_sa_in.add_vpp_config()
1034 p.tun_if = VppGreInterface(self,
1036 self.pg0.remote_ip4,
1037 type=(VppEnum.vl_api_gre_tunnel_type_t.
1038 GRE_API_TUNNEL_TYPE_TEB))
1039 p.tun_if.add_vpp_config()
1041 p.tun_protect = VppIpsecTunProtect(self,
1046 p.tun_protect.add_vpp_config()
1049 p.tun_if.config_ip4()
1050 config_tra_params(p, self.encryption_type, p.tun_if)
1052 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1053 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1055 self.vapi.cli("clear ipsec sa")
1058 p = self.ipv4_params
1059 p.tun_if.unconfig_ip4()
1060 super(TestIpsecGreTebIfEspTra, self).tearDown()
1063 class TestIpsecGreIfEsp(TemplateIpsec,
1065 """ Ipsec GRE ESP - TUN tests """
1066 tun4_encrypt_node_name = "esp4-encrypt-tun"
1067 tun4_decrypt_node_name = "esp4-decrypt-tun"
1068 encryption_type = ESP
1070 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1072 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1073 sa.encrypt(IP(src=self.pg0.remote_ip4,
1074 dst=self.pg0.local_ip4) /
1076 IP(src=self.pg1.local_ip4,
1077 dst=self.pg1.remote_ip4) /
1078 UDP(sport=1144, dport=2233) /
1079 Raw(b'X' * payload_size))
1080 for i in range(count)]
1082 def gen_pkts(self, sw_intf, src, dst, count=1,
1084 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1085 IP(src="1.1.1.1", dst="1.1.1.2") /
1086 UDP(sport=1144, dport=2233) /
1087 Raw(b'X' * payload_size)
1088 for i in range(count)]
1090 def verify_decrypted(self, p, rxs):
1092 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1093 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1095 def verify_encrypted(self, p, sa, rxs):
1098 pkt = sa.decrypt(rx[IP])
1099 if not pkt.haslayer(IP):
1100 pkt = IP(pkt[Raw].load)
1101 self.assert_packet_checksums_valid(pkt)
1102 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1103 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1104 self.assertTrue(pkt.haslayer(GRE))
1106 self.assertEqual(e[IP].dst, "1.1.1.2")
1107 except (IndexError, AssertionError):
1108 self.logger.debug(ppp("Unexpected packet:", rx))
1110 self.logger.debug(ppp("Decrypted packet:", pkt))
1116 super(TestIpsecGreIfEsp, self).setUp()
1118 self.tun_if = self.pg0
1120 p = self.ipv4_params
1122 bd1 = VppBridgeDomain(self, 1)
1123 bd1.add_vpp_config()
1125 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1126 p.auth_algo_vpp_id, p.auth_key,
1127 p.crypt_algo_vpp_id, p.crypt_key,
1128 self.vpp_esp_protocol,
1130 self.pg0.remote_ip4)
1131 p.tun_sa_out.add_vpp_config()
1133 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1134 p.auth_algo_vpp_id, p.auth_key,
1135 p.crypt_algo_vpp_id, p.crypt_key,
1136 self.vpp_esp_protocol,
1137 self.pg0.remote_ip4,
1139 p.tun_sa_in.add_vpp_config()
1141 p.tun_if = VppGreInterface(self,
1143 self.pg0.remote_ip4)
1144 p.tun_if.add_vpp_config()
1146 p.tun_protect = VppIpsecTunProtect(self,
1150 p.tun_protect.add_vpp_config()
1153 p.tun_if.config_ip4()
1154 config_tun_params(p, self.encryption_type, p.tun_if)
1156 VppIpRoute(self, "1.1.1.2", 32,
1157 [VppRoutePath(p.tun_if.remote_ip4,
1158 0xffffffff)]).add_vpp_config()
1161 p = self.ipv4_params
1162 p.tun_if.unconfig_ip4()
1163 super(TestIpsecGreIfEsp, self).tearDown()
1166 class TestIpsecGreIfEspTra(TemplateIpsec,
1168 """ Ipsec GRE ESP - TRA tests """
1169 tun4_encrypt_node_name = "esp4-encrypt-tun"
1170 tun4_decrypt_node_name = "esp4-decrypt-tun"
1171 encryption_type = ESP
1173 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1175 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1176 sa.encrypt(IP(src=self.pg0.remote_ip4,
1177 dst=self.pg0.local_ip4) /
1179 IP(src=self.pg1.local_ip4,
1180 dst=self.pg1.remote_ip4) /
1181 UDP(sport=1144, dport=2233) /
1182 Raw(b'X' * payload_size))
1183 for i in range(count)]
1185 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1187 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1188 sa.encrypt(IP(src=self.pg0.remote_ip4,
1189 dst=self.pg0.local_ip4) /
1191 UDP(sport=1144, dport=2233) /
1192 Raw(b'X' * payload_size))
1193 for i in range(count)]
1195 def gen_pkts(self, sw_intf, src, dst, count=1,
1197 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1198 IP(src="1.1.1.1", dst="1.1.1.2") /
1199 UDP(sport=1144, dport=2233) /
1200 Raw(b'X' * payload_size)
1201 for i in range(count)]
1203 def verify_decrypted(self, p, rxs):
1205 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1206 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1208 def verify_encrypted(self, p, sa, rxs):
1211 pkt = sa.decrypt(rx[IP])
1212 if not pkt.haslayer(IP):
1213 pkt = IP(pkt[Raw].load)
1214 self.assert_packet_checksums_valid(pkt)
1215 self.assertTrue(pkt.haslayer(GRE))
1217 self.assertEqual(e[IP].dst, "1.1.1.2")
1218 except (IndexError, AssertionError):
1219 self.logger.debug(ppp("Unexpected packet:", rx))
1221 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture body.  NOTE(review): the enclosing `def` line is not part of this
# extract; presumably this is setUp, given the super().setUp() call - confirm.
1227 super(TestIpsecGreIfEspTra, self).setUp()
# pg0 is the interface the tunnel traffic is sent on / received from.
1229 self.tun_if = self.pg0
1231 p = self.ipv4_params
# Create both SAs from the parameters in `p` and add each one to VPP:
# tun_sa_out uses the scapy SA id/SPI, tun_sa_in the VPP SA id/SPI.
1233 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1234 p.auth_algo_vpp_id, p.auth_key,
1235 p.crypt_algo_vpp_id, p.crypt_key,
1236 self.vpp_esp_protocol)
1237 p.tun_sa_out.add_vpp_config()
1239 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1240 p.auth_algo_vpp_id, p.auth_key,
1241 p.crypt_algo_vpp_id, p.crypt_key,
1242 self.vpp_esp_protocol)
1243 p.tun_sa_in.add_vpp_config()
# GRE interface towards pg0.remote_ip4.
# NOTE(review): embedded line 1246 (another constructor argument) is absent.
1245 p.tun_if = VppGreInterface(self,
1247 self.pg0.remote_ip4)
1248 p.tun_if.add_vpp_config()
# NOTE(review): the remaining VppIpsecTunProtect arguments (embedded lines
# 1251-1253) are absent from this extract.
1250 p.tun_protect = VppIpsecTunProtect(self,
1254 p.tun_protect.add_vpp_config()
1257 p.tun_if.config_ip4()
# Populates the scapy-side state on `p` for the new tunnel interface.
1258 config_tra_params(p, self.encryption_type, p.tun_if)
# /32 route for 1.1.1.2 via the tunnel's remote address; 0xffffffff is the
# second VppRoutePath argument (presumably "no specific interface" - confirm).
1260 VppIpRoute(self, "1.1.1.2", 32,
1261 [VppRoutePath(p.tun_if.remote_ip4,
1262 0xffffffff)]).add_vpp_config()
# Fixture teardown body (its `def` line, embedded line 1264, is absent from
# this extract): calls unconfig_ip4() on the tunnel interface stored in
# self.ipv4_params, then defers to the parent class's tearDown().
1265 p = self.ipv4_params
1266 p.tun_if.unconfig_ip4()
1267 super(TestIpsecGreIfEspTra, self).tearDown()
1269 def test_gre_non_ip(self):
1270 p = self.ipv4_params
1271 tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1272 src=p.remote_tun_if_host,
1273 dst=self.pg1.remote_ip6)
1274 self.send_and_assert_no_replies(self.tun_if, tx)
1275 node_name = ('/err/%s/unsupported payload' %
1276 self.tun4_decrypt_node_name)
1277 self.assertEqual(1, self.statistics.get_err_counter(node_name))
# Test case exercising IPsec protection of a GRE tunnel whose remote endpoint
# is pg0.remote_ip6.
# NOTE(review): the embedded line numbers in this class are not contiguous
# (e.g. 1281, 1288, 1308, 1313-1314 and 1326-1330 are absent), so the end of
# the base-class list, signature tails, loop headers and the fixture `def`
# lines are not visible in this extract - confirm every hedged remark below
# against the full file.
1280 class TestIpsecGre6IfEspTra(TemplateIpsec,
1282 """ Ipsec GRE ESP - TRA tests """
# Graph node names and the scapy encryption class used by this test case.
1283 tun6_encrypt_node_name = "esp6-encrypt-tun"
1284 tun6_decrypt_node_name = "esp6-decrypt-tun"
1285 encryption_type = ESP
# Build `count` frames carrying sa.encrypt() of an outer IPv6 header
# (pg0.remote_ip6 -> pg0.local_ip6) around an inner IPv6/UDP packet from
# pg1.local_ip6 to pg1.remote_ip6.  `src`/`dst` are unused in the visible
# lines; `payload_size` is presumably declared on the absent line 1288 and one
# encapsulation line (1292) is absent as well.
1287 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1289 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1290 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1291 dst=self.pg0.local_ip6) /
1293 IPv6(src=self.pg1.local_ip6,
1294 dst=self.pg1.remote_ip6) /
1295 UDP(sport=1144, dport=2233) /
1296 Raw(b'X' * payload_size))
1297 for i in range(count)]
# Build `count` plaintext IPv6/UDP frames from 1::1 to 1::2 (hard-coded;
# `src`/`dst` are unused in the visible lines).
1299 def gen_pkts6(self, sw_intf, src, dst, count=1,
1301 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1302 IPv6(src="1::1", dst="1::2") /
1303 UDP(sport=1144, dport=2233) /
1304 Raw(b'X' * payload_size)
1305 for i in range(count)]
# Require that a decrypted packet is addressed to pg1 (MAC and IPv6 address).
# `rx` is presumably the loop variable of an absent `for` over `rxs`.
1307 def verify_decrypted6(self, p, rxs):
1309 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1310 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
# Decrypt a received packet with `sa`, re-parse it as IPv6 if needed, then
# require valid checksums, a GRE layer and `e[IPv6].dst` == "1::2"; failures
# are logged at debug level.  `rx`, `e`, the `try:` and the rest of the handler
# are on lines absent from this extract.
1312 def verify_encrypted6(self, p, sa, rxs):
1315 pkt = sa.decrypt(rx[IPv6])
1316 if not pkt.haslayer(IPv6):
1317 pkt = IPv6(pkt[Raw].load)
1318 self.assert_packet_checksums_valid(pkt)
1319 self.assertTrue(pkt.haslayer(GRE))
1321 self.assertEqual(e[IPv6].dst, "1::2")
1322 except (IndexError, AssertionError):
1323 self.logger.debug(ppp("Unexpected packet:", rx))
1325 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture body (presumably setUp; its `def` line is absent).
1331 super(TestIpsecGre6IfEspTra, self).setUp()
1333 self.tun_if = self.pg0
1335 p = self.ipv6_params
# Bridge domain 1 is created here; it is not referenced again in the visible
# lines.
1337 bd1 = VppBridgeDomain(self, 1)
1338 bd1.add_vpp_config()
# Outbound SA uses the scapy SA id/SPI, inbound SA the VPP SA id/SPI.
1340 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1341 p.auth_algo_vpp_id, p.auth_key,
1342 p.crypt_algo_vpp_id, p.crypt_key,
1343 self.vpp_esp_protocol)
1344 p.tun_sa_out.add_vpp_config()
1346 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1347 p.auth_algo_vpp_id, p.auth_key,
1348 p.crypt_algo_vpp_id, p.crypt_key,
1349 self.vpp_esp_protocol)
1350 p.tun_sa_in.add_vpp_config()
# NOTE(review): one VppGreInterface argument (line 1353) and the remaining
# VppIpsecTunProtect arguments (lines 1358-1360) are absent.
1352 p.tun_if = VppGreInterface(self,
1354 self.pg0.remote_ip6)
1355 p.tun_if.add_vpp_config()
1357 p.tun_protect = VppIpsecTunProtect(self,
1361 p.tun_protect.add_vpp_config()
1364 p.tun_if.config_ip6()
1365 config_tra_params(p, self.encryption_type, p.tun_if)
# /128 route for 1::2 via the tunnel's remote IPv6 address.
# NOTE(review): the call that installs `r` is not visible in this extract.
1367 r = VppIpRoute(self, "1::2", 128,
1368 [VppRoutePath(p.tun_if.remote_ip6,
1370 proto=DpoProto.DPO_PROTO_IP6)])
# Fixture teardown body (its `def` line is absent).
1374 p = self.ipv6_params
1375 p.tun_if.unconfig_ip6()
1376 super(TestIpsecGre6IfEspTra, self).tearDown()
# Test case for a multipoint (TUNNEL_API_MODE_MP) GRE interface with one
# protected next-hop per parameter set in self.multi_params.
# NOTE(review): the embedded line numbers in this class are not contiguous
# (e.g. 1386, 1406, 1424-1428, 1448-1449, 1475-1478, 1501-1502 are absent), so
# signature tails, loop headers, fixture `def` lines and the definitions of
# `N_NHS`, `N_PKTS` and `ii` are not visible in this extract - confirm the
# hedged remarks below against the full file.
1379 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1380 """ Ipsec mGRE ESP v4 TRA tests """
1381 tun4_encrypt_node_name = "esp4-encrypt-tun"
1382 tun4_decrypt_node_name = "esp4-decrypt-tun"
1383 encryption_type = ESP
# Build `count` frames carrying sa.encrypt() of an outer IP header from
# p.tun_dst to pg0.local_ip4 around an inner IP/UDP packet from pg1.local_ip4
# to pg1.remote_ip4.  `src`/`dst` are unused in the visible lines.
1385 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1387 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1388 sa.encrypt(IP(src=p.tun_dst,
1389 dst=self.pg0.local_ip4) /
1391 IP(src=self.pg1.local_ip4,
1392 dst=self.pg1.remote_ip4) /
1393 UDP(sport=1144, dport=2233) /
1394 Raw(b'X' * payload_size))
1395 for i in range(count)]
# Build `count` plaintext IP/UDP frames from 1.1.1.1 to `dst` (`src` unused).
1397 def gen_pkts(self, sw_intf, src, dst, count=1,
1399 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1400 IP(src="1.1.1.1", dst=dst) /
1401 UDP(sport=1144, dport=2233) /
1402 Raw(b'X' * payload_size)
1403 for i in range(count)]
# Require that a decrypted packet is addressed to pg1 (MAC and IPv4 address).
# `rx` is presumably the loop variable of an absent `for` over `rxs`.
1405 def verify_decrypted(self, p, rxs):
1407 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1408 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Decrypt a received packet with `sa`, re-parse it as IP if needed, then
# require valid checksums, a GRE layer and `e[IP].dst` equal to
# p.remote_tun_if_host; failures are logged at debug level.  `rx`, `e`, the
# `try:` and the rest of the handler are on lines absent from this extract.
1410 def verify_encrypted(self, p, sa, rxs):
1413 pkt = sa.decrypt(rx[IP])
1414 if not pkt.haslayer(IP):
1415 pkt = IP(pkt[Raw].load)
1416 self.assert_packet_checksums_valid(pkt)
1417 self.assertTrue(pkt.haslayer(GRE))
1419 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1420 except (IndexError, AssertionError):
1421 self.logger.debug(ppp("Unexpected packet:", rx))
1423 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture body (presumably setUp; its `def` line is absent).
1429 super(TestIpsecMGreIfEspTra4, self).setUp()
1432 self.tun_if = self.pg0
1433 p = self.ipv4_params
# Multipoint GRE interface; the other constructor arguments (lines 1435-1436)
# are absent from this extract.
1434 p.tun_if = VppGreInterface(self,
1437 mode=(VppEnum.vl_api_tunnel_mode_t.
1438 TUNNEL_API_MODE_MP))
1439 p.tun_if.add_vpp_config()
1441 p.tun_if.config_ip4()
# N_NHS remote hosts are generated on both the GRE interface and pg0.
1442 p.tun_if.generate_remote_hosts(N_NHS)
1443 self.pg0.generate_remote_hosts(N_NHS)
1444 self.pg0.configure_ipv4_neighbors()
1446 # setup some SAs for several next-hops on the interface
1447 self.multi_params = []
# NOTE(review): `ii` is not bound in the visible lines; presumably the
# statements below form the body of a loop over range(N_NHS) - confirm.
# Each pass works on a shallow copy of the base parameters with SA ids and
# SPIs offset by `ii`.
1450 p = copy.copy(self.ipv4_params)
1452 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1453 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1454 p.scapy_tun_spi = p.scapy_tun_spi + ii
1455 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1456 p.vpp_tun_spi = p.vpp_tun_spi + ii
1458 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1459 p.scapy_tra_spi = p.scapy_tra_spi + ii
1460 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1461 p.vpp_tra_spi = p.vpp_tra_spi + ii
1462 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1463 p.auth_algo_vpp_id, p.auth_key,
1464 p.crypt_algo_vpp_id, p.crypt_key,
1465 self.vpp_esp_protocol)
1466 p.tun_sa_out.add_vpp_config()
1468 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1469 p.auth_algo_vpp_id, p.auth_key,
1470 p.crypt_algo_vpp_id, p.crypt_key,
1471 self.vpp_esp_protocol)
1472 p.tun_sa_in.add_vpp_config()
# Protection is keyed on the next-hop address of remote host `ii`; the leading
# arguments (lines 1475-1478) are absent from this extract.
1474 p.tun_protect = VppIpsecTunProtect(
1479 nh=p.tun_if.remote_hosts[ii].ip4)
1480 p.tun_protect.add_vpp_config()
1481 config_tra_params(p, self.encryption_type, p.tun_if)
1482 self.multi_params.append(p)
# /32 route for this parameter set's host via remote host `ii` on the tunnel.
1484 VppIpRoute(self, p.remote_tun_if_host, 32,
1485 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1486 p.tun_if.sw_if_index)]).add_vpp_config()
1488 # in this v4 variant add the teibs after the protect
# NOTE(review): p.teib stores the return value of add_vpp_config(); the
# p.teib.remove_vpp_config()/add_vpp_config() calls in test_tun_44 rely on
# that value being the VppTeib object - confirm.
1489 p.teib = VppTeib(self, p.tun_if,
1490 p.tun_if.remote_hosts[ii].ip4,
1491 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
# Overrides the tun_dst that config_tra_params() set from the tunnel interface.
1492 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1493 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
# Fixture teardown body (its `def` line is absent).
1496 p = self.ipv4_params
1497 p.tun_if.unconfig_ip4()
1498 super(TestIpsecMGreIfEspTra4, self).tearDown()
# For every parameter set: verify traffic, remove the TEIB entry and verify
# the traffic is dropped, re-add the entry and verify traffic again.
# NOTE(review): `N_PKTS` is not bound in the visible lines (embedded lines
# 1501-1502 are absent).
1500 def test_tun_44(self):
1503 for p in self.multi_params:
1504 self.verify_tun_44(p, count=N_PKTS)
1505 p.teib.remove_vpp_config()
1506 self.verify_tun_dropped_44(p, count=N_PKTS)
1507 p.teib.add_vpp_config()
1508 self.verify_tun_44(p, count=N_PKTS)
# IPv6 counterpart of the multipoint GRE test case: one protected next-hop per
# parameter set in self.multi_params.
# NOTE(review): the embedded line numbers in this class are not contiguous
# (e.g. 1518, 1538, 1556-1560, 1569-1570, 1615-1618, 1639 are absent), so
# signature tails, loop headers, fixture `def` lines and the definition of
# `N_NHS` are not visible in this extract - confirm the hedged remarks below
# against the full file.
1511 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1512 """ Ipsec mGRE ESP v6 TRA tests """
1513 tun6_encrypt_node_name = "esp6-encrypt-tun"
1514 tun6_decrypt_node_name = "esp6-decrypt-tun"
1515 encryption_type = ESP
# Build `count` frames carrying sa.encrypt() of an outer IPv6 header from
# p.tun_dst to pg0.local_ip6 around an inner IPv6/UDP packet from
# pg1.local_ip6 to pg1.remote_ip6.  `src`/`dst` are unused in the visible
# lines.
1517 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1519 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1520 sa.encrypt(IPv6(src=p.tun_dst,
1521 dst=self.pg0.local_ip6) /
1523 IPv6(src=self.pg1.local_ip6,
1524 dst=self.pg1.remote_ip6) /
1525 UDP(sport=1144, dport=2233) /
1526 Raw(b'X' * payload_size))
1527 for i in range(count)]
# Build `count` plaintext IPv6/UDP frames from 1::1 to `dst` (`src` unused).
1529 def gen_pkts6(self, sw_intf, src, dst, count=1,
1531 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1532 IPv6(src="1::1", dst=dst) /
1533 UDP(sport=1144, dport=2233) /
1534 Raw(b'X' * payload_size)
1535 for i in range(count)]
# Require that a decrypted packet is addressed to pg1 (MAC and IPv6 address).
# `rx` is presumably the loop variable of an absent `for` over `rxs`.
1537 def verify_decrypted6(self, p, rxs):
1539 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1540 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
# Decrypt a received packet with `sa`, re-parse it as IPv6 if needed, then
# require valid checksums, a GRE layer and `e[IPv6].dst` equal to
# p.remote_tun_if_host; failures are logged at debug level.  `rx`, `e`, the
# `try:` and the rest of the handler are on lines absent from this extract.
1542 def verify_encrypted6(self, p, sa, rxs):
1545 pkt = sa.decrypt(rx[IPv6])
1546 if not pkt.haslayer(IPv6):
1547 pkt = IPv6(pkt[Raw].load)
1548 self.assert_packet_checksums_valid(pkt)
1549 self.assertTrue(pkt.haslayer(GRE))
1551 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1552 except (IndexError, AssertionError):
1553 self.logger.debug(ppp("Unexpected packet:", rx))
1555 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture body (presumably setUp; its `def` line is absent).
1561 super(TestIpsecMGreIfEspTra6, self).setUp()
# Raises the ipsec log class to debug level for the duration of the test.
1563 self.vapi.cli("set logging class ipsec level debug")
1566 self.tun_if = self.pg0
1567 p = self.ipv6_params
# Multipoint GRE interface; the other constructor arguments (lines 1569-1570)
# are absent from this extract.
1568 p.tun_if = VppGreInterface(self,
1571 mode=(VppEnum.vl_api_tunnel_mode_t.
1572 TUNNEL_API_MODE_MP))
1573 p.tun_if.add_vpp_config()
1575 p.tun_if.config_ip6()
1576 p.tun_if.generate_remote_hosts(N_NHS)
1577 self.pg0.generate_remote_hosts(N_NHS)
1578 self.pg0.configure_ipv6_neighbors()
1580 # setup some SAs for several next-hops on the interface
1581 self.multi_params = []
# One shallow copy of the base parameters per next-hop, with SA ids and SPIs
# offset by the loop index.
1583 for ii in range(N_NHS):
1584 p = copy.copy(self.ipv6_params)
1586 p.remote_tun_if_host = "1::%d" % (ii + 1)
1587 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1588 p.scapy_tun_spi = p.scapy_tun_spi + ii
1589 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1590 p.vpp_tun_spi = p.vpp_tun_spi + ii
1592 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1593 p.scapy_tra_spi = p.scapy_tra_spi + ii
1594 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1595 p.vpp_tra_spi = p.vpp_tra_spi + ii
1596 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1597 p.auth_algo_vpp_id, p.auth_key,
1598 p.crypt_algo_vpp_id, p.crypt_key,
1599 self.vpp_esp_protocol)
1600 p.tun_sa_out.add_vpp_config()
1602 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1603 p.auth_algo_vpp_id, p.auth_key,
1604 p.crypt_algo_vpp_id, p.crypt_key,
1605 self.vpp_esp_protocol)
1606 p.tun_sa_in.add_vpp_config()
1608 # in this v6 variant add the teibs first then the protection
1609 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1610 VppTeib(self, p.tun_if,
1611 p.tun_if.remote_hosts[ii].ip6,
1612 p.tun_dst).add_vpp_config()
# Protection is keyed on the next-hop address of remote host `ii`; the leading
# arguments (lines 1615-1618) are absent from this extract.
1614 p.tun_protect = VppIpsecTunProtect(
1619 nh=p.tun_if.remote_hosts[ii].ip6)
1620 p.tun_protect.add_vpp_config()
1621 config_tra_params(p, self.encryption_type, p.tun_if)
1622 self.multi_params.append(p)
1624 VppIpRoute(self, p.remote_tun_if_host, 128,
1625 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1626 p.tun_if.sw_if_index)]).add_vpp_config()
# Re-assigned after config_tra_params(), which sets tun_dst from the tunnel
# interface; same value as the assignment before the TEIB was added.
1627 p.tun_dst = self.pg0.remote_hosts[ii].ip6
# Debug output only.  NOTE(review): "sh adj 41" uses a hard-coded adjacency
# index whose meaning is not visible in this extract.
1629 self.logger.info(self.vapi.cli("sh log"))
1630 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1631 self.logger.info(self.vapi.cli("sh adj 41"))
# Fixture teardown body (its `def` line is absent).
1634 p = self.ipv6_params
1635 p.tun_if.unconfig_ip6()
1636 super(TestIpsecMGreIfEspTra6, self).tearDown()
# Verify 63 packets through the tunnel for every parameter set.
1638 def test_tun_66(self):
1640 for p in self.multi_params:
1641 self.verify_tun_66(p, count=63)
# Mixin holding the configuration helpers (SAs, tunnel protection, network)
# shared by the IPv4 tunnel-protect test cases that list it as a base class.
1644 class TemplateIpsec4TunProtect(object):
1645 """ IPsec IPv4 Tunnel protect """
# scapy encryption class and graph node names for the IPv4 tunnel path.
1647 encryption_type = ESP
1648 tun4_encrypt_node_name = "esp4-encrypt-tun"
1649 tun4_decrypt_node_name = "esp4-decrypt-tun"
1650 tun4_input_node = "ipsec4-tun-input"
# Prepare the scapy-side state via config_tun_params(), then create the
# outbound SA (scapy SA id/SPI) and the inbound SA (VPP SA id/SPI) and add each
# to VPP.  No tunnel endpoint addresses are passed in the visible lines.
# NOTE(review): the last argument line of each VppIpsecSA call (embedded lines
# 1659 and 1666) is absent from this extract - confirm against the full file.
1652 def config_sa_tra(self, p):
1653 config_tun_params(p, self.encryption_type, p.tun_if)
1655 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1656 p.auth_algo_vpp_id, p.auth_key,
1657 p.crypt_algo_vpp_id, p.crypt_key,
1658 self.vpp_esp_protocol,
1660 p.tun_sa_out.add_vpp_config()
1662 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1663 p.auth_algo_vpp_id, p.auth_key,
1664 p.crypt_algo_vpp_id, p.crypt_key,
1665 self.vpp_esp_protocol,
1667 p.tun_sa_in.add_vpp_config()
# Same as config_sa_tra(), but each SA is also given tunnel endpoint addresses
# taken from self.tun_if for p.addr_type: local then remote for the outbound
# SA, remote then local for the inbound SA.
# NOTE(review): the last argument line of each VppIpsecSA call (embedded lines
# 1678 and 1687) is absent from this extract - confirm against the full file.
1669 def config_sa_tun(self, p):
1670 config_tun_params(p, self.encryption_type, p.tun_if)
1672 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1673 p.auth_algo_vpp_id, p.auth_key,
1674 p.crypt_algo_vpp_id, p.crypt_key,
1675 self.vpp_esp_protocol,
1676 self.tun_if.local_addr[p.addr_type],
1677 self.tun_if.remote_addr[p.addr_type],
1679 p.tun_sa_out.add_vpp_config()
1681 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1682 p.auth_algo_vpp_id, p.auth_key,
1683 p.crypt_algo_vpp_id, p.crypt_key,
1684 self.vpp_esp_protocol,
1685 self.tun_if.remote_addr[p.addr_type],
1686 self.tun_if.local_addr[p.addr_type],
1688 p.tun_sa_in.add_vpp_config()
# Create a VppIpsecTunProtect object, store it as p.tun_protect and add it to
# VPP.
# NOTE(review): the remaining constructor arguments (embedded lines 1692-1694)
# are absent from this extract - confirm against the full file.
1690 def config_protect(self, p):
1691 p.tun_protect = VppIpsecTunProtect(self,
1695 p.tun_protect.add_vpp_config()
# Create an IP-in-IP tunnel interface on pg0 towards pg0.remote_ip4, enable
# both IPv4 and IPv6 on it, and build a /32 route (stored as p.route and
# installed) plus a /128 IPv6 route `r` via the tunnel.
# NOTE(review): embedded lines 1699, 1708, 1712 and 1714 are absent, so one
# tunnel argument, the second VppRoutePath argument of each route and whatever
# installs `r` are not visible here - confirm against the full file.
1697 def config_network(self, p):
1698 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1700 self.pg0.remote_ip4)
1701 p.tun_if.add_vpp_config()
1703 p.tun_if.config_ip4()
1704 p.tun_if.config_ip6()
1706 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1707 [VppRoutePath(p.tun_if.remote_ip4,
1709 p.route.add_vpp_config()
1710 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1711 [VppRoutePath(p.tun_if.remote_ip6,
1713 proto=DpoProto.DPO_PROTO_IP6)])
def unconfig_network(self, p):
    """Remove the route stored on ``p`` and then its tunnel interface.

    Calls ``remove_vpp_config()`` on ``p.route`` first and on ``p.tun_if``
    second, i.e. the route goes away before the interface it points at.
    """
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel protection object stored as ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on ``p``: the outbound one, then the inbound."""
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
# Test case for an IPv4 IP-in-IP tunnel protected by SAs created with
# config_sa_tra(), including a rekey.
# NOTE(review): the embedded line numbers in this class are not contiguous
# (e.g. 1730, 1732-1733, 1738, 1764 are absent), so the end of the base-class
# list, the fixture `def` lines and the statement that binds `np` are not
# visible in this extract - confirm the hedged remarks below.
1728 class TestIpsec4TunProtect(TemplateIpsec,
1729 TemplateIpsec4TunProtect,
1731 """ IPsec IPv4 Tunnel protect - transport mode"""
# Fixture bodies (presumably setUp and tearDown; their `def` lines are absent).
1734 super(TestIpsec4TunProtect, self).setUp()
1736 self.tun_if = self.pg0
1739 super(TestIpsec4TunProtect, self).tearDown()
1741 def test_tun_44(self):
1742 """IPSEC tunnel protect"""
1744 p = self.ipv4_params
# Bring up the tunnel, the SA pair and the protection for `p`.
1746 self.config_network(p)
1747 self.config_sa_tra(p)
1748 self.config_protect(p)
# 127 packets are verified and the tunnel interface's rx and tx packet
# counters must each read 127.
1750 self.verify_tun_44(p, count=127)
1751 c = p.tun_if.get_rx_stats()
1752 self.assertEqual(c['packets'], 127)
1753 c = p.tun_if.get_tx_stats()
1754 self.assertEqual(c['packets'], 127)
# After "clear ipsec sa", a second batch of 127 via verify_tun_64() must bring
# the interface counters to 254.
1756 self.vapi.cli("clear ipsec sa")
1757 self.verify_tun_64(p, count=127)
1758 c = p.tun_if.get_rx_stats()
1759 self.assertEqual(c['packets'], 254)
1760 c = p.tun_if.get_tx_stats()
1761 self.assertEqual(c['packets'], 254)
1763 # rekey - create new SAs and update the tunnel protection
# NOTE(review): `np` is not bound in the visible lines (embedded line 1764 is
# absent); presumably it is a copy of `p` - confirm.
# The new key differs from the old one in its first byte; SPIs are bumped by
# 100 and SA ids by 1.
1765 np.crypt_key = b'X' + p.crypt_key[1:]
1766 np.scapy_tun_spi += 100
1767 np.scapy_tun_sa_id += 1
1768 np.vpp_tun_spi += 100
1769 np.vpp_tun_sa_id += 1
1770 np.tun_if.local_spi = p.vpp_tun_spi
1771 np.tun_if.remote_spi = p.scapy_tun_spi
1773 self.config_sa_tra(np)
1774 self.config_protect(np)
# Traffic with the new parameters brings the interface counters to 381.
1777 self.verify_tun_44(np, count=127)
1778 c = p.tun_if.get_rx_stats()
1779 self.assertEqual(c['packets'], 381)
1780 c = p.tun_if.get_tx_stats()
1781 self.assertEqual(c['packets'], 381)
# Cleanup: only the objects stored on `np` are removed explicitly in the
# visible lines, followed by the network stored on `p`.
1784 self.unconfig_protect(np)
1785 self.unconfig_sa(np)
1786 self.unconfig_network(p)
# Test case for the IPv4 tunnel-protect path with UDP encapsulation enabled on
# the SAs.
# NOTE(review): embedded lines 1791, 1793-1794, 1806-1807 and 1810 are absent,
# so the end of the base-class list, the fixture `def` lines and one teardown
# statement are not visible in this extract - confirm against the full file.
1789 class TestIpsec4TunProtectUdp(TemplateIpsec,
1790 TemplateIpsec4TunProtect,
1792 """ IPsec IPv4 Tunnel protect - transport mode"""
# Fixture body (presumably setUp).
1795 super(TestIpsec4TunProtectUdp, self).setUp()
1797 self.tun_if = self.pg0
1799 p = self.ipv4_params
# Request UDP encapsulation on the SAs and describe the matching UDP header
# (5454 -> 4500) for the scapy side before anything is configured.
1800 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1801 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1802 p.nat_header = UDP(sport=5454, dport=4500)
1803 self.config_network(p)
1804 self.config_sa_tra(p)
1805 self.config_protect(p)
# Fixture teardown body (presumably tearDown).
# NOTE(review): no unconfig_sa() call is visible between the two unconfig
# calls below; embedded line 1810 is absent - confirm.
1808 p = self.ipv4_params
1809 self.unconfig_protect(p)
1811 self.unconfig_network(p)
1812 super(TestIpsec4TunProtectUdp, self).tearDown()
1814 def test_tun_44(self):
1815 """IPSEC UDP tunnel protect"""
1817 p = self.ipv4_params
1819 self.verify_tun_44(p, count=127)
1820 c = p.tun_if.get_rx_stats()
1821 self.assertEqual(c['packets'], 127)
1822 c = p.tun_if.get_tx_stats()
1823 self.assertEqual(c['packets'], 127)
1825 def test_keepalive(self):
1826 """ IPSEC NAT Keepalive """
1827 self.verify_keepalive(self.ipv4_params)
# Test case for an IPv4 IP-in-IP tunnel protected by SAs created with
# config_sa_tun() (SAs that carry their own tunnel endpoints), including a
# rekey.
# NOTE(review): the embedded line numbers in this class are not contiguous
# (e.g. 1832, 1838-1839, 1848, 1866, 1872-1873, 1882, 1908 are absent), so the
# end of the base-class list, fixture `def` lines, signature tails, loop
# headers and the statement that binds `np` are not visible in this extract.
1830 class TestIpsec4TunProtectTun(TemplateIpsec,
1831 TemplateIpsec4TunProtect,
1833 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1835 encryption_type = ESP
1836 tun4_encrypt_node_name = "esp4-encrypt-tun"
1837 tun4_decrypt_node_name = "esp4-decrypt-tun"
# Fixture bodies (presumably setUp and tearDown; their `def` lines are absent).
1840 super(TestIpsec4TunProtectTun, self).setUp()
1842 self.tun_if = self.pg0
1845 super(TestIpsec4TunProtectTun, self).tearDown()
# Build `count` frames carrying sa.encrypt() of an outer IP header
# (sw_intf.remote_ip4 -> sw_intf.local_ip4) around an inner IP/UDP packet from
# `src` to `dst`.  `payload_size` is presumably declared on the absent line.
1847 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1849 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1850 sa.encrypt(IP(src=sw_intf.remote_ip4,
1851 dst=sw_intf.local_ip4) /
1852 IP(src=src, dst=dst) /
1853 UDP(sport=1144, dport=2233) /
1854 Raw(b'X' * payload_size))
1855 for i in range(count)]
# Build `count` plaintext IP/UDP frames from `src` to `dst`.
1857 def gen_pkts(self, sw_intf, src, dst, count=1,
1859 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1860 IP(src=src, dst=dst) /
1861 UDP(sport=1144, dport=2233) /
1862 Raw(b'X' * payload_size)
1863 for i in range(count)]
# Require that a decrypted packet goes from p.remote_tun_if_host to
# pg1.remote_ip4 with valid checksums.  `rx` is presumably the loop variable of
# an absent `for` over `rxs`.
1865 def verify_decrypted(self, p, rxs):
1867 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1868 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1869 self.assert_packet_checksums_valid(rx)
# Decrypt a received packet with `sa`, re-parse it as IP if needed, then
# require valid checksums, an outer header from pg0.local_ip4 to
# pg0.remote_ip4 and an inner destination of p.remote_tun_if_host; failures are
# logged at debug level.  `rx`, the `try:` and the rest of the handler are on
# lines absent from this extract.
1871 def verify_encrypted(self, p, sa, rxs):
1874 pkt = sa.decrypt(rx[IP])
1875 if not pkt.haslayer(IP):
1876 pkt = IP(pkt[Raw].load)
1877 self.assert_packet_checksums_valid(pkt)
1878 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1879 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1880 inner = pkt[IP].payload
1881 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1883 except (IndexError, AssertionError):
1884 self.logger.debug(ppp("Unexpected packet:", rx))
1886 self.logger.debug(ppp("Decrypted packet:", pkt))
1891 def test_tun_44(self):
1892 """IPSEC tunnel protect """
1894 p = self.ipv4_params
1896 self.config_network(p)
1897 self.config_sa_tun(p)
1898 self.config_protect(p)
# First batch: 127 packets, counters must read 127 in each direction.
1900 self.verify_tun_44(p, count=127)
1902 c = p.tun_if.get_rx_stats()
1903 self.assertEqual(c['packets'], 127)
1904 c = p.tun_if.get_tx_stats()
1905 self.assertEqual(c['packets'], 127)
1907 # rekey - create new SAs and update the tunnel protection
# NOTE(review): `np` is not bound in the visible lines (embedded line 1908 is
# absent); presumably it is a copy of `p` - confirm.
1909 np.crypt_key = b'X' + p.crypt_key[1:]
1910 np.scapy_tun_spi += 100
1911 np.scapy_tun_sa_id += 1
1912 np.vpp_tun_spi += 100
1913 np.vpp_tun_sa_id += 1
1914 np.tun_if.local_spi = p.vpp_tun_spi
1915 np.tun_if.remote_spi = p.scapy_tun_spi
1917 self.config_sa_tun(np)
1918 self.config_protect(np)
# Second batch with the new parameters: counters must read 254.
1921 self.verify_tun_44(np, count=127)
1922 c = p.tun_if.get_rx_stats()
1923 self.assertEqual(c['packets'], 254)
1924 c = p.tun_if.get_tx_stats()
1925 self.assertEqual(c['packets'], 254)
1928 self.unconfig_protect(np)
1929 self.unconfig_sa(np)
1930 self.unconfig_network(p)
# Negative test case: encrypted packets sent to the protected IPv4 tunnel must
# produce no replies (the test docstring calls this a bogus tunnel header).
# NOTE(review): the embedded line numbers in this class are not contiguous
# (e.g. 1935, 1941-1942, 1951, 1954, 1972, 1977 are absent), so the end of the
# base-class list, fixture `def` lines, the outer header's destination, the
# end of the gen_encrypt_pkts() call and one teardown statement are not
# visible in this extract - confirm against the full file.
1933 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
1934 TemplateIpsec4TunProtect,
1936 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
1938 encryption_type = ESP
1939 tun4_encrypt_node_name = "esp4-encrypt-tun"
1940 tun4_decrypt_node_name = "esp4-decrypt-tun"
# Fixture bodies (presumably setUp and tearDown; their `def` lines are absent).
1943 super(TestIpsec4TunProtectTunDrop, self).setUp()
1945 self.tun_if = self.pg0
1948 super(TestIpsec4TunProtectTunDrop, self).tearDown()
# Build `count` frames carrying sa.encrypt() of an outer IP header sourced
# from sw_intf.remote_ip4 around an inner IP/UDP packet from `src` to `dst`.
# The outer destination is on the absent line 1954.
1950 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1952 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1953 sa.encrypt(IP(src=sw_intf.remote_ip4,
1955 IP(src=src, dst=dst) /
1956 UDP(sport=1144, dport=2233) /
1957 Raw(b'X' * payload_size))
1958 for i in range(count)]
1960 def test_tun_drop_44(self):
1961 """IPSEC tunnel protect bogus tunnel header """
1963 p = self.ipv4_params
1965 self.config_network(p)
1966 self.config_sa_tun(p)
1967 self.config_protect(p)
# Send the packets built above and require that nothing comes back.
1969 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
1970 src=p.remote_tun_if_host,
1971 dst=self.pg1.remote_ip4,
1973 self.send_and_assert_no_replies(self.tun_if, tx)
1976 self.unconfig_protect(p)
1978 self.unconfig_network(p)
# Mixin holding the configuration helpers (SAs, tunnel protection, network)
# shared by the IPv6 tunnel-protect test cases that list it as a base class.
1981 class TemplateIpsec6TunProtect(object):
1982 """ IPsec IPv6 Tunnel protect """
def config_sa_tra(self, p):
    """Create the SA pair for ``p`` without tunnel endpoint addresses.

    ``config_tun_params`` is called first to prepare the scapy-side state on
    ``p``.  Two ``VppIpsecSA`` objects are then built from the auth/crypto
    settings in ``p`` and ``self.vpp_esp_protocol``: the outbound one from
    the scapy SA id/SPI (stored as ``p.tun_sa_out``) and the inbound one
    from the VPP SA id/SPI (stored as ``p.tun_sa_in``).
    ``add_vpp_config()`` is called on each SA right after it is stored.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    outbound = VppIpsecSA(
        self, p.scapy_tun_sa_id, p.scapy_tun_spi,
        p.auth_algo_vpp_id, p.auth_key,
        p.crypt_algo_vpp_id, p.crypt_key,
        self.vpp_esp_protocol)
    p.tun_sa_out = outbound
    outbound.add_vpp_config()

    inbound = VppIpsecSA(
        self, p.vpp_tun_sa_id, p.vpp_tun_spi,
        p.auth_algo_vpp_id, p.auth_key,
        p.crypt_algo_vpp_id, p.crypt_key,
        self.vpp_esp_protocol)
    p.tun_sa_in = inbound
    inbound.add_vpp_config()
def config_sa_tun(self, p):
    """Create the SA pair for ``p`` with explicit tunnel endpoints.

    Works like ``config_sa_tra`` but additionally passes the endpoint
    addresses of ``self.tun_if`` for ``p.addr_type`` to each
    ``VppIpsecSA``: local then remote for the outbound SA, remote then
    local for the inbound SA.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    # Outbound: scapy SA id/SPI, tunnel runs local -> remote.
    outbound = VppIpsecSA(
        self, p.scapy_tun_sa_id, p.scapy_tun_spi,
        p.auth_algo_vpp_id, p.auth_key,
        p.crypt_algo_vpp_id, p.crypt_key,
        self.vpp_esp_protocol,
        self.tun_if.local_addr[p.addr_type],
        self.tun_if.remote_addr[p.addr_type])
    p.tun_sa_out = outbound
    outbound.add_vpp_config()

    # Inbound: VPP SA id/SPI, tunnel runs remote -> local.
    inbound = VppIpsecSA(
        self, p.vpp_tun_sa_id, p.vpp_tun_spi,
        p.auth_algo_vpp_id, p.auth_key,
        p.crypt_algo_vpp_id, p.crypt_key,
        self.vpp_esp_protocol,
        self.tun_if.remote_addr[p.addr_type],
        self.tun_if.local_addr[p.addr_type])
    p.tun_sa_in = inbound
    inbound.add_vpp_config()
# Create a VppIpsecTunProtect object, store it as p.tun_protect and add it to
# VPP.
# NOTE(review): the remaining constructor arguments (embedded lines 2020-2022)
# are absent from this extract - confirm against the full file.
2018 def config_protect(self, p):
2019 p.tun_protect = VppIpsecTunProtect(self,
2023 p.tun_protect.add_vpp_config()
# Create an IP-in-IP tunnel interface on pg0 towards pg0.remote_ip6, enable
# both IPv6 and IPv4 on it, and build a /128 IPv6 route (stored as p.route and
# installed) plus a /32 IPv4 route `r` via the tunnel.
# NOTE(review): embedded lines 2027, 2036 and 2041-2043 are absent, so one
# tunnel argument, the second VppRoutePath argument of the first route, the
# end of the `r = VppIpRoute(...)` expression and whatever installs `r` are not
# visible here - confirm against the full file.
2025 def config_network(self, p):
2026 p.tun_if = VppIpIpTunInterface(self, self.pg0,
2028 self.pg0.remote_ip6)
2029 p.tun_if.add_vpp_config()
2031 p.tun_if.config_ip6()
2032 p.tun_if.config_ip4()
2034 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2035 [VppRoutePath(p.tun_if.remote_ip6,
2037 proto=DpoProto.DPO_PROTO_IP6)])
2038 p.route.add_vpp_config()
2039 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2040 [VppRoutePath(p.tun_if.remote_ip4,
def unconfig_network(self, p):
    """Remove the route stored on ``p`` and then its tunnel interface.

    ``remove_vpp_config()`` is called on ``p.route`` before ``p.tun_if`` so
    the route is withdrawn while the interface it uses still exists.
    """
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel protection object stored as ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on ``p``: the outbound one, then the inbound."""
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
# Test case for an IPv6 IP-in-IP tunnel protected by SAs created with
# config_sa_tra(): 6o6 traffic with rekeys and an admin down bounce, and 4o6
# traffic.
# NOTE(review): the embedded line numbers in this class are not contiguous
# (e.g. 2058, 2064-2065, 2089, 2114, 2121, 2146, 2155-2156, 2178 are absent),
# so the end of the base-class list, fixture `def` lines, the statements that
# bind `np` and `np3`, and several other statements are not visible in this
# extract - confirm the hedged remarks below against the full file.
2056 class TestIpsec6TunProtect(TemplateIpsec,
2057 TemplateIpsec6TunProtect,
2059 """ IPsec IPv6 Tunnel protect - transport mode"""
2061 encryption_type = ESP
2062 tun6_encrypt_node_name = "esp6-encrypt-tun"
2063 tun6_decrypt_node_name = "esp6-decrypt-tun"
# Fixture bodies (presumably setUp and tearDown; their `def` lines are absent).
2066 super(TestIpsec6TunProtect, self).setUp()
2068 self.tun_if = self.pg0
2071 super(TestIpsec6TunProtect, self).tearDown()
2073 def test_tun_66(self):
2074 """IPSEC tunnel protect 6o6"""
2076 p = self.ipv6_params
2078 self.config_network(p)
2079 self.config_sa_tra(p)
2080 self.config_protect(p)
# First batch: 127 packets, counters must read 127 in each direction.
2082 self.verify_tun_66(p, count=127)
2083 c = p.tun_if.get_rx_stats()
2084 self.assertEqual(c['packets'], 127)
2085 c = p.tun_if.get_tx_stats()
2086 self.assertEqual(c['packets'], 127)
2088 # rekey - create new SAs and update the tunnel protection
# NOTE(review): `np` is not bound in the visible lines (embedded line 2089 is
# absent); presumably it is a copy of `p` - confirm.
2090 np.crypt_key = b'X' + p.crypt_key[1:]
2091 np.scapy_tun_spi += 100
2092 np.scapy_tun_sa_id += 1
2093 np.vpp_tun_spi += 100
2094 np.vpp_tun_sa_id += 1
2095 np.tun_if.local_spi = p.vpp_tun_spi
2096 np.tun_if.remote_spi = p.scapy_tun_spi
2098 self.config_sa_tra(np)
2099 self.config_protect(np)
# Second batch with the new parameters: counters must read 254.
2102 self.verify_tun_66(np, count=127)
2103 c = p.tun_if.get_rx_stats()
2104 self.assertEqual(c['packets'], 254)
2105 c = p.tun_if.get_tx_stats()
2106 self.assertEqual(c['packets'], 254)
2108 # bounce the interface state
# While the interface is down, 127 packets must be dropped and accounted for
# on the ipsec6-tun-input "disabled interface" error counter.
2109 p.tun_if.admin_down()
2110 self.verify_drop_tun_66(np, count=127)
2111 node = ('/err/ipsec6-tun-input/%s' %
2112 'ipsec packets received on disabled interface')
2113 self.assertEqual(127, self.statistics.get_err_counter(node))
# NOTE(review): the statement that brings the interface back up (embedded line
# 2114) is absent from this extract.
2115 self.verify_tun_66(np, count=127)
2118 # 1) add two input SAs [old, new]
2119 # 2) swap output SA to [new]
2120 # 3) use only [new] input SA
# NOTE(review): `np3` is not bound in the visible lines (embedded line 2121 is
# absent); presumably it is a copy of `np` - confirm.
2122 np3.crypt_key = b'Z' + p.crypt_key[1:]
2123 np3.scapy_tun_spi += 100
2124 np3.scapy_tun_sa_id += 1
2125 np3.vpp_tun_spi += 100
2126 np3.vpp_tun_sa_id += 1
2127 np3.tun_if.local_spi = p.vpp_tun_spi
2128 np3.tun_if.remote_spi = p.scapy_tun_spi
2130 self.config_sa_tra(np3)
# Step 1: output SA stays `np`, both `np` and `np3` input SAs are accepted.
2133 p.tun_protect.update_vpp_config(np.tun_sa_out,
2134 [np.tun_sa_in, np3.tun_sa_in])
2135 self.verify_tun_66(np, np, count=127)
2136 self.verify_tun_66(np3, np, count=127)
# Step 2: output SA switches to `np3`, both input SAs still accepted.
2139 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2140 [np.tun_sa_in, np3.tun_sa_in])
2141 self.verify_tun_66(np, np3, count=127)
2142 self.verify_tun_66(np3, np3, count=127)
# Step 3: the input SA list argument is on the absent line 2146; afterwards
# traffic with `np3` passes and traffic with `np` is dropped.
2145 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2147 self.verify_tun_66(np3, np3, count=127)
2148 self.verify_drop_tun_66(np, count=127)
# Totals over the whole test: 9 batches of 127 counted on rx, 8 on tx.
2150 c = p.tun_if.get_rx_stats()
2151 self.assertEqual(c['packets'], 127*9)
2152 c = p.tun_if.get_tx_stats()
2153 self.assertEqual(c['packets'], 127*8)
2154 self.unconfig_sa(np)
2157 self.unconfig_protect(np3)
2158 self.unconfig_sa(np3)
2159 self.unconfig_network(p)
2161 def test_tun_46(self):
2162 """IPSEC tunnel protect 4o6"""
2164 p = self.ipv6_params
2166 self.config_network(p)
2167 self.config_sa_tra(p)
2168 self.config_protect(p)
# 127 packets via verify_tun_46(); counters must read 127 in each direction.
2170 self.verify_tun_46(p, count=127)
2171 c = p.tun_if.get_rx_stats()
2172 self.assertEqual(c['packets'], 127)
2173 c = p.tun_if.get_tx_stats()
2174 self.assertEqual(c['packets'], 127)
# NOTE(review): no unconfig_sa() call is visible between the two unconfig
# calls below; embedded line 2178 is absent - confirm.
2177 self.unconfig_protect(p)
2179 self.unconfig_network(p)
# Test case for an IPv6 IP-in-IP tunnel protected by SAs created with
# config_sa_tun() (SAs that carry their own tunnel endpoints), including a
# rekey.
# NOTE(review): the embedded line numbers in this class are not contiguous
# (e.g. 2184, 2190-2191, 2200, 2218, 2224-2225, 2234, 2260 are absent), so the
# end of the base-class list, fixture `def` lines, signature tails, loop
# headers and the statement that binds `np` are not visible in this extract.
2182 class TestIpsec6TunProtectTun(TemplateIpsec,
2183 TemplateIpsec6TunProtect,
2185 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2187 encryption_type = ESP
2188 tun6_encrypt_node_name = "esp6-encrypt-tun"
2189 tun6_decrypt_node_name = "esp6-decrypt-tun"
# Fixture bodies (presumably setUp and tearDown; their `def` lines are absent).
2192 super(TestIpsec6TunProtectTun, self).setUp()
2194 self.tun_if = self.pg0
2197 super(TestIpsec6TunProtectTun, self).tearDown()
# Build `count` frames carrying sa.encrypt() of an outer IPv6 header
# (sw_intf.remote_ip6 -> sw_intf.local_ip6) around an inner IPv6/UDP packet
# (sport 1166) from `src` to `dst`.
2199 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2201 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2202 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2203 dst=sw_intf.local_ip6) /
2204 IPv6(src=src, dst=dst) /
2205 UDP(sport=1166, dport=2233) /
2206 Raw(b'X' * payload_size))
2207 for i in range(count)]
# Build `count` plaintext IPv6/UDP frames from `src` to `dst`.
2209 def gen_pkts6(self, sw_intf, src, dst, count=1,
2211 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2212 IPv6(src=src, dst=dst) /
2213 UDP(sport=1166, dport=2233) /
2214 Raw(b'X' * payload_size)
2215 for i in range(count)]
# Require that a decrypted packet goes from p.remote_tun_if_host to
# pg1.remote_ip6 with valid checksums.  `rx` is presumably the loop variable of
# an absent `for` over `rxs`.
2217 def verify_decrypted6(self, p, rxs):
2219 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2220 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2221 self.assert_packet_checksums_valid(rx)
# Decrypt a received packet with `sa`, re-parse it as IPv6 if needed, then
# require valid checksums, an outer header from pg0.local_ip6 to
# pg0.remote_ip6 and an inner destination of p.remote_tun_if_host; failures are
# logged at debug level.  `rx`, the `try:` and the rest of the handler are on
# lines absent from this extract.
2223 def verify_encrypted6(self, p, sa, rxs):
2226 pkt = sa.decrypt(rx[IPv6])
2227 if not pkt.haslayer(IPv6):
2228 pkt = IPv6(pkt[Raw].load)
2229 self.assert_packet_checksums_valid(pkt)
2230 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2231 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2232 inner = pkt[IPv6].payload
2233 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2235 except (IndexError, AssertionError):
2236 self.logger.debug(ppp("Unexpected packet:", rx))
2238 self.logger.debug(ppp("Decrypted packet:", pkt))
2243 def test_tun_66(self):
2244 """IPSEC tunnel protect """
2246 p = self.ipv6_params
2248 self.config_network(p)
2249 self.config_sa_tun(p)
2250 self.config_protect(p)
# First batch: 127 packets, counters must read 127 in each direction.
2252 self.verify_tun_66(p, count=127)
2254 c = p.tun_if.get_rx_stats()
2255 self.assertEqual(c['packets'], 127)
2256 c = p.tun_if.get_tx_stats()
2257 self.assertEqual(c['packets'], 127)
2259 # rekey - create new SAs and update the tunnel protection
# NOTE(review): `np` is not bound in the visible lines (embedded line 2260 is
# absent); presumably it is a copy of `p` - confirm.
2261 np.crypt_key = b'X' + p.crypt_key[1:]
2262 np.scapy_tun_spi += 100
2263 np.scapy_tun_sa_id += 1
2264 np.vpp_tun_spi += 100
2265 np.vpp_tun_sa_id += 1
2266 np.tun_if.local_spi = p.vpp_tun_spi
2267 np.tun_if.remote_spi = p.scapy_tun_spi
2269 self.config_sa_tun(np)
2270 self.config_protect(np)
# Second batch with the new parameters: counters must read 254.
2273 self.verify_tun_66(np, count=127)
2274 c = p.tun_if.get_rx_stats()
2275 self.assertEqual(c['packets'], 254)
2276 c = p.tun_if.get_tx_stats()
2277 self.assertEqual(c['packets'], 254)
2280 self.unconfig_protect(np)
2281 self.unconfig_sa(np)
2282 self.unconfig_network(p)
# Negative test case: encrypted packets sent to the protected IPv6 tunnel must
# produce no replies (the test docstring calls this a bogus tunnel header).
# NOTE(review): the embedded line numbers in this class are not contiguous
# (e.g. 2287, 2293-2294, 2303, 2308, 2326, 2330 are absent), so the end of the
# base-class list, fixture `def` lines, the outer header's destination, the
# end of the gen_encrypt_pkts6() call and one teardown statement are not
# visible in this extract - confirm against the full file.
2285 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2286 TemplateIpsec6TunProtect,
2288 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2290 encryption_type = ESP
2291 tun6_encrypt_node_name = "esp6-encrypt-tun"
2292 tun6_decrypt_node_name = "esp6-decrypt-tun"
# Fixture bodies (presumably setUp and tearDown; their `def` lines are absent).
2295 super(TestIpsec6TunProtectTunDrop, self).setUp()
2297 self.tun_if = self.pg0
2300 super(TestIpsec6TunProtectTunDrop, self).tearDown()
# Build `count` frames carrying sa.encrypt() of an outer IPv6 header sourced
# from sw_intf.remote_ip6 around an inner IPv6/UDP packet from `src` to `dst`.
# The outer destination is on the absent line 2308.
2302 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2304 # the IP destination of the revealed packet does not match
2305 # that assigned to the tunnel
2306 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2307 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2309 IPv6(src=src, dst=dst) /
2310 UDP(sport=1144, dport=2233) /
2311 Raw(b'X' * payload_size))
2312 for i in range(count)]
2314 def test_tun_drop_66(self):
2315 """IPSEC 6 tunnel protect bogus tunnel header """
2317 p = self.ipv6_params
2319 self.config_network(p)
2320 self.config_sa_tun(p)
2321 self.config_protect(p)
# Send the packets built above and require that nothing comes back.
2323 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2324 src=p.remote_tun_if_host,
2325 dst=self.pg1.remote_ip6,
2327 self.send_and_assert_no_replies(self.tun_if, tx)
2329 self.unconfig_protect(p)
2331 self.unconfig_network(p)
# Running this file directly executes its tests with VPP's own test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)