acl: API cleanup
[vpp.git] / src / plugins / acl / test / test_acl_plugin.py
1 #!/usr/bin/env python3
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15 from ipaddress import IPv4Network, IPv6Network
16
17 from vpp_lo_interface import VppLoInterface
18 from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
19 from vpp_ip import INVALID_INDEX
20
21
22 class TestACLplugin(VppTestCase):
23     """ ACL plugin Test Case """
24
    # traffic types: values of the traffic_type argument of create_stream /
    # verify_capture, and also the first index into `proto` below
    IP = 0
    ICMP = 1

    # IP version: values of the ipv6 / ip_type arguments and of pkt_info.ip.
    # create_stream picks IPv4 or IPv6 at random per packet for any value
    # other than 0 or 1 (IPRANDOM is such a value).
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types: passed as permit_deny to create_rule, which forwards the
    # value as AclRule(is_permit=...)
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[traffic type][protocol index] -> IP protocol number
    proto = [[6, 17], [1, 58]]
    # protocol number -> layer name; used to pick the upper layer to build
    # and to look the layer up in a captured packet
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    # second index into proto[ICMP]
    ICMPv4 = 0
    ICMPv6 = 1
    # second index into proto[IP]
    TCP = 0
    UDP = 1
    # NOTE(review): presumably "match any protocol" in a rule - not referenced
    # by name in the visible part of the file, confirm against AclRule
    PROTO_ALL = 0

    # port ranges
    # Selector values handed to create_rule as `ports` and forwarded to
    # AclRule(ports=...).
    # NOTE(review): how AclRule maps them to real ranges is not visible here.
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    # Bounds used by create_upper_layer when it draws random ports
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # Second set of bounds (not referenced in the visible part of the file;
    # presumably paired with PORTS_RANGE_2 - TODO confirm)
    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    # ICMP type/code put into generated packets by create_stream and
    # asserted on by verify_capture
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # Second set of ICMP type/code values (not referenced in the visible
    # part of the file)
    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    # ID of the bridge domain created in setUpClass for the pg interfaces
    bd_id = 1
