l2: Add bridge_domain_add_del_v2 to l2 api
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python3
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from framework import tag_fixme_vpp_workers
15 from util import Host, ppp
16 from ipaddress import IPv4Network, IPv6Network
17
18 from vpp_lo_interface import VppLoInterface
19 from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
20 from vpp_ip import INVALID_INDEX
21
22
23 @tag_fixme_vpp_workers
24 class TestACLplugin(VppTestCase):
25     """ACL plugin Test Case"""
26
27     # traffic types
28     IP = 0
29     ICMP = 1
30
31     # IP version
32     IPRANDOM = -1
33     IPV4 = 0
34     IPV6 = 1
35
36     # rule types
37     DENY = 0
38     PERMIT = 1
39
40     # supported protocols
41     proto = [[6, 17], [1, 58]]
42     proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
43     ICMPv4 = 0
44     ICMPv6 = 1
45     TCP = 0
46     UDP = 1
47     PROTO_ALL = 0
48
49     # port ranges
50     PORTS_ALL = -1
51     PORTS_RANGE = 0
52     PORTS_RANGE_2 = 1
53     udp_sport_from = 10
54     udp_sport_to = udp_sport_from + 5
55     udp_dport_from = 20000
56     udp_dport_to = udp_dport_from + 5000
57     tcp_sport_from = 30
58     tcp_sport_to = tcp_sport_from + 5
59     tcp_dport_from = 40000
60     tcp_dport_to = tcp_dport_from + 5000
61
62     udp_sport_from_2 = 90
63     udp_sport_to_2 = udp_sport_from_2 + 5
64     udp_dport_from_2 = 30000
65     udp_dport_to_2 = udp_dport_from_2 + 5000
66     tcp_sport_from_2 = 130
67     tcp_sport_to_2 = tcp_sport_from_2 + 5
68     tcp_dport_from_2 = 20000
69     tcp_dport_to_2 = tcp_dport_from_2 + 5000
70
71     icmp4_type = 8  # echo request
72     icmp4_code = 3
73     icmp6_type = 128  # echo request
74     icmp6_code = 3
75
76     icmp4_type_2 = 8
77     icmp4_code_from_2 = 5
78     icmp4_code_to_2 = 20
79     icmp6_type_2 = 128
80     icmp6_code_from_2 = 8
81     icmp6_code_to_2 = 42
82
83     # Test variables
84     bd_id = 1
85
86     @classmethod
87     def setUpClass(cls):
88         """
89         Perform standard class setup (defined by class method setUpClass in
90         class VppTestCase) before running the test case, set test case related
91         variables and configure VPP.
92         """
93         super(TestACLplugin, cls).setUpClass()
94
95         try:
96             # Create 2 pg interfaces
97             cls.create_pg_interfaces(range(2))
98
99             # Packet flows mapping pg0 -> pg1, pg2 etc.
100             cls.flows = dict()
101             cls.flows[cls.pg0] = [cls.pg1]
102
103             # Packet sizes
104             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
105
106             # Create BD with MAC learning and unknown unicast flooding disabled
107             # and put interfaces to this BD
108             cls.vapi.bridge_domain_add_del_v2(
109                 bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
110             )
111             for pg_if in cls.pg_interfaces:
112                 cls.vapi.sw_interface_set_l2_bridge(
113                     rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
114                 )
115
116             # Set up all interfaces
117             for i in cls.pg_interfaces:
118                 i.admin_up()
119
120             # Mapping between packet-generator index and lists of test hosts
121             cls.hosts_by_pg_idx = dict()
122             for pg_if in cls.pg_interfaces:
123                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
124
125             # Create list of deleted hosts
126             cls.deleted_hosts_by_pg_idx = dict()
127             for pg_if in cls.pg_interfaces:
128                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
129
130             # warm-up the mac address tables
131             # self.warmup_test()
132             count = 16
133             start = 0
134             n_int = len(cls.pg_interfaces)
135             macs_per_if = count // n_int
136             i = -1
137             for pg_if in cls.pg_interfaces:
138                 i += 1
139                 start_nr = macs_per_if * i + start
140                 end_nr = (
141                     count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
142                 )
143                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
144                 for j in range(int(start_nr), int(end_nr)):
145                     host = Host(
146                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
147                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
148                         "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
149                     )
150                     hosts.append(host)
151
152         except Exception:
153             super(TestACLplugin, cls).tearDownClass()
154             raise
155
156     @classmethod
157     def tearDownClass(cls):
158         super(TestACLplugin, cls).tearDownClass()
159
160     def setUp(self):
161         super(TestACLplugin, self).setUp()
162         self.reset_packet_infos()
163
164     def tearDown(self):
165         """
166         Show various debug prints after each test.
167         """
168         super(TestACLplugin, self).tearDown()
169
170     def show_commands_at_teardown(self):
171         cli = "show vlib graph l2-input-feat-arc"
172         self.logger.info(self.vapi.ppcli(cli))
173         cli = "show vlib graph l2-input-feat-arc-end"
174         self.logger.info(self.vapi.ppcli(cli))
175         cli = "show vlib graph l2-output-feat-arc"
176         self.logger.info(self.vapi.ppcli(cli))
177         cli = "show vlib graph l2-output-feat-arc-end"
178         self.logger.info(self.vapi.ppcli(cli))
179         self.logger.info(self.vapi.ppcli("show l2fib verbose"))
180         self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
181         self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
182         self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
183         self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id))
184
185     def create_rule(
186         self,
187         ip=0,
188         permit_deny=0,
189         ports=PORTS_ALL,
190         proto=-1,
191         s_prefix=0,
192         s_ip=0,
193         d_prefix=0,
194         d_ip=0,
195     ):
196         if ip:
197             src_prefix = IPv6Network((s_ip, s_prefix))
198             dst_prefix = IPv6Network((d_ip, d_prefix))
199         else:
200             src_prefix = IPv4Network((s_ip, s_prefix))
201             dst_prefix = IPv4Network((d_ip, d_prefix))
202         return AclRule(
203             is_permit=permit_deny,
204             ports=ports,
205             proto=proto,
206             src_prefix=src_prefix,
207             dst_prefix=dst_prefix,
208         )
209
210     def apply_rules(self, rules, tag=None):
211         acl = VppAcl(self, rules, tag=tag)
212         acl.add_vpp_config()
213         self.logger.info("Dumped ACL: " + str(acl.dump()))
214         # Apply a ACL on the interface as inbound
215         for i in self.pg_interfaces:
216             acl_if = VppAclInterface(
217                 self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]
218             )
219             acl_if.add_vpp_config()
220         return acl.acl_index
221
222     def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
223         acl = VppAcl(self, rules, tag=tag)
224         acl.add_vpp_config()
225         self.logger.info("Dumped ACL: " + str(acl.dump()))
226         # Apply a ACL on the interface as inbound
227         acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, acls=[acl])
228         return acl.acl_index
229
230     def etype_whitelist(self, whitelist, n_input, add=True):
231         # Apply whitelists on all the interfaces
232         if add:
233             self._wl = []
234             for i in self.pg_interfaces:
235                 self._wl.append(
236                     VppEtypeWhitelist(
237                         self,
238                         sw_if_index=i.sw_if_index,
239                         whitelist=whitelist,
240                         n_input=n_input,
241                     ).add_vpp_config()
242                 )
243         else:
244             if hasattr(self, "_wl"):
245                 for wl in self._wl:
246                     wl.remove_vpp_config()
247
248     def create_upper_layer(self, packet_index, proto, ports=0):
249         p = self.proto_map[proto]
250         if p == "UDP":
251             if ports == 0:
252                 return UDP(
253                     sport=random.randint(self.udp_sport_from, self.udp_sport_to),
254                     dport=random.randint(self.udp_dport_from, self.udp_dport_to),
255                 )
256             else:
257                 return UDP(sport=ports, dport=ports)
258         elif p == "TCP":
259             if ports == 0:
260                 return TCP(
261                     sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
262                     dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
263                 )
264             else:
265                 return TCP(sport=ports, dport=ports)
266         return ""
267
268     def create_stream(
269         self,
270         src_if,
271         packet_sizes,
272         traffic_type=0,
273         ipv6=0,
274         proto=-1,
275         ports=0,
276         fragments=False,
277         pkt_raw=True,
278         etype=-1,
279     ):
280         """
281         Create input packet stream for defined interface using hosts or
282         deleted_hosts list.
283
284         :param object src_if: Interface to create packet stream for.
285         :param list packet_sizes: List of required packet sizes.
286         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
287         :return: Stream of packets.
288         """
289         pkts = []
290         if self.flows.__contains__(src_if):
291             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
292             for dst_if in self.flows[src_if]:
293                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
294                 n_int = len(dst_hosts) * len(src_hosts)
295                 for i in range(0, n_int):
296                     dst_host = dst_hosts[int(i / len(src_hosts))]
297                     src_host = src_hosts[i % len(src_hosts)]
298                     pkt_info = self.create_packet_info(src_if, dst_if)
299                     if ipv6 == 1:
300                         pkt_info.ip = 1
301                     elif ipv6 == 0:
302                         pkt_info.ip = 0
303                     else:
304                         pkt_info.ip = random.choice([0, 1])
305                     if proto == -1:
306                         pkt_info.proto = random.choice(self.proto[self.IP])
307                     else:
308                         pkt_info.proto = proto
309                     payload = self.info_to_payload(pkt_info)
310                     p = Ether(dst=dst_host.mac, src=src_host.mac)
311                     if etype > 0:
312                         p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
313                     if pkt_info.ip:
314                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
315                         if fragments:
316                             p /= IPv6ExtHdrFragment(offset=64, m=1)
317                     else:
318                         if fragments:
319                             p /= IP(
320                                 src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
321                             )
322                         else:
323                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
324                     if traffic_type == self.ICMP:
325                         if pkt_info.ip:
326                             p /= ICMPv6EchoRequest(
327                                 type=self.icmp6_type, code=self.icmp6_code
328                             )
329                         else:
330                             p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
331                     else:
332                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
333                     if pkt_raw:
334                         p /= Raw(payload)
335                         pkt_info.data = p.copy()
336                     if pkt_raw:
337                         size = random.choice(packet_sizes)
338                         self.extend_packet(p, size)
339                     pkts.append(p)
340         return pkts
341
342     def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
343         """
344         Verify captured input packet stream for defined interface.
345
346         :param object pg_if: Interface to verify captured packet stream for.
347         :param list capture: Captured packet stream.
348         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
349         """
350         last_info = dict()
351         for i in self.pg_interfaces:
352             last_info[i.sw_if_index] = None
353         dst_sw_if_index = pg_if.sw_if_index
354         for packet in capture:
355             if etype > 0:
356                 if packet[Ether].type != etype:
357                     self.logger.error(ppp("Unexpected ethertype in packet:", packet))
358                 else:
359                     continue
360             try:
361                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
362                 if traffic_type == self.ICMP and ip_type == self.IPV6:
363                     payload_info = self.payload_to_info(
364                         packet[ICMPv6EchoRequest], "data"
365                     )
366                     payload = packet[ICMPv6EchoRequest]
367                 else:
368                     payload_info = self.payload_to_info(packet[Raw])
369                     payload = packet[self.proto_map[payload_info.proto]]
370             except:
371                 self.logger.error(
372                     ppp("Unexpected or invalid packet (outside network):", packet)
373                 )
374                 raise
375
376             if ip_type != 0:
377                 self.assertEqual(payload_info.ip, ip_type)
378             if traffic_type == self.ICMP:
379                 try:
380                     if payload_info.ip == 0:
381                         self.assertEqual(payload.type, self.icmp4_type)
382                         self.assertEqual(payload.code, self.icmp4_code)
383                     else:
384                         self.assertEqual(payload.type, self.icmp6_type)
385                         self.assertEqual(payload.code, self.icmp6_code)
386                 except:
387                     self.logger.error(
388                         ppp("Unexpected or invalid packet (outside network):", packet)
389                     )
390                     raise
391             else:
392                 try:
393                     ip_version = IPv6 if payload_info.ip == 1 else IP
394
395                     ip = packet[ip_version]
396                     packet_index = payload_info.index
397
398                     self.assertEqual(payload_info.dst, dst_sw_if_index)
399                     self.logger.debug(
400                         "Got packet on port %s: src=%u (id=%u)"
401                         % (pg_if.name, payload_info.src, packet_index)
402                     )
403                     next_info = self.get_next_packet_info_for_interface2(
404                         payload_info.src, dst_sw_if_index, last_info[payload_info.src]
405                     )
406                     last_info[payload_info.src] = next_info
407                     self.assertTrue(next_info is not None)
408                     self.assertEqual(packet_index, next_info.index)
409                     saved_packet = next_info.data
410                     # Check standard fields
411                     self.assertEqual(ip.src, saved_packet[ip_version].src)
412                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
413                     p = self.proto_map[payload_info.proto]
414                     if p == "TCP":
415                         tcp = packet[TCP]
416                         self.assertEqual(tcp.sport, saved_packet[TCP].sport)
417                         self.assertEqual(tcp.dport, saved_packet[TCP].dport)
418                     elif p == "UDP":
419                         udp = packet[UDP]
420                         self.assertEqual(udp.sport, saved_packet[UDP].sport)
421                         self.assertEqual(udp.dport, saved_packet[UDP].dport)
422                 except:
423                     self.logger.error(ppp("Unexpected or invalid packet:", packet))
424                     raise
425         for i in self.pg_interfaces:
426             remaining_packet = self.get_next_packet_info_for_interface2(
427                 i, dst_sw_if_index, last_info[i.sw_if_index]
428             )
429             self.assertTrue(
430                 remaining_packet is None,
431                 "Port %u: Packet expected from source %u didn't arrive"
432                 % (dst_sw_if_index, i.sw_if_index),
433             )
434
435     def run_traffic_no_check(self):
436         # Test
437         # Create incoming packet streams for packet-generator interfaces
438         for i in self.pg_interfaces:
439             if self.flows.__contains__(i):
440                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
441                 if len(pkts) > 0:
442                     i.add_stream(pkts)
443
444         # Enable packet capture and start packet sending
445         self.pg_enable_capture(self.pg_interfaces)
446         self.pg_start()
447
448     def run_verify_test(
449         self,
450         traffic_type=0,
451         ip_type=0,
452         proto=-1,
453         ports=0,
454         frags=False,
455         pkt_raw=True,
456         etype=-1,
457     ):
458         # Test
459         # Create incoming packet streams for packet-generator interfaces
460         pkts_cnt = 0
461         for i in self.pg_interfaces:
462             if self.flows.__contains__(i):
463                 pkts = self.create_stream(
464                     i,
465                     self.pg_if_packet_sizes,
466                     traffic_type,
467                     ip_type,
468                     proto,
469                     ports,
470                     frags,
471                     pkt_raw,
472                     etype,
473                 )
474                 if len(pkts) > 0:
475                     i.add_stream(pkts)
476                     pkts_cnt += len(pkts)
477
478         # Enable packet capture and start packet sendingself.IPV
479         self.pg_enable_capture(self.pg_interfaces)
480         self.pg_start()
481         self.logger.info("sent packets count: %d" % pkts_cnt)
482
483         # Verify
484         # Verify outgoing packet streams per packet-generator interface
485         for src_if in self.pg_interfaces:
486             if self.flows.__contains__(src_if):
487                 for dst_if in self.flows[src_if]:
488                     capture = dst_if.get_capture(pkts_cnt)
489                     self.logger.info("Verifying capture on interface %s" % dst_if.name)
490                     self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
491
492     def run_verify_negat_test(
493         self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
494     ):
495         # Test
496         pkts_cnt = 0
497         self.reset_packet_infos()
498         for i in self.pg_interfaces:
499             if self.flows.__contains__(i):
500                 pkts = self.create_stream(
501                     i,
502                     self.pg_if_packet_sizes,
503                     traffic_type,
504                     ip_type,
505                     proto,
506                     ports,
507                     frags,
508                     True,
509                     etype,
510                 )
511                 if len(pkts) > 0:
512                     i.add_stream(pkts)
513                     pkts_cnt += len(pkts)
514
515         # Enable packet capture and start packet sending
516         self.pg_enable_capture(self.pg_interfaces)
517         self.pg_start()
518         self.logger.info("sent packets count: %d" % pkts_cnt)
519
520         # Verify
521         # Verify outgoing packet streams per packet-generator interface
522         for src_if in self.pg_interfaces:
523             if self.flows.__contains__(src_if):
524                 for dst_if in self.flows[src_if]:
525                     self.logger.info("Verifying capture on interface %s" % dst_if.name)
526                     capture = dst_if.get_capture(0)
527                     self.assertEqual(len(capture), 0)
528
529     def test_0000_warmup_test(self):
530         """ACL plugin version check; learn MACs"""
531         reply = self.vapi.papi.acl_plugin_get_version()
532         self.assertEqual(reply.major, 1)
533         self.logger.info(
534             "Working with ACL plugin version: %d.%d" % (reply.major, reply.minor)
535         )
536         # minor version changes are non breaking
537         # self.assertEqual(reply.minor, 0)
538
539     def test_0001_acl_create(self):
540         """ACL create/delete test"""
541
542         self.logger.info("ACLP_TEST_START_0001")
543         # Create a permit-1234 ACL
544         r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
545         # Test 1: add a new ACL
546         first_acl = VppAcl(self, rules=r, tag="permit 1234")
547         first_acl.add_vpp_config()
548         self.assertTrue(first_acl.query_vpp_config())
549         # The very first ACL gets #0
550         self.assertEqual(first_acl.acl_index, 0)
551         rr = first_acl.dump()
552         self.logger.info("Dumped ACL: " + str(rr))
553         self.assertEqual(len(rr), 1)
554         # We should have the same number of ACL entries as we had asked
555         self.assertEqual(len(rr[0].r), len(r))
556         # The rules should be the same. But because the submitted and returned
557         # are different types, we need to iterate over rules and keys to get
558         # to basic values.
559         for i_rule in range(0, len(r) - 1):
560             encoded_rule = r[i_rule].encode()
561             for rule_key in encoded_rule:
562                 self.assertEqual(rr[0].r[i_rule][rule_key], encoded_rule[rule_key])
563
564         # Create a deny-1234 ACL
565         r_deny = [
566             AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
567             AclRule(is_permit=1, proto=17, ports=0),
568         ]
569         second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
570         second_acl.add_vpp_config()
571         self.assertTrue(second_acl.query_vpp_config())
572         # The second ACL gets #1
573         self.assertEqual(second_acl.acl_index, 1)
574
575         # Test 2: try to modify a nonexistent ACL
576         invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
577         reply = invalid_acl.add_vpp_config(expect_error=True)
578
579         # apply an ACL on an interface inbound, try to delete ACL, must fail
580         acl_if_list = VppAclInterface(
581             self, sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl]
582         )
583         acl_if_list.add_vpp_config()
584         first_acl.remove_vpp_config(expect_error=True)
585         # Unapply an ACL and then try to delete it - must be ok
586         acl_if_list.remove_vpp_config()
587         first_acl.remove_vpp_config()
588
589         # apply an ACL on an interface inbound, try to delete ACL, must fail
590         acl_if_list = VppAclInterface(
591             self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[second_acl]
592         )
593         acl_if_list.add_vpp_config()
594         second_acl.remove_vpp_config(expect_error=True)
595         # Unapply an ACL and then try to delete it - must be ok
596         acl_if_list.remove_vpp_config()
597         second_acl.remove_vpp_config()
598
599         # try to apply a nonexistent ACL - must fail
600         acl_if_list = VppAclInterface(
601             self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[invalid_acl]
602         )
603         acl_if_list.add_vpp_config(expect_error=True)
604
605         self.logger.info("ACLP_TEST_FINISH_0001")
606
607     def test_0002_acl_permit_apply(self):
608         """permit ACL apply test"""
609         self.logger.info("ACLP_TEST_START_0002")
610
611         rules = []
612         rules.append(
613             self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP])
614         )
615         rules.append(
616             self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP])
617         )
618
619         # Apply rules
620         acl_idx = self.apply_rules(rules, "permit per-flow")
621
622         # enable counters
623         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
624
625         # Traffic should still pass
626         self.run_verify_test(self.IP, self.IPV4, -1)
627
628         matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
629         self.logger.info("stat segment counters: %s" % repr(matches))
630         cli = "show acl-plugin acl"
631         self.logger.info(self.vapi.ppcli(cli))
632         cli = "show acl-plugin tables"
633         self.logger.info(self.vapi.ppcli(cli))
634
635         total_hits = matches[0][0]["packets"] + matches[0][1]["packets"]
636         self.assertEqual(total_hits, 64)
637
638         # disable counters
639         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
640
641         self.logger.info("ACLP_TEST_FINISH_0002")
642
643     def test_0003_acl_deny_apply(self):
644         """deny ACL apply test"""
645         self.logger.info("ACLP_TEST_START_0003")
646         # Add a deny-flows ACL
647         rules = []
648         rules.append(
649             self.create_rule(
650                 self.IPV4, self.DENY, self.PORTS_ALL, self.proto[self.IP][self.UDP]
651             )
652         )
653         # Permit ip any any in the end
654         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
655
656         # Apply rules
657         acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
658
659         # enable counters
660         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
661
662         # Traffic should not pass
663         self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
664
665         matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
666         self.logger.info("stat segment counters: %s" % repr(matches))
667         cli = "show acl-plugin acl"
668         self.logger.info(self.vapi.ppcli(cli))
669         cli = "show acl-plugin tables"
670         self.logger.info(self.vapi.ppcli(cli))
671         self.assertEqual(matches[0][0]["packets"], 64)
672         # disable counters
673         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
674         self.logger.info("ACLP_TEST_FINISH_0003")
675         # self.assertEqual(, 0)
676
677     def test_0004_vpp624_permit_icmpv4(self):
678         """VPP_624 permit ICMPv4"""
679         self.logger.info("ACLP_TEST_START_0004")
680
681         # Add an ACL
682         rules = []
683         rules.append(
684             self.create_rule(
685                 self.IPV4,
686                 self.PERMIT,
687                 self.PORTS_RANGE,
688                 self.proto[self.ICMP][self.ICMPv4],
689             )
690         )
691         # deny ip any any in the end
692         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
693
694         # Apply rules
695         self.apply_rules(rules, "permit icmpv4")
696
697         # Traffic should still pass
698         self.run_verify_test(self.ICMP, self.IPV4, self.proto[self.ICMP][self.ICMPv4])
699
700         self.logger.info("ACLP_TEST_FINISH_0004")
701
702     def test_0005_vpp624_permit_icmpv6(self):
703         """VPP_624 permit ICMPv6"""
704         self.logger.info("ACLP_TEST_START_0005")
705
706         # Add an ACL
707         rules = []
708         rules.append(
709             self.create_rule(
710                 self.IPV6,
711                 self.PERMIT,
712                 self.PORTS_RANGE,
713                 self.proto[self.ICMP][self.ICMPv6],
714             )
715         )
716         # deny ip any any in the end
717         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
718
719         # Apply rules
720         self.apply_rules(rules, "permit icmpv6")
721
722         # Traffic should still pass
723         self.run_verify_test(self.ICMP, self.IPV6, self.proto[self.ICMP][self.ICMPv6])
724
725         self.logger.info("ACLP_TEST_FINISH_0005")
726
727     def test_0006_vpp624_deny_icmpv4(self):
728         """VPP_624 deny ICMPv4"""
729         self.logger.info("ACLP_TEST_START_0006")
730         # Add an ACL
731         rules = []
732         rules.append(
733             self.create_rule(
734                 self.IPV4,
735                 self.DENY,
736                 self.PORTS_RANGE,
737                 self.proto[self.ICMP][self.ICMPv4],
738             )
739         )
740         # permit ip any any in the end
741         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
742
743         # Apply rules
744         self.apply_rules(rules, "deny icmpv4")
745
746         # Traffic should not pass
747         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
748
749         self.logger.info("ACLP_TEST_FINISH_0006")
750
751     def test_0007_vpp624_deny_icmpv6(self):
752         """VPP_624 deny ICMPv6"""
753         self.logger.info("ACLP_TEST_START_0007")
754         # Add an ACL
755         rules = []
756         rules.append(
757             self.create_rule(
758                 self.IPV6,
759                 self.DENY,
760                 self.PORTS_RANGE,
761                 self.proto[self.ICMP][self.ICMPv6],
762             )
763         )
764         # deny ip any any in the end
765         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
766
767         # Apply rules
768         self.apply_rules(rules, "deny icmpv6")
769
770         # Traffic should not pass
771         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
772
773         self.logger.info("ACLP_TEST_FINISH_0007")
774
775     def test_0008_tcp_permit_v4(self):
776         """permit TCPv4"""
777         self.logger.info("ACLP_TEST_START_0008")
778
779         # Add an ACL
780         rules = []
781         rules.append(
782             self.create_rule(
783                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
784             )
785         )
786         # deny ip any any in the end
787         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
788
789         # Apply rules
790         self.apply_rules(rules, "permit ipv4 tcp")
791
792         # Traffic should still pass
793         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
794
795         self.logger.info("ACLP_TEST_FINISH_0008")
796
797     def test_0009_tcp_permit_v6(self):
798         """permit TCPv6"""
799         self.logger.info("ACLP_TEST_START_0009")
800
801         # Add an ACL
802         rules = []
803         rules.append(
804             self.create_rule(
805                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
806             )
807         )
808         # deny ip any any in the end
809         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
810
811         # Apply rules
812         self.apply_rules(rules, "permit ip6 tcp")
813
814         # Traffic should still pass
815         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
816
817         self.logger.info("ACLP_TEST_FINISH_0008")
818
819     def test_0010_udp_permit_v4(self):
820         """permit UDPv4"""
821         self.logger.info("ACLP_TEST_START_0010")
822
823         # Add an ACL
824         rules = []
825         rules.append(
826             self.create_rule(
827                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
828             )
829         )
830         # deny ip any any in the end
831         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
832
833         # Apply rules
834         self.apply_rules(rules, "permit ipv udp")
835
836         # Traffic should still pass
837         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
838
839         self.logger.info("ACLP_TEST_FINISH_0010")
840
841     def test_0011_udp_permit_v6(self):
842         """permit UDPv6"""
843         self.logger.info("ACLP_TEST_START_0011")
844
845         # Add an ACL
846         rules = []
847         rules.append(
848             self.create_rule(
849                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
850             )
851         )
852         # deny ip any any in the end
853         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
854
855         # Apply rules
856         self.apply_rules(rules, "permit ip6 udp")
857
858         # Traffic should still pass
859         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
860
861         self.logger.info("ACLP_TEST_FINISH_0011")
862
863     def test_0012_tcp_deny(self):
864         """deny TCPv4/v6"""
865         self.logger.info("ACLP_TEST_START_0012")
866
867         # Add an ACL
868         rules = []
869         rules.append(
870             self.create_rule(
871                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
872             )
873         )
874         rules.append(
875             self.create_rule(
876                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
877             )
878         )
879         # permit ip any any in the end
880         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
881         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
882
883         # Apply rules
884         self.apply_rules(rules, "deny ip4/ip6 tcp")
885
886         # Traffic should not pass
887         self.run_verify_negat_test(
888             self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
889         )
890
891         self.logger.info("ACLP_TEST_FINISH_0012")
892
893     def test_0013_udp_deny(self):
894         """deny UDPv4/v6"""
895         self.logger.info("ACLP_TEST_START_0013")
896
897         # Add an ACL
898         rules = []
899         rules.append(
900             self.create_rule(
901                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
902             )
903         )
904         rules.append(
905             self.create_rule(
906                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
907             )
908         )
909         # permit ip any any in the end
910         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
911         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
912
913         # Apply rules
914         self.apply_rules(rules, "deny ip4/ip6 udp")
915
916         # Traffic should not pass
917         self.run_verify_negat_test(
918             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
919         )
920
921         self.logger.info("ACLP_TEST_FINISH_0013")
922
923     def test_0014_acl_dump(self):
924         """verify add/dump acls"""
925         self.logger.info("ACLP_TEST_START_0014")
926
927         r = [
928             [self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
929             [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
930             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
931             [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
932             [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
933             [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
934             [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
935             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
936             [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
937             [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
938             [self.IPV4, self.DENY, self.PORTS_ALL, 0],
939             [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
940             [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
941             [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
942             [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
943             [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
944             [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
945             [self.IPV6, self.DENY, self.PORTS_ALL, 0],
946         ]
947
948         # Add and verify new ACLs
949         rules = []
950         for i in range(len(r)):
951             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
952
953         acl = VppAcl(self, rules=rules)
954         acl.add_vpp_config()
955         result = acl.dump()
956
957         i = 0
958         for drules in result:
959             for dr in drules.r:
960                 self.assertEqual(dr.is_permit, r[i][1])
961                 self.assertEqual(dr.proto, r[i][3])
962
963                 if r[i][2] > 0:
964                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
965                 else:
966                     if r[i][2] < 0:
967                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
968                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
969                     else:
970                         if dr.proto == self.proto[self.IP][self.TCP]:
971                             self.assertGreater(
972                                 dr.srcport_or_icmptype_first, self.tcp_sport_from - 1
973                             )
974                             self.assertLess(
975                                 dr.srcport_or_icmptype_first, self.tcp_sport_to + 1
976                             )
977                             self.assertGreater(
978                                 dr.dstport_or_icmpcode_last, self.tcp_dport_from - 1
979                             )
980                             self.assertLess(
981                                 dr.dstport_or_icmpcode_last, self.tcp_dport_to + 1
982                             )
983                         elif dr.proto == self.proto[self.IP][self.UDP]:
984                             self.assertGreater(
985                                 dr.srcport_or_icmptype_first, self.udp_sport_from - 1
986                             )
987                             self.assertLess(
988                                 dr.srcport_or_icmptype_first, self.udp_sport_to + 1
989                             )
990                             self.assertGreater(
991                                 dr.dstport_or_icmpcode_last, self.udp_dport_from - 1
992                             )
993                             self.assertLess(
994                                 dr.dstport_or_icmpcode_last, self.udp_dport_to + 1
995                             )
996                 i += 1
997
998         self.logger.info("ACLP_TEST_FINISH_0014")
999
1000     def test_0015_tcp_permit_port_v4(self):
1001         """permit single TCPv4"""
1002         self.logger.info("ACLP_TEST_START_0015")
1003
1004         port = random.randint(16384, 65535)
1005         # Add an ACL
1006         rules = []
1007         rules.append(
1008             self.create_rule(
1009                 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.TCP]
1010             )
1011         )
1012         # deny ip any any in the end
1013         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1014
1015         # Apply rules
1016         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1017
1018         # Traffic should still pass
1019         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], port)
1020
1021         self.logger.info("ACLP_TEST_FINISH_0015")
1022
1023     def test_0016_udp_permit_port_v4(self):
1024         """permit single UDPv4"""
1025         self.logger.info("ACLP_TEST_START_0016")
1026
1027         port = random.randint(16384, 65535)
1028         # Add an ACL
1029         rules = []
1030         rules.append(
1031             self.create_rule(
1032                 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1033             )
1034         )
1035         # deny ip any any in the end
1036         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1037
1038         # Apply rules
1039         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1040
1041         # Traffic should still pass
1042         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP], port)
1043
1044         self.logger.info("ACLP_TEST_FINISH_0016")
1045
1046     def test_0017_tcp_permit_port_v6(self):
1047         """permit single TCPv6"""
1048         self.logger.info("ACLP_TEST_START_0017")
1049
1050         port = random.randint(16384, 65535)
1051         # Add an ACL
1052         rules = []
1053         rules.append(
1054             self.create_rule(
1055                 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.TCP]
1056             )
1057         )
1058         # deny ip any any in the end
1059         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1060
1061         # Apply rules
1062         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1063
1064         # Traffic should still pass
1065         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP], port)
1066
1067         self.logger.info("ACLP_TEST_FINISH_0017")
1068
1069     def test_0018_udp_permit_port_v6(self):
1070         """permit single UDPv6"""
1071         self.logger.info("ACLP_TEST_START_0018")
1072
1073         port = random.randint(16384, 65535)
1074         # Add an ACL
1075         rules = []
1076         rules.append(
1077             self.create_rule(
1078                 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1079             )
1080         )
1081         # deny ip any any in the end
1082         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1083
1084         # Apply rules
1085         self.apply_rules(rules, "permit ip4 tcp %d" % port)
1086
1087         # Traffic should still pass
1088         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP], port)
1089
1090         self.logger.info("ACLP_TEST_FINISH_0018")
1091
1092     def test_0019_udp_deny_port(self):
1093         """deny single TCPv4/v6"""
1094         self.logger.info("ACLP_TEST_START_0019")
1095
1096         port = random.randint(16384, 65535)
1097         # Add an ACL
1098         rules = []
1099         rules.append(
1100             self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.TCP])
1101         )
1102         rules.append(
1103             self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.TCP])
1104         )
1105         # Permit ip any any in the end
1106         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1107         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1108
1109         # Apply rules
1110         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1111
1112         # Traffic should not pass
1113         self.run_verify_negat_test(
1114             self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP], port
1115         )
1116
1117         self.logger.info("ACLP_TEST_FINISH_0019")
1118
1119     def test_0020_udp_deny_port(self):
1120         """deny single UDPv4/v6"""
1121         self.logger.info("ACLP_TEST_START_0020")
1122
1123         port = random.randint(16384, 65535)
1124         # Add an ACL
1125         rules = []
1126         rules.append(
1127             self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1128         )
1129         rules.append(
1130             self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1131         )
1132         # Permit ip any any in the end
1133         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1134         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1135
1136         # Apply rules
1137         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1138
1139         # Traffic should not pass
1140         self.run_verify_negat_test(
1141             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port
1142         )
1143
1144         self.logger.info("ACLP_TEST_FINISH_0020")
1145
1146     def test_0021_udp_deny_port_verify_fragment_deny(self):
1147         """deny single UDPv4/v6, permit ip any, verify non-initial fragment
1148         blocked
1149         """
1150         self.logger.info("ACLP_TEST_START_0021")
1151
1152         port = random.randint(16384, 65535)
1153         # Add an ACL
1154         rules = []
1155         rules.append(
1156             self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
1157         )
1158         rules.append(
1159             self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
1160         )
1161         # deny ip any any in the end
1162         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1163         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1164
1165         # Apply rules
1166         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1167
1168         # Traffic should not pass
1169         self.run_verify_negat_test(
1170             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port, True
1171         )
1172
1173         self.logger.info("ACLP_TEST_FINISH_0021")
1174
1175     def test_0022_zero_length_udp_ipv4(self):
1176         """VPP-687 zero length udp ipv4 packet"""
1177         self.logger.info("ACLP_TEST_START_0022")
1178
1179         port = random.randint(16384, 65535)
1180         # Add an ACL
1181         rules = []
1182         rules.append(
1183             self.create_rule(
1184                 self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
1185             )
1186         )
1187         # deny ip any any in the end
1188         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1189
1190         # Apply rules
1191         self.apply_rules(rules, "permit empty udp ip4 %d" % port)
1192
1193         # Traffic should still pass
1194         # Create incoming packet streams for packet-generator interfaces
1195         pkts_cnt = 0
1196         pkts = self.create_stream(
1197             self.pg0,
1198             self.pg_if_packet_sizes,
1199             self.IP,
1200             self.IPV4,
1201             self.proto[self.IP][self.UDP],
1202             port,
1203             False,
1204             False,
1205         )
1206         if len(pkts) > 0:
1207             self.pg0.add_stream(pkts)
1208             pkts_cnt += len(pkts)
1209
1210         # Enable packet capture and start packet sendingself.IPV
1211         self.pg_enable_capture(self.pg_interfaces)
1212         self.pg_start()
1213
1214         self.pg1.get_capture(pkts_cnt)
1215
1216         self.logger.info("ACLP_TEST_FINISH_0022")
1217
1218     def test_0023_zero_length_udp_ipv6(self):
1219         """VPP-687 zero length udp ipv6 packet"""
1220         self.logger.info("ACLP_TEST_START_0023")
1221
1222         port = random.randint(16384, 65535)
1223         # Add an ACL
1224         rules = []
1225         rules.append(
1226             self.create_rule(
1227                 self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
1228             )
1229         )
1230         # deny ip any any in the end
1231         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1232
1233         # Apply rules
1234         self.apply_rules(rules, "permit empty udp ip6 %d" % port)
1235
1236         # Traffic should still pass
1237         # Create incoming packet streams for packet-generator interfaces
1238         pkts_cnt = 0
1239         pkts = self.create_stream(
1240             self.pg0,
1241             self.pg_if_packet_sizes,
1242             self.IP,
1243             self.IPV6,
1244             self.proto[self.IP][self.UDP],
1245             port,
1246             False,
1247             False,
1248         )
1249         if len(pkts) > 0:
1250             self.pg0.add_stream(pkts)
1251             pkts_cnt += len(pkts)
1252
1253         # Enable packet capture and start packet sendingself.IPV
1254         self.pg_enable_capture(self.pg_interfaces)
1255         self.pg_start()
1256
1257         # Verify outgoing packet streams per packet-generator interface
1258         self.pg1.get_capture(pkts_cnt)
1259
1260         self.logger.info("ACLP_TEST_FINISH_0023")
1261
1262     def test_0108_tcp_permit_v4(self):
1263         """permit TCPv4 + non-match range"""
1264         self.logger.info("ACLP_TEST_START_0108")
1265
1266         # Add an ACL
1267         rules = []
1268         rules.append(
1269             self.create_rule(
1270                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1271             )
1272         )
1273         rules.append(
1274             self.create_rule(
1275                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1276             )
1277         )
1278         # deny ip any any in the end
1279         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1280
1281         # Apply rules
1282         self.apply_rules(rules, "permit ipv4 tcp")
1283
1284         # Traffic should still pass
1285         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1286
1287         self.logger.info("ACLP_TEST_FINISH_0108")
1288
1289     def test_0109_tcp_permit_v6(self):
1290         """permit TCPv6 + non-match range"""
1291         self.logger.info("ACLP_TEST_START_0109")
1292
1293         # Add an ACL
1294         rules = []
1295         rules.append(
1296             self.create_rule(
1297                 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1298             )
1299         )
1300         rules.append(
1301             self.create_rule(
1302                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1303             )
1304         )
1305         # deny ip any any in the end
1306         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1307
1308         # Apply rules
1309         self.apply_rules(rules, "permit ip6 tcp")
1310
1311         # Traffic should still pass
1312         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1313
1314         self.logger.info("ACLP_TEST_FINISH_0109")
1315
1316     def test_0110_udp_permit_v4(self):
1317         """permit UDPv4 + non-match range"""
1318         self.logger.info("ACLP_TEST_START_0110")
1319
1320         # Add an ACL
1321         rules = []
1322         rules.append(
1323             self.create_rule(
1324                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
1325             )
1326         )
1327         rules.append(
1328             self.create_rule(
1329                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1330             )
1331         )
1332         # deny ip any any in the end
1333         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1334
1335         # Apply rules
1336         self.apply_rules(rules, "permit ipv4 udp")
1337
1338         # Traffic should still pass
1339         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1340
1341         self.logger.info("ACLP_TEST_FINISH_0110")
1342
1343     def test_0111_udp_permit_v6(self):
1344         """permit UDPv6 + non-match range"""
1345         self.logger.info("ACLP_TEST_START_0111")
1346
1347         # Add an ACL
1348         rules = []
1349         rules.append(
1350             self.create_rule(
1351                 self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
1352             )
1353         )
1354         rules.append(
1355             self.create_rule(
1356                 self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1357             )
1358         )
1359         # deny ip any any in the end
1360         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1361
1362         # Apply rules
1363         self.apply_rules(rules, "permit ip6 udp")
1364
1365         # Traffic should still pass
1366         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1367
1368         self.logger.info("ACLP_TEST_FINISH_0111")
1369
1370     def test_0112_tcp_deny(self):
1371         """deny TCPv4/v6 + non-match range"""
1372         self.logger.info("ACLP_TEST_START_0112")
1373
1374         # Add an ACL
1375         rules = []
1376         rules.append(
1377             self.create_rule(
1378                 self.IPV4,
1379                 self.PERMIT,
1380                 self.PORTS_RANGE_2,
1381                 self.proto[self.IP][self.TCP],
1382             )
1383         )
1384         rules.append(
1385             self.create_rule(
1386                 self.IPV6,
1387                 self.PERMIT,
1388                 self.PORTS_RANGE_2,
1389                 self.proto[self.IP][self.TCP],
1390             )
1391         )
1392         rules.append(
1393             self.create_rule(
1394                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1395             )
1396         )
1397         rules.append(
1398             self.create_rule(
1399                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1400             )
1401         )
1402         # permit ip any any in the end
1403         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1404         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1405
1406         # Apply rules
1407         self.apply_rules(rules, "deny ip4/ip6 tcp")
1408
1409         # Traffic should not pass
1410         self.run_verify_negat_test(
1411             self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
1412         )
1413
1414         self.logger.info("ACLP_TEST_FINISH_0112")
1415
1416     def test_0113_udp_deny(self):
1417         """deny UDPv4/v6 + non-match range"""
1418         self.logger.info("ACLP_TEST_START_0113")
1419
1420         # Add an ACL
1421         rules = []
1422         rules.append(
1423             self.create_rule(
1424                 self.IPV4,
1425                 self.PERMIT,
1426                 self.PORTS_RANGE_2,
1427                 self.proto[self.IP][self.UDP],
1428             )
1429         )
1430         rules.append(
1431             self.create_rule(
1432                 self.IPV6,
1433                 self.PERMIT,
1434                 self.PORTS_RANGE_2,
1435                 self.proto[self.IP][self.UDP],
1436             )
1437         )
1438         rules.append(
1439             self.create_rule(
1440                 self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1441             )
1442         )
1443         rules.append(
1444             self.create_rule(
1445                 self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
1446             )
1447         )
1448         # permit ip any any in the end
1449         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
1450         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
1451
1452         # Apply rules
1453         self.apply_rules(rules, "deny ip4/ip6 udp")
1454
1455         # Traffic should not pass
1456         self.run_verify_negat_test(
1457             self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
1458         )
1459
1460         self.logger.info("ACLP_TEST_FINISH_0113")
1461
1462     def test_0300_tcp_permit_v4_etype_aaaa(self):
1463         """permit TCPv4, send 0xAAAA etype"""
1464         self.logger.info("ACLP_TEST_START_0300")
1465
1466         # Add an ACL
1467         rules = []
1468         rules.append(
1469             self.create_rule(
1470                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1471             )
1472         )
1473         rules.append(
1474             self.create_rule(
1475                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1476             )
1477         )
1478         # deny ip any any in the end
1479         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1480
1481         # Apply rules
1482         self.apply_rules(rules, "permit ipv4 tcp")
1483
1484         # Traffic should still pass also for an odd ethertype
1485         self.run_verify_test(
1486             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1487         )
1488         self.logger.info("ACLP_TEST_FINISH_0300")
1489
1490     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1491         """permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked"""
1492         self.logger.info("ACLP_TEST_START_0305")
1493
1494         # Add an ACL
1495         rules = []
1496         rules.append(
1497             self.create_rule(
1498                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1499             )
1500         )
1501         rules.append(
1502             self.create_rule(
1503                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1504             )
1505         )
1506         # deny ip any any in the end
1507         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1508
1509         # Apply rules
1510         self.apply_rules(rules, "permit ipv4 tcp")
1511         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1512         self.etype_whitelist([0xBBB], 1)
1513
1514         # The oddball ethertype should be blocked
1515         self.run_verify_negat_test(
1516             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, 0xAAAA
1517         )
1518
1519         # remove the whitelist
1520         self.etype_whitelist([], 0, add=False)
1521
1522         self.logger.info("ACLP_TEST_FINISH_0305")
1523
1524     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1525         """permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass"""
1526         self.logger.info("ACLP_TEST_START_0306")
1527
1528         # Add an ACL
1529         rules = []
1530         rules.append(
1531             self.create_rule(
1532                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1533             )
1534         )
1535         rules.append(
1536             self.create_rule(
1537                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1538             )
1539         )
1540         # deny ip any any in the end
1541         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1542
1543         # Apply rules
1544         self.apply_rules(rules, "permit ipv4 tcp")
1545         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1546         self.etype_whitelist([0xBBB], 1)
1547
1548         # The whitelisted traffic, should pass
1549         self.run_verify_test(
1550             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0x0BBB
1551         )
1552
1553         # remove the whitelist, the previously blocked 0xAAAA should pass now
1554         self.etype_whitelist([], 0, add=False)
1555
1556         self.logger.info("ACLP_TEST_FINISH_0306")
1557
1558     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1559         """permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass"""
1560         self.logger.info("ACLP_TEST_START_0307")
1561
1562         # Add an ACL
1563         rules = []
1564         rules.append(
1565             self.create_rule(
1566                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1567             )
1568         )
1569         rules.append(
1570             self.create_rule(
1571                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1572             )
1573         )
1574         # deny ip any any in the end
1575         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1576
1577         # Apply rules
1578         self.apply_rules(rules, "permit ipv4 tcp")
1579
1580         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1581         self.etype_whitelist([0xBBB], 1)
1582         # remove the whitelist, the previously blocked 0xAAAA should pass now
1583         self.etype_whitelist([], 0, add=False)
1584
1585         # The whitelisted traffic, should pass
1586         self.run_verify_test(
1587             self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
1588         )
1589
1590         self.logger.info("ACLP_TEST_FINISH_0306")
1591
1592     def test_0315_del_intf(self):
1593         """apply an acl and delete the interface"""
1594         self.logger.info("ACLP_TEST_START_0315")
1595
1596         # Add an ACL
1597         rules = []
1598         rules.append(
1599             self.create_rule(
1600                 self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
1601             )
1602         )
1603         rules.append(
1604             self.create_rule(
1605                 self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
1606             )
1607         )
1608         # deny ip any any in the end
1609         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1610
1611         # create an interface
1612         intf = []
1613         intf.append(VppLoInterface(self))
1614
1615         # Apply rules
1616         self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1617
1618         # Remove the interface
1619         intf[0].remove_vpp_config()
1620
1621         self.logger.info("ACLP_TEST_FINISH_0315")
1622
1623
1624 if __name__ == "__main__":
1625     unittest.main(testRunner=VppTestRunner)