2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
16 from vpp_lo_interface import VppLoInterface
19 class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
36 proto = [[6, 17], [1, 58]]
37 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
49 udp_sport_to = udp_sport_from + 5
50 udp_dport_from = 20000
51 udp_dport_to = udp_dport_from + 5000
53 tcp_sport_to = tcp_sport_from + 5
54 tcp_dport_from = 40000
55 tcp_dport_to = tcp_dport_from + 5000
58 udp_sport_to_2 = udp_sport_from_2 + 5
59 udp_dport_from_2 = 30000
60 udp_dport_to_2 = udp_dport_from_2 + 5000
61 tcp_sport_from_2 = 130
62 tcp_sport_to_2 = tcp_sport_from_2 + 5
63 tcp_dport_from_2 = 20000
64 tcp_dport_to_2 = tcp_dport_from_2 + 5000
66 icmp4_type = 8 # echo request
68 icmp6_type = 128 # echo request
84 Perform standard class setup (defined by class method setUpClass in
85 class VppTestCase) before running the test case, set test case related
86 variables and configure VPP.
88 super(TestACLplugin, cls).setUpClass()
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
96 cls.flows[cls.pg0] = [cls.pg1]
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(
107 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
123 # warm-up the mac address tables
127 n_int = len(cls.pg_interfaces)
128 macs_per_if = count / n_int
130 for pg_if in cls.pg_interfaces:
132 start_nr = macs_per_if * i + start
133 end_nr = count + start if i == (n_int - 1) \
134 else macs_per_if * (i + 1) + start
135 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136 for j in range(start_nr, end_nr):
138 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
144 super(TestACLplugin, cls).tearDownClass()
148 def tearDownClass(cls):
149 super(TestACLplugin, cls).tearDownClass()
152 super(TestACLplugin, self).setUp()
153 self.reset_packet_infos()
157 Show various debug prints after each test.
159 super(TestACLplugin, self).tearDown()
161 def show_commands_at_teardown(self):
162 cli = "show vlib graph l2-input-feat-arc"
163 self.logger.info(self.vapi.ppcli(cli))
164 cli = "show vlib graph l2-input-feat-arc-end"
165 self.logger.info(self.vapi.ppcli(cli))
166 cli = "show vlib graph l2-output-feat-arc"
167 self.logger.info(self.vapi.ppcli(cli))
168 cli = "show vlib graph l2-output-feat-arc-end"
169 self.logger.info(self.vapi.ppcli(cli))
170 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
171 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
172 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
173 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
174 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
177 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
178 s_prefix=0, s_ip='\x00\x00\x00\x00',
179 d_prefix=0, d_ip='\x00\x00\x00\x00'):
182 if ports == self.PORTS_ALL:
185 sport_to = 65535 if proto != 1 and proto != 58 else 255
187 elif ports == self.PORTS_RANGE:
189 sport_from = self.icmp4_type
190 sport_to = self.icmp4_type
191 dport_from = self.icmp4_code
192 dport_to = self.icmp4_code
194 sport_from = self.icmp6_type
195 sport_to = self.icmp6_type
196 dport_from = self.icmp6_code
197 dport_to = self.icmp6_code
198 elif proto == self.proto[self.IP][self.TCP]:
199 sport_from = self.tcp_sport_from
200 sport_to = self.tcp_sport_to
201 dport_from = self.tcp_dport_from
202 dport_to = self.tcp_dport_to
203 elif proto == self.proto[self.IP][self.UDP]:
204 sport_from = self.udp_sport_from
205 sport_to = self.udp_sport_to
206 dport_from = self.udp_dport_from
207 dport_to = self.udp_dport_to
208 elif ports == self.PORTS_RANGE_2:
210 sport_from = self.icmp4_type_2
211 sport_to = self.icmp4_type_2
212 dport_from = self.icmp4_code_from_2
213 dport_to = self.icmp4_code_to_2
215 sport_from = self.icmp6_type_2
216 sport_to = self.icmp6_type_2
217 dport_from = self.icmp6_code_from_2
218 dport_to = self.icmp6_code_to_2
219 elif proto == self.proto[self.IP][self.TCP]:
220 sport_from = self.tcp_sport_from_2
221 sport_to = self.tcp_sport_to_2
222 dport_from = self.tcp_dport_from_2
223 dport_to = self.tcp_dport_to_2
224 elif proto == self.proto[self.IP][self.UDP]:
225 sport_from = self.udp_sport_from_2
226 sport_to = self.udp_sport_to_2
227 dport_from = self.udp_dport_from_2
228 dport_to = self.udp_dport_to_2
235 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
236 'srcport_or_icmptype_first': sport_from,
237 'srcport_or_icmptype_last': sport_to,
238 'src_ip_prefix_len': s_prefix,
240 'dstport_or_icmpcode_first': dport_from,
241 'dstport_or_icmpcode_last': dport_to,
242 'dst_ip_prefix_len': d_prefix,
243 'dst_ip_addr': d_ip})
246 def apply_rules(self, rules, tag=b''):
247 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
249 self.logger.info("Dumped ACL: " + str(
250 self.vapi.acl_dump(reply.acl_index)))
251 # Apply an ACL on the interface as inbound
252 for i in self.pg_interfaces:
253 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
255 acls=[reply.acl_index])
256 return reply.acl_index
258 def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
259 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
261 self.logger.info("Dumped ACL: " + str(
262 self.vapi.acl_dump(reply.acl_index)))
263 # Apply an ACL on the interface as inbound
264 self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
266 acls=[reply.acl_index])
267 return reply.acl_index
269 def etype_whitelist(self, whitelist, n_input):
270 # Apply whitelists on all the interfaces
271 for i in self.pg_interfaces:
272 # checkstyle can't read long names. Help them.
273 fun = self.vapi.acl_interface_set_etype_whitelist
274 fun(sw_if_index=i.sw_if_index, n_input=n_input,
278 def create_upper_layer(self, packet_index, proto, ports=0):
279 p = self.proto_map[proto]
282 return UDP(sport=random.randint(self.udp_sport_from,
284 dport=random.randint(self.udp_dport_from,
287 return UDP(sport=ports, dport=ports)
290 return TCP(sport=random.randint(self.tcp_sport_from,
292 dport=random.randint(self.tcp_dport_from,
295 return TCP(sport=ports, dport=ports)
298 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
299 proto=-1, ports=0, fragments=False,
300 pkt_raw=True, etype=-1):
302 Create input packet stream for defined interface using hosts or
305 :param object src_if: Interface to create packet stream for.
306 :param list packet_sizes: List of required packet sizes.
307 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
308 :return: Stream of packets.
311 if self.flows.__contains__(src_if):
312 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
313 for dst_if in self.flows[src_if]:
314 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
315 n_int = len(dst_hosts) * len(src_hosts)
316 for i in range(0, n_int):
317 dst_host = dst_hosts[i / len(src_hosts)]
318 src_host = src_hosts[i % len(src_hosts)]
319 pkt_info = self.create_packet_info(src_if, dst_if)
325 pkt_info.ip = random.choice([0, 1])
327 pkt_info.proto = random.choice(self.proto[self.IP])
329 pkt_info.proto = proto
330 payload = self.info_to_payload(pkt_info)
331 p = Ether(dst=dst_host.mac, src=src_host.mac)
333 p = Ether(dst=dst_host.mac,
337 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
339 p /= IPv6ExtHdrFragment(offset=64, m=1)
342 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
345 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
346 if traffic_type == self.ICMP:
348 p /= ICMPv6EchoRequest(type=self.icmp6_type,
349 code=self.icmp6_code)
351 p /= ICMP(type=self.icmp4_type,
352 code=self.icmp4_code)
354 p /= self.create_upper_layer(i, pkt_info.proto, ports)
357 pkt_info.data = p.copy()
359 size = random.choice(packet_sizes)
360 self.extend_packet(p, size)
364 def verify_capture(self, pg_if, capture,
365 traffic_type=0, ip_type=0, etype=-1):
367 Verify captured input packet stream for defined interface.
369 :param object pg_if: Interface to verify captured packet stream for.
370 :param list capture: Captured packet stream.
371 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
374 for i in self.pg_interfaces:
375 last_info[i.sw_if_index] = None
376 dst_sw_if_index = pg_if.sw_if_index
377 for packet in capture:
379 if packet[Ether].type != etype:
380 self.logger.error(ppp("Unexpected ethertype in packet:",
385 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
386 if traffic_type == self.ICMP and ip_type == self.IPV6:
387 payload_info = self.payload_to_info(
388 packet[ICMPv6EchoRequest], 'data')
389 payload = packet[ICMPv6EchoRequest]
391 payload_info = self.payload_to_info(packet[Raw])
392 payload = packet[self.proto_map[payload_info.proto]]
394 self.logger.error(ppp("Unexpected or invalid packet "
395 "(outside network):", packet))
399 self.assertEqual(payload_info.ip, ip_type)
400 if traffic_type == self.ICMP:
402 if payload_info.ip == 0:
403 self.assertEqual(payload.type, self.icmp4_type)
404 self.assertEqual(payload.code, self.icmp4_code)
406 self.assertEqual(payload.type, self.icmp6_type)
407 self.assertEqual(payload.code, self.icmp6_code)
409 self.logger.error(ppp("Unexpected or invalid packet "
410 "(outside network):", packet))
414 ip_version = IPv6 if payload_info.ip == 1 else IP
416 ip = packet[ip_version]
417 packet_index = payload_info.index
419 self.assertEqual(payload_info.dst, dst_sw_if_index)
420 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
421 (pg_if.name, payload_info.src,
423 next_info = self.get_next_packet_info_for_interface2(
424 payload_info.src, dst_sw_if_index,
425 last_info[payload_info.src])
426 last_info[payload_info.src] = next_info
427 self.assertTrue(next_info is not None)
428 self.assertEqual(packet_index, next_info.index)
429 saved_packet = next_info.data
430 # Check standard fields
431 self.assertEqual(ip.src, saved_packet[ip_version].src)
432 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
433 p = self.proto_map[payload_info.proto]
436 self.assertEqual(tcp.sport, saved_packet[
438 self.assertEqual(tcp.dport, saved_packet[
442 self.assertEqual(udp.sport, saved_packet[
444 self.assertEqual(udp.dport, saved_packet[
447 self.logger.error(ppp("Unexpected or invalid packet:",
450 for i in self.pg_interfaces:
451 remaining_packet = self.get_next_packet_info_for_interface2(
452 i, dst_sw_if_index, last_info[i.sw_if_index])
454 remaining_packet is None,
455 "Port %u: Packet expected from source %u didn't arrive" %
456 (dst_sw_if_index, i.sw_if_index))
458 def run_traffic_no_check(self):
460 # Create incoming packet streams for packet-generator interfaces
461 for i in self.pg_interfaces:
462 if self.flows.__contains__(i):
463 pkts = self.create_stream(i, self.pg_if_packet_sizes)
467 # Enable packet capture and start packet sending
468 self.pg_enable_capture(self.pg_interfaces)
471 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
472 frags=False, pkt_raw=True, etype=-1):
474 # Create incoming packet streams for packet-generator interfaces
476 for i in self.pg_interfaces:
477 if self.flows.__contains__(i):
478 pkts = self.create_stream(i, self.pg_if_packet_sizes,
479 traffic_type, ip_type, proto, ports,
480 frags, pkt_raw, etype)
483 pkts_cnt += len(pkts)
485 # Enable packet capture and start packet sending
486 self.pg_enable_capture(self.pg_interfaces)
488 self.logger.info("sent packets count: %d" % pkts_cnt)
491 # Verify outgoing packet streams per packet-generator interface
492 for src_if in self.pg_interfaces:
493 if self.flows.__contains__(src_if):
494 for dst_if in self.flows[src_if]:
495 capture = dst_if.get_capture(pkts_cnt)
496 self.logger.info("Verifying capture on interface %s" %
498 self.verify_capture(dst_if, capture,
499 traffic_type, ip_type, etype)
501 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
502 ports=0, frags=False, etype=-1):
505 self.reset_packet_infos()
506 for i in self.pg_interfaces:
507 if self.flows.__contains__(i):
508 pkts = self.create_stream(i, self.pg_if_packet_sizes,
509 traffic_type, ip_type, proto, ports,
513 pkts_cnt += len(pkts)
515 # Enable packet capture and start packet sending
516 self.pg_enable_capture(self.pg_interfaces)
518 self.logger.info("sent packets count: %d" % pkts_cnt)
521 # Verify outgoing packet streams per packet-generator interface
522 for src_if in self.pg_interfaces:
523 if self.flows.__contains__(src_if):
524 for dst_if in self.flows[src_if]:
525 self.logger.info("Verifying capture on interface %s" %
527 capture = dst_if.get_capture(0)
528 self.assertEqual(len(capture), 0)
def test_0000_warmup_test(self):
    """ ACL plugin version check; learn MACs

    Ask the ACL plugin for its version through the API, require the
    major version to be 1 and log the reported major.minor pair.
    """
    version = self.vapi.papi.acl_plugin_get_version()
    # Only the major version is pinned: minor version changes are
    # non-breaking, so the minor number is reported but not asserted.
    self.assertEqual(version.major, 1)
    message = "Working with ACL plugin version: %d.%d" % (
        version.major, version.minor)
    self.logger.info(message)
540 def test_0001_acl_create(self):
541 """ ACL create/delete test
544 self.logger.info("ACLP_TEST_START_0001")
546 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
547 'srcport_or_icmptype_first': 1234,
548 'srcport_or_icmptype_last': 1235,
549 'src_ip_prefix_len': 0,
550 'src_ip_addr': b'\x00\x00\x00\x00',
551 'dstport_or_icmpcode_first': 1234,
552 'dstport_or_icmpcode_last': 1234,
553 'dst_ip_addr': b'\x00\x00\x00\x00',
554 'dst_ip_prefix_len': 0}]
555 # Test 1: add a new ACL
556 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
558 self.assertEqual(reply.retval, 0)
559 # The very first ACL gets #0
560 self.assertEqual(reply.acl_index, 0)
561 first_acl = reply.acl_index
562 rr = self.vapi.acl_dump(reply.acl_index)
563 self.logger.info("Dumped ACL: " + str(rr))
564 self.assertEqual(len(rr), 1)
565 # We should have the same number of ACL entries as we had asked
566 self.assertEqual(len(rr[0].r), len(r))
567 # The rules should be the same. But because the submitted and returned
568 # are different types, we need to iterate over rules and keys to get
570 for i_rule in range(0, len(r) - 1):
571 for rule_key in r[i_rule]:
572 self.assertEqual(rr[0].r[i_rule][rule_key],
575 # Add a deny-1234 ACL
576 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
577 'srcport_or_icmptype_first': 1234,
578 'srcport_or_icmptype_last': 1235,
579 'src_ip_prefix_len': 0,
580 'src_ip_addr': b'\x00\x00\x00\x00',
581 'dstport_or_icmpcode_first': 1234,
582 'dstport_or_icmpcode_last': 1234,
583 'dst_ip_addr': b'\x00\x00\x00\x00',
584 'dst_ip_prefix_len': 0},
585 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
586 'srcport_or_icmptype_first': 0,
587 'srcport_or_icmptype_last': 0,
588 'src_ip_prefix_len': 0,
589 'src_ip_addr': b'\x00\x00\x00\x00',
590 'dstport_or_icmpcode_first': 0,
591 'dstport_or_icmpcode_last': 0,
592 'dst_ip_addr': b'\x00\x00\x00\x00',
593 'dst_ip_prefix_len': 0}]
595 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
596 tag=b"deny 1234;permit all")
597 self.assertEqual(reply.retval, 0)
598 # The second ACL gets #1
599 self.assertEqual(reply.acl_index, 1)
600 second_acl = reply.acl_index
602 # Test 2: try to modify a nonexistent ACL
603 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
604 tag=b"FFFF:FFFF", expected_retval=-6)
605 self.assertEqual(reply.retval, -6)
606 # The ACL number should pass through
607 self.assertEqual(reply.acl_index, 432)
608 # apply an ACL on an interface inbound, try to delete ACL, must fail
609 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
612 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
613 # Unapply an ACL and then try to delete it - must be ok
614 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
617 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
619 # apply an ACL on an interface outbound, try to delete ACL, must fail
620 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
623 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
624 # Unapply the ACL and then try to delete it - must be ok
625 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
628 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
630 # try to apply a nonexistent ACL - must fail
631 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
636 self.logger.info("ACLP_TEST_FINISH_0001")
638 def test_0002_acl_permit_apply(self):
639 """ permit ACL apply test
641 self.logger.info("ACLP_TEST_START_0002")
644 rules.append(self.create_rule(self.IPV4, self.PERMIT,
645 0, self.proto[self.IP][self.UDP]))
646 rules.append(self.create_rule(self.IPV4, self.PERMIT,
647 0, self.proto[self.IP][self.TCP]))
650 acl_idx = self.apply_rules(rules, b"permit per-flow")
653 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
655 # Traffic should still pass
656 self.run_verify_test(self.IP, self.IPV4, -1)
658 matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
659 self.logger.info("stat segment counters: %s" % repr(matches))
660 cli = "show acl-plugin acl"
661 self.logger.info(self.vapi.ppcli(cli))
662 cli = "show acl-plugin tables"
663 self.logger.info(self.vapi.ppcli(cli))
665 total_hits = matches[0][0]['packets'] + matches[0][1]['packets']
666 self.assertEqual(total_hits, 64)
669 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
671 self.logger.info("ACLP_TEST_FINISH_0002")
673 def test_0003_acl_deny_apply(self):
674 """ deny ACL apply test
676 self.logger.info("ACLP_TEST_START_0003")
677 # Add a deny-flows ACL
679 rules.append(self.create_rule(self.IPV4, self.DENY,
680 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
681 # Permit ip any any in the end
682 rules.append(self.create_rule(self.IPV4, self.PERMIT,
686 acl_idx = self.apply_rules(rules, b"deny per-flow;permit all")
689 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
691 # Traffic should not pass
692 self.run_verify_negat_test(self.IP, self.IPV4,
693 self.proto[self.IP][self.UDP])
695 matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
696 self.logger.info("stat segment counters: %s" % repr(matches))
697 cli = "show acl-plugin acl"
698 self.logger.info(self.vapi.ppcli(cli))
699 cli = "show acl-plugin tables"
700 self.logger.info(self.vapi.ppcli(cli))
701 self.assertEqual(matches[0][0]['packets'], 64)
703 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
704 self.logger.info("ACLP_TEST_FINISH_0003")
705 # self.assertEqual(, 0)
707 def test_0004_vpp624_permit_icmpv4(self):
708 """ VPP_624 permit ICMPv4
710 self.logger.info("ACLP_TEST_START_0004")
714 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
715 self.proto[self.ICMP][self.ICMPv4]))
716 # deny ip any any in the end
717 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
720 self.apply_rules(rules, b"permit icmpv4")
722 # Traffic should still pass
723 self.run_verify_test(self.ICMP, self.IPV4,
724 self.proto[self.ICMP][self.ICMPv4])
726 self.logger.info("ACLP_TEST_FINISH_0004")
728 def test_0005_vpp624_permit_icmpv6(self):
729 """ VPP_624 permit ICMPv6
731 self.logger.info("ACLP_TEST_START_0005")
735 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
736 self.proto[self.ICMP][self.ICMPv6]))
737 # deny ip any any in the end
738 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
741 self.apply_rules(rules, b"permit icmpv6")
743 # Traffic should still pass
744 self.run_verify_test(self.ICMP, self.IPV6,
745 self.proto[self.ICMP][self.ICMPv6])
747 self.logger.info("ACLP_TEST_FINISH_0005")
749 def test_0006_vpp624_deny_icmpv4(self):
750 """ VPP_624 deny ICMPv4
752 self.logger.info("ACLP_TEST_START_0006")
755 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
756 self.proto[self.ICMP][self.ICMPv4]))
757 # permit ip any any in the end
758 rules.append(self.create_rule(self.IPV4, self.PERMIT,
762 self.apply_rules(rules, b"deny icmpv4")
764 # Traffic should not pass
765 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
767 self.logger.info("ACLP_TEST_FINISH_0006")
769 def test_0007_vpp624_deny_icmpv6(self):
770 """ VPP_624 deny ICMPv6
772 self.logger.info("ACLP_TEST_START_0007")
775 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
776 self.proto[self.ICMP][self.ICMPv6]))
777 # permit ip any any in the end
778 rules.append(self.create_rule(self.IPV6, self.PERMIT,
782 self.apply_rules(rules, b"deny icmpv6")
784 # Traffic should not pass
785 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
787 self.logger.info("ACLP_TEST_FINISH_0007")
789 def test_0008_tcp_permit_v4(self):
792 self.logger.info("ACLP_TEST_START_0008")
796 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
797 self.proto[self.IP][self.TCP]))
798 # deny ip any any in the end
799 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
802 self.apply_rules(rules, b"permit ipv4 tcp")
804 # Traffic should still pass
805 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
807 self.logger.info("ACLP_TEST_FINISH_0008")
809 def test_0009_tcp_permit_v6(self):
812 self.logger.info("ACLP_TEST_START_0009")
816 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
817 self.proto[self.IP][self.TCP]))
818 # deny ip any any in the end
819 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
822 self.apply_rules(rules, b"permit ip6 tcp")
824 # Traffic should still pass
825 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
827 self.logger.info("ACLP_TEST_FINISH_0008")
829 def test_0010_udp_permit_v4(self):
832 self.logger.info("ACLP_TEST_START_0010")
836 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
837 self.proto[self.IP][self.UDP]))
838 # deny ip any any in the end
839 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
842 self.apply_rules(rules, b"permit ipv udp")
844 # Traffic should still pass
845 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
847 self.logger.info("ACLP_TEST_FINISH_0010")
849 def test_0011_udp_permit_v6(self):
852 self.logger.info("ACLP_TEST_START_0011")
856 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
857 self.proto[self.IP][self.UDP]))
858 # deny ip any any in the end
859 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
862 self.apply_rules(rules, b"permit ip6 udp")
864 # Traffic should still pass
865 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
867 self.logger.info("ACLP_TEST_FINISH_0011")
869 def test_0012_tcp_deny(self):
872 self.logger.info("ACLP_TEST_START_0012")
876 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
877 self.proto[self.IP][self.TCP]))
878 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
879 self.proto[self.IP][self.TCP]))
880 # permit ip any any in the end
881 rules.append(self.create_rule(self.IPV4, self.PERMIT,
883 rules.append(self.create_rule(self.IPV6, self.PERMIT,
887 self.apply_rules(rules, b"deny ip4/ip6 tcp")
889 # Traffic should not pass
890 self.run_verify_negat_test(self.IP, self.IPRANDOM,
891 self.proto[self.IP][self.TCP])
893 self.logger.info("ACLP_TEST_FINISH_0012")
895 def test_0013_udp_deny(self):
898 self.logger.info("ACLP_TEST_START_0013")
902 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
903 self.proto[self.IP][self.UDP]))
904 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
905 self.proto[self.IP][self.UDP]))
906 # permit ip any any in the end
907 rules.append(self.create_rule(self.IPV4, self.PERMIT,
909 rules.append(self.create_rule(self.IPV6, self.PERMIT,
913 self.apply_rules(rules, b"deny ip4/ip6 udp")
915 # Traffic should not pass
916 self.run_verify_negat_test(self.IP, self.IPRANDOM,
917 self.proto[self.IP][self.UDP])
919 self.logger.info("ACLP_TEST_FINISH_0013")
921 def test_0014_acl_dump(self):
922 """ verify add/dump acls
924 self.logger.info("ACLP_TEST_START_0014")
926 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
927 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
928 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
929 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
930 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
931 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
932 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
933 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
934 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
935 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
936 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
937 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
938 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
939 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
940 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
941 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
942 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
943 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
946 # Add and verify new ACLs
948 for i in range(len(r)):
949 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
951 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
952 result = self.vapi.acl_dump(reply.acl_index)
955 for drules in result:
957 self.assertEqual(dr.is_ipv6, r[i][0])
958 self.assertEqual(dr.is_permit, r[i][1])
959 self.assertEqual(dr.proto, r[i][3])
962 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
965 self.assertEqual(dr.srcport_or_icmptype_first, 0)
966 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
968 if dr.proto == self.proto[self.IP][self.TCP]:
969 self.assertGreater(dr.srcport_or_icmptype_first,
970 self.tcp_sport_from-1)
971 self.assertLess(dr.srcport_or_icmptype_first,
973 self.assertGreater(dr.dstport_or_icmpcode_last,
974 self.tcp_dport_from-1)
975 self.assertLess(dr.dstport_or_icmpcode_last,
977 elif dr.proto == self.proto[self.IP][self.UDP]:
978 self.assertGreater(dr.srcport_or_icmptype_first,
979 self.udp_sport_from-1)
980 self.assertLess(dr.srcport_or_icmptype_first,
982 self.assertGreater(dr.dstport_or_icmpcode_last,
983 self.udp_dport_from-1)
984 self.assertLess(dr.dstport_or_icmpcode_last,
988 self.logger.info("ACLP_TEST_FINISH_0014")
990 def test_0015_tcp_permit_port_v4(self):
991 """ permit single TCPv4
993 self.logger.info("ACLP_TEST_START_0015")
995 port = random.randint(0, 65535)
998 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
999 self.proto[self.IP][self.TCP]))
1000 # deny ip any any in the end
1001 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1004 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1006 # Traffic should still pass
1007 self.run_verify_test(self.IP, self.IPV4,
1008 self.proto[self.IP][self.TCP], port)
1010 self.logger.info("ACLP_TEST_FINISH_0015")
1012 def test_0016_udp_permit_port_v4(self):
1013 """ permit single UDPv4
1015 self.logger.info("ACLP_TEST_START_0016")
1017 port = random.randint(0, 65535)
1020 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1021 self.proto[self.IP][self.UDP]))
1022 # deny ip any any in the end
1023 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1026 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1028 # Traffic should still pass
1029 self.run_verify_test(self.IP, self.IPV4,
1030 self.proto[self.IP][self.UDP], port)
1032 self.logger.info("ACLP_TEST_FINISH_0016")
1034 def test_0017_tcp_permit_port_v6(self):
1035 """ permit single TCPv6
1037 self.logger.info("ACLP_TEST_START_0017")
1039 port = random.randint(0, 65535)
1042 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1043 self.proto[self.IP][self.TCP]))
1044 # deny ip any any in the end
1045 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1048 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1050 # Traffic should still pass
1051 self.run_verify_test(self.IP, self.IPV6,
1052 self.proto[self.IP][self.TCP], port)
1054 self.logger.info("ACLP_TEST_FINISH_0017")
1056 def test_0018_udp_permit_port_v6(self):
1057 """ permit single UDPv6
1059 self.logger.info("ACLP_TEST_START_0018")
1061 port = random.randint(0, 65535)
1064 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1065 self.proto[self.IP][self.UDP]))
1066 # deny ip any any in the end
1067 rules.append(self.create_rule(self.IPV6, self.DENY,
1071 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1073 # Traffic should still pass
1074 self.run_verify_test(self.IP, self.IPV6,
1075 self.proto[self.IP][self.UDP], port)
1077 self.logger.info("ACLP_TEST_FINISH_0018")
1079 def test_0019_udp_deny_port(self):
1080 """ deny single TCPv4/v6
1082 self.logger.info("ACLP_TEST_START_0019")
1084 port = random.randint(0, 65535)
1087 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1088 self.proto[self.IP][self.TCP]))
1089 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1090 self.proto[self.IP][self.TCP]))
1091 # Permit ip any any in the end
1092 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1094 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1098 self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1100 # Traffic should not pass
1101 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1102 self.proto[self.IP][self.TCP], port)
1104 self.logger.info("ACLP_TEST_FINISH_0019")
1106 def test_0020_udp_deny_port(self):
1107 """ deny single UDPv4/v6
1109 self.logger.info("ACLP_TEST_START_0020")
1111 port = random.randint(0, 65535)
1114 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1115 self.proto[self.IP][self.UDP]))
1116 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1117 self.proto[self.IP][self.UDP]))
1118 # Permit ip any any in the end
1119 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1121 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1125 self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1127 # Traffic should not pass
1128 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1129 self.proto[self.IP][self.UDP], port)
1131 self.logger.info("ACLP_TEST_FINISH_0020")
1133 def test_0021_udp_deny_port_verify_fragment_deny(self):
1134 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1137 self.logger.info("ACLP_TEST_START_0021")
1139 port = random.randint(0, 65535)
1142 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1143 self.proto[self.IP][self.UDP]))
1144 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1145 self.proto[self.IP][self.UDP]))
1146 # permit ip any any in the end
1147 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1149 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1153 self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1155 # Traffic should not pass
1156 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1157 self.proto[self.IP][self.UDP], port, True)
1159 self.logger.info("ACLP_TEST_FINISH_0021")
# Regression test referenced as VPP-687: permit UDP over IPv4 on one random
# port, deny everything else, send a stream built by create_stream from pg0
# and expect every packet of it to be captured on pg1.
1161 def test_0022_zero_length_udp_ipv4(self):
1162 """ VPP-687 zero length udp ipv4 packet"""
1163 self.logger.info("ACLP_TEST_START_0022")
1165 port = random.randint(0, 65535)
# Permit rule for the chosen UDP port.
1168 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1169 self.proto[self.IP][self.UDP]))
1170 # deny ip any any in the end
1172 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1175 self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
1177 # Traffic should still pass
1178 # Create incoming packet streams for packet-generator interfaces
1180 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1182 self.proto[self.IP][self.UDP], port,
1185 self.pg0.add_stream(pkts)
1186 pkts_cnt += len(pkts)
1188 # Enable packet capture and start packet sending
1189 self.pg_enable_capture(self.pg_interfaces)
# get_capture is given the number of packets that were queued on pg0.
1192 self.pg1.get_capture(pkts_cnt)
1194 self.logger.info("ACLP_TEST_FINISH_0022")
# IPv6 counterpart of the VPP-687 test: permit UDP over IPv6 on one random
# port, deny everything else, send a stream built by create_stream from pg0
# and expect every packet of it to be captured on pg1.
1196 def test_0023_zero_length_udp_ipv6(self):
1197 """ VPP-687 zero length udp ipv6 packet"""
1198 self.logger.info("ACLP_TEST_START_0023")
1200 port = random.randint(0, 65535)
# Permit rule for the chosen UDP port.
1203 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1204 self.proto[self.IP][self.UDP]))
1205 # deny ip any any in the end
1206 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1209 self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
1211 # Traffic should still pass
1212 # Create incoming packet streams for packet-generator interfaces
1214 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1216 self.proto[self.IP][self.UDP], port,
1219 self.pg0.add_stream(pkts)
1220 pkts_cnt += len(pkts)
1222 # Enable packet capture and start packet sending
1223 self.pg_enable_capture(self.pg_interfaces)
1226 # Verify outgoing packet streams per packet-generator interface
1227 self.pg1.get_capture(pkts_cnt)
1229 self.logger.info("ACLP_TEST_FINISH_0023")
# Permit TCP over IPv4 for PORTS_RANGE, placed after a deny rule for
# PORTS_RANGE_2 and before a final deny-all, then run the positive
# verification for IPv4 TCP.
1231 def test_0108_tcp_permit_v4(self):
1232 """ permit TCPv4 + non-match range
1234 self.logger.info("ACLP_TEST_START_0108")
# The deny rule on the second range comes first; the docstring calls it the
# non-matching range.
1238 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1239 self.proto[self.IP][self.TCP]))
1240 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1241 self.proto[self.IP][self.TCP]))
1242 # deny ip any any in the end
1243 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1246 self.apply_rules(rules, b"permit ipv4 tcp")
1248 # Traffic should still pass
1249 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1251 self.logger.info("ACLP_TEST_FINISH_0108")
# Permit TCP over IPv6 for PORTS_RANGE, placed after a deny rule for
# PORTS_RANGE_2 and before a final deny-all, then run the positive
# verification for IPv6 TCP.
1253 def test_0109_tcp_permit_v6(self):
1254 """ permit TCPv6 + non-match range
1256 self.logger.info("ACLP_TEST_START_0109")
# The deny rule on the second range comes first; the docstring calls it the
# non-matching range.
1260 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1261 self.proto[self.IP][self.TCP]))
1262 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1263 self.proto[self.IP][self.TCP]))
1264 # deny ip any any in the end
1265 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1268 self.apply_rules(rules, b"permit ip6 tcp")
1270 # Traffic should still pass
1271 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1273 self.logger.info("ACLP_TEST_FINISH_0109")
# Permit UDP over IPv4 for PORTS_RANGE, placed after a deny rule for
# PORTS_RANGE_2 and before a final deny-all, then run the positive
# verification for IPv4 UDP.
1275 def test_0110_udp_permit_v4(self):
1276 """ permit UDPv4 + non-match range
1278 self.logger.info("ACLP_TEST_START_0110")
# The deny rule on the second range comes first; the docstring calls it the
# non-matching range.
1282 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1283 self.proto[self.IP][self.UDP]))
1284 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1285 self.proto[self.IP][self.UDP]))
1286 # deny ip any any in the end
1287 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1290 self.apply_rules(rules, b"permit ipv4 udp")
1292 # Traffic should still pass
1293 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1295 self.logger.info("ACLP_TEST_FINISH_0110")
# Permit UDP over IPv6 for PORTS_RANGE, placed after a deny rule for
# PORTS_RANGE_2 and before a final deny-all, then run the positive
# verification for IPv6 UDP.
1297 def test_0111_udp_permit_v6(self):
1298 """ permit UDPv6 + non-match range
1300 self.logger.info("ACLP_TEST_START_0111")
# The deny rule on the second range comes first; the docstring calls it the
# non-matching range.
1304 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1305 self.proto[self.IP][self.UDP]))
1306 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1307 self.proto[self.IP][self.UDP]))
1308 # deny ip any any in the end
1309 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1312 self.apply_rules(rules, b"permit ip6 udp")
1314 # Traffic should still pass
1315 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1317 self.logger.info("ACLP_TEST_FINISH_0111")
# Deny TCP on PORTS_RANGE for IPv4 and IPv6, with TCP permit rules ahead of
# the deny rules and permit rules at the end, then run the negative
# verification for TCP with a random IP version.
1319 def test_0112_tcp_deny(self):
1320 """ deny TCPv4/v6 + non-match range
1322 self.logger.info("ACLP_TEST_START_0112")
# Leading TCP permit rules; the docstring describes them as covering a
# non-matching range.
1326 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1328 self.proto[self.IP][self.TCP]))
1329 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1331 self.proto[self.IP][self.TCP]))
# The deny rules that the test traffic is expected to hit.
1332 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1333 self.proto[self.IP][self.TCP]))
1334 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1335 self.proto[self.IP][self.TCP]))
1336 # permit ip any any in the end
1337 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1339 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1343 self.apply_rules(rules, b"deny ip4/ip6 tcp")
1345 # Traffic should not pass
1346 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1347 self.proto[self.IP][self.TCP])
1349 self.logger.info("ACLP_TEST_FINISH_0112")
# Deny UDP on PORTS_RANGE for IPv4 and IPv6, with UDP permit rules ahead of
# the deny rules and permit rules at the end, then run the negative
# verification for UDP with a random IP version.
1351 def test_0113_udp_deny(self):
1352 """ deny UDPv4/v6 + non-match range
1354 self.logger.info("ACLP_TEST_START_0113")
# Leading UDP permit rules; the docstring describes them as covering a
# non-matching range.
1358 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1360 self.proto[self.IP][self.UDP]))
1361 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1363 self.proto[self.IP][self.UDP]))
# The deny rules that the test traffic is expected to hit.
1364 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1365 self.proto[self.IP][self.UDP]))
1366 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1367 self.proto[self.IP][self.UDP]))
1368 # permit ip any any in the end
1369 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1371 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1375 self.apply_rules(rules, b"deny ip4/ip6 udp")
1377 # Traffic should not pass
1378 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1379 self.proto[self.IP][self.UDP])
1381 self.logger.info("ACLP_TEST_FINISH_0113")
# Apply the IPv4 TCP permit rule set (deny PORTS_RANGE_2, permit
# PORTS_RANGE, deny all) and run the positive verification with the
# ethertype argument set to 0xaaaa.
1383 def test_0300_tcp_permit_v4_etype_aaaa(self):
1384 """ permit TCPv4, send 0xAAAA etype
1386 self.logger.info("ACLP_TEST_START_0300")
1390 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1391 self.proto[self.IP][self.TCP]))
1392 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1393 self.proto[self.IP][self.TCP]))
1394 # deny ip any any in the end
1395 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1398 self.apply_rules(rules, b"permit ipv4 tcp")
1400 # Traffic should still pass also for an odd ethertype
# NOTE(review): the meaning of the positional arguments 0, False, True is
# defined by run_verify_test, which is outside this section - confirm there.
1401 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1402 0, False, True, 0xaaaa)
1403 self.logger.info("ACLP_TEST_FINISH_0300")
# Apply the IPv4 TCP permit rule set, whitelist ethertype 0x0bbb, run the
# negative verification for IPv4 TCP, then clear the whitelist again.
1405 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1406 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1408 self.logger.info("ACLP_TEST_START_0305")
1412 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1413 self.proto[self.IP][self.TCP]))
1414 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1415 self.proto[self.IP][self.TCP]))
1416 # deny ip any any in the end
1417 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1420 self.apply_rules(rules, b"permit ipv4 tcp")
1422 # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
# NOTE(review): the second argument (1 here, 0 when clearing) is defined by
# etype_whitelist, which is outside this section - confirm its meaning.
1423 self.etype_whitelist([0xbbb], 1)
1425 # The oddball ethertype should be blocked
1426 self.run_verify_negat_test(self.IP, self.IPV4,
1427 self.proto[self.IP][self.TCP],
1430 # remove the whitelist
1431 self.etype_whitelist([], 0)
1433 self.logger.info("ACLP_TEST_FINISH_0305")
# Apply the IPv4 TCP permit rule set, whitelist ethertype 0x0bbb, run the
# positive verification with the ethertype argument set to 0x0bbb, then
# clear the whitelist again.
1435 def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1436 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1438 self.logger.info("ACLP_TEST_START_0306")
1442 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1443 self.proto[self.IP][self.TCP]))
1444 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1445 self.proto[self.IP][self.TCP]))
1446 # deny ip any any in the end
1447 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1450 self.apply_rules(rules, b"permit ipv4 tcp")
1452 # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
1453 self.etype_whitelist([0xbbb], 1)
1455 # The whitelisted traffic, should pass
1456 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1457 0, False, True, 0x0bbb)
1459 # remove the whitelist so it does not stay configured after this test
1460 self.etype_whitelist([], 0)
1462 self.logger.info("ACLP_TEST_FINISH_0306")
1464 def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1465 """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1467 self.logger.info("ACLP_TEST_START_0307")
1471 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1472 self.proto[self.IP][self.TCP]))
1473 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1474 self.proto[self.IP][self.TCP]))
1475 # deny ip any any in the end
1476 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1479 self.apply_rules(rules, b"permit ipv4 tcp")
1481 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1482 self.etype_whitelist([0xbbb], 1)
1483 # remove the whitelist, the previously blocked 0xAAAA should pass now
1484 self.etype_whitelist([], 0)
1486 # The whitelisted traffic, should pass
1487 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1488 0, False, True, 0xaaaa)
1490 self.logger.info("ACLP_TEST_FINISH_0306")
# Build the IPv4 TCP permit rule set, create a VppLoInterface, apply the
# rules to that interface's sw_if_index and then remove the interface
# again. There is no traffic check in the visible code.
1492 def test_0315_del_intf(self):
1493 """ apply an acl and delete the interface
1495 self.logger.info("ACLP_TEST_START_0315")
1499 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1500 self.proto[self.IP][self.TCP]))
1501 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1502 self.proto[self.IP][self.TCP]))
1503 # deny ip any any in the end
1504 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1506 # create an interface
1508 intf.append(VppLoInterface(self))
# Unlike the other tests here, the ACL is applied with apply_rules_to and an
# explicit sw_if_index.
1511 self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
1513 # Remove the interface
1514 intf[0].remove_vpp_config()
1516 self.logger.info("ACLP_TEST_FINISH_0315")
# When the file is executed directly, run the tests through unittest with
# VppTestRunner as the test runner.
1518 if __name__ == '__main__':
1519 unittest.main(testRunner=VppTestRunner)