83
84     @classmethod
85     def setUpClass(cls):
86         """
87         Perform standard class setup (defined by class method setUpClass in
88         class VppTestCase) before running the test case, set test case related
89         variables and configure VPP.
90         """
91         super(TestACLplugin, cls).setUpClass()
92
93         try:
94             # Create 2 pg interfaces
95             cls.create_pg_interfaces(range(2))
96
97             # Packet flows mapping pg0 -> pg1, pg2 etc.
98             cls.flows = dict()
99             cls.flows[cls.pg0] = [cls.pg1]
100
101             # Packet sizes
102             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
103
104             # Create BD with MAC learning and unknown unicast flooding disabled
105             # and put interfaces to this BD
106             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
107                                            learn=1)
108             for pg_if in cls.pg_interfaces:
109                 cls.vapi.sw_interface_set_l2_bridge(
110                     rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
111
112             # Set up all interfaces
113             for i in cls.pg_interfaces:
114                 i.admin_up()
115
116             # Mapping between packet-generator index and lists of test hosts
117             cls.hosts_by_pg_idx = dict()
118             for pg_if in cls.pg_interfaces:
119                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
120
121             # Create list of deleted hosts
122             cls.deleted_hosts_by_pg_idx = dict()
123             for pg_if in cls.pg_interfaces:
124                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
125
126             # warm-up the mac address tables
127             # self.warmup_test()
128             count = 16
129             start = 0
130             n_int = len(cls.pg_interfaces)
131             macs_per_if = count // n_int
132             i = -1
133             for pg_if in cls.pg_interfaces:
134                 i += 1
135                 start_nr = macs_per_if * i + start
136                 end_nr = count + start if i == (n_int - 1) \
137                     else macs_per_if * (i + 1) + start
138                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
139                 for j in range(int(start_nr), int(end_nr)):
140                     host = Host(
141                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
142                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
143                         "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
144                     hosts.append(host)
145
146         except Exception:
147             super(TestACLplugin, cls).tearDownClass()
148             raise
149
150     @classmethod
151     def tearDownClass(cls):
152         super(TestACLplugin, cls).tearDownClass()
153
154     def setUp(self):
155         super(TestACLplugin, self).setUp()
156         self.reset_packet_infos()
157
158     def tearDown(self):
159         """
160         Show various debug prints after each test.
161         """
162         super(TestACLplugin, self).tearDown()
163
164     def show_commands_at_teardown(self):
165         cli = "show vlib graph l2-input-feat-arc"
166         self.logger.info(self.vapi.ppcli(cli))
167         cli = "show vlib graph l2-input-feat-arc-end"
168         self.logger.info(self.vapi.ppcli(cli))
169         cli = "show vlib graph l2-output-feat-arc"
170         self.logger.info(self.vapi.ppcli(cli))
171         cli = "show vlib graph l2-output-feat-arc-end"
172         self.logger.info(self.vapi.ppcli(cli))
173         self.logger.info(self.vapi.ppcli("show l2fib verbose"))
174         self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
175         self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
176         self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
177         self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
178                                          % self.bd_id))
179
180     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
181                     s_prefix=0, s_ip=0,
182                     d_prefix=0, d_ip=0):
183         if ip:
184             src_prefix = IPv6Network((s_ip, s_prefix))
185             dst_prefix = IPv6Network((d_ip, d_prefix))
186         else:
187             src_prefix = IPv4Network((s_ip, s_prefix))
188             dst_prefix = IPv4Network((d_ip, d_prefix))
189         return AclRule(is_permit=permit_deny, ports=ports, proto=proto,
190                        src_prefix=src_prefix, dst_prefix=dst_prefix)
191
192     def apply_rules(self, rules, tag=None):
193         acl = VppAcl(self, rules, tag=tag)
194         acl.add_vpp_config()
195         self.logger.info("Dumped ACL: " + str(acl.dump()))
196         # Apply a ACL on the interface as inbound
197         for i in self.pg_interfaces:
198             acl_if = VppAclInterface(
199                 self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl])
200             acl_if.add_vpp_config()
201         return acl.acl_index
202
203     def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
204         acl = VppAcl(self, rules, tag=tag)
205         acl.add_vpp_config()
206         self.logger.info("Dumped ACL: " + str(acl.dump()))
207         # Apply a ACL on the interface as inbound
208         acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1,
209                                  acls=[acl])
210         return acl.acl_index
211
212     def etype_whitelist(self, whitelist, n_input, add=True):
213         # Apply whitelists on all the interfaces
214         if add:
215             self._wl = []
216             for i in self.pg_interfaces:
217                 self._wl.append(VppEtypeWhitelist(
218                     self, sw_if_index=i.sw_if_index, whitelist=whitelist,
219                     n_input=n_input).add_vpp_config())
220         else:
221             if hasattr(self, "_wl"):
222                 for wl in self._wl:
223                     wl.remove_vpp_config()
224
225     def create_upper_layer(self, packet_index, proto, ports=0):
226         p = self.proto_map[proto]
227         if p == 'UDP':
228             if ports == 0:
229                 return UDP(sport=random.randint(self.udp_sport_from,
230                                                 self.udp_sport_to),
231                            dport=random.randint(self.udp_dport_from,
232                                                 self.udp_dport_to))
233             else:
234                 return UDP(sport=ports, dport=ports)
235         elif p == 'TCP':
236             if ports == 0:
237                 return TCP(sport=random.randint(self.tcp_sport_from,
238                                                 self.tcp_sport_to),
239                            dport=random.randint(self.tcp_dport_from,
240                                                 self.tcp_dport_to))
241             else:
242                 return TCP(sport=ports, dport=ports)
243         return ''
244
245     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
246                       proto=-1, ports=0, fragments=False,
247                       pkt_raw=True, etype=-1):
248         """
249         Create input packet stream for defined interface using hosts or
250         deleted_hosts list.
251
252         :param object src_if: Interface to create packet stream for.
253         :param list packet_sizes: List of required packet sizes.
254         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
255         :return: Stream of packets.
256         """
257         pkts = []
258         if self.flows.__contains__(src_if):
259             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
260             for dst_if in self.flows[src_if]:
261                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
262                 n_int = len(dst_hosts) * len(src_hosts)
263                 for i in range(0, n_int):
264                     dst_host = dst_hosts[int(i / len(src_hosts))]
265                     src_host = src_hosts[i % len(src_hosts)]
266                     pkt_info = self.create_packet_info(src_if, dst_if)
267                     if ipv6 == 1:
268                         pkt_info.ip = 1
269                     elif ipv6 == 0:
270                         pkt_info.ip = 0
271                     else:
272                         pkt_info.ip = random.choice([0, 1])
273                     if proto == -1:
274                         pkt_info.proto = random.choice(self.proto[self.IP])
275                     else:
276                         pkt_info.proto = proto
277                     payload = self.info_to_payload(pkt_info)
278                     p = Ether(dst=dst_host.mac, src=src_host.mac)
279                     if etype > 0:
280                         p = Ether(dst=dst_host.mac,
281                                   src=src_host.mac,
282                                   type=etype)
283                     if pkt_info.ip:
284                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
285                         if fragments:
286                             p /= IPv6ExtHdrFragment(offset=64, m=1)
287                     else:
288                         if fragments:
289                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
290                                     flags=1, frag=64)
291                         else:
292                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
293                     if traffic_type == self.ICMP:
294                         if pkt_info.ip:
295                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
296                                                    code=self.icmp6_code)
297                         else:
298                             p /= ICMP(type=self.icmp4_type,
299                                       code=self.icmp4_code)
300                     else:
301                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
302                     if pkt_raw:
303                         p /= Raw(payload)
304                         pkt_info.data = p.copy()
305                     if pkt_raw:
306                         size = random.choice(packet_sizes)
307                         self.extend_packet(p, size)
308                     pkts.append(p)
309         return pkts
310
    def verify_capture(self, pg_if, capture,
                       traffic_type=0, ip_type=0, etype=-1):
        """
        Verify captured input packet stream for defined interface.

        Each captured packet is decoded back to its packet info and checked
        against what was sent: ICMP type/code for ICMP traffic, otherwise
        addresses and TCP/UDP ports of the saved packet.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: self.ICMP selects the ICMP checks; any other
            value selects the address/port checks.
        :param ip_type: When non-zero, the IP version every packet info must
            carry; together with traffic_type it also selects ICMPv6 payload
            decoding.
        :param etype: When > 0, the expected Ethernet type; packets that
            carry it are skipped without further checks.
        """
        # Last matched packet info per source interface index
        last_info = dict()
        for i in self.pg_interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            if etype > 0:
                # NOTE(review): a mismatching ethertype is only logged; the
                # packet then falls through to the regular checks below.
                if packet[Ether].type != etype:
                    self.logger.error(ppp("Unexpected ethertype in packet:",
                                          packet))
                else:
                    continue
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(
                        packet[ICMPv6EchoRequest], 'data')
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(packet[Raw])
                    payload = packet[self.proto_map[payload_info.proto]]
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)
            if traffic_type == self.ICMP:
                # ICMP: only type and code are compared; last_info is not
                # advanced in this branch
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet "
                                          "(outside network):", packet))
                    raise
            else:
                try:
                    ip_version = IPv6 if payload_info.ip == 1 else IP

                    ip = packet[ip_version]
                    packet_index = payload_info.index

                    self.assertEqual(payload_info.dst, dst_sw_if_index)
                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                      (pg_if.name, payload_info.src,
                                       packet_index))
                    # Packets from one source must arrive in the order in
                    # which their infos were created
                    next_info = self.get_next_packet_info_for_interface2(
                        payload_info.src, dst_sw_if_index,
                        last_info[payload_info.src])
                    last_info[payload_info.src] = next_info
                    self.assertTrue(next_info is not None)
                    self.assertEqual(packet_index, next_info.index)
                    saved_packet = next_info.data
                    # Check standard fields
                    self.assertEqual(ip.src, saved_packet[ip_version].src)
                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                    p = self.proto_map[payload_info.proto]
                    if p == 'TCP':
                        tcp = packet[TCP]
                        self.assertEqual(tcp.sport, saved_packet[
                            TCP].sport)
                        self.assertEqual(tcp.dport, saved_packet[
                            TCP].dport)
                    elif p == 'UDP':
                        udp = packet[UDP]
                        self.assertEqual(udp.sport, saved_packet[
                            UDP].sport)
                        self.assertEqual(udp.dport, saved_packet[
                            UDP].dport)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet:",
                                          packet))
                    raise
        # Nothing that was sent towards this interface may be left over.
        # NOTE(review): the first argument here is the interface object,
        # while the call above passes a source index (payload_info.src).
        # If the helper compares against an index this check can never
        # fail - confirm against the framework.  Switching to
        # i.sw_if_index alone would trip the ICMP path, which never
        # advances last_info.
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))
404
405     def run_traffic_no_check(self):
406         # Test
407         # Create incoming packet streams for packet-generator interfaces
408         for i in self.pg_interfaces:
409             if self.flows.__contains__(i):
410                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
411                 if len(pkts) > 0:
412                     i.add_stream(pkts)
413
414         # Enable packet capture and start packet sending
415         self.pg_enable_capture(self.pg_interfaces)
416         self.pg_start()
417
418     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
419                         frags=False, pkt_raw=True, etype=-1):
420         # Test
421         # Create incoming packet streams for packet-generator interfaces
422         pkts_cnt = 0
423         for i in self.pg_interfaces:
424             if self.flows.__contains__(i):
425                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
426                                           traffic_type, ip_type, proto, ports,
427                                           frags, pkt_raw, etype)
428                 if len(pkts) > 0:
429                     i.add_stream(pkts)
430                     pkts_cnt += len(pkts)
431
432         # Enable packet capture and start packet sendingself.IPV
433         self.pg_enable_capture(self.pg_interfaces)
434         self.pg_start()
435         self.logger.info("sent packets count: %d" % pkts_cnt)
436
437         # Verify
438         # Verify outgoing packet streams per packet-generator interface
439         for src_if in self.pg_interfaces:
440             if self.flows.__contains__(src_if):
441                 for dst_if in self.flows[src_if]:
442                     capture = dst_if.get_capture(pkts_cnt)
443                     self.logger.info("Verifying capture on interface %s" %
444                                      dst_if.name)
445                     self.verify_capture(dst_if, capture,
446                                         traffic_type, ip_type, etype)
447
448     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
449                               ports=0, frags=False, etype=-1):
450         # Test
451         pkts_cnt = 0
452         self.reset_packet_infos()
453         for i in self.pg_interfaces:
454             if self.flows.__contains__(i):
455                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
456                                           traffic_type, ip_type, proto, ports,
457                                           frags, True, etype)
458                 if len(pkts) > 0:
459                     i.add_stream(pkts)
460                     pkts_cnt += len(pkts)
461
462         # Enable packet capture and start packet sending
463         self.pg_enable_capture(self.pg_interfaces)
464         self.pg_start()
465         self.logger.info("sent packets count: %d" % pkts_cnt)
466
467         # Verify
468         # Verify outgoing packet streams per packet-generator interface
469         for src_if in self.pg_interfaces:
470             if self.flows.__contains__(src_if):
471                 for dst_if in self.flows[src_if]:
472                     self.logger.info("Verifying capture on interface %s" %
473                                      dst_if.name)
474                     capture = dst_if.get_capture(0)
475                     self.assertEqual(len(capture), 0)
476
477     def test_0000_warmup_test(self):
478         """ ACL plugin version check; learn MACs
479         """
480         reply = self.vapi.papi.acl_plugin_get_version()
481         self.assertEqual(reply.major, 1)
482         self.logger.info("Working with ACL plugin version: %d.%d" % (
483             reply.major, reply.minor))
484         # minor version changes are non breaking
485         # self.assertEqual(reply.minor, 0)
486
    def test_0001_acl_create(self):
        """ ACL create/delete test
        """

        self.logger.info("ACLP_TEST_START_0001")
        # Create a permit-1234 ACL
        r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
        # Test 1: add a new ACL
        first_acl = VppAcl(self, rules=r, tag="permit 1234")
        first_acl.add_vpp_config()
        self.assertTrue(first_acl.query_vpp_config())
        # The very first ACL gets #0
        self.assertEqual(first_acl.acl_index, 0)
        rr = first_acl.dump()
        self.logger.info("Dumped ACL: " + str(rr))
        self.assertEqual(len(rr), 1)
        # We should have the same number of ACL entries as we had asked
        self.assertEqual(len(rr[0].r), len(r))
        # The rules should be the same. But because the submitted and returned
        # are different types, we need to iterate over rules and keys to get
        # to basic values.
        # NOTE(review): range(0, len(r) - 1) stops one rule short; with the
        # single rule in `r` the loop body never runs, so nothing is compared
        # here.  Confirm the comparison passes before widening the range.
        for i_rule in range(0, len(r) - 1):
            encoded_rule = r[i_rule].encode()
            for rule_key in encoded_rule:
                self.assertEqual(rr[0].r[i_rule][rule_key],
                                 encoded_rule[rule_key])

        # Create a deny-1234 ACL
        r_deny = [AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
                  AclRule(is_permit=1, proto=17, ports=0)]
        second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
        second_acl.add_vpp_config()
        self.assertTrue(second_acl.query_vpp_config())
        # The second ACL gets #1
        self.assertEqual(second_acl.acl_index, 1)

        # Test 2: try to modify a nonexistent ACL
        invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
        reply = invalid_acl.add_vpp_config(expect_error=True)

        # apply an ACL on an interface inbound, try to delete ACL, must fail
        acl_if_list = VppAclInterface(
            self, sw_if_index=self.pg0.sw_if_index, n_input=1,
            acls=[first_acl])
        acl_if_list.add_vpp_config()
        first_acl.remove_vpp_config(expect_error=True)
        # Unapply an ACL and then try to delete it - must be ok
        acl_if_list.remove_vpp_config()
        first_acl.remove_vpp_config()

        # apply an ACL on an interface, try to delete ACL, must fail
        # NOTE(review): n_input=0 here, unlike the block above - presumably
        # this is the outbound case; confirm against the ACL plugin API.
        acl_if_list = VppAclInterface(
            self, sw_if_index=self.pg0.sw_if_index, n_input=0,
            acls=[second_acl])
        acl_if_list.add_vpp_config()
        second_acl.remove_vpp_config(expect_error=True)
        # Unapply an ACL and then try to delete it - must be ok
        acl_if_list.remove_vpp_config()
        second_acl.remove_vpp_config()

        # try to apply a nonexistent ACL - must fail
        acl_if_list = VppAclInterface(
            self, sw_if_index=self.pg0.sw_if_index, n_input=0,
            acls=[invalid_acl])
        acl_if_list.add_vpp_config(expect_error=True)

        self.logger.info("ACLP_TEST_FINISH_0001")
