tests: Use errno value rather than a specific int
[vpp.git] / test / test_ipsec_nat.py
index 7a5aca6..881b5a7 100644 (file)
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 import socket
 
 
 import socket
 
+import scapy.compat
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
-from util import ppp, ppc
-from framework import VppTestCase
-
 
 
-class IPSecNATTestCase(VppTestCase):
-    """ IPSec/NAT
+from util import ppp, ppc
+from template_ipsec import TemplateIpsec
+from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_ip import DpoProto
+from vpp_papi import VppEnum
 
 
-    TRANSPORT MODE:
 
 
-     ---   encrypt   ---
-    |pg2| <-------> |VPP|
-     ---   decrypt   ---
+class IPSecNATTestCase(TemplateIpsec):
+    """IPSec/NAT
 
 
-    TUNNEL MODE:
+    TUNNEL MODE::
 
 
+         public network  |   private network
+         ---   encrypt  ---   plain   ---
+        |pg0| <------- |VPP| <------ |pg1|
+         ---            ---           ---
 
 
-     public network  |   private network
-     ---   encrypt  ---   plain   ---
-    |pg0| <------- |VPP| <------ |pg1|
-     ---            ---           ---
+         ---   decrypt  ---   plain   ---
+        |pg0| -------> |VPP| ------> |pg1|
+         ---            ---           ---
 
 
-     ---   decrypt  ---   plain   ---
-    |pg0| -------> |VPP| ------> |pg1|
-     ---            ---           ---
     """
 
     """
 
-    remote_pg0_client_addr = '1.1.1.1'
+    tcp_port_in = 6303
+    tcp_port_out = 6303
+    udp_port_in = 6304
+    udp_port_out = 6304
+    icmp_id_in = 6305
+    icmp_id_out = 6305
 
     @classmethod
     def setUpClass(cls):
         super(IPSecNATTestCase, cls).setUpClass()
 
     @classmethod
     def setUpClass(cls):
         super(IPSecNATTestCase, cls).setUpClass()
-        cls.create_pg_interfaces(range(2))
-        for i in cls.pg_interfaces:
-            i.configure_ipv4_neighbors()
-            i.admin_up()
-            i.config_ip4()
-            i.resolve_arp()
-
-        cls.tcp_port_in = 6303
-        cls.tcp_port_out = 6303
-        cls.udp_port_in = 6304
-        cls.udp_port_out = 6304
-        cls.icmp_id_in = 6305
-        cls.icmp_id_out = 6305
-        cls.config_esp_tun()
-        cls.logger.info(cls.vapi.ppcli("show ipsec"))
+
+    @classmethod
+    def tearDownClass(cls):
+        super(IPSecNATTestCase, cls).tearDownClass()
+
+    def setUp(self):
+        super(IPSecNATTestCase, self).setUp()
+        self.tun_if = self.pg0
+
+        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
+        self.tun_spd.add_vpp_config()
+        VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if).add_vpp_config()
+
+        p = self.ipv4_params
+        self.config_esp_tun(p)
+        self.logger.info(self.vapi.ppcli("show ipsec all"))
+
+        d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
+        VppIpRoute(
+            self,
+            p.remote_tun_if_host,
+            p.addr_len,
+            [VppRoutePath(self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=d)],
+        ).add_vpp_config()
+
+    def tearDown(self):
+        super(IPSecNATTestCase, self).tearDown()
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
             # TCP
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
             # TCP
-            Ether(src=src_mac, dst=dst_mac) /
-            IP(src=src_ip, dst=dst_ip) /
-            TCP(sport=self.tcp_port_in, dport=20),
+            Ether(src=src_mac, dst=dst_mac)
+            / IP(src=src_ip, dst=dst_ip)
+            TCP(sport=self.tcp_port_in, dport=20),
             # UDP
             # UDP
