return rule
def apply_rules(self, rules, tag=''):
- reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
- count=len(rules),
- tag=tag)
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
+ tag=tag)
self.logger.info("Dumped ACL: " + str(
- self.api_acl_dump(reply.acl_index)))
+ self.vapi.acl_dump(reply.acl_index)))
# Apply a ACL on the interface as inbound
for i in self.pg_interfaces:
- self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
- count=1, n_input=1,
- acls=[reply.acl_index])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
+ n_input=1,
+ acls=[reply.acl_index])
return
def create_upper_layer(self, packet_index, proto, ports=0):
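Review bot: The "create a new ACL" sentinel is passed as the bare literal `4294967295` in the new `acl_add_replace` call in apply_rules. The same value reappears in the later hunks of this commit as `4294967295` and as `0xffffffff`, and the new wrapper's docstring writes it as 2^32-1. Three spellings of one magic value hide the intent and make a mistyped digit hard to spot in review. A single named constant next to the wrapper, for example `ACL_INDEX_CREATE = 0xffffffff` (the name is only a suggestion), used at every call site, would read better.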
capture = dst_if.get_capture(0)
self.assertEqual(len(capture), 0)
- def api_acl_add_replace(self, acl_index, r, count, tag='',
- expected_retval=0):
- """Add/replace an ACL
-
- :param int acl_index: ACL index to replace,
- 4294967295 to create new ACL.
- :param acl_rule r: ACL rules array.
- :param str tag: symbolic tag (description) for this ACL.
- :param int count: number of rules.
- """
- return self.vapi.api(self.vapi.papi.acl_add_replace,
- {'acl_index': acl_index,
- 'r': r,
- 'count': count,
- 'tag': tag},
- expected_retval=expected_retval)
-
- def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
- expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
- {'sw_if_index': sw_if_index,
- 'count': count,
- 'n_input': n_input,
- 'acls': acls},
- expected_retval=expected_retval)
-
- def api_acl_dump(self, acl_index, expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_dump,
- {'acl_index': acl_index},
- expected_retval=expected_retval)
-
def test_0000_warmup_test(self):
""" ACL plugin version check; learn MACs
"""
'dst_ip_addr': '\x00\x00\x00\x00',
'dst_ip_prefix_len': 0}]
# Test 1: add a new ACL
- reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
- count=len(r), tag="permit 1234")
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
+ tag="permit 1234")
self.assertEqual(reply.retval, 0)
# The very first ACL gets #0
self.assertEqual(reply.acl_index, 0)
- rr = self.api_acl_dump(reply.acl_index)
+ rr = self.vapi.acl_dump(reply.acl_index)
self.logger.info("Dumped ACL: " + str(rr))
self.assertEqual(len(rr), 1)
# We should have the same number of ACL entries as we had asked
'dst_ip_addr': '\x00\x00\x00\x00',
'dst_ip_prefix_len': 0})
- reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
- count=len(r_deny),
- tag="deny 1234;permit all")
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
+ tag="deny 1234;permit all")
self.assertEqual(reply.retval, 0)
# The second ACL gets #1
self.assertEqual(reply.acl_index, 1)
# Test 2: try to modify a nonexistent ACL
- reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
- tag="FFFF:FFFF", expected_retval=-1)
+ reply = self.vapi.acl_add_replace(acl_index=432, r=r,
+ tag="FFFF:FFFF", expected_retval=-1)
self.assertEqual(reply.retval, -1)
# The ACL number should pass through
self.assertEqual(reply.acl_index, 432)
for i in range(len(r)):
rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
- reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
- count=len(rules))
- result = self.api_acl_dump(reply.acl_index)
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
+ result = self.vapi.acl_dump(reply.acl_index)
i = 0
for drules in result:
r = []
r.append(pkt.to_acl_rule(2, wildcard_sport=True))
r.append(self.wildcard_rule(0))
- res = self.testcase.api_acl_add_replace(0xffffffff, r)
+ res = self.testcase.vapi.acl_add_replace(0xffffffff, r)
self.testcase.assert_equal(res.retval, 0, "error adding ACL")
reflect_acl_index = res.acl_index
r = []
r.append(self.wildcard_rule(0))
- res = self.testcase.api_acl_add_replace(0xffffffff, r)
+ res = self.testcase.vapi.acl_add_replace(0xffffffff, r)
self.testcase.assert_equal(res.retval, 0, "error adding deny ACL")
deny_acl_index = res.acl_index
if reflect_side == acl_side:
- self.testcase.api_acl_interface_set_acl_list(
- self.ifs[acl_side].sw_if_index, 2, 1,
+ self.testcase.vapi.acl_interface_set_acl_list(
+ self.ifs[acl_side].sw_if_index, 1,
[reflect_acl_index,
deny_acl_index])
- self.testcase.api_acl_interface_set_acl_list(
- self.ifs[1-acl_side].sw_if_index, 0, 0, [])
+ self.testcase.vapi.acl_interface_set_acl_list(
+ self.ifs[1-acl_side].sw_if_index, 0, [])
else:
- self.testcase.api_acl_interface_set_acl_list(
- self.ifs[acl_side].sw_if_index, 2, 1,
+ self.testcase.vapi.acl_interface_set_acl_list(
+ self.ifs[acl_side].sw_if_index, 1,
[deny_acl_index,
reflect_acl_index])
- self.testcase.api_acl_interface_set_acl_list(
- self.ifs[1-acl_side].sw_if_index, 0, 0, [])
+ self.testcase.vapi.acl_interface_set_acl_list(
+ self.ifs[1-acl_side].sw_if_index, 0, [])
def wildcard_rule(self, is_permit):
any_addr = ["0.0.0.0", "::"]
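Review bot: The `acl_interface_set_acl_list` calls in this hunk pass `n_input` positionally (`..., 1, [reflect_acl_index, deny_acl_index]` and `..., 0, []`), so with `count` gone the bare `1` or `0` reads like a list length rather than the input/output split. The other test files touched by this commit use keywords for the same call. Doing the same here, `sw_if_index=self.ifs[acl_side].sw_if_index, n_input=1, acls=[reflect_acl_index, deny_acl_index]`, keeps the ACL direction visible at the call site. The enclosing method starts before this hunk.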
self.logger.info(self.vapi.cli("show acl-plugin interface"))
self.logger.info(self.vapi.cli("show acl-plugin tables"))
- def api_acl_add_replace(self, acl_index, r, count=-1, tag="",
- expected_retval=0):
- """Add/replace an ACL
-
- :param int acl_index: ACL index to replace, 4294967295 to create new.
- :param acl_rule r: ACL rules array.
- :param str tag: symbolic tag (description) for this ACL.
- :param int count: number of rules.
- """
- if (count < 0):
- count = len(r)
- return self.vapi.api(self.vapi.papi.acl_add_replace,
- {'acl_index': acl_index,
- 'r': r,
- 'count': count,
- 'tag': tag
- }, expected_retval=expected_retval)
-
- def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
- expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
- {'sw_if_index': sw_if_index,
- 'count': count,
- 'n_input': n_input,
- 'acls': acls
- }, expected_retval=expected_retval)
-
- def api_acl_dump(self, acl_index, expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_dump,
- {'acl_index': acl_index},
- expected_retval=expected_retval)
-
def run_basic_conn_test(self, af, acl_side):
""" Basic conn timeout test """
conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242)
self.logger.info(self.vapi.cli("show acl-plugin interface"))
self.logger.info(self.vapi.cli("show acl-plugin tables"))
- def api_acl_add_replace(self, acl_index, r, count, tag="",
- expected_retval=0):
- """Add/replace an ACL
-
- :param int acl_index: ACL index to replace, 4294967295 to create new.
- :param acl_rule r: ACL rules array.
- :param str tag: symbolic tag (description) for this ACL.
- :param int count: number of rules.
- """
- return self.vapi.api(self.vapi.papi.acl_add_replace,
- {'acl_index': acl_index,
- 'r': r,
- 'count': count,
- 'tag': tag
- }, expected_retval=expected_retval)
-
- def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
- expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
- {'sw_if_index': sw_if_index,
- 'count': count,
- 'n_input': n_input,
- 'acls': acls
- }, expected_retval=expected_retval)
-
- def api_acl_dump(self, acl_index, expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_dump,
- {'acl_index': acl_index},
- expected_retval=expected_retval)
-
def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
is_ip6, expect_blocked, expect_established,
add_extension_header):
r_permit = stream_dict['permit_rules']
r_permit_reflect = stream_dict['permit_and_reflect_rules']
r_action = r_permit_reflect if is_reflect else r
- reply = self.api_acl_add_replace(acl_index=4294967295, r=r_action,
- count=len(r_action), tag="action acl")
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
+ tag="act. acl")
action_acl_index = reply.acl_index
- reply = self.api_acl_add_replace(acl_index=4294967295, r=r_permit,
- count=len(r_permit), tag="permit acl")
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
+ tag="perm. acl")
permit_acl_index = reply.acl_index
return {'L2': action_acl_index if test_l2_action else permit_acl_index,
'L3': permit_acl_index if test_l2_action else action_acl_index,
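Review bot: The ACL tags change here from "action acl" and "permit acl" to `"act. acl"` and `"perm. acl"`, which is unrelated to moving the wrappers into vapi, and no reason for it is visible in the hunk. Tags are the description attached to the ACL, so they are what a reader of the test logs will look for. If the shortening is deliberate, say why in the commit message; otherwise keep `tag="action acl"` and `tag="permit acl"`. The enclosing function starts and ends outside the hunk.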
is_reflect)
n_input_l3 = 0 if bridged_to_routed else 1
n_input_l2 = 1 if bridged_to_routed else 0
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
- count=1,
- n_input=n_input_l3,
- acls=[acl_idx['L3']])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- count=1,
- n_input=n_input_l2,
- acls=[acl_idx['L2']])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
- count=1,
- n_input=n_input_l2,
- acls=[acl_idx['L2']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
+ n_input=n_input_l3,
+ acls=[acl_idx['L3']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=n_input_l2,
+ acls=[acl_idx['L2']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=n_input_l2,
+ acls=[acl_idx['L2']])
def apply_acl_ip46_both_directions_reflect(self,
primary_is_bridged_to_routed,
else:
outbound_l3_acl = acl_idx_rev['L3']
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
- count=2,
- n_input=1,
- acls=[inbound_l3_acl,
- outbound_l3_acl])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- count=2,
- n_input=1,
- acls=[inbound_l2_acl,
- outbound_l2_acl])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
- count=2,
- n_input=1,
- acls=[inbound_l2_acl,
- outbound_l2_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
+ n_input=1,
+ acls=[inbound_l3_acl,
+ outbound_l3_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl,
+ outbound_l2_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl,
+ outbound_l2_acl])
def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
is_reflect, add_eh):
'traffic_type': traffic_type
})
+ def acl_add_replace(self, acl_index, r, tag='',
+ expected_retval=0):
+ """Add/replace an ACL
+ :param int acl_index: ACL index to replace, 2^32-1 to create new ACL.
+ :param acl_rule r: ACL rules array.
+ :param str tag: symbolic tag (description) for this ACL.
+ :param int count: number of rules.
+ """
+ return self.api(self.papi.acl_add_replace,
+ {'acl_index': acl_index,
+ 'r': r,
+ 'count': len(r),
+ 'tag': tag},
+ expected_retval=expected_retval)
+
+ def acl_interface_set_acl_list(self, sw_if_index, n_input, acls,
+ expected_retval=0):
+ return self.api(self.papi.acl_interface_set_acl_list,
+ {'sw_if_index': sw_if_index,
+ 'count': len(acls),
+ 'n_input': n_input,
+ 'acls': acls},
+ expected_retval=expected_retval)
+
+ def acl_dump(self, acl_index, expected_retval=0):
+ return self.api(self.papi.acl_dump,
+ {'acl_index': acl_index},
+ expected_retval=expected_retval)
+
def macip_acl_add(self, rules, tag=""):
""" Add MACIP acl
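Review bot: The docstring of the new `acl_add_replace` still lists `:param int count: number of rules.`, but `count` is no longer a parameter; it is now computed as `len(r)`. Drop that line and document what remains, e.g. `:param int expected_retval: return value the API call is expected to produce.` `acl_interface_set_acl_list` and `acl_dump` have no docstring at all. For the former, a short one should state that `count` is derived from `len(acls)` and what `n_input` means. Judging by the call sites in this commit, `n_input` appears to set how many leading entries of `acls` are applied inbound, and nothing in the wrapper checks it against `len(acls)`.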