add api trace print
[vpp.git] / test / template_ipsec.py
index 77461d4..78d7584 100644 (file)
@@ -1,7 +1,7 @@
 import unittest
 import socket
 
-from scapy.layers.inet import IP, ICMP, TCP
+from scapy.layers.inet import IP, ICMP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation
 from scapy.layers.l2 import Ether, Raw
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
@@ -41,6 +41,8 @@ class IPsecIPv4Params(object):
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
         self.crypt_algo = 'AES-CBC'  # scapy name
         self.crypt_key = 'JPjyOWBeVEQiMe7h'
+        self.flags = 0
+        self.nat_header = None
 
 
 class IPsecIPv6Params(object):
@@ -73,6 +75,47 @@ class IPsecIPv6Params(object):
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_256)
         self.crypt_algo = 'AES-CBC'  # scapy name
         self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
+        self.flags = 0
+        self.nat_header = None
+
+
+def config_tun_params(p, encryption_type, tun_if):
+    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
+    p.scapy_tun_sa = SecurityAssociation(
+        encryption_type, spi=p.vpp_tun_spi,
+        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo, auth_key=p.auth_key,
+        tunnel_header=ip_class_by_addr_type[p.addr_type](
+            src=tun_if.remote_addr[p.addr_type],
+            dst=tun_if.local_addr[p.addr_type]),
+        nat_t_header=p.nat_header)
+    p.vpp_tun_sa = SecurityAssociation(
+        encryption_type, spi=p.scapy_tun_spi,
+        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo, auth_key=p.auth_key,
+        tunnel_header=ip_class_by_addr_type[p.addr_type](
+            dst=tun_if.remote_addr[p.addr_type],
+            src=tun_if.local_addr[p.addr_type]),
+        nat_t_header=p.nat_header)
+
+
+def config_tra_params(p, encryption_type):
+    p.scapy_tra_sa = SecurityAssociation(
+        encryption_type,
+        spi=p.vpp_tra_spi,
+        crypt_algo=p.crypt_algo,
+        crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo,
+        auth_key=p.auth_key,
+        nat_t_header=p.nat_header)
+    p.vpp_tra_sa = SecurityAssociation(
+        encryption_type,
+        spi=p.scapy_tra_spi,
+        crypt_algo=p.crypt_algo,
+        crypt_key=p.crypt_key,
+        auth_algo=p.auth_algo,
+        auth_key=p.auth_key,
+        nat_t_header=p.nat_header)
 
 
 class TemplateIpsec(VppTestCase):
@@ -160,37 +203,23 @@ class TemplateIpsec(VppTestCase):
                 ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
                 for i in range(count)]
 
-    def configure_sa_tun(self, params):
-        ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
-        scapy_tun_sa = SecurityAssociation(
-            self.encryption_type, spi=params.vpp_tun_spi,
-            crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo, auth_key=params.auth_key,
-            tunnel_header=ip_class_by_addr_type[params.addr_type](
-                src=self.tun_if.remote_addr[params.addr_type],
-                dst=self.tun_if.local_addr[params.addr_type]))
-        vpp_tun_sa = SecurityAssociation(
-            self.encryption_type, spi=params.scapy_tun_spi,
-            crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
-            auth_algo=params.auth_algo, auth_key=params.auth_key,
-            tunnel_header=ip_class_by_addr_type[params.addr_type](
-                dst=self.tun_if.remote_addr[params.addr_type],
-                src=self.tun_if.local_addr[params.addr_type]))
-        return vpp_tun_sa, scapy_tun_sa
-
     def configure_sa_tra(self, params):
-        params.scapy_tra_sa = SecurityAssociation(self.encryption_type,
-                                                  spi=params.vpp_tra_spi,
-                                                  crypt_algo=params.crypt_algo,
-                                                  crypt_key=params.crypt_key,
-                                                  auth_algo=params.auth_algo,
-                                                  auth_key=params.auth_key)
-        params.vpp_tra_sa = SecurityAssociation(self.encryption_type,
-                                                spi=params.scapy_tra_spi,
-                                                crypt_algo=params.crypt_algo,
-                                                crypt_key=params.crypt_key,
-                                                auth_algo=params.auth_algo,
-                                                auth_key=params.auth_key)
+        params.scapy_tra_sa = SecurityAssociation(
+            self.encryption_type,
+            spi=params.vpp_tra_spi,
+            crypt_algo=params.crypt_algo,
+            crypt_key=params.crypt_key,
+            auth_algo=params.auth_algo,
+            auth_key=params.auth_key,
+            nat_t_header=params.nat_header)
+        params.vpp_tra_sa = SecurityAssociation(
+            self.encryption_type,
+            spi=params.scapy_tra_spi,
+            crypt_algo=params.crypt_algo,
+            crypt_key=params.crypt_key,
+            auth_algo=params.auth_algo,
+            auth_key=params.auth_key,
+            nat_t_header=params.nat_header)
 
 
 class IpsecTcpTests(object):
@@ -198,19 +227,19 @@ class IpsecTcpTests(object):
         """ verify checksum correctness for vpp generated packets """
         self.vapi.cli("test http server")
         p = self.params[socket.AF_INET]
-        vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
+        config_tun_params(p, self.encryption_type, self.tun_if)
         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
-                scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
-                                        dst=self.tun_if.local_ip4) /
-                                     TCP(flags='S', dport=80)))
+                p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
+                                          dst=self.tun_if.local_ip4) /
+                                       TCP(flags='S', dport=80)))
         self.logger.debug(ppp("Sending packet:", send))
         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
         recv = recv[0]
-        decrypted = vpp_tun_sa.decrypt(recv[IP])
+        decrypted = p.vpp_tun_sa.decrypt(recv[IP])
         self.assert_packet_checksums_valid(decrypted)
 
 
