sctp: move to plugins, disabled by default
[vpp.git] / test / template_ipsec.py
index c623d6a..d714a93 100644 (file)
@@ -293,7 +293,7 @@ class IpsecTra4(object):
 
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkt * 3)
-        self.assert_packet_counter_equal(
+        self.assert_error_counter_equal(
             '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)
 
         # the window size is 64 packets
@@ -321,7 +321,7 @@ class IpsecTra4(object):
                                 seq_num=350))
         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
 
-        self.assert_packet_counter_equal(
+        self.assert_error_counter_equal(
             '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)
 
         # a malformed 'runt' packet
@@ -337,7 +337,7 @@ class IpsecTra4(object):
                                     seq_num=350))
             self.send_and_assert_no_replies(self.tra_if, pkt * 17)
 
-            self.assert_packet_counter_equal(
+            self.assert_error_counter_equal(
                 '/err/%s/undersized packet' % self.tra4_decrypt_node_name, 17)
 
         # which we can determine since this packet is still in the window
@@ -361,12 +361,12 @@ class IpsecTra4(object):
         if use_esn:
             # an out of window error with ESN looks like a high sequence
             # wrap. but since it isn't then the verify will fail.
-            self.assert_packet_counter_equal(
+            self.assert_error_counter_equal(
                 '/err/%s/Integrity check failed' %
                 self.tra4_decrypt_node_name, 34)
 
         else:
-            self.assert_packet_counter_equal(
+            self.assert_error_counter_equal(
                 '/err/%s/SA replayed packet' %
                 self.tra4_decrypt_node_name, 20)
 
@@ -411,7 +411,7 @@ class IpsecTra4(object):
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
         else:
             self.send_and_assert_no_replies(self.tra_if, [pkt])
-            self.assert_packet_counter_equal(
+            self.assert_error_counter_equal(
                 '/err/%s/sequence number cycled' %
                 self.tra4_encrypt_node_name, 1)
 
@@ -442,7 +442,7 @@ class IpsecTra4(object):
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
 
         pkts = p.tra_sa_in.get_stats()['packets']
         self.assertEqual(pkts, count,
@@ -495,7 +495,7 @@ class IpsecTra6(object):
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
 
         pkts = p.tra_sa_in.get_stats()['packets']
         self.assertEqual(pkts, count,
@@ -527,7 +527,9 @@ class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
 
 class IpsecTun4(object):
     """ verify methods for Tunnel v4 """
-    def verify_counters(self, p, count):
+    def verify_counters4(self, p, count, n_frags=None):
+        if not n_frags:
+            n_frags = count
         if (hasattr(p, "spd_policy_in_any")):
             pkts = p.spd_policy_in_any.get_stats()['packets']
             self.assertEqual(pkts, count,
@@ -544,7 +546,7 @@ class IpsecTun4(object):
                              "incorrect SA out counts: expected %d != %d" %
                              (count, pkts))
 
-        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
 
     def verify_decrypted(self, p, rxs):
@@ -598,9 +600,9 @@ class IpsecTun4(object):
 
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
 
-        self.verify_counters(p, count)
+        self.verify_counters4(p, count, n_rx)
 
     def verify_tun_64(self, p, count=1):
         self.vapi.cli("clear errors")
@@ -636,9 +638,9 @@ class IpsecTun4(object):
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
 
-        self.verify_counters(p, count)
+        self.verify_counters4(p, count)
 
 
 class IpsecTun4Tests(IpsecTun4):
@@ -654,59 +656,85 @@ class IpsecTun4Tests(IpsecTun4):
 
 class IpsecTun6(object):
     """ verify methods for Tunnel v6 """
-    def verify_counters(self, p, count):
-        if (hasattr(p, "tun_sa_in")):
-            pkts = p.tun_sa_in.get_stats()['packets']
+    def verify_counters6(self, p_in, p_out, count):
+        if (hasattr(p_in, "tun_sa_in")):
+            pkts = p_in.tun_sa_in.get_stats()['packets']
             self.assertEqual(pkts, count,
                              "incorrect SA in counts: expected %d != %d" %
                              (count, pkts))
-            pkts = p.tun_sa_out.get_stats()['packets']
+        if (hasattr(p_out, "tun_sa_out")):
+            pkts = p_out.tun_sa_out.get_stats()['packets']
             self.assertEqual(pkts, count,
                              "incorrect SA out counts: expected %d != %d" %
                              (count, pkts))
         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
 
-    def verify_tun_66(self, p, count=1):
-        """ ipsec 6o6 tunnel basic test """
+    def verify_decrypted6(self, p, rxs):
+        for rx in rxs:
+            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
+            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
+            self.assert_packet_checksums_valid(rx)
+
+    def verify_encrypted6(self, p, sa, rxs):
+        for rx in rxs:
+            self.assert_packet_checksums_valid(rx)
+            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
+                             rx[IPv6].plen)
+            try:
+                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
+                if not decrypt_pkt.haslayer(IPv6):
+                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
+                self.assert_packet_checksums_valid(decrypt_pkt)
+                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
+                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
+            except:
+                self.logger.debug(ppp("Unexpected packet:", rx))
+                try:
+                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
+                except:
+                    pass
+                raise
+
+    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
         self.vapi.cli("clear errors")
+        self.vapi.cli("clear ipsec sa")
+
+        config_tun_params(p_in, self.encryption_type, self.tun_if)
+        send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+                                           src=p_in.remote_tun_if_host,
+                                           dst=self.pg1.remote_ip6,
+                                           count=count)
+        self.send_and_assert_no_replies(self.tun_if, send_pkts)
+        self.logger.info(self.vapi.cli("sh punt stats"))
+
+    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
+        self.vapi.cli("clear errors")
+        self.vapi.cli("clear ipsec sa")
+        if not p_out:
+            p_out = p_in
         try:
-            config_tun_params(p, self.encryption_type, self.tun_if)
-            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
-                                               src=p.remote_tun_if_host,
+            config_tun_params(p_in, self.encryption_type, self.tun_if)
+            config_tun_params(p_out, self.encryption_type, self.tun_if)
+            send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+                                               src=p_in.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=count)
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
-            for recv_pkt in recv_pkts:
-                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
-                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
-                self.assert_packet_checksums_valid(recv_pkt)
+            self.verify_decrypted6(p_in, recv_pkts)
+
             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
-                                       dst=p.remote_tun_if_host,
-                                       count=count)
-            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
-            for recv_pkt in recv_pkts:
-                self.assertEqual(len(recv_pkt) - len(Ether()) - len(IPv6()),
-                                 recv_pkt[IPv6].plen)
-                try:
-                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
-                    if not decrypt_pkt.haslayer(IPv6):
-                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
-                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
-                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
-                    self.assert_packet_checksums_valid(decrypt_pkt)
-                except:
-                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
-                    try:
-                        self.logger.debug(
-                            ppp("Decrypted packet:", decrypt_pkt))
-                    except:
-                        pass
-                    raise
+                                       dst=p_out.remote_tun_if_host,
+                                       count=count,
+                                       payload_size=payload_size)
+            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
+                                             self.tun_if)
+            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
+
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
-        self.verify_counters(p, count)
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
+        self.verify_counters6(p_in, p_out, count)
 
     def verify_tun_46(self, p, count=1):
         """ ipsec 4o6 tunnel basic test """
@@ -744,8 +772,8 @@ class IpsecTun6(object):
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
-        self.verify_counters(p, count)
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
+        self.verify_counters6(p, p, count)
 
 
 class IpsecTun6Tests(IpsecTun6):