IPSEC-AH: anti-replay testing
[vpp.git] / test / template_ipsec.py
index bf13d71..961f63a 100644 (file)
@@ -95,6 +95,11 @@ class TemplateIpsec(VppTestCase):
     vpp_esp_protocol = 1
     vpp_ah_protocol = 0
 
+    @classmethod
+    def ipsec_select_backend(cls):
+        """ empty method to be overloaded when necessary """
+        pass
+
     @classmethod
     def setUpClass(cls):
         super(TemplateIpsec, cls).setUpClass()
@@ -106,6 +111,7 @@ class TemplateIpsec(VppTestCase):
             i.resolve_arp()
             i.config_ip6()
             i.resolve_ndp()
+        cls.ipsec_select_backend()
 
     def tearDown(self):
         super(TemplateIpsec, self).tearDown()
@@ -152,20 +158,20 @@ class TemplateIpsec(VppTestCase):
                 src=self.tun_if.local_addr[params.addr_type]))
         return vpp_tun_sa, scapy_tun_sa
 
-    def configure_sa_tra(self, params):
-        scapy_tra_sa = SecurityAssociation(self.encryption_type,
-                                           spi=params.vpp_tra_spi,
-                                           crypt_algo=params.crypt_algo,
-                                           crypt_key=params.crypt_key,
-                                           auth_algo=params.auth_algo,
-                                           auth_key=params.auth_key)
-        vpp_tra_sa = SecurityAssociation(self.encryption_type,
-                                         spi=params.scapy_tra_spi,
-                                         crypt_algo=params.crypt_algo,
-                                         crypt_key=params.crypt_key,
-                                         auth_algo=params.auth_algo,
-                                         auth_key=params.auth_key)
-        return vpp_tra_sa, scapy_tra_sa
+    @classmethod
+    def configure_sa_tra(cls, params):
+        params.scapy_tra_sa = SecurityAssociation(cls.encryption_type,
+                                                  spi=params.vpp_tra_spi,
+                                                  crypt_algo=params.crypt_algo,
+                                                  crypt_key=params.crypt_key,
+                                                  auth_algo=params.auth_algo,
+                                                  auth_key=params.auth_key)
+        params.vpp_tra_sa = SecurityAssociation(cls.encryption_type,
+                                                spi=params.scapy_tra_spi,
+                                                crypt_algo=params.crypt_algo,
+                                                crypt_key=params.crypt_key,
+                                                auth_algo=params.auth_algo,
+                                                auth_key=params.auth_key)
 
 
 class IpsecTcpTests(object):
@@ -186,54 +192,133 @@ class IpsecTcpTests(object):
 
 
 class IpsecTraTests(object):
+    def test_tra_anti_replay(self, count=1):
+        """ ipsec v4 transport anti-reply test """
+        p = self.params[socket.AF_INET]
+
+        # fire in a packet with seq number 1
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=1))
+        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+
+        # now move the window over to 235
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=235))
+        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+
+        # the window size is 64 packets
+        # in window are still accepted
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=172))
+        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+
+        # out of window are dropped
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=17))
+        self.send_and_assert_no_replies(self.tra_if, pkt * 17)
+
+        err = self.statistics.get_counter(
+            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name)
+        self.assertEqual(err, 17)
+
+        # a packet that does not decrypt does not move the window forward
+        bogus_sa = SecurityAssociation(self.encryption_type,
+                                       p.vpp_tra_spi)
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                   dst=self.tra_if.local_ip4) /
+                                ICMP(),
+                                seq_num=350))
+        self.send_and_assert_no_replies(self.tra_if, pkt * 17)
+
+        err = self.statistics.get_counter(
+            '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name)
+        self.assertEqual(err, 17)
+
+        # which we can determine since this packet is still in the window
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=234))
+        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+
+        # move the security-associations seq number on to the last we used
+        p.scapy_tra_sa.seq_num = 351
+        p.vpp_tra_sa.seq_num = 351
+
     def test_tra_basic(self, count=1):
         """ ipsec v4 transport basic test """
+        self.vapi.cli("clear errors")
         try:
             p = self.params[socket.AF_INET]
-            vpp_tra_sa, scapy_tra_sa = self.configure_sa_tra(p)
-            send_pkts = self.gen_encrypt_pkts(scapy_tra_sa, self.tra_if,
+            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                               src=self.tra_if.remote_ip4,
                                               dst=self.tra_if.local_ip4,
                                               count=count)
             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                              self.tra_if)
-            for p in recv_pkts:
+            for rx in recv_pkts:
                 try:
-                    decrypted = vpp_tra_sa.decrypt(p[IP])
+                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                     self.assert_packet_checksums_valid(decrypted)
                 except:
-                    self.logger.debug(ppp("Unexpected packet:", p))
+                    self.logger.debug(ppp("Unexpected packet:", rx))
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
+        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
+
     def test_tra_burst(self):
         """ ipsec v4 transport burst test """
         self.test_tra_basic(count=257)
 
     def test_tra_basic6(self, count=1):
         """ ipsec v6 transport basic test """
+        self.vapi.cli("clear errors")
         try:
             p = self.params[socket.AF_INET6]
-            vpp_tra_sa, scapy_tra_sa = self.configure_sa_tra(p)
-            send_pkts = self.gen_encrypt_pkts6(scapy_tra_sa, self.tra_if,
+            send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                                src=self.tra_if.remote_ip6,
                                                dst=self.tra_if.local_ip6,
                                                count=count)
             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                              self.tra_if)
-            for p in recv_pkts:
+            for rx in recv_pkts:
                 try:
-                    decrypted = vpp_tra_sa.decrypt(p[IPv6])
+                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                     self.assert_packet_checksums_valid(decrypted)
                 except:
-                    self.logger.debug(ppp("Unexpected packet:", p))
+                    self.logger.debug(ppp("Unexpected packet:", rx))
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
+        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
+
     def test_tra_burst6(self):
         """ ipsec v6 transport burst test """
         self.test_tra_basic6(count=257)
@@ -242,6 +327,7 @@ class IpsecTraTests(object):
 class IpsecTun4Tests(object):
     def test_tun_basic44(self, count=1):
         """ ipsec 4o4 tunnel basic test """
+        self.vapi.cli("clear errors")
         try:
             p = self.params[socket.AF_INET]
             vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
@@ -277,6 +363,9 @@ class IpsecTun4Tests(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
+        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+
     def test_tun_burst44(self):
         """ ipsec 4o4 tunnel burst test """
         self.test_tun_basic44(count=257)
@@ -285,6 +374,7 @@ class IpsecTun4Tests(object):
 class IpsecTun6Tests(object):
     def test_tun_basic66(self, count=1):
         """ ipsec 6o6 tunnel basic test """
+        self.vapi.cli("clear errors")
         try:
             p = self.params[socket.AF_INET6]
             vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
@@ -321,6 +411,9 @@ class IpsecTun6Tests(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
+        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+
     def test_tun_burst66(self):
         """ ipsec 6o6 tunnel burst test """
         self.test_tun_basic66(count=257)