IPSEC: Mutli-tunnel tests 42/18442/4
authorNeale Ranns <nranns@cisco.com>
Wed, 20 Mar 2019 18:24:43 +0000 (18:24 +0000)
committerPaul Vinciguerra <pvinci@vinciconsulting.com>
Thu, 21 Mar 2019 13:44:31 +0000 (13:44 +0000)
Change-Id: I46f1db6579835c6613fdbb2b726246cc62b135fe
Signed-off-by: Neale Ranns <nranns@cisco.com>
test/template_ipsec.py
test/test_ipsec_ah.py
test/test_ipsec_esp.py
test/test_ipsec_tun_if_esp.py
test/vpp_interface.py

index 483699c..1b9a379 100644 (file)
@@ -79,6 +79,45 @@ class IPsecIPv6Params(object):
         self.nat_header = None
 
 
+def config_tun_params(p, encryption_type, tun_if):
+    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
+    p.scapy_tun_sa = SecurityAssociation(
+        encryption_type, spi=p.vpp_tun_spi,
+        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo, auth_key=p.auth_key,
+        tunnel_header=ip_class_by_addr_type[p.addr_type](
+            src=tun_if.remote_addr[p.addr_type],
+            dst=tun_if.local_addr[p.addr_type]),
+        nat_t_header=p.nat_header)
+    p.vpp_tun_sa = SecurityAssociation(
+        encryption_type, spi=p.scapy_tun_spi,
+        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo, auth_key=p.auth_key,
+        tunnel_header=ip_class_by_addr_type[p.addr_type](
+            dst=tun_if.remote_addr[p.addr_type],
+            src=tun_if.local_addr[p.addr_type]),
+        nat_t_header=p.nat_header)
+
+
+def config_tra_params(p, encryption_type):
+    p.scapy_tra_sa = SecurityAssociation(
+        encryption_type,
+        spi=p.vpp_tra_spi,
+        crypt_algo=p.crypt_algo,
+        crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo,
+        auth_key=p.auth_key,
+        nat_t_header=p.nat_header)
+    p.vpp_tra_sa = SecurityAssociation(
+        encryption_type,
+        spi=p.scapy_tra_spi,
+        crypt_algo=p.crypt_algo,
+        crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo,
+        auth_key=p.auth_key,
+        nat_t_header=p.nat_header)
+
+
 class TemplateIpsec(VppTestCase):
     """
     TRANSPORT MODE:
@@ -164,26 +203,6 @@ class TemplateIpsec(VppTestCase):
                 ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
                 for i in range(count)]
 
-    def configure_sa_tun(self, params):
-        ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
-        scapy_tun_sa = SecurityAssociation(
-            self.encryption_type, spi=params.vpp_tun_spi,
-            crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo, auth_key=params.auth_key,
-            tunnel_header=ip_class_by_addr_type[params.addr_type](
-                src=self.tun_if.remote_addr[params.addr_type],
-                dst=self.tun_if.local_addr[params.addr_type]),
-            nat_t_header=params.nat_header)
-        vpp_tun_sa = SecurityAssociation(
-            self.encryption_type, spi=params.scapy_tun_spi,
-            crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo, auth_key=params.auth_key,
-            tunnel_header=ip_class_by_addr_type[params.addr_type](
-                dst=self.tun_if.remote_addr[params.addr_type],
-                src=self.tun_if.local_addr[params.addr_type]),
-            nat_t_header=params.nat_header)
-        return vpp_tun_sa, scapy_tun_sa
-
     def configure_sa_tra(self, params):
         params.scapy_tra_sa = SecurityAssociation(
             self.encryption_type,
@@ -208,15 +227,15 @@ class IpsecTcpTests(object):
         """ verify checksum correctness for vpp generated packets """
         self.vapi.cli("test http server")
         p = self.params[socket.AF_INET]
-        vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
+        config_tun_params(p, self.encryption_type, self.tun_if)
         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
-                scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
-                                        dst=self.tun_if.local_ip4) /
-                                     TCP(flags='S', dport=80)))
+                p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
+                                          dst=self.tun_if.local_ip4) /
+                                       TCP(flags='S', dport=80)))
         self.logger.debug(ppp("Sending packet:", send))
         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
         recv = recv[0]
-        decrypted = vpp_tun_sa.decrypt(recv[IP])
+        decrypted = p.vpp_tun_sa.decrypt(recv[IP])
         self.assert_packet_checksums_valid(decrypted)
 
 
@@ -374,14 +393,13 @@ class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
     pass
 
 
-class IpsecTun4Tests(object):
-    def test_tun_basic44(self, count=1):
-        """ ipsec 4o4 tunnel basic test """
+class IpsecTun4(object):
+
+    def verify_tun_44(self, p, count=1):
         self.vapi.cli("clear errors")
         try:
-            p = self.params[socket.AF_INET]
-            vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
-            send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
                                               count=count)
@@ -395,7 +413,7 @@ class IpsecTun4Tests(object):
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
-                    decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP])
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                     if not decrypt_pkt.haslayer(IP):
                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
@@ -432,19 +450,26 @@ class IpsecTun4Tests(object):
         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
 
+
+class IpsecTun4Tests(IpsecTun4):
+
+    def test_tun_basic44(self):
+        """ ipsec 4o4 tunnel basic test """
+        self.verify_tun_44(self.params[socket.AF_INET], count=1)
+
     def test_tun_burst44(self):
         """ ipsec 4o4 tunnel burst test """
-        self.test_tun_basic44(count=257)
+        self.verify_tun_44(self.params[socket.AF_INET], count=257)
 
 
-class IpsecTun6Tests(object):
-    def test_tun_basic66(self, count=1):
+class IpsecTun6(object):
+
+    def verify_tun_66(self, p, count=1):
         """ ipsec 6o6 tunnel basic test """
         self.vapi.cli("clear errors")
         try:
-            p = self.params[socket.AF_INET6]
-            vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
-            send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=count)
@@ -459,7 +484,7 @@ class IpsecTun6Tests(object):
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
-                    decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6])
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                     if not decrypt_pkt.haslayer(IPv6):
                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
@@ -489,9 +514,16 @@ class IpsecTun6Tests(object):
         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
 
+
+class IpsecTun6Tests(IpsecTun6):
+
+    def test_tun_basic66(self):
+        """ ipsec 6o6 tunnel basic test """
+        self.verify_tun_66(self.params[socket.AF_INET6], count=1)
+
     def test_tun_burst66(self):
         """ ipsec 6o6 tunnel burst test """
-        self.test_tun_basic66(count=257)
+        self.verify_tun_66(self.params[socket.AF_INET6], count=257)
 
 
 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
index 7498f51..21080ca 100644 (file)
@@ -4,7 +4,8 @@ import unittest
 from scapy.layers.ipsec import AH
 
 from framework import VppTestRunner
-from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests
+from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
+    config_tun_params, config_tra_params
 from template_ipsec import IpsecTcpTests
 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
         VppIpsecSpdItfBinding
@@ -53,7 +54,7 @@ class TemplateIpsecAh(TemplateIpsec):
 
         for _, p in self.params.items():
             self.config_ah_tra(p)
-            self.configure_sa_tra(p)
+            config_tra_params(p, self.encryption_type)
             self.logger.info(self.vapi.ppcli("show ipsec"))
         for _, p in self.params.items():
             self.config_ah_tun(p)
index 09b7240..4686360 100644 (file)
@@ -5,7 +5,7 @@ from scapy.layers.inet import UDP
 
 from framework import VppTestRunner
 from template_ipsec import IpsecTra46Tests, IpsecTun46Tests, TemplateIpsec, \
-    IpsecTcpTests, IpsecTun4Tests, IpsecTra4Tests
+    IpsecTcpTests, IpsecTun4Tests, IpsecTra4Tests, config_tra_params
 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSA,\
         VppIpsecSpdItfBinding
 from vpp_ip_route import VppIpRoute, VppRoutePath
@@ -177,8 +177,6 @@ class TemplateIpsecEsp(TemplateIpsec):
     |pg0| ------->  |VPP| ------> |pg1|
      ---             ---           ---
     """
