2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
17 class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
47 udp_sport_to = udp_sport_from + 5
48 udp_dport_from = 20000
49 udp_dport_to = udp_dport_from + 5000
51 tcp_sport_to = tcp_sport_from + 5
52 tcp_dport_from = 40000
53 tcp_dport_to = tcp_dport_from + 5000
56 udp_sport_to_2 = udp_sport_from_2 + 5
57 udp_dport_from_2 = 30000
58 udp_dport_to_2 = udp_dport_from_2 + 5000
59 tcp_sport_from_2 = 130
60 tcp_sport_to_2 = tcp_sport_from_2 + 5
61 tcp_dport_from_2 = 20000
62 tcp_dport_to_2 = tcp_dport_from_2 + 5000
64 icmp4_type = 8 # echo request
66 icmp6_type = 128 # echo request
82 Perform standard class setup (defined by class method setUpClass in
83 class VppTestCase) before running the test case, set test case related
84 variables and configure VPP.
86 super(TestACLplugin, cls).setUpClass()
89 # Create 2 pg interfaces
90 cls.create_pg_interfaces(range(2))
92 # Packet flows mapping pg0 -> pg1, pg2 etc.
94 cls.flows[cls.pg0] = [cls.pg1]
97 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
99 # Create BD with MAC learning and unknown unicast flooding disabled
100 # and put interfaces to this BD
101 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
103 for pg_if in cls.pg_interfaces:
104 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107 # Set up all interfaces
108 for i in cls.pg_interfaces:
111 # Mapping between packet-generator index and lists of test hosts
112 cls.hosts_by_pg_idx = dict()
113 for pg_if in cls.pg_interfaces:
114 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
116 # Create list of deleted hosts
117 cls.deleted_hosts_by_pg_idx = dict()
118 for pg_if in cls.pg_interfaces:
119 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
121 # warm-up the mac address tables
125 super(TestACLplugin, cls).tearDownClass()
129 super(TestACLplugin, self).setUp()
130 self.reset_packet_infos()
134 Show various debug prints after each test.
136 super(TestACLplugin, self).tearDown()
137 if not self.vpp_dead:
138 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
139 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
140 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
141 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
142 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
145 def create_hosts(self, count, start=0):
147 Create required number of host MAC addresses and distribute them among
148 interfaces. Create host IPv4 address for every host MAC address.
150 :param int count: Number of hosts to create MAC/IPv4 addresses for.
151 :param int start: Number to start numbering from.
153 n_int = len(self.pg_interfaces)
154 macs_per_if = count / n_int
156 for pg_if in self.pg_interfaces:
158 start_nr = macs_per_if * i + start
159 end_nr = count + start if i == (n_int - 1) \
160 else macs_per_if * (i + 1) + start
161 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
162 for j in range(start_nr, end_nr):
164 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
165 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
166 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
169 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
170 s_prefix=0, s_ip='\x00\x00\x00\x00',
171 d_prefix=0, d_ip='\x00\x00\x00\x00'):
174 if ports == self.PORTS_ALL:
177 sport_to = 65535 if proto != 1 and proto != 58 else 255
179 elif ports == self.PORTS_RANGE:
181 sport_from = self.icmp4_type
182 sport_to = self.icmp4_type
183 dport_from = self.icmp4_code
184 dport_to = self.icmp4_code
186 sport_from = self.icmp6_type
187 sport_to = self.icmp6_type
188 dport_from = self.icmp6_code
189 dport_to = self.icmp6_code
190 elif proto == self.proto[self.IP][self.TCP]:
191 sport_from = self.tcp_sport_from
192 sport_to = self.tcp_sport_to
193 dport_from = self.tcp_dport_from
194 dport_to = self.tcp_dport_to
195 elif proto == self.proto[self.IP][self.UDP]:
196 sport_from = self.udp_sport_from
197 sport_to = self.udp_sport_to
198 dport_from = self.udp_dport_from
199 dport_to = self.udp_dport_to
200 elif ports == self.PORTS_RANGE_2:
202 sport_from = self.icmp4_type_2
203 sport_to = self.icmp4_type_2
204 dport_from = self.icmp4_code_from_2
205 dport_to = self.icmp4_code_to_2
207 sport_from = self.icmp6_type_2
208 sport_to = self.icmp6_type_2
209 dport_from = self.icmp6_code_from_2
210 dport_to = self.icmp6_code_to_2
211 elif proto == self.proto[self.IP][self.TCP]:
212 sport_from = self.tcp_sport_from_2
213 sport_to = self.tcp_sport_to_2
214 dport_from = self.tcp_dport_from_2
215 dport_to = self.tcp_dport_to_2
216 elif proto == self.proto[self.IP][self.UDP]:
217 sport_from = self.udp_sport_from_2
218 sport_to = self.udp_sport_to_2
219 dport_from = self.udp_dport_from_2
220 dport_to = self.udp_dport_to_2
227 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
228 'srcport_or_icmptype_first': sport_from,
229 'srcport_or_icmptype_last': sport_to,
230 'src_ip_prefix_len': s_prefix,
232 'dstport_or_icmpcode_first': dport_from,
233 'dstport_or_icmpcode_last': dport_to,
234 'dst_ip_prefix_len': d_prefix,
235 'dst_ip_addr': d_ip})
238 def apply_rules(self, rules, tag=''):
239 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
241 self.logger.info("Dumped ACL: " + str(
242 self.vapi.acl_dump(reply.acl_index)))
243 # Apply the ACL on every pg interface as inbound
244 for i in self.pg_interfaces:
245 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
247 acls=[reply.acl_index])
250 def create_upper_layer(self, packet_index, proto, ports=0):
251 p = self.proto_map[proto]
254 return UDP(sport=random.randint(self.udp_sport_from,
256 dport=random.randint(self.udp_dport_from,
259 return UDP(sport=ports, dport=ports)
262 return TCP(sport=random.randint(self.tcp_sport_from,
264 dport=random.randint(self.tcp_dport_from,
267 return TCP(sport=ports, dport=ports)
270 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
271 proto=-1, ports=0, fragments=False, pkt_raw=True):
273 Create input packet stream for defined interface using hosts or
276 :param object src_if: Interface to create packet stream for.
277 :param list packet_sizes: List of required packet sizes.
278 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
279 :return: Stream of packets.
282 if self.flows.__contains__(src_if):
283 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
284 for dst_if in self.flows[src_if]:
285 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
286 n_int = len(dst_hosts) * len(src_hosts)
287 for i in range(0, n_int):
288 dst_host = dst_hosts[i / len(src_hosts)]
289 src_host = src_hosts[i % len(src_hosts)]
290 pkt_info = self.create_packet_info(src_if, dst_if)
296 pkt_info.ip = random.choice([0, 1])
298 pkt_info.proto = random.choice(self.proto[self.IP])
300 pkt_info.proto = proto
301 payload = self.info_to_payload(pkt_info)
302 p = Ether(dst=dst_host.mac, src=src_host.mac)
304 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
306 p /= IPv6ExtHdrFragment(offset=64, m=1)
309 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
312 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
313 if traffic_type == self.ICMP:
315 p /= ICMPv6EchoRequest(type=self.icmp6_type,
316 code=self.icmp6_code)
318 p /= ICMP(type=self.icmp4_type,
319 code=self.icmp4_code)
321 p /= self.create_upper_layer(i, pkt_info.proto, ports)
324 pkt_info.data = p.copy()
326 size = random.choice(packet_sizes)
327 self.extend_packet(p, size)
331 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
333 Verify captured input packet stream for defined interface.
335 :param object pg_if: Interface to verify captured packet stream for.
336 :param list capture: Captured packet stream.
337 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
340 for i in self.pg_interfaces:
341 last_info[i.sw_if_index] = None
342 dst_sw_if_index = pg_if.sw_if_index
343 for packet in capture:
345 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
346 if traffic_type == self.ICMP and ip_type == self.IPV6:
347 payload_info = self.payload_to_info(
348 packet[ICMPv6EchoRequest].data)
349 payload = packet[ICMPv6EchoRequest]
351 payload_info = self.payload_to_info(str(packet[Raw]))
352 payload = packet[self.proto_map[payload_info.proto]]
354 self.logger.error(ppp("Unexpected or invalid packet "
355 "(outside network):", packet))
359 self.assertEqual(payload_info.ip, ip_type)
360 if traffic_type == self.ICMP:
362 if payload_info.ip == 0:
363 self.assertEqual(payload.type, self.icmp4_type)
364 self.assertEqual(payload.code, self.icmp4_code)
366 self.assertEqual(payload.type, self.icmp6_type)
367 self.assertEqual(payload.code, self.icmp6_code)
369 self.logger.error(ppp("Unexpected or invalid packet "
370 "(outside network):", packet))
374 ip_version = IPv6 if payload_info.ip == 1 else IP
376 ip = packet[ip_version]
377 packet_index = payload_info.index
379 self.assertEqual(payload_info.dst, dst_sw_if_index)
380 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
381 (pg_if.name, payload_info.src,
383 next_info = self.get_next_packet_info_for_interface2(
384 payload_info.src, dst_sw_if_index,
385 last_info[payload_info.src])
386 last_info[payload_info.src] = next_info
387 self.assertTrue(next_info is not None)
388 self.assertEqual(packet_index, next_info.index)
389 saved_packet = next_info.data
390 # Check standard fields
391 self.assertEqual(ip.src, saved_packet[ip_version].src)
392 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
393 p = self.proto_map[payload_info.proto]
396 self.assertEqual(tcp.sport, saved_packet[
398 self.assertEqual(tcp.dport, saved_packet[
402 self.assertEqual(udp.sport, saved_packet[
404 self.assertEqual(udp.dport, saved_packet[
407 self.logger.error(ppp("Unexpected or invalid packet:",
410 for i in self.pg_interfaces:
411 remaining_packet = self.get_next_packet_info_for_interface2(
412 i, dst_sw_if_index, last_info[i.sw_if_index])
414 remaining_packet is None,
415 "Port %u: Packet expected from source %u didn't arrive" %
416 (dst_sw_if_index, i.sw_if_index))
418 def run_traffic_no_check(self):
420 # Create incoming packet streams for packet-generator interfaces
421 for i in self.pg_interfaces:
422 if self.flows.__contains__(i):
423 pkts = self.create_stream(i, self.pg_if_packet_sizes)
427 # Enable packet capture and start packet sending
428 self.pg_enable_capture(self.pg_interfaces)
431 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
432 frags=False, pkt_raw=True):
434 # Create incoming packet streams for packet-generator interfaces
436 for i in self.pg_interfaces:
437 if self.flows.__contains__(i):
438 pkts = self.create_stream(i, self.pg_if_packet_sizes,
439 traffic_type, ip_type, proto, ports,
443 pkts_cnt += len(pkts)
445 # Enable packet capture and start packet sending
446 self.pg_enable_capture(self.pg_interfaces)
450 # Verify outgoing packet streams per packet-generator interface
451 for src_if in self.pg_interfaces:
452 if self.flows.__contains__(src_if):
453 for dst_if in self.flows[src_if]:
454 capture = dst_if.get_capture(pkts_cnt)
455 self.logger.info("Verifying capture on interface %s" %
457 self.verify_capture(dst_if, capture, traffic_type, ip_type)
459 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
460 ports=0, frags=False):
462 self.reset_packet_infos()
463 for i in self.pg_interfaces:
464 if self.flows.__contains__(i):
465 pkts = self.create_stream(i, self.pg_if_packet_sizes,
466 traffic_type, ip_type, proto, ports,
471 # Enable packet capture and start packet sending
472 self.pg_enable_capture(self.pg_interfaces)
476 # Verify outgoing packet streams per packet-generator interface
477 for src_if in self.pg_interfaces:
478 if self.flows.__contains__(src_if):
479 for dst_if in self.flows[src_if]:
480 self.logger.info("Verifying capture on interface %s" %
482 capture = dst_if.get_capture(0)
483 self.assertEqual(len(capture), 0)
485 def test_0000_warmup_test(self):
486 """ ACL plugin version check; learn MACs
488 self.create_hosts(16)
489 self.run_traffic_no_check()
490 reply = self.vapi.papi.acl_plugin_get_version()
491 self.assertEqual(reply.major, 1)
492 self.logger.info("Working with ACL plugin version: %d.%d" % (
493 reply.major, reply.minor))
494 # minor version changes are non breaking
495 # self.assertEqual(reply.minor, 0)
497 def test_0001_acl_create(self):
498 """ ACL create/delete test
501 self.logger.info("ACLP_TEST_START_0001")
503 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
504 'srcport_or_icmptype_first': 1234,
505 'srcport_or_icmptype_last': 1235,
506 'src_ip_prefix_len': 0,
507 'src_ip_addr': '\x00\x00\x00\x00',
508 'dstport_or_icmpcode_first': 1234,
509 'dstport_or_icmpcode_last': 1234,
510 'dst_ip_addr': '\x00\x00\x00\x00',
511 'dst_ip_prefix_len': 0}]
512 # Test 1: add a new ACL
513 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
515 self.assertEqual(reply.retval, 0)
516 # The very first ACL gets #0
517 self.assertEqual(reply.acl_index, 0)
518 first_acl = reply.acl_index
519 rr = self.vapi.acl_dump(reply.acl_index)
520 self.logger.info("Dumped ACL: " + str(rr))
521 self.assertEqual(len(rr), 1)
522 # We should have the same number of ACL entries as we had asked
523 self.assertEqual(len(rr[0].r), len(r))
524 # The rules should be the same. But because the submitted and returned
525 # are different types, we need to iterate over rules and keys to get
527 for i_rule in range(0, len(r) - 1):
528 for rule_key in r[i_rule]:
529 self.assertEqual(rr[0].r[i_rule][rule_key],
532 # Add a deny-1234 ACL
533 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
534 'srcport_or_icmptype_first': 1234,
535 'srcport_or_icmptype_last': 1235,
536 'src_ip_prefix_len': 0,
537 'src_ip_addr': '\x00\x00\x00\x00',
538 'dstport_or_icmpcode_first': 1234,
539 'dstport_or_icmpcode_last': 1234,
540 'dst_ip_addr': '\x00\x00\x00\x00',
541 'dst_ip_prefix_len': 0},
542 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
543 'srcport_or_icmptype_first': 0,
544 'srcport_or_icmptype_last': 0,
545 'src_ip_prefix_len': 0,
546 'src_ip_addr': '\x00\x00\x00\x00',
547 'dstport_or_icmpcode_first': 0,
548 'dstport_or_icmpcode_last': 0,
549 'dst_ip_addr': '\x00\x00\x00\x00',
550 'dst_ip_prefix_len': 0}]
552 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
553 tag="deny 1234;permit all")
554 self.assertEqual(reply.retval, 0)
555 # The second ACL gets #1
556 self.assertEqual(reply.acl_index, 1)
557 second_acl = reply.acl_index
559 # Test 2: try to modify a nonexistent ACL
560 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
561 tag="FFFF:FFFF", expected_retval=-6)
562 self.assertEqual(reply.retval, -6)
563 # The ACL number should pass through
564 self.assertEqual(reply.acl_index, 432)
565 # apply an ACL on an interface inbound, try to delete ACL, must fail
566 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
569 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
570 # Unapply an ACL and then try to delete it - must be ok
571 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
574 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
576 # apply an ACL on an interface outbound, try to delete ACL, must fail
577 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
580 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
581 # Unapply the ACL and then try to delete it - must be ok
582 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
585 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
587 # try to apply a nonexistent ACL - must fail
588 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
593 self.logger.info("ACLP_TEST_FINISH_0001")
595 def test_0002_acl_permit_apply(self):
596 """ permit ACL apply test
598 self.logger.info("ACLP_TEST_START_0002")
601 rules.append(self.create_rule(self.IPV4, self.PERMIT,
602 0, self.proto[self.IP][self.UDP]))
603 rules.append(self.create_rule(self.IPV4, self.PERMIT,
604 0, self.proto[self.IP][self.TCP]))
607 self.apply_rules(rules, "permit per-flow")
609 # Traffic should still pass
610 self.run_verify_test(self.IP, self.IPV4, -1)
611 self.logger.info("ACLP_TEST_FINISH_0002")
613 def test_0003_acl_deny_apply(self):
614 """ deny ACL apply test
616 self.logger.info("ACLP_TEST_START_0003")
617 # Add a deny-flows ACL
619 rules.append(self.create_rule(self.IPV4, self.DENY,
620 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
621 # Permit ip any any in the end
622 rules.append(self.create_rule(self.IPV4, self.PERMIT,
626 self.apply_rules(rules, "deny per-flow;permit all")
628 # Traffic should not pass
629 self.run_verify_negat_test(self.IP, self.IPV4,
630 self.proto[self.IP][self.UDP])
631 self.logger.info("ACLP_TEST_FINISH_0003")
632 # self.assertEqual(1, 0)
634 def test_0004_vpp624_permit_icmpv4(self):
635 """ VPP_624 permit ICMPv4
637 self.logger.info("ACLP_TEST_START_0004")
641 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
642 self.proto[self.ICMP][self.ICMPv4]))
643 # deny ip any any in the end
644 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
647 self.apply_rules(rules, "permit icmpv4")
649 # Traffic should still pass
650 self.run_verify_test(self.ICMP, self.IPV4,
651 self.proto[self.ICMP][self.ICMPv4])
653 self.logger.info("ACLP_TEST_FINISH_0004")
655 def test_0005_vpp624_permit_icmpv6(self):
656 """ VPP_624 permit ICMPv6
658 self.logger.info("ACLP_TEST_START_0005")
662 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
663 self.proto[self.ICMP][self.ICMPv6]))
664 # deny ip any any in the end
665 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
668 self.apply_rules(rules, "permit icmpv6")
670 # Traffic should still pass
671 self.run_verify_test(self.ICMP, self.IPV6,
672 self.proto[self.ICMP][self.ICMPv6])
674 self.logger.info("ACLP_TEST_FINISH_0005")
676 def test_0006_vpp624_deny_icmpv4(self):
677 """ VPP_624 deny ICMPv4
679 self.logger.info("ACLP_TEST_START_0006")
682 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
683 self.proto[self.ICMP][self.ICMPv4]))
684 # permit ip any any in the end
685 rules.append(self.create_rule(self.IPV4, self.PERMIT,
689 self.apply_rules(rules, "deny icmpv4")
691 # Traffic should not pass
692 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
694 self.logger.info("ACLP_TEST_FINISH_0006")
696 def test_0007_vpp624_deny_icmpv6(self):
697 """ VPP_624 deny ICMPv6
699 self.logger.info("ACLP_TEST_START_0007")
702 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
703 self.proto[self.ICMP][self.ICMPv6]))
704 # permit ip any any in the end
705 rules.append(self.create_rule(self.IPV6, self.PERMIT,
709 self.apply_rules(rules, "deny icmpv6")
711 # Traffic should not pass
712 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
714 self.logger.info("ACLP_TEST_FINISH_0007")
716 def test_0008_tcp_permit_v4(self):
719 self.logger.info("ACLP_TEST_START_0008")
723 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
724 self.proto[self.IP][self.TCP]))
725 # deny ip any any in the end
726 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
729 self.apply_rules(rules, "permit ipv4 tcp")
731 # Traffic should still pass
732 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
734 self.logger.info("ACLP_TEST_FINISH_0008")
736 def test_0009_tcp_permit_v6(self):
739 self.logger.info("ACLP_TEST_START_0009")
743 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
744 self.proto[self.IP][self.TCP]))
745 # deny ip any any in the end
746 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
749 self.apply_rules(rules, "permit ip6 tcp")
751 # Traffic should still pass
752 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
754 self.logger.info("ACLP_TEST_FINISH_0008")
756 def test_0010_udp_permit_v4(self):
759 self.logger.info("ACLP_TEST_START_0010")
763 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
764 self.proto[self.IP][self.UDP]))
765 # deny ip any any in the end
766 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
769 self.apply_rules(rules, "permit ipv udp")
771 # Traffic should still pass
772 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
774 self.logger.info("ACLP_TEST_FINISH_0010")
776 def test_0011_udp_permit_v6(self):
779 self.logger.info("ACLP_TEST_START_0011")
783 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
784 self.proto[self.IP][self.UDP]))
785 # deny ip any any in the end
786 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
789 self.apply_rules(rules, "permit ip6 udp")
791 # Traffic should still pass
792 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
794 self.logger.info("ACLP_TEST_FINISH_0011")
796 def test_0012_tcp_deny(self):
799 self.logger.info("ACLP_TEST_START_0012")
803 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
804 self.proto[self.IP][self.TCP]))
805 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
806 self.proto[self.IP][self.TCP]))
807 # permit ip any any in the end
808 rules.append(self.create_rule(self.IPV4, self.PERMIT,
810 rules.append(self.create_rule(self.IPV6, self.PERMIT,
814 self.apply_rules(rules, "deny ip4/ip6 tcp")
816 # Traffic should not pass
817 self.run_verify_negat_test(self.IP, self.IPRANDOM,
818 self.proto[self.IP][self.TCP])
820 self.logger.info("ACLP_TEST_FINISH_0012")
822 def test_0013_udp_deny(self):
825 self.logger.info("ACLP_TEST_START_0013")
829 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
830 self.proto[self.IP][self.UDP]))
831 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
832 self.proto[self.IP][self.UDP]))
833 # permit ip any any in the end
834 rules.append(self.create_rule(self.IPV4, self.PERMIT,
836 rules.append(self.create_rule(self.IPV6, self.PERMIT,
840 self.apply_rules(rules, "deny ip4/ip6 udp")
842 # Traffic should not pass
843 self.run_verify_negat_test(self.IP, self.IPRANDOM,
844 self.proto[self.IP][self.UDP])
846 self.logger.info("ACLP_TEST_FINISH_0013")
848 def test_0014_acl_dump(self):
849 """ verify add/dump acls
851 self.logger.info("ACLP_TEST_START_0014")
853 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
854 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
855 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
856 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
857 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
858 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
859 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
860 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
861 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
862 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
863 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
864 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
865 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
866 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
867 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
868 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
869 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
870 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
873 # Add and verify new ACLs
875 for i in range(len(r)):
876 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
878 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
879 result = self.vapi.acl_dump(reply.acl_index)
882 for drules in result:
884 self.assertEqual(dr.is_ipv6, r[i][0])
885 self.assertEqual(dr.is_permit, r[i][1])
886 self.assertEqual(dr.proto, r[i][3])
889 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
892 self.assertEqual(dr.srcport_or_icmptype_first, 0)
893 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
895 if dr.proto == self.proto[self.IP][self.TCP]:
896 self.assertGreater(dr.srcport_or_icmptype_first,
897 self.tcp_sport_from-1)
898 self.assertLess(dr.srcport_or_icmptype_first,
900 self.assertGreater(dr.dstport_or_icmpcode_last,
901 self.tcp_dport_from-1)
902 self.assertLess(dr.dstport_or_icmpcode_last,
904 elif dr.proto == self.proto[self.IP][self.UDP]:
905 self.assertGreater(dr.srcport_or_icmptype_first,
906 self.udp_sport_from-1)
907 self.assertLess(dr.srcport_or_icmptype_first,
909 self.assertGreater(dr.dstport_or_icmpcode_last,
910 self.udp_dport_from-1)
911 self.assertLess(dr.dstport_or_icmpcode_last,
915 self.logger.info("ACLP_TEST_FINISH_0014")
917 def test_0015_tcp_permit_port_v4(self):
918 """ permit single TCPv4
920 self.logger.info("ACLP_TEST_START_0015")
922 port = random.randint(0, 65535)
925 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
926 self.proto[self.IP][self.TCP]))
927 # deny ip any any in the end
928 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
931 self.apply_rules(rules, "permit ip4 tcp "+str(port))
933 # Traffic should still pass
934 self.run_verify_test(self.IP, self.IPV4,
935 self.proto[self.IP][self.TCP], port)
937 self.logger.info("ACLP_TEST_FINISH_0015")
939 def test_0016_udp_permit_port_v4(self):
940 """ permit single UDPv4
942 self.logger.info("ACLP_TEST_START_0016")
944 port = random.randint(0, 65535)
947 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
948 self.proto[self.IP][self.UDP]))
949 # deny ip any any in the end
950 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
953 self.apply_rules(rules, "permit ip4 tcp "+str(port))
955 # Traffic should still pass
956 self.run_verify_test(self.IP, self.IPV4,
957 self.proto[self.IP][self.UDP], port)
959 self.logger.info("ACLP_TEST_FINISH_0016")
961 def test_0017_tcp_permit_port_v6(self):
962 """ permit single TCPv6
964 self.logger.info("ACLP_TEST_START_0017")
966 port = random.randint(0, 65535)
969 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
970 self.proto[self.IP][self.TCP]))
971 # deny ip any any in the end
972 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
975 self.apply_rules(rules, "permit ip4 tcp "+str(port))
977 # Traffic should still pass
978 self.run_verify_test(self.IP, self.IPV6,
979 self.proto[self.IP][self.TCP], port)
981 self.logger.info("ACLP_TEST_FINISH_0017")
983 def test_0018_udp_permit_port_v6(self):
984 """ permit single UDPv6
986 self.logger.info("ACLP_TEST_START_0018")
988 port = random.randint(0, 65535)
991 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
992 self.proto[self.IP][self.UDP]))
993 # deny ip any any in the end
994 rules.append(self.create_rule(self.IPV6, self.DENY,
998 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1000 # Traffic should still pass
1001 self.run_verify_test(self.IP, self.IPV6,
1002 self.proto[self.IP][self.UDP], port)
1004 self.logger.info("ACLP_TEST_FINISH_0018")
1006 def test_0019_udp_deny_port(self):
1007 """ deny single TCPv4/v6
1009 self.logger.info("ACLP_TEST_START_0019")
1011 port = random.randint(0, 65535)
1014 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1015 self.proto[self.IP][self.TCP]))
1016 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1017 self.proto[self.IP][self.TCP]))
1018 # Permit ip any any in the end
1019 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1021 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1025 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1027 # Traffic should not pass
1028 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1029 self.proto[self.IP][self.TCP], port)
1031 self.logger.info("ACLP_TEST_FINISH_0019")
1033 def test_0020_udp_deny_port(self):
1034 """ deny single UDPv4/v6
1036 self.logger.info("ACLP_TEST_START_0020")
1038 port = random.randint(0, 65535)
1041 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1042 self.proto[self.IP][self.UDP]))
1043 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1044 self.proto[self.IP][self.UDP]))
1045 # Permit ip any any in the end
1046 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1048 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1052 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1054 # Traffic should not pass
1055 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1056 self.proto[self.IP][self.UDP], port)
1058 self.logger.info("ACLP_TEST_FINISH_0020")
1060 def test_0021_udp_deny_port_verify_fragment_deny(self):
1061 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1063 self.logger.info("ACLP_TEST_START_0021")
1065 port = random.randint(0, 65535)
1068 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1069 self.proto[self.IP][self.UDP]))
1070 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1071 self.proto[self.IP][self.UDP]))
1072 # permit ip any any in the end
1073 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1075 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1079 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1081 # Traffic should not pass
1082 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1083 self.proto[self.IP][self.UDP], port, True)
1085 self.logger.info("ACLP_TEST_FINISH_0021")
1087 def test_0022_zero_length_udp_ipv4(self):
1088 """ VPP-687 zero length udp ipv4 packet"""
1089 self.logger.info("ACLP_TEST_START_0022")
1091 port = random.randint(0, 65535)
1094 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1095 self.proto[self.IP][self.UDP]))
1096 # deny ip any any in the end
1098 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1101 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1103 # Traffic should still pass
1104 # Create incoming packet streams for packet-generator interfaces
1106 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1108 self.proto[self.IP][self.UDP], port,
1111 self.pg0.add_stream(pkts)
1112 pkts_cnt += len(pkts)
1114 # Enable packet capture and start packet sending
1115 self.pg_enable_capture(self.pg_interfaces)
1118 self.pg1.get_capture(pkts_cnt)
1120 self.logger.info("ACLP_TEST_FINISH_0022")
1122 def test_0023_zero_length_udp_ipv6(self):
1123 """ VPP-687 zero length udp ipv6 packet"""
1124 self.logger.info("ACLP_TEST_START_0023")
1126 port = random.randint(0, 65535)
1129 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1130 self.proto[self.IP][self.UDP]))
1131 # deny ip any any in the end
1132 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1135 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1137 # Traffic should still pass
1138 # Create incoming packet streams for packet-generator interfaces
1140 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1142 self.proto[self.IP][self.UDP], port,
1145 self.pg0.add_stream(pkts)
1146 pkts_cnt += len(pkts)
1148 # Enable packet capture and start packet sending
1149 self.pg_enable_capture(self.pg_interfaces)
1152 # Verify outgoing packet streams per packet-generator interface
1153 self.pg1.get_capture(pkts_cnt)
1155 self.logger.info("ACLP_TEST_FINISH_0023")
def test_0108_tcp_permit_v4(self):
    """ permit TCPv4 + non-match range

    Rule order: deny TCP on PORTS_RANGE_2 (the range the title calls
    non-matching), permit TCP on PORTS_RANGE, then deny everything else.
    run_verify_test() is then invoked for IPv4 TCP traffic, which is
    expected to pass.
    """
    self.logger.info("ACLP_TEST_START_0108")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0108")
def test_0109_tcp_permit_v6(self):
    """ permit TCPv6 + non-match range

    Rule order: deny TCP on PORTS_RANGE_2 (the range the title calls
    non-matching), permit TCP on PORTS_RANGE, then deny everything else.
    run_verify_test() is then invoked for IPv6 TCP traffic, which is
    expected to pass.
    """
    self.logger.info("ACLP_TEST_START_0109")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp)

    self.logger.info("ACLP_TEST_FINISH_0109")
def test_0110_udp_permit_v4(self):
    """ permit UDPv4 + non-match range

    Rule order: deny UDP on PORTS_RANGE_2 (the range the title calls
    non-matching), permit UDP on PORTS_RANGE, then deny everything else.
    run_verify_test() is then invoked for IPv4 UDP traffic, which is
    expected to pass.
    """
    self.logger.info("ACLP_TEST_START_0110")

    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0110")
def test_0111_udp_permit_v6(self):
    """ permit UDPv6 + non-match range

    Rule order: deny UDP on PORTS_RANGE_2 (the range the title calls
    non-matching), permit UDP on PORTS_RANGE, then deny everything else.
    run_verify_test() is then invoked for IPv6 UDP traffic, which is
    expected to pass.
    """
    self.logger.info("ACLP_TEST_START_0111")

    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0111")
def test_0112_tcp_deny(self):
    """ deny TCPv4/v6 + non-match range

    For both IPv4 and IPv6: permit TCP on the non-matching range, deny TCP
    on PORTS_RANGE, and permit everything else at the end.
    run_verify_negat_test() is then invoked for TCP with a randomly chosen
    IP version (IPRANDOM); the traffic is expected to be dropped.
    """
    self.logger.info("ACLP_TEST_START_0112")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    # NOTE(review): the port argument of the two leading PERMIT rules
    # (PORTS_RANGE_2) and the arguments of the two trailing PERMIT rules
    # (PORTS_ALL, protocol 0) were restored from the test title and the
    # "permit ip any any" comment -- confirm.
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, tcp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0112")
def test_0113_udp_deny(self):
    """ deny UDPv4/v6 + non-match range

    For both IPv4 and IPv6: permit UDP on the non-matching range, deny UDP
    on PORTS_RANGE, and permit everything else at the end.
    run_verify_negat_test() is then invoked for UDP with a randomly chosen
    IP version (IPRANDOM); the traffic is expected to be dropped.
    """
    self.logger.info("ACLP_TEST_START_0113")

    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    # NOTE(review): the port argument of the two leading PERMIT rules
    # (PORTS_RANGE_2) and the arguments of the two trailing PERMIT rules
    # (PORTS_ALL, protocol 0) were restored from the test title and the
    # "permit ip any any" comment -- confirm.
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0113")
# Allow this test module to be executed directly; the run is delegated to
# unittest with the VPP-specific test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)