X-Git-Url: https://gerrit.fd.io/r/gitweb?p=vpp.git;a=blobdiff_plain;f=src%2Fplugins%2Facl%2Ftest%2Ftest_acl_plugin.py;h=f07d37548fe533ee63559e2eff707c82a86a79f8;hp=8cac81cdb9f17254e7e90e3ed6114d8f51e7c3e2;hb=492a5d0bd;hpb=aad1ee149403994194cf37cef4530b042ba7df3a diff --git a/src/plugins/acl/test/test_acl_plugin.py b/src/plugins/acl/test/test_acl_plugin.py index 8cac81cdb9f..f07d37548fe 100644 --- a/src/plugins/acl/test/test_acl_plugin.py +++ b/src/plugins/acl/test/test_acl_plugin.py @@ -12,11 +12,8 @@ from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner from util import Host, ppp -from ipaddress import IPv4Network, IPv6Network from vpp_lo_interface import VppLoInterface -from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist -from vpp_ip import INVALID_INDEX class TestACLplugin(VppTestCase): @@ -178,49 +175,105 @@ class TestACLplugin(VppTestCase): % self.bd_id)) def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1, - s_prefix=0, s_ip=0, - d_prefix=0, d_ip=0): - if ip: - src_prefix = IPv6Network((s_ip, s_prefix)) - dst_prefix = IPv6Network((d_ip, d_prefix)) + s_prefix=0, s_ip=b'\x00\x00\x00\x00', + d_prefix=0, d_ip=b'\x00\x00\x00\x00'): + if proto == -1: + return + if ports == self.PORTS_ALL: + sport_from = 0 + dport_from = 0 + sport_to = 65535 if proto != 1 and proto != 58 else 255 + dport_to = sport_to + elif ports == self.PORTS_RANGE: + if proto == 1: + sport_from = self.icmp4_type + sport_to = self.icmp4_type + dport_from = self.icmp4_code + dport_to = self.icmp4_code + elif proto == 58: + sport_from = self.icmp6_type + sport_to = self.icmp6_type + dport_from = self.icmp6_code + dport_to = self.icmp6_code + elif proto == self.proto[self.IP][self.TCP]: + sport_from = self.tcp_sport_from + sport_to = self.tcp_sport_to + dport_from = self.tcp_dport_from + dport_to = self.tcp_dport_to + elif proto == self.proto[self.IP][self.UDP]: + sport_from = self.udp_sport_from + sport_to = self.udp_sport_to + dport_from = self.udp_dport_from + dport_to = self.udp_dport_to + elif ports == self.PORTS_RANGE_2: + if proto == 1: + sport_from = self.icmp4_type_2 + sport_to = self.icmp4_type_2 + dport_from = self.icmp4_code_from_2 + dport_to = self.icmp4_code_to_2 + elif proto == 58: + sport_from = self.icmp6_type_2 + sport_to = self.icmp6_type_2 + dport_from = self.icmp6_code_from_2 + dport_to = self.icmp6_code_to_2 + elif proto == self.proto[self.IP][self.TCP]: + sport_from = self.tcp_sport_from_2 + sport_to = self.tcp_sport_to_2 + dport_from = self.tcp_dport_from_2 + dport_to = self.tcp_dport_to_2 + elif proto == self.proto[self.IP][self.UDP]: + sport_from = self.udp_sport_from_2 + sport_to = self.udp_sport_to_2 + dport_from = self.udp_dport_from_2 + dport_to = self.udp_dport_to_2 else: - src_prefix = IPv4Network((s_ip, s_prefix)) - dst_prefix = IPv4Network((d_ip, d_prefix)) - return AclRule(is_permit=permit_deny, ports=ports, proto=proto, - src_prefix=src_prefix, dst_prefix=dst_prefix) - - def apply_rules(self, rules, tag=None): - acl = VppAcl(self, rules, tag=tag) - acl.add_vpp_config() - self.logger.info("Dumped ACL: " + str(acl.dump())) + sport_from = ports + sport_to = ports + dport_from = ports + dport_to = ports + + rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto, + 'srcport_or_icmptype_first': sport_from, + 'srcport_or_icmptype_last': sport_to, + 'src_ip_prefix_len': s_prefix, + 'src_ip_addr': s_ip, + 'dstport_or_icmpcode_first': dport_from, + 'dstport_or_icmpcode_last': dport_to, + 'dst_ip_prefix_len': d_prefix, + 'dst_ip_addr': d_ip}) + return rule + + def apply_rules(self, rules, tag=b''): + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, + tag=tag) + self.logger.info("Dumped ACL: " + str( + self.vapi.acl_dump(reply.acl_index))) # Apply a ACL on the interface as inbound for i in self.pg_interfaces: - acl_if = VppAclInterface( - self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]) - acl_if.add_vpp_config() - return acl.acl_index - - def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX): - acl = VppAcl(self, rules, tag=tag) - acl.add_vpp_config() - self.logger.info("Dumped ACL: " + str(acl.dump())) + self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index, + n_input=1, + acls=[reply.acl_index]) + return reply.acl_index + + def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF): + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, + tag=tag) + self.logger.info("Dumped ACL: " + str( + self.vapi.acl_dump(reply.acl_index))) # Apply a ACL on the interface as inbound - acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, - acls=[acl]) - return acl.acl_index + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=1, + acls=[reply.acl_index]) + return reply.acl_index - def etype_whitelist(self, whitelist, n_input, add=True): + def etype_whitelist(self, whitelist, n_input): # Apply whitelists on all the interfaces - if add: - self._wl = [] - for i in self.pg_interfaces: - self._wl.append(VppEtypeWhitelist( - self, sw_if_index=i.sw_if_index, whitelist=whitelist, - n_input=n_input).add_vpp_config()) - else: - if hasattr(self, "_wl"): - for wl in self._wl: - wl.remove_vpp_config() + for i in self.pg_interfaces: + # checkstyle can't read long names. Help them. + fun = self.vapi.acl_interface_set_etype_whitelist + fun(sw_if_index=i.sw_if_index, n_input=n_input, + whitelist=whitelist) + return def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] @@ -489,16 +542,24 @@ class TestACLplugin(VppTestCase): """ self.logger.info("ACLP_TEST_START_0001") - # Create a permit-1234 ACL - r = [AclRule(is_permit=1, proto=17, ports=1234)] - r[0].sport_to = 1235 + # Add an ACL + r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17, + 'srcport_or_icmptype_first': 1234, + 'srcport_or_icmptype_last': 1235, + 'src_ip_prefix_len': 0, + 'src_ip_addr': b'\x00\x00\x00\x00', + 'dstport_or_icmpcode_first': 1234, + 'dstport_or_icmpcode_last': 1234, + 'dst_ip_addr': b'\x00\x00\x00\x00', + 'dst_ip_prefix_len': 0}] # Test 1: add a new ACL - first_acl = VppAcl(self, rules=r, tag="permit 1234") - first_acl.add_vpp_config() - self.assertTrue(first_acl.query_vpp_config()) + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r, + tag=b"permit 1234") + self.assertEqual(reply.retval, 0) # The very first ACL gets #0 - self.assertEqual(first_acl.acl_index, 0) - rr = first_acl.dump() + self.assertEqual(reply.acl_index, 0) + first_acl = reply.acl_index + rr = self.vapi.acl_dump(reply.acl_index) self.logger.info("Dumped ACL: " + str(rr)) self.assertEqual(len(rr), 1) # We should have the same number of ACL entries as we had asked @@ -507,50 +568,70 @@ class TestACLplugin(VppTestCase): # are different types, we need to iterate over rules and keys to get # to basic values. for i_rule in range(0, len(r) - 1): - encoded_rule = r[i_rule].encode() - for rule_key in encoded_rule: + for rule_key in r[i_rule]: self.assertEqual(rr[0].r[i_rule][rule_key], - encoded_rule[rule_key]) - - # Create a deny-1234 ACL - r_deny = [AclRule(is_permit=0, proto=17, ports=1234), - AclRule(is_permit=1, proto=17, ports=0)] - r_deny[0].sport_to = 1235 - second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all") - second_acl.add_vpp_config() - self.assertTrue(second_acl.query_vpp_config()) + r[i_rule][rule_key]) + + # Add a deny-1234 ACL + r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17, + 'srcport_or_icmptype_first': 1234, + 'srcport_or_icmptype_last': 1235, + 'src_ip_prefix_len': 0, + 'src_ip_addr': b'\x00\x00\x00\x00', + 'dstport_or_icmpcode_first': 1234, + 'dstport_or_icmpcode_last': 1234, + 'dst_ip_addr': b'\x00\x00\x00\x00', + 'dst_ip_prefix_len': 0}, + {'is_permit': 1, 'is_ipv6': 0, 'proto': 17, + 'srcport_or_icmptype_first': 0, + 'srcport_or_icmptype_last': 0, + 'src_ip_prefix_len': 0, + 'src_ip_addr': b'\x00\x00\x00\x00', + 'dstport_or_icmpcode_first': 0, + 'dstport_or_icmpcode_last': 0, + 'dst_ip_addr': b'\x00\x00\x00\x00', + 'dst_ip_prefix_len': 0}] + + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny, + tag=b"deny 1234;permit all") + self.assertEqual(reply.retval, 0) # The second ACL gets #1 - self.assertEqual(second_acl.acl_index, 1) + self.assertEqual(reply.acl_index, 1) + second_acl = reply.acl_index # Test 2: try to modify a nonexistent ACL - invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF") - reply = invalid_acl.add_vpp_config(expect_error=True) - - # apply an ACL on an interface inbound, try to delete ACL, must fail - acl_if_list = VppAclInterface( - self, sw_if_index=self.pg0.sw_if_index, n_input=1, - acls=[first_acl]) - acl_if_list.add_vpp_config() - first_acl.remove_vpp_config(expect_error=True) - # Unapply an ACL and then try to delete it - must be ok - acl_if_list.remove_vpp_config() - first_acl.remove_vpp_config() - + reply = self.vapi.acl_add_replace(acl_index=432, r=r, + tag=b"FFFF:FFFF", expected_retval=-6) + self.assertEqual(reply.retval, -6) + # The ACL number should pass through + self.assertEqual(reply.acl_index, 432) # apply an ACL on an interface inbound, try to delete ACL, must fail - acl_if_list = VppAclInterface( - self, sw_if_index=self.pg0.sw_if_index, n_input=0, - acls=[second_acl]) - acl_if_list.add_vpp_config() - second_acl.remove_vpp_config(expect_error=True) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[first_acl]) + reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142) # Unapply an ACL and then try to delete it - must be ok - acl_if_list.remove_vpp_config() - second_acl.remove_vpp_config() + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=0, + acls=[]) + reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0) + + # apply an ACL on an interface outbound, try to delete ACL, must fail + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=0, + acls=[second_acl]) + reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143) + # Unapply the ACL and then try to delete it - must be ok + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=0, + acls=[]) + reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0) # try to apply a nonexistent ACL - must fail - acl_if_list = VppAclInterface( - self, sw_if_index=self.pg0.sw_if_index, n_input=0, - acls=[invalid_acl]) - acl_if_list.add_vpp_config(expect_error=True) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[first_acl], + expected_retval=-6) self.logger.info("ACLP_TEST_FINISH_0001") @@ -561,12 +642,12 @@ class TestACLplugin(VppTestCase): rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, - 0, self.proto[self.IP][self.UDP])) + 0, self.proto[self.IP][self.UDP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, - 0, self.proto[self.IP][self.TCP])) + 0, self.proto[self.IP][self.TCP])) # Apply rules - acl_idx = self.apply_rules(rules, "permit per-flow") + acl_idx = self.apply_rules(rules, b"permit per-flow") # enable counters reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) @@ -595,15 +676,14 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_START_0003") # Add a deny-flows ACL rules = [] - rules.append(self.create_rule( - self.IPV4, self.DENY, self.PORTS_ALL, - self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV4, self.DENY, + self.PORTS_ALL, self.proto[self.IP][self.UDP])) # Permit ip any any in the end rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules - acl_idx = self.apply_rules(rules, "deny per-flow;permit all") + acl_idx = self.apply_rules(rules, b"deny per-flow;permit all") # enable counters reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) @@ -637,7 +717,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit icmpv4") + self.apply_rules(rules, b"permit icmpv4") # Traffic should still pass self.run_verify_test(self.ICMP, self.IPV4, @@ -658,7 +738,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit icmpv6") + self.apply_rules(rules, b"permit icmpv6") # Traffic should still pass self.run_verify_test(self.ICMP, self.IPV6, @@ -679,7 +759,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny icmpv4") + self.apply_rules(rules, b"deny icmpv4") # Traffic should not pass self.run_verify_negat_test(self.ICMP, self.IPV4, 0) @@ -699,7 +779,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny icmpv6") + self.apply_rules(rules, b"deny icmpv6") # Traffic should not pass self.run_verify_negat_test(self.ICMP, self.IPV6, 0) @@ -714,12 +794,12 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) @@ -739,7 +819,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 tcp") + self.apply_rules(rules, b"permit ip6 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) @@ -759,7 +839,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv udp") + self.apply_rules(rules, b"permit ipv udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) @@ -779,7 +859,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 udp") + self.apply_rules(rules, b"permit ip6 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) @@ -804,7 +884,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 tcp") + self.apply_rules(rules, b"deny ip4/ip6 tcp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -830,7 +910,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp") + self.apply_rules(rules, b"deny ip4/ip6 udp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -868,13 +948,13 @@ class TestACLplugin(VppTestCase): for i in range(len(r)): rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3])) - acl = VppAcl(self, rules=rules) - acl.add_vpp_config() - result = acl.dump() + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules) + result = self.vapi.acl_dump(reply.acl_index) i = 0 for drules in result: for dr in drules.r: + self.assertEqual(dr.is_ipv6, r[i][0]) self.assertEqual(dr.is_permit, r[i][1]) self.assertEqual(dr.proto, r[i][3]) @@ -921,7 +1001,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp %d" % port) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, @@ -943,7 +1023,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp %d" % port) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, @@ -965,7 +1045,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp %d" % port) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, @@ -974,7 +1054,7 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0017") def test_0018_udp_permit_port_v6(self): - """ permit single UDPv6 + """ permit single UPPv6 """ self.logger.info("ACLP_TEST_START_0018") @@ -988,7 +1068,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp %d" % port) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, @@ -1015,7 +1095,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1042,7 +1122,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1070,7 +1150,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1092,7 +1172,7 @@ class TestACLplugin(VppTestCase): self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit empty udp ip4 %d" % port) + self.apply_rules(rules, b"permit empty udp ip4 %d" % port) # Traffic should still pass # Create incoming packet streams for packet-generator interfaces @@ -1126,7 +1206,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit empty udp ip6 %d" % port) + self.apply_rules(rules, b"permit empty udp ip6 %d" % port) # Traffic should still pass # Create incoming packet streams for packet-generator interfaces @@ -1156,14 +1236,14 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) @@ -1185,7 +1265,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 tcp") + self.apply_rules(rules, b"permit ip6 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) @@ -1207,7 +1287,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 udp") + self.apply_rules(rules, b"permit ipv4 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) @@ -1229,7 +1309,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 udp") + self.apply_rules(rules, b"permit ip6 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) @@ -1260,7 +1340,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 tcp") + self.apply_rules(rules, b"deny ip4/ip6 tcp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1292,7 +1372,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp") + self.apply_rules(rules, b"deny ip4/ip6 udp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1308,14 +1388,14 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") # Traffic should still pass also for an odd ethertype self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], @@ -1330,14 +1410,15 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked self.etype_whitelist([0xbbb], 1) @@ -1347,7 +1428,7 @@ class TestACLplugin(VppTestCase): 0, False, 0xaaaa) # remove the whitelist - self.etype_whitelist([], 0, add=False) + self.etype_whitelist([], 0) self.logger.info("ACLP_TEST_FINISH_0305") @@ -1359,14 +1440,15 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked self.etype_whitelist([0xbbb], 1) @@ -1375,7 +1457,7 @@ class TestACLplugin(VppTestCase): 0, False, True, 0x0bbb) # remove the whitelist, the previously blocked 0xAAAA should pass now - self.etype_whitelist([], 0, add=False) + self.etype_whitelist([], 0) self.logger.info("ACLP_TEST_FINISH_0306") @@ -1387,19 +1469,19 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked self.etype_whitelist([0xbbb], 1) # remove the whitelist, the previously blocked 0xAAAA should pass now - self.etype_whitelist([], 0, add=False) + self.etype_whitelist([], 0) # The whitelisted traffic, should pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], @@ -1415,9 +1497,9 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -1426,13 +1508,12 @@ class TestACLplugin(VppTestCase): intf.append(VppLoInterface(self)) # Apply rules - self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index) + self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index) # Remove the interface intf[0].remove_vpp_config() self.logger.info("ACLP_TEST_FINISH_0315") - if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)