acl-plugin: bihash-based ACL lookup
[vpp.git] / test / test_acl_plugin.py
1 #!/usr/bin/env python
2 """ACL plugin Test Case HLD:
3 """
4
5 import unittest
6 import random
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12 from scapy.layers.inet6 import IPv6ExtHdrFragment
13 from framework import VppTestCase, VppTestRunner
14 from util import Host, ppp
15
16
class TestACLplugin(VppTestCase):
    """ ACL plugin Test Case """

    # traffic types: passed as ``traffic_type`` to create_stream() and
    # verify_capture(), and used as the first index into ``proto``
    IP = 0
    ICMP = 1

    # IP version: IPV4/IPV6 are the values used for a rule's 'is_ipv6' and
    # for pkt_info.ip; create_stream() picks the version at random for any
    # other value (such as IPRANDOM)
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types: value of a rule's 'is_permit' field
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[IP] holds the TCP and UDP protocol numbers, proto[ICMP] the
    # ICMPv4 and ICMPv6 ones; the constants below index the inner lists
    proto = [[6, 17], [1, 58]]
    # protocol number -> name; the name is used to choose the scapy layer
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    # NOTE(review): not referenced in the visible code; the tests pass a
    # literal 0 as the protocol of their catch-all rules - confirm intent
    PROTO_ALL = 0

    # port ranges
    # ``ports`` selector of create_rule(): PORTS_ALL matches the whole range,
    # PORTS_RANGE selects the per-protocol ranges defined below
    PORTS_ALL = -1
    PORTS_RANGE = 0
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # ICMP type/code used both in generated packets and in PORTS_RANGE rules
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # Test variables
    # ID of the bridge domain created in setUpClass()
    bd_id = 1
61
62     @classmethod
63     def setUpClass(cls):
64         """
65         Perform standard class setup (defined by class method setUpClass in
66         class VppTestCase) before running the test case, set test case related
67         variables and configure VPP.
68         """
69         super(TestACLplugin, cls).setUpClass()
70
71         random.seed()
72
73         try:
74             # Create 2 pg interfaces
75             cls.create_pg_interfaces(range(2))
76
77             # Packet flows mapping pg0 -> pg1, pg2 etc.
78             cls.flows = dict()
79             cls.flows[cls.pg0] = [cls.pg1]
80
81             # Packet sizes
82             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
83
84             # Create BD with MAC learning and unknown unicast flooding disabled
85             # and put interfaces to this BD
86             cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
87                                            learn=1)
88             for pg_if in cls.pg_interfaces:
89                 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
90                                                     bd_id=cls.bd_id)
91
92             # Set up all interfaces
93             for i in cls.pg_interfaces:
94                 i.admin_up()
95
96             # Mapping between packet-generator index and lists of test hosts
97             cls.hosts_by_pg_idx = dict()
98             for pg_if in cls.pg_interfaces:
99                 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
100
101             # Create list of deleted hosts
102             cls.deleted_hosts_by_pg_idx = dict()
103             for pg_if in cls.pg_interfaces:
104                 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
105
106             # warm-up the mac address tables
107             # self.warmup_test()
108
109         except Exception:
110             super(TestACLplugin, cls).tearDownClass()
111             raise
112
    def setUp(self):
        """Run the base-class setUp(), then call reset_packet_infos()."""
        super(TestACLplugin, self).setUp()
        self.reset_packet_infos()
