2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
17 class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
47 udp_sport_to = udp_sport_from + 5
48 udp_dport_from = 20000
49 udp_dport_to = udp_dport_from + 5000
51 tcp_sport_to = tcp_sport_from + 5
52 tcp_dport_from = 40000
53 tcp_dport_to = tcp_dport_from + 5000
56 udp_sport_to_2 = udp_sport_from_2 + 5
57 udp_dport_from_2 = 30000
58 udp_dport_to_2 = udp_dport_from_2 + 5000
59 tcp_sport_from_2 = 130
60 tcp_sport_to_2 = tcp_sport_from_2 + 5
61 tcp_dport_from_2 = 20000
62 tcp_dport_to_2 = tcp_dport_from_2 + 5000
64 icmp4_type = 8 # echo request
66 icmp6_type = 128 # echo request
82 Perform standard class setup (defined by class method setUpClass in
83 class VppTestCase) before running the test case, set test case related
84 variables and configure VPP.
86 super(TestACLplugin, cls).setUpClass()
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
96 cls.flows[cls.pg0] = [cls.pg1]
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
123 # warm-up the mac address tables
127 super(TestACLplugin, cls).tearDownClass()
131 super(TestACLplugin, self).setUp()
132 self.reset_packet_infos()
136 Show various debug prints after each test.
138 super(TestACLplugin, self).tearDown()
139 if not self.vpp_dead:
140 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
141 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
142 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
143 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
144 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
147 def create_hosts(self, count, start=0):
149 Create required number of host MAC addresses and distribute them among
150 interfaces. Create host IPv4 address for every host MAC address.
152 :param int count: Number of hosts to create MAC/IPv4 addresses for.
153 :param int start: Number to start numbering from.
155 n_int = len(self.pg_interfaces)
156 macs_per_if = count / n_int
158 for pg_if in self.pg_interfaces:
160 start_nr = macs_per_if * i + start
161 end_nr = count + start if i == (n_int - 1) \
162 else macs_per_if * (i + 1) + start
163 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
164 for j in range(start_nr, end_nr):
166 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
167 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
168 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
171 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
172 s_prefix=0, s_ip='\x00\x00\x00\x00',
173 d_prefix=0, d_ip='\x00\x00\x00\x00'):
176 if ports == self.PORTS_ALL:
179 sport_to = 65535 if proto != 1 and proto != 58 else 255
181 elif ports == self.PORTS_RANGE:
183 sport_from = self.icmp4_type
184 sport_to = self.icmp4_type
185 dport_from = self.icmp4_code
186 dport_to = self.icmp4_code
188 sport_from = self.icmp6_type
189 sport_to = self.icmp6_type
190 dport_from = self.icmp6_code
191 dport_to = self.icmp6_code
192 elif proto == self.proto[self.IP][self.TCP]:
193 sport_from = self.tcp_sport_from
194 sport_to = self.tcp_sport_to
195 dport_from = self.tcp_dport_from
196 dport_to = self.tcp_dport_to
197 elif proto == self.proto[self.IP][self.UDP]:
198 sport_from = self.udp_sport_from
199 sport_to = self.udp_sport_to
200 dport_from = self.udp_dport_from
201 dport_to = self.udp_dport_to
202 elif ports == self.PORTS_RANGE_2:
204 sport_from = self.icmp4_type_2
205 sport_to = self.icmp4_type_2
206 dport_from = self.icmp4_code_from_2
207 dport_to = self.icmp4_code_to_2
209 sport_from = self.icmp6_type_2
210 sport_to = self.icmp6_type_2
211 dport_from = self.icmp6_code_from_2
212 dport_to = self.icmp6_code_to_2
213 elif proto == self.proto[self.IP][self.TCP]:
214 sport_from = self.tcp_sport_from_2
215 sport_to = self.tcp_sport_to_2
216 dport_from = self.tcp_dport_from_2
217 dport_to = self.tcp_dport_to_2
218 elif proto == self.proto[self.IP][self.UDP]:
219 sport_from = self.udp_sport_from_2
220 sport_to = self.udp_sport_to_2
221 dport_from = self.udp_dport_from_2
222 dport_to = self.udp_dport_to_2
229 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
230 'srcport_or_icmptype_first': sport_from,
231 'srcport_or_icmptype_last': sport_to,
232 'src_ip_prefix_len': s_prefix,
234 'dstport_or_icmpcode_first': dport_from,
235 'dstport_or_icmpcode_last': dport_to,
236 'dst_ip_prefix_len': d_prefix,
237 'dst_ip_addr': d_ip})
240 def apply_rules(self, rules, tag=''):
241 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
243 self.logger.info("Dumped ACL: " + str(
244 self.vapi.acl_dump(reply.acl_index)))
245 # Apply a ACL on the interface as inbound
246 for i in self.pg_interfaces:
247 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
249 acls=[reply.acl_index])
252 def create_upper_layer(self, packet_index, proto, ports=0):
253 p = self.proto_map[proto]
256 return UDP(sport=random.randint(self.udp_sport_from,
258 dport=random.randint(self.udp_dport_from,
261 return UDP(sport=ports, dport=ports)
264 return TCP(sport=random.randint(self.tcp_sport_from,
266 dport=random.randint(self.tcp_dport_from,
269 return TCP(sport=ports, dport=ports)
272 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
273 proto=-1, ports=0, fragments=False, pkt_raw=True):
275 Create input packet stream for defined interface using hosts or
278 :param object src_if: Interface to create packet stream for.
279 :param list packet_sizes: List of required packet sizes.
280 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
281 :return: Stream of packets.
284 if self.flows.__contains__(src_if):
285 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
286 for dst_if in self.flows[src_if]:
287 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
288 n_int = len(dst_hosts) * len(src_hosts)
289 for i in range(0, n_int):
290 dst_host = dst_hosts[i / len(src_hosts)]
291 src_host = src_hosts[i % len(src_hosts)]
292 pkt_info = self.create_packet_info(src_if, dst_if)
298 pkt_info.ip = random.choice([0, 1])
300 pkt_info.proto = random.choice(self.proto[self.IP])
302 pkt_info.proto = proto
303 payload = self.info_to_payload(pkt_info)
304 p = Ether(dst=dst_host.mac, src=src_host.mac)
306 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
308 p /= IPv6ExtHdrFragment(offset=64, m=1)
311 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
314 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
315 if traffic_type == self.ICMP:
317 p /= ICMPv6EchoRequest(type=self.icmp6_type,
318 code=self.icmp6_code)
320 p /= ICMP(type=self.icmp4_type,
321 code=self.icmp4_code)
323 p /= self.create_upper_layer(i, pkt_info.proto, ports)
326 pkt_info.data = p.copy()
328 size = random.choice(packet_sizes)
329 self.extend_packet(p, size)
333 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
335 Verify captured input packet stream for defined interface.
337 :param object pg_if: Interface to verify captured packet stream for.
338 :param list capture: Captured packet stream.
339 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
342 for i in self.pg_interfaces:
343 last_info[i.sw_if_index] = None
344 dst_sw_if_index = pg_if.sw_if_index
345 for packet in capture:
347 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
348 if traffic_type == self.ICMP and ip_type == self.IPV6:
349 payload_info = self.payload_to_info(
350 packet[ICMPv6EchoRequest].data)
351 payload = packet[ICMPv6EchoRequest]
353 payload_info = self.payload_to_info(str(packet[Raw]))
354 payload = packet[self.proto_map[payload_info.proto]]
356 self.logger.error(ppp("Unexpected or invalid packet "
357 "(outside network):", packet))
361 self.assertEqual(payload_info.ip, ip_type)
362 if traffic_type == self.ICMP:
364 if payload_info.ip == 0:
365 self.assertEqual(payload.type, self.icmp4_type)
366 self.assertEqual(payload.code, self.icmp4_code)
368 self.assertEqual(payload.type, self.icmp6_type)
369 self.assertEqual(payload.code, self.icmp6_code)
371 self.logger.error(ppp("Unexpected or invalid packet "
372 "(outside network):", packet))
376 ip_version = IPv6 if payload_info.ip == 1 else IP
378 ip = packet[ip_version]
379 packet_index = payload_info.index
381 self.assertEqual(payload_info.dst, dst_sw_if_index)
382 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
383 (pg_if.name, payload_info.src,
385 next_info = self.get_next_packet_info_for_interface2(
386 payload_info.src, dst_sw_if_index,
387 last_info[payload_info.src])
388 last_info[payload_info.src] = next_info
389 self.assertTrue(next_info is not None)
390 self.assertEqual(packet_index, next_info.index)
391 saved_packet = next_info.data
392 # Check standard fields
393 self.assertEqual(ip.src, saved_packet[ip_version].src)
394 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
395 p = self.proto_map[payload_info.proto]
398 self.assertEqual(tcp.sport, saved_packet[
400 self.assertEqual(tcp.dport, saved_packet[
404 self.assertEqual(udp.sport, saved_packet[
406 self.assertEqual(udp.dport, saved_packet[
409 self.logger.error(ppp("Unexpected or invalid packet:",
412 for i in self.pg_interfaces:
413 remaining_packet = self.get_next_packet_info_for_interface2(
414 i, dst_sw_if_index, last_info[i.sw_if_index])
416 remaining_packet is None,
417 "Port %u: Packet expected from source %u didn't arrive" %
418 (dst_sw_if_index, i.sw_if_index))
420 def run_traffic_no_check(self):
422 # Create incoming packet streams for packet-generator interfaces
423 for i in self.pg_interfaces:
424 if self.flows.__contains__(i):
425 pkts = self.create_stream(i, self.pg_if_packet_sizes)
429 # Enable packet capture and start packet sending
430 self.pg_enable_capture(self.pg_interfaces)
433 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
434 frags=False, pkt_raw=True):
436 # Create incoming packet streams for packet-generator interfaces
438 for i in self.pg_interfaces:
439 if self.flows.__contains__(i):
440 pkts = self.create_stream(i, self.pg_if_packet_sizes,
441 traffic_type, ip_type, proto, ports,
445 pkts_cnt += len(pkts)
447 # Enable packet capture and start packet sendingself.IPV
448 self.pg_enable_capture(self.pg_interfaces)
452 # Verify outgoing packet streams per packet-generator interface
453 for src_if in self.pg_interfaces:
454 if self.flows.__contains__(src_if):
455 for dst_if in self.flows[src_if]:
456 capture = dst_if.get_capture(pkts_cnt)
457 self.logger.info("Verifying capture on interface %s" %
459 self.verify_capture(dst_if, capture, traffic_type, ip_type)
461 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
462 ports=0, frags=False):
464 self.reset_packet_infos()
465 for i in self.pg_interfaces:
466 if self.flows.__contains__(i):
467 pkts = self.create_stream(i, self.pg_if_packet_sizes,
468 traffic_type, ip_type, proto, ports,
473 # Enable packet capture and start packet sending
474 self.pg_enable_capture(self.pg_interfaces)
478 # Verify outgoing packet streams per packet-generator interface
479 for src_if in self.pg_interfaces:
480 if self.flows.__contains__(src_if):
481 for dst_if in self.flows[src_if]:
482 self.logger.info("Verifying capture on interface %s" %
484 capture = dst_if.get_capture(0)
485 self.assertEqual(len(capture), 0)
487 def test_0000_warmup_test(self):
488 """ ACL plugin version check; learn MACs
490 self.create_hosts(16)
491 self.run_traffic_no_check()
492 reply = self.vapi.papi.acl_plugin_get_version()
493 self.assertEqual(reply.major, 1)
494 self.logger.info("Working with ACL plugin version: %d.%d" % (
495 reply.major, reply.minor))
496 # minor version changes are non breaking
497 # self.assertEqual(reply.minor, 0)
499 def test_0001_acl_create(self):
503 self.logger.info("ACLP_TEST_START_0001")
505 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
506 'srcport_or_icmptype_first': 1234,
507 'srcport_or_icmptype_last': 1235,
508 'src_ip_prefix_len': 0,
509 'src_ip_addr': '\x00\x00\x00\x00',
510 'dstport_or_icmpcode_first': 1234,
511 'dstport_or_icmpcode_last': 1234,
512 'dst_ip_addr': '\x00\x00\x00\x00',
513 'dst_ip_prefix_len': 0}]
514 # Test 1: add a new ACL
515 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
517 self.assertEqual(reply.retval, 0)
518 # The very first ACL gets #0
519 self.assertEqual(reply.acl_index, 0)
520 rr = self.vapi.acl_dump(reply.acl_index)
521 self.logger.info("Dumped ACL: " + str(rr))
522 self.assertEqual(len(rr), 1)
523 # We should have the same number of ACL entries as we had asked
524 self.assertEqual(len(rr[0].r), len(r))
525 # The rules should be the same. But because the submitted and returned
526 # are different types, we need to iterate over rules and keys to get
528 for i_rule in range(0, len(r) - 1):
529 for rule_key in r[i_rule]:
530 self.assertEqual(rr[0].r[i_rule][rule_key],
533 # Add a deny-1234 ACL
534 r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
535 'srcport_or_icmptype_first': 1234,
536 'srcport_or_icmptype_last': 1235,
537 'src_ip_prefix_len': 0,
538 'src_ip_addr': '\x00\x00\x00\x00',
539 'dstport_or_icmpcode_first': 1234,
540 'dstport_or_icmpcode_last': 1234,
541 'dst_ip_addr': '\x00\x00\x00\x00',
542 'dst_ip_prefix_len': 0},
543 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
544 'srcport_or_icmptype_first': 0,
545 'srcport_or_icmptype_last': 0,
546 'src_ip_prefix_len': 0,
547 'src_ip_addr': '\x00\x00\x00\x00',
548 'dstport_or_icmpcode_first': 0,
549 'dstport_or_icmpcode_last': 0,
550 'dst_ip_addr': '\x00\x00\x00\x00',
551 'dst_ip_prefix_len': 0})
553 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
554 tag="deny 1234;permit all")
555 self.assertEqual(reply.retval, 0)
556 # The second ACL gets #1
557 self.assertEqual(reply.acl_index, 1)
559 # Test 2: try to modify a nonexistent ACL
560 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
561 tag="FFFF:FFFF", expected_retval=-1)
562 self.assertEqual(reply.retval, -1)
563 # The ACL number should pass through
564 self.assertEqual(reply.acl_index, 432)
566 self.logger.info("ACLP_TEST_FINISH_0001")
568 def test_0002_acl_permit_apply(self):
569 """ permit ACL apply test
571 self.logger.info("ACLP_TEST_START_0002")
574 rules.append(self.create_rule(self.IPV4, self.PERMIT,
575 0, self.proto[self.IP][self.UDP]))
576 rules.append(self.create_rule(self.IPV4, self.PERMIT,
577 0, self.proto[self.IP][self.TCP]))
580 self.apply_rules(rules, "permit per-flow")
582 # Traffic should still pass
583 self.run_verify_test(self.IP, self.IPV4, -1)
584 self.logger.info("ACLP_TEST_FINISH_0002")
586 def test_0003_acl_deny_apply(self):
587 """ deny ACL apply test
589 self.logger.info("ACLP_TEST_START_0003")
590 # Add a deny-flows ACL
592 rules.append(self.create_rule(self.IPV4, self.DENY,
593 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
594 # Permit ip any any in the end
595 rules.append(self.create_rule(self.IPV4, self.PERMIT,
599 self.apply_rules(rules, "deny per-flow;permit all")
601 # Traffic should not pass
602 self.run_verify_negat_test(self.IP, self.IPV4,
603 self.proto[self.IP][self.UDP])
604 self.logger.info("ACLP_TEST_FINISH_0003")
605 # self.assertEqual(1, 0)
607 def test_0004_vpp624_permit_icmpv4(self):
608 """ VPP_624 permit ICMPv4
610 self.logger.info("ACLP_TEST_START_0004")
614 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
615 self.proto[self.ICMP][self.ICMPv4]))
616 # deny ip any any in the end
617 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
620 self.apply_rules(rules, "permit icmpv4")
622 # Traffic should still pass
623 self.run_verify_test(self.ICMP, self.IPV4,
624 self.proto[self.ICMP][self.ICMPv4])
626 self.logger.info("ACLP_TEST_FINISH_0004")
628 def test_0005_vpp624_permit_icmpv6(self):
629 """ VPP_624 permit ICMPv6
631 self.logger.info("ACLP_TEST_START_0005")
635 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
636 self.proto[self.ICMP][self.ICMPv6]))
637 # deny ip any any in the end
638 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
641 self.apply_rules(rules, "permit icmpv6")
643 # Traffic should still pass
644 self.run_verify_test(self.ICMP, self.IPV6,
645 self.proto[self.ICMP][self.ICMPv6])
647 self.logger.info("ACLP_TEST_FINISH_0005")
649 def test_0006_vpp624_deny_icmpv4(self):
650 """ VPP_624 deny ICMPv4
652 self.logger.info("ACLP_TEST_START_0006")
655 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
656 self.proto[self.ICMP][self.ICMPv4]))
657 # permit ip any any in the end
658 rules.append(self.create_rule(self.IPV4, self.PERMIT,
662 self.apply_rules(rules, "deny icmpv4")
664 # Traffic should not pass
665 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
667 self.logger.info("ACLP_TEST_FINISH_0006")
669 def test_0007_vpp624_deny_icmpv6(self):
670 """ VPP_624 deny ICMPv6
672 self.logger.info("ACLP_TEST_START_0007")
675 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
676 self.proto[self.ICMP][self.ICMPv6]))
677 # deny ip any any in the end
678 rules.append(self.create_rule(self.IPV6, self.PERMIT,
682 self.apply_rules(rules, "deny icmpv6")
684 # Traffic should not pass
685 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
687 self.logger.info("ACLP_TEST_FINISH_0007")
689 def test_0008_tcp_permit_v4(self):
692 self.logger.info("ACLP_TEST_START_0008")
696 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
697 self.proto[self.IP][self.TCP]))
698 # deny ip any any in the end
699 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
702 self.apply_rules(rules, "permit ipv4 tcp")
704 # Traffic should still pass
705 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
707 self.logger.info("ACLP_TEST_FINISH_0008")
709 def test_0009_tcp_permit_v6(self):
712 self.logger.info("ACLP_TEST_START_0009")
716 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
717 self.proto[self.IP][self.TCP]))
718 # deny ip any any in the end
719 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
722 self.apply_rules(rules, "permit ip6 tcp")
724 # Traffic should still pass
725 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
727 self.logger.info("ACLP_TEST_FINISH_0008")
729 def test_0010_udp_permit_v4(self):
732 self.logger.info("ACLP_TEST_START_0010")
736 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
737 self.proto[self.IP][self.UDP]))
738 # deny ip any any in the end
739 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
742 self.apply_rules(rules, "permit ipv udp")
744 # Traffic should still pass
745 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
747 self.logger.info("ACLP_TEST_FINISH_0010")
749 def test_0011_udp_permit_v6(self):
752 self.logger.info("ACLP_TEST_START_0011")
756 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
757 self.proto[self.IP][self.UDP]))
758 # deny ip any any in the end
759 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
762 self.apply_rules(rules, "permit ip6 udp")
764 # Traffic should still pass
765 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
767 self.logger.info("ACLP_TEST_FINISH_0011")
769 def test_0012_tcp_deny(self):
772 self.logger.info("ACLP_TEST_START_0012")
776 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
777 self.proto[self.IP][self.TCP]))
778 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
779 self.proto[self.IP][self.TCP]))
780 # permit ip any any in the end
781 rules.append(self.create_rule(self.IPV4, self.PERMIT,
783 rules.append(self.create_rule(self.IPV6, self.PERMIT,
787 self.apply_rules(rules, "deny ip4/ip6 tcp")
789 # Traffic should not pass
790 self.run_verify_negat_test(self.IP, self.IPRANDOM,
791 self.proto[self.IP][self.TCP])
793 self.logger.info("ACLP_TEST_FINISH_0012")
795 def test_0013_udp_deny(self):
798 self.logger.info("ACLP_TEST_START_0013")
802 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
803 self.proto[self.IP][self.UDP]))
804 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
805 self.proto[self.IP][self.UDP]))
806 # permit ip any any in the end
807 rules.append(self.create_rule(self.IPV4, self.PERMIT,
809 rules.append(self.create_rule(self.IPV6, self.PERMIT,
813 self.apply_rules(rules, "deny ip4/ip6 udp")
815 # Traffic should not pass
816 self.run_verify_negat_test(self.IP, self.IPRANDOM,
817 self.proto[self.IP][self.UDP])
819 self.logger.info("ACLP_TEST_FINISH_0013")
821 def test_0014_acl_dump(self):
822 """ verify add/dump acls
824 self.logger.info("ACLP_TEST_START_0014")
826 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
827 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
828 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
829 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
830 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
831 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
832 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
833 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
834 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
835 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
836 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
837 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
838 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
839 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
840 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
841 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
842 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
843 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
846 # Add and verify new ACLs
848 for i in range(len(r)):
849 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
851 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
852 result = self.vapi.acl_dump(reply.acl_index)
855 for drules in result:
857 self.assertEqual(dr.is_ipv6, r[i][0])
858 self.assertEqual(dr.is_permit, r[i][1])
859 self.assertEqual(dr.proto, r[i][3])
862 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
865 self.assertEqual(dr.srcport_or_icmptype_first, 0)
866 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
868 if dr.proto == self.proto[self.IP][self.TCP]:
869 self.assertGreater(dr.srcport_or_icmptype_first,
870 self.tcp_sport_from-1)
871 self.assertLess(dr.srcport_or_icmptype_first,
873 self.assertGreater(dr.dstport_or_icmpcode_last,
874 self.tcp_dport_from-1)
875 self.assertLess(dr.dstport_or_icmpcode_last,
877 elif dr.proto == self.proto[self.IP][self.UDP]:
878 self.assertGreater(dr.srcport_or_icmptype_first,
879 self.udp_sport_from-1)
880 self.assertLess(dr.srcport_or_icmptype_first,
882 self.assertGreater(dr.dstport_or_icmpcode_last,
883 self.udp_dport_from-1)
884 self.assertLess(dr.dstport_or_icmpcode_last,
888 self.logger.info("ACLP_TEST_FINISH_0014")
890 def test_0015_tcp_permit_port_v4(self):
891 """ permit single TCPv4
893 self.logger.info("ACLP_TEST_START_0015")
895 port = random.randint(0, 65535)
898 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
899 self.proto[self.IP][self.TCP]))
900 # deny ip any any in the end
901 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
904 self.apply_rules(rules, "permit ip4 tcp "+str(port))
906 # Traffic should still pass
907 self.run_verify_test(self.IP, self.IPV4,
908 self.proto[self.IP][self.TCP], port)
910 self.logger.info("ACLP_TEST_FINISH_0015")
912 def test_0016_udp_permit_port_v4(self):
913 """ permit single UDPv4
915 self.logger.info("ACLP_TEST_START_0016")
917 port = random.randint(0, 65535)
920 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
921 self.proto[self.IP][self.UDP]))
922 # deny ip any any in the end
923 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
926 self.apply_rules(rules, "permit ip4 tcp "+str(port))
928 # Traffic should still pass
929 self.run_verify_test(self.IP, self.IPV4,
930 self.proto[self.IP][self.UDP], port)
932 self.logger.info("ACLP_TEST_FINISH_0016")
934 def test_0017_tcp_permit_port_v6(self):
935 """ permit single TCPv6
937 self.logger.info("ACLP_TEST_START_0017")
939 port = random.randint(0, 65535)
942 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
943 self.proto[self.IP][self.TCP]))
944 # deny ip any any in the end
945 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
948 self.apply_rules(rules, "permit ip4 tcp "+str(port))
950 # Traffic should still pass
951 self.run_verify_test(self.IP, self.IPV6,
952 self.proto[self.IP][self.TCP], port)
954 self.logger.info("ACLP_TEST_FINISH_0017")
956 def test_0018_udp_permit_port_v6(self):
957 """ permit single UPPv6
959 self.logger.info("ACLP_TEST_START_0018")
961 port = random.randint(0, 65535)
964 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
965 self.proto[self.IP][self.UDP]))
966 # deny ip any any in the end
967 rules.append(self.create_rule(self.IPV6, self.DENY,
971 self.apply_rules(rules, "permit ip4 tcp "+str(port))
973 # Traffic should still pass
974 self.run_verify_test(self.IP, self.IPV6,
975 self.proto[self.IP][self.UDP], port)
977 self.logger.info("ACLP_TEST_FINISH_0018")
979 def test_0019_udp_deny_port(self):
980 """ deny single TCPv4/v6
982 self.logger.info("ACLP_TEST_START_0019")
984 port = random.randint(0, 65535)
987 rules.append(self.create_rule(self.IPV4, self.DENY, port,
988 self.proto[self.IP][self.TCP]))
989 rules.append(self.create_rule(self.IPV6, self.DENY, port,
990 self.proto[self.IP][self.TCP]))
991 # Permit ip any any in the end
992 rules.append(self.create_rule(self.IPV4, self.PERMIT,
994 rules.append(self.create_rule(self.IPV6, self.PERMIT,
998 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1000 # Traffic should not pass
1001 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1002 self.proto[self.IP][self.TCP], port)
1004 self.logger.info("ACLP_TEST_FINISH_0019")
1006 def test_0020_udp_deny_port(self):
1007 """ deny single UDPv4/v6
1009 self.logger.info("ACLP_TEST_START_0020")
1011 port = random.randint(0, 65535)
1014 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1015 self.proto[self.IP][self.UDP]))
1016 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1017 self.proto[self.IP][self.UDP]))
1018 # Permit ip any any in the end
1019 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1021 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1025 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1027 # Traffic should not pass
1028 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1029 self.proto[self.IP][self.UDP], port)
1031 self.logger.info("ACLP_TEST_FINISH_0020")
1033 def test_0021_udp_deny_port_verify_fragment_deny(self):
1034 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1036 self.logger.info("ACLP_TEST_START_0021")
1038 port = random.randint(0, 65535)
1041 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1042 self.proto[self.IP][self.UDP]))
1043 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1044 self.proto[self.IP][self.UDP]))
1045 # deny ip any any in the end
1046 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1048 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1052 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1054 # Traffic should not pass
1055 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1056 self.proto[self.IP][self.UDP], port, True)
1058 self.logger.info("ACLP_TEST_FINISH_0021")
1060 def test_0022_zero_length_udp_ipv4(self):
1061 """ VPP-687 zero length udp ipv4 packet"""
1062 self.logger.info("ACLP_TEST_START_0022")
1064 port = random.randint(0, 65535)
1067 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1068 self.proto[self.IP][self.UDP]))
1069 # deny ip any any in the end
1071 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1074 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1076 # Traffic should still pass
1077 # Create incoming packet streams for packet-generator interfaces
1079 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1081 self.proto[self.IP][self.UDP], port,
1084 self.pg0.add_stream(pkts)
1085 pkts_cnt += len(pkts)
1087 # Enable packet capture and start packet sendingself.IPV
1088 self.pg_enable_capture(self.pg_interfaces)
1091 self.pg1.get_capture(pkts_cnt)
1093 self.logger.info("ACLP_TEST_FINISH_0022")
1095 def test_0023_zero_length_udp_ipv6(self):
1096 """ VPP-687 zero length udp ipv6 packet"""
1097 self.logger.info("ACLP_TEST_START_0023")
1099 port = random.randint(0, 65535)
1102 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1103 self.proto[self.IP][self.UDP]))
1104 # deny ip any any in the end
1105 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1108 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1110 # Traffic should still pass
1111 # Create incoming packet streams for packet-generator interfaces
1113 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1115 self.proto[self.IP][self.UDP], port,
1118 self.pg0.add_stream(pkts)
1119 pkts_cnt += len(pkts)
1121 # Enable packet capture and start packet sendingself.IPV
1122 self.pg_enable_capture(self.pg_interfaces)
1125 # Verify outgoing packet streams per packet-generator interface
1126 self.pg1.get_capture(pkts_cnt)
1128 self.logger.info("ACLP_TEST_FINISH_0023")
1130 def test_0108_tcp_permit_v4(self):
1131 """ permit TCPv4 + non-match range
1133 self.logger.info("ACLP_TEST_START_0108")
1137 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1138 self.proto[self.IP][self.TCP]))
1139 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1140 self.proto[self.IP][self.TCP]))
1141 # deny ip any any in the end
1142 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1145 self.apply_rules(rules, "permit ipv4 tcp")
1147 # Traffic should still pass
1148 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1150 self.logger.info("ACLP_TEST_FINISH_0108")
1152 def test_0109_tcp_permit_v6(self):
1153 """ permit TCPv6 + non-match range
1155 self.logger.info("ACLP_TEST_START_0109")
1159 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1160 self.proto[self.IP][self.TCP]))
1161 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1162 self.proto[self.IP][self.TCP]))
1163 # deny ip any any in the end
1164 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1167 self.apply_rules(rules, "permit ip6 tcp")
1169 # Traffic should still pass
1170 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1172 self.logger.info("ACLP_TEST_FINISH_0109")
1174 def test_0110_udp_permit_v4(self):
1175 """ permit UDPv4 + non-match range
1177 self.logger.info("ACLP_TEST_START_0110")
1181 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1182 self.proto[self.IP][self.UDP]))
1183 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1184 self.proto[self.IP][self.UDP]))
1185 # deny ip any any in the end
1186 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1189 self.apply_rules(rules, "permit ipv4 udp")
1191 # Traffic should still pass
1192 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1194 self.logger.info("ACLP_TEST_FINISH_0110")
1196 def test_0111_udp_permit_v6(self):
1197 """ permit UDPv6 + non-match range
1199 self.logger.info("ACLP_TEST_START_0111")
1203 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1204 self.proto[self.IP][self.UDP]))
1205 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1206 self.proto[self.IP][self.UDP]))
1207 # deny ip any any in the end
1208 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1211 self.apply_rules(rules, "permit ip6 udp")
1213 # Traffic should still pass
1214 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1216 self.logger.info("ACLP_TEST_FINISH_0111")
1218 def test_0112_tcp_deny(self):
1219 """ deny TCPv4/v6 + non-match range
1221 self.logger.info("ACLP_TEST_START_0112")
1225 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1227 self.proto[self.IP][self.TCP]))
1228 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1230 self.proto[self.IP][self.TCP]))
1231 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1232 self.proto[self.IP][self.TCP]))
1233 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1234 self.proto[self.IP][self.TCP]))
1235 # permit ip any any in the end
1236 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1238 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1242 self.apply_rules(rules, "deny ip4/ip6 tcp")
1244 # Traffic should not pass
1245 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1246 self.proto[self.IP][self.TCP])
1248 self.logger.info("ACLP_TEST_FINISH_0112")
1250 def test_0113_udp_deny(self):
1251 """ deny UDPv4/v6 + non-match range
1253 self.logger.info("ACLP_TEST_START_0113")
1257 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1259 self.proto[self.IP][self.UDP]))
1260 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1262 self.proto[self.IP][self.UDP]))
1263 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1264 self.proto[self.IP][self.UDP]))
1265 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1266 self.proto[self.IP][self.UDP]))
1267 # permit ip any any in the end
1268 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1270 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1274 self.apply_rules(rules, "deny ip4/ip6 udp")
1276 # Traffic should not pass
1277 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1278 self.proto[self.IP][self.UDP])
1280 self.logger.info("ACLP_TEST_FINISH_0113")
1283 if __name__ == '__main__':
1284 unittest.main(testRunner=VppTestRunner)