tests: replace pycodestyle with black
[vpp.git] / test / test_acl_plugin_l2l3.py
1 #!/usr/bin/env python3
2 """ACL IRB Test Case HLD:
3
4 **config**
5     - L2 MAC learning enabled in l2bd
6     - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
7     - 2 bridged interfaces in l2bd with bvi
8
9 **test**
10     - sending ip4 eth pkts between routed interfaces
11         - 2 routed interfaces
12         - 2 bridged interfaces
13
14     - 64B, 512B, 1518B, 9200B (ether_size)
15
16     - burst of pkts per interface
17         - 257pkts per burst
18         - routed pkts hitting different FIB entries
19         - bridged pkts hitting different MAC entries
20
21 **verify**
22     - all packets received correctly
23
24 """
25
26 import copy
27 import unittest
28 from socket import inet_pton, AF_INET, AF_INET6
29 from random import choice, shuffle
30 from pprint import pprint
31 from ipaddress import ip_network
32
33 import scapy.compat
34 from scapy.packet import Raw
35 from scapy.layers.l2 import Ether
36 from scapy.layers.inet import IP, UDP, ICMP, TCP
37 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
38 from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
39 from scapy.layers.inet6 import IPv6ExtHdrFragment
40
41 from framework import VppTestCase, VppTestRunner
42 from vpp_l2 import L2_PORT_TYPE
43 import time
44
45 from vpp_acl import AclRule, VppAcl, VppAclInterface
46
47
48 class TestACLpluginL2L3(VppTestCase):
49     """TestACLpluginL2L3 Test Case"""
50
51     @classmethod
52     def setUpClass(cls):
53         """
54         #. Create BD with MAC learning enabled and put interfaces to this BD.
55         #. Configure IPv4 addresses on loopback interface and routed interface.
56         #. Configure MAC address binding to IPv4 neighbors on loop0.
57         #. Configure MAC address on pg2.
58         #. Loopback BVI interface has remote hosts, one half of hosts are
59            behind pg0 second behind pg1.
60         """
61         super(TestACLpluginL2L3, cls).setUpClass()
62
63         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
64         cls.bd_id = 10
65         cls.remote_hosts_count = 250
66
67         # create 3 pg interfaces, 1 loopback interface
68         cls.create_pg_interfaces(range(3))
69         cls.create_loopback_interfaces(1)
70
71         cls.interfaces = list(cls.pg_interfaces)
72         cls.interfaces.extend(cls.lo_interfaces)
73
74         for i in cls.interfaces:
75             i.admin_up()
76
77         # Create BD with MAC learning enabled and put interfaces to this BD
78         cls.vapi.sw_interface_set_l2_bridge(
79             rx_sw_if_index=cls.loop0.sw_if_index,
80             bd_id=cls.bd_id,
81             port_type=L2_PORT_TYPE.BVI,
82         )
83         cls.vapi.sw_interface_set_l2_bridge(
84             rx_sw_if_index=cls.pg0.sw_if_index, bd_id=cls.bd_id
85         )
86         cls.vapi.sw_interface_set_l2_bridge(
87             rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.bd_id
88         )
89
90         # Configure IPv4 addresses on loopback interface and routed interface
91         cls.loop0.config_ip4()
92         cls.loop0.config_ip6()
93         cls.pg2.config_ip4()
94         cls.pg2.config_ip6()
95
96         # Configure MAC address binding to IPv4 neighbors on loop0
97         cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
98         cls.loop0.configure_ipv4_neighbors()
99         cls.loop0.configure_ipv6_neighbors()
100         # configure MAC address on pg2
101         cls.pg2.resolve_arp()
102         cls.pg2.resolve_ndp()
103
104         cls.WITHOUT_EH = False
105         cls.WITH_EH = True
106         cls.STATELESS_ICMP = False
107         cls.STATEFUL_ICMP = True
108
109         # Loopback BVI interface has remote hosts, one half of hosts are behind
110         # pg0 second behind pg1
111         half = cls.remote_hosts_count // 2
112         cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
113         cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
114         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1)
115
116     @classmethod
117     def tearDownClass(cls):
118         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0)
119         super(TestACLpluginL2L3, cls).tearDownClass()
120
121     def tearDown(self):
122         """Run standard test teardown and log ``show l2patch``,
123         ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
124         ``show ip neighbors``.
125         """
126         super(TestACLpluginL2L3, self).tearDown()
127
128     def show_commands_at_teardown(self):
129         self.logger.info(self.vapi.cli("show l2patch"))
130         self.logger.info(self.vapi.cli("show classify tables"))
131         self.logger.info(self.vapi.cli("show l2fib verbose"))
132         self.logger.info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id))
133         self.logger.info(self.vapi.cli("show ip neighbors"))
134         cmd = "show acl-plugin sessions verbose 1"
135         self.logger.info(self.vapi.cli(cmd))
136         self.logger.info(self.vapi.cli("show acl-plugin acl"))
137         self.logger.info(self.vapi.cli("show acl-plugin interface"))
138         self.logger.info(self.vapi.cli("show acl-plugin tables"))
139
    def create_stream(
        self,
        src_ip_if,
        dst_ip_if,
        reverse,
        packet_sizes,
        is_ip6,
        expect_blocked,
        expect_established,
        add_extension_header,
        icmp_stateful=False,
    ):
        """Build a packet stream together with the ACL rules matching it.

        Packet indices ``i`` run over 0..7.  Pairs of indices alternate
        between UDP (i = 0, 1, 4, 5) and ICMP/ICMPv6 (i = 2, 3, 6, 7).  Odd
        indices get a permit rule, even indices a deny rule.  For every
        packet one exact-match rule (single source/destination host, single
        port or ICMP type/code) is produced in three variants.

        :param src_ip_if: interface whose ``remote_ip4``/``remote_ip6`` and
            MAC addresses are used for one end of every packet.
        :param dst_ip_if: interface whose ``remote_hosts`` supply the other
            end of every packet.
        :param reverse: if false, packets go from ``src_ip_if``'s remote
            address to the remote host; if true, addresses and L4 ports are
            swapped and the destination MAC is ``de:ad:00:00:00:00``.
        :param packet_sizes: sizes to extend packets to; packet ``i`` uses
            ``packet_sizes[(i // 2) % len(packet_sizes)]``.
        :param is_ip6: build IPv6 packets instead of IPv4 ones.
        :param expect_blocked: with ``expect_established`` false, marks the
            deny-rule packets as "to be blocked" (no packet info created).
        :param expect_established: deny-rule packets that can be reflected
            carry a packet info (i.e. are expected to arrive); reflectable
            ICMP is sent with the reply type.
        :param add_extension_header: IPv6 UDP packets get two routing
            headers and a fragment header prepended; IPv4 UDP packets get
            IP ``flags=1``.
        :param icmp_stateful: skip the UDP packets and treat ICMP packets
            with an even index as reflectable.
        :returns: dict with keys ``"stream"`` (list of packets),
            ``"rules"`` (permit/deny as per index), ``"permit_rules"``
            (all permit) and ``"permit_and_reflect_rules"`` (action 2 for
            packets that can be reflected).
        """
        pkts = []
        rules = []
        permit_rules = []
        permit_and_reflect_rules = []
        total_packet_count = 8
        for i in range(0, total_packet_count):
            # modulo is 0 for i in {0, 1, 4, 5} (UDP), 1 for {2, 3, 6, 7} (ICMP)
            modulo = (i // 2) % 2
            icmp_type_delta = i % 2
            icmp_code = i
            is_udp_packet = modulo == 0
            # stateful ICMP runs exercise ICMP only
            if is_udp_packet and icmp_stateful:
                continue
            is_reflectable_icmp = (
                icmp_stateful and icmp_type_delta == 0 and not is_udp_packet
            )
            is_reflected_icmp = is_reflectable_icmp and expect_established
            can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
            # odd packets are permitted (1), even packets denied (0)
            is_permit = i % 2
            remote_dst_index = i % len(dst_ip_if.remote_hosts)
            remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
            if is_permit == 1:
                # permitted packets are always tracked via a packet info
                info = self.create_packet_info(src_ip_if, dst_ip_if)
                payload = self.info_to_payload(info)
            else:
                # a denied packet is tracked only if it is still expected to
                # get through (reflectable and not expected to be blocked)
                to_be_blocked = False
                if expect_blocked and not expect_established:
                    to_be_blocked = True
                if not can_reflect_this_packet:
                    to_be_blocked = True
                if to_be_blocked:
                    payload = "to be blocked"
                else:
                    info = self.create_packet_info(src_ip_if, dst_ip_if)
                    payload = self.info_to_payload(info)
            if reverse:
                # remote host -> src_ip_if's remote address, ports swapped
                dst_mac = "de:ad:00:00:00:00"
                src_mac = remote_dst_host._mac
                dst_ip6 = src_ip_if.remote_ip6
                src_ip6 = remote_dst_host.ip6
                dst_ip4 = src_ip_if.remote_ip4
                src_ip4 = remote_dst_host.ip4
                dst_l4 = 1234 + i
                src_l4 = 4321 + i
            else:
                # src_ip_if's remote address -> remote host
                dst_mac = src_ip_if.local_mac
                src_mac = src_ip_if.remote_mac
                src_ip6 = src_ip_if.remote_ip6
                dst_ip6 = remote_dst_host.ip6
                src_ip4 = src_ip_if.remote_ip4
                dst_ip4 = remote_dst_host.ip4
                src_l4 = 1234 + i
                dst_l4 = 4321 + i
            if is_reflected_icmp:
                # send the reply type (see the type arithmetic below)
                icmp_type_delta = 1

            # default ULP should be something we do not use in tests
            ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
            # potentially a chain of protocols leading to ULP
            ulp = ulp_l4

            if is_udp_packet:
                if is_ip6:
                    ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
                    if add_extension_header:
                        # prepend some extension headers
                        ulp = (
                            IPv6ExtHdrRouting()
                            / IPv6ExtHdrRouting()
                            / IPv6ExtHdrFragment(offset=0, m=1)
                            / ulp_l4
                        )
                        # uncomment below to test invalid ones
                        # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4
                    else:
                        ulp = ulp_l4
                    p = (
                        Ether(dst=dst_mac, src=src_mac)
                        / IPv6(src=src_ip6, dst=dst_ip6)
                        / ulp
                        / Raw(payload)
                    )
                else:
                    ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
                    # IPv4 does not allow extension headers,
                    # but we rather make it a first fragment
                    flags = 1 if add_extension_header else 0
                    ulp = ulp_l4
                    p = (
                        Ether(dst=dst_mac, src=src_mac)
                        / IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags)
                        / ulp
                        / Raw(payload)
                    )
            elif modulo == 1:
                if is_ip6:
                    # ICMPv6 type 128 (delta 0) or 129 (delta 1)
                    ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta, code=icmp_code)
                    ulp = ulp_l4
                    p = (
                        Ether(dst=dst_mac, src=src_mac)
                        / IPv6(src=src_ip6, dst=dst_ip6)
                        / ulp
                        / Raw(payload)
                    )
                else:
                    # ICMP type 8 (delta 0) or 0 (delta 1)
                    ulp_l4 = ICMP(type=8 - 8 * icmp_type_delta, code=icmp_code)
                    ulp = ulp_l4
                    p = (
                        Ether(dst=dst_mac, src=src_mac)
                        / IP(src=src_ip4, dst=dst_ip4)
                        / ulp
                        / Raw(payload)
                    )

            # permitted packets (odd i) always have an info; keep a copy of
            # the packet as sent so the capture can be compared against it
            if i % 2 == 1:
                info.data = p.copy()
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)
            pkts.append(p)

            # derive an exact-match rule from the packet just built
            rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
            rule_prefix_len = 128 if p.haslayer(IPv6) else 32
            rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP

            if p.haslayer(UDP):
                rule_l4_sport = p[UDP].sport
                rule_l4_dport = p[UDP].dport
            else:
                # for ICMP the rule's port fields carry type and code
                if p.haslayer(ICMP):
                    rule_l4_sport = p[ICMP].type
                    rule_l4_dport = p[ICMP].code
                else:
                    rule_l4_sport = p[ICMPv6Unknown].type
                    rule_l4_dport = p[ICMPv6Unknown].code
            if p.haslayer(IPv6):
                # take the protocol number from the L4 layer itself rather
                # than from the IPv6 header of the packet
                rule_l4_proto = ulp_l4.overload_fields[IPv6]["nh"]
            else:
                rule_l4_proto = p[IP].proto

            new_rule = AclRule(
                is_permit=is_permit,
                proto=rule_l4_proto,
                src_prefix=ip_network((p[rule_l3_layer].src, rule_prefix_len)),
                dst_prefix=ip_network((p[rule_l3_layer].dst, rule_prefix_len)),
                sport_from=rule_l4_sport,
                sport_to=rule_l4_sport,
                dport_from=rule_l4_dport,
                dport_to=rule_l4_dport,
            )

            rules.append(new_rule)
            # same match, but always permit
            new_rule_permit = copy.copy(new_rule)
            new_rule_permit.is_permit = 1
            permit_rules.append(new_rule_permit)

            # same match; action 2 for reflectable packets
            # NOTE(review): 2 is presumably the ACL plugin's permit+reflect
            # action -- confirm against the ACL API definition
            new_rule_permit_and_reflect = copy.copy(new_rule)
            if can_reflect_this_packet:
                new_rule_permit_and_reflect.is_permit = 2
            else:
                new_rule_permit_and_reflect.is_permit = is_permit

            permit_and_reflect_rules.append(new_rule_permit_and_reflect)
            self.logger.info("create_stream pkt#%d: %s" % (i, payload))

        return {
            "stream": pkts,
            "rules": rules,
            "permit_rules": permit_rules,
            "permit_and_reflect_rules": permit_and_reflect_rules,
        }
