X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fnat%2Ftest%2Ftest_nat.py;h=e267c4ef2bcafba2c8a936ca636bddd8a19741e1;hb=1a0a89770688a37e500f634b68805b1984eccac0;hp=d5d41288c424b6ff16ddd62f2c4d9257e327e1f5;hpb=492a5d0bd79c3c0913f1b8fb4ad35d9ad23d821b;p=vpp.git diff --git a/src/plugins/nat/test/test_nat.py b/src/plugins/nat/test/test_nat.py index d5d41288c42..e267c4ef2bc 100644 --- a/src/plugins/nat/test/test_nat.py +++ b/src/plugins/nat/test/test_nat.py @@ -33,6 +33,7 @@ from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \ from ipaddress import IPv6Network from util import ppc, ppp from socket import inet_pton, AF_INET +from vpp_acl import AclRule, VppAcl, VppAclInterface # NAT HA protocol event data @@ -4186,100 +4187,6 @@ class TestNAT44(MethodHolder): self.logger.info(self.vapi.cli("show nat ha")) -class TestNAT44EndpointDependent2(MethodHolder): - """ Endpoint-Dependent session test cases """ - - icmp_timeout = 2 - - @classmethod - def setUpConstants(cls): - super(TestNAT44EndpointDependent2, cls).setUpConstants() - cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", - "translation", "hash", "buckets", "1", - "icmp", "timeout", str(cls.icmp_timeout), "}"]) - - @classmethod - def setUpClass(cls): - super(TestNAT44EndpointDependent2, cls).setUpClass() - translation_buckets = 1 - cls.max_translations = 10 * translation_buckets - - cls.create_pg_interfaces(range(2)) - cls.interfaces = list(cls.pg_interfaces[0:2]) - - for i in cls.interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - - cls.pg0.generate_remote_hosts(1) - cls.pg0.configure_ipv4_neighbors() - - cls.pg1.generate_remote_hosts(1) - cls.pg1.configure_ipv4_neighbors() - - @classmethod - def tearDownClass(cls): - super(TestNAT44EndpointDependent2, cls).tearDownClass() - - def create_icmp_stream(self, in_if, out_if, count): - """ - Create ICMP packet stream for inside network - - :param in_if: Inside interface - :param out_if: Outside interface - :param count: Number of packets - """ - - self.assertTrue(count > 0) - icmp_id = random.randint(0, 65535 - (count - 1)) - - pkts = list() - for i in range(count): - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) / - ICMP(id=icmp_id + i, type='echo-request')) - pkts.append(p) - return pkts - - def send_pkts(self, pkts, expected=None): - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - return self.pg1.get_capture( - len(pkts) if expected is None else expected) - - def test_session_cleanup(self): - """ NAT44 session cleanup test """ - - self.nat44_add_address(self.pg1.local_ip4) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - nat_config = self.vapi.nat_show_config() - self.assertEqual(1, nat_config.endpoint_dependent) - - pkts = self.create_icmp_stream(self.pg0, self.pg1, - self.max_translations + 2) - sz = len(pkts) - - # positive test - self.send_pkts(pkts[0:self.max_translations]) - - # false positive test - self.send_pkts(pkts[self.max_translations:sz - 1], 0) - - sleep(self.icmp_timeout) - - # positive test - self.send_pkts(pkts[self.max_translations + 1:sz]) - - class TestNAT44EndpointDependent(MethodHolder): """ Endpoint-Dependent mapping and filtering test cases """ @@ -6525,53 +6432,24 @@ class TestNAT44EndpointDependent(MethodHolder): self.verify_capture_in(capture, self.pg0) # Create an ACL blocking everything - out2in_deny_rule = { - 'is_permit': 0, - 'is_ipv6': 0, - 'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"), - 'src_ip_prefix_len': 0, - 'dst_ip_addr': inet_pton(AF_INET, "0.0.0.0"), - 'dst_ip_prefix_len': 0, - 'srcport_or_icmptype_first': 0, - 'srcport_or_icmptype_last': 65535, - 'dstport_or_icmpcode_first': 0, - 'dstport_or_icmpcode_last': 65535, - 'proto': 0, - } - out2in_rules = [out2in_deny_rule] - res = self.vapi.acl_add_replace(0xffffffff, out2in_rules) - self.assertEqual(res.retval, 0, "error adding out2in ACL") - out2in_acl = res.acl_index + out2in_deny_rule = AclRule(is_permit=0) + out2in_acl = VppAcl(self, rules=[out2in_deny_rule]) + out2in_acl.add_vpp_config() + + # create an ACL to permit/reflect everything + in2out_reflect_rule = AclRule(is_permit=2) + in2out_acl = VppAcl(self, rules=[in2out_reflect_rule]) + in2out_acl.add_vpp_config() # apply as input acl on interface and confirm it blocks everything - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=1, - acls=[out2in_acl]) + acl_if = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, + n_input=1, acls=[out2in_acl]) + acl_if.add_vpp_config() self.send_and_assert_no_replies(self.pg1, pkts_out2in) - # create an ACL to permit/reflect everything - in2out_reflect_rule = { - 'is_permit': 2, - 'is_ipv6': 0, - 'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"), - 'src_ip_prefix_len': 0, - 'dst_ip_addr': inet_pton(AF_INET, "0.0.0.0"), - 'dst_ip_prefix_len': 0, - 'srcport_or_icmptype_first': 0, - 'srcport_or_icmptype_last': 65535, - 'dstport_or_icmpcode_first': 0, - 'dstport_or_icmpcode_last': 65535, - 'proto': 0, - } - in2out_rules = [in2out_reflect_rule] - res = self.vapi.acl_add_replace(0xffffffff, in2out_rules) - self.assertEqual(res.retval, 0, "error adding in2out ACL") - in2out_acl = res.acl_index - # apply output acl - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=1, - acls=[out2in_acl, in2out_acl]) + acl_if.acls = [out2in_acl, in2out_acl] + acl_if.add_vpp_config() # send in2out to generate ACL state (NAT state was created earlier) capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)) @@ -6587,15 +6465,6 @@ class TestNAT44EndpointDependent(MethodHolder): self.verify_capture_in(capture, self.pg0) self.logger.info(self.vapi.cli("show trace")) - # Clean up - # Remove ACLs from interface - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=0, - acls=[]) - # delete ACLs - self.vapi.acl_del(acl_index=out2in_acl, expected_retval=0) - self.vapi.acl_del(acl_index=in2out_acl, expected_retval=0) - def test_multiple_vrf(self): """ Multiple VRF setup """ external_addr = '1.2.3.4'