ipsec: Use the new tunnel API types to add flow label and TTL copy
[vpp.git] / test / template_ipsec.py
index 56f4b45..0c1f5a1 100644 (file)
@@ -5,7 +5,7 @@ import struct
 from scapy.layers.inet import IP, ICMP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from scapy.layers.l2 import Ether
-from scapy.packet import Raw
+from scapy.packet import raw, Raw
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
     IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
 
@@ -15,7 +15,7 @@ from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
 from vpp_papi import VppEnum
 
 
-class IPsecIPv4Params(object):
+class IPsecIPv4Params:
 
     addr_type = socket.AF_INET
     addr_any = "0.0.0.0"
@@ -27,15 +27,20 @@ class IPsecIPv4Params(object):
         self.remote_tun_if_host = '1.1.1.1'
         self.remote_tun_if_host6 = '1111::1'
 
-        self.scapy_tun_sa_id = 10
-        self.scapy_tun_spi = 1001
-        self.vpp_tun_sa_id = 20
-        self.vpp_tun_spi = 1000
+        self.scapy_tun_sa_id = 100
+        self.scapy_tun_spi = 1000
+        self.vpp_tun_sa_id = 200
+        self.vpp_tun_spi = 2000
 
-        self.scapy_tra_sa_id = 30
-        self.scapy_tra_spi = 2001
-        self.vpp_tra_sa_id = 40
-        self.vpp_tra_spi = 2000
+        self.scapy_tra_sa_id = 300
+        self.scapy_tra_spi = 3000
+        self.vpp_tra_sa_id = 400
+        self.vpp_tra_spi = 4000
+
+        self.outer_hop_limit = 64
+        self.inner_hop_limit = 255
+        self.outer_flow_label = 0
+        self.inner_flow_label = 0x12345
 
         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                  IPSEC_API_INTEG_ALG_SHA1_96)
@@ -49,9 +54,12 @@ class IPsecIPv4Params(object):
         self.salt = 0
         self.flags = 0
         self.nat_header = None
+        self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
+                          TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
+        self.dscp = 0
 
 
-class IPsecIPv6Params(object):
+class IPsecIPv6Params:
 
     addr_type = socket.AF_INET6
     addr_any = "0::0"
@@ -63,16 +71,21 @@ class IPsecIPv6Params(object):
         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
         self.remote_tun_if_host4 = '1.1.1.1'
 
-        self.scapy_tun_sa_id = 50
+        self.scapy_tun_sa_id = 500
         self.scapy_tun_spi = 3001
-        self.vpp_tun_sa_id = 60
+        self.vpp_tun_sa_id = 600
         self.vpp_tun_spi = 3000
 
-        self.scapy_tra_sa_id = 70
+        self.scapy_tra_sa_id = 700
         self.scapy_tra_spi = 4001
-        self.vpp_tra_sa_id = 80
+        self.vpp_tra_sa_id = 800
         self.vpp_tra_spi = 4000
 
+        self.outer_hop_limit = 64
+        self.inner_hop_limit = 255
+        self.outer_flow_label = 0
+        self.inner_flow_label = 0x12345
+
         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                  IPSEC_API_INTEG_ALG_SHA1_96)
         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
@@ -85,10 +98,13 @@ class IPsecIPv6Params(object):
         self.salt = 0
         self.flags = 0
         self.nat_header = None
+        self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
+                          TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
+        self.dscp = 0
 
 
 def mk_scapy_crypt_key(p):
-    if p.crypt_algo == "AES-GCM":
+    if p.crypt_algo in ("AES-GCM", "AES-CTR"):
         return p.crypt_key + struct.pack("!I", p.salt)
     else:
         return p.crypt_key
@@ -181,8 +197,10 @@ class TemplateIpsec(VppTestCase):
         super(TemplateIpsec, cls).tearDownClass()
 
     def setup_params(self):
