2 """ACL plugin Test Case HLD:
8 from config import config
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
13 from scapy.layers.inet6 import IPv6ExtHdrFragment
14 from framework import VppTestCase
15 from asfframework import VppTestRunner, tag_fixme_vpp_workers
16 from util import Host, ppp
17 from ipaddress import IPv4Network, IPv6Network
19 from vpp_lo_interface import VppLoInterface
20 from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
21 from vpp_ip import INVALID_INDEX
24 @unittest.skipIf("acl" in config.excluded_plugins, "Exclude ACL plugin tests")
25 @tag_fixme_vpp_workers
26 class TestACLplugin(VppTestCase):
27 """ACL plugin Test Case"""
43 proto = [[6, 17], [1, 58]]
44 proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
56 udp_sport_to = udp_sport_from + 5
57 udp_dport_from = 20000
58 udp_dport_to = udp_dport_from + 5000
60 tcp_sport_to = tcp_sport_from + 5
61 tcp_dport_from = 40000
62 tcp_dport_to = tcp_dport_from + 5000
65 udp_sport_to_2 = udp_sport_from_2 + 5
66 udp_dport_from_2 = 30000
67 udp_dport_to_2 = udp_dport_from_2 + 5000
68 tcp_sport_from_2 = 130
69 tcp_sport_to_2 = tcp_sport_from_2 + 5
70 tcp_dport_from_2 = 20000
71 tcp_dport_to_2 = tcp_dport_from_2 + 5000
73 icmp4_type = 8 # echo request
75 icmp6_type = 128 # echo request
91 Perform standard class setup (defined by class method setUpClass in
92 class VppTestCase) before running the test case, set test case related
93 variables and configure VPP.
95 super(TestACLplugin, cls).setUpClass()
98 # Create 2 pg interfaces
99 cls.create_pg_interfaces(range(2))
101 # Packet flows mapping pg0 -> pg1, pg2 etc.
103 cls.flows[cls.pg0] = [cls.pg1]
106 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
108 # Create BD with MAC learning and unknown unicast flooding disabled
109 # and put interfaces to this BD
110 cls.vapi.bridge_domain_add_del_v2(
111 bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
113 for pg_if in cls.pg_interfaces:
114 cls.vapi.sw_interface_set_l2_bridge(
115 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
118 # Set up all interfaces
119 for i in cls.pg_interfaces:
122 # Mapping between packet-generator index and lists of test hosts
123 cls.hosts_by_pg_idx = dict()
124 for pg_if in cls.pg_interfaces:
125 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
127 # Create list of deleted hosts
128 cls.deleted_hosts_by_pg_idx = dict()
129 for pg_if in cls.pg_interfaces:
130 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
132 # warm-up the mac address tables
136 n_int = len(cls.pg_interfaces)
137 macs_per_if = count // n_int
139 for pg_if in cls.pg_interfaces:
141 start_nr = macs_per_if * i + start
143 count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
145 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
146 for j in range(int(start_nr), int(end_nr)):
148 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
149 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
150 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
155 super(TestACLplugin, cls).tearDownClass()
159 def tearDownClass(cls):
160 super(TestACLplugin, cls).tearDownClass()
163 super(TestACLplugin, self).setUp()
164 self.reset_packet_infos()
168 Show various debug prints after each test.
170 super(TestACLplugin, self).tearDown()
172 def show_commands_at_teardown(self):
173 cli = "show vlib graph l2-input-feat-arc"
174 self.logger.info(self.vapi.ppcli(cli))
175 cli = "show vlib graph l2-input-feat-arc-end"
176 self.logger.info(self.vapi.ppcli(cli))
177 cli = "show vlib graph l2-output-feat-arc"
178 self.logger.info(self.vapi.ppcli(cli))
179 cli = "show vlib graph l2-output-feat-arc-end"
180 self.logger.info(self.vapi.ppcli(cli))
181 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
182 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
183 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
184 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
185 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id))
199 src_prefix = IPv6Network((s_ip, s_prefix))
200 dst_prefix = IPv6Network((d_ip, d_prefix))
202 src_prefix = IPv4Network((s_ip, s_prefix))
203 dst_prefix = IPv4Network((d_ip, d_prefix))
205 is_permit=permit_deny,
208 src_prefix=src_prefix,
209 dst_prefix=dst_prefix,
212 def apply_rules(self, rules, tag=None):
213 acl = VppAcl(self, rules, tag=tag)
215 self.logger.info("Dumped ACL: " + str(acl.dump()))
216 # Apply a ACL on the interface as inbound
217 for i in self.pg_interfaces:
218 acl_if = VppAclInterface(
219 self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]
221 acl_if.add_vpp_config()
224 def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
225 acl = VppAcl(self, rules, tag=tag)
227 self.logger.info("Dumped ACL: " + str(acl.dump()))
228 # Apply a ACL on the interface as inbound
229 acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, acls=[acl])
232 def etype_whitelist(self, whitelist, n_input, add=True):
233 # Apply whitelists on all the interfaces
236 for i in self.pg_interfaces:
240 sw_if_index=i.sw_if_index,
246 if hasattr(self, "_wl"):
248 wl.remove_vpp_config()
250 def create_upper_layer(self, packet_index, proto, ports=0):
251 p = self.proto_map[proto]
255 sport=random.randint(self.udp_sport_from, self.udp_sport_to),
256 dport=random.randint(self.udp_dport_from, self.udp_dport_to),
259 return UDP(sport=ports, dport=ports)
263 sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
264 dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
267 return TCP(sport=ports, dport=ports)
283 Create input packet stream for defined interface using hosts or
286 :param object src_if: Interface to create packet stream for.
287 :param list packet_sizes: List of required packet sizes.
288 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
289 :return: Stream of packets.
292 if self.flows.__contains__(src_if):
293 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
294 for dst_if in self.flows[src_if]:
295 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
296 n_int = len(dst_hosts) * len(src_hosts)
297 for i in range(0, n_int):
298 dst_host = dst_hosts[int(i / len(src_hosts))]
299 src_host = src_hosts[i % len(src_hosts)]
300 pkt_info = self.create_packet_info(src_if, dst_if)
306 pkt_info.ip = random.choice([0, 1])
308 pkt_info.proto = random.choice(self.proto[self.IP])
310 pkt_info.proto = proto
311 payload = self.info_to_payload(pkt_info)
312 p = Ether(dst=dst_host.mac, src=src_host.mac)
314 p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
316 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
318 p /= IPv6ExtHdrFragment(offset=64, m=1)
322 src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
325 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
326 if traffic_type == self.ICMP:
328 p /= ICMPv6EchoRequest(
329 type=self.icmp6_type, code=self.icmp6_code
332 p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
334 p /= self.create_upper_layer(i, pkt_info.proto, ports)
337 pkt_info.data = p.copy()
339 size = random.choice(packet_sizes)
340 self.extend_packet(p, size)
344 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
346 Verify captured input packet stream for defined interface.
348 :param object pg_if: Interface to verify captured packet stream for.
349 :param list capture: Captured packet stream.
350 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
353 for i in self.pg_interfaces:
354 last_info[i.sw_if_index] = None
355 dst_sw_if_index = pg_if.sw_if_index
356 for packet in capture:
358 if packet[Ether].type != etype:
359 self.logger.error(ppp("Unexpected ethertype in packet:", packet))
363 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
364 if traffic_type == self.ICMP and ip_type == self.IPV6:
365 payload_info = self.payload_to_info(
366 packet[ICMPv6EchoRequest], "data"
368 payload = packet[ICMPv6EchoRequest]
370 payload_info = self.payload_to_info(packet[Raw])
371 payload = packet[self.proto_map[payload_info.proto]]
374 ppp("Unexpected or invalid packet (outside network):", packet)
379 self.assertEqual(payload_info.ip, ip_type)
380 if traffic_type == self.ICMP:
382 if payload_info.ip == 0:
383 self.assertEqual(payload.type, self.icmp4_type)
384 self.assertEqual(payload.code, self.icmp4_code)
386 self.assertEqual(payload.type, self.icmp6_type)
387 self.assertEqual(payload.code, self.icmp6_code)
390 ppp("Unexpected or invalid packet (outside network):", packet)
395 ip_version = IPv6 if payload_info.ip == 1 else IP
397 ip = packet[ip_version]
398 packet_index = payload_info.index
400 self.assertEqual(payload_info.dst, dst_sw_if_index)
402 "Got packet on port %s: src=%u (id=%u)"
403 % (pg_if.name, payload_info.src, packet_index)
405 next_info = self.get_next_packet_info_for_interface2(
406 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
408 last_info[payload_info.src] = next_info
409 self.assertTrue(next_info is not None)
410 self.assertEqual(packet_index, next_info.index)
411 saved_packet = next_info.data
412 # Check standard fields
413 self.assertEqual(ip.src, saved_packet[ip_version].src)
414 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
415 p = self.proto_map[payload_info.proto]
418 self.assertEqual(tcp.sport, saved_packet[TCP].sport)
419 self.assertEqual(tcp.dport, saved_packet[TCP].dport)
422 self.assertEqual(udp.sport, saved_packet[UDP].sport)
423 self.assertEqual(udp.dport, saved_packet[UDP].dport)
425 self.logger.error(ppp("Unexpected or invalid packet:", packet))
427 for i in self.pg_interfaces:
428 remaining_packet = self.get_next_packet_info_for_interface2(
429 i, dst_sw_if_index, last_info[i.sw_if_index]
432 remaining_packet is None,
433 "Port %u: Packet expected from source %u didn't arrive"
434 % (dst_sw_if_index, i.sw_if_index),
437 def run_traffic_no_check(self):
439 # Create incoming packet streams for packet-generator interfaces
440 for i in self.pg_interfaces:
441 if self.flows.__contains__(i):
442 pkts = self.create_stream(i, self.pg_if_packet_sizes)
446 # Enable packet capture and start packet sending
447 self.pg_enable_capture(self.pg_interfaces)
461 # Create incoming packet streams for packet-generator interfaces
463 for i in self.pg_interfaces:
464 if self.flows.__contains__(i):
465 pkts = self.create_stream(
467 self.pg_if_packet_sizes,
478 pkts_cnt += len(pkts)
480 # Enable packet capture and start packet sendingself.IPV
481 self.pg_enable_capture(self.pg_interfaces)
483 self.logger.info("sent packets count: %d" % pkts_cnt)
486 # Verify outgoing packet streams per packet-generator interface
487 for src_if in self.pg_interfaces:
488 if self.flows.__contains__(src_if):
489 for dst_if in self.flows[src_if]:
490 capture = dst_if.get_capture(pkts_cnt)
491 self.logger.info("Verifying capture on interface %s" % dst_if.name)
492 self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
494 def run_verify_negat_test(
495 self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
499 self.reset_packet_infos()
500 for i in self.pg_interfaces:
501 if self.flows.__contains__(i):
502 pkts = self.create_stream(
504 self.pg_if_packet_sizes,
515 pkts_cnt += len(pkts)
517 # Enable packet capture and start packet sending
518 self.pg_enable_capture(self.pg_interfaces)
520 self.logger.info("sent packets count: %d" % pkts_cnt)
523 # Verify outgoing packet streams per packet-generator interface
524 for src_if in self.pg_interfaces:
525 if self.flows.__contains__(src_if):
526 for dst_if in self.flows[src_if]:
527 self.logger.info("Verifying capture on interface %s" % dst_if.name)
528 capture = dst_if.get_capture(0)
529 self.assertEqual(len(capture), 0)
531 def test_0000_warmup_test(self):
532 """ACL plugin version check; learn MACs"""
533 reply = self.vapi.papi.acl_plugin_get_version()
534 self.assertEqual(reply.major, 1)
536 "Working with ACL plugin version: %d.%d" % (reply.major, reply.minor)
538 # minor version changes are non breaking
539 # self.assertEqual(reply.minor, 0)
541 def test_0001_acl_create(self):
542 """ACL create/delete test"""
544 self.logger.info("ACLP_TEST_START_0001")
545 # Create a permit-1234 ACL
546 r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
547 # Test 1: add a new ACL
548 first_acl = VppAcl(self, rules=r, tag="permit 1234")
549 first_acl.add_vpp_config()
550 self.assertTrue(first_acl.query_vpp_config())
551 # The very first ACL gets #0
552 self.assertEqual(first_acl.acl_index, 0)
553 rr = first_acl.dump()
554 self.logger.info("Dumped ACL: " + str(rr))
555 self.assertEqual(len(rr), 1)
556 # We should have the same number of ACL entries as we had asked
557 self.assertEqual(len(rr[0].r), len(r))
558 # The rules should be the same. But because the submitted and returned
559 # are different types, we need to iterate over rules and keys to get
561 for i_rule in range(0, len(r) - 1):
562 encoded_rule = r[i_rule].encode()
563 for rule_key in encoded_rule:
564 self.assertEqual(rr[0].r[i_rule][rule_key], encoded_rule[rule_key])
566 # Create a deny-1234 ACL
568 AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
569 AclRule(is_permit=1, proto=17, ports=0),
571 second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
572 second_acl.add_vpp_config()
573 self.assertTrue(second_acl.query_vpp_config())
574 # The second ACL gets #1
575 self.assertEqual(second_acl.acl_index, 1)
577 # Test 2: try to modify a nonexistent ACL
578 invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
579 reply = invalid_acl.add_vpp_config(expect_error=True)
581 # apply an ACL on an interface inbound, try to delete ACL, must fail
582 acl_if_list = VppAclInterface(
583 self, sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl]
585 acl_if_list.add_vpp_config()
586 first_acl.remove_vpp_config(expect_error=True)
587 # Unapply an ACL and then try to delete it - must be ok
588 acl_if_list.remove_vpp_config()
589 first_acl.remove_vpp_config()
591 # apply an ACL on an interface inbound, try to delete ACL, must fail
592 acl_if_list = VppAclInterface(
593 self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[second_acl]
595 acl_if_list.add_vpp_config()
596 second_acl.remove_vpp_config(expect_error=True)
597 # Unapply an ACL and then try to delete it - must be ok
598 acl_if_list.remove_vpp_config()
599 second_acl.remove_vpp_config()
601 # try to apply a nonexistent ACL - must fail
602 acl_if_list = VppAclInterface(
603 self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[invalid_acl]
605 acl_if_list.add_vpp_config(expect_error=True)
607 self.logger.info("ACLP_TEST_FINISH_0001")
609 def test_0002_acl_permit_apply(self):
610 """permit ACL apply test"""
611 self.logger.info("ACLP_TEST_START_0002")
615 self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP])
618 self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP])
622 acl_idx = self.apply_rules(rules, "permit per-flow")
625 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
627 # Traffic should still pass
628 self.run_verify_test(self.IP, self.IPV4, -1)
630 matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
631 self.logger.info("stat segment counters: %s" % repr(matches))
632 cli = "show acl-plugin acl"
633 self.logger.info(self.vapi.ppcli(cli))
634 cli = "show acl-plugin tables"
635 self.logger.info(self.vapi.ppcli(cli))
637 total_hits = matches[0][0]["packets"] + matches[0][1]["packets"]
638 self.assertEqual(total_hits, 64)
641 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
643 self.logger.info("ACLP_TEST_FINISH_0002")
645 def test_0003_acl_deny_apply(self):
646 """deny ACL apply test"""
647 self.logger.info("ACLP_TEST_START_0003")
648 # Add a deny-flows ACL
652 self.IPV4, self.DENY, self.PORTS_ALL, self.proto[self.IP][self.UDP]
655 # Permit ip any any in the end
656 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
659 acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
662 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
664 # Traffic should not pass
665 self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
667 matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
668 self.logger.info("stat segment counters: %s" % repr(matches))
669 cli = "show acl-plugin acl"
670 self.logger.info(self.vapi.ppcli(cli))
671 cli = "show acl-plugin tables"
672 self.logger.info(self.vapi.ppcli(cli))
673 self.assertEqual(matches[0][0]["packets"], 64)
675 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
676 self.logger.info("ACLP_TEST_FINISH_0003")
677 # self.assertEqual(, 0)
679 def test_0004_vpp624_permit_icmpv4(self):
680 """VPP_624 permit ICMPv4"""
681 self.logger.info("ACLP_TEST_START_0004")
690 self.proto[self.ICMP][self.ICMPv4],
693 # deny ip any any in the end
694 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
697 self.apply_rules(rules, "permit icmpv4")
699 # Traffic should still pass
700 self.run_verify_test(self.ICMP, self.IPV4, self.proto[self.ICMP][self.ICMPv4])
702 self.logger.info("ACLP_TEST_FINISH_0004")
704 def test_0005_vpp624_permit_icmpv6(self):
705 """VPP_624 permit ICMPv6"""
706 self.logger.info("ACLP_TEST_START_0005")
715 self.proto[self.ICMP][self.ICMPv6],
718 # deny ip any any in the end
719 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
722 self.apply_rules(rules, "permit icmpv6")
724 # Traffic should still pass
725 self.run_verify_test(self.ICMP, self.IPV6, self.proto[self.ICMP][self.ICMPv6])
727 self.logger.info("ACLP_TEST_FINISH_0005")
729 def test_0006_vpp624_deny_icmpv4(self):
730 """VPP_624 deny ICMPv4"""
731 self.logger.info("ACLP_TEST_START_0006")
739 self.proto[self.ICMP][self.ICMPv4],
742 # permit ip any any in the end
743 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
746 self.apply_rules(rules, "deny icmpv4")
748 # Traffic should not pass
749 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
751 self.logger.info("ACLP_TEST_FINISH_0006")
753 def test_0007_vpp624_deny_icmpv6(self):
754 """VPP_624 deny ICMPv6"""
755 self.logger.info("ACLP_TEST_START_0007")
763 self.proto[self.ICMP][self.ICMPv6],
766 # deny ip any any in the end
767 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
770 self.apply_rules(rules, "deny icmpv6")
772 # Traffic should not pass
773 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
775 self.logger.info("ACLP_TEST_FINISH_0007")
777 def test_0008_tcp_permit_v4(self):
779 self.logger.info("ACLP_TEST_START_0008")
785 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
788 # deny ip any any in the end
789 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
792 self.apply_rules(rules, "permit ipv4 tcp")
794 # Traffic should still pass
795 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
797 self.logger.info("ACLP_TEST_FINISH_0008")
799 def test_0009_tcp_permit_v6(self):
801 self.logger.info("ACLP_TEST_START_0009")
807 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
810 # deny ip any any in the end
811 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
814 self.apply_rules(rules, "permit ip6 tcp")
816 # Traffic should still pass
817 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
819 self.logger.info("ACLP_TEST_FINISH_0008")
821 def test_0010_udp_permit_v4(self):
823 self.logger.info("ACLP_TEST_START_0010")
829 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
832 # deny ip any any in the end
833 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
836 self.apply_rules(rules, "permit ipv udp")
838 # Traffic should still pass
839 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
841 self.logger.info("ACLP_TEST_FINISH_0010")
843 def test_0011_udp_permit_v6(self):
845 self.logger.info("ACLP_TEST_START_0011")
851 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
854 # deny ip any any in the end
855 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
858 self.apply_rules(rules, "permit ip6 udp")
860 # Traffic should still pass
861 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
863 self.logger.info("ACLP_TEST_FINISH_0011")
865 def test_0012_tcp_deny(self):
867 self.logger.info("ACLP_TEST_START_0012")
873 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
878 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
881 # permit ip any any in the end
882 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
883 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
886 self.apply_rules(rules, "deny ip4/ip6 tcp")
888 # Traffic should not pass
889 self.run_verify_negat_test(
890 self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
893 self.logger.info("ACLP_TEST_FINISH_0012")
895 def test_0013_udp_deny(self):
897 self.logger.info("ACLP_TEST_START_0013")
903 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
908 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
911 # permit ip any any in the end
912 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
913 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
916 self.apply_rules(rules, "deny ip4/ip6 udp")
918 # Traffic should not pass
919 self.run_verify_negat_test(
920 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
923 self.logger.info("ACLP_TEST_FINISH_0013")
925 def test_0014_acl_dump(self):
926 """verify add/dump acls"""
927 self.logger.info("ACLP_TEST_START_0014")
930 [self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
931 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
932 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
933 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
934 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
935 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
936 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
937 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
938 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
939 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
940 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
941 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
942 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
943 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
944 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
945 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
946 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
947 [self.IPV6, self.DENY, self.PORTS_ALL, 0],
950 # Add and verify new ACLs
952 for i in range(len(r)):
953 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
955 acl = VppAcl(self, rules=rules)
960 for drules in result:
962 self.assertEqual(dr.is_permit, r[i][1])
963 self.assertEqual(dr.proto, r[i][3])
966 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
969 self.assertEqual(dr.srcport_or_icmptype_first, 0)
970 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
972 if dr.proto == self.proto[self.IP][self.TCP]:
974 dr.srcport_or_icmptype_first, self.tcp_sport_from - 1
977 dr.srcport_or_icmptype_first, self.tcp_sport_to + 1
980 dr.dstport_or_icmpcode_last, self.tcp_dport_from - 1
983 dr.dstport_or_icmpcode_last, self.tcp_dport_to + 1
985 elif dr.proto == self.proto[self.IP][self.UDP]:
987 dr.srcport_or_icmptype_first, self.udp_sport_from - 1
990 dr.srcport_or_icmptype_first, self.udp_sport_to + 1
993 dr.dstport_or_icmpcode_last, self.udp_dport_from - 1
996 dr.dstport_or_icmpcode_last, self.udp_dport_to + 1
1000 self.logger.info("ACLP_TEST_FINISH_0014")
1002 def test_0015_tcp_permit_port_v4(self):
1003 """permit single TCPv4"""
1004 self.logger.info("ACLP_TEST_START_0015")
1006 port = random.randint(16384, 65535)
1011 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.TCP]
1014 # deny ip any any in the end
1015 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1018 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1020 # Traffic should still pass
1021 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], port)
1023 self.logger.info("ACLP_TEST_FINISH_0015")
1025 def test_0016_udp_permit_port_v4(self):
1026 """permit single UDPv4"""
1027 self.logger.info("ACLP_TEST_START_0016")
1029 port = random.randint(16384, 65535)
1034 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1037 # deny ip any any in the end
1038 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1041 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1043 # Traffic should still pass
1044 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP], port)
1046 self.logger.info("ACLP_TEST_FINISH_0016")
1048 def test_0017_tcp_permit_port_v6(self):
1049 """permit single TCPv6"""
1050 self.logger.info("ACLP_TEST_START_0017")
1052 port = random.randint(16384, 65535)
1057 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.TCP]
1060 # deny ip any any in the end
1061 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1064 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1066 # Traffic should still pass
1067 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP], port)
1069 self.logger.info("ACLP_TEST_FINISH_0017")
1071 def test_0018_udp_permit_port_v6(self):
1072 """permit single UDPv6"""
1073 self.logger.info("ACLP_TEST_START_0018")
1075 port = random.randint(16384, 65535)
1080 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1083 # deny ip any any in the end
1084 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1087 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1089 # Traffic should still pass
1090 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP], port)
1092 self.logger.info("ACLP_TEST_FINISH_0018")
1094 def test_0019_udp_deny_port(self):
1095 """deny single TCPv4/v6"""
1096 self.logger.info("ACLP_TEST_START_0019")
1098 port = random.randint(16384, 65535)
1102 self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.TCP])
1105 self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.TCP])
1107 # Permit ip any any in the end
1108 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1109 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1112 self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1114 # Traffic should not pass
1115 self.run_verify_negat_test(
1116 self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP], port
1119 self.logger.info("ACLP_TEST_FINISH_0019")
1121 def test_0020_udp_deny_port(self):
1122 """deny single UDPv4/v6"""
1123 self.logger.info("ACLP_TEST_START_0020")
1125 port = random.randint(16384, 65535)
1129 self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1132 self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1134 # Permit ip any any in the end
1135 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1136 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1139 self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1141 # Traffic should not pass
1142 self.run_verify_negat_test(
1143 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port
1146 self.logger.info("ACLP_TEST_FINISH_0020")
1148 def test_0021_udp_deny_port_verify_fragment_deny(self):
1149 """deny single UDPv4/v6, permit ip any, verify non-initial fragment
1152 self.logger.info("ACLP_TEST_START_0021")
1154 port = random.randint(16384, 65535)
1158 self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1161 self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1163 # deny ip any any in the end
1164 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1165 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1168 self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1170 # Traffic should not pass
1171 self.run_verify_negat_test(
1172 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port, True
1175 self.logger.info("ACLP_TEST_FINISH_0021")
1177 def test_0022_zero_length_udp_ipv4(self):
1178 """VPP-687 zero length udp ipv4 packet"""
1179 self.logger.info("ACLP_TEST_START_0022")
1181 port = random.randint(16384, 65535)
1186 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1189 # deny ip any any in the end
1190 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1193 self.apply_rules(rules, "permit empty udp ip4 %d" % port)
1195 # Traffic should still pass
1196 # Create incoming packet streams for packet-generator interfaces
1198 pkts = self.create_stream(
1200 self.pg_if_packet_sizes,
1203 self.proto[self.IP][self.UDP],
1209 self.pg0.add_stream(pkts)
1210 pkts_cnt += len(pkts)
1212 # Enable packet capture and start packet sendingself.IPV
1213 self.pg_enable_capture(self.pg_interfaces)
1216 self.pg1.get_capture(pkts_cnt)
1218 self.logger.info("ACLP_TEST_FINISH_0022")
1220 def test_0023_zero_length_udp_ipv6(self):
1221 """VPP-687 zero length udp ipv6 packet"""
1222 self.logger.info("ACLP_TEST_START_0023")
1224 port = random.randint(16384, 65535)
1229 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1232 # deny ip any any in the end
1233 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1236 self.apply_rules(rules, "permit empty udp ip6 %d" % port)
1238 # Traffic should still pass
1239 # Create incoming packet streams for packet-generator interfaces
1241 pkts = self.create_stream(
1243 self.pg_if_packet_sizes,
1246 self.proto[self.IP][self.UDP],
1252 self.pg0.add_stream(pkts)
1253 pkts_cnt += len(pkts)
1255 # Enable packet capture and start packet sendingself.IPV
1256 self.pg_enable_capture(self.pg_interfaces)
1259 # Verify outgoing packet streams per packet-generator interface
1260 self.pg1.get_capture(pkts_cnt)
1262 self.logger.info("ACLP_TEST_FINISH_0023")
1264 def test_0108_tcp_permit_v4(self):
1265 """permit TCPv4 + non-match range"""
1266 self.logger.info("ACLP_TEST_START_0108")
1272 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1277 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1280 # deny ip any any in the end
1281 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1284 self.apply_rules(rules, "permit ipv4 tcp")
1286 # Traffic should still pass
1287 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1289 self.logger.info("ACLP_TEST_FINISH_0108")
1291 def test_0109_tcp_permit_v6(self):
1292 """permit TCPv6 + non-match range"""
1293 self.logger.info("ACLP_TEST_START_0109")
1299 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1304 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1307 # deny ip any any in the end
1308 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1311 self.apply_rules(rules, "permit ip6 tcp")
1313 # Traffic should still pass
1314 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1316 self.logger.info("ACLP_TEST_FINISH_0109")
1318 def test_0110_udp_permit_v4(self):
1319 """permit UDPv4 + non-match range"""
1320 self.logger.info("ACLP_TEST_START_0110")
1326 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
1331 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1334 # deny ip any any in the end
1335 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1338 self.apply_rules(rules, "permit ipv4 udp")
1340 # Traffic should still pass
1341 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1343 self.logger.info("ACLP_TEST_FINISH_0110")
1345 def test_0111_udp_permit_v6(self):
1346 """permit UDPv6 + non-match range"""
1347 self.logger.info("ACLP_TEST_START_0111")
1353 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
1358 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1361 # deny ip any any in the end
1362 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1365 self.apply_rules(rules, "permit ip6 udp")
1367 # Traffic should still pass
1368 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1370 self.logger.info("ACLP_TEST_FINISH_0111")
1372 def test_0112_tcp_deny(self):
1373 """deny TCPv4/v6 + non-match range"""
1374 self.logger.info("ACLP_TEST_START_0112")
1383 self.proto[self.IP][self.TCP],
1391 self.proto[self.IP][self.TCP],
1396 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1401 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1404 # permit ip any any in the end
1405 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1406 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1409 self.apply_rules(rules, "deny ip4/ip6 tcp")
1411 # Traffic should not pass
1412 self.run_verify_negat_test(
1413 self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
1416 self.logger.info("ACLP_TEST_FINISH_0112")
1418 def test_0113_udp_deny(self):
1419 """deny UDPv4/v6 + non-match range"""
1420 self.logger.info("ACLP_TEST_START_0113")
1429 self.proto[self.IP][self.UDP],
1437 self.proto[self.IP][self.UDP],
1442 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1447 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1450 # permit ip any any in the end
1451 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1452 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1455 self.apply_rules(rules, "deny ip4/ip6 udp")
1457 # Traffic should not pass
1458 self.run_verify_negat_test(
1459 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
1462 self.logger.info("ACLP_TEST_FINISH_0113")
1464 def test_0300_tcp_permit_v4_etype_aaaa(self):
1465 """permit TCPv4, send 0xAAAA etype"""
1466 self.logger.info("ACLP_TEST_START_0300")
1472 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1477 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1480 # deny ip any any in the end
1481 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1484 self.apply_rules(rules, "permit ipv4 tcp")
1486 # Traffic should still pass also for an odd ethertype
1487 self.run_verify_test(
1488 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1490 self.logger.info("ACLP_TEST_FINISH_0300")
1492 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1493 """permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked"""
1494 self.logger.info("ACLP_TEST_START_0305")
1500 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1505 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1508 # deny ip any any in the end
1509 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1512 self.apply_rules(rules, "permit ipv4 tcp")
1513 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1514 self.etype_whitelist([0xBBB], 1)
1516 # The oddball ethertype should be blocked
1517 self.run_verify_negat_test(
1518 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, 0xAAAA
1521 # remove the whitelist
1522 self.etype_whitelist([], 0, add=False)
1524 self.logger.info("ACLP_TEST_FINISH_0305")
1526 def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1527 """permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass"""
1528 self.logger.info("ACLP_TEST_START_0306")
1534 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1539 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1542 # deny ip any any in the end
1543 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1546 self.apply_rules(rules, "permit ipv4 tcp")
1547 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1548 self.etype_whitelist([0xBBB], 1)
1550 # The whitelisted traffic, should pass
1551 self.run_verify_test(
1552 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0x0BBB
1555 # remove the whitelist, the previously blocked 0xAAAA should pass now
1556 self.etype_whitelist([], 0, add=False)
1558 self.logger.info("ACLP_TEST_FINISH_0306")
1560 def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1561 """permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass"""
1562 self.logger.info("ACLP_TEST_START_0307")
1568 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1573 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1576 # deny ip any any in the end
1577 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1580 self.apply_rules(rules, "permit ipv4 tcp")
1582 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1583 self.etype_whitelist([0xBBB], 1)
1584 # remove the whitelist, the previously blocked 0xAAAA should pass now
1585 self.etype_whitelist([], 0, add=False)
1587 # The whitelisted traffic, should pass
1588 self.run_verify_test(
1589 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1592 self.logger.info("ACLP_TEST_FINISH_0306")
1594 def test_0315_del_intf(self):
1595 """apply an acl and delete the interface"""
1596 self.logger.info("ACLP_TEST_START_0315")
1602 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1607 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1610 # deny ip any any in the end
1611 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1613 # create an interface
1615 intf.append(VppLoInterface(self))
1618 self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1620 # Remove the interface
1621 intf[0].remove_vpp_config()
1623 self.logger.info("ACLP_TEST_FINISH_0315")
1626 if __name__ == "__main__":
1627 unittest.main(testRunner=VppTestRunner)