X-Git-Url: https://gerrit.fd.io/r/gitweb?p=vpp.git;a=blobdiff_plain;f=test%2Ftest_ipsec_tun_if_esp.py;h=5bcd9ddfae0918a3c1007654b00cbfd577b74b24;hp=7e36d1345be9473b4a40c75ae340111f86ee2d1f;hb=4a58e49cf;hpb=20399f8f3a27d54f65c4aff92998a2a345a7adab diff --git a/test/test_ipsec_tun_if_esp.py b/test/test_ipsec_tun_if_esp.py index 7e36d1345be..5bcd9ddfae0 100644 --- a/test/test_ipsec_tun_if_esp.py +++ b/test/test_ipsec_tun_if_esp.py @@ -7,13 +7,15 @@ from scapy.layers.l2 import Ether, GRE, Dot1Q from scapy.packet import Raw from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 +from scapy.contrib.mpls import MPLS from framework import VppTestRunner from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \ IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \ IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params from vpp_gre_interface import VppGreInterface from vpp_ipip_tun_interface import VppIpIpTunInterface -from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto +from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \ + VppMplsTable, VppMplsRoute, FibPathProto from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint @@ -2570,6 +2572,82 @@ class TestIpsecItf4(TemplateIpsec, self.unconfig_network(p) +class TestIpsecItf4MPLS(TemplateIpsec, + TemplateIpsecItf4, + IpsecTun4): + """ IPsec Interface MPLSoIPv4 """ + + tun4_encrypt_node_name = "esp-mpls-encrypt-tun" + + def setUp(self): + super(TestIpsecItf4MPLS, self).setUp() + + self.tun_if = self.pg0 + + def tearDown(self): + super(TestIpsecItf4MPLS, self).tearDown() + + def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, + payload_size=100): + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + sa.encrypt(MPLS(label=44, ttl=3) / + IP(src=src, dst=dst) / + UDP(sport=1166, dport=2233) / + Raw(b'X' * payload_size)) + for i in range(count)] + + def verify_encrypted(self, p, sa, rxs): + for rx in rxs: + try: + pkt = sa.decrypt(rx[IP]) + if not pkt.haslayer(IP): + pkt = IP(pkt[Raw].load) + self.assert_packet_checksums_valid(pkt) + self.assert_equal(pkt[MPLS].label, 44) + self.assert_equal(pkt[IP].dst, p.remote_tun_if_host) + except (IndexError, AssertionError): + self.logger.debug(ppp("Unexpected packet:", rx)) + try: + self.logger.debug(ppp("Decrypted packet:", pkt)) + except: + pass + raise + + def test_tun_mpls_o_ip4(self): + """IPSEC interface MPLS over IPv4""" + + n_pkts = 127 + p = self.ipv4_params + f = FibPathProto + + tbl = VppMplsTable(self, 0) + tbl.add_vpp_config() + + self.config_network(p) + # deag MPLS routes from the tunnel + r4 = VppMplsRoute(self, 44, 1, + [VppRoutePath( + self.pg1.remote_ip4, + self.pg1.sw_if_index)]).add_vpp_config() + p.route.modify([VppRoutePath(p.tun_if.remote_ip4, + p.tun_if.sw_if_index, + labels=[VppMplsLabel(44)])]) + p.tun_if.enable_mpls() + + self.config_sa_tun(p, + self.pg0.local_ip4, + self.pg0.remote_ip4) + self.config_protect(p) + + self.verify_tun_44(p, count=n_pkts) + + # cleanup + p.tun_if.disable_mpls() + self.unconfig_protect(p) + self.unconfig_sa(p) + self.unconfig_network(p) + + class TemplateIpsecItf6(object): """ IPsec Interface IPv6 """ @@ -2848,5 +2926,82 @@ class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4): self.verify_tun_44(p, count=N_PKTS) +class TestIpsecItf6MPLS(TemplateIpsec, + TemplateIpsecItf6, + IpsecTun6): + """ IPsec Interface MPLSoIPv6 """ + + tun6_encrypt_node_name = "esp-mpls-encrypt-tun" + + def setUp(self): + super(TestIpsecItf6MPLS, self).setUp() + + self.tun_if = self.pg0 + + def tearDown(self): + super(TestIpsecItf6MPLS, self).tearDown() + + def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, + payload_size=100): + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + sa.encrypt(MPLS(label=66, ttl=3) / + IPv6(src=src, dst=dst) / + UDP(sport=1166, dport=2233) / + Raw(b'X' * payload_size)) + for i in range(count)] + + def verify_encrypted6(self, p, sa, rxs): + for rx in rxs: + try: + pkt = sa.decrypt(rx[IPv6]) + if not pkt.haslayer(IPv6): + pkt = IP(pkt[Raw].load) + self.assert_packet_checksums_valid(pkt) + self.assert_equal(pkt[MPLS].label, 66) + self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host) + except (IndexError, AssertionError): + self.logger.debug(ppp("Unexpected packet:", rx)) + try: + self.logger.debug(ppp("Decrypted packet:", pkt)) + except: + pass + raise + + def test_tun_mpls_o_ip6(self): + """IPSEC interface MPLS over IPv6""" + + n_pkts = 127 + p = self.ipv6_params + f = FibPathProto + + tbl = VppMplsTable(self, 0) + tbl.add_vpp_config() + + self.config_network(p) + # deag MPLS routes from the tunnel + r6 = VppMplsRoute(self, 66, 1, + [VppRoutePath( + self.pg1.remote_ip6, + self.pg1.sw_if_index)], + eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config() + p.route.modify([VppRoutePath(p.tun_if.remote_ip6, + p.tun_if.sw_if_index, + labels=[VppMplsLabel(66)])]) + p.tun_if.enable_mpls() + + self.config_sa_tun(p, + self.pg0.local_ip6, + self.pg0.remote_ip6) + self.config_protect(p) + + self.verify_tun_66(p, count=n_pkts) + + # cleanup + p.tun_if.disable_mpls() + self.unconfig_protect(p) + self.unconfig_sa(p) + self.unconfig_network(p) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)