ipsec: remove dedicated IPSec tunnels
[vpp.git] / test / template_ipsec.py
index 25cff7f..a59a213 100644 (file)
@@ -8,7 +8,7 @@ from scapy.layers.l2 import Ether, Raw
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
 
 from framework import VppTestCase, VppTestRunner
-from util import ppp, reassemble4
+from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
 from vpp_papi import VppEnum
 
 
@@ -37,12 +37,12 @@ class IPsecIPv4Params(object):
         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                  IPSEC_API_INTEG_ALG_SHA1_96)
         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
-        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
+        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
 
         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
         self.crypt_algo = 'AES-CBC'  # scapy name
-        self.crypt_key = 'JPjyOWBeVEQiMe7h'
+        self.crypt_key = b'JPjyOWBeVEQiMe7h'
         self.salt = 0
         self.flags = 0
         self.nat_header = None
@@ -73,18 +73,18 @@ class IPsecIPv6Params(object):
         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                  IPSEC_API_INTEG_ALG_SHA1_96)
         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
-        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
+        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
 
         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
         self.crypt_algo = 'AES-CBC'  # scapy name
-        self.crypt_key = 'JPjyOWBeVEQiMe7h'
+        self.crypt_key = b'JPjyOWBeVEQiMe7h'
         self.salt = 0
         self.flags = 0
         self.nat_header = None
 
 
-def mk_scapy_crpyt_key(p):
+def mk_scapy_crypt_key(p):
     if p.crypt_algo == "AES-GCM":
         return p.crypt_key + struct.pack("!I", p.salt)
     else:
@@ -95,7 +95,7 @@ def config_tun_params(p, encryption_type, tun_if):
     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
     use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                               IPSEC_API_SAD_FLAG_USE_ESN))
-    crypt_key = mk_scapy_crpyt_key(p)
+    crypt_key = mk_scapy_crypt_key(p)
     p.scapy_tun_sa = SecurityAssociation(
         encryption_type, spi=p.vpp_tun_spi,
         crypt_algo=p.crypt_algo,
@@ -121,7 +121,7 @@ def config_tun_params(p, encryption_type, tun_if):
 def config_tra_params(p, encryption_type):
     use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                               IPSEC_API_SAD_FLAG_USE_ESN))
-    crypt_key = mk_scapy_crpyt_key(p)
+    crypt_key = mk_scapy_crypt_key(p)
     p.scapy_tra_sa = SecurityAssociation(
         encryption_type,
         spi=p.vpp_tra_spi,
@@ -223,7 +223,7 @@ class TemplateIpsec(VppTestCase):
                          payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                 sa.encrypt(IP(src=src, dst=dst) /
-                           ICMP() / Raw('X' * payload_size))
+                           ICMP() / Raw(b'X' * payload_size))
                 for i in range(count)]
 
     def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
@@ -236,7 +236,7 @@ class TemplateIpsec(VppTestCase):
 
     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
+                IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
                 for i in range(count)]
 
     def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
@@ -250,7 +250,6 @@ class IpsecTcp(object):
     def verify_tcp_checksum(self):
         self.vapi.cli("test http server")
         p = self.params[socket.AF_INET]
-        config_tun_params(p, self.encryption_type, self.tun_if)
         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
                 p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
                                           dst=self.tun_if.local_ip4) /
@@ -317,6 +316,21 @@ class IpsecTra4(object):
         replay_count += len(pkts)
         self.assert_error_counter_equal(replay_node_name, replay_count)
 
+        #
+        # now send a batch of packets all with the same sequence number
+        # the first packet in the batch is legitimate, the rest bogus
+        #
+        pkts = (Ether(src=self.tra_if.remote_mac,
+                      dst=self.tra_if.local_mac) /
+                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                          dst=self.tra_if.local_ip4) /
+                                       ICMP(),
+                                       seq_num=35))
+        recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
+                                         self.tra_if, n_rx=1)
+        replay_count += 7
+        self.assert_error_counter_equal(replay_node_name, replay_count)
+
         #
         # now move the window over to 257 (more than one byte) and into Case A
         #
