tests: replace pycodestyle with black
[vpp.git] / test / test_ipsec_nat.py
index cebbfc8..64a2725 100644 (file)
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 import socket
 
+import scapy.compat
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
-from util import ppp, ppc
-from framework import VppTestCase
-
 
-class IPSecNATTestCase(VppTestCase):
-    """ IPSec/NAT
+from util import ppp, ppc
+from template_ipsec import TemplateIpsec
+from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_ip import DpoProto
+from vpp_papi import VppEnum
 
-    TRANSPORT MODE:
 
-     ---   encrypt   ---
-    |pg2| <-------> |VPP|
-     ---   decrypt   ---
+class IPSecNATTestCase(TemplateIpsec):
+    """IPSec/NAT
 
-    TUNNEL MODE:
+    TUNNEL MODE::
 
+         public network  |   private network
+         ---   encrypt  ---   plain   ---
+        |pg0| <------- |VPP| <------ |pg1|
+         ---            ---           ---
 
-     public network  |   private network
-     ---   encrypt  ---   plain   ---
-    |pg0| <------- |VPP| <------ |pg1|
-     ---            ---           ---
+         ---   decrypt  ---   plain   ---
+        |pg0| -------> |VPP| ------> |pg1|
+         ---            ---           ---
 
-     ---   decrypt  ---   plain   ---
-    |pg0| -------> |VPP| ------> |pg1|
-     ---            ---           ---
     """
 
-    remote_pg0_client_addr = '1.1.1.1'
+    tcp_port_in = 6303
+    tcp_port_out = 6303
+    udp_port_in = 6304
+    udp_port_out = 6304
+    icmp_id_in = 6305
+    icmp_id_out = 6305
 
     @classmethod
     def setUpClass(cls):
         super(IPSecNATTestCase, cls).setUpClass()
-        cls.create_pg_interfaces(range(2))
-        for i in cls.pg_interfaces:
-            i.configure_ipv4_neighbors()
-            i.admin_up()
-            i.config_ip4()
-            i.resolve_arp()
-
-        cls.tcp_port_in = 6303
-        cls.tcp_port_out = 6303
-        cls.udp_port_in = 6304
-        cls.udp_port_out = 6304
-        cls.icmp_id_in = 6305
-        cls.icmp_id_out = 6305
-        cls.config_esp_tun()
-        cls.logger.info(cls.vapi.ppcli("show ipsec"))
+
+    @classmethod
+    def tearDownClass(cls):
+        super(IPSecNATTestCase, cls).tearDownClass()
+
+    def setUp(self):
+        super(IPSecNATTestCase, self).setUp()
+        self.tun_if = self.pg0
+
+        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
+        self.tun_spd.add_vpp_config()
+        VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if).add_vpp_config()
+
+        p = self.ipv4_params
+        self.config_esp_tun(p)
+        self.logger.info(self.vapi.ppcli("show ipsec all"))
+
+        d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
+        VppIpRoute(
+            self,
+            p.remote_tun_if_host,
+            p.addr_len,
+            [VppRoutePath(self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=d)],
+        ).add_vpp_config()
+
+    def tearDown(self):
+        super(IPSecNATTestCase, self).tearDown()
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
             # TCP
-            Ether(src=src_mac, dst=dst_mac) /
-            IP(src=src_ip, dst=dst_ip) /
-            TCP(sport=self.tcp_port_in, dport=20),
+            Ether(src=src_mac, dst=dst_mac)
+            / IP(src=src_ip, dst=dst_ip)
+            TCP(sport=self.tcp_port_in, dport=20),
             # UDP
-            Ether(src=src_mac, dst=dst_mac) /
-            IP(src=src_ip, dst=dst_ip) /
-            UDP(sport=self.udp_port_in, dport=20),
+            Ether(src=src_mac, dst=dst_mac)
+            / IP(src=src_ip, dst=dst_ip)
+            UDP(sport=self.udp_port_in, dport=20),
             # ICMP
-            Ether(src=src_mac, dst=dst_mac) /
-            IP(src=src_ip, dst=dst_ip) /
-            ICMP(id=self.icmp_id_in, type='echo-request')
+            Ether(src=src_mac, dst=dst_mac)
+            / IP(src=src_ip, dst=dst_ip)
+            / ICMP(id=self.icmp_id_in, type="echo-request"),
         ]
 
     def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
         return [
             # TCP
-            Ether(src=src_mac, dst=dst_mac) /
-            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
-                       TCP(dport=self.tcp_port_out, sport=20)),
+            Ether(src=src_mac, dst=dst_mac)
+            / sa.encrypt(
+                IP(src=src_ip, dst=dst_ip) / TCP(dport=self.tcp_port_out, sport=20)
+            ),
             # UDP
-            Ether(src=src_mac, dst=dst_mac) /
-            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
-                       UDP(dport=self.udp_port_out, sport=20)),
+            Ether(src=src_mac, dst=dst_mac)
+            / sa.encrypt(
+                IP(src=src_ip, dst=dst_ip) / UDP(dport=self.udp_port_out, sport=20)
+            ),
             # ICMP
-            Ether(src=src_mac, dst=dst_mac) /
-            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
-                       ICMP(id=self.icmp_id_out, type='echo-request'))
+            Ether(src=src_mac, dst=dst_mac)
+            / sa.encrypt(
+                IP(src=src_ip, dst=dst_ip)
+                / ICMP(id=self.icmp_id_out, type="echo-request")
+            ),
         ]
 
     def verify_capture_plain(self, capture):
         for packet in capture:
             try:
                 self.assert_packet_checksums_valid(packet)
-                self.assert_equal(packet[IP].src, self.pg0.remote_ip4,
-                                  "decrypted packet source address")
-                self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
-                                  "decrypted packet destination address")
+                self.assert_equal(
+                    packet[IP].src,
+                    self.tun_if.remote_ip4,
+                    "decrypted packet source address",
+                )
+                self.assert_equal(
+                    packet[IP].dst,
+                    self.pg1.remote_ip4,
+                    "decrypted packet destination address",
+                )
                 if packet.haslayer(TCP):
                     self.assertFalse(
                         packet.haslayer(UDP),
-                        "unexpected UDP header in decrypted packet")
-                    self.assert_equal(packet[TCP].dport, self.tcp_port_in,
-                                      "decrypted packet TCP destination port")
+                        "unexpected UDP header in decrypted packet",
+                    )
+                    self.assert_equal(
+                        packet[TCP].dport,
+                        self.tcp_port_in,
+                        "decrypted packet TCP destination port",
+                    )
                 elif packet.haslayer(UDP):
                     if packet[UDP].payload:
                         self.assertFalse(
                             packet[UDP][1].haslayer(UDP),
-                            "unexpected UDP header in decrypted packet")
-                    self.assert_equal(packet[UDP].dport, self.udp_port_in,
-                                      "decrypted packet UDP destination port")
+                            "unexpected UDP header in decrypted packet",
+                        )
+                    self.assert_equal(
+                        packet[UDP].dport,
+                        self.udp_port_in,
+                        "decrypted packet UDP destination port",
+                    )
                 else:
                     self.assertFalse(
                         packet.haslayer(UDP),
-                        "unexpected UDP header in decrypted packet")
-                    self.assert_equal(packet[ICMP].id, self.icmp_id_in,
-                                      "decrypted packet ICMP ID")
+                        "unexpected UDP header in decrypted packet",
+                    )
+                    self.assert_equal(
+                        packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
+                    )
             except Exception:
-                self.logger.error(
-                    ppp("Unexpected or invalid plain packet:", packet))
+                self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
                 raise
 
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
+                copy = packet.__class__(scapy.compat.raw(packet))
+                del copy[UDP].len
+                copy = packet.__class__(scapy.compat.raw(copy))
+                self.assert_equal(packet[UDP].len, copy[UDP].len, "UDP header length")
+                self.assert_packet_checksums_valid(packet)
                 self.assertIn(ESP, packet[IP])
                 decrypt_pkt = sa.decrypt(packet[IP])
-                self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
-                                  "encrypted packet source address")
-                self.assert_equal(decrypt_pkt[IP].dst, self.pg0.remote_ip4,
-                                  "encrypted packet destination address")
-                # if decrypt_pkt.haslayer(TCP):
-                #     self.tcp_port_out = decrypt_pkt[TCP].sport
-                # elif decrypt_pkt.haslayer(UDP):
-                #     self.udp_port_out = decrypt_pkt[UDP].sport
-                # else:
-                #     self.icmp_id_out = decrypt_pkt[ICMP].id
+                self.assert_packet_checksums_valid(decrypt_pkt)
+                self.assert_equal(
+                    decrypt_pkt[IP].src,
+                    self.pg1.remote_ip4,
+                    "encrypted packet source address",
+                )
+                self.assert_equal(
+                    decrypt_pkt[IP].dst,
+                    self.tun_if.remote_ip4,
+                    "encrypted packet destination address",
+                )
             except Exception:
                 self.logger.error(
-                    ppp("Unexpected or invalid encrypted packet:", packet))
+                    ppp("Unexpected or invalid encrypted packet:", packet)
+                )
                 raise
 
-    @classmethod
-    def config_esp_tun(cls):
-        spd_id = 1
-        remote_sa_id = 10
-        local_sa_id = 20
-        remote_tun_spi = 1001
-        local_tun_spi = 1000
-        client = socket.inet_pton(socket.AF_INET, cls.remote_pg0_client_addr)
-        cls.vapi.ip_add_del_route(client, 32, cls.pg0.remote_ip4n)
-        cls.vapi.ipsec_sad_add_del_entry(remote_sa_id, remote_tun_spi,
-                                         cls.pg1.remote_ip4n,
-                                         cls.pg0.remote_ip4n,
-                                         integrity_key_length=20,
-                                         crypto_key_length=16,
-                                         protocol=1, udp_encap=1)
-        cls.vapi.ipsec_sad_add_del_entry(local_sa_id, local_tun_spi,
-                                         cls.pg0.remote_ip4n,
-                                         cls.pg1.remote_ip4n,
-                                         integrity_key_length=20,
-                                         crypto_key_length=16,
-                                         protocol=1, udp_encap=1)
-        cls.vapi.ipsec_spd_add_del(spd_id)
-        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
-        l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
-                                                     "0.0.0.0")
-        l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
-                                                   "255.255.255.255")
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         protocol=socket.IPPROTO_ESP)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         protocol=socket.IPPROTO_ESP,
-                                         is_outbound=0)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         remote_port_start=4500,
-                                         remote_port_stop=4500,
-                                         protocol=socket.IPPROTO_UDP)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         remote_port_start=4500,
-                                         remote_port_stop=4500,
-                                         protocol=socket.IPPROTO_UDP,
-                                         is_outbound=0)
-        l_startaddr = l_stopaddr = cls.pg0.remote_ip4n
-        r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
-                                         r_startaddr, r_stopaddr,
-                                         priority=10, policy=3,
-                                         is_outbound=0, sa_id=local_sa_id)
-        cls.vapi.ipsec_spd_add_del_entry(spd_id, r_startaddr, r_stopaddr,
-                                         l_startaddr, l_stopaddr,
-                                         priority=10, policy=3,
-                                         sa_id=remote_sa_id)
+    def config_esp_tun(self, params):
+        addr_type = params.addr_type
+        scapy_tun_sa_id = params.scapy_tun_sa_id
+        scapy_tun_spi = params.scapy_tun_spi
+        vpp_tun_sa_id = params.vpp_tun_sa_id
+        vpp_tun_spi = params.vpp_tun_spi
+        auth_algo_vpp_id = params.auth_algo_vpp_id
+        auth_key = params.auth_key
+        crypt_algo_vpp_id = params.crypt_algo_vpp_id
+        crypt_key = params.crypt_key
+        addr_any = params.addr_any
+        addr_bcast = params.addr_bcast
+        flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
+        e = VppEnum.vl_api_ipsec_spd_action_t
+
+        VppIpsecSA(
+            self,
+            scapy_tun_sa_id,
+            scapy_tun_spi,
+            auth_algo_vpp_id,
+            auth_key,
+            crypt_algo_vpp_id,
+            crypt_key,
+            self.vpp_esp_protocol,
+            self.pg1.remote_addr[addr_type],
+            self.tun_if.remote_addr[addr_type],
+            flags=flags,
+        ).add_vpp_config()
+        VppIpsecSA(
+            self,
+            vpp_tun_sa_id,
+            vpp_tun_spi,
+            auth_algo_vpp_id,
+            auth_key,
+            crypt_algo_vpp_id,
+            crypt_key,
+            self.vpp_esp_protocol,
+            self.tun_if.remote_addr[addr_type],
+            self.pg1.remote_addr[addr_type],
+            flags=flags,
+        ).add_vpp_config()
+
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            scapy_tun_sa_id,
+            addr_any,
+            addr_bcast,
+            addr_any,
+            addr_bcast,
+            socket.IPPROTO_ESP,
+        ).add_vpp_config()
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            scapy_tun_sa_id,
+            addr_any,
+            addr_bcast,
+            addr_any,
+            addr_bcast,
+            socket.IPPROTO_ESP,
+            is_outbound=0,
+        ).add_vpp_config()
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            scapy_tun_sa_id,
+            addr_any,
+            addr_bcast,
+            addr_any,
+            addr_bcast,
+            socket.IPPROTO_UDP,
+            remote_port_start=4500,
+            remote_port_stop=4500,
+        ).add_vpp_config()
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            scapy_tun_sa_id,
+            addr_any,
+            addr_bcast,
+            addr_any,
+            addr_bcast,
+            socket.IPPROTO_UDP,
+            remote_port_start=4500,
+            remote_port_stop=4500,
+            is_outbound=0,
+        ).add_vpp_config()
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            vpp_tun_sa_id,
+            self.tun_if.remote_addr[addr_type],
+            self.tun_if.remote_addr[addr_type],
+            self.pg1.remote_addr[addr_type],
+            self.pg1.remote_addr[addr_type],
+            0,
+            priority=10,
+            policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+            is_outbound=0,
+        ).add_vpp_config()
+        VppIpsecSpdEntry(
+            self,
+            self.tun_spd,
+            scapy_tun_sa_id,
+            self.pg1.remote_addr[addr_type],
+            self.pg1.remote_addr[addr_type],
+            self.tun_if.remote_addr[addr_type],
+            self.tun_if.remote_addr[addr_type],
+            0,
+            policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+            priority=10,
+        ).add_vpp_config()
 
     def test_ipsec_nat_tun(self):
-        """ IPSec/NAT tunnel test case """
-        local_tun_sa = SecurityAssociation(ESP, spi=0x000003e9,
-                                           crypt_algo='AES-CBC',
-                                           crypt_key='JPjyOWBeVEQiMe7h',
-                                           auth_algo='HMAC-SHA1-96',
-                                           auth_key='C91KUR9GYMm5GfkEvNjX',
-                                           tunnel_header=IP(
-                                               src=self.pg1.remote_ip4,
-                                               dst=self.pg0.remote_ip4),
-                                           nat_t_header=UDP(
-                                               sport=4500,
-                                               dport=4500))
+        """IPSec/NAT tunnel test case"""
+        p = self.ipv4_params
+        scapy_tun_sa = SecurityAssociation(
+            ESP,
+            spi=p.scapy_tun_spi,
+            crypt_algo=p.crypt_algo,
+            crypt_key=p.crypt_key,
+            auth_algo=p.auth_algo,
+            auth_key=p.auth_key,
+            tunnel_header=IP(src=self.pg1.remote_ip4, dst=self.tun_if.remote_ip4),
+            nat_t_header=UDP(sport=4500, dport=4500),
+        )
         # in2out - from private network to public
         pkts = self.create_stream_plain(
-            self.pg1.remote_mac, self.pg1.local_mac,
-            self.pg1.remote_ip4, self.pg0.remote_ip4)
+            self.pg1.remote_mac,
+            self.pg1.local_mac,
+            self.pg1.remote_ip4,
+            self.tun_if.remote_ip4,
+        )
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture(len(pkts))
-        self.verify_capture_encrypted(capture, local_tun_sa)
-
-        remote_tun_sa = SecurityAssociation(ESP, spi=0x000003e8,
-                                            crypt_algo='AES-CBC',
-                                            crypt_key='JPjyOWBeVEQiMe7h',
-                                            auth_algo='HMAC-SHA1-96',
-                                            auth_key='C91KUR9GYMm5GfkEvNjX',
-                                            tunnel_header=IP(
-                                                src=self.pg0.remote_ip4,
-                                                dst=self.pg1.remote_ip4),
-                                            nat_t_header=UDP(
-                                                sport=4500,
-                                                dport=4500))
+        capture = self.tun_if.get_capture(len(pkts))
+        self.verify_capture_encrypted(capture, scapy_tun_sa)
+
+        vpp_tun_sa = SecurityAssociation(
+            ESP,
+            spi=p.vpp_tun_spi,
+            crypt_algo=p.crypt_algo,
+            crypt_key=p.crypt_key,
+            auth_algo=p.auth_algo,
+            auth_key=p.auth_key,
+            tunnel_header=IP(src=self.tun_if.remote_ip4, dst=self.pg1.remote_ip4),
+            nat_t_header=UDP(sport=4500, dport=4500),
+        )
 
         # out2in - from public network to private
         pkts = self.create_stream_encrypted(
-            self.pg0.remote_mac, self.pg0.local_mac,
-            self.pg0.remote_ip4, self.pg1.remote_ip4, remote_tun_sa)
+            self.tun_if.remote_mac,
+            self.tun_if.local_mac,
+            self.tun_if.remote_ip4,
+            self.pg1.remote_ip4,
+            vpp_tun_sa,
+        )
         self.logger.info(ppc("Sending packets:", pkts))
-        self.pg0.add_stream(pkts)
+        self.tun_if.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(len(pkts))