2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
17 class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
47 udp_sport_to = udp_sport_from + 5
48 udp_dport_from = 20000
49 udp_dport_to = udp_dport_from + 5000
51 tcp_sport_to = tcp_sport_from + 5
52 tcp_dport_from = 40000
53 tcp_dport_to = tcp_dport_from + 5000
56 udp_sport_to_2 = udp_sport_from_2 + 5
57 udp_dport_from_2 = 30000
58 udp_dport_to_2 = udp_dport_from_2 + 5000
59 tcp_sport_from_2 = 130
60 tcp_sport_to_2 = tcp_sport_from_2 + 5
61 tcp_dport_from_2 = 20000
62 tcp_dport_to_2 = tcp_dport_from_2 + 5000
64 icmp4_type = 8 # echo request
66 icmp6_type = 128 # echo request
82 Perform standard class setup (defined by class method setUpClass in
83 class VppTestCase) before running the test case, set test case related
84 variables and configure VPP.
86 super(TestACLplugin, cls).setUpClass()
89 # Create 2 pg interfaces
90 cls.create_pg_interfaces(range(2))
92 # Packet flows mapping pg0 -> pg1, pg2 etc.
94 cls.flows[cls.pg0] = [cls.pg1]
97 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
99 # Create BD with MAC learning and unknown unicast flooding disabled
100 # and put interfaces to this BD
101 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
103 for pg_if in cls.pg_interfaces:
104 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107 # Set up all interfaces
108 for i in cls.pg_interfaces:
111 # Mapping between packet-generator index and lists of test hosts
112 cls.hosts_by_pg_idx = dict()
113 for pg_if in cls.pg_interfaces:
114 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
116 # Create list of deleted hosts
117 cls.deleted_hosts_by_pg_idx = dict()
118 for pg_if in cls.pg_interfaces:
119 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
121 # warm-up the mac address tables
125 super(TestACLplugin, cls).tearDownClass()
129 super(TestACLplugin, self).setUp()
130 self.reset_packet_infos()
134 Show various debug prints after each test.
136 super(TestACLplugin, self).tearDown()
137 if not self.vpp_dead:
138 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
139 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
140 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
141 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
142 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
def create_hosts(self, count, start=0):
    """
    Create required number of host MAC addresses and distribute them among
    interfaces. Create host IPv4 and IPv6 addresses for every host MAC
    address.

    The created hosts are appended to ``self.hosts_by_pg_idx``, keyed by
    the ``sw_if_index`` of the interface they belong to. Every interface
    receives ``count // n`` hosts (``n`` = number of pg interfaces); the
    last interface additionally takes the remainder.

    :param int count: Number of hosts to create MAC/IPv4 addresses for.
    :param int start: Number to start numbering from.
    """
    n_int = len(self.pg_interfaces)
    # Floor division: the per-interface share must stay an integer on both
    # Python 2 and Python 3, because range() below rejects floats.
    macs_per_if = count // n_int
    for i, pg_if in enumerate(self.pg_interfaces):
        start_nr = macs_per_if * i + start
        # The last interface picks up whatever the division left over.
        end_nr = count + start if i == (n_int - 1) \
            else macs_per_if * (i + 1) + start
        hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
        for j in range(start_nr, end_nr):
            # NOTE(review): Host is assumed to take (mac, ip4, ip6)
            # positionally - confirm against util.Host.
            hosts.append(Host(
                "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)))
169 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
170 s_prefix=0, s_ip='\x00\x00\x00\x00',
171 d_prefix=0, d_ip='\x00\x00\x00\x00'):
174 if ports == self.PORTS_ALL:
177 sport_to = 65535 if proto != 1 and proto != 58 else 255
179 elif ports == self.PORTS_RANGE:
181 sport_from = self.icmp4_type
182 sport_to = self.icmp4_type
183 dport_from = self.icmp4_code
184 dport_to = self.icmp4_code
186 sport_from = self.icmp6_type
187 sport_to = self.icmp6_type
188 dport_from = self.icmp6_code
189 dport_to = self.icmp6_code
190 elif proto == self.proto[self.IP][self.TCP]:
191 sport_from = self.tcp_sport_from
192 sport_to = self.tcp_sport_to
193 dport_from = self.tcp_dport_from
194 dport_to = self.tcp_dport_to
195 elif proto == self.proto[self.IP][self.UDP]:
196 sport_from = self.udp_sport_from
197 sport_to = self.udp_sport_to
198 dport_from = self.udp_dport_from
199 dport_to = self.udp_dport_to
200 elif ports == self.PORTS_RANGE_2:
202 sport_from = self.icmp4_type_2
203 sport_to = self.icmp4_type_2
204 dport_from = self.icmp4_code_from_2
205 dport_to = self.icmp4_code_to_2
207 sport_from = self.icmp6_type_2
208 sport_to = self.icmp6_type_2
209 dport_from = self.icmp6_code_from_2
210 dport_to = self.icmp6_code_to_2
211 elif proto == self.proto[self.IP][self.TCP]:
212 sport_from = self.tcp_sport_from_2
213 sport_to = self.tcp_sport_to_2
214 dport_from = self.tcp_dport_from_2
215 dport_to = self.tcp_dport_to_2
216 elif proto == self.proto[self.IP][self.UDP]:
217 sport_from = self.udp_sport_from_2
218 sport_to = self.udp_sport_to_2
219 dport_from = self.udp_dport_from_2
220 dport_to = self.udp_dport_to_2
227 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
228 'srcport_or_icmptype_first': sport_from,
229 'srcport_or_icmptype_last': sport_to,
230 'src_ip_prefix_len': s_prefix,
232 'dstport_or_icmpcode_first': dport_from,
233 'dstport_or_icmpcode_last': dport_to,
234 'dst_ip_prefix_len': d_prefix,
235 'dst_ip_addr': d_ip})
238 def apply_rules(self, rules, tag=''):
239 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
241 self.logger.info("Dumped ACL: " + str(
242 self.vapi.acl_dump(reply.acl_index)))
243 # Apply an ACL on the interface as inbound
244 for i in self.pg_interfaces:
245 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
247 acls=[reply.acl_index])
250 def etype_whitelist(self, whitelist, n_input):
251 # Apply whitelists on all the interfaces
252 for i in self.pg_interfaces:
253 # checkstyle can't read long names. Help them.
254 fun = self.vapi.acl_interface_set_etype_whitelist
255 fun(sw_if_index=i.sw_if_index, n_input=n_input,
259 def create_upper_layer(self, packet_index, proto, ports=0):
260 p = self.proto_map[proto]
263 return UDP(sport=random.randint(self.udp_sport_from,
265 dport=random.randint(self.udp_dport_from,
268 return UDP(sport=ports, dport=ports)
271 return TCP(sport=random.randint(self.tcp_sport_from,
273 dport=random.randint(self.tcp_dport_from,
276 return TCP(sport=ports, dport=ports)
279 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
280 proto=-1, ports=0, fragments=False,
281 pkt_raw=True, etype=-1):
283 Create input packet stream for defined interface using hosts or
286 :param object src_if: Interface to create packet stream for.
287 :param list packet_sizes: List of required packet sizes.
288 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
289 :return: Stream of packets.
292 if self.flows.__contains__(src_if):
293 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
294 for dst_if in self.flows[src_if]:
295 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
296 n_int = len(dst_hosts) * len(src_hosts)
297 for i in range(0, n_int):
298 dst_host = dst_hosts[i / len(src_hosts)]
299 src_host = src_hosts[i % len(src_hosts)]
300 pkt_info = self.create_packet_info(src_if, dst_if)
306 pkt_info.ip = random.choice([0, 1])
308 pkt_info.proto = random.choice(self.proto[self.IP])
310 pkt_info.proto = proto
311 payload = self.info_to_payload(pkt_info)
312 p = Ether(dst=dst_host.mac, src=src_host.mac)
314 p = Ether(dst=dst_host.mac,
318 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
320 p /= IPv6ExtHdrFragment(offset=64, m=1)
323 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
326 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
327 if traffic_type == self.ICMP:
329 p /= ICMPv6EchoRequest(type=self.icmp6_type,
330 code=self.icmp6_code)
332 p /= ICMP(type=self.icmp4_type,
333 code=self.icmp4_code)
335 p /= self.create_upper_layer(i, pkt_info.proto, ports)
338 pkt_info.data = p.copy()
340 size = random.choice(packet_sizes)
341 self.extend_packet(p, size)
345 def verify_capture(self, pg_if, capture,
346 traffic_type=0, ip_type=0, etype=-1):
348 Verify captured input packet stream for defined interface.
350 :param object pg_if: Interface to verify captured packet stream for.
351 :param list capture: Captured packet stream.
352 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
355 for i in self.pg_interfaces:
356 last_info[i.sw_if_index] = None
357 dst_sw_if_index = pg_if.sw_if_index
358 for packet in capture:
360 if packet[Ether].type != etype:
361 self.logger.error(ppp("Unexpected ethertype in packet:",
366 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
367 if traffic_type == self.ICMP and ip_type == self.IPV6:
368 payload_info = self.payload_to_info(
369 packet[ICMPv6EchoRequest].data)
370 payload = packet[ICMPv6EchoRequest]
372 payload_info = self.payload_to_info(str(packet[Raw]))
373 payload = packet[self.proto_map[payload_info.proto]]
375 self.logger.error(ppp("Unexpected or invalid packet "
376 "(outside network):", packet))
380 self.assertEqual(payload_info.ip, ip_type)
381 if traffic_type == self.ICMP:
383 if payload_info.ip == 0:
384 self.assertEqual(payload.type, self.icmp4_type)
385 self.assertEqual(payload.code, self.icmp4_code)
387 self.assertEqual(payload.type, self.icmp6_type)
388 self.assertEqual(payload.code, self.icmp6_code)
390 self.logger.error(ppp("Unexpected or invalid packet "
391 "(outside network):", packet))
395 ip_version = IPv6 if payload_info.ip == 1 else IP
397 ip = packet[ip_version]
398 packet_index = payload_info.index
400 self.assertEqual(payload_info.dst, dst_sw_if_index)
401 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
402 (pg_if.name, payload_info.src,
404 next_info = self.get_next_packet_info_for_interface2(
405 payload_info.src, dst_sw_if_index,
406 last_info[payload_info.src])
407 last_info[payload_info.src] = next_info
408 self.assertTrue(next_info is not None)
409 self.assertEqual(packet_index, next_info.index)
410 saved_packet = next_info.data
411 # Check standard fields
412 self.assertEqual(ip.src, saved_packet[ip_version].src)
413 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
414 p = self.proto_map[payload_info.proto]
417 self.assertEqual(tcp.sport, saved_packet[
419 self.assertEqual(tcp.dport, saved_packet[
423 self.assertEqual(udp.sport, saved_packet[
425 self.assertEqual(udp.dport, saved_packet[
428 self.logger.error(ppp("Unexpected or invalid packet:",
431 for i in self.pg_interfaces:
432 remaining_packet = self.get_next_packet_info_for_interface2(
433 i, dst_sw_if_index, last_info[i.sw_if_index])
435 remaining_packet is None,
436 "Port %u: Packet expected from source %u didn't arrive" %
437 (dst_sw_if_index, i.sw_if_index))
439 def run_traffic_no_check(self):
441 # Create incoming packet streams for packet-generator interfaces
442 for i in self.pg_interfaces:
443 if self.flows.__contains__(i):
444 pkts = self.create_stream(i, self.pg_if_packet_sizes)
448 # Enable packet capture and start packet sending
449 self.pg_enable_capture(self.pg_interfaces)
452 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
453 frags=False, pkt_raw=True, etype=-1):
455 # Create incoming packet streams for packet-generator interfaces
457 for i in self.pg_interfaces:
458 if self.flows.__contains__(i):
459 pkts = self.create_stream(i, self.pg_if_packet_sizes,
460 traffic_type, ip_type, proto, ports,
461 frags, pkt_raw, etype)
464 pkts_cnt += len(pkts)
466 # Enable packet capture and start packet sending
467 self.pg_enable_capture(self.pg_interfaces)
471 # Verify outgoing packet streams per packet-generator interface
472 for src_if in self.pg_interfaces:
473 if self.flows.__contains__(src_if):
474 for dst_if in self.flows[src_if]:
475 capture = dst_if.get_capture(pkts_cnt)
476 self.logger.info("Verifying capture on interface %s" %
478 self.verify_capture(dst_if, capture,
479 traffic_type, ip_type, etype)
481 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
482 ports=0, frags=False, etype=-1):
484 self.reset_packet_infos()
485 for i in self.pg_interfaces:
486 if self.flows.__contains__(i):
487 pkts = self.create_stream(i, self.pg_if_packet_sizes,
488 traffic_type, ip_type, proto, ports,
493 # Enable packet capture and start packet sending
494 self.pg_enable_capture(self.pg_interfaces)
498 # Verify outgoing packet streams per packet-generator interface
499 for src_if in self.pg_interfaces:
500 if self.flows.__contains__(src_if):
501 for dst_if in self.flows[src_if]:
502 self.logger.info("Verifying capture on interface %s" %
504 capture = dst_if.get_capture(0)
505 self.assertEqual(len(capture), 0)
def test_0000_warmup_test(self):
    """ ACL plugin version check; learn MACs
    """
    # Create 16 hosts and push unverified traffic through before any ACL
    # test runs.
    self.create_hosts(16)
    self.run_traffic_no_check()

    version = self.vapi.papi.acl_plugin_get_version()
    # Only the major version is pinned; minor version changes are
    # non breaking, so the minor number is logged but not asserted.
    self.assertEqual(version.major, 1)
    banner = "Working with ACL plugin version: %d.%d" % (
        version.major, version.minor)
    self.logger.info(banner)
519 def test_0001_acl_create(self):
520 """ ACL create/delete test
523 self.logger.info("ACLP_TEST_START_0001")
525 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
526 'srcport_or_icmptype_first': 1234,
527 'srcport_or_icmptype_last': 1235,
528 'src_ip_prefix_len': 0,
529 'src_ip_addr': '\x00\x00\x00\x00',
530 'dstport_or_icmpcode_first': 1234,
531 'dstport_or_icmpcode_last': 1234,
532 'dst_ip_addr': '\x00\x00\x00\x00',
533 'dst_ip_prefix_len': 0}]
534 # Test 1: add a new ACL
535 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
537 self.assertEqual(reply.retval, 0)
538 # The very first ACL gets #0
539 self.assertEqual(reply.acl_index, 0)
540 first_acl = reply.acl_index
541 rr = self.vapi.acl_dump(reply.acl_index)
542 self.logger.info("Dumped ACL: " + str(rr))
543 self.assertEqual(len(rr), 1)
544 # We should have the same number of ACL entries as we had asked
545 self.assertEqual(len(rr[0].r), len(r))
546 # The rules should be the same. But because the submitted and returned
547 # are different types, we need to iterate over rules and keys to get
549 for i_rule in range(0, len(r) - 1):
550 for rule_key in r[i_rule]:
551 self.assertEqual(rr[0].r[i_rule][rule_key],
554 # Add a deny-1234 ACL
555 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
556 'srcport_or_icmptype_first': 1234,
557 'srcport_or_icmptype_last': 1235,
558 'src_ip_prefix_len': 0,
559 'src_ip_addr': '\x00\x00\x00\x00',
560 'dstport_or_icmpcode_first': 1234,
561 'dstport_or_icmpcode_last': 1234,
562 'dst_ip_addr': '\x00\x00\x00\x00',
563 'dst_ip_prefix_len': 0},
564 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
565 'srcport_or_icmptype_first': 0,
566 'srcport_or_icmptype_last': 0,
567 'src_ip_prefix_len': 0,
568 'src_ip_addr': '\x00\x00\x00\x00',
569 'dstport_or_icmpcode_first': 0,
570 'dstport_or_icmpcode_last': 0,
571 'dst_ip_addr': '\x00\x00\x00\x00',
572 'dst_ip_prefix_len': 0}]
574 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
575 tag="deny 1234;permit all")
576 self.assertEqual(reply.retval, 0)
577 # The second ACL gets #1
578 self.assertEqual(reply.acl_index, 1)
579 second_acl = reply.acl_index
581 # Test 2: try to modify a nonexistent ACL
582 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
583 tag="FFFF:FFFF", expected_retval=-6)
584 self.assertEqual(reply.retval, -6)
585 # The ACL number should pass through
586 self.assertEqual(reply.acl_index, 432)
587 # apply an ACL on an interface inbound, try to delete ACL, must fail
588 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
591 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
592 # Unapply an ACL and then try to delete it - must be ok
593 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
596 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
598 # apply an ACL on an interface outbound, try to delete ACL, must fail
599 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
602 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
603 # Unapply the ACL and then try to delete it - must be ok
604 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
607 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
609 # try to apply a nonexistent ACL - must fail
610 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
615 self.logger.info("ACLP_TEST_FINISH_0001")
def test_0002_acl_permit_apply(self):
    """ permit ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0002")

    # One IPv4 permit rule per L4 protocol, both built with ports value 0.
    permit_udp = self.create_rule(self.IPV4, self.PERMIT,
                                  0, self.proto[self.IP][self.UDP])
    permit_tcp = self.create_rule(self.IPV4, self.PERMIT,
                                  0, self.proto[self.IP][self.TCP])

    # Install the ACL and attach it to the pg interfaces
    self.apply_rules([permit_udp, permit_tcp], "permit per-flow")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, -1)
    self.logger.info("ACLP_TEST_FINISH_0002")
635 def test_0003_acl_deny_apply(self):
636 """ deny ACL apply test
638 self.logger.info("ACLP_TEST_START_0003")
639 # Add a deny-flows ACL
641 rules.append(self.create_rule(self.IPV4, self.DENY,
642 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
643 # Permit ip any any in the end
644 rules.append(self.create_rule(self.IPV4, self.PERMIT,
648 self.apply_rules(rules, "deny per-flow;permit all")
650 # Traffic should not pass
651 self.run_verify_negat_test(self.IP, self.IPV4,
652 self.proto[self.IP][self.UDP])
653 self.logger.info("ACLP_TEST_FINISH_0003")
654 # self.assertEqual(1, 0)
def test_0004_vpp624_permit_icmpv4(self):
    """ VPP_624 permit ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0004")

    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    acl = [
        # permit the ICMPv4 type/code that PORTS_RANGE selects
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, icmp4),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces
    self.apply_rules(acl, "permit icmpv4")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV4, icmp4)

    self.logger.info("ACLP_TEST_FINISH_0004")
def test_0005_vpp624_permit_icmpv6(self):
    """ VPP_624 permit ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0005")

    icmp6 = self.proto[self.ICMP][self.ICMPv6]
    acl = [
        # permit the ICMPv6 type/code that PORTS_RANGE selects
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, icmp6),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces
    self.apply_rules(acl, "permit icmpv6")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV6, icmp6)

    self.logger.info("ACLP_TEST_FINISH_0005")
698 def test_0006_vpp624_deny_icmpv4(self):
699 """ VPP_624 deny ICMPv4
701 self.logger.info("ACLP_TEST_START_0006")
704 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
705 self.proto[self.ICMP][self.ICMPv4]))
706 # permit ip any any in the end
707 rules.append(self.create_rule(self.IPV4, self.PERMIT,
711 self.apply_rules(rules, "deny icmpv4")
713 # Traffic should not pass
714 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
716 self.logger.info("ACLP_TEST_FINISH_0006")
718 def test_0007_vpp624_deny_icmpv6(self):
719 """ VPP_624 deny ICMPv6
721 self.logger.info("ACLP_TEST_START_0007")
724 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
725 self.proto[self.ICMP][self.ICMPv6]))
726 # permit ip any any in the end
727 rules.append(self.create_rule(self.IPV6, self.PERMIT,
731 self.apply_rules(rules, "deny icmpv6")
733 # Traffic should not pass
734 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
736 self.logger.info("ACLP_TEST_FINISH_0007")
def test_0008_tcp_permit_v4(self):
    """ permit TCPv4
    """
    self.logger.info("ACLP_TEST_START_0008")

    tcp = self.proto[self.IP][self.TCP]
    acl = [
        # permit TCP over IPv4 inside the PORTS_RANGE port window
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces
    self.apply_rules(acl, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0008")
def test_0009_tcp_permit_v6(self):
    """ permit TCPv6
    """
    self.logger.info("ACLP_TEST_START_0009")

    rules = [
        # permit TCP over IPv6 inside the PORTS_RANGE port window
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                         self.proto[self.IP][self.TCP]),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])

    # The finish marker must carry this test's own number so that log
    # scraping can pair it with ACLP_TEST_START_0009.
    self.logger.info("ACLP_TEST_FINISH_0009")
def test_0010_udp_permit_v4(self):
    """ permit UDPv4
    """
    self.logger.info("ACLP_TEST_START_0010")

    rules = [
        # permit UDP over IPv4 inside the PORTS_RANGE port window
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
                         self.proto[self.IP][self.UDP]),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces; the tag names
    # the address family and protocol the ACL actually permits.
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])

    self.logger.info("ACLP_TEST_FINISH_0010")
def test_0011_udp_permit_v6(self):
    """ permit UDPv6
    """
    self.logger.info("ACLP_TEST_START_0011")

    udp = self.proto[self.IP][self.UDP]
    acl = [
        # permit UDP over IPv6 inside the PORTS_RANGE port window
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces
    self.apply_rules(acl, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0011")
818 def test_0012_tcp_deny(self):
821 self.logger.info("ACLP_TEST_START_0012")
825 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
826 self.proto[self.IP][self.TCP]))
827 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
828 self.proto[self.IP][self.TCP]))
829 # permit ip any any in the end
830 rules.append(self.create_rule(self.IPV4, self.PERMIT,
832 rules.append(self.create_rule(self.IPV6, self.PERMIT,
836 self.apply_rules(rules, "deny ip4/ip6 tcp")
838 # Traffic should not pass
839 self.run_verify_negat_test(self.IP, self.IPRANDOM,
840 self.proto[self.IP][self.TCP])
842 self.logger.info("ACLP_TEST_FINISH_0012")
844 def test_0013_udp_deny(self):
847 self.logger.info("ACLP_TEST_START_0013")
851 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
852 self.proto[self.IP][self.UDP]))
853 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
854 self.proto[self.IP][self.UDP]))
855 # permit ip any any in the end
856 rules.append(self.create_rule(self.IPV4, self.PERMIT,
858 rules.append(self.create_rule(self.IPV6, self.PERMIT,
862 self.apply_rules(rules, "deny ip4/ip6 udp")
864 # Traffic should not pass
865 self.run_verify_negat_test(self.IP, self.IPRANDOM,
866 self.proto[self.IP][self.UDP])
868 self.logger.info("ACLP_TEST_FINISH_0013")
870 def test_0014_acl_dump(self):
871 """ verify add/dump acls
873 self.logger.info("ACLP_TEST_START_0014")
875 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
876 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
877 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
878 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
879 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
880 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
881 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
882 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
883 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
884 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
885 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
886 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
887 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
888 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
889 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
890 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
891 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
892 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
895 # Add and verify new ACLs
897 for i in range(len(r)):
898 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
900 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
901 result = self.vapi.acl_dump(reply.acl_index)
904 for drules in result:
906 self.assertEqual(dr.is_ipv6, r[i][0])
907 self.assertEqual(dr.is_permit, r[i][1])
908 self.assertEqual(dr.proto, r[i][3])
911 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
914 self.assertEqual(dr.srcport_or_icmptype_first, 0)
915 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
917 if dr.proto == self.proto[self.IP][self.TCP]:
918 self.assertGreater(dr.srcport_or_icmptype_first,
919 self.tcp_sport_from-1)
920 self.assertLess(dr.srcport_or_icmptype_first,
922 self.assertGreater(dr.dstport_or_icmpcode_last,
923 self.tcp_dport_from-1)
924 self.assertLess(dr.dstport_or_icmpcode_last,
926 elif dr.proto == self.proto[self.IP][self.UDP]:
927 self.assertGreater(dr.srcport_or_icmptype_first,
928 self.udp_sport_from-1)
929 self.assertLess(dr.srcport_or_icmptype_first,
931 self.assertGreater(dr.dstport_or_icmpcode_last,
932 self.udp_dport_from-1)
933 self.assertLess(dr.dstport_or_icmpcode_last,
937 self.logger.info("ACLP_TEST_FINISH_0014")
def test_0015_tcp_permit_port_v4(self):
    """ permit single TCPv4
    """
    self.logger.info("ACLP_TEST_START_0015")

    # A single randomly chosen port is used for both the rule and the
    # generated traffic.
    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]
    acl = [
        # permit TCP over IPv4 on that one port
        self.create_rule(self.IPV4, self.PERMIT, port, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces
    self.apply_rules(acl, "permit ip4 tcp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0015")
def test_0016_udp_permit_port_v4(self):
    """ permit single UDPv4
    """
    self.logger.info("ACLP_TEST_START_0016")

    # A single randomly chosen port is used for both the rule and the
    # generated traffic.
    port = random.randint(0, 65535)

    rules = [
        # permit UDP over IPv4 on that one port
        self.create_rule(self.IPV4, self.PERMIT, port,
                         self.proto[self.IP][self.UDP]),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces; the tag names
    # the protocol the ACL actually permits (UDP, not TCP).
    self.apply_rules(rules, "permit ip4 udp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4,
                         self.proto[self.IP][self.UDP], port)

    self.logger.info("ACLP_TEST_FINISH_0016")
def test_0017_tcp_permit_port_v6(self):
    """ permit single TCPv6
    """
    self.logger.info("ACLP_TEST_START_0017")

    # A single randomly chosen port is used for both the rule and the
    # generated traffic.
    port = random.randint(0, 65535)

    rules = [
        # permit TCP over IPv6 on that one port
        self.create_rule(self.IPV6, self.PERMIT, port,
                         self.proto[self.IP][self.TCP]),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces; the tag names
    # the address family the ACL actually covers (IPv6, not IPv4).
    self.apply_rules(rules, "permit ip6 tcp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6,
                         self.proto[self.IP][self.TCP], port)

    self.logger.info("ACLP_TEST_FINISH_0017")
def test_0018_udp_permit_port_v6(self):
    """ permit single UDPv6
    """
    self.logger.info("ACLP_TEST_START_0018")

    # A single randomly chosen port is used for both the rule and the
    # generated traffic.
    port = random.randint(0, 65535)

    rules = [
        # permit UDP over IPv6 on that one port
        self.create_rule(self.IPV6, self.PERMIT, port,
                         self.proto[self.IP][self.UDP]),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY,
                         self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces; the tag names
    # what the ACL actually permits (IPv6 UDP, not IPv4 TCP).
    self.apply_rules(rules, "permit ip6 udp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6,
                         self.proto[self.IP][self.UDP], port)

    self.logger.info("ACLP_TEST_FINISH_0018")
def test_0019_udp_deny_port(self):
    """ deny single TCPv4/v6
    """
    # NOTE: despite "udp" in the method name, the rules and the traffic
    # below are TCP; the name is kept so test selection by name still works.
    self.logger.info("ACLP_TEST_START_0019")

    # A single randomly chosen port is used for both the rules and the
    # generated traffic.
    port = random.randint(0, 65535)

    rules = [
        # deny TCP on that one port for both address families
        self.create_rule(self.IPV4, self.DENY, port,
                         self.proto[self.IP][self.TCP]),
        self.create_rule(self.IPV6, self.DENY, port,
                         self.proto[self.IP][self.TCP]),
        # Permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT,
                         self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT,
                         self.PORTS_ALL, 0),
    ]

    # Install the ACL and attach it to the pg interfaces; the tag names
    # the protocol the ACL actually denies (TCP, not UDP).
    self.apply_rules(rules, "deny ip4/ip6 tcp " + str(port))

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM,
                               self.proto[self.IP][self.TCP], port)

    self.logger.info("ACLP_TEST_FINISH_0019")
1055 def test_0020_udp_deny_port(self):
1056 """ deny single UDPv4/v6
1058 self.logger.info("ACLP_TEST_START_0020")
1060 port = random.randint(0, 65535)
1063 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1064 self.proto[self.IP][self.UDP]))
1065 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1066 self.proto[self.IP][self.UDP]))
1067 # Permit ip any any in the end
1068 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1070 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1074 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1076 # Traffic should not pass
1077 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1078 self.proto[self.IP][self.UDP], port)
1080 self.logger.info("ACLP_TEST_FINISH_0020")
1082 def test_0021_udp_deny_port_verify_fragment_deny(self):
1083 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1085 self.logger.info("ACLP_TEST_START_0021")
1087 port = random.randint(0, 65535)
1090 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1091 self.proto[self.IP][self.UDP]))
1092 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1093 self.proto[self.IP][self.UDP]))
1094 # permit ip any any in the end
1095 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1097 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1101 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1103 # Traffic should not pass
1104 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1105 self.proto[self.IP][self.UDP], port, True)
1107 self.logger.info("ACLP_TEST_FINISH_0021")
1109 def test_0022_zero_length_udp_ipv4(self):
1110 """ VPP-687 zero length udp ipv4 packet"""
1111 self.logger.info("ACLP_TEST_START_0022")
1113 port = random.randint(0, 65535)
1116 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1117 self.proto[self.IP][self.UDP]))
1118 # deny ip any any in the end
1120 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1123 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1125 # Traffic should still pass
1126 # Create incoming packet streams for packet-generator interfaces
1128 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1130 self.proto[self.IP][self.UDP], port,
1133 self.pg0.add_stream(pkts)
1134 pkts_cnt += len(pkts)
1136 # Enable packet capture and start packet sending
1137 self.pg_enable_capture(self.pg_interfaces)
1140 self.pg1.get_capture(pkts_cnt)
1142 self.logger.info("ACLP_TEST_FINISH_0022")
1144 def test_0023_zero_length_udp_ipv6(self):
1145 """ VPP-687 zero length udp ipv6 packet"""
1146 self.logger.info("ACLP_TEST_START_0023")
1148 port = random.randint(0, 65535)
1151 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1152 self.proto[self.IP][self.UDP]))
1153 # deny ip any any in the end
1154 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1157 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1159 # Traffic should still pass
1160 # Create incoming packet streams for packet-generator interfaces
1162 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1164 self.proto[self.IP][self.UDP], port,
1167 self.pg0.add_stream(pkts)
1168 pkts_cnt += len(pkts)
1170 # Enable packet capture and start packet sending
1171 self.pg_enable_capture(self.pg_interfaces)
1174 # Verify outgoing packet streams per packet-generator interface
1175 self.pg1.get_capture(pkts_cnt)
1177 self.logger.info("ACLP_TEST_FINISH_0023")
# Permit test for TCP over IPv4 where a deny rule for a different port range
# (PORTS_RANGE_2) precedes the permit rule for PORTS_RANGE.
1179 def test_0108_tcp_permit_v4(self):
1180 """ permit TCPv4 + non-match range
1182 self.logger.info("ACLP_TEST_START_0108")
# Rule order: deny TCP PORTS_RANGE_2, then permit TCP PORTS_RANGE
1186 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1187 self.proto[self.IP][self.TCP]))
1188 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1189 self.proto[self.IP][self.TCP]))
1190 # deny ip any any in the end
1191 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1194 self.apply_rules(rules, "permit ipv4 tcp")
1196 # Traffic should still pass
1197 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1199 self.logger.info("ACLP_TEST_FINISH_0108")
# Permit test for TCP over IPv6 where a deny rule for a different port range
# (PORTS_RANGE_2) precedes the permit rule for PORTS_RANGE.
1201 def test_0109_tcp_permit_v6(self):
1202 """ permit TCPv6 + non-match range
1204 self.logger.info("ACLP_TEST_START_0109")
# Rule order: deny TCP PORTS_RANGE_2, then permit TCP PORTS_RANGE
1208 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1209 self.proto[self.IP][self.TCP]))
1210 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1211 self.proto[self.IP][self.TCP]))
1212 # deny ip any any in the end
1213 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1216 self.apply_rules(rules, "permit ip6 tcp")
1218 # Traffic should still pass
1219 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1221 self.logger.info("ACLP_TEST_FINISH_0109")
# Permit test for UDP over IPv4 where a deny rule for a different port range
# (PORTS_RANGE_2) precedes the permit rule for PORTS_RANGE.
1223 def test_0110_udp_permit_v4(self):
1224 """ permit UDPv4 + non-match range
1226 self.logger.info("ACLP_TEST_START_0110")
# Rule order: deny UDP PORTS_RANGE_2, then permit UDP PORTS_RANGE
1230 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1231 self.proto[self.IP][self.UDP]))
1232 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1233 self.proto[self.IP][self.UDP]))
1234 # deny ip any any in the end
1235 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1238 self.apply_rules(rules, "permit ipv4 udp")
1240 # Traffic should still pass
1241 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1243 self.logger.info("ACLP_TEST_FINISH_0110")
# Permit test for UDP over IPv6 where a deny rule for a different port range
# (PORTS_RANGE_2) precedes the permit rule for PORTS_RANGE.
1245 def test_0111_udp_permit_v6(self):
1246 """ permit UDPv6 + non-match range
1248 self.logger.info("ACLP_TEST_START_0111")
# Rule order: deny UDP PORTS_RANGE_2, then permit UDP PORTS_RANGE
1252 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1253 self.proto[self.IP][self.UDP]))
1254 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1255 self.proto[self.IP][self.UDP]))
1256 # deny ip any any in the end
1257 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1260 self.apply_rules(rules, "permit ip6 udp")
1262 # Traffic should still pass
1263 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1265 self.logger.info("ACLP_TEST_FINISH_0111")
# Deny test for TCP covering both IPv4 and IPv6: leading permit rules, then
# deny rules for PORTS_RANGE, then trailing permit rules; the negative
# verifier is used, so the generated traffic is expected to be dropped.
1267 def test_0112_tcp_deny(self):
1268 """ deny TCPv4/v6 + non-match range
1270 self.logger.info("ACLP_TEST_START_0112")
# Leading TCP permit rules for IPv4 and IPv6 (the "non-match range" of the
# docstring, presumably -- confirm against the full rule arguments)
1274 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1276 self.proto[self.IP][self.TCP]))
1277 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1279 self.proto[self.IP][self.TCP]))
# Deny TCP on PORTS_RANGE for both address families
1280 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1281 self.proto[self.IP][self.TCP]))
1282 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1283 self.proto[self.IP][self.TCP]))
1284 # permit ip any any in the end
1285 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1287 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1291 self.apply_rules(rules, "deny ip4/ip6 tcp")
1293 # Traffic should not pass
1294 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1295 self.proto[self.IP][self.TCP])
1297 self.logger.info("ACLP_TEST_FINISH_0112")
# Deny test for UDP covering both IPv4 and IPv6: leading permit rules, then
# deny rules for PORTS_RANGE, then trailing permit rules; the negative
# verifier is used, so the generated traffic is expected to be dropped.
1299 def test_0113_udp_deny(self):
1300 """ deny UDPv4/v6 + non-match range
1302 self.logger.info("ACLP_TEST_START_0113")
# Leading UDP permit rules for IPv4 and IPv6 (the "non-match range" of the
# docstring, presumably -- confirm against the full rule arguments)
1306 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1308 self.proto[self.IP][self.UDP]))
1309 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1311 self.proto[self.IP][self.UDP]))
# Deny UDP on PORTS_RANGE for both address families
1312 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1313 self.proto[self.IP][self.UDP]))
1314 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1315 self.proto[self.IP][self.UDP]))
1316 # permit ip any any in the end
1317 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1319 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1323 self.apply_rules(rules, "deny ip4/ip6 udp")
1325 # Traffic should not pass
1326 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1327 self.proto[self.IP][self.UDP])
1329 self.logger.info("ACLP_TEST_FINISH_0113")
# Permit test for TCP over IPv4 that runs the positive verifier twice: once
# with default arguments and once with 0xaaaa passed as the final argument
# (the ethertype, per the docstring and the comment below).
1331 def test_0300_tcp_permit_v4_etype_aaaa(self):
1332 """ permit TCPv4, send 0xAAAA etype
1334 self.logger.info("ACLP_TEST_START_0300")
# Rule order: deny TCP PORTS_RANGE_2, then permit TCP PORTS_RANGE
1338 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1339 self.proto[self.IP][self.TCP]))
1340 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1341 self.proto[self.IP][self.TCP]))
1342 # deny ip any any in the end
1343 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1346 self.apply_rules(rules, "permit ipv4 tcp")
1348 # Traffic should still pass
1349 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1351 # Traffic should still pass also for an odd ethertype
# NOTE(review): meaning of the positional 0, False, True arguments is defined
# by run_verify_test, which is outside this block -- confirm there.
1352 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1353 0, False, True, 0xaaaa)
1355 self.logger.info("ACLP_TEST_FINISH_0300")
# Ethertype whitelist test: with 0x0bbb whitelisted, regular IPv4 TCP and
# 0x0bbb traffic are verified with the positive verifier and 0xaaaa with the
# negative one; after the whitelist is cleared, 0xaaaa is verified positively.
# NOTE(review): the method name says "blacklist" but the test configures a
# whitelist (see the docstring and etype_whitelist calls).
1357 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1358 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
1360 self.logger.info("ACLP_TEST_START_0305")
# Rule order: deny TCP PORTS_RANGE_2, then permit TCP PORTS_RANGE
1364 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1365 self.proto[self.IP][self.TCP]))
1366 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1367 self.proto[self.IP][self.TCP]))
1368 # deny ip any any in the end
1369 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1372 self.apply_rules(rules, "permit ipv4 tcp")
1374 # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
1375 self.etype_whitelist([0xbbb], 1)
1377 # The IPv4 traffic should still pass
1378 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1380 # The oddball ethertype should be blocked
1381 self.run_verify_negat_test(self.IP, self.IPV4,
1382 self.proto[self.IP][self.TCP],
1385 # The whitelisted traffic, on the other hand, should pass
1386 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1387 0, False, True, 0x0bbb)
1389 # remove the whitelist, the previously blocked 0xAAAA should pass now
1390 self.etype_whitelist([], 0)
1391 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1392 0, False, True, 0xaaaa)
1394 self.logger.info("ACLP_TEST_FINISH_0305")
# When executed as a script, run the tests through unittest using
# VppTestRunner (imported from the framework module) as the test runner.
1396 if __name__ == '__main__':
1397 unittest.main(testRunner=VppTestRunner)