-    config_esp_tun = config_esp_tun
-    config_esp_tra = config_esp_tra
 
     def setUp(self):
         super(TemplateIpsecEsp, self).setUp()
@@ -193,8 +191,8 @@ class TemplateIpsecEsp(TemplateIpsec):
                               self.tra_if).add_vpp_config()
 
         for _, p in self.params.items():
-            self.config_esp_tra(p)
-            self.configure_sa_tra(p)
+            config_esp_tra(self, p)
+            config_tra_params(p, self.encryption_type)
         self.logger.info(self.vapi.ppcli("show ipsec"))
 
         self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
@@ -203,7 +201,7 @@ class TemplateIpsecEsp(TemplateIpsec):
                               self.tun_if).add_vpp_config()
 
         for _, p in self.params.items():
-            self.config_esp_tun(p)
+            config_esp_tun(self, p)
         self.logger.info(self.vapi.ppcli("show ipsec"))
 
         for _, p in self.params.items():
@@ -241,9 +239,6 @@ class TemplateIpsecEspUdp(TemplateIpsec):
     """
     UDP encapped ESP
     """
-    config_esp_tun = config_esp_tun
-    config_esp_tra = config_esp_tra
-
     def setUp(self):
         super(TemplateIpsecEspUdp, self).setUp()
         self.encryption_type = ESP
