ikev2: support responder hostname
[vpp.git] / test / template_ipsec.py
index 7bd0b9e..f7becf0 100644 (file)
@@ -15,7 +15,7 @@ from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
 from vpp_papi import VppEnum
 
 
-class IPsecIPv4Params(object):
+class IPsecIPv4Params:
 
     addr_type = socket.AF_INET
     addr_any = "0.0.0.0"
@@ -28,14 +28,19 @@ class IPsecIPv4Params(object):
         self.remote_tun_if_host6 = '1111::1'
 
         self.scapy_tun_sa_id = 100
-        self.scapy_tun_spi = 1001
+        self.scapy_tun_spi = 1000
         self.vpp_tun_sa_id = 200
-        self.vpp_tun_spi = 1000
+        self.vpp_tun_spi = 2000
 
         self.scapy_tra_sa_id = 300
-        self.scapy_tra_spi = 2001
+        self.scapy_tra_spi = 3000
         self.vpp_tra_sa_id = 400
-        self.vpp_tra_spi = 2000
+        self.vpp_tra_spi = 4000
+
+        self.outer_hop_limit = 64
+        self.inner_hop_limit = 255
+        self.outer_flow_label = 0
+        self.inner_flow_label = 0x12345
 
         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                  IPSEC_API_INTEG_ALG_SHA1_96)
@@ -52,9 +57,10 @@ class IPsecIPv4Params(object):
         self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
                           TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
         self.dscp = 0
+        self.async_mode = False
 
 
-class IPsecIPv6Params(object):
+class IPsecIPv6Params:
 
     addr_type = socket.AF_INET6
     addr_any = "0::0"
@@ -76,6 +82,11 @@ class IPsecIPv6Params(object):
         self.vpp_tra_sa_id = 800
         self.vpp_tra_spi = 4000
 
+        self.outer_hop_limit = 64
+        self.inner_hop_limit = 255
+        self.outer_flow_label = 0
+        self.inner_flow_label = 0x12345
+
         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                  IPSEC_API_INTEG_ALG_SHA1_96)
         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
@@ -91,10 +102,11 @@ class IPsecIPv6Params(object):
         self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
                           TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
         self.dscp = 0
+        self.async_mode = False
 
 
 def mk_scapy_crypt_key(p):
-    if p.crypt_algo == "AES-GCM":
+    if p.crypt_algo in ("AES-GCM", "AES-CTR"):
         return p.crypt_key + struct.pack("!I", p.salt)
     else:
         return p.crypt_key
@@ -242,7 +254,9 @@ class TemplateIpsec(VppTestCase):
     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                           payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                sa.encrypt(IPv6(src=src, dst=dst) /
+                sa.encrypt(IPv6(src=src, dst=dst,
+                                hlim=p.inner_hop_limit,
+                                fl=p.inner_flow_label) /
                            ICMPv6EchoRequest(id=0, seq=1,
                                              data='X' * payload_size))
                 for i in range(count)]
@@ -252,9 +266,10 @@ class TemplateIpsec(VppTestCase):
                 IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
                 for i in range(count)]
 
-    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
+    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                IPv6(src=src, dst=dst) /
+                IPv6(src=src, dst=dst,
+                     hlim=p.inner_hop_limit, fl=p.inner_flow_label) /
                 ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
                 for i in range(count)]
 
@@ -282,28 +297,46 @@ class IpsecTcpTests(IpsecTcp):
 
 class IpsecTra4(object):
     """ verify methods for Transport v4 """
