X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fnat%2Ftest%2Ftest_nat.py;h=ac9c65dd0f577bc64c4f93efb5964a8c341194bb;hb=2f8cd914514fe54f91974c6d465d4769dfac8de8;hp=d5d41288c424b6ff16ddd62f2c4d9257e327e1f5;hpb=64d9da3ba3b07d23782ef1a947fb5a71b9f4de56;p=vpp.git diff --git a/src/plugins/nat/test/test_nat.py b/src/plugins/nat/test/test_nat.py index d5d41288c42..ac9c65dd0f5 100644 --- a/src/plugins/nat/test/test_nat.py +++ b/src/plugins/nat/test/test_nat.py @@ -33,6 +33,7 @@ from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \ from ipaddress import IPv6Network from util import ppc, ppp from socket import inet_pton, AF_INET +from vpp_acl import AclRule, VppAcl, VppAclInterface # NAT HA protocol event data @@ -6525,53 +6526,24 @@ class TestNAT44EndpointDependent(MethodHolder): self.verify_capture_in(capture, self.pg0) # Create an ACL blocking everything - out2in_deny_rule = { - 'is_permit': 0, - 'is_ipv6': 0, - 'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"), - 'src_ip_prefix_len': 0, - 'dst_ip_addr': inet_pton(AF_INET, "0.0.0.0"), - 'dst_ip_prefix_len': 0, - 'srcport_or_icmptype_first': 0, - 'srcport_or_icmptype_last': 65535, - 'dstport_or_icmpcode_first': 0, - 'dstport_or_icmpcode_last': 65535, - 'proto': 0, - } - out2in_rules = [out2in_deny_rule] - res = self.vapi.acl_add_replace(0xffffffff, out2in_rules) - self.assertEqual(res.retval, 0, "error adding out2in ACL") - out2in_acl = res.acl_index + out2in_deny_rule = AclRule(is_permit=0) + out2in_acl = VppAcl(self, rules=[out2in_deny_rule]) + out2in_acl.add_vpp_config() + + # create an ACL to permit/reflect everything + in2out_reflect_rule = AclRule(is_permit=2) + in2out_acl = VppAcl(self, rules=[in2out_reflect_rule]) + in2out_acl.add_vpp_config() # apply as input acl on interface and confirm it blocks everything - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=1, - acls=[out2in_acl]) + acl_if = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, + n_input=1, acls=[out2in_acl]) + acl_if.add_vpp_config() self.send_and_assert_no_replies(self.pg1, pkts_out2in) - # create an ACL to permit/reflect everything - in2out_reflect_rule = { - 'is_permit': 2, - 'is_ipv6': 0, - 'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"), - 'src_ip_prefix_len': 0, - 'dst_ip_addr': inet_pton(AF_INET, "0.0.0.0"), - 'dst_ip_prefix_len': 0, - 'srcport_or_icmptype_first': 0, - 'srcport_or_icmptype_last': 65535, - 'dstport_or_icmpcode_first': 0, - 'dstport_or_icmpcode_last': 65535, - 'proto': 0, - } - in2out_rules = [in2out_reflect_rule] - res = self.vapi.acl_add_replace(0xffffffff, in2out_rules) - self.assertEqual(res.retval, 0, "error adding in2out ACL") - in2out_acl = res.acl_index - # apply output acl - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=1, - acls=[out2in_acl, in2out_acl]) + acl_if.acls = [out2in_acl, in2out_acl] + acl_if.add_vpp_config() # send in2out to generate ACL state (NAT state was created earlier) capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)) @@ -6587,15 +6559,6 @@ class TestNAT44EndpointDependent(MethodHolder): self.verify_capture_in(capture, self.pg0) self.logger.info(self.vapi.cli("show trace")) - # Clean up - # Remove ACLs from interface - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=0, - acls=[]) - # delete ACLs - self.vapi.acl_del(acl_index=out2in_acl, expected_retval=0) - self.vapi.acl_del(acl_index=in2out_acl, expected_retval=0) - def test_multiple_vrf(self): """ Multiple VRF setup """ external_addr = '1.2.3.4'