nat: nat44-ed add session timing out indicator in api
[vpp.git] / test / test_acl_plugin_l2l3.py
1 #!/usr/bin/env python3
2 """ACL IRB Test Case HLD:
3
4 **config**
5     - L2 MAC learning enabled in l2bd
6     - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
7     - 2 bridged interfaces in l2bd with bvi
8
9 **test**
10     - sending ip4 eth pkts between routed interfaces
11         - 2 routed interfaces
12         - 2 bridged interfaces
13
14     - 64B, 512B, 1518B, 9200B (ether_size)
15
16     - burst of pkts per interface
17         - 257pkts per burst
18         - routed pkts hitting different FIB entries
19         - bridged pkts hitting different MAC entries
20
21 **verify**
22     - all packets received correctly
23
24 """
25
26 import copy
27 import unittest
28 from socket import inet_pton, AF_INET, AF_INET6
29 from random import choice, shuffle
30 from pprint import pprint
31 from ipaddress import ip_network
32
33 import scapy.compat
34 from scapy.packet import Raw
35 from scapy.layers.l2 import Ether
36 from scapy.layers.inet import IP, UDP, ICMP, TCP
37 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
38 from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
39 from scapy.layers.inet6 import IPv6ExtHdrFragment
40
41 from framework import VppTestCase, VppTestRunner
42 from vpp_l2 import L2_PORT_TYPE
43 import time
44
45 from vpp_acl import AclRule, VppAcl, VppAclInterface
46
47
48 class TestACLpluginL2L3(VppTestCase):
49     """TestACLpluginL2L3 Test Case"""
50
51     @classmethod
52     def setUpClass(cls):
53         """
54         #. Create BD with MAC learning enabled and put interfaces to this BD.
55         #. Configure IPv4 addresses on loopback interface and routed interface.
56         #. Configure MAC address binding to IPv4 neighbors on loop0.
57         #. Configure MAC address on pg2.
58         #. Loopback BVI interface has remote hosts, one half of hosts are
59            behind pg0 second behind pg1.
60         """
61         super(TestACLpluginL2L3, cls).setUpClass()
62
63         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
64         cls.bd_id = 10
65         cls.remote_hosts_count = 250
66
67         # create 3 pg interfaces, 1 loopback interface
68         cls.create_pg_interfaces(range(3))
69         cls.create_loopback_interfaces(1)
70
71         cls.interfaces = list(cls.pg_interfaces)
72         cls.interfaces.extend(cls.lo_interfaces)
73
74         for i in cls.interfaces:
75             i.admin_up()
76
77         # Create BD with MAC learning enabled and put interfaces to this BD
78         cls.vapi.sw_interface_set_l2_bridge(
79             rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id,
80             port_type=L2_PORT_TYPE.BVI)
81         cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index,
82                                             bd_id=cls.bd_id)
83         cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index,
84                                             bd_id=cls.bd_id)
85
86         # Configure IPv4 addresses on loopback interface and routed interface
87         cls.loop0.config_ip4()
88         cls.loop0.config_ip6()
89         cls.pg2.config_ip4()
90         cls.pg2.config_ip6()
91
92         # Configure MAC address binding to IPv4 neighbors on loop0
93         cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
94         cls.loop0.configure_ipv4_neighbors()
95         cls.loop0.configure_ipv6_neighbors()
96         # configure MAC address on pg2
97         cls.pg2.resolve_arp()
98         cls.pg2.resolve_ndp()
99
100         cls.WITHOUT_EH = False
101         cls.WITH_EH = True
102         cls.STATELESS_ICMP = False
103         cls.STATEFUL_ICMP = True
104
105         # Loopback BVI interface has remote hosts, one half of hosts are behind
106         # pg0 second behind pg1
107         half = cls.remote_hosts_count // 2
108         cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
109         cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
110         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1)
111
112     @classmethod
113     def tearDownClass(cls):
114         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0)
115         super(TestACLpluginL2L3, cls).tearDownClass()
116
117     def tearDown(self):
118         """Run standard test teardown and log ``show l2patch``,
119         ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
120         ``show ip neighbors``.
121         """
122         super(TestACLpluginL2L3, self).tearDown()
123
124     def show_commands_at_teardown(self):
125         self.logger.info(self.vapi.cli("show l2patch"))
126         self.logger.info(self.vapi.cli("show classify tables"))
127         self.logger.info(self.vapi.cli("show l2fib verbose"))
128         self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
129                                        self.bd_id))
130         self.logger.info(self.vapi.cli("show ip neighbors"))
131         cmd = "show acl-plugin sessions verbose 1"
132         self.logger.info(self.vapi.cli(cmd))
133         self.logger.info(self.vapi.cli("show acl-plugin acl"))
134         self.logger.info(self.vapi.cli("show acl-plugin interface"))
135         self.logger.info(self.vapi.cli("show acl-plugin tables"))
136
137     def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
138                       is_ip6, expect_blocked, expect_established,
139                       add_extension_header, icmp_stateful=False):
140         pkts = []
141         rules = []
142         permit_rules = []
143         permit_and_reflect_rules = []
144         total_packet_count = 8
145         for i in range(0, total_packet_count):
146             modulo = (i//2) % 2
147             icmp_type_delta = i % 2
148             icmp_code = i
149             is_udp_packet = (modulo == 0)
150             if is_udp_packet and icmp_stateful:
151                 continue
152             is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and
153                                    not is_udp_packet)
154             is_reflected_icmp = is_reflectable_icmp and expect_established
155             can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
156             is_permit = i % 2
157             remote_dst_index = i % len(dst_ip_if.remote_hosts)
158             remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
159             if is_permit == 1:
160                 info = self.create_packet_info(src_ip_if, dst_ip_if)
161                 payload = self.info_to_payload(info)
162             else:
163                 to_be_blocked = False
164                 if (expect_blocked and not expect_established):
165                     to_be_blocked = True
166                 if (not can_reflect_this_packet):
167                     to_be_blocked = True
168                 if to_be_blocked:
169                     payload = "to be blocked"
170                 else:
171                     info = self.create_packet_info(src_ip_if, dst_ip_if)
172                     payload = self.info_to_payload(info)
173             if reverse:
174                 dst_mac = 'de:ad:00:00:00:00'
175                 src_mac = remote_dst_host._mac
176                 dst_ip6 = src_ip_if.remote_ip6
177                 src_ip6 = remote_dst_host.ip6
178                 dst_ip4 = src_ip_if.remote_ip4
179                 src_ip4 = remote_dst_host.ip4
180                 dst_l4 = 1234 + i
181                 src_l4 = 4321 + i
182             else:
183                 dst_mac = src_ip_if.local_mac
184                 src_mac = src_ip_if.remote_mac
185                 src_ip6 = src_ip_if.remote_ip6
186                 dst_ip6 = remote_dst_host.ip6
187                 src_ip4 = src_ip_if.remote_ip4
188                 dst_ip4 = remote_dst_host.ip4
189                 src_l4 = 1234 + i
190                 dst_l4 = 4321 + i
191             if is_reflected_icmp:
192                 icmp_type_delta = 1
193
194             # default ULP should be something we do not use in tests
195             ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
196             # potentially a chain of protocols leading to ULP
197             ulp = ulp_l4
198
199             if is_udp_packet:
200                 if is_ip6:
201                     ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
202                     if add_extension_header:
203                         # prepend some extension headers
204                         ulp = (IPv6ExtHdrRouting() / IPv6ExtHdrRouting() /
205                                IPv6ExtHdrFragment(offset=0, m=1) / ulp_l4)
206                         # uncomment below to test invalid ones
207                         # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4
208                     else:
209                         ulp = ulp_l4
210                     p = (Ether(dst=dst_mac, src=src_mac) /
211                          IPv6(src=src_ip6, dst=dst_ip6) /
212                          ulp /
213                          Raw(payload))
214                 else:
215                     ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
216                     # IPv4 does not allow extension headers,
217                     # but we rather make it a first fragment
218                     flags = 1 if add_extension_header else 0
219                     ulp = ulp_l4
220                     p = (Ether(dst=dst_mac, src=src_mac) /
221                          IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) /
222                          ulp /
223                          Raw(payload))
224             elif modulo == 1:
225                 if is_ip6:
226                     ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta,
227                                            code=icmp_code)
228                     ulp = ulp_l4
229                     p = (Ether(dst=dst_mac, src=src_mac) /
230                          IPv6(src=src_ip6, dst=dst_ip6) /
231                          ulp /
232                          Raw(payload))
233                 else:
234                     ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code)
235                     ulp = ulp_l4
236                     p = (Ether(dst=dst_mac, src=src_mac) /
237                          IP(src=src_ip4, dst=dst_ip4) /
238                          ulp /
239                          Raw(payload))
240
241             if i % 2 == 1:
242                 info.data = p.copy()
243             size = packet_sizes[(i // 2) % len(packet_sizes)]
244             self.extend_packet(p, size)
245             pkts.append(p)
246
247             rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
248             rule_prefix_len = 128 if p.haslayer(IPv6) else 32
249             rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP
250
251             if p.haslayer(UDP):
252                 rule_l4_sport = p[UDP].sport
253                 rule_l4_dport = p[UDP].dport
254             else:
255                 if p.haslayer(ICMP):
256                     rule_l4_sport = p[ICMP].type
257                     rule_l4_dport = p[ICMP].code
258                 else:
259                     rule_l4_sport = p[ICMPv6Unknown].type
260                     rule_l4_dport = p[ICMPv6Unknown].code
261             if p.haslayer(IPv6):
262                 rule_l4_proto = ulp_l4.overload_fields[IPv6]['nh']
263             else:
264                 rule_l4_proto = p[IP].proto
265
266             new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
267                                src_prefix=ip_network(
268                                    (p[rule_l3_layer].src, rule_prefix_len)),
269                                dst_prefix=ip_network(
270                                    (p[rule_l3_layer].dst, rule_prefix_len)),
271                                sport_from=rule_l4_sport,
272                                sport_to=rule_l4_sport,
273                                dport_from=rule_l4_dport,
274                                dport_to=rule_l4_dport)
275
276             rules.append(new_rule)
277             new_rule_permit = copy.copy(new_rule)
278             new_rule_permit.is_permit = 1
279             permit_rules.append(new_rule_permit)
280
281             new_rule_permit_and_reflect = copy.copy(new_rule)
282             if can_reflect_this_packet:
283                 new_rule_permit_and_reflect.is_permit = 2
284             else:
285                 new_rule_permit_and_reflect.is_permit = is_permit
286
287             permit_and_reflect_rules.append(new_rule_permit_and_reflect)
288             self.logger.info("create_stream pkt#%d: %s" % (i, payload))
289
290         return {'stream': pkts,
291                 'rules': rules,
292                 'permit_rules': permit_rules,
293                 'permit_and_reflect_rules': permit_and_reflect_rules}
294
295     def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse):
296         last_info = dict()
297         for i in self.interfaces:
298             last_info[i.sw_if_index] = None
299
300         dst_ip_sw_if_index = dst_ip_if.sw_if_index
301
302         for packet in capture:
303             l3 = IP if packet.haslayer(IP) else IPv6
304             ip = packet[l3]
305             if packet.haslayer(UDP):
306                 l4 = UDP
307             else:
308                 if packet.haslayer(ICMP):
309                     l4 = ICMP
310                 else:
311                     l4 = ICMPv6Unknown
312
313             # Scapy IPv6 stuff is too smart for its own good.
314             # So we do this and coerce the ICMP into unknown type
315             if packet.haslayer(UDP):
316                 data = scapy.compat.raw(packet[UDP][Raw])
317             else:
318                 if l3 == IP:
319                     data = scapy.compat.raw(ICMP(
320                         scapy.compat.raw(packet[l3].payload))[Raw])
321                 else:
322                     data = scapy.compat.raw(ICMPv6Unknown(
323                         scapy.compat.raw(packet[l3].payload)).msgbody)
324             udp_or_icmp = packet[l3].payload
325             data_obj = Raw(data)
326             # FIXME: make framework believe we are on object
327             payload_info = self.payload_to_info(data_obj)
328             packet_index = payload_info.index
329
330             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
331
332             next_info = self.get_next_packet_info_for_interface2(
333                 payload_info.src, dst_ip_sw_if_index,
334                 last_info[payload_info.src])
335             last_info[payload_info.src] = next_info
336             self.assertTrue(next_info is not None)
337             self.assertEqual(packet_index, next_info.index)
338             saved_packet = next_info.data
339             self.assertTrue(next_info is not None)
340
341             # MAC: src, dst
342             if not reverse:
343                 self.assertEqual(packet.src, dst_ip_if.local_mac)
344                 host = dst_ip_if.host_by_mac(packet.dst)
345
346             # IP: src, dst
347             # self.assertEqual(ip.src, src_ip_if.remote_ip4)
348             if saved_packet is not None:
349                 self.assertEqual(ip.src, saved_packet[l3].src)
350                 self.assertEqual(ip.dst, saved_packet[l3].dst)
351                 if l4 == UDP:
352                     self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
353                     self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
354             # self.assertEqual(ip.dst, host.ip4)
355
356             # UDP:
357
358     def applied_acl_shuffle(self, acl_if):
359         saved_n_input = acl_if.n_input
360         # TOTO: maybe copy each one??
361         saved_acls = acl_if.acls
362
363         # now create a list of all the rules in all ACLs
364         all_rules = []
365         for old_acl in saved_acls:
366             for rule in old_acl.rules:
367                 all_rules.append(rule)
368
369         # Add a few ACLs made from shuffled rules
370         shuffle(all_rules)
371         acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
372         acl1.add_vpp_config()
373
374         shuffle(all_rules)
375         acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
376         acl2.add_vpp_config()
377
378         shuffle(all_rules)
379         acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
380         acl3.add_vpp_config()
381
382         # apply the shuffle ACLs in front
383         input_acls = [acl1, acl2]
384         output_acls = [acl1, acl2]
385
386         # add the currently applied ACLs
387         n_input = acl_if.n_input
388         input_acls.extend(saved_acls[:n_input])
389         output_acls.extend(saved_acls[n_input:])
390
391         # and the trailing shuffle ACL(s)
392         input_acls.extend([acl3])
393         output_acls.extend([acl3])
394
395         # set the interface ACL list to the result
396         acl_if.n_input = len(input_acls)
397         acl_if.acls = input_acls + output_acls
398         acl_if.add_vpp_config()
399
400         # change the ACLs a few times
401         for i in range(1, 10):
402             shuffle(all_rules)
403             acl1.modify_vpp_config(all_rules[::1+(i % 2)])
404
405             shuffle(all_rules)
406             acl2.modify_vpp_config(all_rules[::1+(i % 3)])
407
408             shuffle(all_rules)
409             acl3.modify_vpp_config(all_rules[::1+(i % 5)])
410
411         # restore to how it was before and clean up
412         acl_if.n_input = saved_n_input
413         acl_if.acls = saved_acls
414         acl_if.add_vpp_config()
415
416         acl1.remove_vpp_config()
417         acl2.remove_vpp_config()
418         acl3.remove_vpp_config()
419
420     def create_acls_for_a_stream(self, stream_dict,
421                                  test_l2_action, is_reflect):
422         r = stream_dict['rules']
423         r_permit = stream_dict['permit_rules']
424         r_permit_reflect = stream_dict['permit_and_reflect_rules']
425         r_action = r_permit_reflect if is_reflect else r
426         action_acl = VppAcl(self, rules=r_action, tag="act. acl")
427         action_acl.add_vpp_config()
428         permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
429         permit_acl.add_vpp_config()
430
431         return {'L2': action_acl if test_l2_action else permit_acl,
432                 'L3': permit_acl if test_l2_action else action_acl,
433                 'permit': permit_acl, 'action': action_acl}
434
435     def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny,
436                               is_ip6, is_reflect, add_eh):
437         """ Apply the ACLs
438         """
439         self.reset_packet_infos()
440         stream_dict = self.create_stream(
441             self.pg2, self.loop0,
442             bridged_to_routed,
443             self.pg_if_packet_sizes, is_ip6,
444             not is_reflect, False, add_eh)
445         stream = stream_dict['stream']
446         acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny,
447                                                 is_reflect)
448         n_input_l3 = 0 if bridged_to_routed else 1
449         n_input_l2 = 1 if bridged_to_routed else 0
450
451         acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
452                                      n_input=n_input_l3, acls=[acl_idx['L3']])
453         acl_if_pg2.add_vpp_config()
454
455         acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
456                                      n_input=n_input_l2, acls=[acl_idx['L2']])
457         acl_if_pg0.add_vpp_config()
458
459         acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
460                                      n_input=n_input_l2, acls=[acl_idx['L2']])
461         acl_if_pg1.add_vpp_config()
462
463         self.applied_acl_shuffle(acl_if_pg0)
464         self.applied_acl_shuffle(acl_if_pg1)
465         return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
466
467     def apply_acl_ip46_both_directions_reflect(self,
468                                                primary_is_bridged_to_routed,
469                                                reflect_on_l2, is_ip6, add_eh,
470                                                stateful_icmp):
471         primary_is_routed_to_bridged = not primary_is_bridged_to_routed
472         self.reset_packet_infos()
473         stream_dict_fwd = self.create_stream(self.pg2, self.loop0,
474                                              primary_is_bridged_to_routed,
475                                              self.pg_if_packet_sizes, is_ip6,
476                                              False, False, add_eh,
477                                              stateful_icmp)
478         acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd,
479                                                     reflect_on_l2, True)
480
481         stream_dict_rev = self.create_stream(self.pg2, self.loop0,
482                                              not primary_is_bridged_to_routed,
483                                              self.pg_if_packet_sizes, is_ip6,
484                                              True, True, add_eh, stateful_icmp)
485         # We want the primary action to be "deny" rather than reflect
486         acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev,
487                                                     reflect_on_l2, False)
488
489         if primary_is_bridged_to_routed:
490             inbound_l2_acl = acl_idx_fwd['L2']
491         else:
492             inbound_l2_acl = acl_idx_rev['L2']
493
494         if primary_is_routed_to_bridged:
495             outbound_l2_acl = acl_idx_fwd['L2']
496         else:
497             outbound_l2_acl = acl_idx_rev['L2']
498
499         if primary_is_routed_to_bridged:
500             inbound_l3_acl = acl_idx_fwd['L3']
501         else:
502             inbound_l3_acl = acl_idx_rev['L3']
503
504         if primary_is_bridged_to_routed:
505             outbound_l3_acl = acl_idx_fwd['L3']
506         else:
507             outbound_l3_acl = acl_idx_rev['L3']
508
509         acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
510                                      n_input=1,
511                                      acls=[inbound_l3_acl, outbound_l3_acl])
512         acl_if_pg2.add_vpp_config()
513
514         acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
515                                      n_input=1,
516                                      acls=[inbound_l2_acl, outbound_l2_acl])
517         acl_if_pg0.add_vpp_config()
518
519         acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
520                                      n_input=1,
521                                      acls=[inbound_l2_acl, outbound_l2_acl])
522         acl_if_pg1.add_vpp_config()
523
524         self.applied_acl_shuffle(acl_if_pg0)
525         self.applied_acl_shuffle(acl_if_pg2)
526
527     def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
528                                          is_reflect, add_eh):
529         return self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6,
530                                           is_reflect, add_eh)
531
532     def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
533                                          is_reflect, add_eh):
534         return self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6,
535                                           is_reflect, add_eh)
536
537     def verify_acl_packet_count(self, acl_idx, packet_count):
538         matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
539         self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches)))
540         total_count = 0
541         for m in matches:
542             for p in m:
543                 total_count = total_count + p['packets']
544         self.assertEqual(total_count, packet_count)
545
546     def run_traffic_ip46_x_to_y(self, bridged_to_routed,
547                                 test_l2_deny, is_ip6,
548                                 is_reflect, is_established, add_eh,
549                                 stateful_icmp=False):
550         self.reset_packet_infos()
551         stream_dict = self.create_stream(self.pg2, self.loop0,
552                                          bridged_to_routed,
553                                          self.pg_if_packet_sizes, is_ip6,
554                                          not is_reflect, is_established,
555                                          add_eh, stateful_icmp)
556         stream = stream_dict['stream']
557
558         tx_if = self.pg0 if bridged_to_routed else self.pg2
559         rx_if = self.pg2 if bridged_to_routed else self.pg0
560
561         tx_if.add_stream(stream)
562         self.pg_enable_capture(self.pg_interfaces)
563         self.pg_start()
564         packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
565         rcvd1 = rx_if.get_capture(packet_count)
566         self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
567         return len(stream)
568
569     def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
570                                            is_reflect, is_established, add_eh,
571                                            stateful_icmp=False):
572         return self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
573                                             is_reflect, is_established, add_eh,
574                                             stateful_icmp)
575
576     def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
577                                            is_reflect, is_established, add_eh,
578                                            stateful_icmp=False):
579         return self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
580                                             is_reflect, is_established, add_eh,
581                                             stateful_icmp)
582
583     def run_test_ip46_routed_to_bridged(self, test_l2_deny,
584                                         is_ip6, is_reflect, add_eh):
585         acls = self.apply_acl_ip46_routed_to_bridged(test_l2_deny,
586                                                      is_ip6, is_reflect,
587                                                      add_eh)
588         pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
589                                                        is_reflect, False,
590                                                        add_eh)
591         self.verify_acl_packet_count(acls['L3'].acl_index, pkts)
592
593     def run_test_ip46_bridged_to_routed(self, test_l2_deny,
594                                         is_ip6, is_reflect, add_eh):
595         acls = self.apply_acl_ip46_bridged_to_routed(test_l2_deny,
596                                                      is_ip6, is_reflect,
597                                                      add_eh)
598         pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
599                                                        is_reflect, False,
600                                                        add_eh)
601         self.verify_acl_packet_count(acls['L2'].acl_index, pkts)
602
603     def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
604                                                  is_ip6, add_eh,
605                                                  stateful_icmp=False):
606         self.apply_acl_ip46_both_directions_reflect(False, test_l2_action,
607                                                     is_ip6, add_eh,
608                                                     stateful_icmp)
609         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
610                                                 True, False, add_eh,
611                                                 stateful_icmp)
612         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
613                                                 False, True, add_eh,
614                                                 stateful_icmp)
615
616     def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action,
617                                                  is_ip6, add_eh,
618                                                  stateful_icmp=False):
619         self.apply_acl_ip46_both_directions_reflect(True, test_l2_action,
620                                                     is_ip6, add_eh,
621                                                     stateful_icmp)
622         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
623                                                 True, False, add_eh,
624                                                 stateful_icmp)
625         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
626                                                 False, True, add_eh,
627                                                 stateful_icmp)
628
629     def test_0000_ip6_irb_1(self):
630         """ ACL plugin prepare"""
631         if not self.vpp_dead:
632             cmd = "set acl-plugin session timeout udp idle 2000"
633             self.logger.info(self.vapi.ppcli(cmd))
634             # uncomment to not skip past the routing header
635             # and watch the EH tests fail
636             # self.logger.info(self.vapi.ppcli(
637             #    "set acl-plugin skip-ipv6-extension-header 43 0"))
638             # uncomment to test the session limit (stateful tests will fail)
639             # self.logger.info(self.vapi.ppcli(
640             #    "set acl-plugin session table max-entries 1"))
641             # new datapath is the default, but just in case
642             # self.logger.info(self.vapi.ppcli(
643             #    "set acl-plugin l2-datapath new"))
644             # If you want to see some tests fail, uncomment the next line
645             # self.logger.info(self.vapi.ppcli(
646             #    "set acl-plugin l2-datapath old"))
647
648     def test_0001_ip6_irb_1(self):
649         """ ACL IPv6 routed -> bridged, L2 ACL deny"""
650         self.run_test_ip46_routed_to_bridged(True, True, False,
651                                              self.WITHOUT_EH)
652
653     def test_0002_ip6_irb_1(self):
654         """ ACL IPv6 routed -> bridged, L3 ACL deny"""
655         self.run_test_ip46_routed_to_bridged(False, True, False,
656                                              self.WITHOUT_EH)
657
658     def test_0003_ip4_irb_1(self):
659         """ ACL IPv4 routed -> bridged, L2 ACL deny"""
660         self.run_test_ip46_routed_to_bridged(True, False, False,
661                                              self.WITHOUT_EH)
662
663     def test_0004_ip4_irb_1(self):
664         """ ACL IPv4 routed -> bridged, L3 ACL deny"""
665         self.run_test_ip46_routed_to_bridged(False, False, False,
666                                              self.WITHOUT_EH)
667
668     def test_0005_ip6_irb_1(self):
669         """ ACL IPv6 bridged -> routed, L2 ACL deny """
670         self.run_test_ip46_bridged_to_routed(True, True, False,
671                                              self.WITHOUT_EH)
672
673     def test_0006_ip6_irb_1(self):
674         """ ACL IPv6 bridged -> routed, L3 ACL deny """
675         self.run_test_ip46_bridged_to_routed(False, True, False,
676                                              self.WITHOUT_EH)
677
678     def test_0007_ip6_irb_1(self):
679         """ ACL IPv4 bridged -> routed, L2 ACL deny """
680         self.run_test_ip46_bridged_to_routed(True, False, False,
681                                              self.WITHOUT_EH)
682
683     def test_0008_ip6_irb_1(self):
684         """ ACL IPv4 bridged -> routed, L3 ACL deny """
685         self.run_test_ip46_bridged_to_routed(False, False, False,
686                                              self.WITHOUT_EH)
687
688     # Stateful ACL tests
689     def test_0101_ip6_irb_1(self):
690         """ ACL IPv6 routed -> bridged, L2 ACL permit+reflect"""
691         self.run_test_ip46_routed_to_bridged_and_back(True, True,
692                                                       self.WITHOUT_EH)
693
694     def test_0102_ip6_irb_1(self):
695         """ ACL IPv6 bridged -> routed, L2 ACL permit+reflect"""
696         self.run_test_ip46_bridged_to_routed_and_back(True, True,
697                                                       self.WITHOUT_EH)
698
699     def test_0103_ip6_irb_1(self):
700         """ ACL IPv4 routed -> bridged, L2 ACL permit+reflect"""
701         self.run_test_ip46_routed_to_bridged_and_back(True, False,
702                                                       self.WITHOUT_EH)
703
704     def test_0104_ip6_irb_1(self):
705         """ ACL IPv4 bridged -> routed, L2 ACL permit+reflect"""
706         self.run_test_ip46_bridged_to_routed_and_back(True, False,
707                                                       self.WITHOUT_EH)
708
709     def test_0111_ip6_irb_1(self):
710         """ ACL IPv6 routed -> bridged, L3 ACL permit+reflect"""
711         self.run_test_ip46_routed_to_bridged_and_back(False, True,
712                                                       self.WITHOUT_EH)
713
714     def test_0112_ip6_irb_1(self):
715         """ ACL IPv6 bridged -> routed, L3 ACL permit+reflect"""
716         self.run_test_ip46_bridged_to_routed_and_back(False, True,
717                                                       self.WITHOUT_EH)
718
719     def test_0113_ip6_irb_1(self):
720         """ ACL IPv4 routed -> bridged, L3 ACL permit+reflect"""
721         self.run_test_ip46_routed_to_bridged_and_back(False, False,
722                                                       self.WITHOUT_EH)
723
724     def test_0114_ip6_irb_1(self):
725         """ ACL IPv4 bridged -> routed, L3 ACL permit+reflect"""
726         self.run_test_ip46_bridged_to_routed_and_back(False, False,
727                                                       self.WITHOUT_EH)
728
729     # A block of tests with extension headers
730
731     def test_1001_ip6_irb_1(self):
732         """ ACL IPv6+EH routed -> bridged, L2 ACL deny"""
733         self.run_test_ip46_routed_to_bridged(True, True, False,
734                                              self.WITH_EH)
735
736     def test_1002_ip6_irb_1(self):
737         """ ACL IPv6+EH routed -> bridged, L3 ACL deny"""
738         self.run_test_ip46_routed_to_bridged(False, True, False,
739                                              self.WITH_EH)
740
741     def test_1005_ip6_irb_1(self):
742         """ ACL IPv6+EH bridged -> routed, L2 ACL deny """
743         self.run_test_ip46_bridged_to_routed(True, True, False,
744                                              self.WITH_EH)
745
746     def test_1006_ip6_irb_1(self):
747         """ ACL IPv6+EH bridged -> routed, L3 ACL deny """
748         self.run_test_ip46_bridged_to_routed(False, True, False,
749                                              self.WITH_EH)
750
751     def test_1101_ip6_irb_1(self):
752         """ ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect"""
753         self.run_test_ip46_routed_to_bridged_and_back(True, True,
754                                                       self.WITH_EH)
755
756     def test_1102_ip6_irb_1(self):
757         """ ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect"""
758         self.run_test_ip46_bridged_to_routed_and_back(True, True,
759                                                       self.WITH_EH)
760
761     def test_1111_ip6_irb_1(self):
762         """ ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect"""
763         self.run_test_ip46_routed_to_bridged_and_back(False, True,
764                                                       self.WITH_EH)
765
766     def test_1112_ip6_irb_1(self):
767         """ ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect"""
768         self.run_test_ip46_bridged_to_routed_and_back(False, True,
769                                                       self.WITH_EH)
770
771     # IPv4 with "MF" bit set
772
773     def test_1201_ip6_irb_1(self):
774         """ ACL IPv4+MF routed -> bridged, L2 ACL deny"""
775         self.run_test_ip46_routed_to_bridged(True, False, False,
776                                              self.WITH_EH)
777
778     def test_1202_ip6_irb_1(self):
779         """ ACL IPv4+MF routed -> bridged, L3 ACL deny"""
780         self.run_test_ip46_routed_to_bridged(False, False, False,
781                                              self.WITH_EH)
782
783     def test_1205_ip6_irb_1(self):
784         """ ACL IPv4+MF bridged -> routed, L2 ACL deny """
785         self.run_test_ip46_bridged_to_routed(True, False, False,
786                                              self.WITH_EH)
787
788     def test_1206_ip6_irb_1(self):
789         """ ACL IPv4+MF bridged -> routed, L3 ACL deny """
790         self.run_test_ip46_bridged_to_routed(False, False, False,
791                                              self.WITH_EH)
792
793     def test_1301_ip6_irb_1(self):
794         """ ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect"""
795         self.run_test_ip46_routed_to_bridged_and_back(True, False,
796                                                       self.WITH_EH)
797
798     def test_1302_ip6_irb_1(self):
799         """ ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect"""
800         self.run_test_ip46_bridged_to_routed_and_back(True, False,
801                                                       self.WITH_EH)
802
803     def test_1311_ip6_irb_1(self):
804         """ ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect"""
805         self.run_test_ip46_routed_to_bridged_and_back(False, False,
806                                                       self.WITH_EH)
807
808     def test_1312_ip6_irb_1(self):
809         """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
810         self.run_test_ip46_bridged_to_routed_and_back(False, False,
811                                                       self.WITH_EH)
812     # Stateful ACL tests with stateful ICMP
813
814     def test_1401_ip6_irb_1(self):
815         """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
816         self.run_test_ip46_routed_to_bridged_and_back(True, True,
817                                                       self.WITHOUT_EH,
818                                                       self.STATEFUL_ICMP)
819
820     def test_1402_ip6_irb_1(self):
821         """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
822         self.run_test_ip46_bridged_to_routed_and_back(True, True,
823                                                       self.WITHOUT_EH,
824                                                       self.STATEFUL_ICMP)
825
826     def test_1403_ip4_irb_1(self):
827         """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
828         self.run_test_ip46_routed_to_bridged_and_back(True, False,
829                                                       self.WITHOUT_EH,
830                                                       self.STATEFUL_ICMP)
831
832     def test_1404_ip4_irb_1(self):
833         """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
834         self.run_test_ip46_bridged_to_routed_and_back(True, False,
835                                                       self.WITHOUT_EH,
836                                                       self.STATEFUL_ICMP)
837
838     def test_1411_ip6_irb_1(self):
839         """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
840         self.run_test_ip46_routed_to_bridged_and_back(False, True,
841                                                       self.WITHOUT_EH,
842                                                       self.STATEFUL_ICMP)
843
844     def test_1412_ip6_irb_1(self):
845         """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
846         self.run_test_ip46_bridged_to_routed_and_back(False, True,
847                                                       self.WITHOUT_EH,
848                                                       self.STATEFUL_ICMP)
849
850     def test_1413_ip4_irb_1(self):
851         """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
852         self.run_test_ip46_routed_to_bridged_and_back(False, False,
853                                                       self.WITHOUT_EH,
854                                                       self.STATEFUL_ICMP)
855
856     def test_1414_ip4_irb_1(self):
857         """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
858         self.run_test_ip46_bridged_to_routed_and_back(False, False,
859                                                       self.WITHOUT_EH,
860                                                       self.STATEFUL_ICMP)
861
862
863 if __name__ == '__main__':
864     unittest.main(testRunner=VppTestRunner)