fib: fib api updates
[vpp.git] / test / test_ipsec_nat.py
index 6d9dc10..07670d7 100644 (file)
@@ -2,22 +2,22 @@
 
 import socket
 
 
 import socket
 
+import scapy.compat
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
+
 from util import ppp, ppc
 from template_ipsec import TemplateIpsec
 from util import ppp, ppc
 from template_ipsec import TemplateIpsec
+from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
+        VppIpsecSpdItfBinding
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_ip import DpoProto
+from vpp_papi import VppEnum
 
 
 class IPSecNATTestCase(TemplateIpsec):
     """ IPSec/NAT
 
 
 class IPSecNATTestCase(TemplateIpsec):
     """ IPSec/NAT
-
-    TRANSPORT MODE:
-
-     ---   encrypt   ---
-    |pg2| <-------> |VPP|
-     ---   decrypt   ---
-
     TUNNEL MODE:
 
 
     TUNNEL MODE:
 
 
@@ -31,20 +31,42 @@ class IPSecNATTestCase(TemplateIpsec):
      ---            ---           ---
     """
 
      ---            ---           ---
     """
 
+    tcp_port_in = 6303
+    tcp_port_out = 6303
+    udp_port_in = 6304
+    udp_port_out = 6304
+    icmp_id_in = 6305
+    icmp_id_out = 6305
+
     @classmethod
     def setUpClass(cls):
         super(IPSecNATTestCase, cls).setUpClass()
     @classmethod
     def setUpClass(cls):
         super(IPSecNATTestCase, cls).setUpClass()
-        cls.tcp_port_in = 6303
-        cls.tcp_port_out = 6303
-        cls.udp_port_in = 6304
-        cls.udp_port_out = 6304
-        cls.icmp_id_in = 6305
-        cls.icmp_id_out = 6305
-        cls.tun_if = cls.pg0
-        cls.config_esp_tun()
-        cls.logger.info(cls.vapi.ppcli("show ipsec"))
-        client = socket.inet_pton(socket.AF_INET, cls.remote_tun_if_host)
-        cls.vapi.ip_add_del_route(client, 32, cls.pg0.remote_ip4n)
+
+    @classmethod
+    def tearDownClass(cls):
+        super(IPSecNATTestCase, cls).tearDownClass()
+
+    def setUp(self):
+        super(IPSecNATTestCase, self).setUp()
+        self.tun_if = self.pg0
+
+        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
+        self.tun_spd.add_vpp_config()
+        VppIpsecSpdItfBinding(self, self.tun_spd,
+                              self.tun_if).add_vpp_config()
+
+        p = self.ipv4_params
+        self.config_esp_tun(p)
+        self.logger.info(self.vapi.ppcli("show ipsec all"))
+
+        d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
+        VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
+                   [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
+                                 0xffffffff,
+                                 proto=d)]).add_vpp_config()
+
+    def tearDown(self):
+        super(IPSecNATTestCase, self).tearDown()
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
@@ -113,83 +135,101 @@ class IPSecNATTestCase(TemplateIpsec):
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
+                copy = packet.__class__(scapy.compat.raw(packet))
+                del copy[UDP].len
+                copy = packet.__class__(scapy.compat.raw(copy))
+                self.assert_equal(packet[UDP].len, copy[UDP].len,
+                                  "UDP header length")
+                self.assert_packet_checksums_valid(packet)
                 self.assertIn(ESP, packet[IP])
                 decrypt_pkt = sa.decrypt(packet[IP])
                 self.assertIn(ESP, packet[IP])
                 decrypt_pkt = sa.decrypt(packet[IP])
+                self.assert_packet_checksums_valid(decrypt_pkt)
                 self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
                                   "encrypted packet source address")
                 self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
                                   "encrypted packet destination address")
                 self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
                                   "encrypted packet source address")
                 self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
                                   "encrypted packet destination address")
-                # if decrypt_pkt.haslayer(TCP):
-                #     self.tcp_port_out = decrypt_pkt[TCP].sport
-                # elif decrypt_pkt.haslayer(UDP):
-                #     self.udp_port_out = decrypt_pkt[UDP].sport
-                # else:
-                #     self.icmp_id_out = decrypt_pkt[ICMP].id
             except Exception:
                 self.logger.error(
                     ppp("Unexpected or invalid encrypted packet:", packet))
                 raise
 
             except Exception:
                 self.logger.error(
                     ppp("Unexpected or invalid encrypted packet:", packet))
                 raise
 
