2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
17 class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
47 udp_sport_to = udp_sport_from + 5
48 udp_dport_from = 20000
49 udp_dport_to = udp_dport_from + 5000
51 tcp_sport_to = tcp_sport_from + 5
52 tcp_dport_from = 40000
53 tcp_dport_to = tcp_dport_from + 5000
56 udp_sport_to_2 = udp_sport_from_2 + 5
57 udp_dport_from_2 = 30000
58 udp_dport_to_2 = udp_dport_from_2 + 5000
59 tcp_sport_from_2 = 130
60 tcp_sport_to_2 = tcp_sport_from_2 + 5
61 tcp_dport_from_2 = 20000
62 tcp_dport_to_2 = tcp_dport_from_2 + 5000
64 icmp4_type = 8 # echo request
66 icmp6_type = 128 # echo request
82 Perform standard class setup (defined by class method setUpClass in
83 class VppTestCase) before running the test case, set test case related
84 variables and configure VPP.
86 super(TestACLplugin, cls).setUpClass()
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
96 cls.flows[cls.pg0] = [cls.pg1]
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
101 # Create BD with MAC learning and unknown unicast flooding enabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
123 # warm-up the mac address tables
127 super(TestACLplugin, cls).tearDownClass()
131 super(TestACLplugin, self).setUp()
132 self.reset_packet_infos()
136 Show various debug prints after each test.
138 super(TestACLplugin, self).tearDown()
139 if not self.vpp_dead:
140 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
141 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
142 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
143 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
144 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
147 def create_hosts(self, count, start=0):
149 Create required number of host MAC addresses and distribute them among
150 interfaces. Create host IPv4 address for every host MAC address.
152 :param int count: Number of hosts to create MAC/IPv4 addresses for.
153 :param int start: Number to start numbering from.
155 n_int = len(self.pg_interfaces)
156 macs_per_if = count / n_int
158 for pg_if in self.pg_interfaces:
160 start_nr = macs_per_if * i + start
161 end_nr = count + start if i == (n_int - 1) \
162 else macs_per_if * (i + 1) + start
163 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
164 for j in range(start_nr, end_nr):
166 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
167 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
168 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
171 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
172 s_prefix=0, s_ip='\x00\x00\x00\x00',
173 d_prefix=0, d_ip='\x00\x00\x00\x00'):
176 if ports == self.PORTS_ALL:
179 sport_to = 65535 if proto != 1 and proto != 58 else 255
181 elif ports == self.PORTS_RANGE:
183 sport_from = self.icmp4_type
184 sport_to = self.icmp4_type
185 dport_from = self.icmp4_code
186 dport_to = self.icmp4_code
188 sport_from = self.icmp6_type
189 sport_to = self.icmp6_type
190 dport_from = self.icmp6_code
191 dport_to = self.icmp6_code
192 elif proto == self.proto[self.IP][self.TCP]:
193 sport_from = self.tcp_sport_from
194 sport_to = self.tcp_sport_to
195 dport_from = self.tcp_dport_from
196 dport_to = self.tcp_dport_to
197 elif proto == self.proto[self.IP][self.UDP]:
198 sport_from = self.udp_sport_from
199 sport_to = self.udp_sport_to
200 dport_from = self.udp_dport_from
201 dport_to = self.udp_dport_to
202 elif ports == self.PORTS_RANGE_2:
204 sport_from = self.icmp4_type_2
205 sport_to = self.icmp4_type_2
206 dport_from = self.icmp4_code_from_2
207 dport_to = self.icmp4_code_to_2
209 sport_from = self.icmp6_type_2
210 sport_to = self.icmp6_type_2
211 dport_from = self.icmp6_code_from_2
212 dport_to = self.icmp6_code_to_2
213 elif proto == self.proto[self.IP][self.TCP]:
214 sport_from = self.tcp_sport_from_2
215 sport_to = self.tcp_sport_to_2
216 dport_from = self.tcp_dport_from_2
217 dport_to = self.tcp_dport_to_2
218 elif proto == self.proto[self.IP][self.UDP]:
219 sport_from = self.udp_sport_from_2
220 sport_to = self.udp_sport_to_2
221 dport_from = self.udp_dport_from_2
222 dport_to = self.udp_dport_to_2
229 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
230 'srcport_or_icmptype_first': sport_from,
231 'srcport_or_icmptype_last': sport_to,
232 'src_ip_prefix_len': s_prefix,
234 'dstport_or_icmpcode_first': dport_from,
235 'dstport_or_icmpcode_last': dport_to,
236 'dst_ip_prefix_len': d_prefix,
237 'dst_ip_addr': d_ip})
240 def apply_rules(self, rules, tag=''):
241 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
243 self.logger.info("Dumped ACL: " + str(
244 self.vapi.acl_dump(reply.acl_index)))
245 # Apply an ACL on the interface as inbound
246 for i in self.pg_interfaces:
247 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
249 acls=[reply.acl_index])
252 def create_upper_layer(self, packet_index, proto, ports=0):
253 p = self.proto_map[proto]
256 return UDP(sport=random.randint(self.udp_sport_from,
258 dport=random.randint(self.udp_dport_from,
261 return UDP(sport=ports, dport=ports)
264 return TCP(sport=random.randint(self.tcp_sport_from,
266 dport=random.randint(self.tcp_dport_from,
269 return TCP(sport=ports, dport=ports)
272 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
273 proto=-1, ports=0, fragments=False, pkt_raw=True):
275 Create input packet stream for defined interface using hosts or
278 :param object src_if: Interface to create packet stream for.
279 :param list packet_sizes: List of required packet sizes.
280 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
281 :return: Stream of packets.
284 if self.flows.__contains__(src_if):
285 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
286 for dst_if in self.flows[src_if]:
287 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
288 n_int = len(dst_hosts) * len(src_hosts)
289 for i in range(0, n_int):
290 dst_host = dst_hosts[i / len(src_hosts)]
291 src_host = src_hosts[i % len(src_hosts)]
292 pkt_info = self.create_packet_info(src_if, dst_if)
298 pkt_info.ip = random.choice([0, 1])
300 pkt_info.proto = random.choice(self.proto[self.IP])
302 pkt_info.proto = proto
303 payload = self.info_to_payload(pkt_info)
304 p = Ether(dst=dst_host.mac, src=src_host.mac)
306 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
308 p /= IPv6ExtHdrFragment(offset=64, m=1)
311 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
314 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
315 if traffic_type == self.ICMP:
317 p /= ICMPv6EchoRequest(type=self.icmp6_type,
318 code=self.icmp6_code)
320 p /= ICMP(type=self.icmp4_type,
321 code=self.icmp4_code)
323 p /= self.create_upper_layer(i, pkt_info.proto, ports)
326 pkt_info.data = p.copy()
328 size = random.choice(packet_sizes)
329 self.extend_packet(p, size)
333 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
335 Verify captured input packet stream for defined interface.
337 :param object pg_if: Interface to verify captured packet stream for.
338 :param list capture: Captured packet stream.
339 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
342 for i in self.pg_interfaces:
343 last_info[i.sw_if_index] = None
344 dst_sw_if_index = pg_if.sw_if_index
345 for packet in capture:
347 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
348 if traffic_type == self.ICMP and ip_type == self.IPV6:
349 payload_info = self.payload_to_info(
350 packet[ICMPv6EchoRequest].data)
351 payload = packet[ICMPv6EchoRequest]
353 payload_info = self.payload_to_info(str(packet[Raw]))
354 payload = packet[self.proto_map[payload_info.proto]]
356 self.logger.error(ppp("Unexpected or invalid packet "
357 "(outside network):", packet))
361 self.assertEqual(payload_info.ip, ip_type)
362 if traffic_type == self.ICMP:
364 if payload_info.ip == 0:
365 self.assertEqual(payload.type, self.icmp4_type)
366 self.assertEqual(payload.code, self.icmp4_code)
368 self.assertEqual(payload.type, self.icmp6_type)
369 self.assertEqual(payload.code, self.icmp6_code)
371 self.logger.error(ppp("Unexpected or invalid packet "
372 "(outside network):", packet))
376 ip_version = IPv6 if payload_info.ip == 1 else IP
378 ip = packet[ip_version]
379 packet_index = payload_info.index
381 self.assertEqual(payload_info.dst, dst_sw_if_index)
382 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
383 (pg_if.name, payload_info.src,
385 next_info = self.get_next_packet_info_for_interface2(
386 payload_info.src, dst_sw_if_index,
387 last_info[payload_info.src])
388 last_info[payload_info.src] = next_info
389 self.assertTrue(next_info is not None)
390 self.assertEqual(packet_index, next_info.index)
391 saved_packet = next_info.data
392 # Check standard fields
393 self.assertEqual(ip.src, saved_packet[ip_version].src)
394 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
395 p = self.proto_map[payload_info.proto]
398 self.assertEqual(tcp.sport, saved_packet[
400 self.assertEqual(tcp.dport, saved_packet[
404 self.assertEqual(udp.sport, saved_packet[
406 self.assertEqual(udp.dport, saved_packet[
409 self.logger.error(ppp("Unexpected or invalid packet:",
412 for i in self.pg_interfaces:
413 remaining_packet = self.get_next_packet_info_for_interface2(
414 i, dst_sw_if_index, last_info[i.sw_if_index])
416 remaining_packet is None,
417 "Port %u: Packet expected from source %u didn't arrive" %
418 (dst_sw_if_index, i.sw_if_index))
420 def run_traffic_no_check(self):
422 # Create incoming packet streams for packet-generator interfaces
423 for i in self.pg_interfaces:
424 if self.flows.__contains__(i):
425 pkts = self.create_stream(i, self.pg_if_packet_sizes)
429 # Enable packet capture and start packet sending
430 self.pg_enable_capture(self.pg_interfaces)
433 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
434 frags=False, pkt_raw=True):
436 # Create incoming packet streams for packet-generator interfaces
438 for i in self.pg_interfaces:
439 if self.flows.__contains__(i):
440 pkts = self.create_stream(i, self.pg_if_packet_sizes,
441 traffic_type, ip_type, proto, ports,
445 pkts_cnt += len(pkts)
447 # Enable packet capture and start packet sending
448 self.pg_enable_capture(self.pg_interfaces)
452 # Verify outgoing packet streams per packet-generator interface
453 for src_if in self.pg_interfaces:
454 if self.flows.__contains__(src_if):
455 for dst_if in self.flows[src_if]:
456 capture = dst_if.get_capture(pkts_cnt)
457 self.logger.info("Verifying capture on interface %s" %
459 self.verify_capture(dst_if, capture, traffic_type, ip_type)
461 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
462 ports=0, frags=False):
464 self.reset_packet_infos()
465 for i in self.pg_interfaces:
466 if self.flows.__contains__(i):
467 pkts = self.create_stream(i, self.pg_if_packet_sizes,
468 traffic_type, ip_type, proto, ports,
473 # Enable packet capture and start packet sending
474 self.pg_enable_capture(self.pg_interfaces)
478 # Verify outgoing packet streams per packet-generator interface
479 for src_if in self.pg_interfaces:
480 if self.flows.__contains__(src_if):
481 for dst_if in self.flows[src_if]:
482 self.logger.info("Verifying capture on interface %s" %
484 capture = dst_if.get_capture(0)
485 self.assertEqual(len(capture), 0)
487 def test_0000_warmup_test(self):
488 """ ACL plugin version check; learn MACs
490 self.create_hosts(16)
491 self.run_traffic_no_check()
492 reply = self.vapi.papi.acl_plugin_get_version()
493 self.assertEqual(reply.major, 1)
494 self.logger.info("Working with ACL plugin version: %d.%d" % (
495 reply.major, reply.minor))
496 # minor version changes are non breaking
497 # self.assertEqual(reply.minor, 0)
499 def test_0001_acl_create(self):
500 """ ACL create/delete test
503 self.logger.info("ACLP_TEST_START_0001")
505 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
506 'srcport_or_icmptype_first': 1234,
507 'srcport_or_icmptype_last': 1235,
508 'src_ip_prefix_len': 0,
509 'src_ip_addr': '\x00\x00\x00\x00',
510 'dstport_or_icmpcode_first': 1234,
511 'dstport_or_icmpcode_last': 1234,
512 'dst_ip_addr': '\x00\x00\x00\x00',
513 'dst_ip_prefix_len': 0}]
514 # Test 1: add a new ACL
515 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
517 self.assertEqual(reply.retval, 0)
518 # The very first ACL gets #0
519 self.assertEqual(reply.acl_index, 0)
520 first_acl = reply.acl_index
521 rr = self.vapi.acl_dump(reply.acl_index)
522 self.logger.info("Dumped ACL: " + str(rr))
523 self.assertEqual(len(rr), 1)
524 # We should have the same number of ACL entries as we had asked
525 self.assertEqual(len(rr[0].r), len(r))
526 # The rules should be the same. But because the submitted and returned
527 # are different types, we need to iterate over rules and keys to get
529 for i_rule in range(0, len(r) - 1):
530 for rule_key in r[i_rule]:
531 self.assertEqual(rr[0].r[i_rule][rule_key],
534 # Add a deny-1234 ACL
535 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
536 'srcport_or_icmptype_first': 1234,
537 'srcport_or_icmptype_last': 1235,
538 'src_ip_prefix_len': 0,
539 'src_ip_addr': '\x00\x00\x00\x00',
540 'dstport_or_icmpcode_first': 1234,
541 'dstport_or_icmpcode_last': 1234,
542 'dst_ip_addr': '\x00\x00\x00\x00',
543 'dst_ip_prefix_len': 0},
544 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
545 'srcport_or_icmptype_first': 0,
546 'srcport_or_icmptype_last': 0,
547 'src_ip_prefix_len': 0,
548 'src_ip_addr': '\x00\x00\x00\x00',
549 'dstport_or_icmpcode_first': 0,
550 'dstport_or_icmpcode_last': 0,
551 'dst_ip_addr': '\x00\x00\x00\x00',
552 'dst_ip_prefix_len': 0}]
554 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
555 tag="deny 1234;permit all")
556 self.assertEqual(reply.retval, 0)
557 # The second ACL gets #1
558 self.assertEqual(reply.acl_index, 1)
559 second_acl = reply.acl_index
561 # Test 2: try to modify a nonexistent ACL
562 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
563 tag="FFFF:FFFF", expected_retval=-1)
564 self.assertEqual(reply.retval, -1)
565 # The ACL number should pass through
566 self.assertEqual(reply.acl_index, 432)
567 # apply an ACL on an interface inbound, try to delete ACL, must fail
568 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
571 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-1)
572 # Unapply an ACL and then try to delete it - must be ok
573 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
576 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
578 # apply an ACL on an interface outbound, try to delete ACL, must fail
579 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
582 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-1)
583 # Unapply the ACL and then try to delete it - must be ok
584 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
587 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
589 # try to apply a nonexistent ACL - must fail
590 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
595 self.logger.info("ACLP_TEST_FINISH_0001")
597 def test_0002_acl_permit_apply(self):
598 """ permit ACL apply test
600 self.logger.info("ACLP_TEST_START_0002")
603 rules.append(self.create_rule(self.IPV4, self.PERMIT,
604 0, self.proto[self.IP][self.UDP]))
605 rules.append(self.create_rule(self.IPV4, self.PERMIT,
606 0, self.proto[self.IP][self.TCP]))
609 self.apply_rules(rules, "permit per-flow")
611 # Traffic should still pass
612 self.run_verify_test(self.IP, self.IPV4, -1)
613 self.logger.info("ACLP_TEST_FINISH_0002")
615 def test_0003_acl_deny_apply(self):
616 """ deny ACL apply test
618 self.logger.info("ACLP_TEST_START_0003")
619 # Add a deny-flows ACL
621 rules.append(self.create_rule(self.IPV4, self.DENY,
622 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
623 # Permit ip any any in the end
624 rules.append(self.create_rule(self.IPV4, self.PERMIT,
628 self.apply_rules(rules, "deny per-flow;permit all")
630 # Traffic should not pass
631 self.run_verify_negat_test(self.IP, self.IPV4,
632 self.proto[self.IP][self.UDP])
633 self.logger.info("ACLP_TEST_FINISH_0003")
634 # self.assertEqual(1, 0)
636 def test_0004_vpp624_permit_icmpv4(self):
637 """ VPP_624 permit ICMPv4
639 self.logger.info("ACLP_TEST_START_0004")
643 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
644 self.proto[self.ICMP][self.ICMPv4]))
645 # deny ip any any in the end
646 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
649 self.apply_rules(rules, "permit icmpv4")
651 # Traffic should still pass
652 self.run_verify_test(self.ICMP, self.IPV4,
653 self.proto[self.ICMP][self.ICMPv4])
655 self.logger.info("ACLP_TEST_FINISH_0004")
657 def test_0005_vpp624_permit_icmpv6(self):
658 """ VPP_624 permit ICMPv6
660 self.logger.info("ACLP_TEST_START_0005")
664 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
665 self.proto[self.ICMP][self.ICMPv6]))
666 # deny ip any any in the end
667 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
670 self.apply_rules(rules, "permit icmpv6")
672 # Traffic should still pass
673 self.run_verify_test(self.ICMP, self.IPV6,
674 self.proto[self.ICMP][self.ICMPv6])
676 self.logger.info("ACLP_TEST_FINISH_0005")
678 def test_0006_vpp624_deny_icmpv4(self):
679 """ VPP_624 deny ICMPv4
681 self.logger.info("ACLP_TEST_START_0006")
684 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
685 self.proto[self.ICMP][self.ICMPv4]))
686 # permit ip any any in the end
687 rules.append(self.create_rule(self.IPV4, self.PERMIT,
691 self.apply_rules(rules, "deny icmpv4")
693 # Traffic should not pass
694 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
696 self.logger.info("ACLP_TEST_FINISH_0006")
698 def test_0007_vpp624_deny_icmpv6(self):
699 """ VPP_624 deny ICMPv6
701 self.logger.info("ACLP_TEST_START_0007")
704 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
705 self.proto[self.ICMP][self.ICMPv6]))
706 # permit ip any any in the end
707 rules.append(self.create_rule(self.IPV6, self.PERMIT,
711 self.apply_rules(rules, "deny icmpv6")
713 # Traffic should not pass
714 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
716 self.logger.info("ACLP_TEST_FINISH_0007")
718 def test_0008_tcp_permit_v4(self):
721 self.logger.info("ACLP_TEST_START_0008")
725 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
726 self.proto[self.IP][self.TCP]))
727 # deny ip any any in the end
728 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
731 self.apply_rules(rules, "permit ipv4 tcp")
733 # Traffic should still pass
734 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
736 self.logger.info("ACLP_TEST_FINISH_0008")
738 def test_0009_tcp_permit_v6(self):
741 self.logger.info("ACLP_TEST_START_0009")
745 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
746 self.proto[self.IP][self.TCP]))
747 # deny ip any any in the end
748 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
751 self.apply_rules(rules, "permit ip6 tcp")
753 # Traffic should still pass
754 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
756 self.logger.info("ACLP_TEST_FINISH_0008")
758 def test_0010_udp_permit_v4(self):
761 self.logger.info("ACLP_TEST_START_0010")
765 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
766 self.proto[self.IP][self.UDP]))
767 # deny ip any any in the end
768 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
771 self.apply_rules(rules, "permit ipv udp")
773 # Traffic should still pass
774 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
776 self.logger.info("ACLP_TEST_FINISH_0010")
778 def test_0011_udp_permit_v6(self):
781 self.logger.info("ACLP_TEST_START_0011")
785 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
786 self.proto[self.IP][self.UDP]))
787 # deny ip any any in the end
788 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
791 self.apply_rules(rules, "permit ip6 udp")
793 # Traffic should still pass
794 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
796 self.logger.info("ACLP_TEST_FINISH_0011")
798 def test_0012_tcp_deny(self):
801 self.logger.info("ACLP_TEST_START_0012")
805 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
806 self.proto[self.IP][self.TCP]))
807 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
808 self.proto[self.IP][self.TCP]))
809 # permit ip any any in the end
810 rules.append(self.create_rule(self.IPV4, self.PERMIT,
812 rules.append(self.create_rule(self.IPV6, self.PERMIT,
816 self.apply_rules(rules, "deny ip4/ip6 tcp")
818 # Traffic should not pass
819 self.run_verify_negat_test(self.IP, self.IPRANDOM,
820 self.proto[self.IP][self.TCP])
822 self.logger.info("ACLP_TEST_FINISH_0012")
824 def test_0013_udp_deny(self):
827 self.logger.info("ACLP_TEST_START_0013")
831 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
832 self.proto[self.IP][self.UDP]))
833 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
834 self.proto[self.IP][self.UDP]))
835 # permit ip any any in the end
836 rules.append(self.create_rule(self.IPV4, self.PERMIT,
838 rules.append(self.create_rule(self.IPV6, self.PERMIT,
842 self.apply_rules(rules, "deny ip4/ip6 udp")
844 # Traffic should not pass
845 self.run_verify_negat_test(self.IP, self.IPRANDOM,
846 self.proto[self.IP][self.UDP])
848 self.logger.info("ACLP_TEST_FINISH_0013")
850 def test_0014_acl_dump(self):
851 """ verify add/dump acls
853 self.logger.info("ACLP_TEST_START_0014")
855 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
856 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
857 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
858 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
859 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
860 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
861 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
862 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
863 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
864 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
865 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
866 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
867 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
868 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
869 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
870 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
871 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
872 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
875 # Add and verify new ACLs
877 for i in range(len(r)):
878 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
880 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
881 result = self.vapi.acl_dump(reply.acl_index)
884 for drules in result:
886 self.assertEqual(dr.is_ipv6, r[i][0])
887 self.assertEqual(dr.is_permit, r[i][1])
888 self.assertEqual(dr.proto, r[i][3])
891 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
894 self.assertEqual(dr.srcport_or_icmptype_first, 0)
895 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
897 if dr.proto == self.proto[self.IP][self.TCP]:
898 self.assertGreater(dr.srcport_or_icmptype_first,
899 self.tcp_sport_from-1)
900 self.assertLess(dr.srcport_or_icmptype_first,
902 self.assertGreater(dr.dstport_or_icmpcode_last,
903 self.tcp_dport_from-1)
904 self.assertLess(dr.dstport_or_icmpcode_last,
906 elif dr.proto == self.proto[self.IP][self.UDP]:
907 self.assertGreater(dr.srcport_or_icmptype_first,
908 self.udp_sport_from-1)
909 self.assertLess(dr.srcport_or_icmptype_first,
911 self.assertGreater(dr.dstport_or_icmpcode_last,
912 self.udp_dport_from-1)
913 self.assertLess(dr.dstport_or_icmpcode_last,
917 self.logger.info("ACLP_TEST_FINISH_0014")
919 def test_0015_tcp_permit_port_v4(self):
920 """ permit single TCPv4
922 self.logger.info("ACLP_TEST_START_0015")
924 port = random.randint(0, 65535)
927 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
928 self.proto[self.IP][self.TCP]))
929 # deny ip any any in the end
930 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
933 self.apply_rules(rules, "permit ip4 tcp "+str(port))
935 # Traffic should still pass
936 self.run_verify_test(self.IP, self.IPV4,
937 self.proto[self.IP][self.TCP], port)
939 self.logger.info("ACLP_TEST_FINISH_0015")
941 def test_0016_udp_permit_port_v4(self):
942 """ permit single UDPv4
944 self.logger.info("ACLP_TEST_START_0016")
946 port = random.randint(0, 65535)
949 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
950 self.proto[self.IP][self.UDP]))
951 # deny ip any any in the end
952 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
955 self.apply_rules(rules, "permit ip4 tcp "+str(port))
957 # Traffic should still pass
958 self.run_verify_test(self.IP, self.IPV4,
959 self.proto[self.IP][self.UDP], port)
961 self.logger.info("ACLP_TEST_FINISH_0016")
963 def test_0017_tcp_permit_port_v6(self):
964 """ permit single TCPv6
966 self.logger.info("ACLP_TEST_START_0017")
968 port = random.randint(0, 65535)
971 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
972 self.proto[self.IP][self.TCP]))
973 # deny ip any any in the end
974 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
977 self.apply_rules(rules, "permit ip4 tcp "+str(port))
979 # Traffic should still pass
980 self.run_verify_test(self.IP, self.IPV6,
981 self.proto[self.IP][self.TCP], port)
983 self.logger.info("ACLP_TEST_FINISH_0017")
985 def test_0018_udp_permit_port_v6(self):
986 """ permit single UDPv6
988 self.logger.info("ACLP_TEST_START_0018")
990 port = random.randint(0, 65535)
993 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
994 self.proto[self.IP][self.UDP]))
995 # deny ip any any in the end
996 rules.append(self.create_rule(self.IPV6, self.DENY,
1000 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1002 # Traffic should still pass
1003 self.run_verify_test(self.IP, self.IPV6,
1004 self.proto[self.IP][self.UDP], port)
1006 self.logger.info("ACLP_TEST_FINISH_0018")
1008 def test_0019_udp_deny_port(self):
1009 """ deny single TCPv4/v6
1011 self.logger.info("ACLP_TEST_START_0019")
1013 port = random.randint(0, 65535)
1016 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1017 self.proto[self.IP][self.TCP]))
1018 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1019 self.proto[self.IP][self.TCP]))
1020 # Permit ip any any in the end
1021 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1023 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1027 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1029 # Traffic should not pass
1030 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1031 self.proto[self.IP][self.TCP], port)
1033 self.logger.info("ACLP_TEST_FINISH_0019")
1035 def test_0020_udp_deny_port(self):
1036 """ deny single UDPv4/v6
1038 self.logger.info("ACLP_TEST_START_0020")
1040 port = random.randint(0, 65535)
1043 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1044 self.proto[self.IP][self.UDP]))
1045 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1046 self.proto[self.IP][self.UDP]))
1047 # Permit ip any any in the end
1048 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1050 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1054 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1056 # Traffic should not pass
1057 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1058 self.proto[self.IP][self.UDP], port)
1060 self.logger.info("ACLP_TEST_FINISH_0020")
1062 def test_0021_udp_deny_port_verify_fragment_deny(self):
1063 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1065 self.logger.info("ACLP_TEST_START_0021")
1067 port = random.randint(0, 65535)
1070 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1071 self.proto[self.IP][self.UDP]))
1072 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1073 self.proto[self.IP][self.UDP]))
1074 # permit ip any any in the end
1075 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1077 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1081 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1083 # Traffic should not pass
1084 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1085 self.proto[self.IP][self.UDP], port, True)
1087 self.logger.info("ACLP_TEST_FINISH_0021")
1089 def test_0022_zero_length_udp_ipv4(self):
1090 """ VPP-687 zero length udp ipv4 packet"""
1091 self.logger.info("ACLP_TEST_START_0022")
1093 port = random.randint(0, 65535)
1096 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1097 self.proto[self.IP][self.UDP]))
1098 # deny ip any any in the end
1100 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1103 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1105 # Traffic should still pass
1106 # Create incoming packet streams for packet-generator interfaces
1108 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1110 self.proto[self.IP][self.UDP], port,
1113 self.pg0.add_stream(pkts)
1114 pkts_cnt += len(pkts)
1116 # Enable packet capture and start packet sending
1117 self.pg_enable_capture(self.pg_interfaces)
1120 self.pg1.get_capture(pkts_cnt)
1122 self.logger.info("ACLP_TEST_FINISH_0022")
1124 def test_0023_zero_length_udp_ipv6(self):
1125 """ VPP-687 zero length udp ipv6 packet"""
1126 self.logger.info("ACLP_TEST_START_0023")
1128 port = random.randint(0, 65535)
1131 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1132 self.proto[self.IP][self.UDP]))
1133 # deny ip any any in the end
1134 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1137 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1139 # Traffic should still pass
1140 # Create incoming packet streams for packet-generator interfaces
1142 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1144 self.proto[self.IP][self.UDP], port,
1147 self.pg0.add_stream(pkts)
1148 pkts_cnt += len(pkts)
1150 # Enable packet capture and start packet sending
1151 self.pg_enable_capture(self.pg_interfaces)
1154 # Verify outgoing packet streams per packet-generator interface
1155 self.pg1.get_capture(pkts_cnt)
1157 self.logger.info("ACLP_TEST_FINISH_0023")
def test_0108_tcp_permit_v4(self):
    """ permit TCPv4 + non-match range

    Rule order: deny TCP on PORTS_RANGE_2 (the range the title calls
    non-matching), permit TCP on PORTS_RANGE, then deny everything else.
    The positive traffic check is then run for IPv4 TCP.
    """
    self.logger.info("ACLP_TEST_START_0108")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0108")
def test_0109_tcp_permit_v6(self):
    """ permit TCPv6 + non-match range

    Rule order: deny TCP on PORTS_RANGE_2 (the range the title calls
    non-matching), permit TCP on PORTS_RANGE, then deny everything else.
    The positive traffic check is then run for IPv6 TCP.
    """
    self.logger.info("ACLP_TEST_START_0109")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp)

    self.logger.info("ACLP_TEST_FINISH_0109")
def test_0110_udp_permit_v4(self):
    """ permit UDPv4 + non-match range

    Rule order: deny UDP on PORTS_RANGE_2 (the range the title calls
    non-matching), permit UDP on PORTS_RANGE, then deny everything else.
    The positive traffic check is then run for IPv4 UDP.
    """
    self.logger.info("ACLP_TEST_START_0110")

    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0110")
def test_0111_udp_permit_v6(self):
    """ permit UDPv6 + non-match range

    Rule order: deny UDP on PORTS_RANGE_2 (the range the title calls
    non-matching), permit UDP on PORTS_RANGE, then deny everything else.
    The positive traffic check is then run for IPv6 UDP.
    """
    self.logger.info("ACLP_TEST_START_0111")

    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0111")
def test_0112_tcp_deny(self):
    """ deny TCPv4/v6 + non-match range

    Builds one ACL covering both address families: permit TCP on a
    second port range, deny TCP on PORTS_RANGE, then permit everything
    else.  The negative traffic check is then run for TCP with a random
    IP version.
    """
    self.logger.info("ACLP_TEST_START_0112")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    # NOTE(review): the PORTS_RANGE_2 argument of the leading permits and
    # the (PORTS_ALL, 0) arguments of the trailing permits are restored by
    # analogy with the permit tests in this file -- confirm.
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, tcp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0112")
def test_0113_udp_deny(self):
    """ deny UDPv4/v6 + non-match range

    Builds one ACL covering both address families: permit UDP on a
    second port range, deny UDP on PORTS_RANGE, then permit everything
    else.  The negative traffic check is then run for UDP with a random
    IP version.
    """
    self.logger.info("ACLP_TEST_START_0113")

    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    # NOTE(review): the PORTS_RANGE_2 argument of the leading permits and
    # the (PORTS_ALL, 0) arguments of the trailing permits are restored by
    # analogy with the permit tests in this file -- confirm.
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0113")
# When this module is executed directly (not imported), run its tests
# through unittest using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)