2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from framework import tag_fixme_vpp_workers
15 from util import Host, ppp
16 from ipaddress import IPv4Network, IPv6Network
18 from vpp_lo_interface import VppLoInterface
19 from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
20 from vpp_ip import INVALID_INDEX
23 @tag_fixme_vpp_workers
24 class TestACLplugin(VppTestCase):
25 """ACL plugin Test Case"""
41 proto = [[6, 17], [1, 58]]
42 proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
54 udp_sport_to = udp_sport_from + 5
55 udp_dport_from = 20000
56 udp_dport_to = udp_dport_from + 5000
58 tcp_sport_to = tcp_sport_from + 5
59 tcp_dport_from = 40000
60 tcp_dport_to = tcp_dport_from + 5000
63 udp_sport_to_2 = udp_sport_from_2 + 5
64 udp_dport_from_2 = 30000
65 udp_dport_to_2 = udp_dport_from_2 + 5000
66 tcp_sport_from_2 = 130
67 tcp_sport_to_2 = tcp_sport_from_2 + 5
68 tcp_dport_from_2 = 20000
69 tcp_dport_to_2 = tcp_dport_from_2 + 5000
71 icmp4_type = 8 # echo request
73 icmp6_type = 128 # echo request
89 Perform standard class setup (defined by class method setUpClass in
90 class VppTestCase) before running the test case, set test case related
91 variables and configure VPP.
93 super(TestACLplugin, cls).setUpClass()
96 # Create 2 pg interfaces
97 cls.create_pg_interfaces(range(2))
99 # Packet flows mapping: pg0 -> pg1
101 cls.flows[cls.pg0] = [cls.pg1]
104 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
106 # Create BD with MAC learning and unknown unicast flooding enabled
107 # and put interfaces into this BD
108 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, learn=1)
109 for pg_if in cls.pg_interfaces:
110 cls.vapi.sw_interface_set_l2_bridge(
111 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
114 # Set up all interfaces
115 for i in cls.pg_interfaces:
118 # Mapping between packet-generator index and lists of test hosts
119 cls.hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
123 # Create list of deleted hosts
124 cls.deleted_hosts_by_pg_idx = dict()
125 for pg_if in cls.pg_interfaces:
126 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
128 # warm-up the mac address tables
132 n_int = len(cls.pg_interfaces)
133 macs_per_if = count // n_int
135 for pg_if in cls.pg_interfaces:
137 start_nr = macs_per_if * i + start
139 count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
141 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
142 for j in range(int(start_nr), int(end_nr)):
144 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
145 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
146 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
151 super(TestACLplugin, cls).tearDownClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown for this test case."""
    # Resolve the parent proxy first, then delegate; nothing else is
    # cleaned up at this level.
    parent = super(TestACLplugin, cls)
    parent.tearDownClass()
159 super(TestACLplugin, self).setUp()
160 self.reset_packet_infos()
164 Show various debug prints after each test.
166 super(TestACLplugin, self).tearDown()
def show_commands_at_teardown(self):
    """Log the output of a fixed sequence of diagnostic CLI commands.

    Each command is executed through ``self.vapi.ppcli`` and its result
    is written to ``self.logger`` at info level, in this order: the L2
    input/output feature-arc graphs, the L2 FIB, the ACL plugin ACLs,
    interfaces and tables, and finally the details of the bridge domain
    identified by ``self.bd_id``.

    NOTE(review): presumably invoked by the test framework during
    teardown, as the method name suggests -- confirm against the base
    class.
    """
    fixed_commands = (
        "show vlib graph l2-input-feat-arc",
        "show vlib graph l2-input-feat-arc-end",
        "show vlib graph l2-output-feat-arc",
        "show vlib graph l2-output-feat-arc-end",
        "show l2fib verbose",
        "show acl-plugin acl",
        "show acl-plugin interface",
        "show acl-plugin tables",
    )
    for command in fixed_commands:
        self.logger.info(self.vapi.ppcli(command))

    # The bridge-domain command depends on instance state, so it is
    # formatted only once the fixed commands have been logged.
    self.logger.info(
        self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id)
    )
195 src_prefix = IPv6Network((s_ip, s_prefix))
196 dst_prefix = IPv6Network((d_ip, d_prefix))
198 src_prefix = IPv4Network((s_ip, s_prefix))
199 dst_prefix = IPv4Network((d_ip, d_prefix))
201 is_permit=permit_deny,
204 src_prefix=src_prefix,
205 dst_prefix=dst_prefix,
208 def apply_rules(self, rules, tag=None):
209 acl = VppAcl(self, rules, tag=tag)
211 self.logger.info("Dumped ACL: " + str(acl.dump()))
212 # Apply an ACL on the interface as inbound
213 for i in self.pg_interfaces:
214 acl_if = VppAclInterface(
215 self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]
217 acl_if.add_vpp_config()
220 def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
221 acl = VppAcl(self, rules, tag=tag)
223 self.logger.info("Dumped ACL: " + str(acl.dump()))
224 # Apply an ACL on the interface as inbound
225 acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, acls=[acl])
228 def etype_whitelist(self, whitelist, n_input, add=True):
229 # Apply whitelists on all the interfaces
232 for i in self.pg_interfaces:
236 sw_if_index=i.sw_if_index,
242 if hasattr(self, "_wl"):
244 wl.remove_vpp_config()
246 def create_upper_layer(self, packet_index, proto, ports=0):
247 p = self.proto_map[proto]
251 sport=random.randint(self.udp_sport_from, self.udp_sport_to),
252 dport=random.randint(self.udp_dport_from, self.udp_dport_to),
255 return UDP(sport=ports, dport=ports)
259 sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
260 dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
263 return TCP(sport=ports, dport=ports)
279 Create input packet stream for defined interface using hosts or
282 :param object src_if: Interface to create packet stream for.
283 :param list packet_sizes: List of required packet sizes.
284 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
285 :return: Stream of packets.
288 if self.flows.__contains__(src_if):
289 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
290 for dst_if in self.flows[src_if]:
291 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
292 n_int = len(dst_hosts) * len(src_hosts)
293 for i in range(0, n_int):
294 dst_host = dst_hosts[int(i / len(src_hosts))]
295 src_host = src_hosts[i % len(src_hosts)]
296 pkt_info = self.create_packet_info(src_if, dst_if)
302 pkt_info.ip = random.choice([0, 1])
304 pkt_info.proto = random.choice(self.proto[self.IP])
306 pkt_info.proto = proto
307 payload = self.info_to_payload(pkt_info)
308 p = Ether(dst=dst_host.mac, src=src_host.mac)
310 p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
312 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
314 p /= IPv6ExtHdrFragment(offset=64, m=1)
318 src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
321 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
322 if traffic_type == self.ICMP:
324 p /= ICMPv6EchoRequest(
325 type=self.icmp6_type, code=self.icmp6_code
328 p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
330 p /= self.create_upper_layer(i, pkt_info.proto, ports)
333 pkt_info.data = p.copy()
335 size = random.choice(packet_sizes)
336 self.extend_packet(p, size)
340 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
342 Verify captured input packet stream for defined interface.
344 :param object pg_if: Interface to verify captured packet stream for.
345 :param list capture: Captured packet stream.
346 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
349 for i in self.pg_interfaces:
350 last_info[i.sw_if_index] = None
351 dst_sw_if_index = pg_if.sw_if_index
352 for packet in capture:
354 if packet[Ether].type != etype:
355 self.logger.error(ppp("Unexpected ethertype in packet:", packet))
359 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
360 if traffic_type == self.ICMP and ip_type == self.IPV6:
361 payload_info = self.payload_to_info(
362 packet[ICMPv6EchoRequest], "data"
364 payload = packet[ICMPv6EchoRequest]
366 payload_info = self.payload_to_info(packet[Raw])
367 payload = packet[self.proto_map[payload_info.proto]]
370 ppp("Unexpected or invalid packet (outside network):", packet)
375 self.assertEqual(payload_info.ip, ip_type)
376 if traffic_type == self.ICMP:
378 if payload_info.ip == 0:
379 self.assertEqual(payload.type, self.icmp4_type)
380 self.assertEqual(payload.code, self.icmp4_code)
382 self.assertEqual(payload.type, self.icmp6_type)
383 self.assertEqual(payload.code, self.icmp6_code)
386 ppp("Unexpected or invalid packet (outside network):", packet)
391 ip_version = IPv6 if payload_info.ip == 1 else IP
393 ip = packet[ip_version]
394 packet_index = payload_info.index
396 self.assertEqual(payload_info.dst, dst_sw_if_index)
398 "Got packet on port %s: src=%u (id=%u)"
399 % (pg_if.name, payload_info.src, packet_index)
401 next_info = self.get_next_packet_info_for_interface2(
402 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
404 last_info[payload_info.src] = next_info
405 self.assertTrue(next_info is not None)
406 self.assertEqual(packet_index, next_info.index)
407 saved_packet = next_info.data
408 # Check standard fields
409 self.assertEqual(ip.src, saved_packet[ip_version].src)
410 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
411 p = self.proto_map[payload_info.proto]
414 self.assertEqual(tcp.sport, saved_packet[TCP].sport)
415 self.assertEqual(tcp.dport, saved_packet[TCP].dport)
418 self.assertEqual(udp.sport, saved_packet[UDP].sport)
419 self.assertEqual(udp.dport, saved_packet[UDP].dport)
421 self.logger.error(ppp("Unexpected or invalid packet:", packet))
423 for i in self.pg_interfaces:
424 remaining_packet = self.get_next_packet_info_for_interface2(
425 i, dst_sw_if_index, last_info[i.sw_if_index]
428 remaining_packet is None,
429 "Port %u: Packet expected from source %u didn't arrive"
430 % (dst_sw_if_index, i.sw_if_index),
433 def run_traffic_no_check(self):
435 # Create incoming packet streams for packet-generator interfaces
436 for i in self.pg_interfaces:
437 if self.flows.__contains__(i):
438 pkts = self.create_stream(i, self.pg_if_packet_sizes)
442 # Enable packet capture and start packet sending
443 self.pg_enable_capture(self.pg_interfaces)
457 # Create incoming packet streams for packet-generator interfaces
459 for i in self.pg_interfaces:
460 if self.flows.__contains__(i):
461 pkts = self.create_stream(
463 self.pg_if_packet_sizes,
474 pkts_cnt += len(pkts)
476 # Enable packet capture and start packet sending
477 self.pg_enable_capture(self.pg_interfaces)
479 self.logger.info("sent packets count: %d" % pkts_cnt)
482 # Verify outgoing packet streams per packet-generator interface
483 for src_if in self.pg_interfaces:
484 if self.flows.__contains__(src_if):
485 for dst_if in self.flows[src_if]:
486 capture = dst_if.get_capture(pkts_cnt)
487 self.logger.info("Verifying capture on interface %s" % dst_if.name)
488 self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
490 def run_verify_negat_test(
491 self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
495 self.reset_packet_infos()
496 for i in self.pg_interfaces:
497 if self.flows.__contains__(i):
498 pkts = self.create_stream(
500 self.pg_if_packet_sizes,
511 pkts_cnt += len(pkts)
513 # Enable packet capture and start packet sending
514 self.pg_enable_capture(self.pg_interfaces)
516 self.logger.info("sent packets count: %d" % pkts_cnt)
519 # Verify outgoing packet streams per packet-generator interface
520 for src_if in self.pg_interfaces:
521 if self.flows.__contains__(src_if):
522 for dst_if in self.flows[src_if]:
523 self.logger.info("Verifying capture on interface %s" % dst_if.name)
524 capture = dst_if.get_capture(0)
525 self.assertEqual(len(capture), 0)
527 def test_0000_warmup_test(self):
528 """ACL plugin version check; learn MACs"""
529 reply = self.vapi.papi.acl_plugin_get_version()
530 self.assertEqual(reply.major, 1)
532 "Working with ACL plugin version: %d.%d" % (reply.major, reply.minor)
534 # minor version changes are non breaking
535 # self.assertEqual(reply.minor, 0)
537 def test_0001_acl_create(self):
538 """ACL create/delete test"""
540 self.logger.info("ACLP_TEST_START_0001")
541 # Create a permit-1234 ACL
542 r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
543 # Test 1: add a new ACL
544 first_acl = VppAcl(self, rules=r, tag="permit 1234")
545 first_acl.add_vpp_config()
546 self.assertTrue(first_acl.query_vpp_config())
547 # The very first ACL gets #0
548 self.assertEqual(first_acl.acl_index, 0)
549 rr = first_acl.dump()
550 self.logger.info("Dumped ACL: " + str(rr))
551 self.assertEqual(len(rr), 1)
552 # We should have the same number of ACL entries as we had asked
553 self.assertEqual(len(rr[0].r), len(r))
554 # The rules should be the same. But because the submitted and returned
555 # are different types, we need to iterate over rules and keys to get
557 for i_rule in range(0, len(r) - 1):
558 encoded_rule = r[i_rule].encode()
559 for rule_key in encoded_rule:
560 self.assertEqual(rr[0].r[i_rule][rule_key], encoded_rule[rule_key])
562 # Create a deny-1234 ACL
564 AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
565 AclRule(is_permit=1, proto=17, ports=0),
567 second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
568 second_acl.add_vpp_config()
569 self.assertTrue(second_acl.query_vpp_config())
570 # The second ACL gets #1
571 self.assertEqual(second_acl.acl_index, 1)
573 # Test 2: try to modify a nonexistent ACL
574 invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
575 reply = invalid_acl.add_vpp_config(expect_error=True)
577 # apply an ACL on an interface inbound, try to delete ACL, must fail
578 acl_if_list = VppAclInterface(
579 self, sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl]
581 acl_if_list.add_vpp_config()
582 first_acl.remove_vpp_config(expect_error=True)
583 # Unapply an ACL and then try to delete it - must be ok
584 acl_if_list.remove_vpp_config()
585 first_acl.remove_vpp_config()
587 # apply an ACL on an interface outbound, try to delete ACL, must fail
588 acl_if_list = VppAclInterface(
589 self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[second_acl]
591 acl_if_list.add_vpp_config()
592 second_acl.remove_vpp_config(expect_error=True)
593 # Unapply an ACL and then try to delete it - must be ok
594 acl_if_list.remove_vpp_config()
595 second_acl.remove_vpp_config()
597 # try to apply a nonexistent ACL - must fail
598 acl_if_list = VppAclInterface(
599 self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[invalid_acl]
601 acl_if_list.add_vpp_config(expect_error=True)
603 self.logger.info("ACLP_TEST_FINISH_0001")
605 def test_0002_acl_permit_apply(self):
606 """permit ACL apply test"""
607 self.logger.info("ACLP_TEST_START_0002")
611 self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP])
614 self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP])
618 acl_idx = self.apply_rules(rules, "permit per-flow")
621 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
623 # Traffic should still pass
624 self.run_verify_test(self.IP, self.IPV4, -1)
626 matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
627 self.logger.info("stat segment counters: %s" % repr(matches))
628 cli = "show acl-plugin acl"
629 self.logger.info(self.vapi.ppcli(cli))
630 cli = "show acl-plugin tables"
631 self.logger.info(self.vapi.ppcli(cli))
633 total_hits = matches[0][0]["packets"] + matches[0][1]["packets"]
634 self.assertEqual(total_hits, 64)
637 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
639 self.logger.info("ACLP_TEST_FINISH_0002")
641 def test_0003_acl_deny_apply(self):
642 """deny ACL apply test"""
643 self.logger.info("ACLP_TEST_START_0003")
644 # Add a deny-flows ACL
648 self.IPV4, self.DENY, self.PORTS_ALL, self.proto[self.IP][self.UDP]
651 # Permit ip any any in the end
652 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
655 acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
658 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
660 # Traffic should not pass
661 self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
663 matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
664 self.logger.info("stat segment counters: %s" % repr(matches))
665 cli = "show acl-plugin acl"
666 self.logger.info(self.vapi.ppcli(cli))
667 cli = "show acl-plugin tables"
668 self.logger.info(self.vapi.ppcli(cli))
669 self.assertEqual(matches[0][0]["packets"], 64)
671 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
672 self.logger.info("ACLP_TEST_FINISH_0003")
673 # self.assertEqual(, 0)
675 def test_0004_vpp624_permit_icmpv4(self):
676 """VPP_624 permit ICMPv4"""
677 self.logger.info("ACLP_TEST_START_0004")
686 self.proto[self.ICMP][self.ICMPv4],
689 # deny ip any any in the end
690 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
693 self.apply_rules(rules, "permit icmpv4")
695 # Traffic should still pass
696 self.run_verify_test(self.ICMP, self.IPV4, self.proto[self.ICMP][self.ICMPv4])
698 self.logger.info("ACLP_TEST_FINISH_0004")
700 def test_0005_vpp624_permit_icmpv6(self):
701 """VPP_624 permit ICMPv6"""
702 self.logger.info("ACLP_TEST_START_0005")
711 self.proto[self.ICMP][self.ICMPv6],
714 # deny ip any any in the end
715 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
718 self.apply_rules(rules, "permit icmpv6")
720 # Traffic should still pass
721 self.run_verify_test(self.ICMP, self.IPV6, self.proto[self.ICMP][self.ICMPv6])
723 self.logger.info("ACLP_TEST_FINISH_0005")
725 def test_0006_vpp624_deny_icmpv4(self):
726 """VPP_624 deny ICMPv4"""
727 self.logger.info("ACLP_TEST_START_0006")
735 self.proto[self.ICMP][self.ICMPv4],
738 # permit ip any any in the end
739 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
742 self.apply_rules(rules, "deny icmpv4")
744 # Traffic should not pass
745 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
747 self.logger.info("ACLP_TEST_FINISH_0006")
749 def test_0007_vpp624_deny_icmpv6(self):
750 """VPP_624 deny ICMPv6"""
751 self.logger.info("ACLP_TEST_START_0007")
759 self.proto[self.ICMP][self.ICMPv6],
762 # permit ip any any in the end
763 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
766 self.apply_rules(rules, "deny icmpv6")
768 # Traffic should not pass
769 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
771 self.logger.info("ACLP_TEST_FINISH_0007")
773 def test_0008_tcp_permit_v4(self):
775 self.logger.info("ACLP_TEST_START_0008")
781 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
784 # deny ip any any in the end
785 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
788 self.apply_rules(rules, "permit ipv4 tcp")
790 # Traffic should still pass
791 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
793 self.logger.info("ACLP_TEST_FINISH_0008")
795 def test_0009_tcp_permit_v6(self):
797 self.logger.info("ACLP_TEST_START_0009")
803 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
806 # deny ip any any in the end
807 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
810 self.apply_rules(rules, "permit ip6 tcp")
812 # Traffic should still pass
813 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
815 self.logger.info("ACLP_TEST_FINISH_0008")
817 def test_0010_udp_permit_v4(self):
819 self.logger.info("ACLP_TEST_START_0010")
825 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
828 # deny ip any any in the end
829 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
832 self.apply_rules(rules, "permit ipv udp")
834 # Traffic should still pass
835 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
837 self.logger.info("ACLP_TEST_FINISH_0010")
839 def test_0011_udp_permit_v6(self):
841 self.logger.info("ACLP_TEST_START_0011")
847 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
850 # deny ip any any in the end
851 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
854 self.apply_rules(rules, "permit ip6 udp")
856 # Traffic should still pass
857 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
859 self.logger.info("ACLP_TEST_FINISH_0011")
861 def test_0012_tcp_deny(self):
863 self.logger.info("ACLP_TEST_START_0012")
869 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
874 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
877 # permit ip any any in the end
878 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
879 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
882 self.apply_rules(rules, "deny ip4/ip6 tcp")
884 # Traffic should not pass
885 self.run_verify_negat_test(
886 self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
889 self.logger.info("ACLP_TEST_FINISH_0012")
891 def test_0013_udp_deny(self):
893 self.logger.info("ACLP_TEST_START_0013")
899 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
904 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
907 # permit ip any any in the end
908 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
909 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
912 self.apply_rules(rules, "deny ip4/ip6 udp")
914 # Traffic should not pass
915 self.run_verify_negat_test(
916 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
919 self.logger.info("ACLP_TEST_FINISH_0013")
921 def test_0014_acl_dump(self):
922 """verify add/dump acls"""
923 self.logger.info("ACLP_TEST_START_0014")
926 [self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
927 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
928 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
929 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
930 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
931 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
932 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
933 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
934 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
935 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
936 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
937 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
938 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
939 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
940 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
941 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
942 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
943 [self.IPV6, self.DENY, self.PORTS_ALL, 0],
946 # Add and verify new ACLs
948 for i in range(len(r)):
949 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
951 acl = VppAcl(self, rules=rules)
956 for drules in result:
958 self.assertEqual(dr.is_permit, r[i][1])
959 self.assertEqual(dr.proto, r[i][3])
962 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
965 self.assertEqual(dr.srcport_or_icmptype_first, 0)
966 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
968 if dr.proto == self.proto[self.IP][self.TCP]:
970 dr.srcport_or_icmptype_first, self.tcp_sport_from - 1
973 dr.srcport_or_icmptype_first, self.tcp_sport_to + 1
976 dr.dstport_or_icmpcode_last, self.tcp_dport_from - 1
979 dr.dstport_or_icmpcode_last, self.tcp_dport_to + 1
981 elif dr.proto == self.proto[self.IP][self.UDP]:
983 dr.srcport_or_icmptype_first, self.udp_sport_from - 1
986 dr.srcport_or_icmptype_first, self.udp_sport_to + 1
989 dr.dstport_or_icmpcode_last, self.udp_dport_from - 1
992 dr.dstport_or_icmpcode_last, self.udp_dport_to + 1
996 self.logger.info("ACLP_TEST_FINISH_0014")
998 def test_0015_tcp_permit_port_v4(self):
999 """permit single TCPv4"""
1000 self.logger.info("ACLP_TEST_START_0015")
1002 port = random.randint(16384, 65535)
1007 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.TCP]
1010 # deny ip any any in the end
1011 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1014 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1016 # Traffic should still pass
1017 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], port)
1019 self.logger.info("ACLP_TEST_FINISH_0015")
1021 def test_0016_udp_permit_port_v4(self):
1022 """permit single UDPv4"""
1023 self.logger.info("ACLP_TEST_START_0016")
1025 port = random.randint(16384, 65535)
1030 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1033 # deny ip any any in the end
1034 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1037 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1039 # Traffic should still pass
1040 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP], port)
1042 self.logger.info("ACLP_TEST_FINISH_0016")
1044 def test_0017_tcp_permit_port_v6(self):
1045 """permit single TCPv6"""
1046 self.logger.info("ACLP_TEST_START_0017")
1048 port = random.randint(16384, 65535)
1053 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.TCP]
1056 # deny ip any any in the end
1057 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1060 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1062 # Traffic should still pass
1063 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP], port)
1065 self.logger.info("ACLP_TEST_FINISH_0017")
1067 def test_0018_udp_permit_port_v6(self):
1068 """permit single UDPv6"""
1069 self.logger.info("ACLP_TEST_START_0018")
1071 port = random.randint(16384, 65535)
1076 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1079 # deny ip any any in the end
1080 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1083 self.apply_rules(rules, "permit ip4 tcp %d" % port)
1085 # Traffic should still pass
1086 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP], port)
1088 self.logger.info("ACLP_TEST_FINISH_0018")
1090 def test_0019_udp_deny_port(self):
1091 """deny single TCPv4/v6"""
1092 self.logger.info("ACLP_TEST_START_0019")
1094 port = random.randint(16384, 65535)
1098 self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.TCP])
1101 self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.TCP])
1103 # Permit ip any any in the end
1104 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1105 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1108 self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1110 # Traffic should not pass
1111 self.run_verify_negat_test(
1112 self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP], port
1115 self.logger.info("ACLP_TEST_FINISH_0019")
1117 def test_0020_udp_deny_port(self):
1118 """deny single UDPv4/v6"""
1119 self.logger.info("ACLP_TEST_START_0020")
1121 port = random.randint(16384, 65535)
1125 self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1128 self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1130 # Permit ip any any in the end
1131 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1132 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1135 self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1137 # Traffic should not pass
1138 self.run_verify_negat_test(
1139 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port
1142 self.logger.info("ACLP_TEST_FINISH_0020")
1144 def test_0021_udp_deny_port_verify_fragment_deny(self):
1145 """deny single UDPv4/v6, permit ip any, verify non-initial fragment
1148 self.logger.info("ACLP_TEST_START_0021")
1150 port = random.randint(16384, 65535)
1154 self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1157 self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1159 # permit ip any any in the end
1160 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1161 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1164 self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1166 # Traffic should not pass
1167 self.run_verify_negat_test(
1168 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port, True
1171 self.logger.info("ACLP_TEST_FINISH_0021")
1173 def test_0022_zero_length_udp_ipv4(self):
1174 """VPP-687 zero length udp ipv4 packet"""
1175 self.logger.info("ACLP_TEST_START_0022")
1177 port = random.randint(16384, 65535)
1182 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1185 # deny ip any any in the end
1186 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1189 self.apply_rules(rules, "permit empty udp ip4 %d" % port)
1191 # Traffic should still pass
1192 # Create incoming packet streams for packet-generator interfaces
1194 pkts = self.create_stream(
1196 self.pg_if_packet_sizes,
1199 self.proto[self.IP][self.UDP],
1205 self.pg0.add_stream(pkts)
1206 pkts_cnt += len(pkts)
1208 # Enable packet capture and start packet sending
1209 self.pg_enable_capture(self.pg_interfaces)
1212 self.pg1.get_capture(pkts_cnt)
1214 self.logger.info("ACLP_TEST_FINISH_0022")
1216 def test_0023_zero_length_udp_ipv6(self):
1217 """VPP-687 zero length udp ipv6 packet"""
1218 self.logger.info("ACLP_TEST_START_0023")
1220 port = random.randint(16384, 65535)
1225 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1228 # deny ip any any in the end
1229 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1232 self.apply_rules(rules, "permit empty udp ip6 %d" % port)
1234 # Traffic should still pass
1235 # Create incoming packet streams for packet-generator interfaces
1237 pkts = self.create_stream(
1239 self.pg_if_packet_sizes,
1242 self.proto[self.IP][self.UDP],
1248 self.pg0.add_stream(pkts)
1249 pkts_cnt += len(pkts)
1251 # Enable packet capture and start packet sending
1252 self.pg_enable_capture(self.pg_interfaces)
1255 # Verify outgoing packet streams per packet-generator interface
1256 self.pg1.get_capture(pkts_cnt)
1258 self.logger.info("ACLP_TEST_FINISH_0023")
1260 def test_0108_tcp_permit_v4(self):
1261 """permit TCPv4 + non-match range"""
1262 self.logger.info("ACLP_TEST_START_0108")
1268 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1273 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1276 # deny ip any any in the end
1277 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1280 self.apply_rules(rules, "permit ipv4 tcp")
1282 # Traffic should still pass
1283 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1285 self.logger.info("ACLP_TEST_FINISH_0108")
1287 def test_0109_tcp_permit_v6(self):
1288 """permit TCPv6 + non-match range"""
1289 self.logger.info("ACLP_TEST_START_0109")
1295 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1300 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1303 # deny ip any any in the end
1304 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1307 self.apply_rules(rules, "permit ip6 tcp")
1309 # Traffic should still pass
1310 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1312 self.logger.info("ACLP_TEST_FINISH_0109")
1314 def test_0110_udp_permit_v4(self):
1315 """permit UDPv4 + non-match range"""
1316 self.logger.info("ACLP_TEST_START_0110")
1322 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
1327 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1330 # last rule: DENY for IPv4 with PORTS_ALL and proto 0 ("deny ip any any")
1331 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1334 self.apply_rules(rules, "permit ipv4 udp")
1336 # Traffic should still pass (checked for IPv4 UDP)
1337 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1339 self.logger.info("ACLP_TEST_FINISH_0110")
1341 def test_0111_udp_permit_v6(self):
1342 """permit UDPv6 + non-match range"""
1343 self.logger.info("ACLP_TEST_START_0111")
1349 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
1354 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1357 # last rule: DENY for IPv6 with PORTS_ALL and proto 0 ("deny ip any any")
1358 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1361 self.apply_rules(rules, "permit ip6 udp")
1363 # Traffic should still pass (checked for IPv6 UDP)
1364 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1366 self.logger.info("ACLP_TEST_FINISH_0111")
1368 def test_0112_tcp_deny(self):
1369 """deny TCPv4/v6 + non-match range"""
1370 self.logger.info("ACLP_TEST_START_0112")
1379 self.proto[self.IP][self.TCP],
1387 self.proto[self.IP][self.TCP],
1392 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1397 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1400 # last rules: PERMIT with PORTS_ALL and proto 0 for both IPv4 and IPv6
1401 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1402 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1405 self.apply_rules(rules, "deny ip4/ip6 tcp")
1407 # TCP traffic should not pass (negative check, IPRANDOM address family)
1408 self.run_verify_negat_test(
1409 self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
1412 self.logger.info("ACLP_TEST_FINISH_0112")
1414 def test_0113_udp_deny(self):
1415 """deny UDPv4/v6 + non-match range"""
1416 self.logger.info("ACLP_TEST_START_0113")
1425 self.proto[self.IP][self.UDP],
1433 self.proto[self.IP][self.UDP],
1438 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1443 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1446 # last rules: PERMIT with PORTS_ALL and proto 0 for both IPv4 and IPv6
1447 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1448 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1451 self.apply_rules(rules, "deny ip4/ip6 udp")
1453 # UDP traffic should not pass (negative check, IPRANDOM address family)
1454 self.run_verify_negat_test(
1455 self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
1458 self.logger.info("ACLP_TEST_FINISH_0113")
1460 def test_0300_tcp_permit_v4_etype_aaaa(self):
1461 """permit TCPv4, send 0xAAAA etype"""
1462 self.logger.info("ACLP_TEST_START_0300")
1468 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1473 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1476 # last rule: DENY for IPv4 with PORTS_ALL and proto 0 ("deny ip any any")
1477 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1480 self.apply_rules(rules, "permit ipv4 tcp")
1482 # Traffic should still pass even when sent with the unusual 0xAAAA ethertype
1483 self.run_verify_test(
1484 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1486 self.logger.info("ACLP_TEST_FINISH_0300")
1488 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1489 """permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked"""
1490 self.logger.info("ACLP_TEST_START_0305")
1496 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1501 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1504 # last rule: DENY for IPv4 with PORTS_ALL and proto 0 ("deny ip any any")
1505 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1508 self.apply_rules(rules, "permit ipv4 tcp")
1509 # whitelist only the 0x0BBB etype - so 0xAAAA should be blocked
1510 self.etype_whitelist([0xBBB], 1)
1512 # 0xAAAA is not in the whitelist, so this traffic should be blocked
1513 self.run_verify_negat_test(
1514 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, 0xAAAA
1517 # remove the whitelist again
1518 self.etype_whitelist([], 0, add=False)
1520 self.logger.info("ACLP_TEST_FINISH_0305")
1522 def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1523 """permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass"""
1524 self.logger.info("ACLP_TEST_START_0306")
1530 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1535 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1538 # last rule: DENY for IPv4 with PORTS_ALL and proto 0 ("deny ip any any")
1539 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1542 self.apply_rules(rules, "permit ipv4 tcp")
1543 # whitelist the 0x0BBB etype - the same etype this test sends below
1544 self.etype_whitelist([0xBBB], 1)
1546 # The whitelisted 0x0BBB traffic should pass
1547 self.run_verify_test(
1548 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0x0BBB
1551 # remove the whitelist again (0xAAAA passing afterwards is not verified here)
1552 self.etype_whitelist([], 0, add=False)
1554 self.logger.info("ACLP_TEST_FINISH_0306")
1556 def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1557 """permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass"""
1558 self.logger.info("ACLP_TEST_START_0307")
1564 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1569 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1572 # deny ip any any in the end
1573 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1576 self.apply_rules(rules, "permit ipv4 tcp")
1578 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1579 self.etype_whitelist([0xBBB], 1)
1580 # remove the whitelist, the previously blocked 0xAAAA should pass now
1581 self.etype_whitelist([], 0, add=False)
1583 # The whitelisted traffic, should pass
1584 self.run_verify_test(
1585 self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1588 self.logger.info("ACLP_TEST_FINISH_0306")
1590 def test_0315_del_intf(self):
1591 """apply an acl and delete the interface"""
1592 self.logger.info("ACLP_TEST_START_0315")
1598 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1603 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1606 # last rule: DENY for IPv4 with PORTS_ALL and proto 0 ("deny ip any any")
1607 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1609 # create a VppLoInterface to apply the rules to
1611 intf.append(VppLoInterface(self))
1614 self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1616 # Remove the interface right after the rules were applied to it
1617 intf[0].remove_vpp_config()
1619 self.logger.info("ACLP_TEST_FINISH_0315")
1622 if __name__ == "__main__":  # executed directly: run the tests via VppTestRunner
1623 unittest.main(testRunner=VppTestRunner)