ipsec: IPSec protection for multi-point tunnel interfaces
[vpp.git] / test / template_ipsec.py
index 398a6bb..5a700e8 100644 (file)
@@ -27,14 +27,14 @@ class IPsecIPv4Params(object):
         self.remote_tun_if_host = '1.1.1.1'
         self.remote_tun_if_host6 = '1111::1'
 
-        self.scapy_tun_sa_id = 10
+        self.scapy_tun_sa_id = 100
         self.scapy_tun_spi = 1001
-        self.vpp_tun_sa_id = 20
+        self.vpp_tun_sa_id = 200
         self.vpp_tun_spi = 1000
 
-        self.scapy_tra_sa_id = 30
+        self.scapy_tra_sa_id = 300
         self.scapy_tra_spi = 2001
-        self.vpp_tra_sa_id = 40
+        self.vpp_tra_sa_id = 400
         self.vpp_tra_spi = 2000
 
         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
@@ -63,14 +63,14 @@ class IPsecIPv6Params(object):
         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
         self.remote_tun_if_host4 = '1.1.1.1'
 
-        self.scapy_tun_sa_id = 50
+        self.scapy_tun_sa_id = 500
         self.scapy_tun_spi = 3001
-        self.vpp_tun_sa_id = 60
+        self.vpp_tun_sa_id = 600
         self.vpp_tun_spi = 3000
 
-        self.scapy_tra_sa_id = 70
+        self.scapy_tra_sa_id = 700
         self.scapy_tra_spi = 4001
-        self.vpp_tra_sa_id = 80
+        self.vpp_tra_sa_id = 800
         self.vpp_tra_spi = 4000
 
         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
@@ -98,6 +98,8 @@ def config_tun_params(p, encryption_type, tun_if):
     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                              IPSEC_API_SAD_FLAG_USE_ESN))
+    p.tun_dst = tun_if.remote_addr[p.addr_type]
+    p.tun_src = tun_if.local_addr[p.addr_type]
     crypt_key = mk_scapy_crypt_key(p)
     p.scapy_tun_sa = SecurityAssociation(
         encryption_type, spi=p.vpp_tun_spi,
@@ -105,8 +107,8 @@ def config_tun_params(p, encryption_type, tun_if):
         crypt_key=crypt_key,
         auth_algo=p.auth_algo, auth_key=p.auth_key,
         tunnel_header=ip_class_by_addr_type[p.addr_type](
-            src=tun_if.remote_addr[p.addr_type],
-            dst=tun_if.local_addr[p.addr_type]),
+            src=p.tun_dst,
+            dst=p.tun_src),
         nat_t_header=p.nat_header,
         esn_en=esn_en)
     p.vpp_tun_sa = SecurityAssociation(
@@ -115,8 +117,8 @@ def config_tun_params(p, encryption_type, tun_if):
         crypt_key=crypt_key,
         auth_algo=p.auth_algo, auth_key=p.auth_key,
         tunnel_header=ip_class_by_addr_type[p.addr_type](
-            dst=tun_if.remote_addr[p.addr_type],
-            src=tun_if.local_addr[p.addr_type]),
+            dst=p.tun_dst,
+            src=p.tun_src),
         nat_t_header=p.nat_header,
         esn_en=esn_en)
 
@@ -222,14 +224,14 @@ class TemplateIpsec(VppTestCase):
     def show_commands_at_teardown(self):
         self.logger.info(self.vapi.cli("show hardware"))
 
-    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
+    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                 sa.encrypt(IP(src=src, dst=dst) /
                            ICMP() / Raw(b'X' * payload_size))
                 for i in range(count)]
 
-    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
+    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                           payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                 sa.encrypt(IPv6(src=src, dst=dst) /
@@ -554,16 +556,17 @@ class IpsecTra4(object):
         p.scapy_tra_sa.seq_num = 351
         p.vpp_tra_sa.seq_num = 351
 
-    def verify_tra_basic4(self, count=1):
+    def verify_tra_basic4(self, count=1, payload_size=54):
         """ ipsec v4 transport basic test """
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
         try:
             p = self.params[socket.AF_INET]
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
                                               src=self.tra_if.remote_ip4,
                                               dst=self.tra_if.local_ip4,
-                                              count=count)
+                                              count=count,
+                                              payload_size=payload_size)
             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                              self.tra_if)
             for rx in recv_pkts:
@@ -595,7 +598,7 @@ class IpsecTra4(object):
 class IpsecTra4Tests(IpsecTra4):
     """ UT test methods for Transport v4 """
     def test_tra_anti_replay(self):
-        """ ipsec v4 transport anti-reply test """
+        """ ipsec v4 transport anti-replay test """
         self.verify_tra_anti_replay()
 
     def test_tra_basic(self, count=1):
@@ -609,14 +612,16 @@ class IpsecTra4Tests(IpsecTra4):
 
 class IpsecTra6(object):
     """ verify methods for Transport v6 """
-    def verify_tra_basic6(self, count=1):
+    def verify_tra_basic6(self, count=1, payload_size=54):
         self.vapi.cli("clear errors")
+        self.vapi.cli("clear ipsec sa")
         try:
             p = self.params[socket.AF_INET6]
-            send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
+            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
                                                src=self.tra_if.remote_ip6,
                                                dst=self.tra_if.local_ip6,
-                                               count=count)
+                                               count=count,
+                                               payload_size=payload_size)
             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                              self.tra_if)
             for rx in recv_pkts:
