IPSEC: API modernisation
[vpp.git] / test / test_ipsec_nat.py
index 9c22fbb..cdb9cb4 100644 (file)
@@ -6,18 +6,16 @@ from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from util import ppp, ppc
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from util import ppp, ppc
-from framework import VppTestCase
+from template_ipsec import TemplateIpsec
+from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
+        VppIpsecSpdItfBinding
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_ip import DpoProto
+from vpp_papi import VppEnum
 
 
 
 
-class IPSecNATTestCase(VppTestCase):
+class IPSecNATTestCase(TemplateIpsec):
     """ IPSec/NAT
     """ IPSec/NAT
-
-    TRANSPORT MODE:
-
-     ---   encrypt   ---
-    |pg2| <-------> |VPP|
-     ---   decrypt   ---
-
     TUNNEL MODE:
 
 
     TUNNEL MODE:
 
 
@@ -31,26 +29,35 @@ class IPSecNATTestCase(VppTestCase):
      ---            ---           ---
     """
 
      ---            ---           ---
     """
 
-    remote_pg0_client_addr = '1.1.1.1'
+    tcp_port_in = 6303
+    tcp_port_out = 6303
+    udp_port_in = 6304
+    udp_port_out = 6304
+    icmp_id_in = 6305
+    icmp_id_out = 6305
+
+    def setUp(self):
+        super(IPSecNATTestCase, self).setUp()
+        self.tun_if = self.pg0
 
 
-    @classmethod
-    def setUpClass(cls):
-        super(IPSecNATTestCase, cls).setUpClass()
-        cls.create_pg_interfaces(range(2))
-        for i in cls.pg_interfaces:
-            i.configure_ipv4_neighbors()
-            i.admin_up()
-            i.config_ip4()
-            i.resolve_arp()
+        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
+        self.tun_spd.add_vpp_config()
+        VppIpsecSpdItfBinding(self, self.tun_spd,
+                              self.tun_if).add_vpp_config()
 
 
-        cls.tcp_port_in = 6303
-        cls.tcp_port_out = 6303
-        cls.udp_port_in = 6304
-        cls.udp_port_out = 6304
-        cls.icmp_id_in = 6305
-        cls.icmp_id_out = 6305
-        cls.config_esp_tun()
-        cls.logger.info(cls.vapi.ppcli("show ipsec"))
+        p = self.ipv4_params
+        self.config_esp_tun(p)
+        self.logger.info(self.vapi.ppcli("show ipsec"))
+
+        d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
+        VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
+                   [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
+                                 0xffffffff,
+                                 proto=d)],
+                   is_ip6=p.is_ipv6).add_vpp_config()
+
+    def tearDown(self):
+        super(IPSecNATTestCase, self).tearDown()
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
@@ -84,30 +91,11 @@ class IPSecNATTestCase(VppTestCase):
                        ICMP(id=self.icmp_id_out, type='echo-request'))
         ]
 
                        ICMP(id=self.icmp_id_out, type='echo-request'))
         ]
 
-    def check_checksum(self, pkt, layer):
-        """ Check checksum of the packet on given layer """
-        new = pkt.__class__(str(pkt))
-        del new[layer].chksum
-        new = new.__class__(str(new))
-        self.assertEqual(new[layer].chksum, pkt[layer].chksum)
-
-    def check_ip_checksum(self, pkt):
-        return self.check_checksum(pkt, 'IP')
-
-    def check_tcp_checksum(self, pkt):
-        return self.check_checksum(pkt, 'TCP')
-
-    def check_udp_checksum(self, pkt):
-        return self.check_checksum(pkt, 'UDP')
-
-    def check_icmp_checksum(self, pkt):
-        return self.check_checksum(pkt, 'ICMP')
-
     def verify_capture_plain(self, capture):
         for packet in capture:
             try:
     def verify_capture_plain(self, capture):
         for packet in capture:
             try:
-                self.check_ip_checksum(packet)
-                self.assert_equal(packet[IP].src, self.pg0.remote_ip4,
+                self.assert_packet_checksums_valid(packet)
+                self.assert_equal(packet[IP].src, self.tun_if.remote_ip4,
                                   "decrypted packet source address")
                 self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
                                   "decrypted packet destination address")
                                   "decrypted packet source address")
                 self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
                                   "decrypted packet destination address")
@@ -117,7 +105,6 @@ class IPSecNATTestCase(VppTestCase):
                         "unexpected UDP header in decrypted packet")
                     self.assert_equal(packet[TCP].dport, self.tcp_port_in,
                                       "decrypted packet TCP destination port")
                         "unexpected UDP header in decrypted packet")
                     self.assert_equal(packet[TCP].dport, self.tcp_port_in,
                                       "decrypted packet TCP destination port")