-    @classmethod
-    def config_esp_tun(cls):
-        cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tun_sa_id,
-                                         cls.scapy_tun_spi,
-                                         cls.auth_algo_vpp_id, cls.auth_key,
-                                         cls.crypt_algo_vpp_id,
-                                         cls.crypt_key, cls.vpp_esp_protocol,
-                                         cls.pg1.remote_ip4n,
-                                         cls.tun_if.remote_ip4n)
-        cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tun_sa_id,
-                                         cls.vpp_tun_spi,
-                                         cls.auth_algo_vpp_id, cls.auth_key,
-                                         cls.crypt_algo_vpp_id,
-                                         cls.crypt_key, cls.vpp_esp_protocol,
-                                         cls.tun_if.remote_ip4n,
-                                         cls.pg1.remote_ip4n)
-        cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
-        cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
-                                             cls.tun_if.sw_if_index)
-        l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
-                                                     "0.0.0.0")
-        l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
-                                                   "255.255.255.255")
-        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
-                                         l_startaddr, l_stopaddr, r_startaddr,
-                                         r_stopaddr,
-                                         protocol=socket.IPPROTO_ESP)
-        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
-                                         l_startaddr, l_stopaddr, r_startaddr,
-                                         r_stopaddr, is_outbound=0,
-                                         protocol=socket.IPPROTO_ESP)
-        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
-                                         l_startaddr, l_stopaddr, r_startaddr,
-                                         r_stopaddr, remote_port_start=4500,
-                                         remote_port_stop=4500,
-                                         protocol=socket.IPPROTO_UDP)
-        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
-                                         l_startaddr, l_stopaddr, r_startaddr,
-                                         r_stopaddr, remote_port_start=4500,
-                                         remote_port_stop=4500,
-                                         protocol=socket.IPPROTO_UDP,
-                                         is_outbound=0)
-        l_startaddr = l_stopaddr = cls.tun_if.remote_ip4n
-        r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
-        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
-                                         l_startaddr, l_stopaddr, r_startaddr,
-                                         r_stopaddr, priority=10, policy=3,
-                                         is_outbound=0)
-        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
-                                         r_startaddr, r_stopaddr, l_startaddr,
-                                         l_stopaddr, priority=10, policy=3)
+    def config_esp_tun(self, params):
+        addr_type = params.addr_type
+        scapy_tun_sa_id = params.scapy_tun_sa_id
+        scapy_tun_spi = params.scapy_tun_spi
+        vpp_tun_sa_id = params.vpp_tun_sa_id
+        vpp_tun_spi = params.vpp_tun_spi
+        auth_algo_vpp_id = params.auth_algo_vpp_id
+        auth_key = params.auth_key
+        crypt_algo_vpp_id = params.crypt_algo_vpp_id
+        crypt_key = params.crypt_key
+        addr_any = params.addr_any
+        addr_bcast = params.addr_bcast
+        flags = (VppEnum.vl_api_ipsec_sad_flags_t.
+                 IPSEC_API_SAD_FLAG_UDP_ENCAP)
+        e = VppEnum.vl_api_ipsec_spd_action_t
+
+        VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
+                   auth_algo_vpp_id, auth_key,
+                   crypt_algo_vpp_id, crypt_key,
+                   self.vpp_esp_protocol,
+                   self.pg1.remote_addr[addr_type],
+                   self.tun_if.remote_addr[addr_type],
+                   flags=flags).add_vpp_config()
+        VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
+                   auth_algo_vpp_id, auth_key,
+                   crypt_algo_vpp_id, crypt_key,
+                   self.vpp_esp_protocol,
+                   self.tun_if.remote_addr[addr_type],
+                   self.pg1.remote_addr[addr_type],
+                   flags=flags).add_vpp_config()
+
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_ESP).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_ESP,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_UDP,
+                         remote_port_start=4500,
+                         remote_port_stop=4500).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_UDP,
+                         remote_port_start=4500,
+                         remote_port_stop=4500,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
+                         self.tun_if.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         0, priority=10,
+                         policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         self.pg1.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+                         priority=10).add_vpp_config()
 
     def test_ipsec_nat_tun(self):
         """ IPSec/NAT tunnel test case """
 
     def test_ipsec_nat_tun(self):
         """ IPSec/NAT tunnel test case """
-        scapy_tun_sa = SecurityAssociation(ESP,
-                                           spi=self.scapy_tun_spi,
-                                           crypt_algo=self.crypt_algo,
-                                           crypt_key=self.crypt_key,
-                                           auth_algo=self.auth_algo,
-                                           auth_key=self.auth_key,
+        p = self.ipv4_params
+        scapy_tun_sa = SecurityAssociation(ESP, spi=p.scapy_tun_spi,
+                                           crypt_algo=p.crypt_algo,
+                                           crypt_key=p.crypt_key,
+                                           auth_algo=p.auth_algo,
+                                           auth_key=p.auth_key,
                                            tunnel_header=IP(
                                                src=self.pg1.remote_ip4,
                                                dst=self.tun_if.remote_ip4),
                                            tunnel_header=IP(
                                                src=self.pg1.remote_ip4,
                                                dst=self.tun_if.remote_ip4),
@@ -207,11 +247,11 @@ class IPSecNATTestCase(TemplateIpsec):
         self.verify_capture_encrypted(capture, scapy_tun_sa)
 
         vpp_tun_sa = SecurityAssociation(ESP,
         self.verify_capture_encrypted(capture, scapy_tun_sa)
 
         vpp_tun_sa = SecurityAssociation(ESP,
-                                         spi=self.vpp_tun_spi,
-                                         crypt_algo=self.crypt_algo,
-                                         crypt_key=self.crypt_key,
-                                         auth_algo=self.auth_algo,
-                                         auth_key=self.auth_key,
+                                         spi=p.vpp_tun_spi,
+                                         crypt_algo=p.crypt_algo,
+                                         crypt_key=p.crypt_key,
+                                         auth_algo=p.auth_algo,
+                                         auth_key=p.auth_key,
                                          tunnel_header=IP(
                                              src=self.tun_if.remote_ip4,
                                              dst=self.pg1.remote_ip4),
                                          tunnel_header=IP(
                                              src=self.tun_if.remote_ip4,
                                              dst=self.pg1.remote_ip4),