2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from framework import tag_fixme_vpp_workers
15 from util import Host, ppp
16 from ipaddress import IPv4Network, IPv6Network
18 from vpp_lo_interface import VppLoInterface
19 from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
20 from vpp_ip import INVALID_INDEX
23 @tag_fixme_vpp_workers
24 class TestACLplugin(VppTestCase):
25 """ACL plugin Test Case"""
41 proto = [[6, 17], [1, 58]]
42 proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
54 udp_sport_to = udp_sport_from + 5
55 udp_dport_from = 20000
56 udp_dport_to = udp_dport_from + 5000
58 tcp_sport_to = tcp_sport_from + 5
59 tcp_dport_from = 40000
60 tcp_dport_to = tcp_dport_from + 5000
63 udp_sport_to_2 = udp_sport_from_2 + 5
64 udp_dport_from_2 = 30000
65 udp_dport_to_2 = udp_dport_from_2 + 5000
66 tcp_sport_from_2 = 130
67 tcp_sport_to_2 = tcp_sport_from_2 + 5
68 tcp_dport_from_2 = 20000
69 tcp_dport_to_2 = tcp_dport_from_2 + 5000
71 icmp4_type = 8 # echo request
73 icmp6_type = 128 # echo request
89 Perform standard class setup (defined by class method setUpClass in
90 class VppTestCase) before running the test case, set test case related
91 variables and configure VPP.
93 super(TestACLplugin, cls).setUpClass()
96 # Create 2 pg interfaces
97 cls.create_pg_interfaces(range(2))
99 # Packet flows mapping pg0 -> pg1, pg2 etc.
101 cls.flows[cls.pg0] = [cls.pg1]
104 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
106 # Create BD with MAC learning and unknown unicast flooding enabled
107 # and put interfaces to this BD
108 cls.vapi.bridge_domain_add_del_v2(
109 bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
111 for pg_if in cls.pg_interfaces:
112 cls.vapi.sw_interface_set_l2_bridge(
113 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
116 # Set up all interfaces
117 for i in cls.pg_interfaces:
120 # Mapping between packet-generator index and lists of test hosts
121 cls.hosts_by_pg_idx = dict()
122 for pg_if in cls.pg_interfaces:
123 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
125 # Create list of deleted hosts
126 cls.deleted_hosts_by_pg_idx = dict()
127 for pg_if in cls.pg_interfaces:
128 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
130 # warm-up the mac address tables
134 n_int = len(cls.pg_interfaces)
135 macs_per_if = count // n_int
137 for pg_if in cls.pg_interfaces:
139 start_nr = macs_per_if * i + start
141 count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
143 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
144 for j in range(int(start_nr), int(end_nr)):
146 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
147 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
148 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
153 super(TestACLplugin, cls).tearDownClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the parent class implementation."""
    super(TestACLplugin, cls).tearDownClass()
161 super(TestACLplugin, self).setUp()
162 self.reset_packet_infos()
166 Show various debug prints after each test.
168 super(TestACLplugin, self).tearDown()
def show_commands_at_teardown(self):
    """Log the output of a fixed series of VPP CLI show commands.

    Every command is run through ``self.vapi.ppcli`` and its result is
    written to ``self.logger`` at info level, in the order listed here.
    The final command is parameterised with ``self.bd_id``.
    """
    fixed_commands = (
        "show vlib graph l2-input-feat-arc",
        "show vlib graph l2-input-feat-arc-end",
        "show vlib graph l2-output-feat-arc",
        "show vlib graph l2-output-feat-arc-end",
        "show l2fib verbose",
        "show acl-plugin acl",
        "show acl-plugin interface",
        "show acl-plugin tables",
    )
    for command in fixed_commands:
        self.logger.info(self.vapi.ppcli(command))

    # Formatted last so that self.bd_id is only read once the fixed
    # commands above have been issued.
    bridge_domain_cmd = "show bridge-domain %s detail" % self.bd_id
    self.logger.info(self.vapi.ppcli(bridge_domain_cmd))
197 src_prefix = IPv6Network((s_ip, s_prefix))
198 dst_prefix = IPv6Network((d_ip, d_prefix))
200 src_prefix = IPv4Network((s_ip, s_prefix))
201 dst_prefix = IPv4Network((d_ip, d_prefix))
203 is_permit=permit_deny,
206 src_prefix=src_prefix,
207 dst_prefix=dst_prefix,
210 def apply_rules(self, rules, tag=None):
211 acl = VppAcl(self, rules, tag=tag)
213 self.logger.info("Dumped ACL: " + str(acl.dump()))
214 # Apply an ACL on the interface as inbound
215 for i in self.pg_interfaces:
216 acl_if = VppAclInterface(
217 self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]
219 acl_if.add_vpp_config()
222 def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
223 acl = VppAcl(self, rules, tag=tag)
225 self.logger.info("Dumped ACL: " + str(acl.dump()))
226 # Apply an ACL on the interface as inbound
227 acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, acls=[acl])
230 def etype_whitelist(self, whitelist, n_input, add=True):
231 # Apply whitelists on all the interfaces
234 for i in self.pg_interfaces:
238 sw_if_index=i.sw_if_index,
244 if hasattr(self, "_wl"):
246 wl.remove_vpp_config()
248 def create_upper_layer(self, packet_index, proto, ports=0):
249 p = self.proto_map[proto]
253 sport=random.randint(self.udp_sport_from, self.udp_sport_to),
254 dport=random.randint(self.udp_dport_from, self.udp_dport_to),
257 return UDP(sport=ports, dport=ports)
261 sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
262 dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
265 return TCP(sport=ports, dport=ports)
281 Create input packet stream for defined interface using hosts or
284 :param object src_if: Interface to create packet stream for.
285 :param list packet_sizes: List of required packet sizes.
286 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
287 :return: Stream of packets.
290 if self.flows.__contains__(src_if):
291 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
292 for dst_if in self.flows[src_if]:
293 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
294 n_int = len(dst_hosts) * len(src_hosts)
295 for i in range(0, n_int):
296 dst_host = dst_hosts[int(i / len(src_hosts))]
297 src_host = src_hosts[i % len(src_hosts)]
298 pkt_info = self.create_packet_info(src_if, dst_if)
304 pkt_info.ip = random.choice([0, 1])
306 pkt_info.proto = random.choice(self.proto[self.IP])
308 pkt_info.proto = proto
309 payload = self.info_to_payload(pkt_info)
310 p = Ether(dst=dst_host.mac, src=src_host.mac)
312 p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
314 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
316 p /= IPv6ExtHdrFragment(offset=64, m=1)
320 src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
323 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
324 if traffic_type == self.ICMP:
326 p /= ICMPv6EchoRequest(
327 type=self.icmp6_type, code=self.icmp6_code
330 p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
332 p /= self.create_upper_layer(i, pkt_info.proto, ports)
335 pkt_info.data = p.copy()
337 size = random.choice(packet_sizes)
338 self.extend_packet(p, size)
342 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
344 Verify captured input packet stream for defined interface.
346 :param object pg_if: Interface to verify captured packet stream for.
347 :param list capture: Captured packet stream.
348 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
351 for i in self.pg_interfaces:
352 last_info[i.sw_if_index] = None
353 dst_sw_if_index = pg_if.sw_if_index
354 for packet in capture:
356 if packet[Ether].type != etype:
357 self.logger.error(ppp("Unexpected ethertype in packet:", packet))
361 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
362 if traffic_type == self.ICMP and ip_type == self.IPV6:
363 payload_info = self.payload_to_info(
364 packet[ICMPv6EchoRequest], "data"
366 payload = packet[ICMPv6EchoRequest]
368 payload_info = self.payload_to_info(packet[Raw])
369 payload = packet[self.proto_map[payload_info.proto]]
372 ppp("Unexpected or invalid packet (outside network):", packet)
377 self.assertEqual(payload_info.ip, ip_type)
378 if traffic_type == self.ICMP:
380 if payload_info.ip == 0:
381 self.assertEqual(payload.type, self.icmp4_type)
382 self.assertEqual(payload.code, self.icmp4_code)
384 self.assertEqual(payload.type, self.icmp6_type)
385 self.assertEqual(payload.code, self.icmp6_code)
388 ppp("Unexpected or invalid packet (outside network):", packet)
393 ip_version = IPv6 if payload_info.ip == 1 else IP
395 ip = packet[ip_version]
396 packet_index = payload_info.index
398 self.assertEqual(payload_info.dst, dst_sw_if_index)
400 "Got packet on port %s: src=%u (id=%u)"
401 % (pg_if.name, payload_info.src, packet_index)
403 next_info = self.get_next_packet_info_for_interface2(
404 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
406 last_info[payload_info.src] = next_info
407 self.assertTrue(next_info is not None)
408 self.assertEqual(packet_index, next_info.index)
409 saved_packet = next_info.data
410 # Check standard fields
411 self.assertEqual(ip.src, saved_packet[ip_version].src)
412 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
413 p = self.proto_map[payload_info.proto]
416 self.assertEqual(tcp.sport, saved_packet[TCP].sport)
417 self.assertEqual(tcp.dport, saved_packet[TCP].dport)
420 self.assertEqual(udp.sport, saved_packet[UDP].sport)
421 self.assertEqual(udp.dport, saved_packet[UDP].dport)
423 self.logger.error(ppp("Unexpected or invalid packet:", packet))
425 for i in self.pg_interfaces:
426 remaining_packet = self.get_next_packet_info_for_interface2(
427 i, dst_sw_if_index, last_info[i.sw_if_index]
430 remaining_packet is None,
431 "Port %u: Packet expected from source %u didn't arrive"
432 % (dst_sw_if_index, i.sw_if_index),
435 def run_traffic_no_check(self):
437 # Create incoming packet streams for packet-generator interfaces
438 for i in self.pg_interfaces:
439 if self.flows.__contains__(i):
440 pkts = self.create_stream(i, self.pg_if_packet_sizes)
444 # Enable packet capture and start packet sending
445 self.pg_enable_capture(self.pg_interfaces)
459 # Create incoming packet streams for packet-generator interfaces
461 for i in self.pg_interfaces:
462 if self.flows.__contains__(i):
463 pkts = self.create_stream(
465 self.pg_if_packet_sizes,
476 pkts_cnt += len(pkts)
478 # Enable packet capture and start packet sending
479 self.pg_enable_capture(self.pg_interfaces)
481 self.logger.info("sent packets count: %d" % pkts_cnt)
484 # Verify outgoing packet streams per packet-generator interface
485 for src_if in self.pg_interfaces:
486 if self.flows.__contains__(src_if):
487 for dst_if in self.flows[src_if]:
488 capture = dst_if.get_capture(pkts_cnt)
489 self.logger.info("Verifying capture on interface %s" % dst_if.name)
490 self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
492 def run_verify_negat_test(
493 self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
497 self.reset_packet_infos()
498 for i in self.pg_interfaces:
499 if self.flows.__contains__(i):
500 pkts = self.create_stream(
502 self.pg_if_packet_sizes,
513 pkts_cnt += len(pkts)
515 # Enable packet capture and start packet sending
516 self.pg_enable_capture(self.pg_interfaces)
518 self.logger.info("sent packets count: %d" % pkts_cnt)
521 # Verify outgoing packet streams per packet-generator interface
522 for src_if in self.pg_interfaces:
523 if self.flows.__contains__(src_if):
524 for dst_if in self.flows[src_if]:
525 self.logger.info("Verifying capture on interface %s" % dst_if.name)
526 capture = dst_if.get_capture(0)
527 self.assertEqual(len(capture), 0)
529 def test_0000_warmup_test(self):
530 """ACL plugin version check; learn MACs"""
531 reply = self.vapi.papi.acl_plugin_get_version()
532 self.assertEqual(reply.major, 1)
534 "Working with ACL plugin version: %d.%d" % (reply.major, reply.minor)
536 # minor version changes are non breaking
537 # self.assertEqual(reply.minor, 0)
539 def test_0001_acl_create(self):
540 """ACL create/delete test"""
542 self.logger.info("ACLP_TEST_START_0001")
543 # Create a permit-1234 ACL
544 r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
545 # Test 1: add a new ACL
546 first_acl = VppAcl(self, rules=r, tag="permit 1234")
547 first_acl.add_vpp_config()
548 self.assertTrue(first_acl.query_vpp_config())
549 # The very first ACL gets #0
550 self.assertEqual(first_acl.acl_index, 0)
551 rr = first_acl.dump()
552 self.logger.info("Dumped ACL: " + str(rr))
553 self.assertEqual(len(rr), 1)
554 # We should have the same number of ACL entries as we had asked
555 self.assertEqual(len(rr[0].r), len(r))
556 # The rules should be the same. But because the submitted and returned
557 # are different types, we need to iterate over rules and keys to get
559 for i_rule in range(0, len(r) - 1):
560 encoded_rule = r[i_rule].encode()
561 for rule_key in encoded_rule:
562 self.assertEqual(rr[0].r[i_rule][rule_key], encoded_rule[rule_key])
564 # Create a deny-1234 ACL
566 AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
567 AclRule(is_permit=1, proto=17, ports=0),
569 second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
570 second_acl.add_vpp_config()
571 self.assertTrue(second_acl.query_vpp_config())
572 # The second ACL gets #1
573 self.assertEqual(second_acl.acl_index, 1)
575 # Test 2: try to modify a nonexistent ACL
576 invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
577 reply = invalid_acl.add_vpp_config(expect_error=True)
579 # apply an ACL on an interface inbound, try to delete ACL, must fail
580 acl_if_list = VppAclInterface(
581 self, sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl]
583 acl_if_list.add_vpp_config()
584 first_acl.remove_vpp_config(expect_error=True)
585 # Unapply an ACL and then try to delete it - must be ok
586 acl_if_list.remove_vpp_config()
587 first_acl.remove_vpp_config()
589 # apply an ACL on an interface outbound, try to delete ACL, must fail
590 acl_if_list = VppAclInterface(
591 self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[second_acl]
593 acl_if_list.add_vpp_config()
594 second_acl.remove_vpp_config(expect_error=True)
595 # Unapply an ACL and then try to delete it - must be ok
596 acl_if_list.remove_vpp_config()
597 second_acl.remove_vpp_config()
599 # try to apply a nonexistent ACL - must fail
600 acl_if_list = VppAclInterface(
601 self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[invalid_acl]
603 acl_if_list.add_vpp_config(expect_error=True)
605 self.logger.info("ACLP_TEST_FINISH_0001")
607 def test_0002_acl_permit_apply(self):
608 """permit ACL apply test"""
609 self.logger.info("ACLP_TEST_START_0002")
613 self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP])
616 self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP])
620 acl_idx = self.apply_rules(rules, "permit per-flow")
623 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
625 # Traffic should still pass
626 self.run_verify_test(self.IP, self.IPV4, -1)
628 matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
629 self.logger.info("stat segment counters: %s" % repr(matches))
630 cli = "show acl-plugin acl"
631 self.logger.info(self.vapi.ppcli(cli))
632 cli = "show acl-plugin tables"
633 self.logger.info(self.vapi.ppcli(cli))
635 total_hits = matches[0][0]["packets"] + matches[0][1]["packets"]
636 self.assertEqual(total_hits, 64)
639 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
641 self.logger.info("ACLP_TEST_FINISH_0002")
643 def test_0003_acl_deny_apply(self):
644 """deny ACL apply test"""
645 self.logger.info("ACLP_TEST_START_0003")
646 # Add a deny-flows ACL
650 self.IPV4, self.DENY, self.PORTS_ALL, self.proto[self.IP][self.UDP]
653 # Permit ip any any in the end
654 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
657 acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
660 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
662 # Traffic should not pass
663 self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
665 matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
666 self.logger.info("stat segment counters: %s" % repr(matches))
667 cli = "show acl-plugin acl"
668 self.logger.info(self.vapi.ppcli(cli))
669 cli = "show acl-plugin tables"
670 self.logger.info(self.vapi.ppcli(cli))
671 self.assertEqual(matches[0][0]["packets"], 64)
673 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
674 self.logger.info("ACLP_TEST_FINISH_0003")
675 # self.assertEqual(, 0)
677 def test_0004_vpp624_permit_icmpv4(self):
678 """VPP_624 permit ICMPv4"""
679 self.logger.info("ACLP_TEST_START_0004")
688 self.proto[self.ICMP][self.ICMPv4],
691 # deny ip any any in the end
692 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
695 self.apply_rules(rules, "permit icmpv4")
697 # Traffic should still pass
698 self.run_verify_test(self.ICMP, self.IPV4, self.proto[self.ICMP][self.ICMPv4])
700 self.logger.info("ACLP_TEST_FINISH_0004")
702 def test_0005_vpp624_permit_icmpv6(self):
703 """VPP_624 permit ICMPv6"""
704 self.logger.info("ACLP_TEST_START_0005")
713 self.proto[self.ICMP][self.ICMPv6],
716 # deny ip any any in the end
717 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
720 self.apply_rules(rules, "permit icmpv6")
722 # Traffic should still pass
723 self.run_verify_test(self.ICMP, self.IPV6, self.proto[self.ICMP][self.ICMPv6])
725 self.logger.info("ACLP_TEST_FINISH_0005")
727 def test_0006_vpp624_deny_icmpv4(self):
728 """VPP_624 deny ICMPv4"""
729 self.logger.info("ACLP_TEST_START_0006")
737 self.proto[self.ICMP][self.ICMPv4],
740 # permit ip any any in the end
741 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
744 self.apply_rules(rules, "deny icmpv4")
746 # Traffic should not pass
747 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
749 self.logger.info("ACLP_TEST_FINISH_0006")
751 def test_0007_vpp624_deny_icmpv6(self):
752 """VPP_624 deny ICMPv6"""
753 self.logger.info("ACLP_TEST_START_0007")
761 self.proto[self.ICMP][self.ICMPv6],
764 # permit ip any any in the end
765 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
768 self.apply_rules(rules, "deny icmpv6")
770 # Traffic should not pass
771 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
773 self.logger.info("ACLP_TEST_FINISH_0007")
775 def test_0008_tcp_permit_v4(self):
777 self.logger.info("ACLP_TEST_START_0008")
783 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
786 # deny ip any any in the end
787 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
790 self.apply_rules(rules, "permit ipv4 tcp")
792 # Traffic should still pass
793 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
795 self.logger.info("ACLP_TEST_FINISH_0008")
797 def test_0009_tcp_permit_v6(self):
799 self.logger.info("ACLP_TEST_START_0009")
805 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
808 # deny ip any any in the end
809 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
812 self.apply_rules(rules, "permit ip6 tcp")
814 # Traffic should still pass
815 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
817 self.logger.info("ACLP_TEST_FINISH_0008")
819 def test_0010_udp_permit_v4(self):
821 self.logger.info("ACLP_TEST_START_0010")
827 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
830 # deny ip any any in the end
831 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
834 self.apply_rules(rules, "permit ipv udp")
836 # Traffic should still pass
837 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
839 self.logger.info("ACLP_TEST_FINISH_0010")
841 def test_0011_udp_permit_v6(self):
843 self.logger.info("ACLP_TEST_START_0011")
849 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
852 # deny ip any any in the end
853 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
856 self.apply_rules(rules, "permit ip6 udp")
858 # Traffic should still pass
859 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
861 self.logger.info("ACLP_TEST_FINISH_0011")
863 def test_0012_tcp_deny(self):
865 self.logger.info("ACLP_TEST_START_0012")
871 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
876 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
879 # permit ip any any in the end
880 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
881 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
884 self.apply_rules(rules, "deny ip4/ip6 tcp")
886 # Traffic should not pass
887 self.run_verify_negat_test(
888 self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
891 self.logger.info("ACLP_TEST_FINISH_0012")
893 def test_0013_udp_deny(self):
895 self.logger.info("ACLP_TEST_START_0013")
901 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
906 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
909 # permit ip any any in the end
910 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
911 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
914 self.apply_rules(rules, "deny ip4/ip6 udp")
916 # Traffic should not pass
917 self.run_verify_negat_test(
918 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
921 self.logger.info("ACLP_TEST_FINISH_0013")
923 def test_0014_acl_dump(self):
924 """verify add/dump acls"""
925 self.logger.info("ACLP_TEST_START_0014")
928 [self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
929 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
930 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
931 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
932 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
933 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
934 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
935 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
936 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
937 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
938 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
939 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
940 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
941 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
942 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
943 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
944 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
945 [self.IPV6, self.DENY, self.PORTS_ALL, 0],
948 # Add and verify new ACLs
950 for i in range(len(r)):
951 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
953 acl = VppAcl(self, rules=rules)
958 for drules in result:
960 self.assertEqual(dr.is_permit, r[i][1])
961 self.assertEqual(dr.proto, r[i][3])
964 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
967 self.assertEqual(dr.srcport_or_icmptype_first, 0)
968 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
970 if dr.proto == self.proto[self.IP][self.TCP]:
972 dr.srcport_or_icmptype_first, self.tcp_sport_from - 1
975 dr.srcport_or_icmptype_first, self.tcp_sport_to + 1
978 dr.dstport_or_icmpcode_last, self.tcp_dport_from - 1
981 dr.dstport_or_icmpcode_last, self.tcp_dport_to + 1
983 elif dr.proto == self.proto[self.IP][self.UDP]:
985 dr.srcport_or_icmptype_first, self.udp_sport_from - 1
988 dr.srcport_or_icmptype_first, self.udp_sport_to + 1
991 dr.dstport_or_icmpcode_last, self.udp_dport_from - 1
994 dr.dstport_or_icmpcode_last, self.udp_dport_to + 1
998 self.logger.info("ACLP_TEST_FINISH_0014")
1000 def test_0015_tcp_permit_port_v4(self):
1001 """permit single TCPv4"""
1002 self.logger.info("ACLP_TEST_START_0015")
1004 port = random.randint(16384, 65535)
1009 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.TCP]
1012 # deny ip any any in the end
1013 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1016 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1018 # Traffic should still pass
1019 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], port)
1021 self.logger.info("ACLP_TEST_FINISH_0015")
1023 def test_0016_udp_permit_port_v4(self):
1024 """permit single UDPv4"""
1025 self.logger.info("ACLP_TEST_START_0016")
1027 port = random.randint(16384, 65535)
1032 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1035 # deny ip any any in the end
1036 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1039 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1041 # Traffic should still pass
1042 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP], port)
1044 self.logger.info("ACLP_TEST_FINISH_0016")
1046 def test_0017_tcp_permit_port_v6(self):
1047 """permit single TCPv6"""
1048 self.logger.info("ACLP_TEST_START_0017")
1050 port = random.randint(16384, 65535)
1055 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.TCP]
1058 # deny ip any any in the end
1059 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1062 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1064 # Traffic should still pass
1065 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP], port)
1067 self.logger.info("ACLP_TEST_FINISH_0017")
1069 def test_0018_udp_permit_port_v6(self):
1070 """permit single UDPv6"""
1071 self.logger.info("ACLP_TEST_START_0018")
1073 port = random.randint(16384, 65535)
1078 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1081 # deny ip any any in the end
1082 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1085 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1087 # Traffic should still pass
1088 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP], port)
1090 self.logger.info("ACLP_TEST_FINISH_0018")
1092 def test_0019_udp_deny_port(self):
1093 """deny single TCPv4/v6"""
1094 self.logger.info("ACLP_TEST_START_0019")
1096 port = random.randint(16384, 65535)
1100 self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.TCP])
1103 self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.TCP])
1105 # Permit ip any any in the end
1106 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1107 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1110 self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1112 # Traffic should not pass
1113 self.run_verify_negat_test(
1114 self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP], port
1117 self.logger.info("ACLP_TEST_FINISH_0019")
1119 def test_0020_udp_deny_port(self):
1120 """deny single UDPv4/v6"""
1121 self.logger.info("ACLP_TEST_START_0020")
1123 port = random.randint(16384, 65535)
1127 self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1130 self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1132 # Permit ip any any in the end
1133 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1134 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1137 self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1139 # Traffic should not pass
1140 self.run_verify_negat_test(
1141 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port
1144 self.logger.info("ACLP_TEST_FINISH_0020")
1146 def test_0021_udp_deny_port_verify_fragment_deny(self):
1147 """deny single UDPv4/v6, permit ip any, verify non-initial fragment
1150 self.logger.info("ACLP_TEST_START_0021")
1152 port = random.randint(16384, 65535)
1156 self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1159 self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1161 # permit ip any any in the end
1162 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1163 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1166 self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1168 # Traffic should not pass
1169 self.run_verify_negat_test(
1170 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port, True
1173 self.logger.info("ACLP_TEST_FINISH_0021")
1175 def test_0022_zero_length_udp_ipv4(self):
1176 """VPP-687 zero length udp ipv4 packet"""
1177 self.logger.info("ACLP_TEST_START_0022")
1179 port = random.randint(16384, 65535)
1184 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1187 # deny ip any any in the end
1188 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1191 self.apply_rules(rules, "permit empty udp ip4 %d" % port)
1193 # Traffic should still pass
1194 # Create incoming packet streams for packet-generator interfaces
1196 pkts = self.create_stream(
1198 self.pg_if_packet_sizes,
1201 self.proto[self.IP][self.UDP],
1207 self.pg0.add_stream(pkts)
1208 pkts_cnt += len(pkts)
1210 # Enable packet capture and start packet sending
1211 self.pg_enable_capture(self.pg_interfaces)
1214 self.pg1.get_capture(pkts_cnt)
1216 self.logger.info("ACLP_TEST_FINISH_0022")
1218 def test_0023_zero_length_udp_ipv6(self):
1219 """VPP-687 zero length udp ipv6 packet"""
1220 self.logger.info("ACLP_TEST_START_0023")
1222 port = random.randint(16384, 65535)
1227 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1230 # deny ip any any in the end
1231 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1234 self.apply_rules(rules, "permit empty udp ip6 %d" % port)
1236 # Traffic should still pass
1237 # Create incoming packet streams for packet-generator interfaces
1239 pkts = self.create_stream(
1241 self.pg_if_packet_sizes,
1244 self.proto[self.IP][self.UDP],
1250 self.pg0.add_stream(pkts)
1251 pkts_cnt += len(pkts)
1253 # Enable packet capture and start packet sending
1254 self.pg_enable_capture(self.pg_interfaces)
1257 # Verify outgoing packet streams per packet-generator interface
1258 self.pg1.get_capture(pkts_cnt)
1260 self.logger.info("ACLP_TEST_FINISH_0023")
1262 def test_0108_tcp_permit_v4(self):
1263 """permit TCPv4 + non-match range"""
1264 self.logger.info("ACLP_TEST_START_0108")
1270 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1275 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1278 # deny ip any any in the end
1279 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1282 self.apply_rules(rules, "permit ipv4 tcp")
1284 # Traffic should still pass
1285 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1287 self.logger.info("ACLP_TEST_FINISH_0108")
1289 def test_0109_tcp_permit_v6(self):
1290 """permit TCPv6 + non-match range"""
1291 self.logger.info("ACLP_TEST_START_0109")
1297 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1302 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1305 # deny ip any any in the end
1306 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1309 self.apply_rules(rules, "permit ip6 tcp")
1311 # Traffic should still pass
1312 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1314 self.logger.info("ACLP_TEST_FINISH_0109")
1316 def test_0110_udp_permit_v4(self):
1317 """permit UDPv4 + non-match range"""
1318 self.logger.info("ACLP_TEST_START_0110")
1324 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]  # deny rule on the second port range (the "non-match range" of the docstring)
1329 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]  # permit rule on the primary port range
1332 # Catch-all rule, appended last: deny ip any any (PORTS_ALL, proto argument 0)
1333 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1336 self.apply_rules(rules, "permit ipv4 udp")
1338 # IPv4 UDP traffic should still pass: it is expected to match the PERMIT
1338 # rule rather than the PORTS_RANGE_2 deny or the catch-all deny
1339 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1341 self.logger.info("ACLP_TEST_FINISH_0110")
1343 def test_0111_udp_permit_v6(self):
1344 """permit UDPv6 + non-match range"""
1345 self.logger.info("ACLP_TEST_START_0111")
1351 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]  # deny rule on the second port range (the "non-match range" of the docstring)
1356 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]  # permit rule on the primary port range
1359 # Catch-all rule, appended last: deny ip any any (PORTS_ALL, proto argument 0)
1360 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1363 self.apply_rules(rules, "permit ip6 udp")
1365 # IPv6 UDP traffic should still pass: it is expected to match the PERMIT
1365 # rule rather than the PORTS_RANGE_2 deny or the catch-all deny
1366 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1368 self.logger.info("ACLP_TEST_FINISH_0111")
1370 def test_0112_tcp_deny(self):
1371 """deny TCPv4/v6 + non-match range"""
1372 self.logger.info("ACLP_TEST_START_0112")
1381 self.proto[self.IP][self.TCP],  # TCP proto argument; NOTE(review): the remaining arguments of this rule are not visible here
1389 self.proto[self.IP][self.TCP],  # TCP proto argument; NOTE(review): the remaining arguments of this rule are not visible here
1394 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]  # IPv4 deny rule on the primary port range
1399 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]  # IPv6 deny rule on the primary port range
1402 # Catch-all rules, appended last: permit ip any any for both IPv4 and IPv6
1403 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1404 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1407 self.apply_rules(rules, "deny ip4/ip6 tcp")
1409 # TCP traffic should not pass despite the trailing permit rules: the DENY
1409 # rules on PORTS_RANGE are expected to match first
1410 self.run_verify_negat_test(
1411 self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]  # IPRANDOM: presumably a mix of IPv4 and IPv6 packets - TODO confirm
1414 self.logger.info("ACLP_TEST_FINISH_0112")
1416 def test_0113_udp_deny(self):
1417 """deny UDPv4/v6 + non-match range"""
1418 self.logger.info("ACLP_TEST_START_0113")
1427 self.proto[self.IP][self.UDP],  # UDP proto argument; NOTE(review): the remaining arguments of this rule are not visible here
1435 self.proto[self.IP][self.UDP],  # UDP proto argument; NOTE(review): the remaining arguments of this rule are not visible here
1440 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]  # IPv4 deny rule on the primary port range
1445 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]  # IPv6 deny rule on the primary port range
1448 # Catch-all rules, appended last: permit ip any any for both IPv4 and IPv6
1449 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1450 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1453 self.apply_rules(rules, "deny ip4/ip6 udp")
1455 # UDP traffic should not pass despite the trailing permit rules: the DENY
1455 # rules on PORTS_RANGE are expected to match first
1456 self.run_verify_negat_test(
1457 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]  # IPRANDOM: presumably a mix of IPv4 and IPv6 packets - TODO confirm
1460 self.logger.info("ACLP_TEST_FINISH_0113")
1462 def test_0300_tcp_permit_v4_etype_aaaa(self):
1463 """permit TCPv4, send 0xAAAA etype"""
1464 self.logger.info("ACLP_TEST_START_0300")
1470 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]  # deny rule on the second port range
1475 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]  # permit rule on the primary port range
1478 # Catch-all rule, appended last: deny ip any any (PORTS_ALL, proto argument 0)
1479 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1482 self.apply_rules(rules, "permit ipv4 tcp")
1484 # No ethertype whitelist call appears in this test, so traffic carrying
1484 # the odd 0xAAAA ethertype should still pass
1485 self.run_verify_test(
1486 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA  # last positional argument is the ethertype to send
1488 self.logger.info("ACLP_TEST_FINISH_0300")
1490 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1491 """permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked"""
1492 self.logger.info("ACLP_TEST_START_0305")
1498 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]  # deny rule on the second port range
1503 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]  # permit rule on the primary port range
1506 # Catch-all rule, appended last: deny ip any any (PORTS_ALL, proto argument 0)
1507 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1510 self.apply_rules(rules, "permit ipv4 tcp")
1511 # whitelist only the 0x0bbb etype - so the 0xaaaa should be blocked
1512 self.etype_whitelist([0xBBB], 1)  # 0xBBB == 0x0BBB, the value named in the docstring
1514 # The oddball (non-whitelisted) 0xAAAA ethertype should be blocked
1515 self.run_verify_negat_test(
1516 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, 0xAAAA  # last positional argument is the ethertype to send
1519 # remove the whitelist so it does not stay configured after this test
1520 self.etype_whitelist([], 0, add=False)
1522 self.logger.info("ACLP_TEST_FINISH_0305")
1524 def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1525 """permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass"""
1526 self.logger.info("ACLP_TEST_START_0306")
1532 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]  # deny rule on the second port range
1537 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]  # permit rule on the primary port range
1540 # Catch-all rule, appended last: deny ip any any (PORTS_ALL, proto argument 0)
1541 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1544 self.apply_rules(rules, "permit ipv4 tcp")
1545 # whitelist only the 0x0bbb etype - so the 0xaaaa would be blocked
1546 self.etype_whitelist([0xBBB], 1)  # 0xBBB == 0x0BBB, the value named in the docstring
1548 # Traffic carrying the whitelisted 0x0BBB ethertype should pass
1549 self.run_verify_test(
1550 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0x0BBB  # last positional argument is the ethertype to send
1553 # remove the whitelist again; afterwards 0xAAAA would no longer be blocked
1554 self.etype_whitelist([], 0, add=False)
1556 self.logger.info("ACLP_TEST_FINISH_0306")
1558 def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1559 """permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass"""
1560 self.logger.info("ACLP_TEST_START_0307")
1566 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1571 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1574 # deny ip any any in the end
1575 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1578 self.apply_rules(rules, "permit ipv4 tcp")
1580 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1581 self.etype_whitelist([0xBBB], 1)
1582 # remove the whitelist, the previously blocked 0xAAAA should pass now
1583 self.etype_whitelist([], 0, add=False)
1585 # The whitelisted traffic, should pass
1586 self.run_verify_test(
1587 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1590 self.logger.info("ACLP_TEST_FINISH_0306")
1592 def test_0315_del_intf(self):
1593 """apply an acl and delete the interface"""
1594 self.logger.info("ACLP_TEST_START_0315")
1600 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]  # deny rule on the second port range
1605 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]  # permit rule on the primary port range
1608 # Catch-all rule, appended last: deny ip any any (PORTS_ALL, proto argument 0)
1609 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1611 # create a VppLoInterface that the ACL will be applied to
1613 intf.append(VppLoInterface(self))
1616 self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)  # apply to the new interface only, addressed by its sw_if_index
1618 # Remove the interface while the ACL is still applied to it; no traffic
1618 # is sent or verified in the visible part of this test
1619 intf[0].remove_vpp_config()
1621 self.logger.info("ACLP_TEST_FINISH_0315")
1624 if __name__ == "__main__":  # only when this file is executed directly, not when imported
1625 unittest.main(testRunner=VppTestRunner)  # run the tests with the VppTestRunner imported from framework