554
555     def test_0002_acl_permit_apply(self):
556         """ permit ACL apply test
557         """
558         self.logger.info("ACLP_TEST_START_0002")
559
560         rules = []
561         rules.append(self.create_rule(self.IPV4, self.PERMIT,
562                                       0, self.proto[self.IP][self.UDP]))
563         rules.append(self.create_rule(self.IPV4, self.PERMIT,
564                                       0, self.proto[self.IP][self.TCP]))
565
566         # Apply rules
567         acl_idx = self.apply_rules(rules, "permit per-flow")
568
569         # enable counters
570         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
571
572         # Traffic should still pass
573         self.run_verify_test(self.IP, self.IPV4, -1)
574
575         matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
576         self.logger.info("stat segment counters: %s" % repr(matches))
577         cli = "show acl-plugin acl"
578         self.logger.info(self.vapi.ppcli(cli))
579         cli = "show acl-plugin tables"
580         self.logger.info(self.vapi.ppcli(cli))
581
582         total_hits = matches[0][0]['packets'] + matches[0][1]['packets']
583         self.assertEqual(total_hits, 64)
584
585         # disable counters
586         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
587
588         self.logger.info("ACLP_TEST_FINISH_0002")
589
590     def test_0003_acl_deny_apply(self):
591         """ deny ACL apply test
592         """
593         self.logger.info("ACLP_TEST_START_0003")
594         # Add a deny-flows ACL
595         rules = []
596         rules.append(self.create_rule(
597             self.IPV4, self.DENY, self.PORTS_ALL,
598             self.proto[self.IP][self.UDP]))
599         # Permit ip any any in the end
600         rules.append(self.create_rule(self.IPV4, self.PERMIT,
601                                       self.PORTS_ALL, 0))
602
603         # Apply rules
604         acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
605
606         # enable counters
607         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
608
609         # Traffic should not pass
610         self.run_verify_negat_test(self.IP, self.IPV4,
611                                    self.proto[self.IP][self.UDP])
612
613         matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
614         self.logger.info("stat segment counters: %s" % repr(matches))
615         cli = "show acl-plugin acl"
616         self.logger.info(self.vapi.ppcli(cli))
617         cli = "show acl-plugin tables"
618         self.logger.info(self.vapi.ppcli(cli))
619         self.assertEqual(matches[0][0]['packets'], 64)
620         # disable counters
621         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
622         self.logger.info("ACLP_TEST_FINISH_0003")
623         # self.assertEqual(, 0)
624
625     def test_0004_vpp624_permit_icmpv4(self):
626         """ VPP_624 permit ICMPv4
627         """
628         self.logger.info("ACLP_TEST_START_0004")
629
630         # Add an ACL
631         rules = []
632         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
633                                       self.proto[self.ICMP][self.ICMPv4]))
634         # deny ip any any in the end
635         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
636
637         # Apply rules
638         self.apply_rules(rules, "permit icmpv4")
639
640         # Traffic should still pass
641         self.run_verify_test(self.ICMP, self.IPV4,
642                              self.proto[self.ICMP][self.ICMPv4])
643
644         self.logger.info("ACLP_TEST_FINISH_0004")
645
646     def test_0005_vpp624_permit_icmpv6(self):
647         """ VPP_624 permit ICMPv6
648         """
649         self.logger.info("ACLP_TEST_START_0005")
650
651         # Add an ACL
652         rules = []
653         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
654                                       self.proto[self.ICMP][self.ICMPv6]))
655         # deny ip any any in the end
656         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
657
658         # Apply rules
659         self.apply_rules(rules, "permit icmpv6")
660
661         # Traffic should still pass
662         self.run_verify_test(self.ICMP, self.IPV6,
663                              self.proto[self.ICMP][self.ICMPv6])
664
665         self.logger.info("ACLP_TEST_FINISH_0005")
666
667     def test_0006_vpp624_deny_icmpv4(self):
668         """ VPP_624 deny ICMPv4
669         """
670         self.logger.info("ACLP_TEST_START_0006")
671         # Add an ACL
672         rules = []
673         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
674                                       self.proto[self.ICMP][self.ICMPv4]))
675         # permit ip any any in the end
676         rules.append(self.create_rule(self.IPV4, self.PERMIT,
677                                       self.PORTS_ALL, 0))
678
679         # Apply rules
680         self.apply_rules(rules, "deny icmpv4")
681
682         # Traffic should not pass
683         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
684
685         self.logger.info("ACLP_TEST_FINISH_0006")
686
687     def test_0007_vpp624_deny_icmpv6(self):
688         """ VPP_624 deny ICMPv6
689         """
690         self.logger.info("ACLP_TEST_START_0007")
691         # Add an ACL
692         rules = []
693         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
694                                       self.proto[self.ICMP][self.ICMPv6]))
695         # deny ip any any in the end
696         rules.append(self.create_rule(self.IPV6, self.PERMIT,
697                                       self.PORTS_ALL, 0))
698
699         # Apply rules
700         self.apply_rules(rules, "deny icmpv6")
701
702         # Traffic should not pass
703         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
704
705         self.logger.info("ACLP_TEST_FINISH_0007")
706
707     def test_0008_tcp_permit_v4(self):
708         """ permit TCPv4
709         """
710         self.logger.info("ACLP_TEST_START_0008")
711
712         # Add an ACL
713         rules = []
714         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
715                                       self.proto[self.IP][self.TCP]))
716         # deny ip any any in the end
717         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
718
719         # Apply rules
720         self.apply_rules(rules, "permit ipv4 tcp")
721
722         # Traffic should still pass
723         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
724
725         self.logger.info("ACLP_TEST_FINISH_0008")
726
727     def test_0009_tcp_permit_v6(self):
728         """ permit TCPv6
729         """
730         self.logger.info("ACLP_TEST_START_0009")
731
732         # Add an ACL
733         rules = []
734         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
735                                       self.proto[self.IP][self.TCP]))
736         # deny ip any any in the end
737         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
738
739         # Apply rules
740         self.apply_rules(rules, "permit ip6 tcp")
741
742         # Traffic should still pass
743         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
744
745         self.logger.info("ACLP_TEST_FINISH_0008")
746
747     def test_0010_udp_permit_v4(self):
748         """ permit UDPv4
749         """
750         self.logger.info("ACLP_TEST_START_0010")
751
752         # Add an ACL
753         rules = []
754         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
755                                       self.proto[self.IP][self.UDP]))
756         # deny ip any any in the end
757         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
758
759         # Apply rules
760         self.apply_rules(rules, "permit ipv udp")
761
762         # Traffic should still pass
763         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
764
765         self.logger.info("ACLP_TEST_FINISH_0010")
766
767     def test_0011_udp_permit_v6(self):
768         """ permit UDPv6
769         """
770         self.logger.info("ACLP_TEST_START_0011")
771
772         # Add an ACL
773         rules = []
774         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
775                                       self.proto[self.IP][self.UDP]))
776         # deny ip any any in the end
777         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
778
779         # Apply rules
780         self.apply_rules(rules, "permit ip6 udp")
781
782         # Traffic should still pass
783         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
784
785         self.logger.info("ACLP_TEST_FINISH_0011")
786
787     def test_0012_tcp_deny(self):
788         """ deny TCPv4/v6
789         """
790         self.logger.info("ACLP_TEST_START_0012")
791
792         # Add an ACL
793         rules = []
794         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
795                                       self.proto[self.IP][self.TCP]))
796         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
797                                       self.proto[self.IP][self.TCP]))
798         # permit ip any any in the end
799         rules.append(self.create_rule(self.IPV4, self.PERMIT,
800                                       self.PORTS_ALL, 0))
801         rules.append(self.create_rule(self.IPV6, self.PERMIT,
802                                       self.PORTS_ALL, 0))
803
804         # Apply rules
805         self.apply_rules(rules, "deny ip4/ip6 tcp")
806
807         # Traffic should not pass
808         self.run_verify_negat_test(self.IP, self.IPRANDOM,
809                                    self.proto[self.IP][self.TCP])
810
811         self.logger.info("ACLP_TEST_FINISH_0012")
812
813     def test_0013_udp_deny(self):
814         """ deny UDPv4/v6
815         """
816         self.logger.info("ACLP_TEST_START_0013")
817
818         # Add an ACL
819         rules = []
820         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
821                                       self.proto[self.IP][self.UDP]))
822         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
823                                       self.proto[self.IP][self.UDP]))
824         # permit ip any any in the end
825         rules.append(self.create_rule(self.IPV4, self.PERMIT,
826                                       self.PORTS_ALL, 0))
827         rules.append(self.create_rule(self.IPV6, self.PERMIT,
828                                       self.PORTS_ALL, 0))
829
830         # Apply rules
831         self.apply_rules(rules, "deny ip4/ip6 udp")
832
833         # Traffic should not pass
834         self.run_verify_negat_test(self.IP, self.IPRANDOM,
835                                    self.proto[self.IP][self.UDP])
836
837         self.logger.info("ACLP_TEST_FINISH_0013")
838
839     def test_0014_acl_dump(self):
840         """ verify add/dump acls
841         """
842         self.logger.info("ACLP_TEST_START_0014")
843
844         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
845              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
846              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
847              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
848              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
849              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
850              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
851              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
852              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
853              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
854              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
855              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
856              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
857              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
858              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
859              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
860              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
861              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
862              ]
863
864         # Add and verify new ACLs
865         rules = []
866         for i in range(len(r)):
867             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
868
869         acl = VppAcl(self, rules=rules)
870         acl.add_vpp_config()
871         result = acl.dump()
872
873         i = 0
874         for drules in result:
875             for dr in drules.r:
876                 self.assertEqual(dr.is_permit, r[i][1])
877                 self.assertEqual(dr.proto, r[i][3])
878
879                 if r[i][2] > 0:
880                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
881                 else:
882                     if r[i][2] < 0:
883                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
884                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
885                     else:
886                         if dr.proto == self.proto[self.IP][self.TCP]:
887                             self.assertGreater(dr.srcport_or_icmptype_first,
888                                                self.tcp_sport_from-1)
889                             self.assertLess(dr.srcport_or_icmptype_first,
890                                             self.tcp_sport_to+1)
891                             self.assertGreater(dr.dstport_or_icmpcode_last,
892                                                self.tcp_dport_from-1)
893                             self.assertLess(dr.dstport_or_icmpcode_last,
894                                             self.tcp_dport_to+1)
895                         elif dr.proto == self.proto[self.IP][self.UDP]:
896                             self.assertGreater(dr.srcport_or_icmptype_first,
897                                                self.udp_sport_from-1)
898                             self.assertLess(dr.srcport_or_icmptype_first,
899                                             self.udp_sport_to+1)
900                             self.assertGreater(dr.dstport_or_icmpcode_last,
901                                                self.udp_dport_from-1)
902                             self.assertLess(dr.dstport_or_icmpcode_last,
903                                             self.udp_dport_to+1)
904                 i += 1
905
906         self.logger.info("ACLP_TEST_FINISH_0014")
907
908     def test_0015_tcp_permit_port_v4(self):
909         """ permit single TCPv4
910         """
911         self.logger.info("ACLP_TEST_START_0015")
912
913         port = random.randint(16384, 65535)
914         # Add an ACL
915         rules = []
916         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
917                                       self.proto[self.IP][self.TCP]))
918         # deny ip any any in the end
919         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
920
921         # Apply rules
922         self.apply_rules(rules, "permit ip4 tcp %d" % port)
923
924         # Traffic should still pass
925         self.run_verify_test(self.IP, self.IPV4,
926                              self.proto[self.IP][self.TCP], port)
927
928         self.logger.info("ACLP_TEST_FINISH_0015")
929
930     def test_0016_udp_permit_port_v4(self):
931         """ permit single UDPv4
932         """
933         self.logger.info("ACLP_TEST_START_0016")
934
935         port = random.randint(16384, 65535)
936         # Add an ACL
937         rules = []
938         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
939                                       self.proto[self.IP][self.UDP]))
940         # deny ip any any in the end
941         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
942
943         # Apply rules
944         self.apply_rules(rules, "permit ip4 tcp %d" % port)
945
946         # Traffic should still pass
947         self.run_verify_test(self.IP, self.IPV4,
948                              self.proto[self.IP][self.UDP], port)
949
950         self.logger.info("ACLP_TEST_FINISH_0016")
951
952     def test_0017_tcp_permit_port_v6(self):
953         """ permit single TCPv6
954         """
955         self.logger.info("ACLP_TEST_START_0017")
956
957         port = random.randint(16384, 65535)
958         # Add an ACL
959         rules = []
960         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
961                                       self.proto[self.IP][self.TCP]))
962         # deny ip any any in the end
963         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
964
965         # Apply rules
966         self.apply_rules(rules, "permit ip4 tcp %d" % port)
967
968         # Traffic should still pass
969         self.run_verify_test(self.IP, self.IPV6,
970                              self.proto[self.IP][self.TCP], port)
971
972         self.logger.info("ACLP_TEST_FINISH_0017")
973
974     def test_0018_udp_permit_port_v6(self):
975         """ permit single UDPv6
976         """
977         self.logger.info("ACLP_TEST_START_0018")
978
979         port = random.randint(16384, 65535)
980         # Add an ACL
981         rules = []
982         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
983                                       self.proto[self.IP][self.UDP]))
984         # deny ip any any in the end
985         rules.append(self.create_rule(self.IPV6, self.DENY,
986                                       self.PORTS_ALL, 0))
987
988         # Apply rules
989         self.apply_rules(rules, "permit ip4 tcp %d" % port)
990
991         # Traffic should still pass
992         self.run_verify_test(self.IP, self.IPV6,
993                              self.proto[self.IP][self.UDP], port)
994
995         self.logger.info("ACLP_TEST_FINISH_0018")
996
997     def test_0019_udp_deny_port(self):
998         """ deny single TCPv4/v6
999         """
1000         self.logger.info("ACLP_TEST_START_0019")
1001
1002         port = random.randint(16384, 65535)
1003         # Add an ACL
1004         rules = []
1005         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1006                                       self.proto[self.IP][self.TCP]))
1007         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1008                                       self.proto[self.IP][self.TCP]))
1009         # Permit ip any any in the end
1010         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1011                                       self.PORTS_ALL, 0))
1012         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1013                                       self.PORTS_ALL, 0))
1014
1015         # Apply rules
1016         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1017
1018         # Traffic should not pass
1019         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1020                                    self.proto[self.IP][self.TCP], port)
1021
1022         self.logger.info("ACLP_TEST_FINISH_0019")
1023
1024     def test_0020_udp_deny_port(self):
1025         """ deny single UDPv4/v6
1026         """
1027         self.logger.info("ACLP_TEST_START_0020")
1028
1029         port = random.randint(16384, 65535)
1030         # Add an ACL
1031         rules = []
1032         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1033                                       self.proto[self.IP][self.UDP]))
1034         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1035                                       self.proto[self.IP][self.UDP]))
1036         # Permit ip any any in the end
1037         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1038                                       self.PORTS_ALL, 0))
1039         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1040                                       self.PORTS_ALL, 0))
1041
1042         # Apply rules
1043         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1044
1045         # Traffic should not pass
1046         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1047                                    self.proto[self.IP][self.UDP], port)
1048
1049         self.logger.info("ACLP_TEST_FINISH_0020")
1050
1051     def test_0021_udp_deny_port_verify_fragment_deny(self):
1052         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1053         blocked
1054         """
1055         self.logger.info("ACLP_TEST_START_0021")
1056
1057         port = random.randint(16384, 65535)
1058         # Add an ACL
1059         rules = []
1060         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1061                                       self.proto[self.IP][self.UDP]))
1062         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1063                                       self.proto[self.IP][self.UDP]))
1064         # deny ip any any in the end
1065         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1066                                       self.PORTS_ALL, 0))
1067         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1068                                       self.PORTS_ALL, 0))
1069
1070         # Apply rules
1071         self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
1072
1073         # Traffic should not pass
1074         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1075                                    self.proto[self.IP][self.UDP], port, True)
1076
1077         self.logger.info("ACLP_TEST_FINISH_0021")
1078
1079     def test_0022_zero_length_udp_ipv4(self):
1080         """ VPP-687 zero length udp ipv4 packet"""
1081         self.logger.info("ACLP_TEST_START_0022")
1082
1083         port = random.randint(16384, 65535)
1084         # Add an ACL
1085         rules = []
1086         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1087                                       self.proto[self.IP][self.UDP]))
1088         # deny ip any any in the end
1089         rules.append(
1090             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1091
1092         # Apply rules
1093         self.apply_rules(rules, "permit empty udp ip4 %d" % port)
1094
1095         # Traffic should still pass
1096         # Create incoming packet streams for packet-generator interfaces
1097         pkts_cnt = 0
1098         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1099                                   self.IP, self.IPV4,
1100                                   self.proto[self.IP][self.UDP], port,
1101                                   False, False)
1102         if len(pkts) > 0:
1103             self.pg0.add_stream(pkts)
1104             pkts_cnt += len(pkts)
1105
1106         # Enable packet capture and start packet sendingself.IPV
1107         self.pg_enable_capture(self.pg_interfaces)
1108         self.pg_start()
1109
1110         self.pg1.get_capture(pkts_cnt)
1111
1112         self.logger.info("ACLP_TEST_FINISH_0022")
1113
1114     def test_0023_zero_length_udp_ipv6(self):
1115         """ VPP-687 zero length udp ipv6 packet"""
1116         self.logger.info("ACLP_TEST_START_0023")
1117
1118         port = random.randint(16384, 65535)
1119         # Add an ACL
1120         rules = []
1121         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1122                                       self.proto[self.IP][self.UDP]))
1123         # deny ip any any in the end
1124         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1125
1126         # Apply rules
1127         self.apply_rules(rules, "permit empty udp ip6 %d" % port)
1128
1129         # Traffic should still pass
1130         # Create incoming packet streams for packet-generator interfaces
1131         pkts_cnt = 0
1132         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1133                                   self.IP, self.IPV6,
1134                                   self.proto[self.IP][self.UDP], port,
1135                                   False, False)
1136         if len(pkts) > 0:
1137             self.pg0.add_stream(pkts)
1138             pkts_cnt += len(pkts)
1139
1140         # Enable packet capture and start packet sendingself.IPV
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pg_start()
1143
1144         # Verify outgoing packet streams per packet-generator interface
1145         self.pg1.get_capture(pkts_cnt)
1146
1147         self.logger.info("ACLP_TEST_FINISH_0023")
1148
1149     def test_0108_tcp_permit_v4(self):
1150         """ permit TCPv4 + non-match range
1151         """
1152         self.logger.info("ACLP_TEST_START_0108")
1153
1154         # Add an ACL
1155         rules = []
1156         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1157                                       self.proto[self.IP][self.TCP]))
1158         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1159                                       self.proto[self.IP][self.TCP]))
1160         # deny ip any any in the end
1161         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1162
1163         # Apply rules
1164         self.apply_rules(rules, "permit ipv4 tcp")
1165
1166         # Traffic should still pass
1167         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1168
1169         self.logger.info("ACLP_TEST_FINISH_0108")
1170
1171     def test_0109_tcp_permit_v6(self):
1172         """ permit TCPv6 + non-match range
1173         """
1174         self.logger.info("ACLP_TEST_START_0109")
1175
1176         # Add an ACL
1177         rules = []
1178         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1179                                       self.proto[self.IP][self.TCP]))
1180         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1181                                       self.proto[self.IP][self.TCP]))
1182         # deny ip any any in the end
1183         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1184
1185         # Apply rules
1186         self.apply_rules(rules, "permit ip6 tcp")
1187
1188         # Traffic should still pass
1189         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1190
1191         self.logger.info("ACLP_TEST_FINISH_0109")
1192
1193     def test_0110_udp_permit_v4(self):
1194         """ permit UDPv4 + non-match range
1195         """
1196         self.logger.info("ACLP_TEST_START_0110")
1197
1198         # Add an ACL
1199         rules = []
1200         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1201                                       self.proto[self.IP][self.UDP]))
1202         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1203                                       self.proto[self.IP][self.UDP]))
1204         # deny ip any any in the end
1205         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1206
1207         # Apply rules
1208         self.apply_rules(rules, "permit ipv4 udp")
1209
1210         # Traffic should still pass
1211         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1212
1213         self.logger.info("ACLP_TEST_FINISH_0110")
1214
1215     def test_0111_udp_permit_v6(self):
1216         """ permit UDPv6 + non-match range
1217         """
1218         self.logger.info("ACLP_TEST_START_0111")
1219
1220         # Add an ACL
1221         rules = []
1222         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1223                                       self.proto[self.IP][self.UDP]))
1224         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1225                                       self.proto[self.IP][self.UDP]))
1226         # deny ip any any in the end
1227         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1228
1229         # Apply rules
1230         self.apply_rules(rules, "permit ip6 udp")
1231
1232         # Traffic should still pass
1233         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1234
1235         self.logger.info("ACLP_TEST_FINISH_0111")
1236
1237     def test_0112_tcp_deny(self):
1238         """ deny TCPv4/v6 + non-match range
1239         """
1240         self.logger.info("ACLP_TEST_START_0112")
1241
1242         # Add an ACL
1243         rules = []
1244         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1245                                       self.PORTS_RANGE_2,
1246                                       self.proto[self.IP][self.TCP]))
1247         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1248                                       self.PORTS_RANGE_2,
1249                                       self.proto[self.IP][self.TCP]))
1250         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1251                                       self.proto[self.IP][self.TCP]))
1252         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1253                                       self.proto[self.IP][self.TCP]))
1254         # permit ip any any in the end
1255         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1256                                       self.PORTS_ALL, 0))
1257         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1258                                       self.PORTS_ALL, 0))
1259
1260         # Apply rules
1261         self.apply_rules(rules, "deny ip4/ip6 tcp")
1262
1263         # Traffic should not pass
1264         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1265                                    self.proto[self.IP][self.TCP])
1266
1267         self.logger.info("ACLP_TEST_FINISH_0112")
1268
1269     def test_0113_udp_deny(self):
1270         """ deny UDPv4/v6 + non-match range
1271         """
1272         self.logger.info("ACLP_TEST_START_0113")
1273
1274         # Add an ACL
1275         rules = []
1276         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1277                                       self.PORTS_RANGE_2,
1278                                       self.proto[self.IP][self.UDP]))
1279         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1280                                       self.PORTS_RANGE_2,
1281                                       self.proto[self.IP][self.UDP]))
1282         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1283                                       self.proto[self.IP][self.UDP]))
1284         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1285                                       self.proto[self.IP][self.UDP]))
1286         # permit ip any any in the end
1287         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1288                                       self.PORTS_ALL, 0))
1289         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1290                                       self.PORTS_ALL, 0))
1291
1292         # Apply rules
1293         self.apply_rules(rules, "deny ip4/ip6 udp")
1294
1295         # Traffic should not pass
1296         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1297                                    self.proto[self.IP][self.UDP])
1298
1299         self.logger.info("ACLP_TEST_FINISH_0113")
1300
1301     def test_0300_tcp_permit_v4_etype_aaaa(self):
1302         """ permit TCPv4, send 0xAAAA etype
1303         """
1304         self.logger.info("ACLP_TEST_START_0300")
1305
1306         # Add an ACL
1307         rules = []
1308         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1309                                       self.proto[self.IP][self.TCP]))
1310         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1311                                       self.proto[self.IP][self.TCP]))
1312         # deny ip any any in the end
1313         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1314
1315         # Apply rules
1316         self.apply_rules(rules, "permit ipv4 tcp")
1317
1318         # Traffic should still pass also for an odd ethertype
1319         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1320                              0, False, True, 0xaaaa)
1321         self.logger.info("ACLP_TEST_FINISH_0300")
1322
1323     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1324         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1325         """
1326         self.logger.info("ACLP_TEST_START_0305")
1327
1328         # Add an ACL
1329         rules = []
1330         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1331                                       self.proto[self.IP][self.TCP]))
1332         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1333                                       self.proto[self.IP][self.TCP]))
1334         # deny ip any any in the end
1335         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1336
1337         # Apply rules
1338         self.apply_rules(rules, "permit ipv4 tcp")
1339         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1340         self.etype_whitelist([0xbbb], 1)
1341
1342         # The oddball ethertype should be blocked
1343         self.run_verify_negat_test(self.IP, self.IPV4,
1344                                    self.proto[self.IP][self.TCP],
1345                                    0, False, 0xaaaa)
1346
1347         # remove the whitelist
1348         self.etype_whitelist([], 0, add=False)
1349
1350         self.logger.info("ACLP_TEST_FINISH_0305")
1351
1352     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1353         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1354         """
1355         self.logger.info("ACLP_TEST_START_0306")
1356
1357         # Add an ACL
1358         rules = []
1359         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1360                                       self.proto[self.IP][self.TCP]))
1361         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1362                                       self.proto[self.IP][self.TCP]))
1363         # deny ip any any in the end
1364         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1365
1366         # Apply rules
1367         self.apply_rules(rules, "permit ipv4 tcp")
1368         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1369         self.etype_whitelist([0xbbb], 1)
1370
1371         # The whitelisted traffic, should pass
1372         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1373                              0, False, True, 0x0bbb)
1374
1375         # remove the whitelist, the previously blocked 0xAAAA should pass now
1376         self.etype_whitelist([], 0, add=False)
1377
1378         self.logger.info("ACLP_TEST_FINISH_0306")
1379
1380     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1381         """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1382         """
1383         self.logger.info("ACLP_TEST_START_0307")
1384
1385         # Add an ACL
1386         rules = []
1387         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1388                                       self.proto[self.IP][self.TCP]))
1389         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1390                                       self.proto[self.IP][self.TCP]))
1391         # deny ip any any in the end
1392         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1393
1394         # Apply rules
1395         self.apply_rules(rules, "permit ipv4 tcp")
1396
1397         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1398         self.etype_whitelist([0xbbb], 1)
1399         # remove the whitelist, the previously blocked 0xAAAA should pass now
1400         self.etype_whitelist([], 0, add=False)
1401
1402         # The whitelisted traffic, should pass
1403         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1404                              0, False, True, 0xaaaa)
1405
1406         self.logger.info("ACLP_TEST_FINISH_0306")
1407
1408     def test_0315_del_intf(self):
1409         """ apply an acl and delete the interface
1410         """
1411         self.logger.info("ACLP_TEST_START_0315")
1412
1413         # Add an ACL
1414         rules = []
1415         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1416                                       self.proto[self.IP][self.TCP]))
1417         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1418                                       self.proto[self.IP][self.TCP]))
1419         # deny ip any any in the end
1420         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1421
1422         # create an interface
1423         intf = []
1424         intf.append(VppLoInterface(self))
1425
1426         # Apply rules
1427         self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1428
1429         # Remove the interface
1430         intf[0].remove_vpp_config()
1431
1432         self.logger.info("ACLP_TEST_FINISH_0315")
1433
1434
# Entry point for running this file directly: unittest is started with
# VppTestRunner as its test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)