acl: revert acl: api cleanup
[vpp.git] / src / plugins / acl / test / test_acl_plugin.py
1 #!/usr/bin/env python3
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16 from vpp_lo_interface import VppLoInterface
17
18
19 class TestACLplugin(VppTestCase):
20     """ ACL plugin Test Case """
21
22     # traffic types
23     IP = 0
24     ICMP = 1
25
26     # IP version
27     IPRANDOM = -1
28     IPV4 = 0
29     IPV6 = 1
30
31     # rule types
32     DENY = 0
33     PERMIT = 1
34
35     # supported protocols
36     proto = [[6, 17], [1, 58]]
37     proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
38     ICMPv4 = 0
39     ICMPv6 = 1
40     TCP = 0
41     UDP = 1
42     PROTO_ALL = 0
43
44     # port ranges
45     PORTS_ALL = -1
46     PORTS_RANGE = 0
47     PORTS_RANGE_2 = 1
48     udp_sport_from = 10
49     udp_sport_to = udp_sport_from + 5
50     udp_dport_from = 20000
51     udp_dport_to = udp_dport_from + 5000
52     tcp_sport_from = 30
53     tcp_sport_to = tcp_sport_from + 5
54     tcp_dport_from = 40000
55     tcp_dport_to = tcp_dport_from + 5000
56
57     udp_sport_from_2 = 90
58     udp_sport_to_2 = udp_sport_from_2 + 5
59     udp_dport_from_2 = 30000
60     udp_dport_to_2 = udp_dport_from_2 + 5000
61     tcp_sport_from_2 = 130
62     tcp_sport_to_2 = tcp_sport_from_2 + 5
63     tcp_dport_from_2 = 20000
64     tcp_dport_to_2 = tcp_dport_from_2 + 5000
65
66     icmp4_type = 8  # echo request
67     icmp4_code = 3
68     icmp6_type = 128  # echo request
69     icmp6_code = 3
70
71     icmp4_type_2 = 8
72     icmp4_code_from_2 = 5
73     icmp4_code_to_2 = 20
74     icmp6_type_2 = 128
75     icmp6_code_from_2 = 8
76     icmp6_code_to_2 = 42
77
78     # Test variables
79     bd_id = 1
80
81     @classmethod
82     def setUpClass(cls):
83         """
84         Perform standard class setup (defined by class method setUpClass in
85         class VppTestCase) before running the test case, set test case related
86         variables and configure VPP.
87         """
88         super(TestACLplugin, cls).setUpClass()
89
90         try:
91             # Create 2 pg interfaces
92             cls.create_pg_interfaces(range(2))
93
94             # Packet flows mapping pg0 -> pg1, pg2 etc.
95             cls.flows = dict()
96             cls.flows[cls.pg0] = [cls.pg1]
97
98             # Packet sizes
99             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101             # Create BD with MAC learning and unknown unicast flooding disabled
102             # and put interfaces to this BD
103             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104                                            learn=1)
105             for pg_if in cls.pg_interfaces:
106                 cls.vapi.sw_interface_set_l2_bridge(
107                     rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
108
109             # Set up all interfaces
110             for i in cls.pg_interfaces:
111                 i.admin_up()
112
113             # Mapping between packet-generator index and lists of test hosts
114             cls.hosts_by_pg_idx = dict()
115             for pg_if in cls.pg_interfaces:
116                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118             # Create list of deleted hosts
119             cls.deleted_hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # warm-up the mac address tables
124             # self.warmup_test()
125             count = 16
126             start = 0
127             n_int = len(cls.pg_interfaces)
128             macs_per_if = count // n_int
129             i = -1
130             for pg_if in cls.pg_interfaces:
131                 i += 1
132                 start_nr = macs_per_if * i + start
133                 end_nr = count + start if i == (n_int - 1) \
134                     else macs_per_if * (i + 1) + start
135                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136                 for j in range(int(start_nr), int(end_nr)):
137                     host = Host(
138                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140                         "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
141                     hosts.append(host)
142
143         except Exception:
144             super(TestACLplugin, cls).tearDownClass()
145             raise
146
147     @classmethod
148     def tearDownClass(cls):
149         super(TestACLplugin, cls).tearDownClass()
150
151     def setUp(self):
152         super(TestACLplugin, self).setUp()
153         self.reset_packet_infos()
154
155     def tearDown(self):
156         """
157         Show various debug prints after each test.
158         """
159         super(TestACLplugin, self).tearDown()
160
161     def show_commands_at_teardown(self):
162         cli = "show vlib graph l2-input-feat-arc"
163         self.logger.info(self.vapi.ppcli(cli))
164         cli = "show vlib graph l2-input-feat-arc-end"
165         self.logger.info(self.vapi.ppcli(cli))
166         cli = "show vlib graph l2-output-feat-arc"
167         self.logger.info(self.vapi.ppcli(cli))
168         cli = "show vlib graph l2-output-feat-arc-end"
169         self.logger.info(self.vapi.ppcli(cli))
170         self.logger.info(self.vapi.ppcli("show l2fib verbose"))
171         self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
172         self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
173         self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
174         self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
175                                          % self.bd_id))
176
177     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
178                     s_prefix=0, s_ip=b'\x00\x00\x00\x00',
179                     d_prefix=0, d_ip=b'\x00\x00\x00\x00'):
180         if proto == -1:
181             return
182         if ports == self.PORTS_ALL:
183             sport_from = 0
184             dport_from = 0
185             sport_to = 65535 if proto != 1 and proto != 58 else 255
186             dport_to = sport_to
187         elif ports == self.PORTS_RANGE:
188             if proto == 1:
189                 sport_from = self.icmp4_type
190                 sport_to = self.icmp4_type
191                 dport_from = self.icmp4_code
192                 dport_to = self.icmp4_code
193             elif proto == 58:
194                 sport_from = self.icmp6_type
195                 sport_to = self.icmp6_type
196                 dport_from = self.icmp6_code
197                 dport_to = self.icmp6_code
198             elif proto == self.proto[self.IP][self.TCP]:
199                 sport_from = self.tcp_sport_from
200                 sport_to = self.tcp_sport_to
201                 dport_from = self.tcp_dport_from
202                 dport_to = self.tcp_dport_to
203             elif proto == self.proto[self.IP][self.UDP]:
204                 sport_from = self.udp_sport_from
205                 sport_to = self.udp_sport_to
206                 dport_from = self.udp_dport_from
207                 dport_to = self.udp_dport_to
208         elif ports == self.PORTS_RANGE_2:
209             if proto == 1:
210                 sport_from = self.icmp4_type_2
211                 sport_to = self.icmp4_type_2
212                 dport_from = self.icmp4_code_from_2
213                 dport_to = self.icmp4_code_to_2
214             elif proto == 58:
215                 sport_from = self.icmp6_type_2
216                 sport_to = self.icmp6_type_2
217                 dport_from = self.icmp6_code_from_2
218                 dport_to = self.icmp6_code_to_2
219             elif proto == self.proto[self.IP][self.TCP]:
220                 sport_from = self.tcp_sport_from_2
221                 sport_to = self.tcp_sport_to_2
222                 dport_from = self.tcp_dport_from_2
223                 dport_to = self.tcp_dport_to_2
224             elif proto == self.proto[self.IP][self.UDP]:
225                 sport_from = self.udp_sport_from_2
226                 sport_to = self.udp_sport_to_2
227                 dport_from = self.udp_dport_from_2
228                 dport_to = self.udp_dport_to_2
229         else:
230             sport_from = ports
231             sport_to = ports
232             dport_from = ports
233             dport_to = ports
234
235         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
236                  'srcport_or_icmptype_first': sport_from,
237                  'srcport_or_icmptype_last': sport_to,
238                  'src_ip_prefix_len': s_prefix,
239                  'src_ip_addr': s_ip,
240                  'dstport_or_icmpcode_first': dport_from,
241                  'dstport_or_icmpcode_last': dport_to,
242                  'dst_ip_prefix_len': d_prefix,
243                  'dst_ip_addr': d_ip})
244         return rule
245
246     def apply_rules(self, rules, tag=b''):
247         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
248                                           tag=tag)
249         self.logger.info("Dumped ACL: " + str(
250             self.vapi.acl_dump(reply.acl_index)))
251         # Apply a ACL on the interface as inbound
252         for i in self.pg_interfaces:
253             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
254                                                  n_input=1,
255                                                  acls=[reply.acl_index])
256         return reply.acl_index
257
258     def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
259         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
260                                           tag=tag)
261         self.logger.info("Dumped ACL: " + str(
262             self.vapi.acl_dump(reply.acl_index)))
263         # Apply a ACL on the interface as inbound
264         self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
265                                              n_input=1,
266                                              acls=[reply.acl_index])
267         return reply.acl_index
268
269     def etype_whitelist(self, whitelist, n_input):
270         # Apply whitelists on all the interfaces
271         for i in self.pg_interfaces:
272             # checkstyle can't read long names. Help them.
273             fun = self.vapi.acl_interface_set_etype_whitelist
274             fun(sw_if_index=i.sw_if_index, n_input=n_input,
275                 whitelist=whitelist)
276         return
277
278     def create_upper_layer(self, packet_index, proto, ports=0):
279         p = self.proto_map[proto]
280         if p == 'UDP':
281             if ports == 0:
282                 return UDP(sport=random.randint(self.udp_sport_from,
283                                                 self.udp_sport_to),
284                            dport=random.randint(self.udp_dport_from,
285                                                 self.udp_dport_to))
286             else:
287                 return UDP(sport=ports, dport=ports)
288         elif p == 'TCP':
289             if ports == 0:
290                 return TCP(sport=random.randint(self.tcp_sport_from,
291                                                 self.tcp_sport_to),
292                            dport=random.randint(self.tcp_dport_from,
293                                                 self.tcp_dport_to))
294             else:
295                 return TCP(sport=ports, dport=ports)
296         return ''
297
298     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
299                       proto=-1, ports=0, fragments=False,
300                       pkt_raw=True, etype=-1):
301         """
302         Create input packet stream for defined interface using hosts or
303         deleted_hosts list.
304
305         :param object src_if: Interface to create packet stream for.
306         :param list packet_sizes: List of required packet sizes.
307         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
308         :return: Stream of packets.
309         """
310         pkts = []
311         if self.flows.__contains__(src_if):
312             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
313             for dst_if in self.flows[src_if]:
314                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
315                 n_int = len(dst_hosts) * len(src_hosts)
316                 for i in range(0, n_int):
317                     dst_host = dst_hosts[int(i / len(src_hosts))]
318                     src_host = src_hosts[i % len(src_hosts)]
319                     pkt_info = self.create_packet_info(src_if, dst_if)
320                     if ipv6 == 1:
321                         pkt_info.ip = 1
322                     elif ipv6 == 0:
323                         pkt_info.ip = 0
324                     else:
325                         pkt_info.ip = random.choice([0, 1])
326                     if proto == -1:
327                         pkt_info.proto = random.choice(self.proto[self.IP])
328                     else:
329                         pkt_info.proto = proto
330                     payload = self.info_to_payload(pkt_info)
331                     p = Ether(dst=dst_host.mac, src=src_host.mac)
332                     if etype > 0:
333                         p = Ether(dst=dst_host.mac,
334                                   src=src_host.mac,
335                                   type=etype)
336                     if pkt_info.ip:
337                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
338                         if fragments:
339                             p /= IPv6ExtHdrFragment(offset=64, m=1)
340                     else:
341                         if fragments:
342                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
343                                     flags=1, frag=64)
344                         else:
345                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
346                     if traffic_type == self.ICMP:
347                         if pkt_info.ip:
348                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
349                                                    code=self.icmp6_code)
350                         else:
351                             p /= ICMP(type=self.icmp4_type,
352                                       code=self.icmp4_code)
353                     else:
354                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
355                     if pkt_raw:
356                         p /= Raw(payload)
357                         pkt_info.data = p.copy()
358                     if pkt_raw:
359                         size = random.choice(packet_sizes)
360                         self.extend_packet(p, size)
361                     pkts.append(p)
362         return pkts
363
364     def verify_capture(self, pg_if, capture,
365                        traffic_type=0, ip_type=0, etype=-1):
366         """
367         Verify captured input packet stream for defined interface.
368
369         :param object pg_if: Interface to verify captured packet stream for.
370         :param list capture: Captured packet stream.
371         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
372         """
373         last_info = dict()
374         for i in self.pg_interfaces:
375             last_info[i.sw_if_index] = None
376         dst_sw_if_index = pg_if.sw_if_index
377         for packet in capture:
378             if etype > 0:
379                 if packet[Ether].type != etype:
380                     self.logger.error(ppp("Unexpected ethertype in packet:",
381                                           packet))
382                 else:
383                     continue
384             try:
385                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
386                 if traffic_type == self.ICMP and ip_type == self.IPV6:
387                     payload_info = self.payload_to_info(
388                         packet[ICMPv6EchoRequest], 'data')
389                     payload = packet[ICMPv6EchoRequest]
390                 else:
391                     payload_info = self.payload_to_info(packet[Raw])
392                     payload = packet[self.proto_map[payload_info.proto]]
393             except:
394                 self.logger.error(ppp("Unexpected or invalid packet "
395                                       "(outside network):", packet))
396                 raise
397
398             if ip_type != 0:
399                 self.assertEqual(payload_info.ip, ip_type)
400             if traffic_type == self.ICMP:
401                 try:
402                     if payload_info.ip == 0:
403                         self.assertEqual(payload.type, self.icmp4_type)
404                         self.assertEqual(payload.code, self.icmp4_code)
405                     else:
406                         self.assertEqual(payload.type, self.icmp6_type)
407                         self.assertEqual(payload.code, self.icmp6_code)
408                 except:
409                     self.logger.error(ppp("Unexpected or invalid packet "
410                                           "(outside network):", packet))
411                     raise
412             else:
413                 try:
414                     ip_version = IPv6 if payload_info.ip == 1 else IP
415
416                     ip = packet[ip_version]
417                     packet_index = payload_info.index
418
419                     self.assertEqual(payload_info.dst, dst_sw_if_index)
420                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
421                                       (pg_if.name, payload_info.src,
422                                        packet_index))
423                     next_info = self.get_next_packet_info_for_interface2(
424                         payload_info.src, dst_sw_if_index,
425                         last_info[payload_info.src])
426                     last_info[payload_info.src] = next_info
427                     self.assertTrue(next_info is not None)
428                     self.assertEqual(packet_index, next_info.index)
429                     saved_packet = next_info.data
430                     # Check standard fields
431                     self.assertEqual(ip.src, saved_packet[ip_version].src)
432                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
433                     p = self.proto_map[payload_info.proto]
434                     if p == 'TCP':
435                         tcp = packet[TCP]
436                         self.assertEqual(tcp.sport, saved_packet[
437                             TCP].sport)
438                         self.assertEqual(tcp.dport, saved_packet[
439                             TCP].dport)
440                     elif p == 'UDP':
441                         udp = packet[UDP]
442                         self.assertEqual(udp.sport, saved_packet[
443                             UDP].sport)
444                         self.assertEqual(udp.dport, saved_packet[
445                             UDP].dport)
446                 except:
447                     self.logger.error(ppp("Unexpected or invalid packet:",
448                                           packet))
449                     raise
450         for i in self.pg_interfaces:
451             remaining_packet = self.get_next_packet_info_for_interface2(
452                 i, dst_sw_if_index, last_info[i.sw_if_index])
453             self.assertTrue(
454                 remaining_packet is None,
455                 "Port %u: Packet expected from source %u didn't arrive" %
456                 (dst_sw_if_index, i.sw_if_index))
457
458     def run_traffic_no_check(self):
459         # Test
460         # Create incoming packet streams for packet-generator interfaces
461         for i in self.pg_interfaces:
462             if self.flows.__contains__(i):
463                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
464                 if len(pkts) > 0:
465                     i.add_stream(pkts)
466
467         # Enable packet capture and start packet sending
468         self.pg_enable_capture(self.pg_interfaces)
469         self.pg_start()
470
471     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
472                         frags=False, pkt_raw=True, etype=-1):
473         # Test
474         # Create incoming packet streams for packet-generator interfaces
475         pkts_cnt = 0
476         for i in self.pg_interfaces:
477             if self.flows.__contains__(i):
478                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
479                                           traffic_type, ip_type, proto, ports,
480                                           frags, pkt_raw, etype)
481                 if len(pkts) > 0:
482                     i.add_stream(pkts)
483                     pkts_cnt += len(pkts)
484
485         # Enable packet capture and start packet sendingself.IPV
486         self.pg_enable_capture(self.pg_interfaces)
487         self.pg_start()
488         self.logger.info("sent packets count: %d" % pkts_cnt)
489
490         # Verify
491         # Verify outgoing packet streams per packet-generator interface
492         for src_if in self.pg_interfaces:
493             if self.flows.__contains__(src_if):
494                 for dst_if in self.flows[src_if]:
495                     capture = dst_if.get_capture(pkts_cnt)
496                     self.logger.info("Verifying capture on interface %s" %
497                                      dst_if.name)
498                     self.verify_capture(dst_if, capture,
499                                         traffic_type, ip_type, etype)
500
501     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
502                               ports=0, frags=False, etype=-1):
503         # Test
504         pkts_cnt = 0
505         self.reset_packet_infos()
506         for i in self.pg_interfaces:
507             if self.flows.__contains__(i):
508                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
509                                           traffic_type, ip_type, proto, ports,
510                                           frags, True, etype)
511                 if len(pkts) > 0:
512                     i.add_stream(pkts)
513                     pkts_cnt += len(pkts)
514
515         # Enable packet capture and start packet sending
516         self.pg_enable_capture(self.pg_interfaces)
517         self.pg_start()
518         self.logger.info("sent packets count: %d" % pkts_cnt)
519
520         # Verify
521         # Verify outgoing packet streams per packet-generator interface
522         for src_if in self.pg_interfaces:
523             if self.flows.__contains__(src_if):
524                 for dst_if in self.flows[src_if]:
525                     self.logger.info("Verifying capture on interface %s" %
526                                      dst_if.name)
527                     capture = dst_if.get_capture(0)
528                     self.assertEqual(len(capture), 0)
529
530     def test_0000_warmup_test(self):
531         """ ACL plugin version check; learn MACs
532         """
533         reply = self.vapi.papi.acl_plugin_get_version()
534         self.assertEqual(reply.major, 1)
535         self.logger.info("Working with ACL plugin version: %d.%d" % (
536             reply.major, reply.minor))
537         # minor version changes are non breaking
538         # self.assertEqual(reply.minor, 0)
539
540     def test_0001_acl_create(self):
541         """ ACL create/delete test
542         """
543
544         self.logger.info("ACLP_TEST_START_0001")
545         # Add an ACL
546         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
547               'srcport_or_icmptype_first': 1234,
548               'srcport_or_icmptype_last': 1235,
549               'src_ip_prefix_len': 0,
550               'src_ip_addr': b'\x00\x00\x00\x00',
551               'dstport_or_icmpcode_first': 1234,
552               'dstport_or_icmpcode_last': 1234,
553               'dst_ip_addr': b'\x00\x00\x00\x00',
554               'dst_ip_prefix_len': 0}]
555         # Test 1: add a new ACL
556         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
557                                           tag=b"permit 1234")
558         self.assertEqual(reply.retval, 0)
559         # The very first ACL gets #0
560         self.assertEqual(reply.acl_index, 0)
561         first_acl = reply.acl_index
562         rr = self.vapi.acl_dump(reply.acl_index)
563         self.logger.info("Dumped ACL: " + str(rr))
564         self.assertEqual(len(rr), 1)
565         # We should have the same number of ACL entries as we had asked
566         self.assertEqual(len(rr[0].r), len(r))
567         # The rules should be the same. But because the submitted and returned
568         # are different types, we need to iterate over rules and keys to get
569         # to basic values.
570         for i_rule in range(0, len(r) - 1):
571             for rule_key in r[i_rule]:
572                 self.assertEqual(rr[0].r[i_rule][rule_key],
573                                  r[i_rule][rule_key])
574
575         # Add a deny-1234 ACL
576         r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
577                    'srcport_or_icmptype_first': 1234,
578                    'srcport_or_icmptype_last': 1235,
579                    'src_ip_prefix_len': 0,
580                    'src_ip_addr': b'\x00\x00\x00\x00',
581                    'dstport_or_icmpcode_first': 1234,
582                    'dstport_or_icmpcode_last': 1234,
583                    'dst_ip_addr': b'\x00\x00\x00\x00',
584                    'dst_ip_prefix_len': 0},
585                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
586                    'srcport_or_icmptype_first': 0,
587                    'srcport_or_icmptype_last': 0,
588                    'src_ip_prefix_len': 0,
589                    'src_ip_addr': b'\x00\x00\x00\x00',
590                    'dstport_or_icmpcode_first': 0,
591                    'dstport_or_icmpcode_last': 0,
592                    'dst_ip_addr': b'\x00\x00\x00\x00',
593                    'dst_ip_prefix_len': 0}]
594
595         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
596                                           tag=b"deny 1234;permit all")
597         self.assertEqual(reply.retval, 0)
598         # The second ACL gets #1
599         self.assertEqual(reply.acl_index, 1)
600         second_acl = reply.acl_index
601
602         # Test 2: try to modify a nonexistent ACL
603         reply = self.vapi.acl_add_replace(acl_index=432, r=r,
604                                           tag=b"FFFF:FFFF", expected_retval=-6)
605         self.assertEqual(reply.retval, -6)
606         # The ACL number should pass through
607         self.assertEqual(reply.acl_index, 432)
608         # apply an ACL on an interface inbound, try to delete ACL, must fail
609         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
610                                              n_input=1,
611                                              acls=[first_acl])
612         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
613         # Unapply an ACL and then try to delete it - must be ok
614         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
615                                              n_input=0,
616                                              acls=[])
617         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
618
619         # apply an ACL on an interface outbound, try to delete ACL, must fail
620         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
621                                              n_input=0,
622                                              acls=[second_acl])
623         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
624         # Unapply the ACL and then try to delete it - must be ok
625         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
626                                              n_input=0,
627                                              acls=[])
628         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
629
630         # try to apply a nonexistent ACL - must fail
631         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
632                                              n_input=1,
633                                              acls=[first_acl],
634                                              expected_retval=-6)
635
636         self.logger.info("ACLP_TEST_FINISH_0001")
637
638     def test_0002_acl_permit_apply(self):
639         """ permit ACL apply test
640         """
641         self.logger.info("ACLP_TEST_START_0002")
642
643         rules = []
644         rules.append(self.create_rule(self.IPV4, self.PERMIT,
645                      0, self.proto[self.IP][self.UDP]))
646         rules.append(self.create_rule(self.IPV4, self.PERMIT,
647                      0, self.proto[self.IP][self.TCP]))
648
649         # Apply rules
650         acl_idx = self.apply_rules(rules, b"permit per-flow")
651
652         # enable counters
653         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
654
655         # Traffic should still pass
656         self.run_verify_test(self.IP, self.IPV4, -1)
657
658         matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
659         self.logger.info("stat segment counters: %s" % repr(matches))
660         cli = "show acl-plugin acl"
661         self.logger.info(self.vapi.ppcli(cli))
662         cli = "show acl-plugin tables"
663         self.logger.info(self.vapi.ppcli(cli))
664
665         total_hits = matches[0][0]['packets'] + matches[0][1]['packets']
666         self.assertEqual(total_hits, 64)
667
668         # disable counters
669         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
670
671         self.logger.info("ACLP_TEST_FINISH_0002")
672
673     def test_0003_acl_deny_apply(self):
674         """ deny ACL apply test
675         """
676         self.logger.info("ACLP_TEST_START_0003")
677         # Add a deny-flows ACL
678         rules = []
679         rules.append(self.create_rule(self.IPV4, self.DENY,
680                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
681         # Permit ip any any in the end
682         rules.append(self.create_rule(self.IPV4, self.PERMIT,
683                                       self.PORTS_ALL, 0))
684
685         # Apply rules
686         acl_idx = self.apply_rules(rules, b"deny per-flow;permit all")
687
688         # enable counters
689         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
690
691         # Traffic should not pass
692         self.run_verify_negat_test(self.IP, self.IPV4,
693                                    self.proto[self.IP][self.UDP])
694
695         matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
696         self.logger.info("stat segment counters: %s" % repr(matches))
697         cli = "show acl-plugin acl"
698         self.logger.info(self.vapi.ppcli(cli))
699         cli = "show acl-plugin tables"
700         self.logger.info(self.vapi.ppcli(cli))
701         self.assertEqual(matches[0][0]['packets'], 64)
702         # disable counters
703         reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
704         self.logger.info("ACLP_TEST_FINISH_0003")
705         # self.assertEqual(, 0)
706
707     def test_0004_vpp624_permit_icmpv4(self):
708         """ VPP_624 permit ICMPv4
709         """
710         self.logger.info("ACLP_TEST_START_0004")
711
712         # Add an ACL
713         rules = []
714         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
715                                       self.proto[self.ICMP][self.ICMPv4]))
716         # deny ip any any in the end
717         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
718
719         # Apply rules
720         self.apply_rules(rules, b"permit icmpv4")
721
722         # Traffic should still pass
723         self.run_verify_test(self.ICMP, self.IPV4,
724                              self.proto[self.ICMP][self.ICMPv4])
725
726         self.logger.info("ACLP_TEST_FINISH_0004")
727
728     def test_0005_vpp624_permit_icmpv6(self):
729         """ VPP_624 permit ICMPv6
730         """
731         self.logger.info("ACLP_TEST_START_0005")
732
733         # Add an ACL
734         rules = []
735         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
736                                       self.proto[self.ICMP][self.ICMPv6]))
737         # deny ip any any in the end
738         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
739
740         # Apply rules
741         self.apply_rules(rules, b"permit icmpv6")
742
743         # Traffic should still pass
744         self.run_verify_test(self.ICMP, self.IPV6,
745                              self.proto[self.ICMP][self.ICMPv6])
746
747         self.logger.info("ACLP_TEST_FINISH_0005")
748
749     def test_0006_vpp624_deny_icmpv4(self):
750         """ VPP_624 deny ICMPv4
751         """
752         self.logger.info("ACLP_TEST_START_0006")
753         # Add an ACL
754         rules = []
755         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
756                                       self.proto[self.ICMP][self.ICMPv4]))
757         # permit ip any any in the end
758         rules.append(self.create_rule(self.IPV4, self.PERMIT,
759                                       self.PORTS_ALL, 0))
760
761         # Apply rules
762         self.apply_rules(rules, b"deny icmpv4")
763
764         # Traffic should not pass
765         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
766
767         self.logger.info("ACLP_TEST_FINISH_0006")
768
769     def test_0007_vpp624_deny_icmpv6(self):
770         """ VPP_624 deny ICMPv6
771         """
772         self.logger.info("ACLP_TEST_START_0007")
773         # Add an ACL
774         rules = []
775         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
776                                       self.proto[self.ICMP][self.ICMPv6]))
777         # deny ip any any in the end
778         rules.append(self.create_rule(self.IPV6, self.PERMIT,
779                                       self.PORTS_ALL, 0))
780
781         # Apply rules
782         self.apply_rules(rules, b"deny icmpv6")
783
784         # Traffic should not pass
785         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
786
787         self.logger.info("ACLP_TEST_FINISH_0007")
788
789     def test_0008_tcp_permit_v4(self):
790         """ permit TCPv4
791         """
792         self.logger.info("ACLP_TEST_START_0008")
793
794         # Add an ACL
795         rules = []
796         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
797                      self.proto[self.IP][self.TCP]))
798         # deny ip any any in the end
799         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
800
801         # Apply rules
802         self.apply_rules(rules, b"permit ipv4 tcp")
803
804         # Traffic should still pass
805         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
806
807         self.logger.info("ACLP_TEST_FINISH_0008")
808
809     def test_0009_tcp_permit_v6(self):
810         """ permit TCPv6
811         """
812         self.logger.info("ACLP_TEST_START_0009")
813
814         # Add an ACL
815         rules = []
816         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
817                                       self.proto[self.IP][self.TCP]))
818         # deny ip any any in the end
819         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
820
821         # Apply rules
822         self.apply_rules(rules, b"permit ip6 tcp")
823
824         # Traffic should still pass
825         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
826
827         self.logger.info("ACLP_TEST_FINISH_0008")
828
829     def test_0010_udp_permit_v4(self):
830         """ permit UDPv4
831         """
832         self.logger.info("ACLP_TEST_START_0010")
833
834         # Add an ACL
835         rules = []
836         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
837                                       self.proto[self.IP][self.UDP]))
838         # deny ip any any in the end
839         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
840
841         # Apply rules
842         self.apply_rules(rules, b"permit ipv udp")
843
844         # Traffic should still pass
845         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
846
847         self.logger.info("ACLP_TEST_FINISH_0010")
848
849     def test_0011_udp_permit_v6(self):
850         """ permit UDPv6
851         """
852         self.logger.info("ACLP_TEST_START_0011")
853
854         # Add an ACL
855         rules = []
856         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
857                                       self.proto[self.IP][self.UDP]))
858         # deny ip any any in the end
859         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
860
861         # Apply rules
862         self.apply_rules(rules, b"permit ip6 udp")
863
864         # Traffic should still pass
865         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
866
867         self.logger.info("ACLP_TEST_FINISH_0011")
868
869     def test_0012_tcp_deny(self):
870         """ deny TCPv4/v6
871         """
872         self.logger.info("ACLP_TEST_START_0012")
873
874         # Add an ACL
875         rules = []
876         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
877                                       self.proto[self.IP][self.TCP]))
878         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
879                                       self.proto[self.IP][self.TCP]))
880         # permit ip any any in the end
881         rules.append(self.create_rule(self.IPV4, self.PERMIT,
882                                       self.PORTS_ALL, 0))
883         rules.append(self.create_rule(self.IPV6, self.PERMIT,
884                                       self.PORTS_ALL, 0))
885
886         # Apply rules
887         self.apply_rules(rules, b"deny ip4/ip6 tcp")
888
889         # Traffic should not pass
890         self.run_verify_negat_test(self.IP, self.IPRANDOM,
891                                    self.proto[self.IP][self.TCP])
892
893         self.logger.info("ACLP_TEST_FINISH_0012")
894
895     def test_0013_udp_deny(self):
896         """ deny UDPv4/v6
897         """
898         self.logger.info("ACLP_TEST_START_0013")
899
900         # Add an ACL
901         rules = []
902         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
903                                       self.proto[self.IP][self.UDP]))
904         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
905                                       self.proto[self.IP][self.UDP]))
906         # permit ip any any in the end
907         rules.append(self.create_rule(self.IPV4, self.PERMIT,
908                                       self.PORTS_ALL, 0))
909         rules.append(self.create_rule(self.IPV6, self.PERMIT,
910                                       self.PORTS_ALL, 0))
911
912         # Apply rules
913         self.apply_rules(rules, b"deny ip4/ip6 udp")
914
915         # Traffic should not pass
916         self.run_verify_negat_test(self.IP, self.IPRANDOM,
917                                    self.proto[self.IP][self.UDP])
918
919         self.logger.info("ACLP_TEST_FINISH_0013")
920
921     def test_0014_acl_dump(self):
922         """ verify add/dump acls
923         """
924         self.logger.info("ACLP_TEST_START_0014")
925
926         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
927              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
928              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
929              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
930              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
931              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
932              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
933              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
934              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
935              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
936              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
937              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
938              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
939              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
940              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
941              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
942              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
943              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
944              ]
945
946         # Add and verify new ACLs
947         rules = []
948         for i in range(len(r)):
949             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
950
951         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
952         result = self.vapi.acl_dump(reply.acl_index)
953
954         i = 0
955         for drules in result:
956             for dr in drules.r:
957                 self.assertEqual(dr.is_ipv6, r[i][0])
958                 self.assertEqual(dr.is_permit, r[i][1])
959                 self.assertEqual(dr.proto, r[i][3])
960
961                 if r[i][2] > 0:
962                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
963                 else:
964                     if r[i][2] < 0:
965                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
966                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
967                     else:
968                         if dr.proto == self.proto[self.IP][self.TCP]:
969                             self.assertGreater(dr.srcport_or_icmptype_first,
970                                                self.tcp_sport_from-1)
971                             self.assertLess(dr.srcport_or_icmptype_first,
972                                             self.tcp_sport_to+1)
973                             self.assertGreater(dr.dstport_or_icmpcode_last,
974                                                self.tcp_dport_from-1)
975                             self.assertLess(dr.dstport_or_icmpcode_last,
976                                             self.tcp_dport_to+1)
977                         elif dr.proto == self.proto[self.IP][self.UDP]:
978                             self.assertGreater(dr.srcport_or_icmptype_first,
979                                                self.udp_sport_from-1)
980                             self.assertLess(dr.srcport_or_icmptype_first,
981                                             self.udp_sport_to+1)
982                             self.assertGreater(dr.dstport_or_icmpcode_last,
983                                                self.udp_dport_from-1)
984                             self.assertLess(dr.dstport_or_icmpcode_last,
985                                             self.udp_dport_to+1)
986                 i += 1
987
988         self.logger.info("ACLP_TEST_FINISH_0014")
989
990     def test_0015_tcp_permit_port_v4(self):
991         """ permit single TCPv4
992         """
993         self.logger.info("ACLP_TEST_START_0015")
994
995         port = random.randint(16384, 65535)
996         # Add an ACL
997         rules = []
998         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
999                                       self.proto[self.IP][self.TCP]))
1000         # deny ip any any in the end
1001         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1002
1003         # Apply rules
1004         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1005
1006         # Traffic should still pass
1007         self.run_verify_test(self.IP, self.IPV4,
1008                              self.proto[self.IP][self.TCP], port)
1009
1010         self.logger.info("ACLP_TEST_FINISH_0015")
1011
1012     def test_0016_udp_permit_port_v4(self):
1013         """ permit single UDPv4
1014         """
1015         self.logger.info("ACLP_TEST_START_0016")
1016
1017         port = random.randint(16384, 65535)
1018         # Add an ACL
1019         rules = []
1020         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1021                                       self.proto[self.IP][self.UDP]))
1022         # deny ip any any in the end
1023         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1024
1025         # Apply rules
1026         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1027
1028         # Traffic should still pass
1029         self.run_verify_test(self.IP, self.IPV4,
1030                              self.proto[self.IP][self.UDP], port)
1031
1032         self.logger.info("ACLP_TEST_FINISH_0016")
1033
1034     def test_0017_tcp_permit_port_v6(self):
1035         """ permit single TCPv6
1036         """
1037         self.logger.info("ACLP_TEST_START_0017")
1038
1039         port = random.randint(16384, 65535)
1040         # Add an ACL
1041         rules = []
1042         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1043                                       self.proto[self.IP][self.TCP]))
1044         # deny ip any any in the end
1045         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1046
1047         # Apply rules
1048         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1049
1050         # Traffic should still pass
1051         self.run_verify_test(self.IP, self.IPV6,
1052                              self.proto[self.IP][self.TCP], port)
1053
1054         self.logger.info("ACLP_TEST_FINISH_0017")
1055
1056     def test_0018_udp_permit_port_v6(self):
1057         """ permit single UPPv6
1058         """
1059         self.logger.info("ACLP_TEST_START_0018")
1060
1061         port = random.randint(16384, 65535)
1062         # Add an ACL
1063         rules = []
1064         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1065                                       self.proto[self.IP][self.UDP]))
1066         # deny ip any any in the end
1067         rules.append(self.create_rule(self.IPV6, self.DENY,
1068                                       self.PORTS_ALL, 0))
1069
1070         # Apply rules
1071         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1072
1073         # Traffic should still pass
1074         self.run_verify_test(self.IP, self.IPV6,
1075                              self.proto[self.IP][self.UDP], port)
1076
1077         self.logger.info("ACLP_TEST_FINISH_0018")
1078
1079     def test_0019_udp_deny_port(self):
1080         """ deny single TCPv4/v6
1081         """
1082         self.logger.info("ACLP_TEST_START_0019")
1083
1084         port = random.randint(16384, 65535)
1085         # Add an ACL
1086         rules = []
1087         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1088                                       self.proto[self.IP][self.TCP]))
1089         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1090                                       self.proto[self.IP][self.TCP]))
1091         # Permit ip any any in the end
1092         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1093                                       self.PORTS_ALL, 0))
1094         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1095                                       self.PORTS_ALL, 0))
1096
1097         # Apply rules
1098         self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1099
1100         # Traffic should not pass
1101         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1102                                    self.proto[self.IP][self.TCP], port)
1103
1104         self.logger.info("ACLP_TEST_FINISH_0019")
1105
1106     def test_0020_udp_deny_port(self):
1107         """ deny single UDPv4/v6
1108         """
1109         self.logger.info("ACLP_TEST_START_0020")
1110
1111         port = random.randint(16384, 65535)
1112         # Add an ACL
1113         rules = []
1114         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1115                                       self.proto[self.IP][self.UDP]))
1116         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1117                                       self.proto[self.IP][self.UDP]))
1118         # Permit ip any any in the end
1119         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1120                                       self.PORTS_ALL, 0))
1121         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1122                                       self.PORTS_ALL, 0))
1123
1124         # Apply rules
1125         self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1126
1127         # Traffic should not pass
1128         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1129                                    self.proto[self.IP][self.UDP], port)
1130
1131         self.logger.info("ACLP_TEST_FINISH_0020")
1132
1133     def test_0021_udp_deny_port_verify_fragment_deny(self):
1134         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1135         blocked
1136         """
1137         self.logger.info("ACLP_TEST_START_0021")
1138
1139         port = random.randint(16384, 65535)
1140         # Add an ACL
1141         rules = []
1142         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1143                                       self.proto[self.IP][self.UDP]))
1144         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1145                                       self.proto[self.IP][self.UDP]))
1146         # deny ip any any in the end
1147         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1148                                       self.PORTS_ALL, 0))
1149         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1150                                       self.PORTS_ALL, 0))
1151
1152         # Apply rules
1153         self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1154
1155         # Traffic should not pass
1156         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1157                                    self.proto[self.IP][self.UDP], port, True)
1158
1159         self.logger.info("ACLP_TEST_FINISH_0021")
1160
1161     def test_0022_zero_length_udp_ipv4(self):
1162         """ VPP-687 zero length udp ipv4 packet"""
1163         self.logger.info("ACLP_TEST_START_0022")
1164
1165         port = random.randint(16384, 65535)
1166         # Add an ACL
1167         rules = []
1168         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1169                                       self.proto[self.IP][self.UDP]))
1170         # deny ip any any in the end
1171         rules.append(
1172             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1173
1174         # Apply rules
1175         self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
1176
1177         # Traffic should still pass
1178         # Create incoming packet streams for packet-generator interfaces
1179         pkts_cnt = 0
1180         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1181                                   self.IP, self.IPV4,
1182                                   self.proto[self.IP][self.UDP], port,
1183                                   False, False)
1184         if len(pkts) > 0:
1185             self.pg0.add_stream(pkts)
1186             pkts_cnt += len(pkts)
1187
1188         # Enable packet capture and start packet sendingself.IPV
1189         self.pg_enable_capture(self.pg_interfaces)
1190         self.pg_start()
1191
1192         self.pg1.get_capture(pkts_cnt)
1193
1194         self.logger.info("ACLP_TEST_FINISH_0022")
1195
1196     def test_0023_zero_length_udp_ipv6(self):
1197         """ VPP-687 zero length udp ipv6 packet"""
1198         self.logger.info("ACLP_TEST_START_0023")
1199
1200         port = random.randint(16384, 65535)
1201         # Add an ACL
1202         rules = []
1203         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1204                                       self.proto[self.IP][self.UDP]))
1205         # deny ip any any in the end
1206         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1207
1208         # Apply rules
1209         self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
1210
1211         # Traffic should still pass
1212         # Create incoming packet streams for packet-generator interfaces
1213         pkts_cnt = 0
1214         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1215                                   self.IP, self.IPV6,
1216                                   self.proto[self.IP][self.UDP], port,
1217                                   False, False)
1218         if len(pkts) > 0:
1219             self.pg0.add_stream(pkts)
1220             pkts_cnt += len(pkts)
1221
1222         # Enable packet capture and start packet sendingself.IPV
1223         self.pg_enable_capture(self.pg_interfaces)
1224         self.pg_start()
1225
1226         # Verify outgoing packet streams per packet-generator interface
1227         self.pg1.get_capture(pkts_cnt)
1228
1229         self.logger.info("ACLP_TEST_FINISH_0023")
1230
1231     def test_0108_tcp_permit_v4(self):
1232         """ permit TCPv4 + non-match range
1233         """
1234         self.logger.info("ACLP_TEST_START_0108")
1235
1236         # Add an ACL
1237         rules = []
1238         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1239                      self.proto[self.IP][self.TCP]))
1240         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1241                      self.proto[self.IP][self.TCP]))
1242         # deny ip any any in the end
1243         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1244
1245         # Apply rules
1246         self.apply_rules(rules, b"permit ipv4 tcp")
1247
1248         # Traffic should still pass
1249         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1250
1251         self.logger.info("ACLP_TEST_FINISH_0108")
1252
1253     def test_0109_tcp_permit_v6(self):
1254         """ permit TCPv6 + non-match range
1255         """
1256         self.logger.info("ACLP_TEST_START_0109")
1257
1258         # Add an ACL
1259         rules = []
1260         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1261                                       self.proto[self.IP][self.TCP]))
1262         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1263                                       self.proto[self.IP][self.TCP]))
1264         # deny ip any any in the end
1265         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1266
1267         # Apply rules
1268         self.apply_rules(rules, b"permit ip6 tcp")
1269
1270         # Traffic should still pass
1271         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1272
1273         self.logger.info("ACLP_TEST_FINISH_0109")
1274
1275     def test_0110_udp_permit_v4(self):
1276         """ permit UDPv4 + non-match range
1277         """
1278         self.logger.info("ACLP_TEST_START_0110")
1279
1280         # Add an ACL
1281         rules = []
1282         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1283                                       self.proto[self.IP][self.UDP]))
1284         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1285                                       self.proto[self.IP][self.UDP]))
1286         # deny ip any any in the end
1287         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1288
1289         # Apply rules
1290         self.apply_rules(rules, b"permit ipv4 udp")
1291
1292         # Traffic should still pass
1293         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1294
1295         self.logger.info("ACLP_TEST_FINISH_0110")
1296
1297     def test_0111_udp_permit_v6(self):
1298         """ permit UDPv6 + non-match range
1299         """
1300         self.logger.info("ACLP_TEST_START_0111")
1301
1302         # Add an ACL
1303         rules = []
1304         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1305                                       self.proto[self.IP][self.UDP]))
1306         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1307                                       self.proto[self.IP][self.UDP]))
1308         # deny ip any any in the end
1309         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1310
1311         # Apply rules
1312         self.apply_rules(rules, b"permit ip6 udp")
1313
1314         # Traffic should still pass
1315         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1316
1317         self.logger.info("ACLP_TEST_FINISH_0111")
1318
1319     def test_0112_tcp_deny(self):
1320         """ deny TCPv4/v6 + non-match range
1321         """
1322         self.logger.info("ACLP_TEST_START_0112")
1323
1324         # Add an ACL
1325         rules = []
1326         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1327                                       self.PORTS_RANGE_2,
1328                                       self.proto[self.IP][self.TCP]))
1329         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1330                                       self.PORTS_RANGE_2,
1331                                       self.proto[self.IP][self.TCP]))
1332         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1333                                       self.proto[self.IP][self.TCP]))
1334         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1335                                       self.proto[self.IP][self.TCP]))
1336         # permit ip any any in the end
1337         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1338                                       self.PORTS_ALL, 0))
1339         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1340                                       self.PORTS_ALL, 0))
1341
1342         # Apply rules
1343         self.apply_rules(rules, b"deny ip4/ip6 tcp")
1344
1345         # Traffic should not pass
1346         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1347                                    self.proto[self.IP][self.TCP])
1348
1349         self.logger.info("ACLP_TEST_FINISH_0112")
1350
1351     def test_0113_udp_deny(self):
1352         """ deny UDPv4/v6 + non-match range
1353         """
1354         self.logger.info("ACLP_TEST_START_0113")
1355
1356         # Add an ACL
1357         rules = []
1358         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1359                                       self.PORTS_RANGE_2,
1360                                       self.proto[self.IP][self.UDP]))
1361         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1362                                       self.PORTS_RANGE_2,
1363                                       self.proto[self.IP][self.UDP]))
1364         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1365                                       self.proto[self.IP][self.UDP]))
1366         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1367                                       self.proto[self.IP][self.UDP]))
1368         # permit ip any any in the end
1369         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1370                                       self.PORTS_ALL, 0))
1371         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1372                                       self.PORTS_ALL, 0))
1373
1374         # Apply rules
1375         self.apply_rules(rules, b"deny ip4/ip6 udp")
1376
1377         # Traffic should not pass
1378         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1379                                    self.proto[self.IP][self.UDP])
1380
1381         self.logger.info("ACLP_TEST_FINISH_0113")
1382
1383     def test_0300_tcp_permit_v4_etype_aaaa(self):
1384         """ permit TCPv4, send 0xAAAA etype
1385         """
1386         self.logger.info("ACLP_TEST_START_0300")
1387
1388         # Add an ACL
1389         rules = []
1390         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1391                      self.proto[self.IP][self.TCP]))
1392         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1393                      self.proto[self.IP][self.TCP]))
1394         # deny ip any any in the end
1395         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1396
1397         # Apply rules
1398         self.apply_rules(rules, b"permit ipv4 tcp")
1399
1400         # Traffic should still pass also for an odd ethertype
1401         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1402                              0, False, True, 0xaaaa)
1403         self.logger.info("ACLP_TEST_FINISH_0300")
1404
1405     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1406         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1407         """
1408         self.logger.info("ACLP_TEST_START_0305")
1409
1410         # Add an ACL
1411         rules = []
1412         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1413                      self.proto[self.IP][self.TCP]))
1414         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1415                      self.proto[self.IP][self.TCP]))
1416         # deny ip any any in the end
1417         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1418
1419         # Apply rules
1420         self.apply_rules(rules, b"permit ipv4 tcp")
1421
1422         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1423         self.etype_whitelist([0xbbb], 1)
1424
1425         # The oddball ethertype should be blocked
1426         self.run_verify_negat_test(self.IP, self.IPV4,
1427                                    self.proto[self.IP][self.TCP],
1428                                    0, False, 0xaaaa)
1429
1430         # remove the whitelist
1431         self.etype_whitelist([], 0)
1432
1433         self.logger.info("ACLP_TEST_FINISH_0305")
1434
1435     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1436         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1437         """
1438         self.logger.info("ACLP_TEST_START_0306")
1439
1440         # Add an ACL
1441         rules = []
1442         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1443                      self.proto[self.IP][self.TCP]))
1444         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1445                      self.proto[self.IP][self.TCP]))
1446         # deny ip any any in the end
1447         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1448
1449         # Apply rules
1450         self.apply_rules(rules, b"permit ipv4 tcp")
1451
1452         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1453         self.etype_whitelist([0xbbb], 1)
1454
1455         # The whitelisted traffic, should pass
1456         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1457                              0, False, True, 0x0bbb)
1458
1459         # remove the whitelist, the previously blocked 0xAAAA should pass now
1460         self.etype_whitelist([], 0)
1461
1462         self.logger.info("ACLP_TEST_FINISH_0306")
1463
1464     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1465         """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1466         """
1467         self.logger.info("ACLP_TEST_START_0307")
1468
1469         # Add an ACL
1470         rules = []
1471         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1472                      self.proto[self.IP][self.TCP]))
1473         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1474                      self.proto[self.IP][self.TCP]))
1475         # deny ip any any in the end
1476         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1477
1478         # Apply rules
1479         self.apply_rules(rules, b"permit ipv4 tcp")
1480
1481         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1482         self.etype_whitelist([0xbbb], 1)
1483         # remove the whitelist, the previously blocked 0xAAAA should pass now
1484         self.etype_whitelist([], 0)
1485
1486         # The whitelisted traffic, should pass
1487         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1488                              0, False, True, 0xaaaa)
1489
1490         self.logger.info("ACLP_TEST_FINISH_0306")
1491
1492     def test_0315_del_intf(self):
1493         """ apply an acl and delete the interface
1494         """
1495         self.logger.info("ACLP_TEST_START_0315")
1496
1497         # Add an ACL
1498         rules = []
1499         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1500                      self.proto[self.IP][self.TCP]))
1501         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1502                      self.proto[self.IP][self.TCP]))
1503         # deny ip any any in the end
1504         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1505
1506         # create an interface
1507         intf = []
1508         intf.append(VppLoInterface(self))
1509
1510         # Apply rules
1511         self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
1512
1513         # Remove the interface
1514         intf[0].remove_vpp_config()
1515
1516         self.logger.info("ACLP_TEST_FINISH_0315")
1517
# Run the tests with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)