2 """ACL plugin Test Case HLD:
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
16 from vpp_lo_interface import VppLoInterface
19 class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
36 proto = [[6, 17], [1, 58]]
37 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
49 udp_sport_to = udp_sport_from + 5
50 udp_dport_from = 20000
51 udp_dport_to = udp_dport_from + 5000
53 tcp_sport_to = tcp_sport_from + 5
54 tcp_dport_from = 40000
55 tcp_dport_to = tcp_dport_from + 5000
58 udp_sport_to_2 = udp_sport_from_2 + 5
59 udp_dport_from_2 = 30000
60 udp_dport_to_2 = udp_dport_from_2 + 5000
61 tcp_sport_from_2 = 130
62 tcp_sport_to_2 = tcp_sport_from_2 + 5
63 tcp_dport_from_2 = 20000
64 tcp_dport_to_2 = tcp_dport_from_2 + 5000
66 icmp4_type = 8 # echo request
68 icmp6_type = 128 # echo request
84 Perform standard class setup (defined by class method setUpClass in
85 class VppTestCase) before running the test case, set test case related
86 variables and configure VPP.
88 super(TestACLplugin, cls).setUpClass()
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
96 cls.flows[cls.pg0] = [cls.pg1]
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
123 # warm-up the mac address tables
127 super(TestACLplugin, cls).tearDownClass()
131 super(TestACLplugin, self).setUp()
132 self.reset_packet_infos()
136 Show various debug prints after each test.
138 super(TestACLplugin, self).tearDown()
139 if not self.vpp_dead:
140 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
141 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
142 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
143 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
144 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
def create_hosts(self, count, start=0):
    """
    Create required number of host MAC addresses and distribute them among
    interfaces. Create host IPv4 and IPv6 addresses for every host MAC
    address.

    The hosts are split evenly over ``self.pg_interfaces``; the last
    interface additionally takes the remainder when ``count`` is not a
    multiple of the number of interfaces. Created hosts are appended to
    ``self.hosts_by_pg_idx[<sw_if_index>]``.

    :param int count: Number of hosts to create MAC/IPv4 addresses for.
    :param int start: Number to start numbering from.
    """
    n_int = len(self.pg_interfaces)
    # Floor division: the per-interface share must stay an int, because it
    # is fed to range() below (true division yields a float on Python 3).
    macs_per_if = count // n_int
    for i, pg_if in enumerate(self.pg_interfaces):
        start_nr = macs_per_if * i + start
        # The last interface absorbs whatever the even split left over.
        if i == n_int - 1:
            end_nr = count + start
        else:
            end_nr = macs_per_if * (i + 1) + start
        hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
        for j in range(start_nr, end_nr):
            hosts.append(Host(
                "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)))
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
                s_prefix=0, s_ip='\x00\x00\x00\x00',
                d_prefix=0, d_ip='\x00\x00\x00\x00'):
    """
    Build a single ACL rule dictionary.

    The source/destination bounds are picked from ``ports``:

    * ``self.PORTS_ALL``     - 0..65535, or 0..255 for protocols 1 and 58
    * ``self.PORTS_RANGE``   - the first per-protocol range of this class
    * ``self.PORTS_RANGE_2`` - the second per-protocol range of this class
    * anything else          - ``ports`` itself is used for all four bounds

    For protocols 1 and 58 the "source" fields hold the ICMP type and the
    "destination" fields hold the ICMP code, as the rule key names say.

    :param ip: Value stored under 'is_ipv6'.
    :param permit_deny: Value stored under 'is_permit'.
    :param ports: Range selector or a literal port number (see above).
    :param proto: IP protocol number; -1 means "no rule".
    :param s_prefix: Source prefix length.
    :param s_ip: Source address.
    :param d_prefix: Destination prefix length.
    :param d_ip: Destination address.
    :return: Rule dictionary, or None when ``proto`` is -1.
    """
    if proto == -1:
        return None

    if ports == self.PORTS_ALL:
        # ICMP type/code are 8-bit values, ports are 16-bit ones.
        upper = 255 if proto in (1, 58) else 65535
        src_lo, dst_lo = 0, 0
        src_hi = dst_hi = upper
    elif ports == self.PORTS_RANGE:
        # NOTE(review): a protocol other than 1/58/TCP/UDP leaves the
        # bounds unassigned in this branch - same as before the restyle.
        if proto == 1:
            src_lo = src_hi = self.icmp4_type
            dst_lo = dst_hi = self.icmp4_code
        elif proto == 58:
            src_lo = src_hi = self.icmp6_type
            dst_lo = dst_hi = self.icmp6_code
        elif proto == self.proto[self.IP][self.TCP]:
            src_lo, src_hi = self.tcp_sport_from, self.tcp_sport_to
            dst_lo, dst_hi = self.tcp_dport_from, self.tcp_dport_to
        elif proto == self.proto[self.IP][self.UDP]:
            src_lo, src_hi = self.udp_sport_from, self.udp_sport_to
            dst_lo, dst_hi = self.udp_dport_from, self.udp_dport_to
    elif ports == self.PORTS_RANGE_2:
        if proto == 1:
            src_lo = src_hi = self.icmp4_type_2
            dst_lo, dst_hi = self.icmp4_code_from_2, self.icmp4_code_to_2
        elif proto == 58:
            src_lo = src_hi = self.icmp6_type_2
            dst_lo, dst_hi = self.icmp6_code_from_2, self.icmp6_code_to_2
        elif proto == self.proto[self.IP][self.TCP]:
            src_lo, src_hi = self.tcp_sport_from_2, self.tcp_sport_to_2
            dst_lo, dst_hi = self.tcp_dport_from_2, self.tcp_dport_to_2
        elif proto == self.proto[self.IP][self.UDP]:
            src_lo, src_hi = self.udp_sport_from_2, self.udp_sport_to_2
            dst_lo, dst_hi = self.udp_dport_from_2, self.udp_dport_to_2
    else:
        # A literal port: match exactly that port on both sides.
        src_lo = src_hi = dst_lo = dst_hi = ports

    return {
        'is_permit': permit_deny,
        'is_ipv6': ip,
        'proto': proto,
        'srcport_or_icmptype_first': src_lo,
        'srcport_or_icmptype_last': src_hi,
        'src_ip_prefix_len': s_prefix,
        'src_ip_addr': s_ip,
        'dstport_or_icmpcode_first': dst_lo,
        'dstport_or_icmpcode_last': dst_hi,
        'dst_ip_prefix_len': d_prefix,
        'dst_ip_addr': d_ip,
    }
240 def apply_rules(self, rules, tag=''):
241 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
243 self.logger.info("Dumped ACL: " + str(
244 self.vapi.acl_dump(reply.acl_index)))
245 # Apply an ACL on the interface as inbound
246 for i in self.pg_interfaces:
247 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
249 acls=[reply.acl_index])
252 def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
253 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
255 self.logger.info("Dumped ACL: " + str(
256 self.vapi.acl_dump(reply.acl_index)))
257 # Apply an ACL on the interface as inbound
258 self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
260 acls=[reply.acl_index])
263 def etype_whitelist(self, whitelist, n_input):
264 # Apply whitelists on all the interfaces
265 for i in self.pg_interfaces:
266 # checkstyle can't read long names. Help them.
267 fun = self.vapi.acl_interface_set_etype_whitelist
268 fun(sw_if_index=i.sw_if_index, n_input=n_input,
272 def create_upper_layer(self, packet_index, proto, ports=0):
273 p = self.proto_map[proto]
276 return UDP(sport=random.randint(self.udp_sport_from,
278 dport=random.randint(self.udp_dport_from,
281 return UDP(sport=ports, dport=ports)
284 return TCP(sport=random.randint(self.tcp_sport_from,
286 dport=random.randint(self.tcp_dport_from,
289 return TCP(sport=ports, dport=ports)
292 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
293 proto=-1, ports=0, fragments=False,
294 pkt_raw=True, etype=-1):
296 Create input packet stream for defined interface using hosts or
299 :param object src_if: Interface to create packet stream for.
300 :param list packet_sizes: List of required packet sizes.
301 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
302 :return: Stream of packets.
305 if self.flows.__contains__(src_if):
306 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
307 for dst_if in self.flows[src_if]:
308 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
309 n_int = len(dst_hosts) * len(src_hosts)
310 for i in range(0, n_int):
311 dst_host = dst_hosts[i / len(src_hosts)]
312 src_host = src_hosts[i % len(src_hosts)]
313 pkt_info = self.create_packet_info(src_if, dst_if)
319 pkt_info.ip = random.choice([0, 1])
321 pkt_info.proto = random.choice(self.proto[self.IP])
323 pkt_info.proto = proto
324 payload = self.info_to_payload(pkt_info)
325 p = Ether(dst=dst_host.mac, src=src_host.mac)
327 p = Ether(dst=dst_host.mac,
331 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
333 p /= IPv6ExtHdrFragment(offset=64, m=1)
336 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
339 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
340 if traffic_type == self.ICMP:
342 p /= ICMPv6EchoRequest(type=self.icmp6_type,
343 code=self.icmp6_code)
345 p /= ICMP(type=self.icmp4_type,
346 code=self.icmp4_code)
348 p /= self.create_upper_layer(i, pkt_info.proto, ports)
351 pkt_info.data = p.copy()
353 size = random.choice(packet_sizes)
354 self.extend_packet(p, size)
358 def verify_capture(self, pg_if, capture,
359 traffic_type=0, ip_type=0, etype=-1):
361 Verify captured input packet stream for defined interface.
363 :param object pg_if: Interface to verify captured packet stream for.
364 :param list capture: Captured packet stream.
365 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
368 for i in self.pg_interfaces:
369 last_info[i.sw_if_index] = None
370 dst_sw_if_index = pg_if.sw_if_index
371 for packet in capture:
373 if packet[Ether].type != etype:
374 self.logger.error(ppp("Unexpected ethertype in packet:",
379 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
380 if traffic_type == self.ICMP and ip_type == self.IPV6:
381 payload_info = self.payload_to_info(
382 packet[ICMPv6EchoRequest].data)
383 payload = packet[ICMPv6EchoRequest]
385 payload_info = self.payload_to_info(str(packet[Raw]))
386 payload = packet[self.proto_map[payload_info.proto]]
388 self.logger.error(ppp("Unexpected or invalid packet "
389 "(outside network):", packet))
393 self.assertEqual(payload_info.ip, ip_type)
394 if traffic_type == self.ICMP:
396 if payload_info.ip == 0:
397 self.assertEqual(payload.type, self.icmp4_type)
398 self.assertEqual(payload.code, self.icmp4_code)
400 self.assertEqual(payload.type, self.icmp6_type)
401 self.assertEqual(payload.code, self.icmp6_code)
403 self.logger.error(ppp("Unexpected or invalid packet "
404 "(outside network):", packet))
408 ip_version = IPv6 if payload_info.ip == 1 else IP
410 ip = packet[ip_version]
411 packet_index = payload_info.index
413 self.assertEqual(payload_info.dst, dst_sw_if_index)
414 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
415 (pg_if.name, payload_info.src,
417 next_info = self.get_next_packet_info_for_interface2(
418 payload_info.src, dst_sw_if_index,
419 last_info[payload_info.src])
420 last_info[payload_info.src] = next_info
421 self.assertTrue(next_info is not None)
422 self.assertEqual(packet_index, next_info.index)
423 saved_packet = next_info.data
424 # Check standard fields
425 self.assertEqual(ip.src, saved_packet[ip_version].src)
426 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
427 p = self.proto_map[payload_info.proto]
430 self.assertEqual(tcp.sport, saved_packet[
432 self.assertEqual(tcp.dport, saved_packet[
436 self.assertEqual(udp.sport, saved_packet[
438 self.assertEqual(udp.dport, saved_packet[
441 self.logger.error(ppp("Unexpected or invalid packet:",
444 for i in self.pg_interfaces:
445 remaining_packet = self.get_next_packet_info_for_interface2(
446 i, dst_sw_if_index, last_info[i.sw_if_index])
448 remaining_packet is None,
449 "Port %u: Packet expected from source %u didn't arrive" %
450 (dst_sw_if_index, i.sw_if_index))
452 def run_traffic_no_check(self):
454 # Create incoming packet streams for packet-generator interfaces
455 for i in self.pg_interfaces:
456 if self.flows.__contains__(i):
457 pkts = self.create_stream(i, self.pg_if_packet_sizes)
461 # Enable packet capture and start packet sending
462 self.pg_enable_capture(self.pg_interfaces)
465 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
466 frags=False, pkt_raw=True, etype=-1):
468 # Create incoming packet streams for packet-generator interfaces
470 for i in self.pg_interfaces:
471 if self.flows.__contains__(i):
472 pkts = self.create_stream(i, self.pg_if_packet_sizes,
473 traffic_type, ip_type, proto, ports,
474 frags, pkt_raw, etype)
477 pkts_cnt += len(pkts)
479 # Enable packet capture and start packet sending
480 self.pg_enable_capture(self.pg_interfaces)
484 # Verify outgoing packet streams per packet-generator interface
485 for src_if in self.pg_interfaces:
486 if self.flows.__contains__(src_if):
487 for dst_if in self.flows[src_if]:
488 capture = dst_if.get_capture(pkts_cnt)
489 self.logger.info("Verifying capture on interface %s" %
491 self.verify_capture(dst_if, capture,
492 traffic_type, ip_type, etype)
494 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
495 ports=0, frags=False, etype=-1):
497 self.reset_packet_infos()
498 for i in self.pg_interfaces:
499 if self.flows.__contains__(i):
500 pkts = self.create_stream(i, self.pg_if_packet_sizes,
501 traffic_type, ip_type, proto, ports,
506 # Enable packet capture and start packet sending
507 self.pg_enable_capture(self.pg_interfaces)
511 # Verify outgoing packet streams per packet-generator interface
512 for src_if in self.pg_interfaces:
513 if self.flows.__contains__(src_if):
514 for dst_if in self.flows[src_if]:
515 self.logger.info("Verifying capture on interface %s" %
517 capture = dst_if.get_capture(0)
518 self.assertEqual(len(capture), 0)
def test_0000_warmup_test(self):
    """ ACL plugin version check; learn MACs

    Creates 16 hosts, sends one round of unchecked traffic and then
    verifies that the ACL plugin reports major version 1.
    """
    self.create_hosts(16)
    self.run_traffic_no_check()

    version = self.vapi.papi.acl_plugin_get_version()
    major = version.major
    # Only the major version is pinned: minor version changes are
    # non-breaking, so the minor number is logged but not asserted.
    self.assertEqual(major, 1)
    self.logger.info(
        "Working with ACL plugin version: %d.%d" % (major, version.minor))
532 def test_0001_acl_create(self):
533 """ ACL create/delete test
536 self.logger.info("ACLP_TEST_START_0001")
538 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
539 'srcport_or_icmptype_first': 1234,
540 'srcport_or_icmptype_last': 1235,
541 'src_ip_prefix_len': 0,
542 'src_ip_addr': '\x00\x00\x00\x00',
543 'dstport_or_icmpcode_first': 1234,
544 'dstport_or_icmpcode_last': 1234,
545 'dst_ip_addr': '\x00\x00\x00\x00',
546 'dst_ip_prefix_len': 0}]
547 # Test 1: add a new ACL
548 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
550 self.assertEqual(reply.retval, 0)
551 # The very first ACL gets #0
552 self.assertEqual(reply.acl_index, 0)
553 first_acl = reply.acl_index
554 rr = self.vapi.acl_dump(reply.acl_index)
555 self.logger.info("Dumped ACL: " + str(rr))
556 self.assertEqual(len(rr), 1)
557 # We should have the same number of ACL entries as we had asked
558 self.assertEqual(len(rr[0].r), len(r))
559 # The rules should be the same. But because the submitted and returned
560 # are different types, we need to iterate over rules and keys to get
562 for i_rule in range(0, len(r) - 1):
563 for rule_key in r[i_rule]:
564 self.assertEqual(rr[0].r[i_rule][rule_key],
567 # Add a deny-1234 ACL
568 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
569 'srcport_or_icmptype_first': 1234,
570 'srcport_or_icmptype_last': 1235,
571 'src_ip_prefix_len': 0,
572 'src_ip_addr': '\x00\x00\x00\x00',
573 'dstport_or_icmpcode_first': 1234,
574 'dstport_or_icmpcode_last': 1234,
575 'dst_ip_addr': '\x00\x00\x00\x00',
576 'dst_ip_prefix_len': 0},
577 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
578 'srcport_or_icmptype_first': 0,
579 'srcport_or_icmptype_last': 0,
580 'src_ip_prefix_len': 0,
581 'src_ip_addr': '\x00\x00\x00\x00',
582 'dstport_or_icmpcode_first': 0,
583 'dstport_or_icmpcode_last': 0,
584 'dst_ip_addr': '\x00\x00\x00\x00',
585 'dst_ip_prefix_len': 0}]
587 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
588 tag="deny 1234;permit all")
589 self.assertEqual(reply.retval, 0)
590 # The second ACL gets #1
591 self.assertEqual(reply.acl_index, 1)
592 second_acl = reply.acl_index
594 # Test 2: try to modify a nonexistent ACL
595 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
596 tag="FFFF:FFFF", expected_retval=-6)
597 self.assertEqual(reply.retval, -6)
598 # The ACL number should pass through
599 self.assertEqual(reply.acl_index, 432)
600 # apply an ACL on an interface inbound, try to delete ACL, must fail
601 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
604 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
605 # Unapply an ACL and then try to delete it - must be ok
606 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
609 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
611 # apply an ACL on an interface outbound, try to delete ACL, must fail
612 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
615 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
616 # Unapply the ACL and then try to delete it - must be ok
617 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
620 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
622 # try to apply a nonexistent ACL - must fail
623 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
628 self.logger.info("ACLP_TEST_FINISH_0001")
630 def test_0002_acl_permit_apply(self):
631 """ permit ACL apply test
633 self.logger.info("ACLP_TEST_START_0002")
636 rules.append(self.create_rule(self.IPV4, self.PERMIT,
637 0, self.proto[self.IP][self.UDP]))
638 rules.append(self.create_rule(self.IPV4, self.PERMIT,
639 0, self.proto[self.IP][self.TCP]))
642 self.apply_rules(rules, "permit per-flow")
644 # Traffic should still pass
645 self.run_verify_test(self.IP, self.IPV4, -1)
646 self.logger.info("ACLP_TEST_FINISH_0002")
648 def test_0003_acl_deny_apply(self):
649 """ deny ACL apply test
651 self.logger.info("ACLP_TEST_START_0003")
652 # Add a deny-flows ACL
654 rules.append(self.create_rule(self.IPV4, self.DENY,
655 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
656 # Permit ip any any in the end
657 rules.append(self.create_rule(self.IPV4, self.PERMIT,
661 self.apply_rules(rules, "deny per-flow;permit all")
663 # Traffic should not pass
664 self.run_verify_negat_test(self.IP, self.IPV4,
665 self.proto[self.IP][self.UDP])
666 self.logger.info("ACLP_TEST_FINISH_0003")
667 # self.assertEqual(1, 0)
669 def test_0004_vpp624_permit_icmpv4(self):
670 """ VPP_624 permit ICMPv4
672 self.logger.info("ACLP_TEST_START_0004")
676 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
677 self.proto[self.ICMP][self.ICMPv4]))
678 # deny ip any any in the end
679 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
682 self.apply_rules(rules, "permit icmpv4")
684 # Traffic should still pass
685 self.run_verify_test(self.ICMP, self.IPV4,
686 self.proto[self.ICMP][self.ICMPv4])
688 self.logger.info("ACLP_TEST_FINISH_0004")
690 def test_0005_vpp624_permit_icmpv6(self):
691 """ VPP_624 permit ICMPv6
693 self.logger.info("ACLP_TEST_START_0005")
697 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
698 self.proto[self.ICMP][self.ICMPv6]))
699 # deny ip any any in the end
700 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
703 self.apply_rules(rules, "permit icmpv6")
705 # Traffic should still pass
706 self.run_verify_test(self.ICMP, self.IPV6,
707 self.proto[self.ICMP][self.ICMPv6])
709 self.logger.info("ACLP_TEST_FINISH_0005")
711 def test_0006_vpp624_deny_icmpv4(self):
712 """ VPP_624 deny ICMPv4
714 self.logger.info("ACLP_TEST_START_0006")
717 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
718 self.proto[self.ICMP][self.ICMPv4]))
719 # permit ip any any in the end
720 rules.append(self.create_rule(self.IPV4, self.PERMIT,
724 self.apply_rules(rules, "deny icmpv4")
726 # Traffic should not pass
727 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
729 self.logger.info("ACLP_TEST_FINISH_0006")
731 def test_0007_vpp624_deny_icmpv6(self):
732 """ VPP_624 deny ICMPv6
734 self.logger.info("ACLP_TEST_START_0007")
737 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
738 self.proto[self.ICMP][self.ICMPv6]))
739 # permit ip any any in the end
740 rules.append(self.create_rule(self.IPV6, self.PERMIT,
744 self.apply_rules(rules, "deny icmpv6")
746 # Traffic should not pass
747 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
749 self.logger.info("ACLP_TEST_FINISH_0007")
751 def test_0008_tcp_permit_v4(self):
754 self.logger.info("ACLP_TEST_START_0008")
758 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
759 self.proto[self.IP][self.TCP]))
760 # deny ip any any in the end
761 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
764 self.apply_rules(rules, "permit ipv4 tcp")
766 # Traffic should still pass
767 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
769 self.logger.info("ACLP_TEST_FINISH_0008")
def test_0009_tcp_permit_v6(self):
    """ permit TCPv6
    """
    self.logger.info("ACLP_TEST_START_0009")

    rules = [
        # Permit TCP over IPv6 within the configured port range
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                         self.proto[self.IP][self.TCP]),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])

    # The finish marker must carry this test's own number so that the
    # START/FINISH pair can be correlated in the log.
    self.logger.info("ACLP_TEST_FINISH_0009")
