2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
16 from vpp_lo_interface import VppLoInterface
19 class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
36 proto = [[6, 17], [1, 58]]
37 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
49 udp_sport_to = udp_sport_from + 5
50 udp_dport_from = 20000
51 udp_dport_to = udp_dport_from + 5000
53 tcp_sport_to = tcp_sport_from + 5
54 tcp_dport_from = 40000
55 tcp_dport_to = tcp_dport_from + 5000
58 udp_sport_to_2 = udp_sport_from_2 + 5
59 udp_dport_from_2 = 30000
60 udp_dport_to_2 = udp_dport_from_2 + 5000
61 tcp_sport_from_2 = 130
62 tcp_sport_to_2 = tcp_sport_from_2 + 5
63 tcp_dport_from_2 = 20000
64 tcp_dport_to_2 = tcp_dport_from_2 + 5000
66 icmp4_type = 8 # echo request
68 icmp6_type = 128 # echo request
84 Perform standard class setup (defined by class method setUpClass in
85 class VppTestCase) before running the test case, set test case related
86 variables and configure VPP.
88 super(TestACLplugin, cls).setUpClass()
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
96 cls.flows[cls.pg0] = [cls.pg1]
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
123 # warm-up the mac address tables
127 n_int = len(cls.pg_interfaces)
128 macs_per_if = count / n_int
130 for pg_if in cls.pg_interfaces:
132 start_nr = macs_per_if * i + start
133 end_nr = count + start if i == (n_int - 1) \
134 else macs_per_if * (i + 1) + start
135 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136 for j in range(start_nr, end_nr):
138 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
144 super(TestACLplugin, cls).tearDownClass()
148 super(TestACLplugin, self).setUp()
149 self.reset_packet_infos()
153 Show various debug prints after each test.
155 super(TestACLplugin, self).tearDown()
156 if not self.vpp_dead:
157 cli = "show vlib graph l2-input-feat-arc"
158 self.logger.info(self.vapi.ppcli(cli))
159 cli = "show vlib graph l2-input-feat-arc-end"
160 self.logger.info(self.vapi.ppcli(cli))
161 cli = "show vlib graph l2-output-feat-arc"
162 self.logger.info(self.vapi.ppcli(cli))
163 cli = "show vlib graph l2-output-feat-arc-end"
164 self.logger.info(self.vapi.ppcli(cli))
165 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
166 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
167 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
168 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
169 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
172 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
173 s_prefix=0, s_ip='\x00\x00\x00\x00',
174 d_prefix=0, d_ip='\x00\x00\x00\x00'):
177 if ports == self.PORTS_ALL:
180 sport_to = 65535 if proto != 1 and proto != 58 else 255
182 elif ports == self.PORTS_RANGE:
184 sport_from = self.icmp4_type
185 sport_to = self.icmp4_type
186 dport_from = self.icmp4_code
187 dport_to = self.icmp4_code
189 sport_from = self.icmp6_type
190 sport_to = self.icmp6_type
191 dport_from = self.icmp6_code
192 dport_to = self.icmp6_code
193 elif proto == self.proto[self.IP][self.TCP]:
194 sport_from = self.tcp_sport_from
195 sport_to = self.tcp_sport_to
196 dport_from = self.tcp_dport_from
197 dport_to = self.tcp_dport_to
198 elif proto == self.proto[self.IP][self.UDP]:
199 sport_from = self.udp_sport_from
200 sport_to = self.udp_sport_to
201 dport_from = self.udp_dport_from
202 dport_to = self.udp_dport_to
203 elif ports == self.PORTS_RANGE_2:
205 sport_from = self.icmp4_type_2
206 sport_to = self.icmp4_type_2
207 dport_from = self.icmp4_code_from_2
208 dport_to = self.icmp4_code_to_2
210 sport_from = self.icmp6_type_2
211 sport_to = self.icmp6_type_2
212 dport_from = self.icmp6_code_from_2
213 dport_to = self.icmp6_code_to_2
214 elif proto == self.proto[self.IP][self.TCP]:
215 sport_from = self.tcp_sport_from_2
216 sport_to = self.tcp_sport_to_2
217 dport_from = self.tcp_dport_from_2
218 dport_to = self.tcp_dport_to_2
219 elif proto == self.proto[self.IP][self.UDP]:
220 sport_from = self.udp_sport_from_2
221 sport_to = self.udp_sport_to_2
222 dport_from = self.udp_dport_from_2
223 dport_to = self.udp_dport_to_2
230 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
231 'srcport_or_icmptype_first': sport_from,
232 'srcport_or_icmptype_last': sport_to,
233 'src_ip_prefix_len': s_prefix,
235 'dstport_or_icmpcode_first': dport_from,
236 'dstport_or_icmpcode_last': dport_to,
237 'dst_ip_prefix_len': d_prefix,
238 'dst_ip_addr': d_ip})
241 def apply_rules(self, rules, tag=''):
242 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
244 self.logger.info("Dumped ACL: " + str(
245 self.vapi.acl_dump(reply.acl_index)))
246 # Apply a ACL on the interface as inbound
247 for i in self.pg_interfaces:
248 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
250 acls=[reply.acl_index])
253 def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
254 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
256 self.logger.info("Dumped ACL: " + str(
257 self.vapi.acl_dump(reply.acl_index)))
258 # Apply a ACL on the interface as inbound
259 self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
261 acls=[reply.acl_index])
264 def etype_whitelist(self, whitelist, n_input):
265 # Apply whitelists on all the interfaces
266 for i in self.pg_interfaces:
267 # checkstyle can't read long names. Help them.
268 fun = self.vapi.acl_interface_set_etype_whitelist
269 fun(sw_if_index=i.sw_if_index, n_input=n_input,
273 def create_upper_layer(self, packet_index, proto, ports=0):
274 p = self.proto_map[proto]
277 return UDP(sport=random.randint(self.udp_sport_from,
279 dport=random.randint(self.udp_dport_from,
282 return UDP(sport=ports, dport=ports)
285 return TCP(sport=random.randint(self.tcp_sport_from,
287 dport=random.randint(self.tcp_dport_from,
290 return TCP(sport=ports, dport=ports)
293 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
294 proto=-1, ports=0, fragments=False,
295 pkt_raw=True, etype=-1):
297 Create input packet stream for defined interface using hosts or
300 :param object src_if: Interface to create packet stream for.
301 :param list packet_sizes: List of required packet sizes.
302 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
303 :return: Stream of packets.
306 if self.flows.__contains__(src_if):
307 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
308 for dst_if in self.flows[src_if]:
309 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
310 n_int = len(dst_hosts) * len(src_hosts)
311 for i in range(0, n_int):
312 dst_host = dst_hosts[i / len(src_hosts)]
313 src_host = src_hosts[i % len(src_hosts)]
314 pkt_info = self.create_packet_info(src_if, dst_if)
320 pkt_info.ip = random.choice([0, 1])
322 pkt_info.proto = random.choice(self.proto[self.IP])
324 pkt_info.proto = proto
325 payload = self.info_to_payload(pkt_info)
326 p = Ether(dst=dst_host.mac, src=src_host.mac)
328 p = Ether(dst=dst_host.mac,
332 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
334 p /= IPv6ExtHdrFragment(offset=64, m=1)
337 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
340 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
341 if traffic_type == self.ICMP:
343 p /= ICMPv6EchoRequest(type=self.icmp6_type,
344 code=self.icmp6_code)
346 p /= ICMP(type=self.icmp4_type,
347 code=self.icmp4_code)
349 p /= self.create_upper_layer(i, pkt_info.proto, ports)
352 pkt_info.data = p.copy()
354 size = random.choice(packet_sizes)
355 self.extend_packet(p, size)
359 def verify_capture(self, pg_if, capture,
360 traffic_type=0, ip_type=0, etype=-1):
362 Verify captured input packet stream for defined interface.
364 :param object pg_if: Interface to verify captured packet stream for.
365 :param list capture: Captured packet stream.
366 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
369 for i in self.pg_interfaces:
370 last_info[i.sw_if_index] = None
371 dst_sw_if_index = pg_if.sw_if_index
372 for packet in capture:
374 if packet[Ether].type != etype:
375 self.logger.error(ppp("Unexpected ethertype in packet:",
380 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
381 if traffic_type == self.ICMP and ip_type == self.IPV6:
382 payload_info = self.payload_to_info(
383 packet[ICMPv6EchoRequest].data)
384 payload = packet[ICMPv6EchoRequest]
386 payload_info = self.payload_to_info(str(packet[Raw]))
387 payload = packet[self.proto_map[payload_info.proto]]
389 self.logger.error(ppp("Unexpected or invalid packet "
390 "(outside network):", packet))
394 self.assertEqual(payload_info.ip, ip_type)
395 if traffic_type == self.ICMP:
397 if payload_info.ip == 0:
398 self.assertEqual(payload.type, self.icmp4_type)
399 self.assertEqual(payload.code, self.icmp4_code)
401 self.assertEqual(payload.type, self.icmp6_type)
402 self.assertEqual(payload.code, self.icmp6_code)
404 self.logger.error(ppp("Unexpected or invalid packet "
405 "(outside network):", packet))
409 ip_version = IPv6 if payload_info.ip == 1 else IP
411 ip = packet[ip_version]
412 packet_index = payload_info.index
414 self.assertEqual(payload_info.dst, dst_sw_if_index)
415 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
416 (pg_if.name, payload_info.src,
418 next_info = self.get_next_packet_info_for_interface2(
419 payload_info.src, dst_sw_if_index,
420 last_info[payload_info.src])
421 last_info[payload_info.src] = next_info
422 self.assertTrue(next_info is not None)
423 self.assertEqual(packet_index, next_info.index)
424 saved_packet = next_info.data
425 # Check standard fields
426 self.assertEqual(ip.src, saved_packet[ip_version].src)
427 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
428 p = self.proto_map[payload_info.proto]
431 self.assertEqual(tcp.sport, saved_packet[
433 self.assertEqual(tcp.dport, saved_packet[
437 self.assertEqual(udp.sport, saved_packet[
439 self.assertEqual(udp.dport, saved_packet[
442 self.logger.error(ppp("Unexpected or invalid packet:",
445 for i in self.pg_interfaces:
446 remaining_packet = self.get_next_packet_info_for_interface2(
447 i, dst_sw_if_index, last_info[i.sw_if_index])
449 remaining_packet is None,
450 "Port %u: Packet expected from source %u didn't arrive" %
451 (dst_sw_if_index, i.sw_if_index))
453 def run_traffic_no_check(self):
455 # Create incoming packet streams for packet-generator interfaces
456 for i in self.pg_interfaces:
457 if self.flows.__contains__(i):
458 pkts = self.create_stream(i, self.pg_if_packet_sizes)
462 # Enable packet capture and start packet sending
463 self.pg_enable_capture(self.pg_interfaces)
466 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
467 frags=False, pkt_raw=True, etype=-1):
469 # Create incoming packet streams for packet-generator interfaces
471 for i in self.pg_interfaces:
472 if self.flows.__contains__(i):
473 pkts = self.create_stream(i, self.pg_if_packet_sizes,
474 traffic_type, ip_type, proto, ports,
475 frags, pkt_raw, etype)
478 pkts_cnt += len(pkts)
480 # Enable packet capture and start packet sendingself.IPV
481 self.pg_enable_capture(self.pg_interfaces)
483 self.logger.info("sent packets count: %d" % pkts_cnt)
486 # Verify outgoing packet streams per packet-generator interface
487 for src_if in self.pg_interfaces:
488 if self.flows.__contains__(src_if):
489 for dst_if in self.flows[src_if]:
490 capture = dst_if.get_capture(pkts_cnt)
491 self.logger.info("Verifying capture on interface %s" %
493 self.verify_capture(dst_if, capture,
494 traffic_type, ip_type, etype)
496 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
497 ports=0, frags=False, etype=-1):
500 self.reset_packet_infos()
501 for i in self.pg_interfaces:
502 if self.flows.__contains__(i):
503 pkts = self.create_stream(i, self.pg_if_packet_sizes,
504 traffic_type, ip_type, proto, ports,
508 pkts_cnt += len(pkts)
510 # Enable packet capture and start packet sending
511 self.pg_enable_capture(self.pg_interfaces)
513 self.logger.info("sent packets count: %d" % pkts_cnt)
516 # Verify outgoing packet streams per packet-generator interface
517 for src_if in self.pg_interfaces:
518 if self.flows.__contains__(src_if):
519 for dst_if in self.flows[src_if]:
520 self.logger.info("Verifying capture on interface %s" %
522 capture = dst_if.get_capture(0)
523 self.assertEqual(len(capture), 0)
525 def test_0000_warmup_test(self):
526 """ ACL plugin version check; learn MACs
528 reply = self.vapi.papi.acl_plugin_get_version()
529 self.assertEqual(reply.major, 1)
530 self.logger.info("Working with ACL plugin version: %d.%d" % (
531 reply.major, reply.minor))
532 # minor version changes are non breaking
533 # self.assertEqual(reply.minor, 0)
535 def test_0001_acl_create(self):
536 """ ACL create/delete test
539 self.logger.info("ACLP_TEST_START_0001")
541 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
542 'srcport_or_icmptype_first': 1234,
543 'srcport_or_icmptype_last': 1235,
544 'src_ip_prefix_len': 0,
545 'src_ip_addr': '\x00\x00\x00\x00',
546 'dstport_or_icmpcode_first': 1234,
547 'dstport_or_icmpcode_last': 1234,
548 'dst_ip_addr': '\x00\x00\x00\x00',
549 'dst_ip_prefix_len': 0}]
550 # Test 1: add a new ACL
551 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
553 self.assertEqual(reply.retval, 0)
554 # The very first ACL gets #0
555 self.assertEqual(reply.acl_index, 0)
556 first_acl = reply.acl_index
557 rr = self.vapi.acl_dump(reply.acl_index)
558 self.logger.info("Dumped ACL: " + str(rr))
559 self.assertEqual(len(rr), 1)
560 # We should have the same number of ACL entries as we had asked
561 self.assertEqual(len(rr[0].r), len(r))
562 # The rules should be the same. But because the submitted and returned
563 # are different types, we need to iterate over rules and keys to get
565 for i_rule in range(0, len(r) - 1):
566 for rule_key in r[i_rule]:
567 self.assertEqual(rr[0].r[i_rule][rule_key],
570 # Add a deny-1234 ACL
571 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
572 'srcport_or_icmptype_first': 1234,
573 'srcport_or_icmptype_last': 1235,
574 'src_ip_prefix_len': 0,
575 'src_ip_addr': '\x00\x00\x00\x00',
576 'dstport_or_icmpcode_first': 1234,
577 'dstport_or_icmpcode_last': 1234,
578 'dst_ip_addr': '\x00\x00\x00\x00',
579 'dst_ip_prefix_len': 0},
580 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
581 'srcport_or_icmptype_first': 0,
582 'srcport_or_icmptype_last': 0,
583 'src_ip_prefix_len': 0,
584 'src_ip_addr': '\x00\x00\x00\x00',
585 'dstport_or_icmpcode_first': 0,
586 'dstport_or_icmpcode_last': 0,
587 'dst_ip_addr': '\x00\x00\x00\x00',
588 'dst_ip_prefix_len': 0}]
590 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
591 tag="deny 1234;permit all")
592 self.assertEqual(reply.retval, 0)
593 # The second ACL gets #1
594 self.assertEqual(reply.acl_index, 1)
595 second_acl = reply.acl_index
597 # Test 2: try to modify a nonexistent ACL
598 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
599 tag="FFFF:FFFF", expected_retval=-6)
600 self.assertEqual(reply.retval, -6)
601 # The ACL number should pass through
602 self.assertEqual(reply.acl_index, 432)
603 # apply an ACL on an interface inbound, try to delete ACL, must fail
604 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
607 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
608 # Unapply an ACL and then try to delete it - must be ok
609 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
612 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
614 # apply an ACL on an interface outbound, try to delete ACL, must fail
615 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
618 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
619 # Unapply the ACL and then try to delete it - must be ok
620 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
623 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
625 # try to apply a nonexistent ACL - must fail
626 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
631 self.logger.info("ACLP_TEST_FINISH_0001")
633 def test_0002_acl_permit_apply(self):
634 """ permit ACL apply test
636 self.logger.info("ACLP_TEST_START_0002")
639 rules.append(self.create_rule(self.IPV4, self.PERMIT,
640 0, self.proto[self.IP][self.UDP]))
641 rules.append(self.create_rule(self.IPV4, self.PERMIT,
642 0, self.proto[self.IP][self.TCP]))
645 self.apply_rules(rules, "permit per-flow")
647 # Traffic should still pass
648 self.run_verify_test(self.IP, self.IPV4, -1)
649 self.logger.info("ACLP_TEST_FINISH_0002")
651 def test_0003_acl_deny_apply(self):
652 """ deny ACL apply test
654 self.logger.info("ACLP_TEST_START_0003")
655 # Add a deny-flows ACL
657 rules.append(self.create_rule(self.IPV4, self.DENY,
658 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
659 # Permit ip any any in the end
660 rules.append(self.create_rule(self.IPV4, self.PERMIT,
664 self.apply_rules(rules, "deny per-flow;permit all")
666 # Traffic should not pass
667 self.run_verify_negat_test(self.IP, self.IPV4,
668 self.proto[self.IP][self.UDP])
669 self.logger.info("ACLP_TEST_FINISH_0003")
670 # self.assertEqual(1, 0)
672 def test_0004_vpp624_permit_icmpv4(self):
673 """ VPP_624 permit ICMPv4
675 self.logger.info("ACLP_TEST_START_0004")
679 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
680 self.proto[self.ICMP][self.ICMPv4]))
681 # deny ip any any in the end
682 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
685 self.apply_rules(rules, "permit icmpv4")
687 # Traffic should still pass
688 self.run_verify_test(self.ICMP, self.IPV4,
689 self.proto[self.ICMP][self.ICMPv4])
691 self.logger.info("ACLP_TEST_FINISH_0004")
693 def test_0005_vpp624_permit_icmpv6(self):
694 """ VPP_624 permit ICMPv6
696 self.logger.info("ACLP_TEST_START_0005")
700 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
701 self.proto[self.ICMP][self.ICMPv6]))
702 # deny ip any any in the end
703 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
706 self.apply_rules(rules, "permit icmpv6")
708 # Traffic should still pass
709 self.run_verify_test(self.ICMP, self.IPV6,
710 self.proto[self.ICMP][self.ICMPv6])
712 self.logger.info("ACLP_TEST_FINISH_0005")
714 def test_0006_vpp624_deny_icmpv4(self):
715 """ VPP_624 deny ICMPv4
717 self.logger.info("ACLP_TEST_START_0006")
720 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
721 self.proto[self.ICMP][self.ICMPv4]))
722 # permit ip any any in the end
723 rules.append(self.create_rule(self.IPV4, self.PERMIT,
727 self.apply_rules(rules, "deny icmpv4")
729 # Traffic should not pass
730 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
732 self.logger.info("ACLP_TEST_FINISH_0006")
734 def test_0007_vpp624_deny_icmpv6(self):
735 """ VPP_624 deny ICMPv6
737 self.logger.info("ACLP_TEST_START_0007")
740 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
741 self.proto[self.ICMP][self.ICMPv6]))
742 # deny ip any any in the end
743 rules.append(self.create_rule(self.IPV6, self.PERMIT,
747 self.apply_rules(rules, "deny icmpv6")
749 # Traffic should not pass
750 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
752 self.logger.info("ACLP_TEST_FINISH_0007")
754 def test_0008_tcp_permit_v4(self):
757 self.logger.info("ACLP_TEST_START_0008")
761 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
762 self.proto[self.IP][self.TCP]))
763 # deny ip any any in the end
764 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
767 self.apply_rules(rules, "permit ipv4 tcp")
769 # Traffic should still pass
770 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
772 self.logger.info("ACLP_TEST_FINISH_0008")
774 def test_0009_tcp_permit_v6(self):
777 self.logger.info("ACLP_TEST_START_0009")
781 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
782 self.proto[self.IP][self.TCP]))
783 # deny ip any any in the end
784 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
787 self.apply_rules(rules, "permit ip6 tcp")
789 # Traffic should still pass
790 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
792 self.logger.info("ACLP_TEST_FINISH_0008")
794 def test_0010_udp_permit_v4(self):
797 self.logger.info("ACLP_TEST_START_0010")
801 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
802 self.proto[self.IP][self.UDP]))
803 # deny ip any any in the end
804 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
807 self.apply_rules(rules, "permit ipv udp")
809 # Traffic should still pass
810 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
812 self.logger.info("ACLP_TEST_FINISH_0010")
814 def test_0011_udp_permit_v6(self):
817 self.logger.info("ACLP_TEST_START_0011")
821 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
822 self.proto[self.IP][self.UDP]))
823 # deny ip any any in the end
824 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
827 self.apply_rules(rules, "permit ip6 udp")
829 # Traffic should still pass
830 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
832 self.logger.info("ACLP_TEST_FINISH_0011")
834 def test_0012_tcp_deny(self):
837 self.logger.info("ACLP_TEST_START_0012")
841 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
842 self.proto[self.IP][self.TCP]))
843 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
844 self.proto[self.IP][self.TCP]))
845 # permit ip any any in the end
846 rules.append(self.create_rule(self.IPV4, self.PERMIT,
848 rules.append(self.create_rule(self.IPV6, self.PERMIT,
852 self.apply_rules(rules, "deny ip4/ip6 tcp")
854 # Traffic should not pass
855 self.run_verify_negat_test(self.IP, self.IPRANDOM,
856 self.proto[self.IP][self.TCP])
858 self.logger.info("ACLP_TEST_FINISH_0012")
860 def test_0013_udp_deny(self):
863 self.logger.info("ACLP_TEST_START_0013")
867 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
868 self.proto[self.IP][self.UDP]))
869 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
870 self.proto[self.IP][self.UDP]))
871 # permit ip any any in the end
872 rules.append(self.create_rule(self.IPV4, self.PERMIT,
874 rules.append(self.create_rule(self.IPV6, self.PERMIT,
878 self.apply_rules(rules, "deny ip4/ip6 udp")
880 # Traffic should not pass
881 self.run_verify_negat_test(self.IP, self.IPRANDOM,
882 self.proto[self.IP][self.UDP])
884 self.logger.info("ACLP_TEST_FINISH_0013")
886 def test_0014_acl_dump(self):
887 """ verify add/dump acls
889 self.logger.info("ACLP_TEST_START_0014")
891 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
892 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
893 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
894 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
895 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
896 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
897 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
898 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
899 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
900 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
901 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
902 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
903 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
904 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
905 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
906 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
907 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
908 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
911 # Add and verify new ACLs
913 for i in range(len(r)):
914 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
916 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
917 result = self.vapi.acl_dump(reply.acl_index)
920 for drules in result:
922 self.assertEqual(dr.is_ipv6, r[i][0])
923 self.assertEqual(dr.is_permit, r[i][1])
924 self.assertEqual(dr.proto, r[i][3])
927 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
930 self.assertEqual(dr.srcport_or_icmptype_first, 0)
931 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
933 if dr.proto == self.proto[self.IP][self.TCP]:
934 self.assertGreater(dr.srcport_or_icmptype_first,
935 self.tcp_sport_from-1)
936 self.assertLess(dr.srcport_or_icmptype_first,
938 self.assertGreater(dr.dstport_or_icmpcode_last,
939 self.tcp_dport_from-1)
940 self.assertLess(dr.dstport_or_icmpcode_last,
942 elif dr.proto == self.proto[self.IP][self.UDP]:
943 self.assertGreater(dr.srcport_or_icmptype_first,
944 self.udp_sport_from-1)
945 self.assertLess(dr.srcport_or_icmptype_first,
947 self.assertGreater(dr.dstport_or_icmpcode_last,
948 self.udp_dport_from-1)
949 self.assertLess(dr.dstport_or_icmpcode_last,
953 self.logger.info("ACLP_TEST_FINISH_0014")
955 def test_0015_tcp_permit_port_v4(self):
956 """ permit single TCPv4
958 self.logger.info("ACLP_TEST_START_0015")
960 port = random.randint(0, 65535)
963 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
964 self.proto[self.IP][self.TCP]))
965 # deny ip any any in the end
966 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
969 self.apply_rules(rules, "permit ip4 tcp "+str(port))
971 # Traffic should still pass
972 self.run_verify_test(self.IP, self.IPV4,
973 self.proto[self.IP][self.TCP], port)
975 self.logger.info("ACLP_TEST_FINISH_0015")
977 def test_0016_udp_permit_port_v4(self):
978 """ permit single UDPv4
980 self.logger.info("ACLP_TEST_START_0016")
982 port = random.randint(0, 65535)
985 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
986 self.proto[self.IP][self.UDP]))
987 # deny ip any any in the end
988 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
991 self.apply_rules(rules, "permit ip4 tcp "+str(port))
993 # Traffic should still pass
994 self.run_verify_test(self.IP, self.IPV4,
995 self.proto[self.IP][self.UDP], port)
997 self.logger.info("ACLP_TEST_FINISH_0016")
999 def test_0017_tcp_permit_port_v6(self):
1000 """ permit single TCPv6
1002 self.logger.info("ACLP_TEST_START_0017")
1004 port = random.randint(0, 65535)
1007 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1008 self.proto[self.IP][self.TCP]))
1009 # deny ip any any in the end
1010 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1013 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1015 # Traffic should still pass
1016 self.run_verify_test(self.IP, self.IPV6,
1017 self.proto[self.IP][self.TCP], port)
1019 self.logger.info("ACLP_TEST_FINISH_0017")
1021 def test_0018_udp_permit_port_v6(self):
1022 """ permit single UPPv6
1024 self.logger.info("ACLP_TEST_START_0018")
1026 port = random.randint(0, 65535)
1029 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1030 self.proto[self.IP][self.UDP]))
1031 # deny ip any any in the end
1032 rules.append(self.create_rule(self.IPV6, self.DENY,
1036 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1038 # Traffic should still pass
1039 self.run_verify_test(self.IP, self.IPV6,
1040 self.proto[self.IP][self.UDP], port)
1042 self.logger.info("ACLP_TEST_FINISH_0018")
1044 def test_0019_udp_deny_port(self):
1045 """ deny single TCPv4/v6
1047 self.logger.info("ACLP_TEST_START_0019")
1049 port = random.randint(0, 65535)
1052 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1053 self.proto[self.IP][self.TCP]))
1054 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1055 self.proto[self.IP][self.TCP]))
1056 # Permit ip any any in the end
1057 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1059 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1063 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1065 # Traffic should not pass
1066 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1067 self.proto[self.IP][self.TCP], port)
1069 self.logger.info("ACLP_TEST_FINISH_0019")
1071 def test_0020_udp_deny_port(self):
1072 """ deny single UDPv4/v6
1074 self.logger.info("ACLP_TEST_START_0020")
1076 port = random.randint(0, 65535)
1079 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1080 self.proto[self.IP][self.UDP]))
1081 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1082 self.proto[self.IP][self.UDP]))
1083 # Permit ip any any in the end
1084 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1086 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1090 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1092 # Traffic should not pass
1093 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1094 self.proto[self.IP][self.UDP], port)
1096 self.logger.info("ACLP_TEST_FINISH_0020")
1098 def test_0021_udp_deny_port_verify_fragment_deny(self):
1099 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1102 self.logger.info("ACLP_TEST_START_0021")
1104 port = random.randint(0, 65535)
1107 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1108 self.proto[self.IP][self.UDP]))
1109 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1110 self.proto[self.IP][self.UDP]))
1111 # deny ip any any in the end
1112 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1114 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1118 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1120 # Traffic should not pass
1121 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1122 self.proto[self.IP][self.UDP], port, True)
1124 self.logger.info("ACLP_TEST_FINISH_0021")
1126 def test_0022_zero_length_udp_ipv4(self):
1127 """ VPP-687 zero length udp ipv4 packet"""
1128 self.logger.info("ACLP_TEST_START_0022")
1130 port = random.randint(0, 65535)
1133 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1134 self.proto[self.IP][self.UDP]))
1135 # deny ip any any in the end
1137 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1140 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1142 # Traffic should still pass
1143 # Create incoming packet streams for packet-generator interfaces
1145 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1147 self.proto[self.IP][self.UDP], port,
1150 self.pg0.add_stream(pkts)
1151 pkts_cnt += len(pkts)
1153 # Enable packet capture and start packet sendingself.IPV
1154 self.pg_enable_capture(self.pg_interfaces)
1157 self.pg1.get_capture(pkts_cnt)
1159 self.logger.info("ACLP_TEST_FINISH_0022")
1161 def test_0023_zero_length_udp_ipv6(self):
1162 """ VPP-687 zero length udp ipv6 packet"""
1163 self.logger.info("ACLP_TEST_START_0023")
1165 port = random.randint(0, 65535)
1168 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1169 self.proto[self.IP][self.UDP]))
1170 # deny ip any any in the end
1171 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1174 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1176 # Traffic should still pass
1177 # Create incoming packet streams for packet-generator interfaces
1179 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1181 self.proto[self.IP][self.UDP], port,
1184 self.pg0.add_stream(pkts)
1185 pkts_cnt += len(pkts)
1187 # Enable packet capture and start packet sendingself.IPV
1188 self.pg_enable_capture(self.pg_interfaces)
1191 # Verify outgoing packet streams per packet-generator interface
1192 self.pg1.get_capture(pkts_cnt)
1194 self.logger.info("ACLP_TEST_FINISH_0023")
1196 def test_0108_tcp_permit_v4(self):
1197 """ permit TCPv4 + non-match range
1199 self.logger.info("ACLP_TEST_START_0108")
1203 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1204 self.proto[self.IP][self.TCP]))
1205 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1206 self.proto[self.IP][self.TCP]))
1207 # deny ip any any in the end
1208 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1211 self.apply_rules(rules, "permit ipv4 tcp")
1213 # Traffic should still pass
1214 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1216 self.logger.info("ACLP_TEST_FINISH_0108")
1218 def test_0109_tcp_permit_v6(self):
1219 """ permit TCPv6 + non-match range
1221 self.logger.info("ACLP_TEST_START_0109")
1225 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1226 self.proto[self.IP][self.TCP]))
1227 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1228 self.proto[self.IP][self.TCP]))
1229 # deny ip any any in the end
1230 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1233 self.apply_rules(rules, "permit ip6 tcp")
1235 # Traffic should still pass
1236 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1238 self.logger.info("ACLP_TEST_FINISH_0109")
1240 def test_0110_udp_permit_v4(self):
1241 """ permit UDPv4 + non-match range
1243 self.logger.info("ACLP_TEST_START_0110")
1247 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1248 self.proto[self.IP][self.UDP]))
1249 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1250 self.proto[self.IP][self.UDP]))
1251 # deny ip any any in the end
1252 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1255 self.apply_rules(rules, "permit ipv4 udp")
1257 # Traffic should still pass
1258 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1260 self.logger.info("ACLP_TEST_FINISH_0110")
1262 def test_0111_udp_permit_v6(self):
1263 """ permit UDPv6 + non-match range
1265 self.logger.info("ACLP_TEST_START_0111")
1269 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1270 self.proto[self.IP][self.UDP]))
1271 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1272 self.proto[self.IP][self.UDP]))
1273 # deny ip any any in the end
1274 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1277 self.apply_rules(rules, "permit ip6 udp")
1279 # Traffic should still pass
1280 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1282 self.logger.info("ACLP_TEST_FINISH_0111")
1284 def test_0112_tcp_deny(self):
1285 """ deny TCPv4/v6 + non-match range
1287 self.logger.info("ACLP_TEST_START_0112")
1291 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1293 self.proto[self.IP][self.TCP]))
1294 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1296 self.proto[self.IP][self.TCP]))
1297 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1298 self.proto[self.IP][self.TCP]))
1299 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1300 self.proto[self.IP][self.TCP]))
1301 # permit ip any any in the end
1302 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1304 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1308 self.apply_rules(rules, "deny ip4/ip6 tcp")
1310 # Traffic should not pass
1311 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1312 self.proto[self.IP][self.TCP])
1314 self.logger.info("ACLP_TEST_FINISH_0112")
1316 def test_0113_udp_deny(self):
1317 """ deny UDPv4/v6 + non-match range
1319 self.logger.info("ACLP_TEST_START_0113")
1323 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1325 self.proto[self.IP][self.UDP]))
1326 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1328 self.proto[self.IP][self.UDP]))
1329 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1330 self.proto[self.IP][self.UDP]))
1331 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1332 self.proto[self.IP][self.UDP]))
1333 # permit ip any any in the end
1334 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1336 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1340 self.apply_rules(rules, "deny ip4/ip6 udp")
1342 # Traffic should not pass
1343 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1344 self.proto[self.IP][self.UDP])
1346 self.logger.info("ACLP_TEST_FINISH_0113")
1348 def test_0300_tcp_permit_v4_etype_aaaa(self):
1349 """ permit TCPv4, send 0xAAAA etype
1351 self.logger.info("ACLP_TEST_START_0300")
1355 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1356 self.proto[self.IP][self.TCP]))
1357 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1358 self.proto[self.IP][self.TCP]))
1359 # deny ip any any in the end
1360 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1363 self.apply_rules(rules, "permit ipv4 tcp")
1365 # Traffic should still pass also for an odd ethertype
1366 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1367 0, False, True, 0xaaaa)
1368 self.logger.info("ACLP_TEST_FINISH_0300")
1370 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1371 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1373 self.logger.info("ACLP_TEST_START_0305")
1377 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1378 self.proto[self.IP][self.TCP]))
1379 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1380 self.proto[self.IP][self.TCP]))
1381 # deny ip any any in the end
1382 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1385 self.apply_rules(rules, "permit ipv4 tcp")
1387 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1388 self.etype_whitelist([0xbbb], 1)
1390 # The oddball ethertype should be blocked
1391 self.run_verify_negat_test(self.IP, self.IPV4,
1392 self.proto[self.IP][self.TCP],
1395 # remove the whitelist
1396 self.etype_whitelist([], 0)
1398 self.logger.info("ACLP_TEST_FINISH_0305")
1400 def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1401 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1403 self.logger.info("ACLP_TEST_START_0306")
1407 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1408 self.proto[self.IP][self.TCP]))
1409 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1410 self.proto[self.IP][self.TCP]))
1411 # deny ip any any in the end
1412 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1415 self.apply_rules(rules, "permit ipv4 tcp")
1417 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1418 self.etype_whitelist([0xbbb], 1)
1420 # The whitelisted traffic, should pass
1421 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1422 0, False, True, 0x0bbb)
1424 # remove the whitelist, the previously blocked 0xAAAA should pass now
1425 self.etype_whitelist([], 0)
1427 self.logger.info("ACLP_TEST_FINISH_0306")
1429 def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1430 """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1432 self.logger.info("ACLP_TEST_START_0307")
1436 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1437 self.proto[self.IP][self.TCP]))
1438 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1439 self.proto[self.IP][self.TCP]))
1440 # deny ip any any in the end
1441 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1444 self.apply_rules(rules, "permit ipv4 tcp")
1446 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1447 self.etype_whitelist([0xbbb], 1)
1448 # remove the whitelist, the previously blocked 0xAAAA should pass now
1449 self.etype_whitelist([], 0)
1451 # The whitelisted traffic, should pass
1452 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1453 0, False, True, 0xaaaa)
1455 self.logger.info("ACLP_TEST_FINISH_0306")
1457 def test_0315_del_intf(self):
1458 """ apply an acl and delete the interface
1460 self.logger.info("ACLP_TEST_START_0315")
1464 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1465 self.proto[self.IP][self.TCP]))
1466 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1467 self.proto[self.IP][self.TCP]))
1468 # deny ip any any in the end
1469 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1471 # create an interface
1473 intf.append(VppLoInterface(self))
1476 self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1478 # Remove the interface
1479 intf[0].remove_vpp_config()
1481 self.logger.info("ACLP_TEST_FINISH_0315")
1483 if __name__ == '__main__':
1484 unittest.main(testRunner=VppTestRunner)