4d748921aaf12e4c772c834c526e55ba1c592854
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16
class TestACLplugin(VppTestCase):
    """ ACL plugin Test Case """

    # traffic types; also used as the first index into ``proto`` below,
    # e.g. proto[IP][TCP] or proto[ICMP][ICMPv6]
    IP = 0
    ICMP = 1

    # IP version; create_stream() builds IPv4 packets for 0, IPv6 packets
    # for 1 and picks one of the two at random for any other value
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types (stored as 'is_permit' in the rules built by create_rule())
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[traffic type][index] -> IP protocol number
    proto = [[6, 17], [1, 58]]
    # IP protocol number -> name of the scapy layer used for it
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    # second index into ``proto``: proto[ICMP][ICMPv4], proto[IP][UDP], ...
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    PROTO_ALL = 0

    # port ranges
    # Selectors understood by create_rule(): PORTS_ALL covers everything,
    # PORTS_RANGE picks the first set of values below and PORTS_RANGE_2 the
    # "_2" set.
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    # ICMP type/code used both for generated packets and for the
    # PORTS_RANGE rules
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # ICMP type and code range used for the PORTS_RANGE_2 rules
    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    # bridge domain all pg interfaces are put into by setUpClass()
    bd_id = 1
78
79     @classmethod
80     def setUpClass(cls):
81         """
82         Perform standard class setup (defined by class method setUpClass in
83         class VppTestCase) before running the test case, set test case related
84         variables and configure VPP.
85         """
86         super(TestACLplugin, cls).setUpClass()
87
88         try:
89             # Create 2 pg interfaces
90             cls.create_pg_interfaces(range(2))
91
92             # Packet flows mapping pg0 -> pg1, pg2 etc.
93             cls.flows = dict()
94             cls.flows[cls.pg0] = [cls.pg1]
95
96             # Packet sizes
97             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
98
99             # Create BD with MAC learning and unknown unicast flooding disabled
100             # and put interfaces to this BD
101             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
102                                            learn=1)
103             for pg_if in cls.pg_interfaces:
104                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
105                                                     bd_id=cls.bd_id)
106
107             # Set up all interfaces
108             for i in cls.pg_interfaces:
109                 i.admin_up()
110
111             # Mapping between packet-generator index and lists of test hosts
112             cls.hosts_by_pg_idx = dict()
113             for pg_if in cls.pg_interfaces:
114                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
115
116             # Create list of deleted hosts
117             cls.deleted_hosts_by_pg_idx = dict()
118             for pg_if in cls.pg_interfaces:
119                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
120
121             # warm-up the mac address tables
122             # self.warmup_test()
123
124         except Exception:
125             super(TestACLplugin, cls).tearDownClass()
126             raise
127
    def setUp(self):
        """
        Run the base-class per-test setup and reset the packet infos.
        """
        super(TestACLplugin, self).setUp()
        self.reset_packet_infos()
