ipsec: USE_EXTENDED_SEQ_NUM -> USE_ESN
[vpp.git] / test / template_ipsec.py
index 273865d..40e787e 100644 (file)
@@ -21,6 +21,7 @@ class IPsecIPv4Params(object):
 
     def __init__(self):
         self.remote_tun_if_host = '1.1.1.1'
+        self.remote_tun_if_host6 = '1111::1'
 
         self.scapy_tun_sa_id = 10
         self.scapy_tun_spi = 1001
@@ -55,6 +56,7 @@ class IPsecIPv6Params(object):
 
     def __init__(self):
         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
+        self.remote_tun_if_host4 = '1.1.1.1'
 
         self.scapy_tun_sa_id = 50
         self.scapy_tun_spi = 3001
@@ -82,7 +84,7 @@ class IPsecIPv6Params(object):
 def config_tun_params(p, encryption_type, tun_if):
     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
     use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
-                              IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM))
+                              IPSEC_API_SAD_FLAG_USE_ESN))
     p.scapy_tun_sa = SecurityAssociation(
         encryption_type, spi=p.vpp_tun_spi,
         crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
@@ -105,7 +107,7 @@ def config_tun_params(p, encryption_type, tun_if):
 
 def config_tra_params(p, encryption_type):
     use_esn = p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
-                         IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM)
+                         IPSEC_API_SAD_FLAG_USE_ESN)
     p.scapy_tra_sa = SecurityAssociation(
         encryption_type,
         spi=p.vpp_tra_spi,
@@ -213,24 +215,6 @@ class TemplateIpsec(VppTestCase):
                 ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
                 for i in range(count)]
 
-    def configure_sa_tra(self, params):
-        params.scapy_tra_sa = SecurityAssociation(
-            self.encryption_type,
-            spi=params.vpp_tra_spi,
-            crypt_algo=params.crypt_algo,
-            crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo,
-            auth_key=params.auth_key,
-            nat_t_header=params.nat_header)
-        params.vpp_tra_sa = SecurityAssociation(
-            self.encryption_type,
-            spi=params.scapy_tra_spi,
-            crypt_algo=params.crypt_algo,
-            crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo,
-            auth_key=params.auth_key,
-            nat_t_header=params.nat_header)
-
 
 class IpsecTcpTests(object):
     def test_tcp_checksum(self):
@@ -345,8 +329,8 @@ class IpsecTra4Tests(object):
         # move VPP's SA to just before the seq-number wrap
         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
 
-        # then fire in a packet that VPP should drop becuase it causes the
-        # seq number to wrap, unless we're using exteneded.
+        # then fire in a packet that VPP should drop because it causes the
+        # seq number to wrap  unless we're using extended.
         pkt = (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
@@ -466,6 +450,26 @@ class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
 
 class IpsecTun4(object):
 
+    def verify_counters(self, p, count):
+        if (hasattr(p, "spd_policy_in_any")):
+            pkts = p.spd_policy_in_any.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SPD any policy: expected %d != %d" %
+                             (count, pkts))
+
+        if (hasattr(p, "tun_sa_in")):
+            pkts = p.tun_sa_in.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA in counts: expected %d != %d" %
+                             (count, pkts))
+            pkts = p.tun_sa_out.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA out counts: expected %d != %d" %
+                             (count, pkts))
+
+        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+
     def verify_tun_44(self, p, count=1):
         self.vapi.cli("clear errors")
         try:
@@ -502,24 +506,45 @@ class IpsecTun4(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
-        if (hasattr(p, "spd_policy_in_any")):
-            pkts = p.spd_policy_in_any.get_stats()['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SPD any policy: expected %d != %d" %
-                             (count, pkts))
+        self.verify_counters(p, count)
 
-        if (hasattr(p, "tun_sa_in")):
-            pkts = p.tun_sa_in.get_stats()['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA in counts: expected %d != %d" %
-                             (count, pkts))
-            pkts = p.tun_sa_out.get_stats()['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA out counts: expected %d != %d" %
-                             (count, pkts))
+    def verify_tun_64(self, p, count=1):
+        self.vapi.cli("clear errors")
+        try:
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+                                               src=p.remote_tun_if_host6,
+                                               dst=self.pg1.remote_ip6,
+                                               count=count)
+            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+            for recv_pkt in recv_pkts:
+                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
+                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
+                self.assert_packet_checksums_valid(recv_pkt)
+            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+                                       dst=p.remote_tun_if_host6, count=count)
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
+            for recv_pkt in recv_pkts:
+                try:
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
+                    if not decrypt_pkt.haslayer(IPv6):
+                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
+                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
+                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
+                    self.assert_packet_checksums_valid(decrypt_pkt)
+                except:
+                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
+                    try:
+                        self.logger.debug(
+                            ppp("Decrypted packet:", decrypt_pkt))
+                    except:
+                        pass
+                    raise
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec"))
 
-        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
-        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+        self.verify_counters(p, count)
 
 
 class IpsecTun4Tests(IpsecTun4):
@@ -535,6 +560,19 @@ class IpsecTun4Tests(IpsecTun4):
 
 class IpsecTun6(object):
 
+    def verify_counters(self, p, count):
+        if (hasattr(p, "tun_sa_in")):
+            pkts = p.tun_sa_in.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA in counts: expected %d != %d" %
+                             (count, pkts))
+            pkts = p.tun_sa_out.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA out counts: expected %d != %d" %
+                             (count, pkts))
+        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+
     def verify_tun_66(self, p, count=1):
         """ ipsec 6o6 tunnel basic test """
         self.vapi.cli("clear errors")
@@ -572,18 +610,46 @@ class IpsecTun6(object):
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
+        self.verify_counters(p, count)
 
-        if (hasattr(p, "tun_sa_in")):
-            pkts = p.tun_sa_in.get_stats()['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA in counts: expected %d != %d" %
-                             (count, pkts))
-            pkts = p.tun_sa_out.get_stats()['packets']
-            self.assertEqual(pkts, count,
-                             "incorrect SA out counts: expected %d != %d" %
-                             (count, pkts))
-        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
-        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+    def verify_tun_46(self, p, count=1):
+        """ ipsec 4o6 tunnel basic test """
+        self.vapi.cli("clear errors")
+        try:
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+                                              src=p.remote_tun_if_host4,
+                                              dst=self.pg1.remote_ip4,
+                                              count=count)
+            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+            for recv_pkt in recv_pkts:
+                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
+                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
+                self.assert_packet_checksums_valid(recv_pkt)
+            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
+                                      dst=p.remote_tun_if_host4,
+                                      count=count)
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
+            for recv_pkt in recv_pkts:
+                try:
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
+                    if not decrypt_pkt.haslayer(IP):
+                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
+                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
+                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
+                    self.assert_packet_checksums_valid(decrypt_pkt)
+                except:
+                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
+                    try:
+                        self.logger.debug(ppp("Decrypted packet:",
+                                              decrypt_pkt))
+                    except:
+                        pass
+                    raise
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec"))
+        self.verify_counters(p, count)
 
 
 class IpsecTun6Tests(IpsecTun6):