2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
17 class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
46 udp_sport_to = udp_sport_from + 5
47 udp_dport_from = 20000
48 udp_dport_to = udp_dport_from + 5000
50 tcp_sport_to = tcp_sport_from + 5
51 tcp_dport_from = 40000
52 tcp_dport_to = tcp_dport_from + 5000
54 icmp4_type = 8 # echo request
56 icmp6_type = 128 # echo request
65 Perform standard class setup (defined by class method setUpClass in
66 class VppTestCase) before running the test case, set test case related
67 variables and configure VPP.
69 super(TestACLplugin, cls).setUpClass()
74 # Create 2 pg interfaces
75 cls.create_pg_interfaces(range(2))
77 # Packet flows mapping pg0 -> pg1, pg2 etc.
79 cls.flows[cls.pg0] = [cls.pg1]
82 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
84 # Create BD with MAC learning and unknown unicast flooding enabled
85 # and put interfaces to this BD
86 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
88 for pg_if in cls.pg_interfaces:
89 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
92 # Set up all interfaces
93 for i in cls.pg_interfaces:
96 # Mapping between packet-generator index and lists of test hosts
97 cls.hosts_by_pg_idx = dict()
98 for pg_if in cls.pg_interfaces:
99 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
101 # Create list of deleted hosts
102 cls.deleted_hosts_by_pg_idx = dict()
103 for pg_if in cls.pg_interfaces:
104 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
106 # warm-up the mac address tables
110 super(TestACLplugin, cls).tearDownClass()
114 super(TestACLplugin, self).setUp()
115 self.reset_packet_infos()
119 Show various debug prints after each test.
121 super(TestACLplugin, self).tearDown()
122 if not self.vpp_dead:
123 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
124 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
127 def create_hosts(self, count, start=0):
129 Create required number of host MAC addresses and distribute them among
130 interfaces. Create host IPv4 address for every host MAC address.
132 :param int count: Number of hosts to create MAC/IPv4 addresses for.
133 :param int start: Number to start numbering from.
135 n_int = len(self.pg_interfaces)
136 macs_per_if = count / n_int
138 for pg_if in self.pg_interfaces:
140 start_nr = macs_per_if * i + start
141 end_nr = count + start if i == (n_int - 1) \
142 else macs_per_if * (i + 1) + start
143 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
144 for j in range(start_nr, end_nr):
146 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
147 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
148 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
151 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
152 s_prefix=0, s_ip='\x00\x00\x00\x00',
153 d_prefix=0, d_ip='\x00\x00\x00\x00'):
156 if ports == self.PORTS_ALL:
159 sport_to = 65535 if proto != 1 and proto != 58 else 255
161 elif ports == self.PORTS_RANGE:
163 sport_from = self.icmp4_type
164 sport_to = self.icmp4_type
165 dport_from = self.icmp4_code
166 dport_to = self.icmp4_code
168 sport_from = self.icmp6_type
169 sport_to = self.icmp6_type
170 dport_from = self.icmp6_code
171 dport_to = self.icmp6_code
172 elif proto == self.proto[self.IP][self.TCP]:
173 sport_from = self.tcp_sport_from
174 sport_to = self.tcp_sport_to
175 dport_from = self.tcp_dport_from
176 dport_to = self.tcp_dport_to
177 elif proto == self.proto[self.IP][self.UDP]:
178 sport_from = self.udp_sport_from
179 sport_to = self.udp_sport_to
180 dport_from = self.udp_dport_from
181 dport_to = self.udp_dport_to
188 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
189 'srcport_or_icmptype_first': sport_from,
190 'srcport_or_icmptype_last': sport_to,
191 'src_ip_prefix_len': s_prefix,
193 'dstport_or_icmpcode_first': dport_from,
194 'dstport_or_icmpcode_last': dport_to,
195 'dst_ip_prefix_len': d_prefix,
196 'dst_ip_addr': d_ip})
199 def apply_rules(self, rules, tag=''):
200 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
203 self.logger.info("Dumped ACL: " + str(
204 self.api_acl_dump(reply.acl_index)))
205 # Apply the new ACL to every pg interface as inbound
206 for i in self.pg_interfaces:
207 self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
209 acls=[reply.acl_index])
212 def create_upper_layer(self, packet_index, proto, ports=0):
213 p = self.proto_map[proto]
216 return UDP(sport=random.randint(self.udp_sport_from,
218 dport=random.randint(self.udp_dport_from,
221 return UDP(sport=ports, dport=ports)
224 return TCP(sport=random.randint(self.tcp_sport_from,
226 dport=random.randint(self.tcp_dport_from,
229 return TCP(sport=ports, dport=ports)
232 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
233 proto=-1, ports=0, fragments=False, pkt_raw=True):
235 Create input packet stream for defined interface using hosts or
238 :param object src_if: Interface to create packet stream for.
239 :param list packet_sizes: List of required packet sizes.
240 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
241 :return: Stream of packets.
244 if self.flows.__contains__(src_if):
245 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
246 for dst_if in self.flows[src_if]:
247 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
248 n_int = len(dst_hosts) * len(src_hosts)
249 for i in range(0, n_int):
250 dst_host = dst_hosts[i / len(src_hosts)]
251 src_host = src_hosts[i % len(src_hosts)]
252 pkt_info = self.create_packet_info(src_if, dst_if)
258 pkt_info.ip = random.choice([0, 1])
260 pkt_info.proto = random.choice(self.proto[self.IP])
262 pkt_info.proto = proto
263 payload = self.info_to_payload(pkt_info)
264 p = Ether(dst=dst_host.mac, src=src_host.mac)
266 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
268 p /= IPv6ExtHdrFragment(offset=64, m=1)
271 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
274 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
275 if traffic_type == self.ICMP:
277 p /= ICMPv6EchoRequest(type=self.icmp6_type,
278 code=self.icmp6_code)
280 p /= ICMP(type=self.icmp4_type,
281 code=self.icmp4_code)
283 p /= self.create_upper_layer(i, pkt_info.proto, ports)
286 pkt_info.data = p.copy()
288 size = random.choice(packet_sizes)
289 self.extend_packet(p, size)
293 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
295 Verify captured input packet stream for defined interface.
297 :param object pg_if: Interface to verify captured packet stream for.
298 :param list capture: Captured packet stream.
299 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
302 for i in self.pg_interfaces:
303 last_info[i.sw_if_index] = None
304 dst_sw_if_index = pg_if.sw_if_index
305 for packet in capture:
307 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
308 if traffic_type == self.ICMP and ip_type == self.IPV6:
309 payload_info = self.payload_to_info(
310 packet[ICMPv6EchoRequest].data)
311 payload = packet[ICMPv6EchoRequest]
313 payload_info = self.payload_to_info(str(packet[Raw]))
314 payload = packet[self.proto_map[payload_info.proto]]
316 self.logger.error(ppp("Unexpected or invalid packet "
317 "(outside network):", packet))
321 self.assertEqual(payload_info.ip, ip_type)
322 if traffic_type == self.ICMP:
324 if payload_info.ip == 0:
325 self.assertEqual(payload.type, self.icmp4_type)
326 self.assertEqual(payload.code, self.icmp4_code)
328 self.assertEqual(payload.type, self.icmp6_type)
329 self.assertEqual(payload.code, self.icmp6_code)
331 self.logger.error(ppp("Unexpected or invalid packet "
332 "(outside network):", packet))
336 ip_version = IPv6 if payload_info.ip == 1 else IP
338 ip = packet[ip_version]
339 packet_index = payload_info.index
341 self.assertEqual(payload_info.dst, dst_sw_if_index)
342 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
343 (pg_if.name, payload_info.src,
345 next_info = self.get_next_packet_info_for_interface2(
346 payload_info.src, dst_sw_if_index,
347 last_info[payload_info.src])
348 last_info[payload_info.src] = next_info
349 self.assertTrue(next_info is not None)
350 self.assertEqual(packet_index, next_info.index)
351 saved_packet = next_info.data
352 # Check standard fields
353 self.assertEqual(ip.src, saved_packet[ip_version].src)
354 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
355 p = self.proto_map[payload_info.proto]
358 self.assertEqual(tcp.sport, saved_packet[
360 self.assertEqual(tcp.dport, saved_packet[
364 self.assertEqual(udp.sport, saved_packet[
366 self.assertEqual(udp.dport, saved_packet[
369 self.logger.error(ppp("Unexpected or invalid packet:",
372 for i in self.pg_interfaces:
373 remaining_packet = self.get_next_packet_info_for_interface2(
374 i, dst_sw_if_index, last_info[i.sw_if_index])
376 remaining_packet is None,
377 "Port %u: Packet expected from source %u didn't arrive" %
378 (dst_sw_if_index, i.sw_if_index))
380 def run_traffic_no_check(self):
382 # Create incoming packet streams for packet-generator interfaces
383 for i in self.pg_interfaces:
384 if self.flows.__contains__(i):
385 pkts = self.create_stream(i, self.pg_if_packet_sizes)
389 # Enable packet capture and start packet sending
390 self.pg_enable_capture(self.pg_interfaces)
393 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
394 frags=False, pkt_raw=True):
396 # Create incoming packet streams for packet-generator interfaces
398 for i in self.pg_interfaces:
399 if self.flows.__contains__(i):
400 pkts = self.create_stream(i, self.pg_if_packet_sizes,
401 traffic_type, ip_type, proto, ports,
405 pkts_cnt += len(pkts)
407 # Enable packet capture and start packet sending
408 self.pg_enable_capture(self.pg_interfaces)
412 # Verify outgoing packet streams per packet-generator interface
413 for src_if in self.pg_interfaces:
414 if self.flows.__contains__(src_if):
415 for dst_if in self.flows[src_if]:
416 capture = dst_if.get_capture(pkts_cnt)
417 self.logger.info("Verifying capture on interface %s" %
419 self.verify_capture(dst_if, capture, traffic_type, ip_type)
421 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
422 ports=0, frags=False):
424 self.reset_packet_infos()
425 for i in self.pg_interfaces:
426 if self.flows.__contains__(i):
427 pkts = self.create_stream(i, self.pg_if_packet_sizes,
428 traffic_type, ip_type, proto, ports,
433 # Enable packet capture and start packet sending
434 self.pg_enable_capture(self.pg_interfaces)
438 # Verify outgoing packet streams per packet-generator interface
439 for src_if in self.pg_interfaces:
440 if self.flows.__contains__(src_if):
441 for dst_if in self.flows[src_if]:
442 self.logger.info("Verifying capture on interface %s" %
444 capture = dst_if.get_capture(0)
445 self.assertEqual(len(capture), 0)
447 def api_acl_add_replace(self, acl_index, r, count, tag='',
449 """Add/replace an ACL
451 :param int acl_index: ACL index to replace,
452 4294967295 to create new ACL.
453 :param acl_rule r: ACL rules array.
454 :param str tag: symbolic tag (description) for this ACL.
455 :param int count: number of rules.
457 return self.vapi.api(self.vapi.papi.acl_add_replace,
458 {'acl_index': acl_index,
462 expected_retval=expected_retval)
464 def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
466 return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
467 {'sw_if_index': sw_if_index,
471 expected_retval=expected_retval)
# Dump an ACL through the VPP API.
#
# Calls self.vapi.api() with the papi ``acl_dump`` message and a request
# dict holding only ``acl_index``, and returns whatever that call returns.
#
# :param acl_index: index of the ACL to dump (sent as 'acl_index').
# :param expected_retval: forwarded unchanged to self.vapi.api() as its
#     ``expected_retval`` keyword (default 0).
473 def api_acl_dump(self, acl_index, expected_retval=0):
474 return self.vapi.api(self.vapi.papi.acl_dump,
475 {'acl_index': acl_index},
476 expected_retval=expected_retval)
478 def test_0000_warmup_test(self):
479 """ ACL plugin version check; learn MACs
481 self.create_hosts(16)
482 self.run_traffic_no_check()
483 reply = self.vapi.papi.acl_plugin_get_version()
484 self.assertEqual(reply.major, 1)
485 self.logger.info("Working with ACL plugin version: %d.%d" % (
486 reply.major, reply.minor))
487 # minor version changes are non breaking
488 # self.assertEqual(reply.minor, 0)
490 def test_0001_acl_create(self):
494 self.logger.info("ACLP_TEST_START_0001")
496 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
497 'srcport_or_icmptype_first': 1234,
498 'srcport_or_icmptype_last': 1235,
499 'src_ip_prefix_len': 0,
500 'src_ip_addr': '\x00\x00\x00\x00',
501 'dstport_or_icmpcode_first': 1234,
502 'dstport_or_icmpcode_last': 1234,
503 'dst_ip_addr': '\x00\x00\x00\x00',
504 'dst_ip_prefix_len': 0}]
505 # Test 1: add a new ACL
506 reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
507 count=len(r), tag="permit 1234")
508 self.assertEqual(reply.retval, 0)
509 # The very first ACL gets #0
510 self.assertEqual(reply.acl_index, 0)
511 rr = self.api_acl_dump(reply.acl_index)
512 self.logger.info("Dumped ACL: " + str(rr))
513 self.assertEqual(len(rr), 1)
514 # We should have the same number of ACL entries as we had asked
515 self.assertEqual(len(rr[0].r), len(r))
516 # The rules should be the same. But because the submitted and returned
517 # are different types, we need to iterate over rules and keys to get
519 for i_rule in range(0, len(r) - 1):
520 for rule_key in r[i_rule]:
521 self.assertEqual(rr[0].r[i_rule][rule_key],
524 # Add a deny-1234 ACL
525 r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
526 'srcport_or_icmptype_first': 1234,
527 'srcport_or_icmptype_last': 1235,
528 'src_ip_prefix_len': 0,
529 'src_ip_addr': '\x00\x00\x00\x00',
530 'dstport_or_icmpcode_first': 1234,
531 'dstport_or_icmpcode_last': 1234,
532 'dst_ip_addr': '\x00\x00\x00\x00',
533 'dst_ip_prefix_len': 0},
534 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
535 'srcport_or_icmptype_first': 0,
536 'srcport_or_icmptype_last': 0,
537 'src_ip_prefix_len': 0,
538 'src_ip_addr': '\x00\x00\x00\x00',
539 'dstport_or_icmpcode_first': 0,
540 'dstport_or_icmpcode_last': 0,
541 'dst_ip_addr': '\x00\x00\x00\x00',
542 'dst_ip_prefix_len': 0})
544 reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
546 tag="deny 1234;permit all")
547 self.assertEqual(reply.retval, 0)
548 # The second ACL gets #1
549 self.assertEqual(reply.acl_index, 1)
551 # Test 2: try to modify a nonexistent ACL
552 reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
553 tag="FFFF:FFFF", expected_retval=-1)
554 self.assertEqual(reply.retval, -1)
555 # The ACL number should pass through
556 self.assertEqual(reply.acl_index, 432)
558 self.logger.info("ACLP_TEST_FINISH_0001")
560 def test_0002_acl_permit_apply(self):
561 """ permit ACL apply test
563 self.logger.info("ACLP_TEST_START_0002")
566 rules.append(self.create_rule(self.IPV4, self.PERMIT,
567 0, self.proto[self.IP][self.UDP]))
568 rules.append(self.create_rule(self.IPV4, self.PERMIT,
569 0, self.proto[self.IP][self.TCP]))
572 self.apply_rules(rules, "permit per-flow")
574 # Traffic should still pass
575 self.run_verify_test(self.IP, self.IPV4, -1)
576 self.logger.info("ACLP_TEST_FINISH_0002")
578 def test_0003_acl_deny_apply(self):
579 """ deny ACL apply test
581 self.logger.info("ACLP_TEST_START_0003")
582 # Add a deny-flows ACL
584 rules.append(self.create_rule(self.IPV4, self.DENY,
585 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
586 # Permit ip any any in the end
587 rules.append(self.create_rule(self.IPV4, self.PERMIT,
591 self.apply_rules(rules, "deny per-flow;permit all")
593 # Traffic should not pass
594 self.run_verify_negat_test(self.IP, self.IPV4,
595 self.proto[self.IP][self.UDP])
596 self.logger.info("ACLP_TEST_FINISH_0003")
597 # self.assertEqual(1, 0)
599 def test_0004_vpp624_permit_icmpv4(self):
600 """ VPP_624 permit ICMPv4
602 self.logger.info("ACLP_TEST_START_0004")
606 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
607 self.proto[self.ICMP][self.ICMPv4]))
608 # deny ip any any in the end
609 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
612 self.apply_rules(rules, "permit icmpv4")
614 # Traffic should still pass
615 self.run_verify_test(self.ICMP, self.IPV4,
616 self.proto[self.ICMP][self.ICMPv4])
618 self.logger.info("ACLP_TEST_FINISH_0004")
620 def test_0005_vpp624_permit_icmpv6(self):
621 """ VPP_624 permit ICMPv6
623 self.logger.info("ACLP_TEST_START_0005")
627 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
628 self.proto[self.ICMP][self.ICMPv6]))
629 # deny ip any any in the end
630 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
633 self.apply_rules(rules, "permit icmpv6")
635 # Traffic should still pass
636 self.run_verify_test(self.ICMP, self.IPV6,
637 self.proto[self.ICMP][self.ICMPv6])
639 self.logger.info("ACLP_TEST_FINISH_0005")
641 def test_0006_vpp624_deny_icmpv4(self):
642 """ VPP_624 deny ICMPv4
644 self.logger.info("ACLP_TEST_START_0006")
647 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
648 self.proto[self.ICMP][self.ICMPv4]))
649 # permit ip any any in the end
650 rules.append(self.create_rule(self.IPV4, self.PERMIT,
654 self.apply_rules(rules, "deny icmpv4")
656 # Traffic should not pass
657 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
659 self.logger.info("ACLP_TEST_FINISH_0006")
661 def test_0007_vpp624_deny_icmpv6(self):
662 """ VPP_624 deny ICMPv6
664 self.logger.info("ACLP_TEST_START_0007")
667 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
668 self.proto[self.ICMP][self.ICMPv6]))
669 # permit ip any any in the end
670 rules.append(self.create_rule(self.IPV6, self.PERMIT,
674 self.apply_rules(rules, "deny icmpv6")
676 # Traffic should not pass
677 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
679 self.logger.info("ACLP_TEST_FINISH_0007")
681 def test_0008_tcp_permit_v4(self):
684 self.logger.info("ACLP_TEST_START_0008")
688 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
689 self.proto[self.IP][self.TCP]))
690 # deny ip any any in the end
691 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
694 self.apply_rules(rules, "permit ipv4 tcp")
696 # Traffic should still pass
697 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
699 self.logger.info("ACLP_TEST_FINISH_0008")
701 def test_0009_tcp_permit_v6(self):
704 self.logger.info("ACLP_TEST_START_0009")
708 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
709 self.proto[self.IP][self.TCP]))
710 # deny ip any any in the end
711 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
714 self.apply_rules(rules, "permit ip6 tcp")
716 # Traffic should still pass
717 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
719 self.logger.info("ACLP_TEST_FINISH_0008")
721 def test_0010_udp_permit_v4(self):
724 self.logger.info("ACLP_TEST_START_0010")
728 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
729 self.proto[self.IP][self.UDP]))
730 # deny ip any any in the end
731 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
734 self.apply_rules(rules, "permit ipv udp")
736 # Traffic should still pass
737 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
739 self.logger.info("ACLP_TEST_FINISH_0010")
741 def test_0011_udp_permit_v6(self):
744 self.logger.info("ACLP_TEST_START_0011")
748 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
749 self.proto[self.IP][self.UDP]))
750 # deny ip any any in the end
751 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
754 self.apply_rules(rules, "permit ip6 udp")
756 # Traffic should still pass
757 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
759 self.logger.info("ACLP_TEST_FINISH_0011")
761 def test_0012_tcp_deny(self):
764 self.logger.info("ACLP_TEST_START_0012")
768 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
769 self.proto[self.IP][self.TCP]))
770 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
771 self.proto[self.IP][self.TCP]))
772 # permit ip any any in the end
773 rules.append(self.create_rule(self.IPV4, self.PERMIT,
775 rules.append(self.create_rule(self.IPV6, self.PERMIT,
779 self.apply_rules(rules, "deny ip4/ip6 tcp")
781 # Traffic should not pass
782 self.run_verify_negat_test(self.IP, self.IPRANDOM,
783 self.proto[self.IP][self.TCP])
785 self.logger.info("ACLP_TEST_FINISH_0012")
787 def test_0013_udp_deny(self):
790 self.logger.info("ACLP_TEST_START_0013")
794 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
795 self.proto[self.IP][self.UDP]))
796 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
797 self.proto[self.IP][self.UDP]))
798 # permit ip any any in the end
799 rules.append(self.create_rule(self.IPV4, self.PERMIT,
801 rules.append(self.create_rule(self.IPV6, self.PERMIT,
805 self.apply_rules(rules, "deny ip4/ip6 udp")
807 # Traffic should not pass
808 self.run_verify_negat_test(self.IP, self.IPRANDOM,
809 self.proto[self.IP][self.UDP])
811 self.logger.info("ACLP_TEST_FINISH_0013")
813 def test_0014_acl_dump(self):
814 """ verify add/dump acls
816 self.logger.info("ACLP_TEST_START_0014")
818 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
819 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
820 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
821 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
822 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
823 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
824 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
825 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
826 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
827 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
828 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
829 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
830 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
831 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
832 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
833 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
834 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
835 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
838 # Add and verify new ACLs
840 for i in range(len(r)):
841 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
843 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
845 result = self.api_acl_dump(reply.acl_index)
848 for drules in result:
850 self.assertEqual(dr.is_ipv6, r[i][0])
851 self.assertEqual(dr.is_permit, r[i][1])
852 self.assertEqual(dr.proto, r[i][3])
855 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
858 self.assertEqual(dr.srcport_or_icmptype_first, 0)
859 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
861 if dr.proto == self.proto[self.IP][self.TCP]:
862 self.assertGreater(dr.srcport_or_icmptype_first,
863 self.tcp_sport_from-1)
864 self.assertLess(dr.srcport_or_icmptype_first,
866 self.assertGreater(dr.dstport_or_icmpcode_last,
867 self.tcp_dport_from-1)
868 self.assertLess(dr.dstport_or_icmpcode_last,
870 elif dr.proto == self.proto[self.IP][self.UDP]:
871 self.assertGreater(dr.srcport_or_icmptype_first,
872 self.udp_sport_from-1)
873 self.assertLess(dr.srcport_or_icmptype_first,
875 self.assertGreater(dr.dstport_or_icmpcode_last,
876 self.udp_dport_from-1)
877 self.assertLess(dr.dstport_or_icmpcode_last,
881 self.logger.info("ACLP_TEST_FINISH_0014")
883 def test_0015_tcp_permit_port_v4(self):
884 """ permit single TCPv4
886 self.logger.info("ACLP_TEST_START_0015")
888 port = random.randint(0, 65535)
891 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
892 self.proto[self.IP][self.TCP]))
893 # deny ip any any in the end
894 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
897 self.apply_rules(rules, "permit ip4 tcp "+str(port))
899 # Traffic should still pass
900 self.run_verify_test(self.IP, self.IPV4,
901 self.proto[self.IP][self.TCP], port)
903 self.logger.info("ACLP_TEST_FINISH_0015")
905 def test_0016_udp_permit_port_v4(self):
906 """ permit single UDPv4
908 self.logger.info("ACLP_TEST_START_0016")
910 port = random.randint(0, 65535)
913 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
914 self.proto[self.IP][self.UDP]))
915 # deny ip any any in the end
916 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
919 self.apply_rules(rules, "permit ip4 tcp "+str(port))
921 # Traffic should still pass
922 self.run_verify_test(self.IP, self.IPV4,
923 self.proto[self.IP][self.UDP], port)
925 self.logger.info("ACLP_TEST_FINISH_0016")
927 def test_0017_tcp_permit_port_v6(self):
928 """ permit single TCPv6
930 self.logger.info("ACLP_TEST_START_0017")
932 port = random.randint(0, 65535)
935 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
936 self.proto[self.IP][self.TCP]))
937 # deny ip any any in the end
938 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
941 self.apply_rules(rules, "permit ip4 tcp "+str(port))
943 # Traffic should still pass
944 self.run_verify_test(self.IP, self.IPV6,
945 self.proto[self.IP][self.TCP], port)
947 self.logger.info("ACLP_TEST_FINISH_0017")
949 def test_0018_udp_permit_port_v6(self):
950 """ permit single UDPv6
952 self.logger.info("ACLP_TEST_START_0018")
954 port = random.randint(0, 65535)
957 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
958 self.proto[self.IP][self.UDP]))
959 # deny ip any any in the end
960 rules.append(self.create_rule(self.IPV6, self.DENY,
964 self.apply_rules(rules, "permit ip4 tcp "+str(port))
966 # Traffic should still pass
967 self.run_verify_test(self.IP, self.IPV6,
968 self.proto[self.IP][self.UDP], port)
970 self.logger.info("ACLP_TEST_FINISH_0018")
972 def test_0019_udp_deny_port(self):
973 """ deny single TCPv4/v6
975 self.logger.info("ACLP_TEST_START_0019")
977 port = random.randint(0, 65535)
980 rules.append(self.create_rule(self.IPV4, self.DENY, port,
981 self.proto[self.IP][self.TCP]))
982 rules.append(self.create_rule(self.IPV6, self.DENY, port,
983 self.proto[self.IP][self.TCP]))
984 # Permit ip any any in the end
985 rules.append(self.create_rule(self.IPV4, self.PERMIT,
987 rules.append(self.create_rule(self.IPV6, self.PERMIT,
991 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
993 # Traffic should not pass
994 self.run_verify_negat_test(self.IP, self.IPRANDOM,
995 self.proto[self.IP][self.TCP], port)
997 self.logger.info("ACLP_TEST_FINISH_0019")
999 def test_0020_udp_deny_port(self):
1000 """ deny single UDPv4/v6
1002 self.logger.info("ACLP_TEST_START_0020")
1004 port = random.randint(0, 65535)
1007 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1008 self.proto[self.IP][self.UDP]))
1009 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1010 self.proto[self.IP][self.UDP]))
1011 # Permit ip any any in the end
1012 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1014 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1018 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1020 # Traffic should not pass
1021 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1022 self.proto[self.IP][self.UDP], port)
1024 self.logger.info("ACLP_TEST_FINISH_0020")
1026 def test_0021_udp_deny_port_verify_fragment_deny(self):
1027 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1029 self.logger.info("ACLP_TEST_START_0021")
1031 port = random.randint(0, 65535)
1034 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1035 self.proto[self.IP][self.UDP]))
1036 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1037 self.proto[self.IP][self.UDP]))
1038 # permit ip any any in the end
1039 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1041 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1045 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1047 # Traffic should not pass
1048 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1049 self.proto[self.IP][self.UDP], port, True)
1051 self.logger.info("ACLP_TEST_FINISH_0021")
1053 def test_0022_zero_length_udp_ipv4(self):
1054 """ VPP-687 zero length udp ipv4 packet"""
1055 self.logger.info("ACLP_TEST_START_0022")
1057 port = random.randint(0, 65535)
1060 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1061 self.proto[self.IP][self.UDP]))
1062 # deny ip any any in the end
1064 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1067 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1069 # Traffic should still pass
1070 # Create incoming packet streams for packet-generator interfaces
1072 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1074 self.proto[self.IP][self.UDP], port,
1077 self.pg0.add_stream(pkts)
1078 pkts_cnt += len(pkts)
1080 # Enable packet capture and start packet sending
1081 self.pg_enable_capture(self.pg_interfaces)
1084 self.pg1.get_capture(pkts_cnt)
1086 self.logger.info("ACLP_TEST_FINISH_0022")
1088 def test_0023_zero_length_udp_ipv6(self):
1089 """ VPP-687 zero length udp ipv6 packet"""
1090 self.logger.info("ACLP_TEST_START_0023")
1092 port = random.randint(0, 65535)
1095 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1096 self.proto[self.IP][self.UDP]))
1097 # deny ip any any in the end
1098 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1101 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1103 # Traffic should still pass
1104 # Create incoming packet streams for packet-generator interfaces
1106 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1108 self.proto[self.IP][self.UDP], port,
1111 self.pg0.add_stream(pkts)
1112 pkts_cnt += len(pkts)
1114 # Enable packet capture and start packet sending
1115 self.pg_enable_capture(self.pg_interfaces)
1118 # Verify outgoing packet streams per packet-generator interface
1119 self.pg1.get_capture(pkts_cnt)
1121 self.logger.info("ACLP_TEST_FINISH_0023")
# Script entry point: when this file is executed directly, run the tests
# through unittest.main() with VppTestRunner as the test runner.
1123 if __name__ == '__main__':
1124 unittest.main(testRunner=VppTestRunner)