b051d457824f71b9c756a1cc3d20a1696b30f342
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16
class TestACLplugin(VppTestCase):
    """ ACL plugin Test Case """

    # traffic types; also used as the first index into ``proto`` below
    IP = 0
    ICMP = 1

    # IP version
    # IPRANDOM makes create_stream() pick IPv4 or IPv6 at random per packet
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types (value of the 'is_permit' field built by create_rule())
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[traffic type][index] holds IP protocol numbers:
    #   proto[IP]   = [TCP (6), UDP (17)]      indexed by TCP / UDP
    #   proto[ICMP] = [ICMP (1), ICMPv6 (58)]  indexed by ICMPv4 / ICMPv6
    proto = [[6, 17], [1, 58]]
    # protocol number -> name of the scapy layer used for that protocol
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    # NOTE(review): not referenced in the visible part of the file; the tests
    # pass a literal 0 as the protocol of their "ip any any" rules
    PROTO_ALL = 0

    # port ranges
    # PORTS_ALL / PORTS_RANGE are the special values of create_rule()'s
    # ``ports`` argument; the *_from / *_to pairs are the ranges used both by
    # PORTS_RANGE rules and by randomly generated TCP/UDP packets
    PORTS_ALL = -1
    PORTS_RANGE = 0
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # ICMP type/code used for generated ICMP packets and PORTS_RANGE rules
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # Test variables
    # ID of the bridge domain that all pg interfaces are put into
    bd_id = 1
61
62     @classmethod
63     def setUpClass(cls):
64         """
65         Perform standard class setup (defined by class method setUpClass in
66         class VppTestCase) before running the test case, set test case related
67         variables and configure VPP.
68         """
69         super(TestACLplugin, cls).setUpClass()
70
71         random.seed()
72
73         try:
74             # Create 2 pg interfaces
75             cls.create_pg_interfaces(range(2))
76
77             # Packet flows mapping pg0 -> pg1, pg2 etc.
78             cls.flows = dict()
79             cls.flows[cls.pg0] = [cls.pg1]
80
81             # Packet sizes
82             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
83
84             # Create BD with MAC learning and unknown unicast flooding disabled
85             # and put interfaces to this BD
86             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
87                                            learn=1)
88             for pg_if in cls.pg_interfaces:
89                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
90                                                     bd_id=cls.bd_id)
91
92             # Set up all interfaces
93             for i in cls.pg_interfaces:
94                 i.admin_up()
95
96             # Mapping between packet-generator index and lists of test hosts
97             cls.hosts_by_pg_idx = dict()
98             for pg_if in cls.pg_interfaces:
99                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
100
101             # Create list of deleted hosts
102             cls.deleted_hosts_by_pg_idx = dict()
103             for pg_if in cls.pg_interfaces:
104                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
105
106             # warm-up the mac address tables
107             # self.warmup_test()
108
109         except Exception:
110             super(TestACLplugin, cls).tearDownClass()
111             raise
112
    def setUp(self):
        """
        Run the base-class per-test setup and clear the packet infos recorded
        by earlier tests.
        """
        super(TestACLplugin, self).setUp()
        self.reset_packet_infos()
