VPP-1033: Python API support arbitrary sized input parameters.
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16
17 class TestACLplugin(VppTestCase):
18     """ ACL plugin Test Case """
19
20     # traffic types
21     IP = 0
22     ICMP = 1
23
24     # IP version
25     IPRANDOM = -1
26     IPV4 = 0
27     IPV6 = 1
28
29     # rule types
30     DENY = 0
31     PERMIT = 1
32
33     # supported protocols
34     proto = [[6, 17], [1, 58]]
35     proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
36     ICMPv4 = 0
37     ICMPv6 = 1
38     TCP = 0
39     UDP = 1
40     PROTO_ALL = 0
41
42     # port ranges
43     PORTS_ALL = -1
44     PORTS_RANGE = 0
45     PORTS_RANGE_2 = 1
46     udp_sport_from = 10
47     udp_sport_to = udp_sport_from + 5
48     udp_dport_from = 20000
49     udp_dport_to = udp_dport_from + 5000
50     tcp_sport_from = 30
51     tcp_sport_to = tcp_sport_from + 5
52     tcp_dport_from = 40000
53     tcp_dport_to = tcp_dport_from + 5000
54
55     udp_sport_from_2 = 90
56     udp_sport_to_2 = udp_sport_from_2 + 5
57     udp_dport_from_2 = 30000
58     udp_dport_to_2 = udp_dport_from_2 + 5000
59     tcp_sport_from_2 = 130
60     tcp_sport_to_2 = tcp_sport_from_2 + 5
61     tcp_dport_from_2 = 20000
62     tcp_dport_to_2 = tcp_dport_from_2 + 5000
63
64     icmp4_type = 8  # echo request
65     icmp4_code = 3
66     icmp6_type = 128  # echo request
67     icmp6_code = 3
68
69     icmp4_type_2 = 8
70     icmp4_code_from_2 = 5
71     icmp4_code_to_2 = 20
72     icmp6_type_2 = 128
73     icmp6_code_from_2 = 8
74     icmp6_code_to_2 = 42
75
76     # Test variables
77     bd_id = 1
78
79     @classmethod
80     def setUpClass(cls):
81         """
82         Perform standard class setup (defined by class method setUpClass in
83         class VppTestCase) before running the test case, set test case related
84         variables and configure VPP.
85         """
86         super(TestACLplugin, cls).setUpClass()
87
88         random.seed()
89
90         try:
91             # Create 2 pg interfaces
92             cls.create_pg_interfaces(range(2))
93
94             # Packet flows mapping pg0 -> pg1, pg2 etc.
95             cls.flows = dict()
96             cls.flows[cls.pg0] = [cls.pg1]
97
98             # Packet sizes
99             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101             # Create BD with MAC learning and unknown unicast flooding disabled
102             # and put interfaces to this BD
103             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104                                            learn=1)
105             for pg_if in cls.pg_interfaces:
106                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107                                                     bd_id=cls.bd_id)
108
109             # Set up all interfaces
110             for i in cls.pg_interfaces:
111                 i.admin_up()
112
113             # Mapping between packet-generator index and lists of test hosts
114             cls.hosts_by_pg_idx = dict()
115             for pg_if in cls.pg_interfaces:
116                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118             # Create list of deleted hosts
119             cls.deleted_hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # warm-up the mac address tables
124             # self.warmup_test()
125
126         except Exception:
127             super(TestACLplugin, cls).tearDownClass()
128             raise
129
130     def setUp(self):
131         super(TestACLplugin, self).setUp()
132         self.reset_packet_infos()
133
134     def tearDown(self):
135         """
136         Show various debug prints after each test.
137         """
138         super(TestACLplugin, self).tearDown()
139         if not self.vpp_dead:
140             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
141             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
142             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
143             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
144             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
145                                              % self.bd_id))
146
147     def create_hosts(self, count, start=0):
148         """
149         Create required number of host MAC addresses and distribute them among
150         interfaces. Create host IPv4 address for every host MAC address.
151
152         :param int count: Number of hosts to create MAC/IPv4 addresses for.
153         :param int start: Number to start numbering from.
154         """
155         n_int = len(self.pg_interfaces)
156         macs_per_if = count / n_int
157         i = -1
158         for pg_if in self.pg_interfaces:
159             i += 1
160             start_nr = macs_per_if * i + start
161             end_nr = count + start if i == (n_int - 1) \
162                 else macs_per_if * (i + 1) + start
163             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
164             for j in range(start_nr, end_nr):
165                 host = Host(
166                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
167                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
168                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
169                 hosts.append(host)
170
171     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
172                     s_prefix=0, s_ip='\x00\x00\x00\x00',
173                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
174         if proto == -1:
175             return
176         if ports == self.PORTS_ALL:
177             sport_from = 0
178             dport_from = 0
179             sport_to = 65535 if proto != 1 and proto != 58 else 255
180             dport_to = sport_to
181         elif ports == self.PORTS_RANGE:
182             if proto == 1:
183                 sport_from = self.icmp4_type
184                 sport_to = self.icmp4_type
185                 dport_from = self.icmp4_code
186                 dport_to = self.icmp4_code
187             elif proto == 58:
188                 sport_from = self.icmp6_type
189                 sport_to = self.icmp6_type
190                 dport_from = self.icmp6_code
191                 dport_to = self.icmp6_code
192             elif proto == self.proto[self.IP][self.TCP]:
193                 sport_from = self.tcp_sport_from
194                 sport_to = self.tcp_sport_to
195                 dport_from = self.tcp_dport_from
196                 dport_to = self.tcp_dport_to
197             elif proto == self.proto[self.IP][self.UDP]:
198                 sport_from = self.udp_sport_from
199                 sport_to = self.udp_sport_to
200                 dport_from = self.udp_dport_from
201                 dport_to = self.udp_dport_to
202         elif ports == self.PORTS_RANGE_2:
203             if proto == 1:
204                 sport_from = self.icmp4_type_2
205                 sport_to = self.icmp4_type_2
206                 dport_from = self.icmp4_code_from_2
207                 dport_to = self.icmp4_code_to_2
208             elif proto == 58:
209                 sport_from = self.icmp6_type_2
210                 sport_to = self.icmp6_type_2
211                 dport_from = self.icmp6_code_from_2
212                 dport_to = self.icmp6_code_to_2
213             elif proto == self.proto[self.IP][self.TCP]:
214                 sport_from = self.tcp_sport_from_2
215                 sport_to = self.tcp_sport_to_2
216                 dport_from = self.tcp_dport_from_2
217                 dport_to = self.tcp_dport_to_2
218             elif proto == self.proto[self.IP][self.UDP]:
219                 sport_from = self.udp_sport_from_2
220                 sport_to = self.udp_sport_to_2
221                 dport_from = self.udp_dport_from_2
222                 dport_to = self.udp_dport_to_2
223         else:
224             sport_from = ports
225             sport_to = ports
226             dport_from = ports
227             dport_to = ports
228
229         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
230                  'srcport_or_icmptype_first': sport_from,
231                  'srcport_or_icmptype_last': sport_to,
232                  'src_ip_prefix_len': s_prefix,
233                  'src_ip_addr': s_ip,
234                  'dstport_or_icmpcode_first': dport_from,
235                  'dstport_or_icmpcode_last': dport_to,
236                  'dst_ip_prefix_len': d_prefix,
237                  'dst_ip_addr': d_ip})
238         return rule
239
240     def apply_rules(self, rules, tag=''):
241         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
242                                           tag=tag)
243         self.logger.info("Dumped ACL: " + str(
244             self.vapi.acl_dump(reply.acl_index)))
245         # Apply a ACL on the interface as inbound
246         for i in self.pg_interfaces:
247             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
248                                                  n_input=1,
249                                                  acls=[reply.acl_index])
250         return
251
252     def create_upper_layer(self, packet_index, proto, ports=0):
253         p = self.proto_map[proto]
254         if p == 'UDP':
255             if ports == 0:
256                 return UDP(sport=random.randint(self.udp_sport_from,
257                                                 self.udp_sport_to),
258                            dport=random.randint(self.udp_dport_from,
259                                                 self.udp_dport_to))
260             else:
261                 return UDP(sport=ports, dport=ports)
262         elif p == 'TCP':
263             if ports == 0:
264                 return TCP(sport=random.randint(self.tcp_sport_from,
265                                                 self.tcp_sport_to),
266                            dport=random.randint(self.tcp_dport_from,
267                                                 self.tcp_dport_to))
268             else:
269                 return TCP(sport=ports, dport=ports)
270         return ''
271
272     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
273                       proto=-1, ports=0, fragments=False, pkt_raw=True):
274         """
275         Create input packet stream for defined interface using hosts or
276         deleted_hosts list.
277
278         :param object src_if: Interface to create packet stream for.
279         :param list packet_sizes: List of required packet sizes.
280         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
281         :return: Stream of packets.
282         """
283         pkts = []
284         if self.flows.__contains__(src_if):
285             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
286             for dst_if in self.flows[src_if]:
287                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
288                 n_int = len(dst_hosts) * len(src_hosts)
289                 for i in range(0, n_int):
290                     dst_host = dst_hosts[i / len(src_hosts)]
291                     src_host = src_hosts[i % len(src_hosts)]
292                     pkt_info = self.create_packet_info(src_if, dst_if)
293                     if ipv6 == 1:
294                         pkt_info.ip = 1
295                     elif ipv6 == 0:
296                         pkt_info.ip = 0
297                     else:
298                         pkt_info.ip = random.choice([0, 1])
299                     if proto == -1:
300                         pkt_info.proto = random.choice(self.proto[self.IP])
301                     else:
302                         pkt_info.proto = proto
303                     payload = self.info_to_payload(pkt_info)
304                     p = Ether(dst=dst_host.mac, src=src_host.mac)
305                     if pkt_info.ip:
306                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
307                         if fragments:
308                             p /= IPv6ExtHdrFragment(offset=64, m=1)
309                     else:
310                         if fragments:
311                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
312                                     flags=1, frag=64)
313                         else:
314                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
315                     if traffic_type == self.ICMP:
316                         if pkt_info.ip:
317                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
318                                                    code=self.icmp6_code)
319                         else:
320                             p /= ICMP(type=self.icmp4_type,
321                                       code=self.icmp4_code)
322                     else:
323                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
324                     if pkt_raw:
325                         p /= Raw(payload)
326                         pkt_info.data = p.copy()
327                     if pkt_raw:
328                         size = random.choice(packet_sizes)
329                         self.extend_packet(p, size)
330                     pkts.append(p)
331         return pkts
332
333     def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
334         """
335         Verify captured input packet stream for defined interface.
336
337         :param object pg_if: Interface to verify captured packet stream for.
338         :param list capture: Captured packet stream.
339         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
340         """
341         last_info = dict()
342         for i in self.pg_interfaces:
343             last_info[i.sw_if_index] = None
344         dst_sw_if_index = pg_if.sw_if_index
345         for packet in capture:
346             try:
347                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
348                 if traffic_type == self.ICMP and ip_type == self.IPV6:
349                     payload_info = self.payload_to_info(
350                         packet[ICMPv6EchoRequest].data)
351                     payload = packet[ICMPv6EchoRequest]
352                 else:
353                     payload_info = self.payload_to_info(str(packet[Raw]))
354                     payload = packet[self.proto_map[payload_info.proto]]
355             except:
356                 self.logger.error(ppp("Unexpected or invalid packet "
357                                       "(outside network):", packet))
358                 raise
359
360             if ip_type != 0:
361                 self.assertEqual(payload_info.ip, ip_type)
362             if traffic_type == self.ICMP:
363                 try:
364                     if payload_info.ip == 0:
365                         self.assertEqual(payload.type, self.icmp4_type)
366                         self.assertEqual(payload.code, self.icmp4_code)
367                     else:
368                         self.assertEqual(payload.type, self.icmp6_type)
369                         self.assertEqual(payload.code, self.icmp6_code)
370                 except:
371                     self.logger.error(ppp("Unexpected or invalid packet "
372                                           "(outside network):", packet))
373                     raise
374             else:
375                 try:
376                     ip_version = IPv6 if payload_info.ip == 1 else IP
377
378                     ip = packet[ip_version]
379                     packet_index = payload_info.index
380
381                     self.assertEqual(payload_info.dst, dst_sw_if_index)
382                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
383                                       (pg_if.name, payload_info.src,
384                                        packet_index))
385                     next_info = self.get_next_packet_info_for_interface2(
386                         payload_info.src, dst_sw_if_index,
387                         last_info[payload_info.src])
388                     last_info[payload_info.src] = next_info
389                     self.assertTrue(next_info is not None)
390                     self.assertEqual(packet_index, next_info.index)
391                     saved_packet = next_info.data
392                     # Check standard fields
393                     self.assertEqual(ip.src, saved_packet[ip_version].src)
394                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
395                     p = self.proto_map[payload_info.proto]
396                     if p == 'TCP':
397                         tcp = packet[TCP]
398                         self.assertEqual(tcp.sport, saved_packet[
399                             TCP].sport)
400                         self.assertEqual(tcp.dport, saved_packet[
401                             TCP].dport)
402                     elif p == 'UDP':
403                         udp = packet[UDP]
404                         self.assertEqual(udp.sport, saved_packet[
405                             UDP].sport)
406                         self.assertEqual(udp.dport, saved_packet[
407                             UDP].dport)
408                 except:
409                     self.logger.error(ppp("Unexpected or invalid packet:",
410                                           packet))
411                     raise
412         for i in self.pg_interfaces:
413             remaining_packet = self.get_next_packet_info_for_interface2(
414                 i, dst_sw_if_index, last_info[i.sw_if_index])
415             self.assertTrue(
416                 remaining_packet is None,
417                 "Port %u: Packet expected from source %u didn't arrive" %
418                 (dst_sw_if_index, i.sw_if_index))
419
420     def run_traffic_no_check(self):
421         # Test
422         # Create incoming packet streams for packet-generator interfaces
423         for i in self.pg_interfaces:
424             if self.flows.__contains__(i):
425                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
426                 if len(pkts) > 0:
427                     i.add_stream(pkts)
428
429         # Enable packet capture and start packet sending
430         self.pg_enable_capture(self.pg_interfaces)
431         self.pg_start()
432
433     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
434                         frags=False, pkt_raw=True):
435         # Test
436         # Create incoming packet streams for packet-generator interfaces
437         pkts_cnt = 0
438         for i in self.pg_interfaces:
439             if self.flows.__contains__(i):
440                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
441                                           traffic_type, ip_type, proto, ports,
442                                           frags, pkt_raw)
443                 if len(pkts) > 0:
444                     i.add_stream(pkts)
445                     pkts_cnt += len(pkts)
446
447         # Enable packet capture and start packet sendingself.IPV
448         self.pg_enable_capture(self.pg_interfaces)
449         self.pg_start()
450
451         # Verify
452         # Verify outgoing packet streams per packet-generator interface
453         for src_if in self.pg_interfaces:
454             if self.flows.__contains__(src_if):
455                 for dst_if in self.flows[src_if]:
456                     capture = dst_if.get_capture(pkts_cnt)
457                     self.logger.info("Verifying capture on interface %s" %
458                                      dst_if.name)
459                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
460
461     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
462                               ports=0, frags=False):
463         # Test
464         self.reset_packet_infos()
465         for i in self.pg_interfaces:
466             if self.flows.__contains__(i):
467                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
468                                           traffic_type, ip_type, proto, ports,
469                                           frags)
470                 if len(pkts) > 0:
471                     i.add_stream(pkts)
472
473         # Enable packet capture and start packet sending
474         self.pg_enable_capture(self.pg_interfaces)
475         self.pg_start()
476
477         # Verify
478         # Verify outgoing packet streams per packet-generator interface
479         for src_if in self.pg_interfaces:
480             if self.flows.__contains__(src_if):
481                 for dst_if in self.flows[src_if]:
482                     self.logger.info("Verifying capture on interface %s" %
483                                      dst_if.name)
484                     capture = dst_if.get_capture(0)
485                     self.assertEqual(len(capture), 0)
486
487     def test_0000_warmup_test(self):
488         """ ACL plugin version check; learn MACs
489         """
490         self.create_hosts(16)
491         self.run_traffic_no_check()
492         reply = self.vapi.papi.acl_plugin_get_version()
493         self.assertEqual(reply.major, 1)
494         self.logger.info("Working with ACL plugin version: %d.%d" % (
495             reply.major, reply.minor))
496         # minor version changes are non breaking
497         # self.assertEqual(reply.minor, 0)
498
499     def test_0001_acl_create(self):
500         """ ACL create/delete test
501         """
502
503         self.logger.info("ACLP_TEST_START_0001")
504         # Add an ACL
505         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
506               'srcport_or_icmptype_first': 1234,
507               'srcport_or_icmptype_last': 1235,
508               'src_ip_prefix_len': 0,
509               'src_ip_addr': '\x00\x00\x00\x00',
510               'dstport_or_icmpcode_first': 1234,
511               'dstport_or_icmpcode_last': 1234,
512               'dst_ip_addr': '\x00\x00\x00\x00',
513               'dst_ip_prefix_len': 0}]
514         # Test 1: add a new ACL
515         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
516                                           tag="permit 1234")
517         self.assertEqual(reply.retval, 0)
518         # The very first ACL gets #0
519         self.assertEqual(reply.acl_index, 0)
520         first_acl = reply.acl_index
521         rr = self.vapi.acl_dump(reply.acl_index)
522         self.logger.info("Dumped ACL: " + str(rr))
523         self.assertEqual(len(rr), 1)
524         # We should have the same number of ACL entries as we had asked
525         self.assertEqual(len(rr[0].r), len(r))
526         # The rules should be the same. But because the submitted and returned
527         # are different types, we need to iterate over rules and keys to get
528         # to basic values.
529         for i_rule in range(0, len(r) - 1):
530             for rule_key in r[i_rule]:
531                 self.assertEqual(rr[0].r[i_rule][rule_key],
532                                  r[i_rule][rule_key])
533
534         # Add a deny-1234 ACL
535         r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
536                    'srcport_or_icmptype_first': 1234,
537                    'srcport_or_icmptype_last': 1235,
538                    'src_ip_prefix_len': 0,
539                    'src_ip_addr': '\x00\x00\x00\x00',
540                    'dstport_or_icmpcode_first': 1234,
541                    'dstport_or_icmpcode_last': 1234,
542                    'dst_ip_addr': '\x00\x00\x00\x00',
543                    'dst_ip_prefix_len': 0},
544                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
545                    'srcport_or_icmptype_first': 0,
546                    'srcport_or_icmptype_last': 0,
547                    'src_ip_prefix_len': 0,
548                    'src_ip_addr': '\x00\x00\x00\x00',
549                    'dstport_or_icmpcode_first': 0,
550                    'dstport_or_icmpcode_last': 0,
551                    'dst_ip_addr': '\x00\x00\x00\x00',
552                    'dst_ip_prefix_len': 0}]
553
554         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
555                                           tag="deny 1234;permit all")
556         self.assertEqual(reply.retval, 0)
557         # The second ACL gets #1
558         self.assertEqual(reply.acl_index, 1)
559         second_acl = reply.acl_index
560
561         # Test 2: try to modify a nonexistent ACL
562         reply = self.vapi.acl_add_replace(acl_index=432, r=r,
563                                           tag="FFFF:FFFF", expected_retval=-1)
564         self.assertEqual(reply.retval, -1)
565         # The ACL number should pass through
566         self.assertEqual(reply.acl_index, 432)
567         # apply an ACL on an interface inbound, try to delete ACL, must fail
568         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
569                                              n_input=1,
570                                              acls=[first_acl])
571         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-1)
572         # Unapply an ACL and then try to delete it - must be ok
573         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
574                                              n_input=0,
575                                              acls=[])
576         reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
577
578         # apply an ACL on an interface outbound, try to delete ACL, must fail
579         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
580                                              n_input=0,
581                                              acls=[second_acl])
582         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-1)
583         # Unapply the ACL and then try to delete it - must be ok
584         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
585                                              n_input=0,
586                                              acls=[])
587         reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
588
589         # try to apply a nonexistent ACL - must fail
590         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
591                                              n_input=1,
592                                              acls=[first_acl],
593                                              expected_retval=-1)
594
595         self.logger.info("ACLP_TEST_FINISH_0001")
596
597     def test_0002_acl_permit_apply(self):
598         """ permit ACL apply test
599         """
600         self.logger.info("ACLP_TEST_START_0002")
601
602         rules = []
603         rules.append(self.create_rule(self.IPV4, self.PERMIT,
604                      0, self.proto[self.IP][self.UDP]))
605         rules.append(self.create_rule(self.IPV4, self.PERMIT,
606                      0, self.proto[self.IP][self.TCP]))
607
608         # Apply rules
609         self.apply_rules(rules, "permit per-flow")
610
611         # Traffic should still pass
612         self.run_verify_test(self.IP, self.IPV4, -1)
613         self.logger.info("ACLP_TEST_FINISH_0002")
614
615     def test_0003_acl_deny_apply(self):
616         """ deny ACL apply test
617         """
618         self.logger.info("ACLP_TEST_START_0003")
619         # Add a deny-flows ACL
620         rules = []
621         rules.append(self.create_rule(self.IPV4, self.DENY,
622                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
623         # Permit ip any any in the end
624         rules.append(self.create_rule(self.IPV4, self.PERMIT,
625                                       self.PORTS_ALL, 0))
626
627         # Apply rules
628         self.apply_rules(rules, "deny per-flow;permit all")
629
630         # Traffic should not pass
631         self.run_verify_negat_test(self.IP, self.IPV4,
632                                    self.proto[self.IP][self.UDP])
633         self.logger.info("ACLP_TEST_FINISH_0003")
634         # self.assertEqual(1, 0)
635
636     def test_0004_vpp624_permit_icmpv4(self):
637         """ VPP_624 permit ICMPv4
638         """
639         self.logger.info("ACLP_TEST_START_0004")
640
641         # Add an ACL
642         rules = []
643         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
644                                       self.proto[self.ICMP][self.ICMPv4]))
645         # deny ip any any in the end
646         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
647
648         # Apply rules
649         self.apply_rules(rules, "permit icmpv4")
650
651         # Traffic should still pass
652         self.run_verify_test(self.ICMP, self.IPV4,
653                              self.proto[self.ICMP][self.ICMPv4])
654
655         self.logger.info("ACLP_TEST_FINISH_0004")
656
657     def test_0005_vpp624_permit_icmpv6(self):
658         """ VPP_624 permit ICMPv6
659         """
660         self.logger.info("ACLP_TEST_START_0005")
661
662         # Add an ACL
663         rules = []
664         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
665                                       self.proto[self.ICMP][self.ICMPv6]))
666         # deny ip any any in the end
667         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
668
669         # Apply rules
670         self.apply_rules(rules, "permit icmpv6")
671
672         # Traffic should still pass
673         self.run_verify_test(self.ICMP, self.IPV6,
674                              self.proto[self.ICMP][self.ICMPv6])
675
676         self.logger.info("ACLP_TEST_FINISH_0005")
677
678     def test_0006_vpp624_deny_icmpv4(self):
679         """ VPP_624 deny ICMPv4
680         """
681         self.logger.info("ACLP_TEST_START_0006")
682         # Add an ACL
683         rules = []
684         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
685                                       self.proto[self.ICMP][self.ICMPv4]))
686         # permit ip any any in the end
687         rules.append(self.create_rule(self.IPV4, self.PERMIT,
688                                       self.PORTS_ALL, 0))
689
690         # Apply rules
691         self.apply_rules(rules, "deny icmpv4")
692
693         # Traffic should not pass
694         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
695
696         self.logger.info("ACLP_TEST_FINISH_0006")
697
698     def test_0007_vpp624_deny_icmpv6(self):
699         """ VPP_624 deny ICMPv6
700         """
701         self.logger.info("ACLP_TEST_START_0007")
702         # Add an ACL
703         rules = []
704         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
705                                       self.proto[self.ICMP][self.ICMPv6]))
706         # deny ip any any in the end
707         rules.append(self.create_rule(self.IPV6, self.PERMIT,
708                                       self.PORTS_ALL, 0))
709
710         # Apply rules
711         self.apply_rules(rules, "deny icmpv6")
712
713         # Traffic should not pass
714         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
715
716         self.logger.info("ACLP_TEST_FINISH_0007")
717
718     def test_0008_tcp_permit_v4(self):
719         """ permit TCPv4
720         """
721         self.logger.info("ACLP_TEST_START_0008")
722
723         # Add an ACL
724         rules = []
725         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
726                      self.proto[self.IP][self.TCP]))
727         # deny ip any any in the end
728         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
729
730         # Apply rules
731         self.apply_rules(rules, "permit ipv4 tcp")
732
733         # Traffic should still pass
734         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
735
736         self.logger.info("ACLP_TEST_FINISH_0008")
737
738     def test_0009_tcp_permit_v6(self):
739         """ permit TCPv6
740         """
741         self.logger.info("ACLP_TEST_START_0009")
742
743         # Add an ACL
744         rules = []
745         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
746                                       self.proto[self.IP][self.TCP]))
747         # deny ip any any in the end
748         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
749
750         # Apply rules
751         self.apply_rules(rules, "permit ip6 tcp")
752
753         # Traffic should still pass
754         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
755
756         self.logger.info("ACLP_TEST_FINISH_0008")
757
758     def test_0010_udp_permit_v4(self):
759         """ permit UDPv4
760         """
761         self.logger.info("ACLP_TEST_START_0010")
762
763         # Add an ACL
764         rules = []
765         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
766                                       self.proto[self.IP][self.UDP]))
767         # deny ip any any in the end
768         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
769
770         # Apply rules
771         self.apply_rules(rules, "permit ipv udp")
772
773         # Traffic should still pass
774         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
775
776         self.logger.info("ACLP_TEST_FINISH_0010")
777
778     def test_0011_udp_permit_v6(self):
779         """ permit UDPv6
780         """
781         self.logger.info("ACLP_TEST_START_0011")
782
783         # Add an ACL
784         rules = []
785         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
786                                       self.proto[self.IP][self.UDP]))
787         # deny ip any any in the end
788         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
789
790         # Apply rules
791         self.apply_rules(rules, "permit ip6 udp")
792
793         # Traffic should still pass
794         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
795
796         self.logger.info("ACLP_TEST_FINISH_0011")
797
798     def test_0012_tcp_deny(self):
799         """ deny TCPv4/v6
800         """
801         self.logger.info("ACLP_TEST_START_0012")
802
803         # Add an ACL
804         rules = []
805         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
806                                       self.proto[self.IP][self.TCP]))
807         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
808                                       self.proto[self.IP][self.TCP]))
809         # permit ip any any in the end
810         rules.append(self.create_rule(self.IPV4, self.PERMIT,
811                                       self.PORTS_ALL, 0))
812         rules.append(self.create_rule(self.IPV6, self.PERMIT,
813                                       self.PORTS_ALL, 0))
814
815         # Apply rules
816         self.apply_rules(rules, "deny ip4/ip6 tcp")
817
818         # Traffic should not pass
819         self.run_verify_negat_test(self.IP, self.IPRANDOM,
820                                    self.proto[self.IP][self.TCP])
821
822         self.logger.info("ACLP_TEST_FINISH_0012")
823
824     def test_0013_udp_deny(self):
825         """ deny UDPv4/v6
826         """
827         self.logger.info("ACLP_TEST_START_0013")
828
829         # Add an ACL
830         rules = []
831         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
832                                       self.proto[self.IP][self.UDP]))
833         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
834                                       self.proto[self.IP][self.UDP]))
835         # permit ip any any in the end
836         rules.append(self.create_rule(self.IPV4, self.PERMIT,
837                                       self.PORTS_ALL, 0))
838         rules.append(self.create_rule(self.IPV6, self.PERMIT,
839                                       self.PORTS_ALL, 0))
840
841         # Apply rules
842         self.apply_rules(rules, "deny ip4/ip6 udp")
843
844         # Traffic should not pass
845         self.run_verify_negat_test(self.IP, self.IPRANDOM,
846                                    self.proto[self.IP][self.UDP])
847
848         self.logger.info("ACLP_TEST_FINISH_0013")
849
850     def test_0014_acl_dump(self):
851         """ verify add/dump acls
852         """
853         self.logger.info("ACLP_TEST_START_0014")
854
855         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
856              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
857              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
858              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
859              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
860              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
861              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
862              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
863              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
864              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
865              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
866              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
867              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
868              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
869              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
870              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
871              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
872              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
873              ]
874
875         # Add and verify new ACLs
876         rules = []
877         for i in range(len(r)):
878             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
879
880         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
881         result = self.vapi.acl_dump(reply.acl_index)
882
883         i = 0
884         for drules in result:
885             for dr in drules.r:
886                 self.assertEqual(dr.is_ipv6, r[i][0])
887                 self.assertEqual(dr.is_permit, r[i][1])
888                 self.assertEqual(dr.proto, r[i][3])
889
890                 if r[i][2] > 0:
891                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
892                 else:
893                     if r[i][2] < 0:
894                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
895                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
896                     else:
897                         if dr.proto == self.proto[self.IP][self.TCP]:
898                             self.assertGreater(dr.srcport_or_icmptype_first,
899                                                self.tcp_sport_from-1)
900                             self.assertLess(dr.srcport_or_icmptype_first,
901                                             self.tcp_sport_to+1)
902                             self.assertGreater(dr.dstport_or_icmpcode_last,
903                                                self.tcp_dport_from-1)
904                             self.assertLess(dr.dstport_or_icmpcode_last,
905                                             self.tcp_dport_to+1)
906                         elif dr.proto == self.proto[self.IP][self.UDP]:
907                             self.assertGreater(dr.srcport_or_icmptype_first,
908                                                self.udp_sport_from-1)
909                             self.assertLess(dr.srcport_or_icmptype_first,
910                                             self.udp_sport_to+1)
911                             self.assertGreater(dr.dstport_or_icmpcode_last,
912                                                self.udp_dport_from-1)
913                             self.assertLess(dr.dstport_or_icmpcode_last,
914                                             self.udp_dport_to+1)
915                 i += 1
916
917         self.logger.info("ACLP_TEST_FINISH_0014")
918
919     def test_0015_tcp_permit_port_v4(self):
920         """ permit single TCPv4
921         """
922         self.logger.info("ACLP_TEST_START_0015")
923
924         port = random.randint(0, 65535)
925         # Add an ACL
926         rules = []
927         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
928                                       self.proto[self.IP][self.TCP]))
929         # deny ip any any in the end
930         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
931
932         # Apply rules
933         self.apply_rules(rules, "permit ip4 tcp "+str(port))
934
935         # Traffic should still pass
936         self.run_verify_test(self.IP, self.IPV4,
937                              self.proto[self.IP][self.TCP], port)
938
939         self.logger.info("ACLP_TEST_FINISH_0015")
940
941     def test_0016_udp_permit_port_v4(self):
942         """ permit single UDPv4
943         """
944         self.logger.info("ACLP_TEST_START_0016")
945
946         port = random.randint(0, 65535)
947         # Add an ACL
948         rules = []
949         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
950                                       self.proto[self.IP][self.UDP]))
951         # deny ip any any in the end
952         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
953
954         # Apply rules
955         self.apply_rules(rules, "permit ip4 tcp "+str(port))
956
957         # Traffic should still pass
958         self.run_verify_test(self.IP, self.IPV4,
959                              self.proto[self.IP][self.UDP], port)
960
961         self.logger.info("ACLP_TEST_FINISH_0016")
962
963     def test_0017_tcp_permit_port_v6(self):
964         """ permit single TCPv6
965         """
966         self.logger.info("ACLP_TEST_START_0017")
967
968         port = random.randint(0, 65535)
969         # Add an ACL
970         rules = []
971         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
972                                       self.proto[self.IP][self.TCP]))
973         # deny ip any any in the end
974         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
975
976         # Apply rules
977         self.apply_rules(rules, "permit ip4 tcp "+str(port))
978
979         # Traffic should still pass
980         self.run_verify_test(self.IP, self.IPV6,
981                              self.proto[self.IP][self.TCP], port)
982
983         self.logger.info("ACLP_TEST_FINISH_0017")
984
985     def test_0018_udp_permit_port_v6(self):
986         """ permit single UPPv6
987         """
988         self.logger.info("ACLP_TEST_START_0018")
989
990         port = random.randint(0, 65535)
991         # Add an ACL
992         rules = []
993         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
994                                       self.proto[self.IP][self.UDP]))
995         # deny ip any any in the end
996         rules.append(self.create_rule(self.IPV6, self.DENY,
997                                       self.PORTS_ALL, 0))
998
999         # Apply rules
1000         self.apply_rules(rules, "permit ip4 tcp "+str(port))
1001
1002         # Traffic should still pass
1003         self.run_verify_test(self.IP, self.IPV6,
1004                              self.proto[self.IP][self.UDP], port)
1005
1006         self.logger.info("ACLP_TEST_FINISH_0018")
1007
1008     def test_0019_udp_deny_port(self):
1009         """ deny single TCPv4/v6
1010         """
1011         self.logger.info("ACLP_TEST_START_0019")
1012
1013         port = random.randint(0, 65535)
1014         # Add an ACL
1015         rules = []
1016         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1017                                       self.proto[self.IP][self.TCP]))
1018         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1019                                       self.proto[self.IP][self.TCP]))
1020         # Permit ip any any in the end
1021         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1022                                       self.PORTS_ALL, 0))
1023         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1024                                       self.PORTS_ALL, 0))
1025
1026         # Apply rules
1027         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1028
1029         # Traffic should not pass
1030         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1031                                    self.proto[self.IP][self.TCP], port)
1032
1033         self.logger.info("ACLP_TEST_FINISH_0019")
1034
1035     def test_0020_udp_deny_port(self):
1036         """ deny single UDPv4/v6
1037         """
1038         self.logger.info("ACLP_TEST_START_0020")
1039
1040         port = random.randint(0, 65535)
1041         # Add an ACL
1042         rules = []
1043         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1044                                       self.proto[self.IP][self.UDP]))
1045         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1046                                       self.proto[self.IP][self.UDP]))
1047         # Permit ip any any in the end
1048         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1049                                       self.PORTS_ALL, 0))
1050         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1051                                       self.PORTS_ALL, 0))
1052
1053         # Apply rules
1054         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1055
1056         # Traffic should not pass
1057         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1058                                    self.proto[self.IP][self.UDP], port)
1059
1060         self.logger.info("ACLP_TEST_FINISH_0020")
1061
1062     def test_0021_udp_deny_port_verify_fragment_deny(self):
1063         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1064         """
1065         self.logger.info("ACLP_TEST_START_0021")
1066
1067         port = random.randint(0, 65535)
1068         # Add an ACL
1069         rules = []
1070         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1071                                       self.proto[self.IP][self.UDP]))
1072         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1073                                       self.proto[self.IP][self.UDP]))
1074         # deny ip any any in the end
1075         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1076                                       self.PORTS_ALL, 0))
1077         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1078                                       self.PORTS_ALL, 0))
1079
1080         # Apply rules
1081         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1082
1083         # Traffic should not pass
1084         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1085                                    self.proto[self.IP][self.UDP], port, True)
1086
1087         self.logger.info("ACLP_TEST_FINISH_0021")
1088
1089     def test_0022_zero_length_udp_ipv4(self):
1090         """ VPP-687 zero length udp ipv4 packet"""
1091         self.logger.info("ACLP_TEST_START_0022")
1092
1093         port = random.randint(0, 65535)
1094         # Add an ACL
1095         rules = []
1096         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1097                                       self.proto[self.IP][self.UDP]))
1098         # deny ip any any in the end
1099         rules.append(
1100             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1101
1102         # Apply rules
1103         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1104
1105         # Traffic should still pass
1106         # Create incoming packet streams for packet-generator interfaces
1107         pkts_cnt = 0
1108         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1109                                   self.IP, self.IPV4,
1110                                   self.proto[self.IP][self.UDP], port,
1111                                   False, False)
1112         if len(pkts) > 0:
1113             self.pg0.add_stream(pkts)
1114             pkts_cnt += len(pkts)
1115
1116         # Enable packet capture and start packet sendingself.IPV
1117         self.pg_enable_capture(self.pg_interfaces)
1118         self.pg_start()
1119
1120         self.pg1.get_capture(pkts_cnt)
1121
1122         self.logger.info("ACLP_TEST_FINISH_0022")
1123
1124     def test_0023_zero_length_udp_ipv6(self):
1125         """ VPP-687 zero length udp ipv6 packet"""
1126         self.logger.info("ACLP_TEST_START_0023")
1127
1128         port = random.randint(0, 65535)
1129         # Add an ACL
1130         rules = []
1131         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1132                                       self.proto[self.IP][self.UDP]))
1133         # deny ip any any in the end
1134         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1135
1136         # Apply rules
1137         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1138
1139         # Traffic should still pass
1140         # Create incoming packet streams for packet-generator interfaces
1141         pkts_cnt = 0
1142         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1143                                   self.IP, self.IPV6,
1144                                   self.proto[self.IP][self.UDP], port,
1145                                   False, False)
1146         if len(pkts) > 0:
1147             self.pg0.add_stream(pkts)
1148             pkts_cnt += len(pkts)
1149
1150         # Enable packet capture and start packet sendingself.IPV
1151         self.pg_enable_capture(self.pg_interfaces)
1152         self.pg_start()
1153
1154         # Verify outgoing packet streams per packet-generator interface
1155         self.pg1.get_capture(pkts_cnt)
1156
1157         self.logger.info("ACLP_TEST_FINISH_0023")
1158
1159     def test_0108_tcp_permit_v4(self):
1160         """ permit TCPv4 + non-match range
1161         """
1162         self.logger.info("ACLP_TEST_START_0108")
1163
1164         # Add an ACL
1165         rules = []
1166         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1167                      self.proto[self.IP][self.TCP]))
1168         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1169                      self.proto[self.IP][self.TCP]))
1170         # deny ip any any in the end
1171         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1172
1173         # Apply rules
1174         self.apply_rules(rules, "permit ipv4 tcp")
1175
1176         # Traffic should still pass
1177         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1178
1179         self.logger.info("ACLP_TEST_FINISH_0108")
1180
1181     def test_0109_tcp_permit_v6(self):
1182         """ permit TCPv6 + non-match range
1183         """
1184         self.logger.info("ACLP_TEST_START_0109")
1185
1186         # Add an ACL
1187         rules = []
1188         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1189                                       self.proto[self.IP][self.TCP]))
1190         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1191                                       self.proto[self.IP][self.TCP]))
1192         # deny ip any any in the end
1193         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1194
1195         # Apply rules
1196         self.apply_rules(rules, "permit ip6 tcp")
1197
1198         # Traffic should still pass
1199         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1200
1201         self.logger.info("ACLP_TEST_FINISH_0109")
1202
1203     def test_0110_udp_permit_v4(self):
1204         """ permit UDPv4 + non-match range
1205         """
1206         self.logger.info("ACLP_TEST_START_0110")
1207
1208         # Add an ACL
1209         rules = []
1210         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1211                                       self.proto[self.IP][self.UDP]))
1212         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1213                                       self.proto[self.IP][self.UDP]))
1214         # deny ip any any in the end
1215         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1216
1217         # Apply rules
1218         self.apply_rules(rules, "permit ipv4 udp")
1219
1220         # Traffic should still pass
1221         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1222
1223         self.logger.info("ACLP_TEST_FINISH_0110")
1224
1225     def test_0111_udp_permit_v6(self):
1226         """ permit UDPv6 + non-match range
1227         """
1228         self.logger.info("ACLP_TEST_START_0111")
1229
1230         # Add an ACL
1231         rules = []
1232         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1233                                       self.proto[self.IP][self.UDP]))
1234         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1235                                       self.proto[self.IP][self.UDP]))
1236         # deny ip any any in the end
1237         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1238
1239         # Apply rules
1240         self.apply_rules(rules, "permit ip6 udp")
1241
1242         # Traffic should still pass
1243         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1244
1245         self.logger.info("ACLP_TEST_FINISH_0111")
1246
1247     def test_0112_tcp_deny(self):
1248         """ deny TCPv4/v6 + non-match range
1249         """
1250         self.logger.info("ACLP_TEST_START_0112")
1251
1252         # Add an ACL
1253         rules = []
1254         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1255                                       self.PORTS_RANGE_2,
1256                                       self.proto[self.IP][self.TCP]))
1257         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1258                                       self.PORTS_RANGE_2,
1259                                       self.proto[self.IP][self.TCP]))
1260         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1261                                       self.proto[self.IP][self.TCP]))
1262         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1263                                       self.proto[self.IP][self.TCP]))
1264         # permit ip any any in the end
1265         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1266                                       self.PORTS_ALL, 0))
1267         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1268                                       self.PORTS_ALL, 0))
1269
1270         # Apply rules
1271         self.apply_rules(rules, "deny ip4/ip6 tcp")
1272
1273         # Traffic should not pass
1274         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1275                                    self.proto[self.IP][self.TCP])
1276
1277         self.logger.info("ACLP_TEST_FINISH_0112")
1278
1279     def test_0113_udp_deny(self):
1280         """ deny UDPv4/v6 + non-match range
1281         """
1282         self.logger.info("ACLP_TEST_START_0113")
1283
1284         # Add an ACL
1285         rules = []
1286         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1287                                       self.PORTS_RANGE_2,
1288                                       self.proto[self.IP][self.UDP]))
1289         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1290                                       self.PORTS_RANGE_2,
1291                                       self.proto[self.IP][self.UDP]))
1292         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1293                                       self.proto[self.IP][self.UDP]))
1294         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1295                                       self.proto[self.IP][self.UDP]))
1296         # permit ip any any in the end
1297         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1298                                       self.PORTS_ALL, 0))
1299         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1300                                       self.PORTS_ALL, 0))
1301
1302         # Apply rules
1303         self.apply_rules(rules, "deny ip4/ip6 udp")
1304
1305         # Traffic should not pass
1306         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1307                                    self.proto[self.IP][self.UDP])
1308
1309         self.logger.info("ACLP_TEST_FINISH_0113")
1310
1311
1312 if __name__ == '__main__':
1313     unittest.main(testRunner=VppTestRunner)