tests: replace pycodestyle with black
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python3
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from framework import tag_fixme_vpp_workers
15 from util import Host, ppp
16 from ipaddress import IPv4Network, IPv6Network
17
18 from vpp_lo_interface import VppLoInterface
19 from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
20 from vpp_ip import INVALID_INDEX
21
22
23 @tag_fixme_vpp_workers
24 class TestACLplugin(VppTestCase):
25     """ACL plugin Test Case"""
26
27     # traffic types
28     IP = 0
29     ICMP = 1
30
31     # IP version
32     IPRANDOM = -1
33     IPV4 = 0
34     IPV6 = 1
35
36     # rule types
37     DENY = 0
38     PERMIT = 1
39
40     # supported protocols
41     proto = [[6, 17], [1, 58]]
42     proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
43     ICMPv4 = 0
44     ICMPv6 = 1
45     TCP = 0
46     UDP = 1
47     PROTO_ALL = 0
48
49     # port ranges
50     PORTS_ALL = -1
51     PORTS_RANGE = 0
52     PORTS_RANGE_2 = 1
53     udp_sport_from = 10
54     udp_sport_to = udp_sport_from + 5
55     udp_dport_from = 20000
56     udp_dport_to = udp_dport_from + 5000
57     tcp_sport_from = 30
58     tcp_sport_to = tcp_sport_from + 5
59     tcp_dport_from = 40000
60     tcp_dport_to = tcp_dport_from + 5000
61
62     udp_sport_from_2 = 90
63     udp_sport_to_2 = udp_sport_from_2 + 5
64     udp_dport_from_2 = 30000
65     udp_dport_to_2 = udp_dport_from_2 + 5000
66     tcp_sport_from_2 = 130
67     tcp_sport_to_2 = tcp_sport_from_2 + 5
68     tcp_dport_from_2 = 20000
69     tcp_dport_to_2 = tcp_dport_from_2 + 5000
70
71     icmp4_type = 8  # echo request
72     icmp4_code = 3
73     icmp6_type = 128  # echo request
74     icmp6_code = 3
75
76     icmp4_type_2 = 8
77     icmp4_code_from_2 = 5
78     icmp4_code_to_2 = 20
79     icmp6_type_2 = 128
80     icmp6_code_from_2 = 8
81     icmp6_code_to_2 = 42
82
83     # Test variables
84     bd_id = 1
85
86     @classmethod
87     def setUpClass(cls):
88         """
89         Perform standard class setup (defined by class method setUpClass in
90         class VppTestCase) before running the test case, set test case related
91         variables and configure VPP.
92         """
93         super(TestACLplugin, cls).setUpClass()
94
95         try:
96             # Create 2 pg interfaces
97             cls.create_pg_interfaces(range(2))
98
99             # Packet flows mapping pg0 -> pg1, pg2 etc.
100             cls.flows = dict()
101             cls.flows[cls.pg0] = [cls.pg1]
102
103             # Packet sizes
104             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
105
106             # Create BD with MAC learning and unknown unicast flooding disabled
107             # and put interfaces to this BD
108             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, learn=1)
109             for pg_if in cls.pg_interfaces:
110                 cls.vapi.sw_interface_set_l2_bridge(
111                     rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
112                 )
113
114             # Set up all interfaces
115             for i in cls.pg_interfaces:
116                 i.admin_up()
117
118             # Mapping between packet-generator index and lists of test hosts
119             cls.hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # Create list of deleted hosts
124             cls.deleted_hosts_by_pg_idx = dict()
125             for pg_if in cls.pg_interfaces:
126                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
127
128             # warm-up the mac address tables
129             # self.warmup_test()
130             count = 16
131             start = 0
132             n_int = len(cls.pg_interfaces)
133             macs_per_if = count // n_int
134             i = -1
135             for pg_if in cls.pg_interfaces:
136                 i += 1
137                 start_nr = macs_per_if * i + start
138                 end_nr = (
139                     count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
140                 )
141                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
142                 for j in range(int(start_nr), int(end_nr)):
143                     host = Host(
144                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
145                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
146                         "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
147                     )
148                     hosts.append(host)
149
150         except Exception:
151             super(TestACLplugin, cls).tearDownClass()
152             raise
153
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base class; nothing is added here."""
        super(TestACLplugin, cls).tearDownClass()
157
    def setUp(self):
        """
        Run the base-class per-test setup, then call reset_packet_infos().

        NOTE(review): reset_packet_infos() presumably discards packet infos
        recorded by a previous test - confirm against the framework.
        """
        super(TestACLplugin, self).setUp()
        self.reset_packet_infos()
161
    def tearDown(self):
        """
        Run the base-class per-test teardown.

        NOTE(review): nothing is printed here; the debug CLI dumps live in
        show_commands_at_teardown(), presumably invoked by the framework -
        confirm.
        """
        super(TestACLplugin, self).tearDown()
167
168     def show_commands_at_teardown(self):
169         cli = "show vlib graph l2-input-feat-arc"
170         self.logger.info(self.vapi.ppcli(cli))
171         cli = "show vlib graph l2-input-feat-arc-end"
172         self.logger.info(self.vapi.ppcli(cli))
173         cli = "show vlib graph l2-output-feat-arc"
174         self.logger.info(self.vapi.ppcli(cli))
175         cli = "show vlib graph l2-output-feat-arc-end"
176         self.logger.info(self.vapi.ppcli(cli))
177         self.logger.info(self.vapi.ppcli("show l2fib verbose"))
178         self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
179         self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
180         self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
181         self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id))
182
183     def create_rule(
184         self,
185         ip=0,
186         permit_deny=0,
187         ports=PORTS_ALL,
188         proto=-1,
189         s_prefix=0,
190         s_ip=0,
191         d_prefix=0,
192         d_ip=0,
193     ):
194         if ip:
195             src_prefix = IPv6Network((s_ip, s_prefix))
196             dst_prefix = IPv6Network((d_ip, d_prefix))
197         else:
198             src_prefix = IPv4Network((s_ip, s_prefix))
199             dst_prefix = IPv4Network((d_ip, d_prefix))
200         return AclRule(
201             is_permit=permit_deny,
202             ports=ports,
203             proto=proto,
204             src_prefix=src_prefix,
205             dst_prefix=dst_prefix,
206         )
207
208     def apply_rules(self, rules, tag=None):
209         acl = VppAcl(self, rules, tag=tag)
210         acl.add_vpp_config()
211         self.logger.info("Dumped ACL: " + str(acl.dump()))
212         # Apply a ACL on the interface as inbound
213         for i in self.pg_interfaces:
214             acl_if = VppAclInterface(
215                 self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]
216             )
217             acl_if.add_vpp_config()
218         return acl.acl_index
219
220     def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
221         acl = VppAcl(self, rules, tag=tag)
222         acl.add_vpp_config()
223         self.logger.info("Dumped ACL: " + str(acl.dump()))
224         # Apply a ACL on the interface as inbound
225         acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, acls=[acl])
226         return acl.acl_index
227
228     def etype_whitelist(self, whitelist, n_input, add=True):
229         # Apply whitelists on all the interfaces
230         if add:
231             self._wl = []
232             for i in self.pg_interfaces:
233                 self._wl.append(
234                     VppEtypeWhitelist(
235                         self,
236                         sw_if_index=i.sw_if_index,
237                         whitelist=whitelist,
238                         n_input=n_input,
239                     ).add_vpp_config()
240                 )
241         else:
242             if hasattr(self, "_wl"):
243                 for wl in self._wl:
244                     wl.remove_vpp_config()
245
246     def create_upper_layer(self, packet_index, proto, ports=0):
247         p = self.proto_map[proto]
248         if p == "UDP":
249             if ports == 0:
250                 return UDP(
251                     sport=random.randint(self.udp_sport_from, self.udp_sport_to),
252                     dport=random.randint(self.udp_dport_from, self.udp_dport_to),
253                 )
254             else:
255                 return UDP(sport=ports, dport=ports)
256         elif p == "TCP":
257             if ports == 0:
258                 return TCP(
259                     sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
260                     dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
261                 )
262             else:
263                 return TCP(sport=ports, dport=ports)
264         return ""
265
266     def create_stream(
267         self,
268         src_if,
269         packet_sizes,
270         traffic_type=0,
271         ipv6=0,
272         proto=-1,
273         ports=0,
274         fragments=False,
275         pkt_raw=True,
276         etype=-1,
277     ):
278         """
279         Create input packet stream for defined interface using hosts or
280         deleted_hosts list.
281
282         :param object src_if: Interface to create packet stream for.
283         :param list packet_sizes: List of required packet sizes.
284         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
285         :return: Stream of packets.
286         """
287         pkts = []
288         if self.flows.__contains__(src_if):
289             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
290             for dst_if in self.flows[src_if]:
291                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
292                 n_int = len(dst_hosts) * len(src_hosts)
293                 for i in range(0, n_int):
294                     dst_host = dst_hosts[int(i / len(src_hosts))]
295                     src_host = src_hosts[i % len(src_hosts)]
296                     pkt_info = self.create_packet_info(src_if, dst_if)
297                     if ipv6 == 1:
298                         pkt_info.ip = 1
299                     elif ipv6 == 0:
300                         pkt_info.ip = 0
301                     else:
302                         pkt_info.ip = random.choice([0, 1])
303                     if proto == -1:
304                         pkt_info.proto = random.choice(self.proto[self.IP])
305                     else:
306                         pkt_info.proto = proto
307                     payload = self.info_to_payload(pkt_info)
308                     p = Ether(dst=dst_host.mac, src=src_host.mac)
309                     if etype > 0:
310                         p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
311                     if pkt_info.ip:
312                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
313                         if fragments:
314                             p /= IPv6ExtHdrFragment(offset=64, m=1)
315                     else:
316                         if fragments:
317                             p /= IP(
318                                 src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
319                             )
320                         else:
321                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
322                     if traffic_type == self.ICMP:
323                         if pkt_info.ip:
324                             p /= ICMPv6EchoRequest(
325                                 type=self.icmp6_type, code=self.icmp6_code
326                             )
327                         else:
328                             p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
329                     else:
330                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
331                     if pkt_raw:
332                         p /= Raw(payload)
333                         pkt_info.data = p.copy()
334                     if pkt_raw:
335                         size = random.choice(packet_sizes)
336                         self.extend_packet(p, size)
337                     pkts.append(p)
338         return pkts
339
340     def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
341         """
342         Verify captured input packet stream for defined interface.
343
344         :param object pg_if: Interface to verify captured packet stream for.
345         :param list capture: Captured packet stream.
346         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
347         """
348         last_info = dict()
349         for i in self.pg_interfaces:
350             last_info[i.sw_if_index] = None
351         dst_sw_if_index = pg_if.sw_if_index
352         for packet in capture:
353             if etype > 0:
354                 if packet[Ether].type != etype:
355                     self.logger.error(ppp("Unexpected ethertype in packet:", packet))
356                 else:
357                     continue
358             try:
359                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
360                 if traffic_type == self.ICMP and ip_type == self.IPV6:
361                     payload_info = self.payload_to_info(
362                         packet[ICMPv6EchoRequest], "data"
363                     )
364                     payload = packet[ICMPv6EchoRequest]
365                 else:
366                     payload_info = self.payload_to_info(packet[Raw])
367                     payload = packet[self.proto_map[payload_info.proto]]
368             except:
369                 self.logger.error(
370                     ppp("Unexpected or invalid packet (outside network):", packet)
371                 )
372                 raise
373
374             if ip_type != 0:
375                 self.assertEqual(payload_info.ip, ip_type)
376             if traffic_type == self.ICMP:
377                 try:
378                     if payload_info.ip == 0:
379                         self.assertEqual(payload.type, self.icmp4_type)
380                         self.assertEqual(payload.code, self.icmp4_code)
381                     else:
382                         self.assertEqual(payload.type, self.icmp6_type)
383                         self.assertEqual(payload.code, self.icmp6_code)
384                 except:
385                     self.logger.error(
386                         ppp("Unexpected or invalid packet (outside network):", packet)
387                     )
388                     raise
389             else:
390                 try:
391                     ip_version = IPv6 if payload_info.ip == 1 else IP
392
393                     ip = packet[ip_version]
394                     packet_index = payload_info.index
395
396                     self.assertEqual(payload_info.dst, dst_sw_if_index)
397                     self.logger.debug(
398                         "Got packet on port %s: src=%u (id=%u)"
399                         % (pg_if.name, payload_info.src, packet_index)
400                     )
401                     next_info = self.get_next_packet_info_for_interface2(
402                         payload_info.src, dst_sw_if_index, last_info[payload_info.src]
403                     )
404                     last_info[payload_info.src] = next_info
405                     self.assertTrue(next_info is not None)
406                     self.assertEqual(packet_index, next_info.index)
407                     saved_packet = next_info.data
408                     # Check standard fields
409                     self.assertEqual(ip.src, saved_packet[ip_version].src)
410                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
411                     p = self.proto_map[payload_info.proto]
412                     if p == "TCP":
413                         tcp = packet[TCP]
414                         self.assertEqual(tcp.sport, saved_packet[TCP].sport)
415                         self.assertEqual(tcp.dport, saved_packet[TCP].dport)
416                     elif p == "UDP":
417                         udp = packet[UDP]
418                         self.assertEqual(udp.sport, saved_packet[UDP].sport)
419                         self.assertEqual(udp.dport, saved_packet[UDP].dport)
420                 except:
421                     self.logger.error(ppp("Unexpected or invalid packet:", packet))
422                     raise
423         for i in self.pg_interfaces:
424             remaining_packet = self.get_next_packet_info_for_interface2(
425                 i, dst_sw_if_index, last_info[i.sw_if_index]
426             )
427             self.assertTrue(
428                 remaining_packet is None,
429                 "Port %u: Packet expected from source %u didn't arrive"
430                 % (dst_sw_if_index, i.sw_if_index),
431             )
432
433     def run_traffic_no_check(self):
434         # Test
435         # Create incoming packet streams for packet-generator interfaces
436         for i in self.pg_interfaces:
437             if self.flows.__contains__(i):
438                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
439                 if len(pkts) > 0:
440                     i.add_stream(pkts)
441
442         # Enable packet capture and start packet sending
443         self.pg_enable_capture(self.pg_interfaces)
444         self.pg_start()
445
446     def run_verify_test(
447         self,
448         traffic_type=0,
449         ip_type=0,
450         proto=-1,
451         ports=0,
452         frags=False,
453         pkt_raw=True,
454         etype=-1,
455     ):
456         # Test
457         # Create incoming packet streams for packet-generator interfaces
458         pkts_cnt = 0
459         for i in self.pg_interfaces:
460             if self.flows.__contains__(i):
461                 pkts = self.create_stream(
462                     i,
463                     self.pg_if_packet_sizes,
464                     traffic_type,
465                     ip_type,
466                     proto,
467                     ports,
468                     frags,
469                     pkt_raw,
470                     etype,
471                 )
472                 if len(pkts) > 0:
473                     i.add_stream(pkts)
474                     pkts_cnt += len(pkts)
475
476         # Enable packet capture and start packet sendingself.IPV
477         self.pg_enable_capture(self.pg_interfaces)
478         self.pg_start()
479         self.logger.info("sent packets count: %d" % pkts_cnt)
480
481         # Verify
482         # Verify outgoing packet streams per packet-generator interface
483         for src_if in self.pg_interfaces:
484             if self.flows.__contains__(src_if):
485                 for dst_if in self.flows[src_if]:
486                     capture = dst_if.get_capture(pkts_cnt)
487                     self.logger.info("Verifying capture on interface %s" % dst_if.name)
488                     self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
489
490     def run_verify_negat_test(
491         self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
492     ):
493         # Test
494         pkts_cnt = 0
495         self.reset_packet_infos()
496         for i in self.pg_interfaces:
497             if self.flows.__contains__(i):
498                 pkts = self.create_stream(
499                     i,
500                     self.pg_if_packet_sizes,
501                     traffic_type,
502                     ip_type,
503                     proto,
504                     ports,
505                     frags,
506                     True,
507                     etype,
508                 )
509                 if len(pkts) > 0:
510                     i.add_stream(pkts)
511                     pkts_cnt += len(pkts)
512
513         # Enable packet capture and start packet sending
514         self.pg_enable_capture(self.pg_interfaces)
515         self.pg_start()
516         self.logger.info("sent packets count: %d" % pkts_cnt)
517
518         # Verify
519         # Verify outgoing packet streams per packet-generator interface
520         for src_if in self.pg_interfaces:
521             if self.flows.__contains__(src_if):
522                 for dst_if in self.flows[src_if]:
523                     self.logger.info("Verifying capture on interface %s" % dst_if.name)
524                     capture = dst_if.get_capture(0)
525                     self.assertEqual(len(capture), 0)
526
527     def test_0000_warmup_test(self):
528         """ACL plugin version check; learn MACs"""
529         reply = self.vapi.papi.acl_plugin_get_version()
530         self.assertEqual(reply.major, 1)
531         self.logger.info(
532             "Working with ACL plugin version: %d.%d" % (reply.major, reply.minor)
533         )
534         # minor version changes are non breaking
535         # self.assertEqual(reply.minor, 0)
536
537     def test_0001_acl_create(self):
538         """ACL create/delete test"""
539
540         self.logger.info("ACLP_TEST_START_0001")
541         # Create a permit-1234 ACL
542         r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
543         # Test 1: add a new ACL
544         first_acl = VppAcl(self, rules=r, tag="permit 1234")
545         first_acl.add_vpp_config()
546         self.assertTrue(first_acl.query_vpp_config())
547         # The very first ACL gets #0
548         self.assertEqual(first_acl.acl_index, 0)
549         rr = first_acl.dump()
550         self.logger.info("Dumped ACL: " + str(rr))
551         self.assertEqual(len(rr), 1)
552         # We should have the same number of ACL entries as we had asked
553         self.assertEqual(len(rr[0].r), len(r))
554         # The rules should be the same. But because the submitted and returned
555         # are different types, we need to iterate over rules and keys to get
556         # to basic values.
557         for i_rule in range(0, len(r) - 1):
558             encoded_rule = r[i_rule].encode()
559             for rule_key in encoded_rule:
560                 self.assertEqual(rr[0].r[i_rule][rule_key], encoded_rule[rule_key])
561
562         # Create a deny-1234 ACL
563         r_deny = [
564             AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
565             AclRule(is_permit=1, proto=17, ports=0),
566         ]
567         second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
568         second_acl.add_vpp_config()
569         self.assertTrue(second_acl.query_vpp_config())
570         # The second ACL gets #1
571         self.assertEqual(second_acl.acl_index, 1)
572
573         # Test 2: try to modify a nonexistent ACL
574         invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
575         reply = invalid_acl.add_vpp_config(expect_error=True)
576
577         # apply an ACL on an interface inbound, try to delete ACL, must fail
578         acl_if_list = VppAclInterface(
579             self, sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl]
580         )
581         acl_if_list.add_vpp_config()
582         first_acl.remove_vpp_config(expect_error=True)
583         # Unapply an ACL and then try to delete it - must be ok
584         acl_if_list.remove_vpp_config()
585         first_acl.remove_vpp_config()
586
587         # apply an ACL on an interface inbound, try to delete ACL, must fail
588         acl_if_list = VppAclInterface(
589             self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[second_acl]
590         )
591         acl_if_list.add_vpp_config()
592         second_acl.remove_vpp_config(expect_error=True)
593         # Unapply an ACL and then try to delete it - must be ok
594         acl_if_list.remove_vpp_config()
595         second_acl.remove_vpp_config()
596
597         # try to apply a nonexistent ACL - must fail
598         acl_if_list = VppAclInterface(
599             self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[invalid_acl]
600         )
601         acl_if_list.add_vpp_config(expect_error=True)
602
603         self.logger.info("ACLP_TEST_FINISH_0001")
604
605     def test_0002_acl_permit_apply(self):
606         """permit ACL apply test"""
607         self.logger.info("ACLP_TEST_START_0002")
608
609         rules = []
610         rules.append(
611             self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP])
612         )
613         rules.append(
614             self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP])
615         )
616
617         # Apply rules
618         acl_idx = self.apply_rules(rules, "permit per-flow")
619
620         # enable counters
621         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
622
623         # Traffic should still pass
624         self.run_verify_test(self.IP, self.IPV4, -1)
625
626         matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
627         self.logger.info("stat segment counters: %s" % repr(matches))
628         cli = "show acl-plugin acl"
629         self.logger.info(self.vapi.ppcli(cli))
630         cli = "show acl-plugin tables"
631         self.logger.info(self.vapi.ppcli(cli))
632
633         total_hits = matches[0][0]["packets"] + matches[0][1]["packets"]
634         self.assertEqual(total_hits, 64)
635
636         # disable counters
637         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
638
639         self.logger.info("ACLP_TEST_FINISH_0002")
640
641     def test_0003_acl_deny_apply(self):
642         """deny ACL apply test"""
643         self.logger.info("ACLP_TEST_START_0003")
644         # Add a deny-flows ACL
645         rules = []
646         rules.append(
647             self.create_rule(
648                 self.IPV4, self.DENY, self.PORTS_ALL, self.proto[self.IP][self.UDP]
649             )
650         )
651         # Permit ip any any in the end
652         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
653
654         # Apply rules
655         acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
656
657         # enable counters
658         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
659
660         # Traffic should not pass
661         self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
662
663         matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
664         self.logger.info("stat segment counters: %s" % repr(matches))
665         cli = "show acl-plugin acl"
666         self.logger.info(self.vapi.ppcli(cli))
667         cli = "show acl-plugin tables"
668         self.logger.info(self.vapi.ppcli(cli))
669         self.assertEqual(matches[0][0]["packets"], 64)
670         # disable counters
671         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
672         self.logger.info("ACLP_TEST_FINISH_0003")
673         # self.assertEqual(, 0)
674
675     def test_0004_vpp624_permit_icmpv4(self):
676         """VPP_624 permit ICMPv4"""
677         self.logger.info("ACLP_TEST_START_0004")
678
679         # Add an ACL
680         rules = []
681         rules.append(
682             self.create_rule(
683                 self.IPV4,
684                 self.PERMIT,
685                 self.PORTS_RANGE,
686                 self.proto[self.ICMP][self.ICMPv4],
687             )
688         )
689         # deny ip any any in the end
690         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
691
692         # Apply rules
693         self.apply_rules(rules, "permit icmpv4")
694
695         # Traffic should still pass
696         self.run_verify_test(self.ICMP, self.IPV4, self.proto[self.ICMP][self.ICMPv4])
697
698         self.logger.info("ACLP_TEST_FINISH_0004")
699
700     def test_0005_vpp624_permit_icmpv6(self):
701         """VPP_624 permit ICMPv6"""
702         self.logger.info("ACLP_TEST_START_0005")
703
704         # Add an ACL
705         rules = []
706         rules.append(
707             self.create_rule(
708                 self.IPV6,
709                 self.PERMIT,
710                 self.PORTS_RANGE,
711                 self.proto[self.ICMP][self.ICMPv6],
712             )
713         )
714         # deny ip any any in the end
715         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
716
717         # Apply rules
718         self.apply_rules(rules, "permit icmpv6")
719
720         # Traffic should still pass
721         self.run_verify_test(self.ICMP, self.IPV6, self.proto[self.ICMP][self.ICMPv6])
722
723         self.logger.info("ACLP_TEST_FINISH_0005")
724
725     def test_0006_vpp624_deny_icmpv4(self):
726         """VPP_624 deny ICMPv4"""
727         self.logger.info("ACLP_TEST_START_0006")
728         # Add an ACL
729         rules = []
730         rules.append(
731             self.create_rule(
732                 self.IPV4,
733                 self.DENY,
734                 self.PORTS_RANGE,
735                 self.proto[self.ICMP][self.ICMPv4],
736             )
737         )
738         # permit ip any any in the end
739         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
740
741         # Apply rules
742         self.apply_rules(rules, "deny icmpv4")
743
744         # Traffic should not pass
745         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
746
747         self.logger.info("ACLP_TEST_FINISH_0006")
748
749     def test_0007_vpp624_deny_icmpv6(self):
750         """VPP_624 deny ICMPv6"""
751         self.logger.info("ACLP_TEST_START_0007")
752         # Add an ACL
753         rules = []
754         rules.append(
755             self.create_rule(
756                 self.IPV6,
757                 self.DENY,
758                 self.PORTS_RANGE,
759                 self.proto[self.ICMP][self.ICMPv6],
760             )
761         )
762         # deny ip any any in the end
763         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
764
765         # Apply rules
766         self.apply_rules(rules, "deny icmpv6")
767
768         # Traffic should not pass
769         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
770
771         self.logger.info("ACLP_TEST_FINISH_0007")
772
773     def test_0008_tcp_permit_v4(self):
774         """permit TCPv4"""
775         self.logger.info("ACLP_TEST_START_0008")
776
777         # Add an ACL
778         rules = []
779         rules.append(
780             self.create_rule(
781                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
782             )
783         )
784         # deny ip any any in the end
785         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
786
787         # Apply rules
788         self.apply_rules(rules, "permit ipv4 tcp")
789
790         # Traffic should still pass
791         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
792
793         self.logger.info("ACLP_TEST_FINISH_0008")
794
795     def test_0009_tcp_permit_v6(self):
796         """permit TCPv6"""
797         self.logger.info("ACLP_TEST_START_0009")
798
799         # Add an ACL
800         rules = []
801         rules.append(
802             self.create_rule(
803                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
804             )
805         )
806         # deny ip any any in the end
807         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
808
809         # Apply rules
810         self.apply_rules(rules, "permit ip6 tcp")
811
812         # Traffic should still pass
813         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
814
815         self.logger.info("ACLP_TEST_FINISH_0008")
816
817     def test_0010_udp_permit_v4(self):
818         """permit UDPv4"""
819         self.logger.info("ACLP_TEST_START_0010")
820
821         # Add an ACL
822         rules = []
823         rules.append(
824             self.create_rule(
825                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
826             )
827         )
828         # deny ip any any in the end
829         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
830
831         # Apply rules
832         self.apply_rules(rules, "permit ipv udp")
833
834         # Traffic should still pass
835         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
836
837         self.logger.info("ACLP_TEST_FINISH_0010")
838
839     def test_0011_udp_permit_v6(self):
840         """permit UDPv6"""
841         self.logger.info("ACLP_TEST_START_0011")
842
843         # Add an ACL
844         rules = []
845         rules.append(
846             self.create_rule(
847                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
848             )
849         )
850         # deny ip any any in the end
851         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
852
853         # Apply rules
854         self.apply_rules(rules, "permit ip6 udp")
855
856         # Traffic should still pass
857         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
858
859         self.logger.info("ACLP_TEST_FINISH_0011")
860
861     def test_0012_tcp_deny(self):
862         """deny TCPv4/v6"""
863         self.logger.info("ACLP_TEST_START_0012")
864
865         # Add an ACL
866         rules = []
867         rules.append(
868             self.create_rule(
869                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
870             )
871         )
872         rules.append(
873             self.create_rule(
874                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
875             )
876         )
877         # permit ip any any in the end
878         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
879         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
880
881         # Apply rules
882         self.apply_rules(rules, "deny ip4/ip6 tcp")
883
884         # Traffic should not pass
885         self.run_verify_negat_test(
886             self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
887         )
888
889         self.logger.info("ACLP_TEST_FINISH_0012")
890
891     def test_0013_udp_deny(self):
892         """deny UDPv4/v6"""
893         self.logger.info("ACLP_TEST_START_0013")
894
895         # Add an ACL
896         rules = []
897         rules.append(
898             self.create_rule(
899                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
900             )
901         )
902         rules.append(
903             self.create_rule(
904                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
905             )
906         )
907         # permit ip any any in the end
908         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
909         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
910
911         # Apply rules
912         self.apply_rules(rules, "deny ip4/ip6 udp")
913
914         # Traffic should not pass
915         self.run_verify_negat_test(
916             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
917         )
918
919         self.logger.info("ACLP_TEST_FINISH_0013")
920
921     def test_0014_acl_dump(self):
922         """verify add/dump acls"""
923         self.logger.info("ACLP_TEST_START_0014")
924
925         r = [
926             [self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
927             [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
928             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
929             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
930             [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
931             [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
932             [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
933             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
934             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
935             [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
936             [self.IPV4, self.DENY, self.PORTS_ALL, 0],
937             [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
938             [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
939             [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
940             [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
941             [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
942             [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
943             [self.IPV6, self.DENY, self.PORTS_ALL, 0],
944         ]
945
946         # Add and verify new ACLs
947         rules = []
948         for i in range(len(r)):
949             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
950
951         acl = VppAcl(self, rules=rules)
952         acl.add_vpp_config()
953         result = acl.dump()
954
955         i = 0
956         for drules in result:
957             for dr in drules.r:
958                 self.assertEqual(dr.is_permit, r[i][1])
959                 self.assertEqual(dr.proto, r[i][3])
960
961                 if r[i][2] > 0:
962                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
963                 else:
964                     if r[i][2] < 0:
965                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
966                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
967                     else:
968                         if dr.proto == self.proto[self.IP][self.TCP]:
969                             self.assertGreater(
970                                 dr.srcport_or_icmptype_first, self.tcp_sport_from - 1
971                             )
972                             self.assertLess(
973                                 dr.srcport_or_icmptype_first, self.tcp_sport_to + 1
974                             )
975                             self.assertGreater(
976                                 dr.dstport_or_icmpcode_last, self.tcp_dport_from - 1
977                             )
978                             self.assertLess(
979                                 dr.dstport_or_icmpcode_last, self.tcp_dport_to + 1
980                             )
981                         elif dr.proto == self.proto[self.IP][self.UDP]:
982                             self.assertGreater(
983                                 dr.srcport_or_icmptype_first, self.udp_sport_from - 1
984                             )
985                             self.assertLess(
986                                 dr.srcport_or_icmptype_first, self.udp_sport_to + 1
987                             )
988                             self.assertGreater(
989                                 dr.dstport_or_icmpcode_last, self.udp_dport_from - 1
990                             )
991                             self.assertLess(
992                                 dr.dstport_or_icmpcode_last, self.udp_dport_to + 1
993                             )
994                 i += 1
995
996         self.logger.info("ACLP_TEST_FINISH_0014")
997
998     def test_0015_tcp_permit_port_v4(self):
999         """permit single TCPv4"""
1000         self.logger.info("ACLP_TEST_START_0015")
1001
1002         port = random.randint(16384, 65535)
1003         # Add an ACL
1004         rules = []
1005         rules.append(
1006             self.create_rule(
1007                 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.TCP]
1008             )
1009         )
1010         # deny ip any any in the end
1011         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1012
1013         # Apply rules
1014         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1015
1016         # Traffic should still pass
1017         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], port)
1018
1019         self.logger.info("ACLP_TEST_FINISH_0015")
1020
1021     def test_0016_udp_permit_port_v4(self):
1022         """permit single UDPv4"""
1023         self.logger.info("ACLP_TEST_START_0016")
1024
1025         port = random.randint(16384, 65535)
1026         # Add an ACL
1027         rules = []
1028         rules.append(
1029             self.create_rule(
1030                 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1031             )
1032         )
1033         # deny ip any any in the end
1034         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1035
1036         # Apply rules
1037         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1038
1039         # Traffic should still pass
1040         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP], port)
1041
1042         self.logger.info("ACLP_TEST_FINISH_0016")
1043
1044     def test_0017_tcp_permit_port_v6(self):
1045         """permit single TCPv6"""
1046         self.logger.info("ACLP_TEST_START_0017")
1047
1048         port = random.randint(16384, 65535)
1049         # Add an ACL
1050         rules = []
1051         rules.append(
1052             self.create_rule(
1053                 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.TCP]
1054             )
1055         )
1056         # deny ip any any in the end
1057         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1058
1059         # Apply rules
1060         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1061
1062         # Traffic should still pass
1063         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP], port)
1064
1065         self.logger.info("ACLP_TEST_FINISH_0017")
1066
1067     def test_0018_udp_permit_port_v6(self):
1068         """permit single UDPv6"""
1069         self.logger.info("ACLP_TEST_START_0018")
1070
1071         port = random.randint(16384, 65535)
1072         # Add an ACL
1073         rules = []
1074         rules.append(
1075             self.create_rule(
1076                 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1077             )
1078         )
1079         # deny ip any any in the end
1080         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1081
1082         # Apply rules
1083         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1084
1085         # Traffic should still pass
1086         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP], port)
1087
1088         self.logger.info("ACLP_TEST_FINISH_0018")
1089
1090     def test_0019_udp_deny_port(self):
1091         """deny single TCPv4/v6"""
1092         self.logger.info("ACLP_TEST_START_0019")
1093
1094         port = random.randint(16384, 65535)
1095         # Add an ACL
1096         rules = []
1097         rules.append(
1098             self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.TCP])
1099         )
1100         rules.append(
1101             self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.TCP])
1102         )
1103         # Permit ip any any in the end
1104         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1105         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1106
1107         # Apply rules
1108         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1109
1110         # Traffic should not pass
1111         self.run_verify_negat_test(
1112             self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP], port
1113         )
1114
1115         self.logger.info("ACLP_TEST_FINISH_0019")
1116
1117     def test_0020_udp_deny_port(self):
1118         """deny single UDPv4/v6"""
1119         self.logger.info("ACLP_TEST_START_0020")
1120
1121         port = random.randint(16384, 65535)
1122         # Add an ACL
1123         rules = []
1124         rules.append(
1125             self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1126         )
1127         rules.append(
1128             self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1129         )
1130         # Permit ip any any in the end
1131         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1132         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1133
1134         # Apply rules
1135         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1136
1137         # Traffic should not pass
1138         self.run_verify_negat_test(
1139             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port
1140         )
1141
1142         self.logger.info("ACLP_TEST_FINISH_0020")
1143
1144     def test_0021_udp_deny_port_verify_fragment_deny(self):
1145         """deny single UDPv4/v6, permit ip any, verify non-initial fragment
1146         blocked
1147         """
1148         self.logger.info("ACLP_TEST_START_0021")
1149
1150         port = random.randint(16384, 65535)
1151         # Add an ACL
1152         rules = []
1153         rules.append(
1154             self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1155         )
1156         rules.append(
1157             self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1158         )
1159         # deny ip any any in the end
1160         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1161         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1162
1163         # Apply rules
1164         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1165
1166         # Traffic should not pass
1167         self.run_verify_negat_test(
1168             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port, True
1169         )
1170
1171         self.logger.info("ACLP_TEST_FINISH_0021")
1172
1173     def test_0022_zero_length_udp_ipv4(self):
1174         """VPP-687 zero length udp ipv4 packet"""
1175         self.logger.info("ACLP_TEST_START_0022")
1176
1177         port = random.randint(16384, 65535)
1178         # Add an ACL
1179         rules = []
1180         rules.append(
1181             self.create_rule(
1182                 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1183             )
1184         )
1185         # deny ip any any in the end
1186         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1187
1188         # Apply rules
1189         self.apply_rules(rules, "permit empty udp ip4 %d" % port)
1190
1191         # Traffic should still pass
1192         # Create incoming packet streams for packet-generator interfaces
1193         pkts_cnt = 0
1194         pkts = self.create_stream(
1195             self.pg0,
1196             self.pg_if_packet_sizes,
1197             self.IP,
1198             self.IPV4,
1199             self.proto[self.IP][self.UDP],
1200             port,
1201             False,
1202             False,
1203         )
1204         if len(pkts) > 0:
1205             self.pg0.add_stream(pkts)
1206             pkts_cnt += len(pkts)
1207
1208         # Enable packet capture and start packet sendingself.IPV
1209         self.pg_enable_capture(self.pg_interfaces)
1210         self.pg_start()
1211
1212         self.pg1.get_capture(pkts_cnt)
1213
1214         self.logger.info("ACLP_TEST_FINISH_0022")
1215
1216     def test_0023_zero_length_udp_ipv6(self):
1217         """VPP-687 zero length udp ipv6 packet"""
1218         self.logger.info("ACLP_TEST_START_0023")
1219
1220         port = random.randint(16384, 65535)
1221         # Add an ACL
1222         rules = []
1223         rules.append(
1224             self.create_rule(
1225                 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1226             )
1227         )
1228         # deny ip any any in the end
1229         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1230
1231         # Apply rules
1232         self.apply_rules(rules, "permit empty udp ip6 %d" % port)
1233
1234         # Traffic should still pass
1235         # Create incoming packet streams for packet-generator interfaces
1236         pkts_cnt = 0
1237         pkts = self.create_stream(
1238             self.pg0,
1239             self.pg_if_packet_sizes,
1240             self.IP,
1241             self.IPV6,
1242             self.proto[self.IP][self.UDP],
1243             port,
1244             False,
1245             False,
1246         )
1247         if len(pkts) > 0:
1248             self.pg0.add_stream(pkts)
1249             pkts_cnt += len(pkts)
1250
1251         # Enable packet capture and start packet sendingself.IPV
1252         self.pg_enable_capture(self.pg_interfaces)
1253         self.pg_start()
1254
1255         # Verify outgoing packet streams per packet-generator interface
1256         self.pg1.get_capture(pkts_cnt)
1257
1258         self.logger.info("ACLP_TEST_FINISH_0023")
1259
1260     def test_0108_tcp_permit_v4(self):
1261         """permit TCPv4 + non-match range"""
1262         self.logger.info("ACLP_TEST_START_0108")
1263
1264         # Add an ACL
1265         rules = []
1266         rules.append(
1267             self.create_rule(
1268                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1269             )
1270         )
1271         rules.append(
1272             self.create_rule(
1273                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1274             )
1275         )
1276         # deny ip any any in the end
1277         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1278
1279         # Apply rules
1280         self.apply_rules(rules, "permit ipv4 tcp")
1281
1282         # Traffic should still pass
1283         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1284
1285         self.logger.info("ACLP_TEST_FINISH_0108")
1286
1287     def test_0109_tcp_permit_v6(self):
1288         """permit TCPv6 + non-match range"""
1289         self.logger.info("ACLP_TEST_START_0109")
1290
1291         # Add an ACL
1292         rules = []
1293         rules.append(
1294             self.create_rule(
1295                 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1296             )
1297         )
1298         rules.append(
1299             self.create_rule(
1300                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1301             )
1302         )
1303         # deny ip any any in the end
1304         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1305
1306         # Apply rules
1307         self.apply_rules(rules, "permit ip6 tcp")
1308
1309         # Traffic should still pass
1310         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1311
1312         self.logger.info("ACLP_TEST_FINISH_0109")
1313
1314     def test_0110_udp_permit_v4(self):
1315         """permit UDPv4 + non-match range"""
1316         self.logger.info("ACLP_TEST_START_0110")
1317
1318         # Add an ACL
1319         rules = []
1320         rules.append(
1321             self.create_rule(
1322                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
1323             )
1324         )
1325         rules.append(
1326             self.create_rule(
1327                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1328             )
1329         )
1330         # deny ip any any in the end
1331         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1332
1333         # Apply rules
1334         self.apply_rules(rules, "permit ipv4 udp")
1335
1336         # Traffic should still pass
1337         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1338
1339         self.logger.info("ACLP_TEST_FINISH_0110")
1340
1341     def test_0111_udp_permit_v6(self):
1342         """permit UDPv6 + non-match range"""
1343         self.logger.info("ACLP_TEST_START_0111")
1344
1345         # Add an ACL
1346         rules = []
1347         rules.append(
1348             self.create_rule(
1349                 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
1350             )
1351         )
1352         rules.append(
1353             self.create_rule(
1354                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1355             )
1356         )
1357         # deny ip any any in the end
1358         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1359
1360         # Apply rules
1361         self.apply_rules(rules, "permit ip6 udp")
1362
1363         # Traffic should still pass
1364         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1365
1366         self.logger.info("ACLP_TEST_FINISH_0111")
1367
1368     def test_0112_tcp_deny(self):
1369         """deny TCPv4/v6 + non-match range"""
1370         self.logger.info("ACLP_TEST_START_0112")
1371
1372         # Add an ACL
1373         rules = []
1374         rules.append(
1375             self.create_rule(
1376                 self.IPV4,
1377                 self.PERMIT,
1378                 self.PORTS_RANGE_2,
1379                 self.proto[self.IP][self.TCP],
1380             )
1381         )
1382         rules.append(
1383             self.create_rule(
1384                 self.IPV6,
1385                 self.PERMIT,
1386                 self.PORTS_RANGE_2,
1387                 self.proto[self.IP][self.TCP],
1388             )
1389         )
1390         rules.append(
1391             self.create_rule(
1392                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1393             )
1394         )
1395         rules.append(
1396             self.create_rule(
1397                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1398             )
1399         )
1400         # permit ip any any in the end
1401         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1402         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1403
1404         # Apply rules
1405         self.apply_rules(rules, "deny ip4/ip6 tcp")
1406
1407         # Traffic should not pass
1408         self.run_verify_negat_test(
1409             self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
1410         )
1411
1412         self.logger.info("ACLP_TEST_FINISH_0112")
1413
1414     def test_0113_udp_deny(self):
1415         """deny UDPv4/v6 + non-match range"""
1416         self.logger.info("ACLP_TEST_START_0113")
1417
1418         # Add an ACL
1419         rules = []
1420         rules.append(
1421             self.create_rule(
1422                 self.IPV4,
1423                 self.PERMIT,
1424                 self.PORTS_RANGE_2,
1425                 self.proto[self.IP][self.UDP],
1426             )
1427         )
1428         rules.append(
1429             self.create_rule(
1430                 self.IPV6,
1431                 self.PERMIT,
1432                 self.PORTS_RANGE_2,
1433                 self.proto[self.IP][self.UDP],
1434             )
1435         )
1436         rules.append(
1437             self.create_rule(
1438                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1439             )
1440         )
1441         rules.append(
1442             self.create_rule(
1443                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1444             )
1445         )
1446         # permit ip any any in the end
1447         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1448         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1449
1450         # Apply rules
1451         self.apply_rules(rules, "deny ip4/ip6 udp")
1452
1453         # Traffic should not pass
1454         self.run_verify_negat_test(
1455             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
1456         )
1457
1458         self.logger.info("ACLP_TEST_FINISH_0113")
1459
1460     def test_0300_tcp_permit_v4_etype_aaaa(self):
1461         """permit TCPv4, send 0xAAAA etype"""
1462         self.logger.info("ACLP_TEST_START_0300")
1463
1464         # Add an ACL
1465         rules = []
1466         rules.append(
1467             self.create_rule(
1468                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1469             )
1470         )
1471         rules.append(
1472             self.create_rule(
1473                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1474             )
1475         )
1476         # deny ip any any in the end
1477         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1478
1479         # Apply rules
1480         self.apply_rules(rules, "permit ipv4 tcp")
1481
1482         # Traffic should still pass also for an odd ethertype
1483         self.run_verify_test(
1484             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1485         )
1486         self.logger.info("ACLP_TEST_FINISH_0300")
1487
1488     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1489         """permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked"""
1490         self.logger.info("ACLP_TEST_START_0305")
1491
1492         # Add an ACL
1493         rules = []
1494         rules.append(
1495             self.create_rule(
1496                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1497             )
1498         )
1499         rules.append(
1500             self.create_rule(
1501                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1502             )
1503         )
1504         # deny ip any any in the end
1505         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1506
1507         # Apply rules
1508         self.apply_rules(rules, "permit ipv4 tcp")
1509         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1510         self.etype_whitelist([0xBBB], 1)
1511
1512         # The oddball ethertype should be blocked
1513         self.run_verify_negat_test(
1514             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, 0xAAAA
1515         )
1516
1517         # remove the whitelist
1518         self.etype_whitelist([], 0, add=False)
1519
1520         self.logger.info("ACLP_TEST_FINISH_0305")
1521
1522     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1523         """permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass"""
1524         self.logger.info("ACLP_TEST_START_0306")
1525
1526         # Add an ACL
1527         rules = []
1528         rules.append(
1529             self.create_rule(
1530                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1531             )
1532         )
1533         rules.append(
1534             self.create_rule(
1535                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1536             )
1537         )
1538         # deny ip any any in the end
1539         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1540
1541         # Apply rules
1542         self.apply_rules(rules, "permit ipv4 tcp")
1543         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1544         self.etype_whitelist([0xBBB], 1)
1545
1546         # The whitelisted traffic, should pass
1547         self.run_verify_test(
1548             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0x0BBB
1549         )
1550
1551         # remove the whitelist, the previously blocked 0xAAAA should pass now
1552         self.etype_whitelist([], 0, add=False)
1553
1554         self.logger.info("ACLP_TEST_FINISH_0306")
1555
1556     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1557         """permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass"""
1558         self.logger.info("ACLP_TEST_START_0307")
1559
1560         # Add an ACL
1561         rules = []
1562         rules.append(
1563             self.create_rule(
1564                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1565             )
1566         )
1567         rules.append(
1568             self.create_rule(
1569                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1570             )
1571         )
1572         # deny ip any any in the end
1573         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1574
1575         # Apply rules
1576         self.apply_rules(rules, "permit ipv4 tcp")
1577
1578         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1579         self.etype_whitelist([0xBBB], 1)
1580         # remove the whitelist, the previously blocked 0xAAAA should pass now
1581         self.etype_whitelist([], 0, add=False)
1582
1583         # The whitelisted traffic, should pass
1584         self.run_verify_test(
1585             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1586         )
1587
1588         self.logger.info("ACLP_TEST_FINISH_0306")
1589
1590     def test_0315_del_intf(self):
1591         """apply an acl and delete the interface"""
1592         self.logger.info("ACLP_TEST_START_0315")
1593
1594         # Add an ACL
1595         rules = []
1596         rules.append(
1597             self.create_rule(
1598                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1599             )
1600         )
1601         rules.append(
1602             self.create_rule(
1603                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1604             )
1605         )
1606         # deny ip any any in the end
1607         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1608
1609         # create an interface
1610         intf = []
1611         intf.append(VppLoInterface(self))
1612
1613         # Apply rules
1614         self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1615
1616         # Remove the interface
1617         intf[0].remove_vpp_config()
1618
1619         self.logger.info("ACLP_TEST_FINISH_0315")
1620
1621
# When this module is executed directly (rather than imported), run its
# test cases through unittest using the VPP-specific test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)