X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin.py;h=772b5bbb4156aa7cbaa75606d4e48b51f7b85488;hb=e36f44ad923f4e64266fd3e4dee6e2716e687504;hp=4d748921aaf12e4c772c834c526e55ba1c592854;hpb=6a6f4f7fe777dc77f8496fae1fc1075372ad16b6;p=vpp.git diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py index 4d748921aaf..772b5bbb415 100644 --- a/test/test_acl_plugin.py +++ b/test/test_acl_plugin.py @@ -13,6 +13,8 @@ from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner from util import Host, ppp +from vpp_lo_interface import VppLoInterface + class TestACLplugin(VppTestCase): """ ACL plugin Test Case """ @@ -101,8 +103,8 @@ class TestACLplugin(VppTestCase): cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, learn=1) for pg_if in cls.pg_interfaces: - cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index, - bd_id=cls.bd_id) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id) # Set up all interfaces for i in cls.pg_interfaces: @@ -120,11 +122,32 @@ class TestACLplugin(VppTestCase): # warm-up the mac address tables # self.warmup_test() + count = 16 + start = 0 + n_int = len(cls.pg_interfaces) + macs_per_if = count / n_int + i = -1 + for pg_if in cls.pg_interfaces: + i += 1 + start_nr = macs_per_if * i + start + end_nr = count + start if i == (n_int - 1) \ + else macs_per_if * (i + 1) + start + hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index] + for j in range(start_nr, end_nr): + host = Host( + "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), + "172.17.1%02x.%u" % (pg_if.sw_if_index, j), + "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) + hosts.append(host) except Exception: super(TestACLplugin, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestACLplugin, cls).tearDownClass() + def setUp(self): super(TestACLplugin, self).setUp() self.reset_packet_infos() @@ -135,6 +158,14 @@ class TestACLplugin(VppTestCase): """ super(TestACLplugin, self).tearDown() if not self.vpp_dead: + cli = "show vlib graph l2-input-feat-arc" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show vlib graph l2-input-feat-arc-end" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show vlib graph l2-output-feat-arc" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show vlib graph l2-output-feat-arc-end" + self.logger.info(self.vapi.ppcli(cli)) self.logger.info(self.vapi.ppcli("show l2fib verbose")) self.logger.info(self.vapi.ppcli("show acl-plugin acl")) self.logger.info(self.vapi.ppcli("show acl-plugin interface")) @@ -142,30 +173,6 @@ class TestACLplugin(VppTestCase): self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id)) - def create_hosts(self, count, start=0): - """ - Create required number of host MAC addresses and distribute them among - interfaces. Create host IPv4 address for every host MAC address. - - :param int count: Number of hosts to create MAC/IPv4 addresses for. - :param int start: Number to start numbering from. - """ - n_int = len(self.pg_interfaces) - macs_per_if = count / n_int - i = -1 - for pg_if in self.pg_interfaces: - i += 1 - start_nr = macs_per_if * i + start - end_nr = count + start if i == (n_int - 1) \ - else macs_per_if * (i + 1) + start - hosts = self.hosts_by_pg_idx[pg_if.sw_if_index] - for j in range(start_nr, end_nr): - host = Host( - "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), - "172.17.1%02x.%u" % (pg_if.sw_if_index, j), - "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) - hosts.append(host) - def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1, s_prefix=0, s_ip='\x00\x00\x00\x00', d_prefix=0, d_ip='\x00\x00\x00\x00'): @@ -235,7 +242,7 @@ class TestACLplugin(VppTestCase): 'dst_ip_addr': d_ip}) return rule - def apply_rules(self, rules, tag=''): + def apply_rules(self, rules, tag=b''): reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, tag=tag) self.logger.info("Dumped ACL: " + str( @@ -247,6 +254,26 @@ class TestACLplugin(VppTestCase): acls=[reply.acl_index]) return + def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF): + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, + tag=tag) + self.logger.info("Dumped ACL: " + str( + self.vapi.acl_dump(reply.acl_index))) + # Apply a ACL on the interface as inbound + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=1, + acls=[reply.acl_index]) + return + + def etype_whitelist(self, whitelist, n_input): + # Apply whitelists on all the interfaces + for i in self.pg_interfaces: + # checkstyle can't read long names. Help them. + fun = self.vapi.acl_interface_set_etype_whitelist + fun(sw_if_index=i.sw_if_index, n_input=n_input, + whitelist=whitelist) + return + def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] if p == 'UDP': @@ -268,7 +295,8 @@ class TestACLplugin(VppTestCase): return '' def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0, fragments=False, pkt_raw=True): + proto=-1, ports=0, fragments=False, + pkt_raw=True, etype=-1): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -300,6 +328,10 @@ class TestACLplugin(VppTestCase): pkt_info.proto = proto payload = self.info_to_payload(pkt_info) p = Ether(dst=dst_host.mac, src=src_host.mac) + if etype > 0: + p = Ether(dst=dst_host.mac, + src=src_host.mac, + type=etype) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) if fragments: @@ -328,7 +360,8 @@ class TestACLplugin(VppTestCase): pkts.append(p) return pkts - def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0): + def verify_capture(self, pg_if, capture, + traffic_type=0, ip_type=0, etype=-1): """ Verify captured input packet stream for defined interface. @@ -341,14 +374,20 @@ class TestACLplugin(VppTestCase): last_info[i.sw_if_index] = None dst_sw_if_index = pg_if.sw_if_index for packet in capture: + if etype > 0: + if packet[Ether].type != etype: + self.logger.error(ppp("Unexpected ethertype in packet:", + packet)) + else: + continue try: # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data if traffic_type == self.ICMP and ip_type == self.IPV6: payload_info = self.payload_to_info( - packet[ICMPv6EchoRequest].data) + packet[ICMPv6EchoRequest], 'data') payload = packet[ICMPv6EchoRequest] else: - payload_info = self.payload_to_info(str(packet[Raw])) + payload_info = self.payload_to_info(packet[Raw]) payload = packet[self.proto_map[payload_info.proto]] except: self.logger.error(ppp("Unexpected or invalid packet " @@ -429,7 +468,7 @@ class TestACLplugin(VppTestCase): self.pg_start() def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, - frags=False, pkt_raw=True): + frags=False, pkt_raw=True, etype=-1): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 @@ -437,7 +476,7 @@ class TestACLplugin(VppTestCase): if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, traffic_type, ip_type, proto, ports, - frags, pkt_raw) + frags, pkt_raw, etype) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -445,6 +484,7 @@ class TestACLplugin(VppTestCase): # Enable packet capture and start packet sendingself.IPV self.pg_enable_capture(self.pg_interfaces) self.pg_start() + self.logger.info("sent packets count: %d" % pkts_cnt) # Verify # Verify outgoing packet streams per packet-generator interface @@ -454,23 +494,27 @@ class TestACLplugin(VppTestCase): capture = dst_if.get_capture(pkts_cnt) self.logger.info("Verifying capture on interface %s" % dst_if.name) - self.verify_capture(dst_if, capture, traffic_type, ip_type) + self.verify_capture(dst_if, capture, + traffic_type, ip_type, etype) def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0, frags=False): + ports=0, frags=False, etype=-1): # Test + pkts_cnt = 0 self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, traffic_type, ip_type, proto, ports, - frags) + frags, True, etype) if len(pkts) > 0: i.add_stream(pkts) + pkts_cnt += len(pkts) # Enable packet capture and start packet sending self.pg_enable_capture(self.pg_interfaces) self.pg_start() + self.logger.info("sent packets count: %d" % pkts_cnt) # Verify # Verify outgoing packet streams per packet-generator interface @@ -485,8 +529,6 @@ class TestACLplugin(VppTestCase): def test_0000_warmup_test(self): """ ACL plugin version check; learn MACs """ - self.create_hosts(16) - self.run_traffic_no_check() reply = self.vapi.papi.acl_plugin_get_version() self.assertEqual(reply.major, 1) self.logger.info("Working with ACL plugin version: %d.%d" % ( @@ -504,14 +546,14 @@ class TestACLplugin(VppTestCase): 'srcport_or_icmptype_first': 1234, 'srcport_or_icmptype_last': 1235, 'src_ip_prefix_len': 0, - 'src_ip_addr': '\x00\x00\x00\x00', + 'src_ip_addr': b'\x00\x00\x00\x00', 'dstport_or_icmpcode_first': 1234, 'dstport_or_icmpcode_last': 1234, - 'dst_ip_addr': '\x00\x00\x00\x00', + 'dst_ip_addr': b'\x00\x00\x00\x00', 'dst_ip_prefix_len': 0}] # Test 1: add a new ACL reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r, - tag="permit 1234") + tag=b"permit 1234") self.assertEqual(reply.retval, 0) # The very first ACL gets #0 self.assertEqual(reply.acl_index, 0) @@ -534,23 +576,23 @@ class TestACLplugin(VppTestCase): 'srcport_or_icmptype_first': 1234, 'srcport_or_icmptype_last': 1235, 'src_ip_prefix_len': 0, - 'src_ip_addr': '\x00\x00\x00\x00', + 'src_ip_addr': b'\x00\x00\x00\x00', 'dstport_or_icmpcode_first': 1234, 'dstport_or_icmpcode_last': 1234, - 'dst_ip_addr': '\x00\x00\x00\x00', + 'dst_ip_addr': b'\x00\x00\x00\x00', 'dst_ip_prefix_len': 0}, {'is_permit': 1, 'is_ipv6': 0, 'proto': 17, 'srcport_or_icmptype_first': 0, 'srcport_or_icmptype_last': 0, 'src_ip_prefix_len': 0, - 'src_ip_addr': '\x00\x00\x00\x00', + 'src_ip_addr': b'\x00\x00\x00\x00', 'dstport_or_icmpcode_first': 0, 'dstport_or_icmpcode_last': 0, - 'dst_ip_addr': '\x00\x00\x00\x00', + 'dst_ip_addr': b'\x00\x00\x00\x00', 'dst_ip_prefix_len': 0}] reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny, - tag="deny 1234;permit all") + tag=b"deny 1234;permit all") self.assertEqual(reply.retval, 0) # The second ACL gets #1 self.assertEqual(reply.acl_index, 1) @@ -558,15 +600,15 @@ class TestACLplugin(VppTestCase): # Test 2: try to modify a nonexistent ACL reply = self.vapi.acl_add_replace(acl_index=432, r=r, - tag="FFFF:FFFF", expected_retval=-1) - self.assertEqual(reply.retval, -1) + tag=b"FFFF:FFFF", expected_retval=-6) + self.assertEqual(reply.retval, -6) # The ACL number should pass through self.assertEqual(reply.acl_index, 432) # apply an ACL on an interface inbound, try to delete ACL, must fail self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl]) - reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-1) + reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142) # Unapply an ACL and then try to delete it - must be ok self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, n_input=0, @@ -577,7 +619,7 @@ class TestACLplugin(VppTestCase): self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[second_acl]) - reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-1) + reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143) # Unapply the ACL and then try to delete it - must be ok self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, n_input=0, @@ -588,7 +630,7 @@ class TestACLplugin(VppTestCase): self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl], - expected_retval=-1) + expected_retval=-6) self.logger.info("ACLP_TEST_FINISH_0001") @@ -604,7 +646,7 @@ class TestACLplugin(VppTestCase): 0, self.proto[self.IP][self.TCP])) # Apply rules - self.apply_rules(rules, "permit per-flow") + self.apply_rules(rules, b"permit per-flow") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, -1) @@ -623,7 +665,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny per-flow;permit all") + self.apply_rules(rules, b"deny per-flow;permit all") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPV4, @@ -644,7 +686,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit icmpv4") + self.apply_rules(rules, b"permit icmpv4") # Traffic should still pass self.run_verify_test(self.ICMP, self.IPV4, @@ -665,7 +707,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit icmpv6") + self.apply_rules(rules, b"permit icmpv6") # Traffic should still pass self.run_verify_test(self.ICMP, self.IPV6, @@ -686,7 +728,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny icmpv4") + self.apply_rules(rules, b"deny icmpv4") # Traffic should not pass self.run_verify_negat_test(self.ICMP, self.IPV4, 0) @@ -706,7 +748,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny icmpv6") + self.apply_rules(rules, b"deny icmpv6") # Traffic should not pass self.run_verify_negat_test(self.ICMP, self.IPV6, 0) @@ -726,7 +768,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) @@ -746,7 +788,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 tcp") + self.apply_rules(rules, b"permit ip6 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) @@ -766,7 +808,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv udp") + self.apply_rules(rules, b"permit ipv udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) @@ -786,7 +828,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 udp") + self.apply_rules(rules, b"permit ip6 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) @@ -811,7 +853,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 tcp") + self.apply_rules(rules, b"deny ip4/ip6 tcp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -837,7 +879,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp") + self.apply_rules(rules, b"deny ip4/ip6 udp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -928,7 +970,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp "+str(port)) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, @@ -950,7 +992,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp "+str(port)) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, @@ -972,7 +1014,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp "+str(port)) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, @@ -995,7 +1037,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp "+str(port)) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, @@ -1022,7 +1064,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp "+str(port)) + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1049,7 +1091,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp "+str(port)) + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1058,7 +1100,8 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0020") def test_0021_udp_deny_port_verify_fragment_deny(self): - """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked + """ deny single UDPv4/v6, permit ip any, verify non-initial fragment + blocked """ self.logger.info("ACLP_TEST_START_0021") @@ -1076,7 +1119,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp "+str(port)) + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1098,7 +1141,7 @@ class TestACLplugin(VppTestCase): self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit empty udp ip4 " + str(port)) + self.apply_rules(rules, b"permit empty udp ip4 %d" % port) # Traffic should still pass # Create incoming packet streams for packet-generator interfaces @@ -1132,7 +1175,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit empty udp ip6 "+str(port)) + self.apply_rules(rules, b"permit empty udp ip6 %d" % port) # Traffic should still pass # Create incoming packet streams for packet-generator interfaces @@ -1169,7 +1212,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) @@ -1191,7 +1234,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 tcp") + self.apply_rules(rules, b"permit ip6 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) @@ -1213,7 +1256,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 udp") + self.apply_rules(rules, b"permit ipv4 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) @@ -1235,7 +1278,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 udp") + self.apply_rules(rules, b"permit ip6 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) @@ -1266,7 +1309,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 tcp") + self.apply_rules(rules, b"deny ip4/ip6 tcp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1298,7 +1341,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp") + self.apply_rules(rules, b"deny ip4/ip6 udp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1306,6 +1349,140 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0113") + def test_0300_tcp_permit_v4_etype_aaaa(self): + """ permit TCPv4, send 0xAAAA etype + """ + self.logger.info("ACLP_TEST_START_0300") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 tcp") + + # Traffic should still pass also for an odd ethertype + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0xaaaa) + self.logger.info("ACLP_TEST_FINISH_0300") + + def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked + """ + self.logger.info("ACLP_TEST_START_0305") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + + # The oddball ethertype should be blocked + self.run_verify_negat_test(self.IP, self.IPV4, + self.proto[self.IP][self.TCP], + 0, False, 0xaaaa) + + # remove the whitelist + self.etype_whitelist([], 0) + + self.logger.info("ACLP_TEST_FINISH_0305") + + def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass + """ + self.logger.info("ACLP_TEST_START_0306") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + + # The whitelisted traffic, should pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0x0bbb) + + # remove the whitelist, the previously blocked 0xAAAA should pass now + self.etype_whitelist([], 0) + + self.logger.info("ACLP_TEST_FINISH_0306") + + def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass + """ + self.logger.info("ACLP_TEST_START_0307") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + # remove the whitelist, the previously blocked 0xAAAA should pass now + self.etype_whitelist([], 0) + + # The whitelisted traffic, should pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0xaaaa) + + self.logger.info("ACLP_TEST_FINISH_0306") + + def test_0315_del_intf(self): + """ apply an acl and delete the interface + """ + self.logger.info("ACLP_TEST_START_0315") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # create an interface + intf = [] + intf.append(VppLoInterface(self)) + + # Apply rules + self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index) + + # Remove the interface + intf[0].remove_vpp_config() + + self.logger.info("ACLP_TEST_FINISH_0315") if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)