-                    self.check_tcp_checksum(packet)
                 elif packet.haslayer(UDP):
                     if packet[UDP].payload:
                         self.assertFalse(
                 elif packet.haslayer(UDP):
                     if packet[UDP].payload:
                         self.assertFalse(
@@ -131,7 +118,6 @@ class IPSecNATTestCase(VppTestCase):
                         "unexpected UDP header in decrypted packet")
                     self.assert_equal(packet[ICMP].id, self.icmp_id_in,
                                       "decrypted packet ICMP ID")
                         "unexpected UDP header in decrypted packet")
                     self.assert_equal(packet[ICMP].id, self.icmp_id_in,
                                       "decrypted packet ICMP ID")
-                    self.check_icmp_checksum(packet)
             except Exception:
                 self.logger.error(
                     ppp("Unexpected or invalid plain packet:", packet))
             except Exception:
                 self.logger.error(
                     ppp("Unexpected or invalid plain packet:", packet))
@@ -140,120 +126,136 @@ class IPSecNATTestCase(VppTestCase):
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
+                copy = packet.__class__(str(packet))
+                del copy[UDP].len
+                copy = packet.__class__(str(copy))
+                self.assert_equal(packet[UDP].len, copy[UDP].len,
+                                  "UDP header length")
+                self.assert_packet_checksums_valid(packet)
                 self.assertIn(ESP, packet[IP])
                 decrypt_pkt = sa.decrypt(packet[IP])
                 self.assertIn(ESP, packet[IP])
                 decrypt_pkt = sa.decrypt(packet[IP])
+                self.assert_packet_checksums_valid(decrypt_pkt)
                 self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
                                   "encrypted packet source address")
                 self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
                                   "encrypted packet source address")
-                self.assert_equal(decrypt_pkt[IP].dst, self.pg0.remote_ip4,
+                self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
                                   "encrypted packet destination address")
                                   "encrypted packet destination address")
-                # if decrypt_pkt.haslayer(TCP):
-                #     self.tcp_port_out = decrypt_pkt[TCP].sport
-                # elif decrypt_pkt.haslayer(UDP):
-                #     self.udp_port_out = decrypt_pkt[UDP].sport
-                # else:
-                #     self.icmp_id_out = decrypt_pkt[ICMP].id
             except Exception:
                 self.logger.error(
                     ppp("Unexpected or invalid encrypted packet:", packet))
                 raise
 
             except Exception:
                 self.logger.error(
                     ppp("Unexpected or invalid encrypted packet:", packet))
                 raise
 
-    @classmethod
-    def config_esp_tun(cls):
-        spd_id = 1
-        remote_sa_id = 10
-        local_sa_id = 20
-        remote_tun_spi = 1001
-        local_tun_spi = 1000
-        client = socket.inet_pton(socket.AF_INET, cls.remote_pg0_client_addr)
-        cls.vapi.ip_add_del_route(client, 32, cls.pg0.remote_ip4n)
-        cls.vapi.ipsec_sad_add_del_entry(remote_sa_id, remote_tun_spi,
-                                         cls.pg1.remote_ip4n,
-                                         cls.pg0.remote_ip4n,
-                                         integrity_key_length=20,
-                                         crypto_key_length=16,
-                                         protocol=1, udp_encap=1)
-        cls.vapi.ipsec_sad_add_del_entry(local_sa_id, local_tun_spi,
-                                         cls.pg0.remote_ip4n,
-                                         cls.pg1.remote_ip4n,
-                                         integrity_key_length=20,
-                                         crypto_key_length=16,
-                                         protocol=1, udp_encap=1)
-        cls.vapi.ipsec_spd_add_del(spd_id)
-        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
-        l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
-                                                     "0.0.0.0")
-        l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
-                                                   "255.255.255.255")
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         protocol=socket.IPPROTO_ESP)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         protocol=socket.IPPROTO_ESP,
-                                         is_outbound=0)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         remote_port_start=4500,
-                                         remote_port_stop=4500,
-                                         protocol=socket.IPPROTO_UDP)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         remote_port_start=4500,
-                                         remote_port_stop=4500,
-                                         protocol=socket.IPPROTO_UDP,
-                                         is_outbound=0)
-        l_startaddr = l_stopaddr = cls.pg0.remote_ip4n
-        r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         priority=10, policy=3,
-                                         is_outbound=0, sa_id=local_sa_id)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, r_startaddr, r_stopaddr,
-                                         l_startaddr, l_stopaddr,
-                                         priority=10, policy=3,
-                                         sa_id=remote_sa_id)
+    def config_esp_tun(self, params):
+        addr_type = params.addr_type
+        scapy_tun_sa_id = params.scapy_tun_sa_id
+        scapy_tun_spi = params.scapy_tun_spi
+        vpp_tun_sa_id = params.vpp_tun_sa_id
+        vpp_tun_spi = params.vpp_tun_spi
+        auth_algo_vpp_id = params.auth_algo_vpp_id
+        auth_key = params.auth_key
+        crypt_algo_vpp_id = params.crypt_algo_vpp_id
+        crypt_key = params.crypt_key
+        addr_any = params.addr_any
+        addr_bcast = params.addr_bcast
+        flags = (VppEnum.vl_api_ipsec_sad_flags_t.
+                 IPSEC_API_SAD_FLAG_UDP_ENCAP)
+        e = VppEnum.vl_api_ipsec_spd_action_t
+
+        VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
+                   auth_algo_vpp_id, auth_key,
+                   crypt_algo_vpp_id, crypt_key,
+                   self.vpp_esp_protocol,
+                   self.pg1.remote_addr[addr_type],
+                   self.tun_if.remote_addr[addr_type],
+                   flags=flags).add_vpp_config()
+        VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
+                   auth_algo_vpp_id, auth_key,
+                   crypt_algo_vpp_id, crypt_key,
+                   self.vpp_esp_protocol,
+                   self.tun_if.remote_addr[addr_type],
+                   self.pg1.remote_addr[addr_type],
+                   flags=flags).add_vpp_config()
+
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_ESP).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_ESP,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_UDP,
+                         remote_port_start=4500,
+                         remote_port_stop=4500).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_UDP,
+                         remote_port_start=4500,
+                         remote_port_stop=4500,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
+                         self.tun_if.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         0, priority=10,
+                         policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         self.pg1.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+                         priority=10).add_vpp_config()
 
     def test_ipsec_nat_tun(self):
         """ IPSec/NAT tunnel test case """
 
     def test_ipsec_nat_tun(self):
         """ IPSec/NAT tunnel test case """