-            Ether(src=src_mac, dst=dst_mac) /
-            IP(src=src_ip, dst=dst_ip) /
-            UDP(sport=self.udp_port_in, dport=20),
+            Ether(src=src_mac, dst=dst_mac)
+            / IP(src=src_ip, dst=dst_ip)
+            UDP(sport=self.udp_port_in, dport=20),
             # ICMP
             # ICMP
-            Ether(src=src_mac, dst=dst_mac) /
-            IP(src=src_ip, dst=dst_ip) /
-            ICMP(id=self.icmp_id_in, type='echo-request')
+            Ether(src=src_mac, dst=dst_mac)
+            / IP(src=src_ip, dst=dst_ip)
+            / ICMP(id=self.icmp_id_in, type="echo-request"),
         ]
 
     def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
         return [
             # TCP
         ]
 
     def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
         return [
             # TCP
-            Ether(src=src_mac, dst=dst_mac) /
-            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
-                       TCP(dport=self.tcp_port_out, sport=20)),
+            Ether(src=src_mac, dst=dst_mac)
+            / sa.encrypt(
+                IP(src=src_ip, dst=dst_ip) / TCP(dport=self.tcp_port_out, sport=20)
+            ),
             # UDP
             # UDP
-            Ether(src=src_mac, dst=dst_mac) /
-            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
-                       UDP(dport=self.udp_port_out, sport=20)),
+            Ether(src=src_mac, dst=dst_mac)
+            / sa.encrypt(
+                IP(src=src_ip, dst=dst_ip) / UDP(dport=self.udp_port_out, sport=20)
+            ),
             # ICMP
             # ICMP
-            Ether(src=src_mac, dst=dst_mac) /
-            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
-                       ICMP(id=self.icmp_id_out, type='echo-request'))
+            Ether(src=src_mac, dst=dst_mac)
+            / sa.encrypt(
+                IP(src=src_ip, dst=dst_ip)
+                / ICMP(id=self.icmp_id_out, type="echo-request")
+            ),
         ]
 
     def verify_capture_plain(self, capture):
         for packet in capture:
             try:
                 self.assert_packet_checksums_valid(packet)
         ]
 
     def verify_capture_plain(self, capture):
         for packet in capture:
             try:
                 self.assert_packet_checksums_valid(packet)
-                self.assert_equal(packet[IP].src, self.pg0.remote_ip4,
-                                  "decrypted packet source address")
-                self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
-                                  "decrypted packet destination address")
+                self.assert_equal(
+                    packet[IP].src,
+                    self.tun_if.remote_ip4,
+                    "decrypted packet source address",
+                )
+                self.assert_equal(
+                    packet[IP].dst,
+                    self.pg1.remote_ip4,
+                    "decrypted packet destination address",
+                )
                 if packet.haslayer(TCP):
                     self.assertFalse(
                         packet.haslayer(UDP),
                 if packet.haslayer(TCP):
                     self.assertFalse(
                         packet.haslayer(UDP),
-                        "unexpected UDP header in decrypted packet")
-                    self.assert_equal(packet[TCP].dport, self.tcp_port_in,
-                                      "decrypted packet TCP destination port")
+                        "unexpected UDP header in decrypted packet",
+                    )
+                    self.assert_equal(
+                        packet[TCP].dport,
+                        self.tcp_port_in,
+                        "decrypted packet TCP destination port",
+                    )
                 elif packet.haslayer(UDP):
                     if packet[UDP].payload:
                         self.assertFalse(
                             packet[UDP][1].haslayer(UDP),
                 elif packet.haslayer(UDP):
                     if packet[UDP].payload:
                         self.assertFalse(
                             packet[UDP][1].haslayer(UDP),
-                            "unexpected UDP header in decrypted packet")
-                    self.assert_equal(packet[UDP].dport, self.udp_port_in,
-                                      "decrypted packet UDP destination port")
+                            "unexpected UDP header in decrypted packet",
+                        )
+                    self.assert_equal(
+                        packet[UDP].dport,
+                        self.udp_port_in,
+                        "decrypted packet UDP destination port",
+                    )
                 else:
                     self.assertFalse(
                         packet.haslayer(UDP),
                 else:
                     self.assertFalse(
                         packet.haslayer(UDP),
-                        "unexpected UDP header in decrypted packet")
-                    self.assert_equal(packet[ICMP].id, self.icmp_id_in,
-                                      "decrypted packet ICMP ID")
+                        "unexpected UDP header in decrypted packet",
+                    )
+                    self.assert_equal(
+                        packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
+                    )
             except Exception:
             except Exception:
