acl: API cleanup
[vpp.git] / src / plugins / acl / test / test_acl_plugin.py
1 #!/usr/bin/env python3
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15 from ipaddress import IPv4Network, IPv6Network
16
17 from vpp_lo_interface import VppLoInterface
18 from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
19 from vpp_ip import INVALID_INDEX
20
21
22 class TestACLplugin(VppTestCase):
23     """ ACL plugin Test Case """
24
    # traffic types
    # Used as the first index into `proto` and as the `traffic_type`
    # argument of create_stream() / verify_capture().
    IP = 0
    ICMP = 1

    # IP version
    # create_stream() stores 0/1 in the packet info; any other value
    # (e.g. IPRANDOM) makes it pick the version at random per packet.
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types
    # Passed as `permit_deny` to create_rule().
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[traffic type][protocol index] -> IP protocol number:
    #   proto[IP]   = [TCP (6), UDP (17)]
    #   proto[ICMP] = [ICMP (1), ICMPv6 (58)]
    proto = [[6, 17], [1, 58]]
    # IP protocol number -> name of the scapy layer carrying that protocol
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    # Second-level indexes into `proto`
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    PROTO_ALL = 0

    # port ranges
    # PORTS_* are the values the tests pass as the `ports` argument of
    # create_rule(); the *_from/*_to pairs below bound the random ports
    # chosen by create_upper_layer().
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # Second set of port ranges (not referenced by the helpers visible in
    # this part of the file; presumably paired with PORTS_RANGE_2 - confirm)
    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    # ICMP type/code put into generated packets by create_stream() and
    # expected back by verify_capture()
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # Second set of ICMP type / code ranges (not referenced by the helpers
    # visible in this part of the file)
    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    # Id of the bridge domain that setUpClass() creates for the pg interfaces
    bd_id = 1