@@ -261,15 +256,15 @@ class TemplateIpsecEspUdp(TemplateIpsec):
         VppIpsecSpdItfBinding(self, self.tra_spd,
                               self.tra_if).add_vpp_config()
 
-        self.config_esp_tra(p)
-        self.configure_sa_tra(p)
+        config_esp_tra(self, p)
+        config_tra_params(p, self.encryption_type)
 
         self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
         self.tun_spd.add_vpp_config()
         VppIpsecSpdItfBinding(self, self.tun_spd,
                               self.tun_if).add_vpp_config()
 
-        self.config_esp_tun(p)
+        config_esp_tun(self, p)
         self.logger.info(self.vapi.ppcli("show ipsec"))
 
         d = DpoProto.DPO_PROTO_IP4
index 06d2c89..ac96cfb 100644 (file)
@@ -1,9 +1,10 @@
 import unittest
 import socket
+import copy
 from scapy.layers.ipsec import ESP
 from framework import VppTestRunner
 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
-    IpsecTcpTests
+    IpsecTun4, IpsecTun6,  IpsecTcpTests,  config_tun_params
 from vpp_ipsec_tun_interface import VppIpsecTunInterface
 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
 
@@ -69,7 +70,7 @@ class TemplateIpsec6TunIfEsp(TemplateIpsec):
         tun_if.admin_up()
         tun_if.config_ip6()
 