-                self.logger.error(
-                    ppp("Unexpected or invalid plain packet:", packet))
+                self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
                 raise
 
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
                 raise
 
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
+                copy = packet.__class__(scapy.compat.raw(packet))
+                del copy[UDP].len
+                copy = packet.__class__(scapy.compat.raw(copy))
+                self.assert_equal(packet[UDP].len, copy[UDP].len, "UDP header length")
+                self.assert_packet_checksums_valid(packet)
                 self.assertIn(ESP, packet[IP])
                 decrypt_pkt = sa.decrypt(packet[IP])
                 self.assertIn(ESP, packet[IP])
                 decrypt_pkt = sa.decrypt(packet[IP])
-                self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
-                                  "encrypted packet source address")
-                self.assert_equal(decrypt_pkt[IP].dst, self.pg0.remote_ip4,
-                                  "encrypted packet destination address")
-                # if decrypt_pkt.haslayer(TCP):
-                #     self.tcp_port_out = decrypt_pkt[TCP].sport
-                # elif decrypt_pkt.haslayer(UDP):
-                #     self.udp_port_out = decrypt_pkt[UDP].sport
-                # else:
-                #     self.icmp_id_out = decrypt_pkt[ICMP].id
+                self.assert_packet_checksums_valid(decrypt_pkt)
+                self.assert_equal(
+                    decrypt_pkt[IP].src,
+                    self.pg1.remote_ip4,
+                    "encrypted packet source address",
+                )
+                self.assert_equal(
+                    decrypt_pkt[IP].dst,
+                    self.tun_if.remote_ip4,
+                    "encrypted packet destination address",
+                )
             except Exception:
                 self.logger.error(
             except Exception:
                 self.logger.error(
-                    ppp("Unexpected or invalid encrypted packet:", packet))
+                    ppp("Unexpected or invalid encrypted packet:", packet)
+                )
                 raise
 
                 raise
 
