2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
17 class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
46 udp_sport_to = udp_sport_from + 5
47 udp_dport_from = 20000
48 udp_dport_to = udp_dport_from + 5000
50 tcp_sport_to = tcp_sport_from + 5
51 tcp_dport_from = 40000
52 tcp_dport_to = tcp_dport_from + 5000
54 icmp4_type = 8 # echo request
56 icmp6_type = 128 # echo request
65 Perform standard class setup (defined by class method setUpClass in
66 class VppTestCase) before running the test case, set test case related
67 variables and configure VPP.
69 super(TestACLplugin, cls).setUpClass()
74 # Create 2 pg interfaces
75 cls.create_pg_interfaces(range(2))
77 # Packet flows mapping pg0 -> pg1, pg2 etc.
79 cls.flows[cls.pg0] = [cls.pg1]
82 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
84 # Create BD with MAC learning and unknown unicast flooding enabled
85 # and put interfaces to this BD
86 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
88 for pg_if in cls.pg_interfaces:
89 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
92 # Set up all interfaces
93 for i in cls.pg_interfaces:
96 # Mapping between packet-generator index and lists of test hosts
97 cls.hosts_by_pg_idx = dict()
98 for pg_if in cls.pg_interfaces:
99 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
101 # Create list of deleted hosts
102 cls.deleted_hosts_by_pg_idx = dict()
103 for pg_if in cls.pg_interfaces:
104 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
106 # warm-up the mac address tables
110 super(TestACLplugin, cls).tearDownClass()
114 super(TestACLplugin, self).setUp()
115 self.reset_packet_infos()
119 Show various debug prints after each test.
121 super(TestACLplugin, self).tearDown()
122 if not self.vpp_dead:
123 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
124 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
# create_hosts: for every pg interface, takes a slice of the numbering that
# starts at `start` and formats each number j, together with the interface's
# sw_if_index, into a MAC string, an IPv4 string and an IPv6 string. The last
# interface's slice always ends at count + start, so it absorbs any remainder.
# NOTE(review): several lines of this method are not visible in this excerpt
# (e.g. where `i` is maintained and where the formatted strings are consumed).
127 def create_hosts(self, count, start=0):
129 Create required number of host MAC addresses and distribute them among
130 interfaces. Create host IPv4 address for every host MAC address.
132 :param int count: Number of hosts to create MAC/IPv4 addresses for.
133 :param int start: Number to start numbering from.
135 n_int = len(self.pg_interfaces)
# NOTE(review): `/` is true division on Python 3, so macs_per_if (and the
# start_nr/end_nr values derived from it) would be floats and the range()
# call below would raise TypeError; `//` keeps them integers. Confirm which
# interpreter this suite targets.
136 macs_per_if = count / n_int
138 for pg_if in self.pg_interfaces:
140 start_nr = macs_per_if * i + start
141 end_nr = count + start if i == (n_int - 1) \
142 else macs_per_if * (i + 1) + start
143 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
144 for j in range(start_nr, end_nr):
146 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
147 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
148 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
151 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
152 s_prefix=0, s_ip='\x00\x00\x00\x00',
153 d_prefix=0, d_ip='\x00\x00\x00\x00'):
156 if ports == self.PORTS_ALL:
159 sport_to = 65535 if proto != 1 and proto != 58 else 255
161 elif ports == self.PORTS_RANGE:
163 sport_from = self.icmp4_type
164 sport_to = self.icmp4_type
165 dport_from = self.icmp4_code
166 dport_to = self.icmp4_code
168 sport_from = self.icmp6_type
169 sport_to = self.icmp6_type
170 dport_from = self.icmp6_code
171 dport_to = self.icmp6_code
172 elif proto == self.proto[self.IP][self.TCP]:
173 sport_from = self.tcp_sport_from
174 sport_to = self.tcp_sport_to
175 dport_from = self.tcp_dport_from
176 dport_to = self.tcp_dport_to
177 elif proto == self.proto[self.IP][self.UDP]:
178 sport_from = self.udp_sport_from
179 sport_to = self.udp_sport_to
180 dport_from = self.udp_dport_from
181 dport_to = self.udp_dport_to
188 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
189 'srcport_or_icmptype_first': sport_from,
190 'srcport_or_icmptype_last': sport_to,
191 'src_ip_prefix_len': s_prefix,
193 'dstport_or_icmpcode_first': dport_from,
194 'dstport_or_icmpcode_last': dport_to,
195 'dst_ip_prefix_len': d_prefix,
196 'dst_ip_addr': d_ip})
199 def apply_rules(self, rules, tag=''):
200 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
203 self.logger.info("Dumped ACL: " + str(
204 self.api_acl_dump(reply.acl_index)))
205 # Apply the ACL on the interface as inbound
206 for i in self.pg_interfaces:
207 self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
209 acls=[reply.acl_index])
212 def create_upper_layer(self, packet_index, proto, ports=0):
213 p = self.proto_map[proto]
216 return UDP(sport=random.randint(self.udp_sport_from,
218 dport=random.randint(self.udp_dport_from,
221 return UDP(sport=ports, dport=ports)
224 return TCP(sport=random.randint(self.tcp_sport_from,
226 dport=random.randint(self.tcp_dport_from,
229 return TCP(sport=ports, dport=ports)
# create_stream: when src_if has an entry in self.flows, walks every
# (destination host, source host) pair for each mapped destination interface
# and builds one packet per pair: Ether, then IPv6 or IP, then either an ICMP
# echo request (traffic_type == self.ICMP) or the layer returned by
# create_upper_layer. A copy of the packet is stored in pkt_info.data before
# the packet is handed to self.extend_packet with a size picked at random
# from packet_sizes.
# NOTE(review): the branch conditions selecting IPv4/IPv6, fragments and the
# random/explicit protocol are on lines not visible in this excerpt.
232 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
233 proto=-1, ports=0, fragments=False):
235 Create input packet stream for defined interface using hosts or
238 :param object src_if: Interface to create packet stream for.
239 :param list packet_sizes: List of required packet sizes.
240 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
241 :return: Stream of packets.
244 if self.flows.__contains__(src_if):
245 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
246 for dst_if in self.flows[src_if]:
247 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
248 n_int = len(dst_hosts) * len(src_hosts)
249 for i in range(0, n_int):
# i enumerates the dst x src host pairs: quotient picks the destination
# host, remainder picks the source host.
# NOTE(review): `i / len(src_hosts)` is a float on Python 3 and cannot be
# used as a list index; `//` gives the integer quotient. Confirm which
# interpreter this suite targets.
250 dst_host = dst_hosts[i / len(src_hosts)]
251 src_host = src_hosts[i % len(src_hosts)]
252 pkt_info = self.create_packet_info(src_if, dst_if)
258 pkt_info.ip = random.choice([0, 1])
260 pkt_info.proto = random.choice(self.proto[self.IP])
262 pkt_info.proto = proto
263 payload = self.info_to_payload(pkt_info)
264 p = Ether(dst=dst_host.mac, src=src_host.mac)
266 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
268 p /= IPv6ExtHdrFragment(offset=64, m=1)
271 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
274 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
275 if traffic_type == self.ICMP:
277 p /= ICMPv6EchoRequest(type=self.icmp6_type,
278 code=self.icmp6_code)
280 p /= ICMP(type=self.icmp4_type,
281 code=self.icmp4_code)
283 p /= self.create_upper_layer(i, pkt_info.proto, ports)
285 pkt_info.data = p.copy()
286 size = random.choice(packet_sizes)
287 self.extend_packet(p, size)
291 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
293 Verify captured input packet stream for defined interface.
295 :param object pg_if: Interface to verify captured packet stream for.
296 :param list capture: Captured packet stream.
297 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
300 for i in self.pg_interfaces:
301 last_info[i.sw_if_index] = None
302 dst_sw_if_index = pg_if.sw_if_index
303 for packet in capture:
305 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
306 if traffic_type == self.ICMP and ip_type == self.IPV6:
307 payload_info = self.payload_to_info(
308 packet[ICMPv6EchoRequest].data)
309 payload = packet[ICMPv6EchoRequest]
311 payload_info = self.payload_to_info(str(packet[Raw]))
312 payload = packet[self.proto_map[payload_info.proto]]
314 self.logger.error(ppp("Unexpected or invalid packet "
315 "(outside network):", packet))
319 self.assertEqual(payload_info.ip, ip_type)
320 if traffic_type == self.ICMP:
322 if payload_info.ip == 0:
323 self.assertEqual(payload.type, self.icmp4_type)
324 self.assertEqual(payload.code, self.icmp4_code)
326 self.assertEqual(payload.type, self.icmp6_type)
327 self.assertEqual(payload.code, self.icmp6_code)
329 self.logger.error(ppp("Unexpected or invalid packet "
330 "(outside network):", packet))
334 ip_version = IPv6 if payload_info.ip == 1 else IP
336 ip = packet[ip_version]
337 packet_index = payload_info.index
339 self.assertEqual(payload_info.dst, dst_sw_if_index)
340 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
341 (pg_if.name, payload_info.src,
343 next_info = self.get_next_packet_info_for_interface2(
344 payload_info.src, dst_sw_if_index,
345 last_info[payload_info.src])
346 last_info[payload_info.src] = next_info
347 self.assertTrue(next_info is not None)
348 self.assertEqual(packet_index, next_info.index)
349 saved_packet = next_info.data
350 # Check standard fields
351 self.assertEqual(ip.src, saved_packet[ip_version].src)
352 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
353 p = self.proto_map[payload_info.proto]
356 self.assertEqual(tcp.sport, saved_packet[
358 self.assertEqual(tcp.dport, saved_packet[
362 self.assertEqual(udp.sport, saved_packet[
364 self.assertEqual(udp.dport, saved_packet[
367 self.logger.error(ppp("Unexpected or invalid packet:",
370 for i in self.pg_interfaces:
371 remaining_packet = self.get_next_packet_info_for_interface2(
372 i, dst_sw_if_index, last_info[i.sw_if_index])
374 remaining_packet is None,
375 "Port %u: Packet expected from source %u didn't arrive" %
376 (dst_sw_if_index, i.sw_if_index))
378 def run_traffic_no_check(self):
380 # Create incoming packet streams for packet-generator interfaces
381 for i in self.pg_interfaces:
382 if self.flows.__contains__(i):
383 pkts = self.create_stream(i, self.pg_if_packet_sizes)
387 # Enable packet capture and start packet sending
388 self.pg_enable_capture(self.pg_interfaces)
391 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
394 # Create incoming packet streams for packet-generator interfaces
396 for i in self.pg_interfaces:
397 if self.flows.__contains__(i):
398 pkts = self.create_stream(i, self.pg_if_packet_sizes,
399 traffic_type, ip_type, proto, ports,
403 pkts_cnt += len(pkts)
405 # Enable packet capture and start packet sending
406 self.pg_enable_capture(self.pg_interfaces)
410 # Verify outgoing packet streams per packet-generator interface
411 for src_if in self.pg_interfaces:
412 if self.flows.__contains__(src_if):
413 for dst_if in self.flows[src_if]:
414 capture = dst_if.get_capture(pkts_cnt)
415 self.logger.info("Verifying capture on interface %s" %
417 self.verify_capture(dst_if, capture, traffic_type, ip_type)
419 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
420 ports=0, frags=False):
422 self.reset_packet_infos()
423 for i in self.pg_interfaces:
424 if self.flows.__contains__(i):
425 pkts = self.create_stream(i, self.pg_if_packet_sizes,
426 traffic_type, ip_type, proto, ports,
431 # Enable packet capture and start packet sending
432 self.pg_enable_capture(self.pg_interfaces)
436 # Verify outgoing packet streams per packet-generator interface
437 for src_if in self.pg_interfaces:
438 if self.flows.__contains__(src_if):
439 for dst_if in self.flows[src_if]:
440 self.logger.info("Verifying capture on interface %s" %
442 capture = dst_if.get_capture(0)
443 self.assertEqual(len(capture), 0)
445 def api_acl_add_replace(self, acl_index, r, count, tag='',
447 """Add/replace an ACL
449 :param int acl_index: ACL index to replace,
450 4294967295 to create new ACL.
451 :param acl_rule r: ACL rules array.
452 :param str tag: symbolic tag (description) for this ACL.
453 :param int count: number of rules.
455 return self.vapi.api(self.vapi.papi.acl_add_replace,
456 {'acl_index': acl_index,
460 expected_retval=expected_retval)
462 def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
464 return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
465 {'sw_if_index': sw_if_index,
469 expected_retval=expected_retval)
# api_acl_dump: thin wrapper that invokes self.vapi.papi.acl_dump through
# self.vapi.api with the argument dict {'acl_index': acl_index} and returns
# whatever that call returns.
#   acl_index       -- index of the ACL to dump.
#   expected_retval -- forwarded unchanged to self.vapi.api (default 0).
# Callers in this file treat the result as a sequence whose items expose the
# rule list as `.r` (see test_0001_acl_create).
471 def api_acl_dump(self, acl_index, expected_retval=0):
472 return self.vapi.api(self.vapi.papi.acl_dump,
473 {'acl_index': acl_index},
474 expected_retval=expected_retval)
476 def test_0000_warmup_test(self):
477 """ ACL plugin version check; learn MACs
479 self.create_hosts(16)
480 self.run_traffic_no_check()
481 reply = self.vapi.papi.acl_plugin_get_version()
482 self.assertEqual(reply.major, 1)
483 self.logger.info("Working with ACL plugin version: %d.%d" % (
484 reply.major, reply.minor))
485 # minor version changes are non breaking
486 # self.assertEqual(reply.minor, 0)
# test_0001_acl_create: exercises ACL creation through the API wrappers.
#   1. Adds a one-rule "permit 1234" UDP ACL with acl_index=4294967295
#      (create new) and expects retval 0 and ACL index 0.
#   2. Dumps that ACL and checks one ACL with the same number of rules.
#   3. Adds a two-rule "deny 1234;permit all" ACL and expects index 1.
#   4. Tries to replace nonexistent ACL 432 and expects retval -1 with the
#      requested index echoed back in the reply.
488 def test_0001_acl_create(self):
492 self.logger.info("ACLP_TEST_START_0001")
494 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
495 'srcport_or_icmptype_first': 1234,
496 'srcport_or_icmptype_last': 1235,
497 'src_ip_prefix_len': 0,
498 'src_ip_addr': '\x00\x00\x00\x00',
499 'dstport_or_icmpcode_first': 1234,
500 'dstport_or_icmpcode_last': 1234,
501 'dst_ip_addr': '\x00\x00\x00\x00',
502 'dst_ip_prefix_len': 0}]
503 # Test 1: add a new ACL
504 reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
505 count=len(r), tag="permit 1234")
506 self.assertEqual(reply.retval, 0)
507 # The very first ACL gets #0
508 self.assertEqual(reply.acl_index, 0)
509 rr = self.api_acl_dump(reply.acl_index)
510 self.logger.info("Dumped ACL: " + str(rr))
511 self.assertEqual(len(rr), 1)
512 # We should have the same number of ACL entries as we had asked
513 self.assertEqual(len(rr[0].r), len(r))
514 # The rules should be the same. But because the submitted and returned
515 # are different types, we need to iterate over rules and keys to get
# NOTE(review): range(0, len(r) - 1) stops one short of the last rule; with
# the single-rule list built above it is empty, so the per-key comparison
# below never executes. range(len(r)) would cover every rule -- confirm intent.
517 for i_rule in range(0, len(r) - 1):
518 for rule_key in r[i_rule]:
519 self.assertEqual(rr[0].r[i_rule][rule_key],
522 # Add a deny-1234 ACL
523 r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
524 'srcport_or_icmptype_first': 1234,
525 'srcport_or_icmptype_last': 1235,
526 'src_ip_prefix_len': 0,
527 'src_ip_addr': '\x00\x00\x00\x00',
528 'dstport_or_icmpcode_first': 1234,
529 'dstport_or_icmpcode_last': 1234,
530 'dst_ip_addr': '\x00\x00\x00\x00',
531 'dst_ip_prefix_len': 0},
532 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
533 'srcport_or_icmptype_first': 0,
534 'srcport_or_icmptype_last': 0,
535 'src_ip_prefix_len': 0,
536 'src_ip_addr': '\x00\x00\x00\x00',
537 'dstport_or_icmpcode_first': 0,
538 'dstport_or_icmpcode_last': 0,
539 'dst_ip_addr': '\x00\x00\x00\x00',
540 'dst_ip_prefix_len': 0})
542 reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
544 tag="deny 1234;permit all")
545 self.assertEqual(reply.retval, 0)
546 # The second ACL gets #1
547 self.assertEqual(reply.acl_index, 1)
549 # Test 2: try to modify a nonexistent ACL
550 reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
551 tag="FFFF:FFFF", expected_retval=-1)
552 self.assertEqual(reply.retval, -1)
553 # The ACL number should pass through
554 self.assertEqual(reply.acl_index, 432)
556 self.logger.info("ACLP_TEST_FINISH_0001")
558 def test_0002_acl_permit_apply(self):
559 """ permit ACL apply test
561 self.logger.info("ACLP_TEST_START_0002")
564 rules.append(self.create_rule(self.IPV4, self.PERMIT,
565 0, self.proto[self.IP][self.UDP]))
566 rules.append(self.create_rule(self.IPV4, self.PERMIT,
567 0, self.proto[self.IP][self.TCP]))
570 self.apply_rules(rules, "permit per-flow")
572 # Traffic should still pass
573 self.run_verify_test(self.IP, self.IPV4, -1)
574 self.logger.info("ACLP_TEST_FINISH_0002")
576 def test_0003_acl_deny_apply(self):
577 """ deny ACL apply test
579 self.logger.info("ACLP_TEST_START_0003")
580 # Add a deny-flows ACL
582 rules.append(self.create_rule(self.IPV4, self.DENY,
583 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
584 # Permit ip any any in the end
585 rules.append(self.create_rule(self.IPV4, self.PERMIT,
589 self.apply_rules(rules, "deny per-flow;permit all")
591 # Traffic should not pass
592 self.run_verify_negat_test(self.IP, self.IPV4,
593 self.proto[self.IP][self.UDP])
594 self.logger.info("ACLP_TEST_FINISH_0003")
595 # self.assertEqual(1, 0)
597 def test_0004_vpp624_permit_icmpv4(self):
598 """ VPP_624 permit ICMPv4
600 self.logger.info("ACLP_TEST_START_0004")
604 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
605 self.proto[self.ICMP][self.ICMPv4]))
606 # deny ip any any in the end
607 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
610 self.apply_rules(rules, "permit icmpv4")
612 # Traffic should still pass
613 self.run_verify_test(self.ICMP, self.IPV4,
614 self.proto[self.ICMP][self.ICMPv4])
616 self.logger.info("ACLP_TEST_FINISH_0004")
618 def test_0005_vpp624_permit_icmpv6(self):
619 """ VPP_624 permit ICMPv6
621 self.logger.info("ACLP_TEST_START_0005")
625 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
626 self.proto[self.ICMP][self.ICMPv6]))
627 # deny ip any any in the end
628 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
631 self.apply_rules(rules, "permit icmpv6")
633 # Traffic should still pass
634 self.run_verify_test(self.ICMP, self.IPV6,
635 self.proto[self.ICMP][self.ICMPv6])
637 self.logger.info("ACLP_TEST_FINISH_0005")
639 def test_0006_vpp624_deny_icmpv4(self):
640 """ VPP_624 deny ICMPv4
642 self.logger.info("ACLP_TEST_START_0006")
645 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
646 self.proto[self.ICMP][self.ICMPv4]))
647 # permit ip any any in the end
648 rules.append(self.create_rule(self.IPV4, self.PERMIT,
652 self.apply_rules(rules, "deny icmpv4")
654 # Traffic should not pass
655 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
657 self.logger.info("ACLP_TEST_FINISH_0006")
659 def test_0007_vpp624_deny_icmpv6(self):
660 """ VPP_624 deny ICMPv6
662 self.logger.info("ACLP_TEST_START_0007")
665 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
666 self.proto[self.ICMP][self.ICMPv6]))
667 # permit ip any any in the end
668 rules.append(self.create_rule(self.IPV6, self.PERMIT,
672 self.apply_rules(rules, "deny icmpv6")
674 # Traffic should not pass
675 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
677 self.logger.info("ACLP_TEST_FINISH_0007")
679 def test_0008_tcp_permit_v4(self):
682 self.logger.info("ACLP_TEST_START_0008")
686 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
687 self.proto[self.IP][self.TCP]))
688 # deny ip any any in the end
689 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
692 self.apply_rules(rules, "permit ipv4 tcp")
694 # Traffic should still pass
695 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
697 self.logger.info("ACLP_TEST_FINISH_0008")
# test_0009_tcp_permit_v6: builds an IPv6 PERMIT rule for TCP using the
# PORTS_RANGE port selection followed by an IPv6 deny-all rule, applies them
# with the tag "permit ip6 tcp", and runs the positive verification for
# IPv6 TCP traffic.
699 def test_0009_tcp_permit_v6(self):
702 self.logger.info("ACLP_TEST_START_0009")
706 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
707 self.proto[self.IP][self.TCP]))
708 # deny ip any any in the end
709 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
712 self.apply_rules(rules, "permit ip6 tcp")
714 # Traffic should still pass
715 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
# NOTE(review): this logs FINISH_0008 although the test is 0009 (compare the
# START_0009 marker above) -- looks like a copy/paste slip; the log string is
# left untouched here because it is runtime output.
717 self.logger.info("ACLP_TEST_FINISH_0008")
719 def test_0010_udp_permit_v4(self):
722 self.logger.info("ACLP_TEST_START_0010")
726 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
727 self.proto[self.IP][self.UDP]))
728 # deny ip any any in the end
729 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
732 self.apply_rules(rules, "permit ipv udp")
734 # Traffic should still pass
735 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
737 self.logger.info("ACLP_TEST_FINISH_0010")
739 def test_0011_udp_permit_v6(self):
742 self.logger.info("ACLP_TEST_START_0011")
746 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
747 self.proto[self.IP][self.UDP]))
748 # deny ip any any in the end
749 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
752 self.apply_rules(rules, "permit ip6 udp")
754 # Traffic should still pass
755 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
757 self.logger.info("ACLP_TEST_FINISH_0011")
759 def test_0012_tcp_deny(self):
762 self.logger.info("ACLP_TEST_START_0012")
766 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
767 self.proto[self.IP][self.TCP]))
768 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
769 self.proto[self.IP][self.TCP]))
770 # permit ip any any in the end
771 rules.append(self.create_rule(self.IPV4, self.PERMIT,
773 rules.append(self.create_rule(self.IPV6, self.PERMIT,
777 self.apply_rules(rules, "deny ip4/ip6 tcp")
779 # Traffic should not pass
780 self.run_verify_negat_test(self.IP, self.IPRANDOM,
781 self.proto[self.IP][self.TCP])
783 self.logger.info("ACLP_TEST_FINISH_0012")
785 def test_0013_udp_deny(self):
788 self.logger.info("ACLP_TEST_START_0013")
792 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
793 self.proto[self.IP][self.UDP]))
794 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
795 self.proto[self.IP][self.UDP]))
796 # permit ip any any in the end
797 rules.append(self.create_rule(self.IPV4, self.PERMIT,
799 rules.append(self.create_rule(self.IPV6, self.PERMIT,
803 self.apply_rules(rules, "deny ip4/ip6 udp")
805 # Traffic should not pass
806 self.run_verify_negat_test(self.IP, self.IPRANDOM,
807 self.proto[self.IP][self.UDP])
809 self.logger.info("ACLP_TEST_FINISH_0013")
811 def test_0014_acl_dump(self):
812 """ verify add/dump acls
814 self.logger.info("ACLP_TEST_START_0014")
816 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
817 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
818 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
819 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
820 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
821 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
822 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
823 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
824 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
825 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
826 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
827 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
828 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
829 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
830 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
831 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
832 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
833 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
836 # Add and verify new ACLs
838 for i in range(len(r)):
839 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
841 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
843 result = self.api_acl_dump(reply.acl_index)
846 for drules in result:
848 self.assertEqual(dr.is_ipv6, r[i][0])
849 self.assertEqual(dr.is_permit, r[i][1])
850 self.assertEqual(dr.proto, r[i][3])
853 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
856 self.assertEqual(dr.srcport_or_icmptype_first, 0)
857 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
859 if dr.proto == self.proto[self.IP][self.TCP]:
860 self.assertGreater(dr.srcport_or_icmptype_first,
861 self.tcp_sport_from-1)
862 self.assertLess(dr.srcport_or_icmptype_first,
864 self.assertGreater(dr.dstport_or_icmpcode_last,
865 self.tcp_dport_from-1)
866 self.assertLess(dr.dstport_or_icmpcode_last,
868 elif dr.proto == self.proto[self.IP][self.UDP]:
869 self.assertGreater(dr.srcport_or_icmptype_first,
870 self.udp_sport_from-1)
871 self.assertLess(dr.srcport_or_icmptype_first,
873 self.assertGreater(dr.dstport_or_icmpcode_last,
874 self.udp_dport_from-1)
875 self.assertLess(dr.dstport_or_icmpcode_last,
879 self.logger.info("ACLP_TEST_FINISH_0014")
881 def test_0015_tcp_permit_port_v4(self):
882 """ permit single TCPv4
884 self.logger.info("ACLP_TEST_START_0015")
886 port = random.randint(0, 65535)
889 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
890 self.proto[self.IP][self.TCP]))
891 # deny ip any any in the end
892 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
895 self.apply_rules(rules, "permit ip4 tcp "+str(port))
897 # Traffic should still pass
898 self.run_verify_test(self.IP, self.IPV4,
899 self.proto[self.IP][self.TCP], port)
901 self.logger.info("ACLP_TEST_FINISH_0015")
903 def test_0016_udp_permit_port_v4(self):
904 """ permit single UDPv4
906 self.logger.info("ACLP_TEST_START_0016")
908 port = random.randint(0, 65535)
911 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
912 self.proto[self.IP][self.UDP]))
913 # deny ip any any in the end
914 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
917 self.apply_rules(rules, "permit ip4 tcp "+str(port))
919 # Traffic should still pass
920 self.run_verify_test(self.IP, self.IPV4,
921 self.proto[self.IP][self.UDP], port)
923 self.logger.info("ACLP_TEST_FINISH_0016")
925 def test_0017_tcp_permit_port_v6(self):
926 """ permit single TCPv6
928 self.logger.info("ACLP_TEST_START_0017")
930 port = random.randint(0, 65535)
933 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
934 self.proto[self.IP][self.TCP]))
935 # deny ip any any in the end
936 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
939 self.apply_rules(rules, "permit ip4 tcp "+str(port))
941 # Traffic should still pass
942 self.run_verify_test(self.IP, self.IPV6,
943 self.proto[self.IP][self.TCP], port)
945 self.logger.info("ACLP_TEST_FINISH_0017")
947 def test_0018_udp_permit_port_v6(self):
948 """ permit single UDPv6
950 self.logger.info("ACLP_TEST_START_0018")
952 port = random.randint(0, 65535)
955 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
956 self.proto[self.IP][self.UDP]))
957 # deny ip any any in the end
958 rules.append(self.create_rule(self.IPV6, self.DENY,
962 self.apply_rules(rules, "permit ip4 tcp "+str(port))
964 # Traffic should still pass
965 self.run_verify_test(self.IP, self.IPV6,
966 self.proto[self.IP][self.UDP], port)
968 self.logger.info("ACLP_TEST_FINISH_0018")
970 def test_0019_udp_deny_port(self):
971 """ deny single TCPv4/v6
973 self.logger.info("ACLP_TEST_START_0019")
975 port = random.randint(0, 65535)
978 rules.append(self.create_rule(self.IPV4, self.DENY, port,
979 self.proto[self.IP][self.TCP]))
980 rules.append(self.create_rule(self.IPV6, self.DENY, port,
981 self.proto[self.IP][self.TCP]))
982 # Permit ip any any in the end
983 rules.append(self.create_rule(self.IPV4, self.PERMIT,
985 rules.append(self.create_rule(self.IPV6, self.PERMIT,
989 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
991 # Traffic should not pass
992 self.run_verify_negat_test(self.IP, self.IPRANDOM,
993 self.proto[self.IP][self.TCP], port)
995 self.logger.info("ACLP_TEST_FINISH_0019")
997 def test_0020_udp_deny_port(self):
998 """ deny single UDPv4/v6
1000 self.logger.info("ACLP_TEST_START_0020")
1002 port = random.randint(0, 65535)
1005 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1006 self.proto[self.IP][self.UDP]))
1007 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1008 self.proto[self.IP][self.UDP]))
1009 # Permit ip any any in the end
1010 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1012 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1016 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1018 # Traffic should not pass
1019 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1020 self.proto[self.IP][self.UDP], port)
1022 self.logger.info("ACLP_TEST_FINISH_0020")
1024 def test_0021_udp_deny_port_verify_fragment_deny(self):
1025 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1027 self.logger.info("ACLP_TEST_START_0021")
1029 port = random.randint(0, 65535)
1032 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1033 self.proto[self.IP][self.UDP]))
1034 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1035 self.proto[self.IP][self.UDP]))
1036 # permit ip any any in the end
1037 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1039 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1043 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1045 # Traffic should not pass
1046 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1047 self.proto[self.IP][self.UDP], port, True)
1049 self.logger.info("ACLP_TEST_FINISH_0021")
# Script entry point: when the module is executed directly, run the tests via
# unittest.main using VppTestRunner (imported from framework) as the runner.
# NOTE(review): the `import unittest` line is not visible in this excerpt --
# confirm it is present at the top of the file.
1051 if __name__ == '__main__':
1052 unittest.main(testRunner=VppTestRunner)