hs-test: fixed timed out tests passing in the CI
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python3
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from config import config
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
13 from scapy.layers.inet6 import IPv6ExtHdrFragment
14 from framework import VppTestCase
15 from asfframework import VppTestRunner, tag_fixme_vpp_workers
16 from util import Host, ppp
17 from ipaddress import IPv4Network, IPv6Network
18
19 from vpp_lo_interface import VppLoInterface
20 from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
21 from vpp_ip import INVALID_INDEX
22
23
24 @unittest.skipIf("acl" in config.excluded_plugins, "Exclude ACL plugin tests")
25 @tag_fixme_vpp_workers
26 class TestACLplugin(VppTestCase):
27     """ACL plugin Test Case"""
28
29     # traffic types
30     IP = 0
31     ICMP = 1
32
33     # IP version
34     IPRANDOM = -1
35     IPV4 = 0
36     IPV6 = 1
37
38     # rule types
39     DENY = 0
40     PERMIT = 1
41
42     # supported protocols
43     proto = [[6, 17], [1, 58]]
44     proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
45     ICMPv4 = 0
46     ICMPv6 = 1
47     TCP = 0
48     UDP = 1
49     PROTO_ALL = 0
50
51     # port ranges
52     PORTS_ALL = -1
53     PORTS_RANGE = 0
54     PORTS_RANGE_2 = 1
55     udp_sport_from = 10
56     udp_sport_to = udp_sport_from + 5
57     udp_dport_from = 20000
58     udp_dport_to = udp_dport_from + 5000
59     tcp_sport_from = 30
60     tcp_sport_to = tcp_sport_from + 5
61     tcp_dport_from = 40000
62     tcp_dport_to = tcp_dport_from + 5000
63
64     udp_sport_from_2 = 90
65     udp_sport_to_2 = udp_sport_from_2 + 5
66     udp_dport_from_2 = 30000
67     udp_dport_to_2 = udp_dport_from_2 + 5000
68     tcp_sport_from_2 = 130
69     tcp_sport_to_2 = tcp_sport_from_2 + 5
70     tcp_dport_from_2 = 20000
71     tcp_dport_to_2 = tcp_dport_from_2 + 5000
72
73     icmp4_type = 8  # echo request
74     icmp4_code = 3
75     icmp6_type = 128  # echo request
76     icmp6_code = 3
77
78     icmp4_type_2 = 8
79     icmp4_code_from_2 = 5
80     icmp4_code_to_2 = 20
81     icmp6_type_2 = 128
82     icmp6_code_from_2 = 8
83     icmp6_code_to_2 = 42
84
85     # Test variables
86     bd_id = 1
87
88     @classmethod
89     def setUpClass(cls):
90         """
91         Perform standard class setup (defined by class method setUpClass in
92         class VppTestCase) before running the test case, set test case related
93         variables and configure VPP.
94         """
95         super(TestACLplugin, cls).setUpClass()
96
97         try:
98             # Create 2 pg interfaces
99             cls.create_pg_interfaces(range(2))
100
101             # Packet flows mapping pg0 -> pg1, pg2 etc.
102             cls.flows = dict()
103             cls.flows[cls.pg0] = [cls.pg1]
104
105             # Packet sizes
106             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
107
108             # Create BD with MAC learning and unknown unicast flooding disabled
109             # and put interfaces to this BD
110             cls.vapi.bridge_domain_add_del_v2(
111                 bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
112             )
113             for pg_if in cls.pg_interfaces:
114                 cls.vapi.sw_interface_set_l2_bridge(
115                     rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
116                 )
117
118             # Set up all interfaces
119             for i in cls.pg_interfaces:
120                 i.admin_up()
121
122             # Mapping between packet-generator index and lists of test hosts
123             cls.hosts_by_pg_idx = dict()
124             for pg_if in cls.pg_interfaces:
125                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
126
127             # Create list of deleted hosts
128             cls.deleted_hosts_by_pg_idx = dict()
129             for pg_if in cls.pg_interfaces:
130                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
131
132             # warm-up the mac address tables
133             # self.warmup_test()
134             count = 16
135             start = 0
136             n_int = len(cls.pg_interfaces)
137             macs_per_if = count // n_int
138             i = -1
139             for pg_if in cls.pg_interfaces:
140                 i += 1
141                 start_nr = macs_per_if * i + start
142                 end_nr = (
143                     count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
144                 )
145                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
146                 for j in range(int(start_nr), int(end_nr)):
147                     host = Host(
148                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
149                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
150                         "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
151                     )
152                     hosts.append(host)
153
154         except Exception:
155             super(TestACLplugin, cls).tearDownClass()
156             raise
157
158     @classmethod
159     def tearDownClass(cls):
160         super(TestACLplugin, cls).tearDownClass()
161
162     def setUp(self):
163         super(TestACLplugin, self).setUp()
164         self.reset_packet_infos()
165
166     def tearDown(self):
167         """
168         Show various debug prints after each test.
169         """
170         super(TestACLplugin, self).tearDown()
171
172     def show_commands_at_teardown(self):
173         cli = "show vlib graph l2-input-feat-arc"
174         self.logger.info(self.vapi.ppcli(cli))
175         cli = "show vlib graph l2-input-feat-arc-end"
176         self.logger.info(self.vapi.ppcli(cli))
177         cli = "show vlib graph l2-output-feat-arc"
178         self.logger.info(self.vapi.ppcli(cli))
179         cli = "show vlib graph l2-output-feat-arc-end"
180         self.logger.info(self.vapi.ppcli(cli))
181         self.logger.info(self.vapi.ppcli("show l2fib verbose"))
182         self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
183         self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
184         self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
185         self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id))
186
187     def create_rule(
188         self,
189         ip=0,
190         permit_deny=0,
191         ports=PORTS_ALL,
192         proto=-1,
193         s_prefix=0,
194         s_ip=0,
195         d_prefix=0,
196         d_ip=0,
197     ):
198         if ip:
199             src_prefix = IPv6Network((s_ip, s_prefix))
200             dst_prefix = IPv6Network((d_ip, d_prefix))
201         else:
202             src_prefix = IPv4Network((s_ip, s_prefix))
203             dst_prefix = IPv4Network((d_ip, d_prefix))
204         return AclRule(
205             is_permit=permit_deny,
206             ports=ports,
207             proto=proto,
208             src_prefix=src_prefix,
209             dst_prefix=dst_prefix,
210         )
211
212     def apply_rules(self, rules, tag=None):
213         acl = VppAcl(self, rules, tag=tag)
214         acl.add_vpp_config()
215         self.logger.info("Dumped ACL: " + str(acl.dump()))
216         # Apply a ACL on the interface as inbound
217         for i in self.pg_interfaces:
218             acl_if = VppAclInterface(
219                 self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]
220             )
221             acl_if.add_vpp_config()
222         return acl.acl_index
223
224     def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
225         acl = VppAcl(self, rules, tag=tag)
226         acl.add_vpp_config()
227         self.logger.info("Dumped ACL: " + str(acl.dump()))
228         # Apply a ACL on the interface as inbound
229         acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, acls=[acl])
230         return acl.acl_index
231
232     def etype_whitelist(self, whitelist, n_input, add=True):
233         # Apply whitelists on all the interfaces
234         if add:
235             self._wl = []
236             for i in self.pg_interfaces:
237                 self._wl.append(
238                     VppEtypeWhitelist(
239                         self,
240                         sw_if_index=i.sw_if_index,
241                         whitelist=whitelist,
242                         n_input=n_input,
243                     ).add_vpp_config()
244                 )
245         else:
246             if hasattr(self, "_wl"):
247                 for wl in self._wl:
248                     wl.remove_vpp_config()
249
250     def create_upper_layer(self, packet_index, proto, ports=0):
251         p = self.proto_map[proto]
252         if p == "UDP":
253             if ports == 0:
254                 return UDP(
255                     sport=random.randint(self.udp_sport_from, self.udp_sport_to),
256                     dport=random.randint(self.udp_dport_from, self.udp_dport_to),
257                 )
258             else:
259                 return UDP(sport=ports, dport=ports)
260         elif p == "TCP":
261             if ports == 0:
262                 return TCP(
263                     sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
264                     dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
265                 )
266             else:
267                 return TCP(sport=ports, dport=ports)
268         return ""
269
270     def create_stream(
271         self,
272         src_if,
273         packet_sizes,
274         traffic_type=0,
275         ipv6=0,
276         proto=-1,
277         ports=0,
278         fragments=False,
279         pkt_raw=True,
280         etype=-1,
281     ):
282         """
283         Create input packet stream for defined interface using hosts or
284         deleted_hosts list.
285
286         :param object src_if: Interface to create packet stream for.
287         :param list packet_sizes: List of required packet sizes.
288         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
289         :return: Stream of packets.
290         """
291         pkts = []
292         if self.flows.__contains__(src_if):
293             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
294             for dst_if in self.flows[src_if]:
295                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
296                 n_int = len(dst_hosts) * len(src_hosts)
297                 for i in range(0, n_int):
298                     dst_host = dst_hosts[int(i / len(src_hosts))]
299                     src_host = src_hosts[i % len(src_hosts)]
300                     pkt_info = self.create_packet_info(src_if, dst_if)
301                     if ipv6 == 1:
302                         pkt_info.ip = 1
303                     elif ipv6 == 0:
304                         pkt_info.ip = 0
305                     else:
306                         pkt_info.ip = random.choice([0, 1])
307                     if proto == -1:
308                         pkt_info.proto = random.choice(self.proto[self.IP])
309                     else:
310                         pkt_info.proto = proto
311                     payload = self.info_to_payload(pkt_info)
312                     p = Ether(dst=dst_host.mac, src=src_host.mac)
313                     if etype > 0:
314                         p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
315                     if pkt_info.ip:
316                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
317                         if fragments:
318                             p /= IPv6ExtHdrFragment(offset=64, m=1)
319                     else:
320                         if fragments:
321                             p /= IP(
322                                 src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
323                             )
324                         else:
325                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
326                     if traffic_type == self.ICMP:
327                         if pkt_info.ip:
328                             p /= ICMPv6EchoRequest(
329                                 type=self.icmp6_type, code=self.icmp6_code
330                             )
331                         else:
332                             p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
333                     else:
334                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
335                     if pkt_raw:
336                         p /= Raw(payload)
337                         pkt_info.data = p.copy()
338                     if pkt_raw:
339                         size = random.choice(packet_sizes)
340                         self.extend_packet(p, size)
341                     pkts.append(p)
342         return pkts
343
344     def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
345         """
346         Verify captured input packet stream for defined interface.
347
348         :param object pg_if: Interface to verify captured packet stream for.
349         :param list capture: Captured packet stream.
350         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
351         """
352         last_info = dict()
353         for i in self.pg_interfaces:
354             last_info[i.sw_if_index] = None
355         dst_sw_if_index = pg_if.sw_if_index
356         for packet in capture:
357             if etype > 0:
358                 if packet[Ether].type != etype:
359                     self.logger.error(ppp("Unexpected ethertype in packet:", packet))
360                 else:
361                     continue
362             try:
363                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
364                 if traffic_type == self.ICMP and ip_type == self.IPV6:
365                     payload_info = self.payload_to_info(
366                         packet[ICMPv6EchoRequest], "data"
367                     )
368                     payload = packet[ICMPv6EchoRequest]
369                 else:
370                     payload_info = self.payload_to_info(packet[Raw])
371                     payload = packet[self.proto_map[payload_info.proto]]
372             except:
373                 self.logger.error(
374                     ppp("Unexpected or invalid packet (outside network):", packet)
375                 )
376                 raise
377
378             if ip_type != 0:
379                 self.assertEqual(payload_info.ip, ip_type)
380             if traffic_type == self.ICMP:
381                 try:
382                     if payload_info.ip == 0:
383                         self.assertEqual(payload.type, self.icmp4_type)
384                         self.assertEqual(payload.code, self.icmp4_code)
385                     else:
386                         self.assertEqual(payload.type, self.icmp6_type)
387                         self.assertEqual(payload.code, self.icmp6_code)
388                 except:
389                     self.logger.error(
390                         ppp("Unexpected or invalid packet (outside network):", packet)
391                     )
392                     raise
393             else:
394                 try:
395                     ip_version = IPv6 if payload_info.ip == 1 else IP
396
397                     ip = packet[ip_version]
398                     packet_index = payload_info.index
399
400                     self.assertEqual(payload_info.dst, dst_sw_if_index)
401                     self.logger.debug(
402                         "Got packet on port %s: src=%u (id=%u)"
403                         % (pg_if.name, payload_info.src, packet_index)
404                     )
405                     next_info = self.get_next_packet_info_for_interface2(
406                         payload_info.src, dst_sw_if_index, last_info[payload_info.src]
407                     )
408                     last_info[payload_info.src] = next_info
409                     self.assertTrue(next_info is not None)
410                     self.assertEqual(packet_index, next_info.index)
411                     saved_packet = next_info.data
412                     # Check standard fields
413                     self.assertEqual(ip.src, saved_packet[ip_version].src)
414                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
415                     p = self.proto_map[payload_info.proto]
416                     if p == "TCP":
417                         tcp = packet[TCP]
418                         self.assertEqual(tcp.sport, saved_packet[TCP].sport)
419                         self.assertEqual(tcp.dport, saved_packet[TCP].dport)
420                     elif p == "UDP":
421                         udp = packet[UDP]
422                         self.assertEqual(udp.sport, saved_packet[UDP].sport)
423                         self.assertEqual(udp.dport, saved_packet[UDP].dport)
424                 except:
425                     self.logger.error(ppp("Unexpected or invalid packet:", packet))
426                     raise
427         for i in self.pg_interfaces:
428             remaining_packet = self.get_next_packet_info_for_interface2(
429                 i, dst_sw_if_index, last_info[i.sw_if_index]
430             )
431             self.assertTrue(
432                 remaining_packet is None,
433                 "Port %u: Packet expected from source %u didn't arrive"
434                 % (dst_sw_if_index, i.sw_if_index),
435             )
436
437     def run_traffic_no_check(self):
438         # Test
439         # Create incoming packet streams for packet-generator interfaces
440         for i in self.pg_interfaces:
441             if self.flows.__contains__(i):
442                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
443                 if len(pkts) > 0:
444                     i.add_stream(pkts)
445
446         # Enable packet capture and start packet sending
447         self.pg_enable_capture(self.pg_interfaces)
448         self.pg_start()
449
450     def run_verify_test(
451         self,
452         traffic_type=0,
453         ip_type=0,
454         proto=-1,
455         ports=0,
456         frags=False,
457         pkt_raw=True,
458         etype=-1,
459     ):
460         # Test
461         # Create incoming packet streams for packet-generator interfaces
462         pkts_cnt = 0
463         for i in self.pg_interfaces:
464             if self.flows.__contains__(i):
465                 pkts = self.create_stream(
466                     i,
467                     self.pg_if_packet_sizes,
468                     traffic_type,
469                     ip_type,
470                     proto,
471                     ports,
472                     frags,
473                     pkt_raw,
474                     etype,
475                 )
476                 if len(pkts) > 0:
477                     i.add_stream(pkts)
478                     pkts_cnt += len(pkts)
479
480         # Enable packet capture and start packet sendingself.IPV
481         self.pg_enable_capture(self.pg_interfaces)
482         self.pg_start()
483         self.logger.info("sent packets count: %d" % pkts_cnt)
484
485         # Verify
486         # Verify outgoing packet streams per packet-generator interface
487         for src_if in self.pg_interfaces:
488             if self.flows.__contains__(src_if):
489                 for dst_if in self.flows[src_if]:
490                     capture = dst_if.get_capture(pkts_cnt)
491                     self.logger.info("Verifying capture on interface %s" % dst_if.name)
492                     self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
493
494     def run_verify_negat_test(
495         self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
496     ):
497         # Test
498         pkts_cnt = 0
499         self.reset_packet_infos()
500         for i in self.pg_interfaces:
501             if self.flows.__contains__(i):
502                 pkts = self.create_stream(
503                     i,
504                     self.pg_if_packet_sizes,
505                     traffic_type,
506                     ip_type,
507                     proto,
508                     ports,
509                     frags,
510                     True,
511                     etype,
512                 )
513                 if len(pkts) > 0:
514                     i.add_stream(pkts)
515                     pkts_cnt += len(pkts)
516
517         # Enable packet capture and start packet sending
518         self.pg_enable_capture(self.pg_interfaces)
519         self.pg_start()
520         self.logger.info("sent packets count: %d" % pkts_cnt)
521
522         # Verify
523         # Verify outgoing packet streams per packet-generator interface
524         for src_if in self.pg_interfaces:
525             if self.flows.__contains__(src_if):
526                 for dst_if in self.flows[src_if]:
527                     self.logger.info("Verifying capture on interface %s" % dst_if.name)
528                     capture = dst_if.get_capture(0)
529                     self.assertEqual(len(capture), 0)
530
531     def test_0000_warmup_test(self):
532         """ACL plugin version check; learn MACs"""
533         reply = self.vapi.papi.acl_plugin_get_version()
534         self.assertEqual(reply.major, 1)
535         self.logger.info(
536             "Working with ACL plugin version: %d.%d" % (reply.major, reply.minor)
537         )
538         # minor version changes are non breaking
539         # self.assertEqual(reply.minor, 0)
540
541     def test_0001_acl_create(self):
542         """ACL create/delete test"""
543
544         self.logger.info("ACLP_TEST_START_0001")
545         # Create a permit-1234 ACL
546         r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
547         # Test 1: add a new ACL
548         first_acl = VppAcl(self, rules=r, tag="permit 1234")
549         first_acl.add_vpp_config()
550         self.assertTrue(first_acl.query_vpp_config())
551         # The very first ACL gets #0
552         self.assertEqual(first_acl.acl_index, 0)
553         rr = first_acl.dump()
554         self.logger.info("Dumped ACL: " + str(rr))
555         self.assertEqual(len(rr), 1)
556         # We should have the same number of ACL entries as we had asked
557         self.assertEqual(len(rr[0].r), len(r))
558         # The rules should be the same. But because the submitted and returned
559         # are different types, we need to iterate over rules and keys to get
560         # to basic values.
561         for i_rule in range(0, len(r) - 1):
562             encoded_rule = r[i_rule].encode()
563             for rule_key in encoded_rule:
564                 self.assertEqual(rr[0].r[i_rule][rule_key], encoded_rule[rule_key])
565
566         # Create a deny-1234 ACL
567         r_deny = [
568             AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
569             AclRule(is_permit=1, proto=17, ports=0),
570         ]
571         second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
572         second_acl.add_vpp_config()
573         self.assertTrue(second_acl.query_vpp_config())
574         # The second ACL gets #1
575         self.assertEqual(second_acl.acl_index, 1)
576
577         # Test 2: try to modify a nonexistent ACL
578         invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
579         reply = invalid_acl.add_vpp_config(expect_error=True)
580
581         # apply an ACL on an interface inbound, try to delete ACL, must fail
582         acl_if_list = VppAclInterface(
583             self, sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl]
584         )
585         acl_if_list.add_vpp_config()
586         first_acl.remove_vpp_config(expect_error=True)
587         # Unapply an ACL and then try to delete it - must be ok
588         acl_if_list.remove_vpp_config()
589         first_acl.remove_vpp_config()
590
591         # apply an ACL on an interface inbound, try to delete ACL, must fail
592         acl_if_list = VppAclInterface(
593             self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[second_acl]
594         )
595         acl_if_list.add_vpp_config()
596         second_acl.remove_vpp_config(expect_error=True)
597         # Unapply an ACL and then try to delete it - must be ok
598         acl_if_list.remove_vpp_config()
599         second_acl.remove_vpp_config()
600
601         # try to apply a nonexistent ACL - must fail
602         acl_if_list = VppAclInterface(
603             self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[invalid_acl]
604         )
605         acl_if_list.add_vpp_config(expect_error=True)
606
607         self.logger.info("ACLP_TEST_FINISH_0001")
608
609     def test_0002_acl_permit_apply(self):
610         """permit ACL apply test"""
611         self.logger.info("ACLP_TEST_START_0002")
612
613         rules = []
614         rules.append(
615             self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP])
616         )
617         rules.append(
618             self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP])
619         )
620
621         # Apply rules
622         acl_idx = self.apply_rules(rules, "permit per-flow")
623
624         # enable counters
625         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
626
627         # Traffic should still pass
628         self.run_verify_test(self.IP, self.IPV4, -1)
629
630         matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
631         self.logger.info("stat segment counters: %s" % repr(matches))
632         cli = "show acl-plugin acl"
633         self.logger.info(self.vapi.ppcli(cli))
634         cli = "show acl-plugin tables"
635         self.logger.info(self.vapi.ppcli(cli))
636
637         total_hits = matches[0][0]["packets"] + matches[0][1]["packets"]
638         self.assertEqual(total_hits, 64)
639
640         # disable counters
641         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
642
643         self.logger.info("ACLP_TEST_FINISH_0002")
644
645     def test_0003_acl_deny_apply(self):
646         """deny ACL apply test"""
647         self.logger.info("ACLP_TEST_START_0003")
648         # Add a deny-flows ACL
649         rules = []
650         rules.append(
651             self.create_rule(
652                 self.IPV4, self.DENY, self.PORTS_ALL, self.proto[self.IP][self.UDP]
653             )
654         )
655         # Permit ip any any in the end
656         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
657
658         # Apply rules
659         acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
660
661         # enable counters
662         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
663
664         # Traffic should not pass
665         self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
666
667         matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
668         self.logger.info("stat segment counters: %s" % repr(matches))
669         cli = "show acl-plugin acl"
670         self.logger.info(self.vapi.ppcli(cli))
671         cli = "show acl-plugin tables"
672         self.logger.info(self.vapi.ppcli(cli))
673         self.assertEqual(matches[0][0]["packets"], 64)
674         # disable counters
675         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
676         self.logger.info("ACLP_TEST_FINISH_0003")
677         # self.assertEqual(, 0)
678
679     def test_0004_vpp624_permit_icmpv4(self):
680         """VPP_624 permit ICMPv4"""
681         self.logger.info("ACLP_TEST_START_0004")
682
683         # Add an ACL
684         rules = []
685         rules.append(
686             self.create_rule(
687                 self.IPV4,
688                 self.PERMIT,
689                 self.PORTS_RANGE,
690                 self.proto[self.ICMP][self.ICMPv4],
691             )
692         )
693         # deny ip any any in the end
694         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
695
696         # Apply rules
697         self.apply_rules(rules, "permit icmpv4")
698
699         # Traffic should still pass
700         self.run_verify_test(self.ICMP, self.IPV4, self.proto[self.ICMP][self.ICMPv4])
701
702         self.logger.info("ACLP_TEST_FINISH_0004")
703
704     def test_0005_vpp624_permit_icmpv6(self):
705         """VPP_624 permit ICMPv6"""
706         self.logger.info("ACLP_TEST_START_0005")
707
708         # Add an ACL
709         rules = []
710         rules.append(
711             self.create_rule(
712                 self.IPV6,
713                 self.PERMIT,
714                 self.PORTS_RANGE,
715                 self.proto[self.ICMP][self.ICMPv6],
716             )
717         )
718         # deny ip any any in the end
719         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
720
721         # Apply rules
722         self.apply_rules(rules, "permit icmpv6")
723
724         # Traffic should still pass
725         self.run_verify_test(self.ICMP, self.IPV6, self.proto[self.ICMP][self.ICMPv6])
726
727         self.logger.info("ACLP_TEST_FINISH_0005")
728
729     def test_0006_vpp624_deny_icmpv4(self):
730         """VPP_624 deny ICMPv4"""
731         self.logger.info("ACLP_TEST_START_0006")
732         # Add an ACL
733         rules = []
734         rules.append(
735             self.create_rule(
736                 self.IPV4,
737                 self.DENY,
738                 self.PORTS_RANGE,
739                 self.proto[self.ICMP][self.ICMPv4],
740             )
741         )
742         # permit ip any any in the end
743         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
744
745         # Apply rules
746         self.apply_rules(rules, "deny icmpv4")
747
748         # Traffic should not pass
749         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
750
751         self.logger.info("ACLP_TEST_FINISH_0006")
752
753     def test_0007_vpp624_deny_icmpv6(self):
754         """VPP_624 deny ICMPv6"""
755         self.logger.info("ACLP_TEST_START_0007")
756         # Add an ACL
757         rules = []
758         rules.append(
759             self.create_rule(
760                 self.IPV6,
761                 self.DENY,
762                 self.PORTS_RANGE,
763                 self.proto[self.ICMP][self.ICMPv6],
764             )
765         )
766         # deny ip any any in the end
767         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
768
769         # Apply rules
770         self.apply_rules(rules, "deny icmpv6")
771
772         # Traffic should not pass
773         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
774
775         self.logger.info("ACLP_TEST_FINISH_0007")
776
777     def test_0008_tcp_permit_v4(self):
778         """permit TCPv4"""
779         self.logger.info("ACLP_TEST_START_0008")
780
781         # Add an ACL
782         rules = []
783         rules.append(
784             self.create_rule(
785                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
786             )
787         )
788         # deny ip any any in the end
789         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
790
791         # Apply rules
792         self.apply_rules(rules, "permit ipv4 tcp")
793
794         # Traffic should still pass
795         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
796
797         self.logger.info("ACLP_TEST_FINISH_0008")
798
799     def test_0009_tcp_permit_v6(self):
800         """permit TCPv6"""
801         self.logger.info("ACLP_TEST_START_0009")
802
803         # Add an ACL
804         rules = []
805         rules.append(
806             self.create_rule(
807                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
808             )
809         )
810         # deny ip any any in the end
811         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
812
813         # Apply rules
814         self.apply_rules(rules, "permit ip6 tcp")
815
816         # Traffic should still pass
817         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
818
819         self.logger.info("ACLP_TEST_FINISH_0008")
820
821     def test_0010_udp_permit_v4(self):
822         """permit UDPv4"""
823         self.logger.info("ACLP_TEST_START_0010")
824
825         # Add an ACL
826         rules = []
827         rules.append(
828             self.create_rule(
829                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
830             )
831         )
832         # deny ip any any in the end
833         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
834
835         # Apply rules
836         self.apply_rules(rules, "permit ipv udp")
837
838         # Traffic should still pass
839         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
840
841         self.logger.info("ACLP_TEST_FINISH_0010")
842
843     def test_0011_udp_permit_v6(self):
844         """permit UDPv6"""
845         self.logger.info("ACLP_TEST_START_0011")
846
847         # Add an ACL
848         rules = []
849         rules.append(
850             self.create_rule(
851                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
852             )
853         )
854         # deny ip any any in the end
855         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
856
857         # Apply rules
858         self.apply_rules(rules, "permit ip6 udp")
859
860         # Traffic should still pass
861         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
862
863         self.logger.info("ACLP_TEST_FINISH_0011")
864
865     def test_0012_tcp_deny(self):
866         """deny TCPv4/v6"""
867         self.logger.info("ACLP_TEST_START_0012")
868
869         # Add an ACL
870         rules = []
871         rules.append(
872             self.create_rule(
873                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
874             )
875         )
876         rules.append(
877             self.create_rule(
878                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
879             )
880         )
881         # permit ip any any in the end
882         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
883         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
884
885         # Apply rules
886         self.apply_rules(rules, "deny ip4/ip6 tcp")
887
888         # Traffic should not pass
889         self.run_verify_negat_test(
890             self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
891         )
892
893         self.logger.info("ACLP_TEST_FINISH_0012")
894
895     def test_0013_udp_deny(self):
896         """deny UDPv4/v6"""
897         self.logger.info("ACLP_TEST_START_0013")
898
899         # Add an ACL
900         rules = []
901         rules.append(
902             self.create_rule(
903                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
904             )
905         )
906         rules.append(
907             self.create_rule(
908                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
909             )
910         )
911         # permit ip any any in the end
912         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
913         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
914
915         # Apply rules
916         self.apply_rules(rules, "deny ip4/ip6 udp")
917
918         # Traffic should not pass
919         self.run_verify_negat_test(
920             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
921         )
922
923         self.logger.info("ACLP_TEST_FINISH_0013")
924
925     def test_0014_acl_dump(self):
926         """verify add/dump acls"""
927         self.logger.info("ACLP_TEST_START_0014")
928
929         r = [
930             [self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
931             [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
932             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
933             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
934             [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
935             [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
936             [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
937             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
938             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
939             [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
940             [self.IPV4, self.DENY, self.PORTS_ALL, 0],
941             [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
942             [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
943             [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
944             [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
945             [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
946             [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
947             [self.IPV6, self.DENY, self.PORTS_ALL, 0],
948         ]
949
950         # Add and verify new ACLs
951         rules = []
952         for i in range(len(r)):
953             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
954
955         acl = VppAcl(self, rules=rules)
956         acl.add_vpp_config()
957         result = acl.dump()
958
959         i = 0
960         for drules in result:
961             for dr in drules.r:
962                 self.assertEqual(dr.is_permit, r[i][1])
963                 self.assertEqual(dr.proto, r[i][3])
964
965                 if r[i][2] > 0:
966                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
967                 else:
968                     if r[i][2] < 0:
969                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
970                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
971                     else:
972                         if dr.proto == self.proto[self.IP][self.TCP]:
973                             self.assertGreater(
974                                 dr.srcport_or_icmptype_first, self.tcp_sport_from - 1
975                             )
976                             self.assertLess(
977                                 dr.srcport_or_icmptype_first, self.tcp_sport_to + 1
978                             )
979                             self.assertGreater(
980                                 dr.dstport_or_icmpcode_last, self.tcp_dport_from - 1
981                             )
982                             self.assertLess(
983                                 dr.dstport_or_icmpcode_last, self.tcp_dport_to + 1
984                             )
985                         elif dr.proto == self.proto[self.IP][self.UDP]:
986                             self.assertGreater(
987                                 dr.srcport_or_icmptype_first, self.udp_sport_from - 1
988                             )
989                             self.assertLess(
990                                 dr.srcport_or_icmptype_first, self.udp_sport_to + 1
991                             )
992                             self.assertGreater(
993                                 dr.dstport_or_icmpcode_last, self.udp_dport_from - 1
994                             )
995                             self.assertLess(
996                                 dr.dstport_or_icmpcode_last, self.udp_dport_to + 1
997                             )
998                 i += 1
999
1000         self.logger.info("ACLP_TEST_FINISH_0014")
1001
1002     def test_0015_tcp_permit_port_v4(self):
1003         """permit single TCPv4"""
1004         self.logger.info("ACLP_TEST_START_0015")
1005
1006         port = random.randint(16384, 65535)
1007         # Add an ACL
1008         rules = []
1009         rules.append(
1010             self.create_rule(
1011                 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.TCP]
1012             )
1013         )
1014         # deny ip any any in the end
1015         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1016
1017         # Apply rules
1018         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1019
1020         # Traffic should still pass
1021         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], port)
1022
1023         self.logger.info("ACLP_TEST_FINISH_0015")
1024
1025     def test_0016_udp_permit_port_v4(self):
1026         """permit single UDPv4"""
1027         self.logger.info("ACLP_TEST_START_0016")
1028
1029         port = random.randint(16384, 65535)
1030         # Add an ACL
1031         rules = []
1032         rules.append(
1033             self.create_rule(
1034                 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1035             )
1036         )
1037         # deny ip any any in the end
1038         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1039
1040         # Apply rules
1041         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1042
1043         # Traffic should still pass
1044         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP], port)
1045
1046         self.logger.info("ACLP_TEST_FINISH_0016")
1047
1048     def test_0017_tcp_permit_port_v6(self):
1049         """permit single TCPv6"""
1050         self.logger.info("ACLP_TEST_START_0017")
1051
1052         port = random.randint(16384, 65535)
1053         # Add an ACL
1054         rules = []
1055         rules.append(
1056             self.create_rule(
1057                 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.TCP]
1058             )
1059         )
1060         # deny ip any any in the end
1061         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1062
1063         # Apply rules
1064         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1065
1066         # Traffic should still pass
1067         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP], port)
1068
1069         self.logger.info("ACLP_TEST_FINISH_0017")
1070
1071     def test_0018_udp_permit_port_v6(self):
1072         """permit single UDPv6"""
1073         self.logger.info("ACLP_TEST_START_0018")
1074
1075         port = random.randint(16384, 65535)
1076         # Add an ACL
1077         rules = []
1078         rules.append(
1079             self.create_rule(
1080                 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1081             )
1082         )
1083         # deny ip any any in the end
1084         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1085
1086         # Apply rules
1087         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1088
1089         # Traffic should still pass
1090         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP], port)
1091
1092         self.logger.info("ACLP_TEST_FINISH_0018")
1093
1094     def test_0019_udp_deny_port(self):
1095         """deny single TCPv4/v6"""
1096         self.logger.info("ACLP_TEST_START_0019")
1097
1098         port = random.randint(16384, 65535)
1099         # Add an ACL
1100         rules = []
1101         rules.append(
1102             self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.TCP])
1103         )
1104         rules.append(
1105             self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.TCP])
1106         )
1107         # Permit ip any any in the end
1108         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1109         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1110
1111         # Apply rules
1112         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1113
1114         # Traffic should not pass
1115         self.run_verify_negat_test(
1116             self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP], port
1117         )
1118
1119         self.logger.info("ACLP_TEST_FINISH_0019")
1120
1121     def test_0020_udp_deny_port(self):
1122         """deny single UDPv4/v6"""
1123         self.logger.info("ACLP_TEST_START_0020")
1124
1125         port = random.randint(16384, 65535)
1126         # Add an ACL
1127         rules = []
1128         rules.append(
1129             self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1130         )
1131         rules.append(
1132             self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1133         )
1134         # Permit ip any any in the end
1135         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1136         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1137
1138         # Apply rules
1139         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1140
1141         # Traffic should not pass
1142         self.run_verify_negat_test(
1143             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port
1144         )
1145
1146         self.logger.info("ACLP_TEST_FINISH_0020")
1147
1148     def test_0021_udp_deny_port_verify_fragment_deny(self):
1149         """deny single UDPv4/v6, permit ip any, verify non-initial fragment
1150         blocked
1151         """
1152         self.logger.info("ACLP_TEST_START_0021")
1153
1154         port = random.randint(16384, 65535)
1155         # Add an ACL
1156         rules = []
1157         rules.append(
1158             self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1159         )
1160         rules.append(
1161             self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1162         )
1163         # deny ip any any in the end
1164         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1165         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1166
1167         # Apply rules
1168         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1169
1170         # Traffic should not pass
1171         self.run_verify_negat_test(
1172             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port, True
1173         )
1174
1175         self.logger.info("ACLP_TEST_FINISH_0021")
1176
1177     def test_0022_zero_length_udp_ipv4(self):
1178         """VPP-687 zero length udp ipv4 packet"""
1179         self.logger.info("ACLP_TEST_START_0022")
1180
1181         port = random.randint(16384, 65535)
1182         # Add an ACL
1183         rules = []
1184         rules.append(
1185             self.create_rule(
1186                 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1187             )
1188         )
1189         # deny ip any any in the end
1190         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1191
1192         # Apply rules
1193         self.apply_rules(rules, "permit empty udp ip4 %d" % port)
1194
1195         # Traffic should still pass
1196         # Create incoming packet streams for packet-generator interfaces
1197         pkts_cnt = 0
1198         pkts = self.create_stream(
1199             self.pg0,
1200             self.pg_if_packet_sizes,
1201             self.IP,
1202             self.IPV4,
1203             self.proto[self.IP][self.UDP],
1204             port,
1205             False,
1206             False,
1207         )
1208         if len(pkts) > 0:
1209             self.pg0.add_stream(pkts)
1210             pkts_cnt += len(pkts)
1211
1212         # Enable packet capture and start packet sendingself.IPV
1213         self.pg_enable_capture(self.pg_interfaces)
1214         self.pg_start()
1215
1216         self.pg1.get_capture(pkts_cnt)
1217
1218         self.logger.info("ACLP_TEST_FINISH_0022")
1219
1220     def test_0023_zero_length_udp_ipv6(self):
1221         """VPP-687 zero length udp ipv6 packet"""
1222         self.logger.info("ACLP_TEST_START_0023")
1223
1224         port = random.randint(16384, 65535)
1225         # Add an ACL
1226         rules = []
1227         rules.append(
1228             self.create_rule(
1229                 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1230             )
1231         )
1232         # deny ip any any in the end
1233         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1234
1235         # Apply rules
1236         self.apply_rules(rules, "permit empty udp ip6 %d" % port)
1237
1238         # Traffic should still pass
1239         # Create incoming packet streams for packet-generator interfaces
1240         pkts_cnt = 0
1241         pkts = self.create_stream(
1242             self.pg0,
1243             self.pg_if_packet_sizes,
1244             self.IP,
1245             self.IPV6,
1246             self.proto[self.IP][self.UDP],
1247             port,
1248             False,
1249             False,
1250         )
1251         if len(pkts) > 0:
1252             self.pg0.add_stream(pkts)
1253             pkts_cnt += len(pkts)
1254
1255         # Enable packet capture and start packet sendingself.IPV
1256         self.pg_enable_capture(self.pg_interfaces)
1257         self.pg_start()
1258
1259         # Verify outgoing packet streams per packet-generator interface
1260         self.pg1.get_capture(pkts_cnt)
1261
1262         self.logger.info("ACLP_TEST_FINISH_0023")
1263
1264     def test_0108_tcp_permit_v4(self):
1265         """permit TCPv4 + non-match range"""
1266         self.logger.info("ACLP_TEST_START_0108")
1267
1268         # Add an ACL
1269         rules = []
1270         rules.append(
1271             self.create_rule(
1272                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1273             )
1274         )
1275         rules.append(
1276             self.create_rule(
1277                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1278             )
1279         )
1280         # deny ip any any in the end
1281         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1282
1283         # Apply rules
1284         self.apply_rules(rules, "permit ipv4 tcp")
1285
1286         # Traffic should still pass
1287         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1288
1289         self.logger.info("ACLP_TEST_FINISH_0108")
1290
1291     def test_0109_tcp_permit_v6(self):
1292         """permit TCPv6 + non-match range"""
1293         self.logger.info("ACLP_TEST_START_0109")
1294
1295         # Add an ACL
1296         rules = []
1297         rules.append(
1298             self.create_rule(
1299                 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1300             )
1301         )
1302         rules.append(
1303             self.create_rule(
1304                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1305             )
1306         )
1307         # deny ip any any in the end
1308         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1309
1310         # Apply rules
1311         self.apply_rules(rules, "permit ip6 tcp")
1312
1313         # Traffic should still pass
1314         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1315
1316         self.logger.info("ACLP_TEST_FINISH_0109")
1317
1318     def test_0110_udp_permit_v4(self):
1319         """permit UDPv4 + non-match range"""
1320         self.logger.info("ACLP_TEST_START_0110")
1321
1322         # Add an ACL
1323         rules = []
1324         rules.append(
1325             self.create_rule(
1326                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
1327             )
1328         )
1329         rules.append(
1330             self.create_rule(
1331                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1332             )
1333         )
1334         # deny ip any any in the end
1335         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1336
1337         # Apply rules
1338         self.apply_rules(rules, "permit ipv4 udp")
1339
1340         # Traffic should still pass
1341         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1342
1343         self.logger.info("ACLP_TEST_FINISH_0110")
1344
1345     def test_0111_udp_permit_v6(self):
1346         """permit UDPv6 + non-match range"""
1347         self.logger.info("ACLP_TEST_START_0111")
1348
1349         # Add an ACL
1350         rules = []
1351         rules.append(
1352             self.create_rule(
1353                 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
1354             )
1355         )
1356         rules.append(
1357             self.create_rule(
1358                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1359             )
1360         )
1361         # deny ip any any in the end
1362         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1363
1364         # Apply rules
1365         self.apply_rules(rules, "permit ip6 udp")
1366
1367         # Traffic should still pass
1368         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1369
1370         self.logger.info("ACLP_TEST_FINISH_0111")
1371
1372     def test_0112_tcp_deny(self):
1373         """deny TCPv4/v6 + non-match range"""
1374         self.logger.info("ACLP_TEST_START_0112")
1375
1376         # Add an ACL
1377         rules = []
1378         rules.append(
1379             self.create_rule(
1380                 self.IPV4,
1381                 self.PERMIT,
1382                 self.PORTS_RANGE_2,
1383                 self.proto[self.IP][self.TCP],
1384             )
1385         )
1386         rules.append(
1387             self.create_rule(
1388                 self.IPV6,
1389                 self.PERMIT,
1390                 self.PORTS_RANGE_2,
1391                 self.proto[self.IP][self.TCP],
1392             )
1393         )
1394         rules.append(
1395             self.create_rule(
1396                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1397             )
1398         )
1399         rules.append(
1400             self.create_rule(
1401                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1402             )
1403         )
1404         # permit ip any any in the end
1405         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1406         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1407
1408         # Apply rules
1409         self.apply_rules(rules, "deny ip4/ip6 tcp")
1410
1411         # Traffic should not pass
1412         self.run_verify_negat_test(
1413             self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
1414         )
1415
1416         self.logger.info("ACLP_TEST_FINISH_0112")
1417
1418     def test_0113_udp_deny(self):
1419         """deny UDPv4/v6 + non-match range"""
1420         self.logger.info("ACLP_TEST_START_0113")
1421
1422         # Add an ACL
1423         rules = []
1424         rules.append(
1425             self.create_rule(
1426                 self.IPV4,
1427                 self.PERMIT,
1428                 self.PORTS_RANGE_2,
1429                 self.proto[self.IP][self.UDP],
1430             )
1431         )
1432         rules.append(
1433             self.create_rule(
1434                 self.IPV6,
1435                 self.PERMIT,
1436                 self.PORTS_RANGE_2,
1437                 self.proto[self.IP][self.UDP],
1438             )
1439         )
1440         rules.append(
1441             self.create_rule(
1442                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1443             )
1444         )
1445         rules.append(
1446             self.create_rule(
1447                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1448             )
1449         )
1450         # permit ip any any in the end
1451         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1452         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1453
1454         # Apply rules
1455         self.apply_rules(rules, "deny ip4/ip6 udp")
1456
1457         # Traffic should not pass
1458         self.run_verify_negat_test(
1459             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
1460         )
1461
1462         self.logger.info("ACLP_TEST_FINISH_0113")
1463
1464     def test_0300_tcp_permit_v4_etype_aaaa(self):
1465         """permit TCPv4, send 0xAAAA etype"""
1466         self.logger.info("ACLP_TEST_START_0300")
1467
1468         # Add an ACL
1469         rules = []
1470         rules.append(
1471             self.create_rule(
1472                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1473             )
1474         )
1475         rules.append(
1476             self.create_rule(
1477                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1478             )
1479         )
1480         # deny ip any any in the end
1481         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1482
1483         # Apply rules
1484         self.apply_rules(rules, "permit ipv4 tcp")
1485
1486         # Traffic should still pass also for an odd ethertype
1487         self.run_verify_test(
1488             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1489         )
1490         self.logger.info("ACLP_TEST_FINISH_0300")
1491
1492     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1493         """permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked"""
1494         self.logger.info("ACLP_TEST_START_0305")
1495
1496         # Add an ACL
1497         rules = []
1498         rules.append(
1499             self.create_rule(
1500                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1501             )
1502         )
1503         rules.append(
1504             self.create_rule(
1505                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1506             )
1507         )
1508         # deny ip any any in the end
1509         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1510
1511         # Apply rules
1512         self.apply_rules(rules, "permit ipv4 tcp")
1513         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1514         self.etype_whitelist([0xBBB], 1)
1515
1516         # The oddball ethertype should be blocked
1517         self.run_verify_negat_test(
1518             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, 0xAAAA
1519         )
1520
1521         # remove the whitelist
1522         self.etype_whitelist([], 0, add=False)
1523
1524         self.logger.info("ACLP_TEST_FINISH_0305")
1525
1526     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1527         """permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass"""
1528         self.logger.info("ACLP_TEST_START_0306")
1529
1530         # Add an ACL
1531         rules = []
1532         rules.append(
1533             self.create_rule(
1534                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1535             )
1536         )
1537         rules.append(
1538             self.create_rule(
1539                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1540             )
1541         )
1542         # deny ip any any in the end
1543         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1544
1545         # Apply rules
1546         self.apply_rules(rules, "permit ipv4 tcp")
1547         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1548         self.etype_whitelist([0xBBB], 1)
1549
1550         # The whitelisted traffic, should pass
1551         self.run_verify_test(
1552             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0x0BBB
1553         )
1554
1555         # remove the whitelist, the previously blocked 0xAAAA should pass now
1556         self.etype_whitelist([], 0, add=False)
1557
1558         self.logger.info("ACLP_TEST_FINISH_0306")
1559
1560     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1561         """permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass"""
1562         self.logger.info("ACLP_TEST_START_0307")
1563
1564         # Add an ACL
1565         rules = []
1566         rules.append(
1567             self.create_rule(
1568                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1569             )
1570         )
1571         rules.append(
1572             self.create_rule(
1573                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1574             )
1575         )
1576         # deny ip any any in the end
1577         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1578
1579         # Apply rules
1580         self.apply_rules(rules, "permit ipv4 tcp")
1581
1582         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1583         self.etype_whitelist([0xBBB], 1)
1584         # remove the whitelist, the previously blocked 0xAAAA should pass now
1585         self.etype_whitelist([], 0, add=False)
1586
1587         # The whitelisted traffic, should pass
1588         self.run_verify_test(
1589             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1590         )
1591
1592         self.logger.info("ACLP_TEST_FINISH_0306")
1593
1594     def test_0315_del_intf(self):
1595         """apply an acl and delete the interface"""
1596         self.logger.info("ACLP_TEST_START_0315")
1597
1598         # Add an ACL
1599         rules = []
1600         rules.append(
1601             self.create_rule(
1602                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1603             )
1604         )
1605         rules.append(
1606             self.create_rule(
1607                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1608             )
1609         )
1610         # deny ip any any in the end
1611         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1612
1613         # create an interface
1614         intf = []
1615         intf.append(VppLoInterface(self))
1616
1617         # Apply rules
1618         self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1619
1620         # Remove the interface
1621         intf[0].remove_vpp_config()
1622
1623         self.logger.info("ACLP_TEST_FINISH_0315")
1624
1625
1626 if __name__ == "__main__":
1627     unittest.main(testRunner=VppTestRunner)