116
117     def tearDown(self):
118         """
119         Show various debug prints after each test.
120         """
121         super(TestACLplugin, self).tearDown()
122         if not self.vpp_dead:
123             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
124             self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
125             self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
126             self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
127             self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
128                                              % self.bd_id))
129
130     def create_hosts(self, count, start=0):
131         """
132         Create required number of host MAC addresses and distribute them among
133         interfaces. Create host IPv4 address for every host MAC address.
134
135         :param int count: Number of hosts to create MAC/IPv4 addresses for.
136         :param int start: Number to start numbering from.
137         """
138         n_int = len(self.pg_interfaces)
139         macs_per_if = count / n_int
140         i = -1
141         for pg_if in self.pg_interfaces:
142             i += 1
143             start_nr = macs_per_if * i + start
144             end_nr = count + start if i == (n_int - 1) \
145                 else macs_per_if * (i + 1) + start
146             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
147             for j in range(start_nr, end_nr):
148                 host = Host(
149                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
150                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
151                     "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
152                 hosts.append(host)
153
154     def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
155                     s_prefix=0, s_ip='\x00\x00\x00\x00',
156                     d_prefix=0, d_ip='\x00\x00\x00\x00'):
157         if proto == -1:
158             return
159         if ports == self.PORTS_ALL:
160             sport_from = 0
161             dport_from = 0
162             sport_to = 65535 if proto != 1 and proto != 58 else 255
163             dport_to = sport_to
164         elif ports == self.PORTS_RANGE:
165             if proto == 1:
166                 sport_from = self.icmp4_type
167                 sport_to = self.icmp4_type
168                 dport_from = self.icmp4_code
169                 dport_to = self.icmp4_code
170             elif proto == 58:
171                 sport_from = self.icmp6_type
172                 sport_to = self.icmp6_type
173                 dport_from = self.icmp6_code
174                 dport_to = self.icmp6_code
175             elif proto == self.proto[self.IP][self.TCP]:
176                 sport_from = self.tcp_sport_from
177                 sport_to = self.tcp_sport_to
178                 dport_from = self.tcp_dport_from
179                 dport_to = self.tcp_dport_to
180             elif proto == self.proto[self.IP][self.UDP]:
181                 sport_from = self.udp_sport_from
182                 sport_to = self.udp_sport_to
183                 dport_from = self.udp_dport_from
184                 dport_to = self.udp_dport_to
185         else:
186             sport_from = ports
187             sport_to = ports
188             dport_from = ports
189             dport_to = ports
190
191         rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
192                  'srcport_or_icmptype_first': sport_from,
193                  'srcport_or_icmptype_last': sport_to,
194                  'src_ip_prefix_len': s_prefix,
195                  'src_ip_addr': s_ip,
196                  'dstport_or_icmpcode_first': dport_from,
197                  'dstport_or_icmpcode_last': dport_to,
198                  'dst_ip_prefix_len': d_prefix,
199                  'dst_ip_addr': d_ip})
200         return rule
201
202     def apply_rules(self, rules, tag=''):
203         reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
204                                          count=len(rules),
205                                          tag=tag)
206         self.logger.info("Dumped ACL: " + str(
207             self.api_acl_dump(reply.acl_index)))
208         # Apply a ACL on the interface as inbound
209         for i in self.pg_interfaces:
210             self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
211                                                 count=1, n_input=1,
212                                                 acls=[reply.acl_index])
213         return
214
215     def create_upper_layer(self, packet_index, proto, ports=0):
216         p = self.proto_map[proto]
217         if p == 'UDP':
218             if ports == 0:
219                 return UDP(sport=random.randint(self.udp_sport_from,
220                                                 self.udp_sport_to),
221                            dport=random.randint(self.udp_dport_from,
222                                                 self.udp_dport_to))
223             else:
224                 return UDP(sport=ports, dport=ports)
225         elif p == 'TCP':
226             if ports == 0:
227                 return TCP(sport=random.randint(self.tcp_sport_from,
228                                                 self.tcp_sport_to),
229                            dport=random.randint(self.tcp_dport_from,
230                                                 self.tcp_dport_to))
231             else:
232                 return TCP(sport=ports, dport=ports)
233         return ''
234
235     def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
236                       proto=-1, ports=0, fragments=False, pkt_raw=True):
237         """
238         Create input packet stream for defined interface using hosts or
239         deleted_hosts list.
240
241         :param object src_if: Interface to create packet stream for.
242         :param list packet_sizes: List of required packet sizes.
243         :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
244         :return: Stream of packets.
245         """
246         pkts = []
247         if self.flows.__contains__(src_if):
248             src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
249             for dst_if in self.flows[src_if]:
250                 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
251                 n_int = len(dst_hosts) * len(src_hosts)
252                 for i in range(0, n_int):
253                     dst_host = dst_hosts[i / len(src_hosts)]
254                     src_host = src_hosts[i % len(src_hosts)]
255                     pkt_info = self.create_packet_info(src_if, dst_if)
256                     if ipv6 == 1:
257                         pkt_info.ip = 1
258                     elif ipv6 == 0:
259                         pkt_info.ip = 0
260                     else:
261                         pkt_info.ip = random.choice([0, 1])
262                     if proto == -1:
263                         pkt_info.proto = random.choice(self.proto[self.IP])
264                     else:
265                         pkt_info.proto = proto
266                     payload = self.info_to_payload(pkt_info)
267                     p = Ether(dst=dst_host.mac, src=src_host.mac)
268                     if pkt_info.ip:
269                         p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
270                         if fragments:
271                             p /= IPv6ExtHdrFragment(offset=64, m=1)
272                     else:
273                         if fragments:
274                             p /= IP(src=src_host.ip4, dst=dst_host.ip4,
275                                     flags=1, frag=64)
276                         else:
277                             p /= IP(src=src_host.ip4, dst=dst_host.ip4)
278                     if traffic_type == self.ICMP:
279                         if pkt_info.ip:
280                             p /= ICMPv6EchoRequest(type=self.icmp6_type,
281                                                    code=self.icmp6_code)
282                         else:
283                             p /= ICMP(type=self.icmp4_type,
284                                       code=self.icmp4_code)
285                     else:
286                         p /= self.create_upper_layer(i, pkt_info.proto, ports)
287                     if pkt_raw:
288                         p /= Raw(payload)
289                         pkt_info.data = p.copy()
290                     if pkt_raw:
291                         size = random.choice(packet_sizes)
292                         self.extend_packet(p, size)
293                     pkts.append(p)
294         return pkts
295
    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
        """
        Verify captured input packet stream for defined interface.

        Each captured packet is decoded back into its packet info. ICMP
        packets are checked for the expected type and code; all other
        packets are matched, in order, against the packet infos recorded
        when the stream was created, and their IP addresses and TCP/UDP
        ports are compared with the saved packet.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: self.ICMP selects the ICMP checks; any other
            value selects the per-flow checks.
        :param ip_type: When non-zero, every packet info must carry this IP
            version; together with traffic_type self.ICMP, self.IPV6 also
            selects where the payload is read from.
        """
        # Last packet info matched so far, per source interface index; used
        # as the cursor for get_next_packet_info_for_interface2()
        last_info = dict()
        for i in self.pg_interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(
                        packet[ICMPv6EchoRequest].data)
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(str(packet[Raw]))
                    payload = packet[self.proto_map[payload_info.proto]]
            except:
                # Log the offending packet, then let the error propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

            # IPv4 is 0, so this check only runs for a non-IPv4 ip_type
            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)
            if traffic_type == self.ICMP:
                # ICMP: only type and code are checked; last_info is not
                # advanced in this branch
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet "
                                          "(outside network):", packet))
                    raise
            else:
                try:
                    ip_version = IPv6 if payload_info.ip == 1 else IP

                    ip = packet[ip_version]
                    packet_index = payload_info.index

                    self.assertEqual(payload_info.dst, dst_sw_if_index)
                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                      (pg_if.name, payload_info.src,
                                       packet_index))
                    # The captured packet must be the next one expected
                    # from its source towards this interface
                    next_info = self.get_next_packet_info_for_interface2(
                        payload_info.src, dst_sw_if_index,
                        last_info[payload_info.src])
                    last_info[payload_info.src] = next_info
                    self.assertTrue(next_info is not None)
                    self.assertEqual(packet_index, next_info.index)
                    saved_packet = next_info.data
                    # Check standard fields
                    self.assertEqual(ip.src, saved_packet[ip_version].src)
                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                    p = self.proto_map[payload_info.proto]
                    if p == 'TCP':
                        tcp = packet[TCP]
                        self.assertEqual(tcp.sport, saved_packet[
                            TCP].sport)
                        self.assertEqual(tcp.dport, saved_packet[
                            TCP].dport)
                    elif p == 'UDP':
                        udp = packet[UDP]
                        self.assertEqual(udp.sport, saved_packet[
                            UDP].sport)
                        self.assertEqual(udp.dport, saved_packet[
                            UDP].dport)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet:",
                                          packet))
                    raise
        # After the whole capture is consumed no expected packet may remain.
        # NOTE(review): the interface object ``i`` is passed here, whereas
        # the call above passes a source index (payload_info.src) - confirm
        # that get_next_packet_info_for_interface2() accepts both.
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))
382
383     def run_traffic_no_check(self):
384         # Test
385         # Create incoming packet streams for packet-generator interfaces
386         for i in self.pg_interfaces:
387             if self.flows.__contains__(i):
388                 pkts = self.create_stream(i, self.pg_if_packet_sizes)
389                 if len(pkts) > 0:
390                     i.add_stream(pkts)
391
392         # Enable packet capture and start packet sending
393         self.pg_enable_capture(self.pg_interfaces)
394         self.pg_start()
395
396     def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
397                         frags=False, pkt_raw=True):
398         # Test
399         # Create incoming packet streams for packet-generator interfaces
400         pkts_cnt = 0
401         for i in self.pg_interfaces:
402             if self.flows.__contains__(i):
403                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
404                                           traffic_type, ip_type, proto, ports,
405                                           frags, pkt_raw)
406                 if len(pkts) > 0:
407                     i.add_stream(pkts)
408                     pkts_cnt += len(pkts)
409
410         # Enable packet capture and start packet sendingself.IPV
411         self.pg_enable_capture(self.pg_interfaces)
412         self.pg_start()
413
414         # Verify
415         # Verify outgoing packet streams per packet-generator interface
416         for src_if in self.pg_interfaces:
417             if self.flows.__contains__(src_if):
418                 for dst_if in self.flows[src_if]:
419                     capture = dst_if.get_capture(pkts_cnt)
420                     self.logger.info("Verifying capture on interface %s" %
421                                      dst_if.name)
422                     self.verify_capture(dst_if, capture, traffic_type, ip_type)
423
424     def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
425                               ports=0, frags=False):
426         # Test
427         self.reset_packet_infos()
428         for i in self.pg_interfaces:
429             if self.flows.__contains__(i):
430                 pkts = self.create_stream(i, self.pg_if_packet_sizes,
431                                           traffic_type, ip_type, proto, ports,
432                                           frags)
433                 if len(pkts) > 0:
434                     i.add_stream(pkts)
435
436         # Enable packet capture and start packet sending
437         self.pg_enable_capture(self.pg_interfaces)
438         self.pg_start()
439
440         # Verify
441         # Verify outgoing packet streams per packet-generator interface
442         for src_if in self.pg_interfaces:
443             if self.flows.__contains__(src_if):
444                 for dst_if in self.flows[src_if]:
445                     self.logger.info("Verifying capture on interface %s" %
446                                      dst_if.name)
447                     capture = dst_if.get_capture(0)
448                     self.assertEqual(len(capture), 0)
449
450     def api_acl_add_replace(self, acl_index, r, count, tag='',
451                             expected_retval=0):
452         """Add/replace an ACL
453
454         :param int acl_index: ACL index to replace,
455         4294967295 to create new ACL.
456         :param acl_rule r: ACL rules array.
457         :param str tag: symbolic tag (description) for this ACL.
458         :param int count: number of rules.
459         """
460         return self.vapi.api(self.vapi.papi.acl_add_replace,
461                              {'acl_index': acl_index,
462                               'r': r,
463                               'count': count,
464                               'tag': tag},
465                              expected_retval=expected_retval)
466
467     def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
468                                        expected_retval=0):
469         return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
470                              {'sw_if_index': sw_if_index,
471                               'count': count,
472                               'n_input': n_input,
473                               'acls': acls},
474                              expected_retval=expected_retval)
475
476     def api_acl_dump(self, acl_index, expected_retval=0):
477         return self.vapi.api(self.vapi.papi.acl_dump,
478                              {'acl_index': acl_index},
479                              expected_retval=expected_retval)
480
481     def test_0000_warmup_test(self):
482         """ ACL plugin version check; learn MACs
483         """
484         self.create_hosts(16)
485         self.run_traffic_no_check()
486         reply = self.vapi.papi.acl_plugin_get_version()
487         self.assertEqual(reply.major, 1)
488         self.logger.info("Working with ACL plugin version: %d.%d" % (
489             reply.major, reply.minor))
490         # minor version changes are non breaking
491         # self.assertEqual(reply.minor, 0)
492
493     def test_0001_acl_create(self):
494         """ ACL create test
495         """
496
497         self.logger.info("ACLP_TEST_START_0001")
498         # Add an ACL
499         r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
500               'srcport_or_icmptype_first': 1234,
501               'srcport_or_icmptype_last': 1235,
502               'src_ip_prefix_len': 0,
503               'src_ip_addr': '\x00\x00\x00\x00',
504               'dstport_or_icmpcode_first': 1234,
505               'dstport_or_icmpcode_last': 1234,
506               'dst_ip_addr': '\x00\x00\x00\x00',
507               'dst_ip_prefix_len': 0}]
508         # Test 1: add a new ACL
509         reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
510                                          count=len(r), tag="permit 1234")
511         self.assertEqual(reply.retval, 0)
512         # The very first ACL gets #0
513         self.assertEqual(reply.acl_index, 0)
514         rr = self.api_acl_dump(reply.acl_index)
515         self.logger.info("Dumped ACL: " + str(rr))
516         self.assertEqual(len(rr), 1)
517         # We should have the same number of ACL entries as we had asked
518         self.assertEqual(len(rr[0].r), len(r))
519         # The rules should be the same. But because the submitted and returned
520         # are different types, we need to iterate over rules and keys to get
521         # to basic values.
522         for i_rule in range(0, len(r) - 1):
523             for rule_key in r[i_rule]:
524                 self.assertEqual(rr[0].r[i_rule][rule_key],
525                                  r[i_rule][rule_key])
526
527         # Add a deny-1234 ACL
528         r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
529                    'srcport_or_icmptype_first': 1234,
530                    'srcport_or_icmptype_last': 1235,
531                    'src_ip_prefix_len': 0,
532                    'src_ip_addr': '\x00\x00\x00\x00',
533                    'dstport_or_icmpcode_first': 1234,
534                    'dstport_or_icmpcode_last': 1234,
535                    'dst_ip_addr': '\x00\x00\x00\x00',
536                    'dst_ip_prefix_len': 0},
537                   {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
538                    'srcport_or_icmptype_first': 0,
539                    'srcport_or_icmptype_last': 0,
540                    'src_ip_prefix_len': 0,
541                    'src_ip_addr': '\x00\x00\x00\x00',
542                    'dstport_or_icmpcode_first': 0,
543                    'dstport_or_icmpcode_last': 0,
544                    'dst_ip_addr': '\x00\x00\x00\x00',
545                    'dst_ip_prefix_len': 0})
546
547         reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
548                                          count=len(r_deny),
549                                          tag="deny 1234;permit all")
550         self.assertEqual(reply.retval, 0)
551         # The second ACL gets #1
552         self.assertEqual(reply.acl_index, 1)
553
554         # Test 2: try to modify a nonexistent ACL
555         reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
556                                          tag="FFFF:FFFF", expected_retval=-1)
557         self.assertEqual(reply.retval, -1)
558         # The ACL number should pass through
559         self.assertEqual(reply.acl_index, 432)
560
561         self.logger.info("ACLP_TEST_FINISH_0001")
562
563     def test_0002_acl_permit_apply(self):
564         """ permit ACL apply test
565         """
566         self.logger.info("ACLP_TEST_START_0002")
567
568         rules = []
569         rules.append(self.create_rule(self.IPV4, self.PERMIT,
570                      0, self.proto[self.IP][self.UDP]))
571         rules.append(self.create_rule(self.IPV4, self.PERMIT,
572                      0, self.proto[self.IP][self.TCP]))
573
574         # Apply rules
575         self.apply_rules(rules, "permit per-flow")
576
577         # Traffic should still pass
578         self.run_verify_test(self.IP, self.IPV4, -1)
579         self.logger.info("ACLP_TEST_FINISH_0002")
580
581     def test_0003_acl_deny_apply(self):
582         """ deny ACL apply test
583         """
584         self.logger.info("ACLP_TEST_START_0003")
585         # Add a deny-flows ACL
586         rules = []
587         rules.append(self.create_rule(self.IPV4, self.DENY,
588                      self.PORTS_ALL, self.proto[self.IP][self.UDP]))
589         # Permit ip any any in the end
590         rules.append(self.create_rule(self.IPV4, self.PERMIT,
591                                       self.PORTS_ALL, 0))
592
593         # Apply rules
594         self.apply_rules(rules, "deny per-flow;permit all")
595
596         # Traffic should not pass
597         self.run_verify_negat_test(self.IP, self.IPV4,
598                                    self.proto[self.IP][self.UDP])
599         self.logger.info("ACLP_TEST_FINISH_0003")
600         # self.assertEqual(1, 0)
601
602     def test_0004_vpp624_permit_icmpv4(self):
603         """ VPP_624 permit ICMPv4
604         """
605         self.logger.info("ACLP_TEST_START_0004")
606
607         # Add an ACL
608         rules = []
609         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
610                                       self.proto[self.ICMP][self.ICMPv4]))
611         # deny ip any any in the end
612         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
613
614         # Apply rules
615         self.apply_rules(rules, "permit icmpv4")
616
617         # Traffic should still pass
618         self.run_verify_test(self.ICMP, self.IPV4,
619                              self.proto[self.ICMP][self.ICMPv4])
620
621         self.logger.info("ACLP_TEST_FINISH_0004")
622
623     def test_0005_vpp624_permit_icmpv6(self):
624         """ VPP_624 permit ICMPv6
625         """
626         self.logger.info("ACLP_TEST_START_0005")
627
628         # Add an ACL
629         rules = []
630         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
631                                       self.proto[self.ICMP][self.ICMPv6]))
632         # deny ip any any in the end
633         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
634
635         # Apply rules
636         self.apply_rules(rules, "permit icmpv6")
637
638         # Traffic should still pass
639         self.run_verify_test(self.ICMP, self.IPV6,
640                              self.proto[self.ICMP][self.ICMPv6])
641
642         self.logger.info("ACLP_TEST_FINISH_0005")
643
644     def test_0006_vpp624_deny_icmpv4(self):
645         """ VPP_624 deny ICMPv4
646         """
647         self.logger.info("ACLP_TEST_START_0006")
648         # Add an ACL
649         rules = []
650         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
651                                       self.proto[self.ICMP][self.ICMPv4]))
652         # permit ip any any in the end
653         rules.append(self.create_rule(self.IPV4, self.PERMIT,
654                                       self.PORTS_ALL, 0))
655
656         # Apply rules
657         self.apply_rules(rules, "deny icmpv4")
658
659         # Traffic should not pass
660         self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
661
662         self.logger.info("ACLP_TEST_FINISH_0006")
663
664     def test_0007_vpp624_deny_icmpv6(self):
665         """ VPP_624 deny ICMPv6
666         """
667         self.logger.info("ACLP_TEST_START_0007")
668         # Add an ACL
669         rules = []
670         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
671                                       self.proto[self.ICMP][self.ICMPv6]))
672         # deny ip any any in the end
673         rules.append(self.create_rule(self.IPV6, self.PERMIT,
674                                       self.PORTS_ALL, 0))
675
676         # Apply rules
677         self.apply_rules(rules, "deny icmpv6")
678
679         # Traffic should not pass
680         self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
681
682         self.logger.info("ACLP_TEST_FINISH_0007")
683
684     def test_0008_tcp_permit_v4(self):
685         """ permit TCPv4
686         """
687         self.logger.info("ACLP_TEST_START_0008")
688
689         # Add an ACL
690         rules = []
691         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
692                      self.proto[self.IP][self.TCP]))
693         # deny ip any any in the end
694         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
695
696         # Apply rules
697         self.apply_rules(rules, "permit ipv4 tcp")
698
699         # Traffic should still pass
700         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
701
702         self.logger.info("ACLP_TEST_FINISH_0008")
703
704     def test_0009_tcp_permit_v6(self):
705         """ permit TCPv6
706         """
707         self.logger.info("ACLP_TEST_START_0009")
708
709         # Add an ACL
710         rules = []
711         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
712                                       self.proto[self.IP][self.TCP]))
713         # deny ip any any in the end
714         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
715
716         # Apply rules
717         self.apply_rules(rules, "permit ip6 tcp")
718
719         # Traffic should still pass
720         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
721
722         self.logger.info("ACLP_TEST_FINISH_0008")
723
724     def test_0010_udp_permit_v4(self):
725         """ permit UDPv4
726         """
727         self.logger.info("ACLP_TEST_START_0010")
728
729         # Add an ACL
730         rules = []
731         rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
732                                       self.proto[self.IP][self.UDP]))
733         # deny ip any any in the end
734         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
735
736         # Apply rules
737         self.apply_rules(rules, "permit ipv udp")
738
739         # Traffic should still pass
740         self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
741
742         self.logger.info("ACLP_TEST_FINISH_0010")
743
744     def test_0011_udp_permit_v6(self):
745         """ permit UDPv6
746         """
747         self.logger.info("ACLP_TEST_START_0011")
748
749         # Add an ACL
750         rules = []
751         rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
752                                       self.proto[self.IP][self.UDP]))
753         # deny ip any any in the end
754         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
755
756         # Apply rules
757         self.apply_rules(rules, "permit ip6 udp")
758
759         # Traffic should still pass
760         self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
761
762         self.logger.info("ACLP_TEST_FINISH_0011")
763
764     def test_0012_tcp_deny(self):
765         """ deny TCPv4/v6
766         """
767         self.logger.info("ACLP_TEST_START_0012")
768
769         # Add an ACL
770         rules = []
771         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
772                                       self.proto[self.IP][self.TCP]))
773         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
774                                       self.proto[self.IP][self.TCP]))
775         # permit ip any any in the end
776         rules.append(self.create_rule(self.IPV4, self.PERMIT,
777                                       self.PORTS_ALL, 0))
778         rules.append(self.create_rule(self.IPV6, self.PERMIT,
779                                       self.PORTS_ALL, 0))
780
781         # Apply rules
782         self.apply_rules(rules, "deny ip4/ip6 tcp")
783
784         # Traffic should not pass
785         self.run_verify_negat_test(self.IP, self.IPRANDOM,
786                                    self.proto[self.IP][self.TCP])
787
788         self.logger.info("ACLP_TEST_FINISH_0012")
789
790     def test_0013_udp_deny(self):
791         """ deny UDPv4/v6
792         """
793         self.logger.info("ACLP_TEST_START_0013")
794
795         # Add an ACL
796         rules = []
797         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
798                                       self.proto[self.IP][self.UDP]))
799         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
800                                       self.proto[self.IP][self.UDP]))
801         # permit ip any any in the end
802         rules.append(self.create_rule(self.IPV4, self.PERMIT,
803                                       self.PORTS_ALL, 0))
804         rules.append(self.create_rule(self.IPV6, self.PERMIT,
805                                       self.PORTS_ALL, 0))
806
807         # Apply rules
808         self.apply_rules(rules, "deny ip4/ip6 udp")
809
810         # Traffic should not pass
811         self.run_verify_negat_test(self.IP, self.IPRANDOM,
812                                    self.proto[self.IP][self.UDP])
813
814         self.logger.info("ACLP_TEST_FINISH_0013")
815
816     def test_0014_acl_dump(self):
817         """ verify add/dump acls
818         """
819         self.logger.info("ACLP_TEST_START_0014")
820
821         r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
822              [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
823              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
824              [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
825              [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
826              [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
827              [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
828              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
829              [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
830              [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
831              [self.IPV4, self.DENY, self.PORTS_ALL, 0],
832              [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
833              [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
834              [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
835              [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
836              [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
837              [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
838              [self.IPV6, self.DENY, self.PORTS_ALL, 0]
839              ]
840
841         # Add and verify new ACLs
842         rules = []
843         for i in range(len(r)):
844             rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
845
846         reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
847                                          count=len(rules))
848         result = self.api_acl_dump(reply.acl_index)
849
850         i = 0
851         for drules in result:
852             for dr in drules.r:
853                 self.assertEqual(dr.is_ipv6, r[i][0])
854                 self.assertEqual(dr.is_permit, r[i][1])
855                 self.assertEqual(dr.proto, r[i][3])
856
857                 if r[i][2] > 0:
858                     self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
859                 else:
860                     if r[i][2] < 0:
861                         self.assertEqual(dr.srcport_or_icmptype_first, 0)
862                         self.assertEqual(dr.srcport_or_icmptype_last, 65535)
863                     else:
864                         if dr.proto == self.proto[self.IP][self.TCP]:
865                             self.assertGreater(dr.srcport_or_icmptype_first,
866                                                self.tcp_sport_from-1)
867                             self.assertLess(dr.srcport_or_icmptype_first,
868                                             self.tcp_sport_to+1)
869                             self.assertGreater(dr.dstport_or_icmpcode_last,
870                                                self.tcp_dport_from-1)
871                             self.assertLess(dr.dstport_or_icmpcode_last,
872                                             self.tcp_dport_to+1)
873                         elif dr.proto == self.proto[self.IP][self.UDP]:
874                             self.assertGreater(dr.srcport_or_icmptype_first,
875                                                self.udp_sport_from-1)
876                             self.assertLess(dr.srcport_or_icmptype_first,
877                                             self.udp_sport_to+1)
878                             self.assertGreater(dr.dstport_or_icmpcode_last,
879                                                self.udp_dport_from-1)
880                             self.assertLess(dr.dstport_or_icmpcode_last,
881                                             self.udp_dport_to+1)
882                 i += 1
883
884         self.logger.info("ACLP_TEST_FINISH_0014")
885
886     def test_0015_tcp_permit_port_v4(self):
887         """ permit single TCPv4
888         """
889         self.logger.info("ACLP_TEST_START_0015")
890
891         port = random.randint(0, 65535)
892         # Add an ACL
893         rules = []
894         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
895                                       self.proto[self.IP][self.TCP]))
896         # deny ip any any in the end
897         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
898
899         # Apply rules
900         self.apply_rules(rules, "permit ip4 tcp "+str(port))
901
902         # Traffic should still pass
903         self.run_verify_test(self.IP, self.IPV4,
904                              self.proto[self.IP][self.TCP], port)
905
906         self.logger.info("ACLP_TEST_FINISH_0015")
907
908     def test_0016_udp_permit_port_v4(self):
909         """ permit single UDPv4
910         """
911         self.logger.info("ACLP_TEST_START_0016")
912
913         port = random.randint(0, 65535)
914         # Add an ACL
915         rules = []
916         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
917                                       self.proto[self.IP][self.UDP]))
918         # deny ip any any in the end
919         rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
920
921         # Apply rules
922         self.apply_rules(rules, "permit ip4 tcp "+str(port))
923
924         # Traffic should still pass
925         self.run_verify_test(self.IP, self.IPV4,
926                              self.proto[self.IP][self.UDP], port)
927
928         self.logger.info("ACLP_TEST_FINISH_0016")
929
930     def test_0017_tcp_permit_port_v6(self):
931         """ permit single TCPv6
932         """
933         self.logger.info("ACLP_TEST_START_0017")
934
935         port = random.randint(0, 65535)
936         # Add an ACL
937         rules = []
938         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
939                                       self.proto[self.IP][self.TCP]))
940         # deny ip any any in the end
941         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
942
943         # Apply rules
944         self.apply_rules(rules, "permit ip4 tcp "+str(port))
945
946         # Traffic should still pass
947         self.run_verify_test(self.IP, self.IPV6,
948                              self.proto[self.IP][self.TCP], port)
949
950         self.logger.info("ACLP_TEST_FINISH_0017")
951
952     def test_0018_udp_permit_port_v6(self):
953         """ permit single UPPv6
954         """
955         self.logger.info("ACLP_TEST_START_0018")
956
957         port = random.randint(0, 65535)
958         # Add an ACL
959         rules = []
960         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
961                                       self.proto[self.IP][self.UDP]))
962         # deny ip any any in the end
963         rules.append(self.create_rule(self.IPV6, self.DENY,
964                                       self.PORTS_ALL, 0))
965
966         # Apply rules
967         self.apply_rules(rules, "permit ip4 tcp "+str(port))
968
969         # Traffic should still pass
970         self.run_verify_test(self.IP, self.IPV6,
971                              self.proto[self.IP][self.UDP], port)
972
973         self.logger.info("ACLP_TEST_FINISH_0018")
974
975     def test_0019_udp_deny_port(self):
976         """ deny single TCPv4/v6
977         """
978         self.logger.info("ACLP_TEST_START_0019")
979
980         port = random.randint(0, 65535)
981         # Add an ACL
982         rules = []
983         rules.append(self.create_rule(self.IPV4, self.DENY, port,
984                                       self.proto[self.IP][self.TCP]))
985         rules.append(self.create_rule(self.IPV6, self.DENY, port,
986                                       self.proto[self.IP][self.TCP]))
987         # Permit ip any any in the end
988         rules.append(self.create_rule(self.IPV4, self.PERMIT,
989                                       self.PORTS_ALL, 0))
990         rules.append(self.create_rule(self.IPV6, self.PERMIT,
991                                       self.PORTS_ALL, 0))
992
993         # Apply rules
994         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
995
996         # Traffic should not pass
997         self.run_verify_negat_test(self.IP, self.IPRANDOM,
998                                    self.proto[self.IP][self.TCP], port)
999
1000         self.logger.info("ACLP_TEST_FINISH_0019")
1001
1002     def test_0020_udp_deny_port(self):
1003         """ deny single UDPv4/v6
1004         """
1005         self.logger.info("ACLP_TEST_START_0020")
1006
1007         port = random.randint(0, 65535)
1008         # Add an ACL
1009         rules = []
1010         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1011                                       self.proto[self.IP][self.UDP]))
1012         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1013                                       self.proto[self.IP][self.UDP]))
1014         # Permit ip any any in the end
1015         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1016                                       self.PORTS_ALL, 0))
1017         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1018                                       self.PORTS_ALL, 0))
1019
1020         # Apply rules
1021         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1022
1023         # Traffic should not pass
1024         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1025                                    self.proto[self.IP][self.UDP], port)
1026
1027         self.logger.info("ACLP_TEST_FINISH_0020")
1028
1029     def test_0021_udp_deny_port_verify_fragment_deny(self):
1030         """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1031         """
1032         self.logger.info("ACLP_TEST_START_0021")
1033
1034         port = random.randint(0, 65535)
1035         # Add an ACL
1036         rules = []
1037         rules.append(self.create_rule(self.IPV4, self.DENY, port,
1038                                       self.proto[self.IP][self.UDP]))
1039         rules.append(self.create_rule(self.IPV6, self.DENY, port,
1040                                       self.proto[self.IP][self.UDP]))
1041         # deny ip any any in the end
1042         rules.append(self.create_rule(self.IPV4, self.PERMIT,
1043                                       self.PORTS_ALL, 0))
1044         rules.append(self.create_rule(self.IPV6, self.PERMIT,
1045                                       self.PORTS_ALL, 0))
1046
1047         # Apply rules
1048         self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1049
1050         # Traffic should not pass
1051         self.run_verify_negat_test(self.IP, self.IPRANDOM,
1052                                    self.proto[self.IP][self.UDP], port, True)
1053
1054         self.logger.info("ACLP_TEST_FINISH_0021")
1055
1056     def test_0022_zero_length_udp_ipv4(self):
1057         """ VPP-687 zero length udp ipv4 packet"""
1058         self.logger.info("ACLP_TEST_START_0022")
1059
1060         port = random.randint(0, 65535)
1061         # Add an ACL
1062         rules = []
1063         rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1064                                       self.proto[self.IP][self.UDP]))
1065         # deny ip any any in the end
1066         rules.append(
1067             self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1068
1069         # Apply rules
1070         self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1071
1072         # Traffic should still pass
1073         # Create incoming packet streams for packet-generator interfaces
1074         pkts_cnt = 0
1075         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1076                                   self.IP, self.IPV4,
1077                                   self.proto[self.IP][self.UDP], port,
1078                                   False, False)
1079         if len(pkts) > 0:
1080             self.pg0.add_stream(pkts)
1081             pkts_cnt += len(pkts)
1082
1083         # Enable packet capture and start packet sendingself.IPV
1084         self.pg_enable_capture(self.pg_interfaces)
1085         self.pg_start()
1086
1087         self.pg1.get_capture(pkts_cnt)
1088
1089         self.logger.info("ACLP_TEST_FINISH_0022")
1090
1091     def test_0023_zero_length_udp_ipv6(self):
1092         """ VPP-687 zero length udp ipv6 packet"""
1093         self.logger.info("ACLP_TEST_START_0023")
1094
1095         port = random.randint(0, 65535)
1096         # Add an ACL
1097         rules = []
1098         rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1099                                       self.proto[self.IP][self.UDP]))
1100         # deny ip any any in the end
1101         rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1102
1103         # Apply rules
1104         self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1105
1106         # Traffic should still pass
1107         # Create incoming packet streams for packet-generator interfaces
1108         pkts_cnt = 0
1109         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1110                                   self.IP, self.IPV6,
1111                                   self.proto[self.IP][self.UDP], port,
1112                                   False, False)
1113         if len(pkts) > 0:
1114             self.pg0.add_stream(pkts)
1115             pkts_cnt += len(pkts)
1116
1117         # Enable packet capture and start packet sendingself.IPV
1118         self.pg_enable_capture(self.pg_interfaces)
1119         self.pg_start()
1120
1121         # Verify outgoing packet streams per packet-generator interface
1122         self.pg1.get_capture(pkts_cnt)
1123
1124         self.logger.info("ACLP_TEST_FINISH_0023")
1125
# Running this file as a script hands the test cases to VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)