-        VppIpRoute(self,  p.remote_tun_if_host, 32,
+        VppIpRoute(self,  p.remote_tun_if_host, 128,
                    [VppRoutePath(tun_if.remote_ip6,
                                  0xffffffff,
                                  proto=DpoProto.DPO_PROTO_IP6)],
@@ -87,5 +88,127 @@ class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
     tun6_decrypt_node_name = "esp6-decrypt"
 
 
+class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
+    """ IPsec IPv4 Multi Tunnel interface """
+
+    encryption_type = ESP
+    tun4_encrypt_node_name = "esp4-encrypt"
+    tun4_decrypt_node_name = "esp4-decrypt"
+
+    def setUp(self):
+        super(TestIpsec4MultiTunIfEsp, self).setUp()
+
+        self.tun_if = self.pg0
+
+        self.multi_params = []
+
+        for ii in range(10):
+            p = copy.copy(self.ipv4_params)
+
+            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
+            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
+            p.scapy_tun_spi = p.scapy_tun_spi + ii
+            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
+            p.vpp_tun_spi = p.vpp_tun_spi + ii
+
+            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
+            p.scapy_tra_spi = p.scapy_tra_spi + ii
+            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
+            p.vpp_tra_spi = p.vpp_tra_spi + ii
+
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            self.multi_params.append(p)
+
+            p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
+                                            p.scapy_tun_spi,
+                                            p.crypt_algo_vpp_id,
+                                            p.crypt_key, p.crypt_key,
+                                            p.auth_algo_vpp_id, p.auth_key,
+                                            p.auth_key)
+            p.tun_if.add_vpp_config()
+            p.tun_if.admin_up()
+            p.tun_if.config_ip4()
+
+            VppIpRoute(self, p.remote_tun_if_host, 32,
+                       [VppRoutePath(p.tun_if.remote_ip4,
+                                     0xffffffff)]).add_vpp_config()
+
+    def tearDown(self):
+        if not self.vpp_dead:
+            self.vapi.cli("show hardware")
+        super(TestIpsec4MultiTunIfEsp, self).tearDown()
+
+    def test_tun_44(self):
+        """Multiple IPSEC tunnel interfaces """
+        for p in self.multi_params:
+            self.verify_tun_44(p, count=127)
+            c = p.tun_if.get_rx_stats()
+            self.assertEqual(c['packets'], 127)
+            c = p.tun_if.get_tx_stats()
+            self.assertEqual(c['packets'], 127)
+
+
+class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
+    """ IPsec IPv6 Muitli Tunnel interface """
+
+    encryption_type = ESP
+    tun6_encrypt_node_name = "esp6-encrypt"
+    tun6_decrypt_node_name = "esp6-decrypt"
+
+    def setUp(self):
+        super(TestIpsec6MultiTunIfEsp, self).setUp()
+
+        self.tun_if = self.pg0
+
+        self.multi_params = []
+
+        for ii in range(10):
+            p = copy.copy(self.ipv6_params)
+
+            p.remote_tun_if_host = "1111::%d" % (ii + 1)
+            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
+            p.scapy_tun_spi = p.scapy_tun_spi + ii
+            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
+            p.vpp_tun_spi = p.vpp_tun_spi + ii
+
+            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
+            p.scapy_tra_spi = p.scapy_tra_spi + ii
+            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
+            p.vpp_tra_spi = p.vpp_tra_spi + ii
+
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            self.multi_params.append(p)
+
+            p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
+                                            p.scapy_tun_spi,
+                                            p.crypt_algo_vpp_id,
+                                            p.crypt_key, p.crypt_key,
+                                            p.auth_algo_vpp_id, p.auth_key,
+                                            p.auth_key, is_ip6=True)
+            p.tun_if.add_vpp_config()
+            p.tun_if.admin_up()
+            p.tun_if.config_ip6()
+
+            VppIpRoute(self, p.remote_tun_if_host, 128,
+                       [VppRoutePath(p.tun_if.remote_ip6,
+                                     0xffffffff,
+                                     proto=DpoProto.DPO_PROTO_IP6)],
+                       is_ip6=1).add_vpp_config()
+
+    def tearDown(self):
+        if not self.vpp_dead:
+            self.vapi.cli("show hardware")
+        super(TestIpsec6MultiTunIfEsp, self).tearDown()
+
+    def test_tun_66(self):
+        """Multiple IPSEC tunnel interfaces """
+        for p in self.multi_params:
+            self.verify_tun_66(p, count=127)
+            c = p.tun_if.get_rx_stats()
+            self.assertEqual(c['packets'], 127)
+            c = p.tun_if.get_tx_stats()
+            self.assertEqual(c['packets'], 127)
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
index d586c84..7be0d45 100644 (file)
@@ -463,3 +463,11 @@ class VppInterface(object):
 
     def __str__(self):
         return self.name
+
+    def get_rx_stats(self):
+        c = self.test.statistics.get_counter("^/if/rx$")
+        return c[0][self.sw_if_index]
+
+    def get_tx_stats(self):
+        c = self.test.statistics.get_counter("^/if/tx$")
+        return c[0][self.sw_if_index]