X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin.py;h=32ecedba2a69bd0589e8d09161eee992cbd875c8;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=53d9621594965ad4576940b7b88be6e7b300af58;hpb=f90348bcb4afd0af2611cefc43b17ef3042b511c;p=vpp.git diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py index 53d96215949..32ecedba2a6 100644 --- a/test/test_acl_plugin.py +++ b/test/test_acl_plugin.py @@ -22,7 +22,7 @@ from vpp_ip import INVALID_INDEX @tag_fixme_vpp_workers class TestACLplugin(VppTestCase): - """ ACL plugin Test Case """ + """ACL plugin Test Case""" # traffic types IP = 0 @@ -39,7 +39,7 @@ class TestACLplugin(VppTestCase): # supported protocols proto = [[6, 17], [1, 58]] - proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'} + proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"} ICMPv4 = 0 ICMPv6 = 1 TCP = 0 @@ -105,11 +105,11 @@ class TestACLplugin(VppTestCase): # Create BD with MAC learning and unknown unicast flooding disabled # and put interfaces to this BD - cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, - learn=1) + cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, learn=1) for pg_if in cls.pg_interfaces: cls.vapi.sw_interface_set_l2_bridge( - rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id) + rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id + ) # Set up all interfaces for i in cls.pg_interfaces: @@ -135,14 +135,16 @@ class TestACLplugin(VppTestCase): for pg_if in cls.pg_interfaces: i += 1 start_nr = macs_per_if * i + start - end_nr = count + start if i == (n_int - 1) \ - else macs_per_if * (i + 1) + start + end_nr = ( + count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start + ) hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index] for j in range(int(start_nr), int(end_nr)): host = Host( "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), "172.17.1%02x.%u" % (pg_if.sw_if_index, j), - "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) + "2017:dead:%02x::%u" % (pg_if.sw_if_index, j), + ) hosts.append(host) except Exception: @@ -176,20 +178,32 @@ class TestACLplugin(VppTestCase): self.logger.info(self.vapi.ppcli("show acl-plugin acl")) self.logger.info(self.vapi.ppcli("show acl-plugin interface")) self.logger.info(self.vapi.ppcli("show acl-plugin tables")) - self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" - % self.bd_id)) - - def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1, - s_prefix=0, s_ip=0, - d_prefix=0, d_ip=0): + self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id)) + + def create_rule( + self, + ip=0, + permit_deny=0, + ports=PORTS_ALL, + proto=-1, + s_prefix=0, + s_ip=0, + d_prefix=0, + d_ip=0, + ): if ip: src_prefix = IPv6Network((s_ip, s_prefix)) dst_prefix = IPv6Network((d_ip, d_prefix)) else: src_prefix = IPv4Network((s_ip, s_prefix)) dst_prefix = IPv4Network((d_ip, d_prefix)) - return AclRule(is_permit=permit_deny, ports=ports, proto=proto, - src_prefix=src_prefix, dst_prefix=dst_prefix) + return AclRule( + is_permit=permit_deny, + ports=ports, + proto=proto, + src_prefix=src_prefix, + dst_prefix=dst_prefix, + ) def apply_rules(self, rules, tag=None): acl = VppAcl(self, rules, tag=tag) @@ -198,7 +212,8 @@ class TestACLplugin(VppTestCase): # Apply a ACL on the interface as inbound for i in self.pg_interfaces: acl_if = VppAclInterface( - self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]) + self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl] + ) acl_if.add_vpp_config() return acl.acl_index @@ -207,8 +222,7 @@ class TestACLplugin(VppTestCase): acl.add_vpp_config() self.logger.info("Dumped ACL: " + str(acl.dump())) # Apply a ACL on the interface as inbound - acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, - acls=[acl]) + acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, acls=[acl]) return acl.acl_index def etype_whitelist(self, whitelist, n_input, add=True): @@ -216,9 +230,14 @@ class TestACLplugin(VppTestCase): if add: self._wl = [] for i in self.pg_interfaces: - self._wl.append(VppEtypeWhitelist( - self, sw_if_index=i.sw_if_index, whitelist=whitelist, - n_input=n_input).add_vpp_config()) + self._wl.append( + VppEtypeWhitelist( + self, + sw_if_index=i.sw_if_index, + whitelist=whitelist, + n_input=n_input, + ).add_vpp_config() + ) else: if hasattr(self, "_wl"): for wl in self._wl: @@ -226,27 +245,36 @@ class TestACLplugin(VppTestCase): def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] - if p == 'UDP': + if p == "UDP": if ports == 0: - return UDP(sport=random.randint(self.udp_sport_from, - self.udp_sport_to), - dport=random.randint(self.udp_dport_from, - self.udp_dport_to)) + return UDP( + sport=random.randint(self.udp_sport_from, self.udp_sport_to), + dport=random.randint(self.udp_dport_from, self.udp_dport_to), + ) else: return UDP(sport=ports, dport=ports) - elif p == 'TCP': + elif p == "TCP": if ports == 0: - return TCP(sport=random.randint(self.tcp_sport_from, - self.tcp_sport_to), - dport=random.randint(self.tcp_dport_from, - self.tcp_dport_to)) + return TCP( + sport=random.randint(self.tcp_sport_from, self.tcp_sport_to), + dport=random.randint(self.tcp_dport_from, self.tcp_dport_to), + ) else: return TCP(sport=ports, dport=ports) - return '' - - def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0, fragments=False, - pkt_raw=True, etype=-1): + return "" + + def create_stream( + self, + src_if, + packet_sizes, + traffic_type=0, + ipv6=0, + proto=-1, + ports=0, + fragments=False, + pkt_raw=True, + etype=-1, + ): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -279,26 +307,25 @@ class TestACLplugin(VppTestCase): payload = self.info_to_payload(pkt_info) p = Ether(dst=dst_host.mac, src=src_host.mac) if etype > 0: - p = Ether(dst=dst_host.mac, - src=src_host.mac, - type=etype) + p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) if fragments: p /= IPv6ExtHdrFragment(offset=64, m=1) else: if fragments: - p /= IP(src=src_host.ip4, dst=dst_host.ip4, - flags=1, frag=64) + p /= IP( + src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64 + ) else: p /= IP(src=src_host.ip4, dst=dst_host.ip4) if traffic_type == self.ICMP: if pkt_info.ip: - p /= ICMPv6EchoRequest(type=self.icmp6_type, - code=self.icmp6_code) + p /= ICMPv6EchoRequest( + type=self.icmp6_type, code=self.icmp6_code + ) else: - p /= ICMP(type=self.icmp4_type, - code=self.icmp4_code) + p /= ICMP(type=self.icmp4_type, code=self.icmp4_code) else: p /= self.create_upper_layer(i, pkt_info.proto, ports) if pkt_raw: @@ -310,8 +337,7 @@ class TestACLplugin(VppTestCase): pkts.append(p) return pkts - def verify_capture(self, pg_if, capture, - traffic_type=0, ip_type=0, etype=-1): + def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1): """ Verify captured input packet stream for defined interface. @@ -326,22 +352,23 @@ class TestACLplugin(VppTestCase): for packet in capture: if etype > 0: if packet[Ether].type != etype: - self.logger.error(ppp("Unexpected ethertype in packet:", - packet)) + self.logger.error(ppp("Unexpected ethertype in packet:", packet)) else: continue try: # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data if traffic_type == self.ICMP and ip_type == self.IPV6: payload_info = self.payload_to_info( - packet[ICMPv6EchoRequest], 'data') + packet[ICMPv6EchoRequest], "data" + ) payload = packet[ICMPv6EchoRequest] else: payload_info = self.payload_to_info(packet[Raw]) payload = packet[self.proto_map[payload_info.proto]] except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise if ip_type != 0: @@ -355,8 +382,9 @@ class TestACLplugin(VppTestCase): self.assertEqual(payload.type, self.icmp6_type) self.assertEqual(payload.code, self.icmp6_code) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise else: try: @@ -366,12 +394,13 @@ class TestACLplugin(VppTestCase): packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) - self.logger.debug("Got packet on port %s: src=%u (id=%u)" % - (pg_if.name, payload_info.src, - packet_index)) + self.logger.debug( + "Got packet on port %s: src=%u (id=%u)" + % (pg_if.name, payload_info.src, packet_index) + ) next_info = self.get_next_packet_info_for_interface2( - payload_info.src, dst_sw_if_index, - last_info[payload_info.src]) + payload_info.src, dst_sw_if_index, last_info[payload_info.src] + ) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) @@ -380,29 +409,26 @@ class TestACLplugin(VppTestCase): self.assertEqual(ip.src, saved_packet[ip_version].src) self.assertEqual(ip.dst, saved_packet[ip_version].dst) p = self.proto_map[payload_info.proto] - if p == 'TCP': + if p == "TCP": tcp = packet[TCP] - self.assertEqual(tcp.sport, saved_packet[ - TCP].sport) - self.assertEqual(tcp.dport, saved_packet[ - TCP].dport) - elif p == 'UDP': + self.assertEqual(tcp.sport, saved_packet[TCP].sport) + self.assertEqual(tcp.dport, saved_packet[TCP].dport) + elif p == "UDP": udp = packet[UDP] - self.assertEqual(udp.sport, saved_packet[ - UDP].sport) - self.assertEqual(udp.dport, saved_packet[ - UDP].dport) + self.assertEqual(udp.sport, saved_packet[UDP].sport) + self.assertEqual(udp.dport, saved_packet[UDP].dport) except: - self.logger.error(ppp("Unexpected or invalid packet:", - packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise for i in self.pg_interfaces: remaining_packet = self.get_next_packet_info_for_interface2( - i, dst_sw_if_index, last_info[i.sw_if_index]) + i, dst_sw_if_index, last_info[i.sw_if_index] + ) self.assertTrue( remaining_packet is None, - "Port %u: Packet expected from source %u didn't arrive" % - (dst_sw_if_index, i.sw_if_index)) + "Port %u: Packet expected from source %u didn't arrive" + % (dst_sw_if_index, i.sw_if_index), + ) def run_traffic_no_check(self): # Test @@ -417,16 +443,32 @@ class TestACLplugin(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, - frags=False, pkt_raw=True, etype=-1): + def run_verify_test( + self, + traffic_type=0, + ip_type=0, + proto=-1, + ports=0, + frags=False, + pkt_raw=True, + etype=-1, + ): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 for i in self.pg_interfaces: if self.flows.__contains__(i): - pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports, - frags, pkt_raw, etype) + pkts = self.create_stream( + i, + self.pg_if_packet_sizes, + traffic_type, + ip_type, + proto, + ports, + frags, + pkt_raw, + etype, + ) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -442,21 +484,28 @@ class TestACLplugin(VppTestCase): if self.flows.__contains__(src_if): for dst_if in self.flows[src_if]: capture = dst_if.get_capture(pkts_cnt) - self.logger.info("Verifying capture on interface %s" % - dst_if.name) - self.verify_capture(dst_if, capture, - traffic_type, ip_type, etype) + self.logger.info("Verifying capture on interface %s" % dst_if.name) + self.verify_capture(dst_if, capture, traffic_type, ip_type, etype) - def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0, frags=False, etype=-1): + def run_verify_negat_test( + self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1 + ): # Test pkts_cnt = 0 self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): - pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports, - frags, True, etype) + pkts = self.create_stream( + i, + self.pg_if_packet_sizes, + traffic_type, + ip_type, + proto, + ports, + frags, + True, + etype, + ) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -471,24 +520,22 @@ class TestACLplugin(VppTestCase): for src_if in self.pg_interfaces: if self.flows.__contains__(src_if): for dst_if in self.flows[src_if]: - self.logger.info("Verifying capture on interface %s" % - dst_if.name) + self.logger.info("Verifying capture on interface %s" % dst_if.name) capture = dst_if.get_capture(0) self.assertEqual(len(capture), 0) def test_0000_warmup_test(self): - """ ACL plugin version check; learn MACs - """ + """ACL plugin version check; learn MACs""" reply = self.vapi.papi.acl_plugin_get_version() self.assertEqual(reply.major, 1) - self.logger.info("Working with ACL plugin version: %d.%d" % ( - reply.major, reply.minor)) + self.logger.info( + "Working with ACL plugin version: %d.%d" % (reply.major, reply.minor) + ) # minor version changes are non breaking # self.assertEqual(reply.minor, 0) def test_0001_acl_create(self): - """ ACL create/delete test - """ + """ACL create/delete test""" self.logger.info("ACLP_TEST_START_0001") # Create a permit-1234 ACL @@ -510,12 +557,13 @@ class TestACLplugin(VppTestCase): for i_rule in range(0, len(r) - 1): encoded_rule = r[i_rule].encode() for rule_key in encoded_rule: - self.assertEqual(rr[0].r[i_rule][rule_key], - encoded_rule[rule_key]) + self.assertEqual(rr[0].r[i_rule][rule_key], encoded_rule[rule_key]) # Create a deny-1234 ACL - r_deny = [AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235), - AclRule(is_permit=1, proto=17, ports=0)] + r_deny = [ + AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235), + AclRule(is_permit=1, proto=17, ports=0), + ] second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all") second_acl.add_vpp_config() self.assertTrue(second_acl.query_vpp_config()) @@ -528,8 +576,8 @@ class TestACLplugin(VppTestCase): # apply an ACL on an interface inbound, try to delete ACL, must fail acl_if_list = VppAclInterface( - self, sw_if_index=self.pg0.sw_if_index, n_input=1, - acls=[first_acl]) + self, sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl] + ) acl_if_list.add_vpp_config() first_acl.remove_vpp_config(expect_error=True) # Unapply an ACL and then try to delete it - must be ok @@ -538,8 +586,8 @@ class TestACLplugin(VppTestCase): # apply an ACL on an interface inbound, try to delete ACL, must fail acl_if_list = VppAclInterface( - self, sw_if_index=self.pg0.sw_if_index, n_input=0, - acls=[second_acl]) + self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[second_acl] + ) acl_if_list.add_vpp_config() second_acl.remove_vpp_config(expect_error=True) # Unapply an ACL and then try to delete it - must be ok @@ -548,22 +596,23 @@ class TestACLplugin(VppTestCase): # try to apply a nonexistent ACL - must fail acl_if_list = VppAclInterface( - self, sw_if_index=self.pg0.sw_if_index, n_input=0, - acls=[invalid_acl]) + self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[invalid_acl] + ) acl_if_list.add_vpp_config(expect_error=True) self.logger.info("ACLP_TEST_FINISH_0001") def test_0002_acl_permit_apply(self): - """ permit ACL apply test - """ + """permit ACL apply test""" self.logger.info("ACLP_TEST_START_0002") rules = [] - rules.append(self.create_rule(self.IPV4, self.PERMIT, - 0, self.proto[self.IP][self.UDP])) - rules.append(self.create_rule(self.IPV4, self.PERMIT, - 0, self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]) + ) + rules.append( + self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]) + ) # Apply rules acl_idx = self.apply_rules(rules, "permit per-flow") @@ -574,14 +623,14 @@ class TestACLplugin(VppTestCase): # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, -1) - matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx) + matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx) self.logger.info("stat segment counters: %s" % repr(matches)) cli = "show acl-plugin acl" self.logger.info(self.vapi.ppcli(cli)) cli = "show acl-plugin tables" self.logger.info(self.vapi.ppcli(cli)) - total_hits = matches[0][0]['packets'] + matches[0][1]['packets'] + total_hits = matches[0][0]["packets"] + matches[0][1]["packets"] self.assertEqual(total_hits, 64) # disable counters @@ -590,17 +639,17 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0002") def test_0003_acl_deny_apply(self): - """ deny ACL apply test - """ + """deny ACL apply test""" self.logger.info("ACLP_TEST_START_0003") # Add a deny-flows ACL rules = [] - rules.append(self.create_rule( - self.IPV4, self.DENY, self.PORTS_ALL, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_ALL, self.proto[self.IP][self.UDP] + ) + ) # Permit ip any any in the end - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules acl_idx = self.apply_rules(rules, "deny per-flow;permit all") @@ -609,30 +658,34 @@ class TestACLplugin(VppTestCase): reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) # Traffic should not pass - self.run_verify_negat_test(self.IP, self.IPV4, - self.proto[self.IP][self.UDP]) + self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) - matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx) + matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx) self.logger.info("stat segment counters: %s" % repr(matches)) cli = "show acl-plugin acl" self.logger.info(self.vapi.ppcli(cli)) cli = "show acl-plugin tables" self.logger.info(self.vapi.ppcli(cli)) - self.assertEqual(matches[0][0]['packets'], 64) + self.assertEqual(matches[0][0]["packets"], 64) # disable counters reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0) self.logger.info("ACLP_TEST_FINISH_0003") # self.assertEqual(, 0) def test_0004_vpp624_permit_icmpv4(self): - """ VPP_624 permit ICMPv4 - """ + """VPP_624 permit ICMPv4""" self.logger.info("ACLP_TEST_START_0004") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.ICMP][self.ICMPv4])) + rules.append( + self.create_rule( + self.IPV4, + self.PERMIT, + self.PORTS_RANGE, + self.proto[self.ICMP][self.ICMPv4], + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -640,20 +693,24 @@ class TestACLplugin(VppTestCase): self.apply_rules(rules, "permit icmpv4") # Traffic should still pass - self.run_verify_test(self.ICMP, self.IPV4, - self.proto[self.ICMP][self.ICMPv4]) + self.run_verify_test(self.ICMP, self.IPV4, self.proto[self.ICMP][self.ICMPv4]) self.logger.info("ACLP_TEST_FINISH_0004") def test_0005_vpp624_permit_icmpv6(self): - """ VPP_624 permit ICMPv6 - """ + """VPP_624 permit ICMPv6""" self.logger.info("ACLP_TEST_START_0005") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, - self.proto[self.ICMP][self.ICMPv6])) + rules.append( + self.create_rule( + self.IPV6, + self.PERMIT, + self.PORTS_RANGE, + self.proto[self.ICMP][self.ICMPv6], + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) @@ -661,22 +718,25 @@ class TestACLplugin(VppTestCase): self.apply_rules(rules, "permit icmpv6") # Traffic should still pass - self.run_verify_test(self.ICMP, self.IPV6, - self.proto[self.ICMP][self.ICMPv6]) + self.run_verify_test(self.ICMP, self.IPV6, self.proto[self.ICMP][self.ICMPv6]) self.logger.info("ACLP_TEST_FINISH_0005") def test_0006_vpp624_deny_icmpv4(self): - """ VPP_624 deny ICMPv4 - """ + """VPP_624 deny ICMPv4""" self.logger.info("ACLP_TEST_START_0006") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, - self.proto[self.ICMP][self.ICMPv4])) + rules.append( + self.create_rule( + self.IPV4, + self.DENY, + self.PORTS_RANGE, + self.proto[self.ICMP][self.ICMPv4], + ) + ) # permit ip any any in the end - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "deny icmpv4") @@ -687,16 +747,20 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0006") def test_0007_vpp624_deny_icmpv6(self): - """ VPP_624 deny ICMPv6 - """ + """VPP_624 deny ICMPv6""" self.logger.info("ACLP_TEST_START_0007") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, - self.proto[self.ICMP][self.ICMPv6])) + rules.append( + self.create_rule( + self.IPV6, + self.DENY, + self.PORTS_RANGE, + self.proto[self.ICMP][self.ICMPv6], + ) + ) # deny ip any any in the end - rules.append(self.create_rule(self.IPV6, self.PERMIT, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "deny icmpv6") @@ -707,14 +771,16 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0007") def test_0008_tcp_permit_v4(self): - """ permit TCPv4 - """ + """permit TCPv4""" self.logger.info("ACLP_TEST_START_0008") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -727,14 +793,16 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0008") def test_0009_tcp_permit_v6(self): - """ permit TCPv6 - """ + """permit TCPv6""" self.logger.info("ACLP_TEST_START_0009") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) @@ -747,14 +815,16 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0008") def test_0010_udp_permit_v4(self): - """ permit UDPv4 - """ + """permit UDPv4""" self.logger.info("ACLP_TEST_START_0010") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -767,14 +837,16 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0010") def test_0011_udp_permit_v6(self): - """ permit UDPv6 - """ + """permit UDPv6""" self.logger.info("ACLP_TEST_START_0011") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule( + self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) @@ -787,81 +859,89 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0011") def test_0012_tcp_deny(self): - """ deny TCPv4/v6 - """ + """deny TCPv4/v6""" self.logger.info("ACLP_TEST_START_0012") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) + rules.append( + self.create_rule( + self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # permit ip any any in the end - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_ALL, 0)) - rules.append(self.create_rule(self.IPV6, self.PERMIT, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "deny ip4/ip6 tcp") # Traffic should not pass - self.run_verify_negat_test(self.IP, self.IPRANDOM, - self.proto[self.IP][self.TCP]) + self.run_verify_negat_test( + self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP] + ) self.logger.info("ACLP_TEST_FINISH_0012") def test_0013_udp_deny(self): - """ deny UDPv4/v6 - """ + """deny UDPv4/v6""" self.logger.info("ACLP_TEST_START_0013") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, - self.proto[self.IP][self.UDP])) - rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP] + ) + ) + rules.append( + self.create_rule( + self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP] + ) + ) # permit ip any any in the end - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_ALL, 0)) - rules.append(self.create_rule(self.IPV6, self.PERMIT, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "deny ip4/ip6 udp") # Traffic should not pass - self.run_verify_negat_test(self.IP, self.IPRANDOM, - self.proto[self.IP][self.UDP]) + self.run_verify_negat_test( + self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP] + ) self.logger.info("ACLP_TEST_FINISH_0013") def test_0014_acl_dump(self): - """ verify add/dump acls - """ + """verify add/dump acls""" self.logger.info("ACLP_TEST_START_0014") - r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]], - [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]], - [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]], - [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]], - [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]], - [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]], - [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]], - [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]], - [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]], - [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]], - [self.IPV4, self.DENY, self.PORTS_ALL, 0], - [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]], - [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]], - [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]], - [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]], - [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]], - [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]], - [self.IPV6, self.DENY, self.PORTS_ALL, 0] - ] + r = [ + [self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]], + [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]], + [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]], + [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]], + [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]], + [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]], + [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]], + [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]], + [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]], + [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]], + [self.IPV4, self.DENY, self.PORTS_ALL, 0], + [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]], + [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]], + [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]], + [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]], + [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]], + [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]], + [self.IPV6, self.DENY, self.PORTS_ALL, 0], + ] # Add and verify new ACLs rules = [] @@ -886,37 +966,47 @@ class TestACLplugin(VppTestCase): self.assertEqual(dr.srcport_or_icmptype_last, 65535) else: if dr.proto == self.proto[self.IP][self.TCP]: - self.assertGreater(dr.srcport_or_icmptype_first, - self.tcp_sport_from-1) - self.assertLess(dr.srcport_or_icmptype_first, - self.tcp_sport_to+1) - self.assertGreater(dr.dstport_or_icmpcode_last, - self.tcp_dport_from-1) - self.assertLess(dr.dstport_or_icmpcode_last, - self.tcp_dport_to+1) + self.assertGreater( + dr.srcport_or_icmptype_first, self.tcp_sport_from - 1 + ) + self.assertLess( + dr.srcport_or_icmptype_first, self.tcp_sport_to + 1 + ) + self.assertGreater( + dr.dstport_or_icmpcode_last, self.tcp_dport_from - 1 + ) + self.assertLess( + dr.dstport_or_icmpcode_last, self.tcp_dport_to + 1 + ) elif dr.proto == self.proto[self.IP][self.UDP]: - self.assertGreater(dr.srcport_or_icmptype_first, - self.udp_sport_from-1) - self.assertLess(dr.srcport_or_icmptype_first, - self.udp_sport_to+1) - self.assertGreater(dr.dstport_or_icmpcode_last, - self.udp_dport_from-1) - self.assertLess(dr.dstport_or_icmpcode_last, - self.udp_dport_to+1) + self.assertGreater( + dr.srcport_or_icmptype_first, self.udp_sport_from - 1 + ) + self.assertLess( + dr.srcport_or_icmptype_first, self.udp_sport_to + 1 + ) + self.assertGreater( + dr.dstport_or_icmpcode_last, self.udp_dport_from - 1 + ) + self.assertLess( + dr.dstport_or_icmpcode_last, self.udp_dport_to + 1 + ) i += 1 self.logger.info("ACLP_TEST_FINISH_0014") def test_0015_tcp_permit_port_v4(self): - """ permit single TCPv4 - """ + """permit single TCPv4""" self.logger.info("ACLP_TEST_START_0015") port = random.randint(16384, 65535) # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.PERMIT, port, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, port, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -924,21 +1014,22 @@ class TestACLplugin(VppTestCase): self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass - self.run_verify_test(self.IP, self.IPV4, - self.proto[self.IP][self.TCP], port) + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], port) self.logger.info("ACLP_TEST_FINISH_0015") def test_0016_udp_permit_port_v4(self): - """ permit single UDPv4 - """ + """permit single UDPv4""" self.logger.info("ACLP_TEST_START_0016") port = random.randint(16384, 65535) # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.PERMIT, port, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -946,21 +1037,22 @@ class TestACLplugin(VppTestCase): self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass - self.run_verify_test(self.IP, self.IPV4, - self.proto[self.IP][self.UDP], port) + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP], port) self.logger.info("ACLP_TEST_FINISH_0016") def test_0017_tcp_permit_port_v6(self): - """ permit single TCPv6 - """ + """permit single TCPv6""" self.logger.info("ACLP_TEST_START_0017") port = random.randint(16384, 65535) # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV6, self.PERMIT, port, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV6, self.PERMIT, port, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) @@ -968,90 +1060,89 @@ class TestACLplugin(VppTestCase): self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass - self.run_verify_test(self.IP, self.IPV6, - self.proto[self.IP][self.TCP], port) + self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP], port) self.logger.info("ACLP_TEST_FINISH_0017") def test_0018_udp_permit_port_v6(self): - """ permit single UDPv6 - """ + """permit single UDPv6""" self.logger.info("ACLP_TEST_START_0018") port = random.randint(16384, 65535) # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV6, self.PERMIT, port, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule( + self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP] + ) + ) # deny ip any any in the end - rules.append(self.create_rule(self.IPV6, self.DENY, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass - self.run_verify_test(self.IP, self.IPV6, - self.proto[self.IP][self.UDP], port) + self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP], port) self.logger.info("ACLP_TEST_FINISH_0018") def test_0019_udp_deny_port(self): - """ deny single TCPv4/v6 - """ + """deny single TCPv4/v6""" self.logger.info("ACLP_TEST_START_0019") port = random.randint(16384, 65535) # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, port, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV6, self.DENY, port, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.TCP]) + ) + rules.append( + self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.TCP]) + ) # Permit ip any any in the end - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_ALL, 0)) - rules.append(self.create_rule(self.IPV6, self.PERMIT, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) # Traffic should not pass - self.run_verify_negat_test(self.IP, self.IPRANDOM, - self.proto[self.IP][self.TCP], port) + self.run_verify_negat_test( + self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP], port + ) self.logger.info("ACLP_TEST_FINISH_0019") def test_0020_udp_deny_port(self): - """ deny single UDPv4/v6 - """ + """deny single UDPv4/v6""" self.logger.info("ACLP_TEST_START_0020") port = random.randint(16384, 65535) # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, port, - self.proto[self.IP][self.UDP])) - rules.append(self.create_rule(self.IPV6, self.DENY, port, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP]) + ) + rules.append( + self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP]) + ) # Permit ip any any in the end - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_ALL, 0)) - rules.append(self.create_rule(self.IPV6, self.PERMIT, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) # Traffic should not pass - self.run_verify_negat_test(self.IP, self.IPRANDOM, - self.proto[self.IP][self.UDP], port) + self.run_verify_negat_test( + self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port + ) self.logger.info("ACLP_TEST_FINISH_0020") def test_0021_udp_deny_port_verify_fragment_deny(self): - """ deny single UDPv4/v6, permit ip any, verify non-initial fragment + """deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked """ self.logger.info("ACLP_TEST_START_0021") @@ -1059,37 +1150,40 @@ class TestACLplugin(VppTestCase): port = random.randint(16384, 65535) # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, port, - self.proto[self.IP][self.UDP])) - rules.append(self.create_rule(self.IPV6, self.DENY, port, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP]) + ) + rules.append( + self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP]) + ) # deny ip any any in the end - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_ALL, 0)) - rules.append(self.create_rule(self.IPV6, self.PERMIT, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) # Traffic should not pass - self.run_verify_negat_test(self.IP, self.IPRANDOM, - self.proto[self.IP][self.UDP], port, True) + self.run_verify_negat_test( + self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port, True + ) self.logger.info("ACLP_TEST_FINISH_0021") def test_0022_zero_length_udp_ipv4(self): - """ VPP-687 zero length udp ipv4 packet""" + """VPP-687 zero length udp ipv4 packet""" self.logger.info("ACLP_TEST_START_0022") port = random.randint(16384, 65535) # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.PERMIT, port, - self.proto[self.IP][self.UDP])) - # deny ip any any in the end rules.append( - self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + self.create_rule( + self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP] + ) + ) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "permit empty udp ip4 %d" % port) @@ -1097,10 +1191,16 @@ class TestACLplugin(VppTestCase): # Traffic should still pass # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 - pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, - self.IP, self.IPV4, - self.proto[self.IP][self.UDP], port, - False, False) + pkts = self.create_stream( + self.pg0, + self.pg_if_packet_sizes, + self.IP, + self.IPV4, + self.proto[self.IP][self.UDP], + port, + False, + False, + ) if len(pkts) > 0: self.pg0.add_stream(pkts) pkts_cnt += len(pkts) @@ -1114,14 +1214,17 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0022") def test_0023_zero_length_udp_ipv6(self): - """ VPP-687 zero length udp ipv6 packet""" + """VPP-687 zero length udp ipv6 packet""" self.logger.info("ACLP_TEST_START_0023") port = random.randint(16384, 65535) # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV6, self.PERMIT, port, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule( + self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) @@ -1131,10 +1234,16 @@ class TestACLplugin(VppTestCase): # Traffic should still pass # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 - pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, - self.IP, self.IPV6, - self.proto[self.IP][self.UDP], port, - False, False) + pkts = self.create_stream( + self.pg0, + self.pg_if_packet_sizes, + self.IP, + self.IPV6, + self.proto[self.IP][self.UDP], + port, + False, + False, + ) if len(pkts) > 0: self.pg0.add_stream(pkts) pkts_cnt += len(pkts) @@ -1149,16 +1258,21 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0023") def test_0108_tcp_permit_v4(self): - """ permit TCPv4 + non-match range - """ + """permit TCPv4 + non-match range""" self.logger.info("ACLP_TEST_START_0108") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP] + ) + ) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -1171,16 +1285,21 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0108") def test_0109_tcp_permit_v6(self): - """ permit TCPv6 + non-match range - """ + """permit TCPv6 + non-match range""" self.logger.info("ACLP_TEST_START_0109") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP] + ) + ) + rules.append( + self.create_rule( + self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) @@ -1193,16 +1312,21 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0109") def test_0110_udp_permit_v4(self): - """ permit UDPv4 + non-match range - """ + """permit UDPv4 + non-match range""" self.logger.info("ACLP_TEST_START_0110") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.UDP])) - rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP] + ) + ) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -1215,16 +1339,21 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0110") def test_0111_udp_permit_v6(self): - """ permit UDPv6 + non-match range - """ + """permit UDPv6 + non-match range""" self.logger.info("ACLP_TEST_START_0111") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.UDP])) - rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule( + self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP] + ) + ) + rules.append( + self.create_rule( + self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) @@ -1237,80 +1366,113 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0111") def test_0112_tcp_deny(self): - """ deny TCPv4/v6 + non-match range - """ + """deny TCPv4/v6 + non-match range""" self.logger.info("ACLP_TEST_START_0112") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV6, self.PERMIT, - self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV4, + self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP], + ) + ) + rules.append( + self.create_rule( + self.IPV6, + self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP], + ) + ) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) + rules.append( + self.create_rule( + self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # permit ip any any in the end - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_ALL, 0)) - rules.append(self.create_rule(self.IPV6, self.PERMIT, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "deny ip4/ip6 tcp") # Traffic should not pass - self.run_verify_negat_test(self.IP, self.IPRANDOM, - self.proto[self.IP][self.TCP]) + self.run_verify_negat_test( + self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP] + ) self.logger.info("ACLP_TEST_FINISH_0112") def test_0113_udp_deny(self): - """ deny UDPv4/v6 + non-match range - """ + """deny UDPv4/v6 + non-match range""" self.logger.info("ACLP_TEST_START_0113") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_RANGE_2, - self.proto[self.IP][self.UDP])) - rules.append(self.create_rule(self.IPV6, self.PERMIT, - self.PORTS_RANGE_2, - self.proto[self.IP][self.UDP])) - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, - self.proto[self.IP][self.UDP])) - rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, - self.proto[self.IP][self.UDP])) + rules.append( + self.create_rule( + self.IPV4, + self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP], + ) + ) + rules.append( + self.create_rule( + self.IPV6, + self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP], + ) + ) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP] + ) + ) + rules.append( + self.create_rule( + self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP] + ) + ) # permit ip any any in the end - rules.append(self.create_rule(self.IPV4, self.PERMIT, - self.PORTS_ALL, 0)) - rules.append(self.create_rule(self.IPV6, self.PERMIT, - self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "deny ip4/ip6 udp") # Traffic should not pass - self.run_verify_negat_test(self.IP, self.IPRANDOM, - self.proto[self.IP][self.UDP]) + self.run_verify_negat_test( + self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP] + ) self.logger.info("ACLP_TEST_FINISH_0113") def test_0300_tcp_permit_v4_etype_aaaa(self): - """ permit TCPv4, send 0xAAAA etype - """ + """permit TCPv4, send 0xAAAA etype""" self.logger.info("ACLP_TEST_START_0300") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP] + ) + ) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -1318,33 +1480,39 @@ class TestACLplugin(VppTestCase): self.apply_rules(rules, "permit ipv4 tcp") # Traffic should still pass also for an odd ethertype - self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], - 0, False, True, 0xaaaa) + self.run_verify_test( + self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA + ) self.logger.info("ACLP_TEST_FINISH_0300") def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self): - """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked - """ + """permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked""" self.logger.info("ACLP_TEST_START_0305") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP] + ) + ) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "permit ipv4 tcp") # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked - self.etype_whitelist([0xbbb], 1) + self.etype_whitelist([0xBBB], 1) # The oddball ethertype should be blocked - self.run_verify_negat_test(self.IP, self.IPV4, - self.proto[self.IP][self.TCP], - 0, False, 0xaaaa) + self.run_verify_negat_test( + self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, 0xAAAA + ) # remove the whitelist self.etype_whitelist([], 0, add=False) @@ -1352,27 +1520,33 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0305") def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self): - """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass - """ + """permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass""" self.logger.info("ACLP_TEST_START_0306") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP] + ) + ) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules self.apply_rules(rules, "permit ipv4 tcp") # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked - self.etype_whitelist([0xbbb], 1) + self.etype_whitelist([0xBBB], 1) # The whitelisted traffic, should pass - self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], - 0, False, True, 0x0bbb) + self.run_verify_test( + self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0x0BBB + ) # remove the whitelist, the previously blocked 0xAAAA should pass now self.etype_whitelist([], 0, add=False) @@ -1380,16 +1554,21 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0306") def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self): - """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass - """ + """permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass""" self.logger.info("ACLP_TEST_START_0307") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP] + ) + ) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -1397,27 +1576,33 @@ class TestACLplugin(VppTestCase): self.apply_rules(rules, "permit ipv4 tcp") # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked - self.etype_whitelist([0xbbb], 1) + self.etype_whitelist([0xBBB], 1) # remove the whitelist, the previously blocked 0xAAAA should pass now self.etype_whitelist([], 0, add=False) # The whitelisted traffic, should pass - self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], - 0, False, True, 0xaaaa) + self.run_verify_test( + self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA + ) self.logger.info("ACLP_TEST_FINISH_0306") def test_0315_del_intf(self): - """ apply an acl and delete the interface - """ + """apply an acl and delete the interface""" self.logger.info("ACLP_TEST_START_0315") # Add an ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) - rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + rules.append( + self.create_rule( + self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP] + ) + ) + rules.append( + self.create_rule( + self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP] + ) + ) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -1434,5 +1619,5 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0315") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)