ACL plugin enable macip for ip4/ip6 traffic
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16
17 class TestACLplugin(VppTestCase):
18     """ ACL plugin Test Case """
19
20     # traffic types
21     IP = 0
22     ICMP = 1
23
24     # IP version
25     IPRANDOM = -1
26     IPV4 = 0
27     IPV6 = 1
28
29     # rule types
30     DENY = 0
31     PERMIT = 1
32
33     # supported protocols
34     proto = [[6, 17], [1, 58]]
35     proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
36     ICMPv4 = 0
37     ICMPv6 = 1
38     TCP = 0
39     UDP = 1
40     PROTO_ALL = 0
41
42     # port ranges
43     PORTS_ALL = -1
44     PORTS_RANGE = 0
45     PORTS_RANGE_2 = 1
46     udp_sport_from = 10
47     udp_sport_to = udp_sport_from + 5
48     udp_dport_from = 20000
49     udp_dport_to = udp_dport_from + 5000
50     tcp_sport_from = 30
51     tcp_sport_to = tcp_sport_from + 5
52     tcp_dport_from = 40000
53     tcp_dport_to = tcp_dport_from + 5000
54
55     udp_sport_from_2 = 90
56     udp_sport_to_2 = udp_sport_from_2 + 5
57     udp_dport_from_2 = 30000
58     udp_dport_to_2 = udp_dport_from_2 + 5000
59     tcp_sport_from_2 = 130
60     tcp_sport_to_2 = tcp_sport_from_2 + 5
61     tcp_dport_from_2 = 20000
62     tcp_dport_to_2 = tcp_dport_from_2 + 5000
63
64     icmp4_type = 8  # echo request
65     icmp4_code = 3
66     icmp6_type = 128  # echo request
67     icmp6_code = 3
68
69     icmp4_type_2 = 8
70     icmp4_code_from_2 = 5
71     icmp4_code_to_2 = 20
72     icmp6_type_2 = 128
73     icmp6_code_from_2 = 8
74     icmp6_code_to_2 = 42
75
76     # Test variables
77     bd_id = 1
78
79     @classmethod
80     def setUpClass(cls):
81         """
82         Perform standard class setup (defined by class method setUpClass in
83         class VppTestCase) before running the test case, set test case related
84         variables and configure VPP.
85         """
86         super(TestACLplugin, cls).setUpClass()
87
88         random.seed()
89
90         try:
91             # Create 2 pg interfaces
92             cls.create_pg_interfaces(range(2))
93
94             # Packet flows mapping pg0 -> pg1, pg2 etc.
95             cls.flows = dict()
96             cls.flows[cls.pg0] = [cls.pg1]
97
98             # Packet sizes
99             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101             # Create BD with MAC learning and unknown unicast flooding disabled
102             # and put interfaces to this BD
103             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104                                            learn=1)
105             for pg_if in cls.pg_interfaces:
106                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107                                                     bd_id=cls.bd_id)
108
109             # Set up all interfaces
110             for i in cls.pg_interfaces:
111                 i.admin_up()
112
113             # Mapping between packet-generator index and lists of test hosts
114             cls.hosts_by_pg_idx = dict()
115             for pg_if in cls.pg_interfaces:
116                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118             # Create list of deleted hosts
119             cls.deleted_hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # warm-up the mac address tables
124             # self.warmup_test()
125
126         except Exception:
127             super(TestACLplugin, cls).tearDownClass()
128             raise
129
130     def setUp(self):
131         super(TestACLplugin, self).setUp()
132         self.reset_packet_infos()
133
134     def tearDown(self):
135         """
136         Show various debug prints after each test.
137         """
138         super(TestACLplugin, self).tearDown()
139         if not self.vpp_dead:
140             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
141             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
142             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
143             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
144             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
145                                              % self.bd_id))
146
147     def create_hosts(self, count, start=0):
148         """
149         Create required number of host MAC addresses and distribute them among
150         interfaces. Create host IPv4 address for every host MAC address.
151
152         :param int count: Number of hosts to create MAC/IPv4 addresses for.
153         :param int start: Number to start numbering from.
154         """
155         n_int = len(self.pg_interfaces)
156         macs_per_if = count / n_int
157         i = -1
158         for pg_if in self.pg_interfaces:
159             i += 1
160             start_nr = macs_per_if * i + start
161             end_nr = count + start if i == (n_int - 1) \
162                 else macs_per_if * (i + 1) + start
163             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
164             for j in range(start_nr, end_nr):
165                 host = Host(
166                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
167                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
168                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
169                 hosts.append(host)
170
171     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
172                     s_prefix=0, s_ip='\x00\x00\x00\x00',
173                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
174         if proto == -1:
175             return
176         if ports == self.PORTS_ALL:
177             sport_from = 0
178             dport_from = 0
179             sport_to = 65535 if proto != 1 and proto != 58 else 255
180             dport_to = sport_to
181         elif ports == self.PORTS_RANGE:
182             if proto == 1:
183                 sport_from = self.icmp4_type
184                 sport_to = self.icmp4_type
185                 dport_from = self.icmp4_code
186                 dport_to = self.icmp4_code
187             elif proto == 58:
188                 sport_from = self.icmp6_type
189                 sport_to = self.icmp6_type
190                 dport_from = self.icmp6_code
191                 dport_to = self.icmp6_code
192             elif proto == self.proto[self.IP][self.TCP]:
193                 sport_from = self.tcp_sport_from
194                 sport_to = self.tcp_sport_to
195                 dport_from = self.tcp_dport_from
196                 dport_to = self.tcp_dport_to
197             elif proto == self.proto[self.IP][self.UDP]:
198                 sport_from = self.udp_sport_from
199                 sport_to = self.udp_sport_to
200                 dport_from = self.udp_dport_from
201                 dport_to = self.udp_dport_to
202         elif ports == self.PORTS_RANGE_2:
203             if proto == 1:
204                 sport_from = self.icmp4_type_2
205                 sport_to = self.icmp4_type_2
206                 dport_from = self.icmp4_code_from_2
207                 dport_to = self.icmp4_code_to_2
208             elif proto == 58:
209                 sport_from = self.icmp6_type_2
210                 sport_to = self.icmp6_type_2
211                 dport_from = self.icmp6_code_from_2
212                 dport_to = self.icmp6_code_to_2
213             elif proto == self.proto[self.IP][self.TCP]:
214                 sport_from = self.tcp_sport_from_2
215                 sport_to = self.tcp_sport_to_2
216                 dport_from = self.tcp_dport_from_2
217                 dport_to = self.tcp_dport_to_2
218             elif proto == self.proto[self.IP][self.UDP]:
219                 sport_from = self.udp_sport_from_2
220                 sport_to = self.udp_sport_to_2
221                 dport_from = self.udp_dport_from_2
222                 dport_to = self.udp_dport_to_2
223         else:
224             sport_from = ports
225             sport_to = ports
226             dport_from = ports
227             dport_to = ports
228
229         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
230                  'srcport_or_icmptype_first': sport_from,
231                  'srcport_or_icmptype_last': sport_to,
232                  'src_ip_prefix_len': s_prefix,
233                  'src_ip_addr': s_ip,
234                  'dstport_or_icmpcode_first': dport_from,
235                  'dstport_or_icmpcode_last': dport_to,
236                  'dst_ip_prefix_len': d_prefix,
237                  'dst_ip_addr': d_ip})
238         return rule
239
240     def apply_rules(self, rules, tag=''):
241         reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
242                                          count=len(rules),
243                                          tag=tag)
244         self.logger.info("Dumped ACL: " + str(
245             self.api_acl_dump(reply.acl_index)))
246         # Apply a ACL on the interface as inbound
247         for i in self.pg_interfaces:
248             self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
249                                                 count=1, n_input=1,
250                                                 acls=[reply.acl_index])
251         return
252
253     def create_upper_layer(self, packet_index, proto, ports=0):
254         p = self.proto_map[proto]
255         if p == 'UDP':
256             if ports == 0:
257                 return UDP(sport=random.randint(self.udp_sport_from,
258                                                 self.udp_sport_to),
259                            dport=random.randint(self.udp_dport_from,
260                                                 self.udp_dport_to))
261             else:
262                 return UDP(sport=ports, dport=ports)
263         elif p == 'TCP':
264             if ports == 0:
265                 return TCP(sport=random.randint(self.tcp_sport_from,
266                                                 self.tcp_sport_to),
267                            dport=random.randint(self.tcp_dport_from,
268                                                 self.tcp_dport_to))
269             else:
270                 return TCP(sport=ports, dport=ports)
271         return ''
272
273     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
274                       proto=-1, ports=0, fragments=False, pkt_raw=True):
275         """
276         Create input packet stream for defined interface using hosts or
277         deleted_hosts list.
278
279         :param object src_if: Interface to create packet stream for.
280         :param list packet_sizes: List of required packet sizes.
281         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
282         :return: Stream of packets.
283         """
284         pkts = []
285         if self.flows.__contains__(src_if):
286             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
287             for dst_if in self.flows[src_if]:
288                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
289                 n_int = len(dst_hosts) * len(src_hosts)
290                 for i in range(0, n_int):
291                     dst_host = dst_hosts[i / len(src_hosts)]
292                     src_host = src_hosts[i % len(src_hosts)]
293                     pkt_info = self.create_packet_info(src_if, dst_if)
294                     if ipv6 == 1:
295                         pkt_info.ip = 1
296                     elif ipv6 == 0:
297                         pkt_info.ip = 0
298                     else:
299                         pkt_info.ip = random.choice([0, 1])
300                     if proto == -1:
301                         pkt_info.proto = random.choice(self.proto[self.IP])
302                     else:
303                         pkt_info.proto = proto
304                     payload = self.info_to_payload(pkt_info)
305                     p = Ether(dst=dst_host.mac, src=src_host.mac)
306                     if pkt_info.ip:
307                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
308                         if fragments:
309                             p /= IPv6ExtHdrFragment(offset=64, m=1)
310                     else:
311                         if fragments:
312                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
313                                     flags=1, frag=64)
314                         else:
315                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
316                     if traffic_type == self.ICMP:
317                         if pkt_info.ip:
318                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
319                                                    code=self.icmp6_code)
320                         else:
321                             p /= ICMP(type=self.icmp4_type,
322                                       code=self.icmp4_code)
323                     else:
324                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
325                     if pkt_raw:
326                         p /= Raw(payload)
327                         pkt_info.data = p.copy()
328                     if pkt_raw:
329                         size = random.choice(packet_sizes)
330                         self.extend_packet(p, size)
331                     pkts.append(p)
332         return pkts
333
334     def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
335         """
336         Verify captured input packet stream for defined interface.
337
338         :param object pg_if: Interface to verify captured packet stream for.
339         :param list capture: Captured packet stream.
340         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
341         """
342         last_info = dict()
343         for i in self.pg_interfaces:
344             last_info[i.sw_if_index] = None
345         dst_sw_if_index = pg_if.sw_if_index
346         for packet in capture:
347             try:
348                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
349                 if traffic_type == self.ICMP and ip_type == self.IPV6:
350                     payload_info = self.payload_to_info(
351                         packet[ICMPv6EchoRequest].data)
352                     payload = packet[ICMPv6EchoRequest]
353                 else:
354                     payload_info = self.payload_to_info(str(packet[Raw]))
355                     payload = packet[self.proto_map[payload_info.proto]]
356             except:
357                 self.logger.error(ppp("Unexpected or invalid packet "
358                                       "(outside network):", packet))
359                 raise
360
361             if ip_type != 0:
362                 self.assertEqual(payload_info.ip, ip_type)
363             if traffic_type == self.ICMP:
364                 try:
365                     if payload_info.ip == 0:
366                         self.assertEqual(payload.type, self.icmp4_type)
367                         self.assertEqual(payload.code, self.icmp4_code)
368                     else:
369                         self.assertEqual(payload.type, self.icmp6_type)
370                         self.assertEqual(payload.code, self.icmp6_code)
371                 except:
372                     self.logger.error(ppp("Unexpected or invalid packet "
373                                           "(outside network):", packet))
374                     raise
375             else:
376                 try:
377                     ip_version = IPv6 if payload_info.ip == 1 else IP
378
379                     ip = packet[ip_version]
380                     packet_index = payload_info.index
381
382                     self.assertEqual(payload_info.dst, dst_sw_if_index)
383                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
384                                       (pg_if.name, payload_info.src,
385                                        packet_index))
386                     next_info = self.get_next_packet_info_for_interface2(
387                         payload_info.src, dst_sw_if_index,
388                         last_info[payload_info.src])
389                     last_info[payload_info.src] = next_info
390                     self.assertTrue(next_info is not None)
391                     self.assertEqual(packet_index, next_info.index)
392                     saved_packet = next_info.data
393                     # Check standard fields
394                     self.assertEqual(ip.src, saved_packet[ip_version].src)
395                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
396                     p = self.proto_map[payload_info.proto]
397                     if p == 'TCP':
398                         tcp = packet[TCP]
399                         self.assertEqual(tcp.sport, saved_packet[
400                             TCP].sport)
401                         self.assertEqual(tcp.dport, saved_packet[
402                             TCP].dport)
403                     elif p == 'UDP':
404                         udp = packet[UDP]
405                         self.assertEqual(udp.sport, saved_packet[
406                             UDP].sport)
407                         self.assertEqual(udp.dport, saved_packet[
408                             UDP].dport)
409                 except:
410                     self.logger.error(ppp("Unexpected or invalid packet:",
411                                           packet))
412                     raise
413         for i in self.pg_interfaces:
414             remaining_packet = self.get_next_packet_info_for_interface2(
415                 i, dst_sw_if_index, last_info[i.sw_if_index])
416             self.assertTrue(
417                 remaining_packet is None,
418                 "Port %u: Packet expected from source %u didn't arrive" %
419                 (dst_sw_if_index, i.sw_if_index))
420
421     def run_traffic_no_check(self):
422         # Test
423         # Create incoming packet streams for packet-generator interfaces
424         for i in self.pg_interfaces:
425             if self.flows.__contains__(i):
426                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
427                 if len(pkts) > 0:
428                     i.add_stream(pkts)
429
430         # Enable packet capture and start packet sending
431         self.pg_enable_capture(self.pg_interfaces)
432         self.pg_start()
433
434     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
435                         frags=False, pkt_raw=True):
436         # Test
437         # Create incoming packet streams for packet-generator interfaces
438         pkts_cnt = 0
439         for i in self.pg_interfaces:
440             if self.flows.__contains__(i):
441                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
442                                           traffic_type, ip_type, proto, ports,
443                                           frags, pkt_raw)
444                 if len(pkts) > 0:
445                     i.add_stream(pkts)
446                     pkts_cnt += len(pkts)
447
448         # Enable packet capture and start packet sendingself.IPV
449         self.pg_enable_capture(self.pg_interfaces)
450         self.pg_start()
451
452         # Verify
453         # Verify outgoing packet streams per packet-generator interface
454         for src_if in self.pg_interfaces:
455             if self.flows.__contains__(src_if):
456                 for dst_if in self.flows[src_if]:
457                     capture = dst_if.get_capture(pkts_cnt)
458                     self.logger.info("Verifying capture on interface %s" %
459                                      dst_if.name)
460                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
461
462     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
463                               ports=0, frags=False):
464         # Test
465         self.reset_packet_infos()
466         for i in self.pg_interfaces:
467             if self.flows.__contains__(i):
468                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
469                                           traffic_type, ip_type, proto, ports,
470                                           frags)
471                 if len(pkts) > 0:
472                     i.add_stream(pkts)
473
474         # Enable packet capture and start packet sending
475         self.pg_enable_capture(self.pg_interfaces)
476         self.pg_start()
477
478         # Verify
479         # Verify outgoing packet streams per packet-generator interface
480         for src_if in self.pg_interfaces:
481             if self.flows.__contains__(src_if):
482                 for dst_if in self.flows[src_if]:
483                     self.logger.info("Verifying capture on interface %s" %
484                                      dst_if.name)
485                     capture = dst_if.get_capture(0)
486                     self.assertEqual(len(capture), 0)
487
488     def api_acl_add_replace(self, acl_index, r, count, tag='',
489                             expected_retval=0):
490         """Add/replace an ACL
491
492         :param int acl_index: ACL index to replace,
493         4294967295 to create new ACL.
494         :param acl_rule r: ACL rules array.
495         :param str tag: symbolic tag (description) for this ACL.
496         :param int count: number of rules.
497         """
498         return self.vapi.api(self.vapi.papi.acl_add_replace,
499                              {'acl_index': acl_index,
500                               'r': r,
501                               'count': count,
502                               'tag': tag},
503                              expected_retval=expected_retval)
504
505     def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
506                                        expected_retval=0):
507         return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
508                              {'sw_if_index': sw_if_index,
509                               'count': count,
510                               'n_input': n_input,
511                               'acls': acls},
512                              expected_retval=expected_retval)
513
514     def api_acl_dump(self, acl_index, expected_retval=0):
515         return self.vapi.api(self.vapi.papi.acl_dump,
516                              {'acl_index': acl_index},
517                              expected_retval=expected_retval)
518
519     def test_0000_warmup_test(self):
520         """ ACL plugin version check; learn MACs
521         """
522         self.create_hosts(16)
523         self.run_traffic_no_check()
524         reply = self.vapi.papi.acl_plugin_get_version()
525         self.assertEqual(reply.major, 1)
526         self.logger.info("Working with ACL plugin version: %d.%d" % (
527             reply.major, reply.minor))
528         # minor version changes are non breaking
529         # self.assertEqual(reply.minor, 0)
530
531     def test_0001_acl_create(self):
532         """ ACL create test
533         """
534
535         self.logger.info("ACLP_TEST_START_0001")
536         # Add an ACL
537         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
538               'srcport_or_icmptype_first': 1234,
539               'srcport_or_icmptype_last': 1235,
540               'src_ip_prefix_len': 0,
541               'src_ip_addr': '\x00\x00\x00\x00',
542               'dstport_or_icmpcode_first': 1234,
543               'dstport_or_icmpcode_last': 1234,
544               'dst_ip_addr': '\x00\x00\x00\x00',
545               'dst_ip_prefix_len': 0}]
546         # Test 1: add a new ACL
547         reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
548                                          count=len(r), tag="permit 1234")
549         self.assertEqual(reply.retval, 0)
550         # The very first ACL gets #0
551         self.assertEqual(reply.acl_index, 0)
552         rr = self.api_acl_dump(reply.acl_index)
553         self.logger.info("Dumped ACL: " + str(rr))
554         self.assertEqual(len(rr), 1)
555         # We should have the same number of ACL entries as we had asked
556         self.assertEqual(len(rr[0].r), len(r))
557         # The rules should be the same. But because the submitted and returned
558         # are different types, we need to iterate over rules and keys to get
559         # to basic values.
560         for i_rule in range(0, len(r) - 1):
561             for rule_key in r[i_rule]:
562                 self.assertEqual(rr[0].r[i_rule][rule_key],
563                                  r[i_rule][rule_key])
564
565         # Add a deny-1234 ACL
566         r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
567                    'srcport_or_icmptype_first': 1234,
568                    'srcport_or_icmptype_last': 1235,
569                    'src_ip_prefix_len': 0,
570                    'src_ip_addr': '\x00\x00\x00\x00',
571                    'dstport_or_icmpcode_first': 1234,
572                    'dstport_or_icmpcode_last': 1234,
573                    'dst_ip_addr': '\x00\x00\x00\x00',
574                    'dst_ip_prefix_len': 0},
575                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
576                    'srcport_or_icmptype_first': 0,
577                    'srcport_or_icmptype_last': 0,
578                    'src_ip_prefix_len': 0,
579                    'src_ip_addr': '\x00\x00\x00\x00',
580                    'dstport_or_icmpcode_first': 0,
581                    'dstport_or_icmpcode_last': 0,
582                    'dst_ip_addr': '\x00\x00\x00\x00',
583                    'dst_ip_prefix_len': 0})
584
585         reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
586                                          count=len(r_deny),
587                                          tag="deny 1234;permit all")
588         self.assertEqual(reply.retval, 0)
589         # The second ACL gets #1
590         self.assertEqual(reply.acl_index, 1)
591
592         # Test 2: try to modify a nonexistent ACL
593         reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
594                                          tag="FFFF:FFFF", expected_retval=-1)
595         self.assertEqual(reply.retval, -1)
596         # The ACL number should pass through
597         self.assertEqual(reply.acl_index, 432)
598
599         self.logger.info("ACLP_TEST_FINISH_0001")
600
601     def test_0002_acl_permit_apply(self):
602         """ permit ACL apply test
603         """
604         self.logger.info("ACLP_TEST_START_0002")
605
606         rules = []
607         rules.append(self.create_rule(self.IPV4, self.PERMIT,
608                      0, self.proto[self.IP][self.UDP]))
609         rules.append(self.create_rule(self.IPV4, self.PERMIT,
610                      0, self.proto[self.IP][self.TCP]))
611
612         # Apply rules
613         self.apply_rules(rules, "permit per-flow")
614
615         # Traffic should still pass
616         self.run_verify_test(self.IP, self.IPV4, -1)
617         self.logger.info("ACLP_TEST_FINISH_0002")
618
619     def test_0003_acl_deny_apply(self):
620         """ deny ACL apply test
621         """
622         self.logger.info("ACLP_TEST_START_0003")
623         # Add a deny-flows ACL
624         rules = []
625         rules.append(self.create_rule(self.IPV4, self.DENY,
626                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
627         # Permit ip any any in the end
628         rules.append(self.create_rule(self.IPV4, self.PERMIT,
629                                       self.PORTS_ALL, 0))
630
631         # Apply rules
632         self.apply_rules(rules, "deny per-flow;permit all")
633
634         # Traffic should not pass
635         self.run_verify_negat_test(self.IP, self.IPV4,
636                                    self.proto[self.IP][self.UDP])
637         self.logger.info("ACLP_TEST_FINISH_0003")
638         # self.assertEqual(1, 0)
639
640     def test_0004_vpp624_permit_icmpv4(self):
641         """ VPP_624 permit ICMPv4
642         """
643         self.logger.info("ACLP_TEST_START_0004")
644
645         # Add an ACL
646         rules = []
647         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
648                                       self.proto[self.ICMP][self.ICMPv4]))
649         # deny ip any any in the end
650         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
651
652         # Apply rules
653         self.apply_rules(rules, "permit icmpv4")
654
655         # Traffic should still pass
656         self.run_verify_test(self.ICMP, self.IPV4,
657                              self.proto[self.ICMP][self.ICMPv4])
658
659         self.logger.info("ACLP_TEST_FINISH_0004")
660
661     def test_0005_vpp624_permit_icmpv6(self):
662         """ VPP_624 permit ICMPv6
663         """
664         self.logger.info("ACLP_TEST_START_0005")
665
666         # Add an ACL
667         rules = []
668         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
669                                       self.proto[self.ICMP][self.ICMPv6]))
670         # deny ip any any in the end
671         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
672
673         # Apply rules
674         self.apply_rules(rules, "permit icmpv6")
675
676         # Traffic should still pass
677         self.run_verify_test(self.ICMP, self.IPV6,
678                              self.proto[self.ICMP][self.ICMPv6])
679
680         self.logger.info("ACLP_TEST_FINISH_0005")
681
682     def test_0006_vpp624_deny_icmpv4(self):
683         """ VPP_624 deny ICMPv4
684         """
685         self.logger.info("ACLP_TEST_START_0006")
686         # Add an ACL
687         rules = []
688         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
689                                       self.proto[self.ICMP][self.ICMPv4]))
690         # permit ip any any in the end
691         rules.append(self.create_rule(self.IPV4, self.PERMIT,
692                                       self.PORTS_ALL, 0))
693
694         # Apply rules
695         self.apply_rules(rules, "deny icmpv4")
696
697         # Traffic should not pass
698         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
699
700         self.logger.info("ACLP_TEST_FINISH_0006")
701
702     def test_0007_vpp624_deny_icmpv6(self):
703         """ VPP_624 deny ICMPv6
704         """
705         self.logger.info("ACLP_TEST_START_0007")
706         # Add an ACL
707         rules = []
708         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
709                                       self.proto[self.ICMP][self.ICMPv6]))
710         # deny ip any any in the end
711         rules.append(self.create_rule(self.IPV6, self.PERMIT,
712                                       self.PORTS_ALL, 0))
713
714         # Apply rules
715         self.apply_rules(rules, "deny icmpv6")
716
717         # Traffic should not pass
718         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
719
720         self.logger.info("ACLP_TEST_FINISH_0007")
721
722     def test_0008_tcp_permit_v4(self):
723         """ permit TCPv4
724         """
725         self.logger.info("ACLP_TEST_START_0008")
726
727         # Add an ACL
728         rules = []
729         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
730                      self.proto[self.IP][self.TCP]))
731         # deny ip any any in the end
732         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
733
734         # Apply rules
735         self.apply_rules(rules, "permit ipv4 tcp")
736
737         # Traffic should still pass
738         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
739
740         self.logger.info("ACLP_TEST_FINISH_0008")
741
742     def test_0009_tcp_permit_v6(self):
743         """ permit TCPv6
744         """
745         self.logger.info("ACLP_TEST_START_0009")
746
747         # Add an ACL
748         rules = []
749         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
750                                       self.proto[self.IP][self.TCP]))
751         # deny ip any any in the end
752         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
753
754         # Apply rules
755         self.apply_rules(rules, "permit ip6 tcp")
756
757         # Traffic should still pass
758         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
759
760         self.logger.info("ACLP_TEST_FINISH_0008")
761
762     def test_0010_udp_permit_v4(self):
763         """ permit UDPv4
764         """
765         self.logger.info("ACLP_TEST_START_0010")
766
767         # Add an ACL
768         rules = []
769         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
770                                       self.proto[self.IP][self.UDP]))
771         # deny ip any any in the end
772         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
773
774         # Apply rules
775         self.apply_rules(rules, "permit ipv udp")
776
777         # Traffic should still pass
778         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
779
780         self.logger.info("ACLP_TEST_FINISH_0010")
781
782     def test_0011_udp_permit_v6(self):
783         """ permit UDPv6
784         """
785         self.logger.info("ACLP_TEST_START_0011")
786
787         # Add an ACL
788         rules = []
789         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
790                                       self.proto[self.IP][self.UDP]))
791         # deny ip any any in the end
792         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
793
794         # Apply rules
795         self.apply_rules(rules, "permit ip6 udp")
796
797         # Traffic should still pass
798         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
799
800         self.logger.info("ACLP_TEST_FINISH_0011")
801
802     def test_0012_tcp_deny(self):
803         """ deny TCPv4/v6
804         """
805         self.logger.info("ACLP_TEST_START_0012")
806
807         # Add an ACL
808         rules = []
809         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
810                                       self.proto[self.IP][self.TCP]))
811         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
812                                       self.proto[self.IP][self.TCP]))
813         # permit ip any any in the end
814         rules.append(self.create_rule(self.IPV4, self.PERMIT,
815                                       self.PORTS_ALL, 0))
816         rules.append(self.create_rule(self.IPV6, self.PERMIT,
817                                       self.PORTS_ALL, 0))
818
819         # Apply rules
820         self.apply_rules(rules, "deny ip4/ip6 tcp")
821
822         # Traffic should not pass
823         self.run_verify_negat_test(self.IP, self.IPRANDOM,
824                                    self.proto[self.IP][self.TCP])
825
826         self.logger.info("ACLP_TEST_FINISH_0012")
827
828     def test_0013_udp_deny(self):
829         """ deny UDPv4/v6
830         """
831         self.logger.info("ACLP_TEST_START_0013")
832
833         # Add an ACL
834         rules = []
835         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
836                                       self.proto[self.IP][self.UDP]))
837         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
838                                       self.proto[self.IP][self.UDP]))
839         # permit ip any any in the end
840         rules.append(self.create_rule(self.IPV4, self.PERMIT,
841                                       self.PORTS_ALL, 0))
842         rules.append(self.create_rule(self.IPV6, self.PERMIT,
843                                       self.PORTS_ALL, 0))
844
845         # Apply rules
846         self.apply_rules(rules, "deny ip4/ip6 udp")
847
848         # Traffic should not pass
849         self.run_verify_negat_test(self.IP, self.IPRANDOM,
850                                    self.proto[self.IP][self.UDP])
851
852         self.logger.info("ACLP_TEST_FINISH_0013")
853
854     def test_0014_acl_dump(self):
855         """ verify add/dump acls
856         """
857         self.logger.info("ACLP_TEST_START_0014")
858
859         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
860              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
861              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
862              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
863              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
864              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
865              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
866              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
867              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
868              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
869              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
870              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
871              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
872              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
873              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
874              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
875              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
876              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
877              ]
878
879         # Add and verify new ACLs
880         rules = []
881         for i in range(len(r)):
882             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
883
884         reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
885                                          count=len(rules))
886         result = self.api_acl_dump(reply.acl_index)
887
888         i = 0
889         for drules in result:
890             for dr in drules.r:
891                 self.assertEqual(dr.is_ipv6, r[i][0])
892                 self.assertEqual(dr.is_permit, r[i][1])
893                 self.assertEqual(dr.proto, r[i][3])
894
895                 if r[i][2] > 0:
896                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
897                 else:
898                     if r[i][2] < 0:
899                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
900                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
901                     else:
902                         if dr.proto == self.proto[self.IP][self.TCP]:
903                             self.assertGreater(dr.srcport_or_icmptype_first,
904                                                self.tcp_sport_from-1)
905                             self.assertLess(dr.srcport_or_icmptype_first,
906                                             self.tcp_sport_to+1)
907                             self.assertGreater(dr.dstport_or_icmpcode_last,
908                                                self.tcp_dport_from-1)
909                             self.assertLess(dr.dstport_or_icmpcode_last,
910                                             self.tcp_dport_to+1)
911                         elif dr.proto == self.proto[self.IP][self.UDP]:
912                             self.assertGreater(dr.srcport_or_icmptype_first,
913                                                self.udp_sport_from-1)
914                             self.assertLess(dr.srcport_or_icmptype_first,
915                                             self.udp_sport_to+1)
916                             self.assertGreater(dr.dstport_or_icmpcode_last,
917                                                self.udp_dport_from-1)
918                             self.assertLess(dr.dstport_or_icmpcode_last,
919                                             self.udp_dport_to+1)
920                 i += 1
921
922         self.logger.info("ACLP_TEST_FINISH_0014")
923
924     def test_0015_tcp_permit_port_v4(self):
925         """ permit single TCPv4
926         """
927         self.logger.info("ACLP_TEST_START_0015")
928
929         port = random.randint(0, 65535)
930         # Add an ACL
931         rules = []
932         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
933                                       self.proto[self.IP][self.TCP]))
934         # deny ip any any in the end
935         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
936
937         # Apply rules
938         self.apply_rules(rules, "permit ip4 tcp "+str(port))
939
940         # Traffic should still pass
941         self.run_verify_test(self.IP, self.IPV4,
942                              self.proto[self.IP][self.TCP], port)
943
944         self.logger.info("ACLP_TEST_FINISH_0015")
945
946     def test_0016_udp_permit_port_v4(self):
947         """ permit single UDPv4
948         """
949         self.logger.info("ACLP_TEST_START_0016")
950
951         port = random.randint(0, 65535)
952         # Add an ACL
953         rules = []
954         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
955                                       self.proto[self.IP][self.UDP]))
956         # deny ip any any in the end
957         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
958
959         # Apply rules
960         self.apply_rules(rules, "permit ip4 tcp "+str(port))
961
962         # Traffic should still pass
963         self.run_verify_test(self.IP, self.IPV4,
964                              self.proto[self.IP][self.UDP], port)
965
966         self.logger.info("ACLP_TEST_FINISH_0016")
967
968     def test_0017_tcp_permit_port_v6(self):
969         """ permit single TCPv6
970         """
971         self.logger.info("ACLP_TEST_START_0017")
972
973         port = random.randint(0, 65535)
974         # Add an ACL
975         rules = []
976         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
977                                       self.proto[self.IP][self.TCP]))
978         # deny ip any any in the end
979         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
980
981         # Apply rules
982         self.apply_rules(rules, "permit ip4 tcp "+str(port))
983
984         # Traffic should still pass
985         self.run_verify_test(self.IP, self.IPV6,
986                              self.proto[self.IP][self.TCP], port)
987
988         self.logger.info("ACLP_TEST_FINISH_0017")
989
990     def test_0018_udp_permit_port_v6(self):
991         """ permit single UPPv6
992         """
993         self.logger.info("ACLP_TEST_START_0018")
994
995         port = random.randint(0, 65535)
996         # Add an ACL
997         rules = []
998         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
999                                       self.proto[self.IP][self.UDP]))
1000         # deny ip any any in the end
1001         rules.append(self.create_rule(self.IPV6, self.DENY,
1002                                       self.PORTS_ALL, 0))
1003
1004         # Apply rules
1005         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1006
1007         # Traffic should still pass
1008         self.run_verify_test(self.IP, self.IPV6,
1009                              self.proto[self.IP][self.UDP], port)
1010
1011         self.logger.info("ACLP_TEST_FINISH_0018")
1012
1013     def test_0019_udp_deny_port(self):
1014         """ deny single TCPv4/v6
1015         """
1016         self.logger.info("ACLP_TEST_START_0019")
1017
1018         port = random.randint(0, 65535)
1019         # Add an ACL
1020         rules = []
1021         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1022                                       self.proto[self.IP][self.TCP]))
1023         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1024                                       self.proto[self.IP][self.TCP]))
1025         # Permit ip any any in the end
1026         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1027                                       self.PORTS_ALL, 0))
1028         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1029                                       self.PORTS_ALL, 0))
1030
1031         # Apply rules
1032         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1033
1034         # Traffic should not pass
1035         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1036                                    self.proto[self.IP][self.TCP], port)
1037
1038         self.logger.info("ACLP_TEST_FINISH_0019")
1039
1040     def test_0020_udp_deny_port(self):
1041         """ deny single UDPv4/v6
1042         """
1043         self.logger.info("ACLP_TEST_START_0020")
1044
1045         port = random.randint(0, 65535)
1046         # Add an ACL
1047         rules = []
1048         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1049                                       self.proto[self.IP][self.UDP]))
1050         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1051                                       self.proto[self.IP][self.UDP]))
1052         # Permit ip any any in the end
1053         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1054                                       self.PORTS_ALL, 0))
1055         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1056                                       self.PORTS_ALL, 0))
1057
1058         # Apply rules
1059         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1060
1061         # Traffic should not pass
1062         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1063                                    self.proto[self.IP][self.UDP], port)
1064
1065         self.logger.info("ACLP_TEST_FINISH_0020")
1066
1067     def test_0021_udp_deny_port_verify_fragment_deny(self):
1068         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1069         """
1070         self.logger.info("ACLP_TEST_START_0021")
1071
1072         port = random.randint(0, 65535)
1073         # Add an ACL
1074         rules = []
1075         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1076                                       self.proto[self.IP][self.UDP]))
1077         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1078                                       self.proto[self.IP][self.UDP]))
1079         # deny ip any any in the end
1080         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1081                                       self.PORTS_ALL, 0))
1082         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1083                                       self.PORTS_ALL, 0))
1084
1085         # Apply rules
1086         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1087
1088         # Traffic should not pass
1089         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1090                                    self.proto[self.IP][self.UDP], port, True)
1091
1092         self.logger.info("ACLP_TEST_FINISH_0021")
1093
1094     def test_0022_zero_length_udp_ipv4(self):
1095         """ VPP-687 zero length udp ipv4 packet"""
1096         self.logger.info("ACLP_TEST_START_0022")
1097
1098         port = random.randint(0, 65535)
1099         # Add an ACL
1100         rules = []
1101         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1102                                       self.proto[self.IP][self.UDP]))
1103         # deny ip any any in the end
1104         rules.append(
1105             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1106
1107         # Apply rules
1108         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1109
1110         # Traffic should still pass
1111         # Create incoming packet streams for packet-generator interfaces
1112         pkts_cnt = 0
1113         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1114                                   self.IP, self.IPV4,
1115                                   self.proto[self.IP][self.UDP], port,
1116                                   False, False)
1117         if len(pkts) > 0:
1118             self.pg0.add_stream(pkts)
1119             pkts_cnt += len(pkts)
1120
1121         # Enable packet capture and start packet sendingself.IPV
1122         self.pg_enable_capture(self.pg_interfaces)
1123         self.pg_start()
1124
1125         self.pg1.get_capture(pkts_cnt)
1126
1127         self.logger.info("ACLP_TEST_FINISH_0022")
1128
1129     def test_0023_zero_length_udp_ipv6(self):
1130         """ VPP-687 zero length udp ipv6 packet"""
1131         self.logger.info("ACLP_TEST_START_0023")
1132
1133         port = random.randint(0, 65535)
1134         # Add an ACL
1135         rules = []
1136         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1137                                       self.proto[self.IP][self.UDP]))
1138         # deny ip any any in the end
1139         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1140
1141         # Apply rules
1142         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1143
1144         # Traffic should still pass
1145         # Create incoming packet streams for packet-generator interfaces
1146         pkts_cnt = 0
1147         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1148                                   self.IP, self.IPV6,
1149                                   self.proto[self.IP][self.UDP], port,
1150                                   False, False)
1151         if len(pkts) > 0:
1152             self.pg0.add_stream(pkts)
1153             pkts_cnt += len(pkts)
1154
1155         # Enable packet capture and start packet sendingself.IPV
1156         self.pg_enable_capture(self.pg_interfaces)
1157         self.pg_start()
1158
1159         # Verify outgoing packet streams per packet-generator interface
1160         self.pg1.get_capture(pkts_cnt)
1161
1162         self.logger.info("ACLP_TEST_FINISH_0023")
1163
1164     def test_0108_tcp_permit_v4(self):
1165         """ permit TCPv4 + non-match range
1166         """
1167         self.logger.info("ACLP_TEST_START_0108")
1168
1169         # Add an ACL
1170         rules = []
1171         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1172                      self.proto[self.IP][self.TCP]))
1173         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1174                      self.proto[self.IP][self.TCP]))
1175         # deny ip any any in the end
1176         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1177
1178         # Apply rules
1179         self.apply_rules(rules, "permit ipv4 tcp")
1180
1181         # Traffic should still pass
1182         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1183
1184         self.logger.info("ACLP_TEST_FINISH_0108")
1185
1186     def test_0109_tcp_permit_v6(self):
1187         """ permit TCPv6 + non-match range
1188         """
1189         self.logger.info("ACLP_TEST_START_0109")
1190
1191         # Add an ACL
1192         rules = []
1193         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1194                                       self.proto[self.IP][self.TCP]))
1195         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1196                                       self.proto[self.IP][self.TCP]))
1197         # deny ip any any in the end
1198         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1199
1200         # Apply rules
1201         self.apply_rules(rules, "permit ip6 tcp")
1202
1203         # Traffic should still pass
1204         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1205
1206         self.logger.info("ACLP_TEST_FINISH_0109")
1207
1208     def test_0110_udp_permit_v4(self):
1209         """ permit UDPv4 + non-match range
1210         """
1211         self.logger.info("ACLP_TEST_START_0110")
1212
1213         # Add an ACL
1214         rules = []
1215         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1216                                       self.proto[self.IP][self.UDP]))
1217         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1218                                       self.proto[self.IP][self.UDP]))
1219         # deny ip any any in the end
1220         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1221
1222         # Apply rules
1223         self.apply_rules(rules, "permit ipv4 udp")
1224
1225         # Traffic should still pass
1226         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1227
1228         self.logger.info("ACLP_TEST_FINISH_0110")
1229
1230     def test_0111_udp_permit_v6(self):
1231         """ permit UDPv6 + non-match range
1232         """
1233         self.logger.info("ACLP_TEST_START_0111")
1234
1235         # Add an ACL
1236         rules = []
1237         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1238                                       self.proto[self.IP][self.UDP]))
1239         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1240                                       self.proto[self.IP][self.UDP]))
1241         # deny ip any any in the end
1242         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1243
1244         # Apply rules
1245         self.apply_rules(rules, "permit ip6 udp")
1246
1247         # Traffic should still pass
1248         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1249
1250         self.logger.info("ACLP_TEST_FINISH_0111")
1251
1252     def test_0112_tcp_deny(self):
1253         """ deny TCPv4/v6 + non-match range
1254         """
1255         self.logger.info("ACLP_TEST_START_0112")
1256
1257         # Add an ACL
1258         rules = []
1259         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1260                                       self.PORTS_RANGE_2,
1261                                       self.proto[self.IP][self.TCP]))
1262         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1263                                       self.PORTS_RANGE_2,
1264                                       self.proto[self.IP][self.TCP]))
1265         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1266                                       self.proto[self.IP][self.TCP]))
1267         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1268                                       self.proto[self.IP][self.TCP]))
1269         # permit ip any any in the end
1270         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1271                                       self.PORTS_ALL, 0))
1272         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1273                                       self.PORTS_ALL, 0))
1274
1275         # Apply rules
1276         self.apply_rules(rules, "deny ip4/ip6 tcp")
1277
1278         # Traffic should not pass
1279         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1280                                    self.proto[self.IP][self.TCP])
1281
1282         self.logger.info("ACLP_TEST_FINISH_0112")
1283
1284     def test_0113_udp_deny(self):
1285         """ deny UDPv4/v6 + non-match range
1286         """
1287         self.logger.info("ACLP_TEST_START_0113")
1288
1289         # Add an ACL
1290         rules = []
1291         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1292                                       self.PORTS_RANGE_2,
1293                                       self.proto[self.IP][self.UDP]))
1294         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1295                                       self.PORTS_RANGE_2,
1296                                       self.proto[self.IP][self.UDP]))
1297         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1298                                       self.proto[self.IP][self.UDP]))
1299         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1300                                       self.proto[self.IP][self.UDP]))
1301         # permit ip any any in the end
1302         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1303                                       self.PORTS_ALL, 0))
1304         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1305                                       self.PORTS_ALL, 0))
1306
1307         # Apply rules
1308         self.apply_rules(rules, "deny ip4/ip6 udp")
1309
1310         # Traffic should not pass
1311         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1312                                    self.proto[self.IP][self.UDP])
1313
1314         self.logger.info("ACLP_TEST_FINISH_0113")
1315
1316
1317 if __name__ == '__main__':
1318     unittest.main(testRunner=VppTestRunner)