2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
16 from vpp_lo_interface import VppLoInterface
19 class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
36 proto = [[6, 17], [1, 58]]
37 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
49 udp_sport_to = udp_sport_from + 5
50 udp_dport_from = 20000
51 udp_dport_to = udp_dport_from + 5000
53 tcp_sport_to = tcp_sport_from + 5
54 tcp_dport_from = 40000
55 tcp_dport_to = tcp_dport_from + 5000
58 udp_sport_to_2 = udp_sport_from_2 + 5
59 udp_dport_from_2 = 30000
60 udp_dport_to_2 = udp_dport_from_2 + 5000
61 tcp_sport_from_2 = 130
62 tcp_sport_to_2 = tcp_sport_from_2 + 5
63 tcp_dport_from_2 = 20000
64 tcp_dport_to_2 = tcp_dport_from_2 + 5000
66 icmp4_type = 8 # echo request
68 icmp6_type = 128 # echo request
84 Perform standard class setup (defined by class method setUpClass in
85 class VppTestCase) before running the test case, set test case related
86 variables and configure VPP.
88 super(TestACLplugin, cls).setUpClass()
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
96 cls.flows[cls.pg0] = [cls.pg1]
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(
107 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
123 # warm-up the mac address tables
127 n_int = len(cls.pg_interfaces)
128 macs_per_if = count / n_int
130 for pg_if in cls.pg_interfaces:
132 start_nr = macs_per_if * i + start
133 end_nr = count + start if i == (n_int - 1) \
134 else macs_per_if * (i + 1) + start
135 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136 for j in range(start_nr, end_nr):
138 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
144 super(TestACLplugin, cls).tearDownClass()
148 def tearDownClass(cls):
149 super(TestACLplugin, cls).tearDownClass()
152 super(TestACLplugin, self).setUp()
153 self.reset_packet_infos()
157 Show various debug prints after each test.
159 super(TestACLplugin, self).tearDown()
160 if not self.vpp_dead:
161 cli = "show vlib graph l2-input-feat-arc"
162 self.logger.info(self.vapi.ppcli(cli))
163 cli = "show vlib graph l2-input-feat-arc-end"
164 self.logger.info(self.vapi.ppcli(cli))
165 cli = "show vlib graph l2-output-feat-arc"
166 self.logger.info(self.vapi.ppcli(cli))
167 cli = "show vlib graph l2-output-feat-arc-end"
168 self.logger.info(self.vapi.ppcli(cli))
169 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
170 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
171 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
172 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
173 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
176 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
177 s_prefix=0, s_ip='\x00\x00\x00\x00',
178 d_prefix=0, d_ip='\x00\x00\x00\x00'):
181 if ports == self.PORTS_ALL:
184 sport_to = 65535 if proto != 1 and proto != 58 else 255
186 elif ports == self.PORTS_RANGE:
188 sport_from = self.icmp4_type
189 sport_to = self.icmp4_type
190 dport_from = self.icmp4_code
191 dport_to = self.icmp4_code
193 sport_from = self.icmp6_type
194 sport_to = self.icmp6_type
195 dport_from = self.icmp6_code
196 dport_to = self.icmp6_code
197 elif proto == self.proto[self.IP][self.TCP]:
198 sport_from = self.tcp_sport_from
199 sport_to = self.tcp_sport_to
200 dport_from = self.tcp_dport_from
201 dport_to = self.tcp_dport_to
202 elif proto == self.proto[self.IP][self.UDP]:
203 sport_from = self.udp_sport_from
204 sport_to = self.udp_sport_to
205 dport_from = self.udp_dport_from
206 dport_to = self.udp_dport_to
207 elif ports == self.PORTS_RANGE_2:
209 sport_from = self.icmp4_type_2
210 sport_to = self.icmp4_type_2
211 dport_from = self.icmp4_code_from_2
212 dport_to = self.icmp4_code_to_2
214 sport_from = self.icmp6_type_2
215 sport_to = self.icmp6_type_2
216 dport_from = self.icmp6_code_from_2
217 dport_to = self.icmp6_code_to_2
218 elif proto == self.proto[self.IP][self.TCP]:
219 sport_from = self.tcp_sport_from_2
220 sport_to = self.tcp_sport_to_2
221 dport_from = self.tcp_dport_from_2
222 dport_to = self.tcp_dport_to_2
223 elif proto == self.proto[self.IP][self.UDP]:
224 sport_from = self.udp_sport_from_2
225 sport_to = self.udp_sport_to_2
226 dport_from = self.udp_dport_from_2
227 dport_to = self.udp_dport_to_2
234 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
235 'srcport_or_icmptype_first': sport_from,
236 'srcport_or_icmptype_last': sport_to,
237 'src_ip_prefix_len': s_prefix,
239 'dstport_or_icmpcode_first': dport_from,
240 'dstport_or_icmpcode_last': dport_to,
241 'dst_ip_prefix_len': d_prefix,
242 'dst_ip_addr': d_ip})
245 def apply_rules(self, rules, tag=b''):
246 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
248 self.logger.info("Dumped ACL: " + str(
249 self.vapi.acl_dump(reply.acl_index)))
250 # Apply a ACL on the interface as inbound
251 for i in self.pg_interfaces:
252 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
254 acls=[reply.acl_index])
257 def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
258 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
260 self.logger.info("Dumped ACL: " + str(
261 self.vapi.acl_dump(reply.acl_index)))
262 # Apply a ACL on the interface as inbound
263 self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
265 acls=[reply.acl_index])
268 def etype_whitelist(self, whitelist, n_input):
269 # Apply whitelists on all the interfaces
270 for i in self.pg_interfaces:
271 # checkstyle can't read long names. Help them.
272 fun = self.vapi.acl_interface_set_etype_whitelist
273 fun(sw_if_index=i.sw_if_index, n_input=n_input,
277 def create_upper_layer(self, packet_index, proto, ports=0):
278 p = self.proto_map[proto]
281 return UDP(sport=random.randint(self.udp_sport_from,
283 dport=random.randint(self.udp_dport_from,
286 return UDP(sport=ports, dport=ports)
289 return TCP(sport=random.randint(self.tcp_sport_from,
291 dport=random.randint(self.tcp_dport_from,
294 return TCP(sport=ports, dport=ports)
297 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
298 proto=-1, ports=0, fragments=False,
299 pkt_raw=True, etype=-1):
301 Create input packet stream for defined interface using hosts or
304 :param object src_if: Interface to create packet stream for.
305 :param list packet_sizes: List of required packet sizes.
306 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
307 :return: Stream of packets.
310 if self.flows.__contains__(src_if):
311 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
312 for dst_if in self.flows[src_if]:
313 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
314 n_int = len(dst_hosts) * len(src_hosts)
315 for i in range(0, n_int):
316 dst_host = dst_hosts[i / len(src_hosts)]
317 src_host = src_hosts[i % len(src_hosts)]
318 pkt_info = self.create_packet_info(src_if, dst_if)
324 pkt_info.ip = random.choice([0, 1])
326 pkt_info.proto = random.choice(self.proto[self.IP])
328 pkt_info.proto = proto
329 payload = self.info_to_payload(pkt_info)
330 p = Ether(dst=dst_host.mac, src=src_host.mac)
332 p = Ether(dst=dst_host.mac,
336 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
338 p /= IPv6ExtHdrFragment(offset=64, m=1)
341 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
344 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
345 if traffic_type == self.ICMP:
347 p /= ICMPv6EchoRequest(type=self.icmp6_type,
348 code=self.icmp6_code)
350 p /= ICMP(type=self.icmp4_type,
351 code=self.icmp4_code)
353 p /= self.create_upper_layer(i, pkt_info.proto, ports)
356 pkt_info.data = p.copy()
358 size = random.choice(packet_sizes)
359 self.extend_packet(p, size)
363 def verify_capture(self, pg_if, capture,
364 traffic_type=0, ip_type=0, etype=-1):
366 Verify captured input packet stream for defined interface.
368 :param object pg_if: Interface to verify captured packet stream for.
369 :param list capture: Captured packet stream.
370 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
373 for i in self.pg_interfaces:
374 last_info[i.sw_if_index] = None
375 dst_sw_if_index = pg_if.sw_if_index
376 for packet in capture:
378 if packet[Ether].type != etype:
379 self.logger.error(ppp("Unexpected ethertype in packet:",
384 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
385 if traffic_type == self.ICMP and ip_type == self.IPV6:
386 payload_info = self.payload_to_info(
387 packet[ICMPv6EchoRequest], 'data')
388 payload = packet[ICMPv6EchoRequest]
390 payload_info = self.payload_to_info(packet[Raw])
391 payload = packet[self.proto_map[payload_info.proto]]
393 self.logger.error(ppp("Unexpected or invalid packet "
394 "(outside network):", packet))
398 self.assertEqual(payload_info.ip, ip_type)
399 if traffic_type == self.ICMP:
401 if payload_info.ip == 0:
402 self.assertEqual(payload.type, self.icmp4_type)
403 self.assertEqual(payload.code, self.icmp4_code)
405 self.assertEqual(payload.type, self.icmp6_type)
406 self.assertEqual(payload.code, self.icmp6_code)
408 self.logger.error(ppp("Unexpected or invalid packet "
409 "(outside network):", packet))
413 ip_version = IPv6 if payload_info.ip == 1 else IP
415 ip = packet[ip_version]
416 packet_index = payload_info.index
418 self.assertEqual(payload_info.dst, dst_sw_if_index)
419 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
420 (pg_if.name, payload_info.src,
422 next_info = self.get_next_packet_info_for_interface2(
423 payload_info.src, dst_sw_if_index,
424 last_info[payload_info.src])
425 last_info[payload_info.src] = next_info
426 self.assertTrue(next_info is not None)
427 self.assertEqual(packet_index, next_info.index)
428 saved_packet = next_info.data
429 # Check standard fields
430 self.assertEqual(ip.src, saved_packet[ip_version].src)
431 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
432 p = self.proto_map[payload_info.proto]
435 self.assertEqual(tcp.sport, saved_packet[
437 self.assertEqual(tcp.dport, saved_packet[
441 self.assertEqual(udp.sport, saved_packet[
443 self.assertEqual(udp.dport, saved_packet[
446 self.logger.error(ppp("Unexpected or invalid packet:",
449 for i in self.pg_interfaces:
450 remaining_packet = self.get_next_packet_info_for_interface2(
451 i, dst_sw_if_index, last_info[i.sw_if_index])
453 remaining_packet is None,
454 "Port %u: Packet expected from source %u didn't arrive" %
455 (dst_sw_if_index, i.sw_if_index))
457 def run_traffic_no_check(self):
459 # Create incoming packet streams for packet-generator interfaces
460 for i in self.pg_interfaces:
461 if self.flows.__contains__(i):
462 pkts = self.create_stream(i, self.pg_if_packet_sizes)
466 # Enable packet capture and start packet sending
467 self.pg_enable_capture(self.pg_interfaces)
470 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
471 frags=False, pkt_raw=True, etype=-1):
473 # Create incoming packet streams for packet-generator interfaces
475 for i in self.pg_interfaces:
476 if self.flows.__contains__(i):
477 pkts = self.create_stream(i, self.pg_if_packet_sizes,
478 traffic_type, ip_type, proto, ports,
479 frags, pkt_raw, etype)
482 pkts_cnt += len(pkts)
484 # Enable packet capture and start packet sendingself.IPV
485 self.pg_enable_capture(self.pg_interfaces)
487 self.logger.info("sent packets count: %d" % pkts_cnt)
490 # Verify outgoing packet streams per packet-generator interface
491 for src_if in self.pg_interfaces:
492 if self.flows.__contains__(src_if):
493 for dst_if in self.flows[src_if]:
494 capture = dst_if.get_capture(pkts_cnt)
495 self.logger.info("Verifying capture on interface %s" %
497 self.verify_capture(dst_if, capture,
498 traffic_type, ip_type, etype)
500 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
501 ports=0, frags=False, etype=-1):
504 self.reset_packet_infos()
505 for i in self.pg_interfaces:
506 if self.flows.__contains__(i):
507 pkts = self.create_stream(i, self.pg_if_packet_sizes,
508 traffic_type, ip_type, proto, ports,
512 pkts_cnt += len(pkts)
514 # Enable packet capture and start packet sending
515 self.pg_enable_capture(self.pg_interfaces)
517 self.logger.info("sent packets count: %d" % pkts_cnt)
520 # Verify outgoing packet streams per packet-generator interface
521 for src_if in self.pg_interfaces:
522 if self.flows.__contains__(src_if):
523 for dst_if in self.flows[src_if]:
524 self.logger.info("Verifying capture on interface %s" %
526 capture = dst_if.get_capture(0)
527 self.assertEqual(len(capture), 0)
529 def test_0000_warmup_test(self):
530 """ ACL plugin version check; learn MACs
532 reply = self.vapi.papi.acl_plugin_get_version()
533 self.assertEqual(reply.major, 1)
534 self.logger.info("Working with ACL plugin version: %d.%d" % (
535 reply.major, reply.minor))
536 # minor version changes are non breaking
537 # self.assertEqual(reply.minor, 0)
539 def test_0001_acl_create(self):
540 """ ACL create/delete test
543 self.logger.info("ACLP_TEST_START_0001")
545 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
546 'srcport_or_icmptype_first': 1234,
547 'srcport_or_icmptype_last': 1235,
548 'src_ip_prefix_len': 0,
549 'src_ip_addr': b'\x00\x00\x00\x00',
550 'dstport_or_icmpcode_first': 1234,
551 'dstport_or_icmpcode_last': 1234,
552 'dst_ip_addr': b'\x00\x00\x00\x00',
553 'dst_ip_prefix_len': 0}]
554 # Test 1: add a new ACL
555 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
557 self.assertEqual(reply.retval, 0)
558 # The very first ACL gets #0
559 self.assertEqual(reply.acl_index, 0)
560 first_acl = reply.acl_index
561 rr = self.vapi.acl_dump(reply.acl_index)
562 self.logger.info("Dumped ACL: " + str(rr))
563 self.assertEqual(len(rr), 1)
564 # We should have the same number of ACL entries as we had asked
565 self.assertEqual(len(rr[0].r), len(r))
566 # The rules should be the same. But because the submitted and returned
567 # are different types, we need to iterate over rules and keys to get
569 for i_rule in range(0, len(r) - 1):
570 for rule_key in r[i_rule]:
571 self.assertEqual(rr[0].r[i_rule][rule_key],
574 # Add a deny-1234 ACL
575 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
576 'srcport_or_icmptype_first': 1234,
577 'srcport_or_icmptype_last': 1235,
578 'src_ip_prefix_len': 0,
579 'src_ip_addr': b'\x00\x00\x00\x00',
580 'dstport_or_icmpcode_first': 1234,
581 'dstport_or_icmpcode_last': 1234,
582 'dst_ip_addr': b'\x00\x00\x00\x00',
583 'dst_ip_prefix_len': 0},
584 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
585 'srcport_or_icmptype_first': 0,
586 'srcport_or_icmptype_last': 0,
587 'src_ip_prefix_len': 0,
588 'src_ip_addr': b'\x00\x00\x00\x00',
589 'dstport_or_icmpcode_first': 0,
590 'dstport_or_icmpcode_last': 0,
591 'dst_ip_addr': b'\x00\x00\x00\x00',
592 'dst_ip_prefix_len': 0}]
594 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
595 tag=b"deny 1234;permit all")
596 self.assertEqual(reply.retval, 0)
597 # The second ACL gets #1
598 self.assertEqual(reply.acl_index, 1)
599 second_acl = reply.acl_index
601 # Test 2: try to modify a nonexistent ACL
602 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
603 tag=b"FFFF:FFFF", expected_retval=-6)
604 self.assertEqual(reply.retval, -6)
605 # The ACL number should pass through
606 self.assertEqual(reply.acl_index, 432)
607 # apply an ACL on an interface inbound, try to delete ACL, must fail
608 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
611 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
612 # Unapply an ACL and then try to delete it - must be ok
613 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
616 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
618 # apply an ACL on an interface outbound, try to delete ACL, must fail
619 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
622 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
623 # Unapply the ACL and then try to delete it - must be ok
624 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
627 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
629 # try to apply a nonexistent ACL - must fail
630 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
635 self.logger.info("ACLP_TEST_FINISH_0001")
637 def test_0002_acl_permit_apply(self):
638 """ permit ACL apply test
640 self.logger.info("ACLP_TEST_START_0002")
643 rules.append(self.create_rule(self.IPV4, self.PERMIT,
644 0, self.proto[self.IP][self.UDP]))
645 rules.append(self.create_rule(self.IPV4, self.PERMIT,
646 0, self.proto[self.IP][self.TCP]))
649 self.apply_rules(rules, b"permit per-flow")
651 # Traffic should still pass
652 self.run_verify_test(self.IP, self.IPV4, -1)
653 self.logger.info("ACLP_TEST_FINISH_0002")
655 def test_0003_acl_deny_apply(self):
656 """ deny ACL apply test
658 self.logger.info("ACLP_TEST_START_0003")
659 # Add a deny-flows ACL
661 rules.append(self.create_rule(self.IPV4, self.DENY,
662 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
663 # Permit ip any any in the end
664 rules.append(self.create_rule(self.IPV4, self.PERMIT,
668 self.apply_rules(rules, b"deny per-flow;permit all")
670 # Traffic should not pass
671 self.run_verify_negat_test(self.IP, self.IPV4,
672 self.proto[self.IP][self.UDP])
673 self.logger.info("ACLP_TEST_FINISH_0003")
674 # self.assertEqual(1, 0)
676 def test_0004_vpp624_permit_icmpv4(self):
677 """ VPP_624 permit ICMPv4
679 self.logger.info("ACLP_TEST_START_0004")
683 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
684 self.proto[self.ICMP][self.ICMPv4]))
685 # deny ip any any in the end
686 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
689 self.apply_rules(rules, b"permit icmpv4")
691 # Traffic should still pass
692 self.run_verify_test(self.ICMP, self.IPV4,
693 self.proto[self.ICMP][self.ICMPv4])
695 self.logger.info("ACLP_TEST_FINISH_0004")
697 def test_0005_vpp624_permit_icmpv6(self):
698 """ VPP_624 permit ICMPv6
700 self.logger.info("ACLP_TEST_START_0005")
704 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
705 self.proto[self.ICMP][self.ICMPv6]))
706 # deny ip any any in the end
707 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
710 self.apply_rules(rules, b"permit icmpv6")
712 # Traffic should still pass
713 self.run_verify_test(self.ICMP, self.IPV6,
714 self.proto[self.ICMP][self.ICMPv6])
716 self.logger.info("ACLP_TEST_FINISH_0005")
718 def test_0006_vpp624_deny_icmpv4(self):
719 """ VPP_624 deny ICMPv4
721 self.logger.info("ACLP_TEST_START_0006")
724 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
725 self.proto[self.ICMP][self.ICMPv4]))
726 # permit ip any any in the end
727 rules.append(self.create_rule(self.IPV4, self.PERMIT,
731 self.apply_rules(rules, b"deny icmpv4")
733 # Traffic should not pass
734 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
736 self.logger.info("ACLP_TEST_FINISH_0006")
738 def test_0007_vpp624_deny_icmpv6(self):
739 """ VPP_624 deny ICMPv6
741 self.logger.info("ACLP_TEST_START_0007")
744 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
745 self.proto[self.ICMP][self.ICMPv6]))
746 # deny ip any any in the end
747 rules.append(self.create_rule(self.IPV6, self.PERMIT,
751 self.apply_rules(rules, b"deny icmpv6")
753 # Traffic should not pass
754 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
756 self.logger.info("ACLP_TEST_FINISH_0007")
758 def test_0008_tcp_permit_v4(self):
761 self.logger.info("ACLP_TEST_START_0008")
765 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
766 self.proto[self.IP][self.TCP]))
767 # deny ip any any in the end
768 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
771 self.apply_rules(rules, b"permit ipv4 tcp")
773 # Traffic should still pass
774 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
776 self.logger.info("ACLP_TEST_FINISH_0008")
778 def test_0009_tcp_permit_v6(self):
781 self.logger.info("ACLP_TEST_START_0009")
785 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
786 self.proto[self.IP][self.TCP]))
787 # deny ip any any in the end
788 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
791 self.apply_rules(rules, b"permit ip6 tcp")
793 # Traffic should still pass
794 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
796 self.logger.info("ACLP_TEST_FINISH_0008")
798 def test_0010_udp_permit_v4(self):
801 self.logger.info("ACLP_TEST_START_0010")
805 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
806 self.proto[self.IP][self.UDP]))
807 # deny ip any any in the end
808 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
811 self.apply_rules(rules, b"permit ipv udp")
813 # Traffic should still pass
814 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
816 self.logger.info("ACLP_TEST_FINISH_0010")
818 def test_0011_udp_permit_v6(self):
821 self.logger.info("ACLP_TEST_START_0011")
825 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
826 self.proto[self.IP][self.UDP]))
827 # deny ip any any in the end
828 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
831 self.apply_rules(rules, b"permit ip6 udp")
833 # Traffic should still pass
834 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
836 self.logger.info("ACLP_TEST_FINISH_0011")
838 def test_0012_tcp_deny(self):
841 self.logger.info("ACLP_TEST_START_0012")
845 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
846 self.proto[self.IP][self.TCP]))
847 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
848 self.proto[self.IP][self.TCP]))
849 # permit ip any any in the end
850 rules.append(self.create_rule(self.IPV4, self.PERMIT,
852 rules.append(self.create_rule(self.IPV6, self.PERMIT,
856 self.apply_rules(rules, b"deny ip4/ip6 tcp")
858 # Traffic should not pass
859 self.run_verify_negat_test(self.IP, self.IPRANDOM,
860 self.proto[self.IP][self.TCP])
862 self.logger.info("ACLP_TEST_FINISH_0012")
864 def test_0013_udp_deny(self):
867 self.logger.info("ACLP_TEST_START_0013")
871 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
872 self.proto[self.IP][self.UDP]))
873 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
874 self.proto[self.IP][self.UDP]))
875 # permit ip any any in the end
876 rules.append(self.create_rule(self.IPV4, self.PERMIT,
878 rules.append(self.create_rule(self.IPV6, self.PERMIT,
882 self.apply_rules(rules, b"deny ip4/ip6 udp")
884 # Traffic should not pass
885 self.run_verify_negat_test(self.IP, self.IPRANDOM,
886 self.proto[self.IP][self.UDP])
888 self.logger.info("ACLP_TEST_FINISH_0013")
890 def test_0014_acl_dump(self):
891 """ verify add/dump acls
893 self.logger.info("ACLP_TEST_START_0014")
895 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
896 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
897 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
898 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
899 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
900 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
901 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
902 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
903 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
904 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
905 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
906 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
907 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
908 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
909 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
910 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
911 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
912 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
915 # Add and verify new ACLs
917 for i in range(len(r)):
918 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
920 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
921 result = self.vapi.acl_dump(reply.acl_index)
924 for drules in result:
926 self.assertEqual(dr.is_ipv6, r[i][0])
927 self.assertEqual(dr.is_permit, r[i][1])
928 self.assertEqual(dr.proto, r[i][3])
931 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
934 self.assertEqual(dr.srcport_or_icmptype_first, 0)
935 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
937 if dr.proto == self.proto[self.IP][self.TCP]:
938 self.assertGreater(dr.srcport_or_icmptype_first,
939 self.tcp_sport_from-1)
940 self.assertLess(dr.srcport_or_icmptype_first,
942 self.assertGreater(dr.dstport_or_icmpcode_last,
943 self.tcp_dport_from-1)
944 self.assertLess(dr.dstport_or_icmpcode_last,
946 elif dr.proto == self.proto[self.IP][self.UDP]:
947 self.assertGreater(dr.srcport_or_icmptype_first,
948 self.udp_sport_from-1)
949 self.assertLess(dr.srcport_or_icmptype_first,
951 self.assertGreater(dr.dstport_or_icmpcode_last,
952 self.udp_dport_from-1)
953 self.assertLess(dr.dstport_or_icmpcode_last,
957 self.logger.info("ACLP_TEST_FINISH_0014")
959 def test_0015_tcp_permit_port_v4(self):
960 """ permit single TCPv4
962 self.logger.info("ACLP_TEST_START_0015")
964 port = random.randint(0, 65535)
967 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
968 self.proto[self.IP][self.TCP]))
969 # deny ip any any in the end
970 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
973 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
975 # Traffic should still pass
976 self.run_verify_test(self.IP, self.IPV4,
977 self.proto[self.IP][self.TCP], port)
979 self.logger.info("ACLP_TEST_FINISH_0015")
981 def test_0016_udp_permit_port_v4(self):
982 """ permit single UDPv4
984 self.logger.info("ACLP_TEST_START_0016")
986 port = random.randint(0, 65535)
989 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
990 self.proto[self.IP][self.UDP]))
991 # deny ip any any in the end
992 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
995 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
997 # Traffic should still pass
998 self.run_verify_test(self.IP, self.IPV4,
999 self.proto[self.IP][self.UDP], port)
1001 self.logger.info("ACLP_TEST_FINISH_0016")
1003 def test_0017_tcp_permit_port_v6(self):
1004 """ permit single TCPv6
1006 self.logger.info("ACLP_TEST_START_0017")
1008 port = random.randint(0, 65535)
1011 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1012 self.proto[self.IP][self.TCP]))
1013 # deny ip any any in the end
1014 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1017 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1019 # Traffic should still pass
1020 self.run_verify_test(self.IP, self.IPV6,
1021 self.proto[self.IP][self.TCP], port)
1023 self.logger.info("ACLP_TEST_FINISH_0017")
1025 def test_0018_udp_permit_port_v6(self):
1026 """ permit single UPPv6
1028 self.logger.info("ACLP_TEST_START_0018")
1030 port = random.randint(0, 65535)
1033 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1034 self.proto[self.IP][self.UDP]))
1035 # deny ip any any in the end
1036 rules.append(self.create_rule(self.IPV6, self.DENY,
1040 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1042 # Traffic should still pass
1043 self.run_verify_test(self.IP, self.IPV6,
1044 self.proto[self.IP][self.UDP], port)
1046 self.logger.info("ACLP_TEST_FINISH_0018")
1048 def test_0019_udp_deny_port(self):
1049 """ deny single TCPv4/v6
1051 self.logger.info("ACLP_TEST_START_0019")
1053 port = random.randint(0, 65535)
1056 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1057 self.proto[self.IP][self.TCP]))
1058 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1059 self.proto[self.IP][self.TCP]))
1060 # Permit ip any any in the end
1061 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1063 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1067 self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1069 # Traffic should not pass
1070 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1071 self.proto[self.IP][self.TCP], port)
1073 self.logger.info("ACLP_TEST_FINISH_0019")
1075 def test_0020_udp_deny_port(self):
1076 """ deny single UDPv4/v6
1078 self.logger.info("ACLP_TEST_START_0020")
1080 port = random.randint(0, 65535)
1083 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1084 self.proto[self.IP][self.UDP]))
1085 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1086 self.proto[self.IP][self.UDP]))
1087 # Permit ip any any in the end
1088 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1090 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1094 self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1096 # Traffic should not pass
1097 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1098 self.proto[self.IP][self.UDP], port)
1100 self.logger.info("ACLP_TEST_FINISH_0020")
1102 def test_0021_udp_deny_port_verify_fragment_deny(self):
1103 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1106 self.logger.info("ACLP_TEST_START_0021")
1108 port = random.randint(0, 65535)
1111 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1112 self.proto[self.IP][self.UDP]))
1113 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1114 self.proto[self.IP][self.UDP]))
1115 # deny ip any any in the end
1116 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1118 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1122 self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1124 # Traffic should not pass
1125 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1126 self.proto[self.IP][self.UDP], port, True)
1128 self.logger.info("ACLP_TEST_FINISH_0021")
1130 def test_0022_zero_length_udp_ipv4(self):
1131 """ VPP-687 zero length udp ipv4 packet"""
1132 self.logger.info("ACLP_TEST_START_0022")
1134 port = random.randint(0, 65535)
1137 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1138 self.proto[self.IP][self.UDP]))
1139 # deny ip any any in the end
1141 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1144 self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
1146 # Traffic should still pass
1147 # Create incoming packet streams for packet-generator interfaces
1149 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1151 self.proto[self.IP][self.UDP], port,
1154 self.pg0.add_stream(pkts)
1155 pkts_cnt += len(pkts)
1157 # Enable packet capture and start packet sendingself.IPV
1158 self.pg_enable_capture(self.pg_interfaces)
1161 self.pg1.get_capture(pkts_cnt)
1163 self.logger.info("ACLP_TEST_FINISH_0022")
1165 def test_0023_zero_length_udp_ipv6(self):
1166 """ VPP-687 zero length udp ipv6 packet"""
1167 self.logger.info("ACLP_TEST_START_0023")
1169 port = random.randint(0, 65535)
1172 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1173 self.proto[self.IP][self.UDP]))
1174 # deny ip any any in the end
1175 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1178 self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
1180 # Traffic should still pass
1181 # Create incoming packet streams for packet-generator interfaces
1183 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1185 self.proto[self.IP][self.UDP], port,
1188 self.pg0.add_stream(pkts)
1189 pkts_cnt += len(pkts)
1191 # Enable packet capture and start packet sendingself.IPV
1192 self.pg_enable_capture(self.pg_interfaces)
1195 # Verify outgoing packet streams per packet-generator interface
1196 self.pg1.get_capture(pkts_cnt)
1198 self.logger.info("ACLP_TEST_FINISH_0023")
1200 def test_0108_tcp_permit_v4(self):
1201 """ permit TCPv4 + non-match range
1203 self.logger.info("ACLP_TEST_START_0108")
1207 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1208 self.proto[self.IP][self.TCP]))
1209 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1210 self.proto[self.IP][self.TCP]))
1211 # deny ip any any in the end
1212 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1215 self.apply_rules(rules, b"permit ipv4 tcp")
1217 # Traffic should still pass
1218 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1220 self.logger.info("ACLP_TEST_FINISH_0108")
1222 def test_0109_tcp_permit_v6(self):
1223 """ permit TCPv6 + non-match range
1225 self.logger.info("ACLP_TEST_START_0109")
1229 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1230 self.proto[self.IP][self.TCP]))
1231 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1232 self.proto[self.IP][self.TCP]))
1233 # deny ip any any in the end
1234 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1237 self.apply_rules(rules, b"permit ip6 tcp")
1239 # Traffic should still pass
1240 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1242 self.logger.info("ACLP_TEST_FINISH_0109")
1244 def test_0110_udp_permit_v4(self):
1245 """ permit UDPv4 + non-match range
1247 self.logger.info("ACLP_TEST_START_0110")
1251 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1252 self.proto[self.IP][self.UDP]))
1253 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1254 self.proto[self.IP][self.UDP]))
1255 # deny ip any any in the end
1256 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1259 self.apply_rules(rules, b"permit ipv4 udp")
1261 # Traffic should still pass
1262 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1264 self.logger.info("ACLP_TEST_FINISH_0110")
1266 def test_0111_udp_permit_v6(self):
1267 """ permit UDPv6 + non-match range
1269 self.logger.info("ACLP_TEST_START_0111")
1273 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1274 self.proto[self.IP][self.UDP]))
1275 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1276 self.proto[self.IP][self.UDP]))
1277 # deny ip any any in the end
1278 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1281 self.apply_rules(rules, b"permit ip6 udp")
1283 # Traffic should still pass
1284 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1286 self.logger.info("ACLP_TEST_FINISH_0111")
1288 def test_0112_tcp_deny(self):
1289 """ deny TCPv4/v6 + non-match range
1291 self.logger.info("ACLP_TEST_START_0112")
1295 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1297 self.proto[self.IP][self.TCP]))
1298 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1300 self.proto[self.IP][self.TCP]))
1301 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1302 self.proto[self.IP][self.TCP]))
1303 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1304 self.proto[self.IP][self.TCP]))
1305 # permit ip any any in the end
1306 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1308 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1312 self.apply_rules(rules, b"deny ip4/ip6 tcp")
1314 # Traffic should not pass
1315 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1316 self.proto[self.IP][self.TCP])
1318 self.logger.info("ACLP_TEST_FINISH_0112")
1320 def test_0113_udp_deny(self):
1321 """ deny UDPv4/v6 + non-match range
1323 self.logger.info("ACLP_TEST_START_0113")
1327 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1329 self.proto[self.IP][self.UDP]))
1330 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1332 self.proto[self.IP][self.UDP]))
1333 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1334 self.proto[self.IP][self.UDP]))
1335 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1336 self.proto[self.IP][self.UDP]))
1337 # permit ip any any in the end
1338 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1340 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1344 self.apply_rules(rules, b"deny ip4/ip6 udp")
1346 # Traffic should not pass
1347 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1348 self.proto[self.IP][self.UDP])
1350 self.logger.info("ACLP_TEST_FINISH_0113")
1352 def test_0300_tcp_permit_v4_etype_aaaa(self):
1353 """ permit TCPv4, send 0xAAAA etype
1355 self.logger.info("ACLP_TEST_START_0300")
1359 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1360 self.proto[self.IP][self.TCP]))
1361 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1362 self.proto[self.IP][self.TCP]))
1363 # deny ip any any in the end
1364 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1367 self.apply_rules(rules, b"permit ipv4 tcp")
1369 # Traffic should still pass also for an odd ethertype
1370 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1371 0, False, True, 0xaaaa)
1372 self.logger.info("ACLP_TEST_FINISH_0300")
1374 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1375 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1377 self.logger.info("ACLP_TEST_START_0305")
1381 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1382 self.proto[self.IP][self.TCP]))
1383 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1384 self.proto[self.IP][self.TCP]))
1385 # deny ip any any in the end
1386 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1389 self.apply_rules(rules, b"permit ipv4 tcp")
1391 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1392 self.etype_whitelist([0xbbb], 1)
1394 # The oddball ethertype should be blocked
1395 self.run_verify_negat_test(self.IP, self.IPV4,
1396 self.proto[self.IP][self.TCP],
1399 # remove the whitelist
1400 self.etype_whitelist([], 0)
1402 self.logger.info("ACLP_TEST_FINISH_0305")
1404 def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1405 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1407 self.logger.info("ACLP_TEST_START_0306")
1411 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1412 self.proto[self.IP][self.TCP]))
1413 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1414 self.proto[self.IP][self.TCP]))
1415 # deny ip any any in the end
1416 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1419 self.apply_rules(rules, b"permit ipv4 tcp")
1421 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1422 self.etype_whitelist([0xbbb], 1)
1424 # The whitelisted traffic, should pass
1425 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1426 0, False, True, 0x0bbb)
1428 # remove the whitelist, the previously blocked 0xAAAA should pass now
1429 self.etype_whitelist([], 0)
1431 self.logger.info("ACLP_TEST_FINISH_0306")
1433 def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1434 """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1436 self.logger.info("ACLP_TEST_START_0307")
1440 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1441 self.proto[self.IP][self.TCP]))
1442 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1443 self.proto[self.IP][self.TCP]))
1444 # deny ip any any in the end
1445 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1448 self.apply_rules(rules, b"permit ipv4 tcp")
1450 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1451 self.etype_whitelist([0xbbb], 1)
1452 # remove the whitelist, the previously blocked 0xAAAA should pass now
1453 self.etype_whitelist([], 0)
1455 # The whitelisted traffic, should pass
1456 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1457 0, False, True, 0xaaaa)
1459 self.logger.info("ACLP_TEST_FINISH_0306")
1461 def test_0315_del_intf(self):
1462 """ apply an acl and delete the interface
1464 self.logger.info("ACLP_TEST_START_0315")
1468 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1469 self.proto[self.IP][self.TCP]))
1470 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1471 self.proto[self.IP][self.TCP]))
1472 # deny ip any any in the end
1473 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1475 # create an interface
1477 intf.append(VppLoInterface(self))
1480 self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
1482 # Remove the interface
1483 intf[0].remove_vpp_config()
1485 self.logger.info("ACLP_TEST_FINISH_0315")
1487 if __name__ == '__main__':
1488 unittest.main(testRunner=VppTestRunner)