791 def test_0010_udp_permit_v4(self):
794 self.logger.info("ACLP_TEST_START_0010")
798 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
799 self.proto[self.IP][self.UDP]))
800 # deny ip any any in the end
801 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
804 self.apply_rules(rules, "permit ipv udp")
806 # Traffic should still pass
807 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
809 self.logger.info("ACLP_TEST_FINISH_0010")
811 def test_0011_udp_permit_v6(self):
814 self.logger.info("ACLP_TEST_START_0011")
818 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
819 self.proto[self.IP][self.UDP]))
820 # deny ip any any in the end
821 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
824 self.apply_rules(rules, "permit ip6 udp")
826 # Traffic should still pass
827 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
829 self.logger.info("ACLP_TEST_FINISH_0011")
831 def test_0012_tcp_deny(self):
834 self.logger.info("ACLP_TEST_START_0012")
838 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
839 self.proto[self.IP][self.TCP]))
840 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
841 self.proto[self.IP][self.TCP]))
842 # permit ip any any in the end
843 rules.append(self.create_rule(self.IPV4, self.PERMIT,
845 rules.append(self.create_rule(self.IPV6, self.PERMIT,
849 self.apply_rules(rules, "deny ip4/ip6 tcp")
851 # Traffic should not pass
852 self.run_verify_negat_test(self.IP, self.IPRANDOM,
853 self.proto[self.IP][self.TCP])
855 self.logger.info("ACLP_TEST_FINISH_0012")
857 def test_0013_udp_deny(self):
860 self.logger.info("ACLP_TEST_START_0013")
864 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
865 self.proto[self.IP][self.UDP]))
866 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
867 self.proto[self.IP][self.UDP]))
868 # permit ip any any in the end
869 rules.append(self.create_rule(self.IPV4, self.PERMIT,
871 rules.append(self.create_rule(self.IPV6, self.PERMIT,
875 self.apply_rules(rules, "deny ip4/ip6 udp")
877 # Traffic should not pass
878 self.run_verify_negat_test(self.IP, self.IPRANDOM,
879 self.proto[self.IP][self.UDP])
881 self.logger.info("ACLP_TEST_FINISH_0013")
883 def test_0014_acl_dump(self):
884 """ verify add/dump acls
886 self.logger.info("ACLP_TEST_START_0014")
888 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
889 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
890 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
891 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
892 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
893 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
894 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
895 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
896 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
897 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
898 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
899 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
900 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
901 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
902 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
903 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
904 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
905 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
908 # Add and verify new ACLs
910 for i in range(len(r)):
911 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
913 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
914 result = self.vapi.acl_dump(reply.acl_index)
917 for drules in result:
919 self.assertEqual(dr.is_ipv6, r[i][0])
920 self.assertEqual(dr.is_permit, r[i][1])
921 self.assertEqual(dr.proto, r[i][3])
924 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
927 self.assertEqual(dr.srcport_or_icmptype_first, 0)
928 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
930 if dr.proto == self.proto[self.IP][self.TCP]:
931 self.assertGreater(dr.srcport_or_icmptype_first,
932 self.tcp_sport_from-1)
933 self.assertLess(dr.srcport_or_icmptype_first,
935 self.assertGreater(dr.dstport_or_icmpcode_last,
936 self.tcp_dport_from-1)
937 self.assertLess(dr.dstport_or_icmpcode_last,
939 elif dr.proto == self.proto[self.IP][self.UDP]:
940 self.assertGreater(dr.srcport_or_icmptype_first,
941 self.udp_sport_from-1)
942 self.assertLess(dr.srcport_or_icmptype_first,
944 self.assertGreater(dr.dstport_or_icmpcode_last,
945 self.udp_dport_from-1)
946 self.assertLess(dr.dstport_or_icmpcode_last,
950 self.logger.info("ACLP_TEST_FINISH_0014")
952 def test_0015_tcp_permit_port_v4(self):
953 """ permit single TCPv4
955 self.logger.info("ACLP_TEST_START_0015")
957 port = random.randint(0, 65535)
960 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
961 self.proto[self.IP][self.TCP]))
962 # deny ip any any in the end
963 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
966 self.apply_rules(rules, "permit ip4 tcp "+str(port))
968 # Traffic should still pass
969 self.run_verify_test(self.IP, self.IPV4,
970 self.proto[self.IP][self.TCP], port)
972 self.logger.info("ACLP_TEST_FINISH_0015")
def test_0016_udp_permit_port_v4(self):
    """ permit single UDPv4
    """
    self.logger.info("ACLP_TEST_START_0016")

    port = random.randint(0, 65535)
    rules = [
        # Permit UDP over IPv4 on the randomly chosen port only
        self.create_rule(self.IPV4, self.PERMIT, port,
                         self.proto[self.IP][self.UDP]),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules; the tag names the protocol this ACL actually permits
    self.apply_rules(rules, "permit ip4 udp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4,
                         self.proto[self.IP][self.UDP], port)

    self.logger.info("ACLP_TEST_FINISH_0016")
def test_0017_tcp_permit_port_v6(self):
    """ permit single TCPv6
    """
    self.logger.info("ACLP_TEST_START_0017")

    port = random.randint(0, 65535)
    rules = [
        # Permit TCP over IPv6 on the randomly chosen port only
        self.create_rule(self.IPV6, self.PERMIT, port,
                         self.proto[self.IP][self.TCP]),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules; the tag names the address family this ACL really uses
    self.apply_rules(rules, "permit ip6 tcp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6,
                         self.proto[self.IP][self.TCP], port)

    self.logger.info("ACLP_TEST_FINISH_0017")
def test_0018_udp_permit_port_v6(self):
    """ permit single UDPv6
    """
    self.logger.info("ACLP_TEST_START_0018")

    port = random.randint(0, 65535)
    rules = [
        # Permit UDP over IPv6 on the randomly chosen port only
        self.create_rule(self.IPV6, self.PERMIT, port,
                         self.proto[self.IP][self.UDP]),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules; the tag names the family/protocol this ACL really uses
    self.apply_rules(rules, "permit ip6 udp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6,
                         self.proto[self.IP][self.UDP], port)

    self.logger.info("ACLP_TEST_FINISH_0018")
def test_0019_udp_deny_port(self):
    """ deny single TCPv4/v6
    """
    # NOTE(review): the method name says "udp" but the rules and the
    # traffic are TCP; the name is kept because it is the test's ID.
    self.logger.info("ACLP_TEST_START_0019")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # Deny TCP on the randomly chosen port for both address families
        self.create_rule(self.IPV4, self.DENY, port, tcp),
        self.create_rule(self.IPV6, self.DENY, port, tcp),
        # Permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules; the tag names the protocol this ACL actually denies
    self.apply_rules(rules, "deny ip4/ip6 tcp " + str(port))

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0019")
1068 def test_0020_udp_deny_port(self):
1069 """ deny single UDPv4/v6
1071 self.logger.info("ACLP_TEST_START_0020")
1073 port = random.randint(0, 65535)
1076 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1077 self.proto[self.IP][self.UDP]))
1078 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1079 self.proto[self.IP][self.UDP]))
1080 # Permit ip any any in the end
1081 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1083 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1087 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1089 # Traffic should not pass
1090 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1091 self.proto[self.IP][self.UDP], port)
1093 self.logger.info("ACLP_TEST_FINISH_0020")
1095 def test_0021_udp_deny_port_verify_fragment_deny(self):
1096 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1099 self.logger.info("ACLP_TEST_START_0021")
1101 port = random.randint(0, 65535)
1104 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1105 self.proto[self.IP][self.UDP]))
1106 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1107 self.proto[self.IP][self.UDP]))
1108 # permit ip any any in the end
1109 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1111 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1115 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1117 # Traffic should not pass
1118 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1119 self.proto[self.IP][self.UDP], port, True)
1121 self.logger.info("ACLP_TEST_FINISH_0021")
1123 def test_0022_zero_length_udp_ipv4(self):
1124 """ VPP-687 zero length udp ipv4 packet"""
1125 self.logger.info("ACLP_TEST_START_0022")
1127 port = random.randint(0, 65535)
1130 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1131 self.proto[self.IP][self.UDP]))
1132 # deny ip any any in the end
1134 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1137 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1139 # Traffic should still pass
1140 # Create incoming packet streams for packet-generator interfaces
1142 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1144 self.proto[self.IP][self.UDP], port,
1147 self.pg0.add_stream(pkts)
1148 pkts_cnt += len(pkts)
1150 # Enable packet capture and start packet sending
1151 self.pg_enable_capture(self.pg_interfaces)
1154 self.pg1.get_capture(pkts_cnt)
1156 self.logger.info("ACLP_TEST_FINISH_0022")
1158 def test_0023_zero_length_udp_ipv6(self):
1159 """ VPP-687 zero length udp ipv6 packet"""
1160 self.logger.info("ACLP_TEST_START_0023")
1162 port = random.randint(0, 65535)
1165 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1166 self.proto[self.IP][self.UDP]))
1167 # last rule: deny everything else (ip any any) for IPv6
1168 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1171 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1173 # UDP traffic on the permitted port should still pass
1174 # Create incoming packet streams for packet-generator interfaces
1176 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1178 self.proto[self.IP][self.UDP], port,
1181 self.pg0.add_stream(pkts)
1182 pkts_cnt += len(pkts)
1184 # Enable packet capture and start packet sending
1185 self.pg_enable_capture(self.pg_interfaces)
1188 # Verify that pg1 captures as many packets as were queued on pg0
1189 self.pg1.get_capture(pkts_cnt)
1191 self.logger.info("ACLP_TEST_FINISH_0023")
1193 def test_0108_tcp_permit_v4(self):
1194 """ permit TCPv4 + non-match range
1196 self.logger.info("ACLP_TEST_START_0108")
1200 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1201 self.proto[self.IP][self.TCP]))
1202 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1203 self.proto[self.IP][self.TCP]))
1204 # last rule: deny everything else (ip any any) for IPv4
1205 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1208 self.apply_rules(rules, "permit ipv4 tcp")
1210 # TCPv4 traffic should still pass: the PORTS_RANGE_2 deny rule is the
1210 # non-matching range named in the docstring
1211 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1213 self.logger.info("ACLP_TEST_FINISH_0108")
1215 def test_0109_tcp_permit_v6(self):
1216 """ permit TCPv6 + non-match range
1218 self.logger.info("ACLP_TEST_START_0109")
1222 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1223 self.proto[self.IP][self.TCP]))
1224 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1225 self.proto[self.IP][self.TCP]))
1226 # last rule: deny everything else (ip any any) for IPv6
1227 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1230 self.apply_rules(rules, "permit ip6 tcp")
1232 # TCPv6 traffic should still pass: the PORTS_RANGE_2 deny rule is the
1232 # non-matching range named in the docstring
1233 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1235 self.logger.info("ACLP_TEST_FINISH_0109")
1237 def test_0110_udp_permit_v4(self):
1238 """ permit UDPv4 + non-match range
1240 self.logger.info("ACLP_TEST_START_0110")
1244 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1245 self.proto[self.IP][self.UDP]))
1246 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1247 self.proto[self.IP][self.UDP]))
1248 # last rule: deny everything else (ip any any) for IPv4
1249 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1252 self.apply_rules(rules, "permit ipv4 udp")
1254 # UDPv4 traffic should still pass: the PORTS_RANGE_2 deny rule is the
1254 # non-matching range named in the docstring
1255 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1257 self.logger.info("ACLP_TEST_FINISH_0110")
1259 def test_0111_udp_permit_v6(self):
1260 """ permit UDPv6 + non-match range
1262 self.logger.info("ACLP_TEST_START_0111")
1266 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1267 self.proto[self.IP][self.UDP]))
1268 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1269 self.proto[self.IP][self.UDP]))
1270 # last rule: deny everything else (ip any any) for IPv6
1271 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1274 self.apply_rules(rules, "permit ip6 udp")
1276 # UDPv6 traffic should still pass: the PORTS_RANGE_2 deny rule is the
1276 # non-matching range named in the docstring
1277 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1279 self.logger.info("ACLP_TEST_FINISH_0111")
1281 def test_0112_tcp_deny(self):
1282 """ deny TCPv4/v6 + non-match range
1284 self.logger.info("ACLP_TEST_START_0112")
1288 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1290 self.proto[self.IP][self.TCP]))
1291 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1293 self.proto[self.IP][self.TCP]))
1294 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1295 self.proto[self.IP][self.TCP]))
1296 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1297 self.proto[self.IP][self.TCP]))
1298 # last rules: permit everything else (ip any any) for IPv4 and IPv6
1299 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1301 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1305 self.apply_rules(rules, "deny ip4/ip6 tcp")
1307 # TCP traffic should not pass (IPRANDOM is requested as the IP type)
1308 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1309 self.proto[self.IP][self.TCP])
1311 self.logger.info("ACLP_TEST_FINISH_0112")
1313 def test_0113_udp_deny(self):
1314 """ deny UDPv4/v6 + non-match range
1316 self.logger.info("ACLP_TEST_START_0113")
1320 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1322 self.proto[self.IP][self.UDP]))
1323 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1325 self.proto[self.IP][self.UDP]))
1326 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1327 self.proto[self.IP][self.UDP]))
1328 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1329 self.proto[self.IP][self.UDP]))
1330 # last rules: permit everything else (ip any any) for IPv4 and IPv6
1331 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1333 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1337 self.apply_rules(rules, "deny ip4/ip6 udp")
1339 # UDP traffic should not pass (IPRANDOM is requested as the IP type)
1340 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1341 self.proto[self.IP][self.UDP])
1343 self.logger.info("ACLP_TEST_FINISH_0113")
1345 def test_0300_tcp_permit_v4_etype_aaaa(self):
1346 """ permit TCPv4, send 0xAAAA etype
1348 self.logger.info("ACLP_TEST_START_0300")
1352 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1353 self.proto[self.IP][self.TCP]))
1354 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1355 self.proto[self.IP][self.TCP]))
1356 # last rule: deny everything else (ip any any) for IPv4
1357 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1360 self.apply_rules(rules, "permit ipv4 tcp")
1362 # Regular TCPv4 traffic should still pass
1363 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1365 # Traffic sent with the odd ethertype 0xaaaa should pass as well
1366 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1367 0, False, True, 0xaaaa)
1369 self.logger.info("ACLP_TEST_FINISH_0300")
# NOTE(review): the method name says "blacklist", but the body configures an
# ethertype whitelist through etype_whitelist(); the name is left unchanged.
1371 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1372 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
1374 self.logger.info("ACLP_TEST_START_0305")
1378 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1379 self.proto[self.IP][self.TCP]))
1380 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1381 self.proto[self.IP][self.TCP]))
1382 # last rule: deny everything else (ip any any) for IPv4
1383 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1386 self.apply_rules(rules, "permit ipv4 tcp")
1388 # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
1389 self.etype_whitelist([0xbbb], 1)
1391 # The IPv4 traffic should still pass
1392 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1394 # The oddball ethertype should be blocked
1395 self.run_verify_negat_test(self.IP, self.IPV4,
1396 self.proto[self.IP][self.TCP],
1399 # The whitelisted traffic, on the other hand, should pass
1400 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1401 0, False, True, 0x0bbb)
1403 # remove the whitelist, the previously blocked 0xAAAA should pass now
1404 self.etype_whitelist([], 0)
1405 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1406 0, False, True, 0xaaaa)
1408 self.logger.info("ACLP_TEST_FINISH_0305")
1410 def test_0315_del_intf(self):
1411 """ apply an acl and delete the interface
1413 self.logger.info("ACLP_TEST_START_0315")
1417 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1418 self.proto[self.IP][self.TCP]))
1419 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1420 self.proto[self.IP][self.TCP]))
1421 # last rule: deny everything else (ip any any) for IPv4
1422 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1424 # create a VppLoInterface that the ACL will be applied to
1426 intf.append(VppLoInterface(self))
1429 self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1431 # Remove the interface that the ACL was just applied to
1432 intf[0].remove_vpp_config()
1434 self.logger.info("ACLP_TEST_FINISH_0315")
# When executed directly, run the tests in this module with VppTestRunner.
1436 if __name__ == '__main__':
1437 unittest.main(testRunner=VppTestRunner)