605efbd4ccbbf94abc9a82263a2be46d26d6b814
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16
17 class TestACLplugin(VppTestCase):
18     """ ACL plugin Test Case """
19
20     # traffic types
21     IP = 0
22     ICMP = 1
23
24     # IP version
25     IPRANDOM = -1
26     IPV4 = 0
27     IPV6 = 1
28
29     # rule types
30     DENY = 0
31     PERMIT = 1
32
33     # supported protocols
34     proto = [[6, 17], [1, 58]]
35     proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
36     ICMPv4 = 0
37     ICMPv6 = 1
38     TCP = 0
39     UDP = 1
40     PROTO_ALL = 0
41
42     # port ranges
43     PORTS_ALL = -1
44     PORTS_RANGE = 0
45     PORTS_RANGE_2 = 1
46     udp_sport_from = 10
47     udp_sport_to = udp_sport_from + 5
48     udp_dport_from = 20000
49     udp_dport_to = udp_dport_from + 5000
50     tcp_sport_from = 30
51     tcp_sport_to = tcp_sport_from + 5
52     tcp_dport_from = 40000
53     tcp_dport_to = tcp_dport_from + 5000
54
55     udp_sport_from_2 = 90
56     udp_sport_to_2 = udp_sport_from_2 + 5
57     udp_dport_from_2 = 30000
58     udp_dport_to_2 = udp_dport_from_2 + 5000
59     tcp_sport_from_2 = 130
60     tcp_sport_to_2 = tcp_sport_from_2 + 5
61     tcp_dport_from_2 = 20000
62     tcp_dport_to_2 = tcp_dport_from_2 + 5000
63
64     icmp4_type = 8  # echo request
65     icmp4_code = 3
66     icmp6_type = 128  # echo request
67     icmp6_code = 3
68
69     icmp4_type_2 = 8
70     icmp4_code_from_2 = 5
71     icmp4_code_to_2 = 20
72     icmp6_type_2 = 128
73     icmp6_code_from_2 = 8
74     icmp6_code_to_2 = 42
75
76     # Test variables
77     bd_id = 1
78
79     @classmethod
80     def setUpClass(cls):
81         """
82         Perform standard class setup (defined by class method setUpClass in
83         class VppTestCase) before running the test case, set test case related
84         variables and configure VPP.
85         """
86         super(TestACLplugin, cls).setUpClass()
87
88         random.seed()
89
90         try:
91             # Create 2 pg interfaces
92             cls.create_pg_interfaces(range(2))
93
94             # Packet flows mapping pg0 -> pg1, pg2 etc.
95             cls.flows = dict()
96             cls.flows[cls.pg0] = [cls.pg1]
97
98             # Packet sizes
99             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101             # Create BD with MAC learning and unknown unicast flooding disabled
102             # and put interfaces to this BD
103             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104                                            learn=1)
105             for pg_if in cls.pg_interfaces:
106                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107                                                     bd_id=cls.bd_id)
108
109             # Set up all interfaces
110             for i in cls.pg_interfaces:
111                 i.admin_up()
112
113             # Mapping between packet-generator index and lists of test hosts
114             cls.hosts_by_pg_idx = dict()
115             for pg_if in cls.pg_interfaces:
116                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118             # Create list of deleted hosts
119             cls.deleted_hosts_by_pg_idx = dict()
120             for pg_if in cls.pg_interfaces:
121                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123             # warm-up the mac address tables
124             # self.warmup_test()
125
126         except Exception:
127             super(TestACLplugin, cls).tearDownClass()
128             raise
129
130     def setUp(self):
131         super(TestACLplugin, self).setUp()
132         self.reset_packet_infos()
133
134     def tearDown(self):
135         """
136         Show various debug prints after each test.
137         """
138         super(TestACLplugin, self).tearDown()
139         if not self.vpp_dead:
140             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
141             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
142             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
143             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
144             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
145                                              % self.bd_id))
146
147     def create_hosts(self, count, start=0):
148         """
149         Create required number of host MAC addresses and distribute them among
150         interfaces. Create host IPv4 address for every host MAC address.
151
152         :param int count: Number of hosts to create MAC/IPv4 addresses for.
153         :param int start: Number to start numbering from.
154         """
155         n_int = len(self.pg_interfaces)
156         macs_per_if = count / n_int
157         i = -1
158         for pg_if in self.pg_interfaces:
159             i += 1
160             start_nr = macs_per_if * i + start
161             end_nr = count + start if i == (n_int - 1) \
162                 else macs_per_if * (i + 1) + start
163             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
164             for j in range(start_nr, end_nr):
165                 host = Host(
166                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
167                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
168                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
169                 hosts.append(host)
170
171     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
172                     s_prefix=0, s_ip='\x00\x00\x00\x00',
173                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
174         if proto == -1:
175             return
176         if ports == self.PORTS_ALL:
177             sport_from = 0
178             dport_from = 0
179             sport_to = 65535 if proto != 1 and proto != 58 else 255
180             dport_to = sport_to
181         elif ports == self.PORTS_RANGE:
182             if proto == 1:
183                 sport_from = self.icmp4_type
184                 sport_to = self.icmp4_type
185                 dport_from = self.icmp4_code
186                 dport_to = self.icmp4_code
187             elif proto == 58:
188                 sport_from = self.icmp6_type
189                 sport_to = self.icmp6_type
190                 dport_from = self.icmp6_code
191                 dport_to = self.icmp6_code
192             elif proto == self.proto[self.IP][self.TCP]:
193                 sport_from = self.tcp_sport_from
194                 sport_to = self.tcp_sport_to
195                 dport_from = self.tcp_dport_from
196                 dport_to = self.tcp_dport_to
197             elif proto == self.proto[self.IP][self.UDP]:
198                 sport_from = self.udp_sport_from
199                 sport_to = self.udp_sport_to
200                 dport_from = self.udp_dport_from
201                 dport_to = self.udp_dport_to
202         elif ports == self.PORTS_RANGE_2:
203             if proto == 1:
204                 sport_from = self.icmp4_type_2
205                 sport_to = self.icmp4_type_2
206                 dport_from = self.icmp4_code_from_2
207                 dport_to = self.icmp4_code_to_2
208             elif proto == 58:
209                 sport_from = self.icmp6_type_2
210                 sport_to = self.icmp6_type_2
211                 dport_from = self.icmp6_code_from_2
212                 dport_to = self.icmp6_code_to_2
213             elif proto == self.proto[self.IP][self.TCP]:
214                 sport_from = self.tcp_sport_from_2
215                 sport_to = self.tcp_sport_to_2
216                 dport_from = self.tcp_dport_from_2
217                 dport_to = self.tcp_dport_to_2
218             elif proto == self.proto[self.IP][self.UDP]:
219                 sport_from = self.udp_sport_from_2
220                 sport_to = self.udp_sport_to_2
221                 dport_from = self.udp_dport_from_2
222                 dport_to = self.udp_dport_to_2
223         else:
224             sport_from = ports
225             sport_to = ports
226             dport_from = ports
227             dport_to = ports
228
229         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
230                  'srcport_or_icmptype_first': sport_from,
231                  'srcport_or_icmptype_last': sport_to,
232                  'src_ip_prefix_len': s_prefix,
233                  'src_ip_addr': s_ip,
234                  'dstport_or_icmpcode_first': dport_from,
235                  'dstport_or_icmpcode_last': dport_to,
236                  'dst_ip_prefix_len': d_prefix,
237                  'dst_ip_addr': d_ip})
238         return rule
239
240     def apply_rules(self, rules, tag=''):
241         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
242                                           tag=tag)
243         self.logger.info("Dumped ACL: " + str(
244             self.vapi.acl_dump(reply.acl_index)))
245         # Apply a ACL on the interface as inbound
246         for i in self.pg_interfaces:
247             self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
248                                                  n_input=1,
249                                                  acls=[reply.acl_index])
250         return
251
252     def create_upper_layer(self, packet_index, proto, ports=0):
253         p = self.proto_map[proto]
254         if p == 'UDP':
255             if ports == 0:
256                 return UDP(sport=random.randint(self.udp_sport_from,
257                                                 self.udp_sport_to),
258                            dport=random.randint(self.udp_dport_from,
259                                                 self.udp_dport_to))
260             else:
261                 return UDP(sport=ports, dport=ports)
262         elif p == 'TCP':
263             if ports == 0:
264                 return TCP(sport=random.randint(self.tcp_sport_from,
265                                                 self.tcp_sport_to),
266                            dport=random.randint(self.tcp_dport_from,
267                                                 self.tcp_dport_to))
268             else:
269                 return TCP(sport=ports, dport=ports)
270         return ''
271
272     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
273                       proto=-1, ports=0, fragments=False, pkt_raw=True):
274         """
275         Create input packet stream for defined interface using hosts or
276         deleted_hosts list.
277
278         :param object src_if: Interface to create packet stream for.
279         :param list packet_sizes: List of required packet sizes.
280         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
281         :return: Stream of packets.
282         """
283         pkts = []
284         if self.flows.__contains__(src_if):
285             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
286             for dst_if in self.flows[src_if]:
287                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
288                 n_int = len(dst_hosts) * len(src_hosts)
289                 for i in range(0, n_int):
290                     dst_host = dst_hosts[i / len(src_hosts)]
291                     src_host = src_hosts[i % len(src_hosts)]
292                     pkt_info = self.create_packet_info(src_if, dst_if)
293                     if ipv6 == 1:
294                         pkt_info.ip = 1
295                     elif ipv6 == 0:
296                         pkt_info.ip = 0
297                     else:
298                         pkt_info.ip = random.choice([0, 1])
299                     if proto == -1:
300                         pkt_info.proto = random.choice(self.proto[self.IP])
301                     else:
302                         pkt_info.proto = proto
303                     payload = self.info_to_payload(pkt_info)
304                     p = Ether(dst=dst_host.mac, src=src_host.mac)
305                     if pkt_info.ip:
306                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
307                         if fragments:
308                             p /= IPv6ExtHdrFragment(offset=64, m=1)
309                     else:
310                         if fragments:
311                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
312                                     flags=1, frag=64)
313                         else:
314                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
315                     if traffic_type == self.ICMP:
316                         if pkt_info.ip:
317                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
318                                                    code=self.icmp6_code)
319                         else:
320                             p /= ICMP(type=self.icmp4_type,
321                                       code=self.icmp4_code)
322                     else:
323                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
324                     if pkt_raw:
325                         p /= Raw(payload)
326                         pkt_info.data = p.copy()
327                     if pkt_raw:
328                         size = random.choice(packet_sizes)
329                         self.extend_packet(p, size)
330                     pkts.append(p)
331         return pkts
332
333     def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
334         """
335         Verify captured input packet stream for defined interface.
336
337         :param object pg_if: Interface to verify captured packet stream for.
338         :param list capture: Captured packet stream.
339         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
340         """
341         last_info = dict()
342         for i in self.pg_interfaces:
343             last_info[i.sw_if_index] = None
344         dst_sw_if_index = pg_if.sw_if_index
345         for packet in capture:
346             try:
347                 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
348                 if traffic_type == self.ICMP and ip_type == self.IPV6:
349                     payload_info = self.payload_to_info(
350                         packet[ICMPv6EchoRequest].data)
351                     payload = packet[ICMPv6EchoRequest]
352                 else:
353                     payload_info = self.payload_to_info(str(packet[Raw]))
354                     payload = packet[self.proto_map[payload_info.proto]]
355             except:
356                 self.logger.error(ppp("Unexpected or invalid packet "
357                                       "(outside network):", packet))
358                 raise
359
360             if ip_type != 0:
361                 self.assertEqual(payload_info.ip, ip_type)
362             if traffic_type == self.ICMP:
363                 try:
364                     if payload_info.ip == 0:
365                         self.assertEqual(payload.type, self.icmp4_type)
366                         self.assertEqual(payload.code, self.icmp4_code)
367                     else:
368                         self.assertEqual(payload.type, self.icmp6_type)
369                         self.assertEqual(payload.code, self.icmp6_code)
370                 except:
371                     self.logger.error(ppp("Unexpected or invalid packet "
372                                           "(outside network):", packet))
373                     raise
374             else:
375                 try:
376                     ip_version = IPv6 if payload_info.ip == 1 else IP
377
378                     ip = packet[ip_version]
379                     packet_index = payload_info.index
380
381                     self.assertEqual(payload_info.dst, dst_sw_if_index)
382                     self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
383                                       (pg_if.name, payload_info.src,
384                                        packet_index))
385                     next_info = self.get_next_packet_info_for_interface2(
386                         payload_info.src, dst_sw_if_index,
387                         last_info[payload_info.src])
388                     last_info[payload_info.src] = next_info
389                     self.assertTrue(next_info is not None)
390                     self.assertEqual(packet_index, next_info.index)
391                     saved_packet = next_info.data
392                     # Check standard fields
393                     self.assertEqual(ip.src, saved_packet[ip_version].src)
394                     self.assertEqual(ip.dst, saved_packet[ip_version].dst)
395                     p = self.proto_map[payload_info.proto]
396                     if p == 'TCP':
397                         tcp = packet[TCP]
398                         self.assertEqual(tcp.sport, saved_packet[
399                             TCP].sport)
400                         self.assertEqual(tcp.dport, saved_packet[
401                             TCP].dport)
402                     elif p == 'UDP':
403                         udp = packet[UDP]
404                         self.assertEqual(udp.sport, saved_packet[
405                             UDP].sport)
406                         self.assertEqual(udp.dport, saved_packet[
407                             UDP].dport)
408                 except:
409                     self.logger.error(ppp("Unexpected or invalid packet:",
410                                           packet))
411                     raise
412         for i in self.pg_interfaces:
413             remaining_packet = self.get_next_packet_info_for_interface2(
414                 i, dst_sw_if_index, last_info[i.sw_if_index])
415             self.assertTrue(
416                 remaining_packet is None,
417                 "Port %u: Packet expected from source %u didn't arrive" %
418                 (dst_sw_if_index, i.sw_if_index))
419
420     def run_traffic_no_check(self):
421         # Test
422         # Create incoming packet streams for packet-generator interfaces
423         for i in self.pg_interfaces:
424             if self.flows.__contains__(i):
425                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
426                 if len(pkts) > 0:
427                     i.add_stream(pkts)
428
429         # Enable packet capture and start packet sending
430         self.pg_enable_capture(self.pg_interfaces)
431         self.pg_start()
432
433     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
434                         frags=False, pkt_raw=True):
435         # Test
436         # Create incoming packet streams for packet-generator interfaces
437         pkts_cnt = 0
438         for i in self.pg_interfaces:
439             if self.flows.__contains__(i):
440                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
441                                           traffic_type, ip_type, proto, ports,
442                                           frags, pkt_raw)
443                 if len(pkts) > 0:
444                     i.add_stream(pkts)
445                     pkts_cnt += len(pkts)
446
447         # Enable packet capture and start packet sendingself.IPV
448         self.pg_enable_capture(self.pg_interfaces)
449         self.pg_start()
450
451         # Verify
452         # Verify outgoing packet streams per packet-generator interface
453         for src_if in self.pg_interfaces:
454             if self.flows.__contains__(src_if):
455                 for dst_if in self.flows[src_if]:
456                     capture = dst_if.get_capture(pkts_cnt)
457                     self.logger.info("Verifying capture on interface %s" %
458                                      dst_if.name)
459                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
460
461     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
462                               ports=0, frags=False):
463         # Test
464         self.reset_packet_infos()
465         for i in self.pg_interfaces:
466             if self.flows.__contains__(i):
467                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
468                                           traffic_type, ip_type, proto, ports,
469                                           frags)
470                 if len(pkts) > 0:
471                     i.add_stream(pkts)
472
473         # Enable packet capture and start packet sending
474         self.pg_enable_capture(self.pg_interfaces)
475         self.pg_start()
476
477         # Verify
478         # Verify outgoing packet streams per packet-generator interface
479         for src_if in self.pg_interfaces:
480             if self.flows.__contains__(src_if):
481                 for dst_if in self.flows[src_if]:
482                     self.logger.info("Verifying capture on interface %s" %
483                                      dst_if.name)
484                     capture = dst_if.get_capture(0)
485                     self.assertEqual(len(capture), 0)
486
487     def test_0000_warmup_test(self):
488         """ ACL plugin version check; learn MACs
489         """
490         self.create_hosts(16)
491         self.run_traffic_no_check()
492         reply = self.vapi.papi.acl_plugin_get_version()
493         self.assertEqual(reply.major, 1)
494         self.logger.info("Working with ACL plugin version: %d.%d" % (
495             reply.major, reply.minor))
496         # minor version changes are non breaking
497         # self.assertEqual(reply.minor, 0)
498
499     def test_0001_acl_create(self):
500         """ ACL create test
501         """
502
503         self.logger.info("ACLP_TEST_START_0001")
504         # Add an ACL
505         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
506               'srcport_or_icmptype_first': 1234,
507               'srcport_or_icmptype_last': 1235,
508               'src_ip_prefix_len': 0,
509               'src_ip_addr': '\x00\x00\x00\x00',
510               'dstport_or_icmpcode_first': 1234,
511               'dstport_or_icmpcode_last': 1234,
512               'dst_ip_addr': '\x00\x00\x00\x00',
513               'dst_ip_prefix_len': 0}]
514         # Test 1: add a new ACL
515         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
516                                           tag="permit 1234")
517         self.assertEqual(reply.retval, 0)
518         # The very first ACL gets #0
519         self.assertEqual(reply.acl_index, 0)
520         rr = self.vapi.acl_dump(reply.acl_index)
521         self.logger.info("Dumped ACL: " + str(rr))
522         self.assertEqual(len(rr), 1)
523         # We should have the same number of ACL entries as we had asked
524         self.assertEqual(len(rr[0].r), len(r))
525         # The rules should be the same. But because the submitted and returned
526         # are different types, we need to iterate over rules and keys to get
527         # to basic values.
528         for i_rule in range(0, len(r) - 1):
529             for rule_key in r[i_rule]:
530                 self.assertEqual(rr[0].r[i_rule][rule_key],
531                                  r[i_rule][rule_key])
532
533         # Add a deny-1234 ACL
534         r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
535                    'srcport_or_icmptype_first': 1234,
536                    'srcport_or_icmptype_last': 1235,
537                    'src_ip_prefix_len': 0,
538                    'src_ip_addr': '\x00\x00\x00\x00',
539                    'dstport_or_icmpcode_first': 1234,
540                    'dstport_or_icmpcode_last': 1234,
541                    'dst_ip_addr': '\x00\x00\x00\x00',
542                    'dst_ip_prefix_len': 0},
543                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
544                    'srcport_or_icmptype_first': 0,
545                    'srcport_or_icmptype_last': 0,
546                    'src_ip_prefix_len': 0,
547                    'src_ip_addr': '\x00\x00\x00\x00',
548                    'dstport_or_icmpcode_first': 0,
549                    'dstport_or_icmpcode_last': 0,
550                    'dst_ip_addr': '\x00\x00\x00\x00',
551                    'dst_ip_prefix_len': 0})
552
553         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
554                                           tag="deny 1234;permit all")
555         self.assertEqual(reply.retval, 0)
556         # The second ACL gets #1
557         self.assertEqual(reply.acl_index, 1)
558
559         # Test 2: try to modify a nonexistent ACL
560         reply = self.vapi.acl_add_replace(acl_index=432, r=r,
561                                           tag="FFFF:FFFF", expected_retval=-1)
562         self.assertEqual(reply.retval, -1)
563         # The ACL number should pass through
564         self.assertEqual(reply.acl_index, 432)
565
566         self.logger.info("ACLP_TEST_FINISH_0001")
567
568     def test_0002_acl_permit_apply(self):
569         """ permit ACL apply test
570         """
571         self.logger.info("ACLP_TEST_START_0002")
572
573         rules = []
574         rules.append(self.create_rule(self.IPV4, self.PERMIT,
575                      0, self.proto[self.IP][self.UDP]))
576         rules.append(self.create_rule(self.IPV4, self.PERMIT,
577                      0, self.proto[self.IP][self.TCP]))
578
579         # Apply rules
580         self.apply_rules(rules, "permit per-flow")
581
582         # Traffic should still pass
583         self.run_verify_test(self.IP, self.IPV4, -1)
584         self.logger.info("ACLP_TEST_FINISH_0002")
585
586     def test_0003_acl_deny_apply(self):
587         """ deny ACL apply test
588         """
589         self.logger.info("ACLP_TEST_START_0003")
590         # Add a deny-flows ACL
591         rules = []
592         rules.append(self.create_rule(self.IPV4, self.DENY,
593                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
594         # Permit ip any any in the end
595         rules.append(self.create_rule(self.IPV4, self.PERMIT,
596                                       self.PORTS_ALL, 0))
597
598         # Apply rules
599         self.apply_rules(rules, "deny per-flow;permit all")
600
601         # Traffic should not pass
602         self.run_verify_negat_test(self.IP, self.IPV4,
603                                    self.proto[self.IP][self.UDP])
604         self.logger.info("ACLP_TEST_FINISH_0003")
605         # self.assertEqual(1, 0)
606
607     def test_0004_vpp624_permit_icmpv4(self):
608         """ VPP_624 permit ICMPv4
609         """
610         self.logger.info("ACLP_TEST_START_0004")
611
612         # Add an ACL
613         rules = []
614         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
615                                       self.proto[self.ICMP][self.ICMPv4]))
616         # deny ip any any in the end
617         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
618
619         # Apply rules
620         self.apply_rules(rules, "permit icmpv4")
621
622         # Traffic should still pass
623         self.run_verify_test(self.ICMP, self.IPV4,
624                              self.proto[self.ICMP][self.ICMPv4])
625
626         self.logger.info("ACLP_TEST_FINISH_0004")
627
628     def test_0005_vpp624_permit_icmpv6(self):
629         """ VPP_624 permit ICMPv6
630         """
631         self.logger.info("ACLP_TEST_START_0005")
632
633         # Add an ACL
634         rules = []
635         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
636                                       self.proto[self.ICMP][self.ICMPv6]))
637         # deny ip any any in the end
638         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
639
640         # Apply rules
641         self.apply_rules(rules, "permit icmpv6")
642
643         # Traffic should still pass
644         self.run_verify_test(self.ICMP, self.IPV6,
645                              self.proto[self.ICMP][self.ICMPv6])
646
647         self.logger.info("ACLP_TEST_FINISH_0005")
648
649     def test_0006_vpp624_deny_icmpv4(self):
650         """ VPP_624 deny ICMPv4
651         """
652         self.logger.info("ACLP_TEST_START_0006")
653         # Add an ACL
654         rules = []
655         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
656                                       self.proto[self.ICMP][self.ICMPv4]))
657         # permit ip any any in the end
658         rules.append(self.create_rule(self.IPV4, self.PERMIT,
659                                       self.PORTS_ALL, 0))
660
661         # Apply rules
662         self.apply_rules(rules, "deny icmpv4")
663
664         # Traffic should not pass
665         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
666
667         self.logger.info("ACLP_TEST_FINISH_0006")
668
669     def test_0007_vpp624_deny_icmpv6(self):
670         """ VPP_624 deny ICMPv6
671         """
672         self.logger.info("ACLP_TEST_START_0007")
673         # Add an ACL
674         rules = []
675         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
676                                       self.proto[self.ICMP][self.ICMPv6]))
677         # deny ip any any in the end
678         rules.append(self.create_rule(self.IPV6, self.PERMIT,
679                                       self.PORTS_ALL, 0))
680
681         # Apply rules
682         self.apply_rules(rules, "deny icmpv6")
683
684         # Traffic should not pass
685         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
686
687         self.logger.info("ACLP_TEST_FINISH_0007")
688
689     def test_0008_tcp_permit_v4(self):
690         """ permit TCPv4
691         """
692         self.logger.info("ACLP_TEST_START_0008")
693
694         # Add an ACL
695         rules = []
696         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
697                      self.proto[self.IP][self.TCP]))
698         # deny ip any any in the end
699         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
700
701         # Apply rules
702         self.apply_rules(rules, "permit ipv4 tcp")
703
704         # Traffic should still pass
705         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
706
707         self.logger.info("ACLP_TEST_FINISH_0008")
708
709     def test_0009_tcp_permit_v6(self):
710         """ permit TCPv6
711         """
712         self.logger.info("ACLP_TEST_START_0009")
713
714         # Add an ACL
715         rules = []
716         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
717                                       self.proto[self.IP][self.TCP]))
718         # deny ip any any in the end
719         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
720
721         # Apply rules
722         self.apply_rules(rules, "permit ip6 tcp")
723
724         # Traffic should still pass
725         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
726
727         self.logger.info("ACLP_TEST_FINISH_0008")
728
729     def test_0010_udp_permit_v4(self):
730         """ permit UDPv4
731         """
732         self.logger.info("ACLP_TEST_START_0010")
733
734         # Add an ACL
735         rules = []
736         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
737                                       self.proto[self.IP][self.UDP]))
738         # deny ip any any in the end
739         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
740
741         # Apply rules
742         self.apply_rules(rules, "permit ipv udp")
743
744         # Traffic should still pass
745         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
746
747         self.logger.info("ACLP_TEST_FINISH_0010")
748
749     def test_0011_udp_permit_v6(self):
750         """ permit UDPv6
751         """
752         self.logger.info("ACLP_TEST_START_0011")
753
754         # Add an ACL
755         rules = []
756         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
757                                       self.proto[self.IP][self.UDP]))
758         # deny ip any any in the end
759         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
760
761         # Apply rules
762         self.apply_rules(rules, "permit ip6 udp")
763
764         # Traffic should still pass
765         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
766
767         self.logger.info("ACLP_TEST_FINISH_0011")
768
769     def test_0012_tcp_deny(self):
770         """ deny TCPv4/v6
771         """
772         self.logger.info("ACLP_TEST_START_0012")
773
774         # Add an ACL
775         rules = []
776         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
777                                       self.proto[self.IP][self.TCP]))
778         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
779                                       self.proto[self.IP][self.TCP]))
780         # permit ip any any in the end
781         rules.append(self.create_rule(self.IPV4, self.PERMIT,
782                                       self.PORTS_ALL, 0))
783         rules.append(self.create_rule(self.IPV6, self.PERMIT,
784                                       self.PORTS_ALL, 0))
785
786         # Apply rules
787         self.apply_rules(rules, "deny ip4/ip6 tcp")
788
789         # Traffic should not pass
790         self.run_verify_negat_test(self.IP, self.IPRANDOM,
791                                    self.proto[self.IP][self.TCP])
792
793         self.logger.info("ACLP_TEST_FINISH_0012")
794
795     def test_0013_udp_deny(self):
796         """ deny UDPv4/v6
797         """
798         self.logger.info("ACLP_TEST_START_0013")
799
800         # Add an ACL
801         rules = []
802         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
803                                       self.proto[self.IP][self.UDP]))
804         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
805                                       self.proto[self.IP][self.UDP]))
806         # permit ip any any in the end
807         rules.append(self.create_rule(self.IPV4, self.PERMIT,
808                                       self.PORTS_ALL, 0))
809         rules.append(self.create_rule(self.IPV6, self.PERMIT,
810                                       self.PORTS_ALL, 0))
811
812         # Apply rules
813         self.apply_rules(rules, "deny ip4/ip6 udp")
814
815         # Traffic should not pass
816         self.run_verify_negat_test(self.IP, self.IPRANDOM,
817                                    self.proto[self.IP][self.UDP])
818
819         self.logger.info("ACLP_TEST_FINISH_0013")
820
821     def test_0014_acl_dump(self):
822         """ verify add/dump acls
823         """
824         self.logger.info("ACLP_TEST_START_0014")
825
826         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
827              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
828              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
829              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
830              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
831              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
832              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
833              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
834              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
835              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
836              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
837              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
838              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
839              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
840              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
841              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
842              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
843              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
844              ]
845
846         # Add and verify new ACLs
847         rules = []
848         for i in range(len(r)):
849             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
850
851         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
852         result = self.vapi.acl_dump(reply.acl_index)
853
854         i = 0
855         for drules in result:
856             for dr in drules.r:
857                 self.assertEqual(dr.is_ipv6, r[i][0])
858                 self.assertEqual(dr.is_permit, r[i][1])
859                 self.assertEqual(dr.proto, r[i][3])
860
861                 if r[i][2] > 0:
862                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
863                 else:
864                     if r[i][2] < 0:
865                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
866                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
867                     else:
868                         if dr.proto == self.proto[self.IP][self.TCP]:
869                             self.assertGreater(dr.srcport_or_icmptype_first,
870                                                self.tcp_sport_from-1)
871                             self.assertLess(dr.srcport_or_icmptype_first,
872                                             self.tcp_sport_to+1)
873                             self.assertGreater(dr.dstport_or_icmpcode_last,
874                                                self.tcp_dport_from-1)
875                             self.assertLess(dr.dstport_or_icmpcode_last,
876                                             self.tcp_dport_to+1)
877                         elif dr.proto == self.proto[self.IP][self.UDP]:
878                             self.assertGreater(dr.srcport_or_icmptype_first,
879                                                self.udp_sport_from-1)
880                             self.assertLess(dr.srcport_or_icmptype_first,
881                                             self.udp_sport_to+1)
882                             self.assertGreater(dr.dstport_or_icmpcode_last,
883                                                self.udp_dport_from-1)
884                             self.assertLess(dr.dstport_or_icmpcode_last,
885                                             self.udp_dport_to+1)
886                 i += 1
887
888         self.logger.info("ACLP_TEST_FINISH_0014")
889
890     def test_0015_tcp_permit_port_v4(self):
891         """ permit single TCPv4
892         """
893         self.logger.info("ACLP_TEST_START_0015")
894
895         port = random.randint(0, 65535)
896         # Add an ACL
897         rules = []
898         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
899                                       self.proto[self.IP][self.TCP]))
900         # deny ip any any in the end
901         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
902
903         # Apply rules
904         self.apply_rules(rules, "permit ip4 tcp "+str(port))
905
906         # Traffic should still pass
907         self.run_verify_test(self.IP, self.IPV4,
908                              self.proto[self.IP][self.TCP], port)
909
910         self.logger.info("ACLP_TEST_FINISH_0015")
911
912     def test_0016_udp_permit_port_v4(self):
913         """ permit single UDPv4
914         """
915         self.logger.info("ACLP_TEST_START_0016")
916
917         port = random.randint(0, 65535)
918         # Add an ACL
919         rules = []
920         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
921                                       self.proto[self.IP][self.UDP]))
922         # deny ip any any in the end
923         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
924
925         # Apply rules
926         self.apply_rules(rules, "permit ip4 tcp "+str(port))
927
928         # Traffic should still pass
929         self.run_verify_test(self.IP, self.IPV4,
930                              self.proto[self.IP][self.UDP], port)
931
932         self.logger.info("ACLP_TEST_FINISH_0016")
933
934     def test_0017_tcp_permit_port_v6(self):
935         """ permit single TCPv6
936         """
937         self.logger.info("ACLP_TEST_START_0017")
938
939         port = random.randint(0, 65535)
940         # Add an ACL
941         rules = []
942         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
943                                       self.proto[self.IP][self.TCP]))
944         # deny ip any any in the end
945         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
946
947         # Apply rules
948         self.apply_rules(rules, "permit ip4 tcp "+str(port))
949
950         # Traffic should still pass
951         self.run_verify_test(self.IP, self.IPV6,
952                              self.proto[self.IP][self.TCP], port)
953
954         self.logger.info("ACLP_TEST_FINISH_0017")
955
956     def test_0018_udp_permit_port_v6(self):
957         """ permit single UPPv6
958         """
959         self.logger.info("ACLP_TEST_START_0018")
960
961         port = random.randint(0, 65535)
962         # Add an ACL
963         rules = []
964         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
965                                       self.proto[self.IP][self.UDP]))
966         # deny ip any any in the end
967         rules.append(self.create_rule(self.IPV6, self.DENY,
968                                       self.PORTS_ALL, 0))
969
970         # Apply rules
971         self.apply_rules(rules, "permit ip4 tcp "+str(port))
972
973         # Traffic should still pass
974         self.run_verify_test(self.IP, self.IPV6,
975                              self.proto[self.IP][self.UDP], port)
976
977         self.logger.info("ACLP_TEST_FINISH_0018")
978
979     def test_0019_udp_deny_port(self):
980         """ deny single TCPv4/v6
981         """
982         self.logger.info("ACLP_TEST_START_0019")
983
984         port = random.randint(0, 65535)
985         # Add an ACL
986         rules = []
987         rules.append(self.create_rule(self.IPV4, self.DENY, port,
988                                       self.proto[self.IP][self.TCP]))
989         rules.append(self.create_rule(self.IPV6, self.DENY, port,
990                                       self.proto[self.IP][self.TCP]))
991         # Permit ip any any in the end
992         rules.append(self.create_rule(self.IPV4, self.PERMIT,
993                                       self.PORTS_ALL, 0))
994         rules.append(self.create_rule(self.IPV6, self.PERMIT,
995                                       self.PORTS_ALL, 0))
996
997         # Apply rules
998         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
999
1000         # Traffic should not pass
1001         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1002                                    self.proto[self.IP][self.TCP], port)
1003
1004         self.logger.info("ACLP_TEST_FINISH_0019")
1005
1006     def test_0020_udp_deny_port(self):
1007         """ deny single UDPv4/v6
1008         """
1009         self.logger.info("ACLP_TEST_START_0020")
1010
1011         port = random.randint(0, 65535)
1012         # Add an ACL
1013         rules = []
1014         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1015                                       self.proto[self.IP][self.UDP]))
1016         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1017                                       self.proto[self.IP][self.UDP]))
1018         # Permit ip any any in the end
1019         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1020                                       self.PORTS_ALL, 0))
1021         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1022                                       self.PORTS_ALL, 0))
1023
1024         # Apply rules
1025         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1026
1027         # Traffic should not pass
1028         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1029                                    self.proto[self.IP][self.UDP], port)
1030
1031         self.logger.info("ACLP_TEST_FINISH_0020")
1032
1033     def test_0021_udp_deny_port_verify_fragment_deny(self):
1034         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1035         """
1036         self.logger.info("ACLP_TEST_START_0021")
1037
1038         port = random.randint(0, 65535)
1039         # Add an ACL
1040         rules = []
1041         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1042                                       self.proto[self.IP][self.UDP]))
1043         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1044                                       self.proto[self.IP][self.UDP]))
1045         # deny ip any any in the end
1046         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1047                                       self.PORTS_ALL, 0))
1048         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1049                                       self.PORTS_ALL, 0))
1050
1051         # Apply rules
1052         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1053
1054         # Traffic should not pass
1055         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1056                                    self.proto[self.IP][self.UDP], port, True)
1057
1058         self.logger.info("ACLP_TEST_FINISH_0021")
1059
1060     def test_0022_zero_length_udp_ipv4(self):
1061         """ VPP-687 zero length udp ipv4 packet"""
1062         self.logger.info("ACLP_TEST_START_0022")
1063
1064         port = random.randint(0, 65535)
1065         # Add an ACL
1066         rules = []
1067         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1068                                       self.proto[self.IP][self.UDP]))
1069         # deny ip any any in the end
1070         rules.append(
1071             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1072
1073         # Apply rules
1074         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1075
1076         # Traffic should still pass
1077         # Create incoming packet streams for packet-generator interfaces
1078         pkts_cnt = 0
1079         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1080                                   self.IP, self.IPV4,
1081                                   self.proto[self.IP][self.UDP], port,
1082                                   False, False)
1083         if len(pkts) > 0:
1084             self.pg0.add_stream(pkts)
1085             pkts_cnt += len(pkts)
1086
1087         # Enable packet capture and start packet sendingself.IPV
1088         self.pg_enable_capture(self.pg_interfaces)
1089         self.pg_start()
1090
1091         self.pg1.get_capture(pkts_cnt)
1092
1093         self.logger.info("ACLP_TEST_FINISH_0022")
1094
1095     def test_0023_zero_length_udp_ipv6(self):
1096         """ VPP-687 zero length udp ipv6 packet"""
1097         self.logger.info("ACLP_TEST_START_0023")
1098
1099         port = random.randint(0, 65535)
1100         # Add an ACL
1101         rules = []
1102         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1103                                       self.proto[self.IP][self.UDP]))
1104         # deny ip any any in the end
1105         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1106
1107         # Apply rules
1108         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1109
1110         # Traffic should still pass
1111         # Create incoming packet streams for packet-generator interfaces
1112         pkts_cnt = 0
1113         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1114                                   self.IP, self.IPV6,
1115                                   self.proto[self.IP][self.UDP], port,
1116                                   False, False)
1117         if len(pkts) > 0:
1118             self.pg0.add_stream(pkts)
1119             pkts_cnt += len(pkts)
1120
1121         # Enable packet capture and start packet sendingself.IPV
1122         self.pg_enable_capture(self.pg_interfaces)
1123         self.pg_start()
1124
1125         # Verify outgoing packet streams per packet-generator interface
1126         self.pg1.get_capture(pkts_cnt)
1127
1128         self.logger.info("ACLP_TEST_FINISH_0023")
1129
1130     def test_0108_tcp_permit_v4(self):
1131         """ permit TCPv4 + non-match range
1132         """
1133         self.logger.info("ACLP_TEST_START_0108")
1134
1135         # Add an ACL
1136         rules = []
1137         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1138                      self.proto[self.IP][self.TCP]))
1139         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1140                      self.proto[self.IP][self.TCP]))
1141         # deny ip any any in the end
1142         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1143
1144         # Apply rules
1145         self.apply_rules(rules, "permit ipv4 tcp")
1146
1147         # Traffic should still pass
1148         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1149
1150         self.logger.info("ACLP_TEST_FINISH_0108")
1151
1152     def test_0109_tcp_permit_v6(self):
1153         """ permit TCPv6 + non-match range
1154         """
1155         self.logger.info("ACLP_TEST_START_0109")
1156
1157         # Add an ACL
1158         rules = []
1159         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1160                                       self.proto[self.IP][self.TCP]))
1161         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1162                                       self.proto[self.IP][self.TCP]))
1163         # deny ip any any in the end
1164         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1165
1166         # Apply rules
1167         self.apply_rules(rules, "permit ip6 tcp")
1168
1169         # Traffic should still pass
1170         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1171
1172         self.logger.info("ACLP_TEST_FINISH_0109")
1173
1174     def test_0110_udp_permit_v4(self):
1175         """ permit UDPv4 + non-match range
1176         """
1177         self.logger.info("ACLP_TEST_START_0110")
1178
1179         # Add an ACL
1180         rules = []
1181         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1182                                       self.proto[self.IP][self.UDP]))
1183         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1184                                       self.proto[self.IP][self.UDP]))
1185         # deny ip any any in the end
1186         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1187
1188         # Apply rules
1189         self.apply_rules(rules, "permit ipv4 udp")
1190
1191         # Traffic should still pass
1192         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1193
1194         self.logger.info("ACLP_TEST_FINISH_0110")
1195
1196     def test_0111_udp_permit_v6(self):
1197         """ permit UDPv6 + non-match range
1198         """
1199         self.logger.info("ACLP_TEST_START_0111")
1200
1201         # Add an ACL
1202         rules = []
1203         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1204                                       self.proto[self.IP][self.UDP]))
1205         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1206                                       self.proto[self.IP][self.UDP]))
1207         # deny ip any any in the end
1208         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1209
1210         # Apply rules
1211         self.apply_rules(rules, "permit ip6 udp")
1212
1213         # Traffic should still pass
1214         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1215
1216         self.logger.info("ACLP_TEST_FINISH_0111")
1217
1218     def test_0112_tcp_deny(self):
1219         """ deny TCPv4/v6 + non-match range
1220         """
1221         self.logger.info("ACLP_TEST_START_0112")
1222
1223         # Add an ACL
1224         rules = []
1225         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1226                                       self.PORTS_RANGE_2,
1227                                       self.proto[self.IP][self.TCP]))
1228         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1229                                       self.PORTS_RANGE_2,
1230                                       self.proto[self.IP][self.TCP]))
1231         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1232                                       self.proto[self.IP][self.TCP]))
1233         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1234                                       self.proto[self.IP][self.TCP]))
1235         # permit ip any any in the end
1236         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1237                                       self.PORTS_ALL, 0))
1238         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1239                                       self.PORTS_ALL, 0))
1240
1241         # Apply rules
1242         self.apply_rules(rules, "deny ip4/ip6 tcp")
1243
1244         # Traffic should not pass
1245         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1246                                    self.proto[self.IP][self.TCP])
1247
1248         self.logger.info("ACLP_TEST_FINISH_0112")
1249
1250     def test_0113_udp_deny(self):
1251         """ deny UDPv4/v6 + non-match range
1252         """
1253         self.logger.info("ACLP_TEST_START_0113")
1254
1255         # Add an ACL
1256         rules = []
1257         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1258                                       self.PORTS_RANGE_2,
1259                                       self.proto[self.IP][self.UDP]))
1260         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1261                                       self.PORTS_RANGE_2,
1262                                       self.proto[self.IP][self.UDP]))
1263         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1264                                       self.proto[self.IP][self.UDP]))
1265         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1266                                       self.proto[self.IP][self.UDP]))
1267         # permit ip any any in the end
1268         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1269                                       self.PORTS_ALL, 0))
1270         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1271                                       self.PORTS_ALL, 0))
1272
1273         # Apply rules
1274         self.apply_rules(rules, "deny ip4/ip6 udp")
1275
1276         # Traffic should not pass
1277         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1278                                    self.proto[self.IP][self.UDP])
1279
1280         self.logger.info("ACLP_TEST_FINISH_0113")
1281
1282
1283 if __name__ == '__main__':
1284     unittest.main(testRunner=VppTestRunner)