acl: API cleanup
[vpp.git] / src / plugins / acl / test / test_acl_plugin_l2l3.py
1 #!/usr/bin/env python3
2 """ACL IRB Test Case HLD:
3
4 **config**
5     - L2 MAC learning enabled in l2bd
6     - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
7     - 2 bridged interfaces in l2bd with bvi
8
9 **test**
10     - sending ip4 eth pkts between routed interfaces
11         - 2 routed interfaces
12         - 2 bridged interfaces
13
14     - 64B, 512B, 1518B, 9200B (ether_size)
15
16     - burst of pkts per interface
17         - 257pkts per burst
18         - routed pkts hitting different FIB entries
19         - bridged pkts hitting different MAC entries
20
21 **verify**
22     - all packets received correctly
23
24 """
25
26 import copy
27 import unittest
28 from socket import inet_pton, AF_INET, AF_INET6
29 from random import choice, shuffle
30 from pprint import pprint
31 from ipaddress import ip_network
32
33 import scapy.compat
34 from scapy.packet import Raw
35 from scapy.layers.l2 import Ether
36 from scapy.layers.inet import IP, UDP, ICMP, TCP
37 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
38 from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
39 from scapy.layers.inet6 import IPv6ExtHdrFragment
40
41 from framework import VppTestCase, VppTestRunner
42 from vpp_l2 import L2_PORT_TYPE
43 import time
44
45 from vpp_acl import AclRule, VppAcl, VppAclInterface
46
47
48 class TestACLpluginL2L3(VppTestCase):
49     """TestACLpluginL2L3 Test Case"""
50
51     @classmethod
52     def setUpClass(cls):
53         """
54         #. Create BD with MAC learning enabled and put interfaces to this BD.
55         #. Configure IPv4 addresses on loopback interface and routed interface.
56         #. Configure MAC address binding to IPv4 neighbors on loop0.
57         #. Configure MAC address on pg2.
58         #. Loopback BVI interface has remote hosts, one half of hosts are
59            behind pg0 second behind pg1.
60         """
61         super(TestACLpluginL2L3, cls).setUpClass()
62
63         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
64         cls.bd_id = 10
65         cls.remote_hosts_count = 250
66
67         # create 3 pg interfaces, 1 loopback interface
68         cls.create_pg_interfaces(range(3))
69         cls.create_loopback_interfaces(1)
70
71         cls.interfaces = list(cls.pg_interfaces)
72         cls.interfaces.extend(cls.lo_interfaces)
73
74         for i in cls.interfaces:
75             i.admin_up()
76
77         # Create BD with MAC learning enabled and put interfaces to this BD
78         cls.vapi.sw_interface_set_l2_bridge(
79             rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id,
80             port_type=L2_PORT_TYPE.BVI)
81         cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index,
82                                             bd_id=cls.bd_id)
83         cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index,
84                                             bd_id=cls.bd_id)
85
86         # Configure IPv4 addresses on loopback interface and routed interface
87         cls.loop0.config_ip4()
88         cls.loop0.config_ip6()
89         cls.pg2.config_ip4()
90         cls.pg2.config_ip6()
91
92         # Configure MAC address binding to IPv4 neighbors on loop0
93         cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
94         cls.loop0.configure_ipv4_neighbors()
95         cls.loop0.configure_ipv6_neighbors()
96         # configure MAC address on pg2
97         cls.pg2.resolve_arp()
98         cls.pg2.resolve_ndp()
99
100         cls.WITHOUT_EH = False
101         cls.WITH_EH = True
102         cls.STATELESS_ICMP = False
103         cls.STATEFUL_ICMP = True
104
105         # Loopback BVI interface has remote hosts, one half of hosts are behind
106         # pg0 second behind pg1
107         half = cls.remote_hosts_count // 2
108         cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
109         cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
110         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1)
111
112     @classmethod
113     def tearDownClass(cls):
114         reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0)
115         super(TestACLpluginL2L3, cls).tearDownClass()
116
117     def tearDown(self):
118         """Run standard test teardown and log ``show l2patch``,
119         ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
120         ``show ip neighbors``.
121         """
122         super(TestACLpluginL2L3, self).tearDown()
123
124     def show_commands_at_teardown(self):
125         self.logger.info(self.vapi.cli("show l2patch"))
126         self.logger.info(self.vapi.cli("show classify tables"))
127         self.logger.info(self.vapi.cli("show l2fib verbose"))
128         self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
129                                        self.bd_id))
130         self.logger.info(self.vapi.cli("show ip neighbors"))
131         cmd = "show acl-plugin sessions verbose 1"
132         self.logger.info(self.vapi.cli(cmd))
133         self.logger.info(self.vapi.cli("show acl-plugin acl"))
134         self.logger.info(self.vapi.cli("show acl-plugin interface"))
135         self.logger.info(self.vapi.cli("show acl-plugin tables"))
136
137     def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
138                       is_ip6, expect_blocked, expect_established,
139                       add_extension_header, icmp_stateful=False):
140         pkts = []
141         rules = []
142         permit_rules = []
143         permit_and_reflect_rules = []
144         total_packet_count = 8
145         for i in range(0, total_packet_count):
146             modulo = (i//2) % 2
147             icmp_type_delta = i % 2
148             icmp_code = i
149             is_udp_packet = (modulo == 0)
150             if is_udp_packet and icmp_stateful:
151                 continue
152             is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and
153                                    not is_udp_packet)
154             is_reflected_icmp = is_reflectable_icmp and expect_established
155             can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
156             is_permit = i % 2
157             remote_dst_index = i % len(dst_ip_if.remote_hosts)
158             remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
159             if is_permit == 1:
160                 info = self.create_packet_info(src_ip_if, dst_ip_if)
161                 payload = self.info_to_payload(info)
162             else:
163                 to_be_blocked = False
164                 if (expect_blocked and not expect_established):
165                     to_be_blocked = True
166                 if (not can_reflect_this_packet):
167                     to_be_blocked = True
168                 if to_be_blocked:
169                     payload = "to be blocked"
170                 else:
171                     info = self.create_packet_info(src_ip_if, dst_ip_if)
172                     payload = self.info_to_payload(info)
173             if reverse:
174                 dst_mac = 'de:ad:00:00:00:00'
175                 src_mac = remote_dst_host._mac
176                 dst_ip6 = src_ip_if.remote_ip6
177                 src_ip6 = remote_dst_host.ip6
178                 dst_ip4 = src_ip_if.remote_ip4
179                 src_ip4 = remote_dst_host.ip4
180                 dst_l4 = 1234 + i
181                 src_l4 = 4321 + i
182             else:
183                 dst_mac = src_ip_if.local_mac
184                 src_mac = src_ip_if.remote_mac
185                 src_ip6 = src_ip_if.remote_ip6
186                 dst_ip6 = remote_dst_host.ip6
187                 src_ip4 = src_ip_if.remote_ip4
188                 dst_ip4 = remote_dst_host.ip4
189                 src_l4 = 1234 + i
190                 dst_l4 = 4321 + i
191             if is_reflected_icmp:
192                 icmp_type_delta = 1
193
194             # default ULP should be something we do not use in tests
195             ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
196             # potentially a chain of protocols leading to ULP
197             ulp = ulp_l4
198
199             if is_udp_packet:
200                 if is_ip6:
201                     ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
202                     if add_extension_header:
203                         # prepend some extension headers
204                         ulp = (IPv6ExtHdrRouting() / IPv6ExtHdrRouting() /
205                                IPv6ExtHdrFragment(offset=0, m=1) / ulp_l4)
206                         # uncomment below to test invalid ones
207                         # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4
208                     else:
209                         ulp = ulp_l4
210                     p = (Ether(dst=dst_mac, src=src_mac) /
211                          IPv6(src=src_ip6, dst=dst_ip6) /
212                          ulp /
213                          Raw(payload))
214                 else:
215                     ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
216                     # IPv4 does not allow extension headers,
217                     # but we rather make it a first fragment
218                     flags = 1 if add_extension_header else 0
219                     ulp = ulp_l4
220                     p = (Ether(dst=dst_mac, src=src_mac) /
221                          IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) /
222                          ulp /
223                          Raw(payload))
224             elif modulo == 1:
225                 if is_ip6:
226                     ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta,
227                                            code=icmp_code)
228                     ulp = ulp_l4
229                     p = (Ether(dst=dst_mac, src=src_mac) /
230                          IPv6(src=src_ip6, dst=dst_ip6) /
231                          ulp /
232                          Raw(payload))
233                 else:
234                     ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code)
235                     ulp = ulp_l4
236                     p = (Ether(dst=dst_mac, src=src_mac) /
237                          IP(src=src_ip4, dst=dst_ip4) /
238                          ulp /
239                          Raw(payload))
240
241             if i % 2 == 1:
242                 info.data = p.copy()
243             size = packet_sizes[(i // 2) % len(packet_sizes)]
244             self.extend_packet(p, size)
245             pkts.append(p)
246
247             rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
248             rule_prefix_len = 128 if p.haslayer(IPv6) else 32
249             rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP
250
251             if p.haslayer(UDP):
252                 rule_l4_sport = p[UDP].sport
253                 rule_l4_dport = p[UDP].dport
254             else:
255                 if p.haslayer(ICMP):
256                     rule_l4_sport = p[ICMP].type
257                     rule_l4_dport = p[ICMP].code
258                 else:
259                     rule_l4_sport = p[ICMPv6Unknown].type
260                     rule_l4_dport = p[ICMPv6Unknown].code
261             if p.haslayer(IPv6):
262                 rule_l4_proto = ulp_l4.overload_fields[IPv6]['nh']
263             else:
264                 rule_l4_proto = p[IP].proto
265
266             new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
267                                src_prefix=ip_network(
268                                    (p[rule_l3_layer].src, rule_prefix_len)),
269                                dst_prefix=ip_network(
270                                    (p[rule_l3_layer].dst, rule_prefix_len)))
271             new_rule.sport_from = rule_l4_sport
272             new_rule.sport_fo = rule_l4_sport
273             new_rule.dport_from = rule_l4_dport
274             new_rule.dport_to = rule_l4_dport
275
276             rules.append(new_rule)
277             new_rule_permit = copy.copy(new_rule)
278             new_rule_permit.is_permit = 1
279             permit_rules.append(new_rule_permit)
280
281             new_rule_permit_and_reflect = copy.copy(new_rule)
282             if can_reflect_this_packet:
283                 new_rule_permit_and_reflect.is_permit = 2
284             else:
285                 new_rule_permit_and_reflect.is_permit = is_permit
286
287             permit_and_reflect_rules.append(new_rule_permit_and_reflect)
288             self.logger.info("create_stream pkt#%d: %s" % (i, payload))
289
290         return {'stream': pkts,
291                 'rules': rules,
292                 'permit_rules': permit_rules,
293                 'permit_and_reflect_rules': permit_and_reflect_rules}
294
295     def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse):
296         last_info = dict()
297         for i in self.interfaces:
298             last_info[i.sw_if_index] = None
299
300         dst_ip_sw_if_index = dst_ip_if.sw_if_index
301
302         for packet in capture:
303             l3 = IP if packet.haslayer(IP) else IPv6
304             ip = packet[l3]
305             if packet.haslayer(UDP):
306                 l4 = UDP
307             else:
308                 if packet.haslayer(ICMP):
309                     l4 = ICMP
310                 else:
311                     l4 = ICMPv6Unknown
312
313             # Scapy IPv6 stuff is too smart for its own good.
314             # So we do this and coerce the ICMP into unknown type
315             if packet.haslayer(UDP):
316                 data = scapy.compat.raw(packet[UDP][Raw])
317             else:
318                 if l3 == IP:
319                     data = scapy.compat.raw(ICMP(
320                         scapy.compat.raw(packet[l3].payload))[Raw])
321                 else:
322                     data = scapy.compat.raw(ICMPv6Unknown(
323                         scapy.compat.raw(packet[l3].payload)).msgbody)
324             udp_or_icmp = packet[l3].payload
325             data_obj = Raw(data)
326             # FIXME: make framework believe we are on object
327             payload_info = self.payload_to_info(data_obj)
328             packet_index = payload_info.index
329
330             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
331
332             next_info = self.get_next_packet_info_for_interface2(
333                 payload_info.src, dst_ip_sw_if_index,
334                 last_info[payload_info.src])
335             last_info[payload_info.src] = next_info
336             self.assertTrue(next_info is not None)
337             self.assertEqual(packet_index, next_info.index)
338             saved_packet = next_info.data
339             self.assertTrue(next_info is not None)
340
341             # MAC: src, dst
342             if not reverse:
343                 self.assertEqual(packet.src, dst_ip_if.local_mac)
344                 host = dst_ip_if.host_by_mac(packet.dst)
345
346             # IP: src, dst
347             # self.assertEqual(ip.src, src_ip_if.remote_ip4)
348             if saved_packet is not None:
349                 self.assertEqual(ip.src, saved_packet[l3].src)
350                 self.assertEqual(ip.dst, saved_packet[l3].dst)
351                 if l4 == UDP:
352                     self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
353                     self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
354             # self.assertEqual(ip.dst, host.ip4)
355
356             # UDP:
357
358     def applied_acl_shuffle(self, acl_if):
359         saved_n_input = acl_if.n_input
360         # TOTO: maybe copy each one??
361         saved_acls = acl_if.acls
362
363         # now create a list of all the rules in all ACLs
364         all_rules = []
365         for old_acl in saved_acls:
366             for rule in old_acl.rules:
367                 all_rules.append(rule)
368
369         # Add a few ACLs made from shuffled rules
370         shuffle(all_rules)
371         acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
372         acl1.add_vpp_config()
373
374         shuffle(all_rules)
375         acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
376         acl2.add_vpp_config()
377
378         shuffle(all_rules)
379         acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
380         acl3.add_vpp_config()
381
382         # apply the shuffle ACLs in front
383         input_acls = [acl1, acl2]
384         output_acls = [acl1, acl2]
385
386         # add the currently applied ACLs
387         n_input = acl_if.n_input
388         input_acls.extend(saved_acls[:n_input])
389         output_acls.extend(saved_acls[n_input:])
390
391         # and the trailing shuffle ACL(s)
392         input_acls.extend([acl3])
393         output_acls.extend([acl3])
394
395         # set the interface ACL list to the result
396         acl_if.n_input = len(input_acls)
397         acl_if.acls = input_acls + output_acls
398         acl_if.add_vpp_config()
399
400         # change the ACLs a few times
401         for i in range(1, 10):
402             shuffle(all_rules)
403             acl1.rules = all_rules[::1+(i % 2)]
404             acl1.add_vpp_config()
405
406             shuffle(all_rules)
407             acl2.rules = all_rules[::1+(i % 3)]
408             acl2.add_vpp_config()
409
410             shuffle(all_rules)
411             acl3.rules = all_rules[::1+(i % 5)]
412             acl3.add_vpp_config()
413
414         # restore to how it was before and clean up
415         acl_if.n_input = saved_n_input
416         acl_if.acls = saved_acls
417         acl_if.add_vpp_config()
418
419         acl1.remove_vpp_config()
420         acl2.remove_vpp_config()
421         acl3.remove_vpp_config()
422
423     def create_acls_for_a_stream(self, stream_dict,
424                                  test_l2_action, is_reflect):
425         r = stream_dict['rules']
426         r_permit = stream_dict['permit_rules']
427         r_permit_reflect = stream_dict['permit_and_reflect_rules']
428         r_action = r_permit_reflect if is_reflect else r
429         action_acl = VppAcl(self, rules=r_action, tag="act. acl")
430         action_acl.add_vpp_config()
431         permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
432         permit_acl.add_vpp_config()
433
434         return {'L2': action_acl if test_l2_action else permit_acl,
435                 'L3': permit_acl if test_l2_action else action_acl,
436                 'permit': permit_acl, 'action': action_acl}
437
438     def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny,
439                               is_ip6, is_reflect, add_eh):
440         """ Apply the ACLs
441         """
442         self.reset_packet_infos()
443         stream_dict = self.create_stream(
444             self.pg2, self.loop0,
445             bridged_to_routed,
446             self.pg_if_packet_sizes, is_ip6,
447             not is_reflect, False, add_eh)
448         stream = stream_dict['stream']
449         acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny,
450                                                 is_reflect)
451         n_input_l3 = 0 if bridged_to_routed else 1
452         n_input_l2 = 1 if bridged_to_routed else 0
453
454         acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
455                                      n_input=n_input_l3, acls=[acl_idx['L3']])
456         acl_if_pg2.add_vpp_config()
457
458         acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
459                                      n_input=n_input_l2, acls=[acl_idx['L2']])
460         acl_if_pg0.add_vpp_config()
461
462         acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
463                                      n_input=n_input_l2, acls=[acl_idx['L2']])
464         acl_if_pg1.add_vpp_config()
465
466         self.applied_acl_shuffle(acl_if_pg0)
467         self.applied_acl_shuffle(acl_if_pg1)
468         return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
469
470     def apply_acl_ip46_both_directions_reflect(self,
471                                                primary_is_bridged_to_routed,
472                                                reflect_on_l2, is_ip6, add_eh,
473                                                stateful_icmp):
474         primary_is_routed_to_bridged = not primary_is_bridged_to_routed
475         self.reset_packet_infos()
476         stream_dict_fwd = self.create_stream(self.pg2, self.loop0,
477                                              primary_is_bridged_to_routed,
478                                              self.pg_if_packet_sizes, is_ip6,
479                                              False, False, add_eh,
480                                              stateful_icmp)
481         acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd,
482                                                     reflect_on_l2, True)
483
484         stream_dict_rev = self.create_stream(self.pg2, self.loop0,
485                                              not primary_is_bridged_to_routed,
486                                              self.pg_if_packet_sizes, is_ip6,
487                                              True, True, add_eh, stateful_icmp)
488         # We want the primary action to be "deny" rather than reflect
489         acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev,
490                                                     reflect_on_l2, False)
491
492         if primary_is_bridged_to_routed:
493             inbound_l2_acl = acl_idx_fwd['L2']
494         else:
495             inbound_l2_acl = acl_idx_rev['L2']
496
497         if primary_is_routed_to_bridged:
498             outbound_l2_acl = acl_idx_fwd['L2']
499         else:
500             outbound_l2_acl = acl_idx_rev['L2']
501
502         if primary_is_routed_to_bridged:
503             inbound_l3_acl = acl_idx_fwd['L3']
504         else:
505             inbound_l3_acl = acl_idx_rev['L3']
506
507         if primary_is_bridged_to_routed:
508             outbound_l3_acl = acl_idx_fwd['L3']
509         else:
510             outbound_l3_acl = acl_idx_rev['L3']
511
512         acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
513                                      n_input=1,
514                                      acls=[inbound_l3_acl, outbound_l3_acl])
515         acl_if_pg2.add_vpp_config()
516
517         acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
518                                      n_input=1,
519                                      acls=[inbound_l2_acl, outbound_l2_acl])
520         acl_if_pg0.add_vpp_config()
521
522         acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
523                                      n_input=1,
524                                      acls=[inbound_l2_acl, outbound_l2_acl])
525         acl_if_pg1.add_vpp_config()
526
527         self.applied_acl_shuffle(acl_if_pg0)
528         self.applied_acl_shuffle(acl_if_pg2)
529
530     def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
531                                          is_reflect, add_eh):
532         return self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6,
533                                           is_reflect, add_eh)
534
535     def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
536                                          is_reflect, add_eh):
537         return self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6,
538                                           is_reflect, add_eh)
539
540     def verify_acl_packet_count(self, acl_idx, packet_count):
541         matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
542         self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches)))
543         total_count = 0
544         for p in matches[0]:
545             total_count = total_count + p['packets']
546         self.assertEqual(total_count, packet_count)
547
548     def run_traffic_ip46_x_to_y(self, bridged_to_routed,
549                                 test_l2_deny, is_ip6,
550                                 is_reflect, is_established, add_eh,
551                                 stateful_icmp=False):
552         self.reset_packet_infos()
553         stream_dict = self.create_stream(self.pg2, self.loop0,
554                                          bridged_to_routed,
555                                          self.pg_if_packet_sizes, is_ip6,
556                                          not is_reflect, is_established,
557                                          add_eh, stateful_icmp)
558         stream = stream_dict['stream']
559
560         tx_if = self.pg0 if bridged_to_routed else self.pg2
561         rx_if = self.pg2 if bridged_to_routed else self.pg0
562
563         tx_if.add_stream(stream)
564         self.pg_enable_capture(self.pg_interfaces)
565         self.pg_start()
566         packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
567         rcvd1 = rx_if.get_capture(packet_count)
568         self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
569         return len(stream)
570
571     def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
572                                            is_reflect, is_established, add_eh,
573                                            stateful_icmp=False):
574         return self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
575                                             is_reflect, is_established, add_eh,
576                                             stateful_icmp)
577
578     def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
579                                            is_reflect, is_established, add_eh,
580                                            stateful_icmp=False):
581         return self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
582                                             is_reflect, is_established, add_eh,
583                                             stateful_icmp)
584
585     def run_test_ip46_routed_to_bridged(self, test_l2_deny,
586                                         is_ip6, is_reflect, add_eh):
587         acls = self.apply_acl_ip46_routed_to_bridged(test_l2_deny,
588                                                      is_ip6, is_reflect,
589                                                      add_eh)
590         pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
591                                                        is_reflect, False,
592                                                        add_eh)
593         self.verify_acl_packet_count(acls['L3'].acl_index, pkts)
594
595     def run_test_ip46_bridged_to_routed(self, test_l2_deny,
596                                         is_ip6, is_reflect, add_eh):
597         acls = self.apply_acl_ip46_bridged_to_routed(test_l2_deny,
598                                                      is_ip6, is_reflect,
599                                                      add_eh)
600         pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
601                                                        is_reflect, False,
602                                                        add_eh)
603         self.verify_acl_packet_count(acls['L2'].acl_index, pkts)
604
605     def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
606                                                  is_ip6, add_eh,
607                                                  stateful_icmp=False):
608         self.apply_acl_ip46_both_directions_reflect(False, test_l2_action,
609                                                     is_ip6, add_eh,
610                                                     stateful_icmp)
611         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
612                                                 True, False, add_eh,
613                                                 stateful_icmp)
614         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
615                                                 False, True, add_eh,
616                                                 stateful_icmp)
617
618     def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action,
619                                                  is_ip6, add_eh,
620                                                  stateful_icmp=False):
621         self.apply_acl_ip46_both_directions_reflect(True, test_l2_action,
622                                                     is_ip6, add_eh,
623                                                     stateful_icmp)
624         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
625                                                 True, False, add_eh,
626                                                 stateful_icmp)
627         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
628                                                 False, True, add_eh,
629                                                 stateful_icmp)
630
631     def test_0000_ip6_irb_1(self):
632         """ ACL plugin prepare"""
633         if not self.vpp_dead:
634             cmd = "set acl-plugin session timeout udp idle 2000"
635             self.logger.info(self.vapi.ppcli(cmd))
636             # uncomment to not skip past the routing header
637             # and watch the EH tests fail
638             # self.logger.info(self.vapi.ppcli(
639             #    "set acl-plugin skip-ipv6-extension-header 43 0"))
640             # uncomment to test the session limit (stateful tests will fail)
641             # self.logger.info(self.vapi.ppcli(
642             #    "set acl-plugin session table max-entries 1"))
643             # new datapath is the default, but just in case
644             # self.logger.info(self.vapi.ppcli(
645             #    "set acl-plugin l2-datapath new"))
646             # If you want to see some tests fail, uncomment the next line
647             # self.logger.info(self.vapi.ppcli(
648             #    "set acl-plugin l2-datapath old"))
649
650     def test_0001_ip6_irb_1(self):
651         """ ACL IPv6 routed -> bridged, L2 ACL deny"""
652         self.run_test_ip46_routed_to_bridged(True, True, False,
653                                              self.WITHOUT_EH)
654
655     def test_0002_ip6_irb_1(self):
656         """ ACL IPv6 routed -> bridged, L3 ACL deny"""
657         self.run_test_ip46_routed_to_bridged(False, True, False,
658                                              self.WITHOUT_EH)
659
660     def test_0003_ip4_irb_1(self):
661         """ ACL IPv4 routed -> bridged, L2 ACL deny"""
662         self.run_test_ip46_routed_to_bridged(True, False, False,
663                                              self.WITHOUT_EH)
664
665     def test_0004_ip4_irb_1(self):
666         """ ACL IPv4 routed -> bridged, L3 ACL deny"""
667         self.run_test_ip46_routed_to_bridged(False, False, False,
668                                              self.WITHOUT_EH)
669
670     def test_0005_ip6_irb_1(self):
671         """ ACL IPv6 bridged -> routed, L2 ACL deny """
672         self.run_test_ip46_bridged_to_routed(True, True, False,
673                                              self.WITHOUT_EH)
674
675     def test_0006_ip6_irb_1(self):
676         """ ACL IPv6 bridged -> routed, L3 ACL deny """
677         self.run_test_ip46_bridged_to_routed(False, True, False,
678                                              self.WITHOUT_EH)
679
680     def test_0007_ip6_irb_1(self):
681         """ ACL IPv4 bridged -> routed, L2 ACL deny """
682         self.run_test_ip46_bridged_to_routed(True, False, False,
683                                              self.WITHOUT_EH)
684
685     def test_0008_ip6_irb_1(self):
686         """ ACL IPv4 bridged -> routed, L3 ACL deny """
687         self.run_test_ip46_bridged_to_routed(False, False, False,
688                                              self.WITHOUT_EH)
689
690     # Stateful ACL tests
691     def test_0101_ip6_irb_1(self):
692         """ ACL IPv6 routed -> bridged, L2 ACL permit+reflect"""
693         self.run_test_ip46_routed_to_bridged_and_back(True, True,
694                                                       self.WITHOUT_EH)
695
696     def test_0102_ip6_irb_1(self):
697         """ ACL IPv6 bridged -> routed, L2 ACL permit+reflect"""
698         self.run_test_ip46_bridged_to_routed_and_back(True, True,
699                                                       self.WITHOUT_EH)
700
701     def test_0103_ip6_irb_1(self):
702         """ ACL IPv4 routed -> bridged, L2 ACL permit+reflect"""
703         self.run_test_ip46_routed_to_bridged_and_back(True, False,
704                                                       self.WITHOUT_EH)
705
706     def test_0104_ip6_irb_1(self):
707         """ ACL IPv4 bridged -> routed, L2 ACL permit+reflect"""
708         self.run_test_ip46_bridged_to_routed_and_back(True, False,
709                                                       self.WITHOUT_EH)
710
711     def test_0111_ip6_irb_1(self):
712         """ ACL IPv6 routed -> bridged, L3 ACL permit+reflect"""
713         self.run_test_ip46_routed_to_bridged_and_back(False, True,
714                                                       self.WITHOUT_EH)
715
716     def test_0112_ip6_irb_1(self):
717         """ ACL IPv6 bridged -> routed, L3 ACL permit+reflect"""
718         self.run_test_ip46_bridged_to_routed_and_back(False, True,
719                                                       self.WITHOUT_EH)
720
721     def test_0113_ip6_irb_1(self):
722         """ ACL IPv4 routed -> bridged, L3 ACL permit+reflect"""
723         self.run_test_ip46_routed_to_bridged_and_back(False, False,
724                                                       self.WITHOUT_EH)
725
726     def test_0114_ip6_irb_1(self):
727         """ ACL IPv4 bridged -> routed, L3 ACL permit+reflect"""
728         self.run_test_ip46_bridged_to_routed_and_back(False, False,
729                                                       self.WITHOUT_EH)
730
731     # A block of tests with extension headers
732
733     def test_1001_ip6_irb_1(self):
734         """ ACL IPv6+EH routed -> bridged, L2 ACL deny"""
735         self.run_test_ip46_routed_to_bridged(True, True, False,
736                                              self.WITH_EH)
737
738     def test_1002_ip6_irb_1(self):
739         """ ACL IPv6+EH routed -> bridged, L3 ACL deny"""
740         self.run_test_ip46_routed_to_bridged(False, True, False,
741                                              self.WITH_EH)
742
743     def test_1005_ip6_irb_1(self):
744         """ ACL IPv6+EH bridged -> routed, L2 ACL deny """
745         self.run_test_ip46_bridged_to_routed(True, True, False,
746                                              self.WITH_EH)
747
748     def test_1006_ip6_irb_1(self):
749         """ ACL IPv6+EH bridged -> routed, L3 ACL deny """
750         self.run_test_ip46_bridged_to_routed(False, True, False,
751                                              self.WITH_EH)
752
753     def test_1101_ip6_irb_1(self):
754         """ ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect"""
755         self.run_test_ip46_routed_to_bridged_and_back(True, True,
756                                                       self.WITH_EH)
757
758     def test_1102_ip6_irb_1(self):
759         """ ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect"""
760         self.run_test_ip46_bridged_to_routed_and_back(True, True,
761                                                       self.WITH_EH)
762
763     def test_1111_ip6_irb_1(self):
764         """ ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect"""
765         self.run_test_ip46_routed_to_bridged_and_back(False, True,
766                                                       self.WITH_EH)
767
768     def test_1112_ip6_irb_1(self):
769         """ ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect"""
770         self.run_test_ip46_bridged_to_routed_and_back(False, True,
771                                                       self.WITH_EH)
772
773     # IPv4 with "MF" bit set
774
775     def test_1201_ip6_irb_1(self):
776         """ ACL IPv4+MF routed -> bridged, L2 ACL deny"""
777         self.run_test_ip46_routed_to_bridged(True, False, False,
778                                              self.WITH_EH)
779
780     def test_1202_ip6_irb_1(self):
781         """ ACL IPv4+MF routed -> bridged, L3 ACL deny"""
782         self.run_test_ip46_routed_to_bridged(False, False, False,
783                                              self.WITH_EH)
784
785     def test_1205_ip6_irb_1(self):
786         """ ACL IPv4+MF bridged -> routed, L2 ACL deny """
787         self.run_test_ip46_bridged_to_routed(True, False, False,
788                                              self.WITH_EH)
789
790     def test_1206_ip6_irb_1(self):
791         """ ACL IPv4+MF bridged -> routed, L3 ACL deny """
792         self.run_test_ip46_bridged_to_routed(False, False, False,
793                                              self.WITH_EH)
794
795     def test_1301_ip6_irb_1(self):
796         """ ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect"""
797         self.run_test_ip46_routed_to_bridged_and_back(True, False,
798                                                       self.WITH_EH)
799
800     def test_1302_ip6_irb_1(self):
801         """ ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect"""
802         self.run_test_ip46_bridged_to_routed_and_back(True, False,
803                                                       self.WITH_EH)
804
805     def test_1311_ip6_irb_1(self):
806         """ ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect"""
807         self.run_test_ip46_routed_to_bridged_and_back(False, False,
808                                                       self.WITH_EH)
809
810     def test_1312_ip6_irb_1(self):
811         """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
812         self.run_test_ip46_bridged_to_routed_and_back(False, False,
813                                                       self.WITH_EH)
814     # Stateful ACL tests with stateful ICMP
815
816     def test_1401_ip6_irb_1(self):
817         """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
818         self.run_test_ip46_routed_to_bridged_and_back(True, True,
819                                                       self.WITHOUT_EH,
820                                                       self.STATEFUL_ICMP)
821
822     def test_1402_ip6_irb_1(self):
823         """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
824         self.run_test_ip46_bridged_to_routed_and_back(True, True,
825                                                       self.WITHOUT_EH,
826                                                       self.STATEFUL_ICMP)
827
828     def test_1403_ip4_irb_1(self):
829         """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
830         self.run_test_ip46_routed_to_bridged_and_back(True, False,
831                                                       self.WITHOUT_EH,
832                                                       self.STATEFUL_ICMP)
833
834     def test_1404_ip4_irb_1(self):
835         """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
836         self.run_test_ip46_bridged_to_routed_and_back(True, False,
837                                                       self.WITHOUT_EH,
838                                                       self.STATEFUL_ICMP)
839
840     def test_1411_ip6_irb_1(self):
841         """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
842         self.run_test_ip46_routed_to_bridged_and_back(False, True,
843                                                       self.WITHOUT_EH,
844                                                       self.STATEFUL_ICMP)
845
846     def test_1412_ip6_irb_1(self):
847         """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
848         self.run_test_ip46_bridged_to_routed_and_back(False, True,
849                                                       self.WITHOUT_EH,
850                                                       self.STATEFUL_ICMP)
851
852     def test_1413_ip4_irb_1(self):
853         """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
854         self.run_test_ip46_routed_to_bridged_and_back(False, False,
855                                                       self.WITHOUT_EH,
856                                                       self.STATEFUL_ICMP)
857
858     def test_1414_ip4_irb_1(self):
859         """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
860         self.run_test_ip46_bridged_to_routed_and_back(False, False,
861                                                       self.WITHOUT_EH,
862                                                       self.STATEFUL_ICMP)
863
864
if __name__ == "__main__":
    # Direct execution: run this module's tests through the VPP test runner
    # imported from the framework module.
    unittest.main(testRunner=VppTestRunner)