2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
17 class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
46 udp_sport_to = udp_sport_from + 5
47 udp_dport_from = 20000
48 udp_dport_to = udp_dport_from + 5000
50 tcp_sport_to = tcp_sport_from + 5
51 tcp_dport_from = 40000
52 tcp_dport_to = tcp_dport_from + 5000
54 icmp4_type = 8 # echo request
56 icmp6_type = 128 # echo request
65 Perform standard class setup (defined by class method setUpClass in
66 class VppTestCase) before running the test case, set test case related
67 variables and configure VPP.
69 super(TestACLplugin, cls).setUpClass()
74 # Create 2 pg interfaces
75 cls.create_pg_interfaces(range(2))
77 # Packet flows mapping pg0 -> pg1, pg2 etc.
79 cls.flows[cls.pg0] = [cls.pg1]
82 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
84 # Create BD with MAC learning and unknown unicast flooding enabled
85 # and put interfaces to this BD
86 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
88 for pg_if in cls.pg_interfaces:
89 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
92 # Set up all interfaces
93 for i in cls.pg_interfaces:
96 # Mapping between packet-generator index and lists of test hosts
97 cls.hosts_by_pg_idx = dict()
98 for pg_if in cls.pg_interfaces:
99 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
101 # Create list of deleted hosts
102 cls.deleted_hosts_by_pg_idx = dict()
103 for pg_if in cls.pg_interfaces:
104 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
106 # warm-up the mac address tables
110 super(TestACLplugin, cls).tearDownClass()
114 super(TestACLplugin, self).setUp()
115 self.reset_packet_infos()
119 Show various debug prints after each test.
121 super(TestACLplugin, self).tearDown()
122 if not self.vpp_dead:
123 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
124 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
125 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
126 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
127 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
130 def create_hosts(self, count, start=0):
132 Create required number of host MAC addresses and distribute them among
133 interfaces. Create host IPv4 address for every host MAC address.
135 :param int count: Number of hosts to create MAC/IPv4 addresses for.
136 :param int start: Number to start numbering from.
138 n_int = len(self.pg_interfaces)
139 macs_per_if = count / n_int
141 for pg_if in self.pg_interfaces:
143 start_nr = macs_per_if * i + start
144 end_nr = count + start if i == (n_int - 1) \
145 else macs_per_if * (i + 1) + start
146 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
147 for j in range(start_nr, end_nr):
149 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
150 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
151 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
154 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
155 s_prefix=0, s_ip='\x00\x00\x00\x00',
156 d_prefix=0, d_ip='\x00\x00\x00\x00'):
159 if ports == self.PORTS_ALL:
162 sport_to = 65535 if proto != 1 and proto != 58 else 255
164 elif ports == self.PORTS_RANGE:
166 sport_from = self.icmp4_type
167 sport_to = self.icmp4_type
168 dport_from = self.icmp4_code
169 dport_to = self.icmp4_code
171 sport_from = self.icmp6_type
172 sport_to = self.icmp6_type
173 dport_from = self.icmp6_code
174 dport_to = self.icmp6_code
175 elif proto == self.proto[self.IP][self.TCP]:
176 sport_from = self.tcp_sport_from
177 sport_to = self.tcp_sport_to
178 dport_from = self.tcp_dport_from
179 dport_to = self.tcp_dport_to
180 elif proto == self.proto[self.IP][self.UDP]:
181 sport_from = self.udp_sport_from
182 sport_to = self.udp_sport_to
183 dport_from = self.udp_dport_from
184 dport_to = self.udp_dport_to
191 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
192 'srcport_or_icmptype_first': sport_from,
193 'srcport_or_icmptype_last': sport_to,
194 'src_ip_prefix_len': s_prefix,
196 'dstport_or_icmpcode_first': dport_from,
197 'dstport_or_icmpcode_last': dport_to,
198 'dst_ip_prefix_len': d_prefix,
199 'dst_ip_addr': d_ip})
202 def apply_rules(self, rules, tag=''):
203 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
206 self.logger.info("Dumped ACL: " + str(
207 self.api_acl_dump(reply.acl_index)))
208 # Apply the ACL on every interface as inbound
209 for i in self.pg_interfaces:
210 self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
212 acls=[reply.acl_index])
215 def create_upper_layer(self, packet_index, proto, ports=0):
216 p = self.proto_map[proto]
219 return UDP(sport=random.randint(self.udp_sport_from,
221 dport=random.randint(self.udp_dport_from,
224 return UDP(sport=ports, dport=ports)
227 return TCP(sport=random.randint(self.tcp_sport_from,
229 dport=random.randint(self.tcp_dport_from,
232 return TCP(sport=ports, dport=ports)
235 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
236 proto=-1, ports=0, fragments=False, pkt_raw=True):
238 Create input packet stream for defined interface using hosts or
241 :param object src_if: Interface to create packet stream for.
242 :param list packet_sizes: List of required packet sizes.
243 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
244 :return: Stream of packets.
247 if self.flows.__contains__(src_if):
248 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
249 for dst_if in self.flows[src_if]:
250 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
251 n_int = len(dst_hosts) * len(src_hosts)
252 for i in range(0, n_int):
253 dst_host = dst_hosts[i / len(src_hosts)]
254 src_host = src_hosts[i % len(src_hosts)]
255 pkt_info = self.create_packet_info(src_if, dst_if)
261 pkt_info.ip = random.choice([0, 1])
263 pkt_info.proto = random.choice(self.proto[self.IP])
265 pkt_info.proto = proto
266 payload = self.info_to_payload(pkt_info)
267 p = Ether(dst=dst_host.mac, src=src_host.mac)
269 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
271 p /= IPv6ExtHdrFragment(offset=64, m=1)
274 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
277 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
278 if traffic_type == self.ICMP:
280 p /= ICMPv6EchoRequest(type=self.icmp6_type,
281 code=self.icmp6_code)
283 p /= ICMP(type=self.icmp4_type,
284 code=self.icmp4_code)
286 p /= self.create_upper_layer(i, pkt_info.proto, ports)
289 pkt_info.data = p.copy()
291 size = random.choice(packet_sizes)
292 self.extend_packet(p, size)
296 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
298 Verify captured input packet stream for defined interface.
300 :param object pg_if: Interface to verify captured packet stream for.
301 :param list capture: Captured packet stream.
302 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
305 for i in self.pg_interfaces:
306 last_info[i.sw_if_index] = None
307 dst_sw_if_index = pg_if.sw_if_index
308 for packet in capture:
310 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
311 if traffic_type == self.ICMP and ip_type == self.IPV6:
312 payload_info = self.payload_to_info(
313 packet[ICMPv6EchoRequest].data)
314 payload = packet[ICMPv6EchoRequest]
316 payload_info = self.payload_to_info(str(packet[Raw]))
317 payload = packet[self.proto_map[payload_info.proto]]
319 self.logger.error(ppp("Unexpected or invalid packet "
320 "(outside network):", packet))
324 self.assertEqual(payload_info.ip, ip_type)
325 if traffic_type == self.ICMP:
327 if payload_info.ip == 0:
328 self.assertEqual(payload.type, self.icmp4_type)
329 self.assertEqual(payload.code, self.icmp4_code)
331 self.assertEqual(payload.type, self.icmp6_type)
332 self.assertEqual(payload.code, self.icmp6_code)
334 self.logger.error(ppp("Unexpected or invalid packet "
335 "(outside network):", packet))
339 ip_version = IPv6 if payload_info.ip == 1 else IP
341 ip = packet[ip_version]
342 packet_index = payload_info.index
344 self.assertEqual(payload_info.dst, dst_sw_if_index)
345 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
346 (pg_if.name, payload_info.src,
348 next_info = self.get_next_packet_info_for_interface2(
349 payload_info.src, dst_sw_if_index,
350 last_info[payload_info.src])
351 last_info[payload_info.src] = next_info
352 self.assertTrue(next_info is not None)
353 self.assertEqual(packet_index, next_info.index)
354 saved_packet = next_info.data
355 # Check standard fields
356 self.assertEqual(ip.src, saved_packet[ip_version].src)
357 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
358 p = self.proto_map[payload_info.proto]
361 self.assertEqual(tcp.sport, saved_packet[
363 self.assertEqual(tcp.dport, saved_packet[
367 self.assertEqual(udp.sport, saved_packet[
369 self.assertEqual(udp.dport, saved_packet[
372 self.logger.error(ppp("Unexpected or invalid packet:",
375 for i in self.pg_interfaces:
376 remaining_packet = self.get_next_packet_info_for_interface2(
377 i, dst_sw_if_index, last_info[i.sw_if_index])
379 remaining_packet is None,
380 "Port %u: Packet expected from source %u didn't arrive" %
381 (dst_sw_if_index, i.sw_if_index))
383 def run_traffic_no_check(self):
385 # Create incoming packet streams for packet-generator interfaces
386 for i in self.pg_interfaces:
387 if self.flows.__contains__(i):
388 pkts = self.create_stream(i, self.pg_if_packet_sizes)
392 # Enable packet capture and start packet sending
393 self.pg_enable_capture(self.pg_interfaces)
396 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
397 frags=False, pkt_raw=True):
399 # Create incoming packet streams for packet-generator interfaces
401 for i in self.pg_interfaces:
402 if self.flows.__contains__(i):
403 pkts = self.create_stream(i, self.pg_if_packet_sizes,
404 traffic_type, ip_type, proto, ports,
408 pkts_cnt += len(pkts)
410 # Enable packet capture and start packet sending
411 self.pg_enable_capture(self.pg_interfaces)
415 # Verify outgoing packet streams per packet-generator interface
416 for src_if in self.pg_interfaces:
417 if self.flows.__contains__(src_if):
418 for dst_if in self.flows[src_if]:
419 capture = dst_if.get_capture(pkts_cnt)
420 self.logger.info("Verifying capture on interface %s" %
422 self.verify_capture(dst_if, capture, traffic_type, ip_type)
424 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
425 ports=0, frags=False):
427 self.reset_packet_infos()
428 for i in self.pg_interfaces:
429 if self.flows.__contains__(i):
430 pkts = self.create_stream(i, self.pg_if_packet_sizes,
431 traffic_type, ip_type, proto, ports,
436 # Enable packet capture and start packet sending
437 self.pg_enable_capture(self.pg_interfaces)
441 # Verify outgoing packet streams per packet-generator interface
442 for src_if in self.pg_interfaces:
443 if self.flows.__contains__(src_if):
444 for dst_if in self.flows[src_if]:
445 self.logger.info("Verifying capture on interface %s" %
447 capture = dst_if.get_capture(0)
448 self.assertEqual(len(capture), 0)
450 def api_acl_add_replace(self, acl_index, r, count, tag='',
452 """Add/replace an ACL
454 :param int acl_index: ACL index to replace,
455 4294967295 to create new ACL.
456 :param acl_rule r: ACL rules array.
457 :param str tag: symbolic tag (description) for this ACL.
458 :param int count: number of rules.
460 return self.vapi.api(self.vapi.papi.acl_add_replace,
461 {'acl_index': acl_index,
465 expected_retval=expected_retval)
467 def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
469 return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
470 {'sw_if_index': sw_if_index,
474 expected_retval=expected_retval)
def api_acl_dump(self, acl_index, expected_retval=0):
    """Dump one ACL through the VPP binary API.

    :param int acl_index: index of the ACL to dump.
    :param int expected_retval: forwarded unchanged to ``self.vapi.api``
        as its ``expected_retval`` keyword.
    :return: whatever ``self.vapi.api`` returns for the ``acl_dump`` call.
    """
    request = {'acl_index': acl_index}
    return self.vapi.api(self.vapi.papi.acl_dump, request,
                         expected_retval=expected_retval)
481 def test_0000_warmup_test(self):
482 """ ACL plugin version check; learn MACs
484 self.create_hosts(16)
485 self.run_traffic_no_check()
486 reply = self.vapi.papi.acl_plugin_get_version()
487 self.assertEqual(reply.major, 1)
488 self.logger.info("Working with ACL plugin version: %d.%d" % (
489 reply.major, reply.minor))
490 # minor version changes are non breaking
491 # self.assertEqual(reply.minor, 0)
493 def test_0001_acl_create(self):
497 self.logger.info("ACLP_TEST_START_0001")
499 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
500 'srcport_or_icmptype_first': 1234,
501 'srcport_or_icmptype_last': 1235,
502 'src_ip_prefix_len': 0,
503 'src_ip_addr': '\x00\x00\x00\x00',
504 'dstport_or_icmpcode_first': 1234,
505 'dstport_or_icmpcode_last': 1234,
506 'dst_ip_addr': '\x00\x00\x00\x00',
507 'dst_ip_prefix_len': 0}]
508 # Test 1: add a new ACL
509 reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
510 count=len(r), tag="permit 1234")
511 self.assertEqual(reply.retval, 0)
512 # The very first ACL gets #0
513 self.assertEqual(reply.acl_index, 0)
514 rr = self.api_acl_dump(reply.acl_index)
515 self.logger.info("Dumped ACL: " + str(rr))
516 self.assertEqual(len(rr), 1)
517 # We should have the same number of ACL entries as we had asked
518 self.assertEqual(len(rr[0].r), len(r))
519 # The rules should be the same. But because the submitted and returned
520 # are different types, we need to iterate over rules and keys to get
522 for i_rule in range(0, len(r) - 1):
523 for rule_key in r[i_rule]:
524 self.assertEqual(rr[0].r[i_rule][rule_key],
527 # Add a deny-1234 ACL
528 r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
529 'srcport_or_icmptype_first': 1234,
530 'srcport_or_icmptype_last': 1235,
531 'src_ip_prefix_len': 0,
532 'src_ip_addr': '\x00\x00\x00\x00',
533 'dstport_or_icmpcode_first': 1234,
534 'dstport_or_icmpcode_last': 1234,
535 'dst_ip_addr': '\x00\x00\x00\x00',
536 'dst_ip_prefix_len': 0},
537 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
538 'srcport_or_icmptype_first': 0,
539 'srcport_or_icmptype_last': 0,
540 'src_ip_prefix_len': 0,
541 'src_ip_addr': '\x00\x00\x00\x00',
542 'dstport_or_icmpcode_first': 0,
543 'dstport_or_icmpcode_last': 0,
544 'dst_ip_addr': '\x00\x00\x00\x00',
545 'dst_ip_prefix_len': 0})
547 reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
549 tag="deny 1234;permit all")
550 self.assertEqual(reply.retval, 0)
551 # The second ACL gets #1
552 self.assertEqual(reply.acl_index, 1)
554 # Test 2: try to modify a nonexistent ACL
555 reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
556 tag="FFFF:FFFF", expected_retval=-1)
557 self.assertEqual(reply.retval, -1)
558 # The ACL number should pass through
559 self.assertEqual(reply.acl_index, 432)
561 self.logger.info("ACLP_TEST_FINISH_0001")
563 def test_0002_acl_permit_apply(self):
564 """ permit ACL apply test
566 self.logger.info("ACLP_TEST_START_0002")
569 rules.append(self.create_rule(self.IPV4, self.PERMIT,
570 0, self.proto[self.IP][self.UDP]))
571 rules.append(self.create_rule(self.IPV4, self.PERMIT,
572 0, self.proto[self.IP][self.TCP]))
575 self.apply_rules(rules, "permit per-flow")
577 # Traffic should still pass
578 self.run_verify_test(self.IP, self.IPV4, -1)
579 self.logger.info("ACLP_TEST_FINISH_0002")
581 def test_0003_acl_deny_apply(self):
582 """ deny ACL apply test
584 self.logger.info("ACLP_TEST_START_0003")
585 # Add a deny-flows ACL
587 rules.append(self.create_rule(self.IPV4, self.DENY,
588 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
589 # Permit ip any any in the end
590 rules.append(self.create_rule(self.IPV4, self.PERMIT,
594 self.apply_rules(rules, "deny per-flow;permit all")
596 # Traffic should not pass
597 self.run_verify_negat_test(self.IP, self.IPV4,
598 self.proto[self.IP][self.UDP])
599 self.logger.info("ACLP_TEST_FINISH_0003")
600 # self.assertEqual(1, 0)
602 def test_0004_vpp624_permit_icmpv4(self):
603 """ VPP_624 permit ICMPv4
605 self.logger.info("ACLP_TEST_START_0004")
609 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
610 self.proto[self.ICMP][self.ICMPv4]))
611 # deny ip any any in the end
612 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
615 self.apply_rules(rules, "permit icmpv4")
617 # Traffic should still pass
618 self.run_verify_test(self.ICMP, self.IPV4,
619 self.proto[self.ICMP][self.ICMPv4])
621 self.logger.info("ACLP_TEST_FINISH_0004")
623 def test_0005_vpp624_permit_icmpv6(self):
624 """ VPP_624 permit ICMPv6
626 self.logger.info("ACLP_TEST_START_0005")
630 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
631 self.proto[self.ICMP][self.ICMPv6]))
632 # deny ip any any in the end
633 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
636 self.apply_rules(rules, "permit icmpv6")
638 # Traffic should still pass
639 self.run_verify_test(self.ICMP, self.IPV6,
640 self.proto[self.ICMP][self.ICMPv6])
642 self.logger.info("ACLP_TEST_FINISH_0005")
644 def test_0006_vpp624_deny_icmpv4(self):
645 """ VPP_624 deny ICMPv4
647 self.logger.info("ACLP_TEST_START_0006")
650 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
651 self.proto[self.ICMP][self.ICMPv4]))
652 # permit ip any any in the end
653 rules.append(self.create_rule(self.IPV4, self.PERMIT,
657 self.apply_rules(rules, "deny icmpv4")
659 # Traffic should not pass
660 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
662 self.logger.info("ACLP_TEST_FINISH_0006")
664 def test_0007_vpp624_deny_icmpv6(self):
665 """ VPP_624 deny ICMPv6
667 self.logger.info("ACLP_TEST_START_0007")
670 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
671 self.proto[self.ICMP][self.ICMPv6]))
672 # permit ip any any in the end
673 rules.append(self.create_rule(self.IPV6, self.PERMIT,
677 self.apply_rules(rules, "deny icmpv6")
679 # Traffic should not pass
680 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
682 self.logger.info("ACLP_TEST_FINISH_0007")
684 def test_0008_tcp_permit_v4(self):
687 self.logger.info("ACLP_TEST_START_0008")
691 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
692 self.proto[self.IP][self.TCP]))
693 # deny ip any any in the end
694 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
697 self.apply_rules(rules, "permit ipv4 tcp")
699 # Traffic should still pass
700 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
702 self.logger.info("ACLP_TEST_FINISH_0008")
704 def test_0009_tcp_permit_v6(self):
707 self.logger.info("ACLP_TEST_START_0009")
711 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
712 self.proto[self.IP][self.TCP]))
713 # deny ip any any in the end
714 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
717 self.apply_rules(rules, "permit ip6 tcp")
719 # Traffic should still pass
720 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
722 self.logger.info("ACLP_TEST_FINISH_0008")
724 def test_0010_udp_permit_v4(self):
727 self.logger.info("ACLP_TEST_START_0010")
731 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
732 self.proto[self.IP][self.UDP]))
733 # deny ip any any in the end
734 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
737 self.apply_rules(rules, "permit ipv udp")
739 # Traffic should still pass
740 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
742 self.logger.info("ACLP_TEST_FINISH_0010")
744 def test_0011_udp_permit_v6(self):
747 self.logger.info("ACLP_TEST_START_0011")
751 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
752 self.proto[self.IP][self.UDP]))
753 # deny ip any any in the end
754 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
757 self.apply_rules(rules, "permit ip6 udp")
759 # Traffic should still pass
760 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
762 self.logger.info("ACLP_TEST_FINISH_0011")
764 def test_0012_tcp_deny(self):
767 self.logger.info("ACLP_TEST_START_0012")
771 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
772 self.proto[self.IP][self.TCP]))
773 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
774 self.proto[self.IP][self.TCP]))
775 # permit ip any any in the end
776 rules.append(self.create_rule(self.IPV4, self.PERMIT,
778 rules.append(self.create_rule(self.IPV6, self.PERMIT,
782 self.apply_rules(rules, "deny ip4/ip6 tcp")
784 # Traffic should not pass
785 self.run_verify_negat_test(self.IP, self.IPRANDOM,
786 self.proto[self.IP][self.TCP])
788 self.logger.info("ACLP_TEST_FINISH_0012")
790 def test_0013_udp_deny(self):
793 self.logger.info("ACLP_TEST_START_0013")
797 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
798 self.proto[self.IP][self.UDP]))
799 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
800 self.proto[self.IP][self.UDP]))
801 # permit ip any any in the end
802 rules.append(self.create_rule(self.IPV4, self.PERMIT,
804 rules.append(self.create_rule(self.IPV6, self.PERMIT,
808 self.apply_rules(rules, "deny ip4/ip6 udp")
810 # Traffic should not pass
811 self.run_verify_negat_test(self.IP, self.IPRANDOM,
812 self.proto[self.IP][self.UDP])
814 self.logger.info("ACLP_TEST_FINISH_0013")
816 def test_0014_acl_dump(self):
817 """ verify add/dump acls
819 self.logger.info("ACLP_TEST_START_0014")
821 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
822 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
823 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
824 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
825 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
826 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
827 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
828 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
829 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
830 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
831 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
832 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
833 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
834 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
835 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
836 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
837 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
838 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
841 # Add and verify new ACLs
843 for i in range(len(r)):
844 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
846 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
848 result = self.api_acl_dump(reply.acl_index)
851 for drules in result:
853 self.assertEqual(dr.is_ipv6, r[i][0])
854 self.assertEqual(dr.is_permit, r[i][1])
855 self.assertEqual(dr.proto, r[i][3])
858 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
861 self.assertEqual(dr.srcport_or_icmptype_first, 0)
862 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
864 if dr.proto == self.proto[self.IP][self.TCP]:
865 self.assertGreater(dr.srcport_or_icmptype_first,
866 self.tcp_sport_from-1)
867 self.assertLess(dr.srcport_or_icmptype_first,
869 self.assertGreater(dr.dstport_or_icmpcode_last,
870 self.tcp_dport_from-1)
871 self.assertLess(dr.dstport_or_icmpcode_last,
873 elif dr.proto == self.proto[self.IP][self.UDP]:
874 self.assertGreater(dr.srcport_or_icmptype_first,
875 self.udp_sport_from-1)
876 self.assertLess(dr.srcport_or_icmptype_first,
878 self.assertGreater(dr.dstport_or_icmpcode_last,
879 self.udp_dport_from-1)
880 self.assertLess(dr.dstport_or_icmpcode_last,
884 self.logger.info("ACLP_TEST_FINISH_0014")
886 def test_0015_tcp_permit_port_v4(self):
887 """ permit single TCPv4
889 self.logger.info("ACLP_TEST_START_0015")
891 port = random.randint(0, 65535)
894 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
895 self.proto[self.IP][self.TCP]))
896 # deny ip any any in the end
897 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
900 self.apply_rules(rules, "permit ip4 tcp "+str(port))
902 # Traffic should still pass
903 self.run_verify_test(self.IP, self.IPV4,
904 self.proto[self.IP][self.TCP], port)
906 self.logger.info("ACLP_TEST_FINISH_0015")
908 def test_0016_udp_permit_port_v4(self):
909 """ permit single UDPv4
911 self.logger.info("ACLP_TEST_START_0016")
913 port = random.randint(0, 65535)
916 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
917 self.proto[self.IP][self.UDP]))
918 # deny ip any any in the end
919 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
922 self.apply_rules(rules, "permit ip4 tcp "+str(port))
924 # Traffic should still pass
925 self.run_verify_test(self.IP, self.IPV4,
926 self.proto[self.IP][self.UDP], port)
928 self.logger.info("ACLP_TEST_FINISH_0016")
930 def test_0017_tcp_permit_port_v6(self):
931 """ permit single TCPv6
933 self.logger.info("ACLP_TEST_START_0017")
935 port = random.randint(0, 65535)
938 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
939 self.proto[self.IP][self.TCP]))
940 # deny ip any any in the end
941 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
944 self.apply_rules(rules, "permit ip4 tcp "+str(port))
946 # Traffic should still pass
947 self.run_verify_test(self.IP, self.IPV6,
948 self.proto[self.IP][self.TCP], port)
950 self.logger.info("ACLP_TEST_FINISH_0017")
952 def test_0018_udp_permit_port_v6(self):
953 """ permit single UDPv6
955 self.logger.info("ACLP_TEST_START_0018")
957 port = random.randint(0, 65535)
960 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
961 self.proto[self.IP][self.UDP]))
962 # deny ip any any in the end
963 rules.append(self.create_rule(self.IPV6, self.DENY,
967 self.apply_rules(rules, "permit ip4 tcp "+str(port))
969 # Traffic should still pass
970 self.run_verify_test(self.IP, self.IPV6,
971 self.proto[self.IP][self.UDP], port)
973 self.logger.info("ACLP_TEST_FINISH_0018")
975 def test_0019_udp_deny_port(self):
976 """ deny single TCPv4/v6
978 self.logger.info("ACLP_TEST_START_0019")
980 port = random.randint(0, 65535)
983 rules.append(self.create_rule(self.IPV4, self.DENY, port,
984 self.proto[self.IP][self.TCP]))
985 rules.append(self.create_rule(self.IPV6, self.DENY, port,
986 self.proto[self.IP][self.TCP]))
987 # Permit ip any any in the end
988 rules.append(self.create_rule(self.IPV4, self.PERMIT,
990 rules.append(self.create_rule(self.IPV6, self.PERMIT,
994 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
996 # Traffic should not pass
997 self.run_verify_negat_test(self.IP, self.IPRANDOM,
998 self.proto[self.IP][self.TCP], port)
1000 self.logger.info("ACLP_TEST_FINISH_0019")
1002 def test_0020_udp_deny_port(self):
1003 """ deny single UDPv4/v6
1005 self.logger.info("ACLP_TEST_START_0020")
1007 port = random.randint(0, 65535)
1010 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1011 self.proto[self.IP][self.UDP]))
1012 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1013 self.proto[self.IP][self.UDP]))
1014 # Permit ip any any in the end
1015 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1017 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1021 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1023 # Traffic should not pass
1024 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1025 self.proto[self.IP][self.UDP], port)
1027 self.logger.info("ACLP_TEST_FINISH_0020")
1029 def test_0021_udp_deny_port_verify_fragment_deny(self):
1030 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1032 self.logger.info("ACLP_TEST_START_0021")
1034 port = random.randint(0, 65535)
1037 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1038 self.proto[self.IP][self.UDP]))
1039 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1040 self.proto[self.IP][self.UDP]))
1041 # permit ip any any in the end
1042 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1044 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1048 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1050 # Traffic should not pass
1051 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1052 self.proto[self.IP][self.UDP], port, True)
1054 self.logger.info("ACLP_TEST_FINISH_0021")
1056 def test_0022_zero_length_udp_ipv4(self):
1057 """ VPP-687 zero length udp ipv4 packet"""
1058 self.logger.info("ACLP_TEST_START_0022")
1060 port = random.randint(0, 65535)
1063 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1064 self.proto[self.IP][self.UDP]))
1065 # deny ip any any in the end
1067 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1070 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1072 # Traffic should still pass
1073 # Create incoming packet streams for packet-generator interfaces
1075 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1077 self.proto[self.IP][self.UDP], port,
1080 self.pg0.add_stream(pkts)
1081 pkts_cnt += len(pkts)
1083 # Enable packet capture and start packet sending
1084 self.pg_enable_capture(self.pg_interfaces)
1087 self.pg1.get_capture(pkts_cnt)
1089 self.logger.info("ACLP_TEST_FINISH_0022")
1091 def test_0023_zero_length_udp_ipv6(self):
1092 """ VPP-687 zero length udp ipv6 packet"""
1093 self.logger.info("ACLP_TEST_START_0023")
1095 port = random.randint(0, 65535)
1098 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1099 self.proto[self.IP][self.UDP]))
1100 # deny ip any any in the end
1101 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1104 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1106 # Traffic should still pass
1107 # Create incoming packet streams for packet-generator interfaces
1109 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1111 self.proto[self.IP][self.UDP], port,
1114 self.pg0.add_stream(pkts)
1115 pkts_cnt += len(pkts)
1117 # Enable packet capture and start packet sending
1118 self.pg_enable_capture(self.pg_interfaces)
1121 # Verify outgoing packet streams per packet-generator interface
1122 self.pg1.get_capture(pkts_cnt)
1124 self.logger.info("ACLP_TEST_FINISH_0023")
1126 if __name__ == '__main__':
1127 unittest.main(testRunner=VppTestRunner)