83
84     @classmethod
85     def setUpClass(cls):
86         """
87         Perform standard class setup (defined by class method setUpClass in
88         class VppTestCase) before running the test case, set test case related
89         variables and configure VPP.
90         """
91         super(TestACLplugin, cls).setUpClass()
92
93         try:
94             # Create 2 pg interfaces
95             cls.create_pg_interfaces(range(2))
96
97             # Packet flows mapping pg0 -> pg1, pg2 etc.
98             cls.flows = dict()
99             cls.flows[cls.pg0] = [cls.pg1]
100
101             # Packet sizes
102             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
103
104             # Create BD with MAC learning and unknown unicast flooding disabled
105             # and put interfaces to this BD
106             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
107                                            learn=1)
108             for pg_if in cls.pg_interfaces:
109                 cls.vapi.sw_interface_set_l2_bridge(
110                     rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
111
112             # Set up all interfaces
113             for i in cls.pg_interfaces:
114                 i.admin_up()
115
116             # Mapping between packet-generator index and lists of test hosts
117             cls.hosts_by_pg_idx = dict()
118             for pg_if in cls.pg_interfaces:
119                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
120
121             # Create list of deleted hosts
122             cls.deleted_hosts_by_pg_idx = dict()
123             for pg_if in cls.pg_interfaces:
124                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
125
126             # warm-up the mac address tables
127             # self.warmup_test()
128             count = 16
129             start = 0
130             n_int = len(cls.pg_interfaces)
131             macs_per_if = count // n_int
132             i = -1
133             for pg_if in cls.pg_interfaces:
134                 i += 1
135                 start_nr = macs_per_if * i + start
136                 end_nr = count + start if i == (n_int - 1) \
137                     else macs_per_if * (i + 1) + start
138                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
139                 for j in range(int(start_nr), int(end_nr)):
140                     host = Host(
141                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
142                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
143                         "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
144                     hosts.append(host)
145
146         except Exception:
147             super(TestACLplugin, cls).tearDownClass()
148             raise
149
150     @classmethod
151     def tearDownClass(cls):
152         super(TestACLplugin, cls).tearDownClass()
153
154     def setUp(self):
155         super(TestACLplugin, self).setUp()
156         self.reset_packet_infos()
157
158     def tearDown(self):
159         """
160         Show various debug prints after each test.
161         """
162         super(TestACLplugin, self).tearDown()
163
164     def show_commands_at_teardown(self):
165         cli = "show vlib graph l2-input-feat-arc"
166         self.logger.info(self.vapi.ppcli(cli))
167         cli = "show vlib graph l2-input-feat-arc-end"
168         self.logger.info(self.vapi.ppcli(cli))
169         cli = "show vlib graph l2-output-feat-arc"
170         self.logger.info(self.vapi.ppcli(cli))
171         cli = "show vlib graph l2-output-feat-arc-end"
172         self.logger.info(self.vapi.ppcli(cli))
173         self.logger.info(self.vapi.ppcli("show l2fib verbose"))
174         self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
175         self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
176         self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
177         self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
178                                          % self.bd_id))
179
180     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
181                     s_prefix=0, s_ip=0,
182                     d_prefix=0, d_ip=0):
183         if ip:
184             src_prefix = IPv6Network((s_ip, s_prefix))
185             dst_prefix = IPv6Network((d_ip, d_prefix))
186         else:
187             src_prefix = IPv4Network((s_ip, s_prefix))
188             dst_prefix = IPv4Network((d_ip, d_prefix))
189         return AclRule(is_permit=permit_deny, ports=ports, proto=proto,
190                        src_prefix=src_prefix, dst_prefix=dst_prefix)
191
192     def apply_rules(self, rules, tag=None):
193         acl = VppAcl(self, rules, tag=tag)
194         acl.add_vpp_config()
195         self.logger.info("Dumped ACL: " + str(acl.dump()))
196         # Apply a ACL on the interface as inbound
197         for i in self.pg_interfaces:
198             acl_if = VppAclInterface(
199                 self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl])
200             acl_if.add_vpp_config()
201         return acl.acl_index
202
203     def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
204         acl = VppAcl(self, rules, tag=tag)
205         acl.add_vpp_config()
206         self.logger.info("Dumped ACL: " + str(acl.dump()))
207         # Apply a ACL on the interface as inbound
208         acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1,
209                                  acls=[acl])
210         return acl.acl_index
211
212     def etype_whitelist(self, whitelist, n_input, add=True):
213         # Apply whitelists on all the interfaces
214         if add:
215             self._wl = []
216             for i in self.pg_interfaces:
217                 self._wl.append(VppEtypeWhitelist(
218                     self, sw_if_index=i.sw_if_index, whitelist=whitelist,
219                     n_input=n_input).add_vpp_config())
220         else:
221             if hasattr(self, "_wl"):
222                 for wl in self._wl:
223                     wl.remove_vpp_config()
224
225     def create_upper_layer(self, packet_index, proto, ports=0):
226         p = self.proto_map[proto]
227         if p == 'UDP':
228             if ports == 0:
229                 return UDP(sport=random.randint(self.udp_sport_from,
230                                                 self.udp_sport_to),
231                            dport=random.randint(self.udp_dport_from,
232                                                 self.udp_dport_to))
233             else:
234                 return UDP(sport=ports, dport=ports)
235         elif p == 'TCP':
236             if ports == 0:
237                 return TCP(sport=random.randint(self.tcp_sport_from,
238                                                 self.tcp_sport_to),
239                            dport=random.randint(self.tcp_dport_from,
240                                                 self.tcp_dport_to))
241             else:
242                 return TCP(sport=ports, dport=ports)
243         return ''
244
245     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
246                       proto=-1, ports=0, fragments=False,
247                       pkt_raw=True, etype=-1):
248         """
249         Create input packet stream for defined interface using hosts or
250         deleted_hosts list.
251
252         :param object src_if: Interface to create packet stream for.
253         :param list packet_sizes: List of required packet sizes.
254         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
255         :return: Stream of packets.
256         """
257         pkts = []
258         if self.flows.__contains__(src_if):
259             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
260             for dst_if in self.flows[src_if]:
261                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
262                 n_int = len(dst_hosts) * len(src_hosts)
263                 for i in range(0, n_int):
264                     dst_host = dst_hosts[int(i / len(src_hosts))]
265                     src_host = src_hosts[i % len(src_hosts)]
266                     pkt_info = self.create_packet_info(src_if, dst_if)
267                     if ipv6 == 1:
268                         pkt_info.ip = 1
269                     elif ipv6 == 0:
270                         pkt_info.ip = 0
271                     else:
272                         pkt_info.ip = random.choice([0, 1])
273                     if proto == -1:
274                         pkt_info.proto = random.choice(self.proto[self.IP])
275                     else:
276                         pkt_info.proto = proto
277                     payload = self.info_to_payload(pkt_info)
278                     p = Ether(dst=dst_host.mac, src=src_host.mac)
279                     if etype > 0:
280                         p = Ether(dst=dst_host.mac,
281                                   src=src_host.mac,
282                                   type=etype)
283                     if pkt_info.ip:
284                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
285                         if fragments:
286                             p /= IPv6ExtHdrFragment(offset=64, m=1)
287                     else:
288                         if fragments:
289                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
290                                     flags=1, frag=64)
291                         else:
292                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
293                     if traffic_type == self.ICMP:
294                         if pkt_info.ip:
295                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
296                                                    code=self.icmp6_code)
297                         else:
298                             p /= ICMP(type=self.icmp4_type,
299                                       code=self.icmp4_code)
300                     else:
301                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
302                     if pkt_raw:
303                         p /= Raw(payload)
304                         pkt_info.data = p.copy()
305                     if pkt_raw:
306                         size = random.choice(packet_sizes)
307                         self.extend_packet(p, size)
308                     pkts.append(p)
309         return pkts
310
    def verify_capture(self, pg_if, capture,
                       traffic_type=0, ip_type=0, etype=-1):
        """
        Verify captured input packet stream for defined interface.

        Each captured packet is mapped back to the packet info stored in
        its payload and compared with the packet that was sent.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: self.ICMP selects the ICMP type/code checks;
            any other value selects the IP address and TCP/UDP port checks.
        :param ip_type: Expected IP version flag (self.IPV4 / self.IPV6).
            Together with traffic_type == self.ICMP, self.IPV6 makes the
            payload be read from the ICMPv6EchoRequest 'data' field.
        :param etype: When > 0, the ethertype the captured packets are
            expected to carry.
        """
        # Last packet info seen per source interface index, used to walk
        # the expected packets in order
        last_info = dict()
        for i in self.pg_interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            if etype > 0:
                # A packet with the expected ethertype is accepted without
                # any further checks. NOTE(review): a mismatch is only
                # logged and the packet then goes through the regular
                # checks below - confirm that this is intended.
                if packet[Ether].type != etype:
                    self.logger.error(ppp("Unexpected ethertype in packet:",
                                          packet))
                else:
                    continue
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(
                        packet[ICMPv6EchoRequest], 'data')
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(packet[Raw])
                    # Layer lookup by the name registered in proto_map
                    payload = packet[self.proto_map[payload_info.proto]]
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

            # Skipped when ip_type is 0 (the value of self.IPV4)
            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)
            if traffic_type == self.ICMP:
                # ICMP: only the type and code are compared
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet "
                                          "(outside network):", packet))
                    raise
            else:
                # TCP/UDP: compare against the packet saved at send time
                try:
                    ip_version = IPv6 if payload_info.ip == 1 else IP

                    ip = packet[ip_version]
                    packet_index = payload_info.index

                    self.assertEqual(payload_info.dst, dst_sw_if_index)
                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                      (pg_if.name, payload_info.src,
                                       packet_index))
                    next_info = self.get_next_packet_info_for_interface2(
                        payload_info.src, dst_sw_if_index,
                        last_info[payload_info.src])
                    last_info[payload_info.src] = next_info
                    self.assertTrue(next_info is not None)
                    self.assertEqual(packet_index, next_info.index)
                    saved_packet = next_info.data
                    # Check standard fields
                    self.assertEqual(ip.src, saved_packet[ip_version].src)
                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                    p = self.proto_map[payload_info.proto]
                    if p == 'TCP':
                        tcp = packet[TCP]
                        self.assertEqual(tcp.sport, saved_packet[
                            TCP].sport)
                        self.assertEqual(tcp.dport, saved_packet[
                            TCP].dport)
                    elif p == 'UDP':
                        udp = packet[UDP]
                        self.assertEqual(udp.sport, saved_packet[
                            UDP].sport)
                        self.assertEqual(udp.dport, saved_packet[
                            UDP].dport)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet:",
                                          packet))
                    raise
        # Every expected packet must have been consumed by now.
        # NOTE(review): the interface object `i` is passed as the first
        # argument here, while the call inside the loop above passes
        # payload_info.src - confirm which one the helper expects.
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))
404
405     def run_traffic_no_check(self):
406         # Test
407         # Create incoming packet streams for packet-generator interfaces
408         for i in self.pg_interfaces:
409             if self.flows.__contains__(i):
410                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
411                 if len(pkts) > 0:
412                     i.add_stream(pkts)
413
414         # Enable packet capture and start packet sending
415         self.pg_enable_capture(self.pg_interfaces)
416         self.pg_start()
417
418     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
419                         frags=False, pkt_raw=True, etype=-1):
420         # Test
421         # Create incoming packet streams for packet-generator interfaces
422         pkts_cnt = 0
423         for i in self.pg_interfaces:
424             if self.flows.__contains__(i):
425                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
426                                           traffic_type, ip_type, proto, ports,
427                                           frags, pkt_raw, etype)
428                 if len(pkts) > 0:
429                     i.add_stream(pkts)
430                     pkts_cnt += len(pkts)
431
432         # Enable packet capture and start packet sendingself.IPV
433         self.pg_enable_capture(self.pg_interfaces)
434         self.pg_start()
435         self.logger.info("sent packets count: %d" % pkts_cnt)
436
437         # Verify
438         # Verify outgoing packet streams per packet-generator interface
439         for src_if in self.pg_interfaces:
440             if self.flows.__contains__(src_if):
441                 for dst_if in self.flows[src_if]:
442                     capture = dst_if.get_capture(pkts_cnt)
443                     self.logger.info("Verifying capture on interface %s" %
444                                      dst_if.name)
445                     self.verify_capture(dst_if, capture,
446                                         traffic_type, ip_type, etype)
447
448     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
449                               ports=0, frags=False, etype=-1):
450         # Test
451         pkts_cnt = 0
452         self.reset_packet_infos()
453         for i in self.pg_interfaces:
454             if self.flows.__contains__(i):
455                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
456                                           traffic_type, ip_type, proto, ports,
457                                           frags, True, etype)
458                 if len(pkts) > 0:
459                     i.add_stream(pkts)
460                     pkts_cnt += len(pkts)
461
462         # Enable packet capture and start packet sending
463         self.pg_enable_capture(self.pg_interfaces)
464         self.pg_start()
465         self.logger.info("sent packets count: %d" % pkts_cnt)
466
467         # Verify
468         # Verify outgoing packet streams per packet-generator interface
469         for src_if in self.pg_interfaces:
470             if self.flows.__contains__(src_if):
471                 for dst_if in self.flows[src_if]:
472                     self.logger.info("Verifying capture on interface %s" %
473                                      dst_if.name)
474                     capture = dst_if.get_capture(0)
475                     self.assertEqual(len(capture), 0)
476
477     def test_0000_warmup_test(self):
478         """ ACL plugin version check; learn MACs
479         """
480         reply = self.vapi.papi.acl_plugin_get_version()
481         self.assertEqual(reply.major, 1)
482         self.logger.info("Working with ACL plugin version: %d.%d" % (
483             reply.major, reply.minor))
484         # minor version changes are non breaking
485         # self.assertEqual(reply.minor, 0)
486
    def test_0001_acl_create(self):
        """ ACL create/delete test

        Creates two ACLs, checks the indexes they are given and the dump
        of the first one, then checks that an ACL cannot be deleted while
        it is applied to an interface and that operations on a
        nonexistent ACL are rejected.
        """

        self.logger.info("ACLP_TEST_START_0001")
        # Create a permit-1234 ACL
        r = [AclRule(is_permit=1, proto=17, ports=1234)]
        r[0].sport_to = 1235
        # Test 1: add a new ACL
        first_acl = VppAcl(self, rules=r, tag="permit 1234")
        first_acl.add_vpp_config()
        self.assertTrue(first_acl.query_vpp_config())
        # The very first ACL gets #0
        self.assertEqual(first_acl.acl_index, 0)
        rr = first_acl.dump()
        self.logger.info("Dumped ACL: " + str(rr))
        self.assertEqual(len(rr), 1)
        # We should have the same number of ACL entries as we had asked
        self.assertEqual(len(rr[0].r), len(r))
        # The rules should be the same. But because the submitted and returned
        # are different types, we need to iterate over rules and keys to get
        # to basic values.
        # NOTE(review): `r` holds a single rule, so range(0, len(r) - 1) is
        # range(0, 0) and this loop body never runs - confirm whether the
        # bound was meant to be len(r).
        for i_rule in range(0, len(r) - 1):
            encoded_rule = r[i_rule].encode()
            for rule_key in encoded_rule:
                self.assertEqual(rr[0].r[i_rule][rule_key],
                                 encoded_rule[rule_key])

        # Create a deny-1234 ACL
        r_deny = [AclRule(is_permit=0, proto=17, ports=1234),
                  AclRule(is_permit=1, proto=17, ports=0)]
        r_deny[0].sport_to = 1235
        second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
        second_acl.add_vpp_config()
        self.assertTrue(second_acl.query_vpp_config())
        # The second ACL gets #1
        self.assertEqual(second_acl.acl_index, 1)

        # Test 2: try to modify a nonexistent ACL
        invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
        # The reply is not inspected; expect_error=True carries the check
        reply = invalid_acl.add_vpp_config(expect_error=True)

        # apply an ACL on an interface inbound, try to delete ACL, must fail
        acl_if_list = VppAclInterface(
            self, sw_if_index=self.pg0.sw_if_index, n_input=1,
            acls=[first_acl])
        acl_if_list.add_vpp_config()
        first_acl.remove_vpp_config(expect_error=True)
        # Unapply an ACL and then try to delete it - must be ok
        acl_if_list.remove_vpp_config()
        first_acl.remove_vpp_config()

        # apply an ACL on an interface, try to delete ACL, must fail
        # NOTE(review): n_input=0 here, so presumably this applies the ACL
        # outbound rather than inbound - confirm against the ACL plugin API
        acl_if_list = VppAclInterface(
            self, sw_if_index=self.pg0.sw_if_index, n_input=0,
            acls=[second_acl])
        acl_if_list.add_vpp_config()
        second_acl.remove_vpp_config(expect_error=True)
        # Unapply an ACL and then try to delete it - must be ok
        acl_if_list.remove_vpp_config()
        second_acl.remove_vpp_config()

        # try to apply a nonexistent ACL - must fail
        acl_if_list = VppAclInterface(
            self, sw_if_index=self.pg0.sw_if_index, n_input=0,
            acls=[invalid_acl])
        acl_if_list.add_vpp_config(expect_error=True)

        self.logger.info("ACLP_TEST_FINISH_0001")
