2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
16 from vpp_lo_interface import VppLoInterface
19 class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
36 proto = [[6, 17], [1, 58]]
37 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
49 udp_sport_to = udp_sport_from + 5
50 udp_dport_from = 20000
51 udp_dport_to = udp_dport_from + 5000
53 tcp_sport_to = tcp_sport_from + 5
54 tcp_dport_from = 40000
55 tcp_dport_to = tcp_dport_from + 5000
58 udp_sport_to_2 = udp_sport_from_2 + 5
59 udp_dport_from_2 = 30000
60 udp_dport_to_2 = udp_dport_from_2 + 5000
61 tcp_sport_from_2 = 130
62 tcp_sport_to_2 = tcp_sport_from_2 + 5
63 tcp_dport_from_2 = 20000
64 tcp_dport_to_2 = tcp_dport_from_2 + 5000
66 icmp4_type = 8 # echo request
68 icmp6_type = 128 # echo request
84 Perform standard class setup (defined by class method setUpClass in
85 class VppTestCase) before running the test case, set test case related
86 variables and configure VPP.
88 super(TestACLplugin, cls).setUpClass()
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
96 cls.flows[cls.pg0] = [cls.pg1]
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
123 # warm-up the mac address tables
127 n_int = len(cls.pg_interfaces)
128 macs_per_if = count / n_int
130 for pg_if in cls.pg_interfaces:
132 start_nr = macs_per_if * i + start
133 end_nr = count + start if i == (n_int - 1) \
134 else macs_per_if * (i + 1) + start
135 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136 for j in range(start_nr, end_nr):
138 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
144 super(TestACLplugin, cls).tearDownClass()
148 super(TestACLplugin, self).setUp()
149 self.reset_packet_infos()
153 Show various debug prints after each test.
155 super(TestACLplugin, self).tearDown()
156 if not self.vpp_dead:
157 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
158 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
159 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
160 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
161 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
164 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
165 s_prefix=0, s_ip='\x00\x00\x00\x00',
166 d_prefix=0, d_ip='\x00\x00\x00\x00'):
169 if ports == self.PORTS_ALL:
172 sport_to = 65535 if proto != 1 and proto != 58 else 255
174 elif ports == self.PORTS_RANGE:
176 sport_from = self.icmp4_type
177 sport_to = self.icmp4_type
178 dport_from = self.icmp4_code
179 dport_to = self.icmp4_code
181 sport_from = self.icmp6_type
182 sport_to = self.icmp6_type
183 dport_from = self.icmp6_code
184 dport_to = self.icmp6_code
185 elif proto == self.proto[self.IP][self.TCP]:
186 sport_from = self.tcp_sport_from
187 sport_to = self.tcp_sport_to
188 dport_from = self.tcp_dport_from
189 dport_to = self.tcp_dport_to
190 elif proto == self.proto[self.IP][self.UDP]:
191 sport_from = self.udp_sport_from
192 sport_to = self.udp_sport_to
193 dport_from = self.udp_dport_from
194 dport_to = self.udp_dport_to
195 elif ports == self.PORTS_RANGE_2:
197 sport_from = self.icmp4_type_2
198 sport_to = self.icmp4_type_2
199 dport_from = self.icmp4_code_from_2
200 dport_to = self.icmp4_code_to_2
202 sport_from = self.icmp6_type_2
203 sport_to = self.icmp6_type_2
204 dport_from = self.icmp6_code_from_2
205 dport_to = self.icmp6_code_to_2
206 elif proto == self.proto[self.IP][self.TCP]:
207 sport_from = self.tcp_sport_from_2
208 sport_to = self.tcp_sport_to_2
209 dport_from = self.tcp_dport_from_2
210 dport_to = self.tcp_dport_to_2
211 elif proto == self.proto[self.IP][self.UDP]:
212 sport_from = self.udp_sport_from_2
213 sport_to = self.udp_sport_to_2
214 dport_from = self.udp_dport_from_2
215 dport_to = self.udp_dport_to_2
222 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
223 'srcport_or_icmptype_first': sport_from,
224 'srcport_or_icmptype_last': sport_to,
225 'src_ip_prefix_len': s_prefix,
227 'dstport_or_icmpcode_first': dport_from,
228 'dstport_or_icmpcode_last': dport_to,
229 'dst_ip_prefix_len': d_prefix,
230 'dst_ip_addr': d_ip})
233 def apply_rules(self, rules, tag=''):
234 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
236 self.logger.info("Dumped ACL: " + str(
237 self.vapi.acl_dump(reply.acl_index)))
238 # Apply a ACL on the interface as inbound
239 for i in self.pg_interfaces:
240 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
242 acls=[reply.acl_index])
245 def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
246 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
248 self.logger.info("Dumped ACL: " + str(
249 self.vapi.acl_dump(reply.acl_index)))
250 # Apply a ACL on the interface as inbound
251 self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
253 acls=[reply.acl_index])
256 def etype_whitelist(self, whitelist, n_input):
257 # Apply whitelists on all the interfaces
258 for i in self.pg_interfaces:
259 # checkstyle can't read long names. Help them.
260 fun = self.vapi.acl_interface_set_etype_whitelist
261 fun(sw_if_index=i.sw_if_index, n_input=n_input,
265 def create_upper_layer(self, packet_index, proto, ports=0):
266 p = self.proto_map[proto]
269 return UDP(sport=random.randint(self.udp_sport_from,
271 dport=random.randint(self.udp_dport_from,
274 return UDP(sport=ports, dport=ports)
277 return TCP(sport=random.randint(self.tcp_sport_from,
279 dport=random.randint(self.tcp_dport_from,
282 return TCP(sport=ports, dport=ports)
285 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
286 proto=-1, ports=0, fragments=False,
287 pkt_raw=True, etype=-1):
289 Create input packet stream for defined interface using hosts or
292 :param object src_if: Interface to create packet stream for.
293 :param list packet_sizes: List of required packet sizes.
294 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
295 :return: Stream of packets.
298 if self.flows.__contains__(src_if):
299 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
300 for dst_if in self.flows[src_if]:
301 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
302 n_int = len(dst_hosts) * len(src_hosts)
303 for i in range(0, n_int):
304 dst_host = dst_hosts[i / len(src_hosts)]
305 src_host = src_hosts[i % len(src_hosts)]
306 pkt_info = self.create_packet_info(src_if, dst_if)
312 pkt_info.ip = random.choice([0, 1])
314 pkt_info.proto = random.choice(self.proto[self.IP])
316 pkt_info.proto = proto
317 payload = self.info_to_payload(pkt_info)
318 p = Ether(dst=dst_host.mac, src=src_host.mac)
320 p = Ether(dst=dst_host.mac,
324 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
326 p /= IPv6ExtHdrFragment(offset=64, m=1)
329 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
332 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
333 if traffic_type == self.ICMP:
335 p /= ICMPv6EchoRequest(type=self.icmp6_type,
336 code=self.icmp6_code)
338 p /= ICMP(type=self.icmp4_type,
339 code=self.icmp4_code)
341 p /= self.create_upper_layer(i, pkt_info.proto, ports)
344 pkt_info.data = p.copy()
346 size = random.choice(packet_sizes)
347 self.extend_packet(p, size)
351 def verify_capture(self, pg_if, capture,
352 traffic_type=0, ip_type=0, etype=-1):
354 Verify captured input packet stream for defined interface.
356 :param object pg_if: Interface to verify captured packet stream for.
357 :param list capture: Captured packet stream.
358 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
361 for i in self.pg_interfaces:
362 last_info[i.sw_if_index] = None
363 dst_sw_if_index = pg_if.sw_if_index
364 for packet in capture:
366 if packet[Ether].type != etype:
367 self.logger.error(ppp("Unexpected ethertype in packet:",
372 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
373 if traffic_type == self.ICMP and ip_type == self.IPV6:
374 payload_info = self.payload_to_info(
375 packet[ICMPv6EchoRequest].data)
376 payload = packet[ICMPv6EchoRequest]
378 payload_info = self.payload_to_info(str(packet[Raw]))
379 payload = packet[self.proto_map[payload_info.proto]]
381 self.logger.error(ppp("Unexpected or invalid packet "
382 "(outside network):", packet))
386 self.assertEqual(payload_info.ip, ip_type)
387 if traffic_type == self.ICMP:
389 if payload_info.ip == 0:
390 self.assertEqual(payload.type, self.icmp4_type)
391 self.assertEqual(payload.code, self.icmp4_code)
393 self.assertEqual(payload.type, self.icmp6_type)
394 self.assertEqual(payload.code, self.icmp6_code)
396 self.logger.error(ppp("Unexpected or invalid packet "
397 "(outside network):", packet))
401 ip_version = IPv6 if payload_info.ip == 1 else IP
403 ip = packet[ip_version]
404 packet_index = payload_info.index
406 self.assertEqual(payload_info.dst, dst_sw_if_index)
407 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
408 (pg_if.name, payload_info.src,
410 next_info = self.get_next_packet_info_for_interface2(
411 payload_info.src, dst_sw_if_index,
412 last_info[payload_info.src])
413 last_info[payload_info.src] = next_info
414 self.assertTrue(next_info is not None)
415 self.assertEqual(packet_index, next_info.index)
416 saved_packet = next_info.data
417 # Check standard fields
418 self.assertEqual(ip.src, saved_packet[ip_version].src)
419 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
420 p = self.proto_map[payload_info.proto]
423 self.assertEqual(tcp.sport, saved_packet[
425 self.assertEqual(tcp.dport, saved_packet[
429 self.assertEqual(udp.sport, saved_packet[
431 self.assertEqual(udp.dport, saved_packet[
434 self.logger.error(ppp("Unexpected or invalid packet:",
437 for i in self.pg_interfaces:
438 remaining_packet = self.get_next_packet_info_for_interface2(
439 i, dst_sw_if_index, last_info[i.sw_if_index])
441 remaining_packet is None,
442 "Port %u: Packet expected from source %u didn't arrive" %
443 (dst_sw_if_index, i.sw_if_index))
445 def run_traffic_no_check(self):
447 # Create incoming packet streams for packet-generator interfaces
448 for i in self.pg_interfaces:
449 if self.flows.__contains__(i):
450 pkts = self.create_stream(i, self.pg_if_packet_sizes)
454 # Enable packet capture and start packet sending
455 self.pg_enable_capture(self.pg_interfaces)
458 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
459 frags=False, pkt_raw=True, etype=-1):
461 # Create incoming packet streams for packet-generator interfaces
463 for i in self.pg_interfaces:
464 if self.flows.__contains__(i):
465 pkts = self.create_stream(i, self.pg_if_packet_sizes,
466 traffic_type, ip_type, proto, ports,
467 frags, pkt_raw, etype)
470 pkts_cnt += len(pkts)
472 # Enable packet capture and start packet sendingself.IPV
473 self.pg_enable_capture(self.pg_interfaces)
475 self.logger.info("sent packets count: %d" % pkts_cnt)
478 # Verify outgoing packet streams per packet-generator interface
479 for src_if in self.pg_interfaces:
480 if self.flows.__contains__(src_if):
481 for dst_if in self.flows[src_if]:
482 capture = dst_if.get_capture(pkts_cnt)
483 self.logger.info("Verifying capture on interface %s" %
485 self.verify_capture(dst_if, capture,
486 traffic_type, ip_type, etype)
488 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
489 ports=0, frags=False, etype=-1):
492 self.reset_packet_infos()
493 for i in self.pg_interfaces:
494 if self.flows.__contains__(i):
495 pkts = self.create_stream(i, self.pg_if_packet_sizes,
496 traffic_type, ip_type, proto, ports,
500 pkts_cnt += len(pkts)
502 # Enable packet capture and start packet sending
503 self.pg_enable_capture(self.pg_interfaces)
505 self.logger.info("sent packets count: %d" % pkts_cnt)
508 # Verify outgoing packet streams per packet-generator interface
509 for src_if in self.pg_interfaces:
510 if self.flows.__contains__(src_if):
511 for dst_if in self.flows[src_if]:
512 self.logger.info("Verifying capture on interface %s" %
514 capture = dst_if.get_capture(0)
515 self.assertEqual(len(capture), 0)
517 def test_0000_warmup_test(self):
518 """ ACL plugin version check; learn MACs
520 reply = self.vapi.papi.acl_plugin_get_version()
521 self.assertEqual(reply.major, 1)
522 self.logger.info("Working with ACL plugin version: %d.%d" % (
523 reply.major, reply.minor))
524 # minor version changes are non breaking
525 # self.assertEqual(reply.minor, 0)
527 def test_0001_acl_create(self):
528 """ ACL create/delete test
531 self.logger.info("ACLP_TEST_START_0001")
533 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
534 'srcport_or_icmptype_first': 1234,
535 'srcport_or_icmptype_last': 1235,
536 'src_ip_prefix_len': 0,
537 'src_ip_addr': '\x00\x00\x00\x00',
538 'dstport_or_icmpcode_first': 1234,
539 'dstport_or_icmpcode_last': 1234,
540 'dst_ip_addr': '\x00\x00\x00\x00',
541 'dst_ip_prefix_len': 0}]
542 # Test 1: add a new ACL
543 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
545 self.assertEqual(reply.retval, 0)
546 # The very first ACL gets #0
547 self.assertEqual(reply.acl_index, 0)
548 first_acl = reply.acl_index
549 rr = self.vapi.acl_dump(reply.acl_index)
550 self.logger.info("Dumped ACL: " + str(rr))
551 self.assertEqual(len(rr), 1)
552 # We should have the same number of ACL entries as we had asked
553 self.assertEqual(len(rr[0].r), len(r))
554 # The rules should be the same. But because the submitted and returned
555 # are different types, we need to iterate over rules and keys to get
557 for i_rule in range(0, len(r) - 1):
558 for rule_key in r[i_rule]:
559 self.assertEqual(rr[0].r[i_rule][rule_key],
562 # Add a deny-1234 ACL
563 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
564 'srcport_or_icmptype_first': 1234,
565 'srcport_or_icmptype_last': 1235,
566 'src_ip_prefix_len': 0,
567 'src_ip_addr': '\x00\x00\x00\x00',
568 'dstport_or_icmpcode_first': 1234,
569 'dstport_or_icmpcode_last': 1234,
570 'dst_ip_addr': '\x00\x00\x00\x00',
571 'dst_ip_prefix_len': 0},
572 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
573 'srcport_or_icmptype_first': 0,
574 'srcport_or_icmptype_last': 0,
575 'src_ip_prefix_len': 0,
576 'src_ip_addr': '\x00\x00\x00\x00',
577 'dstport_or_icmpcode_first': 0,
578 'dstport_or_icmpcode_last': 0,
579 'dst_ip_addr': '\x00\x00\x00\x00',
580 'dst_ip_prefix_len': 0}]
582 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
583 tag="deny 1234;permit all")
584 self.assertEqual(reply.retval, 0)
585 # The second ACL gets #1
586 self.assertEqual(reply.acl_index, 1)
587 second_acl = reply.acl_index
589 # Test 2: try to modify a nonexistent ACL
590 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
591 tag="FFFF:FFFF", expected_retval=-6)
592 self.assertEqual(reply.retval, -6)
593 # The ACL number should pass through
594 self.assertEqual(reply.acl_index, 432)
595 # apply an ACL on an interface inbound, try to delete ACL, must fail
596 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
599 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
600 # Unapply an ACL and then try to delete it - must be ok
601 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
604 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
606 # apply an ACL on an interface outbound, try to delete ACL, must fail
607 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
610 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
611 # Unapply the ACL and then try to delete it - must be ok
612 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
615 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
617 # try to apply a nonexistent ACL - must fail
618 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
623 self.logger.info("ACLP_TEST_FINISH_0001")
625 def test_0002_acl_permit_apply(self):
626 """ permit ACL apply test
628 self.logger.info("ACLP_TEST_START_0002")
631 rules.append(self.create_rule(self.IPV4, self.PERMIT,
632 0, self.proto[self.IP][self.UDP]))
633 rules.append(self.create_rule(self.IPV4, self.PERMIT,
634 0, self.proto[self.IP][self.TCP]))
637 self.apply_rules(rules, "permit per-flow")
639 # Traffic should still pass
640 self.run_verify_test(self.IP, self.IPV4, -1)
641 self.logger.info("ACLP_TEST_FINISH_0002")
643 def test_0003_acl_deny_apply(self):
644 """ deny ACL apply test
646 self.logger.info("ACLP_TEST_START_0003")
647 # Add a deny-flows ACL
649 rules.append(self.create_rule(self.IPV4, self.DENY,
650 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
651 # Permit ip any any in the end
652 rules.append(self.create_rule(self.IPV4, self.PERMIT,
656 self.apply_rules(rules, "deny per-flow;permit all")
658 # Traffic should not pass
659 self.run_verify_negat_test(self.IP, self.IPV4,
660 self.proto[self.IP][self.UDP])
661 self.logger.info("ACLP_TEST_FINISH_0003")
662 # self.assertEqual(1, 0)
664 def test_0004_vpp624_permit_icmpv4(self):
665 """ VPP_624 permit ICMPv4
667 self.logger.info("ACLP_TEST_START_0004")
671 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
672 self.proto[self.ICMP][self.ICMPv4]))
673 # deny ip any any in the end
674 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
677 self.apply_rules(rules, "permit icmpv4")
679 # Traffic should still pass
680 self.run_verify_test(self.ICMP, self.IPV4,
681 self.proto[self.ICMP][self.ICMPv4])
683 self.logger.info("ACLP_TEST_FINISH_0004")
685 def test_0005_vpp624_permit_icmpv6(self):
686 """ VPP_624 permit ICMPv6
688 self.logger.info("ACLP_TEST_START_0005")
692 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
693 self.proto[self.ICMP][self.ICMPv6]))
694 # deny ip any any in the end
695 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
698 self.apply_rules(rules, "permit icmpv6")
700 # Traffic should still pass
701 self.run_verify_test(self.ICMP, self.IPV6,
702 self.proto[self.ICMP][self.ICMPv6])
704 self.logger.info("ACLP_TEST_FINISH_0005")
706 def test_0006_vpp624_deny_icmpv4(self):
707 """ VPP_624 deny ICMPv4
709 self.logger.info("ACLP_TEST_START_0006")
712 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
713 self.proto[self.ICMP][self.ICMPv4]))
714 # permit ip any any in the end
715 rules.append(self.create_rule(self.IPV4, self.PERMIT,
719 self.apply_rules(rules, "deny icmpv4")
721 # Traffic should not pass
722 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
724 self.logger.info("ACLP_TEST_FINISH_0006")
726 def test_0007_vpp624_deny_icmpv6(self):
727 """ VPP_624 deny ICMPv6
729 self.logger.info("ACLP_TEST_START_0007")
732 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
733 self.proto[self.ICMP][self.ICMPv6]))
734 # deny ip any any in the end
735 rules.append(self.create_rule(self.IPV6, self.PERMIT,
739 self.apply_rules(rules, "deny icmpv6")
741 # Traffic should not pass
742 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
744 self.logger.info("ACLP_TEST_FINISH_0007")
746 def test_0008_tcp_permit_v4(self):
749 self.logger.info("ACLP_TEST_START_0008")
753 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
754 self.proto[self.IP][self.TCP]))
755 # deny ip any any in the end
756 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
759 self.apply_rules(rules, "permit ipv4 tcp")
761 # Traffic should still pass
762 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
764 self.logger.info("ACLP_TEST_FINISH_0008")
766 def test_0009_tcp_permit_v6(self):
769 self.logger.info("ACLP_TEST_START_0009")
773 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
774 self.proto[self.IP][self.TCP]))
775 # deny ip any any in the end
776 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
779 self.apply_rules(rules, "permit ip6 tcp")
781 # Traffic should still pass
782 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
784 self.logger.info("ACLP_TEST_FINISH_0008")
786 def test_0010_udp_permit_v4(self):
789 self.logger.info("ACLP_TEST_START_0010")
793 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
794 self.proto[self.IP][self.UDP]))
795 # deny ip any any in the end
796 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
799 self.apply_rules(rules, "permit ipv udp")
801 # Traffic should still pass
802 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
804 self.logger.info("ACLP_TEST_FINISH_0010")
806 def test_0011_udp_permit_v6(self):
809 self.logger.info("ACLP_TEST_START_0011")
813 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
814 self.proto[self.IP][self.UDP]))
815 # deny ip any any in the end
816 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
819 self.apply_rules(rules, "permit ip6 udp")
821 # Traffic should still pass
822 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
824 self.logger.info("ACLP_TEST_FINISH_0011")
826 def test_0012_tcp_deny(self):
829 self.logger.info("ACLP_TEST_START_0012")
833 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
834 self.proto[self.IP][self.TCP]))
835 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
836 self.proto[self.IP][self.TCP]))
837 # permit ip any any in the end
838 rules.append(self.create_rule(self.IPV4, self.PERMIT,
840 rules.append(self.create_rule(self.IPV6, self.PERMIT,
844 self.apply_rules(rules, "deny ip4/ip6 tcp")
846 # Traffic should not pass
847 self.run_verify_negat_test(self.IP, self.IPRANDOM,
848 self.proto[self.IP][self.TCP])
850 self.logger.info("ACLP_TEST_FINISH_0012")
852 def test_0013_udp_deny(self):
855 self.logger.info("ACLP_TEST_START_0013")
859 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
860 self.proto[self.IP][self.UDP]))
861 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
862 self.proto[self.IP][self.UDP]))
863 # permit ip any any in the end
864 rules.append(self.create_rule(self.IPV4, self.PERMIT,
866 rules.append(self.create_rule(self.IPV6, self.PERMIT,
870 self.apply_rules(rules, "deny ip4/ip6 udp")
872 # Traffic should not pass
873 self.run_verify_negat_test(self.IP, self.IPRANDOM,
874 self.proto[self.IP][self.UDP])
876 self.logger.info("ACLP_TEST_FINISH_0013")
878 def test_0014_acl_dump(self):
879 """ verify add/dump acls
881 self.logger.info("ACLP_TEST_START_0014")
883 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
884 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
885 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
886 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
887 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
888 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
889 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
890 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
891 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
892 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
893 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
894 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
895 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
896 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
897 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
898 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
899 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
900 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
903 # Add and verify new ACLs
905 for i in range(len(r)):
906 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
908 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
909 result = self.vapi.acl_dump(reply.acl_index)
912 for drules in result:
914 self.assertEqual(dr.is_ipv6, r[i][0])
915 self.assertEqual(dr.is_permit, r[i][1])
916 self.assertEqual(dr.proto, r[i][3])
919 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
922 self.assertEqual(dr.srcport_or_icmptype_first, 0)
923 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
925 if dr.proto == self.proto[self.IP][self.TCP]:
926 self.assertGreater(dr.srcport_or_icmptype_first,
927 self.tcp_sport_from-1)
928 self.assertLess(dr.srcport_or_icmptype_first,
930 self.assertGreater(dr.dstport_or_icmpcode_last,
931 self.tcp_dport_from-1)
932 self.assertLess(dr.dstport_or_icmpcode_last,
934 elif dr.proto == self.proto[self.IP][self.UDP]:
935 self.assertGreater(dr.srcport_or_icmptype_first,
936 self.udp_sport_from-1)
937 self.assertLess(dr.srcport_or_icmptype_first,
939 self.assertGreater(dr.dstport_or_icmpcode_last,
940 self.udp_dport_from-1)
941 self.assertLess(dr.dstport_or_icmpcode_last,
945 self.logger.info("ACLP_TEST_FINISH_0014")
947 def test_0015_tcp_permit_port_v4(self):
948 """ permit single TCPv4
950 self.logger.info("ACLP_TEST_START_0015")
952 port = random.randint(0, 65535)
955 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
956 self.proto[self.IP][self.TCP]))
957 # deny ip any any in the end
958 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
961 self.apply_rules(rules, "permit ip4 tcp "+str(port))
963 # Traffic should still pass
964 self.run_verify_test(self.IP, self.IPV4,
965 self.proto[self.IP][self.TCP], port)
967 self.logger.info("ACLP_TEST_FINISH_0015")
969 def test_0016_udp_permit_port_v4(self):
970 """ permit single UDPv4
972 self.logger.info("ACLP_TEST_START_0016")
974 port = random.randint(0, 65535)
977 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
978 self.proto[self.IP][self.UDP]))
979 # deny ip any any in the end
980 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
983 self.apply_rules(rules, "permit ip4 tcp "+str(port))
985 # Traffic should still pass
986 self.run_verify_test(self.IP, self.IPV4,
987 self.proto[self.IP][self.UDP], port)
989 self.logger.info("ACLP_TEST_FINISH_0016")
991 def test_0017_tcp_permit_port_v6(self):
992 """ permit single TCPv6
994 self.logger.info("ACLP_TEST_START_0017")
996 port = random.randint(0, 65535)
999 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1000 self.proto[self.IP][self.TCP]))
1001 # deny ip any any in the end
1002 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1005 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1007 # Traffic should still pass
1008 self.run_verify_test(self.IP, self.IPV6,
1009 self.proto[self.IP][self.TCP], port)
1011 self.logger.info("ACLP_TEST_FINISH_0017")
1013 def test_0018_udp_permit_port_v6(self):
1014 """ permit single UPPv6
1016 self.logger.info("ACLP_TEST_START_0018")
1018 port = random.randint(0, 65535)
1021 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1022 self.proto[self.IP][self.UDP]))
1023 # deny ip any any in the end
1024 rules.append(self.create_rule(self.IPV6, self.DENY,
1028 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1030 # Traffic should still pass
1031 self.run_verify_test(self.IP, self.IPV6,
1032 self.proto[self.IP][self.UDP], port)
1034 self.logger.info("ACLP_TEST_FINISH_0018")
1036 def test_0019_udp_deny_port(self):
1037 """ deny single TCPv4/v6
1039 self.logger.info("ACLP_TEST_START_0019")
1041 port = random.randint(0, 65535)
1044 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1045 self.proto[self.IP][self.TCP]))
1046 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1047 self.proto[self.IP][self.TCP]))
1048 # Permit ip any any in the end
1049 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1051 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1055 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1057 # Traffic should not pass
1058 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1059 self.proto[self.IP][self.TCP], port)
1061 self.logger.info("ACLP_TEST_FINISH_0019")
1063 def test_0020_udp_deny_port(self):
1064 """ deny single UDPv4/v6
1066 self.logger.info("ACLP_TEST_START_0020")
1068 port = random.randint(0, 65535)
1071 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1072 self.proto[self.IP][self.UDP]))
1073 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1074 self.proto[self.IP][self.UDP]))
1075 # Permit ip any any in the end
1076 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1078 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1082 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1084 # Traffic should not pass
1085 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1086 self.proto[self.IP][self.UDP], port)
1088 self.logger.info("ACLP_TEST_FINISH_0020")
1090 def test_0021_udp_deny_port_verify_fragment_deny(self):
1091 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1094 self.logger.info("ACLP_TEST_START_0021")
1096 port = random.randint(0, 65535)
1099 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1100 self.proto[self.IP][self.UDP]))
1101 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1102 self.proto[self.IP][self.UDP]))
1103 # deny ip any any in the end
1104 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1106 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1110 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1112 # Traffic should not pass
1113 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1114 self.proto[self.IP][self.UDP], port, True)
1116 self.logger.info("ACLP_TEST_FINISH_0021")
1118 def test_0022_zero_length_udp_ipv4(self):
1119 """ VPP-687 zero length udp ipv4 packet"""
1120 self.logger.info("ACLP_TEST_START_0022")
1122 port = random.randint(0, 65535)
1125 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1126 self.proto[self.IP][self.UDP]))
1127 # deny ip any any in the end
1129 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1132 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1134 # Traffic should still pass
1135 # Create incoming packet streams for packet-generator interfaces
1137 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1139 self.proto[self.IP][self.UDP], port,
1142 self.pg0.add_stream(pkts)
1143 pkts_cnt += len(pkts)
1145 # Enable packet capture and start packet sendingself.IPV
1146 self.pg_enable_capture(self.pg_interfaces)
1149 self.pg1.get_capture(pkts_cnt)
1151 self.logger.info("ACLP_TEST_FINISH_0022")
1153 def test_0023_zero_length_udp_ipv6(self):
1154 """ VPP-687 zero length udp ipv6 packet"""
1155 self.logger.info("ACLP_TEST_START_0023")
1157 port = random.randint(0, 65535)
1160 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1161 self.proto[self.IP][self.UDP]))
1162 # deny ip any any in the end
1163 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1166 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1168 # Traffic should still pass
1169 # Create incoming packet streams for packet-generator interfaces
1171 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1173 self.proto[self.IP][self.UDP], port,
1176 self.pg0.add_stream(pkts)
1177 pkts_cnt += len(pkts)
1179 # Enable packet capture and start packet sendingself.IPV
1180 self.pg_enable_capture(self.pg_interfaces)
1183 # Verify outgoing packet streams per packet-generator interface
1184 self.pg1.get_capture(pkts_cnt)
1186 self.logger.info("ACLP_TEST_FINISH_0023")
1188 def test_0108_tcp_permit_v4(self):
1189 """ permit TCPv4 + non-match range
1191 self.logger.info("ACLP_TEST_START_0108")
1195 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1196 self.proto[self.IP][self.TCP]))
1197 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1198 self.proto[self.IP][self.TCP]))
1199 # deny ip any any in the end
1200 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1203 self.apply_rules(rules, "permit ipv4 tcp")
1205 # Traffic should still pass
1206 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1208 self.logger.info("ACLP_TEST_FINISH_0108")
1210 def test_0109_tcp_permit_v6(self):
1211 """ permit TCPv6 + non-match range
1213 self.logger.info("ACLP_TEST_START_0109")
1217 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1218 self.proto[self.IP][self.TCP]))
1219 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1220 self.proto[self.IP][self.TCP]))
1221 # deny ip any any in the end
1222 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1225 self.apply_rules(rules, "permit ip6 tcp")
1227 # Traffic should still pass
1228 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1230 self.logger.info("ACLP_TEST_FINISH_0109")
1232 def test_0110_udp_permit_v4(self):
1233 """ permit UDPv4 + non-match range
1235 self.logger.info("ACLP_TEST_START_0110")
1239 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1240 self.proto[self.IP][self.UDP]))
1241 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1242 self.proto[self.IP][self.UDP]))
1243 # deny ip any any in the end
1244 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1247 self.apply_rules(rules, "permit ipv4 udp")
1249 # Traffic should still pass
1250 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1252 self.logger.info("ACLP_TEST_FINISH_0110")
1254 def test_0111_udp_permit_v6(self):
1255 """ permit UDPv6 + non-match range
1257 self.logger.info("ACLP_TEST_START_0111")
1261 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1262 self.proto[self.IP][self.UDP]))
1263 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1264 self.proto[self.IP][self.UDP]))
1265 # deny ip any any in the end
1266 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1269 self.apply_rules(rules, "permit ip6 udp")
1271 # Traffic should still pass
1272 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1274 self.logger.info("ACLP_TEST_FINISH_0111")
1276 def test_0112_tcp_deny(self):
1277 """ deny TCPv4/v6 + non-match range
1279 self.logger.info("ACLP_TEST_START_0112")
1283 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1285 self.proto[self.IP][self.TCP]))
1286 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1288 self.proto[self.IP][self.TCP]))
1289 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1290 self.proto[self.IP][self.TCP]))
1291 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1292 self.proto[self.IP][self.TCP]))
1293 # permit ip any any in the end
1294 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1296 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1300 self.apply_rules(rules, "deny ip4/ip6 tcp")
1302 # Traffic should not pass
1303 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1304 self.proto[self.IP][self.TCP])
1306 self.logger.info("ACLP_TEST_FINISH_0112")
1308 def test_0113_udp_deny(self):
1309 """ deny UDPv4/v6 + non-match range
1311 self.logger.info("ACLP_TEST_START_0113")
1315 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1317 self.proto[self.IP][self.UDP]))
1318 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1320 self.proto[self.IP][self.UDP]))
1321 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1322 self.proto[self.IP][self.UDP]))
1323 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1324 self.proto[self.IP][self.UDP]))
1325 # permit ip any any in the end
1326 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1328 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1332 self.apply_rules(rules, "deny ip4/ip6 udp")
1334 # Traffic should not pass
1335 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1336 self.proto[self.IP][self.UDP])
1338 self.logger.info("ACLP_TEST_FINISH_0113")
1340 def test_0300_tcp_permit_v4_etype_aaaa(self):
1341 """ permit TCPv4, send 0xAAAA etype
1343 self.logger.info("ACLP_TEST_START_0300")
1347 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1348 self.proto[self.IP][self.TCP]))
1349 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1350 self.proto[self.IP][self.TCP]))
1351 # deny ip any any in the end
1352 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1355 self.apply_rules(rules, "permit ipv4 tcp")
1357 # Traffic should still pass also for an odd ethertype
1358 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1359 0, False, True, 0xaaaa)
1360 self.logger.info("ACLP_TEST_FINISH_0300")
1362 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1363 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1365 self.logger.info("ACLP_TEST_START_0305")
1369 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1370 self.proto[self.IP][self.TCP]))
1371 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1372 self.proto[self.IP][self.TCP]))
1373 # deny ip any any in the end
1374 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1377 self.apply_rules(rules, "permit ipv4 tcp")
1379 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1380 self.etype_whitelist([0xbbb], 1)
1382 # The oddball ethertype should be blocked
1383 self.run_verify_negat_test(self.IP, self.IPV4,
1384 self.proto[self.IP][self.TCP],
1387 # remove the whitelist
1388 self.etype_whitelist([], 0)
1390 self.logger.info("ACLP_TEST_FINISH_0305")
1392 def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1393 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1395 self.logger.info("ACLP_TEST_START_0306")
1399 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1400 self.proto[self.IP][self.TCP]))
1401 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1402 self.proto[self.IP][self.TCP]))
1403 # deny ip any any in the end
1404 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1407 self.apply_rules(rules, "permit ipv4 tcp")
1409 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1410 self.etype_whitelist([0xbbb], 1)
1412 # The whitelisted traffic, should pass
1413 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1414 0, False, True, 0x0bbb)
1416 # remove the whitelist, the previously blocked 0xAAAA should pass now
1417 self.etype_whitelist([], 0)
1419 self.logger.info("ACLP_TEST_FINISH_0306")
1421 def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1422 """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1424 self.logger.info("ACLP_TEST_START_0307")
1428 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1429 self.proto[self.IP][self.TCP]))
1430 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1431 self.proto[self.IP][self.TCP]))
1432 # deny ip any any in the end
1433 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1436 self.apply_rules(rules, "permit ipv4 tcp")
1438 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1439 self.etype_whitelist([0xbbb], 1)
1440 # remove the whitelist, the previously blocked 0xAAAA should pass now
1441 self.etype_whitelist([], 0)
1443 # The whitelisted traffic, should pass
1444 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1445 0, False, True, 0xaaaa)
1447 self.logger.info("ACLP_TEST_FINISH_0306")
1449 def test_0315_del_intf(self):
1450 """ apply an acl and delete the interface
1452 self.logger.info("ACLP_TEST_START_0315")
1456 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1457 self.proto[self.IP][self.TCP]))
1458 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1459 self.proto[self.IP][self.TCP]))
1460 # deny ip any any in the end
1461 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1463 # create an interface
1465 intf.append(VppLoInterface(self))
1468 self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1470 # Remove the interface
1471 intf[0].remove_vpp_config()
1473 self.logger.info("ACLP_TEST_FINISH_0315")
1475 if __name__ == '__main__':
1476 unittest.main(testRunner=VppTestRunner)