test: Fix issues with new version of pycodestyle (VPP-1232)
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16 from vpp_lo_interface import VppLoInterface
17
18
19 class TestACLplugin(VppTestCase):
20     """ ACL plugin Test Case """
21
22     # traffic types
23     IP = 0
24     ICMP = 1
25
26     # IP version
27     IPRANDOM = -1
28     IPV4 = 0
29     IPV6 = 1
30
31     # rule types
32     DENY = 0
33     PERMIT = 1
34
35     # supported protocols
36     proto = [[6, 17], [1, 58]]
37     proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
38     ICMPv4 = 0
39     ICMPv6 = 1
40     TCP = 0
41     UDP = 1
42     PROTO_ALL = 0
43
44     # port ranges
45     PORTS_ALL = -1
46     PORTS_RANGE = 0
47     PORTS_RANGE_2 = 1
48     udp_sport_from = 10
49     udp_sport_to = udp_sport_from + 5
50     udp_dport_from = 20000
51     udp_dport_to = udp_dport_from + 5000
52     tcp_sport_from = 30
53     tcp_sport_to = tcp_sport_from + 5
54     tcp_dport_from = 40000
55     tcp_dport_to = tcp_dport_from + 5000
56
57     udp_sport_from_2 = 90
58     udp_sport_to_2 = udp_sport_from_2 + 5
59     udp_dport_from_2 = 30000
60     udp_dport_to_2 = udp_dport_from_2 + 5000
61     tcp_sport_from_2 = 130
62     tcp_sport_to_2 = tcp_sport_from_2 + 5
63     tcp_dport_from_2 = 20000
64     tcp_dport_to_2 = tcp_dport_from_2 + 5000
65
66     icmp4_type = 8  # echo request
67     icmp4_code = 3
68     icmp6_type = 128  # echo request
69     icmp6_code = 3
70
71     icmp4_type_2 = 8
72     icmp4_code_from_2 = 5
73     icmp4_code_to_2 = 20
74     icmp6_type_2 = 128
75     icmp6_code_from_2 = 8
76     icmp6_code_to_2 = 42
77
78     # Test variables
79     bd_id = 1
80
81     @classmethod
82     def setUpClass(cls):
83         """
84         Perform standard class setup (defined by class method setUpClass in
85         class VppTestCase) before running the test case, set test case related
86         variables and configure VPP.
87         """
88         super(TestACLplugin, cls).setUpClass()
89
90         try:
91             # Create 2 pg interfaces
92             cls.create_pg_interfaces(range(2))
93
94             # Packet flows mapping pg0 -> pg1, pg2 etc.
95             cls.flows = dict()
96             cls.flows[cls.pg0] = [cls.pg1]
97
98             # Packet sizes
99             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101             # Create BD with MAC learning and unknown unicast flooding disabled
102             # and put interfaces to this BD
103             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104                                            learn=1)
105             for pg_if in cls.pg_interfaces:
106                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107                                                     bd_id=cls.bd_id)
108
109             # Set up all interfaces
110             for i in cls.pg_interfaces:
111                 i.admin_up()
112
113             # Mapping between packet-generator index and lists of test hosts
114             cls.hosts_by_pg_idx = dict()
115             for pg_if in cls.pg_interfaces:
116                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118             # Create list of deleted hosts
119             cls.deleted_hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # warm-up the mac address tables
124             # self.warmup_test()
125
126         except Exception:
127             super(TestACLplugin, cls).tearDownClass()
128             raise
129
130     def setUp(self):
131         super(TestACLplugin, self).setUp()
132         self.reset_packet_infos()
133
134     def tearDown(self):
135         """
136         Show various debug prints after each test.
137         """
138         super(TestACLplugin, self).tearDown()
139         if not self.vpp_dead:
140             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
141             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
142             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
143             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
144             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
145                                              % self.bd_id))
146
147     def create_hosts(self, count, start=0):
148         """
149         Create required number of host MAC addresses and distribute them among
150         interfaces. Create host IPv4 address for every host MAC address.
151
152         :param int count: Number of hosts to create MAC/IPv4 addresses for.
153         :param int start: Number to start numbering from.
154         """
155         n_int = len(self.pg_interfaces)
156         macs_per_if = count / n_int
157         i = -1
158         for pg_if in self.pg_interfaces:
159             i += 1
160             start_nr = macs_per_if * i + start
161             end_nr = count + start if i == (n_int - 1) \
162                 else macs_per_if * (i + 1) + start
163             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
164             for j in range(start_nr, end_nr):
165                 host = Host(
166                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
167                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
168                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
169                 hosts.append(host)
170
171     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
172                     s_prefix=0, s_ip='\x00\x00\x00\x00',
173                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
174         if proto == -1:
175             return
176         if ports == self.PORTS_ALL:
177             sport_from = 0
178             dport_from = 0
179             sport_to = 65535 if proto != 1 and proto != 58 else 255
180             dport_to = sport_to
181         elif ports == self.PORTS_RANGE:
182             if proto == 1:
183                 sport_from = self.icmp4_type
184                 sport_to = self.icmp4_type
185                 dport_from = self.icmp4_code
186                 dport_to = self.icmp4_code
187             elif proto == 58:
188                 sport_from = self.icmp6_type
189                 sport_to = self.icmp6_type
190                 dport_from = self.icmp6_code
191                 dport_to = self.icmp6_code
192             elif proto == self.proto[self.IP][self.TCP]:
193                 sport_from = self.tcp_sport_from
194                 sport_to = self.tcp_sport_to
195                 dport_from = self.tcp_dport_from
196                 dport_to = self.tcp_dport_to
197             elif proto == self.proto[self.IP][self.UDP]:
198                 sport_from = self.udp_sport_from
199                 sport_to = self.udp_sport_to
200                 dport_from = self.udp_dport_from
201                 dport_to = self.udp_dport_to
202         elif ports == self.PORTS_RANGE_2:
203             if proto == 1:
204                 sport_from = self.icmp4_type_2
205                 sport_to = self.icmp4_type_2
206                 dport_from = self.icmp4_code_from_2
207                 dport_to = self.icmp4_code_to_2
208             elif proto == 58:
209                 sport_from = self.icmp6_type_2
210                 sport_to = self.icmp6_type_2
211                 dport_from = self.icmp6_code_from_2
212                 dport_to = self.icmp6_code_to_2
213             elif proto == self.proto[self.IP][self.TCP]:
214                 sport_from = self.tcp_sport_from_2
215                 sport_to = self.tcp_sport_to_2
216                 dport_from = self.tcp_dport_from_2
217                 dport_to = self.tcp_dport_to_2
218             elif proto == self.proto[self.IP][self.UDP]:
219                 sport_from = self.udp_sport_from_2
220                 sport_to = self.udp_sport_to_2
221                 dport_from = self.udp_dport_from_2
222                 dport_to = self.udp_dport_to_2
223         else:
224             sport_from = ports
225             sport_to = ports
226             dport_from = ports
227             dport_to = ports
228
229         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
230                  'srcport_or_icmptype_first': sport_from,
231                  'srcport_or_icmptype_last': sport_to,
232                  'src_ip_prefix_len': s_prefix,
233                  'src_ip_addr': s_ip,
234                  'dstport_or_icmpcode_first': dport_from,
235                  'dstport_or_icmpcode_last': dport_to,
236                  'dst_ip_prefix_len': d_prefix,
237                  'dst_ip_addr': d_ip})
238         return rule
239
240     def apply_rules(self, rules, tag=''):
241         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
242                                           tag=tag)
243         self.logger.info("Dumped ACL: " + str(
244             self.vapi.acl_dump(reply.acl_index)))
245         # Apply a ACL on the interface as inbound
246         for i in self.pg_interfaces:
247             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
248                                                  n_input=1,
249                                                  acls=[reply.acl_index])
250         return
251
252     def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
253         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
254                                           tag=tag)
255         self.logger.info("Dumped ACL: " + str(
256             self.vapi.acl_dump(reply.acl_index)))
257         # Apply a ACL on the interface as inbound
258         self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
259                                              n_input=1,
260                                              acls=[reply.acl_index])
261         return
262
263     def etype_whitelist(self, whitelist, n_input):
264         # Apply whitelists on all the interfaces
265         for i in self.pg_interfaces:
266             # checkstyle can't read long names. Help them.
267             fun = self.vapi.acl_interface_set_etype_whitelist
268             fun(sw_if_index=i.sw_if_index, n_input=n_input,
269                 whitelist=whitelist)
270         return
271
272     def create_upper_layer(self, packet_index, proto, ports=0):
273         p = self.proto_map[proto]
274         if p == 'UDP':
275             if ports == 0:
276                 return UDP(sport=random.randint(self.udp_sport_from,
277                                                 self.udp_sport_to),
278                            dport=random.randint(self.udp_dport_from,
279                                                 self.udp_dport_to))
280             else:
281                 return UDP(sport=ports, dport=ports)
282         elif p == 'TCP':
283             if ports == 0:
284                 return TCP(sport=random.randint(self.tcp_sport_from,
285                                                 self.tcp_sport_to),
286                            dport=random.randint(self.tcp_dport_from,
287                                                 self.tcp_dport_to))
288             else:
289                 return TCP(sport=ports, dport=ports)
290         return ''
291
292     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
293                       proto=-1, ports=0, fragments=False,
294                       pkt_raw=True, etype=-1):
295         """
296         Create input packet stream for defined interface using hosts or
297         deleted_hosts list.
298
299         :param object src_if: Interface to create packet stream for.
300         :param list packet_sizes: List of required packet sizes.
301         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
302         :return: Stream of packets.
303         """
304         pkts = []
305         if self.flows.__contains__(src_if):
306             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
307             for dst_if in self.flows[src_if]:
308                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
309                 n_int = len(dst_hosts) * len(src_hosts)
310                 for i in range(0, n_int):
311                     dst_host = dst_hosts[i / len(src_hosts)]
312                     src_host = src_hosts[i % len(src_hosts)]
313                     pkt_info = self.create_packet_info(src_if, dst_if)
314                     if ipv6 == 1:
315                         pkt_info.ip = 1
316                     elif ipv6 == 0:
317                         pkt_info.ip = 0
318                     else:
319                         pkt_info.ip = random.choice([0, 1])
320                     if proto == -1:
321                         pkt_info.proto = random.choice(self.proto[self.IP])
322                     else:
323                         pkt_info.proto = proto
324                     payload = self.info_to_payload(pkt_info)
325                     p = Ether(dst=dst_host.mac, src=src_host.mac)
326                     if etype > 0:
327                         p = Ether(dst=dst_host.mac,
328                                   src=src_host.mac,
329                                   type=etype)
330                     if pkt_info.ip:
331                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
332                         if fragments:
333                             p /= IPv6ExtHdrFragment(offset=64, m=1)
334                     else:
335                         if fragments:
336                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
337                                     flags=1, frag=64)
338                         else:
339                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
340                     if traffic_type == self.ICMP:
341                         if pkt_info.ip:
342                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
343                                                    code=self.icmp6_code)
344                         else:
345                             p /= ICMP(type=self.icmp4_type,
346                                       code=self.icmp4_code)
347                     else:
348                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
349                     if pkt_raw:
350                         p /= Raw(payload)
351                         pkt_info.data = p.copy()
352                     if pkt_raw:
353                         size = random.choice(packet_sizes)
354                         self.extend_packet(p, size)
355                     pkts.append(p)
356         return pkts
357
358     def verify_capture(self, pg_if, capture,
359                        traffic_type=0, ip_type=0, etype=-1):
360         """
361         Verify captured input packet stream for defined interface.
362
363         :param object pg_if: Interface to verify captured packet stream for.
364         :param list capture: Captured packet stream.
365         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
366         """
367         last_info = dict()
368         for i in self.pg_interfaces:
369             last_info[i.sw_if_index] = None
370         dst_sw_if_index = pg_if.sw_if_index
371         for packet in capture:
372             if etype > 0:
373                 if packet[Ether].type != etype:
374                     self.logger.error(ppp("Unexpected ethertype in packet:",
375                                           packet))
376                 else:
377                     continue
378             try:
379                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
380                 if traffic_type == self.ICMP and ip_type == self.IPV6:
381                     payload_info = self.payload_to_info(
382                         packet[ICMPv6EchoRequest].data)
383                     payload = packet[ICMPv6EchoRequest]
384                 else:
385                     payload_info = self.payload_to_info(str(packet[Raw]))
386                     payload = packet[self.proto_map[payload_info.proto]]
387             except:
388                 self.logger.error(ppp("Unexpected or invalid packet "
389                                       "(outside network):", packet))
390                 raise
391
392             if ip_type != 0:
393                 self.assertEqual(payload_info.ip, ip_type)
394             if traffic_type == self.ICMP:
395                 try:
396                     if payload_info.ip == 0:
397                         self.assertEqual(payload.type, self.icmp4_type)
398                         self.assertEqual(payload.code, self.icmp4_code)
399                     else:
400                         self.assertEqual(payload.type, self.icmp6_type)
401                         self.assertEqual(payload.code, self.icmp6_code)
402                 except:
403                     self.logger.error(ppp("Unexpected or invalid packet "
404                                           "(outside network):", packet))
405                     raise
406             else:
407                 try:
408                     ip_version = IPv6 if payload_info.ip == 1 else IP
409
410                     ip = packet[ip_version]
411                     packet_index = payload_info.index
412
413                     self.assertEqual(payload_info.dst, dst_sw_if_index)
414                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
415                                       (pg_if.name, payload_info.src,
416                                        packet_index))
417                     next_info = self.get_next_packet_info_for_interface2(
418                         payload_info.src, dst_sw_if_index,
419                         last_info[payload_info.src])
420                     last_info[payload_info.src] = next_info
421                     self.assertTrue(next_info is not None)
422                     self.assertEqual(packet_index, next_info.index)
423                     saved_packet = next_info.data
424                     # Check standard fields
425                     self.assertEqual(ip.src, saved_packet[ip_version].src)
426                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
427                     p = self.proto_map[payload_info.proto]
428                     if p == 'TCP':
429                         tcp = packet[TCP]
430                         self.assertEqual(tcp.sport, saved_packet[
431                             TCP].sport)
432                         self.assertEqual(tcp.dport, saved_packet[
433                             TCP].dport)
434                     elif p == 'UDP':
435                         udp = packet[UDP]
436                         self.assertEqual(udp.sport, saved_packet[
437                             UDP].sport)
438                         self.assertEqual(udp.dport, saved_packet[
439                             UDP].dport)
440                 except:
441                     self.logger.error(ppp("Unexpected or invalid packet:",
442                                           packet))
443                     raise
444         for i in self.pg_interfaces:
445             remaining_packet = self.get_next_packet_info_for_interface2(
446                 i, dst_sw_if_index, last_info[i.sw_if_index])
447             self.assertTrue(
448                 remaining_packet is None,
449                 "Port %u: Packet expected from source %u didn't arrive" %
450                 (dst_sw_if_index, i.sw_if_index))
451
452     def run_traffic_no_check(self):
453         # Test
454         # Create incoming packet streams for packet-generator interfaces
455         for i in self.pg_interfaces:
456             if self.flows.__contains__(i):
457                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
458                 if len(pkts) > 0:
459                     i.add_stream(pkts)
460
461         # Enable packet capture and start packet sending
462         self.pg_enable_capture(self.pg_interfaces)
463         self.pg_start()
464
465     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
466                         frags=False, pkt_raw=True, etype=-1):
467         # Test
468         # Create incoming packet streams for packet-generator interfaces
469         pkts_cnt = 0
470         for i in self.pg_interfaces:
471             if self.flows.__contains__(i):
472                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
473                                           traffic_type, ip_type, proto, ports,
474                                           frags, pkt_raw, etype)
475                 if len(pkts) > 0:
476                     i.add_stream(pkts)
477                     pkts_cnt += len(pkts)
478
479         # Enable packet capture and start packet sendingself.IPV
480         self.pg_enable_capture(self.pg_interfaces)
481         self.pg_start()
482
483         # Verify
484         # Verify outgoing packet streams per packet-generator interface
485         for src_if in self.pg_interfaces:
486             if self.flows.__contains__(src_if):
487                 for dst_if in self.flows[src_if]:
488                     capture = dst_if.get_capture(pkts_cnt)
489                     self.logger.info("Verifying capture on interface %s" %
490                                      dst_if.name)
491                     self.verify_capture(dst_if, capture,
492                                         traffic_type, ip_type, etype)
493
494     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
495                               ports=0, frags=False, etype=-1):
496         # Test
497         self.reset_packet_infos()
498         for i in self.pg_interfaces:
499             if self.flows.__contains__(i):
500                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
501                                           traffic_type, ip_type, proto, ports,
502                                           frags, True, etype)
503                 if len(pkts) > 0:
504                     i.add_stream(pkts)
505
506         # Enable packet capture and start packet sending
507         self.pg_enable_capture(self.pg_interfaces)
508         self.pg_start()
509
510         # Verify
511         # Verify outgoing packet streams per packet-generator interface
512         for src_if in self.pg_interfaces:
513             if self.flows.__contains__(src_if):
514                 for dst_if in self.flows[src_if]:
515                     self.logger.info("Verifying capture on interface %s" %
516                                      dst_if.name)
517                     capture = dst_if.get_capture(0)
518                     self.assertEqual(len(capture), 0)
519
520     def test_0000_warmup_test(self):
521         """ ACL plugin version check; learn MACs
522         """
523         self.create_hosts(16)
524         self.run_traffic_no_check()
525         reply = self.vapi.papi.acl_plugin_get_version()
526         self.assertEqual(reply.major, 1)
527         self.logger.info("Working with ACL plugin version: %d.%d" % (
528             reply.major, reply.minor))
529         # minor version changes are non breaking
530         # self.assertEqual(reply.minor, 0)
531
532     def test_0001_acl_create(self):
533         """ ACL create/delete test
534         """
535
536         self.logger.info("ACLP_TEST_START_0001")
537         # Add an ACL
538         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
539               'srcport_or_icmptype_first': 1234,
540               'srcport_or_icmptype_last': 1235,
541               'src_ip_prefix_len': 0,
542               'src_ip_addr': '\x00\x00\x00\x00',
543               'dstport_or_icmpcode_first': 1234,
544               'dstport_or_icmpcode_last': 1234,
545               'dst_ip_addr': '\x00\x00\x00\x00',
546               'dst_ip_prefix_len': 0}]
547         # Test 1: add a new ACL
548         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
549                                           tag="permit 1234")
550         self.assertEqual(reply.retval, 0)
551         # The very first ACL gets #0
552         self.assertEqual(reply.acl_index, 0)
553         first_acl = reply.acl_index
554         rr = self.vapi.acl_dump(reply.acl_index)
555         self.logger.info("Dumped ACL: " + str(rr))
556         self.assertEqual(len(rr), 1)
557         # We should have the same number of ACL entries as we had asked
558         self.assertEqual(len(rr[0].r), len(r))
559         # The rules should be the same. But because the submitted and returned
560         # are different types, we need to iterate over rules and keys to get
561         # to basic values.
562         for i_rule in range(0, len(r) - 1):
563             for rule_key in r[i_rule]:
564                 self.assertEqual(rr[0].r[i_rule][rule_key],
565                                  r[i_rule][rule_key])
566
567         # Add a deny-1234 ACL
568         r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
569                    'srcport_or_icmptype_first': 1234,
570                    'srcport_or_icmptype_last': 1235,
571                    'src_ip_prefix_len': 0,
572                    'src_ip_addr': '\x00\x00\x00\x00',
573                    'dstport_or_icmpcode_first': 1234,
574                    'dstport_or_icmpcode_last': 1234,
575                    'dst_ip_addr': '\x00\x00\x00\x00',
576                    'dst_ip_prefix_len': 0},
577                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
578                    'srcport_or_icmptype_first': 0,
579                    'srcport_or_icmptype_last': 0,
580                    'src_ip_prefix_len': 0,
581                    'src_ip_addr': '\x00\x00\x00\x00',
582                    'dstport_or_icmpcode_first': 0,
583                    'dstport_or_icmpcode_last': 0,
584                    'dst_ip_addr': '\x00\x00\x00\x00',
585                    'dst_ip_prefix_len': 0}]
586
587         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
588                                           tag="deny 1234;permit all")
589         self.assertEqual(reply.retval, 0)
590         # The second ACL gets #1
591         self.assertEqual(reply.acl_index, 1)
592         second_acl = reply.acl_index
593
594         # Test 2: try to modify a nonexistent ACL
595         reply = self.vapi.acl_add_replace(acl_index=432, r=r,
596                                           tag="FFFF:FFFF", expected_retval=-6)
597         self.assertEqual(reply.retval, -6)
598         # The ACL number should pass through
599         self.assertEqual(reply.acl_index, 432)
600         # apply an ACL on an interface inbound, try to delete ACL, must fail
601         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
602                                              n_input=1,
603                                              acls=[first_acl])
604         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
605         # Unapply an ACL and then try to delete it - must be ok
606         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
607                                              n_input=0,
608                                              acls=[])
609         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
610
611         # apply an ACL on an interface outbound, try to delete ACL, must fail
612         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
613                                              n_input=0,
614                                              acls=[second_acl])
615         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
616         # Unapply the ACL and then try to delete it - must be ok
617         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
618                                              n_input=0,
619                                              acls=[])
620         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
621
622         # try to apply a nonexistent ACL - must fail
623         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
624                                              n_input=1,
625                                              acls=[first_acl],
626                                              expected_retval=-6)
627
628         self.logger.info("ACLP_TEST_FINISH_0001")
629
630     def test_0002_acl_permit_apply(self):
631         """ permit ACL apply test
632         """
633         self.logger.info("ACLP_TEST_START_0002")
634
635         rules = []
636         rules.append(self.create_rule(self.IPV4, self.PERMIT,
637                      0, self.proto[self.IP][self.UDP]))
638         rules.append(self.create_rule(self.IPV4, self.PERMIT,
639                      0, self.proto[self.IP][self.TCP]))
640
641         # Apply rules
642         self.apply_rules(rules, "permit per-flow")
643
644         # Traffic should still pass
645         self.run_verify_test(self.IP, self.IPV4, -1)
646         self.logger.info("ACLP_TEST_FINISH_0002")
647
648     def test_0003_acl_deny_apply(self):
649         """ deny ACL apply test
650         """
651         self.logger.info("ACLP_TEST_START_0003")
652         # Add a deny-flows ACL
653         rules = []
654         rules.append(self.create_rule(self.IPV4, self.DENY,
655                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
656         # Permit ip any any in the end
657         rules.append(self.create_rule(self.IPV4, self.PERMIT,
658                                       self.PORTS_ALL, 0))
659
660         # Apply rules
661         self.apply_rules(rules, "deny per-flow;permit all")
662
663         # Traffic should not pass
664         self.run_verify_negat_test(self.IP, self.IPV4,
665                                    self.proto[self.IP][self.UDP])
666         self.logger.info("ACLP_TEST_FINISH_0003")
667         # self.assertEqual(1, 0)
668
669     def test_0004_vpp624_permit_icmpv4(self):
670         """ VPP_624 permit ICMPv4
671         """
672         self.logger.info("ACLP_TEST_START_0004")
673
674         # Add an ACL
675         rules = []
676         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
677                                       self.proto[self.ICMP][self.ICMPv4]))
678         # deny ip any any in the end
679         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
680
681         # Apply rules
682         self.apply_rules(rules, "permit icmpv4")
683
684         # Traffic should still pass
685         self.run_verify_test(self.ICMP, self.IPV4,
686                              self.proto[self.ICMP][self.ICMPv4])
687
688         self.logger.info("ACLP_TEST_FINISH_0004")
689
690     def test_0005_vpp624_permit_icmpv6(self):
691         """ VPP_624 permit ICMPv6
692         """
693         self.logger.info("ACLP_TEST_START_0005")
694
695         # Add an ACL
696         rules = []
697         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
698                                       self.proto[self.ICMP][self.ICMPv6]))
699         # deny ip any any in the end
700         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
701
702         # Apply rules
703         self.apply_rules(rules, "permit icmpv6")
704
705         # Traffic should still pass
706         self.run_verify_test(self.ICMP, self.IPV6,
707                              self.proto[self.ICMP][self.ICMPv6])
708
709         self.logger.info("ACLP_TEST_FINISH_0005")
710
711     def test_0006_vpp624_deny_icmpv4(self):
712         """ VPP_624 deny ICMPv4
713         """
714         self.logger.info("ACLP_TEST_START_0006")
715         # Add an ACL
716         rules = []
717         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
718                                       self.proto[self.ICMP][self.ICMPv4]))
719         # permit ip any any in the end
720         rules.append(self.create_rule(self.IPV4, self.PERMIT,
721                                       self.PORTS_ALL, 0))
722
723         # Apply rules
724         self.apply_rules(rules, "deny icmpv4")
725
726         # Traffic should not pass
727         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
728
729         self.logger.info("ACLP_TEST_FINISH_0006")
730
731     def test_0007_vpp624_deny_icmpv6(self):
732         """ VPP_624 deny ICMPv6
733         """
734         self.logger.info("ACLP_TEST_START_0007")
735         # Add an ACL
736         rules = []
737         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
738                                       self.proto[self.ICMP][self.ICMPv6]))
739         # deny ip any any in the end
740         rules.append(self.create_rule(self.IPV6, self.PERMIT,
741                                       self.PORTS_ALL, 0))
742
743         # Apply rules
744         self.apply_rules(rules, "deny icmpv6")
745
746         # Traffic should not pass
747         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
748
749         self.logger.info("ACLP_TEST_FINISH_0007")
750
751     def test_0008_tcp_permit_v4(self):
752         """ permit TCPv4
753         """
754         self.logger.info("ACLP_TEST_START_0008")
755
756         # Add an ACL
757         rules = []
758         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
759                      self.proto[self.IP][self.TCP]))
760         # deny ip any any in the end
761         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
762
763         # Apply rules
764         self.apply_rules(rules, "permit ipv4 tcp")
765
766         # Traffic should still pass
767         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
768
769         self.logger.info("ACLP_TEST_FINISH_0008")
770
771     def test_0009_tcp_permit_v6(self):
772         """ permit TCPv6
773         """
774         self.logger.info("ACLP_TEST_START_0009")
775
776         # Add an ACL
777         rules = []
778         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
779                                       self.proto[self.IP][self.TCP]))
780         # deny ip any any in the end
781         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
782
783         # Apply rules
784         self.apply_rules(rules, "permit ip6 tcp")
785
786         # Traffic should still pass
787         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
788
789         self.logger.info("ACLP_TEST_FINISH_0008")
790
791     def test_0010_udp_permit_v4(self):
792         """ permit UDPv4
793         """
794         self.logger.info("ACLP_TEST_START_0010")
795
796         # Add an ACL
797         rules = []
798         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
799                                       self.proto[self.IP][self.UDP]))
800         # deny ip any any in the end
801         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
802
803         # Apply rules
804         self.apply_rules(rules, "permit ipv udp")
805
806         # Traffic should still pass
807         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
808
809         self.logger.info("ACLP_TEST_FINISH_0010")
810
811     def test_0011_udp_permit_v6(self):
812         """ permit UDPv6
813         """
814         self.logger.info("ACLP_TEST_START_0011")
815
816         # Add an ACL
817         rules = []
818         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
819                                       self.proto[self.IP][self.UDP]))
820         # deny ip any any in the end
821         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
822
823         # Apply rules
824         self.apply_rules(rules, "permit ip6 udp")
825
826         # Traffic should still pass
827         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
828
829         self.logger.info("ACLP_TEST_FINISH_0011")
830
831     def test_0012_tcp_deny(self):
832         """ deny TCPv4/v6
833         """
834         self.logger.info("ACLP_TEST_START_0012")
835
836         # Add an ACL
837         rules = []
838         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
839                                       self.proto[self.IP][self.TCP]))
840         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
841                                       self.proto[self.IP][self.TCP]))
842         # permit ip any any in the end
843         rules.append(self.create_rule(self.IPV4, self.PERMIT,
844                                       self.PORTS_ALL, 0))
845         rules.append(self.create_rule(self.IPV6, self.PERMIT,
846                                       self.PORTS_ALL, 0))
847
848         # Apply rules
849         self.apply_rules(rules, "deny ip4/ip6 tcp")
850
851         # Traffic should not pass
852         self.run_verify_negat_test(self.IP, self.IPRANDOM,
853                                    self.proto[self.IP][self.TCP])
854
855         self.logger.info("ACLP_TEST_FINISH_0012")
856
857     def test_0013_udp_deny(self):
858         """ deny UDPv4/v6
859         """
860         self.logger.info("ACLP_TEST_START_0013")
861
862         # Add an ACL
863         rules = []
864         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
865                                       self.proto[self.IP][self.UDP]))
866         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
867                                       self.proto[self.IP][self.UDP]))
868         # permit ip any any in the end
869         rules.append(self.create_rule(self.IPV4, self.PERMIT,
870                                       self.PORTS_ALL, 0))
871         rules.append(self.create_rule(self.IPV6, self.PERMIT,
872                                       self.PORTS_ALL, 0))
873
874         # Apply rules
875         self.apply_rules(rules, "deny ip4/ip6 udp")
876
877         # Traffic should not pass
878         self.run_verify_negat_test(self.IP, self.IPRANDOM,
879                                    self.proto[self.IP][self.UDP])
880
881         self.logger.info("ACLP_TEST_FINISH_0013")
882
883     def test_0014_acl_dump(self):
884         """ verify add/dump acls
885         """
886         self.logger.info("ACLP_TEST_START_0014")
887
888         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
889              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
890              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
891              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
892              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
893              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
894              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
895              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
896              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
897              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
898              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
899              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
900              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
901              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
902              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
903              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
904              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
905              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
906              ]
907
908         # Add and verify new ACLs
909         rules = []
910         for i in range(len(r)):
911             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
912
913         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
914         result = self.vapi.acl_dump(reply.acl_index)
915
916         i = 0
917         for drules in result:
918             for dr in drules.r:
919                 self.assertEqual(dr.is_ipv6, r[i][0])
920                 self.assertEqual(dr.is_permit, r[i][1])
921                 self.assertEqual(dr.proto, r[i][3])
922
923                 if r[i][2] > 0:
924                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
925                 else:
926                     if r[i][2] < 0:
927                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
928                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
929                     else:
930                         if dr.proto == self.proto[self.IP][self.TCP]:
931                             self.assertGreater(dr.srcport_or_icmptype_first,
932                                                self.tcp_sport_from-1)
933                             self.assertLess(dr.srcport_or_icmptype_first,
934                                             self.tcp_sport_to+1)
935                             self.assertGreater(dr.dstport_or_icmpcode_last,
936                                                self.tcp_dport_from-1)
937                             self.assertLess(dr.dstport_or_icmpcode_last,
938                                             self.tcp_dport_to+1)
939                         elif dr.proto == self.proto[self.IP][self.UDP]:
940                             self.assertGreater(dr.srcport_or_icmptype_first,
941                                                self.udp_sport_from-1)
942                             self.assertLess(dr.srcport_or_icmptype_first,
943                                             self.udp_sport_to+1)
944                             self.assertGreater(dr.dstport_or_icmpcode_last,
945                                                self.udp_dport_from-1)
946                             self.assertLess(dr.dstport_or_icmpcode_last,
947                                             self.udp_dport_to+1)
948                 i += 1
949
950         self.logger.info("ACLP_TEST_FINISH_0014")
951
952     def test_0015_tcp_permit_port_v4(self):
953         """ permit single TCPv4
954         """
955         self.logger.info("ACLP_TEST_START_0015")
956
957         port = random.randint(0, 65535)
958         # Add an ACL
959         rules = []
960         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
961                                       self.proto[self.IP][self.TCP]))
962         # deny ip any any in the end
963         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
964
965         # Apply rules
966         self.apply_rules(rules, "permit ip4 tcp "+str(port))
967
968         # Traffic should still pass
969         self.run_verify_test(self.IP, self.IPV4,
970                              self.proto[self.IP][self.TCP], port)
971
972         self.logger.info("ACLP_TEST_FINISH_0015")
973
974     def test_0016_udp_permit_port_v4(self):
975         """ permit single UDPv4
976         """
977         self.logger.info("ACLP_TEST_START_0016")
978
979         port = random.randint(0, 65535)
980         # Add an ACL
981         rules = []
982         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
983                                       self.proto[self.IP][self.UDP]))
984         # deny ip any any in the end
985         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
986
987         # Apply rules
988         self.apply_rules(rules, "permit ip4 tcp "+str(port))
989
990         # Traffic should still pass
991         self.run_verify_test(self.IP, self.IPV4,
992                              self.proto[self.IP][self.UDP], port)
993
994         self.logger.info("ACLP_TEST_FINISH_0016")
995
996     def test_0017_tcp_permit_port_v6(self):
997         """ permit single TCPv6
998         """
999         self.logger.info("ACLP_TEST_START_0017")
1000
1001         port = random.randint(0, 65535)
1002         # Add an ACL
1003         rules = []
1004         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1005                                       self.proto[self.IP][self.TCP]))
1006         # deny ip any any in the end
1007         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1008
1009         # Apply rules
1010         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1011
1012         # Traffic should still pass
1013         self.run_verify_test(self.IP, self.IPV6,
1014                              self.proto[self.IP][self.TCP], port)
1015
1016         self.logger.info("ACLP_TEST_FINISH_0017")
1017
1018     def test_0018_udp_permit_port_v6(self):
1019         """ permit single UPPv6
1020         """
1021         self.logger.info("ACLP_TEST_START_0018")
1022
1023         port = random.randint(0, 65535)
1024         # Add an ACL
1025         rules = []
1026         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1027                                       self.proto[self.IP][self.UDP]))
1028         # deny ip any any in the end
1029         rules.append(self.create_rule(self.IPV6, self.DENY,
1030                                       self.PORTS_ALL, 0))
1031
1032         # Apply rules
1033         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1034
1035         # Traffic should still pass
1036         self.run_verify_test(self.IP, self.IPV6,
1037                              self.proto[self.IP][self.UDP], port)
1038
1039         self.logger.info("ACLP_TEST_FINISH_0018")
1040
1041     def test_0019_udp_deny_port(self):
1042         """ deny single TCPv4/v6
1043         """
1044         self.logger.info("ACLP_TEST_START_0019")
1045
1046         port = random.randint(0, 65535)
1047         # Add an ACL
1048         rules = []
1049         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1050                                       self.proto[self.IP][self.TCP]))
1051         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1052                                       self.proto[self.IP][self.TCP]))
1053         # Permit ip any any in the end
1054         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1055                                       self.PORTS_ALL, 0))
1056         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1057                                       self.PORTS_ALL, 0))
1058
1059         # Apply rules
1060         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1061
1062         # Traffic should not pass
1063         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1064                                    self.proto[self.IP][self.TCP], port)
1065
1066         self.logger.info("ACLP_TEST_FINISH_0019")
1067
1068     def test_0020_udp_deny_port(self):
1069         """ deny single UDPv4/v6
1070         """
1071         self.logger.info("ACLP_TEST_START_0020")
1072
1073         port = random.randint(0, 65535)
1074         # Add an ACL
1075         rules = []
1076         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1077                                       self.proto[self.IP][self.UDP]))
1078         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1079                                       self.proto[self.IP][self.UDP]))
1080         # Permit ip any any in the end
1081         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1082                                       self.PORTS_ALL, 0))
1083         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1084                                       self.PORTS_ALL, 0))
1085
1086         # Apply rules
1087         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1088
1089         # Traffic should not pass
1090         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1091                                    self.proto[self.IP][self.UDP], port)
1092
1093         self.logger.info("ACLP_TEST_FINISH_0020")
1094
1095     def test_0021_udp_deny_port_verify_fragment_deny(self):
1096         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1097         blocked
1098         """
1099         self.logger.info("ACLP_TEST_START_0021")
1100
1101         port = random.randint(0, 65535)
1102         # Add an ACL
1103         rules = []
1104         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1105                                       self.proto[self.IP][self.UDP]))
1106         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1107                                       self.proto[self.IP][self.UDP]))
1108         # deny ip any any in the end
1109         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1110                                       self.PORTS_ALL, 0))
1111         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1112                                       self.PORTS_ALL, 0))
1113
1114         # Apply rules
1115         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1116
1117         # Traffic should not pass
1118         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1119                                    self.proto[self.IP][self.UDP], port, True)
1120
1121         self.logger.info("ACLP_TEST_FINISH_0021")
1122
1123     def test_0022_zero_length_udp_ipv4(self):
1124         """ VPP-687 zero length udp ipv4 packet"""
1125         self.logger.info("ACLP_TEST_START_0022")
1126
1127         port = random.randint(0, 65535)
1128         # Add an ACL
1129         rules = []
1130         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1131                                       self.proto[self.IP][self.UDP]))
1132         # deny ip any any in the end
1133         rules.append(
1134             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1135
1136         # Apply rules
1137         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1138
1139         # Traffic should still pass
1140         # Create incoming packet streams for packet-generator interfaces
1141         pkts_cnt = 0
1142         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1143                                   self.IP, self.IPV4,
1144                                   self.proto[self.IP][self.UDP], port,
1145                                   False, False)
1146         if len(pkts) > 0:
1147             self.pg0.add_stream(pkts)
1148             pkts_cnt += len(pkts)
1149
1150         # Enable packet capture and start packet sendingself.IPV
1151         self.pg_enable_capture(self.pg_interfaces)
1152         self.pg_start()
1153
1154         self.pg1.get_capture(pkts_cnt)
1155
1156         self.logger.info("ACLP_TEST_FINISH_0022")
1157
1158     def test_0023_zero_length_udp_ipv6(self):
1159         """ VPP-687 zero length udp ipv6 packet"""
1160         self.logger.info("ACLP_TEST_START_0023")
1161
1162         port = random.randint(0, 65535)
1163         # Add an ACL
1164         rules = []
1165         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1166                                       self.proto[self.IP][self.UDP]))
1167         # deny ip any any in the end
1168         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1169
1170         # Apply rules
1171         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1172
1173         # Traffic should still pass
1174         # Create incoming packet streams for packet-generator interfaces
1175         pkts_cnt = 0
1176         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1177                                   self.IP, self.IPV6,
1178                                   self.proto[self.IP][self.UDP], port,
1179                                   False, False)
1180         if len(pkts) > 0:
1181             self.pg0.add_stream(pkts)
1182             pkts_cnt += len(pkts)
1183
1184         # Enable packet capture and start packet sendingself.IPV
1185         self.pg_enable_capture(self.pg_interfaces)
1186         self.pg_start()
1187
1188         # Verify outgoing packet streams per packet-generator interface
1189         self.pg1.get_capture(pkts_cnt)
1190
1191         self.logger.info("ACLP_TEST_FINISH_0023")
1192
1193     def test_0108_tcp_permit_v4(self):
1194         """ permit TCPv4 + non-match range
1195         """
1196         self.logger.info("ACLP_TEST_START_0108")
1197
1198         # Add an ACL
1199         rules = []
1200         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1201                      self.proto[self.IP][self.TCP]))
1202         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1203                      self.proto[self.IP][self.TCP]))
1204         # deny ip any any in the end
1205         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1206
1207         # Apply rules
1208         self.apply_rules(rules, "permit ipv4 tcp")
1209
1210         # Traffic should still pass
1211         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1212
1213         self.logger.info("ACLP_TEST_FINISH_0108")
1214
1215     def test_0109_tcp_permit_v6(self):
1216         """ permit TCPv6 + non-match range
1217         """
1218         self.logger.info("ACLP_TEST_START_0109")
1219
1220         # Add an ACL
1221         rules = []
1222         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1223                                       self.proto[self.IP][self.TCP]))
1224         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1225                                       self.proto[self.IP][self.TCP]))
1226         # deny ip any any in the end
1227         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1228
1229         # Apply rules
1230         self.apply_rules(rules, "permit ip6 tcp")
1231
1232         # Traffic should still pass
1233         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1234
1235         self.logger.info("ACLP_TEST_FINISH_0109")
1236
1237     def test_0110_udp_permit_v4(self):
1238         """ permit UDPv4 + non-match range
1239         """
1240         self.logger.info("ACLP_TEST_START_0110")
1241
1242         # Add an ACL
1243         rules = []
1244         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1245                                       self.proto[self.IP][self.UDP]))
1246         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1247                                       self.proto[self.IP][self.UDP]))
1248         # deny ip any any in the end
1249         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1250
1251         # Apply rules
1252         self.apply_rules(rules, "permit ipv4 udp")
1253
1254         # Traffic should still pass
1255         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1256
1257         self.logger.info("ACLP_TEST_FINISH_0110")
1258
1259     def test_0111_udp_permit_v6(self):
1260         """ permit UDPv6 + non-match range
1261         """
1262         self.logger.info("ACLP_TEST_START_0111")
1263
1264         # Add an ACL
1265         rules = []
1266         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1267                                       self.proto[self.IP][self.UDP]))
1268         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1269                                       self.proto[self.IP][self.UDP]))
1270         # deny ip any any in the end
1271         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1272
1273         # Apply rules
1274         self.apply_rules(rules, "permit ip6 udp")
1275
1276         # Traffic should still pass
1277         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1278
1279         self.logger.info("ACLP_TEST_FINISH_0111")
1280
1281     def test_0112_tcp_deny(self):
1282         """ deny TCPv4/v6 + non-match range
1283         """
1284         self.logger.info("ACLP_TEST_START_0112")
1285
1286         # Add an ACL
1287         rules = []
1288         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1289                                       self.PORTS_RANGE_2,
1290                                       self.proto[self.IP][self.TCP]))
1291         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1292                                       self.PORTS_RANGE_2,
1293                                       self.proto[self.IP][self.TCP]))
1294         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1295                                       self.proto[self.IP][self.TCP]))
1296         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1297                                       self.proto[self.IP][self.TCP]))
1298         # permit ip any any in the end
1299         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1300                                       self.PORTS_ALL, 0))
1301         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1302                                       self.PORTS_ALL, 0))
1303
1304         # Apply rules
1305         self.apply_rules(rules, "deny ip4/ip6 tcp")
1306
1307         # Traffic should not pass
1308         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1309                                    self.proto[self.IP][self.TCP])
1310
1311         self.logger.info("ACLP_TEST_FINISH_0112")
1312
1313     def test_0113_udp_deny(self):
1314         """ deny UDPv4/v6 + non-match range
1315         """
1316         self.logger.info("ACLP_TEST_START_0113")
1317
1318         # Add an ACL
1319         rules = []
1320         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1321                                       self.PORTS_RANGE_2,
1322                                       self.proto[self.IP][self.UDP]))
1323         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1324                                       self.PORTS_RANGE_2,
1325                                       self.proto[self.IP][self.UDP]))
1326         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1327                                       self.proto[self.IP][self.UDP]))
1328         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1329                                       self.proto[self.IP][self.UDP]))
1330         # permit ip any any in the end
1331         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1332                                       self.PORTS_ALL, 0))
1333         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1334                                       self.PORTS_ALL, 0))
1335
1336         # Apply rules
1337         self.apply_rules(rules, "deny ip4/ip6 udp")
1338
1339         # Traffic should not pass
1340         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1341                                    self.proto[self.IP][self.UDP])
1342
1343         self.logger.info("ACLP_TEST_FINISH_0113")
1344
1345     def test_0300_tcp_permit_v4_etype_aaaa(self):
1346         """ permit TCPv4, send 0xAAAA etype
1347         """
1348         self.logger.info("ACLP_TEST_START_0300")
1349
1350         # Add an ACL
1351         rules = []
1352         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1353                      self.proto[self.IP][self.TCP]))
1354         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1355                      self.proto[self.IP][self.TCP]))
1356         # deny ip any any in the end
1357         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1358
1359         # Apply rules
1360         self.apply_rules(rules, "permit ipv4 tcp")
1361
1362         # Traffic should still pass
1363         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1364
1365         # Traffic should still pass also for an odd ethertype
1366         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1367                              0, False, True, 0xaaaa)
1368
1369         self.logger.info("ACLP_TEST_FINISH_0300")
1370
1371     def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1372         """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
1373         """
1374         self.logger.info("ACLP_TEST_START_0305")
1375
1376         # Add an ACL
1377         rules = []
1378         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1379                      self.proto[self.IP][self.TCP]))
1380         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1381                      self.proto[self.IP][self.TCP]))
1382         # deny ip any any in the end
1383         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1384
1385         # Apply rules
1386         self.apply_rules(rules, "permit ipv4 tcp")
1387
1388         # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1389         self.etype_whitelist([0xbbb], 1)
1390
1391         # The IPv4 traffic should still pass
1392         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1393
1394         # The oddball ethertype should be blocked
1395         self.run_verify_negat_test(self.IP, self.IPV4,
1396                                    self.proto[self.IP][self.TCP],
1397                                    0, False, 0xaaaa)
1398
1399         # The whitelisted traffic, on the other hand, should pass
1400         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1401                              0, False, True, 0x0bbb)
1402
1403         # remove the whitelist, the previously blocked 0xAAAA should pass now
1404         self.etype_whitelist([], 0)
1405         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1406                              0, False, True, 0xaaaa)
1407
1408         self.logger.info("ACLP_TEST_FINISH_0305")
1409
1410     def test_0315_del_intf(self):
1411         """ apply an acl and delete the interface
1412         """
1413         self.logger.info("ACLP_TEST_START_0315")
1414
1415         # Add an ACL
1416         rules = []
1417         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1418                      self.proto[self.IP][self.TCP]))
1419         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1420                      self.proto[self.IP][self.TCP]))
1421         # deny ip any any in the end
1422         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1423
1424         # create an interface
1425         intf = []
1426         intf.append(VppLoInterface(self, 0))
1427
1428         # Apply rules
1429         self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1430
1431         # Remove the interface
1432         intf[0].remove_vpp_config()
1433
1434         self.logger.info("ACLP_TEST_FINISH_0315")
1435
1436 if __name__ == '__main__':
1437     unittest.main(testRunner=VppTestRunner)