-class IpsecTraTests(object):
+class IpsecTra4Tests(object):
     def test_tra_anti_replay(self, count=1):
         """ ipsec v4 transport anti-reply test """
         p = self.params[socket.AF_INET]
@@ -278,7 +307,23 @@ class IpsecTraTests(object):
                                       seq_num=234))
         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
+        # move VPP's SA to just before the seq-number wrap
+        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
+
+        # then fire in a packet that VPP should drop becuase it causes the
+        # seq number to wrap
+        pkt = (Ether(src=self.tra_if.remote_mac,
+                     dst=self.tra_if.local_mac) /
+               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                         dst=self.tra_if.local_ip4) /
+                                      ICMP(),
+                                      seq_num=236))
+        self.send_and_assert_no_replies(self.tra_if, [pkt])
+        self.assert_packet_counter_equal(
+            '/err/%s/sequence number cycled' % self.tra4_encrypt_node_name, 1)
+
         # move the security-associations seq number on to the last we used
+        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
         p.scapy_tra_sa.seq_num = 351
         p.vpp_tra_sa.seq_num = 351
 
@@ -304,6 +349,15 @@ class IpsecTraTests(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
+        pkts = p.tra_sa_in.get_stats()['packets']
+        self.assertEqual(pkts, count,
+                         "incorrect SA in counts: expected %d != %d" %
+                         (count, pkts))
+        pkts = p.tra_sa_out.get_stats()['packets']
+        self.assertEqual(pkts, count,
+                         "incorrect SA out counts: expected %d != %d" %
+                         (count, pkts))
+
         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
 
@@ -311,6 +365,8 @@ class IpsecTraTests(object):
         """ ipsec v4 transport burst test """
         self.test_tra_basic(count=257)
 
+
+class IpsecTra6Tests(object):
     def test_tra_basic6(self, count=1):
         """ ipsec v6 transport basic test """
         self.vapi.cli("clear errors")
@@ -333,6 +389,14 @@ class IpsecTraTests(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
+        pkts = p.tra_sa_in.get_stats()['packets']
+        self.assertEqual(pkts, count,
+                         "incorrect SA in counts: expected %d != %d" %
+                         (count, pkts))
+        pkts = p.tra_sa_out.get_stats()['packets']
+        self.assertEqual(pkts, count,
+                         "incorrect SA out counts: expected %d != %d" %
+                         (count, pkts))
         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
 
@@ -341,14 +405,17 @@ class IpsecTraTests(object):
         self.test_tra_basic6(count=257)
 
 
-class IpsecTun4Tests(object):
-    def test_tun_basic44(self, count=1):
-        """ ipsec 4o4 tunnel basic test """
+class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
+    pass
+
+
+class IpsecTun4(object):
+
+    def verify_tun_44(self, p, count=1):
         self.vapi.cli("clear errors")
         try:
-            p = self.params[socket.AF_INET]
-            vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
-            send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
                                               count=count)
@@ -362,7 +429,7 @@ class IpsecTun4Tests(object):
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
-                    decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP])
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                     if not decrypt_pkt.haslayer(IP):
                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
@@ -385,22 +452,40 @@ class IpsecTun4Tests(object):
             self.assertEqual(pkts, count,
                              "incorrect SPD any policy: expected %d != %d" %
                              (count, pkts))
+
+        if (hasattr(p, "tun_sa_in")):
+            pkts = p.tun_sa_in.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA in counts: expected %d != %d" %
+                             (count, pkts))
+            pkts = p.tun_sa_out.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA out counts: expected %d != %d" %
+                             (count, pkts))
+
         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
 
+
+class IpsecTun4Tests(IpsecTun4):
+
+    def test_tun_basic44(self):
+        """ ipsec 4o4 tunnel basic test """
+        self.verify_tun_44(self.params[socket.AF_INET], count=1)
+
     def test_tun_burst44(self):
         """ ipsec 4o4 tunnel burst test """
-        self.test_tun_basic44(count=257)
+        self.verify_tun_44(self.params[socket.AF_INET], count=257)
+
 
+class IpsecTun6(object):
 
-class IpsecTun6Tests(object):
-    def test_tun_basic66(self, count=1):
+    def verify_tun_66(self, p, count=1):
         """ ipsec 6o6 tunnel basic test """
         self.vapi.cli("clear errors")
         try:
-            p = self.params[socket.AF_INET6]
-            vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
-            send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
+            config_tun_params(p, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=count)
@@ -415,7 +500,7 @@ class IpsecTun6Tests(object):
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
                 try:
-                    decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6])
+                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                     if not decrypt_pkt.haslayer(IPv6):
                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
@@ -433,15 +518,31 @@ class IpsecTun6Tests(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec"))
 
+        if (hasattr(p, "tun_sa_in")):
+            pkts = p.tun_sa_in.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA in counts: expected %d != %d" %
+                             (count, pkts))
+            pkts = p.tun_sa_out.get_stats()['packets']
+            self.assertEqual(pkts, count,
+                             "incorrect SA out counts: expected %d != %d" %
+                             (count, pkts))
         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
 
+
+class IpsecTun6Tests(IpsecTun6):
+
+    def test_tun_basic66(self):
+        """ ipsec 6o6 tunnel basic test """
+        self.verify_tun_66(self.params[socket.AF_INET6], count=1)
+
     def test_tun_burst66(self):
         """ ipsec 6o6 tunnel burst test """
-        self.test_tun_basic66(count=257)
+        self.verify_tun_66(self.params[socket.AF_INET6], count=257)
 
 
-class IpsecTunTests(IpsecTun4Tests, IpsecTun6Tests):
+class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
     pass