-        self.ipv4_params = IPsecIPv4Params()
-        self.ipv6_params = IPsecIPv6Params()
+        if not hasattr(self, 'ipv4_params'):
+            self.ipv4_params = IPsecIPv4Params()
+        if not hasattr(self, 'ipv6_params'):
+            self.ipv6_params = IPsecIPv6Params()
         self.params = {self.ipv4_params.addr_type: self.ipv4_params,
                        self.ipv6_params.addr_type: self.ipv6_params}
 
@@ -224,17 +242,19 @@ class TemplateIpsec(VppTestCase):
     def show_commands_at_teardown(self):
         self.logger.info(self.vapi.cli("show hardware"))
 
-    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
+    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                 sa.encrypt(IP(src=src, dst=dst) /
                            ICMP() / Raw(b'X' * payload_size))
                 for i in range(count)]
 
-    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
+    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                           payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                sa.encrypt(IPv6(src=src, dst=dst) /
+                sa.encrypt(IPv6(src=src, dst=dst,
+                                hlim=p.inner_hop_limit,
+                                fl=p.inner_flow_label) /
                            ICMPv6EchoRequest(id=0, seq=1,
                                              data='X' * payload_size))
                 for i in range(count)]
@@ -244,9 +264,10 @@ class TemplateIpsec(VppTestCase):
                 IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
                 for i in range(count)]
 
-    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
+    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
-                IPv6(src=src, dst=dst) /
+                IPv6(src=src, dst=dst,
+                     hlim=p.inner_hop_limit, fl=p.inner_flow_label) /
                 ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
                 for i in range(count)]
 
@@ -562,7 +583,7 @@ class IpsecTra4(object):
         self.vapi.cli("clear ipsec sa")
         try:
             p = self.params[socket.AF_INET]
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
                                               src=self.tra_if.remote_ip4,
                                               dst=self.tra_if.local_ip4,
                                               count=count,
@@ -617,7 +638,7 @@ class IpsecTra6(object):
         self.vapi.cli("clear ipsec sa")
         try:
             p = self.params[socket.AF_INET6]
-            send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
+            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
                                                src=self.tra_if.remote_ip6,
                                                dst=self.tra_if.local_ip6,
                                                count=count,
@@ -790,7 +811,7 @@ class IpsecTun4(object):
                              "incorrect SA in counts: expected %d != %d" %
                              (count, pkts))
             pkts = p.tun_sa_out.get_stats(worker)['packets']
-            self.assertEqual(pkts, count,
+            self.assertEqual(pkts, n_frags,
                              "incorrect SA out counts: expected %d != %d" %
                              (count, pkts))
 
@@ -803,6 +824,15 @@ class IpsecTun4(object):
             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
             self.assert_packet_checksums_valid(rx)
 
+    def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
+        align = sa.crypt_algo.block_size
+        if align < 4:
+            align = 4
+        exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
+        exp_len += sa.crypt_algo.iv_size
+        exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
+        self.assertEqual(exp_len, len(esp_payload))
+
     def verify_encrypted(self, p, sa, rxs):
         decrypt_pkts = []
         for rx in rxs:
@@ -811,9 +841,12 @@ class IpsecTun4(object):
             self.assert_packet_checksums_valid(rx)
             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
             try:
-                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
+                rx_ip = rx[IP]
+                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                 if not decrypt_pkt.haslayer(IP):
                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
+                if rx_ip.proto == socket.IPPROTO_ESP:
+                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                 decrypt_pkts.append(decrypt_pkt)
                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
@@ -831,10 +864,11 @@ class IpsecTun4(object):
     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec counters")
+        self.vapi.cli("clear ipsec sa")
         if not n_rx:
             n_rx = count
         try:
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
                                               count=count,
@@ -861,13 +895,33 @@ class IpsecTun4(object):
         self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
         self.verify_counters4(p, count, n_rx)
 
+    def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
+        self.vapi.cli("clear errors")
+        if not n_rx:
+            n_rx = count
+        try:
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
+                                              src=p.remote_tun_if_host,
+                                              dst=self.pg1.remote_ip4,
+                                              count=count)
+            self.send_and_assert_no_replies(self.tun_if, send_pkts)
+
+            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
+                                      dst=p.remote_tun_if_host, count=count,
+                                      payload_size=payload_size)
+            self.send_and_assert_no_replies(self.pg1, send_pkts)
+
+        finally:
+            self.logger.info(self.vapi.ppcli("show error"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
+
     def verify_tun_reass_44(self, p):
         self.vapi.cli("clear errors")
         self.vapi.ip_reassembly_enable_disable(
             sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
 
         try:
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
                                               payload_size=1900,
@@ -893,8 +947,9 @@ class IpsecTun4(object):
 
     def verify_tun_64(self, p, count=1):
         self.vapi.cli("clear errors")
+        self.vapi.cli("clear ipsec sa")
         try:
-            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host6,
                                                dst=self.pg1.remote_ip6,
                                                count=count)
@@ -903,7 +958,7 @@ class IpsecTun4(object):
                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                 self.assert_packet_checksums_valid(recv_pkt)
-            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                        dst=p.remote_tun_if_host6, count=count)
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
@@ -992,6 +1047,9 @@ class IpsecTun6(object):
             self.assert_packet_checksums_valid(rx)
             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                              rx[IPv6].plen)
