Revert "tests: Deprecate the use of CLI commands in tests."
[vpp.git] / test / test_ipsec_nat.py
index 05befe4..e364f5f 100644 (file)
@@ -2,11 +2,18 @@
 
 import socket
 
 
 import socket
 
+import scapy.compat
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import ICMP, IP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
+
 from util import ppp, ppc
 from template_ipsec import TemplateIpsec
 from util import ppp, ppc
 from template_ipsec import TemplateIpsec
+from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
+        VppIpsecSpdItfBinding
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_ip import DpoProto
+from vpp_papi import VppEnum
 
 
 class IPSecNATTestCase(TemplateIpsec):
 
 
 class IPSecNATTestCase(TemplateIpsec):
@@ -34,16 +41,25 @@ class IPSecNATTestCase(TemplateIpsec):
     def setUp(self):
         super(IPSecNATTestCase, self).setUp()
         self.tun_if = self.pg0
     def setUp(self):
         super(IPSecNATTestCase, self).setUp()
         self.tun_if = self.pg0
-        self.vapi.ipsec_spd_add_del(self.tun_spd_id)
-        self.vapi.ipsec_interface_add_del_spd(self.tun_spd_id,
-                                              self.tun_if.sw_if_index)
+
+        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
+        self.tun_spd.add_vpp_config()
+        VppIpsecSpdItfBinding(self, self.tun_spd,
+                              self.tun_if).add_vpp_config()
+
         p = self.ipv4_params
         self.config_esp_tun(p)
         self.logger.info(self.vapi.ppcli("show ipsec"))
         p = self.ipv4_params
         self.config_esp_tun(p)
         self.logger.info(self.vapi.ppcli("show ipsec"))
-        src = socket.inet_pton(p.addr_type, p.remote_tun_if_host)
-        self.vapi.ip_add_del_route(src, p.addr_len,
-                                   self.tun_if.remote_addr_n[p.addr_type],
-                                   is_ipv6=p.is_ipv6)
+
+        d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
+        VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
+                   [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
+                                 0xffffffff,
+                                 proto=d)],
+                   is_ip6=p.is_ipv6).add_vpp_config()
+
+    def tearDown(self):
+        super(IPSecNATTestCase, self).tearDown()
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
 
     def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
         return [
@@ -112,9 +128,9 @@ class IPSecNATTestCase(TemplateIpsec):
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
     def verify_capture_encrypted(self, capture, sa):
         for packet in capture:
             try:
-                copy = packet.__class__(str(packet))
+                copy = packet.__class__(scapy.compat.raw(packet))
                 del copy[UDP].len
                 del copy[UDP].len
-                copy = packet.__class__(str(copy))
+                copy = packet.__class__(scapy.compat.raw(copy))
                 self.assert_equal(packet[UDP].len, copy[UDP].len,
                                   "UDP header length")
                 self.assert_packet_checksums_valid(packet)
                 self.assert_equal(packet[UDP].len, copy[UDP].len,
                                   "UDP header length")
                 self.assert_packet_checksums_valid(packet)
@@ -142,50 +158,62 @@ class IPSecNATTestCase(TemplateIpsec):
         crypt_key = params.crypt_key
         addr_any = params.addr_any
         addr_bcast = params.addr_bcast
         crypt_key = params.crypt_key
         addr_any = params.addr_any
         addr_bcast = params.addr_bcast
-        self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi,
-                                          auth_algo_vpp_id, auth_key,
-                                          crypt_algo_vpp_id, crypt_key,
-                                          self.vpp_esp_protocol,
-                                          self.pg1.remote_addr_n[addr_type],
-                                          self.tun_if.remote_addr_n[addr_type],
-                                          udp_encap=1)
-        self.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi,
-                                          auth_algo_vpp_id, auth_key,
-                                          crypt_algo_vpp_id, crypt_key,
-                                          self.vpp_esp_protocol,
-                                          self.tun_if.remote_addr_n[addr_type],
-                                          self.pg1.remote_addr_n[addr_type],
-                                          udp_encap=1)
-        l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any)
-        l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast)
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
-                                          l_startaddr, l_stopaddr, r_startaddr,
-                                          r_stopaddr,
-                                          protocol=socket.IPPROTO_ESP)
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
-                                          l_startaddr, l_stopaddr, r_startaddr,
-                                          r_stopaddr, is_outbound=0,
-                                          protocol=socket.IPPROTO_ESP)
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
-                                          l_startaddr, l_stopaddr, r_startaddr,
-                                          r_stopaddr, remote_port_start=4500,
-                                          remote_port_stop=4500,
-                                          protocol=socket.IPPROTO_UDP)
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
-                                          l_startaddr, l_stopaddr, r_startaddr,
-                                          r_stopaddr, remote_port_start=4500,
-                                          remote_port_stop=4500,
-                                          protocol=socket.IPPROTO_UDP,
-                                          is_outbound=0)
-        l_startaddr = l_stopaddr = self.tun_if.remote_addr_n[addr_type]
-        r_startaddr = r_stopaddr = self.pg1.remote_addr_n[addr_type]
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id,
-                                          l_startaddr, l_stopaddr, r_startaddr,
-                                          r_stopaddr, priority=10, policy=3,
-                                          is_outbound=0)
-        self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
-                                          r_startaddr, r_stopaddr, l_startaddr,
-                                          l_stopaddr, priority=10, policy=3)
+        flags = (VppEnum.vl_api_ipsec_sad_flags_t.
+                 IPSEC_API_SAD_FLAG_UDP_ENCAP)
+        e = VppEnum.vl_api_ipsec_spd_action_t
+
+        VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
+                   auth_algo_vpp_id, auth_key,
+                   crypt_algo_vpp_id, crypt_key,
+                   self.vpp_esp_protocol,
+                   self.pg1.remote_addr[addr_type],
+                   self.tun_if.remote_addr[addr_type],
+                   flags=flags).add_vpp_config()
+        VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
+                   auth_algo_vpp_id, auth_key,
+                   crypt_algo_vpp_id, crypt_key,
+                   self.vpp_esp_protocol,
+                   self.tun_if.remote_addr[addr_type],
+                   self.pg1.remote_addr[addr_type],
+                   flags=flags).add_vpp_config()
+
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_ESP).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_ESP,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_UDP,
+                         remote_port_start=4500,
+                         remote_port_stop=4500).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         addr_any, addr_bcast,
+                         addr_any, addr_bcast,
+                         socket.IPPROTO_UDP,
+                         remote_port_start=4500,
+                         remote_port_stop=4500,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
+                         self.tun_if.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         0, priority=10,
+                         policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+                         is_outbound=0).add_vpp_config()
+        VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
+                         self.pg1.remote_addr[addr_type],
+                         self.pg1.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         self.tun_if.remote_addr[addr_type],
+                         0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+                         priority=10).add_vpp_config()
 
     def test_ipsec_nat_tun(self):
         """ IPSec/NAT tunnel test case """
 
     def test_ipsec_nat_tun(self):
         """ IPSec/NAT tunnel test case """