Tests: Refactor tearDown show command logging, add lifecycle markers.
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16 from vpp_lo_interface import VppLoInterface
17
18
19 class TestACLplugin(VppTestCase):
20     """ ACL plugin Test Case """
21
22     # traffic types
23     IP = 0
24     ICMP = 1
25
26     # IP version
27     IPRANDOM = -1
28     IPV4 = 0
29     IPV6 = 1
30
31     # rule types
32     DENY = 0
33     PERMIT = 1
34
35     # supported protocols
36     proto = [[6, 17], [1, 58]]
37     proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
38     ICMPv4 = 0
39     ICMPv6 = 1
40     TCP = 0
41     UDP = 1
42     PROTO_ALL = 0
43
44     # port ranges
45     PORTS_ALL = -1
46     PORTS_RANGE = 0
47     PORTS_RANGE_2 = 1
48     udp_sport_from = 10
49     udp_sport_to = udp_sport_from + 5
50     udp_dport_from = 20000
51     udp_dport_to = udp_dport_from + 5000
52     tcp_sport_from = 30
53     tcp_sport_to = tcp_sport_from + 5
54     tcp_dport_from = 40000
55     tcp_dport_to = tcp_dport_from + 5000
56
57     udp_sport_from_2 = 90
58     udp_sport_to_2 = udp_sport_from_2 + 5
59     udp_dport_from_2 = 30000
60     udp_dport_to_2 = udp_dport_from_2 + 5000
61     tcp_sport_from_2 = 130
62     tcp_sport_to_2 = tcp_sport_from_2 + 5
63     tcp_dport_from_2 = 20000
64     tcp_dport_to_2 = tcp_dport_from_2 + 5000
65
66     icmp4_type = 8  # echo request
67     icmp4_code = 3
68     icmp6_type = 128  # echo request
69     icmp6_code = 3
70
71     icmp4_type_2 = 8
72     icmp4_code_from_2 = 5
73     icmp4_code_to_2 = 20
74     icmp6_type_2 = 128
75     icmp6_code_from_2 = 8
76     icmp6_code_to_2 = 42
77
78     # Test variables
79     bd_id = 1
80
81     @classmethod
82     def setUpClass(cls):
83         """
84         Perform standard class setup (defined by class method setUpClass in
85         class VppTestCase) before running the test case, set test case related
86         variables and configure VPP.
87         """
88         super(TestACLplugin, cls).setUpClass()
89
90         try:
91             # Create 2 pg interfaces
92             cls.create_pg_interfaces(range(2))
93
94             # Packet flows mapping pg0 -> pg1, pg2 etc.
95             cls.flows = dict()
96             cls.flows[cls.pg0] = [cls.pg1]
97
98             # Packet sizes
99             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101             # Create BD with MAC learning and unknown unicast flooding disabled
102             # and put interfaces to this BD
103             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104                                            learn=1)
105             for pg_if in cls.pg_interfaces:
106                 cls.vapi.sw_interface_set_l2_bridge(
107                     rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
108
109             # Set up all interfaces
110             for i in cls.pg_interfaces:
111                 i.admin_up()
112
113             # Mapping between packet-generator index and lists of test hosts
114             cls.hosts_by_pg_idx = dict()
115             for pg_if in cls.pg_interfaces:
116                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118             # Create list of deleted hosts
119             cls.deleted_hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # warm-up the mac address tables
124             # self.warmup_test()
125             count = 16
126             start = 0
127             n_int = len(cls.pg_interfaces)
128             macs_per_if = count / n_int
129             i = -1
130             for pg_if in cls.pg_interfaces:
131                 i += 1
132                 start_nr = macs_per_if * i + start
133                 end_nr = count + start if i == (n_int - 1) \
134                     else macs_per_if * (i + 1) + start
135                 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136                 for j in range(start_nr, end_nr):
137                     host = Host(
138                         "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139                         "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140                         "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
141                     hosts.append(host)
142
143         except Exception:
144             super(TestACLplugin, cls).tearDownClass()
145             raise
146
147     @classmethod
148     def tearDownClass(cls):
149         super(TestACLplugin, cls).tearDownClass()
150
151     def setUp(self):
152         super(TestACLplugin, self).setUp()
153         self.reset_packet_infos()
154
155     def tearDown(self):
156         """
157         Show various debug prints after each test.
158         """
159         super(TestACLplugin, self).tearDown()
160
161     def show_commands_at_teardown(self):
162         cli = "show vlib graph l2-input-feat-arc"
163         self.logger.info(self.vapi.ppcli(cli))
164         cli = "show vlib graph l2-input-feat-arc-end"
165         self.logger.info(self.vapi.ppcli(cli))
166         cli = "show vlib graph l2-output-feat-arc"
167         self.logger.info(self.vapi.ppcli(cli))
168         cli = "show vlib graph l2-output-feat-arc-end"
169         self.logger.info(self.vapi.ppcli(cli))
170         self.logger.info(self.vapi.ppcli("show l2fib verbose"))
171         self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
172         self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
173         self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
174         self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
175                                          % self.bd_id))
176
177     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
178                     s_prefix=0, s_ip='\x00\x00\x00\x00',
179                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
180         if proto == -1:
181             return
182         if ports == self.PORTS_ALL:
183             sport_from = 0
184             dport_from = 0
185             sport_to = 65535 if proto != 1 and proto != 58 else 255
186             dport_to = sport_to
187         elif ports == self.PORTS_RANGE:
188             if proto == 1:
189                 sport_from = self.icmp4_type
190                 sport_to = self.icmp4_type
191                 dport_from = self.icmp4_code
192                 dport_to = self.icmp4_code
193             elif proto == 58:
194                 sport_from = self.icmp6_type
195                 sport_to = self.icmp6_type
196                 dport_from = self.icmp6_code
197                 dport_to = self.icmp6_code
198             elif proto == self.proto[self.IP][self.TCP]:
199                 sport_from = self.tcp_sport_from
200                 sport_to = self.tcp_sport_to
201                 dport_from = self.tcp_dport_from
202                 dport_to = self.tcp_dport_to
203             elif proto == self.proto[self.IP][self.UDP]:
204                 sport_from = self.udp_sport_from
205                 sport_to = self.udp_sport_to
206                 dport_from = self.udp_dport_from
207                 dport_to = self.udp_dport_to
208         elif ports == self.PORTS_RANGE_2:
209             if proto == 1:
210                 sport_from = self.icmp4_type_2
211                 sport_to = self.icmp4_type_2
212                 dport_from = self.icmp4_code_from_2
213                 dport_to = self.icmp4_code_to_2
214             elif proto == 58:
215                 sport_from = self.icmp6_type_2
216                 sport_to = self.icmp6_type_2
217                 dport_from = self.icmp6_code_from_2
218                 dport_to = self.icmp6_code_to_2
219             elif proto == self.proto[self.IP][self.TCP]:
220                 sport_from = self.tcp_sport_from_2
221                 sport_to = self.tcp_sport_to_2
222                 dport_from = self.tcp_dport_from_2
223                 dport_to = self.tcp_dport_to_2
224             elif proto == self.proto[self.IP][self.UDP]:
225                 sport_from = self.udp_sport_from_2
226                 sport_to = self.udp_sport_to_2
227                 dport_from = self.udp_dport_from_2
228                 dport_to = self.udp_dport_to_2
229         else:
230             sport_from = ports
231             sport_to = ports
232             dport_from = ports
233             dport_to = ports
234
235         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
236                  'srcport_or_icmptype_first': sport_from,
237                  'srcport_or_icmptype_last': sport_to,
238                  'src_ip_prefix_len': s_prefix,
239                  'src_ip_addr': s_ip,
240                  'dstport_or_icmpcode_first': dport_from,
241                  'dstport_or_icmpcode_last': dport_to,
242                  'dst_ip_prefix_len': d_prefix,
243                  'dst_ip_addr': d_ip})
244         return rule
245
246     def apply_rules(self, rules, tag=b''):
247         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
248                                           tag=tag)
249         self.logger.info("Dumped ACL: " + str(
250             self.vapi.acl_dump(reply.acl_index)))
251         # Apply a ACL on the interface as inbound
252         for i in self.pg_interfaces:
253             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
254                                                  n_input=1,
255                                                  acls=[reply.acl_index])
256         return
257
258     def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
259         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
260                                           tag=tag)
261         self.logger.info("Dumped ACL: " + str(
262             self.vapi.acl_dump(reply.acl_index)))
263         # Apply a ACL on the interface as inbound
264         self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
265                                              n_input=1,
266                                              acls=[reply.acl_index])
267         return
268
269     def etype_whitelist(self, whitelist, n_input):
270         # Apply whitelists on all the interfaces
271         for i in self.pg_interfaces:
272             # checkstyle can't read long names. Help them.
273             fun = self.vapi.acl_interface_set_etype_whitelist
274             fun(sw_if_index=i.sw_if_index, n_input=n_input,
275                 whitelist=whitelist)
276         return
277
278     def create_upper_layer(self, packet_index, proto, ports=0):
279         p = self.proto_map[proto]
280         if p == 'UDP':
281             if ports == 0:
282                 return UDP(sport=random.randint(self.udp_sport_from,
283                                                 self.udp_sport_to),
284                            dport=random.randint(self.udp_dport_from,
285                                                 self.udp_dport_to))
286             else:
287                 return UDP(sport=ports, dport=ports)
288         elif p == 'TCP':
289             if ports == 0:
290                 return TCP(sport=random.randint(self.tcp_sport_from,
291                                                 self.tcp_sport_to),
292                            dport=random.randint(self.tcp_dport_from,
293                                                 self.tcp_dport_to))
294             else:
295                 return TCP(sport=ports, dport=ports)
296         return ''
297
298     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
299                       proto=-1, ports=0, fragments=False,
300                       pkt_raw=True, etype=-1):
301         """
302         Create input packet stream for defined interface using hosts or
303         deleted_hosts list.
304
305         :param object src_if: Interface to create packet stream for.
306         :param list packet_sizes: List of required packet sizes.
307         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
308         :return: Stream of packets.
309         """
310         pkts = []
311         if self.flows.__contains__(src_if):
312             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
313             for dst_if in self.flows[src_if]:
314                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
315                 n_int = len(dst_hosts) * len(src_hosts)
316                 for i in range(0, n_int):
317                     dst_host = dst_hosts[i / len(src_hosts)]
318                     src_host = src_hosts[i % len(src_hosts)]
319                     pkt_info = self.create_packet_info(src_if, dst_if)
320                     if ipv6 == 1:
321                         pkt_info.ip = 1
322                     elif ipv6 == 0:
323                         pkt_info.ip = 0
324                     else:
325                         pkt_info.ip = random.choice([0, 1])
326                     if proto == -1:
327                         pkt_info.proto = random.choice(self.proto[self.IP])
328                     else:
329                         pkt_info.proto = proto
330                     payload = self.info_to_payload(pkt_info)
331                     p = Ether(dst=dst_host.mac, src=src_host.mac)
332                     if etype > 0:
333                         p = Ether(dst=dst_host.mac,
334                                   src=src_host.mac,
335                                   type=etype)
336                     if pkt_info.ip:
337                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
338                         if fragments:
339                             p /= IPv6ExtHdrFragment(offset=64, m=1)
340                     else:
341                         if fragments:
342                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
343                                     flags=1, frag=64)
344                         else:
345                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
346                     if traffic_type == self.ICMP:
347                         if pkt_info.ip:
348                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
349                                                    code=self.icmp6_code)
350                         else:
351                             p /= ICMP(type=self.icmp4_type,
352                                       code=self.icmp4_code)
353                     else:
354                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
355                     if pkt_raw:
356                         p /= Raw(payload)
357                         pkt_info.data = p.copy()
358                     if pkt_raw:
359                         size = random.choice(packet_sizes)
360                         self.extend_packet(p, size)
361                     pkts.append(p)
362         return pkts
363
364     def verify_capture(self, pg_if, capture,
365                        traffic_type=0, ip_type=0, etype=-1):
366         """
367         Verify captured input packet stream for defined interface.
368
369         :param object pg_if: Interface to verify captured packet stream for.
370         :param list capture: Captured packet stream.
371         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
372         """
373         last_info = dict()
374         for i in self.pg_interfaces:
375             last_info[i.sw_if_index] = None
376         dst_sw_if_index = pg_if.sw_if_index
377         for packet in capture:
378             if etype > 0:
379                 if packet[Ether].type != etype:
380                     self.logger.error(ppp("Unexpected ethertype in packet:",
381                                           packet))
382                 else:
383                     continue
384             try:
385                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
386                 if traffic_type == self.ICMP and ip_type == self.IPV6:
387                     payload_info = self.payload_to_info(
388                         packet[ICMPv6EchoRequest], 'data')
389                     payload = packet[ICMPv6EchoRequest]
390                 else:
391                     payload_info = self.payload_to_info(packet[Raw])
392                     payload = packet[self.proto_map[payload_info.proto]]
393             except:
394                 self.logger.error(ppp("Unexpected or invalid packet "
395                                       "(outside network):", packet))
396                 raise
397
398             if ip_type != 0:
399                 self.assertEqual(payload_info.ip, ip_type)
400             if traffic_type == self.ICMP:
401                 try:
402                     if payload_info.ip == 0:
403                         self.assertEqual(payload.type, self.icmp4_type)
404                         self.assertEqual(payload.code, self.icmp4_code)
405                     else:
406                         self.assertEqual(payload.type, self.icmp6_type)
407                         self.assertEqual(payload.code, self.icmp6_code)
408                 except:
409                     self.logger.error(ppp("Unexpected or invalid packet "
410                                           "(outside network):", packet))
411                     raise
412             else:
413                 try:
414                     ip_version = IPv6 if payload_info.ip == 1 else IP
415
416                     ip = packet[ip_version]
417                     packet_index = payload_info.index
418
419                     self.assertEqual(payload_info.dst, dst_sw_if_index)
420                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
421                                       (pg_if.name, payload_info.src,
422                                        packet_index))
423                     next_info = self.get_next_packet_info_for_interface2(
424                         payload_info.src, dst_sw_if_index,
425                         last_info[payload_info.src])
426                     last_info[payload_info.src] = next_info
427                     self.assertTrue(next_info is not None)
428                     self.assertEqual(packet_index, next_info.index)
429                     saved_packet = next_info.data
430                     # Check standard fields
431                     self.assertEqual(ip.src, saved_packet[ip_version].src)
432                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
433                     p = self.proto_map[payload_info.proto]
434                     if p == 'TCP':
435                         tcp = packet[TCP]
436                         self.assertEqual(tcp.sport, saved_packet[
437                             TCP].sport)
438                         self.assertEqual(tcp.dport, saved_packet[
439                             TCP].dport)
440                     elif p == 'UDP':
441                         udp = packet[UDP]
442                         self.assertEqual(udp.sport, saved_packet[
443                             UDP].sport)
444                         self.assertEqual(udp.dport, saved_packet[
445                             UDP].dport)
446                 except:
447                     self.logger.error(ppp("Unexpected or invalid packet:",
448                                           packet))
449                     raise
450         for i in self.pg_interfaces:
451             remaining_packet = self.get_next_packet_info_for_interface2(
452                 i, dst_sw_if_index, last_info[i.sw_if_index])
453             self.assertTrue(
454                 remaining_packet is None,
455                 "Port %u: Packet expected from source %u didn't arrive" %
456                 (dst_sw_if_index, i.sw_if_index))
457
458     def run_traffic_no_check(self):
459         # Test
460         # Create incoming packet streams for packet-generator interfaces
461         for i in self.pg_interfaces:
462             if self.flows.__contains__(i):
463                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
464                 if len(pkts) > 0:
465                     i.add_stream(pkts)
466
467         # Enable packet capture and start packet sending
468         self.pg_enable_capture(self.pg_interfaces)
469         self.pg_start()
470
471     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
472                         frags=False, pkt_raw=True, etype=-1):
473         # Test
474         # Create incoming packet streams for packet-generator interfaces
475         pkts_cnt = 0
476         for i in self.pg_interfaces:
477             if self.flows.__contains__(i):
478                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
479                                           traffic_type, ip_type, proto, ports,
480                                           frags, pkt_raw, etype)
481                 if len(pkts) > 0:
482                     i.add_stream(pkts)
483                     pkts_cnt += len(pkts)
484
485         # Enable packet capture and start packet sendingself.IPV
486         self.pg_enable_capture(self.pg_interfaces)
487         self.pg_start()
488         self.logger.info("sent packets count: %d" % pkts_cnt)
489
490         # Verify
491         # Verify outgoing packet streams per packet-generator interface
492         for src_if in self.pg_interfaces:
493             if self.flows.__contains__(src_if):
494                 for dst_if in self.flows[src_if]:
495                     capture = dst_if.get_capture(pkts_cnt)
496                     self.logger.info("Verifying capture on interface %s" %
497                                      dst_if.name)
498                     self.verify_capture(dst_if, capture,
499                                         traffic_type, ip_type, etype)
500
501     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
502                               ports=0, frags=False, etype=-1):
503         # Test
504         pkts_cnt = 0
505         self.reset_packet_infos()
506         for i in self.pg_interfaces:
507             if self.flows.__contains__(i):
508                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
509                                           traffic_type, ip_type, proto, ports,
510                                           frags, True, etype)
511                 if len(pkts) > 0:
512                     i.add_stream(pkts)
513                     pkts_cnt += len(pkts)
514
515         # Enable packet capture and start packet sending
516         self.pg_enable_capture(self.pg_interfaces)
517         self.pg_start()
518         self.logger.info("sent packets count: %d" % pkts_cnt)
519
520         # Verify
521         # Verify outgoing packet streams per packet-generator interface
522         for src_if in self.pg_interfaces:
523             if self.flows.__contains__(src_if):
524                 for dst_if in self.flows[src_if]:
525                     self.logger.info("Verifying capture on interface %s" %
526                                      dst_if.name)
527                     capture = dst_if.get_capture(0)
528                     self.assertEqual(len(capture), 0)
529
530     def test_0000_warmup_test(self):
531         """ ACL plugin version check; learn MACs
532         """
533         reply = self.vapi.papi.acl_plugin_get_version()
534         self.assertEqual(reply.major, 1)
535         self.logger.info("Working with ACL plugin version: %d.%d" % (
536             reply.major, reply.minor))
537         # minor version changes are non breaking
538         # self.assertEqual(reply.minor, 0)
539
540     def test_0001_acl_create(self):
541         """ ACL create/delete test
542         """
543
544         self.logger.info("ACLP_TEST_START_0001")
545         # Add an ACL
546         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
547               'srcport_or_icmptype_first': 1234,
548               'srcport_or_icmptype_last': 1235,
549               'src_ip_prefix_len': 0,
550               'src_ip_addr': b'\x00\x00\x00\x00',
551               'dstport_or_icmpcode_first': 1234,
552               'dstport_or_icmpcode_last': 1234,
553               'dst_ip_addr': b'\x00\x00\x00\x00',
554               'dst_ip_prefix_len': 0}]
555         # Test 1: add a new ACL
556         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
557                                           tag=b"permit 1234")
558         self.assertEqual(reply.retval, 0)
559         # The very first ACL gets #0
560         self.assertEqual(reply.acl_index, 0)
561         first_acl = reply.acl_index
562         rr = self.vapi.acl_dump(reply.acl_index)
563         self.logger.info("Dumped ACL: " + str(rr))
564         self.assertEqual(len(rr), 1)
565         # We should have the same number of ACL entries as we had asked
566         self.assertEqual(len(rr[0].r), len(r))
567         # The rules should be the same. But because the submitted and returned
568         # are different types, we need to iterate over rules and keys to get
569         # to basic values.
570         for i_rule in range(0, len(r) - 1):
571             for rule_key in r[i_rule]:
572                 self.assertEqual(rr[0].r[i_rule][rule_key],
573                                  r[i_rule][rule_key])
574
575         # Add a deny-1234 ACL
576         r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
577                    'srcport_or_icmptype_first': 1234,
578                    'srcport_or_icmptype_last': 1235,
579                    'src_ip_prefix_len': 0,
580                    'src_ip_addr': b'\x00\x00\x00\x00',
581                    'dstport_or_icmpcode_first': 1234,
582                    'dstport_or_icmpcode_last': 1234,
583                    'dst_ip_addr': b'\x00\x00\x00\x00',
584                    'dst_ip_prefix_len': 0},
585                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
586                    'srcport_or_icmptype_first': 0,
587                    'srcport_or_icmptype_last': 0,
588                    'src_ip_prefix_len': 0,
589                    'src_ip_addr': b'\x00\x00\x00\x00',
590                    'dstport_or_icmpcode_first': 0,
591                    'dstport_or_icmpcode_last': 0,
592                    'dst_ip_addr': b'\x00\x00\x00\x00',
593                    'dst_ip_prefix_len': 0}]
594
595         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
596                                           tag=b"deny 1234;permit all")
597         self.assertEqual(reply.retval, 0)
598         # The second ACL gets #1
599         self.assertEqual(reply.acl_index, 1)
600         second_acl = reply.acl_index
601
602         # Test 2: try to modify a nonexistent ACL
603         reply = self.vapi.acl_add_replace(acl_index=432, r=r,
604                                           tag=b"FFFF:FFFF", expected_retval=-6)
605         self.assertEqual(reply.retval, -6)
606         # The ACL number should pass through
607         self.assertEqual(reply.acl_index, 432)
608         # apply an ACL on an interface inbound, try to delete ACL, must fail
609         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
610                                              n_input=1,
611                                              acls=[first_acl])
612         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
613         # Unapply an ACL and then try to delete it - must be ok
614         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
615                                              n_input=0,
616                                              acls=[])
617         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
618
619         # apply an ACL on an interface outbound, try to delete ACL, must fail
620         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
621                                              n_input=0,
622                                              acls=[second_acl])
623         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
624         # Unapply the ACL and then try to delete it - must be ok
625         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
626                                              n_input=0,
627                                              acls=[])
628         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
629
630         # try to apply a nonexistent ACL - must fail
631         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
632                                              n_input=1,
633                                              acls=[first_acl],
634                                              expected_retval=-6)
635
636         self.logger.info("ACLP_TEST_FINISH_0001")
637
638     def test_0002_acl_permit_apply(self):
639         """ permit ACL apply test
640         """
641         self.logger.info("ACLP_TEST_START_0002")
642
643         rules = []
644         rules.append(self.create_rule(self.IPV4, self.PERMIT,
645                      0, self.proto[self.IP][self.UDP]))
646         rules.append(self.create_rule(self.IPV4, self.PERMIT,
647                      0, self.proto[self.IP][self.TCP]))
648
649         # Apply rules
650         self.apply_rules(rules, b"permit per-flow")
651
652         # Traffic should still pass
653         self.run_verify_test(self.IP, self.IPV4, -1)
654         self.logger.info("ACLP_TEST_FINISH_0002")
655
656     def test_0003_acl_deny_apply(self):
657         """ deny ACL apply test
658         """
659         self.logger.info("ACLP_TEST_START_0003")
660         # Add a deny-flows ACL
661         rules = []
662         rules.append(self.create_rule(self.IPV4, self.DENY,
663                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
664         # Permit ip any any in the end
665         rules.append(self.create_rule(self.IPV4, self.PERMIT,
666                                       self.PORTS_ALL, 0))
667
668         # Apply rules
669         self.apply_rules(rules, b"deny per-flow;permit all")
670
671         # Traffic should not pass
672         self.run_verify_negat_test(self.IP, self.IPV4,
673                                    self.proto[self.IP][self.UDP])
674         self.logger.info("ACLP_TEST_FINISH_0003")
675         # self.assertEqual(1, 0)
676
677     def test_0004_vpp624_permit_icmpv4(self):
678         """ VPP_624 permit ICMPv4
679         """
680         self.logger.info("ACLP_TEST_START_0004")
681
682         # Add an ACL
683         rules = []
684         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
685                                       self.proto[self.ICMP][self.ICMPv4]))
686         # deny ip any any in the end
687         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
688
689         # Apply rules
690         self.apply_rules(rules, b"permit icmpv4")
691
692         # Traffic should still pass
693         self.run_verify_test(self.ICMP, self.IPV4,
694                              self.proto[self.ICMP][self.ICMPv4])
695
696         self.logger.info("ACLP_TEST_FINISH_0004")
697
698     def test_0005_vpp624_permit_icmpv6(self):
699         """ VPP_624 permit ICMPv6
700         """
701         self.logger.info("ACLP_TEST_START_0005")
702
703         # Add an ACL
704         rules = []
705         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
706                                       self.proto[self.ICMP][self.ICMPv6]))
707         # deny ip any any in the end
708         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
709
710         # Apply rules
711         self.apply_rules(rules, b"permit icmpv6")
712
713         # Traffic should still pass
714         self.run_verify_test(self.ICMP, self.IPV6,
715                              self.proto[self.ICMP][self.ICMPv6])
716
717         self.logger.info("ACLP_TEST_FINISH_0005")
718
719     def test_0006_vpp624_deny_icmpv4(self):
720         """ VPP_624 deny ICMPv4
721         """
722         self.logger.info("ACLP_TEST_START_0006")
723         # Add an ACL
724         rules = []
725         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
726                                       self.proto[self.ICMP][self.ICMPv4]))
727         # permit ip any any in the end
728         rules.append(self.create_rule(self.IPV4, self.PERMIT,
729                                       self.PORTS_ALL, 0))
730
731         # Apply rules
732         self.apply_rules(rules, b"deny icmpv4")
733
734         # Traffic should not pass
735         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
736
737         self.logger.info("ACLP_TEST_FINISH_0006")
738
739     def test_0007_vpp624_deny_icmpv6(self):
740         """ VPP_624 deny ICMPv6
741         """
742         self.logger.info("ACLP_TEST_START_0007")
743         # Add an ACL
744         rules = []
745         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
746                                       self.proto[self.ICMP][self.ICMPv6]))
747         # deny ip any any in the end
748         rules.append(self.create_rule(self.IPV6, self.PERMIT,
749                                       self.PORTS_ALL, 0))
750
751         # Apply rules
752         self.apply_rules(rules, b"deny icmpv6")
753
754         # Traffic should not pass
755         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
756
757         self.logger.info("ACLP_TEST_FINISH_0007")
758
759     def test_0008_tcp_permit_v4(self):
760         """ permit TCPv4
761         """
762         self.logger.info("ACLP_TEST_START_0008")
763
764         # Add an ACL
765         rules = []
766         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
767                      self.proto[self.IP][self.TCP]))
768         # deny ip any any in the end
769         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
770
771         # Apply rules
772         self.apply_rules(rules, b"permit ipv4 tcp")
773
774         # Traffic should still pass
775         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
776
777         self.logger.info("ACLP_TEST_FINISH_0008")
778
779     def test_0009_tcp_permit_v6(self):
780         """ permit TCPv6
781         """
782         self.logger.info("ACLP_TEST_START_0009")
783
784         # Add an ACL
785         rules = []
786         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
787                                       self.proto[self.IP][self.TCP]))
788         # deny ip any any in the end
789         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
790
791         # Apply rules
792         self.apply_rules(rules, b"permit ip6 tcp")
793
794         # Traffic should still pass
795         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
796
797         self.logger.info("ACLP_TEST_FINISH_0008")
798
799     def test_0010_udp_permit_v4(self):
800         """ permit UDPv4
801         """
802         self.logger.info("ACLP_TEST_START_0010")
803
804         # Add an ACL
805         rules = []
806         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
807                                       self.proto[self.IP][self.UDP]))
808         # deny ip any any in the end
809         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
810
811         # Apply rules
812         self.apply_rules(rules, b"permit ipv udp")
813
814         # Traffic should still pass
815         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
816
817         self.logger.info("ACLP_TEST_FINISH_0010")
818
819     def test_0011_udp_permit_v6(self):
820         """ permit UDPv6
821         """
822         self.logger.info("ACLP_TEST_START_0011")
823
824         # Add an ACL
825         rules = []
826         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
827                                       self.proto[self.IP][self.UDP]))
828         # deny ip any any in the end
829         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
830
831         # Apply rules
832         self.apply_rules(rules, b"permit ip6 udp")
833
834         # Traffic should still pass
835         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
836
837         self.logger.info("ACLP_TEST_FINISH_0011")
838
839     def test_0012_tcp_deny(self):
840         """ deny TCPv4/v6
841         """
842         self.logger.info("ACLP_TEST_START_0012")
843
844         # Add an ACL
845         rules = []
846         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
847                                       self.proto[self.IP][self.TCP]))
848         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
849                                       self.proto[self.IP][self.TCP]))
850         # permit ip any any in the end
851         rules.append(self.create_rule(self.IPV4, self.PERMIT,
852                                       self.PORTS_ALL, 0))
853         rules.append(self.create_rule(self.IPV6, self.PERMIT,
854                                       self.PORTS_ALL, 0))
855
856         # Apply rules
857         self.apply_rules(rules, b"deny ip4/ip6 tcp")
858
859         # Traffic should not pass
860         self.run_verify_negat_test(self.IP, self.IPRANDOM,
861                                    self.proto[self.IP][self.TCP])
862
863         self.logger.info("ACLP_TEST_FINISH_0012")
864
865     def test_0013_udp_deny(self):
866         """ deny UDPv4/v6
867         """
868         self.logger.info("ACLP_TEST_START_0013")
869
870         # Add an ACL
871         rules = []
872         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
873                                       self.proto[self.IP][self.UDP]))
874         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
875                                       self.proto[self.IP][self.UDP]))
876         # permit ip any any in the end
877         rules.append(self.create_rule(self.IPV4, self.PERMIT,
878                                       self.PORTS_ALL, 0))
879         rules.append(self.create_rule(self.IPV6, self.PERMIT,
880                                       self.PORTS_ALL, 0))
881
882         # Apply rules
883         self.apply_rules(rules, b"deny ip4/ip6 udp")
884
885         # Traffic should not pass
886         self.run_verify_negat_test(self.IP, self.IPRANDOM,
887                                    self.proto[self.IP][self.UDP])
888
889         self.logger.info("ACLP_TEST_FINISH_0013")
890
891     def test_0014_acl_dump(self):
892         """ verify add/dump acls
893         """
894         self.logger.info("ACLP_TEST_START_0014")
895
896         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
897              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
898              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
899              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
900              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
901              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
902              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
903              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
904              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
905              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
906              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
907              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
908              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
909              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
910              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
911              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
912              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
913              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
914              ]
915
916         # Add and verify new ACLs
917         rules = []
918         for i in range(len(r)):
919             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
920
921         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
922         result = self.vapi.acl_dump(reply.acl_index)
923
924         i = 0
925         for drules in result:
926             for dr in drules.r:
927                 self.assertEqual(dr.is_ipv6, r[i][0])
928                 self.assertEqual(dr.is_permit, r[i][1])
929                 self.assertEqual(dr.proto, r[i][3])
930
931                 if r[i][2] > 0:
932                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
933                 else:
934                     if r[i][2] < 0:
935                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
936                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
937                     else:
938                         if dr.proto == self.proto[self.IP][self.TCP]:
939                             self.assertGreater(dr.srcport_or_icmptype_first,
940                                                self.tcp_sport_from-1)
941                             self.assertLess(dr.srcport_or_icmptype_first,
942                                             self.tcp_sport_to+1)
943                             self.assertGreater(dr.dstport_or_icmpcode_last,
944                                                self.tcp_dport_from-1)
945                             self.assertLess(dr.dstport_or_icmpcode_last,
946                                             self.tcp_dport_to+1)
947                         elif dr.proto == self.proto[self.IP][self.UDP]:
948                             self.assertGreater(dr.srcport_or_icmptype_first,
949                                                self.udp_sport_from-1)
950                             self.assertLess(dr.srcport_or_icmptype_first,
951                                             self.udp_sport_to+1)
952                             self.assertGreater(dr.dstport_or_icmpcode_last,
953                                                self.udp_dport_from-1)
954                             self.assertLess(dr.dstport_or_icmpcode_last,
955                                             self.udp_dport_to+1)
956                 i += 1
957
958         self.logger.info("ACLP_TEST_FINISH_0014")
959
960     def test_0015_tcp_permit_port_v4(self):
961         """ permit single TCPv4
962         """
963         self.logger.info("ACLP_TEST_START_0015")
964
965         port = random.randint(0, 65535)
966         # Add an ACL
967         rules = []
968         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
969                                       self.proto[self.IP][self.TCP]))
970         # deny ip any any in the end
971         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
972
973         # Apply rules
974         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
975
976         # Traffic should still pass
977         self.run_verify_test(self.IP, self.IPV4,
978                              self.proto[self.IP][self.TCP], port)
979
980         self.logger.info("ACLP_TEST_FINISH_0015")
981
982     def test_0016_udp_permit_port_v4(self):
983         """ permit single UDPv4
984         """
985         self.logger.info("ACLP_TEST_START_0016")
986
987         port = random.randint(0, 65535)
988         # Add an ACL
989         rules = []
990         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
991                                       self.proto[self.IP][self.UDP]))
992         # deny ip any any in the end
993         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
994
995         # Apply rules
996         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
997
998         # Traffic should still pass
999         self.run_verify_test(self.IP, self.IPV4,
1000                              self.proto[self.IP][self.UDP], port)
1001
1002         self.logger.info("ACLP_TEST_FINISH_0016")
1003
1004     def test_0017_tcp_permit_port_v6(self):
1005         """ permit single TCPv6
1006         """
1007         self.logger.info("ACLP_TEST_START_0017")
1008
1009         port = random.randint(0, 65535)
1010         # Add an ACL
1011         rules = []
1012         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1013                                       self.proto[self.IP][self.TCP]))
1014         # deny ip any any in the end
1015         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1016
1017         # Apply rules
1018         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1019
1020         # Traffic should still pass
1021         self.run_verify_test(self.IP, self.IPV6,
1022                              self.proto[self.IP][self.TCP], port)
1023
1024         self.logger.info("ACLP_TEST_FINISH_0017")
1025
1026     def test_0018_udp_permit_port_v6(self):
1027         """ permit single UPPv6
1028         """
1029         self.logger.info("ACLP_TEST_START_0018")
1030
1031         port = random.randint(0, 65535)
1032         # Add an ACL
1033         rules = []
1034         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1035                                       self.proto[self.IP][self.UDP]))
1036         # deny ip any any in the end
1037         rules.append(self.create_rule(self.IPV6, self.DENY,
1038                                       self.PORTS_ALL, 0))
1039
1040         # Apply rules
1041         self.apply_rules(rules, b"permit ip4 tcp %d" % port)
1042
1043         # Traffic should still pass
1044         self.run_verify_test(self.IP, self.IPV6,
1045                              self.proto[self.IP][self.UDP], port)
1046
1047         self.logger.info("ACLP_TEST_FINISH_0018")
1048
1049     def test_0019_udp_deny_port(self):
1050         """ deny single TCPv4/v6
1051         """
1052         self.logger.info("ACLP_TEST_START_0019")
1053
1054         port = random.randint(0, 65535)
1055         # Add an ACL
1056         rules = []
1057         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1058                                       self.proto[self.IP][self.TCP]))
1059         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1060                                       self.proto[self.IP][self.TCP]))
1061         # Permit ip any any in the end
1062         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1063                                       self.PORTS_ALL, 0))
1064         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1065                                       self.PORTS_ALL, 0))
1066
1067         # Apply rules
1068         self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1069
1070         # Traffic should not pass
1071         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1072                                    self.proto[self.IP][self.TCP], port)
1073
1074         self.logger.info("ACLP_TEST_FINISH_0019")
1075
1076     def test_0020_udp_deny_port(self):
1077         """ deny single UDPv4/v6
1078         """
1079         self.logger.info("ACLP_TEST_START_0020")
1080
1081         port = random.randint(0, 65535)
1082         # Add an ACL
1083         rules = []
1084         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1085                                       self.proto[self.IP][self.UDP]))
1086         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1087                                       self.proto[self.IP][self.UDP]))
1088         # Permit ip any any in the end
1089         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1090                                       self.PORTS_ALL, 0))
1091         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1092                                       self.PORTS_ALL, 0))
1093
1094         # Apply rules
1095         self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1096
1097         # Traffic should not pass
1098         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1099                                    self.proto[self.IP][self.UDP], port)
1100
1101         self.logger.info("ACLP_TEST_FINISH_0020")
1102
1103     def test_0021_udp_deny_port_verify_fragment_deny(self):
1104         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1105         blocked
1106         """
1107         self.logger.info("ACLP_TEST_START_0021")
1108
1109         port = random.randint(0, 65535)
1110         # Add an ACL
1111         rules = []
1112         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1113                                       self.proto[self.IP][self.UDP]))
1114         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1115                                       self.proto[self.IP][self.UDP]))
1116         # deny ip any any in the end
1117         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1118                                       self.PORTS_ALL, 0))
1119         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1120                                       self.PORTS_ALL, 0))
1121
1122         # Apply rules
1123         self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
1124
1125         # Traffic should not pass
1126         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1127                                    self.proto[self.IP][self.UDP], port, True)
1128
1129         self.logger.info("ACLP_TEST_FINISH_0021")
1130
1131     def test_0022_zero_length_udp_ipv4(self):
1132         """ VPP-687 zero length udp ipv4 packet"""
1133         self.logger.info("ACLP_TEST_START_0022")
1134
1135         port = random.randint(0, 65535)
1136         # Add an ACL
1137         rules = []
1138         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1139                                       self.proto[self.IP][self.UDP]))
1140         # deny ip any any in the end
1141         rules.append(
1142             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1143
1144         # Apply rules
1145         self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
1146
1147         # Traffic should still pass
1148         # Create incoming packet streams for packet-generator interfaces
1149         pkts_cnt = 0
1150         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1151                                   self.IP, self.IPV4,
1152                                   self.proto[self.IP][self.UDP], port,
1153                                   False, False)
1154         if len(pkts) > 0:
1155             self.pg0.add_stream(pkts)
1156             pkts_cnt += len(pkts)
1157
1158         # Enable packet capture and start packet sendingself.IPV
1159         self.pg_enable_capture(self.pg_interfaces)
1160         self.pg_start()
1161
1162         self.pg1.get_capture(pkts_cnt)
1163
1164         self.logger.info("ACLP_TEST_FINISH_0022")
1165
1166     def test_0023_zero_length_udp_ipv6(self):
1167         """ VPP-687 zero length udp ipv6 packet"""
1168         self.logger.info("ACLP_TEST_START_0023")
1169
1170         port = random.randint(0, 65535)
1171         # Add an ACL
1172         rules = []
1173         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1174                                       self.proto[self.IP][self.UDP]))
1175         # deny ip any any in the end
1176         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1177
1178         # Apply rules
1179         self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
1180
1181         # Traffic should still pass
1182         # Create incoming packet streams for packet-generator interfaces
1183         pkts_cnt = 0
1184         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1185                                   self.IP, self.IPV6,
1186                                   self.proto[self.IP][self.UDP], port,
1187                                   False, False)
1188         if len(pkts) > 0:
1189             self.pg0.add_stream(pkts)
1190             pkts_cnt += len(pkts)
1191
1192         # Enable packet capture and start packet sendingself.IPV
1193         self.pg_enable_capture(self.pg_interfaces)
1194         self.pg_start()
1195
1196         # Verify outgoing packet streams per packet-generator interface
1197         self.pg1.get_capture(pkts_cnt)
1198
1199         self.logger.info("ACLP_TEST_FINISH_0023")
1200
1201     def test_0108_tcp_permit_v4(self):
1202         """ permit TCPv4 + non-match range
1203         """
1204         self.logger.info("ACLP_TEST_START_0108")
1205
1206         # Add an ACL
1207         rules = []
1208         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1209                      self.proto[self.IP][self.TCP]))
1210         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1211                      self.proto[self.IP][self.TCP]))
1212         # deny ip any any in the end
1213         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1214
1215         # Apply rules
1216         self.apply_rules(rules, b"permit ipv4 tcp")
1217
1218         # Traffic should still pass
1219         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1220
1221         self.logger.info("ACLP_TEST_FINISH_0108")
1222
1223     def test_0109_tcp_permit_v6(self):
1224         """ permit TCPv6 + non-match range
1225         """
1226         self.logger.info("ACLP_TEST_START_0109")
1227
1228         # Add an ACL
1229         rules = []
1230         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1231                                       self.proto[self.IP][self.TCP]))
1232         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1233                                       self.proto[self.IP][self.TCP]))
1234         # deny ip any any in the end
1235         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1236
1237         # Apply rules
1238         self.apply_rules(rules, b"permit ip6 tcp")
1239
1240         # Traffic should still pass
1241         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1242
1243         self.logger.info("ACLP_TEST_FINISH_0109")
1244
1245     def test_0110_udp_permit_v4(self):
1246         """ permit UDPv4 + non-match range
1247         """
1248         self.logger.info("ACLP_TEST_START_0110")
1249
1250         # Add an ACL
1251         rules = []
1252         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1253                                       self.proto[self.IP][self.UDP]))
1254         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1255                                       self.proto[self.IP][self.UDP]))
1256         # deny ip any any in the end
1257         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1258
1259         # Apply rules
1260         self.apply_rules(rules, b"permit ipv4 udp")
1261
1262         # Traffic should still pass
1263         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1264
1265         self.logger.info("ACLP_TEST_FINISH_0110")
1266
1267     def test_0111_udp_permit_v6(self):
1268         """ permit UDPv6 + non-match range
1269         """
1270         self.logger.info("ACLP_TEST_START_0111")
1271
1272         # Add an ACL
1273         rules = []
1274         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1275                                       self.proto[self.IP][self.UDP]))
1276         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1277                                       self.proto[self.IP][self.UDP]))
1278         # deny ip any any in the end
1279         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1280
1281         # Apply rules
1282         self.apply_rules(rules, b"permit ip6 udp")
1283
1284         # Traffic should still pass
1285         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1286
1287         self.logger.info("ACLP_TEST_FINISH_0111")
1288
1289     def test_0112_tcp_deny(self):
1290         """ deny TCPv4/v6 + non-match range
1291         """
1292         self.logger.info("ACLP_TEST_START_0112")
1293
1294         # Add an ACL
1295         rules = []
1296         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1297                                       self.PORTS_RANGE_2,
1298                                       self.proto[self.IP][self.TCP]))
1299         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1300                                       self.PORTS_RANGE_2,
1301                                       self.proto[self.IP][self.TCP]))
1302         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1303                                       self.proto[self.IP][self.TCP]))
1304         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1305                                       self.proto[self.IP][self.TCP]))
1306         # permit ip any any in the end
1307         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1308                                       self.PORTS_ALL, 0))
1309         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1310                                       self.PORTS_ALL, 0))
1311
1312         # Apply rules
1313         self.apply_rules(rules, b"deny ip4/ip6 tcp")
1314
1315         # Traffic should not pass
1316         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1317                                    self.proto[self.IP][self.TCP])
1318
1319         self.logger.info("ACLP_TEST_FINISH_0112")
1320
1321     def test_0113_udp_deny(self):
1322         """ deny UDPv4/v6 + non-match range
1323         """
1324         self.logger.info("ACLP_TEST_START_0113")
1325
1326         # Add an ACL
1327         rules = []
1328         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1329                                       self.PORTS_RANGE_2,
1330                                       self.proto[self.IP][self.UDP]))
1331         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1332                                       self.PORTS_RANGE_2,
1333                                       self.proto[self.IP][self.UDP]))
1334         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1335                                       self.proto[self.IP][self.UDP]))
1336         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1337                                       self.proto[self.IP][self.UDP]))
1338         # permit ip any any in the end
1339         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1340                                       self.PORTS_ALL, 0))
1341         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1342                                       self.PORTS_ALL, 0))
1343
1344         # Apply rules
1345         self.apply_rules(rules, b"deny ip4/ip6 udp")
1346
1347         # Traffic should not pass
1348         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1349                                    self.proto[self.IP][self.UDP])
1350
1351         self.logger.info("ACLP_TEST_FINISH_0113")
1352
1353     def test_0300_tcp_permit_v4_etype_aaaa(self):
1354         """ permit TCPv4, send 0xAAAA etype
1355         """
1356         self.logger.info("ACLP_TEST_START_0300")
1357
1358         # Add an ACL
1359         rules = []
1360         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1361                      self.proto[self.IP][self.TCP]))
1362         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1363                      self.proto[self.IP][self.TCP]))
1364         # deny ip any any in the end
1365         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1366
1367         # Apply rules
1368         self.apply_rules(rules, b"permit ipv4 tcp")
1369
1370         # Traffic should still pass also for an odd ethertype
1371         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1372                              0, False, True, 0xaaaa)
1373         self.logger.info("ACLP_TEST_FINISH_0300")
1374
1375     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1376         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
1377         """
1378         self.logger.info("ACLP_TEST_START_0305")
1379
1380         # Add an ACL
1381         rules = []
1382         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1383                      self.proto[self.IP][self.TCP]))
1384         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1385                      self.proto[self.IP][self.TCP]))
1386         # deny ip any any in the end
1387         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1388
1389         # Apply rules
1390         self.apply_rules(rules, b"permit ipv4 tcp")
1391
1392         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1393         self.etype_whitelist([0xbbb], 1)
1394
1395         # The oddball ethertype should be blocked
1396         self.run_verify_negat_test(self.IP, self.IPV4,
1397                                    self.proto[self.IP][self.TCP],
1398                                    0, False, 0xaaaa)
1399
1400         # remove the whitelist
1401         self.etype_whitelist([], 0)
1402
1403         self.logger.info("ACLP_TEST_FINISH_0305")
1404
1405     def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1406         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1407         """
1408         self.logger.info("ACLP_TEST_START_0306")
1409
1410         # Add an ACL
1411         rules = []
1412         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1413                      self.proto[self.IP][self.TCP]))
1414         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1415                      self.proto[self.IP][self.TCP]))
1416         # deny ip any any in the end
1417         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1418
1419         # Apply rules
1420         self.apply_rules(rules, b"permit ipv4 tcp")
1421
1422         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1423         self.etype_whitelist([0xbbb], 1)
1424
1425         # The whitelisted traffic, should pass
1426         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1427                              0, False, True, 0x0bbb)
1428
1429         # remove the whitelist, the previously blocked 0xAAAA should pass now
1430         self.etype_whitelist([], 0)
1431
1432         self.logger.info("ACLP_TEST_FINISH_0306")
1433
1434     def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1435         """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1436         """
1437         self.logger.info("ACLP_TEST_START_0307")
1438
1439         # Add an ACL
1440         rules = []
1441         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1442                      self.proto[self.IP][self.TCP]))
1443         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1444                      self.proto[self.IP][self.TCP]))
1445         # deny ip any any in the end
1446         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1447
1448         # Apply rules
1449         self.apply_rules(rules, b"permit ipv4 tcp")
1450
1451         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1452         self.etype_whitelist([0xbbb], 1)
1453         # remove the whitelist, the previously blocked 0xAAAA should pass now
1454         self.etype_whitelist([], 0)
1455
1456         # The whitelisted traffic, should pass
1457         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1458                              0, False, True, 0xaaaa)
1459
1460         self.logger.info("ACLP_TEST_FINISH_0306")
1461
1462     def test_0315_del_intf(self):
1463         """ apply an acl and delete the interface
1464         """
1465         self.logger.info("ACLP_TEST_START_0315")
1466
1467         # Add an ACL
1468         rules = []
1469         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1470                      self.proto[self.IP][self.TCP]))
1471         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1472                      self.proto[self.IP][self.TCP]))
1473         # deny ip any any in the end
1474         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1475
1476         # create an interface
1477         intf = []
1478         intf.append(VppLoInterface(self))
1479
1480         # Apply rules
1481         self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
1482
1483         # Remove the interface
1484         intf[0].remove_vpp_config()
1485
1486         self.logger.info("ACLP_TEST_FINISH_0315")
1487
1488 if __name__ == '__main__':
1489     unittest.main(testRunner=VppTestRunner)