-    @classmethod
-    def config_esp_tun(cls):
-        spd_id = 1
-        remote_sa_id = 10
-        local_sa_id = 20
-        scapy_tun_spi = 1001
-        vpp_tun_spi = 1000
-        client = socket.inet_pton(socket.AF_INET, cls.remote_pg0_client_addr)
-        cls.vapi.ip_add_del_route(client, 32, cls.pg0.remote_ip4n)
-        cls.vapi.ipsec_sad_add_del_entry(remote_sa_id, scapy_tun_spi,
-                                         cls.pg1.remote_ip4n,
-                                         cls.pg0.remote_ip4n,
-                                         integrity_key_length=20,
-                                         crypto_key_length=16,
-                                         protocol=1, udp_encap=1)
-        cls.vapi.ipsec_sad_add_del_entry(local_sa_id, vpp_tun_spi,
-                                         cls.pg0.remote_ip4n,
-                                         cls.pg1.remote_ip4n,
-                                         integrity_key_length=20,
-                                         crypto_key_length=16,
-                                         protocol=1, udp_encap=1)
-        cls.vapi.ipsec_spd_add_del(spd_id)
-        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
-        l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
-                                                     "0.0.0.0")
-        l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
-                                                   "255.255.255.255")
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         protocol=socket.IPPROTO_ESP)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         protocol=socket.IPPROTO_ESP,
-                                         is_outbound=0)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         remote_port_start=4500,
-                                         remote_port_stop=4500,
-                                         protocol=socket.IPPROTO_UDP)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         remote_port_start=4500,
-                                         remote_port_stop=4500,
-                                         protocol=socket.IPPROTO_UDP,
-                                         is_outbound=0)
-        l_startaddr = l_stopaddr = cls.pg0.remote_ip4n
-        r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         priority=10, policy=3,
-                                         is_outbound=0, sa_id=local_sa_id)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, r_startaddr, r_stopaddr,
-                                         l_startaddr, l_stopaddr,
-                                         priority=10, policy=3,
-                                         sa_id=remote_sa_id)
+    def config_esp_tun(self, params):
+        addr_type = params.addr_type
+        scapy_tun_sa_id = params.scapy_tun_sa_id
+        scapy_tun_spi = params.scapy_tun_spi
+        vpp_tun_sa_id = params.vpp_tun_sa_id
+        vpp_tun_spi = params.vpp_tun_spi
+        auth_algo_vpp_id = params.auth_algo_vpp_id
+        auth_key = params.auth_key
+        crypt_algo_vpp_id = params.crypt_algo_vpp_id
+        crypt_key = params.crypt_key
+        addr_any = params.addr_any
+        addr_bcast = params.addr_bcast
+        flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
+        e = VppEnum.vl_api_ipsec_spd_action_t
+
+        VppIpsecSA(
+            self,
+            scapy_tun_sa_id,
+            scapy_tun_spi,
+            auth_algo_vpp_id,
+            auth_key,
+            crypt_algo_vpp_id,
+            crypt_key,
+            self.vpp_esp_protocol,
+            self.pg1.remote_addr[addr_type],
+            self.tun_if.remote_addr[addr_type],
+            flags=flags,
+        ).add_vpp_config()
+        VppIpsecSA(
+            self,
+            vpp_tun_sa_id,
+            vpp_tun_spi,
+            auth_algo_vpp_id,
+            auth_key,
+            crypt_algo_vpp_id,
+            crypt_key,
+            self.vpp_esp_protocol,
+            self.tun_if.remote_addr[addr_type],
+            self.pg1.remote_addr[addr_type],
+            flags=flags,
+        ).add_vpp_config()
+
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            scapy_tun_sa_id,
+            addr_any,
+            addr_bcast,
+            addr_any,
+            addr_bcast,
+            socket.IPPROTO_ESP,
+        ).add_vpp_config()
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            scapy_tun_sa_id,
+            addr_any,
+            addr_bcast,
+            addr_any,
+            addr_bcast,
+            socket.IPPROTO_ESP,
+            is_outbound=0,
+        ).add_vpp_config()
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            scapy_tun_sa_id,
+            addr_any,
+            addr_bcast,
+            addr_any,
+            addr_bcast,
+            socket.IPPROTO_UDP,
+            remote_port_start=4500,
+            remote_port_stop=4500,
+        ).add_vpp_config()
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            scapy_tun_sa_id,
+            addr_any,
+            addr_bcast,
+            addr_any,
+            addr_bcast,
+            socket.IPPROTO_UDP,
+            remote_port_start=4500,
+            remote_port_stop=4500,
+            is_outbound=0,
+        ).add_vpp_config()
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            vpp_tun_sa_id,
+            self.tun_if.remote_addr[addr_type],
+            self.tun_if.remote_addr[addr_type],
+            self.pg1.remote_addr[addr_type],
+            self.pg1.remote_addr[addr_type],
+            socket.IPPROTO_RAW,
+            priority=10,
+            policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+            is_outbound=0,
+        ).add_vpp_config()
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            scapy_tun_sa_id,
+            self.pg1.remote_addr[addr_type],
+            self.pg1.remote_addr[addr_type],
+            self.tun_if.remote_addr[addr_type],
+            self.tun_if.remote_addr[addr_type],
+            socket.IPPROTO_RAW,
+            policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+            priority=10,
+        ).add_vpp_config()
 
     def test_ipsec_nat_tun(self):
 
     def test_ipsec_nat_tun(self):
