cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
learn=1)
for pg_if in cls.pg_interfaces:
- cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
- bd_id=cls.bd_id)
+ cls.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
# Set up all interfaces
for i in cls.pg_interfaces:
super(TestACLplugin, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestACLplugin, cls).tearDownClass()
+
def setUp(self):
super(TestACLplugin, self).setUp()
self.reset_packet_infos()
Show various debug prints after each test.
"""
super(TestACLplugin, self).tearDown()
- if not self.vpp_dead:
- cli = "show vlib graph l2-input-feat-arc"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show vlib graph l2-input-feat-arc-end"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show vlib graph l2-output-feat-arc"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show vlib graph l2-output-feat-arc-end"
- self.logger.info(self.vapi.ppcli(cli))
- self.logger.info(self.vapi.ppcli("show l2fib verbose"))
- self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
- self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
- self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
- self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
- % self.bd_id))
+
+ def show_commands_at_teardown(self):
+ cli = "show vlib graph l2-input-feat-arc"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show vlib graph l2-input-feat-arc-end"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show vlib graph l2-output-feat-arc"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show vlib graph l2-output-feat-arc-end"
+ self.logger.info(self.vapi.ppcli(cli))
+ self.logger.info(self.vapi.ppcli("show l2fib verbose"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
+ self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
+ % self.bd_id))
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
s_prefix=0, s_ip='\x00\x00\x00\x00',
'dst_ip_addr': d_ip})
return rule
- def apply_rules(self, rules, tag=''):
+ def apply_rules(self, rules, tag=b''):
reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
tag=tag)
self.logger.info("Dumped ACL: " + str(
acls=[reply.acl_index])
return
- def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
+ def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
tag=tag)
self.logger.info("Dumped ACL: " + str(
# Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
if traffic_type == self.ICMP and ip_type == self.IPV6:
payload_info = self.payload_to_info(
- packet[ICMPv6EchoRequest].data)
+ packet[ICMPv6EchoRequest], 'data')
payload = packet[ICMPv6EchoRequest]
else:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
payload = packet[self.proto_map[payload_info.proto]]
except:
self.logger.error(ppp("Unexpected or invalid packet "
'srcport_or_icmptype_first': 1234,
'srcport_or_icmptype_last': 1235,
'src_ip_prefix_len': 0,
- 'src_ip_addr': '\x00\x00\x00\x00',
+ 'src_ip_addr': b'\x00\x00\x00\x00',
'dstport_or_icmpcode_first': 1234,
'dstport_or_icmpcode_last': 1234,
- 'dst_ip_addr': '\x00\x00\x00\x00',
+ 'dst_ip_addr': b'\x00\x00\x00\x00',
'dst_ip_prefix_len': 0}]
# Test 1: add a new ACL
reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
- tag="permit 1234")
+ tag=b"permit 1234")
self.assertEqual(reply.retval, 0)
# The very first ACL gets #0
self.assertEqual(reply.acl_index, 0)
'srcport_or_icmptype_first': 1234,
'srcport_or_icmptype_last': 1235,
'src_ip_prefix_len': 0,
- 'src_ip_addr': '\x00\x00\x00\x00',
+ 'src_ip_addr': b'\x00\x00\x00\x00',
'dstport_or_icmpcode_first': 1234,
'dstport_or_icmpcode_last': 1234,
- 'dst_ip_addr': '\x00\x00\x00\x00',
+ 'dst_ip_addr': b'\x00\x00\x00\x00',
'dst_ip_prefix_len': 0},
{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
'srcport_or_icmptype_first': 0,
'srcport_or_icmptype_last': 0,
'src_ip_prefix_len': 0,
- 'src_ip_addr': '\x00\x00\x00\x00',
+ 'src_ip_addr': b'\x00\x00\x00\x00',
'dstport_or_icmpcode_first': 0,
'dstport_or_icmpcode_last': 0,
- 'dst_ip_addr': '\x00\x00\x00\x00',
+ 'dst_ip_addr': b'\x00\x00\x00\x00',
'dst_ip_prefix_len': 0}]
reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
- tag="deny 1234;permit all")
+ tag=b"deny 1234;permit all")
self.assertEqual(reply.retval, 0)
# The second ACL gets #1
self.assertEqual(reply.acl_index, 1)
# Test 2: try to modify a nonexistent ACL
reply = self.vapi.acl_add_replace(acl_index=432, r=r,
- tag="FFFF:FFFF", expected_retval=-6)
+ tag=b"FFFF:FFFF", expected_retval=-6)
self.assertEqual(reply.retval, -6)
# The ACL number should pass through
self.assertEqual(reply.acl_index, 432)
0, self.proto[self.IP][self.TCP]))
# Apply rules
- self.apply_rules(rules, "permit per-flow")
+ self.apply_rules(rules, b"permit per-flow")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, -1)
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny per-flow;permit all")
+ self.apply_rules(rules, b"deny per-flow;permit all")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPV4,
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit icmpv4")
+ self.apply_rules(rules, b"permit icmpv4")
# Traffic should still pass
self.run_verify_test(self.ICMP, self.IPV4,
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit icmpv6")
+ self.apply_rules(rules, b"permit icmpv6")
# Traffic should still pass
self.run_verify_test(self.ICMP, self.IPV6,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny icmpv4")
+ self.apply_rules(rules, b"deny icmpv4")
# Traffic should not pass
self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny icmpv6")
+ self.apply_rules(rules, b"deny icmpv6")
# Traffic should not pass
self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
+ self.apply_rules(rules, b"permit ipv4 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip6 tcp")
+ self.apply_rules(rules, b"permit ip6 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv udp")
+ self.apply_rules(rules, b"permit ipv udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip6 udp")
+ self.apply_rules(rules, b"permit ip6 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 tcp")
+ self.apply_rules(rules, b"deny ip4/ip6 tcp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp")
+ self.apply_rules(rules, b"deny ip4/ip6 udp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip4 tcp "+str(port))
+ self.apply_rules(rules, b"permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4,
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip4 tcp "+str(port))
+ self.apply_rules(rules, b"permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4,
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip4 tcp "+str(port))
+ self.apply_rules(rules, b"permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip4 tcp "+str(port))
+ self.apply_rules(rules, b"permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+ self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+ self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+ self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit empty udp ip4 " + str(port))
+ self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit empty udp ip6 "+str(port))
+ self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
+ self.apply_rules(rules, b"permit ipv4 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip6 tcp")
+ self.apply_rules(rules, b"permit ip6 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv4 udp")
+ self.apply_rules(rules, b"permit ipv4 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip6 udp")
+ self.apply_rules(rules, b"permit ip6 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 tcp")
+ self.apply_rules(rules, b"deny ip4/ip6 tcp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp")
+ self.apply_rules(rules, b"deny ip4/ip6 udp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
+ self.apply_rules(rules, b"permit ipv4 tcp")
# Traffic should still pass also for an odd ethertype
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
+ self.apply_rules(rules, b"permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
+ self.apply_rules(rules, b"permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
+ self.apply_rules(rules, b"permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
intf.append(VppLoInterface(self))
# Apply rules
- self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
+ self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
# Remove the interface
intf[0].remove_vpp_config()