@@ -826,13 +831,15 @@ class IpsecTun4(object):
     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec counters")
+        self.vapi.cli("clear ipsec sa")
         if not n_rx:
             n_rx = count
         try:
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
-                                              count=count)
+                                              count=count,
+                                              payload_size=payload_size)
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             self.verify_decrypted(p, recv_pkts)
 
@@ -843,6 +850,10 @@ class IpsecTun4(object):
                                              self.tun_if, n_rx)
             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
 
+            for rx in recv_pkts:
+                self.assertEqual(rx[IP].src, p.tun_src)
+                self.assertEqual(rx[IP].dst, p.tun_dst)
+
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec all"))
@@ -851,40 +862,25 @@ class IpsecTun4(object):
         self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
         self.verify_counters4(p, count, n_rx)
 
-    """ verify methods for Transport v4 """
-    def verify_tun_44_bad_packet_sizes(self, p):
-        # with a buffer size of 2048, 1989 bytes of payload
-        # means there isn't space to insert the ESP header
-        N_PKTS = 63
-        for p_siz in [1989, 8500]:
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+    def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
+        self.vapi.cli("clear errors")
+        if not n_rx:
+            n_rx = count
+        try:
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
-                                              count=N_PKTS,
-                                              payload_size=p_siz)
+                                              count=count)
             self.send_and_assert_no_replies(self.tun_if, send_pkts)
+
             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
-                                      dst=p.remote_tun_if_host, count=N_PKTS,
-                                      payload_size=p_siz)
-            self.send_and_assert_no_replies(self.pg1, send_pkts,
-                                            self.tun_if)
-
-        # both large packets on decrpyt count against chained buffers
-        # the 9000 bytes one does on encrypt
-        self.assertEqual(2 * N_PKTS,
-                         self.statistics.get_err_counter(
-                             '/err/%s/chained buffers (packet dropped)' %
-                             self.tun4_decrypt_node_name))
-        self.assertEqual(N_PKTS,
-                         self.statistics.get_err_counter(
-                             '/err/%s/chained buffers (packet dropped)' %
-                             self.tun4_encrypt_node_name))
-
-        # on encrypt the 1989 size is no trailer space
-        self.assertEqual(N_PKTS,
-                         self.statistics.get_err_counter(
-                             '/err/%s/no trailer space (packet dropped)' %
-                             self.tun4_encrypt_node_name))
+                                      dst=p.remote_tun_if_host, count=count,
+                                      payload_size=payload_size)
+            self.send_and_assert_no_replies(self.pg1, send_pkts)
+
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
 
     def verify_tun_reass_44(self, p):
         self.vapi.cli("clear errors")
@@ -892,7 +888,7 @@ class IpsecTun4(object):
             sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
 
         try:
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
                                               payload_size=1900,
@@ -919,7 +915,7 @@ class IpsecTun4(object):
     def verify_tun_64(self, p, count=1):
         self.vapi.cli("clear errors")
         try:
-            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host6,
                                                dst=self.pg1.remote_ip6,
                                                count=count)
@@ -990,12 +986,6 @@ class IpsecTun4Tests(IpsecTun4):
         self.verify_tun_44(self.params[socket.AF_INET], count=127)
 
 
-class IpsecTunEsp4Tests(IpsecTun4):
-    def test_tun_bad_packet_sizes(self):
-        """ ipsec v4 tunnel bad packet size """
-        self.verify_tun_44_bad_packet_sizes(self.params[socket.AF_INET])
-
-
 class IpsecTun6(object):
     """ verify methods for Tunnel v6 """
     def verify_counters6(self, p_in, p_out, count, worker=None):
@@ -1042,7 +1032,8 @@ class IpsecTun6(object):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
-        send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+        send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
+                                           self.tun_if,
                                            src=p_in.remote_tun_if_host,
                                            dst=self.pg1.remote_ip6,
                                            count=count)
@@ -1055,10 +1046,12 @@ class IpsecTun6(object):
         if not p_out:
             p_out = p_in
         try:
-            send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
+                                               self.tun_if,
                                                src=p_in.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
-                                               count=count)
+                                               count=count,
+                                               payload_size=payload_size)
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             self.verify_decrypted6(p_in, recv_pkts)
 
@@ -1069,6 +1062,10 @@ class IpsecTun6(object):
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
 
+            for rx in recv_pkts:
+                self.assertEqual(rx[IPv6].src, p_out.tun_src)
+                self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
+
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec all"))
@@ -1080,7 +1077,7 @@ class IpsecTun6(object):
             sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
 
         try:
-            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=1,
@@ -1108,7 +1105,7 @@ class IpsecTun6(object):
         """ ipsec 4o6 tunnel basic test """
         self.vapi.cli("clear errors")
         try:
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host4,
                                               dst=self.pg1.remote_ip4,
                                               count=count)
@@ -1171,7 +1168,7 @@ class IpsecTun6HandoffTests(IpsecTun6):
         # inject alternately on worker 0 and 1. all counts on the SA
         # should be against worker 0
         for worker in [0, 1, 0, 1]:
-            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=N_PKTS)
@@ -1202,7 +1199,7 @@ class IpsecTun4HandoffTests(IpsecTun4):
         # inject alternately on worker 0 and 1. all counts on the SA
         # should be against worker 0
         for worker in [0, 1, 0, 1]:
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
                                               count=N_PKTS)