116
117     def tearDown(self):
118         """
119         Show various debug prints after each test.
120         """
121         super(TestACLplugin, self).tearDown()
122         if not self.vpp_dead:
123             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
124             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
125                                              % self.bd_id))
126
127     def create_hosts(self, count, start=0):
128         """
129         Create required number of host MAC addresses and distribute them among
130         interfaces. Create host IPv4 address for every host MAC address.
131
132         :param int count: Number of hosts to create MAC/IPv4 addresses for.
133         :param int start: Number to start numbering from.
134         """
135         n_int = len(self.pg_interfaces)
136         macs_per_if = count / n_int
137         i = -1
138         for pg_if in self.pg_interfaces:
139             i += 1
140             start_nr = macs_per_if * i + start
141             end_nr = count + start if i == (n_int - 1) \
142                 else macs_per_if * (i + 1) + start
143             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
144             for j in range(start_nr, end_nr):
145                 host = Host(
146                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
147                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
148                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
149                 hosts.append(host)
150
151     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
152                     s_prefix=0, s_ip='\x00\x00\x00\x00',
153                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
154         if proto == -1:
155             return
156         if ports == self.PORTS_ALL:
157             sport_from = 0
158             dport_from = 0
159             sport_to = 65535 if proto != 1 and proto != 58 else 255
160             dport_to = sport_to
161         elif ports == self.PORTS_RANGE:
162             if proto == 1:
163                 sport_from = self.icmp4_type
164                 sport_to = self.icmp4_type
165                 dport_from = self.icmp4_code
166                 dport_to = self.icmp4_code
167             elif proto == 58:
168                 sport_from = self.icmp6_type
169                 sport_to = self.icmp6_type
170                 dport_from = self.icmp6_code
171                 dport_to = self.icmp6_code
172             elif proto == self.proto[self.IP][self.TCP]:
173                 sport_from = self.tcp_sport_from
174                 sport_to = self.tcp_sport_to
175                 dport_from = self.tcp_dport_from
176                 dport_to = self.tcp_dport_to
177             elif proto == self.proto[self.IP][self.UDP]:
178                 sport_from = self.udp_sport_from
179                 sport_to = self.udp_sport_to
180                 dport_from = self.udp_dport_from
181                 dport_to = self.udp_dport_to
182         else:
183             sport_from = ports
184             sport_to = ports
185             dport_from = ports
186             dport_to = ports
187
188         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
189                  'srcport_or_icmptype_first': sport_from,
190                  'srcport_or_icmptype_last': sport_to,
191                  'src_ip_prefix_len': s_prefix,
192                  'src_ip_addr': s_ip,
193                  'dstport_or_icmpcode_first': dport_from,
194                  'dstport_or_icmpcode_last': dport_to,
195                  'dst_ip_prefix_len': d_prefix,
196                  'dst_ip_addr': d_ip})
197         return rule
198
199     def apply_rules(self, rules, tag=''):
200         reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
201                                          count=len(rules),
202                                          tag=tag)
203         self.logger.info("Dumped ACL: " + str(
204             self.api_acl_dump(reply.acl_index)))
205         # Apply a ACL on the interface as inbound
206         for i in self.pg_interfaces:
207             self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
208                                                 count=1, n_input=1,
209                                                 acls=[reply.acl_index])
210         return
211
212     def create_upper_layer(self, packet_index, proto, ports=0):
213         p = self.proto_map[proto]
214         if p == 'UDP':
215             if ports == 0:
216                 return UDP(sport=random.randint(self.udp_sport_from,
217                                                 self.udp_sport_to),
218                            dport=random.randint(self.udp_dport_from,
219                                                 self.udp_dport_to))
220             else:
221                 return UDP(sport=ports, dport=ports)
222         elif p == 'TCP':
223             if ports == 0:
224                 return TCP(sport=random.randint(self.tcp_sport_from,
225                                                 self.tcp_sport_to),
226                            dport=random.randint(self.tcp_dport_from,
227                                                 self.tcp_dport_to))
228             else:
229                 return TCP(sport=ports, dport=ports)
230         return ''
231
232     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
233                       proto=-1, ports=0, fragments=False):
234         """
235         Create input packet stream for defined interface using hosts or
236         deleted_hosts list.
237
238         :param object src_if: Interface to create packet stream for.
239         :param list packet_sizes: List of required packet sizes.
240         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
241         :return: Stream of packets.
242         """
243         pkts = []
244         if self.flows.__contains__(src_if):
245             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
246             for dst_if in self.flows[src_if]:
247                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
248                 n_int = len(dst_hosts) * len(src_hosts)
249                 for i in range(0, n_int):
250                     dst_host = dst_hosts[i / len(src_hosts)]
251                     src_host = src_hosts[i % len(src_hosts)]
252                     pkt_info = self.create_packet_info(src_if, dst_if)
253                     if ipv6 == 1:
254                         pkt_info.ip = 1
255                     elif ipv6 == 0:
256                         pkt_info.ip = 0
257                     else:
258                         pkt_info.ip = random.choice([0, 1])
259                     if proto == -1:
260                         pkt_info.proto = random.choice(self.proto[self.IP])
261                     else:
262                         pkt_info.proto = proto
263                     payload = self.info_to_payload(pkt_info)
264                     p = Ether(dst=dst_host.mac, src=src_host.mac)
265                     if pkt_info.ip:
266                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
267                         if fragments:
268                             p /= IPv6ExtHdrFragment(offset=64, m=1)
269                     else:
270                         if fragments:
271                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
272                                     flags=1, frag=64)
273                         else:
274                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
275                     if traffic_type == self.ICMP:
276                         if pkt_info.ip:
277                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
278                                                    code=self.icmp6_code)
279                         else:
280                             p /= ICMP(type=self.icmp4_type,
281                                       code=self.icmp4_code)
282                     else:
283                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
284                     p /= Raw(payload)
285                     pkt_info.data = p.copy()
286                     size = random.choice(packet_sizes)
287                     self.extend_packet(p, size)
288                     pkts.append(p)
289         return pkts
290
    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
        """
        Verify captured input packet stream for defined interface.

        For ICMP traffic only the ICMP type and code of every packet are
        checked. For any other traffic each packet is matched, in order,
        against the packet info saved when the stream was created: the
        destination interface, the packet index, the IP addresses and the
        TCP/UDP ports must all agree.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: self.ICMP for ICMP packets, anything else for
            TCP/UDP traffic.
        :param ip_type: IP version the stream was created with (self.IPV4 or
            self.IPV6).
        """
        # sw_if_index of a source interface -> last packet info matched for
        # it (None until the first packet from that source is seen)
        last_info = dict()
        for i in self.pg_interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(
                        packet[ICMPv6EchoRequest].data)
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(str(packet[Raw]))
                    # proto_map yields the layer name, which is used to look
                    # up the matching layer in the packet
                    payload = packet[self.proto_map[payload_info.proto]]
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

            # NOTE(review): the IP version is only checked when ip_type is
            # non-zero, i.e. never for IPV4; with IPRANDOM (-1) the check
            # would compare payload_info.ip against -1 - confirm intent
            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)
            if traffic_type == self.ICMP:
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet "
                                          "(outside network):", packet))
                    raise
            else:
                try:
                    ip_version = IPv6 if payload_info.ip == 1 else IP

                    ip = packet[ip_version]
                    packet_index = payload_info.index

                    self.assertEqual(payload_info.dst, dst_sw_if_index)
                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                      (pg_if.name, payload_info.src,
                                       packet_index))
                    # Advance to the next packet info expected from this
                    # source towards this destination
                    next_info = self.get_next_packet_info_for_interface2(
                        payload_info.src, dst_sw_if_index,
                        last_info[payload_info.src])
                    last_info[payload_info.src] = next_info
                    self.assertTrue(next_info is not None)
                    self.assertEqual(packet_index, next_info.index)
                    saved_packet = next_info.data
                    # Check standard fields
                    self.assertEqual(ip.src, saved_packet[ip_version].src)
                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                    p = self.proto_map[payload_info.proto]
                    if p == 'TCP':
                        tcp = packet[TCP]
                        self.assertEqual(tcp.sport, saved_packet[
                            TCP].sport)
                        self.assertEqual(tcp.dport, saved_packet[
                            TCP].dport)
                    elif p == 'UDP':
                        udp = packet[UDP]
                        self.assertEqual(udp.sport, saved_packet[
                            UDP].sport)
                        self.assertEqual(udp.dport, saved_packet[
                            UDP].dport)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet:",
                                          packet))
                    raise
        # Every expected packet must have been consumed above.
        # NOTE(review): this call passes the interface object ``i`` as the
        # source, whereas the call inside the loop passes payload_info.src
        # (used as a sw_if_index key); if the helper compares indices this
        # check may never find a remaining packet. The ICMP branch also
        # leaves last_info untouched - verify before changing either.
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))
377
378     def run_traffic_no_check(self):
379         # Test
380         # Create incoming packet streams for packet-generator interfaces
381         for i in self.pg_interfaces:
382             if self.flows.__contains__(i):
383                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
384                 if len(pkts) > 0:
385                     i.add_stream(pkts)
386
387         # Enable packet capture and start packet sending
388         self.pg_enable_capture(self.pg_interfaces)
389         self.pg_start()
390
391     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
392                         frags=False):
393         # Test
394         # Create incoming packet streams for packet-generator interfaces
395         pkts_cnt = 0
396         for i in self.pg_interfaces:
397             if self.flows.__contains__(i):
398                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
399                                           traffic_type, ip_type, proto, ports,
400                                           frags)
401                 if len(pkts) > 0:
402                     i.add_stream(pkts)
403                     pkts_cnt += len(pkts)
404
405         # Enable packet capture and start packet sendingself.IPV
406         self.pg_enable_capture(self.pg_interfaces)
407         self.pg_start()
408
409         # Verify
410         # Verify outgoing packet streams per packet-generator interface
411         for src_if in self.pg_interfaces:
412             if self.flows.__contains__(src_if):
413                 for dst_if in self.flows[src_if]:
414                     capture = dst_if.get_capture(pkts_cnt)
415                     self.logger.info("Verifying capture on interface %s" %
416                                      dst_if.name)
417                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
418
419     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
420                               ports=0, frags=False):
421         # Test
422         self.reset_packet_infos()
423         for i in self.pg_interfaces:
424             if self.flows.__contains__(i):
425                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
426                                           traffic_type, ip_type, proto, ports,
427                                           frags)
428                 if len(pkts) > 0:
429                     i.add_stream(pkts)
430
431         # Enable packet capture and start packet sending
432         self.pg_enable_capture(self.pg_interfaces)
433         self.pg_start()
434
435         # Verify
436         # Verify outgoing packet streams per packet-generator interface
437         for src_if in self.pg_interfaces:
438             if self.flows.__contains__(src_if):
439                 for dst_if in self.flows[src_if]:
440                     self.logger.info("Verifying capture on interface %s" %
441                                      dst_if.name)
442                     capture = dst_if.get_capture(0)
443                     self.assertEqual(len(capture), 0)
444
    def api_acl_add_replace(self, acl_index, r, count, tag='',
                            expected_retval=0):
        """Add/replace an ACL

        Thin wrapper that issues acl_add_replace through self.vapi.api.

        :param int acl_index: ACL index to replace,
        4294967295 to create new ACL.
        :param acl_rule r: ACL rules array.
        :param str tag: symbolic tag (description) for this ACL.
        :param int count: number of rules.
        :param int expected_retval: forwarded to self.vapi.api; presumably
        the retval the call is expected to return - confirm in the framework.
        :return: whatever self.vapi.api returns (callers read ``retval`` and
        ``acl_index`` from it).
        """
        return self.vapi.api(self.vapi.papi.acl_add_replace,
                             {'acl_index': acl_index,
                              'r': r,
                              'count': count,
                              'tag': tag},
                             expected_retval=expected_retval)