131
132     def tearDown(self):
133         """
134         Show various debug prints after each test.
135         """
136         super(TestACLplugin, self).tearDown()
137         if not self.vpp_dead:
138             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
139             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
140             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
141             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
142             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
143                                              % self.bd_id))
144
145     def create_hosts(self, count, start=0):
146         """
147         Create required number of host MAC addresses and distribute them among
148         interfaces. Create host IPv4 address for every host MAC address.
149
150         :param int count: Number of hosts to create MAC/IPv4 addresses for.
151         :param int start: Number to start numbering from.
152         """
153         n_int = len(self.pg_interfaces)
154         macs_per_if = count / n_int
155         i = -1
156         for pg_if in self.pg_interfaces:
157             i += 1
158             start_nr = macs_per_if * i + start
159             end_nr = count + start if i == (n_int - 1) \
160                 else macs_per_if * (i + 1) + start
161             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
162             for j in range(start_nr, end_nr):
163                 host = Host(
164                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
165                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
166                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
167                 hosts.append(host)
168
169     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
170                     s_prefix=0, s_ip='\x00\x00\x00\x00',
171                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
172         if proto == -1:
173             return
174         if ports == self.PORTS_ALL:
175             sport_from = 0
176             dport_from = 0
177             sport_to = 65535 if proto != 1 and proto != 58 else 255
178             dport_to = sport_to
179         elif ports == self.PORTS_RANGE:
180             if proto == 1:
181                 sport_from = self.icmp4_type
182                 sport_to = self.icmp4_type
183                 dport_from = self.icmp4_code
184                 dport_to = self.icmp4_code
185             elif proto == 58:
186                 sport_from = self.icmp6_type
187                 sport_to = self.icmp6_type
188                 dport_from = self.icmp6_code
189                 dport_to = self.icmp6_code
190             elif proto == self.proto[self.IP][self.TCP]:
191                 sport_from = self.tcp_sport_from
192                 sport_to = self.tcp_sport_to
193                 dport_from = self.tcp_dport_from
194                 dport_to = self.tcp_dport_to
195             elif proto == self.proto[self.IP][self.UDP]:
196                 sport_from = self.udp_sport_from
197                 sport_to = self.udp_sport_to
198                 dport_from = self.udp_dport_from
199                 dport_to = self.udp_dport_to
200         elif ports == self.PORTS_RANGE_2:
201             if proto == 1:
202                 sport_from = self.icmp4_type_2
203                 sport_to = self.icmp4_type_2
204                 dport_from = self.icmp4_code_from_2
205                 dport_to = self.icmp4_code_to_2
206             elif proto == 58:
207                 sport_from = self.icmp6_type_2
208                 sport_to = self.icmp6_type_2
209                 dport_from = self.icmp6_code_from_2
210                 dport_to = self.icmp6_code_to_2
211             elif proto == self.proto[self.IP][self.TCP]:
212                 sport_from = self.tcp_sport_from_2
213                 sport_to = self.tcp_sport_to_2
214                 dport_from = self.tcp_dport_from_2
215                 dport_to = self.tcp_dport_to_2
216             elif proto == self.proto[self.IP][self.UDP]:
217                 sport_from = self.udp_sport_from_2
218                 sport_to = self.udp_sport_to_2
219                 dport_from = self.udp_dport_from_2
220                 dport_to = self.udp_dport_to_2
221         else:
222             sport_from = ports
223             sport_to = ports
224             dport_from = ports
225             dport_to = ports
226
227         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
228                  'srcport_or_icmptype_first': sport_from,
229                  'srcport_or_icmptype_last': sport_to,
230                  'src_ip_prefix_len': s_prefix,
231                  'src_ip_addr': s_ip,
232                  'dstport_or_icmpcode_first': dport_from,
233                  'dstport_or_icmpcode_last': dport_to,
234                  'dst_ip_prefix_len': d_prefix,
235                  'dst_ip_addr': d_ip})
236         return rule
237
238     def apply_rules(self, rules, tag=''):
239         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
240                                           tag=tag)
241         self.logger.info("Dumped ACL: " + str(
242             self.vapi.acl_dump(reply.acl_index)))
243         # Apply a ACL on the interface as inbound
244         for i in self.pg_interfaces:
245             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
246                                                  n_input=1,
247                                                  acls=[reply.acl_index])
248         return
249
250     def create_upper_layer(self, packet_index, proto, ports=0):
251         p = self.proto_map[proto]
252         if p == 'UDP':
253             if ports == 0:
254                 return UDP(sport=random.randint(self.udp_sport_from,
255                                                 self.udp_sport_to),
256                            dport=random.randint(self.udp_dport_from,
257                                                 self.udp_dport_to))
258             else:
259                 return UDP(sport=ports, dport=ports)
260         elif p == 'TCP':
261             if ports == 0:
262                 return TCP(sport=random.randint(self.tcp_sport_from,
263                                                 self.tcp_sport_to),
264                            dport=random.randint(self.tcp_dport_from,
265                                                 self.tcp_dport_to))
266             else:
267                 return TCP(sport=ports, dport=ports)
268         return ''
269
270     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
271                       proto=-1, ports=0, fragments=False, pkt_raw=True):
272         """
273         Create input packet stream for defined interface using hosts or
274         deleted_hosts list.
275
276         :param object src_if: Interface to create packet stream for.
277         :param list packet_sizes: List of required packet sizes.
278         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
279         :return: Stream of packets.
280         """
281         pkts = []
282         if self.flows.__contains__(src_if):
283             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
284             for dst_if in self.flows[src_if]:
285                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
286                 n_int = len(dst_hosts) * len(src_hosts)
287                 for i in range(0, n_int):
288                     dst_host = dst_hosts[i / len(src_hosts)]
289                     src_host = src_hosts[i % len(src_hosts)]
290                     pkt_info = self.create_packet_info(src_if, dst_if)
291                     if ipv6 == 1:
292                         pkt_info.ip = 1
293                     elif ipv6 == 0:
294                         pkt_info.ip = 0
295                     else:
296                         pkt_info.ip = random.choice([0, 1])
297                     if proto == -1:
298                         pkt_info.proto = random.choice(self.proto[self.IP])
299                     else:
300                         pkt_info.proto = proto
301                     payload = self.info_to_payload(pkt_info)
302                     p = Ether(dst=dst_host.mac, src=src_host.mac)
303                     if pkt_info.ip:
304                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
305                         if fragments:
306                             p /= IPv6ExtHdrFragment(offset=64, m=1)
307                     else:
308                         if fragments:
309                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
310                                     flags=1, frag=64)
311                         else:
312                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
313                     if traffic_type == self.ICMP:
314                         if pkt_info.ip:
315                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
316                                                    code=self.icmp6_code)
317                         else:
318                             p /= ICMP(type=self.icmp4_type,
319                                       code=self.icmp4_code)
320                     else:
321                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
322                     if pkt_raw:
323                         p /= Raw(payload)
324                         pkt_info.data = p.copy()
325                     if pkt_raw:
326                         size = random.choice(packet_sizes)
327                         self.extend_packet(p, size)
328                     pkts.append(p)
329         return pkts
330
    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
        """
        Verify captured input packet stream for defined interface.

        For ICMP traffic only the ICMP type and code of each packet are
        checked. For any other traffic each packet is matched against the
        next saved packet info of its source and its IP addresses and
        TCP/UDP ports are compared with the saved packet.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: self.ICMP for ICMP traffic; any other value
            selects the TCP/UDP checks.
        :param ip_type: IP version the packets were sent with. When it is
            non-zero, the version recorded in each payload must equal it.
        """
        # sw_if_index -> last packet info already matched for that source
        last_info = dict()
        for i in self.pg_interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(
                        packet[ICMPv6EchoRequest].data)
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(str(packet[Raw]))
                    payload = packet[self.proto_map[payload_info.proto]]
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

            # ip_type 0 (IPV4) is not asserted here
            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)
            if traffic_type == self.ICMP:
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet "
                                          "(outside network):", packet))
                    raise
            else:
                try:
                    ip_version = IPv6 if payload_info.ip == 1 else IP

                    ip = packet[ip_version]
                    packet_index = payload_info.index

                    self.assertEqual(payload_info.dst, dst_sw_if_index)
                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                      (pg_if.name, payload_info.src,
                                       packet_index))
                    # Advance to the next packet info saved for this
                    # source/destination pair; it must be this packet
                    next_info = self.get_next_packet_info_for_interface2(
                        payload_info.src, dst_sw_if_index,
                        last_info[payload_info.src])
                    last_info[payload_info.src] = next_info
                    self.assertTrue(next_info is not None)
                    self.assertEqual(packet_index, next_info.index)
                    saved_packet = next_info.data
                    # Check standard fields
                    self.assertEqual(ip.src, saved_packet[ip_version].src)
                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                    p = self.proto_map[payload_info.proto]
                    if p == 'TCP':
                        tcp = packet[TCP]
                        self.assertEqual(tcp.sport, saved_packet[
                            TCP].sport)
                        self.assertEqual(tcp.dport, saved_packet[
                            TCP].dport)
                    elif p == 'UDP':
                        udp = packet[UDP]
                        self.assertEqual(udp.sport, saved_packet[
                            UDP].sport)
                        self.assertEqual(udp.dport, saved_packet[
                            UDP].dport)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet:",
                                          packet))
                    raise
        # Every saved packet info must have been consumed above.
        # NOTE(review): the first argument here is the interface object
        # ``i``, while the call in the loop above passes payload_info.src
        # (an index) - confirm which one the helper expects; if it compares
        # against an index this final check may never find anything.
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))
417
418     def run_traffic_no_check(self):
419         # Test
420         # Create incoming packet streams for packet-generator interfaces
421         for i in self.pg_interfaces:
422             if self.flows.__contains__(i):
423                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
424                 if len(pkts) > 0:
425                     i.add_stream(pkts)
426
427         # Enable packet capture and start packet sending
428         self.pg_enable_capture(self.pg_interfaces)
429         self.pg_start()
430
431     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
432                         frags=False, pkt_raw=True):
433         # Test
434         # Create incoming packet streams for packet-generator interfaces
435         pkts_cnt = 0
436         for i in self.pg_interfaces:
437             if self.flows.__contains__(i):
438                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
439                                           traffic_type, ip_type, proto, ports,
440                                           frags, pkt_raw)
441                 if len(pkts) > 0:
442                     i.add_stream(pkts)
443                     pkts_cnt += len(pkts)
444
445         # Enable packet capture and start packet sendingself.IPV
446         self.pg_enable_capture(self.pg_interfaces)
447         self.pg_start()
448
449         # Verify
450         # Verify outgoing packet streams per packet-generator interface
451         for src_if in self.pg_interfaces:
452             if self.flows.__contains__(src_if):
453                 for dst_if in self.flows[src_if]:
454                     capture = dst_if.get_capture(pkts_cnt)
455                     self.logger.info("Verifying capture on interface %s" %
456                                      dst_if.name)
457                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
458
459     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
460                               ports=0, frags=False):
461         # Test
462         self.reset_packet_infos()
463         for i in self.pg_interfaces:
464             if self.flows.__contains__(i):
465                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
466                                           traffic_type, ip_type, proto, ports,
467                                           frags)
468                 if len(pkts) > 0:
469                     i.add_stream(pkts)
470
471         # Enable packet capture and start packet sending
472         self.pg_enable_capture(self.pg_interfaces)
473         self.pg_start()
474
475         # Verify
476         # Verify outgoing packet streams per packet-generator interface
477         for src_if in self.pg_interfaces:
478             if self.flows.__contains__(src_if):
479                 for dst_if in self.flows[src_if]:
480                     self.logger.info("Verifying capture on interface %s" %
481                                      dst_if.name)
482                     capture = dst_if.get_capture(0)
483                     self.assertEqual(len(capture), 0)
484
485     def test_0000_warmup_test(self):
486         """ ACL plugin version check; learn MACs
487         """
488         self.create_hosts(16)
489         self.run_traffic_no_check()
490         reply = self.vapi.papi.acl_plugin_get_version()
491         self.assertEqual(reply.major, 1)
492         self.logger.info("Working with ACL plugin version: %d.%d" % (
493             reply.major, reply.minor))
494         # minor version changes are non breaking
495         # self.assertEqual(reply.minor, 0)
496
    def test_0001_acl_create(self):
        """ ACL create/delete test
        """

        self.logger.info("ACLP_TEST_START_0001")
        # Add an ACL
        r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
              'srcport_or_icmptype_first': 1234,
              'srcport_or_icmptype_last': 1235,
              'src_ip_prefix_len': 0,
              'src_ip_addr': '\x00\x00\x00\x00',
              'dstport_or_icmpcode_first': 1234,
              'dstport_or_icmpcode_last': 1234,
              'dst_ip_addr': '\x00\x00\x00\x00',
              'dst_ip_prefix_len': 0}]
        # Test 1: add a new ACL
        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
                                          tag="permit 1234")
        self.assertEqual(reply.retval, 0)
        # The very first ACL gets #0
        self.assertEqual(reply.acl_index, 0)
        first_acl = reply.acl_index
        rr = self.vapi.acl_dump(reply.acl_index)
        self.logger.info("Dumped ACL: " + str(rr))
        self.assertEqual(len(rr), 1)
        # We should have the same number of ACL entries as we had asked
        self.assertEqual(len(rr[0].r), len(r))
        # The rules should be the same. But because the submitted and returned
        # are different types, we need to iterate over rules and keys to get
        # to basic values.
        # NOTE(review): r holds a single rule, so range(0, len(r) - 1) is
        # empty and the comparison below never runs. Before switching to
        # range(len(r)), confirm that the dumped rules can be indexed by
        # key and return values equal to the submitted ones.
        for i_rule in range(0, len(r) - 1):
            for rule_key in r[i_rule]:
                self.assertEqual(rr[0].r[i_rule][rule_key],
                                 r[i_rule][rule_key])

        # Add a deny-1234 ACL
        r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 1234,
                   'srcport_or_icmptype_last': 1235,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 1234,
                   'dstport_or_icmpcode_last': 1234,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0},
                  {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 0,
                   'srcport_or_icmptype_last': 0,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 0,
                   'dstport_or_icmpcode_last': 0,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0}]

        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
                                          tag="deny 1234;permit all")
        self.assertEqual(reply.retval, 0)
        # The second ACL gets #1
        self.assertEqual(reply.acl_index, 1)
        second_acl = reply.acl_index

        # Test 2: try to modify a nonexistent ACL
        reply = self.vapi.acl_add_replace(acl_index=432, r=r,
                                          tag="FFFF:FFFF", expected_retval=-1)
        self.assertEqual(reply.retval, -1)
        # The ACL number should pass through
        self.assertEqual(reply.acl_index, 432)
        # apply an ACL on an interface inbound, try to delete ACL, must fail
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=1,
                                             acls=[first_acl])
        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-1)
        # Unapply an ACL and then try to delete it - must be ok
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[])
        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)

        # apply an ACL on an interface outbound, try to delete ACL, must fail
        # (n_input=0: none of the listed ACLs is counted as an input ACL)
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[second_acl])
        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-1)
        # Unapply the ACL and then try to delete it - must be ok
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[])
        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)

        # try to apply a nonexistent ACL - must fail
        # (first_acl has been deleted above)
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=1,
                                             acls=[first_acl],
                                             expected_retval=-1)

        self.logger.info("ACLP_TEST_FINISH_0001")
