L3 cross connect
[vpp.git] / test / test_ipsec_nat.py
index 05befe4..3209def 100644 (file)
@@ -2,11 +2,18 @@
 
 import socket
 
 
 import socket
 
+import scapy.compat
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
+
 from util import ppp, ppc
 from template_ipsec import TemplateIpsec
 from util import ppp, ppc
 from template_ipsec import TemplateIpsec
+from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
+        VppIpsecSpdItfBinding
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_ip import DpoProto
+from vpp_papi import VppEnum
 
 
 class IPSecNATTestCase(TemplateIpsec):
 
 
 class IPSecNATTestCase(TemplateIpsec):
@@ -31,19 +38,36 @@ class IPSecNATTestCase(TemplateIpsec):
     icmp_id_in = 6305
     icmp_id_out = 6305
 
     icmp_id_in = 6305
     icmp_id_out = 6305
 
+    @classmethod
+    def setUpClass(cls):
+        super(IPSecNATTestCase, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(IPSecNATTestCase, cls).tearDownClass()
+
     def setUp(self):
         super(IPSecNATTestCase, self).setUp()
         self.tun_if = self.pg0
     def setUp(self):
         super(IPSecNATTestCase, self).setUp()
         self.tun_if = self.pg0
-        self.vapi.ipsec_spd_add_del(self.tun_spd_id)
-        self.vapi.ipsec_interface_add_del_spd(self.tun_spd_id,
-                                              self.tun_if.sw_if_index)
+
+        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
+        self.tun_spd.add_vpp_config()
+        VppIpsecSpdItfBinding(self, self.tun_spd,
+                              self.tun_if).add_vpp_config()
+
         p = self.ipv4_params
         self.config_esp_tun(p)
         p = self.ipv4_params
         self.config_esp_tun(p)
-        self.logger.info(self.vapi.ppcli("show ipsec"))
-        src = socket.inet_pton(p.addr_type, p.remote_tun_if_host)
-        self.vapi.ip_add_del_route(src, p.addr_len,
-                                   self.tun_if.remote_addr_n[p.addr_type],
-                                   is_ipv6=p.is_ipv6)
+        self.logger.info(self.vapi.ppcli("show ipsec all"))
+
+        d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
+        VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
+                   [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
+                                 0xffffffff,
+                                 proto=d)],
+                   is_ip6=p.is_ipv6).add_vpp_config()
+
+    def tearDown(self):
+        super(IPSecNATTestCase, self).tearDown()
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
@@ -112,9 +136,9 @@ class IPSecNATTestCase(TemplateIpsec):
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
-                copy = packet.__class__(str(packet))
+                copy = packet.__class__(scapy.compat.raw(packet))
                 del copy[UDP].len
                 del copy[UDP].len
-                copy = packet.__class__(str(copy))
+                copy = packet.__class__(scapy.compat.raw(copy))
                 self.assert_equal(packet[UDP].len, copy[UDP].len,
                                   "UDP header length")
                 self.assert_packet_checksums_valid(packet)
                 self.assert_equal(packet[UDP].len, copy[UDP].len,
                                   "UDP header length")
                 self.assert_packet_checksums_valid(packet)
@@ -142,50 +166,62 @@ class IPSecNATTestCase(TemplateIpsec):
         crypt_key = params.crypt_key
         addr_any = params.addr_any
         addr_bcast = params.addr_bcast
         crypt_key = params.crypt_key
         addr_any = params.addr_any
         addr_bcast = params.addr_bcast
-        self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi,
-                                          auth_algo_vpp_id, auth_key,
-                                          crypt_algo_vpp_id, crypt_key,
-                                          self.vpp_esp_protocol,
-                                          self.pg1.remote_addr_n[addr_type],
-                                          self.tun_if.remote_addr_n[addr_type],
-                                          udp_encap=1)
-        self.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi,
-                                          auth_algo_vpp_id, auth_key,
-                                          crypt_algo_vpp_id, crypt_key,
-                                          self.vpp_esp_protocol,
-                                          self.tun_if.remote_addr_n[addr_type],
-                                          self.pg1.remote_addr_n[addr_type],
-                                          udp_encap=1)
-        l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any)
-        l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast)
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
-                                          l_startaddr, l_stopaddr, r_startaddr,
-                                          r_stopaddr,
-                                          protocol=socket.IPPROTO_ESP)
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
-                                          l_startaddr, l_stopaddr, r_startaddr,
-                                          r_stopaddr, is_outbound=0,
-                                          protocol=socket.IPPROTO_ESP)
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
-                                          l_startaddr, l_stopaddr, r_startaddr,
-                                          r_stopaddr, remote_port_start=4500,
-                                          remote_port_stop=4500,
-                                          protocol=socket.IPPROTO_UDP)
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
-                                          l_startaddr, l_stopaddr, r_startaddr,
-                                          r_stopaddr, remote_port_start=4500,
-                                          remote_port_stop=4500,
-                                          protocol=socket.IPPROTO_UDP,
-                                          is_outbound=0)
-        l_startaddr = l_stopaddr = self.tun_if.remote_addr_n[addr_type]
-        r_startaddr = r_stopaddr = self.pg1.remote_addr_n[addr_type]
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id,
-                                          l_startaddr, l_stopaddr, r_startaddr,
-                                          r_stopaddr, priority=10, policy=3,
-                                          is_outbound=0)
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
-                                          r_startaddr, r_stopaddr, l_startaddr,
-                                          l_stopaddr, priority=10, policy=3)
+        flags = (VppEnum.vl_api_ipsec_sad_flags_t.
+                 IPSEC_API_SAD_FLAG_UDP_ENCAP)
+        e = VppEnum.vl_api_ipsec_spd_action_t
+
+        VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
+                   auth_algo_vpp_id, auth_key,
+                   crypt_algo_vpp_id, crypt_key,
+                   self.vpp_esp_protocol,
+                   self.pg1.remote_addr[addr_type],
+                   self.tun_if.remote_addr[addr_type],
+                   flags=flags).add_vpp_config()
+        VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
+                   auth_algo_vpp_id, auth_key,
+                   crypt_algo_vpp_id, crypt_key,
+                   self.vpp_esp_protocol,
+                   self.tun_if.remote_addr[addr_type],
+                   self.pg1.remote_addr[addr_type],
+                   flags=flags).add_vpp_config()
+
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_ESP).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_ESP,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_UDP,
+                         remote_port_start=4500,
+                         remote_port_stop=4500).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_UDP,
+                         remote_port_start=4500,
+                         remote_port_stop=4500,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
+                         self.tun_if.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         0, priority=10,
+                         policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         self.pg1.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+                         priority=10).add_vpp_config()
 
     def test_ipsec_nat_tun(self):
         """ IPSec/NAT tunnel test case """
 
     def test_ipsec_nat_tun(self):
         """ IPSec/NAT tunnel test case """