-        local_tun_sa = SecurityAssociation(ESP, spi=0x000003e9,
-                                           crypt_algo='AES-CBC',
-                                           crypt_key='JPjyOWBeVEQiMe7h',
-                                           auth_algo='HMAC-SHA1-96',
-                                           auth_key='C91KUR9GYMm5GfkEvNjX',
+        p = self.ipv4_params
+        scapy_tun_sa = SecurityAssociation(ESP, spi=p.scapy_tun_spi,
+                                           crypt_algo=p.crypt_algo,
+                                           crypt_key=p.crypt_key,
+                                           auth_algo=p.auth_algo,
+                                           auth_key=p.auth_key,
                                            tunnel_header=IP(
                                                src=self.pg1.remote_ip4,
                                            tunnel_header=IP(
                                                src=self.pg1.remote_ip4,
-                                               dst=self.pg0.remote_ip4),
+                                               dst=self.tun_if.remote_ip4),
                                            nat_t_header=UDP(
                                                sport=4500,
                                                dport=4500))
         # in2out - from private network to public
         pkts = self.create_stream_plain(
             self.pg1.remote_mac, self.pg1.local_mac,
                                            nat_t_header=UDP(
                                                sport=4500,
                                                dport=4500))
         # in2out - from private network to public
         pkts = self.create_stream_plain(
             self.pg1.remote_mac, self.pg1.local_mac,
-            self.pg1.remote_ip4, self.pg0.remote_ip4)
+            self.pg1.remote_ip4, self.tun_if.remote_ip4)
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture(len(pkts))
-        self.verify_capture_encrypted(capture, local_tun_sa)
+        capture = self.tun_if.get_capture(len(pkts))
+        self.verify_capture_encrypted(capture, scapy_tun_sa)
 
 
-        remote_tun_sa = SecurityAssociation(ESP, spi=0x000003e8,
-                                            crypt_algo='AES-CBC',
-                                            crypt_key='JPjyOWBeVEQiMe7h',
-                                            auth_algo='HMAC-SHA1-96',
-                                            auth_key='C91KUR9GYMm5GfkEvNjX',
-                                            tunnel_header=IP(
-                                                src=self.pg0.remote_ip4,
-                                                dst=self.pg1.remote_ip4),
-                                            nat_t_header=UDP(
-                                                sport=4500,
-                                                dport=4500))
+        vpp_tun_sa = SecurityAssociation(ESP,
+                                         spi=p.vpp_tun_spi,
+                                         crypt_algo=p.crypt_algo,
+                                         crypt_key=p.crypt_key,
+                                         auth_algo=p.auth_algo,
+                                         auth_key=p.auth_key,
+                                         tunnel_header=IP(
+                                             src=self.tun_if.remote_ip4,
+                                             dst=self.pg1.remote_ip4),
+                                         nat_t_header=UDP(
+                                             sport=4500,
+                                             dport=4500))
 
         # out2in - from public network to private
         pkts = self.create_stream_encrypted(
 
         # out2in - from public network to private
         pkts = self.create_stream_encrypted(
-            self.pg0.remote_mac, self.pg0.local_mac,
-            self.pg0.remote_ip4, self.pg1.remote_ip4, remote_tun_sa)
+            self.tun_if.remote_mac, self.tun_if.local_mac,
+            self.tun_if.remote_ip4, self.pg1.remote_ip4, vpp_tun_sa)
         self.logger.info(ppc("Sending packets:", pkts))
         self.logger.info(ppc("Sending packets:", pkts))
-        self.pg0.add_stream(pkts)
+        self.tun_if.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(len(pkts))
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(len(pkts))