594
595     def test_0002_acl_permit_apply(self):
596         """ permit ACL apply test
597         """
598         self.logger.info("ACLP_TEST_START_0002")
599
600         rules = []
601         rules.append(self.create_rule(self.IPV4, self.PERMIT,
602                      0, self.proto[self.IP][self.UDP]))
603         rules.append(self.create_rule(self.IPV4, self.PERMIT,
604                      0, self.proto[self.IP][self.TCP]))
605
606         # Apply rules
607         self.apply_rules(rules, "permit per-flow")
608
609         # Traffic should still pass
610         self.run_verify_test(self.IP, self.IPV4, -1)
611         self.logger.info("ACLP_TEST_FINISH_0002")
612
613     def test_0003_acl_deny_apply(self):
614         """ deny ACL apply test
615         """
616         self.logger.info("ACLP_TEST_START_0003")
617         # Add a deny-flows ACL
618         rules = []
619         rules.append(self.create_rule(self.IPV4, self.DENY,
620                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
621         # Permit ip any any in the end
622         rules.append(self.create_rule(self.IPV4, self.PERMIT,
623                                       self.PORTS_ALL, 0))
624
625         # Apply rules
626         self.apply_rules(rules, "deny per-flow;permit all")
627
628         # Traffic should not pass
629         self.run_verify_negat_test(self.IP, self.IPV4,
630                                    self.proto[self.IP][self.UDP])
631         self.logger.info("ACLP_TEST_FINISH_0003")
632         # self.assertEqual(1, 0)
633
634     def test_0004_vpp624_permit_icmpv4(self):
635         """ VPP_624 permit ICMPv4
636         """
637         self.logger.info("ACLP_TEST_START_0004")
638
639         # Add an ACL
640         rules = []
641         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
642                                       self.proto[self.ICMP][self.ICMPv4]))
643         # deny ip any any in the end
644         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
645
646         # Apply rules
647         self.apply_rules(rules, "permit icmpv4")
648
649         # Traffic should still pass
650         self.run_verify_test(self.ICMP, self.IPV4,
651                              self.proto[self.ICMP][self.ICMPv4])
652
653         self.logger.info("ACLP_TEST_FINISH_0004")
654
655     def test_0005_vpp624_permit_icmpv6(self):
656         """ VPP_624 permit ICMPv6
657         """
658         self.logger.info("ACLP_TEST_START_0005")
659
660         # Add an ACL
661         rules = []
662         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
663                                       self.proto[self.ICMP][self.ICMPv6]))
664         # deny ip any any in the end
665         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
666
667         # Apply rules
668         self.apply_rules(rules, "permit icmpv6")
669
670         # Traffic should still pass
671         self.run_verify_test(self.ICMP, self.IPV6,
672                              self.proto[self.ICMP][self.ICMPv6])
673
674         self.logger.info("ACLP_TEST_FINISH_0005")
675
676     def test_0006_vpp624_deny_icmpv4(self):
677         """ VPP_624 deny ICMPv4
678         """
679         self.logger.info("ACLP_TEST_START_0006")
680         # Add an ACL
681         rules = []
682         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
683                                       self.proto[self.ICMP][self.ICMPv4]))
684         # permit ip any any in the end
685         rules.append(self.create_rule(self.IPV4, self.PERMIT,
686                                       self.PORTS_ALL, 0))
687
688         # Apply rules
689         self.apply_rules(rules, "deny icmpv4")
690
691         # Traffic should not pass
692         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
693
694         self.logger.info("ACLP_TEST_FINISH_0006")
695
696     def test_0007_vpp624_deny_icmpv6(self):
697         """ VPP_624 deny ICMPv6
698         """
699         self.logger.info("ACLP_TEST_START_0007")
700         # Add an ACL
701         rules = []
702         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
703                                       self.proto[self.ICMP][self.ICMPv6]))
704         # deny ip any any in the end
705         rules.append(self.create_rule(self.IPV6, self.PERMIT,
706                                       self.PORTS_ALL, 0))
707
708         # Apply rules
709         self.apply_rules(rules, "deny icmpv6")
710
711         # Traffic should not pass
712         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
713
714         self.logger.info("ACLP_TEST_FINISH_0007")
715
716     def test_0008_tcp_permit_v4(self):
717         """ permit TCPv4
718         """
719         self.logger.info("ACLP_TEST_START_0008")
720
721         # Add an ACL
722         rules = []
723         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
724                      self.proto[self.IP][self.TCP]))
725         # deny ip any any in the end
726         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
727
728         # Apply rules
729         self.apply_rules(rules, "permit ipv4 tcp")
730
731         # Traffic should still pass
732         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
733
734         self.logger.info("ACLP_TEST_FINISH_0008")
735
736     def test_0009_tcp_permit_v6(self):
737         """ permit TCPv6
738         """
739         self.logger.info("ACLP_TEST_START_0009")
740
741         # Add an ACL
742         rules = []
743         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
744                                       self.proto[self.IP][self.TCP]))
745         # deny ip any any in the end
746         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
747
748         # Apply rules
749         self.apply_rules(rules, "permit ip6 tcp")
750
751         # Traffic should still pass
752         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
753
754         self.logger.info("ACLP_TEST_FINISH_0008")
755
756     def test_0010_udp_permit_v4(self):
757         """ permit UDPv4
758         """
759         self.logger.info("ACLP_TEST_START_0010")
760
761         # Add an ACL
762         rules = []
763         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
764                                       self.proto[self.IP][self.UDP]))
765         # deny ip any any in the end
766         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
767
768         # Apply rules
769         self.apply_rules(rules, "permit ipv udp")
770
771         # Traffic should still pass
772         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
773
774         self.logger.info("ACLP_TEST_FINISH_0010")
775
776     def test_0011_udp_permit_v6(self):
777         """ permit UDPv6
778         """
779         self.logger.info("ACLP_TEST_START_0011")
780
781         # Add an ACL
782         rules = []
783         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
784                                       self.proto[self.IP][self.UDP]))
785         # deny ip any any in the end
786         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
787
788         # Apply rules
789         self.apply_rules(rules, "permit ip6 udp")
790
791         # Traffic should still pass
792         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
793
794         self.logger.info("ACLP_TEST_FINISH_0011")
795
796     def test_0012_tcp_deny(self):
797         """ deny TCPv4/v6
798         """
799         self.logger.info("ACLP_TEST_START_0012")
800
801         # Add an ACL
802         rules = []
803         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
804                                       self.proto[self.IP][self.TCP]))
805         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
806                                       self.proto[self.IP][self.TCP]))
807         # permit ip any any in the end
808         rules.append(self.create_rule(self.IPV4, self.PERMIT,
809                                       self.PORTS_ALL, 0))
810         rules.append(self.create_rule(self.IPV6, self.PERMIT,
811                                       self.PORTS_ALL, 0))
812
813         # Apply rules
814         self.apply_rules(rules, "deny ip4/ip6 tcp")
815
816         # Traffic should not pass
817         self.run_verify_negat_test(self.IP, self.IPRANDOM,
818                                    self.proto[self.IP][self.TCP])
819
820         self.logger.info("ACLP_TEST_FINISH_0012")
821
822     def test_0013_udp_deny(self):
823         """ deny UDPv4/v6
824         """
825         self.logger.info("ACLP_TEST_START_0013")
826
827         # Add an ACL
828         rules = []
829         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
830                                       self.proto[self.IP][self.UDP]))
831         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
832                                       self.proto[self.IP][self.UDP]))
833         # permit ip any any in the end
834         rules.append(self.create_rule(self.IPV4, self.PERMIT,
835                                       self.PORTS_ALL, 0))
836         rules.append(self.create_rule(self.IPV6, self.PERMIT,
837                                       self.PORTS_ALL, 0))
838
839         # Apply rules
840         self.apply_rules(rules, "deny ip4/ip6 udp")
841
842         # Traffic should not pass
843         self.run_verify_negat_test(self.IP, self.IPRANDOM,
844                                    self.proto[self.IP][self.UDP])
845
846         self.logger.info("ACLP_TEST_FINISH_0013")
847
848     def test_0014_acl_dump(self):
849         """ verify add/dump acls
850         """
851         self.logger.info("ACLP_TEST_START_0014")
852
853         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
854              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
855              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
856              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
857              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
858              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
859              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
860              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
861              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
862              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
863              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
864              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
865              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
866              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
867              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
868              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
869              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
870              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
871              ]
872
873         # Add and verify new ACLs
874         rules = []
875         for i in range(len(r)):
876             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
877
878         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
879         result = self.vapi.acl_dump(reply.acl_index)
880
881         i = 0
882         for drules in result:
883             for dr in drules.r:
884                 self.assertEqual(dr.is_ipv6, r[i][0])
885                 self.assertEqual(dr.is_permit, r[i][1])
886                 self.assertEqual(dr.proto, r[i][3])
887
888                 if r[i][2] > 0:
889                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
890                 else:
891                     if r[i][2] < 0:
892                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
893                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
894                     else:
895                         if dr.proto == self.proto[self.IP][self.TCP]:
896                             self.assertGreater(dr.srcport_or_icmptype_first,
897                                                self.tcp_sport_from-1)
898                             self.assertLess(dr.srcport_or_icmptype_first,
899                                             self.tcp_sport_to+1)
900                             self.assertGreater(dr.dstport_or_icmpcode_last,
901                                                self.tcp_dport_from-1)
902                             self.assertLess(dr.dstport_or_icmpcode_last,
903                                             self.tcp_dport_to+1)
904                         elif dr.proto == self.proto[self.IP][self.UDP]:
905                             self.assertGreater(dr.srcport_or_icmptype_first,
906                                                self.udp_sport_from-1)
907                             self.assertLess(dr.srcport_or_icmptype_first,
908                                             self.udp_sport_to+1)
909                             self.assertGreater(dr.dstport_or_icmpcode_last,
910                                                self.udp_dport_from-1)
911                             self.assertLess(dr.dstport_or_icmpcode_last,
912                                             self.udp_dport_to+1)
913                 i += 1
914
915         self.logger.info("ACLP_TEST_FINISH_0014")
916
917     def test_0015_tcp_permit_port_v4(self):
918         """ permit single TCPv4
919         """
920         self.logger.info("ACLP_TEST_START_0015")
921
922         port = random.randint(0, 65535)
923         # Add an ACL
924         rules = []
925         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
926                                       self.proto[self.IP][self.TCP]))
927         # deny ip any any in the end
928         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
929
930         # Apply rules
931         self.apply_rules(rules, "permit ip4 tcp "+str(port))
932
933         # Traffic should still pass
934         self.run_verify_test(self.IP, self.IPV4,
935                              self.proto[self.IP][self.TCP], port)
936
937         self.logger.info("ACLP_TEST_FINISH_0015")
938
939     def test_0016_udp_permit_port_v4(self):
940         """ permit single UDPv4
941         """
942         self.logger.info("ACLP_TEST_START_0016")
943
944         port = random.randint(0, 65535)
945         # Add an ACL
946         rules = []
947         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
948                                       self.proto[self.IP][self.UDP]))
949         # deny ip any any in the end
950         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
951
952         # Apply rules
953         self.apply_rules(rules, "permit ip4 tcp "+str(port))
954
955         # Traffic should still pass
956         self.run_verify_test(self.IP, self.IPV4,
957                              self.proto[self.IP][self.UDP], port)
958
959         self.logger.info("ACLP_TEST_FINISH_0016")
960
961     def test_0017_tcp_permit_port_v6(self):
962         """ permit single TCPv6
963         """
964         self.logger.info("ACLP_TEST_START_0017")
965
966         port = random.randint(0, 65535)
967         # Add an ACL
968         rules = []
969         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
970                                       self.proto[self.IP][self.TCP]))
971         # deny ip any any in the end
972         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
973
974         # Apply rules
975         self.apply_rules(rules, "permit ip4 tcp "+str(port))
976
977         # Traffic should still pass
978         self.run_verify_test(self.IP, self.IPV6,
979                              self.proto[self.IP][self.TCP], port)
980
981         self.logger.info("ACLP_TEST_FINISH_0017")
982
983     def test_0018_udp_permit_port_v6(self):
984         """ permit single UPPv6
985         """
986         self.logger.info("ACLP_TEST_START_0018")
987
988         port = random.randint(0, 65535)
989         # Add an ACL
990         rules = []
991         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
992                                       self.proto[self.IP][self.UDP]))
993         # deny ip any any in the end
994         rules.append(self.create_rule(self.IPV6, self.DENY,
995                                       self.PORTS_ALL, 0))
996
997         # Apply rules
998         self.apply_rules(rules, "permit ip4 tcp "+str(port))
999
1000         # Traffic should still pass
1001         self.run_verify_test(self.IP, self.IPV6,
1002                              self.proto[self.IP][self.UDP], port)
1003
1004         self.logger.info("ACLP_TEST_FINISH_0018")
1005
1006     def test_0019_udp_deny_port(self):
1007         """ deny single TCPv4/v6
1008         """
1009         self.logger.info("ACLP_TEST_START_0019")
1010
1011         port = random.randint(0, 65535)
1012         # Add an ACL
1013         rules = []
1014         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1015                                       self.proto[self.IP][self.TCP]))
1016         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1017                                       self.proto[self.IP][self.TCP]))
1018         # Permit ip any any in the end
1019         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1020                                       self.PORTS_ALL, 0))
1021         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1022                                       self.PORTS_ALL, 0))
1023
1024         # Apply rules
1025         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1026
1027         # Traffic should not pass
1028         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1029                                    self.proto[self.IP][self.TCP], port)
1030
1031         self.logger.info("ACLP_TEST_FINISH_0019")
1032
1033     def test_0020_udp_deny_port(self):
1034         """ deny single UDPv4/v6
1035         """
1036         self.logger.info("ACLP_TEST_START_0020")
1037
1038         port = random.randint(0, 65535)
1039         # Add an ACL
1040         rules = []
1041         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1042                                       self.proto[self.IP][self.UDP]))
1043         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1044                                       self.proto[self.IP][self.UDP]))
1045         # Permit ip any any in the end
1046         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1047                                       self.PORTS_ALL, 0))
1048         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1049                                       self.PORTS_ALL, 0))
1050
1051         # Apply rules
1052         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1053
1054         # Traffic should not pass
1055         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1056                                    self.proto[self.IP][self.UDP], port)
1057
1058         self.logger.info("ACLP_TEST_FINISH_0020")
1059
1060     def test_0021_udp_deny_port_verify_fragment_deny(self):
1061         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1062         """
1063         self.logger.info("ACLP_TEST_START_0021")
1064
1065         port = random.randint(0, 65535)
1066         # Add an ACL
1067         rules = []
1068         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1069                                       self.proto[self.IP][self.UDP]))
1070         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1071                                       self.proto[self.IP][self.UDP]))
1072         # deny ip any any in the end
1073         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1074                                       self.PORTS_ALL, 0))
1075         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1076                                       self.PORTS_ALL, 0))
1077
1078         # Apply rules
1079         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1080
1081         # Traffic should not pass
1082         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1083                                    self.proto[self.IP][self.UDP], port, True)
1084
1085         self.logger.info("ACLP_TEST_FINISH_0021")
1086
1087     def test_0022_zero_length_udp_ipv4(self):
1088         """ VPP-687 zero length udp ipv4 packet"""
1089         self.logger.info("ACLP_TEST_START_0022")
1090
1091         port = random.randint(0, 65535)
1092         # Add an ACL
1093         rules = []
1094         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1095                                       self.proto[self.IP][self.UDP]))
1096         # deny ip any any in the end
1097         rules.append(
1098             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1099
1100         # Apply rules
1101         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1102
1103         # Traffic should still pass
1104         # Create incoming packet streams for packet-generator interfaces
1105         pkts_cnt = 0
1106         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1107                                   self.IP, self.IPV4,
1108                                   self.proto[self.IP][self.UDP], port,
1109                                   False, False)
1110         if len(pkts) > 0:
1111             self.pg0.add_stream(pkts)
1112             pkts_cnt += len(pkts)
1113
1114         # Enable packet capture and start packet sendingself.IPV
1115         self.pg_enable_capture(self.pg_interfaces)
1116         self.pg_start()
1117
1118         self.pg1.get_capture(pkts_cnt)
1119
1120         self.logger.info("ACLP_TEST_FINISH_0022")
1121
1122     def test_0023_zero_length_udp_ipv6(self):
1123         """ VPP-687 zero length udp ipv6 packet"""
1124         self.logger.info("ACLP_TEST_START_0023")
1125
1126         port = random.randint(0, 65535)
1127         # Add an ACL
1128         rules = []
1129         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1130                                       self.proto[self.IP][self.UDP]))
1131         # deny ip any any in the end
1132         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1133
1134         # Apply rules
1135         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1136
1137         # Traffic should still pass
1138         # Create incoming packet streams for packet-generator interfaces
1139         pkts_cnt = 0
1140         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1141                                   self.IP, self.IPV6,
1142                                   self.proto[self.IP][self.UDP], port,
1143                                   False, False)
1144         if len(pkts) > 0:
1145             self.pg0.add_stream(pkts)
1146             pkts_cnt += len(pkts)
1147
1148         # Enable packet capture and start packet sendingself.IPV
1149         self.pg_enable_capture(self.pg_interfaces)
1150         self.pg_start()
1151
1152         # Verify outgoing packet streams per packet-generator interface
1153         self.pg1.get_capture(pkts_cnt)
1154
1155         self.logger.info("ACLP_TEST_FINISH_0023")
1156
1157     def test_0108_tcp_permit_v4(self):
1158         """ permit TCPv4 + non-match range
1159         """
1160         self.logger.info("ACLP_TEST_START_0108")
1161
1162         # Add an ACL
1163         rules = []
1164         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1165                      self.proto[self.IP][self.TCP]))
1166         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1167                      self.proto[self.IP][self.TCP]))
1168         # deny ip any any in the end
1169         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1170
1171         # Apply rules
1172         self.apply_rules(rules, "permit ipv4 tcp")
1173
1174         # Traffic should still pass
1175         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1176
1177         self.logger.info("ACLP_TEST_FINISH_0108")
1178
1179     def test_0109_tcp_permit_v6(self):
1180         """ permit TCPv6 + non-match range
1181         """
1182         self.logger.info("ACLP_TEST_START_0109")
1183
1184         # Add an ACL
1185         rules = []
1186         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1187                                       self.proto[self.IP][self.TCP]))
1188         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1189                                       self.proto[self.IP][self.TCP]))
1190         # deny ip any any in the end
1191         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1192
1193         # Apply rules
1194         self.apply_rules(rules, "permit ip6 tcp")
1195
1196         # Traffic should still pass
1197         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1198
1199         self.logger.info("ACLP_TEST_FINISH_0109")
1200
1201     def test_0110_udp_permit_v4(self):
1202         """ permit UDPv4 + non-match range
1203         """
1204         self.logger.info("ACLP_TEST_START_0110")
1205
1206         # Add an ACL
1207         rules = []
1208         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1209                                       self.proto[self.IP][self.UDP]))
1210         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1211                                       self.proto[self.IP][self.UDP]))
1212         # deny ip any any in the end
1213         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1214
1215         # Apply rules
1216         self.apply_rules(rules, "permit ipv4 udp")
1217
1218         # Traffic should still pass
1219         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1220
1221         self.logger.info("ACLP_TEST_FINISH_0110")
1222
1223     def test_0111_udp_permit_v6(self):
1224         """ permit UDPv6 + non-match range
1225         """
1226         self.logger.info("ACLP_TEST_START_0111")
1227
1228         # Add an ACL
1229         rules = []
1230         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1231                                       self.proto[self.IP][self.UDP]))
1232         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1233                                       self.proto[self.IP][self.UDP]))
1234         # deny ip any any in the end
1235         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1236
1237         # Apply rules
1238         self.apply_rules(rules, "permit ip6 udp")
1239
1240         # Traffic should still pass
1241         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1242
1243         self.logger.info("ACLP_TEST_FINISH_0111")
1244
1245     def test_0112_tcp_deny(self):
1246         """ deny TCPv4/v6 + non-match range
1247         """
1248         self.logger.info("ACLP_TEST_START_0112")
1249
1250         # Add an ACL
1251         rules = []
1252         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1253                                       self.PORTS_RANGE_2,
1254                                       self.proto[self.IP][self.TCP]))
1255         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1256                                       self.PORTS_RANGE_2,
1257                                       self.proto[self.IP][self.TCP]))
1258         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1259                                       self.proto[self.IP][self.TCP]))
1260         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1261                                       self.proto[self.IP][self.TCP]))
1262         # permit ip any any in the end
1263         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1264                                       self.PORTS_ALL, 0))
1265         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1266                                       self.PORTS_ALL, 0))
1267
1268         # Apply rules
1269         self.apply_rules(rules, "deny ip4/ip6 tcp")
1270
1271         # Traffic should not pass
1272         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1273                                    self.proto[self.IP][self.TCP])
1274
1275         self.logger.info("ACLP_TEST_FINISH_0112")
1276
1277     def test_0113_udp_deny(self):
1278         """ deny UDPv4/v6 + non-match range
1279         """
1280         self.logger.info("ACLP_TEST_START_0113")
1281
1282         # Add an ACL
1283         rules = []
1284         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1285                                       self.PORTS_RANGE_2,
1286                                       self.proto[self.IP][self.UDP]))
1287         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1288                                       self.PORTS_RANGE_2,
1289                                       self.proto[self.IP][self.UDP]))
1290         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1291                                       self.proto[self.IP][self.UDP]))
1292         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1293                                       self.proto[self.IP][self.UDP]))
1294         # permit ip any any in the end
1295         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1296                                       self.PORTS_ALL, 0))
1297         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1298                                       self.PORTS_ALL, 0))
1299
1300         # Apply rules
1301         self.apply_rules(rules, "deny ip4/ip6 udp")
1302
1303         # Traffic should not pass
1304         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1305                                    self.proto[self.IP][self.UDP])
1306
1307         self.logger.info("ACLP_TEST_FINISH_0113")
1308
1309
# When executed as a script, run the tests through VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)