+    def get_replay_counts(self, p):
+        replay_node_name = ('/err/%s/SA replayed packet' %
+                            self.tra4_decrypt_node_name[0])
+        count = self.statistics.get_err_counter(replay_node_name)
+
+        if p.async_mode:
+            replay_post_node_name = ('/err/%s/SA replayed packet' %
+                                     self.tra4_decrypt_node_name[p.async_mode])
+            count += self.statistics.get_err_counter(replay_post_node_name)
+
+        return count
+
+    def get_hash_failed_counts(self, p):
+        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
+            hash_failed_node_name = ('/err/%s/ESP decryption failed' %
+                                     self.tra4_decrypt_node_name[p.async_mode])
+        else:
+            hash_failed_node_name = ('/err/%s/Integrity check failed' %
+                                     self.tra4_decrypt_node_name[p.async_mode])
+        count = self.statistics.get_err_counter(hash_failed_node_name)
+
+        if p.async_mode:
+            count += self.statistics.get_err_counter(
+                '/err/crypto-dispatch/bad-hmac')
+
+        return count
+
     def verify_tra_anti_replay(self):
         p = self.params[socket.AF_INET]
         esn_en = p.vpp_tra_sa.esn_en
 
         seq_cycle_node_name = ('/err/%s/sequence number cycled' %
                                self.tra4_encrypt_node_name)
-        replay_node_name = ('/err/%s/SA replayed packet' %
-                            self.tra4_decrypt_node_name)
-        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
-            hash_failed_node_name = ('/err/%s/ESP decryption failed' %
-                                     self.tra4_decrypt_node_name)
-        else:
-            hash_failed_node_name = ('/err/%s/Integrity check failed' %
-                                     self.tra4_decrypt_node_name)
-        replay_count = self.statistics.get_err_counter(replay_node_name)
-        hash_failed_count = self.statistics.get_err_counter(
-            hash_failed_node_name)
+        replay_count = self.get_replay_counts(p)
+        hash_failed_count = self.get_hash_failed_counts(p)
         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
 
         if ESP == self.encryption_type:
             undersize_node_name = ('/err/%s/undersized packet' %
-                                   self.tra4_decrypt_node_name)
+                                   self.tra4_decrypt_node_name[0])
             undersize_count = self.statistics.get_err_counter(
                 undersize_node_name)
 
@@ -327,12 +360,14 @@ class IpsecTra4(object):
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkts)
         replay_count += len(pkts)
-        self.assert_error_counter_equal(replay_node_name, replay_count)
+        self.assertEqual(self.get_replay_counts(p), replay_count)
 
         #
         # now send a batch of packets all with the same sequence number
         # the first packet in the batch is legitimate, the rest bogus
         #
+        self.vapi.cli("clear error")
+        self.vapi.cli("clear node counters")
         pkts = (Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
@@ -342,11 +377,12 @@ class IpsecTra4(object):
         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
                                          self.tra_if, n_rx=1)
         replay_count += 7
-        self.assert_error_counter_equal(replay_node_name, replay_count)
+        self.assertEqual(self.get_replay_counts(p), replay_count)
 
         #
         # now move the window over to 257 (more than one byte) and into Case A
         #
+        self.vapi.cli("clear error")
         pkt = (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
@@ -358,7 +394,7 @@ class IpsecTra4(object):
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkt * 3)
         replay_count += 3
-        self.assert_error_counter_equal(replay_node_name, replay_count)
+        self.assertEqual(self.get_replay_counts(p), replay_count)
 
         # the window size is 64 packets
         # in window are still accepted
@@ -386,8 +422,7 @@ class IpsecTra4(object):
         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
 
         hash_failed_count += 17
-        self.assert_error_counter_equal(hash_failed_node_name,
-                                        hash_failed_count)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
 
         # a malformed 'runt' packet
         #  created by a mis-constructed SA
@@ -432,13 +467,11 @@ class IpsecTra4(object):
             # an out of window error with ESN looks like a high sequence
             # wrap. but since it isn't then the verify will fail.
             hash_failed_count += 17
-            self.assert_error_counter_equal(hash_failed_node_name,
-                                            hash_failed_count)
+            self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
 
         else:
             replay_count += 17
