acl: API cleanup
[vpp.git] / src / plugins / acl / test / test_acl_plugin_l2l3.py
1 #!/usr/bin/env python3
2 """ACL IRB Test Case HLD:
3
4 **config**
5     - L2 MAC learning enabled in l2bd
6     - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
7     - 2 bridged interfaces in l2bd with bvi
8
9 **test**
10     - sending ip4 eth pkts between routed interfaces
11         - 2 routed interfaces
12         - 2 bridged interfaces
13
14     - 64B, 512B, 1518B, 9200B (ether_size)
15
16     - burst of pkts per interface
17         - 257pkts per burst
18         - routed pkts hitting different FIB entries
19         - bridged pkts hitting different MAC entries
20
21 **verify**
22     - all packets received correctly
23
24 """
25
26 import copy
27 import unittest
28 from socket import inet_pton, AF_INET, AF_INET6
29 from random import choice, shuffle
30 from pprint import pprint
31 from ipaddress import ip_network
32
33 import scapy.compat
34 from scapy.packet import Raw
35 from scapy.layers.l2 import Ether
36 from scapy.layers.inet import IP, UDP, ICMP, TCP
37 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
38 from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
39 from scapy.layers.inet6 import IPv6ExtHdrFragment
40
41 from framework import VppTestCase, VppTestRunner
42 from vpp_l2 import L2_PORT_TYPE
43 import time
44
45 from vpp_acl import AclRule, VppAcl, VppAclInterface
46
47
48 class TestACLpluginL2L3(VppTestCase):
49     """TestACLpluginL2L3 Test Case"""
50
51     @classmethod
52     def setUpClass(cls):
53         """
54         #. Create BD with MAC learning enabled and put interfaces to this BD.
55         #. Configure IPv4 addresses on loopback interface and routed interface.
56         #. Configure MAC address binding to IPv4 neighbors on loop0.
57         #. Configure MAC address on pg2.
58         #. Loopback BVI interface has remote hosts, one half of hosts are
59            behind pg0 second behind pg1.
60         """
61         super(TestACLpluginL2L3, cls).setUpClass()
62
63         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
64         cls.bd_id = 10
65         cls.remote_hosts_count = 250
66
67         # create 3 pg interfaces, 1 loopback interface
68         cls.create_pg_interfaces(range(3))
69         cls.create_loopback_interfaces(1)
70
71         cls.interfaces = list(cls.pg_interfaces)
72         cls.interfaces.extend(cls.lo_interfaces)
73
74         for i in cls.interfaces:
75             i.admin_up()
76
77         # Create BD with MAC learning enabled and put interfaces to this BD
78         cls.vapi.sw_interface_set_l2_bridge(
79             rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id,
80             port_type=L2_PORT_TYPE.BVI)
81         cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index,
82                                             bd_id=cls.bd_id)
83         cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index,
84                                             bd_id=cls.bd_id)
85
86         # Configure IPv4 addresses on loopback interface and routed interface
87         cls.loop0.config_ip4()
88         cls.loop0.config_ip6()
89         cls.pg2.config_ip4()
90         cls.pg2.config_ip6()
91
92         # Configure MAC address binding to IPv4 neighbors on loop0
93         cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
94         cls.loop0.configure_ipv4_neighbors()
95         cls.loop0.configure_ipv6_neighbors()
96         # configure MAC address on pg2
97         cls.pg2.resolve_arp()
98         cls.pg2.resolve_ndp()
99
100         cls.WITHOUT_EH = False
101         cls.WITH_EH = True
102         cls.STATELESS_ICMP = False
103         cls.STATEFUL_ICMP = True
104
105         # Loopback BVI interface has remote hosts, one half of hosts are behind
106         # pg0 second behind pg1
107         half = cls.remote_hosts_count // 2
108         cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
109         cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
110         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1)
111
112     @classmethod
113     def tearDownClass(cls):
114         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0)
115         super(TestACLpluginL2L3, cls).tearDownClass()
116
    def tearDown(self):
        """Run the standard test teardown.

        This method only delegates to the base class. The diagnostic
        ``show ...`` output is logged by ``show_commands_at_teardown``
        (presumably invoked by the framework teardown -- not visible in
        this file, confirm against ``framework.VppTestCase``).
        """
        super(TestACLpluginL2L3, self).tearDown()
