5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import (
21 IpsecTun6HandoffTests,
22 IpsecTun4HandoffTests,
25 from vpp_gre_interface import VppGreInterface
26 from vpp_ipip_tun_interface import VppIpIpTunInterface
27 from vpp_ip_route import (
36 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
37 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
38 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
39 from vpp_teib import VppTeib
41 from vpp_papi import VppEnum
42 from vpp_papi_provider import CliFailedCommandError
43 from vpp_acl import AclRule, VppAcl, VppAclInterface
44 from vpp_policer import PolicerAction, VppPolicer, Dir
# Build the two scapy SecurityAssociation objects for one tunnel and store
# them on `p` as p.scapy_tun_sa and p.vpp_tun_sa.
# NOTE(review): this listing has gaps (the embedded numbers skip lines), so
# the conditions guarding several assignments below are not visible here.
47 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
# Maps p.addr_type to the scapy class used for the outer tunnel header.
48 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
50 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
52 crypt_key = mk_scapy_crypt_key(p)
# Tunnel endpoints come from the interface: its remote end becomes
# p.tun_dst and its local end p.tun_src.
# NOTE(review): presumably `src`/`dst` are the fallback when no tun_if is
# supplied; that branch is not visible here, confirm.
54 p.tun_dst = tun_if.remote_ip
55 p.tun_src = tun_if.local_ip
# 4500 is treated as the default destination port of the NAT header.
# NOTE(review): the second assignment is presumably the no-NAT-header case.
61 is_default_port = p.nat_header.dport == 4500
63 is_default_port = True
# The outbound header is either p.nat_header unchanged or a UDP header with
# its source and destination ports swapped; bind_layers makes scapy dissect
# UDP payloads on p.nat_header.dport as ESP.
# NOTE(review): presumably selected by is_default_port; confirm.
66 outbound_nat_header = p.nat_header
68 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
69 bind_layers(UDP, ESP, dport=p.nat_header.dport)
# p.scapy_tun_sa: outer header runs from p.tun_dst to p.tun_src and uses the
# outbound NAT header chosen above.
71 p.scapy_tun_sa = SecurityAssociation(
74 crypt_algo=p.crypt_algo,
76 auth_algo=p.auth_algo,
78 tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
79 nat_t_header=outbound_nat_header,
# p.vpp_tun_sa: outer header runs the opposite way (p.tun_src to p.tun_dst)
# and uses p.nat_header as given.
82 p.vpp_tun_sa = SecurityAssociation(
85 crypt_algo=p.crypt_algo,
87 auth_algo=p.auth_algo,
89 tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
90 nat_t_header=p.nat_header,
# Build p.scapy_tun_sa and p.vpp_tun_sa like config_tun_params does, except
# that no tunnel_header argument is visible in the SecurityAssociation calls
# here.
# NOTE(review): this listing has gaps (the embedded numbers skip lines), so
# the conditions guarding several assignments below are not visible here.
95 def config_tra_params(p, encryption_type, tun_if):
96 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
98 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
100 crypt_key = mk_scapy_crypt_key(p)
# Endpoints are still recorded on `p` from the interface's remote/local IPs.
101 p.tun_dst = tun_if.remote_ip
102 p.tun_src = tun_if.local_ip
# 4500 is treated as the default destination port of the NAT header.
# NOTE(review): the second assignment is presumably the no-NAT-header case.
105 is_default_port = p.nat_header.dport == 4500
107 is_default_port = True
# The outbound header is either p.nat_header unchanged or a UDP header with
# its ports swapped; bind_layers makes scapy dissect UDP payloads on
# p.nat_header.dport as ESP.
# NOTE(review): presumably selected by is_default_port; confirm.
110 outbound_nat_header = p.nat_header
112 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
113 bind_layers(UDP, ESP, dport=p.nat_header.dport)
# p.scapy_tun_sa uses the outbound NAT header chosen above.
115 p.scapy_tun_sa = SecurityAssociation(
118 crypt_algo=p.crypt_algo,
120 auth_algo=p.auth_algo,
123 nat_t_header=outbound_nat_header,
# p.vpp_tun_sa uses p.nat_header as given.
125 p.vpp_tun_sa = SecurityAssociation(
128 crypt_algo=p.crypt_algo,
130 auth_algo=p.auth_algo,
133 nat_t_header=p.nat_header,
137 class TemplateIpsec4TunProtect(object):
138 """IPsec IPv4 Tunnel protect"""
140 encryption_type = ESP
141 tun4_encrypt_node_name = "esp4-encrypt-tun"
142 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
143 tun4_input_node = "ipsec4-tun-input"
145 def config_sa_tra(self, p):
146 config_tun_params(p, self.encryption_type, p.tun_if)
148 p.tun_sa_out = VppIpsecSA(
156 self.vpp_esp_protocol,
159 p.tun_sa_out.add_vpp_config()
161 p.tun_sa_in = VppIpsecSA(
169 self.vpp_esp_protocol,
172 p.tun_sa_in.add_vpp_config()
174 def config_sa_tun(self, p):
175 config_tun_params(p, self.encryption_type, p.tun_if)
177 p.tun_sa_out = VppIpsecSA(
185 self.vpp_esp_protocol,
186 self.tun_if.local_addr[p.addr_type],
187 self.tun_if.remote_addr[p.addr_type],
190 p.tun_sa_out.add_vpp_config()
192 p.tun_sa_in = VppIpsecSA(
200 self.vpp_esp_protocol,
201 self.tun_if.remote_addr[p.addr_type],
202 self.tun_if.local_addr[p.addr_type],
205 p.tun_sa_in.add_vpp_config()
207 def config_protect(self, p):
208 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
209 p.tun_protect.add_vpp_config()
211 def config_network(self, p):
212 if hasattr(p, "tun_dst"):
215 tun_dst = self.pg0.remote_ip4
216 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
217 p.tun_if.add_vpp_config()
219 p.tun_if.config_ip4()
220 p.tun_if.config_ip6()
222 p.route = VppIpRoute(
224 p.remote_tun_if_host,
226 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
228 p.route.add_vpp_config()
231 p.remote_tun_if_host6,
235 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
241 def unconfig_network(self, p):
242 p.route.remove_vpp_config()
243 p.tun_if.remove_vpp_config()
245 def unconfig_protect(self, p):
246 p.tun_protect.remove_vpp_config()
248 def unconfig_sa(self, p):
249 p.tun_sa_out.remove_vpp_config()
250 p.tun_sa_in.remove_vpp_config()
253 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
254 """IPsec tunnel interface tests"""
256 encryption_type = ESP
260 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
263 def tearDownClass(cls):
264 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
267 super(TemplateIpsec4TunIfEsp, self).setUp()
269 self.tun_if = self.pg0
273 self.config_network(p)
274 self.config_sa_tra(p)
275 self.config_protect(p)
278 super(TemplateIpsec4TunIfEsp, self).tearDown()
281 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
282 """IPsec UDP tunnel interface tests"""
284 tun4_encrypt_node_name = "esp4-encrypt-tun"
285 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
286 encryption_type = ESP
290 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
293 def tearDownClass(cls):
294 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
296 def verify_encrypted(self, p, sa, rxs):
299 # ensure the UDP ports are correct before we decrypt
301 self.assertTrue(rx.haslayer(UDP))
302 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
303 self.assert_equal(rx[UDP].dport, 4500)
305 pkt = sa.decrypt(rx[IP])
306 if not pkt.haslayer(IP):
307 pkt = IP(pkt[Raw].load)
309 self.assert_packet_checksums_valid(pkt)
310 self.assert_equal(pkt[IP].dst, "1.1.1.1")
311 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
312 except (IndexError, AssertionError):
313 self.logger.debug(ppp("Unexpected packet:", rx))
315 self.logger.debug(ppp("Decrypted packet:", pkt))
320 def config_sa_tra(self, p):
321 config_tun_params(p, self.encryption_type, p.tun_if)
323 p.tun_sa_out = VppIpsecSA(
331 self.vpp_esp_protocol,
333 udp_src=p.nat_header.sport,
334 udp_dst=p.nat_header.dport,
336 p.tun_sa_out.add_vpp_config()
338 p.tun_sa_in = VppIpsecSA(
346 self.vpp_esp_protocol,
348 udp_src=p.nat_header.sport,
349 udp_dst=p.nat_header.dport,
351 p.tun_sa_in.add_vpp_config()
354 super(TemplateIpsec4TunIfEspUdp, self).setUp()
357 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
358 p.nat_header = UDP(sport=5454, dport=4500)
360 self.tun_if = self.pg0
362 self.config_network(p)
363 self.config_sa_tra(p)
364 self.config_protect(p)
367 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
370 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
371 """Ipsec ESP - TUN tests"""
373 tun4_encrypt_node_name = "esp4-encrypt-tun"
374 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
376 def test_tun_basic64(self):
377 """ipsec 6o4 tunnel basic test"""
378 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
380 self.verify_tun_64(self.params[socket.AF_INET], count=1)
382 def test_tun_burst64(self):
383 """ipsec 6o4 tunnel burst test"""
384 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
# 257 packets, versus count=1 in test_tun_basic64.
386 self.verify_tun_64(self.params[socket.AF_INET], count=257)
388 def test_tun_basic_frag44(self):
389 """ipsec 4o4 tunnel frag basic test"""
390 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
394 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
396 self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
398 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
401 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
402 """Ipsec ESP UDP tests"""
404 tun4_input_node = "ipsec4-tun-input"
407 super(TestIpsec4TunIfEspUdp, self).setUp()
409 def test_keepalive(self):
410 """IPSEC NAT Keepalive"""
411 self.verify_keepalive(self.ipv4_params)
414 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
415 """Ipsec ESP UDP GCM tests"""
417 tun4_input_node = "ipsec4-tun-input"
420 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
422 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
423 p.crypt_algo_vpp_id = (
424 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
426 p.crypt_algo = "AES-GCM"
428 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
432 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
433 """Ipsec ESP - TCP tests"""
438 class TemplateIpsec6TunProtect(object):
439 """IPsec IPv6 Tunnel protect"""
441 def config_sa_tra(self, p):
442 config_tun_params(p, self.encryption_type, p.tun_if)
444 p.tun_sa_out = VppIpsecSA(
452 self.vpp_esp_protocol,
454 p.tun_sa_out.add_vpp_config()
456 p.tun_sa_in = VppIpsecSA(
464 self.vpp_esp_protocol,
466 p.tun_sa_in.add_vpp_config()
468 def config_sa_tun(self, p):
469 config_tun_params(p, self.encryption_type, p.tun_if)
471 p.tun_sa_out = VppIpsecSA(
479 self.vpp_esp_protocol,
480 self.tun_if.local_addr[p.addr_type],
481 self.tun_if.remote_addr[p.addr_type],
483 p.tun_sa_out.add_vpp_config()
485 p.tun_sa_in = VppIpsecSA(
493 self.vpp_esp_protocol,
494 self.tun_if.remote_addr[p.addr_type],
495 self.tun_if.local_addr[p.addr_type],
497 p.tun_sa_in.add_vpp_config()
499 def config_protect(self, p):
500 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
501 p.tun_protect.add_vpp_config()
503 def config_network(self, p):
504 if hasattr(p, "tun_dst"):
507 tun_dst = self.pg0.remote_ip6
508 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
509 p.tun_if.add_vpp_config()
511 p.tun_if.config_ip6()
512 p.tun_if.config_ip4()
514 p.route = VppIpRoute(
516 p.remote_tun_if_host,
520 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
524 p.route.add_vpp_config()
527 p.remote_tun_if_host4,
529 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
533 def unconfig_network(self, p):
534 p.route.remove_vpp_config()
535 p.tun_if.remove_vpp_config()
537 def unconfig_protect(self, p):
538 p.tun_protect.remove_vpp_config()
540 def unconfig_sa(self, p):
541 p.tun_sa_out.remove_vpp_config()
542 p.tun_sa_in.remove_vpp_config()
545 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
546 """IPsec tunnel interface tests"""
548 encryption_type = ESP
551 super(TemplateIpsec6TunIfEsp, self).setUp()
553 self.tun_if = self.pg0
556 self.config_network(p)
557 self.config_sa_tra(p)
558 self.config_protect(p)
561 super(TemplateIpsec6TunIfEsp, self).tearDown()
564 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
565 """Ipsec ESP - TUN tests"""
567 tun6_encrypt_node_name = "esp6-encrypt-tun"
568 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
570 def test_tun_basic46(self):
571 """ipsec 4o6 tunnel basic test"""
572 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
573 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
575 def test_tun_burst46(self):
576 """ipsec 4o6 tunnel burst test"""
577 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
578 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
581 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
582 """Ipsec ESP 6 Handoff tests"""
584 tun6_encrypt_node_name = "esp6-encrypt-tun"
585 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
587 def test_tun_handoff_66_police(self):
588 """ESP 6o6 tunnel with policer worker hand-off test"""
589 self.vapi.cli("clear errors")
590 self.vapi.cli("clear ipsec sa")
593 p = self.params[socket.AF_INET6]
595 action_tx = PolicerAction(
596 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
598 policer = VppPolicer(
605 conform_action=action_tx,
606 exceed_action=action_tx,
607 violate_action=action_tx,
609 policer.add_vpp_config()
611 # Start policing on tun
612 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
614 for pol_bind in [1, 0]:
615 policer.bind_vpp_config(pol_bind, True)
617 # inject alternately on worker 0 and 1.
618 for worker in [0, 1, 0, 1]:
619 send_pkts = self.gen_encrypt_pkts6(
623 src=p.remote_tun_if_host,
624 dst=self.pg1.remote_ip6,
627 recv_pkts = self.send_and_expect(
628 self.tun_if, send_pkts, self.pg1, worker=worker
630 self.verify_decrypted6(p, recv_pkts)
631 self.logger.debug(self.vapi.cli("show trace max 100"))
633 stats = policer.get_stats()
634 stats0 = policer.get_stats(worker=0)
635 stats1 = policer.get_stats(worker=1)
638 # First pass: Worker 1, should have done all the policing
639 self.assertEqual(stats, stats1)
641 # Worker 0, should have handed everything off
642 self.assertEqual(stats0["conform_packets"], 0)
643 self.assertEqual(stats0["exceed_packets"], 0)
644 self.assertEqual(stats0["violate_packets"], 0)
646 # Second pass: both workers should have policed equal amounts
647 self.assertGreater(stats1["conform_packets"], 0)
648 self.assertEqual(stats1["exceed_packets"], 0)
649 self.assertGreater(stats1["violate_packets"], 0)
651 self.assertGreater(stats0["conform_packets"], 0)
652 self.assertEqual(stats0["exceed_packets"], 0)
653 self.assertGreater(stats0["violate_packets"], 0)
656 stats0["conform_packets"] + stats0["violate_packets"],
657 stats1["conform_packets"] + stats1["violate_packets"],
660 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
661 policer.remove_vpp_config()
664 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
665 """Ipsec ESP 4 Handoff tests"""
667 tun4_encrypt_node_name = "esp4-encrypt-tun"
668 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
670 def test_tun_handoff_44_police(self):
671 """ESP 4o4 tunnel with policer worker hand-off test"""
672 self.vapi.cli("clear errors")
673 self.vapi.cli("clear ipsec sa")
676 p = self.params[socket.AF_INET]
678 action_tx = PolicerAction(
679 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
681 policer = VppPolicer(
688 conform_action=action_tx,
689 exceed_action=action_tx,
690 violate_action=action_tx,
692 policer.add_vpp_config()
694 # Start policing on tun
695 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
697 for pol_bind in [1, 0]:
698 policer.bind_vpp_config(pol_bind, True)
700 # inject alternately on worker 0 and 1.
701 for worker in [0, 1, 0, 1]:
702 send_pkts = self.gen_encrypt_pkts(
706 src=p.remote_tun_if_host,
707 dst=self.pg1.remote_ip4,
710 recv_pkts = self.send_and_expect(
711 self.tun_if, send_pkts, self.pg1, worker=worker
713 self.verify_decrypted(p, recv_pkts)
714 self.logger.debug(self.vapi.cli("show trace max 100"))
716 stats = policer.get_stats()
717 stats0 = policer.get_stats(worker=0)
718 stats1 = policer.get_stats(worker=1)
721 # First pass: Worker 1, should have done all the policing
722 self.assertEqual(stats, stats1)
724 # Worker 0, should have handed everything off
725 self.assertEqual(stats0["conform_packets"], 0)
726 self.assertEqual(stats0["exceed_packets"], 0)
727 self.assertEqual(stats0["violate_packets"], 0)
729 # Second pass: both workers should have policed equal amounts
730 self.assertGreater(stats1["conform_packets"], 0)
731 self.assertEqual(stats1["exceed_packets"], 0)
732 self.assertGreater(stats1["violate_packets"], 0)
734 self.assertGreater(stats0["conform_packets"], 0)
735 self.assertEqual(stats0["exceed_packets"], 0)
736 self.assertGreater(stats0["violate_packets"], 0)
739 stats0["conform_packets"] + stats0["violate_packets"],
740 stats1["conform_packets"] + stats1["violate_packets"],
743 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
744 policer.remove_vpp_config()
747 @tag_fixme_vpp_workers
748 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
749 """IPsec IPv4 Multi Tunnel interface"""
751 encryption_type = ESP
752 tun4_encrypt_node_name = "esp4-encrypt-tun"
753 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
756 super(TestIpsec4MultiTunIfEsp, self).setUp()
758 self.tun_if = self.pg0
760 self.multi_params = []
761 self.pg0.generate_remote_hosts(10)
762 self.pg0.configure_ipv4_neighbors()
765 p = copy.copy(self.ipv4_params)
767 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
768 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
769 p.scapy_tun_spi = p.scapy_tun_spi + ii
770 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
771 p.vpp_tun_spi = p.vpp_tun_spi + ii
773 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
774 p.scapy_tra_spi = p.scapy_tra_spi + ii
775 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
776 p.vpp_tra_spi = p.vpp_tra_spi + ii
777 p.tun_dst = self.pg0.remote_hosts[ii].ip4
779 self.multi_params.append(p)
780 self.config_network(p)
781 self.config_sa_tra(p)
782 self.config_protect(p)
785 super(TestIpsec4MultiTunIfEsp, self).tearDown()
787 def test_tun_44(self):
788 """Multiple IPSEC tunnel interfaces"""
789 for p in self.multi_params:
790 self.verify_tun_44(p, count=127)
791 self.assertEqual(p.tun_if.get_rx_stats(), 127)
792 self.assertEqual(p.tun_if.get_tx_stats(), 127)
794 def test_tun_rr_44(self):
795 """Round-robin packets across multiple interfaces"""
# Decrypt direction: the encrypted packets generated for each tunnel are
# concatenated into one burst, sent in on self.tun_if and expected on pg1.
797 for p in self.multi_params:
798 tx = tx + self.gen_encrypt_pkts(
802 src=p.remote_tun_if_host,
803 dst=self.pg1.remote_ip4,
805 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
# Reply i is checked against tunnel i.
# NOTE(review): this pairing assumes one packet per tunnel, in order; the
# count argument is not visible here, confirm.
807 for rx, p in zip(rxs, self.multi_params):
808 self.verify_decrypted(p, [rx])
# Encrypt direction: plain packets from pg1 towards each tunnel's remote
# host, sent in on pg1 and expected on self.tun_if.
811 for p in self.multi_params:
812 tx = tx + self.gen_pkts(
813 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
815 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
# Each reply is verified with the matching tunnel's p.vpp_tun_sa.
817 for rx, p in zip(rxs, self.multi_params):
818 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
821 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
822 """IPsec IPv4 Tunnel interface all Algos"""
824 encryption_type = ESP
825 tun4_encrypt_node_name = "esp4-encrypt-tun"
826 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
829 super(TestIpsec4TunIfEspAll, self).setUp()
831 self.tun_if = self.pg0
834 self.config_network(p)
835 self.config_sa_tra(p)
836 self.config_protect(p)
840 self.unconfig_protect(p)
841 self.unconfig_network(p)
844 super(TestIpsec4TunIfEspAll, self).tearDown()
848 # change the key and the SPI
851 p.crypt_key = b"X" + p.crypt_key[1:]
853 p.scapy_tun_sa_id += 1
856 p.tun_if.local_spi = p.vpp_tun_spi
857 p.tun_if.remote_spi = p.scapy_tun_spi
859 config_tun_params(p, self.encryption_type, p.tun_if)
861 p.tun_sa_out = VppIpsecSA(
869 self.vpp_esp_protocol,
873 p.tun_sa_in = VppIpsecSA(
881 self.vpp_esp_protocol,
885 p.tun_sa_in.add_vpp_config()
886 p.tun_sa_out.add_vpp_config()
888 self.config_protect(p)
889 np.tun_sa_out.remove_vpp_config()
890 np.tun_sa_in.remove_vpp_config()
891 self.logger.info(self.vapi.cli("sh ipsec sa"))
893 def test_tun_44(self):
894 """IPSEC tunnel all algos"""
896 # foreach VPP crypto engine
897 engines = ["ia32", "ipsecmb", "openssl"]
899 # foreach crypto algorithm
903 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
906 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
908 "scapy-crypto": "AES-GCM",
909 "scapy-integ": "NULL",
910 "key": b"JPjyOWBeVEQiMe7h",
915 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
918 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
920 "scapy-crypto": "AES-GCM",
921 "scapy-integ": "NULL",
922 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
927 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
930 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
932 "scapy-crypto": "AES-GCM",
933 "scapy-integ": "NULL",
934 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
939 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
942 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
944 "scapy-crypto": "AES-CBC",
945 "scapy-integ": "HMAC-SHA1-96",
947 "key": b"JPjyOWBeVEQiMe7h",
951 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
954 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
956 "scapy-crypto": "AES-CBC",
957 "scapy-integ": "SHA2-512-256",
959 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
963 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
966 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
968 "scapy-crypto": "AES-CBC",
969 "scapy-integ": "SHA2-256-128",
971 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
975 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
978 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
980 "scapy-crypto": "NULL",
981 "scapy-integ": "HMAC-SHA1-96",
983 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
987 for engine in engines:
988 self.vapi.cli("set crypto handler all %s" % engine)
991 # loop through each of the algorithms
994 # with self.subTest(algo=algo['scapy']):
997 p.auth_algo_vpp_id = algo["vpp-integ"]
998 p.crypt_algo_vpp_id = algo["vpp-crypto"]
999 p.crypt_algo = algo["scapy-crypto"]
1000 p.auth_algo = algo["scapy-integ"]
1001 p.crypt_key = algo["key"]
1002 p.salt = algo["salt"]
1008 self.verify_tun_44(p, count=127)
1011 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1012 """IPsec IPv4 Tunnel interface no Algos"""
1014 encryption_type = ESP
1015 tun4_encrypt_node_name = "esp4-encrypt-tun"
1016 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1019 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
1021 self.tun_if = self.pg0
1022 p = self.ipv4_params
1023 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1024 p.auth_algo = "NULL"
1027 p.crypt_algo_vpp_id = (
1028 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1030 p.crypt_algo = "NULL"
1034 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
1036 def test_tun_44(self):
1037 """IPSec SA with NULL algos"""
1038 p = self.ipv4_params
1040 self.config_network(p)
1041 self.config_sa_tra(p)
1042 self.config_protect(p)
1044 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host)
1045 self.send_and_assert_no_replies(self.pg1, tx)
1047 self.unconfig_protect(p)
1049 self.unconfig_network(p)
1052 @tag_fixme_vpp_workers
1053 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
1054 """IPsec IPv6 Multi Tunnel interface"""
1056 encryption_type = ESP
1057 tun6_encrypt_node_name = "esp6-encrypt-tun"
1058 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1061 super(TestIpsec6MultiTunIfEsp, self).setUp()
1063 self.tun_if = self.pg0
1065 self.multi_params = []
1066 self.pg0.generate_remote_hosts(10)
1067 self.pg0.configure_ipv6_neighbors()
1069 for ii in range(10):
1070 p = copy.copy(self.ipv6_params)
1072 p.remote_tun_if_host = "1111::%d" % (ii + 1)
1073 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1074 p.scapy_tun_spi = p.scapy_tun_spi + ii
1075 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1076 p.vpp_tun_spi = p.vpp_tun_spi + ii
1078 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1079 p.scapy_tra_spi = p.scapy_tra_spi + ii
1080 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1081 p.vpp_tra_spi = p.vpp_tra_spi + ii
1082 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1084 self.multi_params.append(p)
1085 self.config_network(p)
1086 self.config_sa_tra(p)
1087 self.config_protect(p)
1090 super(TestIpsec6MultiTunIfEsp, self).tearDown()
1092 def test_tun_66(self):
1093 """Multiple IPSEC tunnel interfaces"""
1094 for p in self.multi_params:
1095 self.verify_tun_66(p, count=127)
1096 self.assertEqual(p.tun_if.get_rx_stats(), 127)
1097 self.assertEqual(p.tun_if.get_tx_stats(), 127)
1100 class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
1101 """Ipsec GRE TEB ESP - TUN tests"""
1103 tun4_encrypt_node_name = "esp4-encrypt-tun"
1104 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1105 encryption_type = ESP
1106 omac = "00:11:22:33:44:55"
1108 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1110 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1112 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1114 / Ether(dst=self.omac)
1115 / IP(src="1.1.1.1", dst="1.1.1.2")
1116 / UDP(sport=1144, dport=2233)
1117 / Raw(b"X" * payload_size)
1119 for i in range(count)
1122 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1124 Ether(dst=self.omac)
1125 / IP(src="1.1.1.1", dst="1.1.1.2")
1126 / UDP(sport=1144, dport=2233)
1127 / Raw(b"X" * payload_size)
1128 for i in range(count)
1131 def verify_decrypted(self, p, rxs):
1133 self.assert_equal(rx[Ether].dst, self.omac)
1134 self.assert_equal(rx[IP].dst, "1.1.1.2")
1136 def verify_encrypted(self, p, sa, rxs):
1139 pkt = sa.decrypt(rx[IP])
1140 if not pkt.haslayer(IP):
1141 pkt = IP(pkt[Raw].load)
1142 self.assert_packet_checksums_valid(pkt)
1143 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1144 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1145 self.assertTrue(pkt.haslayer(GRE))
1147 self.assertEqual(e[Ether].dst, self.omac)
1148 self.assertEqual(e[IP].dst, "1.1.1.2")
1149 except (IndexError, AssertionError):
1150 self.logger.debug(ppp("Unexpected packet:", rx))
1152 self.logger.debug(ppp("Decrypted packet:", pkt))
1158 super(TestIpsecGreTebIfEsp, self).setUp()
1160 self.tun_if = self.pg0
1162 p = self.ipv4_params
1164 bd1 = VppBridgeDomain(self, 1)
1165 bd1.add_vpp_config()
1167 p.tun_sa_out = VppIpsecSA(
1173 p.crypt_algo_vpp_id,
1175 self.vpp_esp_protocol,
1177 self.pg0.remote_ip4,
1179 p.tun_sa_out.add_vpp_config()
1181 p.tun_sa_in = VppIpsecSA(
1187 p.crypt_algo_vpp_id,
1189 self.vpp_esp_protocol,
1190 self.pg0.remote_ip4,
1193 p.tun_sa_in.add_vpp_config()
1195 p.tun_if = VppGreInterface(
1198 self.pg0.remote_ip4,
1199 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1201 p.tun_if.add_vpp_config()
1203 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1205 p.tun_protect.add_vpp_config()
1208 p.tun_if.config_ip4()
1209 config_tun_params(p, self.encryption_type, p.tun_if)
1211 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1212 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1214 self.vapi.cli("clear ipsec sa")
1215 self.vapi.cli("sh adj")
1216 self.vapi.cli("sh ipsec tun")
1219 p = self.ipv4_params
1220 p.tun_if.unconfig_ip4()
1221 super(TestIpsecGreTebIfEsp, self).tearDown()
1224 class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
1225 """Ipsec GRE TEB ESP with VLAN - TUN tests"""
1227 tun4_encrypt_node_name = "esp4-encrypt-tun"
1228 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1229 encryption_type = ESP
1230 omac = "00:11:22:33:44:55"
1232 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1234 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1236 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1238 / Ether(dst=self.omac)
1239 / IP(src="1.1.1.1", dst="1.1.1.2")
1240 / UDP(sport=1144, dport=2233)
1241 / Raw(b"X" * payload_size)
1243 for i in range(count)
1246 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1248 Ether(dst=self.omac)
1250 / IP(src="1.1.1.1", dst="1.1.1.2")
1251 / UDP(sport=1144, dport=2233)
1252 / Raw(b"X" * payload_size)
1253 for i in range(count)
1256 def verify_decrypted(self, p, rxs):
1258 self.assert_equal(rx[Ether].dst, self.omac)
1259 self.assert_equal(rx[Dot1Q].vlan, 11)
1260 self.assert_equal(rx[IP].dst, "1.1.1.2")
1262 def verify_encrypted(self, p, sa, rxs):
1265 pkt = sa.decrypt(rx[IP])
1266 if not pkt.haslayer(IP):
1267 pkt = IP(pkt[Raw].load)
1268 self.assert_packet_checksums_valid(pkt)
1269 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1270 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1271 self.assertTrue(pkt.haslayer(GRE))
1273 self.assertEqual(e[Ether].dst, self.omac)
1274 self.assertFalse(e.haslayer(Dot1Q))
1275 self.assertEqual(e[IP].dst, "1.1.1.2")
1276 except (IndexError, AssertionError):
1277 self.logger.debug(ppp("Unexpected packet:", rx))
1279 self.logger.debug(ppp("Decrypted packet:", pkt))
1285 super(TestIpsecGreTebVlanIfEsp, self).setUp()
1287 self.tun_if = self.pg0
1289 p = self.ipv4_params
1291 bd1 = VppBridgeDomain(self, 1)
1292 bd1.add_vpp_config()
1294 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1295 self.vapi.l2_interface_vlan_tag_rewrite(
1296 sw_if_index=self.pg1_11.sw_if_index,
1297 vtr_op=L2_VTR_OP.L2_POP_1,
1300 self.pg1_11.admin_up()
1302 p.tun_sa_out = VppIpsecSA(
1308 p.crypt_algo_vpp_id,
1310 self.vpp_esp_protocol,
1312 self.pg0.remote_ip4,
1314 p.tun_sa_out.add_vpp_config()
1316 p.tun_sa_in = VppIpsecSA(
1322 p.crypt_algo_vpp_id,
1324 self.vpp_esp_protocol,
1325 self.pg0.remote_ip4,
1328 p.tun_sa_in.add_vpp_config()
1330 p.tun_if = VppGreInterface(
1333 self.pg0.remote_ip4,
1334 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1336 p.tun_if.add_vpp_config()
1338 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1340 p.tun_protect.add_vpp_config()
1343 p.tun_if.config_ip4()
1344 config_tun_params(p, self.encryption_type, p.tun_if)
1346 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1347 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1349 self.vapi.cli("clear ipsec sa")
1352 p = self.ipv4_params
1353 p.tun_if.unconfig_ip4()
1354 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1355 self.pg1_11.admin_down()
1356 self.pg1_11.remove_vpp_config()
1359 class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
1360 """Ipsec GRE TEB ESP - Tra tests"""
1362 tun4_encrypt_node_name = "esp4-encrypt-tun"
1363 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1364 encryption_type = ESP
1365 omac = "00:11:22:33:44:55"
1367 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1369 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1371 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1373 / Ether(dst=self.omac)
1374 / IP(src="1.1.1.1", dst="1.1.1.2")
1375 / UDP(sport=1144, dport=2233)
1376 / Raw(b"X" * payload_size)
1378 for i in range(count)
1381 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1383 Ether(dst=self.omac)
1384 / IP(src="1.1.1.1", dst="1.1.1.2")
1385 / UDP(sport=1144, dport=2233)
1386 / Raw(b"X" * payload_size)
1387 for i in range(count)
1390 def verify_decrypted(self, p, rxs):
1392 self.assert_equal(rx[Ether].dst, self.omac)
1393 self.assert_equal(rx[IP].dst, "1.1.1.2")
1395 def verify_encrypted(self, p, sa, rxs):
1398 pkt = sa.decrypt(rx[IP])
1399 if not pkt.haslayer(IP):
1400 pkt = IP(pkt[Raw].load)
1401 self.assert_packet_checksums_valid(pkt)
1402 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1403 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1404 self.assertTrue(pkt.haslayer(GRE))
1406 self.assertEqual(e[Ether].dst, self.omac)
1407 self.assertEqual(e[IP].dst, "1.1.1.2")
1408 except (IndexError, AssertionError):
1409 self.logger.debug(ppp("Unexpected packet:", rx))
1411 self.logger.debug(ppp("Decrypted packet:", pkt))
1417 super(TestIpsecGreTebIfEspTra, self).setUp()
1419 self.tun_if = self.pg0
1421 p = self.ipv4_params
1423 bd1 = VppBridgeDomain(self, 1)
1424 bd1.add_vpp_config()
1426 p.tun_sa_out = VppIpsecSA(
1432 p.crypt_algo_vpp_id,
1434 self.vpp_esp_protocol,
1436 p.tun_sa_out.add_vpp_config()
1438 p.tun_sa_in = VppIpsecSA(
1444 p.crypt_algo_vpp_id,
1446 self.vpp_esp_protocol,
1448 p.tun_sa_in.add_vpp_config()
1450 p.tun_if = VppGreInterface(
1453 self.pg0.remote_ip4,
1454 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1456 p.tun_if.add_vpp_config()
1458 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1460 p.tun_protect.add_vpp_config()
1463 p.tun_if.config_ip4()
1464 config_tra_params(p, self.encryption_type, p.tun_if)
1466 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1467 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1469 self.vapi.cli("clear ipsec sa")
1472 p = self.ipv4_params
1473 p.tun_if.unconfig_ip4()
1474 super(TestIpsecGreTebIfEspTra, self).tearDown()
1477 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
1478 """Ipsec GRE TEB UDP ESP - Tra tests"""
1480 tun4_encrypt_node_name = "esp4-encrypt-tun"
1481 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1482 encryption_type = ESP
1483 omac = "00:11:22:33:44:55"
1485 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1487 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1489 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1491 / Ether(dst=self.omac)
1492 / IP(src="1.1.1.1", dst="1.1.1.2")
1493 / UDP(sport=1144, dport=2233)
1494 / Raw(b"X" * payload_size)
1496 for i in range(count)
1499 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1501 Ether(dst=self.omac)
1502 / IP(src="1.1.1.1", dst="1.1.1.2")
1503 / UDP(sport=1144, dport=2233)
1504 / Raw(b"X" * payload_size)
1505 for i in range(count)
1508 def verify_decrypted(self, p, rxs):
1510 self.assert_equal(rx[Ether].dst, self.omac)
1511 self.assert_equal(rx[IP].dst, "1.1.1.2")
1513 def verify_encrypted(self, p, sa, rxs):
1515 self.assertTrue(rx.haslayer(UDP))
1516 self.assertEqual(rx[UDP].dport, 4545)
1517 self.assertEqual(rx[UDP].sport, 5454)
1519 pkt = sa.decrypt(rx[IP])
1520 if not pkt.haslayer(IP):
1521 pkt = IP(pkt[Raw].load)
1522 self.assert_packet_checksums_valid(pkt)
1523 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1524 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1525 self.assertTrue(pkt.haslayer(GRE))
1527 self.assertEqual(e[Ether].dst, self.omac)
1528 self.assertEqual(e[IP].dst, "1.1.1.2")
1529 except (IndexError, AssertionError):
1530 self.logger.debug(ppp("Unexpected packet:", rx))
1532 self.logger.debug(ppp("Decrypted packet:", pkt))
1538 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1540 self.tun_if = self.pg0
1542 p = self.ipv4_params
1543 p = self.ipv4_params
1544 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
1545 p.nat_header = UDP(sport=5454, dport=4545)
1547 bd1 = VppBridgeDomain(self, 1)
1548 bd1.add_vpp_config()
1550 p.tun_sa_out = VppIpsecSA(
1556 p.crypt_algo_vpp_id,
1558 self.vpp_esp_protocol,
1563 p.tun_sa_out.add_vpp_config()
1565 p.tun_sa_in = VppIpsecSA(
1571 p.crypt_algo_vpp_id,
1573 self.vpp_esp_protocol,
1575 p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
1580 p.tun_sa_in.add_vpp_config()
1582 p.tun_if = VppGreInterface(
1585 self.pg0.remote_ip4,
1586 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1588 p.tun_if.add_vpp_config()
1590 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1592 p.tun_protect.add_vpp_config()
1595 p.tun_if.config_ip4()
1596 config_tra_params(p, self.encryption_type, p.tun_if)
1598 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1599 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1601 self.vapi.cli("clear ipsec sa")
1602 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1605 p = self.ipv4_params
1606 p.tun_if.unconfig_ip4()
1607 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1610 class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
1611 """Ipsec GRE ESP - TUN tests"""
1613 tun4_encrypt_node_name = "esp4-encrypt-tun"
1614 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1615 encryption_type = ESP
1617 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1619 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1621 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1623 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1624 / UDP(sport=1144, dport=2233)
1625 / Raw(b"X" * payload_size)
1627 for i in range(count)
1630 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1632 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1633 / IP(src="1.1.1.1", dst="1.1.1.2")
1634 / UDP(sport=1144, dport=2233)
1635 / Raw(b"X" * payload_size)
1636 for i in range(count)
1639 def verify_decrypted(self, p, rxs):
1641 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1642 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1644 def verify_encrypted(self, p, sa, rxs):
1647 pkt = sa.decrypt(rx[IP])
1648 if not pkt.haslayer(IP):
1649 pkt = IP(pkt[Raw].load)
1650 self.assert_packet_checksums_valid(pkt)
1651 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1652 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1653 self.assertTrue(pkt.haslayer(GRE))
1655 self.assertEqual(e[IP].dst, "1.1.1.2")
1656 except (IndexError, AssertionError):
1657 self.logger.debug(ppp("Unexpected packet:", rx))
1659 self.logger.debug(ppp("Decrypted packet:", pkt))
1665 super(TestIpsecGreIfEsp, self).setUp()
1667 self.tun_if = self.pg0
1669 p = self.ipv4_params
1671 bd1 = VppBridgeDomain(self, 1)
1672 bd1.add_vpp_config()
1674 p.tun_sa_out = VppIpsecSA(
1680 p.crypt_algo_vpp_id,
1682 self.vpp_esp_protocol,
1684 self.pg0.remote_ip4,
1686 p.tun_sa_out.add_vpp_config()
1688 p.tun_sa_in = VppIpsecSA(
1694 p.crypt_algo_vpp_id,
1696 self.vpp_esp_protocol,
1697 self.pg0.remote_ip4,
1700 p.tun_sa_in.add_vpp_config()
1702 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1703 p.tun_if.add_vpp_config()
1705 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1706 p.tun_protect.add_vpp_config()
1709 p.tun_if.config_ip4()
1710 config_tun_params(p, self.encryption_type, p.tun_if)
1713 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
1717 p = self.ipv4_params
1718 p.tun_if.unconfig_ip4()
1719 super(TestIpsecGreIfEsp, self).tearDown()
1722 class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
1723 """Ipsec GRE ESP - TRA tests"""
1725 tun4_encrypt_node_name = "esp4-encrypt-tun"
1726 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1727 encryption_type = ESP
1729 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1731 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1733 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1735 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1736 / UDP(sport=1144, dport=2233)
1737 / Raw(b"X" * payload_size)
1739 for i in range(count)
1742 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
1744 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1746 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1748 / UDP(sport=1144, dport=2233)
1749 / Raw(b"X" * payload_size)
1751 for i in range(count)
1754 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1756 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1757 / IP(src="1.1.1.1", dst="1.1.1.2")
1758 / UDP(sport=1144, dport=2233)
1759 / Raw(b"X" * payload_size)
1760 for i in range(count)
1763 def verify_decrypted(self, p, rxs):
1765 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1766 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1768 def verify_encrypted(self, p, sa, rxs):
1771 pkt = sa.decrypt(rx[IP])
1772 if not pkt.haslayer(IP):
1773 pkt = IP(pkt[Raw].load)
1774 self.assert_packet_checksums_valid(pkt)
1775 self.assertTrue(pkt.haslayer(GRE))
1777 self.assertEqual(e[IP].dst, "1.1.1.2")
1778 except (IndexError, AssertionError):
1779 self.logger.debug(ppp("Unexpected packet:", rx))
1781 self.logger.debug(ppp("Decrypted packet:", pkt))
1787 super(TestIpsecGreIfEspTra, self).setUp()
1789 self.tun_if = self.pg0
1791 p = self.ipv4_params
1793 p.tun_sa_out = VppIpsecSA(
1799 p.crypt_algo_vpp_id,
1801 self.vpp_esp_protocol,
1803 p.tun_sa_out.add_vpp_config()
1805 p.tun_sa_in = VppIpsecSA(
1811 p.crypt_algo_vpp_id,
1813 self.vpp_esp_protocol,
1815 p.tun_sa_in.add_vpp_config()
1817 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1818 p.tun_if.add_vpp_config()
1820 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1821 p.tun_protect.add_vpp_config()
1824 p.tun_if.config_ip4()
1825 config_tra_params(p, self.encryption_type, p.tun_if)
1828 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
1832 p = self.ipv4_params
1833 p.tun_if.unconfig_ip4()
1834 super(TestIpsecGreIfEspTra, self).tearDown()
1836 def test_gre_non_ip(self):
1837 p = self.ipv4_params
1838 tx = self.gen_encrypt_non_ip_pkts(
1841 src=p.remote_tun_if_host,
1842 dst=self.pg1.remote_ip6,
1844 self.send_and_assert_no_replies(self.tun_if, tx)
1845 node_name = "/err/%s/unsupported payload" % self.tun4_decrypt_node_name[0]
1846 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1849 class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
1850 """Ipsec GRE ESP - TRA tests"""
1852 tun6_encrypt_node_name = "esp6-encrypt-tun"
1853 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1854 encryption_type = ESP
1856 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1858 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1860 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
1862 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
1863 / UDP(sport=1144, dport=2233)
1864 / Raw(b"X" * payload_size)
1866 for i in range(count)
1869 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
1871 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1872 / IPv6(src="1::1", dst="1::2")
1873 / UDP(sport=1144, dport=2233)
1874 / Raw(b"X" * payload_size)
1875 for i in range(count)
1878 def verify_decrypted6(self, p, rxs):
1880 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1881 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1883 def verify_encrypted6(self, p, sa, rxs):
1886 pkt = sa.decrypt(rx[IPv6])
1887 if not pkt.haslayer(IPv6):
1888 pkt = IPv6(pkt[Raw].load)
1889 self.assert_packet_checksums_valid(pkt)
1890 self.assertTrue(pkt.haslayer(GRE))
1892 self.assertEqual(e[IPv6].dst, "1::2")
1893 except (IndexError, AssertionError):
1894 self.logger.debug(ppp("Unexpected packet:", rx))
1896 self.logger.debug(ppp("Decrypted packet:", pkt))
1902 super(TestIpsecGre6IfEspTra, self).setUp()
1904 self.tun_if = self.pg0
1906 p = self.ipv6_params
1908 bd1 = VppBridgeDomain(self, 1)
1909 bd1.add_vpp_config()
1911 p.tun_sa_out = VppIpsecSA(
1917 p.crypt_algo_vpp_id,
1919 self.vpp_esp_protocol,
1921 p.tun_sa_out.add_vpp_config()
1923 p.tun_sa_in = VppIpsecSA(
1929 p.crypt_algo_vpp_id,
1931 self.vpp_esp_protocol,
1933 p.tun_sa_in.add_vpp_config()
1935 p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
1936 p.tun_if.add_vpp_config()
1938 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1939 p.tun_protect.add_vpp_config()
1942 p.tun_if.config_ip6()
1943 config_tra_params(p, self.encryption_type, p.tun_if)
1951 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
1958 p = self.ipv6_params
1959 p.tun_if.unconfig_ip6()
1960 super(TestIpsecGre6IfEspTra, self).tearDown()
1963 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1964 """Ipsec mGRE ESP v4 TRA tests"""
1966 tun4_encrypt_node_name = "esp4-encrypt-tun"
1967 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1968 encryption_type = ESP
1970 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1972 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1974 IP(src=p.tun_dst, dst=self.pg0.local_ip4)
1976 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1977 / UDP(sport=1144, dport=2233)
1978 / Raw(b"X" * payload_size)
1980 for i in range(count)
1983 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1985 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1986 / IP(src="1.1.1.1", dst=dst)
1987 / UDP(sport=1144, dport=2233)
1988 / Raw(b"X" * payload_size)
1989 for i in range(count)
1992 def verify_decrypted(self, p, rxs):
1994 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1995 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1997 def verify_encrypted(self, p, sa, rxs):
2000 pkt = sa.decrypt(rx[IP])
2001 if not pkt.haslayer(IP):
2002 pkt = IP(pkt[Raw].load)
2003 self.assert_packet_checksums_valid(pkt)
2004 self.assertTrue(pkt.haslayer(GRE))
2006 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2007 except (IndexError, AssertionError):
2008 self.logger.debug(ppp("Unexpected packet:", rx))
2010 self.logger.debug(ppp("Decrypted packet:", pkt))
2016 super(TestIpsecMGreIfEspTra4, self).setUp()
2019 self.tun_if = self.pg0
2020 p = self.ipv4_params
2021 p.tun_if = VppGreInterface(
2025 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2027 p.tun_if.add_vpp_config()
2029 p.tun_if.config_ip4()
2030 p.tun_if.generate_remote_hosts(N_NHS)
2031 self.pg0.generate_remote_hosts(N_NHS)
2032 self.pg0.configure_ipv4_neighbors()
2034 # setup some SAs for several next-hops on the interface
2035 self.multi_params = []
2037 for ii in range(N_NHS):
2038 p = copy.copy(self.ipv4_params)
2040 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2041 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2042 p.scapy_tun_spi = p.scapy_tun_spi + ii
2043 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2044 p.vpp_tun_spi = p.vpp_tun_spi + ii
2046 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2047 p.scapy_tra_spi = p.scapy_tra_spi + ii
2048 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2049 p.vpp_tra_spi = p.vpp_tra_spi + ii
2050 p.tun_sa_out = VppIpsecSA(
2056 p.crypt_algo_vpp_id,
2058 self.vpp_esp_protocol,
2060 p.tun_sa_out.add_vpp_config()
2062 p.tun_sa_in = VppIpsecSA(
2068 p.crypt_algo_vpp_id,
2070 self.vpp_esp_protocol,
2072 p.tun_sa_in.add_vpp_config()
2074 p.tun_protect = VppIpsecTunProtect(
2079 nh=p.tun_if.remote_hosts[ii].ip4,
2081 p.tun_protect.add_vpp_config()
2082 config_tra_params(p, self.encryption_type, p.tun_if)
2083 self.multi_params.append(p)
2087 p.remote_tun_if_host,
2089 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
2092 # in this v4 variant add the teibs after the protect
2096 p.tun_if.remote_hosts[ii].ip4,
2097 self.pg0.remote_hosts[ii].ip4,
2099 p.tun_dst = self.pg0.remote_hosts[ii].ip4
2100 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2103 p = self.ipv4_params
2104 p.tun_if.unconfig_ip4()
2105 super(TestIpsecMGreIfEspTra4, self).tearDown()
2107 def test_tun_44(self):
2110 for p in self.multi_params:
2111 self.verify_tun_44(p, count=N_PKTS)
2112 p.teib.remove_vpp_config()
2113 self.verify_tun_dropped_44(p, count=N_PKTS)
2114 p.teib.add_vpp_config()
2115 self.verify_tun_44(p, count=N_PKTS)
2118 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
2119 """Ipsec mGRE ESP v6 TRA tests"""
2121 tun6_encrypt_node_name = "esp6-encrypt-tun"
2122 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2123 encryption_type = ESP
2125 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2127 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2129 IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
2131 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2132 / UDP(sport=1144, dport=2233)
2133 / Raw(b"X" * payload_size)
2135 for i in range(count)
2138 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2140 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2141 / IPv6(src="1::1", dst=dst)
2142 / UDP(sport=1144, dport=2233)
2143 / Raw(b"X" * payload_size)
2144 for i in range(count)
2147 def verify_decrypted6(self, p, rxs):
2149 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2150 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2152 def verify_encrypted6(self, p, sa, rxs):
2155 pkt = sa.decrypt(rx[IPv6])
2156 if not pkt.haslayer(IPv6):
2157 pkt = IPv6(pkt[Raw].load)
2158 self.assert_packet_checksums_valid(pkt)
2159 self.assertTrue(pkt.haslayer(GRE))
2161 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
2162 except (IndexError, AssertionError):
2163 self.logger.debug(ppp("Unexpected packet:", rx))
2165 self.logger.debug(ppp("Decrypted packet:", pkt))
2171 super(TestIpsecMGreIfEspTra6, self).setUp()
2173 self.vapi.cli("set logging class ipsec level debug")
2176 self.tun_if = self.pg0
2177 p = self.ipv6_params
2178 p.tun_if = VppGreInterface(
2182 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2184 p.tun_if.add_vpp_config()
2186 p.tun_if.config_ip6()
2187 p.tun_if.generate_remote_hosts(N_NHS)
2188 self.pg0.generate_remote_hosts(N_NHS)
2189 self.pg0.configure_ipv6_neighbors()
2191 # setup some SAs for several next-hops on the interface
2192 self.multi_params = []
2194 for ii in range(N_NHS):
2195 p = copy.copy(self.ipv6_params)
2197 p.remote_tun_if_host = "1::%d" % (ii + 1)
2198 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2199 p.scapy_tun_spi = p.scapy_tun_spi + ii
2200 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2201 p.vpp_tun_spi = p.vpp_tun_spi + ii
2203 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2204 p.scapy_tra_spi = p.scapy_tra_spi + ii
2205 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2206 p.vpp_tra_spi = p.vpp_tra_spi + ii
2207 p.tun_sa_out = VppIpsecSA(
2213 p.crypt_algo_vpp_id,
2215 self.vpp_esp_protocol,
2217 p.tun_sa_out.add_vpp_config()
2219 p.tun_sa_in = VppIpsecSA(
2225 p.crypt_algo_vpp_id,
2227 self.vpp_esp_protocol,
2229 p.tun_sa_in.add_vpp_config()
2231 # in this v6 variant add the teibs first then the protection
2232 p.tun_dst = self.pg0.remote_hosts[ii].ip6
2234 self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
2237 p.tun_protect = VppIpsecTunProtect(
2242 nh=p.tun_if.remote_hosts[ii].ip6,
2244 p.tun_protect.add_vpp_config()
2245 config_tra_params(p, self.encryption_type, p.tun_if)
2246 self.multi_params.append(p)
2250 p.remote_tun_if_host,
2252 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
2254 p.tun_dst = self.pg0.remote_hosts[ii].ip6
2256 self.logger.info(self.vapi.cli("sh log"))
2257 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2258 self.logger.info(self.vapi.cli("sh adj 41"))
2261 p = self.ipv6_params
2262 p.tun_if.unconfig_ip6()
2263 super(TestIpsecMGreIfEspTra6, self).tearDown()
2265 def test_tun_66(self):
2267 for p in self.multi_params:
2268 self.verify_tun_66(p, count=63)
2271 @tag_fixme_vpp_workers
2272 class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2273 """IPsec IPv4 Tunnel protect - transport mode"""
2276 super(TestIpsec4TunProtect, self).setUp()
2278 self.tun_if = self.pg0
2281 super(TestIpsec4TunProtect, self).tearDown()
2283 def test_tun_44(self):
2284 """IPSEC tunnel protect"""
2286 p = self.ipv4_params
2288 self.config_network(p)
2289 self.config_sa_tra(p)
2290 self.config_protect(p)
2292 self.verify_tun_44(p, count=127)
2293 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2294 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2296 self.vapi.cli("clear ipsec sa")
2297 self.verify_tun_64(p, count=127)
2298 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2299 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2301 # rekey - create new SAs and update the tunnel protection
2303 np.crypt_key = b"X" + p.crypt_key[1:]
2304 np.scapy_tun_spi += 100
2305 np.scapy_tun_sa_id += 1
2306 np.vpp_tun_spi += 100
2307 np.vpp_tun_sa_id += 1
2308 np.tun_if.local_spi = p.vpp_tun_spi
2309 np.tun_if.remote_spi = p.scapy_tun_spi
2311 self.config_sa_tra(np)
2312 self.config_protect(np)
2315 self.verify_tun_44(np, count=127)
2316 self.assertEqual(p.tun_if.get_rx_stats(), 381)
2317 self.assertEqual(p.tun_if.get_tx_stats(), 381)
2320 self.unconfig_protect(np)
2321 self.unconfig_sa(np)
2322 self.unconfig_network(p)
2325 @tag_fixme_vpp_workers
2326 class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2327 """IPsec IPv4 Tunnel protect - transport mode"""
2330 super(TestIpsec4TunProtectUdp, self).setUp()
2332 self.tun_if = self.pg0
2334 p = self.ipv4_params
2335 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
2336 p.nat_header = UDP(sport=4500, dport=4500)
2337 self.config_network(p)
2338 self.config_sa_tra(p)
2339 self.config_protect(p)
2342 p = self.ipv4_params
2343 self.unconfig_protect(p)
2345 self.unconfig_network(p)
2346 super(TestIpsec4TunProtectUdp, self).tearDown()
2348 def verify_encrypted(self, p, sa, rxs):
2349 # ensure encrypted packets are received with the default UDP ports
2351 self.assertEqual(rx[UDP].sport, 4500)
2352 self.assertEqual(rx[UDP].dport, 4500)
2353 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2355 def test_tun_44(self):
2356 """IPSEC UDP tunnel protect"""
2358 p = self.ipv4_params
2360 self.verify_tun_44(p, count=127)
2361 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2362 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2364 def test_keepalive(self):
2365 """IPSEC NAT Keepalive"""
2366 self.verify_keepalive(self.ipv4_params)
2369 @tag_fixme_vpp_workers
2370 class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2371 """IPsec IPv4 Tunnel protect - tunnel mode"""
2373 encryption_type = ESP
2374 tun4_encrypt_node_name = "esp4-encrypt-tun"
2375 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2378 super(TestIpsec4TunProtectTun, self).setUp()
2380 self.tun_if = self.pg0
2383 super(TestIpsec4TunProtectTun, self).tearDown()
2385 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2387 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2389 IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
2390 / IP(src=src, dst=dst)
2391 / UDP(sport=1144, dport=2233)
2392 / Raw(b"X" * payload_size)
2394 for i in range(count)
2397 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2399 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2400 / IP(src=src, dst=dst)
2401 / UDP(sport=1144, dport=2233)
2402 / Raw(b"X" * payload_size)
2403 for i in range(count)
2406 def verify_decrypted(self, p, rxs):
2408 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2409 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2410 self.assert_packet_checksums_valid(rx)
2412 def verify_encrypted(self, p, sa, rxs):
2415 pkt = sa.decrypt(rx[IP])
2416 if not pkt.haslayer(IP):
2417 pkt = IP(pkt[Raw].load)
2418 self.assert_packet_checksums_valid(pkt)
2419 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2420 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2421 inner = pkt[IP].payload
2422 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2424 except (IndexError, AssertionError):
2425 self.logger.debug(ppp("Unexpected packet:", rx))
2427 self.logger.debug(ppp("Decrypted packet:", pkt))
2432 def test_tun_44(self):
2433 """IPSEC tunnel protect"""
2435 p = self.ipv4_params
2437 self.config_network(p)
2438 self.config_sa_tun(p)
2439 self.config_protect(p)
2441 # also add output features on the tunnel and physical interface
2442 # so we test they still work
2443 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
2444 a = VppAcl(self, [r_all]).add_vpp_config()
2446 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2447 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2449 self.verify_tun_44(p, count=127)
2451 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2452 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2454 # rekey - create new SAs and update the tunnel protection
2456 np.crypt_key = b"X" + p.crypt_key[1:]
2457 np.scapy_tun_spi += 100
2458 np.scapy_tun_sa_id += 1
2459 np.vpp_tun_spi += 100
2460 np.vpp_tun_sa_id += 1
2461 np.tun_if.local_spi = p.vpp_tun_spi
2462 np.tun_if.remote_spi = p.scapy_tun_spi
2464 self.config_sa_tun(np)
2465 self.config_protect(np)
2468 self.verify_tun_44(np, count=127)
2469 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2470 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2473 self.unconfig_protect(np)
2474 self.unconfig_sa(np)
2475 self.unconfig_network(p)
2478 class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2479 """IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2481 encryption_type = ESP
2482 tun4_encrypt_node_name = "esp4-encrypt-tun"
2483 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2486 super(TestIpsec4TunProtectTunDrop, self).setUp()
2488 self.tun_if = self.pg0
2491 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2493 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2495 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2497 IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
2498 / IP(src=src, dst=dst)
2499 / UDP(sport=1144, dport=2233)
2500 / Raw(b"X" * payload_size)
2502 for i in range(count)
2505 def test_tun_drop_44(self):
2506 """IPSEC tunnel protect bogus tunnel header"""
2508 p = self.ipv4_params
2510 self.config_network(p)
2511 self.config_sa_tun(p)
2512 self.config_protect(p)
2514 tx = self.gen_encrypt_pkts(
2518 src=p.remote_tun_if_host,
2519 dst=self.pg1.remote_ip4,
2522 self.send_and_assert_no_replies(self.tun_if, tx)
2525 self.unconfig_protect(p)
2527 self.unconfig_network(p)
2530 @tag_fixme_vpp_workers
2531 class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2532 """IPsec IPv6 Tunnel protect - transport mode"""
2534 encryption_type = ESP
2535 tun6_encrypt_node_name = "esp6-encrypt-tun"
2536 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2539 super(TestIpsec6TunProtect, self).setUp()
2541 self.tun_if = self.pg0
2544 super(TestIpsec6TunProtect, self).tearDown()
2546 def test_tun_66(self):
2547 """IPSEC tunnel protect 6o6"""
2549 p = self.ipv6_params
2551 self.config_network(p)
2552 self.config_sa_tra(p)
2553 self.config_protect(p)
2555 self.verify_tun_66(p, count=127)
2556 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2557 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2559 # rekey - create new SAs and update the tunnel protection
2561 np.crypt_key = b"X" + p.crypt_key[1:]
2562 np.scapy_tun_spi += 100
2563 np.scapy_tun_sa_id += 1
2564 np.vpp_tun_spi += 100
2565 np.vpp_tun_sa_id += 1
2566 np.tun_if.local_spi = p.vpp_tun_spi
2567 np.tun_if.remote_spi = p.scapy_tun_spi
2569 self.config_sa_tra(np)
2570 self.config_protect(np)
2573 self.verify_tun_66(np, count=127)
2574 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2575 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2577 # bounce the interface state
2578 p.tun_if.admin_down()
2579 self.verify_drop_tun_66(np, count=127)
2581 "/err/ipsec6-tun-input/%s" % "ipsec packets received on disabled interface"
2583 self.assertEqual(127, self.statistics.get_err_counter(node))
2585 self.verify_tun_66(np, count=127)
2588 # 1) add two input SAs [old, new]
2589 # 2) swap output SA to [new]
2590 # 3) use only [new] input SA
2592 np3.crypt_key = b"Z" + p.crypt_key[1:]
2593 np3.scapy_tun_spi += 100
2594 np3.scapy_tun_sa_id += 1
2595 np3.vpp_tun_spi += 100
2596 np3.vpp_tun_sa_id += 1
2597 np3.tun_if.local_spi = p.vpp_tun_spi
2598 np3.tun_if.remote_spi = p.scapy_tun_spi
2600 self.config_sa_tra(np3)
2603 p.tun_protect.update_vpp_config(np.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2604 self.verify_tun_66(np, np, count=127)
2605 self.verify_tun_66(np3, np, count=127)
2608 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2609 self.verify_tun_66(np, np3, count=127)
2610 self.verify_tun_66(np3, np3, count=127)
2613 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np3.tun_sa_in])
2614 self.verify_tun_66(np3, np3, count=127)
2615 self.verify_drop_tun_rx_66(np, count=127)
2617 self.assertEqual(p.tun_if.get_rx_stats(), 127 * 9)
2618 self.assertEqual(p.tun_if.get_tx_stats(), 127 * 8)
2619 self.unconfig_sa(np)
2622 self.unconfig_protect(np3)
2623 self.unconfig_sa(np3)
2624 self.unconfig_network(p)
2626 def test_tun_46(self):
2627 """IPSEC tunnel protect 4o6"""
2629 p = self.ipv6_params
2631 self.config_network(p)
2632 self.config_sa_tra(p)
2633 self.config_protect(p)
2635 self.verify_tun_46(p, count=127)
2636 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2637 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2640 self.unconfig_protect(p)
2642 self.unconfig_network(p)
2645 @tag_fixme_vpp_workers
2646 class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2647 """IPsec IPv6 Tunnel protect - tunnel mode"""
2649 encryption_type = ESP
2650 tun6_encrypt_node_name = "esp6-encrypt-tun"
2651 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2654 super(TestIpsec6TunProtectTun, self).setUp()
2656 self.tun_if = self.pg0
2659 super(TestIpsec6TunProtectTun, self).tearDown()
2661 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2663 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2665 IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
2666 / IPv6(src=src, dst=dst)
2667 / UDP(sport=1166, dport=2233)
2668 / Raw(b"X" * payload_size)
2670 for i in range(count)
2673 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2675 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2676 / IPv6(src=src, dst=dst)
2677 / UDP(sport=1166, dport=2233)
2678 / Raw(b"X" * payload_size)
2679 for i in range(count)
2682 def verify_decrypted6(self, p, rxs):
2684 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2685 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2686 self.assert_packet_checksums_valid(rx)
2688 def verify_encrypted6(self, p, sa, rxs):
2691 pkt = sa.decrypt(rx[IPv6])
2692 if not pkt.haslayer(IPv6):
2693 pkt = IPv6(pkt[Raw].load)
2694 self.assert_packet_checksums_valid(pkt)
2695 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2696 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2697 inner = pkt[IPv6].payload
2698 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2700 except (IndexError, AssertionError):
2701 self.logger.debug(ppp("Unexpected packet:", rx))
2703 self.logger.debug(ppp("Decrypted packet:", pkt))
2708 def test_tun_66(self):
2709 """IPSEC tunnel protect"""
2711 p = self.ipv6_params
2713 self.config_network(p)
2714 self.config_sa_tun(p)
2715 self.config_protect(p)
2717 self.verify_tun_66(p, count=127)
2719 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2720 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2722 # rekey - create new SAs and update the tunnel protection
2724 np.crypt_key = b"X" + p.crypt_key[1:]
2725 np.scapy_tun_spi += 100
2726 np.scapy_tun_sa_id += 1
2727 np.vpp_tun_spi += 100
2728 np.vpp_tun_sa_id += 1
2729 np.tun_if.local_spi = p.vpp_tun_spi
2730 np.tun_if.remote_spi = p.scapy_tun_spi
2732 self.config_sa_tun(np)
2733 self.config_protect(np)
2736 self.verify_tun_66(np, count=127)
2737 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2738 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2741 self.unconfig_protect(np)
2742 self.unconfig_sa(np)
2743 self.unconfig_network(p)
2746 class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2747 """IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2749 encryption_type = ESP
2750 tun6_encrypt_node_name = "esp6-encrypt-tun"
2751 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2754 super(TestIpsec6TunProtectTunDrop, self).setUp()
2756 self.tun_if = self.pg0
2759 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2761 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2762 # the IP destination of the revealed packet does not match
2763 # that assigned to the tunnel
2765 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2767 IPv6(src=sw_intf.remote_ip6, dst="5::5")
2768 / IPv6(src=src, dst=dst)
2769 / UDP(sport=1144, dport=2233)
2770 / Raw(b"X" * payload_size)
2772 for i in range(count)
2775 def test_tun_drop_66(self):
2776 """IPSEC 6 tunnel protect bogus tunnel header"""
2778 p = self.ipv6_params
2780 self.config_network(p)
2781 self.config_sa_tun(p)
2782 self.config_protect(p)
2784 tx = self.gen_encrypt_pkts6(
2788 src=p.remote_tun_if_host,
2789 dst=self.pg1.remote_ip6,
2792 self.send_and_assert_no_replies(self.tun_if, tx)
2794 self.unconfig_protect(p)
2796 self.unconfig_network(p)
# Helper mixin for IPv4 IPsec-interface tests: creates/removes the IPsec
# interface, its routes, the in/out SAs and the tunnel protection, storing
# each created object on the params object `p`.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# call arguments and statements are not visible here.
2799 class TemplateIpsecItf4(object):
2800 """IPsec Interface IPv4"""
# ESP encapsulation and the IPv4 encrypt/decrypt/input node names.
2802 encryption_type = ESP
2803 tun4_encrypt_node_name = "esp4-encrypt-tun"
2804 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2805 tun4_input_node = "ipsec4-tun-input"
# Fill in the scapy-side tunnel parameters for p (None is passed as the tunnel
# interface, so src/dst are given explicitly), then create the outbound and
# inbound VppIpsecSA objects and add each one to VPP.
2807 def config_sa_tun(self, p, src, dst):
2808 config_tun_params(p, self.encryption_type, None, src, dst)
2810 p.tun_sa_out = VppIpsecSA(
2816 p.crypt_algo_vpp_id,
2818 self.vpp_esp_protocol,
2823 p.tun_sa_out.add_vpp_config()
2825 p.tun_sa_in = VppIpsecSA(
2831 p.crypt_algo_vpp_id,
2833 self.vpp_esp_protocol,
2838 p.tun_sa_in.add_vpp_config()
# Protect p.tun_if with the outbound SA and a one-element list holding the
# inbound SA.
2840 def config_protect(self, p):
2841 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2842 p.tun_protect.add_vpp_config()
# Create the IPsec interface (instance defaults to 0xFFFFFFFF), call
# config_ip4()/config_ip6() on it, and add routes towards the remote tunnel
# hosts via the interface's remote addresses.
2844 def config_network(self, p, instance=0xFFFFFFFF):
2845 p.tun_if = VppIpsecInterface(self, instance=instance)
2847 p.tun_if.add_vpp_config()
2849 p.tun_if.config_ip4()
2850 p.tun_if.config_ip6()
2852 p.route = VppIpRoute(
2854 p.remote_tun_if_host,
2856 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
2858 p.route.add_vpp_config()
2861 p.remote_tun_if_host6,
2865 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
# Remove p.route and then the tunnel interface.
# NOTE(review): only p.route is removed explicitly here; how the route for
# p.remote_tun_if_host6 is cleaned up is not visible - confirm.
2871 def unconfig_network(self, p):
2872 p.route.remove_vpp_config()
2873 p.tun_if.remove_vpp_config()
# Remove the tunnel protection created by config_protect().
2875 def unconfig_protect(self, p):
2876 p.tun_protect.remove_vpp_config()
# Remove both SAs created by config_sa_tun().
2878 def unconfig_sa(self, p):
2879 p.tun_sa_out.remove_vpp_config()
2880 p.tun_sa_in.remove_vpp_config()
# IPv4 IPsec-interface test cases: interface instance naming, basic traffic
# with rekey, NULL algorithms, and an input policer on the tunnel interface.
# NOTE(review): the embedded line numbers in this listing have gaps; names
# such as n_pkts and np are used below but their assignments are not visible.
2883 @tag_fixme_vpp_workers
2884 class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
2885 """IPsec Interface IPv4"""
# Chain to the parent setUp() and use pg0 as the tunnel-facing interface.
2888 super(TestIpsecItf4, self).setUp()
2890 self.tun_if = self.pg0
# Chain to the parent tearDown().
2893 super(TestIpsecItf4, self).tearDown()
# Create the interface with instance=3: "show interface ipsec0" must raise
# CliFailedCommandError, while the output for "ipsec3" must not contain
# "unknown".
2895 def test_tun_instance_44(self):
2896 p = self.ipv4_params
2897 self.config_network(p, instance=3)
2899 with self.assertRaises(CliFailedCommandError):
2900 self.vapi.cli("show interface ipsec0")
2902 output = self.vapi.cli("show interface ipsec3")
2903 self.assertTrue("unknown" not in output)
2905 self.unconfig_network(p)
# End-to-end IPv4 test: traffic is checked as dropped before SAs/protection
# exist, as passing once they are configured, as dropped again after
# admin_down(), then with v6 traffic (verify_tun_64), and finally after a
# rekey with new SAs. Interface rx/tx counters are asserted after each step.
2907 def test_tun_44(self):
2908 """IPSEC interface IPv4"""
2911 p = self.ipv4_params
2913 self.config_network(p)
2915 p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
2917 self.verify_tun_dropped_44(p, count=n_pkts)
2918 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
2919 self.config_protect(p)
2921 self.verify_tun_44(p, count=n_pkts)
2922 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2923 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2925 p.tun_if.admin_down()
2926 self.verify_tun_dropped_44(p, count=n_pkts)
2928 self.verify_tun_44(p, count=n_pkts)
2930 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
2931 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
2933 # it's a v6 packet when it's encrypted
2934 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2936 self.verify_tun_64(p, count=n_pkts)
2937 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
2938 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
# Restore the v4 encrypt node name and reset the interface counters so the
# post-rekey assertions start from zero.
2940 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2942 self.vapi.cli("clear interfaces")
2944 # rekey - create new SAs and update the tunnel protection
# The new key differs from the old one only in its first byte; SPIs are bumped
# by 100 and SA ids by 1.
# NOTE(review): the SPIs written to np.tun_if are read from p, not np -
# confirm this is intended.
2946 np.crypt_key = b"X" + p.crypt_key[1:]
2947 np.scapy_tun_spi += 100
2948 np.scapy_tun_sa_id += 1
2949 np.vpp_tun_spi += 100
2950 np.vpp_tun_sa_id += 1
2951 np.tun_if.local_spi = p.vpp_tun_spi
2952 np.tun_if.remote_spi = p.scapy_tun_spi
2954 self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
2955 self.config_protect(np)
2958 self.verify_tun_44(np, count=n_pkts)
2959 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2960 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2963 self.unconfig_protect(np)
2964 self.unconfig_sa(np)
2965 self.unconfig_network(p)
# Same traffic check with integrity and crypto algorithms both set to
# NONE/"NULL". A shallow copy of ipv4_params is modified so the attribute
# overrides are made on the copy rather than on self.ipv4_params.
2967 def test_tun_44_null(self):
2968 """IPSEC interface IPv4 NULL auth/crypto"""
2971 p = copy.copy(self.ipv4_params)
2973 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
2974 p.crypt_algo_vpp_id = (
2975 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
2977 p.crypt_algo = "NULL"
2978 p.auth_algo = "NULL"
2980 self.config_network(p)
2981 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
2982 self.config_protect(p)
2984 self.logger.info(self.vapi.cli("sh ipsec sa"))
2985 self.verify_tun_44(p, count=n_pkts)
2988 self.unconfig_protect(p)
2990 self.unconfig_network(p)
# Attach a policer to the RX direction of the tunnel interface and check its
# counters; every policer action is "transmit", so traffic still passes.
2992 def test_tun_44_police(self):
2993 """IPSEC interface IPv4 with input policer"""
2995 p = self.ipv4_params
2997 self.config_network(p)
2998 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
2999 self.config_protect(p)
3001 action_tx = PolicerAction(
3002 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3004 policer = VppPolicer(
3011 conform_action=action_tx,
3012 exceed_action=action_tx,
3013 violate_action=action_tx,
3015 policer.add_vpp_config()
3017 # Start policing on tun
3018 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3020 self.verify_tun_44(p, count=n_pkts)
3021 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3022 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3024 stats = policer.get_stats()
3026 # Single rate, 2 colour policer - expect conform, violate but no exceed
3027 self.assertGreater(stats["conform_packets"], 0)
3028 self.assertEqual(stats["exceed_packets"], 0)
3029 self.assertGreater(stats["violate_packets"], 0)
3031 # Stop policing on tun
3032 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3033 self.verify_tun_44(p, count=n_pkts)
3035 # No new policer stats
3036 statsnew = policer.get_stats()
3037 self.assertEqual(stats, statsnew)
# Tear down in reverse order of creation.
3040 policer.remove_vpp_config()
3041 self.unconfig_protect(p)
3043 self.unconfig_network(p)
# MPLS-over-IPv4 variant of the IPsec-interface test: test packets carry an
# MPLS header with label 44 and the received packets are checked for it.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# statements (loop/try headers, call openers, n_pkts) are not visible here.
3046 class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3047 """IPsec Interface MPLSoIPv4"""
# Encrypt node name used for MPLS payloads.
3049 tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
# Chain to the parent setUp() and use pg0 as the tunnel-facing interface.
3052 super(TestIpsecItf4MPLS, self).setUp()
3054 self.tun_if = self.pg0
# Chain to the parent tearDown().
3057 super(TestIpsecItf4MPLS, self).tearDown()
# Build one packet per iteration of range(count); the visible layers are
# Ether, then MPLS(label=44, ttl=3) / IP(src, dst) / UDP / Raw of
# payload_size bytes of b"X".
3059 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3061 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3063 MPLS(label=44, ttl=3)
3064 / IP(src=src, dst=dst)
3065 / UDP(sport=1166, dport=2233)
3066 / Raw(b"X" * payload_size)
3068 for i in range(count)
# Decrypt a received packet with the given SA, re-parse the raw load as IP if
# no IP layer was decoded, then check checksums, the MPLS label (44) and the
# inner destination address. On IndexError/AssertionError the packets are
# logged at debug level.
3071 def verify_encrypted(self, p, sa, rxs):
3074 pkt = sa.decrypt(rx[IP])
3075 if not pkt.haslayer(IP):
3076 pkt = IP(pkt[Raw].load)
3077 self.assert_packet_checksums_valid(pkt)
3078 self.assert_equal(pkt[MPLS].label, 44)
3079 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
3080 except (IndexError, AssertionError):
3081 self.logger.debug(ppp("Unexpected packet:", rx))
3083 self.logger.debug(ppp("Decrypted packet:", pkt))
# Create MPLS table 0, bring up the IPsec interface, install the MPLS and
# labelled routes (label 44), enable MPLS on the tunnel interface, configure
# SAs and protection, run the 4-in-4 traffic check, then undo the setup.
3088 def test_tun_mpls_o_ip4(self):
3089 """IPSEC interface MPLS over IPv4"""
3092 p = self.ipv4_params
3095 tbl = VppMplsTable(self, 0)
3096 tbl.add_vpp_config()
3098 self.config_network(p)
3099 # deag MPLS routes from the tunnel
3101 self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
3106 p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
3110 p.tun_if.enable_mpls()
3112 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3113 self.config_protect(p)
3115 self.verify_tun_44(p, count=n_pkts)
3118 p.tun_if.disable_mpls()
3119 self.unconfig_protect(p)
3121 self.unconfig_network(p)
# Helper mixin for IPv6 IPsec-interface tests: creates/removes the IPsec
# interface, its routes, the in/out SAs and the tunnel protection, storing
# each created object on the params object `p`.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# call arguments and statements are not visible here.
3124 class TemplateIpsecItf6(object):
3125 """IPsec Interface IPv6"""
# ESP encapsulation and the IPv6 encrypt/decrypt/input node names.
3127 encryption_type = ESP
3128 tun6_encrypt_node_name = "esp6-encrypt-tun"
3129 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
3130 tun6_input_node = "ipsec6-tun-input"
# Fill in the scapy-side tunnel parameters for p (None is passed as the tunnel
# interface, so src/dst are given explicitly), then create the outbound and
# inbound VppIpsecSA objects and add each one to VPP. The outbound SA also
# receives p.tun_flags and p.hop_limit.
3132 def config_sa_tun(self, p, src, dst):
3133 config_tun_params(p, self.encryption_type, None, src, dst)
# Presumably a default is assigned when p lacks tun_flags / hop_limit; the
# assigned values are not visible in this listing - confirm.
3135 if not hasattr(p, "tun_flags"):
3137 if not hasattr(p, "hop_limit"):
3140 p.tun_sa_out = VppIpsecSA(
3146 p.crypt_algo_vpp_id,
3148 self.vpp_esp_protocol,
3152 tun_flags=p.tun_flags,
3153 hop_limit=p.hop_limit,
3155 p.tun_sa_out.add_vpp_config()
3157 p.tun_sa_in = VppIpsecSA(
3163 p.crypt_algo_vpp_id,
3165 self.vpp_esp_protocol,
3170 p.tun_sa_in.add_vpp_config()
# Protect p.tun_if with the outbound SA and a one-element list holding the
# inbound SA.
3172 def config_protect(self, p):
3173 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
3174 p.tun_protect.add_vpp_config()
# Create the IPsec interface, call config_ip4()/config_ip6() on it, and add
# routes towards the remote tunnel hosts via the interface's remote
# addresses; the IPv6 route is the one kept in p.route.
3176 def config_network(self, p):
3177 p.tun_if = VppIpsecInterface(self)
3179 p.tun_if.add_vpp_config()
3181 p.tun_if.config_ip4()
3182 p.tun_if.config_ip6()
3186 p.remote_tun_if_host4,
3188 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3192 p.route = VppIpRoute(
3194 p.remote_tun_if_host,
3198 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3202 p.route.add_vpp_config()
# Remove p.route and then the tunnel interface.
3204 def unconfig_network(self, p):
3205 p.route.remove_vpp_config()
3206 p.tun_if.remove_vpp_config()
# Remove the tunnel protection created by config_protect().
3208 def unconfig_protect(self, p):
3209 p.tun_protect.remove_vpp_config()
# Remove both SAs created by config_sa_tun().
3211 def unconfig_sa(self, p):
3212 p.tun_sa_out.remove_vpp_config()
3213 p.tun_sa_in.remove_vpp_config()
# IPv6 IPsec-interface test cases: basic traffic with rekey, and an input
# policer on the tunnel interface.
# NOTE(review): the embedded line numbers in this listing have gaps; names
# such as n_pkts and np are used below but their assignments are not visible.
3216 @tag_fixme_vpp_workers
3217 class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3218 """IPsec Interface IPv6"""
# Chain to the parent setUp() and use pg0 as the tunnel-facing interface.
3221 super(TestIpsecItf6, self).setUp()
3223 self.tun_if = self.pg0
# Chain to the parent tearDown().
3226 super(TestIpsecItf6, self).tearDown()
# End-to-end IPv6 test: traffic is checked as dropped before SAs/protection
# exist, as passing once they are configured, as dropped again after
# admin_down(), then with v4 traffic (verify_tun_46), and finally after a
# rekey with new SAs. Interface rx/tx counters are asserted after each step.
3228 def test_tun_66(self):
3229 """IPSEC interface IPv6"""
3231 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
# p aliases self.ipv6_params, so the attributes set below are stored on the
# shared params object. The first pass uses the COPY_HOP_LIMIT encap flag.
3233 p = self.ipv6_params
3234 p.inner_hop_limit = 24
3235 p.outer_hop_limit = 23
3236 p.outer_flow_label = 243224
3237 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3239 self.config_network(p)
3241 p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
3243 self.verify_drop_tun_66(p, count=n_pkts)
3244 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3245 self.config_protect(p)
3247 self.verify_tun_66(p, count=n_pkts)
3248 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3249 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3251 p.tun_if.admin_down()
3252 self.verify_drop_tun_66(p, count=n_pkts)
3254 self.verify_tun_66(p, count=n_pkts)
3256 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3257 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3259 # it's a v4 packet when it's encrypted
3260 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
3262 self.verify_tun_46(p, count=n_pkts)
3263 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3264 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
# Restore the v6 encrypt node name and reset the interface counters so the
# post-rekey assertions start from zero.
3266 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
3268 self.vapi.cli("clear interfaces")
3270 # rekey - create new SAs and update the tunnel protection
# The new key differs from the old one only in its first byte; SPIs are bumped
# by 100 and SA ids by 1. The rekeyed SA switches to the COPY_FLOW_LABEL encap
# flag with matching inner/outer flow labels.
# NOTE(review): the SPIs written to np.tun_if are read from p, not np -
# confirm this is intended.
3272 np.crypt_key = b"X" + p.crypt_key[1:]
3273 np.scapy_tun_spi += 100
3274 np.scapy_tun_sa_id += 1
3275 np.vpp_tun_spi += 100
3276 np.vpp_tun_sa_id += 1
3277 np.tun_if.local_spi = p.vpp_tun_spi
3278 np.tun_if.remote_spi = p.scapy_tun_spi
3279 np.inner_hop_limit = 24
3280 np.outer_hop_limit = 128
3281 np.inner_flow_label = 0xABCDE
3282 np.outer_flow_label = 0xABCDE
3284 np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
3286 self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
3287 self.config_protect(np)
3290 self.verify_tun_66(np, count=n_pkts)
3291 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3292 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3295 self.unconfig_protect(np)
3296 self.unconfig_sa(np)
3297 self.unconfig_network(p)
# Attach a policer to the RX direction of the tunnel interface and check its
# counters; every policer action is "transmit", so traffic still passes.
3299 def test_tun_66_police(self):
3300 """IPSEC interface IPv6 with input policer"""
3301 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3303 p = self.ipv6_params
3304 p.inner_hop_limit = 24
3305 p.outer_hop_limit = 23
3306 p.outer_flow_label = 243224
3307 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3309 self.config_network(p)
3310 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3311 self.config_protect(p)
3313 action_tx = PolicerAction(
3314 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3316 policer = VppPolicer(
3323 conform_action=action_tx,
3324 exceed_action=action_tx,
3325 violate_action=action_tx,
3327 policer.add_vpp_config()
3329 # Start policing on tun
3330 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3332 self.verify_tun_66(p, count=n_pkts)
3333 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3334 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3336 stats = policer.get_stats()
3338 # Single rate, 2 colour policer - expect conform, violate but no exceed
3339 self.assertGreater(stats["conform_packets"], 0)
3340 self.assertEqual(stats["exceed_packets"], 0)
3341 self.assertGreater(stats["violate_packets"], 0)
3343 # Stop policing on tun
3344 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3345 self.verify_tun_66(p, count=n_pkts)
3347 # No new policer stats
3348 statsnew = policer.get_stats()
3349 self.assertEqual(stats, statsnew)
# Tear down in reverse order of creation.
3352 policer.remove_vpp_config()
3353 self.unconfig_protect(p)
3355 self.unconfig_network(p)
# Point-to-multipoint IPsec-interface test: one interface created in
# TUNNEL_API_MODE_MP carries a separate SA pair and tunnel protection per
# next-hop.
# NOTE(review): the embedded line numbers in this listing have gaps; N_NHS,
# N_PKTS and some method headers/call openers are not visible here.
3358 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3359 """Ipsec P2MP ESP v4 tests"""
# Node names and encapsulation type for this test.
3361 tun4_encrypt_node_name = "esp4-encrypt-tun"
3362 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3363 encryption_type = ESP
# Build one packet per iteration of range(count); the visible inner layers are
# IP from pg1's local to pg1's remote address / UDP / Raw of payload_size
# bytes of b"X" (the src/dst arguments are not used in the visible lines).
3365 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3367 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3369 IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
3370 / UDP(sport=1144, dport=2233)
3371 / Raw(b"X" * payload_size)
3373 for i in range(count)
# Build `count` plain packets with a fixed source of "1.1.1.1" towards dst
# (the src argument is not used in the visible lines).
3376 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
3378 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3379 / IP(src="1.1.1.1", dst=dst)
3380 / UDP(sport=1144, dport=2233)
3381 / Raw(b"X" * payload_size)
3382 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv4 address.
3385 def verify_decrypted(self, p, rxs):
3387 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3388 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Encrypted packets must carry the EF DSCP (shifted into the TOS byte) and a
# TTL equal to p.hop_limit; the payload is then decrypted and checked.
# NOTE(review): `e` is used below but its assignment is not visible here.
3390 def verify_encrypted(self, p, sa, rxs):
3394 rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
3396 self.assertEqual(rx[IP].ttl, p.hop_limit)
3397 pkt = sa.decrypt(rx[IP])
3398 if not pkt.haslayer(IP):
3399 pkt = IP(pkt[Raw].load)
3400 self.assert_packet_checksums_valid(pkt)
3402 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3403 except (IndexError, AssertionError):
3404 self.logger.debug(ppp("Unexpected packet:", rx))
3406 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture setup: create the multipoint IPsec interface, configure IPv4 on it
# (config, unconfig, config again), generate N_NHS remote hosts on both the
# tunnel interface and pg0, and attach a permit-all ACL to both interfaces.
3412 super(TestIpsecMIfEsp4, self).setUp()
3415 self.tun_if = self.pg0
3416 p = self.ipv4_params
3417 p.tun_if = VppIpsecInterface(
3418 self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
3420 p.tun_if.add_vpp_config()
3422 p.tun_if.config_ip4()
3423 p.tun_if.unconfig_ip4()
3424 p.tun_if.config_ip4()
3425 p.tun_if.generate_remote_hosts(N_NHS)
3426 self.pg0.generate_remote_hosts(N_NHS)
3427 self.pg0.configure_ipv4_neighbors()
3429 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
3430 a = VppAcl(self, [r_all]).add_vpp_config()
3432 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
3433 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
3435 # setup some SAs for several next-hops on the interface
3436 self.multi_params = []
# Per next-hop ii: shallow-copy ipv4_params, use remote host 1.1.1.(ii+1),
# offset every SA id and SPI by ii, and set hop_limit to ii + 10.
3438 for ii in range(N_NHS):
3439 p = copy.copy(self.ipv4_params)
3441 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3442 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3443 p.scapy_tun_spi = p.scapy_tun_spi + ii
3444 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3445 p.vpp_tun_spi = p.vpp_tun_spi + ii
3447 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3448 p.scapy_tra_spi = p.scapy_tra_spi + ii
3449 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3450 p.vpp_tra_spi = p.vpp_tra_spi + ii
3451 p.hop_limit = ii + 10
3452 p.tun_sa_out = VppIpsecSA(
3458 p.crypt_algo_vpp_id,
3460 self.vpp_esp_protocol,
3462 self.pg0.remote_hosts[ii].ip4,
3463 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3464 hop_limit=p.hop_limit,
3466 p.tun_sa_out.add_vpp_config()
3468 p.tun_sa_in = VppIpsecSA(
3474 p.crypt_algo_vpp_id,
3476 self.vpp_esp_protocol,
3477 self.pg0.remote_hosts[ii].ip4,
3479 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3480 hop_limit=p.hop_limit,
3482 p.tun_sa_in.add_vpp_config()
# Protection is keyed on the tunnel interface's ii-th remote host as next-hop.
3484 p.tun_protect = VppIpsecTunProtect(
3489 nh=p.tun_if.remote_hosts[ii].ip4,
3491 p.tun_protect.add_vpp_config()
3494 self.encryption_type,
3497 self.pg0.remote_hosts[ii].ip4,
3499 self.multi_params.append(p)
# Route the per-next-hop remote host via the tunnel interface's ii-th remote
# host, and record pg0's ii-th remote host as the tunnel destination.
3501 p.via_tun_route = VppIpRoute(
3503 p.remote_tun_if_host,
3505 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
3508 p.tun_dst = self.pg0.remote_hosts[ii].ip4
# Fixture teardown: remove IPv4 config from the tunnel interface, then chain
# to the parent tearDown().
3511 p = self.ipv4_params
3512 p.tun_if.unconfig_ip4()
3513 super(TestIpsecMIfEsp4, self).tearDown()
# All next-hops pass traffic; after removing the first next-hop's protection
# (and then its route) its traffic is dropped while the others still pass;
# restoring both makes every next-hop pass again.
3515 def test_tun_44(self):
3518 for p in self.multi_params:
3519 self.verify_tun_44(p, count=N_PKTS)
3521 # remove one tunnel protect, the rest should still work
3522 self.multi_params[0].tun_protect.remove_vpp_config()
3523 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3524 self.multi_params[0].via_tun_route.remove_vpp_config()
3525 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3527 for p in self.multi_params[1:]:
3528 self.verify_tun_44(p, count=N_PKTS)
3530 self.multi_params[0].tun_protect.add_vpp_config()
3531 self.multi_params[0].via_tun_route.add_vpp_config()
3533 for p in self.multi_params:
3534 self.verify_tun_44(p, count=N_PKTS)
# MPLS-over-IPv6 variant of the IPsec-interface test: test packets carry an
# MPLS header with label 66 and the received packets are checked for it.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# statements (loop/try headers, call openers, n_pkts, f) are not visible here.
3537 class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3538 """IPsec Interface MPLSoIPv6"""
# Encrypt node name used for MPLS payloads.
3540 tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
# Chain to the parent setUp() and use pg0 as the tunnel-facing interface.
3543 super(TestIpsecItf6MPLS, self).setUp()
3545 self.tun_if = self.pg0
# Chain to the parent tearDown().
3548 super(TestIpsecItf6MPLS, self).tearDown()
# Build one packet per iteration of range(count); the visible layers are
# Ether, then MPLS(label=66, ttl=3) / IPv6(src, dst) / UDP / Raw of
# payload_size bytes of b"X".
3550 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3552 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3554 MPLS(label=66, ttl=3)
3555 / IPv6(src=src, dst=dst)
3556 / UDP(sport=1166, dport=2233)
3557 / Raw(b"X" * payload_size)
3559 for i in range(count)
# Decrypt a received packet with the given SA, then check checksums, the MPLS
# label (66) and the inner IPv6 destination. On IndexError/AssertionError the
# packets are logged at debug level.
# NOTE(review): the fallback below re-parses the raw load as IP (v4) although
# every surrounding check uses IPv6 - confirm whether IPv6 was intended.
3562 def verify_encrypted6(self, p, sa, rxs):
3565 pkt = sa.decrypt(rx[IPv6])
3566 if not pkt.haslayer(IPv6):
3567 pkt = IP(pkt[Raw].load)
3568 self.assert_packet_checksums_valid(pkt)
3569 self.assert_equal(pkt[MPLS].label, 66)
3570 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3571 except (IndexError, AssertionError):
3572 self.logger.debug(ppp("Unexpected packet:", rx))
3574 self.logger.debug(ppp("Decrypted packet:", pkt))
# Create MPLS table 0, bring up the IPsec interface, install the MPLS and
# labelled routes (label 66), enable MPLS on the tunnel interface, configure
# SAs and protection, run the 6-in-6 traffic check, then undo the setup.
3579 def test_tun_mpls_o_ip6(self):
3580 """IPSEC interface MPLS over IPv6"""
3583 p = self.ipv6_params
3586 tbl = VppMplsTable(self, 0)
3587 tbl.add_vpp_config()
3589 self.config_network(p)
3590 # deag MPLS routes from the tunnel
3595 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3596 eos_proto=f.FIB_PATH_NH_PROTO_IP6,
3601 p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
3605 p.tun_if.enable_mpls()
3607 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3608 self.config_protect(p)
3610 self.verify_tun_66(p, count=n_pkts)
3613 p.tun_if.disable_mpls()
3614 self.unconfig_protect(p)
3616 self.unconfig_network(p)
# Script entry point: when executed directly, run this module's tests through
# unittest using VppTestRunner as the test runner.
3619 if __name__ == "__main__":
3620 unittest.main(testRunner=VppTestRunner)