461
    def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
                                       expected_retval=0):
        """Set the list of ACLs applied to an interface

        Thin wrapper that issues acl_interface_set_acl_list through
        self.vapi.api.

        :param sw_if_index: index of the interface to configure.
        :param count: number of entries in ``acls``.
        :param n_input: presumably how many leading entries of ``acls`` are
        input (inbound) ACLs - confirm against the ACL plugin API.
        :param acls: list of ACL indices.
        :param int expected_retval: forwarded to self.vapi.api.
        :return: whatever self.vapi.api returns.
        """
        return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
                             {'sw_if_index': sw_if_index,
                              'count': count,
                              'n_input': n_input,
                              'acls': acls},
                             expected_retval=expected_retval)
470
    def api_acl_dump(self, acl_index, expected_retval=0):
        """Dump an ACL

        Thin wrapper that issues acl_dump through self.vapi.api.

        :param acl_index: index of the ACL to dump.
        :param int expected_retval: forwarded to self.vapi.api.
        :return: whatever self.vapi.api returns (callers treat it as a
        sequence of dumped ACLs, each with an ``r`` rules array).
        """
        return self.vapi.api(self.vapi.papi.acl_dump,
                             {'acl_index': acl_index},
                             expected_retval=expected_retval)
475
476     def test_0000_warmup_test(self):
477         """ ACL plugin version check; learn MACs
478         """
479         self.create_hosts(16)
480         self.run_traffic_no_check()
481         reply = self.vapi.papi.acl_plugin_get_version()
482         self.assertEqual(reply.major, 1)
483         self.logger.info("Working with ACL plugin version: %d.%d" % (
484             reply.major, reply.minor))
485         # minor version changes are non breaking
486         # self.assertEqual(reply.minor, 0)
487
488     def test_0001_acl_create(self):
489         """ ACL create test
490         """
491
492         self.logger.info("ACLP_TEST_START_0001")
493         # Add an ACL
494         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
495               'srcport_or_icmptype_first': 1234,
496               'srcport_or_icmptype_last': 1235,
497               'src_ip_prefix_len': 0,
498               'src_ip_addr': '\x00\x00\x00\x00',
499               'dstport_or_icmpcode_first': 1234,
500               'dstport_or_icmpcode_last': 1234,
501               'dst_ip_addr': '\x00\x00\x00\x00',
502               'dst_ip_prefix_len': 0}]
503         # Test 1: add a new ACL
504         reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
505                                          count=len(r), tag="permit 1234")
506         self.assertEqual(reply.retval, 0)
507         # The very first ACL gets #0
508         self.assertEqual(reply.acl_index, 0)
509         rr = self.api_acl_dump(reply.acl_index)
510         self.logger.info("Dumped ACL: " + str(rr))
511         self.assertEqual(len(rr), 1)
512         # We should have the same number of ACL entries as we had asked
513         self.assertEqual(len(rr[0].r), len(r))
514         # The rules should be the same. But because the submitted and returned
515         # are different types, we need to iterate over rules and keys to get
516         # to basic values.
517         for i_rule in range(0, len(r) - 1):
518             for rule_key in r[i_rule]:
519                 self.assertEqual(rr[0].r[i_rule][rule_key],
520                                  r[i_rule][rule_key])
521
522         # Add a deny-1234 ACL
523         r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
524                    'srcport_or_icmptype_first': 1234,
525                    'srcport_or_icmptype_last': 1235,
526                    'src_ip_prefix_len': 0,
527                    'src_ip_addr': '\x00\x00\x00\x00',
528                    'dstport_or_icmpcode_first': 1234,
529                    'dstport_or_icmpcode_last': 1234,
530                    'dst_ip_addr': '\x00\x00\x00\x00',
531                    'dst_ip_prefix_len': 0},
532                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
533                    'srcport_or_icmptype_first': 0,
534                    'srcport_or_icmptype_last': 0,
535                    'src_ip_prefix_len': 0,
536                    'src_ip_addr': '\x00\x00\x00\x00',
537                    'dstport_or_icmpcode_first': 0,
538                    'dstport_or_icmpcode_last': 0,
539                    'dst_ip_addr': '\x00\x00\x00\x00',
540                    'dst_ip_prefix_len': 0})
541
542         reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
543                                          count=len(r_deny),
544                                          tag="deny 1234;permit all")
545         self.assertEqual(reply.retval, 0)
546         # The second ACL gets #1
547         self.assertEqual(reply.acl_index, 1)
548
549         # Test 2: try to modify a nonexistent ACL
550         reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
551                                          tag="FFFF:FFFF", expected_retval=-1)
552         self.assertEqual(reply.retval, -1)
553         # The ACL number should pass through
554         self.assertEqual(reply.acl_index, 432)
555
556         self.logger.info("ACLP_TEST_FINISH_0001")
557
558     def test_0002_acl_permit_apply(self):
559         """ permit ACL apply test
560         """
561         self.logger.info("ACLP_TEST_START_0002")
562
563         rules = []
564         rules.append(self.create_rule(self.IPV4, self.PERMIT,
565                      0, self.proto[self.IP][self.UDP]))
566         rules.append(self.create_rule(self.IPV4, self.PERMIT,
567                      0, self.proto[self.IP][self.TCP]))
568
569         # Apply rules
570         self.apply_rules(rules, "permit per-flow")
571
572         # Traffic should still pass
573         self.run_verify_test(self.IP, self.IPV4, -1)
574         self.logger.info("ACLP_TEST_FINISH_0002")
575
576     def test_0003_acl_deny_apply(self):
577         """ deny ACL apply test
578         """
579         self.logger.info("ACLP_TEST_START_0003")
580         # Add a deny-flows ACL
581         rules = []
582         rules.append(self.create_rule(self.IPV4, self.DENY,
583                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
584         # Permit ip any any in the end
585         rules.append(self.create_rule(self.IPV4, self.PERMIT,
586                                       self.PORTS_ALL, 0))
587
588         # Apply rules
589         self.apply_rules(rules, "deny per-flow;permit all")
590
591         # Traffic should not pass
592         self.run_verify_negat_test(self.IP, self.IPV4,
593                                    self.proto[self.IP][self.UDP])
594         self.logger.info("ACLP_TEST_FINISH_0003")
595         # self.assertEqual(1, 0)
596
597     def test_0004_vpp624_permit_icmpv4(self):
598         """ VPP_624 permit ICMPv4
599         """
600         self.logger.info("ACLP_TEST_START_0004")
601
602         # Add an ACL
603         rules = []
604         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
605                                       self.proto[self.ICMP][self.ICMPv4]))
606         # deny ip any any in the end
607         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
608
609         # Apply rules
610         self.apply_rules(rules, "permit icmpv4")
611
612         # Traffic should still pass
613         self.run_verify_test(self.ICMP, self.IPV4,
614                              self.proto[self.ICMP][self.ICMPv4])
615
616         self.logger.info("ACLP_TEST_FINISH_0004")
617
618     def test_0005_vpp624_permit_icmpv6(self):
619         """ VPP_624 permit ICMPv6
620         """
621         self.logger.info("ACLP_TEST_START_0005")
622
623         # Add an ACL
624         rules = []
625         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
626                                       self.proto[self.ICMP][self.ICMPv6]))
627         # deny ip any any in the end
628         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
629
630         # Apply rules
631         self.apply_rules(rules, "permit icmpv6")
632
633         # Traffic should still pass
634         self.run_verify_test(self.ICMP, self.IPV6,
635                              self.proto[self.ICMP][self.ICMPv6])
636
637         self.logger.info("ACLP_TEST_FINISH_0005")
638
639     def test_0006_vpp624_deny_icmpv4(self):
640         """ VPP_624 deny ICMPv4
641         """
642         self.logger.info("ACLP_TEST_START_0006")
643         # Add an ACL
644         rules = []
645         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
646                                       self.proto[self.ICMP][self.ICMPv4]))
647         # permit ip any any in the end
648         rules.append(self.create_rule(self.IPV4, self.PERMIT,
649                                       self.PORTS_ALL, 0))
650
651         # Apply rules
652         self.apply_rules(rules, "deny icmpv4")
653
654         # Traffic should not pass
655         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
656
657         self.logger.info("ACLP_TEST_FINISH_0006")
658
659     def test_0007_vpp624_deny_icmpv6(self):
660         """ VPP_624 deny ICMPv6
661         """
662         self.logger.info("ACLP_TEST_START_0007")
663         # Add an ACL
664         rules = []
665         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
666                                       self.proto[self.ICMP][self.ICMPv6]))
667         # deny ip any any in the end
668         rules.append(self.create_rule(self.IPV6, self.PERMIT,
669                                       self.PORTS_ALL, 0))
670
671         # Apply rules
672         self.apply_rules(rules, "deny icmpv6")
673
674         # Traffic should not pass
675         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
676
677         self.logger.info("ACLP_TEST_FINISH_0007")
678
679     def test_0008_tcp_permit_v4(self):
680         """ permit TCPv4
681         """
682         self.logger.info("ACLP_TEST_START_0008")
683
684         # Add an ACL
685         rules = []
686         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
687                      self.proto[self.IP][self.TCP]))
688         # deny ip any any in the end
689         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
690
691         # Apply rules
692         self.apply_rules(rules, "permit ipv4 tcp")
693
694         # Traffic should still pass
695         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
696
697         self.logger.info("ACLP_TEST_FINISH_0008")
698
699     def test_0009_tcp_permit_v6(self):
700         """ permit TCPv6
701         """
702         self.logger.info("ACLP_TEST_START_0009")
703
704         # Add an ACL
705         rules = []
706         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
707                                       self.proto[self.IP][self.TCP]))
708         # deny ip any any in the end
709         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
710
711         # Apply rules
712         self.apply_rules(rules, "permit ip6 tcp")
713
714         # Traffic should still pass
715         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
716
717         self.logger.info("ACLP_TEST_FINISH_0008")
718
719     def test_0010_udp_permit_v4(self):
720         """ permit UDPv4
721         """
722         self.logger.info("ACLP_TEST_START_0010")
723
724         # Add an ACL
725         rules = []
726         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
727                                       self.proto[self.IP][self.UDP]))
728         # deny ip any any in the end
729         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
730
731         # Apply rules
732         self.apply_rules(rules, "permit ipv udp")
733
734         # Traffic should still pass
735         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
736
737         self.logger.info("ACLP_TEST_FINISH_0010")
738
739     def test_0011_udp_permit_v6(self):
740         """ permit UDPv6
741         """
742         self.logger.info("ACLP_TEST_START_0011")
743
744         # Add an ACL
745         rules = []
746         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
747                                       self.proto[self.IP][self.UDP]))
748         # deny ip any any in the end
749         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
750
751         # Apply rules
752         self.apply_rules(rules, "permit ip6 udp")
753
754         # Traffic should still pass
755         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
756
757         self.logger.info("ACLP_TEST_FINISH_0011")
758
759     def test_0012_tcp_deny(self):
760         """ deny TCPv4/v6
761         """
762         self.logger.info("ACLP_TEST_START_0012")
763
764         # Add an ACL
765         rules = []
766         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
767                                       self.proto[self.IP][self.TCP]))
768         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
769                                       self.proto[self.IP][self.TCP]))
770         # permit ip any any in the end
771         rules.append(self.create_rule(self.IPV4, self.PERMIT,
772                                       self.PORTS_ALL, 0))
773         rules.append(self.create_rule(self.IPV6, self.PERMIT,
774                                       self.PORTS_ALL, 0))
775
776         # Apply rules
777         self.apply_rules(rules, "deny ip4/ip6 tcp")
778
779         # Traffic should not pass
780         self.run_verify_negat_test(self.IP, self.IPRANDOM,
781                                    self.proto[self.IP][self.TCP])
782
783         self.logger.info("ACLP_TEST_FINISH_0012")
784
785     def test_0013_udp_deny(self):
786         """ deny UDPv4/v6
787         """
788         self.logger.info("ACLP_TEST_START_0013")
789
790         # Add an ACL
791         rules = []
792         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
793                                       self.proto[self.IP][self.UDP]))
794         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
795                                       self.proto[self.IP][self.UDP]))
796         # permit ip any any in the end
797         rules.append(self.create_rule(self.IPV4, self.PERMIT,
798                                       self.PORTS_ALL, 0))
799         rules.append(self.create_rule(self.IPV6, self.PERMIT,
800                                       self.PORTS_ALL, 0))
801
802         # Apply rules
803         self.apply_rules(rules, "deny ip4/ip6 udp")
804
805         # Traffic should not pass
806         self.run_verify_negat_test(self.IP, self.IPRANDOM,
807                                    self.proto[self.IP][self.UDP])
808
809         self.logger.info("ACLP_TEST_FINISH_0013")
810
811     def test_0014_acl_dump(self):
812         """ verify add/dump acls
813         """
814         self.logger.info("ACLP_TEST_START_0014")
815
816         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
817              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
818              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
819              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
820              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
821              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
822              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
823              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
824              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
825              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
826              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
827              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
828              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
829              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
830              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
831              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
832              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
833              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
834              ]
835
836         # Add and verify new ACLs
837         rules = []
838         for i in range(len(r)):
839             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
840
841         reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
842                                          count=len(rules))
843         result = self.api_acl_dump(reply.acl_index)
844
845         i = 0
846         for drules in result:
847             for dr in drules.r:
848                 self.assertEqual(dr.is_ipv6, r[i][0])
849                 self.assertEqual(dr.is_permit, r[i][1])
850                 self.assertEqual(dr.proto, r[i][3])
851
852                 if r[i][2] > 0:
853                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
854                 else:
855                     if r[i][2] < 0:
856                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
857                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
858                     else:
859                         if dr.proto == self.proto[self.IP][self.TCP]:
860                             self.assertGreater(dr.srcport_or_icmptype_first,
861                                                self.tcp_sport_from-1)
862                             self.assertLess(dr.srcport_or_icmptype_first,
863                                             self.tcp_sport_to+1)
864                             self.assertGreater(dr.dstport_or_icmpcode_last,
865                                                self.tcp_dport_from-1)
866                             self.assertLess(dr.dstport_or_icmpcode_last,
867                                             self.tcp_dport_to+1)
868                         elif dr.proto == self.proto[self.IP][self.UDP]:
869                             self.assertGreater(dr.srcport_or_icmptype_first,
870                                                self.udp_sport_from-1)
871                             self.assertLess(dr.srcport_or_icmptype_first,
872                                             self.udp_sport_to+1)
873                             self.assertGreater(dr.dstport_or_icmpcode_last,
874                                                self.udp_dport_from-1)
875                             self.assertLess(dr.dstport_or_icmpcode_last,
876                                             self.udp_dport_to+1)
877                 i += 1
878
879         self.logger.info("ACLP_TEST_FINISH_0014")
880
881     def test_0015_tcp_permit_port_v4(self):
882         """ permit single TCPv4
883         """
884         self.logger.info("ACLP_TEST_START_0015")
885
886         port = random.randint(0, 65535)
887         # Add an ACL
888         rules = []
889         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
890                                       self.proto[self.IP][self.TCP]))
891         # deny ip any any in the end
892         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
893
894         # Apply rules
895         self.apply_rules(rules, "permit ip4 tcp "+str(port))
896
897         # Traffic should still pass
898         self.run_verify_test(self.IP, self.IPV4,
899                              self.proto[self.IP][self.TCP], port)
900
901         self.logger.info("ACLP_TEST_FINISH_0015")
902
903     def test_0016_udp_permit_port_v4(self):
904         """ permit single UDPv4
905         """
906         self.logger.info("ACLP_TEST_START_0016")
907
908         port = random.randint(0, 65535)
909         # Add an ACL
910         rules = []
911         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
912                                       self.proto[self.IP][self.UDP]))
913         # deny ip any any in the end
914         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
915
916         # Apply rules
917         self.apply_rules(rules, "permit ip4 tcp "+str(port))
918
919         # Traffic should still pass
920         self.run_verify_test(self.IP, self.IPV4,
921                              self.proto[self.IP][self.UDP], port)
922
923         self.logger.info("ACLP_TEST_FINISH_0016")
924
925     def test_0017_tcp_permit_port_v6(self):
926         """ permit single TCPv6
927         """
928         self.logger.info("ACLP_TEST_START_0017")
929
930         port = random.randint(0, 65535)
931         # Add an ACL
932         rules = []
933         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
934                                       self.proto[self.IP][self.TCP]))
935         # deny ip any any in the end
936         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
937
938         # Apply rules
939         self.apply_rules(rules, "permit ip4 tcp "+str(port))
940
941         # Traffic should still pass
942         self.run_verify_test(self.IP, self.IPV6,
943                              self.proto[self.IP][self.TCP], port)
944
945         self.logger.info("ACLP_TEST_FINISH_0017")
946
947     def test_0018_udp_permit_port_v6(self):
948         """ permit single UPPv6
949         """
950         self.logger.info("ACLP_TEST_START_0018")
951
952         port = random.randint(0, 65535)
953         # Add an ACL
954         rules = []
955         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
956                                       self.proto[self.IP][self.UDP]))
957         # deny ip any any in the end
958         rules.append(self.create_rule(self.IPV6, self.DENY,
959                                       self.PORTS_ALL, 0))
960
961         # Apply rules
962         self.apply_rules(rules, "permit ip4 tcp "+str(port))
963
964         # Traffic should still pass
965         self.run_verify_test(self.IP, self.IPV6,
966                              self.proto[self.IP][self.UDP], port)
967
968         self.logger.info("ACLP_TEST_FINISH_0018")
969
970     def test_0019_udp_deny_port(self):
971         """ deny single TCPv4/v6
972         """
973         self.logger.info("ACLP_TEST_START_0019")
974
975         port = random.randint(0, 65535)
976         # Add an ACL
977         rules = []
978         rules.append(self.create_rule(self.IPV4, self.DENY, port,
979                                       self.proto[self.IP][self.TCP]))
980         rules.append(self.create_rule(self.IPV6, self.DENY, port,
981                                       self.proto[self.IP][self.TCP]))
982         # Permit ip any any in the end
983         rules.append(self.create_rule(self.IPV4, self.PERMIT,
984                                       self.PORTS_ALL, 0))
985         rules.append(self.create_rule(self.IPV6, self.PERMIT,
986                                       self.PORTS_ALL, 0))
987
988         # Apply rules
989         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
990
991         # Traffic should not pass
992         self.run_verify_negat_test(self.IP, self.IPRANDOM,
993                                    self.proto[self.IP][self.TCP], port)
994
995         self.logger.info("ACLP_TEST_FINISH_0019")
996
997     def test_0020_udp_deny_port(self):
998         """ deny single UDPv4/v6
999         """
1000         self.logger.info("ACLP_TEST_START_0020")
1001
1002         port = random.randint(0, 65535)
1003         # Add an ACL
1004         rules = []
1005         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1006                                       self.proto[self.IP][self.UDP]))
1007         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1008                                       self.proto[self.IP][self.UDP]))
1009         # Permit ip any any in the end
1010         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1011                                       self.PORTS_ALL, 0))
1012         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1013                                       self.PORTS_ALL, 0))
1014
1015         # Apply rules
1016         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1017
1018         # Traffic should not pass
1019         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1020                                    self.proto[self.IP][self.UDP], port)
1021
1022         self.logger.info("ACLP_TEST_FINISH_0020")
1023
1024     def test_0021_udp_deny_port_verify_fragment_deny(self):
1025         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1026         """
1027         self.logger.info("ACLP_TEST_START_0021")
1028
1029         port = random.randint(0, 65535)
1030         # Add an ACL
1031         rules = []
1032         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1033                                       self.proto[self.IP][self.UDP]))
1034         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1035                                       self.proto[self.IP][self.UDP]))
1036         # deny ip any any in the end
1037         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1038                                       self.PORTS_ALL, 0))
1039         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1040                                       self.PORTS_ALL, 0))
1041
1042         # Apply rules
1043         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1044
1045         # Traffic should not pass
1046         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1047                                    self.proto[self.IP][self.UDP], port, True)
1048
1049         self.logger.info("ACLP_TEST_FINISH_0021")
1050
if __name__ == "__main__":
    # When executed directly, run this module's tests with VppTestRunner.
    unittest.main(testRunner=VppTestRunner)