# port ranges
PORTS_ALL = -1
PORTS_RANGE = 0
+ PORTS_RANGE_2 = 1
udp_sport_from = 10
udp_sport_to = udp_sport_from + 5
udp_dport_from = 20000
tcp_dport_from = 40000
tcp_dport_to = tcp_dport_from + 5000
+ udp_sport_from_2 = 90
+ udp_sport_to_2 = udp_sport_from_2 + 5
+ udp_dport_from_2 = 30000
+ udp_dport_to_2 = udp_dport_from_2 + 5000
+ tcp_sport_from_2 = 130
+ tcp_sport_to_2 = tcp_sport_from_2 + 5
+ tcp_dport_from_2 = 20000
+ tcp_dport_to_2 = tcp_dport_from_2 + 5000
+
icmp4_type = 8 # echo request
icmp4_code = 3
icmp6_type = 128 # echo request
icmp6_code = 3
+ icmp4_type_2 = 8
+ icmp4_code_from_2 = 5
+ icmp4_code_to_2 = 20
+ icmp6_type_2 = 128
+ icmp6_code_from_2 = 8
+ icmp6_code_to_2 = 42
+
# Test variables
bd_id = 1
super(TestACLplugin, self).tearDown()
if not self.vpp_dead:
self.logger.info(self.vapi.ppcli("show l2fib verbose"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
% self.bd_id))
sport_to = self.udp_sport_to
dport_from = self.udp_dport_from
dport_to = self.udp_dport_to
+ elif ports == self.PORTS_RANGE_2:
+ if proto == 1:
+ sport_from = self.icmp4_type_2
+ sport_to = self.icmp4_type_2
+ dport_from = self.icmp4_code_from_2
+ dport_to = self.icmp4_code_to_2
+ elif proto == 58:
+ sport_from = self.icmp6_type_2
+ sport_to = self.icmp6_type_2
+ dport_from = self.icmp6_code_from_2
+ dport_to = self.icmp6_code_to_2
+ elif proto == self.proto[self.IP][self.TCP]:
+ sport_from = self.tcp_sport_from_2
+ sport_to = self.tcp_sport_to_2
+ dport_from = self.tcp_dport_from_2
+ dport_to = self.tcp_dport_to_2
+ elif proto == self.proto[self.IP][self.UDP]:
+ sport_from = self.udp_sport_from_2
+ sport_to = self.udp_sport_to_2
+ dport_from = self.udp_dport_from_2
+ dport_to = self.udp_dport_to_2
else:
sport_from = ports
sport_to = ports
return rule
def apply_rules(self, rules, tag=''):
- reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
- count=len(rules),
- tag=tag)
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
+ tag=tag)
self.logger.info("Dumped ACL: " + str(
- self.api_acl_dump(reply.acl_index)))
+ self.vapi.acl_dump(reply.acl_index)))
# Apply a ACL on the interface as inbound
for i in self.pg_interfaces:
- self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
- count=1, n_input=1,
- acls=[reply.acl_index])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
+ n_input=1,
+ acls=[reply.acl_index])
return
def create_upper_layer(self, packet_index, proto, ports=0):
return ''
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
- proto=-1, ports=0, fragments=False):
+ proto=-1, ports=0, fragments=False, pkt_raw=True):
"""
Create input packet stream for defined interface using hosts or
deleted_hosts list.
code=self.icmp4_code)
else:
p /= self.create_upper_layer(i, pkt_info.proto, ports)
- p /= Raw(payload)
- pkt_info.data = p.copy()
- size = random.choice(packet_sizes)
- self.extend_packet(p, size)
+ if pkt_raw:
+ p /= Raw(payload)
+ pkt_info.data = p.copy()
+ if pkt_raw:
+ size = random.choice(packet_sizes)
+ self.extend_packet(p, size)
pkts.append(p)
return pkts
self.pg_start()
def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
- frags=False):
+ frags=False, pkt_raw=True):
# Test
# Create incoming packet streams for packet-generator interfaces
pkts_cnt = 0
if self.flows.__contains__(i):
pkts = self.create_stream(i, self.pg_if_packet_sizes,
traffic_type, ip_type, proto, ports,
- frags)
+ frags, pkt_raw)
if len(pkts) > 0:
i.add_stream(pkts)
pkts_cnt += len(pkts)
capture = dst_if.get_capture(0)
self.assertEqual(len(capture), 0)
- def api_acl_add_replace(self, acl_index, r, count, tag='',
- expected_retval=0):
- """Add/replace an ACL
-
- :param int acl_index: ACL index to replace,
- 4294967295 to create new ACL.
- :param acl_rule r: ACL rules array.
- :param str tag: symbolic tag (description) for this ACL.
- :param int count: number of rules.
- """
- return self.vapi.api(self.vapi.papi.acl_add_replace,
- {'acl_index': acl_index,
- 'r': r,
- 'count': count,
- 'tag': tag},
- expected_retval=expected_retval)
-
- def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
- expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
- {'sw_if_index': sw_if_index,
- 'count': count,
- 'n_input': n_input,
- 'acls': acls},
- expected_retval=expected_retval)
-
- def api_acl_dump(self, acl_index, expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_dump,
- {'acl_index': acl_index},
- expected_retval=expected_retval)
-
def test_0000_warmup_test(self):
""" ACL plugin version check; learn MACs
"""
# self.assertEqual(reply.minor, 0)
def test_0001_acl_create(self):
- """ ACL create test
+ """ ACL create/delete test
"""
self.logger.info("ACLP_TEST_START_0001")
'dst_ip_addr': '\x00\x00\x00\x00',
'dst_ip_prefix_len': 0}]
# Test 1: add a new ACL
- reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
- count=len(r), tag="permit 1234")
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
+ tag="permit 1234")
self.assertEqual(reply.retval, 0)
# The very first ACL gets #0
self.assertEqual(reply.acl_index, 0)
- rr = self.api_acl_dump(reply.acl_index)
+ first_acl = reply.acl_index
+ rr = self.vapi.acl_dump(reply.acl_index)
self.logger.info("Dumped ACL: " + str(rr))
self.assertEqual(len(rr), 1)
# We should have the same number of ACL entries as we had asked
'dst_ip_addr': '\x00\x00\x00\x00',
'dst_ip_prefix_len': 0})
- reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
- count=len(r_deny),
- tag="deny 1234;permit all")
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
+ tag="deny 1234;permit all")
self.assertEqual(reply.retval, 0)
# The second ACL gets #1
self.assertEqual(reply.acl_index, 1)
+ second_acl = reply.acl_index
# Test 2: try to modify a nonexistent ACL
- reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
- tag="FFFF:FFFF", expected_retval=-1)
+ reply = self.vapi.acl_add_replace(acl_index=432, r=r,
+ tag="FFFF:FFFF", expected_retval=-1)
self.assertEqual(reply.retval, -1)
# The ACL number should pass through
self.assertEqual(reply.acl_index, 432)
+ # apply an ACL on an interface inbound, try to delete ACL, must fail
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=1,
+ acls=[first_acl])
+ reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-1)
+ # Unapply an ACL and then try to delete it - must be ok
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=0,
+ acls=[])
+ reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
+
+ # apply an ACL on an interface outbound, try to delete ACL, must fail
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=0,
+ acls=[second_acl])
+ reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-1)
+ # Unapply the ACL and then try to delete it - must be ok
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=0,
+ acls=[])
+ reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
+
+ # try to apply a nonexistent ACL - must fail
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=1,
+ acls=[first_acl],
+ expected_retval=-1)
self.logger.info("ACLP_TEST_FINISH_0001")
for i in range(len(r)):
rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
- reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
- count=len(rules))
- result = self.api_acl_dump(reply.acl_index)
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
+ result = self.vapi.acl_dump(reply.acl_index)
i = 0
for drules in result:
self.logger.info("ACLP_TEST_FINISH_0021")
+ def test_0022_zero_length_udp_ipv4(self):
+ """ VPP-687 zero length udp ipv4 packet"""
+ self.logger.info("ACLP_TEST_START_0022")
+
+ port = random.randint(0, 65535)
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
+ self.proto[self.IP][self.UDP]))
+ # deny ip any any in the end
+ rules.append(
+ self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "permit empty udp ip4 " + str(port))
+
+ # Traffic should still pass
+ # Create incoming packet streams for packet-generator interfaces
+ pkts_cnt = 0
+ pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
+ self.IP, self.IPV4,
+ self.proto[self.IP][self.UDP], port,
+ False, False)
+ if len(pkts) > 0:
+ self.pg0.add_stream(pkts)
+ pkts_cnt += len(pkts)
+
+ # Enable packet capture and start packet sendingself.IPV
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.pg1.get_capture(pkts_cnt)
+
+ self.logger.info("ACLP_TEST_FINISH_0022")
+
+ def test_0023_zero_length_udp_ipv6(self):
+ """ VPP-687 zero length udp ipv6 packet"""
+ self.logger.info("ACLP_TEST_START_0023")
+
+ port = random.randint(0, 65535)
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
+ self.proto[self.IP][self.UDP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "permit empty udp ip6 "+str(port))
+
+ # Traffic should still pass
+ # Create incoming packet streams for packet-generator interfaces
+ pkts_cnt = 0
+ pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
+ self.IP, self.IPV6,
+ self.proto[self.IP][self.UDP], port,
+ False, False)
+ if len(pkts) > 0:
+ self.pg0.add_stream(pkts)
+ pkts_cnt += len(pkts)
+
+ # Enable packet capture and start packet sendingself.IPV
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Verify outgoing packet streams per packet-generator interface
+ self.pg1.get_capture(pkts_cnt)
+
+ self.logger.info("ACLP_TEST_FINISH_0023")
+
+ def test_0108_tcp_permit_v4(self):
+ """ permit TCPv4 + non-match range
+ """
+ self.logger.info("ACLP_TEST_START_0108")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "permit ipv4 tcp")
+
+ # Traffic should still pass
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
+
+ self.logger.info("ACLP_TEST_FINISH_0108")
+
+ def test_0109_tcp_permit_v6(self):
+ """ permit TCPv6 + non-match range
+ """
+ self.logger.info("ACLP_TEST_START_0109")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "permit ip6 tcp")
+
+ # Traffic should still pass
+ self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
+
+ self.logger.info("ACLP_TEST_FINISH_0109")
+
+ def test_0110_udp_permit_v4(self):
+ """ permit UDPv4 + non-match range
+ """
+ self.logger.info("ACLP_TEST_START_0110")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.UDP]))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.UDP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "permit ipv4 udp")
+
+ # Traffic should still pass
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
+
+ self.logger.info("ACLP_TEST_FINISH_0110")
+
+ def test_0111_udp_permit_v6(self):
+ """ permit UDPv6 + non-match range
+ """
+ self.logger.info("ACLP_TEST_START_0111")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.UDP]))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.UDP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "permit ip6 udp")
+
+ # Traffic should still pass
+ self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
+
+ self.logger.info("ACLP_TEST_FINISH_0111")
+
+ def test_0112_tcp_deny(self):
+ """ deny TCPv4/v6 + non-match range
+ """
+ self.logger.info("ACLP_TEST_START_0112")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.PERMIT,
+ self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT,
+ self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # permit ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.PERMIT,
+ self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT,
+ self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "deny ip4/ip6 tcp")
+
+ # Traffic should not pass
+ self.run_verify_negat_test(self.IP, self.IPRANDOM,
+ self.proto[self.IP][self.TCP])
+
+ self.logger.info("ACLP_TEST_FINISH_0112")
+
+ def test_0113_udp_deny(self):
+ """ deny UDPv4/v6 + non-match range
+ """
+ self.logger.info("ACLP_TEST_START_0113")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.PERMIT,
+ self.PORTS_RANGE_2,
+ self.proto[self.IP][self.UDP]))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT,
+ self.PORTS_RANGE_2,
+ self.proto[self.IP][self.UDP]))
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
+ self.proto[self.IP][self.UDP]))
+ rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
+ self.proto[self.IP][self.UDP]))
+ # permit ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.PERMIT,
+ self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT,
+ self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "deny ip4/ip6 udp")
+
+ # Traffic should not pass
+ self.run_verify_negat_test(self.IP, self.IPRANDOM,
+ self.proto[self.IP][self.UDP])
+
+ self.logger.info("ACLP_TEST_FINISH_0113")
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)