-        """ IPSec/NAT tunnel test case """
-        local_tun_sa = SecurityAssociation(ESP, spi=0x000003e9,
-                                           crypt_algo='AES-CBC',
-                                           crypt_key='JPjyOWBeVEQiMe7h',
-                                           auth_algo='HMAC-SHA1-96',
-                                           auth_key='C91KUR9GYMm5GfkEvNjX',
-                                           tunnel_header=IP(
-                                               src=self.pg1.remote_ip4,
-                                               dst=self.pg0.remote_ip4),
-                                           nat_t_header=UDP(
-                                               sport=4500,
-                                               dport=4500))
+        """IPSec/NAT tunnel test case"""
+        p = self.ipv4_params
+        scapy_tun_sa = SecurityAssociation(
+            ESP,
+            spi=p.scapy_tun_spi,
+            crypt_algo=p.crypt_algo,
+            crypt_key=p.crypt_key,
+            auth_algo=p.auth_algo,
+            auth_key=p.auth_key,
+            tunnel_header=IP(src=self.pg1.remote_ip4, dst=self.tun_if.remote_ip4),
+            nat_t_header=UDP(sport=4500, dport=4500),
+        )
         # in2out - from private network to public
         pkts = self.create_stream_plain(
         # in2out - from private network to public
         pkts = self.create_stream_plain(
-            self.pg1.remote_mac, self.pg1.local_mac,
-            self.pg1.remote_ip4, self.pg0.remote_ip4)
+            self.pg1.remote_mac,
+            self.pg1.local_mac,
+            self.pg1.remote_ip4,
+            self.tun_if.remote_ip4,
+        )
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture(len(pkts))
-        self.verify_capture_encrypted(capture, local_tun_sa)
-
-        remote_tun_sa = SecurityAssociation(ESP, spi=0x000003e8,
-                                            crypt_algo='AES-CBC',
-                                            crypt_key='JPjyOWBeVEQiMe7h',
-                                            auth_algo='HMAC-SHA1-96',
-                                            auth_key='C91KUR9GYMm5GfkEvNjX',
-                                            tunnel_header=IP(
-                                                src=self.pg0.remote_ip4,
-                                                dst=self.pg1.remote_ip4),
-                                            nat_t_header=UDP(
-                                                sport=4500,
-                                                dport=4500))
+        capture = self.tun_if.get_capture(len(pkts))
+        self.verify_capture_encrypted(capture, scapy_tun_sa)
+
+        vpp_tun_sa = SecurityAssociation(
+            ESP,
+            spi=p.vpp_tun_spi,
+            crypt_algo=p.crypt_algo,
+            crypt_key=p.crypt_key,
+            auth_algo=p.auth_algo,
+            auth_key=p.auth_key,
+            tunnel_header=IP(src=self.tun_if.remote_ip4, dst=self.pg1.remote_ip4),
+            nat_t_header=UDP(sport=4500, dport=4500),
+        )
 
         # out2in - from public network to private
         pkts = self.create_stream_encrypted(
 
         # out2in - from public network to private
         pkts = self.create_stream_encrypted(
-            self.pg0.remote_mac, self.pg0.local_mac,
-            self.pg0.remote_ip4, self.pg1.remote_ip4, remote_tun_sa)
+            self.tun_if.remote_mac,
+            self.tun_if.local_mac,
+            self.tun_if.remote_ip4,
+            self.pg1.remote_ip4,
+            vpp_tun_sa,
+        )
         self.logger.info(ppc("Sending packets:", pkts))
         self.logger.info(ppc("Sending packets:", pkts))
-        self.pg0.add_stream(pkts)
+        self.tun_if.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(len(pkts))
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(len(pkts))