@@ -347,7 +361,7 @@ class IpsecTra4(object):
         bogus_sa = SecurityAssociation(self.encryption_type,
                                        p.vpp_tra_spi,
                                        crypt_algo=p.crypt_algo,
-                                       crypt_key=mk_scapy_crpyt_key(p)[::-1],
+                                       crypt_key=mk_scapy_crypt_key(p)[::-1],
                                        auth_algo=p.auth_algo,
                                        auth_key=p.auth_key[::-1])
         pkt = (Ether(src=self.tra_if.remote_mac,
@@ -364,7 +378,7 @@ class IpsecTra4(object):
 
         # a malformed 'runt' packet
         #  created by a mis-constructed SA
-        if (ESP == self.encryption_type):
+        if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
             bogus_sa = SecurityAssociation(self.encryption_type,
                                            p.vpp_tra_spi)
             pkt = (Ether(src=self.tra_if.remote_mac,
@@ -703,7 +717,6 @@ class IpsecTun4(object):
         if not n_rx:
             n_rx = count
         try:
-            config_tun_params(p, self.encryption_type, self.tun_if)
             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
@@ -724,10 +737,39 @@ class IpsecTun4(object):
 
         self.verify_counters4(p, count, n_rx)
 
+    def verify_tun_reass_44(self, p):
+        self.vapi.cli("clear errors")
+        self.vapi.ip_reassembly_enable_disable(
+            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
+
+        try:
+            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+                                              src=p.remote_tun_if_host,
+                                              dst=self.pg1.remote_ip4,
+                                              payload_size=1900,
+                                              count=1)
+            send_pkts = fragment_rfc791(send_pkts[0], 1400)
+            recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
+                                             self.pg1, n_rx=1)
+            self.verify_decrypted(p, recv_pkts)
+
+            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
+                                      dst=p.remote_tun_if_host, count=1)
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
+                                             self.tun_if)
+            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
+
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
+
+        self.verify_counters4(p, 1, 1)
+        self.vapi.ip_reassembly_enable_disable(
+            sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
+
     def verify_tun_64(self, p, count=1):
         self.vapi.cli("clear errors")
         try:
-            config_tun_params(p, self.encryption_type, self.tun_if)
             send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host6,
                                                dst=self.pg1.remote_ip6,
@@ -766,7 +808,7 @@ class IpsecTun4(object):
         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
                UDP(sport=333, dport=4500) /
-               Raw(0xff))
+               Raw(b'\xff'))
         self.send_and_assert_no_replies(self.tun_if, pkt*31)
         self.assert_error_counter_equal(
             '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
@@ -774,7 +816,7 @@ class IpsecTun4(object):
         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
                UDP(sport=333, dport=4500) /
-               Raw(0xfe))
+               Raw(b'\xfe'))
         self.send_and_assert_no_replies(self.tun_if, pkt*31)
         self.assert_error_counter_equal(
             '/err/%s/Too Short' % self.tun4_input_node, 31)
@@ -786,6 +828,10 @@ class IpsecTun4Tests(IpsecTun4):
         """ ipsec 4o4 tunnel basic test """
         self.verify_tun_44(self.params[socket.AF_INET], count=1)
 
+    def test_tun_reass_basic44(self):
+        """ ipsec 4o4 tunnel basic reassembly test """
+        self.verify_tun_reass_44(self.params[socket.AF_INET])
+
     def test_tun_burst44(self):
         """ ipsec 4o4 tunnel burst test """
         self.verify_tun_44(self.params[socket.AF_INET], count=257)
@@ -837,7 +883,6 @@ class IpsecTun6(object):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
-        config_tun_params(p_in, self.encryption_type, self.tun_if)
         send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                            src=p_in.remote_tun_if_host,
                                            dst=self.pg1.remote_ip6,
@@ -851,8 +896,6 @@ class IpsecTun6(object):
         if not p_out:
             p_out = p_in
         try:
-            config_tun_params(p_in, self.encryption_type, self.tun_if)
-            config_tun_params(p_out, self.encryption_type, self.tun_if)
             send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                                src=p_in.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
@@ -873,11 +916,40 @@ class IpsecTun6(object):
             self.logger.info(self.vapi.ppcli("show ipsec all"))
         self.verify_counters6(p_in, p_out, count)
 
+    def verify_tun_reass_66(self, p):
+        self.vapi.cli("clear errors")
+        self.vapi.ip_reassembly_enable_disable(
+            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
+
+        try:
+            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+                                               src=p.remote_tun_if_host,
+                                               dst=self.pg1.remote_ip6,
+                                               count=1,
+                                               payload_size=1900)
+            send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
+            recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
+                                             self.pg1, n_rx=1)
+            self.verify_decrypted6(p, recv_pkts)
+
+            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+                                       dst=p.remote_tun_if_host,
+                                       count=1,
+                                       payload_size=64)
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
+                                             self.tun_if)
+            self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
+        self.verify_counters6(p, p, 1)
+        self.vapi.ip_reassembly_enable_disable(
+            sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)
+
     def verify_tun_46(self, p, count=1):
         """ ipsec 4o6 tunnel basic test """
         self.vapi.cli("clear errors")
         try:
-            config_tun_params(p, self.encryption_type, self.tun_if)
             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host4,
                                               dst=self.pg1.remote_ip4,
@@ -920,6 +992,10 @@ class IpsecTun6Tests(IpsecTun6):
         """ ipsec 6o6 tunnel basic test """
         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
 
+    def test_tun_reass_basic66(self):
+        """ ipsec 6o6 tunnel basic reassembly test """
+        self.verify_tun_reass_66(self.params[socket.AF_INET6])
+
     def test_tun_burst66(self):
         """ ipsec 6o6 tunnel burst test """
         self.verify_tun_66(self.params[socket.AF_INET6], count=257)