ACL plugin rejects ICMP messages (VPP-624)
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from framework import VppTestCase, VppTestRunner
13 from util import Host, ppp
14
15
class TestACLplugin(VppTestCase):
    """ ACL plugin Test Case """

    # traffic types; also used as the first index into ``proto`` below
    IP = 0
    ICMP = 1

    # IP version; IPRANDOM makes create_stream() pick IPv4/IPv6 per packet
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types (value of the 'is_permit' field built by create_rule())
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[IP] == [TCP, UDP] and proto[ICMP] == [ICMPv4, ICMPv6]; the
    # second index is one of the ICMPv4/ICMPv6/TCP/UDP constants below.
    proto = [[6, 17], [1, 58]]
    # protocol number -> name of the scapy layer class used for that protocol
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    PROTO_ALL = 0

    # port ranges
    # PORTS_ALL / PORTS_RANGE are selector values for create_rule(); the
    # *_from/*_to pairs are the ranges used for rules and generated traffic.
    PORTS_ALL = -1
    PORTS_RANGE = 0
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # ICMP type/code put into generated packets and matched by ICMP rules
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # Test variables
    # bridge domain that all pg interfaces are attached to in setUpClass()
    bd_id = 1
60
61     @classmethod
62     def setUpClass(cls):
63         """
64         Perform standard class setup (defined by class method setUpClass in
65         class VppTestCase) before running the test case, set test case related
66         variables and configure VPP.
67         """
68         super(TestACLplugin, cls).setUpClass()
69
70         random.seed()
71
72         try:
73             # Create 2 pg interfaces
74             cls.create_pg_interfaces(range(2))
75
76             # Packet flows mapping pg0 -> pg1, pg2 etc.
77             cls.flows = dict()
78             cls.flows[cls.pg0] = [cls.pg1]
79
80             # Packet sizes
81             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
82
83             # Create BD with MAC learning and unknown unicast flooding disabled
84             # and put interfaces to this BD
85             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
86                                            learn=1)
87             for pg_if in cls.pg_interfaces:
88                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
89                                                     bd_id=cls.bd_id)
90
91             # Set up all interfaces
92             for i in cls.pg_interfaces:
93                 i.admin_up()
94
95             # Mapping between packet-generator index and lists of test hosts
96             cls.hosts_by_pg_idx = dict()
97             for pg_if in cls.pg_interfaces:
98                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
99
100             # Create list of deleted hosts
101             cls.deleted_hosts_by_pg_idx = dict()
102             for pg_if in cls.pg_interfaces:
103                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
104
105             # warm-up the mac address tables
106             # self.warmup_test()
107
108         except Exception:
109             super(TestACLplugin, cls).tearDownClass()
110             raise
111
    def setUp(self):
        """
        Run the base-class per-test setup and discard packet infos recorded
        by a previous test.
        """
        super(TestACLplugin, self).setUp()
        self.reset_packet_infos()
