2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
17 class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
47 udp_sport_to = udp_sport_from + 5
48 udp_dport_from = 20000
49 udp_dport_to = udp_dport_from + 5000
51 tcp_sport_to = tcp_sport_from + 5
52 tcp_dport_from = 40000
53 tcp_dport_to = tcp_dport_from + 5000
56 udp_sport_to_2 = udp_sport_from_2 + 5
57 udp_dport_from_2 = 30000
58 udp_dport_to_2 = udp_dport_from_2 + 5000
59 tcp_sport_from_2 = 130
60 tcp_sport_to_2 = tcp_sport_from_2 + 5
61 tcp_dport_from_2 = 20000
62 tcp_dport_to_2 = tcp_dport_from_2 + 5000
64 icmp4_type = 8 # echo request
66 icmp6_type = 128 # echo request
82 Perform standard class setup (defined by class method setUpClass in
83 class VppTestCase) before running the test case, set test case related
84 variables and configure VPP.
86 super(TestACLplugin, cls).setUpClass()
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
96 cls.flows[cls.pg0] = [cls.pg1]
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
123 # warm-up the mac address tables
127 super(TestACLplugin, cls).tearDownClass()
131 super(TestACLplugin, self).setUp()
132 self.reset_packet_infos()
136 Show various debug prints after each test.
138 super(TestACLplugin, self).tearDown()
139 if not self.vpp_dead:
140 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
141 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
142 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
143 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
144 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
147 def create_hosts(self, count, start=0):
149 Create required number of host MAC addresses and distribute them among
150 interfaces. Create host IPv4 address for every host MAC address.
152 :param int count: Number of hosts to create MAC/IPv4 addresses for.
153 :param int start: Number to start numbering from.
155 n_int = len(self.pg_interfaces)
156 macs_per_if = count / n_int
158 for pg_if in self.pg_interfaces:
160 start_nr = macs_per_if * i + start
161 end_nr = count + start if i == (n_int - 1) \
162 else macs_per_if * (i + 1) + start
163 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
164 for j in range(start_nr, end_nr):
166 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
167 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
168 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
171 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
172 s_prefix=0, s_ip='\x00\x00\x00\x00',
173 d_prefix=0, d_ip='\x00\x00\x00\x00'):
176 if ports == self.PORTS_ALL:
179 sport_to = 65535 if proto != 1 and proto != 58 else 255
181 elif ports == self.PORTS_RANGE:
183 sport_from = self.icmp4_type
184 sport_to = self.icmp4_type
185 dport_from = self.icmp4_code
186 dport_to = self.icmp4_code
188 sport_from = self.icmp6_type
189 sport_to = self.icmp6_type
190 dport_from = self.icmp6_code
191 dport_to = self.icmp6_code
192 elif proto == self.proto[self.IP][self.TCP]:
193 sport_from = self.tcp_sport_from
194 sport_to = self.tcp_sport_to
195 dport_from = self.tcp_dport_from
196 dport_to = self.tcp_dport_to
197 elif proto == self.proto[self.IP][self.UDP]:
198 sport_from = self.udp_sport_from
199 sport_to = self.udp_sport_to
200 dport_from = self.udp_dport_from
201 dport_to = self.udp_dport_to
202 elif ports == self.PORTS_RANGE_2:
204 sport_from = self.icmp4_type_2
205 sport_to = self.icmp4_type_2
206 dport_from = self.icmp4_code_from_2
207 dport_to = self.icmp4_code_to_2
209 sport_from = self.icmp6_type_2
210 sport_to = self.icmp6_type_2
211 dport_from = self.icmp6_code_from_2
212 dport_to = self.icmp6_code_to_2
213 elif proto == self.proto[self.IP][self.TCP]:
214 sport_from = self.tcp_sport_from_2
215 sport_to = self.tcp_sport_to_2
216 dport_from = self.tcp_dport_from_2
217 dport_to = self.tcp_dport_to_2
218 elif proto == self.proto[self.IP][self.UDP]:
219 sport_from = self.udp_sport_from_2
220 sport_to = self.udp_sport_to_2
221 dport_from = self.udp_dport_from_2
222 dport_to = self.udp_dport_to_2
229 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
230 'srcport_or_icmptype_first': sport_from,
231 'srcport_or_icmptype_last': sport_to,
232 'src_ip_prefix_len': s_prefix,
234 'dstport_or_icmpcode_first': dport_from,
235 'dstport_or_icmpcode_last': dport_to,
236 'dst_ip_prefix_len': d_prefix,
237 'dst_ip_addr': d_ip})
240 def apply_rules(self, rules, tag=''):
241 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
244 self.logger.info("Dumped ACL: " + str(
245 self.api_acl_dump(reply.acl_index)))
246 # Apply an ACL on the interface as inbound
247 for i in self.pg_interfaces:
248 self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
250 acls=[reply.acl_index])
253 def create_upper_layer(self, packet_index, proto, ports=0):
254 p = self.proto_map[proto]
257 return UDP(sport=random.randint(self.udp_sport_from,
259 dport=random.randint(self.udp_dport_from,
262 return UDP(sport=ports, dport=ports)
265 return TCP(sport=random.randint(self.tcp_sport_from,
267 dport=random.randint(self.tcp_dport_from,
270 return TCP(sport=ports, dport=ports)
273 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
274 proto=-1, ports=0, fragments=False, pkt_raw=True):
276 Create input packet stream for defined interface using hosts or
279 :param object src_if: Interface to create packet stream for.
280 :param list packet_sizes: List of required packet sizes.
281 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
282 :return: Stream of packets.
285 if self.flows.__contains__(src_if):
286 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
287 for dst_if in self.flows[src_if]:
288 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
289 n_int = len(dst_hosts) * len(src_hosts)
290 for i in range(0, n_int):
291 dst_host = dst_hosts[i / len(src_hosts)]
292 src_host = src_hosts[i % len(src_hosts)]
293 pkt_info = self.create_packet_info(src_if, dst_if)
299 pkt_info.ip = random.choice([0, 1])
301 pkt_info.proto = random.choice(self.proto[self.IP])
303 pkt_info.proto = proto
304 payload = self.info_to_payload(pkt_info)
305 p = Ether(dst=dst_host.mac, src=src_host.mac)
307 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
309 p /= IPv6ExtHdrFragment(offset=64, m=1)
312 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
315 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
316 if traffic_type == self.ICMP:
318 p /= ICMPv6EchoRequest(type=self.icmp6_type,
319 code=self.icmp6_code)
321 p /= ICMP(type=self.icmp4_type,
322 code=self.icmp4_code)
324 p /= self.create_upper_layer(i, pkt_info.proto, ports)
327 pkt_info.data = p.copy()
329 size = random.choice(packet_sizes)
330 self.extend_packet(p, size)
334 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
336 Verify captured input packet stream for defined interface.
338 :param object pg_if: Interface to verify captured packet stream for.
339 :param list capture: Captured packet stream.
340 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
343 for i in self.pg_interfaces:
344 last_info[i.sw_if_index] = None
345 dst_sw_if_index = pg_if.sw_if_index
346 for packet in capture:
348 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
349 if traffic_type == self.ICMP and ip_type == self.IPV6:
350 payload_info = self.payload_to_info(
351 packet[ICMPv6EchoRequest].data)
352 payload = packet[ICMPv6EchoRequest]
354 payload_info = self.payload_to_info(str(packet[Raw]))
355 payload = packet[self.proto_map[payload_info.proto]]
357 self.logger.error(ppp("Unexpected or invalid packet "
358 "(outside network):", packet))
362 self.assertEqual(payload_info.ip, ip_type)
363 if traffic_type == self.ICMP:
365 if payload_info.ip == 0:
366 self.assertEqual(payload.type, self.icmp4_type)
367 self.assertEqual(payload.code, self.icmp4_code)
369 self.assertEqual(payload.type, self.icmp6_type)
370 self.assertEqual(payload.code, self.icmp6_code)
372 self.logger.error(ppp("Unexpected or invalid packet "
373 "(outside network):", packet))
377 ip_version = IPv6 if payload_info.ip == 1 else IP
379 ip = packet[ip_version]
380 packet_index = payload_info.index
382 self.assertEqual(payload_info.dst, dst_sw_if_index)
383 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
384 (pg_if.name, payload_info.src,
386 next_info = self.get_next_packet_info_for_interface2(
387 payload_info.src, dst_sw_if_index,
388 last_info[payload_info.src])
389 last_info[payload_info.src] = next_info
390 self.assertTrue(next_info is not None)
391 self.assertEqual(packet_index, next_info.index)
392 saved_packet = next_info.data
393 # Check standard fields
394 self.assertEqual(ip.src, saved_packet[ip_version].src)
395 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
396 p = self.proto_map[payload_info.proto]
399 self.assertEqual(tcp.sport, saved_packet[
401 self.assertEqual(tcp.dport, saved_packet[
405 self.assertEqual(udp.sport, saved_packet[
407 self.assertEqual(udp.dport, saved_packet[
410 self.logger.error(ppp("Unexpected or invalid packet:",
413 for i in self.pg_interfaces:
414 remaining_packet = self.get_next_packet_info_for_interface2(
415 i, dst_sw_if_index, last_info[i.sw_if_index])
417 remaining_packet is None,
418 "Port %u: Packet expected from source %u didn't arrive" %
419 (dst_sw_if_index, i.sw_if_index))
421 def run_traffic_no_check(self):
423 # Create incoming packet streams for packet-generator interfaces
424 for i in self.pg_interfaces:
425 if self.flows.__contains__(i):
426 pkts = self.create_stream(i, self.pg_if_packet_sizes)
430 # Enable packet capture and start packet sending
431 self.pg_enable_capture(self.pg_interfaces)
434 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
435 frags=False, pkt_raw=True):
437 # Create incoming packet streams for packet-generator interfaces
439 for i in self.pg_interfaces:
440 if self.flows.__contains__(i):
441 pkts = self.create_stream(i, self.pg_if_packet_sizes,
442 traffic_type, ip_type, proto, ports,
446 pkts_cnt += len(pkts)
448 # Enable packet capture and start packet sending
449 self.pg_enable_capture(self.pg_interfaces)
453 # Verify outgoing packet streams per packet-generator interface
454 for src_if in self.pg_interfaces:
455 if self.flows.__contains__(src_if):
456 for dst_if in self.flows[src_if]:
457 capture = dst_if.get_capture(pkts_cnt)
458 self.logger.info("Verifying capture on interface %s" %
460 self.verify_capture(dst_if, capture, traffic_type, ip_type)
462 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
463 ports=0, frags=False):
465 self.reset_packet_infos()
466 for i in self.pg_interfaces:
467 if self.flows.__contains__(i):
468 pkts = self.create_stream(i, self.pg_if_packet_sizes,
469 traffic_type, ip_type, proto, ports,
474 # Enable packet capture and start packet sending
475 self.pg_enable_capture(self.pg_interfaces)
479 # Verify outgoing packet streams per packet-generator interface
480 for src_if in self.pg_interfaces:
481 if self.flows.__contains__(src_if):
482 for dst_if in self.flows[src_if]:
483 self.logger.info("Verifying capture on interface %s" %
485 capture = dst_if.get_capture(0)
486 self.assertEqual(len(capture), 0)
488 def api_acl_add_replace(self, acl_index, r, count, tag='',
490 """Add/replace an ACL
492 :param int acl_index: ACL index to replace,
493 4294967295 to create new ACL.
494 :param acl_rule r: ACL rules array.
495 :param str tag: symbolic tag (description) for this ACL.
496 :param int count: number of rules.
498 return self.vapi.api(self.vapi.papi.acl_add_replace,
499 {'acl_index': acl_index,
503 expected_retval=expected_retval)
505 def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
507 return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
508 {'sw_if_index': sw_if_index,
512 expected_retval=expected_retval)
def api_acl_dump(self, acl_index, expected_retval=0):
    """Issue the ``acl_dump`` API call for a single ACL index.

    :param int acl_index: ACL index placed in the request under the
        ``'acl_index'`` key.
    :param int expected_retval: forwarded unchanged to ``self.vapi.api``
        as its ``expected_retval`` keyword argument.
    :return: whatever ``self.vapi.api`` returns for this call; callers in
        this file take ``len()`` of it and index into it.
    """
    acl_dump_call = self.vapi.papi.acl_dump
    request_args = {'acl_index': acl_index}
    return self.vapi.api(acl_dump_call, request_args,
                         expected_retval=expected_retval)
519 def test_0000_warmup_test(self):
520 """ ACL plugin version check; learn MACs
522 self.create_hosts(16)
523 self.run_traffic_no_check()
524 reply = self.vapi.papi.acl_plugin_get_version()
525 self.assertEqual(reply.major, 1)
526 self.logger.info("Working with ACL plugin version: %d.%d" % (
527 reply.major, reply.minor))
528 # minor version changes are non breaking
529 # self.assertEqual(reply.minor, 0)
531 def test_0001_acl_create(self):
535 self.logger.info("ACLP_TEST_START_0001")
537 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
538 'srcport_or_icmptype_first': 1234,
539 'srcport_or_icmptype_last': 1235,
540 'src_ip_prefix_len': 0,
541 'src_ip_addr': '\x00\x00\x00\x00',
542 'dstport_or_icmpcode_first': 1234,
543 'dstport_or_icmpcode_last': 1234,
544 'dst_ip_addr': '\x00\x00\x00\x00',
545 'dst_ip_prefix_len': 0}]
546 # Test 1: add a new ACL
547 reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
548 count=len(r), tag="permit 1234")
549 self.assertEqual(reply.retval, 0)
550 # The very first ACL gets #0
551 self.assertEqual(reply.acl_index, 0)
552 rr = self.api_acl_dump(reply.acl_index)
553 self.logger.info("Dumped ACL: " + str(rr))
554 self.assertEqual(len(rr), 1)
555 # We should have the same number of ACL entries as we had asked
556 self.assertEqual(len(rr[0].r), len(r))
557 # The rules should be the same. But because the submitted and returned
558 # are different types, we need to iterate over rules and keys to get
560 for i_rule in range(0, len(r) - 1):
561 for rule_key in r[i_rule]:
562 self.assertEqual(rr[0].r[i_rule][rule_key],
565 # Add a deny-1234 ACL
566 r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
567 'srcport_or_icmptype_first': 1234,
568 'srcport_or_icmptype_last': 1235,
569 'src_ip_prefix_len': 0,
570 'src_ip_addr': '\x00\x00\x00\x00',
571 'dstport_or_icmpcode_first': 1234,
572 'dstport_or_icmpcode_last': 1234,
573 'dst_ip_addr': '\x00\x00\x00\x00',
574 'dst_ip_prefix_len': 0},
575 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
576 'srcport_or_icmptype_first': 0,
577 'srcport_or_icmptype_last': 0,
578 'src_ip_prefix_len': 0,
579 'src_ip_addr': '\x00\x00\x00\x00',
580 'dstport_or_icmpcode_first': 0,
581 'dstport_or_icmpcode_last': 0,
582 'dst_ip_addr': '\x00\x00\x00\x00',
583 'dst_ip_prefix_len': 0})
585 reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
587 tag="deny 1234;permit all")
588 self.assertEqual(reply.retval, 0)
589 # The second ACL gets #1
590 self.assertEqual(reply.acl_index, 1)
592 # Test 2: try to modify a nonexistent ACL
593 reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
594 tag="FFFF:FFFF", expected_retval=-1)
595 self.assertEqual(reply.retval, -1)
596 # The ACL number should pass through
597 self.assertEqual(reply.acl_index, 432)
599 self.logger.info("ACLP_TEST_FINISH_0001")
601 def test_0002_acl_permit_apply(self):
602 """ permit ACL apply test
604 self.logger.info("ACLP_TEST_START_0002")
607 rules.append(self.create_rule(self.IPV4, self.PERMIT,
608 0, self.proto[self.IP][self.UDP]))
609 rules.append(self.create_rule(self.IPV4, self.PERMIT,
610 0, self.proto[self.IP][self.TCP]))
613 self.apply_rules(rules, "permit per-flow")
615 # Traffic should still pass
616 self.run_verify_test(self.IP, self.IPV4, -1)
617 self.logger.info("ACLP_TEST_FINISH_0002")
619 def test_0003_acl_deny_apply(self):
620 """ deny ACL apply test
622 self.logger.info("ACLP_TEST_START_0003")
623 # Add a deny-flows ACL
625 rules.append(self.create_rule(self.IPV4, self.DENY,
626 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
627 # Permit ip any any in the end
628 rules.append(self.create_rule(self.IPV4, self.PERMIT,
632 self.apply_rules(rules, "deny per-flow;permit all")
634 # Traffic should not pass
635 self.run_verify_negat_test(self.IP, self.IPV4,
636 self.proto[self.IP][self.UDP])
637 self.logger.info("ACLP_TEST_FINISH_0003")
638 # self.assertEqual(1, 0)
640 def test_0004_vpp624_permit_icmpv4(self):
641 """ VPP_624 permit ICMPv4
643 self.logger.info("ACLP_TEST_START_0004")
647 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
648 self.proto[self.ICMP][self.ICMPv4]))
649 # deny ip any any in the end
650 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
653 self.apply_rules(rules, "permit icmpv4")
655 # Traffic should still pass
656 self.run_verify_test(self.ICMP, self.IPV4,
657 self.proto[self.ICMP][self.ICMPv4])
659 self.logger.info("ACLP_TEST_FINISH_0004")
661 def test_0005_vpp624_permit_icmpv6(self):
662 """ VPP_624 permit ICMPv6
664 self.logger.info("ACLP_TEST_START_0005")
668 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
669 self.proto[self.ICMP][self.ICMPv6]))
670 # deny ip any any in the end
671 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
674 self.apply_rules(rules, "permit icmpv6")
676 # Traffic should still pass
677 self.run_verify_test(self.ICMP, self.IPV6,
678 self.proto[self.ICMP][self.ICMPv6])
680 self.logger.info("ACLP_TEST_FINISH_0005")
682 def test_0006_vpp624_deny_icmpv4(self):
683 """ VPP_624 deny ICMPv4
685 self.logger.info("ACLP_TEST_START_0006")
688 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
689 self.proto[self.ICMP][self.ICMPv4]))
690 # permit ip any any in the end
691 rules.append(self.create_rule(self.IPV4, self.PERMIT,
695 self.apply_rules(rules, "deny icmpv4")
697 # Traffic should not pass
698 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
700 self.logger.info("ACLP_TEST_FINISH_0006")
702 def test_0007_vpp624_deny_icmpv6(self):
703 """ VPP_624 deny ICMPv6
705 self.logger.info("ACLP_TEST_START_0007")
708 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
709 self.proto[self.ICMP][self.ICMPv6]))
710 # permit ip any any in the end
711 rules.append(self.create_rule(self.IPV6, self.PERMIT,
715 self.apply_rules(rules, "deny icmpv6")
717 # Traffic should not pass
718 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
720 self.logger.info("ACLP_TEST_FINISH_0007")
722 def test_0008_tcp_permit_v4(self):
725 self.logger.info("ACLP_TEST_START_0008")
729 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
730 self.proto[self.IP][self.TCP]))
731 # deny ip any any in the end
732 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
735 self.apply_rules(rules, "permit ipv4 tcp")
737 # Traffic should still pass
738 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
740 self.logger.info("ACLP_TEST_FINISH_0008")
742 def test_0009_tcp_permit_v6(self):
745 self.logger.info("ACLP_TEST_START_0009")
749 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
750 self.proto[self.IP][self.TCP]))
751 # deny ip any any in the end
752 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
755 self.apply_rules(rules, "permit ip6 tcp")
757 # Traffic should still pass
758 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
760 self.logger.info("ACLP_TEST_FINISH_0008")
762 def test_0010_udp_permit_v4(self):
765 self.logger.info("ACLP_TEST_START_0010")
769 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
770 self.proto[self.IP][self.UDP]))
771 # deny ip any any in the end
772 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
775 self.apply_rules(rules, "permit ipv udp")
777 # Traffic should still pass
778 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
780 self.logger.info("ACLP_TEST_FINISH_0010")
782 def test_0011_udp_permit_v6(self):
785 self.logger.info("ACLP_TEST_START_0011")
789 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
790 self.proto[self.IP][self.UDP]))
791 # deny ip any any in the end
792 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
795 self.apply_rules(rules, "permit ip6 udp")
797 # Traffic should still pass
798 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
800 self.logger.info("ACLP_TEST_FINISH_0011")
802 def test_0012_tcp_deny(self):
805 self.logger.info("ACLP_TEST_START_0012")
809 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
810 self.proto[self.IP][self.TCP]))
811 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
812 self.proto[self.IP][self.TCP]))
813 # permit ip any any in the end
814 rules.append(self.create_rule(self.IPV4, self.PERMIT,
816 rules.append(self.create_rule(self.IPV6, self.PERMIT,
820 self.apply_rules(rules, "deny ip4/ip6 tcp")
822 # Traffic should not pass
823 self.run_verify_negat_test(self.IP, self.IPRANDOM,
824 self.proto[self.IP][self.TCP])
826 self.logger.info("ACLP_TEST_FINISH_0012")
828 def test_0013_udp_deny(self):
831 self.logger.info("ACLP_TEST_START_0013")
835 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
836 self.proto[self.IP][self.UDP]))
837 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
838 self.proto[self.IP][self.UDP]))
839 # permit ip any any in the end
840 rules.append(self.create_rule(self.IPV4, self.PERMIT,
842 rules.append(self.create_rule(self.IPV6, self.PERMIT,
846 self.apply_rules(rules, "deny ip4/ip6 udp")
848 # Traffic should not pass
849 self.run_verify_negat_test(self.IP, self.IPRANDOM,
850 self.proto[self.IP][self.UDP])
852 self.logger.info("ACLP_TEST_FINISH_0013")
854 def test_0014_acl_dump(self):
855 """ verify add/dump acls
857 self.logger.info("ACLP_TEST_START_0014")
859 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
860 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
861 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
862 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
863 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
864 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
865 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
866 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
867 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
868 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
869 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
870 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
871 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
872 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
873 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
874 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
875 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
876 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
879 # Add and verify new ACLs
881 for i in range(len(r)):
882 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
884 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
886 result = self.api_acl_dump(reply.acl_index)
889 for drules in result:
891 self.assertEqual(dr.is_ipv6, r[i][0])
892 self.assertEqual(dr.is_permit, r[i][1])
893 self.assertEqual(dr.proto, r[i][3])
896 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
899 self.assertEqual(dr.srcport_or_icmptype_first, 0)
900 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
902 if dr.proto == self.proto[self.IP][self.TCP]:
903 self.assertGreater(dr.srcport_or_icmptype_first,
904 self.tcp_sport_from-1)
905 self.assertLess(dr.srcport_or_icmptype_first,
907 self.assertGreater(dr.dstport_or_icmpcode_last,
908 self.tcp_dport_from-1)
909 self.assertLess(dr.dstport_or_icmpcode_last,
911 elif dr.proto == self.proto[self.IP][self.UDP]:
912 self.assertGreater(dr.srcport_or_icmptype_first,
913 self.udp_sport_from-1)
914 self.assertLess(dr.srcport_or_icmptype_first,
916 self.assertGreater(dr.dstport_or_icmpcode_last,
917 self.udp_dport_from-1)
918 self.assertLess(dr.dstport_or_icmpcode_last,
922 self.logger.info("ACLP_TEST_FINISH_0014")
924 def test_0015_tcp_permit_port_v4(self):
925 """ permit single TCPv4
927 self.logger.info("ACLP_TEST_START_0015")
929 port = random.randint(0, 65535)
932 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
933 self.proto[self.IP][self.TCP]))
934 # deny ip any any in the end
935 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
938 self.apply_rules(rules, "permit ip4 tcp "+str(port))
940 # Traffic should still pass
941 self.run_verify_test(self.IP, self.IPV4,
942 self.proto[self.IP][self.TCP], port)
944 self.logger.info("ACLP_TEST_FINISH_0015")
946 def test_0016_udp_permit_port_v4(self):
947 """ permit single UDPv4
949 self.logger.info("ACLP_TEST_START_0016")
951 port = random.randint(0, 65535)
954 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
955 self.proto[self.IP][self.UDP]))
956 # deny ip any any in the end
957 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
960 self.apply_rules(rules, "permit ip4 tcp "+str(port))
962 # Traffic should still pass
963 self.run_verify_test(self.IP, self.IPV4,
964 self.proto[self.IP][self.UDP], port)
966 self.logger.info("ACLP_TEST_FINISH_0016")
968 def test_0017_tcp_permit_port_v6(self):
969 """ permit single TCPv6
971 self.logger.info("ACLP_TEST_START_0017")
973 port = random.randint(0, 65535)
976 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
977 self.proto[self.IP][self.TCP]))
978 # deny ip any any in the end
979 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
982 self.apply_rules(rules, "permit ip4 tcp "+str(port))
984 # Traffic should still pass
985 self.run_verify_test(self.IP, self.IPV6,
986 self.proto[self.IP][self.TCP], port)
988 self.logger.info("ACLP_TEST_FINISH_0017")
990 def test_0018_udp_permit_port_v6(self):
991 """ permit single UDPv6
993 self.logger.info("ACLP_TEST_START_0018")
995 port = random.randint(0, 65535)
998 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
999 self.proto[self.IP][self.UDP]))
1000 # deny ip any any in the end
1001 rules.append(self.create_rule(self.IPV6, self.DENY,
1005 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1007 # Traffic should still pass
1008 self.run_verify_test(self.IP, self.IPV6,
1009 self.proto[self.IP][self.UDP], port)
1011 self.logger.info("ACLP_TEST_FINISH_0018")
1013 def test_0019_udp_deny_port(self):
1014 """ deny single TCPv4/v6
1016 self.logger.info("ACLP_TEST_START_0019")
1018 port = random.randint(0, 65535)
1021 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1022 self.proto[self.IP][self.TCP]))
1023 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1024 self.proto[self.IP][self.TCP]))
1025 # Permit ip any any in the end
1026 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1028 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1032 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1034 # Traffic should not pass
1035 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1036 self.proto[self.IP][self.TCP], port)
1038 self.logger.info("ACLP_TEST_FINISH_0019")
1040 def test_0020_udp_deny_port(self):
1041 """ deny single UDPv4/v6
1043 self.logger.info("ACLP_TEST_START_0020")
1045 port = random.randint(0, 65535)
1048 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1049 self.proto[self.IP][self.UDP]))
1050 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1051 self.proto[self.IP][self.UDP]))
1052 # Permit ip any any in the end
1053 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1055 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1059 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1061 # Traffic should not pass
1062 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1063 self.proto[self.IP][self.UDP], port)
1065 self.logger.info("ACLP_TEST_FINISH_0020")
1067 def test_0021_udp_deny_port_verify_fragment_deny(self):
1068 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1070 self.logger.info("ACLP_TEST_START_0021")
1072 port = random.randint(0, 65535)
1075 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1076 self.proto[self.IP][self.UDP]))
1077 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1078 self.proto[self.IP][self.UDP]))
1079 # permit ip any any in the end
1080 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1082 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1086 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1088 # Traffic should not pass
1089 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1090 self.proto[self.IP][self.UDP], port, True)
1092 self.logger.info("ACLP_TEST_FINISH_0021")
1094 def test_0022_zero_length_udp_ipv4(self):
1095 """ VPP-687 zero length udp ipv4 packet"""
1096 self.logger.info("ACLP_TEST_START_0022")
1098 port = random.randint(0, 65535)
1101 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1102 self.proto[self.IP][self.UDP]))
1103 # deny ip any any in the end
1105 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1108 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1110 # Traffic should still pass
1111 # Create incoming packet streams for packet-generator interfaces
1113 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1115 self.proto[self.IP][self.UDP], port,
1118 self.pg0.add_stream(pkts)
1119 pkts_cnt += len(pkts)
1121 # Enable packet capture and start packet sending
1122 self.pg_enable_capture(self.pg_interfaces)
1125 self.pg1.get_capture(pkts_cnt)
1127 self.logger.info("ACLP_TEST_FINISH_0022")
1129 def test_0023_zero_length_udp_ipv6(self):
1130 """ VPP-687 zero length udp ipv6 packet"""
1131 self.logger.info("ACLP_TEST_START_0023")
1133 port = random.randint(0, 65535)
1136 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1137 self.proto[self.IP][self.UDP]))
1138 # deny ip any any in the end
1139 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1142 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1144 # Traffic should still pass
1145 # Create incoming packet streams for packet-generator interfaces
1147 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1149 self.proto[self.IP][self.UDP], port,
1152 self.pg0.add_stream(pkts)
1153 pkts_cnt += len(pkts)
1155 # Enable packet capture and start packet sending
1156 self.pg_enable_capture(self.pg_interfaces)
1159 # Verify outgoing packet streams per packet-generator interface
1160 self.pg1.get_capture(pkts_cnt)
1162 self.logger.info("ACLP_TEST_FINISH_0023")
def test_0108_tcp_permit_v4(self):
    """ permit TCPv4 + non-match range

    Rule order: deny TCP on PORTS_RANGE_2, permit TCP on PORTS_RANGE,
    deny everything else.  IPv4 TCP traffic is expected to still pass.
    """
    self.logger.info("ACLP_TEST_START_0108")

    proto_tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        # The deny on the second port range comes first.
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
                         proto_tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
                         proto_tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, proto_tcp)

    self.logger.info("ACLP_TEST_FINISH_0108")
def test_0109_tcp_permit_v6(self):
    """ permit TCPv6 + non-match range

    Rule order: deny TCP on PORTS_RANGE_2, permit TCP on PORTS_RANGE,
    deny everything else.  IPv6 TCP traffic is expected to still pass.
    """
    self.logger.info("ACLP_TEST_START_0109")

    proto_tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        # The deny on the second port range comes first.
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
                         proto_tcp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                         proto_tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, proto_tcp)

    self.logger.info("ACLP_TEST_FINISH_0109")
def test_0110_udp_permit_v4(self):
    """ permit UDPv4 + non-match range

    Rule order: deny UDP on PORTS_RANGE_2, permit UDP on PORTS_RANGE,
    deny everything else.  IPv4 UDP traffic is expected to still pass.
    """
    self.logger.info("ACLP_TEST_START_0110")

    proto_udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        # The deny on the second port range comes first.
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
                         proto_udp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
                         proto_udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, proto_udp)

    self.logger.info("ACLP_TEST_FINISH_0110")
def test_0111_udp_permit_v6(self):
    """ permit UDPv6 + non-match range

    Rule order: deny UDP on PORTS_RANGE_2, permit UDP on PORTS_RANGE,
    deny everything else.  IPv6 UDP traffic is expected to still pass.
    """
    self.logger.info("ACLP_TEST_START_0111")

    proto_udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        # The deny on the second port range comes first.
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
                         proto_udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                         proto_udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, proto_udp)

    self.logger.info("ACLP_TEST_FINISH_0111")
def test_0112_tcp_deny(self):
    """ deny TCPv4/v6 + non-match range

    Installs, for both IPv4 and IPv6, a permit rule followed by a deny
    rule on PORTS_RANGE for TCP and a final permit rule, then expects TCP
    traffic with a randomly chosen IP version not to pass.
    """
    self.logger.info("ACLP_TEST_START_0112")

    proto_tcp = self.proto[self.IP][self.TCP]

    # NOTE(review): the port-range argument of the two leading permit
    # rules and the arguments of the two trailing permit rules were not
    # visible in the reviewed text.  PORTS_RANGE_2 (the "non-match range"
    # of the docstring) and PORTS_ALL / 0 ("permit ip any any") follow
    # the pattern of the sibling permit tests -- confirm against the full
    # file.
    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2,
                         proto_tcp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2,
                         proto_tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
                         proto_tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
                         proto_tcp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, proto_tcp)

    self.logger.info("ACLP_TEST_FINISH_0112")
def test_0113_udp_deny(self):
    """ deny UDPv4/v6 + non-match range

    Installs, for both IPv4 and IPv6, a permit rule followed by a deny
    rule on PORTS_RANGE for UDP and a final permit rule, then expects UDP
    traffic with a randomly chosen IP version not to pass.
    """
    self.logger.info("ACLP_TEST_START_0113")

    proto_udp = self.proto[self.IP][self.UDP]

    # NOTE(review): the port-range argument of the two leading permit
    # rules and the arguments of the two trailing permit rules were not
    # visible in the reviewed text.  PORTS_RANGE_2 (the "non-match range"
    # of the docstring) and PORTS_ALL / 0 ("permit ip any any") follow
    # the pattern of the sibling permit tests -- confirm against the full
    # file.
    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2,
                         proto_udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2,
                         proto_udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
                         proto_udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
                         proto_udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, proto_udp)

    self.logger.info("ACLP_TEST_FINISH_0113")
# Script entry point: when this file is executed directly, run the test
# cases through unittest with the VPP-specific runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)