2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from framework import VppTestCase, VppTestRunner
13 from util import Host, ppp
16 class TestACLplugin(VppTestCase):
17 """ ACL plugin Test Case """
33 proto = [[6, 17], [1, 58]]
34 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
45 udp_sport_to = udp_sport_from + 5
46 udp_dport_from = 20000
47 udp_dport_to = udp_dport_from + 5000
49 tcp_sport_to = tcp_sport_from + 5
50 tcp_dport_from = 40000
51 tcp_dport_to = tcp_dport_from + 5000
53 icmp4_type = 8 # echo request
55 icmp6_type = 128 # echo request
64 Perform standard class setup (defined by class method setUpClass in
65 class VppTestCase) before running the test case, set test case related
66 variables and configure VPP.
68 super(TestACLplugin, cls).setUpClass()
73 # Create 2 pg interfaces
74 cls.create_pg_interfaces(range(2))
76 # Packet flows mapping pg0 -> pg1, pg2 etc.
78 cls.flows[cls.pg0] = [cls.pg1]
81 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
83 # Create BD with MAC learning and unknown unicast flooding disabled
84 # and put interfaces to this BD
85 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
87 for pg_if in cls.pg_interfaces:
88 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
91 # Set up all interfaces
92 for i in cls.pg_interfaces:
95 # Mapping between packet-generator index and lists of test hosts
96 cls.hosts_by_pg_idx = dict()
97 for pg_if in cls.pg_interfaces:
98 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
100 # Create list of deleted hosts
101 cls.deleted_hosts_by_pg_idx = dict()
102 for pg_if in cls.pg_interfaces:
103 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
105 # warm-up the mac address tables
109 super(TestACLplugin, cls).tearDownClass()
113 super(TestACLplugin, self).setUp()
114 self.reset_packet_infos()
118 Show various debug prints after each test.
120 super(TestACLplugin, self).tearDown()
121 if not self.vpp_dead:
122 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
123 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
126 def create_hosts(self, count, start=0):
128 Create required number of host MAC addresses and distribute them among
129 interfaces. Create host IPv4 address for every host MAC address.
131 :param int count: Number of hosts to create MAC/IPv4 addresses for.
132 :param int start: Number to start numbering from.
134 n_int = len(self.pg_interfaces)
135 macs_per_if = count / n_int
137 for pg_if in self.pg_interfaces:
139 start_nr = macs_per_if * i + start
140 end_nr = count + start if i == (n_int - 1) \
141 else macs_per_if * (i + 1) + start
142 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
143 for j in range(start_nr, end_nr):
145 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
146 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
147 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
150 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
151 s_prefix=0, s_ip='\x00\x00\x00\x00',
152 d_prefix=0, d_ip='\x00\x00\x00\x00'):
155 if ports == self.PORTS_ALL:
158 sport_to = 65535 if proto != 1 and proto != 58 else 255
160 elif ports == self.PORTS_RANGE:
162 sport_from = self.icmp4_type
163 sport_to = self.icmp4_type
164 dport_from = self.icmp4_code
165 dport_to = self.icmp4_code
167 sport_from = self.icmp6_type
168 sport_to = self.icmp6_type
169 dport_from = self.icmp6_code
170 dport_to = self.icmp6_code
171 elif proto == self.proto[self.IP][self.TCP]:
172 sport_from = self.tcp_sport_from
173 sport_to = self.tcp_sport_to
174 dport_from = self.tcp_dport_from
175 dport_to = self.tcp_dport_to
176 elif proto == self.proto[self.IP][self.UDP]:
177 sport_from = self.udp_sport_from
178 sport_to = self.udp_sport_to
179 dport_from = self.udp_dport_from
180 dport_to = self.udp_dport_to
187 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
188 'srcport_or_icmptype_first': sport_from,
189 'srcport_or_icmptype_last': sport_to,
190 'src_ip_prefix_len': s_prefix,
192 'dstport_or_icmpcode_first': dport_from,
193 'dstport_or_icmpcode_last': dport_to,
194 'dst_ip_prefix_len': d_prefix,
195 'dst_ip_addr': d_ip})
198 def apply_rules(self, rules, tag=''):
199 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
202 self.logger.info("Dumped ACL: " + str(
203 self.api_acl_dump(reply.acl_index)))
204 # Apply a ACL on the interface as inbound
205 for i in self.pg_interfaces:
206 self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
208 acls=[reply.acl_index])
211 def create_upper_layer(self, packet_index, proto, ports=0):
212 p = self.proto_map[proto]
215 return UDP(sport=random.randint(self.udp_sport_from,
217 dport=random.randint(self.udp_dport_from,
220 return UDP(sport=ports, dport=ports)
223 return TCP(sport=random.randint(self.tcp_sport_from,
225 dport=random.randint(self.tcp_dport_from,
228 return TCP(sport=ports, dport=ports)
231 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
234 Create input packet stream for defined interface using hosts or
237 :param object src_if: Interface to create packet stream for.
238 :param list packet_sizes: List of required packet sizes.
239 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
240 :return: Stream of packets.
243 if self.flows.__contains__(src_if):
244 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
245 for dst_if in self.flows[src_if]:
246 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
247 n_int = len(dst_hosts) * len(src_hosts)
248 for i in range(0, n_int):
249 dst_host = dst_hosts[i / len(src_hosts)]
250 src_host = src_hosts[i % len(src_hosts)]
251 pkt_info = self.create_packet_info(src_if, dst_if)
257 pkt_info.ip = random.choice([0, 1])
259 pkt_info.proto = random.choice(self.proto[self.IP])
261 pkt_info.proto = proto
262 payload = self.info_to_payload(pkt_info)
263 p = Ether(dst=dst_host.mac, src=src_host.mac)
265 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
267 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
268 if traffic_type == self.ICMP:
270 p /= ICMPv6EchoRequest(type=self.icmp6_type,
271 code=self.icmp6_code)
273 p /= ICMP(type=self.icmp4_type,
274 code=self.icmp4_code)
276 p /= self.create_upper_layer(i, pkt_info.proto, ports)
278 pkt_info.data = p.copy()
279 size = random.choice(packet_sizes)
280 self.extend_packet(p, size)
284 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
286 Verify captured input packet stream for defined interface.
288 :param object pg_if: Interface to verify captured packet stream for.
289 :param list capture: Captured packet stream.
290 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
293 for i in self.pg_interfaces:
294 last_info[i.sw_if_index] = None
295 dst_sw_if_index = pg_if.sw_if_index
296 for packet in capture:
298 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
299 if traffic_type == self.ICMP and ip_type == self.IPV6:
300 payload_info = self.payload_to_info(
301 packet[ICMPv6EchoRequest].data)
302 payload = packet[ICMPv6EchoRequest]
304 payload_info = self.payload_to_info(str(packet[Raw]))
305 payload = packet[self.proto_map[payload_info.proto]]
307 self.logger.error(ppp("Unexpected or invalid packet "
308 "(outside network):", packet))
312 self.assertEqual(payload_info.ip, ip_type)
313 if traffic_type == self.ICMP:
315 if payload_info.ip == 0:
316 self.assertEqual(payload.type, self.icmp4_type)
317 self.assertEqual(payload.code, self.icmp4_code)
319 self.assertEqual(payload.type, self.icmp6_type)
320 self.assertEqual(payload.code, self.icmp6_code)
322 self.logger.error(ppp("Unexpected or invalid packet "
323 "(outside network):", packet))
327 ip_version = IPv6 if payload_info.ip == 1 else IP
329 ip = packet[ip_version]
330 packet_index = payload_info.index
332 self.assertEqual(payload_info.dst, dst_sw_if_index)
333 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
334 (pg_if.name, payload_info.src,
336 next_info = self.get_next_packet_info_for_interface2(
337 payload_info.src, dst_sw_if_index,
338 last_info[payload_info.src])
339 last_info[payload_info.src] = next_info
340 self.assertTrue(next_info is not None)
341 self.assertEqual(packet_index, next_info.index)
342 saved_packet = next_info.data
343 # Check standard fields
344 self.assertEqual(ip.src, saved_packet[ip_version].src)
345 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
346 p = self.proto_map[payload_info.proto]
349 self.assertEqual(tcp.sport, saved_packet[
351 self.assertEqual(tcp.dport, saved_packet[
355 self.assertEqual(udp.sport, saved_packet[
357 self.assertEqual(udp.dport, saved_packet[
360 self.logger.error(ppp("Unexpected or invalid packet:",
363 for i in self.pg_interfaces:
364 remaining_packet = self.get_next_packet_info_for_interface2(
365 i, dst_sw_if_index, last_info[i.sw_if_index])
367 remaining_packet is None,
368 "Port %u: Packet expected from source %u didn't arrive" %
369 (dst_sw_if_index, i.sw_if_index))
371 def run_traffic_no_check(self):
373 # Create incoming packet streams for packet-generator interfaces
374 for i in self.pg_interfaces:
375 if self.flows.__contains__(i):
376 pkts = self.create_stream(i, self.pg_if_packet_sizes)
380 # Enable packet capture and start packet sending
381 self.pg_enable_capture(self.pg_interfaces)
384 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0):
386 # Create incoming packet streams for packet-generator interfaces
388 for i in self.pg_interfaces:
389 if self.flows.__contains__(i):
390 pkts = self.create_stream(i, self.pg_if_packet_sizes,
391 traffic_type, ip_type, proto, ports)
394 pkts_cnt += len(pkts)
396 # Enable packet capture and start packet sendingself.IPV
397 self.pg_enable_capture(self.pg_interfaces)
401 # Verify outgoing packet streams per packet-generator interface
402 for src_if in self.pg_interfaces:
403 if self.flows.__contains__(src_if):
404 for dst_if in self.flows[src_if]:
405 capture = dst_if.get_capture(pkts_cnt)
406 self.logger.info("Verifying capture on interface %s" %
408 self.verify_capture(dst_if, capture, traffic_type, ip_type)
410 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
413 self.reset_packet_infos()
414 for i in self.pg_interfaces:
415 if self.flows.__contains__(i):
416 pkts = self.create_stream(i, self.pg_if_packet_sizes,
417 traffic_type, ip_type, proto, ports)
421 # Enable packet capture and start packet sending
422 self.pg_enable_capture(self.pg_interfaces)
426 # Verify outgoing packet streams per packet-generator interface
427 for src_if in self.pg_interfaces:
428 if self.flows.__contains__(src_if):
429 for dst_if in self.flows[src_if]:
430 self.logger.info("Verifying capture on interface %s" %
432 capture = dst_if.get_capture(0)
433 self.assertEqual(len(capture), 0)
435 def api_acl_add_replace(self, acl_index, r, count, tag='',
437 """Add/replace an ACL
439 :param int acl_index: ACL index to replace,
440 4294967295 to create new ACL.
441 :param acl_rule r: ACL rules array.
442 :param str tag: symbolic tag (description) for this ACL.
443 :param int count: number of rules.
445 return self.vapi.api(self.vapi.papi.acl_add_replace,
446 {'acl_index': acl_index,
450 expected_retval=expected_retval)
452 def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
454 return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
455 {'sw_if_index': sw_if_index,
459 expected_retval=expected_retval)
461 def api_acl_dump(self, acl_index, expected_retval=0):
462 return self.vapi.api(self.vapi.papi.acl_dump,
463 {'acl_index': acl_index},
464 expected_retval=expected_retval)
466 def test_0000_warmup_test(self):
467 """ ACL plugin version check; learn MACs
469 self.create_hosts(16)
470 self.run_traffic_no_check()
471 reply = self.vapi.papi.acl_plugin_get_version()
472 self.assertEqual(reply.major, 1)
473 self.logger.info("Working with ACL plugin version: %d.%d" % (
474 reply.major, reply.minor))
475 # minor version changes are non breaking
476 # self.assertEqual(reply.minor, 0)
478 def test_0001_acl_create(self):
482 self.logger.info("ACLP_TEST_START_0001")
484 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
485 'srcport_or_icmptype_first': 1234,
486 'srcport_or_icmptype_last': 1235,
487 'src_ip_prefix_len': 0,
488 'src_ip_addr': '\x00\x00\x00\x00',
489 'dstport_or_icmpcode_first': 1234,
490 'dstport_or_icmpcode_last': 1234,
491 'dst_ip_addr': '\x00\x00\x00\x00',
492 'dst_ip_prefix_len': 0}]
493 # Test 1: add a new ACL
494 reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
495 count=len(r), tag="permit 1234")
496 self.assertEqual(reply.retval, 0)
497 # The very first ACL gets #0
498 self.assertEqual(reply.acl_index, 0)
499 rr = self.api_acl_dump(reply.acl_index)
500 self.logger.info("Dumped ACL: " + str(rr))
501 self.assertEqual(len(rr), 1)
502 # We should have the same number of ACL entries as we had asked
503 self.assertEqual(len(rr[0].r), len(r))
504 # The rules should be the same. But because the submitted and returned
505 # are different types, we need to iterate over rules and keys to get
507 for i_rule in range(0, len(r) - 1):
508 for rule_key in r[i_rule]:
509 self.assertEqual(rr[0].r[i_rule][rule_key],
512 # Add a deny-1234 ACL
513 r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
514 'srcport_or_icmptype_first': 1234,
515 'srcport_or_icmptype_last': 1235,
516 'src_ip_prefix_len': 0,
517 'src_ip_addr': '\x00\x00\x00\x00',
518 'dstport_or_icmpcode_first': 1234,
519 'dstport_or_icmpcode_last': 1234,
520 'dst_ip_addr': '\x00\x00\x00\x00',
521 'dst_ip_prefix_len': 0},
522 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
523 'srcport_or_icmptype_first': 0,
524 'srcport_or_icmptype_last': 0,
525 'src_ip_prefix_len': 0,
526 'src_ip_addr': '\x00\x00\x00\x00',
527 'dstport_or_icmpcode_first': 0,
528 'dstport_or_icmpcode_last': 0,
529 'dst_ip_addr': '\x00\x00\x00\x00',
530 'dst_ip_prefix_len': 0})
532 reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
534 tag="deny 1234;permit all")
535 self.assertEqual(reply.retval, 0)
536 # The second ACL gets #1
537 self.assertEqual(reply.acl_index, 1)
539 # Test 2: try to modify a nonexistent ACL
540 reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
541 tag="FFFF:FFFF", expected_retval=-1)
542 self.assertEqual(reply.retval, -1)
543 # The ACL number should pass through
544 self.assertEqual(reply.acl_index, 432)
546 self.logger.info("ACLP_TEST_FINISH_0001")
548 def test_0002_acl_permit_apply(self):
549 """ permit ACL apply test
551 self.logger.info("ACLP_TEST_START_0002")
554 rules.append(self.create_rule(self.IPV4, self.PERMIT,
555 0, self.proto[self.IP][self.UDP]))
556 rules.append(self.create_rule(self.IPV4, self.PERMIT,
557 0, self.proto[self.IP][self.TCP]))
560 self.apply_rules(rules, "permit per-flow")
562 # Traffic should still pass
563 self.run_verify_test(self.IP, self.IPV4, -1)
564 self.logger.info("ACLP_TEST_FINISH_0002")
566 def test_0003_acl_deny_apply(self):
567 """ deny ACL apply test
569 self.logger.info("ACLP_TEST_START_0003")
570 # Add a deny-flows ACL
572 rules.append(self.create_rule(self.IPV4, self.DENY,
573 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
574 # Permit ip any any in the end
575 rules.append(self.create_rule(self.IPV4, self.PERMIT,
579 self.apply_rules(rules, "deny per-flow;permit all")
581 # Traffic should not pass
582 self.run_verify_negat_test(self.IP, self.IPV4,
583 self.proto[self.IP][self.UDP])
584 self.logger.info("ACLP_TEST_FINISH_0003")
585 # self.assertEqual(1, 0)
587 def test_0004_vpp624_permit_icmpv4(self):
588 """ VPP_624 permit ICMPv4
590 self.logger.info("ACLP_TEST_START_0004")
594 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
595 self.proto[self.ICMP][self.ICMPv4]))
596 # deny ip any any in the end
597 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
600 self.apply_rules(rules, "permit icmpv4")
602 # Traffic should still pass
603 self.run_verify_test(self.ICMP, self.IPV4,
604 self.proto[self.ICMP][self.ICMPv4])
606 self.logger.info("ACLP_TEST_FINISH_0004")
608 def test_0005_vpp624_permit_icmpv6(self):
609 """ VPP_624 permit ICMPv6
611 self.logger.info("ACLP_TEST_START_0005")
615 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
616 self.proto[self.ICMP][self.ICMPv6]))
617 # deny ip any any in the end
618 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
621 self.apply_rules(rules, "permit icmpv6")
623 # Traffic should still pass
624 self.run_verify_test(self.ICMP, self.IPV6,
625 self.proto[self.ICMP][self.ICMPv6])
627 self.logger.info("ACLP_TEST_FINISH_0005")
629 def test_0006_vpp624_deny_icmpv4(self):
630 """ VPP_624 deny ICMPv4
632 self.logger.info("ACLP_TEST_START_0006")
635 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
636 self.proto[self.ICMP][self.ICMPv4]))
637 # permit ip any any in the end
638 rules.append(self.create_rule(self.IPV4, self.PERMIT,
642 self.apply_rules(rules, "deny icmpv4")
644 # Traffic should not pass
645 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
647 self.logger.info("ACLP_TEST_FINISH_0006")
649 def test_0007_vpp624_deny_icmpv6(self):
650 """ VPP_624 deny ICMPv6
652 self.logger.info("ACLP_TEST_START_0007")
655 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
656 self.proto[self.ICMP][self.ICMPv6]))
657 # deny ip any any in the end
658 rules.append(self.create_rule(self.IPV6, self.PERMIT,
662 self.apply_rules(rules, "deny icmpv6")
664 # Traffic should not pass
665 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
667 self.logger.info("ACLP_TEST_FINISH_0007")
669 def test_0008_tcp_permit_v4(self):
672 self.logger.info("ACLP_TEST_START_0008")
676 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
677 self.proto[self.IP][self.TCP]))
678 # deny ip any any in the end
679 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
682 self.apply_rules(rules, "permit ipv4 tcp")
684 # Traffic should still pass
685 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
687 self.logger.info("ACLP_TEST_FINISH_0008")
689 def test_0009_tcp_permit_v6(self):
692 self.logger.info("ACLP_TEST_START_0009")
696 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
697 self.proto[self.IP][self.TCP]))
698 # deny ip any any in the end
699 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
702 self.apply_rules(rules, "permit ip6 tcp")
704 # Traffic should still pass
705 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
707 self.logger.info("ACLP_TEST_FINISH_0008")
709 def test_0010_udp_permit_v4(self):
712 self.logger.info("ACLP_TEST_START_0010")
716 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
717 self.proto[self.IP][self.UDP]))
718 # deny ip any any in the end
719 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
722 self.apply_rules(rules, "permit ipv udp")
724 # Traffic should still pass
725 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
727 self.logger.info("ACLP_TEST_FINISH_0010")
729 def test_0011_udp_permit_v6(self):
732 self.logger.info("ACLP_TEST_START_0011")
736 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
737 self.proto[self.IP][self.UDP]))
738 # deny ip any any in the end
739 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
742 self.apply_rules(rules, "permit ip6 udp")
744 # Traffic should still pass
745 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
747 self.logger.info("ACLP_TEST_FINISH_0011")
749 def test_0012_tcp_deny(self):
752 self.logger.info("ACLP_TEST_START_0012")
756 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
757 self.proto[self.IP][self.TCP]))
758 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
759 self.proto[self.IP][self.TCP]))
760 # permit ip any any in the end
761 rules.append(self.create_rule(self.IPV4, self.PERMIT,
763 rules.append(self.create_rule(self.IPV6, self.PERMIT,
767 self.apply_rules(rules, "deny ip4/ip6 tcp")
769 # Traffic should not pass
770 self.run_verify_negat_test(self.IP, self.IPRANDOM,
771 self.proto[self.IP][self.TCP])
773 self.logger.info("ACLP_TEST_FINISH_0012")
775 def test_0013_udp_deny(self):
778 self.logger.info("ACLP_TEST_START_0013")
782 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
783 self.proto[self.IP][self.UDP]))
784 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
785 self.proto[self.IP][self.UDP]))
786 # permit ip any any in the end
787 rules.append(self.create_rule(self.IPV4, self.PERMIT,
789 rules.append(self.create_rule(self.IPV6, self.PERMIT,
793 self.apply_rules(rules, "deny ip4/ip6 udp")
795 # Traffic should not pass
796 self.run_verify_negat_test(self.IP, self.IPRANDOM,
797 self.proto[self.IP][self.UDP])
799 self.logger.info("ACLP_TEST_FINISH_0013")
801 def test_0014_acl_dump(self):
802 """ verify add/dump acls
804 self.logger.info("ACLP_TEST_START_0014")
806 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
807 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
808 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
809 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
810 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
811 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
812 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
813 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
814 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
815 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
816 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
817 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
818 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
819 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
820 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
821 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
822 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
823 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
826 # Add and verify new ACLs
828 for i in range(len(r)):
829 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
831 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
833 result = self.api_acl_dump(reply.acl_index)
836 for drules in result:
838 self.assertEqual(dr.is_ipv6, r[i][0])
839 self.assertEqual(dr.is_permit, r[i][1])
840 self.assertEqual(dr.proto, r[i][3])
843 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
846 self.assertEqual(dr.srcport_or_icmptype_first, 0)
847 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
849 if dr.proto == self.proto[self.IP][self.TCP]:
850 self.assertGreater(dr.srcport_or_icmptype_first,
851 self.tcp_sport_from-1)
852 self.assertLess(dr.srcport_or_icmptype_first,
854 self.assertGreater(dr.dstport_or_icmpcode_last,
855 self.tcp_dport_from-1)
856 self.assertLess(dr.dstport_or_icmpcode_last,
858 elif dr.proto == self.proto[self.IP][self.UDP]:
859 self.assertGreater(dr.srcport_or_icmptype_first,
860 self.udp_sport_from-1)
861 self.assertLess(dr.srcport_or_icmptype_first,
863 self.assertGreater(dr.dstport_or_icmpcode_last,
864 self.udp_dport_from-1)
865 self.assertLess(dr.dstport_or_icmpcode_last,
869 self.logger.info("ACLP_TEST_FINISH_0014")
871 def test_0015_tcp_permit_port_v4(self):
872 """ permit single TCPv4
874 self.logger.info("ACLP_TEST_START_0015")
876 port = random.randint(0, 65535)
879 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
880 self.proto[self.IP][self.TCP]))
881 # deny ip any any in the end
882 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
885 self.apply_rules(rules, "permit ip4 tcp "+str(port))
887 # Traffic should still pass
888 self.run_verify_test(self.IP, self.IPV4,
889 self.proto[self.IP][self.TCP], port)
891 self.logger.info("ACLP_TEST_FINISH_0015")
893 def test_0016_udp_permit_port_v4(self):
894 """ permit single UDPv4
896 self.logger.info("ACLP_TEST_START_0016")
898 port = random.randint(0, 65535)
901 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
902 self.proto[self.IP][self.UDP]))
903 # deny ip any any in the end
904 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
907 self.apply_rules(rules, "permit ip4 tcp "+str(port))
909 # Traffic should still pass
910 self.run_verify_test(self.IP, self.IPV4,
911 self.proto[self.IP][self.UDP], port)
913 self.logger.info("ACLP_TEST_FINISH_0016")
915 def test_0017_tcp_permit_port_v6(self):
916 """ permit single TCPv6
918 self.logger.info("ACLP_TEST_START_0017")
920 port = random.randint(0, 65535)
923 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
924 self.proto[self.IP][self.TCP]))
925 # deny ip any any in the end
926 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
929 self.apply_rules(rules, "permit ip4 tcp "+str(port))
931 # Traffic should still pass
932 self.run_verify_test(self.IP, self.IPV6,
933 self.proto[self.IP][self.TCP], port)
935 self.logger.info("ACLP_TEST_FINISH_0017")
937 def test_0018_udp_permit_port_v6(self):
938 """ permit single UPPv6
940 self.logger.info("ACLP_TEST_START_0018")
942 port = random.randint(0, 65535)
945 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
946 self.proto[self.IP][self.UDP]))
947 # deny ip any any in the end
948 rules.append(self.create_rule(self.IPV6, self.DENY,
952 self.apply_rules(rules, "permit ip4 tcp "+str(port))
954 # Traffic should still pass
955 self.run_verify_test(self.IP, self.IPV6,
956 self.proto[self.IP][self.UDP], port)
958 self.logger.info("ACLP_TEST_FINISH_0018")
960 def test_0019_udp_deny_port(self):
961 """ deny single TCPv4/v6
963 self.logger.info("ACLP_TEST_START_0019")
965 port = random.randint(0, 65535)
968 rules.append(self.create_rule(self.IPV4, self.DENY, port,
969 self.proto[self.IP][self.TCP]))
970 rules.append(self.create_rule(self.IPV6, self.DENY, port,
971 self.proto[self.IP][self.TCP]))
972 # Permit ip any any in the end
973 rules.append(self.create_rule(self.IPV4, self.PERMIT,
975 rules.append(self.create_rule(self.IPV6, self.PERMIT,
979 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
981 # Traffic should not pass
982 self.run_verify_negat_test(self.IP, self.IPRANDOM,
983 self.proto[self.IP][self.TCP], port)
985 self.logger.info("ACLP_TEST_FINISH_0019")
987 def test_0020_udp_deny_port(self):
988 """ deny single UDPv4/v6
990 self.logger.info("ACLP_TEST_START_0020")
992 port = random.randint(0, 65535)
995 rules.append(self.create_rule(self.IPV4, self.DENY, port,
996 self.proto[self.IP][self.UDP]))
997 rules.append(self.create_rule(self.IPV6, self.DENY, port,
998 self.proto[self.IP][self.UDP]))
999 # Permit ip any any in the end
1000 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1002 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1006 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1008 # Traffic should not pass
1009 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1010 self.proto[self.IP][self.UDP], port)
1012 self.logger.info("ACLP_TEST_FINISH_0020")
1014 if __name__ == '__main__':
1015 unittest.main(testRunner=VppTestRunner)