Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16 from vpp_lo_interface import VppLoInterface
17
18
19 class TestACLplugin(VppTestCase):
20     """ ACL plugin Test Case """
21
22     # traffic types
23     IP = 0
24     ICMP = 1
25
26     # IP version
27     IPRANDOM = -1
28     IPV4 = 0
29     IPV6 = 1
30
31     # rule types
32     DENY = 0
33     PERMIT = 1
34
35     # supported protocols
36     proto = [[6, 17], [1, 58]]
37     proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
38     ICMPv4 = 0
39     ICMPv6 = 1
40     TCP = 0
41     UDP = 1
42     PROTO_ALL = 0
43
44     # port ranges
45     PORTS_ALL = -1
46     PORTS_RANGE = 0
47     PORTS_RANGE_2 = 1
48     udp_sport_from = 10
49     udp_sport_to = udp_sport_from + 5
50     udp_dport_from = 20000
51     udp_dport_to = udp_dport_from + 5000
52     tcp_sport_from = 30
53     tcp_sport_to = tcp_sport_from + 5
54     tcp_dport_from = 40000
55     tcp_dport_to = tcp_dport_from + 5000
56
57     udp_sport_from_2 = 90
58     udp_sport_to_2 = udp_sport_from_2 + 5
59     udp_dport_from_2 = 30000
60     udp_dport_to_2 = udp_dport_from_2 + 5000
61     tcp_sport_from_2 = 130
62     tcp_sport_to_2 = tcp_sport_from_2 + 5
63     tcp_dport_from_2 = 20000
64     tcp_dport_to_2 = tcp_dport_from_2 + 5000
65
66     icmp4_type = 8  # echo request
67     icmp4_code = 3
68     icmp6_type = 128  # echo request
69     icmp6_code = 3
70
71     icmp4_type_2 = 8
72     icmp4_code_from_2 = 5
73     icmp4_code_to_2 = 20
74     icmp6_type_2 = 128
75     icmp6_code_from_2 = 8
76     icmp6_code_to_2 = 42
77
78     # Test variables
79     bd_id = 1
80
81     @classmethod
82     def setUpClass(cls):
83         """
84         Perform standard class setup (defined by class method setUpClass in
85         class VppTestCase) before running the test case, set test case related
86         variables and configure VPP.
87         """
88         super(TestACLplugin, cls).setUpClass()
89
90         try:
91             # Create 2 pg interfaces
92             cls.create_pg_interfaces(range(2))
93
94             # Packet flows mapping pg0 -> pg1, pg2 etc.
95             cls.flows = dict()
96             cls.flows[cls.pg0] = [cls.pg1]
97
98             # Packet sizes
99             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101             # Create BD with MAC learning and unknown unicast flooding disabled
102             # and put interfaces to this BD
103             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104                                            learn=1)
105             for pg_if in cls.pg_interfaces:
106                 cls.vapi.sw_interface_set_l2_bridge(
107                     rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
108
109             # Set up all interfaces
110             for i in cls.pg_interfaces:
111                 i.admin_up()
112
113             # Mapping between packet-generator index and lists of test hosts
114             cls.hosts_by_pg_idx = dict()
115             for pg_if in cls.pg_interfaces:
116                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118             # Create list of deleted hosts
119             cls.deleted_hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # warm-up the mac address tables
124             # self.warmup_test()
125             count = 16
126             start = 0
127             n_int = len(cls.pg_interfaces)
128             macs_per_if = count / n_int
129             i = -1
130             for pg_if in cls.pg_interfaces:
131                 i += 1
132                 start_nr = macs_per_if * i + start
133                 end_nr = count + start if i == (n_int - 1) \
134                     else macs_per_if * (i + 1) + start
135                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136                 for j in range(start_nr, end_nr):
137                     host = Host(
138                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140                         "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
141                     hosts.append(host)
142
143         except Exception:
144             super(TestACLplugin, cls).tearDownClass()
145             raise
146
147     @classmethod
148     def tearDownClass(cls):
149         super(TestACLplugin, cls).tearDownClass()
150
151     def setUp(self):
152         super(TestACLplugin, self).setUp()
153         self.reset_packet_infos()
154
155     def tearDown(self):
156         """
157         Show various debug prints after each test.
158         """
159         super(TestACLplugin, self).tearDown()
160         if not self.vpp_dead:
161             cli = "show vlib graph l2-input-feat-arc"
162             self.logger.info(self.vapi.ppcli(cli))
163             cli = "show vlib graph l2-input-feat-arc-end"
164             self.logger.info(self.vapi.ppcli(cli))
165             cli = "show vlib graph l2-output-feat-arc"
166             self.logger.info(self.vapi.ppcli(cli))
167             cli = "show vlib graph l2-output-feat-arc-end"
168             self.logger.info(self.vapi.ppcli(cli))
169             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
170             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
171             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
172             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
173             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
174                                              % self.bd_id))
175
176     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
177                     s_prefix=0, s_ip='\x00\x00\x00\x00',
178                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
179         if proto == -1:
180             return
181         if ports == self.PORTS_ALL:
182             sport_from = 0
183             dport_from = 0
184             sport_to = 65535 if proto != 1 and proto != 58 else 255
185             dport_to = sport_to
186         elif ports == self.PORTS_RANGE:
187             if proto == 1:
188                 sport_from = self.icmp4_type
189                 sport_to = self.icmp4_type
190                 dport_from = self.icmp4_code
191                 dport_to = self.icmp4_code
192             elif proto == 58:
193                 sport_from = self.icmp6_type
194                 sport_to = self.icmp6_type
195                 dport_from = self.icmp6_code
196                 dport_to = self.icmp6_code
197             elif proto == self.proto[self.IP][self.TCP]:
198                 sport_from = self.tcp_sport_from
199                 sport_to = self.tcp_sport_to
200                 dport_from = self.tcp_dport_from
201                 dport_to = self.tcp_dport_to
202             elif proto == self.proto[self.IP][self.UDP]:
203                 sport_from = self.udp_sport_from
204                 sport_to = self.udp_sport_to
205                 dport_from = self.udp_dport_from
206                 dport_to = self.udp_dport_to
207         elif ports == self.PORTS_RANGE_2:
208             if proto == 1:
209                 sport_from = self.icmp4_type_2
210                 sport_to = self.icmp4_type_2
211                 dport_from = self.icmp4_code_from_2
212                 dport_to = self.icmp4_code_to_2
213             elif proto == 58:
214                 sport_from = self.icmp6_type_2
215                 sport_to = self.icmp6_type_2
216                 dport_from = self.icmp6_code_from_2
217                 dport_to = self.icmp6_code_to_2
218             elif proto == self.proto[self.IP][self.TCP]:
219                 sport_from = self.tcp_sport_from_2
220                 sport_to = self.tcp_sport_to_2
221                 dport_from = self.tcp_dport_from_2
222                 dport_to = self.tcp_dport_to_2
223             elif proto == self.proto[self.IP][self.UDP]:
224                 sport_from = self.udp_sport_from_2
225                 sport_to = self.udp_sport_to_2
226                 dport_from = self.udp_dport_from_2
227                 dport_to = self.udp_dport_to_2
228         else:
229             sport_from = ports
230             sport_to = ports
231             dport_from = ports
232             dport_to = ports
233
234         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
235                  'srcport_or_icmptype_first': sport_from,
236                  'srcport_or_icmptype_last': sport_to,
237                  'src_ip_prefix_len': s_prefix,
238                  'src_ip_addr': s_ip,
239                  'dstport_or_icmpcode_first': dport_from,
240                  'dstport_or_icmpcode_last': dport_to,
241                  'dst_ip_prefix_len': d_prefix,
242                  'dst_ip_addr': d_ip})
243         return rule
244
245     def apply_rules(self, rules, tag=b''):
246         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
247                                           tag=tag)
248         self.logger.info("Dumped ACL: " + str(
249             self.vapi.acl_dump(reply.acl_index)))
250         # Apply a ACL on the interface as inbound
251         for i in self.pg_interfaces:
252             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
253                                                  n_input=1,
254                                                  acls=[reply.acl_index])
255         return
256
257     def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
258         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
259                                           tag=tag)
260         self.logger.info("Dumped ACL: " + str(
261             self.vapi.acl_dump(reply.acl_index)))
262         # Apply a ACL on the interface as inbound
263         self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
264                                              n_input=1,
265                                              acls=[reply.acl_index])
266         return
267
268     def etype_whitelist(self, whitelist, n_input):
269         # Apply whitelists on all the interfaces
270         for i in self.pg_interfaces:
271             # checkstyle can't read long names. Help them.
272             fun = self.vapi.acl_interface_set_etype_whitelist
273             fun(sw_if_index=i.sw_if_index, n_input=n_input,
274                 whitelist=whitelist)
275         return
276
277     def create_upper_layer(self, packet_index, proto, ports=0):
278         p = self.proto_map[proto]
279         if p == 'UDP':
280             if ports == 0:
281                 return UDP(sport=random.randint(self.udp_sport_from,
282                                                 self.udp_sport_to),
283                            dport=random.randint(self.udp_dport_from,
284                                                 self.udp_dport_to))
285             else:
286                 return UDP(sport=ports, dport=ports)
287         elif p == 'TCP':
288             if ports == 0:
289                 return TCP(sport=random.randint(self.tcp_sport_from,
290                                                 self.tcp_sport_to),
291                            dport=random.randint(self.tcp_dport_from,
292                                                 self.tcp_dport_to))
293             else:
294                 return TCP(sport=ports, dport=ports)
295         return ''
296
297     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
298                       proto=-1, ports=0, fragments=False,
299                       pkt_raw=True, etype=-1):
300         """
301         Create input packet stream for defined interface using hosts or
302         deleted_hosts list.
303
304         :param object src_if: Interface to create packet stream for.
305         :param list packet_sizes: List of required packet sizes.
306         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
307         :return: Stream of packets.
308         """
309         pkts = []
310         if self.flows.__contains__(src_if):
311             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
312             for dst_if in self.flows[src_if]:
313                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
314                 n_int = len(dst_hosts) * len(src_hosts)
315                 for i in range(0, n_int):
316                     dst_host = dst_hosts[i / len(src_hosts)]
317                     src_host = src_hosts[i % len(src_hosts)]
318                     pkt_info = self.create_packet_info(src_if, dst_if)
319                     if ipv6 == 1:
320                         pkt_info.ip = 1
321                     elif ipv6 == 0:
322                         pkt_info.ip = 0
323                     else:
324                         pkt_info.ip = random.choice([0, 1])
325                     if proto == -1:
326                         pkt_info.proto = random.choice(self.proto[self.IP])
327                     else:
328                         pkt_info.proto = proto
329                     payload = self.info_to_payload(pkt_info)
330                     p = Ether(dst=dst_host.mac, src=src_host.mac)
331                     if etype > 0:
332                         p = Ether(dst=dst_host.mac,
333                                   src=src_host.mac,
334                                   type=etype)
335                     if pkt_info.ip:
336                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
337                         if fragments:
338                             p /= IPv6ExtHdrFragment(offset=64, m=1)
339                     else:
340                         if fragments:
341                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
342                                     flags=1, frag=64)
343                         else:
344                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
345                     if traffic_type == self.ICMP:
346                         if pkt_info.ip:
347                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
348                                                    code=self.icmp6_code)
349                         else:
350                             p /= ICMP(type=self.icmp4_type,
351                                       code=self.icmp4_code)
352                     else:
353                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
354                     if pkt_raw:
355                         p /= Raw(payload)
356                         pkt_info.data = p.copy()
357                     if pkt_raw:
358                         size = random.choice(packet_sizes)
359                         self.extend_packet(p, size)
360                     pkts.append(p)
361         return pkts
362
363     def verify_capture(self, pg_if, capture,
364                        traffic_type=0, ip_type=0, etype=-1):
365         """
366         Verify captured input packet stream for defined interface.
367
368         :param object pg_if: Interface to verify captured packet stream for.
369         :param list capture: Captured packet stream.
370         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
371         """
372         last_info = dict()
373         for i in self.pg_interfaces:
374             last_info[i.sw_if_index] = None
375         dst_sw_if_index = pg_if.sw_if_index
376         for packet in capture:
377             if etype > 0:
378                 if packet[Ether].type != etype:
379                     self.logger.error(ppp("Unexpected ethertype in packet:",
380                                           packet))
381                 else:
382                     continue
383             try:
384                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
385                 if traffic_type == self.ICMP and ip_type == self.IPV6:
386                     payload_info = self.payload_to_info(
387                         packet[ICMPv6EchoRequest], 'data')
388                     payload = packet[ICMPv6EchoRequest]
389                 else:
390                     payload_info = self.payload_to_info(packet[Raw])
391                     payload = packet[self.proto_map[payload_info.proto]]
392             except:
393                 self.logger.error(ppp("Unexpected or invalid packet "
394                                       "(outside network):", packet))
395                 raise
396
397             if ip_type != 0:
398                 self.assertEqual(payload_info.ip, ip_type)
399             if traffic_type == self.ICMP:
400                 try:
401                     if payload_info.ip == 0:
402                         self.assertEqual(payload.type, self.icmp4_type)
403                         self.assertEqual(payload.code, self.icmp4_code)
404                     else:
405                         self.assertEqual(payload.type, self.icmp6_type)
406                         self.assertEqual(payload.code, self.icmp6_code)
407                 except:
408                     self.logger.error(ppp("Unexpected or invalid packet "
409                                           "(outside network):", packet))
410                     raise
411             else:
412                 try:
413                     ip_version = IPv6 if payload_info.ip == 1 else IP
414
415                     ip = packet[ip_version]
416                     packet_index = payload_info.index
417
418                     self.assertEqual(payload_info.dst, dst_sw_if_index)
419                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
420                                       (pg_if.name, payload_info.src,
421                                        packet_index))
422                     next_info = self.get_next_packet_info_for_interface2(
423                         payload_info.src, dst_sw_if_index,
424                         last_info[payload_info.src])
425                     last_info[payload_info.src] = next_info
426                     self.assertTrue(next_info is not None)
427                     self.assertEqual(packet_index, next_info.index)
428                     saved_packet = next_info.data
429                     # Check standard fields
430                     self.assertEqual(ip.src, saved_packet[ip_version].src)
431                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
432                     p = self.proto_map[payload_info.proto]
433                     if p == 'TCP':
434                         tcp = packet[TCP]
435                         self.assertEqual(tcp.sport, saved_packet[
436                             TCP].sport)
437                         self.assertEqual(tcp.dport, saved_packet[
438                             TCP].dport)
439                     elif p == 'UDP':
440                         udp = packet[UDP]
441                         self.assertEqual(udp.sport, saved_packet[
442                             UDP].sport)
443                         self.assertEqual(udp.dport, saved_packet[
444                             UDP].dport)
445                 except:
446                     self.logger.error(ppp("Unexpected or invalid packet:",
447                                           packet))
448                     raise
449         for i in self.pg_interfaces:
450             remaining_packet = self.get_next_packet_info_for_interface2(
451                 i, dst_sw_if_index, last_info[i.sw_if_index])
452             self.assertTrue(
453                 remaining_packet is None,
454                 "Port %u: Packet expected from source %u didn't arrive" %
455                 (dst_sw_if_index, i.sw_if_index))
456
457     def run_traffic_no_check(self):
458         # Test
459         # Create incoming packet streams for packet-generator interfaces
460         for i in self.pg_interfaces:
461             if self.flows.__contains__(i):
462                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
463                 if len(pkts) > 0:
464                     i.add_stream(pkts)
465
466         # Enable packet capture and start packet sending
467         self.pg_enable_capture(self.pg_interfaces)
468         self.pg_start()
469
470     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
471                         frags=False, pkt_raw=True, etype=-1):
472         # Test
473         # Create incoming packet streams for packet-generator interfaces
474         pkts_cnt = 0
475         for i in self.pg_interfaces:
476             if self.flows.__contains__(i):
477                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
478                                           traffic_type, ip_type, proto, ports,
479                                           frags, pkt_raw, etype)
480                 if len(pkts) > 0:
481                     i.add_stream(pkts)
482                     pkts_cnt += len(pkts)
483
484         # Enable packet capture and start packet sendingself.IPV
485         self.pg_enable_capture(self.pg_interfaces)
486         self.pg_start()
487         self.logger.info("sent packets count: %d" % pkts_cnt)
488
489         # Verify
490         # Verify outgoing packet streams per packet-generator interface
491         for src_if in self.pg_interfaces:
492             if self.flows.__contains__(src_if):
493                 for dst_if in self.flows[src_if]:
494                     capture = dst_if.get_capture(pkts_cnt)
495                     self.logger.info("Verifying capture on interface %s" %
496                                      dst_if.name)
497                     self.verify_capture(dst_if, capture,
498                                         traffic_type, ip_type, etype)
499
500     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
501                               ports=0, frags=False, etype=-1):
502         # Test
503         pkts_cnt = 0
504         self.reset_packet_infos()
505         for i in self.pg_interfaces:
506             if self.flows.__contains__(i):
507                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
508                                           traffic_type, ip_type, proto, ports,
509                                           frags, True, etype)
510                 if len(pkts) > 0:
511                     i.add_stream(pkts)
512                     pkts_cnt += len(pkts)
513
514         # Enable packet capture and start packet sending
515         self.pg_enable_capture(self.pg_interfaces)
516         self.pg_start()
517         self.logger.info("sent packets count: %d" % pkts_cnt)
518
519         # Verify
520         # Verify outgoing packet streams per packet-generator interface
521         for src_if in self.pg_interfaces:
522             if self.flows.__contains__(src_if):
523                 for dst_if in self.flows[src_if]:
524                     self.logger.info("Verifying capture on interface %s" %
525                                      dst_if.name)
526                     capture = dst_if.get_capture(0)
527                     self.assertEqual(len(capture), 0)
528
529     def test_0000_warmup_test(self):
530         """ ACL plugin version check; learn MACs
531         """
532         reply = self.vapi.papi.acl_plugin_get_version()
533         self.assertEqual(reply.major, 1)
534         self.logger.info("Working with ACL plugin version: %d.%d" % (
535             reply.major, reply.minor))
536         # minor version changes are non breaking
537         # self.assertEqual(reply.minor, 0)
538
539     def test_0001_acl_create(self):
540         """ ACL create/delete test
541         """
542
543         self.logger.info("ACLP_TEST_START_0001")
544         # Add an ACL
545         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
546               'srcport_or_icmptype_first': 1234,
547               'srcport_or_icmptype_last': 1235,
548               'src_ip_prefix_len': 0,
549               'src_ip_addr': b'\x00\x00\x00\x00',
550               'dstport_or_icmpcode_first': 1234,
551               'dstport_or_icmpcode_last': 1234,
552               'dst_ip_addr': b'\x00\x00\x00\x00',
553               'dst_ip_prefix_len': 0}]
554         # Test 1: add a new ACL
555         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
556                                           tag=b"permit 1234")
557         self.assertEqual(reply.retval, 0)
558         # The very first ACL gets #0
559         self.assertEqual(reply.acl_index, 0)
560         first_acl = reply.acl_index
561         rr = self.vapi.acl_dump(reply.acl_index)
562         self.logger.info("Dumped ACL: " + str(rr))
563         self.assertEqual(len(rr), 1)
564         # We should have the same number of ACL entries as we had asked
565         self.assertEqual(len(rr[0].r), len(r))
566         # The rules should be the same. But because the submitted and returned
567         # are different types, we need to iterate over rules and keys to get
568         # to basic values.
569         for i_rule in range(0, len(r) - 1):
570             for rule_key in r[i_rule]:
571                 self.assertEqual(rr[0].r[i_rule][rule_key],
572                                  r[i_rule][rule_key])
573
574         # Add a deny-1234 ACL
575         r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
576                    'srcport_or_icmptype_first': 1234,
577                    'srcport_or_icmptype_last': 1235,
578                    'src_ip_prefix_len': 0,
579                    'src_ip_addr': b'\x00\x00\x00\x00',
580                    'dstport_or_icmpcode_first': 1234,
581                    'dstport_or_icmpcode_last': 1234,
582                    'dst_ip_addr': b'\x00\x00\x00\x00',
583                    'dst_ip_prefix_len': 0},
584                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
585                    'srcport_or_icmptype_first': 0,
586                    'srcport_or_icmptype_last': 0,
587                    'src_ip_prefix_len': 0,
588                    'src_ip_addr': b'\x00\x00\x00\x00',
589                    'dstport_or_icmpcode_first': 0,
590                    'dstport_or_icmpcode_last': 0,
591                    'dst_ip_addr': b'\x00\x00\x00\x00',
592                    'dst_ip_prefix_len': 0}]
593
594         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
595                                           tag=b"deny 1234;permit all")
596         self.assertEqual(reply.retval, 0)
597         # The second ACL gets #1
598         self.assertEqual(reply.acl_index, 1)
599         second_acl = reply.acl_index
600
601         # Test 2: try to modify a nonexistent ACL
602         reply = self.vapi.acl_add_replace(acl_index=432, r=r,
603                                           tag=b"FFFF:FFFF", expected_retval=-6)
604         self.assertEqual(reply.retval, -6)
605         # The ACL number should pass through
606         self.assertEqual(reply.acl_index, 432)
607         # apply an ACL on an interface inbound, try to delete ACL, must fail
608         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
609                                              n_input=1,
610                                              acls=[first_acl])
611         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
612         # Unapply an ACL and then try to delete it - must be ok
613         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
614                                              n_input=0,
615                                              acls=[])
616         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
617
618         # apply an ACL on an interface outbound, try to delete ACL, must fail
619         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
620                                              n_input=0,
621                                              acls=[second_acl])
622         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
623         # Unapply the ACL and then try to delete it - must be ok
624         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
625                                              n_input=0,
626                                              acls=[])
627         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
628
629         # try to apply a nonexistent ACL - must fail
630         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
631                                              n_input=1,
632                                              acls=[first_acl],
633                                              expected_retval=-6)
634
635         self.logger.info("ACLP_TEST_FINISH_0001")
636
637     def test_0002_acl_permit_apply(self):
638         """ permit ACL apply test
639         """
640         self.logger.info("ACLP_TEST_START_0002")
641
642         rules = []
643         rules.append(self.create_rule(self.IPV4, self.PERMIT,
644                      0, self.proto[self.IP][self.UDP]))
645         rules.append(self.create_rule(self.IPV4, self.PERMIT,
646                      0, self.proto[self.IP][self.TCP]))
647
648         # Apply rules
649         self.apply_rules(rules, b"permit per-flow")
650
651         # Traffic should still pass
652         self.run_verify_test(self.IP, self.IPV4, -1)
653         self.logger.info("ACLP_TEST_FINISH_0002")
654
655     def test_0003_acl_deny_apply(self):
656         """ deny ACL apply test
657         """
658         self.logger.info("ACLP_TEST_START_0003")
659         # Add a deny-flows ACL
660         rules = []
661         rules.append(self.create_rule(self.IPV4, self.DENY,
662                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
663         # Permit ip any any in the end
664         rules.append(self.create_rule(self.IPV4, self.PERMIT,
665                                       self.PORTS_ALL, 0))
666
667         # Apply rules
668         self.apply_rules(rules, b"deny per-flow;permit all")
669
670         # Traffic should not pass
671         self.run_verify_negat_test(self.IP, self.IPV4,
672                                    self.proto[self.IP][self.UDP])
673         self.logger.info("ACLP_TEST_FINISH_0003")
674         # self.assertEqual(1, 0)
675
676     def test_0004_vpp624_permit_icmpv4(self):
677         """ VPP_624 permit ICMPv4
678         """
679         self.logger.info("ACLP_TEST_START_0004")
680
681         # Add an ACL
682         rules = []
683         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
684                                       self.proto[self.ICMP][self.ICMPv4]))
685         # deny ip any any in the end
686         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
687
688         # Apply rules
689         self.apply_rules(rules, b"permit icmpv4")
690
691         # Traffic should still pass
692         self.run_verify_test(self.ICMP, self.IPV4,
693                              self.proto[self.ICMP][self.ICMPv4])
694
695         self.logger.info("ACLP_TEST_FINISH_0004")
696
697     def test_0005_vpp624_permit_icmpv6(self):
698         """ VPP_624 permit ICMPv6
699         """
700         self.logger.info("ACLP_TEST_START_0005")
701
702         # Add an ACL
703         rules = []
704         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
705                                       self.proto[self.ICMP][self.ICMPv6]))
706         # deny ip any any in the end
707         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
708
709         # Apply rules
710         self.apply_rules(rules, b"permit icmpv6")
711
712         # Traffic should still pass
713         self.run_verify_test(self.ICMP, self.IPV6,
714                              self.proto[self.ICMP][self.ICMPv6])
715
716         self.logger.info("ACLP_TEST_FINISH_0005")
717
718     def test_0006_vpp624_deny_icmpv4(self):
719         """ VPP_624 deny ICMPv4
720         """
721         self.logger.info("ACLP_TEST_START_0006")
722         # Add an ACL
723         rules = []
724         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
725                                       self.proto[self.ICMP][self.ICMPv4]))
726         # permit ip any any in the end
727         rules.append(self.create_rule(self.IPV4, self.PERMIT,
728                                       self.PORTS_ALL, 0))
729
730         # Apply rules
731         self.apply_rules(rules, b"deny icmpv4")
732
733         # Traffic should not pass
734         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
735
736         self.logger.info("ACLP_TEST_FINISH_0006")
737
738     def test_0007_vpp624_deny_icmpv6(self):
739         """ VPP_624 deny ICMPv6
740         """
741         self.logger.info("ACLP_TEST_START_0007")
742         # Add an ACL
743         rules = []
744         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
745                                       self.proto[self.ICMP][self.ICMPv6]))
746         # deny ip any any in the end
747         rules.append(self.create_rule(self.IPV6, self.PERMIT,
748                                       self.PORTS_ALL, 0))
749
750         # Apply rules
751         self.apply_rules(rules, b"deny icmpv6")
752
753         # Traffic should not pass
754         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
755
756         self.logger.info("ACLP_TEST_FINISH_0007")
757
758     def test_0008_tcp_permit_v4(self):
759         """ permit TCPv4
760         """
761         self.logger.info("ACLP_TEST_START_0008")
762
763         # Add an ACL
764         rules = []
765         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
766                      self.proto[self.IP][self.TCP]))
767         # deny ip any any in the end
768         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
769
770         # Apply rules
771         self.apply_rules(rules, b"permit ipv4 tcp")
772
773         # Traffic should still pass
774         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
775
776         self.logger.info("ACLP_TEST_FINISH_0008")
777
778     def test_0009_tcp_permit_v6(self):
779         """ permit TCPv6
780         """
781         self.logger.info("ACLP_TEST_START_0009")
782
783         # Add an ACL
784         rules = []
785         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
786                                       self.proto[self.IP][self.TCP]))
787         # deny ip any any in the end
788         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
789
790         # Apply rules
791         self.apply_rules(rules, b"permit ip6 tcp")
792
793         # Traffic should still pass
794         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
795
796         self.logger.info("ACLP_TEST_FINISH_0008")
797
798     def test_0010_udp_permit_v4(self):
799         """ permit UDPv4
800         """
801         self.logger.info("ACLP_TEST_START_0010")
802
803         # Add an ACL
804         rules = []
805         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
806                                       self.proto[self.IP][self.UDP]))
807         # deny ip any any in the end
808         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
809
810         # Apply rules
811         self.apply_rules(rules, b"permit ipv udp")
812
813         # Traffic should still pass
814         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
815
816         self.logger.info("ACLP_TEST_FINISH_0010")
817
818     def test_0011_udp_permit_v6(self):
819         """ permit UDPv6
820         """
821         self.logger.info("ACLP_TEST_START_0011")
822
823         # Add an ACL
824         rules = []
825         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
826                                       self.proto[self.IP][self.UDP]))
827         # deny ip any any in the end
828         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
829
830         # Apply rules
831         self.apply_rules(rules, b"permit ip6 udp")
832
833         # Traffic should still pass
834         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
835
836         self.logger.info("ACLP_TEST_FINISH_0011")
837
838     def test_0012_tcp_deny(self):
839         """ deny TCPv4/v6
840         """
841         self.logger.info("ACLP_TEST_START_0012")
842
843         # Add an ACL
844         rules = []
845         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
846                                       self.proto[self.IP][self.TCP]))
847         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
848                                       self.proto[self.IP][self.TCP]))
849         # permit ip any any in the end
850         rules.append(self.create_rule(self.IPV4, self.PERMIT,
851                                       self.PORTS_ALL, 0))
852         rules.append(self.create_rule(self.IPV6, self.PERMIT,
853                                       self.PORTS_ALL, 0))
854
855         # Apply rules
856         self.apply_rules(rules, b"deny ip4/ip6 tcp")
857
858         # Traffic should not pass
859         self.run_verify_negat_test(self.IP, self.IPRANDOM,
860                                    self.proto[self.IP][self.TCP])
861
862         self.logger.info("ACLP_TEST_FINISH_0012")
863
864     def test_0013_udp_deny(self):
865         """ deny UDPv4/v6
866         """
867         self.logger.info("ACLP_TEST_START_0013")
868
869         # Add an ACL
870         rules = []
871         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
872                                       self.proto[self.IP][self.UDP]))
873         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
874                                       self.proto[self.IP][self.UDP]))
875         # permit ip any any in the end
876         rules.append(self.create_rule(self.IPV4, self.PERMIT,
877                                       self.PORTS_ALL, 0))
878         rules.append(self.create_rule(self.IPV6, self.PERMIT,
879                                       self.PORTS_ALL, 0))
880
881         # Apply rules
882         self.apply_rules(rules, b"deny ip4/ip6 udp")
883
884         # Traffic should not pass
885         self.run_verify_negat_test(self.IP, self.IPRANDOM,
886                                    self.proto[self.IP][self.UDP])
887
888         self.logger.info("ACLP_TEST_FINISH_0013")
889
890     def test_0014_acl_dump(self):
891         """ verify add/dump acls
892         """
893         self.logger.info("ACLP_TEST_START_0014")
894
895         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
896              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
897              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
898              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
899              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
900              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
901              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
902              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
903              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
904              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
905              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
906              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
907              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
908              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
909              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
910              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
911              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
912              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
913              ]
914
915         # Add and verify new ACLs
916         rules = []
917         for i in range(len(r)):
918             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
919
920         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
921         result = self.vapi.acl_dump(reply.acl_index)
922
923         i = 0
924         for drules in result:
925             for dr in drules.r:
926                 self.assertEqual(dr.is_ipv6, r[i][0])
927                 self.assertEqual(dr.is_permit, r[i][1])
928                 self.assertEqual(dr.proto, r[i][3])
929
930                 if r[i][2] > 0:
931                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
932                 else:
933                     if r[i][2] < 0:
934                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
935                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
936                     else:
937                         if dr.proto == self.proto[self.IP][self.TCP]:
938                             self.assertGreater(dr.srcport_or_icmptype_first,
939                                                self.tcp_sport_from-1)
940                             self.assertLess(dr.srcport_or_icmptype_first,
941                                             self.tcp_sport_to+1)
942                             self.assertGreater(dr.dstport_or_icmpcode_last,
943                                                self.tcp_dport_from-1)
944                             self.assertLess(dr.dstport_or_icmpcode_last,
945                                             self.tcp_dport_to+1)
946                         elif dr.proto == self.proto[self.IP][self.UDP]:
947                             self.assertGreater(dr.srcport_or_icmptype_first,
948                                                self.udp_sport_from-1)
949                             self.assertLess(dr.srcport_or_icmptype_first,
950                                             self.udp_sport_to+1)
951                             self.assertGreater(dr.dstport_or_icmpcode_last,
952                                                self.udp_dport_from-1)
953                             self.assertLess(dr.dstport_or_icmpcode_last,
954                                             self.udp_dport_to+1)
955                 i += 1
956
957         self.logger.info("ACLP_TEST_FINISH_0014")
958
959     def test_0015_tcp_permit_port_v4(self):
960         """ permit single TCPv4
961         """
962         self.logger.info("ACLP_TEST_START_0015")
963
964         port = random.randint(0, 65535)
965         # Add an ACL
966         rules = []
967         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
968                                       self.proto[self.IP][self.TCP]))
969         # deny ip any any in the end
970         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
971
972         # Apply rules
973         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
974
975         # Traffic should still pass
976         self.run_verify_test(self.IP, self.IPV4,
977                              self.proto[self.IP][self.TCP], port)
978
979         self.logger.info("ACLP_TEST_FINISH_0015")
980
981     def test_0016_udp_permit_port_v4(self):
982         """ permit single UDPv4
983         """
984         self.logger.info("ACLP_TEST_START_0016")
985
986         port = random.randint(0, 65535)
987         # Add an ACL
988         rules = []
989         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
990                                       self.proto[self.IP][self.UDP]))
991         # deny ip any any in the end
992         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
993
994         # Apply rules
995         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
996
997         # Traffic should still pass
998         self.run_verify_test(self.IP, self.IPV4,
999                              self.proto[self.IP][self.UDP], port)
1000
1001         self.logger.info("ACLP_TEST_FINISH_0016")
1002
1003     def test_0017_tcp_permit_port_v6(self):
1004         """ permit single TCPv6
1005         """
1006         self.logger.info("ACLP_TEST_START_0017")
1007
1008         port = random.randint(0, 65535)
1009         # Add an ACL
1010         rules = []
1011         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1012                                       self.proto[self.IP][self.TCP]))
1013         # deny ip any any in the end
1014         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1015
1016         # Apply rules
1017         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1018
1019         # Traffic should still pass
1020         self.run_verify_test(self.IP, self.IPV6,
1021                              self.proto[self.IP][self.TCP], port)
1022
1023         self.logger.info("ACLP_TEST_FINISH_0017")
1024
1025     def test_0018_udp_permit_port_v6(self):
1026         """ permit single UPPv6
1027         """
1028         self.logger.info("ACLP_TEST_START_0018")
1029
1030         port = random.randint(0, 65535)
1031         # Add an ACL
1032         rules = []
1033         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1034                                       self.proto[self.IP][self.UDP]))
1035         # deny ip any any in the end
1036         rules.append(self.create_rule(self.IPV6, self.DENY,
1037                                       self.PORTS_ALL, 0))
1038
1039         # Apply rules
1040         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1041
1042         # Traffic should still pass
1043         self.run_verify_test(self.IP, self.IPV6,
1044                              self.proto[self.IP][self.UDP], port)
1045
1046         self.logger.info("ACLP_TEST_FINISH_0018")
1047
1048     def test_0019_udp_deny_port(self):
1049         """ deny single TCPv4/v6
1050         """
1051         self.logger.info("ACLP_TEST_START_0019")
1052
1053         port = random.randint(0, 65535)
1054         # Add an ACL
1055         rules = []
1056         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1057                                       self.proto[self.IP][self.TCP]))
1058         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1059                                       self.proto[self.IP][self.TCP]))
1060         # Permit ip any any in the end
1061         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1062                                       self.PORTS_ALL, 0))
1063         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1064                                       self.PORTS_ALL, 0))
1065
1066         # Apply rules
1067         self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1068
1069         # Traffic should not pass
1070         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1071                                    self.proto[self.IP][self.TCP], port)
1072
1073         self.logger.info("ACLP_TEST_FINISH_0019")
1074
1075     def test_0020_udp_deny_port(self):
1076         """ deny single UDPv4/v6
1077         """
1078         self.logger.info("ACLP_TEST_START_0020")
1079
1080         port = random.randint(0, 65535)
1081         # Add an ACL
1082         rules = []
1083         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1084                                       self.proto[self.IP][self.UDP]))
1085         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1086                                       self.proto[self.IP][self.UDP]))
1087         # Permit ip any any in the end
1088         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1089                                       self.PORTS_ALL, 0))
1090         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1091                                       self.PORTS_ALL, 0))
1092
1093         # Apply rules
1094         self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1095
1096         # Traffic should not pass
1097         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1098                                    self.proto[self.IP][self.UDP], port)
1099
1100         self.logger.info("ACLP_TEST_FINISH_0020")
1101
1102     def test_0021_udp_deny_port_verify_fragment_deny(self):
1103         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1104         blocked
1105         """
1106         self.logger.info("ACLP_TEST_START_0021")
1107
1108         port = random.randint(0, 65535)
1109         # Add an ACL
1110         rules = []
1111         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1112                                       self.proto[self.IP][self.UDP]))
1113         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1114                                       self.proto[self.IP][self.UDP]))
1115         # deny ip any any in the end
1116         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1117                                       self.PORTS_ALL, 0))
1118         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1119                                       self.PORTS_ALL, 0))
1120
1121         # Apply rules
1122         self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1123
1124         # Traffic should not pass
1125         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1126                                    self.proto[self.IP][self.UDP], port, True)
1127
1128         self.logger.info("ACLP_TEST_FINISH_0021")
1129
1130     def test_0022_zero_length_udp_ipv4(self):
1131         """ VPP-687 zero length udp ipv4 packet"""
1132         self.logger.info("ACLP_TEST_START_0022")
1133
1134         port = random.randint(0, 65535)
1135         # Add an ACL
1136         rules = []
1137         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1138                                       self.proto[self.IP][self.UDP]))
1139         # deny ip any any in the end
1140         rules.append(
1141             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1142
1143         # Apply rules
1144         self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
1145
1146         # Traffic should still pass
1147         # Create incoming packet streams for packet-generator interfaces
1148         pkts_cnt = 0
1149         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1150                                   self.IP, self.IPV4,
1151                                   self.proto[self.IP][self.UDP], port,
1152                                   False, False)
1153         if len(pkts) > 0:
1154             self.pg0.add_stream(pkts)
1155             pkts_cnt += len(pkts)
1156
1157         # Enable packet capture and start packet sendingself.IPV
1158         self.pg_enable_capture(self.pg_interfaces)
1159         self.pg_start()
1160
1161         self.pg1.get_capture(pkts_cnt)
1162
1163         self.logger.info("ACLP_TEST_FINISH_0022")
1164
1165     def test_0023_zero_length_udp_ipv6(self):
1166         """ VPP-687 zero length udp ipv6 packet"""
1167         self.logger.info("ACLP_TEST_START_0023")
1168
1169         port = random.randint(0, 65535)
1170         # Add an ACL
1171         rules = []
1172         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1173                                       self.proto[self.IP][self.UDP]))
1174         # deny ip any any in the end
1175         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1176
1177         # Apply rules
1178         self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
1179
1180         # Traffic should still pass
1181         # Create incoming packet streams for packet-generator interfaces
1182         pkts_cnt = 0
1183         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1184                                   self.IP, self.IPV6,
1185                                   self.proto[self.IP][self.UDP], port,
1186                                   False, False)
1187         if len(pkts) > 0:
1188             self.pg0.add_stream(pkts)
1189             pkts_cnt += len(pkts)
1190
1191         # Enable packet capture and start packet sendingself.IPV
1192         self.pg_enable_capture(self.pg_interfaces)
1193         self.pg_start()
1194
1195         # Verify outgoing packet streams per packet-generator interface
1196         self.pg1.get_capture(pkts_cnt)
1197
1198         self.logger.info("ACLP_TEST_FINISH_0023")
1199
1200     def test_0108_tcp_permit_v4(self):
1201         """ permit TCPv4 + non-match range
1202         """
1203         self.logger.info("ACLP_TEST_START_0108")
1204
1205         # Add an ACL
1206         rules = []
1207         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1208                      self.proto[self.IP][self.TCP]))
1209         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1210                      self.proto[self.IP][self.TCP]))
1211         # deny ip any any in the end
1212         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1213
1214         # Apply rules
1215         self.apply_rules(rules, b"permit ipv4 tcp")
1216
1217         # Traffic should still pass
1218         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1219
1220         self.logger.info("ACLP_TEST_FINISH_0108")
1221
1222     def test_0109_tcp_permit_v6(self):
1223         """ permit TCPv6 + non-match range
1224         """
1225         self.logger.info("ACLP_TEST_START_0109")
1226
1227         # Add an ACL
1228         rules = []
1229         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1230                                       self.proto[self.IP][self.TCP]))
1231         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1232                                       self.proto[self.IP][self.TCP]))
1233         # deny ip any any in the end
1234         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1235
1236         # Apply rules
1237         self.apply_rules(rules, b"permit ip6 tcp")
1238
1239         # Traffic should still pass
1240         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1241
1242         self.logger.info("ACLP_TEST_FINISH_0109")
1243
1244     def test_0110_udp_permit_v4(self):
1245         """ permit UDPv4 + non-match range
1246         """
1247         self.logger.info("ACLP_TEST_START_0110")
1248
1249         # Add an ACL
1250         rules = []
1251         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1252                                       self.proto[self.IP][self.UDP]))
1253         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1254                                       self.proto[self.IP][self.UDP]))
1255         # deny ip any any in the end
1256         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1257
1258         # Apply rules
1259         self.apply_rules(rules, b"permit ipv4 udp")
1260
1261         # Traffic should still pass
1262         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1263
1264         self.logger.info("ACLP_TEST_FINISH_0110")
1265
1266     def test_0111_udp_permit_v6(self):
1267         """ permit UDPv6 + non-match range
1268         """
1269         self.logger.info("ACLP_TEST_START_0111")
1270
1271         # Add an ACL
1272         rules = []
1273         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1274                                       self.proto[self.IP][self.UDP]))
1275         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1276                                       self.proto[self.IP][self.UDP]))
1277         # deny ip any any in the end
1278         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1279
1280         # Apply rules
1281         self.apply_rules(rules, b"permit ip6 udp")
1282
1283         # Traffic should still pass
1284         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1285
1286         self.logger.info("ACLP_TEST_FINISH_0111")
1287
1288     def test_0112_tcp_deny(self):
1289         """ deny TCPv4/v6 + non-match range
1290         """
1291         self.logger.info("ACLP_TEST_START_0112")
1292
1293         # Add an ACL
1294         rules = []
1295         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1296                                       self.PORTS_RANGE_2,
1297                                       self.proto[self.IP][self.TCP]))
1298         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1299                                       self.PORTS_RANGE_2,
1300                                       self.proto[self.IP][self.TCP]))
1301         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1302                                       self.proto[self.IP][self.TCP]))
1303         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1304                                       self.proto[self.IP][self.TCP]))
1305         # permit ip any any in the end
1306         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1307                                       self.PORTS_ALL, 0))
1308         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1309                                       self.PORTS_ALL, 0))
1310
1311         # Apply rules
1312         self.apply_rules(rules, b"deny ip4/ip6 tcp")
1313
1314         # Traffic should not pass
1315         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1316                                    self.proto[self.IP][self.TCP])
1317
1318         self.logger.info("ACLP_TEST_FINISH_0112")
1319
1320     def test_0113_udp_deny(self):
1321         """ deny UDPv4/v6 + non-match range
1322         """
1323         self.logger.info("ACLP_TEST_START_0113")
1324
1325         # Add an ACL
1326         rules = []
1327         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1328                                       self.PORTS_RANGE_2,
1329                                       self.proto[self.IP][self.UDP]))
1330         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1331                                       self.PORTS_RANGE_2,
1332                                       self.proto[self.IP][self.UDP]))
1333         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1334                                       self.proto[self.IP][self.UDP]))
1335         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1336                                       self.proto[self.IP][self.UDP]))
1337         # permit ip any any in the end
1338         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1339                                       self.PORTS_ALL, 0))
1340         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1341                                       self.PORTS_ALL, 0))
1342
1343         # Apply rules
1344         self.apply_rules(rules, b"deny ip4/ip6 udp")
1345
1346         # Traffic should not pass
1347         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1348                                    self.proto[self.IP][self.UDP])
1349
1350         self.logger.info("ACLP_TEST_FINISH_0113")
1351
1352     def test_0300_tcp_permit_v4_etype_aaaa(self):
1353         """ permit TCPv4, send 0xAAAA etype
1354         """
1355         self.logger.info("ACLP_TEST_START_0300")
1356
1357         # Add an ACL
1358         rules = []
1359         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1360                      self.proto[self.IP][self.TCP]))
1361         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1362                      self.proto[self.IP][self.TCP]))
1363         # deny ip any any in the end
1364         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1365
1366         # Apply rules
1367         self.apply_rules(rules, b"permit ipv4 tcp")
1368
1369         # Traffic should still pass also for an odd ethertype
1370         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1371                              0, False, True, 0xaaaa)
1372         self.logger.info("ACLP_TEST_FINISH_0300")
1373
1374     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1375         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1376         """
1377         self.logger.info("ACLP_TEST_START_0305")
1378
1379         # Add an ACL
1380         rules = []
1381         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1382                      self.proto[self.IP][self.TCP]))
1383         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1384                      self.proto[self.IP][self.TCP]))
1385         # deny ip any any in the end
1386         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1387
1388         # Apply rules
1389         self.apply_rules(rules, b"permit ipv4 tcp")
1390
1391         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1392         self.etype_whitelist([0xbbb], 1)
1393
1394         # The oddball ethertype should be blocked
1395         self.run_verify_negat_test(self.IP, self.IPV4,
1396                                    self.proto[self.IP][self.TCP],
1397                                    0, False, 0xaaaa)
1398
1399         # remove the whitelist
1400         self.etype_whitelist([], 0)
1401
1402         self.logger.info("ACLP_TEST_FINISH_0305")
1403
1404     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1405         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1406         """
1407         self.logger.info("ACLP_TEST_START_0306")
1408
1409         # Add an ACL
1410         rules = []
1411         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1412                      self.proto[self.IP][self.TCP]))
1413         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1414                      self.proto[self.IP][self.TCP]))
1415         # deny ip any any in the end
1416         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1417
1418         # Apply rules
1419         self.apply_rules(rules, b"permit ipv4 tcp")
1420
1421         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1422         self.etype_whitelist([0xbbb], 1)
1423
1424         # The whitelisted traffic, should pass
1425         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1426                              0, False, True, 0x0bbb)
1427
1428         # remove the whitelist, the previously blocked 0xAAAA should pass now
1429         self.etype_whitelist([], 0)
1430
1431         self.logger.info("ACLP_TEST_FINISH_0306")
1432
1433     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1434         """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1435         """
1436         self.logger.info("ACLP_TEST_START_0307")
1437
1438         # Add an ACL
1439         rules = []
1440         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1441                      self.proto[self.IP][self.TCP]))
1442         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1443                      self.proto[self.IP][self.TCP]))
1444         # deny ip any any in the end
1445         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1446
1447         # Apply rules
1448         self.apply_rules(rules, b"permit ipv4 tcp")
1449
1450         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1451         self.etype_whitelist([0xbbb], 1)
1452         # remove the whitelist, the previously blocked 0xAAAA should pass now
1453         self.etype_whitelist([], 0)
1454
1455         # The whitelisted traffic, should pass
1456         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1457                              0, False, True, 0xaaaa)
1458
1459         self.logger.info("ACLP_TEST_FINISH_0306")
1460
1461     def test_0315_del_intf(self):
1462         """ apply an acl and delete the interface
1463         """
1464         self.logger.info("ACLP_TEST_START_0315")
1465
1466         # Add an ACL
1467         rules = []
1468         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1469                      self.proto[self.IP][self.TCP]))
1470         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1471                      self.proto[self.IP][self.TCP]))
1472         # deny ip any any in the end
1473         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1474
1475         # create an interface
1476         intf = []
1477         intf.append(VppLoInterface(self))
1478
1479         # Apply rules
1480         self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
1481
1482         # Remove the interface
1483         intf[0].remove_vpp_config()
1484
1485         self.logger.info("ACLP_TEST_FINISH_0315")
1486
1487 if __name__ == '__main__':
1488     unittest.main(testRunner=VppTestRunner)