123
124     def show_commands_at_teardown(self):
125         self.logger.info(self.vapi.cli("show l2patch"))
126         self.logger.info(self.vapi.cli("show classify tables"))
127         self.logger.info(self.vapi.cli("show l2fib verbose"))
128         self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
129                                        self.bd_id))
130         self.logger.info(self.vapi.cli("show ip neighbors"))
131         cmd = "show acl-plugin sessions verbose 1"
132         self.logger.info(self.vapi.cli(cmd))
133         self.logger.info(self.vapi.cli("show acl-plugin acl"))
134         self.logger.info(self.vapi.cli("show acl-plugin interface"))
135         self.logger.info(self.vapi.cli("show acl-plugin tables"))
136
    def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
                      is_ip6, expect_blocked, expect_established,
                      add_extension_header, icmp_stateful=False):
        """Build a packet stream together with ACL rules matching each packet.

        Up to 8 packets are generated (index ``i`` in 0..7). Indexes 0, 1,
        4, 5 are UDP, indexes 2, 3, 6, 7 are ICMP (IPv4) or ICMPv6. Odd
        indexes get a permit rule, even indexes a deny rule. With
        ``icmp_stateful`` set, the UDP packets are skipped entirely.

        :param src_ip_if: interface supplying the local/remote MAC and the
            remote IPv4/IPv6 address of one end of the flow.
        :param dst_ip_if: interface whose ``remote_hosts`` supply the other
            end of the flow (host ``i % len(remote_hosts)``).
        :param reverse: when true, addresses and L4 ports are swapped so the
            packet originates from the remote host; the destination MAC is
            then the fixed value ``de:ad:00:00:00:00``.
        :param packet_sizes: list of sizes; packet ``i`` is extended to
            ``packet_sizes[(i // 2) % len(packet_sizes)]``.
        :param is_ip6: build IPv6 packets when true, IPv4 otherwise.
        :param expect_blocked: together with ``expect_established`` decides
            whether an even-indexed packet carries a trackable payload or
            the literal ``"to be blocked"``.
        :param expect_established: see ``expect_blocked``; with
            ``icmp_stateful`` it also turns the reflectable ICMP request
            into the matching reply type.
        :param add_extension_header: for IPv6 UDP, prepend two routing
            headers and a fragment header; for IPv4 UDP, set IP flags to 1.
        :param icmp_stateful: exercise stateful ICMP handling (default off).
        :returns: dict with keys ``'stream'`` (packets), ``'rules'``,
            ``'permit_rules'`` and ``'permit_and_reflect_rules'`` (one rule
            per generated packet in each list).
        """
        pkts = []
        rules = []
        permit_rules = []
        permit_and_reflect_rules = []
        total_packet_count = 8
        for i in range(0, total_packet_count):
            # modulo alternates in pairs: 0,0,1,1,0,0,1,1 -> UDP,UDP,ICMP,ICMP
            modulo = (i//2) % 2
            icmp_type_delta = i % 2
            icmp_code = i
            is_udp_packet = (modulo == 0)
            if is_udp_packet and icmp_stateful:
                continue
            # only the even-indexed ICMP packet (type delta 0) is treated as
            # reflectable, and only when stateful ICMP is being tested
            is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and
                                   not is_udp_packet)
            is_reflected_icmp = is_reflectable_icmp and expect_established
            can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
            is_permit = i % 2
            remote_dst_index = i % len(dst_ip_if.remote_hosts)
            remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
            if is_permit == 1:
                # permitted packets are always tracked via a packet info
                info = self.create_packet_info(src_ip_if, dst_ip_if)
                payload = self.info_to_payload(info)
            else:
                # deny-rule packets: either marked with a fixed payload, or
                # tracked like a permitted packet when they are expected to
                # get through
                to_be_blocked = False
                if (expect_blocked and not expect_established):
                    to_be_blocked = True
                if (not can_reflect_this_packet):
                    to_be_blocked = True
                if to_be_blocked:
                    payload = "to be blocked"
                else:
                    info = self.create_packet_info(src_ip_if, dst_ip_if)
                    payload = self.info_to_payload(info)
            if reverse:
                # packet travels from the remote host back towards src_ip_if
                dst_mac = 'de:ad:00:00:00:00'
                src_mac = remote_dst_host._mac
                dst_ip6 = src_ip_if.remote_ip6
                src_ip6 = remote_dst_host.ip6
                dst_ip4 = src_ip_if.remote_ip4
                src_ip4 = remote_dst_host.ip4
                dst_l4 = 1234 + i
                src_l4 = 4321 + i
            else:
                dst_mac = src_ip_if.local_mac
                src_mac = src_ip_if.remote_mac
                src_ip6 = src_ip_if.remote_ip6
                dst_ip6 = remote_dst_host.ip6
                src_ip4 = src_ip_if.remote_ip4
                dst_ip4 = remote_dst_host.ip4
                src_l4 = 1234 + i
                dst_l4 = 4321 + i
            if is_reflected_icmp:
                # switch the request type to the reply type
                # (ICMPv6 128 -> 129, ICMP 8 -> 0 in the expressions below)
                icmp_type_delta = 1

            # default ULP should be something we do not use in tests
            ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
            # potentially a chain of protocols leading to ULP
            ulp = ulp_l4

            if is_udp_packet:
                if is_ip6:
                    ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
                    if add_extension_header:
                        # prepend some extension headers
                        ulp = (IPv6ExtHdrRouting() / IPv6ExtHdrRouting() /
                               IPv6ExtHdrFragment(offset=0, m=1) / ulp_l4)
                        # uncomment below to test invalid ones
                        # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4
                    else:
                        ulp = ulp_l4
                    p = (Ether(dst=dst_mac, src=src_mac) /
                         IPv6(src=src_ip6, dst=dst_ip6) /
                         ulp /
                         Raw(payload))
                else:
                    ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
                    # IPv4 does not allow extension headers,
                    # but we rather make it a first fragment
                    flags = 1 if add_extension_header else 0
                    ulp = ulp_l4
                    p = (Ether(dst=dst_mac, src=src_mac) /
                         IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) /
                         ulp /
                         Raw(payload))
            elif modulo == 1:
                if is_ip6:
                    ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta,
                                           code=icmp_code)
                    ulp = ulp_l4
                    p = (Ether(dst=dst_mac, src=src_mac) /
                         IPv6(src=src_ip6, dst=dst_ip6) /
                         ulp /
                         Raw(payload))
                else:
                    ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code)
                    ulp = ulp_l4
                    p = (Ether(dst=dst_mac, src=src_mac) /
                         IP(src=src_ip4, dst=dst_ip4) /
                         ulp /
                         Raw(payload))

            # odd (permit) packets always have an info object; remember a
            # copy of the packet taken before it is padded to its final size
            if i % 2 == 1:
                info.data = p.copy()
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)
            pkts.append(p)

            # NOTE(review): rule_family is assigned but not used below
            rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
            # host-length prefixes: each rule matches exactly one src/dst pair
            rule_prefix_len = 128 if p.haslayer(IPv6) else 32
            rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP

            # for ICMP the type/code are carried in the rule's port fields
            if p.haslayer(UDP):
                rule_l4_sport = p[UDP].sport
                rule_l4_dport = p[UDP].dport
            else:
                if p.haslayer(ICMP):
                    rule_l4_sport = p[ICMP].type
                    rule_l4_dport = p[ICMP].code
                else:
                    rule_l4_sport = p[ICMPv6Unknown].type
                    rule_l4_dport = p[ICMPv6Unknown].code
            if p.haslayer(IPv6):
                # use the next-header value scapy associates with the ULP
                # itself (presumably because the IPv6 header's own field may
                # name an extension header instead -- confirm)
                rule_l4_proto = ulp_l4.overload_fields[IPv6]['nh']
            else:
                rule_l4_proto = p[IP].proto

            new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
                               src_prefix=ip_network(
                                   (p[rule_l3_layer].src, rule_prefix_len)),
                               dst_prefix=ip_network(
                                   (p[rule_l3_layer].dst, rule_prefix_len)),
                               sport_from=rule_l4_sport,
                               sport_to=rule_l4_sport,
                               dport_from=rule_l4_dport,
                               dport_to=rule_l4_dport)

            rules.append(new_rule)
            # same match, but always permitting
            new_rule_permit = copy.copy(new_rule)
            new_rule_permit.is_permit = 1
            permit_rules.append(new_rule_permit)

            # same match, with action 2 for reflectable packets and the
            # original permit/deny action for the rest
            new_rule_permit_and_reflect = copy.copy(new_rule)
            if can_reflect_this_packet:
                new_rule_permit_and_reflect.is_permit = 2
            else:
                new_rule_permit_and_reflect.is_permit = is_permit

            permit_and_reflect_rules.append(new_rule_permit_and_reflect)
            self.logger.info("create_stream pkt#%d: %s" % (i, payload))

        return {'stream': pkts,
                'rules': rules,
                'permit_rules': permit_rules,
                'permit_and_reflect_rules': permit_and_reflect_rules}
