nat: scavenging functionality removed
[vpp.git] / src / plugins / nat / test / test_nat.py
index d5d4128..e267c4e 100644 (file)
@@ -33,6 +33,7 @@ from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
 from ipaddress import IPv6Network
 from util import ppc, ppp
 from socket import inet_pton, AF_INET
+from vpp_acl import AclRule, VppAcl, VppAclInterface
 
 
 # NAT HA protocol event data
@@ -4186,100 +4187,6 @@ class TestNAT44(MethodHolder):
         self.logger.info(self.vapi.cli("show nat ha"))
 
 
-class TestNAT44EndpointDependent2(MethodHolder):
-    """ Endpoint-Dependent session test cases """
-
-    icmp_timeout = 2
-
-    @classmethod
-    def setUpConstants(cls):
-        super(TestNAT44EndpointDependent2, cls).setUpConstants()
-        cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent",
-                                "translation", "hash", "buckets", "1",
-                                "icmp", "timeout", str(cls.icmp_timeout), "}"])
-
-    @classmethod
-    def setUpClass(cls):
-        super(TestNAT44EndpointDependent2, cls).setUpClass()
-        translation_buckets = 1
-        cls.max_translations = 10 * translation_buckets
-
-        cls.create_pg_interfaces(range(2))
-        cls.interfaces = list(cls.pg_interfaces[0:2])
-
-        for i in cls.interfaces:
-            i.admin_up()
-            i.config_ip4()
-            i.resolve_arp()
-
-        cls.pg0.generate_remote_hosts(1)
-        cls.pg0.configure_ipv4_neighbors()
-
-        cls.pg1.generate_remote_hosts(1)
-        cls.pg1.configure_ipv4_neighbors()
-
-    @classmethod
-    def tearDownClass(cls):
-        super(TestNAT44EndpointDependent2, cls).tearDownClass()
-
-    def create_icmp_stream(self, in_if, out_if, count):
-        """
-        Create ICMP packet stream for inside network
-
-        :param in_if: Inside interface
-        :param out_if: Outside interface
-        :param count: Number of packets
-        """
-
-        self.assertTrue(count > 0)
-        icmp_id = random.randint(0, 65535 - (count - 1))
-
-        pkts = list()
-        for i in range(count):
-            p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
-                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) /
-                 ICMP(id=icmp_id + i, type='echo-request'))
-            pkts.append(p)
-        return pkts
-
-    def send_pkts(self, pkts, expected=None):
-        self.pg0.add_stream(pkts)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        return self.pg1.get_capture(
-            len(pkts) if expected is None else expected)
-
-    def test_session_cleanup(self):
-        """ NAT44 session cleanup test """
-
-        self.nat44_add_address(self.pg1.local_ip4)
-        flags = self.config_flags.NAT_IS_INSIDE
-        self.vapi.nat44_interface_add_del_feature(
-            sw_if_index=self.pg0.sw_if_index,
-            flags=flags, is_add=1)
-        self.vapi.nat44_interface_add_del_feature(
-            sw_if_index=self.pg1.sw_if_index,
-            is_add=1)
-
-        nat_config = self.vapi.nat_show_config()
-        self.assertEqual(1, nat_config.endpoint_dependent)
-
-        pkts = self.create_icmp_stream(self.pg0, self.pg1,
-                                       self.max_translations + 2)
-        sz = len(pkts)
-
-        # positive test
-        self.send_pkts(pkts[0:self.max_translations])
-
-        # false positive test
-        self.send_pkts(pkts[self.max_translations:sz - 1], 0)
-
-        sleep(self.icmp_timeout)
-
-        # positive test
-        self.send_pkts(pkts[self.max_translations + 1:sz])
-
-
 class TestNAT44EndpointDependent(MethodHolder):
     """ Endpoint-Dependent mapping and filtering test cases """
 
@@ -6525,53 +6432,24 @@ class TestNAT44EndpointDependent(MethodHolder):
         self.verify_capture_in(capture, self.pg0)
 
         # Create an ACL blocking everything
-        out2in_deny_rule = {
-            'is_permit': 0,
-            'is_ipv6': 0,
-            'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"),
-            'src_ip_prefix_len': 0,
-            'dst_ip_addr':  inet_pton(AF_INET, "0.0.0.0"),
-            'dst_ip_prefix_len': 0,
-            'srcport_or_icmptype_first': 0,
-            'srcport_or_icmptype_last': 65535,
-            'dstport_or_icmpcode_first': 0,
-            'dstport_or_icmpcode_last': 65535,
-            'proto': 0,
-        }
-        out2in_rules = [out2in_deny_rule]
-        res = self.vapi.acl_add_replace(0xffffffff, out2in_rules)
-        self.assertEqual(res.retval, 0, "error adding out2in ACL")
-        out2in_acl = res.acl_index
+        out2in_deny_rule = AclRule(is_permit=0)
+        out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
+        out2in_acl.add_vpp_config()
+
+        # create an ACL to permit/reflect everything
+        in2out_reflect_rule = AclRule(is_permit=2)
+        in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
+        in2out_acl.add_vpp_config()
 
         # apply as input acl on interface and confirm it blocks everything
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
-                                             n_input=1,
-                                             acls=[out2in_acl])
+        acl_if = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
+                                 n_input=1, acls=[out2in_acl])
+        acl_if.add_vpp_config()
         self.send_and_assert_no_replies(self.pg1, pkts_out2in)
 
-        # create an ACL to permit/reflect everything
-        in2out_reflect_rule = {
-            'is_permit': 2,
-            'is_ipv6': 0,
-            'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"),
-            'src_ip_prefix_len': 0,
-            'dst_ip_addr':  inet_pton(AF_INET, "0.0.0.0"),
-            'dst_ip_prefix_len': 0,
-            'srcport_or_icmptype_first': 0,
-            'srcport_or_icmptype_last': 65535,
-            'dstport_or_icmpcode_first': 0,
-            'dstport_or_icmpcode_last': 65535,
-            'proto': 0,
-        }
-        in2out_rules = [in2out_reflect_rule]
-        res = self.vapi.acl_add_replace(0xffffffff, in2out_rules)
-        self.assertEqual(res.retval, 0, "error adding in2out ACL")
-        in2out_acl = res.acl_index
-
         # apply output acl
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
-                                             n_input=1,
-                                             acls=[out2in_acl, in2out_acl])
+        acl_if.acls = [out2in_acl, in2out_acl]
+        acl_if.add_vpp_config()
         # send in2out to generate ACL state (NAT state was created earlier)
         capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1,
                                        len(pkts_in2out))
@@ -6587,15 +6465,6 @@ class TestNAT44EndpointDependent(MethodHolder):
         self.verify_capture_in(capture, self.pg0)
         self.logger.info(self.vapi.cli("show trace"))
 
-        # Clean up
-        # Remove ACLs from interface
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
-                                             n_input=0,
-                                             acls=[])
-        # delete ACLs
-        self.vapi.acl_del(acl_index=out2in_acl, expected_retval=0)
-        self.vapi.acl_del(acl_index=in2out_acl, expected_retval=0)
-
     def test_multiple_vrf(self):
         """ Multiple VRF setup """
         external_addr = '1.2.3.4'