321
322     def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse):
323         last_info = dict()
324         for i in self.interfaces:
325             last_info[i.sw_if_index] = None
326
327         dst_ip_sw_if_index = dst_ip_if.sw_if_index
328
329         for packet in capture:
330             l3 = IP if packet.haslayer(IP) else IPv6
331             ip = packet[l3]
332             if packet.haslayer(UDP):
333                 l4 = UDP
334             else:
335                 if packet.haslayer(ICMP):
336                     l4 = ICMP
337                 else:
338                     l4 = ICMPv6Unknown
339
340             # Scapy IPv6 stuff is too smart for its own good.
341             # So we do this and coerce the ICMP into unknown type
342             if packet.haslayer(UDP):
343                 data = scapy.compat.raw(packet[UDP][Raw])
344             else:
345                 if l3 == IP:
346                     data = scapy.compat.raw(
347                         ICMP(scapy.compat.raw(packet[l3].payload))[Raw]
348                     )
349                 else:
350                     data = scapy.compat.raw(
351                         ICMPv6Unknown(scapy.compat.raw(packet[l3].payload)).msgbody
352                     )
353             udp_or_icmp = packet[l3].payload
354             data_obj = Raw(data)
355             # FIXME: make framework believe we are on object
356             payload_info = self.payload_to_info(data_obj)
357             packet_index = payload_info.index
358
359             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
360
361             next_info = self.get_next_packet_info_for_interface2(
362                 payload_info.src, dst_ip_sw_if_index, last_info[payload_info.src]
363             )
364             last_info[payload_info.src] = next_info
365             self.assertTrue(next_info is not None)
366             self.assertEqual(packet_index, next_info.index)
367             saved_packet = next_info.data
368             self.assertTrue(next_info is not None)
369
370             # MAC: src, dst
371             if not reverse:
372                 self.assertEqual(packet.src, dst_ip_if.local_mac)
373                 host = dst_ip_if.host_by_mac(packet.dst)
374
375             # IP: src, dst
376             # self.assertEqual(ip.src, src_ip_if.remote_ip4)
377             if saved_packet is not None:
378                 self.assertEqual(ip.src, saved_packet[l3].src)
379                 self.assertEqual(ip.dst, saved_packet[l3].dst)
380                 if l4 == UDP:
381                     self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
382                     self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
383             # self.assertEqual(ip.dst, host.ip4)
384
385             # UDP:
386
387     def applied_acl_shuffle(self, acl_if):
388         saved_n_input = acl_if.n_input
389         # TOTO: maybe copy each one??
390         saved_acls = acl_if.acls
391
392         # now create a list of all the rules in all ACLs
393         all_rules = []
394         for old_acl in saved_acls:
395             for rule in old_acl.rules:
396                 all_rules.append(rule)
397
398         # Add a few ACLs made from shuffled rules
399         shuffle(all_rules)
400         acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
401         acl1.add_vpp_config()
402
403         shuffle(all_rules)
404         acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
405         acl2.add_vpp_config()
406
407         shuffle(all_rules)
408         acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
409         acl3.add_vpp_config()
410
411         # apply the shuffle ACLs in front
412         input_acls = [acl1, acl2]
413         output_acls = [acl1, acl2]
414
415         # add the currently applied ACLs
416         n_input = acl_if.n_input
417         input_acls.extend(saved_acls[:n_input])
418         output_acls.extend(saved_acls[n_input:])
419
420         # and the trailing shuffle ACL(s)
421         input_acls.extend([acl3])
422         output_acls.extend([acl3])
423
424         # set the interface ACL list to the result
425         acl_if.n_input = len(input_acls)
426         acl_if.acls = input_acls + output_acls
427         acl_if.add_vpp_config()
428
429         # change the ACLs a few times
430         for i in range(1, 10):
431             shuffle(all_rules)
432             acl1.modify_vpp_config(all_rules[:: 1 + (i % 2)])
433
434             shuffle(all_rules)
435             acl2.modify_vpp_config(all_rules[:: 1 + (i % 3)])
436
437             shuffle(all_rules)
438             acl3.modify_vpp_config(all_rules[:: 1 + (i % 5)])
439
440         # restore to how it was before and clean up
441         acl_if.n_input = saved_n_input
442         acl_if.acls = saved_acls
443         acl_if.add_vpp_config()
444
445         acl1.remove_vpp_config()
446         acl2.remove_vpp_config()
447         acl3.remove_vpp_config()
448
449     def create_acls_for_a_stream(self, stream_dict, test_l2_action, is_reflect):
450         r = stream_dict["rules"]
451         r_permit = stream_dict["permit_rules"]
452         r_permit_reflect = stream_dict["permit_and_reflect_rules"]
453         r_action = r_permit_reflect if is_reflect else r
454         action_acl = VppAcl(self, rules=r_action, tag="act. acl")
455         action_acl.add_vpp_config()
456         permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
457         permit_acl.add_vpp_config()
458
459         return {
460             "L2": action_acl if test_l2_action else permit_acl,
461             "L3": permit_acl if test_l2_action else action_acl,
462             "permit": permit_acl,
463             "action": action_acl,
464         }
465
466     def apply_acl_ip46_x_to_y(
467         self, bridged_to_routed, test_l2_deny, is_ip6, is_reflect, add_eh
468     ):
469         """Apply the ACLs"""
470         self.reset_packet_infos()
471         stream_dict = self.create_stream(
472             self.pg2,
473             self.loop0,
474             bridged_to_routed,
475             self.pg_if_packet_sizes,
476             is_ip6,
477             not is_reflect,
478             False,
479             add_eh,
480         )
481         stream = stream_dict["stream"]
482         acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, is_reflect)
483         n_input_l3 = 0 if bridged_to_routed else 1
484         n_input_l2 = 1 if bridged_to_routed else 0
485
486         acl_if_pg2 = VppAclInterface(
487             self,
488             sw_if_index=self.pg2.sw_if_index,
489             n_input=n_input_l3,
490             acls=[acl_idx["L3"]],
491         )
492         acl_if_pg2.add_vpp_config()
493
494         acl_if_pg0 = VppAclInterface(
495             self,
496             sw_if_index=self.pg0.sw_if_index,
497             n_input=n_input_l2,
498             acls=[acl_idx["L2"]],
499         )
500         acl_if_pg0.add_vpp_config()
501
502         acl_if_pg1 = VppAclInterface(
503             self,
504             sw_if_index=self.pg1.sw_if_index,
505             n_input=n_input_l2,
506             acls=[acl_idx["L2"]],
507         )
508         acl_if_pg1.add_vpp_config()
509
510         self.applied_acl_shuffle(acl_if_pg0)
511         self.applied_acl_shuffle(acl_if_pg1)
512         return {"L2": acl_idx["L2"], "L3": acl_idx["L3"]}
513
514     def apply_acl_ip46_both_directions_reflect(
515         self, primary_is_bridged_to_routed, reflect_on_l2, is_ip6, add_eh, stateful_icmp
516     ):
517         primary_is_routed_to_bridged = not primary_is_bridged_to_routed
518         self.reset_packet_infos()
519         stream_dict_fwd = self.create_stream(
520             self.pg2,
521             self.loop0,
522             primary_is_bridged_to_routed,
523             self.pg_if_packet_sizes,
524             is_ip6,
525             False,
526             False,
527             add_eh,
528             stateful_icmp,
529         )
530         acl_idx_fwd = self.create_acls_for_a_stream(
531             stream_dict_fwd, reflect_on_l2, True
532         )
533
534         stream_dict_rev = self.create_stream(
535             self.pg2,
536             self.loop0,
537             not primary_is_bridged_to_routed,
538             self.pg_if_packet_sizes,
539             is_ip6,
540             True,
541             True,
542             add_eh,
543             stateful_icmp,
544         )
545         # We want the primary action to be "deny" rather than reflect
546         acl_idx_rev = self.create_acls_for_a_stream(
547             stream_dict_rev, reflect_on_l2, False
548         )
549
550         if primary_is_bridged_to_routed:
551             inbound_l2_acl = acl_idx_fwd["L2"]
552         else:
553             inbound_l2_acl = acl_idx_rev["L2"]
554
555         if primary_is_routed_to_bridged:
556             outbound_l2_acl = acl_idx_fwd["L2"]
557         else:
558             outbound_l2_acl = acl_idx_rev["L2"]
559
560         if primary_is_routed_to_bridged:
561             inbound_l3_acl = acl_idx_fwd["L3"]
562         else:
563             inbound_l3_acl = acl_idx_rev["L3"]
564
565         if primary_is_bridged_to_routed:
566             outbound_l3_acl = acl_idx_fwd["L3"]
567         else:
568             outbound_l3_acl = acl_idx_rev["L3"]
569
570         acl_if_pg2 = VppAclInterface(
571             self,
572             sw_if_index=self.pg2.sw_if_index,
573             n_input=1,
574             acls=[inbound_l3_acl, outbound_l3_acl],
575         )
576         acl_if_pg2.add_vpp_config()
577
578         acl_if_pg0 = VppAclInterface(
579             self,
580             sw_if_index=self.pg0.sw_if_index,
581             n_input=1,
582             acls=[inbound_l2_acl, outbound_l2_acl],
583         )
584         acl_if_pg0.add_vpp_config()
585
586         acl_if_pg1 = VppAclInterface(
587             self,
588             sw_if_index=self.pg1.sw_if_index,
589             n_input=1,
590             acls=[inbound_l2_acl, outbound_l2_acl],
591         )
592         acl_if_pg1.add_vpp_config()
593
594         self.applied_acl_shuffle(acl_if_pg0)
595         self.applied_acl_shuffle(acl_if_pg2)
596
597     def apply_acl_ip46_routed_to_bridged(
598         self, test_l2_deny, is_ip6, is_reflect, add_eh
599     ):
600         return self.apply_acl_ip46_x_to_y(
601             False, test_l2_deny, is_ip6, is_reflect, add_eh
602         )
603
604     def apply_acl_ip46_bridged_to_routed(
605         self, test_l2_deny, is_ip6, is_reflect, add_eh
606     ):
607         return self.apply_acl_ip46_x_to_y(
608             True, test_l2_deny, is_ip6, is_reflect, add_eh
609         )
610
611     def verify_acl_packet_count(self, acl_idx, packet_count):
612         matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
613         self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches)))
614         total_count = 0
615         for m in matches:
616             for p in m:
617                 total_count = total_count + p["packets"]
618         self.assertEqual(total_count, packet_count)
619
620     def run_traffic_ip46_x_to_y(
621         self,
622         bridged_to_routed,
623         test_l2_deny,
624         is_ip6,
625         is_reflect,
626         is_established,
627         add_eh,
628         stateful_icmp=False,
629     ):
630         self.reset_packet_infos()
631         stream_dict = self.create_stream(
632             self.pg2,
633             self.loop0,
634             bridged_to_routed,
635             self.pg_if_packet_sizes,
636             is_ip6,
637             not is_reflect,
638             is_established,
639             add_eh,
640             stateful_icmp,
641         )
642         stream = stream_dict["stream"]
643
644         tx_if = self.pg0 if bridged_to_routed else self.pg2
645         rx_if = self.pg2 if bridged_to_routed else self.pg0
646
647         tx_if.add_stream(stream)
648         self.pg_enable_capture(self.pg_interfaces)
649         self.pg_start()
650         packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
651         rcvd1 = rx_if.get_capture(packet_count)
652         self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
653         return len(stream)
654
655     def run_traffic_ip46_routed_to_bridged(
656         self,
657         test_l2_deny,
658         is_ip6,
659         is_reflect,
660         is_established,
661         add_eh,
662         stateful_icmp=False,
663     ):
664         return self.run_traffic_ip46_x_to_y(
665             False,
666             test_l2_deny,
667             is_ip6,
668             is_reflect,
669             is_established,
670             add_eh,
671             stateful_icmp,
672         )
673
674     def run_traffic_ip46_bridged_to_routed(
675         self,
676         test_l2_deny,
677         is_ip6,
678         is_reflect,
679         is_established,
680         add_eh,
681         stateful_icmp=False,
682     ):
683         return self.run_traffic_ip46_x_to_y(
684             True,
685             test_l2_deny,
686             is_ip6,
687             is_reflect,
688             is_established,
689             add_eh,
690             stateful_icmp,
691         )
692
693     def run_test_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh):
694         acls = self.apply_acl_ip46_routed_to_bridged(
695             test_l2_deny, is_ip6, is_reflect, add_eh
696         )
697         pkts = self.run_traffic_ip46_routed_to_bridged(
698             test_l2_deny, is_ip6, is_reflect, False, add_eh
699         )
700         self.verify_acl_packet_count(acls["L3"].acl_index, pkts)
701
702     def run_test_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh):
703         acls = self.apply_acl_ip46_bridged_to_routed(
704             test_l2_deny, is_ip6, is_reflect, add_eh
705         )
706         pkts = self.run_traffic_ip46_bridged_to_routed(
707             test_l2_deny, is_ip6, is_reflect, False, add_eh
708         )
709         self.verify_acl_packet_count(acls["L2"].acl_index, pkts)
710
711     def run_test_ip46_routed_to_bridged_and_back(
712         self, test_l2_action, is_ip6, add_eh, stateful_icmp=False
713     ):
714         self.apply_acl_ip46_both_directions_reflect(
715             False, test_l2_action, is_ip6, add_eh, stateful_icmp
716         )
717         self.run_traffic_ip46_routed_to_bridged(
718             test_l2_action, is_ip6, True, False, add_eh, stateful_icmp
719         )
720         self.run_traffic_ip46_bridged_to_routed(
721             test_l2_action, is_ip6, False, True, add_eh, stateful_icmp
722         )
723
724     def run_test_ip46_bridged_to_routed_and_back(
725         self, test_l2_action, is_ip6, add_eh, stateful_icmp=False
726     ):
727         self.apply_acl_ip46_both_directions_reflect(
728             True, test_l2_action, is_ip6, add_eh, stateful_icmp
729         )
730         self.run_traffic_ip46_bridged_to_routed(
731             test_l2_action, is_ip6, True, False, add_eh, stateful_icmp
732         )
733         self.run_traffic_ip46_routed_to_bridged(
734             test_l2_action, is_ip6, False, True, add_eh, stateful_icmp
735         )
736
737     def test_0000_ip6_irb_1(self):
738         """ACL plugin prepare"""
739         if not self.vpp_dead:
740             cmd = "set acl-plugin session timeout udp idle 2000"
741             self.logger.info(self.vapi.ppcli(cmd))
742             # uncomment to not skip past the routing header
743             # and watch the EH tests fail
744             # self.logger.info(self.vapi.ppcli(
745             #    "set acl-plugin skip-ipv6-extension-header 43 0"))
746             # uncomment to test the session limit (stateful tests will fail)
747             # self.logger.info(self.vapi.ppcli(
748             #    "set acl-plugin session table max-entries 1"))
749             # new datapath is the default, but just in case
750             # self.logger.info(self.vapi.ppcli(
751             #    "set acl-plugin l2-datapath new"))
752             # If you want to see some tests fail, uncomment the next line
753             # self.logger.info(self.vapi.ppcli(
754             #    "set acl-plugin l2-datapath old"))
755
756     def test_0001_ip6_irb_1(self):
757         """ACL IPv6 routed -> bridged, L2 ACL deny"""
758         self.run_test_ip46_routed_to_bridged(True, True, False, self.WITHOUT_EH)
759
760     def test_0002_ip6_irb_1(self):
761         """ACL IPv6 routed -> bridged, L3 ACL deny"""
762         self.run_test_ip46_routed_to_bridged(False, True, False, self.WITHOUT_EH)
763
764     def test_0003_ip4_irb_1(self):
765         """ACL IPv4 routed -> bridged, L2 ACL deny"""
766         self.run_test_ip46_routed_to_bridged(True, False, False, self.WITHOUT_EH)
767
768     def test_0004_ip4_irb_1(self):
769         """ACL IPv4 routed -> bridged, L3 ACL deny"""
770         self.run_test_ip46_routed_to_bridged(False, False, False, self.WITHOUT_EH)
771
772     def test_0005_ip6_irb_1(self):
773         """ACL IPv6 bridged -> routed, L2 ACL deny"""
774         self.run_test_ip46_bridged_to_routed(True, True, False, self.WITHOUT_EH)
775
776     def test_0006_ip6_irb_1(self):
777         """ACL IPv6 bridged -> routed, L3 ACL deny"""
778         self.run_test_ip46_bridged_to_routed(False, True, False, self.WITHOUT_EH)
779
780     def test_0007_ip6_irb_1(self):
781         """ACL IPv4 bridged -> routed, L2 ACL deny"""
782         self.run_test_ip46_bridged_to_routed(True, False, False, self.WITHOUT_EH)
783
784     def test_0008_ip6_irb_1(self):
785         """ACL IPv4 bridged -> routed, L3 ACL deny"""
786         self.run_test_ip46_bridged_to_routed(False, False, False, self.WITHOUT_EH)
787
788     # Stateful ACL tests
789     def test_0101_ip6_irb_1(self):
790         """ACL IPv6 routed -> bridged, L2 ACL permit+reflect"""
791         self.run_test_ip46_routed_to_bridged_and_back(True, True, self.WITHOUT_EH)
792
793     def test_0102_ip6_irb_1(self):
794         """ACL IPv6 bridged -> routed, L2 ACL permit+reflect"""
795         self.run_test_ip46_bridged_to_routed_and_back(True, True, self.WITHOUT_EH)
796
797     def test_0103_ip6_irb_1(self):
798         """ACL IPv4 routed -> bridged, L2 ACL permit+reflect"""
799         self.run_test_ip46_routed_to_bridged_and_back(True, False, self.WITHOUT_EH)
800
801     def test_0104_ip6_irb_1(self):
802         """ACL IPv4 bridged -> routed, L2 ACL permit+reflect"""
803         self.run_test_ip46_bridged_to_routed_and_back(True, False, self.WITHOUT_EH)
804
805     def test_0111_ip6_irb_1(self):
806         """ACL IPv6 routed -> bridged, L3 ACL permit+reflect"""
807         self.run_test_ip46_routed_to_bridged_and_back(False, True, self.WITHOUT_EH)
808
809     def test_0112_ip6_irb_1(self):
810         """ACL IPv6 bridged -> routed, L3 ACL permit+reflect"""
811         self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITHOUT_EH)
812
813     def test_0113_ip6_irb_1(self):
814         """ACL IPv4 routed -> bridged, L3 ACL permit+reflect"""
815         self.run_test_ip46_routed_to_bridged_and_back(False, False, self.WITHOUT_EH)
816
817     def test_0114_ip6_irb_1(self):
818         """ACL IPv4 bridged -> routed, L3 ACL permit+reflect"""
819         self.run_test_ip46_bridged_to_routed_and_back(False, False, self.WITHOUT_EH)
820
821     # A block of tests with extension headers
822
823     def test_1001_ip6_irb_1(self):
824         """ACL IPv6+EH routed -> bridged, L2 ACL deny"""
825         self.run_test_ip46_routed_to_bridged(True, True, False, self.WITH_EH)
826
827     def test_1002_ip6_irb_1(self):
828         """ACL IPv6+EH routed -> bridged, L3 ACL deny"""
829         self.run_test_ip46_routed_to_bridged(False, True, False, self.WITH_EH)
830
831     def test_1005_ip6_irb_1(self):
832         """ACL IPv6+EH bridged -> routed, L2 ACL deny"""
833         self.run_test_ip46_bridged_to_routed(True, True, False, self.WITH_EH)
834
835     def test_1006_ip6_irb_1(self):
836         """ACL IPv6+EH bridged -> routed, L3 ACL deny"""
837         self.run_test_ip46_bridged_to_routed(False, True, False, self.WITH_EH)
838
839     def test_1101_ip6_irb_1(self):
840         """ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect"""
841         self.run_test_ip46_routed_to_bridged_and_back(True, True, self.WITH_EH)
842
843     def test_1102_ip6_irb_1(self):
844         """ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect"""
845         self.run_test_ip46_bridged_to_routed_and_back(True, True, self.WITH_EH)
846
847     def test_1111_ip6_irb_1(self):
848         """ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect"""
849         self.run_test_ip46_routed_to_bridged_and_back(False, True, self.WITH_EH)
850
851     def test_1112_ip6_irb_1(self):
852         """ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect"""
853         self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITH_EH)
854
855     # IPv4 with "MF" bit set
856
857     def test_1201_ip6_irb_1(self):
858         """ACL IPv4+MF routed -> bridged, L2 ACL deny"""
859         self.run_test_ip46_routed_to_bridged(True, False, False, self.WITH_EH)
860
861     def test_1202_ip6_irb_1(self):
862         """ACL IPv4+MF routed -> bridged, L3 ACL deny"""
863         self.run_test_ip46_routed_to_bridged(False, False, False, self.WITH_EH)
864
865     def test_1205_ip6_irb_1(self):
866         """ACL IPv4+MF bridged -> routed, L2 ACL deny"""
867         self.run_test_ip46_bridged_to_routed(True, False, False, self.WITH_EH)
868
869     def test_1206_ip6_irb_1(self):
870         """ACL IPv4+MF bridged -> routed, L3 ACL deny"""
871         self.run_test_ip46_bridged_to_routed(False, False, False, self.WITH_EH)
872
873     def test_1301_ip6_irb_1(self):
874         """ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect"""
875         self.run_test_ip46_routed_to_bridged_and_back(True, False, self.WITH_EH)
876
877     def test_1302_ip6_irb_1(self):
878         """ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect"""
879         self.run_test_ip46_bridged_to_routed_and_back(True, False, self.WITH_EH)
880
881     def test_1311_ip6_irb_1(self):
882         """ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect"""
883         self.run_test_ip46_routed_to_bridged_and_back(False, False, self.WITH_EH)
884
885     def test_1312_ip6_irb_1(self):
886         """ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
887         self.run_test_ip46_bridged_to_routed_and_back(False, False, self.WITH_EH)
888
889     # Stateful ACL tests with stateful ICMP
890
891     def test_1401_ip6_irb_1(self):
892         """IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
893         self.run_test_ip46_routed_to_bridged_and_back(
894             True, True, self.WITHOUT_EH, self.STATEFUL_ICMP
895         )
896
897     def test_1402_ip6_irb_1(self):
898         """IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
899         self.run_test_ip46_bridged_to_routed_and_back(
900             True, True, self.WITHOUT_EH, self.STATEFUL_ICMP
901         )
902
903     def test_1403_ip4_irb_1(self):
904         """IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
905         self.run_test_ip46_routed_to_bridged_and_back(
906             True, False, self.WITHOUT_EH, self.STATEFUL_ICMP
907         )
908
909     def test_1404_ip4_irb_1(self):
910         """IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
911         self.run_test_ip46_bridged_to_routed_and_back(
912             True, False, self.WITHOUT_EH, self.STATEFUL_ICMP
913         )
914
915     def test_1411_ip6_irb_1(self):
916         """IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
917         self.run_test_ip46_routed_to_bridged_and_back(
918             False, True, self.WITHOUT_EH, self.STATEFUL_ICMP
919         )
920
921     def test_1412_ip6_irb_1(self):
922         """IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
923         self.run_test_ip46_bridged_to_routed_and_back(
924             False, True, self.WITHOUT_EH, self.STATEFUL_ICMP
925         )
926
927     def test_1413_ip4_irb_1(self):
928         """IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
929         self.run_test_ip46_routed_to_bridged_and_back(
930             False, False, self.WITHOUT_EH, self.STATEFUL_ICMP
931         )
932
933     def test_1414_ip4_irb_1(self):
934         """IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
935         self.run_test_ip46_bridged_to_routed_and_back(
936             False, False, self.WITHOUT_EH, self.STATEFUL_ICMP
937         )
938
939
if __name__ == "__main__":
    # Direct execution: hand control to unittest, using the runner class
    # imported from the framework module.
    runner_cls = VppTestRunner
    unittest.main(testRunner=runner_cls)