-            self.assert_error_counter_equal(replay_node_name,
-                                            replay_count)
+            self.assertEqual(self.get_replay_counts(p), replay_count)
 
         # valid packet moves the window over to 258
         pkt = (Ether(src=self.tra_if.remote_mac,
@@ -522,8 +555,7 @@ class IpsecTra4(object):
             self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)
 
             hash_failed_count += 1
-            self.assert_error_counter_equal(hash_failed_node_name,
-                                            hash_failed_count)
+            self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
 
             #
             # but if we move the wondow forward to case B, then we can wrap
@@ -600,7 +632,7 @@ class IpsecTra4(object):
                          (count, pkts))
 
         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
-        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
 
 
 class IpsecTra4Tests(IpsecTra4):
@@ -654,7 +686,7 @@ class IpsecTra6(object):
                          "incorrect SA out counts: expected %d != %d" %
                          (count, pkts))
         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
-        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
 
     def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
                                    payload_size=54):
@@ -798,12 +830,12 @@ class IpsecTun4(object):
                              "incorrect SA in counts: expected %d != %d" %
                              (count, pkts))
             pkts = p.tun_sa_out.get_stats(worker)['packets']
-            self.assertEqual(pkts, count,
+            self.assertEqual(pkts, n_frags,
                              "incorrect SA out counts: expected %d != %d" %
                              (count, pkts))
 
         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
-        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)
 
     def verify_decrypted(self, p, rxs):
         for rx in rxs:
@@ -945,7 +977,7 @@ class IpsecTun4(object):
                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                 self.assert_packet_checksums_valid(recv_pkt)
-            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                        dst=p.remote_tun_if_host6, count=count)
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
@@ -1021,7 +1053,7 @@ class IpsecTun6(object):
                              "incorrect SA out counts: expected %d != %d" %
                              (count, pkts))
         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
-        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)
 
     def verify_decrypted6(self, p, rxs):
         for rx in rxs:
@@ -1034,6 +1066,9 @@ class IpsecTun6(object):
             self.assert_packet_checksums_valid(rx)
             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                              rx[IPv6].plen)
+            self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
+            if p.outer_flow_label:
+                self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
             try:
                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                 if not decrypt_pkt.haslayer(IPv6):
@@ -1041,6 +1076,8 @@ class IpsecTun6(object):
                 self.assert_packet_checksums_valid(decrypt_pkt)
                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
+                self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
+                self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
             except:
                 self.logger.debug(ppp("Unexpected packet:", rx))
                 try:
@@ -1076,7 +1113,7 @@ class IpsecTun6(object):
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             self.verify_decrypted6(p_in, recv_pkts)
 
-            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+            send_pkts = self.gen_pkts6(p_in, self.pg1, src=self.pg1.remote_ip6,
                                        dst=p_out.remote_tun_if_host,
                                        count=count,
                                        payload_size=payload_size)
@@ -1108,7 +1145,7 @@ class IpsecTun6(object):
                                              self.pg1, n_rx=1)
             self.verify_decrypted6(p, recv_pkts)
 
-            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                        dst=p.remote_tun_if_host,
                                        count=1,
                                        payload_size=64)
@@ -1184,6 +1221,9 @@ class IpsecTun6HandoffTests(IpsecTun6):
 
     def test_tun_handoff_66(self):
         """ ipsec 6o6 tunnel worker hand-off test """
+        self.vapi.cli("clear errors")
+        self.vapi.cli("clear ipsec sa")
+
         N_PKTS = 15
         p = self.params[socket.AF_INET6]
 
@@ -1198,7 +1238,7 @@ class IpsecTun6HandoffTests(IpsecTun6):
                                              self.pg1, worker=worker)
             self.verify_decrypted6(p, recv_pkts)
 
-            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                        dst=p.remote_tun_if_host,
                                        count=N_PKTS)
             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
@@ -1215,6 +1255,9 @@ class IpsecTun4HandoffTests(IpsecTun4):
 
     def test_tun_handooff_44(self):
         """ ipsec 4o4 tunnel worker hand-off test """
+        self.vapi.cli("clear errors")
+        self.vapi.cli("clear ipsec sa")
+
         N_PKTS = 15
         p = self.params[socket.AF_INET]