556
557     def test_0002_acl_permit_apply(self):
558         """ permit ACL apply test
559         """
560         self.logger.info("ACLP_TEST_START_0002")
561
562         rules = []
563         rules.append(self.create_rule(self.IPV4, self.PERMIT,
564                                       0, self.proto[self.IP][self.UDP]))
565         rules.append(self.create_rule(self.IPV4, self.PERMIT,
566                                       0, self.proto[self.IP][self.TCP]))
567
568         # Apply rules
569         acl_idx = self.apply_rules(rules, "permit per-flow")
570
571         # enable counters
572         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
573
574         # Traffic should still pass
575         self.run_verify_test(self.IP, self.IPV4, -1)
576
577         matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
578         self.logger.info("stat segment counters: %s" % repr(matches))
579         cli = "show acl-plugin acl"
580         self.logger.info(self.vapi.ppcli(cli))
581         cli = "show acl-plugin tables"
582         self.logger.info(self.vapi.ppcli(cli))
583
584         total_hits = matches[0][0]['packets'] + matches[0][1]['packets']
585         self.assertEqual(total_hits, 64)
586
587         # disable counters
588         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
589
590         self.logger.info("ACLP_TEST_FINISH_0002")
591
592     def test_0003_acl_deny_apply(self):
593         """ deny ACL apply test
594         """
595         self.logger.info("ACLP_TEST_START_0003")
596         # Add a deny-flows ACL
597         rules = []
598         rules.append(self.create_rule(
599             self.IPV4, self.DENY, self.PORTS_ALL,
600             self.proto[self.IP][self.UDP]))
601         # Permit ip any any in the end
602         rules.append(self.create_rule(self.IPV4, self.PERMIT,
603                                       self.PORTS_ALL, 0))
604
605         # Apply rules
606         acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
607
608         # enable counters
609         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
610
611         # Traffic should not pass
612         self.run_verify_negat_test(self.IP, self.IPV4,
613                                    self.proto[self.IP][self.UDP])
614
615         matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
616         self.logger.info("stat segment counters: %s" % repr(matches))
617         cli = "show acl-plugin acl"
618         self.logger.info(self.vapi.ppcli(cli))
619         cli = "show acl-plugin tables"
620         self.logger.info(self.vapi.ppcli(cli))
621         self.assertEqual(matches[0][0]['packets'], 64)
622         # disable counters
623         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
624         self.logger.info("ACLP_TEST_FINISH_0003")
625         # self.assertEqual(, 0)
626
627     def test_0004_vpp624_permit_icmpv4(self):
628         """ VPP_624 permit ICMPv4
629         """
630         self.logger.info("ACLP_TEST_START_0004")
631
632         # Add an ACL
633         rules = []
634         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
635                                       self.proto[self.ICMP][self.ICMPv4]))
636         # deny ip any any in the end
637         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
638
639         # Apply rules
640         self.apply_rules(rules, "permit icmpv4")
641
642         # Traffic should still pass
643         self.run_verify_test(self.ICMP, self.IPV4,
644                              self.proto[self.ICMP][self.ICMPv4])
645
646         self.logger.info("ACLP_TEST_FINISH_0004")
647
648     def test_0005_vpp624_permit_icmpv6(self):
649         """ VPP_624 permit ICMPv6
650         """
651         self.logger.info("ACLP_TEST_START_0005")
652
653         # Add an ACL
654         rules = []
655         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
656                                       self.proto[self.ICMP][self.ICMPv6]))
657         # deny ip any any in the end
658         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
659
660         # Apply rules
661         self.apply_rules(rules, "permit icmpv6")
662
663         # Traffic should still pass
664         self.run_verify_test(self.ICMP, self.IPV6,
665                              self.proto[self.ICMP][self.ICMPv6])
666
667         self.logger.info("ACLP_TEST_FINISH_0005")
668
669     def test_0006_vpp624_deny_icmpv4(self):
670         """ VPP_624 deny ICMPv4
671         """
672         self.logger.info("ACLP_TEST_START_0006")
673         # Add an ACL
674         rules = []
675         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
676                                       self.proto[self.ICMP][self.ICMPv4]))
677         # permit ip any any in the end
678         rules.append(self.create_rule(self.IPV4, self.PERMIT,
679                                       self.PORTS_ALL, 0))
680
681         # Apply rules
682         self.apply_rules(rules, "deny icmpv4")
683
684         # Traffic should not pass
685         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
686
687         self.logger.info("ACLP_TEST_FINISH_0006")
688
689     def test_0007_vpp624_deny_icmpv6(self):
690         """ VPP_624 deny ICMPv6
691         """
692         self.logger.info("ACLP_TEST_START_0007")
693         # Add an ACL
694         rules = []
695         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
696                                       self.proto[self.ICMP][self.ICMPv6]))
697         # deny ip any any in the end
698         rules.append(self.create_rule(self.IPV6, self.PERMIT,
699                                       self.PORTS_ALL, 0))
700
701         # Apply rules
702         self.apply_rules(rules, "deny icmpv6")
703
704         # Traffic should not pass
705         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
706
707         self.logger.info("ACLP_TEST_FINISH_0007")
708
709     def test_0008_tcp_permit_v4(self):
710         """ permit TCPv4
711         """
712         self.logger.info("ACLP_TEST_START_0008")
713
714         # Add an ACL
715         rules = []
716         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
717                                       self.proto[self.IP][self.TCP]))
718         # deny ip any any in the end
719         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
720
721         # Apply rules
722         self.apply_rules(rules, "permit ipv4 tcp")
723
724         # Traffic should still pass
725         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
726
727         self.logger.info("ACLP_TEST_FINISH_0008")
728
729     def test_0009_tcp_permit_v6(self):
730         """ permit TCPv6
731         """
732         self.logger.info("ACLP_TEST_START_0009")
733
734         # Add an ACL
735         rules = []
736         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
737                                       self.proto[self.IP][self.TCP]))
738         # deny ip any any in the end
739         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
740
741         # Apply rules
742         self.apply_rules(rules, "permit ip6 tcp")
743
744         # Traffic should still pass
745         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
746
747         self.logger.info("ACLP_TEST_FINISH_0008")
748
749     def test_0010_udp_permit_v4(self):
750         """ permit UDPv4
751         """
752         self.logger.info("ACLP_TEST_START_0010")
753
754         # Add an ACL
755         rules = []
756         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
757                                       self.proto[self.IP][self.UDP]))
758         # deny ip any any in the end
759         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
760
761         # Apply rules
762         self.apply_rules(rules, "permit ipv udp")
763
764         # Traffic should still pass
765         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
766
767         self.logger.info("ACLP_TEST_FINISH_0010")
768
769     def test_0011_udp_permit_v6(self):
770         """ permit UDPv6
771         """
772         self.logger.info("ACLP_TEST_START_0011")
773
774         # Add an ACL
775         rules = []
776         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
777                                       self.proto[self.IP][self.UDP]))
778         # deny ip any any in the end
779         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
780
781         # Apply rules
782         self.apply_rules(rules, "permit ip6 udp")
783
784         # Traffic should still pass
785         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
786
787         self.logger.info("ACLP_TEST_FINISH_0011")
788
789     def test_0012_tcp_deny(self):
790         """ deny TCPv4/v6
791         """
792         self.logger.info("ACLP_TEST_START_0012")
793
794         # Add an ACL
795         rules = []
796         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
797                                       self.proto[self.IP][self.TCP]))
798         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
799                                       self.proto[self.IP][self.TCP]))
800         # permit ip any any in the end
801         rules.append(self.create_rule(self.IPV4, self.PERMIT,
802                                       self.PORTS_ALL, 0))
803         rules.append(self.create_rule(self.IPV6, self.PERMIT,
804                                       self.PORTS_ALL, 0))
805
806         # Apply rules
807         self.apply_rules(rules, "deny ip4/ip6 tcp")
808
809         # Traffic should not pass
810         self.run_verify_negat_test(self.IP, self.IPRANDOM,
811                                    self.proto[self.IP][self.TCP])
812
813         self.logger.info("ACLP_TEST_FINISH_0012")
814
815     def test_0013_udp_deny(self):
816         """ deny UDPv4/v6
817         """
818         self.logger.info("ACLP_TEST_START_0013")
819
820         # Add an ACL
821         rules = []
822         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
823                                       self.proto[self.IP][self.UDP]))
824         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
825                                       self.proto[self.IP][self.UDP]))
826         # permit ip any any in the end
827         rules.append(self.create_rule(self.IPV4, self.PERMIT,
828                                       self.PORTS_ALL, 0))
829         rules.append(self.create_rule(self.IPV6, self.PERMIT,
830                                       self.PORTS_ALL, 0))
831
832         # Apply rules
833         self.apply_rules(rules, "deny ip4/ip6 udp")
834
835         # Traffic should not pass
836         self.run_verify_negat_test(self.IP, self.IPRANDOM,
837                                    self.proto[self.IP][self.UDP])
838
839         self.logger.info("ACLP_TEST_FINISH_0013")
840
841     def test_0014_acl_dump(self):
842         """ verify add/dump acls
843         """
844         self.logger.info("ACLP_TEST_START_0014")
845
846         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
847              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
848              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
849              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
850              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
851              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
852              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
853              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
854              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
855              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
856              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
857              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
858              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
859              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
860              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
861              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
862              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
863              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
864              ]
865
866         # Add and verify new ACLs
867         rules = []
868         for i in range(len(r)):
869             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
870
871         acl = VppAcl(self, rules=rules)
872         acl.add_vpp_config()
873         result = acl.dump()
874
875         i = 0
876         for drules in result:
877             for dr in drules.r:
878                 self.assertEqual(dr.is_permit, r[i][1])
879                 self.assertEqual(dr.proto, r[i][3])
880
881                 if r[i][2] > 0:
882                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
883                 else:
884                     if r[i][2] < 0:
885                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
886                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
887                     else:
888                         if dr.proto == self.proto[self.IP][self.TCP]:
889                             self.assertGreater(dr.srcport_or_icmptype_first,
890                                                self.tcp_sport_from-1)
891                             self.assertLess(dr.srcport_or_icmptype_first,
892                                             self.tcp_sport_to+1)
893                             self.assertGreater(dr.dstport_or_icmpcode_last,
894                                                self.tcp_dport_from-1)
895                             self.assertLess(dr.dstport_or_icmpcode_last,
896                                             self.tcp_dport_to+1)
897                         elif dr.proto == self.proto[self.IP][self.UDP]:
898                             self.assertGreater(dr.srcport_or_icmptype_first,
899                                                self.udp_sport_from-1)
900                             self.assertLess(dr.srcport_or_icmptype_first,
901                                             self.udp_sport_to+1)
902                             self.assertGreater(dr.dstport_or_icmpcode_last,
903                                                self.udp_dport_from-1)
904                             self.assertLess(dr.dstport_or_icmpcode_last,
905                                             self.udp_dport_to+1)
906                 i += 1
907
908         self.logger.info("ACLP_TEST_FINISH_0014")
909
910     def test_0015_tcp_permit_port_v4(self):
911         """ permit single TCPv4
912         """
913         self.logger.info("ACLP_TEST_START_0015")
914
915         port = random.randint(16384, 65535)
916         # Add an ACL
917         rules = []
918         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
919                                       self.proto[self.IP][self.TCP]))
920         # deny ip any any in the end
921         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
922
923         # Apply rules
924         self.apply_rules(rules, "permit ip4 tcp %d" % port)
925
926         # Traffic should still pass
927         self.run_verify_test(self.IP, self.IPV4,
928                              self.proto[self.IP][self.TCP], port)
929
930         self.logger.info("ACLP_TEST_FINISH_0015")
931
932     def test_0016_udp_permit_port_v4(self):
933         """ permit single UDPv4
934         """
935         self.logger.info("ACLP_TEST_START_0016")
936
937         port = random.randint(16384, 65535)
938         # Add an ACL
939         rules = []
940         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
941                                       self.proto[self.IP][self.UDP]))
942         # deny ip any any in the end
943         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
944
945         # Apply rules
946         self.apply_rules(rules, "permit ip4 tcp %d" % port)
947
948         # Traffic should still pass
949         self.run_verify_test(self.IP, self.IPV4,
950                              self.proto[self.IP][self.UDP], port)
951
952         self.logger.info("ACLP_TEST_FINISH_0016")
953
954     def test_0017_tcp_permit_port_v6(self):
955         """ permit single TCPv6
956         """
957         self.logger.info("ACLP_TEST_START_0017")
958
959         port = random.randint(16384, 65535)
960         # Add an ACL
961         rules = []
962         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
963                                       self.proto[self.IP][self.TCP]))
964         # deny ip any any in the end
965         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
966
967         # Apply rules
968         self.apply_rules(rules, "permit ip4 tcp %d" % port)
969
970         # Traffic should still pass
971         self.run_verify_test(self.IP, self.IPV6,
972                              self.proto[self.IP][self.TCP], port)
973
974         self.logger.info("ACLP_TEST_FINISH_0017")
975
976     def test_0018_udp_permit_port_v6(self):
977         """ permit single UDPv6
978         """
979         self.logger.info("ACLP_TEST_START_0018")
980
981         port = random.randint(16384, 65535)
982         # Add an ACL
983         rules = []
984         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
985                                       self.proto[self.IP][self.UDP]))
986         # deny ip any any in the end
987         rules.append(self.create_rule(self.IPV6, self.DENY,
988                                       self.PORTS_ALL, 0))
989
990         # Apply rules
991         self.apply_rules(rules, "permit ip4 tcp %d" % port)
992
993         # Traffic should still pass
994         self.run_verify_test(self.IP, self.IPV6,
995                              self.proto[self.IP][self.UDP], port)
996
997         self.logger.info("ACLP_TEST_FINISH_0018")
998
999     def test_0019_udp_deny_port(self):
1000         """ deny single TCPv4/v6
1001         """
1002         self.logger.info("ACLP_TEST_START_0019")
1003
1004         port = random.randint(16384, 65535)
1005         # Add an ACL
1006         rules = []
1007         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1008                                       self.proto[self.IP][self.TCP]))
1009         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1010                                       self.proto[self.IP][self.TCP]))
1011         # Permit ip any any in the end
1012         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1013                                       self.PORTS_ALL, 0))
1014         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1015                                       self.PORTS_ALL, 0))
1016
1017         # Apply rules
1018         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1019
1020         # Traffic should not pass
1021         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1022                                    self.proto[self.IP][self.TCP], port)
1023
1024         self.logger.info("ACLP_TEST_FINISH_0019")
1025
1026     def test_0020_udp_deny_port(self):
1027         """ deny single UDPv4/v6
1028         """
1029         self.logger.info("ACLP_TEST_START_0020")
1030
1031         port = random.randint(16384, 65535)
1032         # Add an ACL
1033         rules = []
1034         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1035                                       self.proto[self.IP][self.UDP]))
1036         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1037                                       self.proto[self.IP][self.UDP]))
1038         # Permit ip any any in the end
1039         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1040                                       self.PORTS_ALL, 0))
1041         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1042                                       self.PORTS_ALL, 0))
1043
1044         # Apply rules
1045         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1046
1047         # Traffic should not pass
1048         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1049                                    self.proto[self.IP][self.UDP], port)
1050
1051         self.logger.info("ACLP_TEST_FINISH_0020")
1052
1053     def test_0021_udp_deny_port_verify_fragment_deny(self):
1054         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1055         blocked
1056         """
1057         self.logger.info("ACLP_TEST_START_0021")
1058
1059         port = random.randint(16384, 65535)
1060         # Add an ACL
1061         rules = []
1062         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1063                                       self.proto[self.IP][self.UDP]))
1064         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1065                                       self.proto[self.IP][self.UDP]))
1066         # deny ip any any in the end
1067         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1068                                       self.PORTS_ALL, 0))
1069         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1070                                       self.PORTS_ALL, 0))
1071
1072         # Apply rules
1073         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1074
1075         # Traffic should not pass
1076         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1077                                    self.proto[self.IP][self.UDP], port, True)
1078
1079         self.logger.info("ACLP_TEST_FINISH_0021")
1080
1081     def test_0022_zero_length_udp_ipv4(self):
1082         """ VPP-687 zero length udp ipv4 packet"""
1083         self.logger.info("ACLP_TEST_START_0022")
1084
1085         port = random.randint(16384, 65535)
1086         # Add an ACL
1087         rules = []
1088         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1089                                       self.proto[self.IP][self.UDP]))
1090         # deny ip any any in the end
1091         rules.append(
1092             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1093
1094         # Apply rules
1095         self.apply_rules(rules, "permit empty udp ip4 %d" % port)
1096
1097         # Traffic should still pass
1098         # Create incoming packet streams for packet-generator interfaces
1099         pkts_cnt = 0
1100         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1101                                   self.IP, self.IPV4,
1102                                   self.proto[self.IP][self.UDP], port,
1103                                   False, False)
1104         if len(pkts) > 0:
1105             self.pg0.add_stream(pkts)
1106             pkts_cnt += len(pkts)
1107
1108         # Enable packet capture and start packet sendingself.IPV
1109         self.pg_enable_capture(self.pg_interfaces)
1110         self.pg_start()
1111
1112         self.pg1.get_capture(pkts_cnt)
1113
1114         self.logger.info("ACLP_TEST_FINISH_0022")
1115
1116     def test_0023_zero_length_udp_ipv6(self):
1117         """ VPP-687 zero length udp ipv6 packet"""
1118         self.logger.info("ACLP_TEST_START_0023")
1119
1120         port = random.randint(16384, 65535)
1121         # Add an ACL
1122         rules = []
1123         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1124                                       self.proto[self.IP][self.UDP]))
1125         # deny ip any any in the end
1126         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1127
1128         # Apply rules
1129         self.apply_rules(rules, "permit empty udp ip6 %d" % port)
1130
1131         # Traffic should still pass
1132         # Create incoming packet streams for packet-generator interfaces
1133         pkts_cnt = 0
1134         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1135                                   self.IP, self.IPV6,
1136                                   self.proto[self.IP][self.UDP], port,
1137                                   False, False)
1138         if len(pkts) > 0:
1139             self.pg0.add_stream(pkts)
1140             pkts_cnt += len(pkts)
1141
1142         # Enable packet capture and start packet sendingself.IPV
1143         self.pg_enable_capture(self.pg_interfaces)
1144         self.pg_start()
1145
1146         # Verify outgoing packet streams per packet-generator interface
1147         self.pg1.get_capture(pkts_cnt)
1148
1149         self.logger.info("ACLP_TEST_FINISH_0023")
1150
1151     def test_0108_tcp_permit_v4(self):
1152         """ permit TCPv4 + non-match range
1153         """
1154         self.logger.info("ACLP_TEST_START_0108")
1155
1156         # Add an ACL
1157         rules = []
1158         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1159                                       self.proto[self.IP][self.TCP]))
1160         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1161                                       self.proto[self.IP][self.TCP]))
1162         # deny ip any any in the end
1163         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1164
1165         # Apply rules
1166         self.apply_rules(rules, "permit ipv4 tcp")
1167
1168         # Traffic should still pass
1169         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1170
1171         self.logger.info("ACLP_TEST_FINISH_0108")
1172
1173     def test_0109_tcp_permit_v6(self):
1174         """ permit TCPv6 + non-match range
1175         """
1176         self.logger.info("ACLP_TEST_START_0109")
1177
1178         # Add an ACL
1179         rules = []
1180         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1181                                       self.proto[self.IP][self.TCP]))
1182         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1183                                       self.proto[self.IP][self.TCP]))
1184         # deny ip any any in the end
1185         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1186
1187         # Apply rules
1188         self.apply_rules(rules, "permit ip6 tcp")
1189
1190         # Traffic should still pass
1191         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1192
1193         self.logger.info("ACLP_TEST_FINISH_0109")
1194
1195     def test_0110_udp_permit_v4(self):
1196         """ permit UDPv4 + non-match range
1197         """
1198         self.logger.info("ACLP_TEST_START_0110")
1199
1200         # Add an ACL
1201         rules = []
1202         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1203                                       self.proto[self.IP][self.UDP]))
1204         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1205                                       self.proto[self.IP][self.UDP]))
1206         # deny ip any any in the end
1207         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1208
1209         # Apply rules
1210         self.apply_rules(rules, "permit ipv4 udp")
1211
1212         # Traffic should still pass
1213         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1214
1215         self.logger.info("ACLP_TEST_FINISH_0110")
1216
1217     def test_0111_udp_permit_v6(self):
1218         """ permit UDPv6 + non-match range
1219         """
1220         self.logger.info("ACLP_TEST_START_0111")
1221
1222         # Add an ACL
1223         rules = []
1224         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1225                                       self.proto[self.IP][self.UDP]))
1226         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1227                                       self.proto[self.IP][self.UDP]))
1228         # deny ip any any in the end
1229         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1230
1231         # Apply rules
1232         self.apply_rules(rules, "permit ip6 udp")
1233
1234         # Traffic should still pass
1235         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1236
1237         self.logger.info("ACLP_TEST_FINISH_0111")
1238
1239     def test_0112_tcp_deny(self):
1240         """ deny TCPv4/v6 + non-match range
1241         """
1242         self.logger.info("ACLP_TEST_START_0112")
1243
1244         # Add an ACL
1245         rules = []
1246         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1247                                       self.PORTS_RANGE_2,
1248                                       self.proto[self.IP][self.TCP]))
1249         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1250                                       self.PORTS_RANGE_2,
1251                                       self.proto[self.IP][self.TCP]))
1252         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1253                                       self.proto[self.IP][self.TCP]))
1254         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1255                                       self.proto[self.IP][self.TCP]))
1256         # permit ip any any in the end
1257         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1258                                       self.PORTS_ALL, 0))
1259         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1260                                       self.PORTS_ALL, 0))
1261
1262         # Apply rules
1263         self.apply_rules(rules, "deny ip4/ip6 tcp")
1264
1265         # Traffic should not pass
1266         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1267                                    self.proto[self.IP][self.TCP])
1268
1269         self.logger.info("ACLP_TEST_FINISH_0112")
1270
1271     def test_0113_udp_deny(self):
1272         """ deny UDPv4/v6 + non-match range
1273         """
1274         self.logger.info("ACLP_TEST_START_0113")
1275
1276         # Add an ACL
1277         rules = []
1278         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1279                                       self.PORTS_RANGE_2,
1280                                       self.proto[self.IP][self.UDP]))
1281         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1282                                       self.PORTS_RANGE_2,
1283                                       self.proto[self.IP][self.UDP]))
1284         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1285                                       self.proto[self.IP][self.UDP]))
1286         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1287                                       self.proto[self.IP][self.UDP]))
1288         # permit ip any any in the end
1289         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1290                                       self.PORTS_ALL, 0))
1291         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1292                                       self.PORTS_ALL, 0))
1293
1294         # Apply rules
1295         self.apply_rules(rules, "deny ip4/ip6 udp")
1296
1297         # Traffic should not pass
1298         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1299                                    self.proto[self.IP][self.UDP])
1300
1301         self.logger.info("ACLP_TEST_FINISH_0113")
1302
1303     def test_0300_tcp_permit_v4_etype_aaaa(self):
1304         """ permit TCPv4, send 0xAAAA etype
1305         """
1306         self.logger.info("ACLP_TEST_START_0300")
1307
1308         # Add an ACL
1309         rules = []
1310         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1311                                       self.proto[self.IP][self.TCP]))
1312         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1313                                       self.proto[self.IP][self.TCP]))
1314         # deny ip any any in the end
1315         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1316
1317         # Apply rules
1318         self.apply_rules(rules, "permit ipv4 tcp")
1319
1320         # Traffic should still pass also for an odd ethertype
1321         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1322                              0, False, True, 0xaaaa)
1323         self.logger.info("ACLP_TEST_FINISH_0300")
1324
1325     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1326         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1327         """
1328         self.logger.info("ACLP_TEST_START_0305")
1329
1330         # Add an ACL
1331         rules = []
1332         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1333                                       self.proto[self.IP][self.TCP]))
1334         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1335                                       self.proto[self.IP][self.TCP]))
1336         # deny ip any any in the end
1337         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1338
1339         # Apply rules
1340         self.apply_rules(rules, "permit ipv4 tcp")
1341         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1342         self.etype_whitelist([0xbbb], 1)
1343
1344         # The oddball ethertype should be blocked
1345         self.run_verify_negat_test(self.IP, self.IPV4,
1346                                    self.proto[self.IP][self.TCP],
1347                                    0, False, 0xaaaa)
1348
1349         # remove the whitelist
1350         self.etype_whitelist([], 0, add=False)
1351
1352         self.logger.info("ACLP_TEST_FINISH_0305")
1353
1354     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1355         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1356         """
1357         self.logger.info("ACLP_TEST_START_0306")
1358
1359         # Add an ACL
1360         rules = []
1361         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1362                                       self.proto[self.IP][self.TCP]))
1363         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1364                                       self.proto[self.IP][self.TCP]))
1365         # deny ip any any in the end
1366         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1367
1368         # Apply rules
1369         self.apply_rules(rules, "permit ipv4 tcp")
1370         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1371         self.etype_whitelist([0xbbb], 1)
1372
1373         # The whitelisted traffic, should pass
1374         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1375                              0, False, True, 0x0bbb)
1376
1377         # remove the whitelist, the previously blocked 0xAAAA should pass now
1378         self.etype_whitelist([], 0, add=False)
1379
1380         self.logger.info("ACLP_TEST_FINISH_0306")
1381
1382     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1383         """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1384         """
1385         self.logger.info("ACLP_TEST_START_0307")
1386
1387         # Add an ACL
1388         rules = []
1389         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1390                                       self.proto[self.IP][self.TCP]))
1391         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1392                                       self.proto[self.IP][self.TCP]))
1393         # deny ip any any in the end
1394         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1395
1396         # Apply rules
1397         self.apply_rules(rules, "permit ipv4 tcp")
1398
1399         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1400         self.etype_whitelist([0xbbb], 1)
1401         # remove the whitelist, the previously blocked 0xAAAA should pass now
1402         self.etype_whitelist([], 0, add=False)
1403
1404         # The whitelisted traffic, should pass
1405         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1406                              0, False, True, 0xaaaa)
1407
1408         self.logger.info("ACLP_TEST_FINISH_0306")
1409
1410     def test_0315_del_intf(self):
1411         """ apply an acl and delete the interface
1412         """
1413         self.logger.info("ACLP_TEST_START_0315")
1414
1415         # Add an ACL
1416         rules = []
1417         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1418                                       self.proto[self.IP][self.TCP]))
1419         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1420                                       self.proto[self.IP][self.TCP]))
1421         # deny ip any any in the end
1422         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1423
1424         # create an interface
1425         intf = []
1426         intf.append(VppLoInterface(self))
1427
1428         # Apply rules
1429         self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1430
1431         # Remove the interface
1432         intf[0].remove_vpp_config()
1433
1434         self.logger.info("ACLP_TEST_FINISH_0315")
1435
1436
# Entry point when this file is executed directly: hand the tests in this
# module to unittest, using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)