build: add ability to disable some plugins from packaging and tests
[vpp.git] / test / test_acl_plugin_l2l3.py
1 #!/usr/bin/env python3
2 """ACL IRB Test Case HLD:
3
4 **config**
5     - L2 MAC learning enabled in l2bd
6     - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
7     - 2 bridged interfaces in l2bd with bvi
8
9 **test**
10     - sending ip4 and ip6 eth pkts between routed interfaces
11         - 2 routed interfaces
12         - 2 bridged interfaces
13
14     - 64B, 512B, 1518B, 9018B (ether_size)
15
16     - burst of pkts per interface
17         - up to 8 pkts per burst
18         - routed pkts hitting different FIB entries
19         - bridged pkts hitting different MAC entries
20
21 **verify**
22     - all packets received correctly
23
24 """
25
26 import copy
27 import unittest
28 from socket import inet_pton, AF_INET, AF_INET6
29 from random import choice, shuffle
30 from pprint import pprint
31 from ipaddress import ip_network
32 from config import config
33
34 import scapy.compat
35 from scapy.packet import Raw
36 from scapy.layers.l2 import Ether
37 from scapy.layers.inet import IP, UDP, ICMP, TCP
38 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
39 from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
40 from scapy.layers.inet6 import IPv6ExtHdrFragment
41
42 from framework import VppTestCase, VppTestRunner
43 from vpp_l2 import L2_PORT_TYPE
44 import time
45
46 from vpp_acl import AclRule, VppAcl, VppAclInterface
47
48
49 @unittest.skipIf("acl" in config.excluded_plugins, "Exclude ACL plugin tests")
50 class TestACLpluginL2L3(VppTestCase):
51     """TestACLpluginL2L3 Test Case"""
52
53     @classmethod
54     def setUpClass(cls):
55         """
56         #. Create BD with MAC learning enabled and put interfaces to this BD.
57         #. Configure IPv4 addresses on loopback interface and routed interface.
58         #. Configure MAC address binding to IPv4 neighbors on loop0.
59         #. Configure MAC address on pg2.
60         #. Loopback BVI interface has remote hosts, one half of hosts are
61            behind pg0 second behind pg1.
62         """
63         super(TestACLpluginL2L3, cls).setUpClass()
64
65         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
66         cls.bd_id = 10
67         cls.remote_hosts_count = 250
68
69         # create 3 pg interfaces, 1 loopback interface
70         cls.create_pg_interfaces(range(3))
71         cls.create_loopback_interfaces(1)
72
73         cls.interfaces = list(cls.pg_interfaces)
74         cls.interfaces.extend(cls.lo_interfaces)
75
76         for i in cls.interfaces:
77             i.admin_up()
78
79         # Create BD with MAC learning enabled and put interfaces to this BD
80         cls.vapi.sw_interface_set_l2_bridge(
81             rx_sw_if_index=cls.loop0.sw_if_index,
82             bd_id=cls.bd_id,
83             port_type=L2_PORT_TYPE.BVI,
84         )
85         cls.vapi.sw_interface_set_l2_bridge(
86             rx_sw_if_index=cls.pg0.sw_if_index, bd_id=cls.bd_id
87         )
88         cls.vapi.sw_interface_set_l2_bridge(
89             rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.bd_id
90         )
91
92         # Configure IPv4 addresses on loopback interface and routed interface
93         cls.loop0.config_ip4()
94         cls.loop0.config_ip6()
95         cls.pg2.config_ip4()
96         cls.pg2.config_ip6()
97
98         # Configure MAC address binding to IPv4 neighbors on loop0
99         cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
100         cls.loop0.configure_ipv4_neighbors()
101         cls.loop0.configure_ipv6_neighbors()
102         # configure MAC address on pg2
103         cls.pg2.resolve_arp()
104         cls.pg2.resolve_ndp()
105
106         cls.WITHOUT_EH = False
107         cls.WITH_EH = True
108         cls.STATELESS_ICMP = False
109         cls.STATEFUL_ICMP = True
110
111         # Loopback BVI interface has remote hosts, one half of hosts are behind
112         # pg0 second behind pg1
113         half = cls.remote_hosts_count // 2
114         cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
115         cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
116         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1)
117
118     @classmethod
119     def tearDownClass(cls):
120         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0)
121         super(TestACLpluginL2L3, cls).tearDownClass()
122
123     def tearDown(self):
124         """Run standard test teardown and log ``show l2patch``,
125         ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
126         ``show ip neighbors``.
127         """
128         super(TestACLpluginL2L3, self).tearDown()
129
130     def show_commands_at_teardown(self):
131         self.logger.info(self.vapi.cli("show l2patch"))
132         self.logger.info(self.vapi.cli("show classify tables"))
133         self.logger.info(self.vapi.cli("show l2fib verbose"))
134         self.logger.info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id))
135         self.logger.info(self.vapi.cli("show ip neighbors"))
136         cmd = "show acl-plugin sessions verbose 1"
137         self.logger.info(self.vapi.cli(cmd))
138         self.logger.info(self.vapi.cli("show acl-plugin acl"))
139         self.logger.info(self.vapi.cli("show acl-plugin interface"))
140         self.logger.info(self.vapi.cli("show acl-plugin tables"))
141
    def create_stream(
        self,
        src_ip_if,
        dst_ip_if,
        reverse,
        packet_sizes,
        is_ip6,
        expect_blocked,
        expect_established,
        add_extension_header,
        icmp_stateful=False,
    ):
        """Build a packet stream together with the ACL rules matching it.

        Up to 8 packets are generated. Packets come in pairs: indices 0-1 and
        4-5 are UDP, indices 2-3 and 6-7 are ICMP (ICMPv6 when ``is_ip6``).
        Odd indices get a permit rule, even indices a deny rule.

        :param src_ip_if: interface whose remote/local addresses are used for
            the routed side of the packets.
        :param dst_ip_if: interface whose ``remote_hosts`` supply the other
            endpoint of the packets.
        :param reverse: when true, packets go from the ``dst_ip_if`` remote
            host towards the ``src_ip_if`` remote address; otherwise the
            other way round.
        :param packet_sizes: sizes each packet pair is extended to.
        :param is_ip6: build IPv6 packets instead of IPv4.
        :param expect_blocked: with ``expect_established`` false, even-indexed
            packets are expected to be dropped.
        :param expect_established: the stream is expected to match state
            created earlier; reflectable ICMP packets are then built with the
            other ICMP type of the pair.
        :param add_extension_header: IPv6 - prepend extension headers to UDP;
            IPv4 - set IP flags=1 on UDP packets instead.
        :param icmp_stateful: skip UDP packets and treat even-indexed ICMP
            packets as reflectable.
        :returns: dict with keys ``stream`` (packets), ``rules`` (per-packet
            permit/deny), ``permit_rules`` (all permit) and
            ``permit_and_reflect_rules`` (``is_permit`` 2 where reflectable).
        """
        pkts = []
        rules = []
        permit_rules = []
        permit_and_reflect_rules = []
        total_packet_count = 8
        for i in range(0, total_packet_count):
            # (i // 2) % 2 selects the protocol of the pair: 0 = UDP, 1 = ICMP
            modulo = (i // 2) % 2
            icmp_type_delta = i % 2
            icmp_code = i
            is_udp_packet = modulo == 0
            # a stateful-ICMP stream carries ICMP packets only
            if is_udp_packet and icmp_stateful:
                continue
            is_reflectable_icmp = (
                icmp_stateful and icmp_type_delta == 0 and not is_udp_packet
            )
            is_reflected_icmp = is_reflectable_icmp and expect_established
            can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
            # odd packets get a permit rule, even packets a deny rule
            is_permit = i % 2
            remote_dst_index = i % len(dst_ip_if.remote_hosts)
            remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
            if is_permit == 1:
                info = self.create_packet_info(src_ip_if, dst_ip_if)
                payload = self.info_to_payload(info)
            else:
                # A packet info is registered only for packets expected to
                # arrive; the others carry a marker payload instead.
                to_be_blocked = False
                if expect_blocked and not expect_established:
                    to_be_blocked = True
                if not can_reflect_this_packet:
                    to_be_blocked = True
                if to_be_blocked:
                    payload = "to be blocked"
                else:
                    info = self.create_packet_info(src_ip_if, dst_ip_if)
                    payload = self.info_to_payload(info)
            if reverse:
                # from the dst_ip_if remote host, with a fixed dummy
                # destination MAC
                dst_mac = "de:ad:00:00:00:00"
                src_mac = remote_dst_host._mac
                dst_ip6 = src_ip_if.remote_ip6
                src_ip6 = remote_dst_host.ip6
                dst_ip4 = src_ip_if.remote_ip4
                src_ip4 = remote_dst_host.ip4
                dst_l4 = 1234 + i
                src_l4 = 4321 + i
            else:
                # from the src_ip_if remote side towards the remote host;
                # L4 ports mirror those of the reverse direction
                dst_mac = src_ip_if.local_mac
                src_mac = src_ip_if.remote_mac
                src_ip6 = src_ip_if.remote_ip6
                dst_ip6 = remote_dst_host.ip6
                src_ip4 = src_ip_if.remote_ip4
                dst_ip4 = remote_dst_host.ip4
                src_l4 = 1234 + i
                dst_l4 = 4321 + i
            if is_reflected_icmp:
                # switch to the other ICMP type of the request/reply pair
                icmp_type_delta = 1

            # default ULP should be something we do not use in tests
            ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
            # potentially a chain of protocols leading to ULP
            ulp = ulp_l4

            if is_udp_packet:
                if is_ip6:
                    ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
                    if add_extension_header:
                        # prepend some extension headers
                        ulp = (
                            IPv6ExtHdrRouting()
                            / IPv6ExtHdrRouting()
                            / IPv6ExtHdrFragment(offset=0, m=1)
                            / ulp_l4
                        )
                        # uncomment below to test invalid ones
                        # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4
                    else:
                        ulp = ulp_l4
                    p = (
                        Ether(dst=dst_mac, src=src_mac)
                        / IPv6(src=src_ip6, dst=dst_ip6)
                        / ulp
                        / Raw(payload)
                    )
                else:
                    ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
                    # IPv4 does not allow extension headers,
                    # but we rather make it a first fragment
                    flags = 1 if add_extension_header else 0
                    ulp = ulp_l4
                    p = (
                        Ether(dst=dst_mac, src=src_mac)
                        / IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags)
                        / ulp
                        / Raw(payload)
                    )
            elif modulo == 1:
                if is_ip6:
                    # ICMPv6 type 128 or 129, depending on icmp_type_delta
                    ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta, code=icmp_code)
                    ulp = ulp_l4
                    p = (
                        Ether(dst=dst_mac, src=src_mac)
                        / IPv6(src=src_ip6, dst=dst_ip6)
                        / ulp
                        / Raw(payload)
                    )
                else:
                    # ICMP type 8 or 0, depending on icmp_type_delta
                    ulp_l4 = ICMP(type=8 - 8 * icmp_type_delta, code=icmp_code)
                    ulp = ulp_l4
                    p = (
                        Ether(dst=dst_mac, src=src_mac)
                        / IP(src=src_ip4, dst=dst_ip4)
                        / ulp
                        / Raw(payload)
                    )

            # odd (permit) packets always have a packet info; store a copy of
            # the packet on it before the packet is padded
            if i % 2 == 1:
                info.data = p.copy()
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)
            pkts.append(p)

            # NOTE(review): rule_family is computed but never used below
            rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
            rule_prefix_len = 128 if p.haslayer(IPv6) else 32
            rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP

            # for ICMP the rule's "ports" carry the ICMP type and code
            if p.haslayer(UDP):
                rule_l4_sport = p[UDP].sport
                rule_l4_dport = p[UDP].dport
            else:
                if p.haslayer(ICMP):
                    rule_l4_sport = p[ICMP].type
                    rule_l4_dport = p[ICMP].code
                else:
                    rule_l4_sport = p[ICMPv6Unknown].type
                    rule_l4_dport = p[ICMPv6Unknown].code
            if p.haslayer(IPv6):
                # taken from the ULP layer's scapy overload table rather than
                # from the IPv6 header - presumably because extension headers
                # may sit in between; confirm
                rule_l4_proto = ulp_l4.overload_fields[IPv6]["nh"]
            else:
                rule_l4_proto = p[IP].proto

            # exact-match rule: host prefixes and single-value port ranges
            new_rule = AclRule(
                is_permit=is_permit,
                proto=rule_l4_proto,
                src_prefix=ip_network((p[rule_l3_layer].src, rule_prefix_len)),
                dst_prefix=ip_network((p[rule_l3_layer].dst, rule_prefix_len)),
                sport_from=rule_l4_sport,
                sport_to=rule_l4_sport,
                dport_from=rule_l4_dport,
                dport_to=rule_l4_dport,
            )

            rules.append(new_rule)
            # same match, but always permit
            new_rule_permit = copy.copy(new_rule)
            new_rule_permit.is_permit = 1
            permit_rules.append(new_rule_permit)

            # same match; action 2 (permit+reflect, going by the variable
            # name) when the packet can be reflected, else the plain action
            new_rule_permit_and_reflect = copy.copy(new_rule)
            if can_reflect_this_packet:
                new_rule_permit_and_reflect.is_permit = 2
            else:
                new_rule_permit_and_reflect.is_permit = is_permit

            permit_and_reflect_rules.append(new_rule_permit_and_reflect)
            self.logger.info("create_stream pkt#%d: %s" % (i, payload))

        return {
            "stream": pkts,
            "rules": rules,
            "permit_rules": permit_rules,
            "permit_and_reflect_rules": permit_and_reflect_rules,
        }