294
295     def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse):
296         last_info = dict()
297         for i in self.interfaces:
298             last_info[i.sw_if_index] = None
299
300         dst_ip_sw_if_index = dst_ip_if.sw_if_index
301
302         for packet in capture:
303             l3 = IP if packet.haslayer(IP) else IPv6
304             ip = packet[l3]
305             if packet.haslayer(UDP):
306                 l4 = UDP
307             else:
308                 if packet.haslayer(ICMP):
309                     l4 = ICMP
310                 else:
311                     l4 = ICMPv6Unknown
312
313             # Scapy IPv6 stuff is too smart for its own good.
314             # So we do this and coerce the ICMP into unknown type
315             if packet.haslayer(UDP):
316                 data = scapy.compat.raw(packet[UDP][Raw])
317             else:
318                 if l3 == IP:
319                     data = scapy.compat.raw(ICMP(
320                         scapy.compat.raw(packet[l3].payload))[Raw])
321                 else:
322                     data = scapy.compat.raw(ICMPv6Unknown(
323                         scapy.compat.raw(packet[l3].payload)).msgbody)
324             udp_or_icmp = packet[l3].payload
325             data_obj = Raw(data)
326             # FIXME: make framework believe we are on object
327             payload_info = self.payload_to_info(data_obj)
328             packet_index = payload_info.index
329
330             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
331
332             next_info = self.get_next_packet_info_for_interface2(
333                 payload_info.src, dst_ip_sw_if_index,
334                 last_info[payload_info.src])
335             last_info[payload_info.src] = next_info
336             self.assertTrue(next_info is not None)
337             self.assertEqual(packet_index, next_info.index)
338             saved_packet = next_info.data
339             self.assertTrue(next_info is not None)
340
341             # MAC: src, dst
342             if not reverse:
343                 self.assertEqual(packet.src, dst_ip_if.local_mac)
344                 host = dst_ip_if.host_by_mac(packet.dst)
345
346             # IP: src, dst
347             # self.assertEqual(ip.src, src_ip_if.remote_ip4)
348             if saved_packet is not None:
349                 self.assertEqual(ip.src, saved_packet[l3].src)
350                 self.assertEqual(ip.dst, saved_packet[l3].dst)
351                 if l4 == UDP:
352                     self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
353                     self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
354             # self.assertEqual(ip.dst, host.ip4)
355
356             # UDP:
357
358     def applied_acl_shuffle(self, acl_if):
359         saved_n_input = acl_if.n_input
360         # TOTO: maybe copy each one??
361         saved_acls = acl_if.acls
362
363         # now create a list of all the rules in all ACLs
364         all_rules = []
365         for old_acl in saved_acls:
366             for rule in old_acl.rules:
367                 all_rules.append(rule)
368
369         # Add a few ACLs made from shuffled rules
370         shuffle(all_rules)
371         acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
372         acl1.add_vpp_config()
373
374         shuffle(all_rules)
375         acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
376         acl2.add_vpp_config()
377
378         shuffle(all_rules)
379         acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
380         acl3.add_vpp_config()
381
382         # apply the shuffle ACLs in front
383         input_acls = [acl1, acl2]
384         output_acls = [acl1, acl2]
385
386         # add the currently applied ACLs
387         n_input = acl_if.n_input
388         input_acls.extend(saved_acls[:n_input])
389         output_acls.extend(saved_acls[n_input:])
390
391         # and the trailing shuffle ACL(s)
392         input_acls.extend([acl3])
393         output_acls.extend([acl3])
394
395         # set the interface ACL list to the result
396         acl_if.n_input = len(input_acls)
397         acl_if.acls = input_acls + output_acls
398         acl_if.add_vpp_config()
399
400         # change the ACLs a few times
401         for i in range(1, 10):
402             shuffle(all_rules)
403             acl1.modify_vpp_config(all_rules[::1+(i % 2)])
404
405             shuffle(all_rules)
406             acl2.modify_vpp_config(all_rules[::1+(i % 3)])
407
408             shuffle(all_rules)
409             acl3.modify_vpp_config(all_rules[::1+(i % 5)])
410
411         # restore to how it was before and clean up
412         acl_if.n_input = saved_n_input
413         acl_if.acls = saved_acls
414         acl_if.add_vpp_config()
415
416         acl1.remove_vpp_config()
417         acl2.remove_vpp_config()
418         acl3.remove_vpp_config()
419
420     def create_acls_for_a_stream(self, stream_dict,
421                                  test_l2_action, is_reflect):
422         r = stream_dict['rules']
423         r_permit = stream_dict['permit_rules']
424         r_permit_reflect = stream_dict['permit_and_reflect_rules']
425         r_action = r_permit_reflect if is_reflect else r
426         action_acl = VppAcl(self, rules=r_action, tag="act. acl")
427         action_acl.add_vpp_config()
428         permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
429         permit_acl.add_vpp_config()
430
431         return {'L2': action_acl if test_l2_action else permit_acl,
432                 'L3': permit_acl if test_l2_action else action_acl,
433                 'permit': permit_acl, 'action': action_acl}
434
435     def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny,
436                               is_ip6, is_reflect, add_eh):
437         """ Apply the ACLs
438         """
439         self.reset_packet_infos()
440         stream_dict = self.create_stream(
441             self.pg2, self.loop0,
442             bridged_to_routed,
443             self.pg_if_packet_sizes, is_ip6,
444             not is_reflect, False, add_eh)
445         stream = stream_dict['stream']
446         acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny,
447                                                 is_reflect)
448         n_input_l3 = 0 if bridged_to_routed else 1
449         n_input_l2 = 1 if bridged_to_routed else 0
450
451         acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
452                                      n_input=n_input_l3, acls=[acl_idx['L3']])
453         acl_if_pg2.add_vpp_config()
454
455         acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
456                                      n_input=n_input_l2, acls=[acl_idx['L2']])
457         acl_if_pg0.add_vpp_config()
458
459         acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
460                                      n_input=n_input_l2, acls=[acl_idx['L2']])
461         acl_if_pg1.add_vpp_config()
462
463         self.applied_acl_shuffle(acl_if_pg0)
464         self.applied_acl_shuffle(acl_if_pg1)
465         return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
466
467     def apply_acl_ip46_both_directions_reflect(self,
468                                                primary_is_bridged_to_routed,
469                                                reflect_on_l2, is_ip6, add_eh,
470                                                stateful_icmp):
471         primary_is_routed_to_bridged = not primary_is_bridged_to_routed
472         self.reset_packet_infos()
473         stream_dict_fwd = self.create_stream(self.pg2, self.loop0,
474                                              primary_is_bridged_to_routed,
475                                              self.pg_if_packet_sizes, is_ip6,
476                                              False, False, add_eh,
477                                              stateful_icmp)
478         acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd,
479                                                     reflect_on_l2, True)
480
481         stream_dict_rev = self.create_stream(self.pg2, self.loop0,
482                                              not primary_is_bridged_to_routed,
483                                              self.pg_if_packet_sizes, is_ip6,
484                                              True, True, add_eh, stateful_icmp)
485         # We want the primary action to be "deny" rather than reflect
486         acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev,
487                                                     reflect_on_l2, False)
488
489         if primary_is_bridged_to_routed:
490             inbound_l2_acl = acl_idx_fwd['L2']
491         else:
492             inbound_l2_acl = acl_idx_rev['L2']
493
494         if primary_is_routed_to_bridged:
495             outbound_l2_acl = acl_idx_fwd['L2']
496         else:
497             outbound_l2_acl = acl_idx_rev['L2']
498
499         if primary_is_routed_to_bridged:
500             inbound_l3_acl = acl_idx_fwd['L3']
501         else:
502             inbound_l3_acl = acl_idx_rev['L3']
503
504         if primary_is_bridged_to_routed:
505             outbound_l3_acl = acl_idx_fwd['L3']
506         else:
507             outbound_l3_acl = acl_idx_rev['L3']
508
509         acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
510                                      n_input=1,
511                                      acls=[inbound_l3_acl, outbound_l3_acl])
512         acl_if_pg2.add_vpp_config()
513
514         acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
515                                      n_input=1,
516                                      acls=[inbound_l2_acl, outbound_l2_acl])
517         acl_if_pg0.add_vpp_config()
518
519         acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
520                                      n_input=1,
521                                      acls=[inbound_l2_acl, outbound_l2_acl])
522         acl_if_pg1.add_vpp_config()
523
524         self.applied_acl_shuffle(acl_if_pg0)
525         self.applied_acl_shuffle(acl_if_pg2)
526
527     def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
528                                          is_reflect, add_eh):
529         return self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6,
530                                           is_reflect, add_eh)
531
532     def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
533                                          is_reflect, add_eh):
534         return self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6,
535                                           is_reflect, add_eh)
536
537     def verify_acl_packet_count(self, acl_idx, packet_count):
538         matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
539         self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches)))
540         total_count = 0
541         for p in matches[0]:
542             total_count = total_count + p['packets']
543         self.assertEqual(total_count, packet_count)
544
545     def run_traffic_ip46_x_to_y(self, bridged_to_routed,
546                                 test_l2_deny, is_ip6,
547                                 is_reflect, is_established, add_eh,
548                                 stateful_icmp=False):
549         self.reset_packet_infos()
550         stream_dict = self.create_stream(self.pg2, self.loop0,
551                                          bridged_to_routed,
552                                          self.pg_if_packet_sizes, is_ip6,
553                                          not is_reflect, is_established,
554                                          add_eh, stateful_icmp)
555         stream = stream_dict['stream']
556
557         tx_if = self.pg0 if bridged_to_routed else self.pg2
558         rx_if = self.pg2 if bridged_to_routed else self.pg0
559
560         tx_if.add_stream(stream)
561         self.pg_enable_capture(self.pg_interfaces)
562         self.pg_start()
563         packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
564         rcvd1 = rx_if.get_capture(packet_count)
565         self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
566         return len(stream)
567
568     def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
569                                            is_reflect, is_established, add_eh,
570                                            stateful_icmp=False):
571         return self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
572                                             is_reflect, is_established, add_eh,
573                                             stateful_icmp)
574
575     def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
576                                            is_reflect, is_established, add_eh,
577                                            stateful_icmp=False):
578         return self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
579                                             is_reflect, is_established, add_eh,
580                                             stateful_icmp)
581
582     def run_test_ip46_routed_to_bridged(self, test_l2_deny,
583                                         is_ip6, is_reflect, add_eh):
584         acls = self.apply_acl_ip46_routed_to_bridged(test_l2_deny,
585                                                      is_ip6, is_reflect,
586                                                      add_eh)
587         pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
588                                                        is_reflect, False,
589                                                        add_eh)
590         self.verify_acl_packet_count(acls['L3'].acl_index, pkts)
591
592     def run_test_ip46_bridged_to_routed(self, test_l2_deny,
593                                         is_ip6, is_reflect, add_eh):
594         acls = self.apply_acl_ip46_bridged_to_routed(test_l2_deny,
595                                                      is_ip6, is_reflect,
596                                                      add_eh)
597         pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
598                                                        is_reflect, False,
599                                                        add_eh)
600         self.verify_acl_packet_count(acls['L2'].acl_index, pkts)
601
602     def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
603                                                  is_ip6, add_eh,
604                                                  stateful_icmp=False):
605         self.apply_acl_ip46_both_directions_reflect(False, test_l2_action,
606                                                     is_ip6, add_eh,
607                                                     stateful_icmp)
608         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
609                                                 True, False, add_eh,
610                                                 stateful_icmp)
611         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
612                                                 False, True, add_eh,
613                                                 stateful_icmp)
614
615     def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action,
616                                                  is_ip6, add_eh,
617                                                  stateful_icmp=False):
618         self.apply_acl_ip46_both_directions_reflect(True, test_l2_action,
619                                                     is_ip6, add_eh,
620                                                     stateful_icmp)
621         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
622                                                 True, False, add_eh,
623                                                 stateful_icmp)
624         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
625                                                 False, True, add_eh,
626                                                 stateful_icmp)
627
628     def test_0000_ip6_irb_1(self):
629         """ ACL plugin prepare"""
630         if not self.vpp_dead:
631             cmd = "set acl-plugin session timeout udp idle 2000"
632             self.logger.info(self.vapi.ppcli(cmd))
633             # uncomment to not skip past the routing header
634             # and watch the EH tests fail
635             # self.logger.info(self.vapi.ppcli(
636             #    "set acl-plugin skip-ipv6-extension-header 43 0"))
637             # uncomment to test the session limit (stateful tests will fail)
638             # self.logger.info(self.vapi.ppcli(
639             #    "set acl-plugin session table max-entries 1"))
640             # new datapath is the default, but just in case
641             # self.logger.info(self.vapi.ppcli(
642             #    "set acl-plugin l2-datapath new"))
643             # If you want to see some tests fail, uncomment the next line
644             # self.logger.info(self.vapi.ppcli(
645             #    "set acl-plugin l2-datapath old"))
646
647     def test_0001_ip6_irb_1(self):
648         """ ACL IPv6 routed -> bridged, L2 ACL deny"""
649         self.run_test_ip46_routed_to_bridged(True, True, False,
650                                              self.WITHOUT_EH)
651
652     def test_0002_ip6_irb_1(self):
653         """ ACL IPv6 routed -> bridged, L3 ACL deny"""
654         self.run_test_ip46_routed_to_bridged(False, True, False,
655                                              self.WITHOUT_EH)
656
657     def test_0003_ip4_irb_1(self):
658         """ ACL IPv4 routed -> bridged, L2 ACL deny"""
659         self.run_test_ip46_routed_to_bridged(True, False, False,
660                                              self.WITHOUT_EH)
661
662     def test_0004_ip4_irb_1(self):
663         """ ACL IPv4 routed -> bridged, L3 ACL deny"""
664         self.run_test_ip46_routed_to_bridged(False, False, False,
665                                              self.WITHOUT_EH)
666
667     def test_0005_ip6_irb_1(self):
668         """ ACL IPv6 bridged -> routed, L2 ACL deny """
669         self.run_test_ip46_bridged_to_routed(True, True, False,
670                                              self.WITHOUT_EH)
671
672     def test_0006_ip6_irb_1(self):
673         """ ACL IPv6 bridged -> routed, L3 ACL deny """
674         self.run_test_ip46_bridged_to_routed(False, True, False,
675                                              self.WITHOUT_EH)
676
677     def test_0007_ip6_irb_1(self):
678         """ ACL IPv4 bridged -> routed, L2 ACL deny """
679         self.run_test_ip46_bridged_to_routed(True, False, False,
680                                              self.WITHOUT_EH)
681
682     def test_0008_ip6_irb_1(self):
683         """ ACL IPv4 bridged -> routed, L3 ACL deny """
684         self.run_test_ip46_bridged_to_routed(False, False, False,
685                                              self.WITHOUT_EH)
686
687     # Stateful ACL tests
688     def test_0101_ip6_irb_1(self):
689         """ ACL IPv6 routed -> bridged, L2 ACL permit+reflect"""
690         self.run_test_ip46_routed_to_bridged_and_back(True, True,
691                                                       self.WITHOUT_EH)
692
693     def test_0102_ip6_irb_1(self):
694         """ ACL IPv6 bridged -> routed, L2 ACL permit+reflect"""
695         self.run_test_ip46_bridged_to_routed_and_back(True, True,
696                                                       self.WITHOUT_EH)
697
698     def test_0103_ip6_irb_1(self):
699         """ ACL IPv4 routed -> bridged, L2 ACL permit+reflect"""
700         self.run_test_ip46_routed_to_bridged_and_back(True, False,
701                                                       self.WITHOUT_EH)
702
703     def test_0104_ip6_irb_1(self):
704         """ ACL IPv4 bridged -> routed, L2 ACL permit+reflect"""
705         self.run_test_ip46_bridged_to_routed_and_back(True, False,
706                                                       self.WITHOUT_EH)
707
708     def test_0111_ip6_irb_1(self):
709         """ ACL IPv6 routed -> bridged, L3 ACL permit+reflect"""
710         self.run_test_ip46_routed_to_bridged_and_back(False, True,
711                                                       self.WITHOUT_EH)
712
713     def test_0112_ip6_irb_1(self):
714         """ ACL IPv6 bridged -> routed, L3 ACL permit+reflect"""
715         self.run_test_ip46_bridged_to_routed_and_back(False, True,
716                                                       self.WITHOUT_EH)
717
718     def test_0113_ip6_irb_1(self):
719         """ ACL IPv4 routed -> bridged, L3 ACL permit+reflect"""
720         self.run_test_ip46_routed_to_bridged_and_back(False, False,
721                                                       self.WITHOUT_EH)
722
723     def test_0114_ip6_irb_1(self):
724         """ ACL IPv4 bridged -> routed, L3 ACL permit+reflect"""
725         self.run_test_ip46_bridged_to_routed_and_back(False, False,
726                                                       self.WITHOUT_EH)
727
728     # A block of tests with extension headers
729
730     def test_1001_ip6_irb_1(self):
731         """ ACL IPv6+EH routed -> bridged, L2 ACL deny"""
732         self.run_test_ip46_routed_to_bridged(True, True, False,
733                                              self.WITH_EH)
734
735     def test_1002_ip6_irb_1(self):
736         """ ACL IPv6+EH routed -> bridged, L3 ACL deny"""
737         self.run_test_ip46_routed_to_bridged(False, True, False,
738                                              self.WITH_EH)
739
740     def test_1005_ip6_irb_1(self):
741         """ ACL IPv6+EH bridged -> routed, L2 ACL deny """
742         self.run_test_ip46_bridged_to_routed(True, True, False,
743                                              self.WITH_EH)
744
745     def test_1006_ip6_irb_1(self):
746         """ ACL IPv6+EH bridged -> routed, L3 ACL deny """
747         self.run_test_ip46_bridged_to_routed(False, True, False,
748                                              self.WITH_EH)
749
750     def test_1101_ip6_irb_1(self):
751         """ ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect"""
752         self.run_test_ip46_routed_to_bridged_and_back(True, True,
753                                                       self.WITH_EH)
754
755     def test_1102_ip6_irb_1(self):
756         """ ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect"""
757         self.run_test_ip46_bridged_to_routed_and_back(True, True,
758                                                       self.WITH_EH)
759
760     def test_1111_ip6_irb_1(self):
761         """ ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect"""
762         self.run_test_ip46_routed_to_bridged_and_back(False, True,
763                                                       self.WITH_EH)
764
765     def test_1112_ip6_irb_1(self):
766         """ ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect"""
767         self.run_test_ip46_bridged_to_routed_and_back(False, True,
768                                                       self.WITH_EH)
769
770     # IPv4 with "MF" bit set
771
772     def test_1201_ip6_irb_1(self):
773         """ ACL IPv4+MF routed -> bridged, L2 ACL deny"""
774         self.run_test_ip46_routed_to_bridged(True, False, False,
775                                              self.WITH_EH)
776
777     def test_1202_ip6_irb_1(self):
778         """ ACL IPv4+MF routed -> bridged, L3 ACL deny"""
779         self.run_test_ip46_routed_to_bridged(False, False, False,
780                                              self.WITH_EH)
781
782     def test_1205_ip6_irb_1(self):
783         """ ACL IPv4+MF bridged -> routed, L2 ACL deny """
784         self.run_test_ip46_bridged_to_routed(True, False, False,
785                                              self.WITH_EH)
786
787     def test_1206_ip6_irb_1(self):
788         """ ACL IPv4+MF bridged -> routed, L3 ACL deny """
789         self.run_test_ip46_bridged_to_routed(False, False, False,
790                                              self.WITH_EH)
791
792     def test_1301_ip6_irb_1(self):
793         """ ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect"""
794         self.run_test_ip46_routed_to_bridged_and_back(True, False,
795                                                       self.WITH_EH)
796
797     def test_1302_ip6_irb_1(self):
798         """ ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect"""
799         self.run_test_ip46_bridged_to_routed_and_back(True, False,
800                                                       self.WITH_EH)
801
802     def test_1311_ip6_irb_1(self):
803         """ ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect"""
804         self.run_test_ip46_routed_to_bridged_and_back(False, False,
805                                                       self.WITH_EH)
806
807     def test_1312_ip6_irb_1(self):
808         """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
809         self.run_test_ip46_bridged_to_routed_and_back(False, False,
810                                                       self.WITH_EH)
811     # Stateful ACL tests with stateful ICMP
812
813     def test_1401_ip6_irb_1(self):
814         """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
815         self.run_test_ip46_routed_to_bridged_and_back(True, True,
816                                                       self.WITHOUT_EH,
817                                                       self.STATEFUL_ICMP)
818
819     def test_1402_ip6_irb_1(self):
820         """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
821         self.run_test_ip46_bridged_to_routed_and_back(True, True,
822                                                       self.WITHOUT_EH,
823                                                       self.STATEFUL_ICMP)
824
825     def test_1403_ip4_irb_1(self):
826         """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
827         self.run_test_ip46_routed_to_bridged_and_back(True, False,
828                                                       self.WITHOUT_EH,
829                                                       self.STATEFUL_ICMP)
830
831     def test_1404_ip4_irb_1(self):
832         """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
833         self.run_test_ip46_bridged_to_routed_and_back(True, False,
834                                                       self.WITHOUT_EH,
835                                                       self.STATEFUL_ICMP)
836
837     def test_1411_ip6_irb_1(self):
838         """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
839         self.run_test_ip46_routed_to_bridged_and_back(False, True,
840                                                       self.WITHOUT_EH,
841                                                       self.STATEFUL_ICMP)
842
843     def test_1412_ip6_irb_1(self):
844         """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
845         self.run_test_ip46_bridged_to_routed_and_back(False, True,
846                                                       self.WITHOUT_EH,
847                                                       self.STATEFUL_ICMP)
848
849     def test_1413_ip4_irb_1(self):
850         """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
851         self.run_test_ip46_routed_to_bridged_and_back(False, False,
852                                                       self.WITHOUT_EH,
853                                                       self.STATEFUL_ICMP)
854
855     def test_1414_ip4_irb_1(self):
856         """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
857         self.run_test_ip46_bridged_to_routed_and_back(False, False,
858                                                       self.WITHOUT_EH,
859                                                       self.STATEFUL_ICMP)
860
861
# Script entry point: when this module is executed directly (rather than
# imported), hand control to unittest with the project's VppTestRunner.
if "__main__" == __name__:
    unittest.main(testRunner=VppTestRunner)