session: remove ipv6 lookup threading assert
[vpp.git] / test / test_acl_plugin_l2l3.py
1 #!/usr/bin/env python3
2 """ACL IRB Test Case HLD:
3
4 **config**
5     - L2 MAC learning enabled in l2bd
6     - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
7     - 2 bridged interfaces in l2bd with bvi
8
9 **test**
10     - sending ip4 eth pkts between routed interfaces
11         - 2 routed interfaces
12         - 2 bridged interfaces
13
14     - 64B, 512B, 1518B, 9200B (ether_size)
15
16     - burst of pkts per interface
17         - 257pkts per burst
18         - routed pkts hitting different FIB entries
19         - bridged pkts hitting different MAC entries
20
21 **verify**
22     - all packets received correctly
23
24 """
25
26 import copy
27 import unittest
28 from socket import AF_INET, AF_INET6
29 from random import shuffle
30 from ipaddress import ip_network
31 from config import config
32
33 import scapy.compat
34 from scapy.packet import Raw
35 from scapy.layers.l2 import Ether
36 from scapy.layers.inet import IP, UDP, ICMP, TCP
37 from scapy.layers.inet6 import IPv6, ICMPv6Unknown
38 from scapy.layers.inet6 import IPv6ExtHdrRouting
39 from scapy.layers.inet6 import IPv6ExtHdrFragment
40
41 from framework import VppTestCase
42 from asfframework import VppTestRunner
43 from vpp_l2 import L2_PORT_TYPE
44
45 from vpp_acl import AclRule, VppAcl, VppAclInterface
46
47
48 @unittest.skipIf("acl" in config.excluded_plugins, "Exclude ACL plugin tests")
49 class TestACLpluginL2L3(VppTestCase):
50     """TestACLpluginL2L3 Test Case"""
51
52     @classmethod
53     def setUpClass(cls):
54         """
55         #. Create BD with MAC learning enabled and put interfaces to this BD.
56         #. Configure IPv4 addresses on loopback interface and routed interface.
57         #. Configure MAC address binding to IPv4 neighbors on loop0.
58         #. Configure MAC address on pg2.
59         #. Loopback BVI interface has remote hosts, one half of hosts are
60            behind pg0 second behind pg1.
61         """
62         super(TestACLpluginL2L3, cls).setUpClass()
63
64         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
65         cls.bd_id = 10
66         cls.remote_hosts_count = 250
67
68         # create 3 pg interfaces, 1 loopback interface
69         cls.create_pg_interfaces(range(3))
70         cls.create_loopback_interfaces(1)
71
72         cls.interfaces = list(cls.pg_interfaces)
73         cls.interfaces.extend(cls.lo_interfaces)
74
75         for i in cls.interfaces:
76             i.admin_up()
77
78         # Create BD with MAC learning enabled and put interfaces to this BD
79         cls.vapi.sw_interface_set_l2_bridge(
80             rx_sw_if_index=cls.loop0.sw_if_index,
81             bd_id=cls.bd_id,
82             port_type=L2_PORT_TYPE.BVI,
83         )
84         cls.vapi.sw_interface_set_l2_bridge(
85             rx_sw_if_index=cls.pg0.sw_if_index, bd_id=cls.bd_id
86         )
87         cls.vapi.sw_interface_set_l2_bridge(
88             rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.bd_id
89         )
90
91         # Configure IPv4 addresses on loopback interface and routed interface
92         cls.loop0.config_ip4()
93         cls.loop0.config_ip6()
94         cls.pg2.config_ip4()
95         cls.pg2.config_ip6()
96
97         # Configure MAC address binding to IPv4 neighbors on loop0
98         cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
99         cls.loop0.configure_ipv4_neighbors()
100         cls.loop0.configure_ipv6_neighbors()
101         # configure MAC address on pg2
102         cls.pg2.resolve_arp()
103         cls.pg2.resolve_ndp()
104
105         cls.WITHOUT_EH = False
106         cls.WITH_EH = True
107         cls.STATELESS_ICMP = False
108         cls.STATEFUL_ICMP = True
109
110         # Loopback BVI interface has remote hosts, one half of hosts are behind
111         # pg0 second behind pg1
112         half = cls.remote_hosts_count // 2
113         cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
114         cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
115         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1)
116
117     @classmethod
118     def tearDownClass(cls):
119         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0)
120         super(TestACLpluginL2L3, cls).tearDownClass()
121
122     def tearDown(self):
123         """Run standard test teardown and log ``show l2patch``,
124         ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
125         ``show ip neighbors``.
126         """
127         super(TestACLpluginL2L3, self).tearDown()
128
129     def show_commands_at_teardown(self):
130         self.logger.info(self.vapi.cli("show l2patch"))
131         self.logger.info(self.vapi.cli("show classify tables"))
132         self.logger.info(self.vapi.cli("show l2fib verbose"))
133         self.logger.info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id))
134         self.logger.info(self.vapi.cli("show ip neighbors"))
135         cmd = "show acl-plugin sessions verbose 1"
136         self.logger.info(self.vapi.cli(cmd))
137         self.logger.info(self.vapi.cli("show acl-plugin acl"))
138         self.logger.info(self.vapi.cli("show acl-plugin interface"))
139         self.logger.info(self.vapi.cli("show acl-plugin tables"))
140
141     def create_stream(
142         self,
143         src_ip_if,
144         dst_ip_if,
145         reverse,
146         packet_sizes,
147         is_ip6,
148         expect_blocked,
149         expect_established,
150         add_extension_header,
151         icmp_stateful=False,
152     ):
153         pkts = []
154         rules = []
155         permit_rules = []
156         permit_and_reflect_rules = []
157         total_packet_count = 8
158         for i in range(0, total_packet_count):
159             modulo = (i // 2) % 2
160             icmp_type_delta = i % 2
161             icmp_code = i
162             is_udp_packet = modulo == 0
163             if is_udp_packet and icmp_stateful:
164                 continue
165             is_reflectable_icmp = (
166                 icmp_stateful and icmp_type_delta == 0 and not is_udp_packet
167             )
168             is_reflected_icmp = is_reflectable_icmp and expect_established
169             can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
170             is_permit = i % 2
171             remote_dst_index = i % len(dst_ip_if.remote_hosts)
172             remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
173             if is_permit == 1:
174                 info = self.create_packet_info(src_ip_if, dst_ip_if)
175                 payload = self.info_to_payload(info)
176             else:
177                 to_be_blocked = False
178                 if expect_blocked and not expect_established:
179                     to_be_blocked = True
180                 if not can_reflect_this_packet:
181                     to_be_blocked = True
182                 if to_be_blocked:
183                     payload = "to be blocked"
184                 else:
185                     info = self.create_packet_info(src_ip_if, dst_ip_if)
186                     payload = self.info_to_payload(info)
187             if reverse:
188                 dst_mac = "de:ad:00:00:00:00"
189                 src_mac = remote_dst_host._mac
190                 dst_ip6 = src_ip_if.remote_ip6
191                 src_ip6 = remote_dst_host.ip6
192                 dst_ip4 = src_ip_if.remote_ip4
193                 src_ip4 = remote_dst_host.ip4
194                 dst_l4 = 1234 + i
195                 src_l4 = 4321 + i
196             else:
197                 dst_mac = src_ip_if.local_mac
198                 src_mac = src_ip_if.remote_mac
199                 src_ip6 = src_ip_if.remote_ip6
200                 dst_ip6 = remote_dst_host.ip6
201                 src_ip4 = src_ip_if.remote_ip4
202                 dst_ip4 = remote_dst_host.ip4
203                 src_l4 = 1234 + i
204                 dst_l4 = 4321 + i
205             if is_reflected_icmp:
206                 icmp_type_delta = 1
207
208             # default ULP should be something we do not use in tests
209             ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
210             # potentially a chain of protocols leading to ULP
211             ulp = ulp_l4
212
213             if is_udp_packet:
214                 if is_ip6:
215                     ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
216                     if add_extension_header:
217                         # prepend some extension headers
218                         ulp = (
219                             IPv6ExtHdrRouting()
220                             / IPv6ExtHdrRouting()
221                             / IPv6ExtHdrFragment(offset=0, m=1)
222                             / ulp_l4
223                         )
224                         # uncomment below to test invalid ones
225                         # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4
226                     else:
227                         ulp = ulp_l4
228                     p = (
229                         Ether(dst=dst_mac, src=src_mac)
230                         / IPv6(src=src_ip6, dst=dst_ip6)
231                         / ulp
232                         / Raw(payload)
233                     )
234                 else:
235                     ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
236                     # IPv4 does not allow extension headers,
237                     # but we rather make it a first fragment
238                     flags = 1 if add_extension_header else 0
239                     ulp = ulp_l4
240                     p = (
241                         Ether(dst=dst_mac, src=src_mac)
242                         / IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags)
243                         / ulp
244                         / Raw(payload)
245                     )
246             elif modulo == 1:
247                 if is_ip6:
248                     ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta, code=icmp_code)
249                     ulp = ulp_l4
250                     p = (
251                         Ether(dst=dst_mac, src=src_mac)
252                         / IPv6(src=src_ip6, dst=dst_ip6)
253                         / ulp
254                         / Raw(payload)
255                     )
256                 else:
257                     ulp_l4 = ICMP(type=8 - 8 * icmp_type_delta, code=icmp_code)
258                     ulp = ulp_l4
259                     p = (
260                         Ether(dst=dst_mac, src=src_mac)
261                         / IP(src=src_ip4, dst=dst_ip4)
262                         / ulp
263                         / Raw(payload)
264                     )
265
266             if i % 2 == 1:
267                 info.data = p.copy()
268             size = packet_sizes[(i // 2) % len(packet_sizes)]
269             self.extend_packet(p, size)
270             pkts.append(p)
271
272             rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
273             rule_prefix_len = 128 if p.haslayer(IPv6) else 32
274             rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP
275
276             if p.haslayer(UDP):
277                 rule_l4_sport = p[UDP].sport
278                 rule_l4_dport = p[UDP].dport
279             else:
280                 if p.haslayer(ICMP):
281                     rule_l4_sport = p[ICMP].type
282                     rule_l4_dport = p[ICMP].code
283                 else:
284                     rule_l4_sport = p[ICMPv6Unknown].type
285                     rule_l4_dport = p[ICMPv6Unknown].code
286             if p.haslayer(IPv6):
287                 rule_l4_proto = ulp_l4.overload_fields[IPv6]["nh"]
288             else:
289                 rule_l4_proto = p[IP].proto
290
291             new_rule = AclRule(
292                 is_permit=is_permit,
293                 proto=rule_l4_proto,
294                 src_prefix=ip_network((p[rule_l3_layer].src, rule_prefix_len)),
295                 dst_prefix=ip_network((p[rule_l3_layer].dst, rule_prefix_len)),
296                 sport_from=rule_l4_sport,
297                 sport_to=rule_l4_sport,
298                 dport_from=rule_l4_dport,
299                 dport_to=rule_l4_dport,
300             )
301
302             rules.append(new_rule)
303             new_rule_permit = copy.copy(new_rule)
304             new_rule_permit.is_permit = 1
305             permit_rules.append(new_rule_permit)
306
307             new_rule_permit_and_reflect = copy.copy(new_rule)
308             if can_reflect_this_packet:
309                 new_rule_permit_and_reflect.is_permit = 2
310             else:
311                 new_rule_permit_and_reflect.is_permit = is_permit
312
313             permit_and_reflect_rules.append(new_rule_permit_and_reflect)
314             self.logger.info("create_stream pkt#%d: %s" % (i, payload))
315
316         return {
317             "stream": pkts,
318             "rules": rules,
319             "permit_rules": permit_rules,
320             "permit_and_reflect_rules": permit_and_reflect_rules,
321         }
322
323     def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse):
324         last_info = dict()
325         for i in self.interfaces:
326             last_info[i.sw_if_index] = None
327
328         dst_ip_sw_if_index = dst_ip_if.sw_if_index
329
330         for packet in capture:
331             l3 = IP if packet.haslayer(IP) else IPv6
332             ip = packet[l3]
333             if packet.haslayer(UDP):
334                 l4 = UDP
335             else:
336                 if packet.haslayer(ICMP):
337                     l4 = ICMP
338                 else:
339                     l4 = ICMPv6Unknown
340
341             # Scapy IPv6 stuff is too smart for its own good.
342             # So we do this and coerce the ICMP into unknown type
343             if packet.haslayer(UDP):
344                 data = scapy.compat.raw(packet[UDP][Raw])
345             else:
346                 if l3 == IP:
347                     data = scapy.compat.raw(
348                         ICMP(scapy.compat.raw(packet[l3].payload))[Raw]
349                     )
350                 else:
351                     data = scapy.compat.raw(
352                         ICMPv6Unknown(scapy.compat.raw(packet[l3].payload)).msgbody
353                     )
354             udp_or_icmp = packet[l3].payload
355             data_obj = Raw(data)
356             # FIXME: make framework believe we are on object
357             payload_info = self.payload_to_info(data_obj)
358             packet_index = payload_info.index
359
360             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
361
362             next_info = self.get_next_packet_info_for_interface2(
363                 payload_info.src, dst_ip_sw_if_index, last_info[payload_info.src]
364             )
365             last_info[payload_info.src] = next_info
366             self.assertTrue(next_info is not None)
367             self.assertEqual(packet_index, next_info.index)
368             saved_packet = next_info.data
369             self.assertTrue(next_info is not None)
370
371             # MAC: src, dst
372             if not reverse:
373                 self.assertEqual(packet.src, dst_ip_if.local_mac)
374                 host = dst_ip_if.host_by_mac(packet.dst)
375
376             # IP: src, dst
377             # self.assertEqual(ip.src, src_ip_if.remote_ip4)
378             if saved_packet is not None:
379                 self.assertEqual(ip.src, saved_packet[l3].src)
380                 self.assertEqual(ip.dst, saved_packet[l3].dst)
381                 if l4 == UDP:
382                     self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
383                     self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
384             # self.assertEqual(ip.dst, host.ip4)
385
386             # UDP:
387
388     def applied_acl_shuffle(self, acl_if):
389         saved_n_input = acl_if.n_input
390         # TOTO: maybe copy each one??
391         saved_acls = acl_if.acls
392
393         # now create a list of all the rules in all ACLs
394         all_rules = []
395         for old_acl in saved_acls:
396             for rule in old_acl.rules:
397                 all_rules.append(rule)
398
399         # Add a few ACLs made from shuffled rules
400         shuffle(all_rules)
401         acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
402         acl1.add_vpp_config()
403
404         shuffle(all_rules)
405         acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
406         acl2.add_vpp_config()
407
408         shuffle(all_rules)
409         acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
410         acl3.add_vpp_config()
411
412         # apply the shuffle ACLs in front
413         input_acls = [acl1, acl2]
414         output_acls = [acl1, acl2]
415
416         # add the currently applied ACLs
417         n_input = acl_if.n_input
418         input_acls.extend(saved_acls[:n_input])
419         output_acls.extend(saved_acls[n_input:])
420
421         # and the trailing shuffle ACL(s)
422         input_acls.extend([acl3])
423         output_acls.extend([acl3])
424
425         # set the interface ACL list to the result
426         acl_if.n_input = len(input_acls)
427         acl_if.acls = input_acls + output_acls
428         acl_if.add_vpp_config()
429
430         # change the ACLs a few times
431         for i in range(1, 10):
432             shuffle(all_rules)
433             acl1.modify_vpp_config(all_rules[:: 1 + (i % 2)])
434
435             shuffle(all_rules)
436             acl2.modify_vpp_config(all_rules[:: 1 + (i % 3)])
437
438             shuffle(all_rules)
439             acl3.modify_vpp_config(all_rules[:: 1 + (i % 5)])
440
441         # restore to how it was before and clean up
442         acl_if.n_input = saved_n_input
443         acl_if.acls = saved_acls
444         acl_if.add_vpp_config()
445
446         acl1.remove_vpp_config()
447         acl2.remove_vpp_config()
448         acl3.remove_vpp_config()
449
450     def create_acls_for_a_stream(self, stream_dict, test_l2_action, is_reflect):
451         r = stream_dict["rules"]
452         r_permit = stream_dict["permit_rules"]
453         r_permit_reflect = stream_dict["permit_and_reflect_rules"]
454         r_action = r_permit_reflect if is_reflect else r
455         action_acl = VppAcl(self, rules=r_action, tag="act. acl")
456         action_acl.add_vpp_config()
457         permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
458         permit_acl.add_vpp_config()
459
460         return {
461             "L2": action_acl if test_l2_action else permit_acl,
462             "L3": permit_acl if test_l2_action else action_acl,
463             "permit": permit_acl,
464             "action": action_acl,
465         }
466
467     def apply_acl_ip46_x_to_y(
468         self, bridged_to_routed, test_l2_deny, is_ip6, is_reflect, add_eh
469     ):
470         """Apply the ACLs"""
471         self.reset_packet_infos()
472         stream_dict = self.create_stream(
473             self.pg2,
474             self.loop0,
475             bridged_to_routed,
476             self.pg_if_packet_sizes,
477             is_ip6,
478             not is_reflect,
479             False,
480             add_eh,
481         )
482         stream = stream_dict["stream"]
483         acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, is_reflect)
484         n_input_l3 = 0 if bridged_to_routed else 1
485         n_input_l2 = 1 if bridged_to_routed else 0
486
487         acl_if_pg2 = VppAclInterface(
488             self,
489             sw_if_index=self.pg2.sw_if_index,
490             n_input=n_input_l3,
491             acls=[acl_idx["L3"]],
492         )
493         acl_if_pg2.add_vpp_config()
494
495         acl_if_pg0 = VppAclInterface(
496             self,
497             sw_if_index=self.pg0.sw_if_index,
498             n_input=n_input_l2,
499             acls=[acl_idx["L2"]],
500         )
501         acl_if_pg0.add_vpp_config()
502
503         acl_if_pg1 = VppAclInterface(
504             self,
505             sw_if_index=self.pg1.sw_if_index,
506             n_input=n_input_l2,
507             acls=[acl_idx["L2"]],
508         )
509         acl_if_pg1.add_vpp_config()
510
511         self.applied_acl_shuffle(acl_if_pg0)
512         self.applied_acl_shuffle(acl_if_pg1)
513         return {"L2": acl_idx["L2"], "L3": acl_idx["L3"]}
514
515     def apply_acl_ip46_both_directions_reflect(
516         self, primary_is_bridged_to_routed, reflect_on_l2, is_ip6, add_eh, stateful_icmp
517     ):
518         primary_is_routed_to_bridged = not primary_is_bridged_to_routed
519         self.reset_packet_infos()
520         stream_dict_fwd = self.create_stream(
521             self.pg2,
522             self.loop0,
523             primary_is_bridged_to_routed,
524             self.pg_if_packet_sizes,
525             is_ip6,
526             False,
527             False,
528             add_eh,
529             stateful_icmp,
530         )
531         acl_idx_fwd = self.create_acls_for_a_stream(
532             stream_dict_fwd, reflect_on_l2, True
533         )
534
535         stream_dict_rev = self.create_stream(
536             self.pg2,
537             self.loop0,
538             not primary_is_bridged_to_routed,
539             self.pg_if_packet_sizes,
540             is_ip6,
541             True,
542             True,
543             add_eh,
544             stateful_icmp,
545         )
546         # We want the primary action to be "deny" rather than reflect
547         acl_idx_rev = self.create_acls_for_a_stream(
548             stream_dict_rev, reflect_on_l2, False
549         )
550
551         if primary_is_bridged_to_routed:
552             inbound_l2_acl = acl_idx_fwd["L2"]
553         else:
554             inbound_l2_acl = acl_idx_rev["L2"]
555
556         if primary_is_routed_to_bridged:
557             outbound_l2_acl = acl_idx_fwd["L2"]
558         else:
559             outbound_l2_acl = acl_idx_rev["L2"]
560
561         if primary_is_routed_to_bridged:
562             inbound_l3_acl = acl_idx_fwd["L3"]
563         else:
564             inbound_l3_acl = acl_idx_rev["L3"]
565
566         if primary_is_bridged_to_routed:
567             outbound_l3_acl = acl_idx_fwd["L3"]
568         else:
569             outbound_l3_acl = acl_idx_rev["L3"]
570
571         acl_if_pg2 = VppAclInterface(
572             self,
573             sw_if_index=self.pg2.sw_if_index,
574             n_input=1,
575             acls=[inbound_l3_acl, outbound_l3_acl],
576         )
577         acl_if_pg2.add_vpp_config()
578
579         acl_if_pg0 = VppAclInterface(
580             self,
581             sw_if_index=self.pg0.sw_if_index,
582             n_input=1,
583             acls=[inbound_l2_acl, outbound_l2_acl],
584         )
585         acl_if_pg0.add_vpp_config()
586
587         acl_if_pg1 = VppAclInterface(
588             self,
589             sw_if_index=self.pg1.sw_if_index,
590             n_input=1,
591             acls=[inbound_l2_acl, outbound_l2_acl],
592         )
593         acl_if_pg1.add_vpp_config()
594
595         self.applied_acl_shuffle(acl_if_pg0)
596         self.applied_acl_shuffle(acl_if_pg2)
597
598     def apply_acl_ip46_routed_to_bridged(
599         self, test_l2_deny, is_ip6, is_reflect, add_eh
600     ):
601         return self.apply_acl_ip46_x_to_y(
602             False, test_l2_deny, is_ip6, is_reflect, add_eh
603         )
604
605     def apply_acl_ip46_bridged_to_routed(
606         self, test_l2_deny, is_ip6, is_reflect, add_eh
607     ):
608         return self.apply_acl_ip46_x_to_y(
609             True, test_l2_deny, is_ip6, is_reflect, add_eh
610         )
611
612     def verify_acl_packet_count(self, acl_idx, packet_count):
613         matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
614         self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches)))
615         total_count = 0
616         for m in matches:
617             for p in m:
618                 total_count = total_count + p["packets"]
619         self.assertEqual(total_count, packet_count)
620
621     def run_traffic_ip46_x_to_y(
622         self,
623         bridged_to_routed,
624         test_l2_deny,
625         is_ip6,
626         is_reflect,
627         is_established,
628         add_eh,
629         stateful_icmp=False,
630     ):
631         self.reset_packet_infos()
632         stream_dict = self.create_stream(
633             self.pg2,
634             self.loop0,
635             bridged_to_routed,
636             self.pg_if_packet_sizes,
637             is_ip6,
638             not is_reflect,
639             is_established,
640             add_eh,
641             stateful_icmp,
642         )
643         stream = stream_dict["stream"]
644
645         tx_if = self.pg0 if bridged_to_routed else self.pg2
646         rx_if = self.pg2 if bridged_to_routed else self.pg0
647
648         tx_if.add_stream(stream)
649         self.pg_enable_capture(self.pg_interfaces)
650         self.pg_start()
651         packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
652         rcvd1 = rx_if.get_capture(packet_count)
653         self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
654         return len(stream)
655
656     def run_traffic_ip46_routed_to_bridged(
657         self,
658         test_l2_deny,
659         is_ip6,
660         is_reflect,
661         is_established,
662         add_eh,
663         stateful_icmp=False,
664     ):
665         return self.run_traffic_ip46_x_to_y(
666             False,
667             test_l2_deny,
668             is_ip6,
669             is_reflect,
670             is_established,
671             add_eh,
672             stateful_icmp,
673         )
674
675     def run_traffic_ip46_bridged_to_routed(
676         self,
677         test_l2_deny,
678         is_ip6,
679         is_reflect,
680         is_established,
681         add_eh,
682         stateful_icmp=False,
683     ):
684         return self.run_traffic_ip46_x_to_y(
685             True,
686             test_l2_deny,
687             is_ip6,
688             is_reflect,
689             is_established,
690             add_eh,
691             stateful_icmp,
692         )
693
694     def run_test_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh):
695         acls = self.apply_acl_ip46_routed_to_bridged(
696             test_l2_deny, is_ip6, is_reflect, add_eh
697         )
698         pkts = self.run_traffic_ip46_routed_to_bridged(
699             test_l2_deny, is_ip6, is_reflect, False, add_eh
700         )
701         self.verify_acl_packet_count(acls["L3"].acl_index, pkts)
702
703     def run_test_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh):
704         acls = self.apply_acl_ip46_bridged_to_routed(
705             test_l2_deny, is_ip6, is_reflect, add_eh
706         )
707         pkts = self.run_traffic_ip46_bridged_to_routed(
708             test_l2_deny, is_ip6, is_reflect, False, add_eh
709         )
710         self.verify_acl_packet_count(acls["L2"].acl_index, pkts)
711
712     def run_test_ip46_routed_to_bridged_and_back(
713         self, test_l2_action, is_ip6, add_eh, stateful_icmp=False
714     ):
715         self.apply_acl_ip46_both_directions_reflect(
716             False, test_l2_action, is_ip6, add_eh, stateful_icmp
717         )
718         self.run_traffic_ip46_routed_to_bridged(
719             test_l2_action, is_ip6, True, False, add_eh, stateful_icmp
720         )
721         self.run_traffic_ip46_bridged_to_routed(
722             test_l2_action, is_ip6, False, True, add_eh, stateful_icmp
723         )
724
725     def run_test_ip46_bridged_to_routed_and_back(
726         self, test_l2_action, is_ip6, add_eh, stateful_icmp=False
727     ):
728         self.apply_acl_ip46_both_directions_reflect(
729             True, test_l2_action, is_ip6, add_eh, stateful_icmp
730         )
731         self.run_traffic_ip46_bridged_to_routed(
732             test_l2_action, is_ip6, True, False, add_eh, stateful_icmp
733         )
734         self.run_traffic_ip46_routed_to_bridged(
735             test_l2_action, is_ip6, False, True, add_eh, stateful_icmp
736         )
737
738     def test_0000_ip6_irb_1(self):
739         """ACL plugin prepare"""
740         if not self.vpp_dead:
741             cmd = "set acl-plugin session timeout udp idle 2000"
742             self.logger.info(self.vapi.ppcli(cmd))
743             # uncomment to not skip past the routing header
744             # and watch the EH tests fail
745             # self.logger.info(self.vapi.ppcli(
746             #    "set acl-plugin skip-ipv6-extension-header 43 0"))
747             # uncomment to test the session limit (stateful tests will fail)
748             # self.logger.info(self.vapi.ppcli(
749             #    "set acl-plugin session table max-entries 1"))
750             # new datapath is the default, but just in case
751             # self.logger.info(self.vapi.ppcli(
752             #    "set acl-plugin l2-datapath new"))
753             # If you want to see some tests fail, uncomment the next line
754             # self.logger.info(self.vapi.ppcli(
755             #    "set acl-plugin l2-datapath old"))
756
757     def test_0001_ip6_irb_1(self):
758         """ACL IPv6 routed -> bridged, L2 ACL deny"""
759         self.run_test_ip46_routed_to_bridged(True, True, False, self.WITHOUT_EH)
760
761     def test_0002_ip6_irb_1(self):
762         """ACL IPv6 routed -> bridged, L3 ACL deny"""
763         self.run_test_ip46_routed_to_bridged(False, True, False, self.WITHOUT_EH)
764
765     def test_0003_ip4_irb_1(self):
766         """ACL IPv4 routed -> bridged, L2 ACL deny"""
767         self.run_test_ip46_routed_to_bridged(True, False, False, self.WITHOUT_EH)
768
769     def test_0004_ip4_irb_1(self):
770         """ACL IPv4 routed -> bridged, L3 ACL deny"""
771         self.run_test_ip46_routed_to_bridged(False, False, False, self.WITHOUT_EH)
772
773     def test_0005_ip6_irb_1(self):
774         """ACL IPv6 bridged -> routed, L2 ACL deny"""
775         self.run_test_ip46_bridged_to_routed(True, True, False, self.WITHOUT_EH)
776
777     def test_0006_ip6_irb_1(self):
778         """ACL IPv6 bridged -> routed, L3 ACL deny"""
779         self.run_test_ip46_bridged_to_routed(False, True, False, self.WITHOUT_EH)
780
781     def test_0007_ip6_irb_1(self):
782         """ACL IPv4 bridged -> routed, L2 ACL deny"""
783         self.run_test_ip46_bridged_to_routed(True, False, False, self.WITHOUT_EH)
784
785     def test_0008_ip6_irb_1(self):
786         """ACL IPv4 bridged -> routed, L3 ACL deny"""
787         self.run_test_ip46_bridged_to_routed(False, False, False, self.WITHOUT_EH)
788
789     # Stateful ACL tests
790     def test_0101_ip6_irb_1(self):
791         """ACL IPv6 routed -> bridged, L2 ACL permit+reflect"""
792         self.run_test_ip46_routed_to_bridged_and_back(True, True, self.WITHOUT_EH)
793
794     def test_0102_ip6_irb_1(self):
795         """ACL IPv6 bridged -> routed, L2 ACL permit+reflect"""
796         self.run_test_ip46_bridged_to_routed_and_back(True, True, self.WITHOUT_EH)
797
798     def test_0103_ip6_irb_1(self):
799         """ACL IPv4 routed -> bridged, L2 ACL permit+reflect"""
800         self.run_test_ip46_routed_to_bridged_and_back(True, False, self.WITHOUT_EH)
801
802     def test_0104_ip6_irb_1(self):
803         """ACL IPv4 bridged -> routed, L2 ACL permit+reflect"""
804         self.run_test_ip46_bridged_to_routed_and_back(True, False, self.WITHOUT_EH)
805
806     def test_0111_ip6_irb_1(self):
807         """ACL IPv6 routed -> bridged, L3 ACL permit+reflect"""
808         self.run_test_ip46_routed_to_bridged_and_back(False, True, self.WITHOUT_EH)
809
810     def test_0112_ip6_irb_1(self):
811         """ACL IPv6 bridged -> routed, L3 ACL permit+reflect"""
812         self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITHOUT_EH)
813
814     def test_0113_ip6_irb_1(self):
815         """ACL IPv4 routed -> bridged, L3 ACL permit+reflect"""
816         self.run_test_ip46_routed_to_bridged_and_back(False, False, self.WITHOUT_EH)
817
818     def test_0114_ip6_irb_1(self):
819         """ACL IPv4 bridged -> routed, L3 ACL permit+reflect"""
820         self.run_test_ip46_bridged_to_routed_and_back(False, False, self.WITHOUT_EH)
821
822     # A block of tests with extension headers
823
824     def test_1001_ip6_irb_1(self):
825         """ACL IPv6+EH routed -> bridged, L2 ACL deny"""
826         self.run_test_ip46_routed_to_bridged(True, True, False, self.WITH_EH)
827
828     def test_1002_ip6_irb_1(self):
829         """ACL IPv6+EH routed -> bridged, L3 ACL deny"""
830         self.run_test_ip46_routed_to_bridged(False, True, False, self.WITH_EH)
831
832     def test_1005_ip6_irb_1(self):
833         """ACL IPv6+EH bridged -> routed, L2 ACL deny"""
834         self.run_test_ip46_bridged_to_routed(True, True, False, self.WITH_EH)
835
836     def test_1006_ip6_irb_1(self):
837         """ACL IPv6+EH bridged -> routed, L3 ACL deny"""
838         self.run_test_ip46_bridged_to_routed(False, True, False, self.WITH_EH)
839
840     def test_1101_ip6_irb_1(self):
841         """ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect"""
842         self.run_test_ip46_routed_to_bridged_and_back(True, True, self.WITH_EH)
843
844     def test_1102_ip6_irb_1(self):
845         """ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect"""
846         self.run_test_ip46_bridged_to_routed_and_back(True, True, self.WITH_EH)
847
848     def test_1111_ip6_irb_1(self):
849         """ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect"""
850         self.run_test_ip46_routed_to_bridged_and_back(False, True, self.WITH_EH)
851
852     def test_1112_ip6_irb_1(self):
853         """ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect"""
854         self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITH_EH)
855
856     # IPv4 with "MF" bit set
857
858     def test_1201_ip6_irb_1(self):
859         """ACL IPv4+MF routed -> bridged, L2 ACL deny"""
860         self.run_test_ip46_routed_to_bridged(True, False, False, self.WITH_EH)
861
862     def test_1202_ip6_irb_1(self):
863         """ACL IPv4+MF routed -> bridged, L3 ACL deny"""
864         self.run_test_ip46_routed_to_bridged(False, False, False, self.WITH_EH)
865
866     def test_1205_ip6_irb_1(self):
867         """ACL IPv4+MF bridged -> routed, L2 ACL deny"""
868         self.run_test_ip46_bridged_to_routed(True, False, False, self.WITH_EH)
869
870     def test_1206_ip6_irb_1(self):
871         """ACL IPv4+MF bridged -> routed, L3 ACL deny"""
872         self.run_test_ip46_bridged_to_routed(False, False, False, self.WITH_EH)
873
874     def test_1301_ip6_irb_1(self):
875         """ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect"""
876         self.run_test_ip46_routed_to_bridged_and_back(True, False, self.WITH_EH)
877
878     def test_1302_ip6_irb_1(self):
879         """ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect"""
880         self.run_test_ip46_bridged_to_routed_and_back(True, False, self.WITH_EH)
881
882     def test_1311_ip6_irb_1(self):
883         """ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect"""
884         self.run_test_ip46_routed_to_bridged_and_back(False, False, self.WITH_EH)
885
886     def test_1312_ip6_irb_1(self):
887         """ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
888         self.run_test_ip46_bridged_to_routed_and_back(False, False, self.WITH_EH)
889
890     # Stateful ACL tests with stateful ICMP
891
892     def test_1401_ip6_irb_1(self):
893         """IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
894         self.run_test_ip46_routed_to_bridged_and_back(
895             True, True, self.WITHOUT_EH, self.STATEFUL_ICMP
896         )
897
898     def test_1402_ip6_irb_1(self):
899         """IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
900         self.run_test_ip46_bridged_to_routed_and_back(
901             True, True, self.WITHOUT_EH, self.STATEFUL_ICMP
902         )
903
904     def test_1403_ip4_irb_1(self):
905         """IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
906         self.run_test_ip46_routed_to_bridged_and_back(
907             True, False, self.WITHOUT_EH, self.STATEFUL_ICMP
908         )
909
910     def test_1404_ip4_irb_1(self):
911         """IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
912         self.run_test_ip46_bridged_to_routed_and_back(
913             True, False, self.WITHOUT_EH, self.STATEFUL_ICMP
914         )
915
916     def test_1411_ip6_irb_1(self):
917         """IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
918         self.run_test_ip46_routed_to_bridged_and_back(
919             False, True, self.WITHOUT_EH, self.STATEFUL_ICMP
920         )
921
922     def test_1412_ip6_irb_1(self):
923         """IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
924         self.run_test_ip46_bridged_to_routed_and_back(
925             False, True, self.WITHOUT_EH, self.STATEFUL_ICMP
926         )
927
928     def test_1413_ip4_irb_1(self):
929         """IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
930         self.run_test_ip46_routed_to_bridged_and_back(
931             False, False, self.WITHOUT_EH, self.STATEFUL_ICMP
932         )
933
934     def test_1414_ip4_irb_1(self):
935         """IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
936         self.run_test_ip46_bridged_to_routed_and_back(
937             False, False, self.WITHOUT_EH, self.STATEFUL_ICMP
938         )
939
940
941 if __name__ == "__main__":
942     unittest.main(testRunner=VppTestRunner)