323
324     def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse):
325         last_info = dict()
326         for i in self.interfaces:
327             last_info[i.sw_if_index] = None
328
329         dst_ip_sw_if_index = dst_ip_if.sw_if_index
330
331         for packet in capture:
332             l3 = IP if packet.haslayer(IP) else IPv6
333             ip = packet[l3]
334             if packet.haslayer(UDP):
335                 l4 = UDP
336             else:
337                 if packet.haslayer(ICMP):
338                     l4 = ICMP
339                 else:
340                     l4 = ICMPv6Unknown
341
342             # Scapy IPv6 stuff is too smart for its own good.
343             # So we do this and coerce the ICMP into unknown type
344             if packet.haslayer(UDP):
345                 data = scapy.compat.raw(packet[UDP][Raw])
346             else:
347                 if l3 == IP:
348                     data = scapy.compat.raw(
349                         ICMP(scapy.compat.raw(packet[l3].payload))[Raw]
350                     )
351                 else:
352                     data = scapy.compat.raw(
353                         ICMPv6Unknown(scapy.compat.raw(packet[l3].payload)).msgbody
354                     )
355             udp_or_icmp = packet[l3].payload
356             data_obj = Raw(data)
357             # FIXME: make framework believe we are on object
358             payload_info = self.payload_to_info(data_obj)
359             packet_index = payload_info.index
360
361             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
362
363             next_info = self.get_next_packet_info_for_interface2(
364                 payload_info.src, dst_ip_sw_if_index, last_info[payload_info.src]
365             )
366             last_info[payload_info.src] = next_info
367             self.assertTrue(next_info is not None)
368             self.assertEqual(packet_index, next_info.index)
369             saved_packet = next_info.data
370             self.assertTrue(next_info is not None)
371
372             # MAC: src, dst
373             if not reverse:
374                 self.assertEqual(packet.src, dst_ip_if.local_mac)
375                 host = dst_ip_if.host_by_mac(packet.dst)
376
377             # IP: src, dst
378             # self.assertEqual(ip.src, src_ip_if.remote_ip4)
379             if saved_packet is not None:
380                 self.assertEqual(ip.src, saved_packet[l3].src)
381                 self.assertEqual(ip.dst, saved_packet[l3].dst)
382                 if l4 == UDP:
383                     self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
384                     self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
385             # self.assertEqual(ip.dst, host.ip4)
386
387             # UDP:
388
389     def applied_acl_shuffle(self, acl_if):
390         saved_n_input = acl_if.n_input
391         # TOTO: maybe copy each one??
392         saved_acls = acl_if.acls
393
394         # now create a list of all the rules in all ACLs
395         all_rules = []
396         for old_acl in saved_acls:
397             for rule in old_acl.rules:
398                 all_rules.append(rule)
399
400         # Add a few ACLs made from shuffled rules
401         shuffle(all_rules)
402         acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
403         acl1.add_vpp_config()
404
405         shuffle(all_rules)
406         acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
407         acl2.add_vpp_config()
408
409         shuffle(all_rules)
410         acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
411         acl3.add_vpp_config()
412
413         # apply the shuffle ACLs in front
414         input_acls = [acl1, acl2]
415         output_acls = [acl1, acl2]
416
417         # add the currently applied ACLs
418         n_input = acl_if.n_input
419         input_acls.extend(saved_acls[:n_input])
420         output_acls.extend(saved_acls[n_input:])
421
422         # and the trailing shuffle ACL(s)
423         input_acls.extend([acl3])
424         output_acls.extend([acl3])
425
426         # set the interface ACL list to the result
427         acl_if.n_input = len(input_acls)
428         acl_if.acls = input_acls + output_acls
429         acl_if.add_vpp_config()
430
431         # change the ACLs a few times
432         for i in range(1, 10):
433             shuffle(all_rules)
434             acl1.modify_vpp_config(all_rules[:: 1 + (i % 2)])
435
436             shuffle(all_rules)
437             acl2.modify_vpp_config(all_rules[:: 1 + (i % 3)])
438
439             shuffle(all_rules)
440             acl3.modify_vpp_config(all_rules[:: 1 + (i % 5)])
441
442         # restore to how it was before and clean up
443         acl_if.n_input = saved_n_input
444         acl_if.acls = saved_acls
445         acl_if.add_vpp_config()
446
447         acl1.remove_vpp_config()
448         acl2.remove_vpp_config()
449         acl3.remove_vpp_config()
450
451     def create_acls_for_a_stream(self, stream_dict, test_l2_action, is_reflect):
452         r = stream_dict["rules"]
453         r_permit = stream_dict["permit_rules"]
454         r_permit_reflect = stream_dict["permit_and_reflect_rules"]
455         r_action = r_permit_reflect if is_reflect else r
456         action_acl = VppAcl(self, rules=r_action, tag="act. acl")
457         action_acl.add_vpp_config()
458         permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
459         permit_acl.add_vpp_config()
460
461         return {
462             "L2": action_acl if test_l2_action else permit_acl,
463             "L3": permit_acl if test_l2_action else action_acl,
464             "permit": permit_acl,
465             "action": action_acl,
466         }
467
468     def apply_acl_ip46_x_to_y(
469         self, bridged_to_routed, test_l2_deny, is_ip6, is_reflect, add_eh
470     ):
471         """Apply the ACLs"""
472         self.reset_packet_infos()
473         stream_dict = self.create_stream(
474             self.pg2,
475             self.loop0,
476             bridged_to_routed,
477             self.pg_if_packet_sizes,
478             is_ip6,
479             not is_reflect,
480             False,
481             add_eh,
482         )
483         stream = stream_dict["stream"]
484         acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, is_reflect)
485         n_input_l3 = 0 if bridged_to_routed else 1
486         n_input_l2 = 1 if bridged_to_routed else 0
487
488         acl_if_pg2 = VppAclInterface(
489             self,
490             sw_if_index=self.pg2.sw_if_index,
491             n_input=n_input_l3,
492             acls=[acl_idx["L3"]],
493         )
494         acl_if_pg2.add_vpp_config()
495
496         acl_if_pg0 = VppAclInterface(
497             self,
498             sw_if_index=self.pg0.sw_if_index,
499             n_input=n_input_l2,
500             acls=[acl_idx["L2"]],
501         )
502         acl_if_pg0.add_vpp_config()
503
504         acl_if_pg1 = VppAclInterface(
505             self,
506             sw_if_index=self.pg1.sw_if_index,
507             n_input=n_input_l2,
508             acls=[acl_idx["L2"]],
509         )
510         acl_if_pg1.add_vpp_config()
511
512         self.applied_acl_shuffle(acl_if_pg0)
513         self.applied_acl_shuffle(acl_if_pg1)
514         return {"L2": acl_idx["L2"], "L3": acl_idx["L3"]}
515
516     def apply_acl_ip46_both_directions_reflect(
517         self, primary_is_bridged_to_routed, reflect_on_l2, is_ip6, add_eh, stateful_icmp
518     ):
519         primary_is_routed_to_bridged = not primary_is_bridged_to_routed
520         self.reset_packet_infos()
521         stream_dict_fwd = self.create_stream(
522             self.pg2,
523             self.loop0,
524             primary_is_bridged_to_routed,
525             self.pg_if_packet_sizes,
526             is_ip6,
527             False,
528             False,
529             add_eh,
530             stateful_icmp,
531         )
532         acl_idx_fwd = self.create_acls_for_a_stream(
533             stream_dict_fwd, reflect_on_l2, True
534         )
535
536         stream_dict_rev = self.create_stream(
537             self.pg2,
538             self.loop0,
539             not primary_is_bridged_to_routed,
540             self.pg_if_packet_sizes,
541             is_ip6,
542             True,
543             True,
544             add_eh,
545             stateful_icmp,
546         )
547         # We want the primary action to be "deny" rather than reflect
548         acl_idx_rev = self.create_acls_for_a_stream(
549             stream_dict_rev, reflect_on_l2, False
550         )
551
552         if primary_is_bridged_to_routed:
553             inbound_l2_acl = acl_idx_fwd["L2"]
554         else:
555             inbound_l2_acl = acl_idx_rev["L2"]
556
557         if primary_is_routed_to_bridged:
558             outbound_l2_acl = acl_idx_fwd["L2"]
559         else:
560             outbound_l2_acl = acl_idx_rev["L2"]
561
562         if primary_is_routed_to_bridged:
563             inbound_l3_acl = acl_idx_fwd["L3"]
564         else:
565             inbound_l3_acl = acl_idx_rev["L3"]
566
567         if primary_is_bridged_to_routed:
568             outbound_l3_acl = acl_idx_fwd["L3"]
569         else:
570             outbound_l3_acl = acl_idx_rev["L3"]
571
572         acl_if_pg2 = VppAclInterface(
573             self,
574             sw_if_index=self.pg2.sw_if_index,
575             n_input=1,
576             acls=[inbound_l3_acl, outbound_l3_acl],
577         )
578         acl_if_pg2.add_vpp_config()
579
580         acl_if_pg0 = VppAclInterface(
581             self,
582             sw_if_index=self.pg0.sw_if_index,
583             n_input=1,
584             acls=[inbound_l2_acl, outbound_l2_acl],
585         )
586         acl_if_pg0.add_vpp_config()
587
588         acl_if_pg1 = VppAclInterface(
589             self,
590             sw_if_index=self.pg1.sw_if_index,
591             n_input=1,
592             acls=[inbound_l2_acl, outbound_l2_acl],
593         )
594         acl_if_pg1.add_vpp_config()
595
596         self.applied_acl_shuffle(acl_if_pg0)
597         self.applied_acl_shuffle(acl_if_pg2)
598
599     def apply_acl_ip46_routed_to_bridged(
600         self, test_l2_deny, is_ip6, is_reflect, add_eh
601     ):
602         return self.apply_acl_ip46_x_to_y(
603             False, test_l2_deny, is_ip6, is_reflect, add_eh
604         )
605
606     def apply_acl_ip46_bridged_to_routed(
607         self, test_l2_deny, is_ip6, is_reflect, add_eh
608     ):
609         return self.apply_acl_ip46_x_to_y(
610             True, test_l2_deny, is_ip6, is_reflect, add_eh
611         )
612
613     def verify_acl_packet_count(self, acl_idx, packet_count):
614         matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
615         self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches)))
616         total_count = 0
617         for m in matches:
618             for p in m:
619                 total_count = total_count + p["packets"]
620         self.assertEqual(total_count, packet_count)
621
622     def run_traffic_ip46_x_to_y(
623         self,
624         bridged_to_routed,
625         test_l2_deny,
626         is_ip6,
627         is_reflect,
628         is_established,
629         add_eh,
630         stateful_icmp=False,
631     ):
632         self.reset_packet_infos()
633         stream_dict = self.create_stream(
634             self.pg2,
635             self.loop0,
636             bridged_to_routed,
637             self.pg_if_packet_sizes,
638             is_ip6,
639             not is_reflect,
640             is_established,
641             add_eh,
642             stateful_icmp,
643         )
644         stream = stream_dict["stream"]
645
646         tx_if = self.pg0 if bridged_to_routed else self.pg2
647         rx_if = self.pg2 if bridged_to_routed else self.pg0
648
649         tx_if.add_stream(stream)
650         self.pg_enable_capture(self.pg_interfaces)
651         self.pg_start()
652         packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
653         rcvd1 = rx_if.get_capture(packet_count)
654         self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
655         return len(stream)
656
657     def run_traffic_ip46_routed_to_bridged(
658         self,
659         test_l2_deny,
660         is_ip6,
661         is_reflect,
662         is_established,
663         add_eh,
664         stateful_icmp=False,
665     ):
666         return self.run_traffic_ip46_x_to_y(
667             False,
668             test_l2_deny,
669             is_ip6,
670             is_reflect,
671             is_established,
672             add_eh,
673             stateful_icmp,
674         )
675
676     def run_traffic_ip46_bridged_to_routed(
677         self,
678         test_l2_deny,
679         is_ip6,
680         is_reflect,
681         is_established,
682         add_eh,
683         stateful_icmp=False,
684     ):
685         return self.run_traffic_ip46_x_to_y(
686             True,
687             test_l2_deny,
688             is_ip6,
689             is_reflect,
690             is_established,
691             add_eh,
692             stateful_icmp,
693         )
694
695     def run_test_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh):
696         acls = self.apply_acl_ip46_routed_to_bridged(
697             test_l2_deny, is_ip6, is_reflect, add_eh
698         )
699         pkts = self.run_traffic_ip46_routed_to_bridged(
700             test_l2_deny, is_ip6, is_reflect, False, add_eh
701         )
702         self.verify_acl_packet_count(acls["L3"].acl_index, pkts)
703
704     def run_test_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh):
705         acls = self.apply_acl_ip46_bridged_to_routed(
706             test_l2_deny, is_ip6, is_reflect, add_eh
707         )
708         pkts = self.run_traffic_ip46_bridged_to_routed(
709             test_l2_deny, is_ip6, is_reflect, False, add_eh
710         )
711         self.verify_acl_packet_count(acls["L2"].acl_index, pkts)
712
713     def run_test_ip46_routed_to_bridged_and_back(
714         self, test_l2_action, is_ip6, add_eh, stateful_icmp=False
715     ):
716         self.apply_acl_ip46_both_directions_reflect(
717             False, test_l2_action, is_ip6, add_eh, stateful_icmp
718         )
719         self.run_traffic_ip46_routed_to_bridged(
720             test_l2_action, is_ip6, True, False, add_eh, stateful_icmp
721         )
722         self.run_traffic_ip46_bridged_to_routed(
723             test_l2_action, is_ip6, False, True, add_eh, stateful_icmp
724         )
725
726     def run_test_ip46_bridged_to_routed_and_back(
727         self, test_l2_action, is_ip6, add_eh, stateful_icmp=False
728     ):
729         self.apply_acl_ip46_both_directions_reflect(
730             True, test_l2_action, is_ip6, add_eh, stateful_icmp
731         )
732         self.run_traffic_ip46_bridged_to_routed(
733             test_l2_action, is_ip6, True, False, add_eh, stateful_icmp
734         )
735         self.run_traffic_ip46_routed_to_bridged(
736             test_l2_action, is_ip6, False, True, add_eh, stateful_icmp
737         )
738
739     def test_0000_ip6_irb_1(self):
740         """ACL plugin prepare"""
741         if not self.vpp_dead:
742             cmd = "set acl-plugin session timeout udp idle 2000"
743             self.logger.info(self.vapi.ppcli(cmd))
744             # uncomment to not skip past the routing header
745             # and watch the EH tests fail
746             # self.logger.info(self.vapi.ppcli(
747             #    "set acl-plugin skip-ipv6-extension-header 43 0"))
748             # uncomment to test the session limit (stateful tests will fail)
749             # self.logger.info(self.vapi.ppcli(
750             #    "set acl-plugin session table max-entries 1"))
751             # new datapath is the default, but just in case
752             # self.logger.info(self.vapi.ppcli(
753             #    "set acl-plugin l2-datapath new"))
754             # If you want to see some tests fail, uncomment the next line
755             # self.logger.info(self.vapi.ppcli(
756             #    "set acl-plugin l2-datapath old"))
757
758     def test_0001_ip6_irb_1(self):
759         """ACL IPv6 routed -> bridged, L2 ACL deny"""
760         self.run_test_ip46_routed_to_bridged(True, True, False, self.WITHOUT_EH)
761
762     def test_0002_ip6_irb_1(self):
763         """ACL IPv6 routed -> bridged, L3 ACL deny"""
764         self.run_test_ip46_routed_to_bridged(False, True, False, self.WITHOUT_EH)
765
766     def test_0003_ip4_irb_1(self):
767         """ACL IPv4 routed -> bridged, L2 ACL deny"""
768         self.run_test_ip46_routed_to_bridged(True, False, False, self.WITHOUT_EH)
769
770     def test_0004_ip4_irb_1(self):
771         """ACL IPv4 routed -> bridged, L3 ACL deny"""
772         self.run_test_ip46_routed_to_bridged(False, False, False, self.WITHOUT_EH)
773
774     def test_0005_ip6_irb_1(self):
775         """ACL IPv6 bridged -> routed, L2 ACL deny"""
776         self.run_test_ip46_bridged_to_routed(True, True, False, self.WITHOUT_EH)
777
778     def test_0006_ip6_irb_1(self):
779         """ACL IPv6 bridged -> routed, L3 ACL deny"""
780         self.run_test_ip46_bridged_to_routed(False, True, False, self.WITHOUT_EH)
781
782     def test_0007_ip6_irb_1(self):
783         """ACL IPv4 bridged -> routed, L2 ACL deny"""
784         self.run_test_ip46_bridged_to_routed(True, False, False, self.WITHOUT_EH)
785
786     def test_0008_ip6_irb_1(self):
787         """ACL IPv4 bridged -> routed, L3 ACL deny"""
788         self.run_test_ip46_bridged_to_routed(False, False, False, self.WITHOUT_EH)
789
790     # Stateful ACL tests
791     def test_0101_ip6_irb_1(self):
792         """ACL IPv6 routed -> bridged, L2 ACL permit+reflect"""
793         self.run_test_ip46_routed_to_bridged_and_back(True, True, self.WITHOUT_EH)
794
795     def test_0102_ip6_irb_1(self):
796         """ACL IPv6 bridged -> routed, L2 ACL permit+reflect"""
797         self.run_test_ip46_bridged_to_routed_and_back(True, True, self.WITHOUT_EH)
798
799     def test_0103_ip6_irb_1(self):
800         """ACL IPv4 routed -> bridged, L2 ACL permit+reflect"""
801         self.run_test_ip46_routed_to_bridged_and_back(True, False, self.WITHOUT_EH)
802
803     def test_0104_ip6_irb_1(self):
804         """ACL IPv4 bridged -> routed, L2 ACL permit+reflect"""
805         self.run_test_ip46_bridged_to_routed_and_back(True, False, self.WITHOUT_EH)
806
807     def test_0111_ip6_irb_1(self):
808         """ACL IPv6 routed -> bridged, L3 ACL permit+reflect"""
809         self.run_test_ip46_routed_to_bridged_and_back(False, True, self.WITHOUT_EH)
810
811     def test_0112_ip6_irb_1(self):
812         """ACL IPv6 bridged -> routed, L3 ACL permit+reflect"""
813         self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITHOUT_EH)
814
815     def test_0113_ip6_irb_1(self):
816         """ACL IPv4 routed -> bridged, L3 ACL permit+reflect"""
817         self.run_test_ip46_routed_to_bridged_and_back(False, False, self.WITHOUT_EH)
818
819     def test_0114_ip6_irb_1(self):
820         """ACL IPv4 bridged -> routed, L3 ACL permit+reflect"""
821         self.run_test_ip46_bridged_to_routed_and_back(False, False, self.WITHOUT_EH)
822
823     # A block of tests with extension headers
824
825     def test_1001_ip6_irb_1(self):
826         """ACL IPv6+EH routed -> bridged, L2 ACL deny"""
827         self.run_test_ip46_routed_to_bridged(True, True, False, self.WITH_EH)
828
829     def test_1002_ip6_irb_1(self):
830         """ACL IPv6+EH routed -> bridged, L3 ACL deny"""
831         self.run_test_ip46_routed_to_bridged(False, True, False, self.WITH_EH)
832
833     def test_1005_ip6_irb_1(self):
834         """ACL IPv6+EH bridged -> routed, L2 ACL deny"""
835         self.run_test_ip46_bridged_to_routed(True, True, False, self.WITH_EH)
836
837     def test_1006_ip6_irb_1(self):
838         """ACL IPv6+EH bridged -> routed, L3 ACL deny"""
839         self.run_test_ip46_bridged_to_routed(False, True, False, self.WITH_EH)
840
841     def test_1101_ip6_irb_1(self):
842         """ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect"""
843         self.run_test_ip46_routed_to_bridged_and_back(True, True, self.WITH_EH)
844
845     def test_1102_ip6_irb_1(self):
846         """ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect"""
847         self.run_test_ip46_bridged_to_routed_and_back(True, True, self.WITH_EH)
848
849     def test_1111_ip6_irb_1(self):
850         """ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect"""
851         self.run_test_ip46_routed_to_bridged_and_back(False, True, self.WITH_EH)
852
853     def test_1112_ip6_irb_1(self):
854         """ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect"""
855         self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITH_EH)
856
857     # IPv4 with "MF" bit set
858
859     def test_1201_ip6_irb_1(self):
860         """ACL IPv4+MF routed -> bridged, L2 ACL deny"""
861         self.run_test_ip46_routed_to_bridged(True, False, False, self.WITH_EH)
862
863     def test_1202_ip6_irb_1(self):
864         """ACL IPv4+MF routed -> bridged, L3 ACL deny"""
865         self.run_test_ip46_routed_to_bridged(False, False, False, self.WITH_EH)
866
867     def test_1205_ip6_irb_1(self):
868         """ACL IPv4+MF bridged -> routed, L2 ACL deny"""
869         self.run_test_ip46_bridged_to_routed(True, False, False, self.WITH_EH)
870
871     def test_1206_ip6_irb_1(self):
872         """ACL IPv4+MF bridged -> routed, L3 ACL deny"""
873         self.run_test_ip46_bridged_to_routed(False, False, False, self.WITH_EH)
874
875     def test_1301_ip6_irb_1(self):
876         """ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect"""
877         self.run_test_ip46_routed_to_bridged_and_back(True, False, self.WITH_EH)
878
879     def test_1302_ip6_irb_1(self):
880         """ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect"""
881         self.run_test_ip46_bridged_to_routed_and_back(True, False, self.WITH_EH)
882
883     def test_1311_ip6_irb_1(self):
884         """ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect"""
885         self.run_test_ip46_routed_to_bridged_and_back(False, False, self.WITH_EH)
886
887     def test_1312_ip6_irb_1(self):
888         """ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
889         self.run_test_ip46_bridged_to_routed_and_back(False, False, self.WITH_EH)
890
891     # Stateful ACL tests with stateful ICMP
892
893     def test_1401_ip6_irb_1(self):
894         """IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
895         self.run_test_ip46_routed_to_bridged_and_back(
896             True, True, self.WITHOUT_EH, self.STATEFUL_ICMP
897         )
898
899     def test_1402_ip6_irb_1(self):
900         """IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
901         self.run_test_ip46_bridged_to_routed_and_back(
902             True, True, self.WITHOUT_EH, self.STATEFUL_ICMP
903         )
904
905     def test_1403_ip4_irb_1(self):
906         """IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
907         self.run_test_ip46_routed_to_bridged_and_back(
908             True, False, self.WITHOUT_EH, self.STATEFUL_ICMP
909         )
910
911     def test_1404_ip4_irb_1(self):
912         """IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
913         self.run_test_ip46_bridged_to_routed_and_back(
914             True, False, self.WITHOUT_EH, self.STATEFUL_ICMP
915         )
916
917     def test_1411_ip6_irb_1(self):
918         """IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
919         self.run_test_ip46_routed_to_bridged_and_back(
920             False, True, self.WITHOUT_EH, self.STATEFUL_ICMP
921         )
922
923     def test_1412_ip6_irb_1(self):
924         """IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
925         self.run_test_ip46_bridged_to_routed_and_back(
926             False, True, self.WITHOUT_EH, self.STATEFUL_ICMP
927         )
928
929     def test_1413_ip4_irb_1(self):
930         """IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
931         self.run_test_ip46_routed_to_bridged_and_back(
932             False, False, self.WITHOUT_EH, self.STATEFUL_ICMP
933         )
934
935     def test_1414_ip4_irb_1(self):
936         """IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
937         self.run_test_ip46_bridged_to_routed_and_back(
938             False, False, self.WITHOUT_EH, self.STATEFUL_ICMP
939         )
940
941
if __name__ == "__main__":
    # Direct execution of this file: hand the module's tests to unittest,
    # using the VppTestRunner imported from the framework module.
    unittest.main(testRunner=VppTestRunner)