115
116     def tearDown(self):
117         """
118         Show various debug prints after each test.
119         """
120         super(TestACLplugin, self).tearDown()
121         if not self.vpp_dead:
122             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
123             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
124                                              % self.bd_id))
125
126     def create_hosts(self, count, start=0):
127         """
128         Create required number of host MAC addresses and distribute them among
129         interfaces. Create host IPv4 address for every host MAC address.
130
131         :param int count: Number of hosts to create MAC/IPv4 addresses for.
132         :param int start: Number to start numbering from.
133         """
134         n_int = len(self.pg_interfaces)
135         macs_per_if = count / n_int
136         i = -1
137         for pg_if in self.pg_interfaces:
138             i += 1
139             start_nr = macs_per_if * i + start
140             end_nr = count + start if i == (n_int - 1) \
141                 else macs_per_if * (i + 1) + start
142             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
143             for j in range(start_nr, end_nr):
144                 host = Host(
145                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
146                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
147                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
148                 hosts.append(host)
149
150     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
151                     s_prefix=0, s_ip='\x00\x00\x00\x00',
152                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
153         if proto == -1:
154             return
155         if ports == self.PORTS_ALL:
156             sport_from = 0
157             dport_from = 0
158             sport_to = 65535 if proto != 1 and proto != 58 else 255
159             dport_to = sport_to
160         elif ports == self.PORTS_RANGE:
161             if proto == 1:
162                 sport_from = self.icmp4_type
163                 sport_to = self.icmp4_type
164                 dport_from = self.icmp4_code
165                 dport_to = self.icmp4_code
166             elif proto == 58:
167                 sport_from = self.icmp6_type
168                 sport_to = self.icmp6_type
169                 dport_from = self.icmp6_code
170                 dport_to = self.icmp6_code
171             elif proto == self.proto[self.IP][self.TCP]:
172                 sport_from = self.tcp_sport_from
173                 sport_to = self.tcp_sport_to
174                 dport_from = self.tcp_dport_from
175                 dport_to = self.tcp_dport_to
176             elif proto == self.proto[self.IP][self.UDP]:
177                 sport_from = self.udp_sport_from
178                 sport_to = self.udp_sport_to
179                 dport_from = self.udp_dport_from
180                 dport_to = self.udp_dport_to
181         else:
182             sport_from = ports
183             sport_to = ports
184             dport_from = ports
185             dport_to = ports
186
187         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
188                  'srcport_or_icmptype_first': sport_from,
189                  'srcport_or_icmptype_last': sport_to,
190                  'src_ip_prefix_len': s_prefix,
191                  'src_ip_addr': s_ip,
192                  'dstport_or_icmpcode_first': dport_from,
193                  'dstport_or_icmpcode_last': dport_to,
194                  'dst_ip_prefix_len': d_prefix,
195                  'dst_ip_addr': d_ip})
196         return rule
197
198     def apply_rules(self, rules, tag=''):
199         reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
200                                          count=len(rules),
201                                          tag=tag)
202         self.logger.info("Dumped ACL: " + str(
203             self.api_acl_dump(reply.acl_index)))
204         # Apply a ACL on the interface as inbound
205         for i in self.pg_interfaces:
206             self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
207                                                 count=1, n_input=1,
208                                                 acls=[reply.acl_index])
209         return
210
211     def create_upper_layer(self, packet_index, proto, ports=0):
212         p = self.proto_map[proto]
213         if p == 'UDP':
214             if ports == 0:
215                 return UDP(sport=random.randint(self.udp_sport_from,
216                                                 self.udp_sport_to),
217                            dport=random.randint(self.udp_dport_from,
218                                                 self.udp_dport_to))
219             else:
220                 return UDP(sport=ports, dport=ports)
221         elif p == 'TCP':
222             if ports == 0:
223                 return TCP(sport=random.randint(self.tcp_sport_from,
224                                                 self.tcp_sport_to),
225                            dport=random.randint(self.tcp_dport_from,
226                                                 self.tcp_dport_to))
227             else:
228                 return TCP(sport=ports, dport=ports)
229         return ''
230
231     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
232                       proto=-1, ports=0):
233         """
234         Create input packet stream for defined interface using hosts or
235         deleted_hosts list.
236
237         :param object src_if: Interface to create packet stream for.
238         :param list packet_sizes: List of required packet sizes.
239         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
240         :return: Stream of packets.
241         """
242         pkts = []
243         if self.flows.__contains__(src_if):
244             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
245             for dst_if in self.flows[src_if]:
246                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
247                 n_int = len(dst_hosts) * len(src_hosts)
248                 for i in range(0, n_int):
249                     dst_host = dst_hosts[i / len(src_hosts)]
250                     src_host = src_hosts[i % len(src_hosts)]
251                     pkt_info = self.create_packet_info(src_if, dst_if)
252                     if ipv6 == 1:
253                         pkt_info.ip = 1
254                     elif ipv6 == 0:
255                         pkt_info.ip = 0
256                     else:
257                         pkt_info.ip = random.choice([0, 1])
258                     if proto == -1:
259                         pkt_info.proto = random.choice(self.proto[self.IP])
260                     else:
261                         pkt_info.proto = proto
262                     payload = self.info_to_payload(pkt_info)
263                     p = Ether(dst=dst_host.mac, src=src_host.mac)
264                     if pkt_info.ip:
265                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
266                     else:
267                         p /= IP(src=src_host.ip4, dst=dst_host.ip4)
268                     if traffic_type == self.ICMP:
269                         if pkt_info.ip:
270                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
271                                                    code=self.icmp6_code)
272                         else:
273                             p /= ICMP(type=self.icmp4_type,
274                                       code=self.icmp4_code)
275                     else:
276                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
277                     p /= Raw(payload)
278                     pkt_info.data = p.copy()
279                     size = random.choice(packet_sizes)
280                     self.extend_packet(p, size)
281                     pkts.append(p)
282         return pkts
283
    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
        """
        Verify captured input packet stream for defined interface.

        For ICMP traffic only the ICMP type and code of every packet are
        checked. For other traffic every packet is matched, in order,
        against the packet infos recorded when the stream was created, and
        its IP addresses and TCP/UDP ports are compared with the saved
        packet.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: ICMP (1) selects the ICMP checks, any other
            value selects the TCP/UDP checks.
        :param ip_type: IP version the packets are expected to have; the
            version itself is only asserted when this is non-zero.
        """
        # Last matched packet info per source interface index
        last_info = dict()
        for i in self.pg_interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(
                        packet[ICMPv6EchoRequest].data)
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(str(packet[Raw]))
                    payload = packet[self.proto_map[payload_info.proto]]
            except:
                # Log the offending packet, then let the error propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

            # NOTE(review): with ip_type == IPRANDOM (-1) this would compare
            # payload_info.ip against -1; no visible caller of this method
            # passes IPRANDOM - confirm before doing so.
            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)
            if traffic_type == self.ICMP:
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet "
                                          "(outside network):", packet))
                    raise
            else:
                try:
                    ip_version = IPv6 if payload_info.ip == 1 else IP

                    ip = packet[ip_version]
                    packet_index = payload_info.index

                    self.assertEqual(payload_info.dst, dst_sw_if_index)
                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                      (pg_if.name, payload_info.src,
                                       packet_index))
                    # Advance to the next recorded packet info for this
                    # source/destination pair; it must be the one carried
                    # in the payload.
                    next_info = self.get_next_packet_info_for_interface2(
                        payload_info.src, dst_sw_if_index,
                        last_info[payload_info.src])
                    last_info[payload_info.src] = next_info
                    self.assertTrue(next_info is not None)
                    self.assertEqual(packet_index, next_info.index)
                    saved_packet = next_info.data
                    # Check standard fields
                    self.assertEqual(ip.src, saved_packet[ip_version].src)
                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                    p = self.proto_map[payload_info.proto]
                    if p == 'TCP':
                        tcp = packet[TCP]
                        self.assertEqual(tcp.sport, saved_packet[
                            TCP].sport)
                        self.assertEqual(tcp.dport, saved_packet[
                            TCP].dport)
                    elif p == 'UDP':
                        udp = packet[UDP]
                        self.assertEqual(udp.sport, saved_packet[
                            UDP].sport)
                        self.assertEqual(udp.dport, saved_packet[
                            UDP].dport)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet:",
                                          packet))
                    raise
        # Every recorded packet must have been seen.
        # NOTE(review): the first argument here is the interface object,
        # while the call above passes a source interface index
        # (payload_info.src). If the framework helper compares indices, this
        # check can never find a remaining packet - confirm against the
        # framework. Also note last_info is never advanced on the ICMP path.
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))
370
371     def run_traffic_no_check(self):
372         # Test
373         # Create incoming packet streams for packet-generator interfaces
374         for i in self.pg_interfaces:
375             if self.flows.__contains__(i):
376                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
377                 if len(pkts) > 0:
378                     i.add_stream(pkts)
379
380         # Enable packet capture and start packet sending
381         self.pg_enable_capture(self.pg_interfaces)
382         self.pg_start()
383
384     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0):
385         # Test
386         # Create incoming packet streams for packet-generator interfaces
387         pkts_cnt = 0
388         for i in self.pg_interfaces:
389             if self.flows.__contains__(i):
390                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
391                                           traffic_type, ip_type, proto, ports)
392                 if len(pkts) > 0:
393                     i.add_stream(pkts)
394                     pkts_cnt += len(pkts)
395
396         # Enable packet capture and start packet sendingself.IPV
397         self.pg_enable_capture(self.pg_interfaces)
398         self.pg_start()
399
400         # Verify
401         # Verify outgoing packet streams per packet-generator interface
402         for src_if in self.pg_interfaces:
403             if self.flows.__contains__(src_if):
404                 for dst_if in self.flows[src_if]:
405                     capture = dst_if.get_capture(pkts_cnt)
406                     self.logger.info("Verifying capture on interface %s" %
407                                      dst_if.name)
408                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
409
410     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
411                               ports=0):
412         # Test
413         self.reset_packet_infos()
414         for i in self.pg_interfaces:
415             if self.flows.__contains__(i):
416                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
417                                           traffic_type, ip_type, proto, ports)
418                 if len(pkts) > 0:
419                     i.add_stream(pkts)
420
421         # Enable packet capture and start packet sending
422         self.pg_enable_capture(self.pg_interfaces)
423         self.pg_start()
424
425         # Verify
426         # Verify outgoing packet streams per packet-generator interface
427         for src_if in self.pg_interfaces:
428             if self.flows.__contains__(src_if):
429                 for dst_if in self.flows[src_if]:
430                     self.logger.info("Verifying capture on interface %s" %
431                                      dst_if.name)
432                     capture = dst_if.get_capture(0)
433                     self.assertEqual(len(capture), 0)
434
435     def api_acl_add_replace(self, acl_index, r, count, tag='',
436                             expected_retval=0):
437         """Add/replace an ACL
438
439         :param int acl_index: ACL index to replace,
440         4294967295 to create new ACL.
441         :param acl_rule r: ACL rules array.
442         :param str tag: symbolic tag (description) for this ACL.
443         :param int count: number of rules.
444         """
445         return self.vapi.api(self.vapi.papi.acl_add_replace,
446                              {'acl_index': acl_index,
447                               'r': r,
448                               'count': count,
449                               'tag': tag},
450                              expected_retval=expected_retval)
451
452     def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
453                                        expected_retval=0):
454         return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
455                              {'sw_if_index': sw_if_index,
456                               'count': count,
457                               'n_input': n_input,
458                               'acls': acls},
459                              expected_retval=expected_retval)
460
461     def api_acl_dump(self, acl_index, expected_retval=0):
462         return self.vapi.api(self.vapi.papi.acl_dump,
463                              {'acl_index': acl_index},
464                              expected_retval=expected_retval)
465
466     def test_0000_warmup_test(self):
467         """ ACL plugin version check; learn MACs
468         """
469         self.create_hosts(16)
470         self.run_traffic_no_check()
471         reply = self.vapi.papi.acl_plugin_get_version()
472         self.assertEqual(reply.major, 1)
473         self.logger.info("Working with ACL plugin version: %d.%d" % (
474             reply.major, reply.minor))
475         # minor version changes are non breaking
476         # self.assertEqual(reply.minor, 0)
477
478     def test_0001_acl_create(self):
479         """ ACL create test
480         """
481
482         self.logger.info("ACLP_TEST_START_0001")
483         # Add an ACL
484         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
485               'srcport_or_icmptype_first': 1234,
486               'srcport_or_icmptype_last': 1235,
487               'src_ip_prefix_len': 0,
488               'src_ip_addr': '\x00\x00\x00\x00',
489               'dstport_or_icmpcode_first': 1234,
490               'dstport_or_icmpcode_last': 1234,
491               'dst_ip_addr': '\x00\x00\x00\x00',
492               'dst_ip_prefix_len': 0}]
493         # Test 1: add a new ACL
494         reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
495                                          count=len(r), tag="permit 1234")
496         self.assertEqual(reply.retval, 0)
497         # The very first ACL gets #0
498         self.assertEqual(reply.acl_index, 0)
499         rr = self.api_acl_dump(reply.acl_index)
500         self.logger.info("Dumped ACL: " + str(rr))
501         self.assertEqual(len(rr), 1)
502         # We should have the same number of ACL entries as we had asked
503         self.assertEqual(len(rr[0].r), len(r))
504         # The rules should be the same. But because the submitted and returned
505         # are different types, we need to iterate over rules and keys to get
506         # to basic values.
507         for i_rule in range(0, len(r) - 1):
508             for rule_key in r[i_rule]:
509                 self.assertEqual(rr[0].r[i_rule][rule_key],
510                                  r[i_rule][rule_key])
511
512         # Add a deny-1234 ACL
513         r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
514                    'srcport_or_icmptype_first': 1234,
515                    'srcport_or_icmptype_last': 1235,
516                    'src_ip_prefix_len': 0,
517                    'src_ip_addr': '\x00\x00\x00\x00',
518                    'dstport_or_icmpcode_first': 1234,
519                    'dstport_or_icmpcode_last': 1234,
520                    'dst_ip_addr': '\x00\x00\x00\x00',
521                    'dst_ip_prefix_len': 0},
522                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
523                    'srcport_or_icmptype_first': 0,
524                    'srcport_or_icmptype_last': 0,
525                    'src_ip_prefix_len': 0,
526                    'src_ip_addr': '\x00\x00\x00\x00',
527                    'dstport_or_icmpcode_first': 0,
528                    'dstport_or_icmpcode_last': 0,
529                    'dst_ip_addr': '\x00\x00\x00\x00',
530                    'dst_ip_prefix_len': 0})
531
532         reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
533                                          count=len(r_deny),
534                                          tag="deny 1234;permit all")
535         self.assertEqual(reply.retval, 0)
536         # The second ACL gets #1
537         self.assertEqual(reply.acl_index, 1)
538
539         # Test 2: try to modify a nonexistent ACL
540         reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
541                                          tag="FFFF:FFFF", expected_retval=-1)
542         self.assertEqual(reply.retval, -1)
543         # The ACL number should pass through
544         self.assertEqual(reply.acl_index, 432)
545
546         self.logger.info("ACLP_TEST_FINISH_0001")
547
548     def test_0002_acl_permit_apply(self):
549         """ permit ACL apply test
550         """
551         self.logger.info("ACLP_TEST_START_0002")
552
553         rules = []
554         rules.append(self.create_rule(self.IPV4, self.PERMIT,
555                      0, self.proto[self.IP][self.UDP]))
556         rules.append(self.create_rule(self.IPV4, self.PERMIT,
557                      0, self.proto[self.IP][self.TCP]))
558
559         # Apply rules
560         self.apply_rules(rules, "permit per-flow")
561
562         # Traffic should still pass
563         self.run_verify_test(self.IP, self.IPV4, -1)
564         self.logger.info("ACLP_TEST_FINISH_0002")
565
566     def test_0003_acl_deny_apply(self):
567         """ deny ACL apply test
568         """
569         self.logger.info("ACLP_TEST_START_0003")
570         # Add a deny-flows ACL
571         rules = []
572         rules.append(self.create_rule(self.IPV4, self.DENY,
573                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
574         # Permit ip any any in the end
575         rules.append(self.create_rule(self.IPV4, self.PERMIT,
576                                       self.PORTS_ALL, 0))
577
578         # Apply rules
579         self.apply_rules(rules, "deny per-flow;permit all")
580
581         # Traffic should not pass
582         self.run_verify_negat_test(self.IP, self.IPV4,
583                                    self.proto[self.IP][self.UDP])
584         self.logger.info("ACLP_TEST_FINISH_0003")
585         # self.assertEqual(1, 0)
586
587     def test_0004_vpp624_permit_icmpv4(self):
588         """ VPP_624 permit ICMPv4
589         """
590         self.logger.info("ACLP_TEST_START_0004")
591
592         # Add an ACL
593         rules = []
594         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
595                                       self.proto[self.ICMP][self.ICMPv4]))
596         # deny ip any any in the end
597         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
598
599         # Apply rules
600         self.apply_rules(rules, "permit icmpv4")
601
602         # Traffic should still pass
603         self.run_verify_test(self.ICMP, self.IPV4,
604                              self.proto[self.ICMP][self.ICMPv4])
605
606         self.logger.info("ACLP_TEST_FINISH_0004")
607
608     def test_0005_vpp624_permit_icmpv6(self):
609         """ VPP_624 permit ICMPv6
610         """
611         self.logger.info("ACLP_TEST_START_0005")
612
613         # Add an ACL
614         rules = []
615         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
616                                       self.proto[self.ICMP][self.ICMPv6]))
617         # deny ip any any in the end
618         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
619
620         # Apply rules
621         self.apply_rules(rules, "permit icmpv6")
622
623         # Traffic should still pass
624         self.run_verify_test(self.ICMP, self.IPV6,
625                              self.proto[self.ICMP][self.ICMPv6])
626
627         self.logger.info("ACLP_TEST_FINISH_0005")
628
629     def test_0006_vpp624_deny_icmpv4(self):
630         """ VPP_624 deny ICMPv4
631         """
632         self.logger.info("ACLP_TEST_START_0006")
633         # Add an ACL
634         rules = []
635         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
636                                       self.proto[self.ICMP][self.ICMPv4]))
637         # permit ip any any in the end
638         rules.append(self.create_rule(self.IPV4, self.PERMIT,
639                                       self.PORTS_ALL, 0))
640
641         # Apply rules
642         self.apply_rules(rules, "deny icmpv4")
643
644         # Traffic should not pass
645         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
646
647         self.logger.info("ACLP_TEST_FINISH_0006")
648
649     def test_0007_vpp624_deny_icmpv6(self):
650         """ VPP_624 deny ICMPv6
651         """
652         self.logger.info("ACLP_TEST_START_0007")
653         # Add an ACL
654         rules = []
655         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
656                                       self.proto[self.ICMP][self.ICMPv6]))
657         # deny ip any any in the end
658         rules.append(self.create_rule(self.IPV6, self.PERMIT,
659                                       self.PORTS_ALL, 0))
660
661         # Apply rules
662         self.apply_rules(rules, "deny icmpv6")
663
664         # Traffic should not pass
665         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
666
667         self.logger.info("ACLP_TEST_FINISH_0007")
668
669     def test_0008_tcp_permit_v4(self):
670         """ permit TCPv4
671         """
672         self.logger.info("ACLP_TEST_START_0008")
673
674         # Add an ACL
675         rules = []
676         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
677                      self.proto[self.IP][self.TCP]))
678         # deny ip any any in the end
679         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
680
681         # Apply rules
682         self.apply_rules(rules, "permit ipv4 tcp")
683
684         # Traffic should still pass
685         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
686
687         self.logger.info("ACLP_TEST_FINISH_0008")
688
689     def test_0009_tcp_permit_v6(self):
690         """ permit TCPv6
691         """
692         self.logger.info("ACLP_TEST_START_0009")
693
694         # Add an ACL
695         rules = []
696         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
697                                       self.proto[self.IP][self.TCP]))
698         # deny ip any any in the end
699         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
700
701         # Apply rules
702         self.apply_rules(rules, "permit ip6 tcp")
703
704         # Traffic should still pass
705         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
706
707         self.logger.info("ACLP_TEST_FINISH_0008")
708
709     def test_0010_udp_permit_v4(self):
710         """ permit UDPv4
711         """
712         self.logger.info("ACLP_TEST_START_0010")
713
714         # Add an ACL
715         rules = []
716         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
717                                       self.proto[self.IP][self.UDP]))
718         # deny ip any any in the end
719         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
720
721         # Apply rules
722         self.apply_rules(rules, "permit ipv udp")
723
724         # Traffic should still pass
725         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
726
727         self.logger.info("ACLP_TEST_FINISH_0010")
728
729     def test_0011_udp_permit_v6(self):
730         """ permit UDPv6
731         """
732         self.logger.info("ACLP_TEST_START_0011")
733
734         # Add an ACL
735         rules = []
736         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
737                                       self.proto[self.IP][self.UDP]))
738         # deny ip any any in the end
739         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
740
741         # Apply rules
742         self.apply_rules(rules, "permit ip6 udp")
743
744         # Traffic should still pass
745         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
746
747         self.logger.info("ACLP_TEST_FINISH_0011")
748
749     def test_0012_tcp_deny(self):
750         """ deny TCPv4/v6
751         """
752         self.logger.info("ACLP_TEST_START_0012")
753
754         # Add an ACL
755         rules = []
756         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
757                                       self.proto[self.IP][self.TCP]))
758         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
759                                       self.proto[self.IP][self.TCP]))
760         # permit ip any any in the end
761         rules.append(self.create_rule(self.IPV4, self.PERMIT,
762                                       self.PORTS_ALL, 0))
763         rules.append(self.create_rule(self.IPV6, self.PERMIT,
764                                       self.PORTS_ALL, 0))
765
766         # Apply rules
767         self.apply_rules(rules, "deny ip4/ip6 tcp")
768
769         # Traffic should not pass
770         self.run_verify_negat_test(self.IP, self.IPRANDOM,
771                                    self.proto[self.IP][self.TCP])
772
773         self.logger.info("ACLP_TEST_FINISH_0012")
774
775     def test_0013_udp_deny(self):
776         """ deny UDPv4/v6
777         """
778         self.logger.info("ACLP_TEST_START_0013")
779
780         # Add an ACL
781         rules = []
782         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
783                                       self.proto[self.IP][self.UDP]))
784         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
785                                       self.proto[self.IP][self.UDP]))
786         # permit ip any any in the end
787         rules.append(self.create_rule(self.IPV4, self.PERMIT,
788                                       self.PORTS_ALL, 0))
789         rules.append(self.create_rule(self.IPV6, self.PERMIT,
790                                       self.PORTS_ALL, 0))
791
792         # Apply rules
793         self.apply_rules(rules, "deny ip4/ip6 udp")
794
795         # Traffic should not pass
796         self.run_verify_negat_test(self.IP, self.IPRANDOM,
797                                    self.proto[self.IP][self.UDP])
798
799         self.logger.info("ACLP_TEST_FINISH_0013")
800
801     def test_0014_acl_dump(self):
802         """ verify add/dump acls
803         """
804         self.logger.info("ACLP_TEST_START_0014")
805
806         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
807              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
808              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
809              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
810              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
811              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
812              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
813              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
814              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
815              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
816              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
817              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
818              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
819              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
820              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
821              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
822              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
823              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
824              ]
825
826         # Add and verify new ACLs
827         rules = []
828         for i in range(len(r)):
829             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
830
831         reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
832                                          count=len(rules))
833         result = self.api_acl_dump(reply.acl_index)
834
835         i = 0
836         for drules in result:
837             for dr in drules.r:
838                 self.assertEqual(dr.is_ipv6, r[i][0])
839                 self.assertEqual(dr.is_permit, r[i][1])
840                 self.assertEqual(dr.proto, r[i][3])
841
842                 if r[i][2] > 0:
843                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
844                 else:
845                     if r[i][2] < 0:
846                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
847                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
848                     else:
849                         if dr.proto == self.proto[self.IP][self.TCP]:
850                             self.assertGreater(dr.srcport_or_icmptype_first,
851                                                self.tcp_sport_from-1)
852                             self.assertLess(dr.srcport_or_icmptype_first,
853                                             self.tcp_sport_to+1)
854                             self.assertGreater(dr.dstport_or_icmpcode_last,
855                                                self.tcp_dport_from-1)
856                             self.assertLess(dr.dstport_or_icmpcode_last,
857                                             self.tcp_dport_to+1)
858                         elif dr.proto == self.proto[self.IP][self.UDP]:
859                             self.assertGreater(dr.srcport_or_icmptype_first,
860                                                self.udp_sport_from-1)
861                             self.assertLess(dr.srcport_or_icmptype_first,
862                                             self.udp_sport_to+1)
863                             self.assertGreater(dr.dstport_or_icmpcode_last,
864                                                self.udp_dport_from-1)
865                             self.assertLess(dr.dstport_or_icmpcode_last,
866                                             self.udp_dport_to+1)
867                 i += 1
868
869         self.logger.info("ACLP_TEST_FINISH_0014")
870
871     def test_0015_tcp_permit_port_v4(self):
872         """ permit single TCPv4
873         """
874         self.logger.info("ACLP_TEST_START_0015")
875
876         port = random.randint(0, 65535)
877         # Add an ACL
878         rules = []
879         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
880                                       self.proto[self.IP][self.TCP]))
881         # deny ip any any in the end
882         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
883
884         # Apply rules
885         self.apply_rules(rules, "permit ip4 tcp "+str(port))
886
887         # Traffic should still pass
888         self.run_verify_test(self.IP, self.IPV4,
889                              self.proto[self.IP][self.TCP], port)
890
891         self.logger.info("ACLP_TEST_FINISH_0015")
892
893     def test_0016_udp_permit_port_v4(self):
894         """ permit single UDPv4
895         """
896         self.logger.info("ACLP_TEST_START_0016")
897
898         port = random.randint(0, 65535)
899         # Add an ACL
900         rules = []
901         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
902                                       self.proto[self.IP][self.UDP]))
903         # deny ip any any in the end
904         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
905
906         # Apply rules
907         self.apply_rules(rules, "permit ip4 tcp "+str(port))
908
909         # Traffic should still pass
910         self.run_verify_test(self.IP, self.IPV4,
911                              self.proto[self.IP][self.UDP], port)
912
913         self.logger.info("ACLP_TEST_FINISH_0016")
914
915     def test_0017_tcp_permit_port_v6(self):
916         """ permit single TCPv6
917         """
918         self.logger.info("ACLP_TEST_START_0017")
919
920         port = random.randint(0, 65535)
921         # Add an ACL
922         rules = []
923         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
924                                       self.proto[self.IP][self.TCP]))
925         # deny ip any any in the end
926         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
927
928         # Apply rules
929         self.apply_rules(rules, "permit ip4 tcp "+str(port))
930
931         # Traffic should still pass
932         self.run_verify_test(self.IP, self.IPV6,
933                              self.proto[self.IP][self.TCP], port)
934
935         self.logger.info("ACLP_TEST_FINISH_0017")
936
937     def test_0018_udp_permit_port_v6(self):
938         """ permit single UPPv6
939         """
940         self.logger.info("ACLP_TEST_START_0018")
941
942         port = random.randint(0, 65535)
943         # Add an ACL
944         rules = []
945         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
946                                       self.proto[self.IP][self.UDP]))
947         # deny ip any any in the end
948         rules.append(self.create_rule(self.IPV6, self.DENY,
949                                       self.PORTS_ALL, 0))
950
951         # Apply rules
952         self.apply_rules(rules, "permit ip4 tcp "+str(port))
953
954         # Traffic should still pass
955         self.run_verify_test(self.IP, self.IPV6,
956                              self.proto[self.IP][self.UDP], port)
957
958         self.logger.info("ACLP_TEST_FINISH_0018")
959
960     def test_0019_udp_deny_port(self):
961         """ deny single TCPv4/v6
962         """
963         self.logger.info("ACLP_TEST_START_0019")
964
965         port = random.randint(0, 65535)
966         # Add an ACL
967         rules = []
968         rules.append(self.create_rule(self.IPV4, self.DENY, port,
969                                       self.proto[self.IP][self.TCP]))
970         rules.append(self.create_rule(self.IPV6, self.DENY, port,
971                                       self.proto[self.IP][self.TCP]))
972         # Permit ip any any in the end
973         rules.append(self.create_rule(self.IPV4, self.PERMIT,
974                                       self.PORTS_ALL, 0))
975         rules.append(self.create_rule(self.IPV6, self.PERMIT,
976                                       self.PORTS_ALL, 0))
977
978         # Apply rules
979         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
980
981         # Traffic should not pass
982         self.run_verify_negat_test(self.IP, self.IPRANDOM,
983                                    self.proto[self.IP][self.TCP], port)
984
985         self.logger.info("ACLP_TEST_FINISH_0019")
986
987     def test_0020_udp_deny_port(self):
988         """ deny single UDPv4/v6
989         """
990         self.logger.info("ACLP_TEST_START_0020")
991
992         port = random.randint(0, 65535)
993         # Add an ACL
994         rules = []
995         rules.append(self.create_rule(self.IPV4, self.DENY, port,
996                                       self.proto[self.IP][self.UDP]))
997         rules.append(self.create_rule(self.IPV6, self.DENY, port,
998                                       self.proto[self.IP][self.UDP]))
999         # Permit ip any any in the end
1000         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1001                                       self.PORTS_ALL, 0))
1002         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1003                                       self.PORTS_ALL, 0))
1004
1005         # Apply rules
1006         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1007
1008         # Traffic should not pass
1009         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1010                                    self.proto[self.IP][self.UDP], port)
1011
1012         self.logger.info("ACLP_TEST_FINISH_0020")
1013
# Script entry point: hand the test cases in this module to unittest,
# using the VPP test runner imported from the framework module.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)