+            self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
+            if p.outer_flow_label:
+                self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
             try:
                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                 if not decrypt_pkt.haslayer(IPv6):
@@ -999,6 +1057,8 @@ class IpsecTun6(object):
                 self.assert_packet_checksums_valid(decrypt_pkt)
                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
+                self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
+                self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
             except:
                 self.logger.debug(ppp("Unexpected packet:", rx))
                 try:
@@ -1011,7 +1071,8 @@ class IpsecTun6(object):
         self.vapi.cli("clear errors")
         self.vapi.cli("clear ipsec sa")
 
-        send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+        send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
+                                           self.tun_if,
                                            src=p_in.remote_tun_if_host,
                                            dst=self.pg1.remote_ip6,
                                            count=count)
@@ -1024,7 +1085,8 @@ class IpsecTun6(object):
         if not p_out:
             p_out = p_in
         try:
-            send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
+                                               self.tun_if,
                                                src=p_in.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=count,
@@ -1032,7 +1094,7 @@ class IpsecTun6(object):
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
             self.verify_decrypted6(p_in, recv_pkts)
 
-            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+            send_pkts = self.gen_pkts6(p_in, self.pg1, src=self.pg1.remote_ip6,
                                        dst=p_out.remote_tun_if_host,
                                        count=count,
                                        payload_size=payload_size)
@@ -1054,7 +1116,7 @@ class IpsecTun6(object):
             sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
 
         try:
-            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=1,
@@ -1064,7 +1126,7 @@ class IpsecTun6(object):
                                              self.pg1, n_rx=1)
             self.verify_decrypted6(p, recv_pkts)
 
-            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                        dst=p.remote_tun_if_host,
                                        count=1,
                                        payload_size=64)
@@ -1081,8 +1143,9 @@ class IpsecTun6(object):
     def verify_tun_46(self, p, count=1):
         """ ipsec 4o6 tunnel basic test """
         self.vapi.cli("clear errors")
+        self.vapi.cli("clear ipsec sa")
         try:
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host4,
                                               dst=self.pg1.remote_ip4,
                                               count=count)
@@ -1145,7 +1208,7 @@ class IpsecTun6HandoffTests(IpsecTun6):
         # inject alternately on worker 0 and 1. all counts on the SA
         # should be against worker 0
         for worker in [0, 1, 0, 1]:
-            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=N_PKTS)
@@ -1153,7 +1216,7 @@ class IpsecTun6HandoffTests(IpsecTun6):
                                              self.pg1, worker=worker)
             self.verify_decrypted6(p, recv_pkts)
 
-            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                        dst=p.remote_tun_if_host,
                                        count=N_PKTS)
             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
@@ -1176,7 +1239,7 @@ class IpsecTun4HandoffTests(IpsecTun4):
         # inject alternately on worker 0 and 1. all counts on the SA
         # should be against worker 0
         for worker in [0, 1, 0, 1]:
-            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip4,
                                               count=N_PKTS)