Typos. A bunch of typos I've been collecting.
[vpp.git] / test / template_ipsec.py
index 483699c..df4c2bc 100644 (file)
@@ -79,6 +79,53 @@ class IPsecIPv6Params(object):
         self.nat_header = None
 
 
+def config_tun_params(p, encryption_type, tun_if):
+    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
+    use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
+                              IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM))
+    p.scapy_tun_sa = SecurityAssociation(
+        encryption_type, spi=p.vpp_tun_spi,
+        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo, auth_key=p.auth_key,
+        tunnel_header=ip_class_by_addr_type[p.addr_type](
+            src=tun_if.remote_addr[p.addr_type],
+            dst=tun_if.local_addr[p.addr_type]),
+        nat_t_header=p.nat_header,
+        use_esn=use_esn)
+    p.vpp_tun_sa = SecurityAssociation(
+        encryption_type, spi=p.scapy_tun_spi,
+        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo, auth_key=p.auth_key,
+        tunnel_header=ip_class_by_addr_type[p.addr_type](
+            dst=tun_if.remote_addr[p.addr_type],
+            src=tun_if.local_addr[p.addr_type]),
+        nat_t_header=p.nat_header,
+        use_esn=use_esn)
+
+
+def config_tra_params(p, encryption_type):
+    use_esn = p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
+                         IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM)
+    p.scapy_tra_sa = SecurityAssociation(
+        encryption_type,
+        spi=p.vpp_tra_spi,
+        crypt_algo=p.crypt_algo,
+        crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo,
+        auth_key=p.auth_key,
+        nat_t_header=p.nat_header,
+        use_esn=use_esn)
+    p.vpp_tra_sa = SecurityAssociation(
+        encryption_type,
+        spi=p.scapy_tra_spi,
+        crypt_algo=p.crypt_algo,
+        crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo,
+        auth_key=p.auth_key,
+        nat_t_header=p.nat_header,
+        use_esn=use_esn)
+
+
 class TemplateIpsec(VppTestCase):
     """
     TRANSPORT MODE:
@@ -102,14 +149,16 @@ class TemplateIpsec(VppTestCase):
         """ empty method to be overloaded when necessary """
         pass
 
-    def setUp(self):
-        super(TemplateIpsec, self).setUp()
-
+    def setup_params(self):
         self.ipv4_params = IPsecIPv4Params()
         self.ipv6_params = IPsecIPv6Params()
         self.params = {self.ipv4_params.addr_type: self.ipv4_params,
                        self.ipv6_params.addr_type: self.ipv6_params}
 
+    def setUp(self):
+        super(TemplateIpsec, self).setUp()
+
+        self.setup_params()
         self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\
                        "XXXXXXXXXXXXXXXXXXXXX"
 
@@ -164,26 +213,6 @@ class TemplateIpsec(VppTestCase):
                 ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
                 for i in range(count)]
 
-    def configure_sa_tun(self, params):
-        ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
-        scapy_tun_sa = SecurityAssociation(
-            self.encryption_type, spi=params.vpp_tun_spi,
-            crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo, auth_key=params.auth_key,
-            tunnel_header=ip_class_by_addr_type[params.addr_type](
-                src=self.tun_if.remote_addr[params.addr_type],
-                dst=self.tun_if.local_addr[params.addr_type]),
-            nat_t_header=params.nat_header)
-        vpp_tun_sa = SecurityAssociation(
-            self.encryption_type, spi=params.scapy_tun_spi,
-            crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo, auth_key=params.auth_key,
-            tunnel_header=ip_class_by_addr_type[params.addr_type](
-                dst=self.tun_if.remote_addr[params.addr_type],
-                src=self.tun_if.local_addr[params.addr_type]),
-            nat_t_header=params.nat_header)
-        return vpp_tun_sa, scapy_tun_sa
-
     def configure_sa_tra(self, params):
         params.scapy_tra_sa = SecurityAssociation(
             self.encryption_type,
@@ -208,15 +237,15 @@ class IpsecTcpTests(object):
         """ verify checksum correctness for vpp generated packets """
         self.vapi.cli("test http server")
         p = self.params[socket.AF_INET]
-        vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
+        config_tun_params(p, self.encryption_type, self.tun_if)
         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
-                scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
-                                        dst=self.tun_if.local_ip4) /
-                                     TCP(flags='S', dport=80)))
+                p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
+                                          dst=self.tun_if.local_ip4) /
+                                       TCP(flags='S', dport=80)))
         self.logger.debug(ppp("Sending packet:", send))
         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
         recv = recv[0]
-        decrypted = vpp_tun_sa.decrypt(recv[IP])
+        decrypted = p.vpp_tun_sa.decrypt(recv[IP])
         self.assert_packet_checksums_valid(decrypted)
 
 
@@ -224,6 +253,7 @@ class IpsecTra4Tests(object):
     def test_tra_anti_replay(self, count=1):
         """ ipsec v4 transport anti-reply test """
         p = self.params[socket.AF_INET]
+        use_esn = p.vpp_tra_sa.use_esn
 
         # fire in a packet with seq number 1
         pkt = (Ether(src=self.tra_if.remote_mac,
@@ -243,6 +273,11 @@ class IpsecTra4Tests(object):
                                       seq_num=235))
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
+        # replayed packets are dropped
+        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
+        self.assert_packet_counter_equal(
+            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)
+
         # the window size is 64 packets
         # in window are still accepted
         pkt = (Ether(src=self.tra_if.remote_mac,
@@ -253,18 +288,6 @@ class IpsecTra4Tests(object):
                                       seq_num=172))
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
-        # out of window are dropped
-        pkt = (Ether(src=self.tra_if.remote_mac,
-                     dst=self.tra_if.local_mac) /
-               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
-                                         dst=self.tra_if.local_ip4) /
-                                      ICMP(),
-                                      seq_num=17))
-        self.send_and_assert_no_replies(self.tra_if, pkt * 17)
-
-        self.assert_packet_counter_equal(
-            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 17)
-
         # a packet that does not decrypt does not move the window forward
         bogus_sa = SecurityAssociation(self.encryption_type,
                                        p.vpp_tra_spi)
@@ -288,7 +311,74 @@ class IpsecTra4Tests(object):
                                       seq_num=234))
         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
+        # out of window are dropped
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=17))
+        self.send_and_assert_no_replies(self.tra_if, pkt * 17)
+
+        if use_esn:
+            # an out of window error with ESN looks like a high sequence
+            # wrap. but since it isn't then the verify will fail.
+            self.assert_packet_counter_equal(
+                '/err/%s/Integrity check failed' %
+                self.tra4_decrypt_node_name, 34)
+
+        else:
+            self.assert_packet_counter_equal(
+                '/err/%s/SA replayed packet' %
+                self.tra4_decrypt_node_name, 20)
+
+        # valid packet moves the window over to 236
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=236))
+        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+        decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
+        # move VPP's SA to just before the seq-number wrap
+        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
+
+        # then fire in a packet that VPP should drop because it causes the
+        # seq number to wrap  unless we're using extended.
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=237))
+
+        if use_esn:
+            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+            # in order to decrpyt the high order number needs to wrap
+            p.vpp_tra_sa.seq_num = 0x100000000
+            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
+            # send packets with high bits set
+            p.scapy_tra_sa.seq_num = 0x100000005
+            pkt = (Ether(src=self.tra_if.remote_mac,
+                         dst=self.tra_if.local_mac) /
+                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                             dst=self.tra_if.local_ip4) /
+                                          ICMP(),
+                                          seq_num=0x100000005))
+            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+            # in order to decrpyt the high order number needs to wrap
+            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+        else:
+            self.send_and_assert_no_replies(self.tra_if, [pkt])
+            self.assert_packet_counter_equal(
+                '/err/%s/sequence number cycled' %
+                self.tra4_encrypt_node_name, 1)
+
         # move the security-associations seq number on to the last we used
+        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
         p.scapy_tra_sa.seq_num = 351
         p.vpp_tra_sa.seq_num = 351
 
@@ -374,14 +464,13 @@ class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
     pass
 
 
-class IpsecTun4Tests(object):
-    def test_tun_basic44(self, count=1):
-        """ ipsec 4o4 tunnel basic test """
+class IpsecTun4(object):
+
+    def verify_tun_44(self, p, count=1):
         self.vapi.cli("clear errors")
         try:
-            p = self.params[socket.AF_INET]
-            vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
-            send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
                                               count=count)
@@ -395,7 +484,7 @@ class IpsecTun4Tests(object):
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
-                    decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP])
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                     if not decrypt_pkt.haslayer(IP):
                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
@@ -432,19 +521,26 @@ class IpsecTun4Tests(object):
         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
 
+
+class IpsecTun4Tests(IpsecTun4):
+
+    def test_tun_basic44(self):
+        """ ipsec 4o4 tunnel basic test """
+        self.verify_tun_44(self.params[socket.AF_INET], count=1)
+
     def test_tun_burst44(self):
         """ ipsec 4o4 tunnel burst test """
-        self.test_tun_basic44(count=257)
+        self.verify_tun_44(self.params[socket.AF_INET], count=257)
+
 
+class IpsecTun6(object):
 
-class IpsecTun6Tests(object):
-    def test_tun_basic66(self, count=1):
+    def verify_tun_66(self, p, count=1):
         """ ipsec 6o6 tunnel basic test """
         self.vapi.cli("clear errors")
         try:
-            p = self.params[socket.AF_INET6]
-            vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
-            send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=count)
@@ -459,7 +555,7 @@ class IpsecTun6Tests(object):
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
-                    decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6])
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                     if not decrypt_pkt.haslayer(IPv6):
                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
@@ -489,9 +585,16 @@ class IpsecTun6Tests(object):
         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
 
+
+class IpsecTun6Tests(IpsecTun6):
+
+    def test_tun_basic66(self):
+        """ ipsec 6o6 tunnel basic test """
+        self.verify_tun_66(self.params[socket.AF_INET6], count=1)
+
     def test_tun_burst66(self):
         """ ipsec 6o6 tunnel burst test """
-        self.test_tun_basic66(count=257)
+        self.verify_tun_66(self.params[socket.AF_INET6], count=257)
 
 
 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):