26b562e8ad0c7335b7dcdbc59b291eeef04aaf54
[vpp.git] / test / test_acl_plugin_l2l3.py
1 #!/usr/bin/env python
2 """ACL IRB Test Case HLD:
3
4 **config**
5     - L2 MAC learning enabled in l2bd
6     - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
7     - 2 bridged interfaces in l2bd with bvi
8
9 **test**
10     - sending ip4 eth pkts between routed interfaces
11         - 2 routed interfaces
12         - 2 bridged interfaces
13
    - 64B, 512B, 1518B, 9018B (ether_size)

    - burst of pkts per interface
        - 8 pkts per burst
18         - routed pkts hitting different FIB entries
19         - bridged pkts hitting different MAC entries
20
21 **verify**
22     - all packets received correctly
23
24 """
25
26 import unittest
27 from socket import inet_pton, AF_INET, AF_INET6
28 from random import choice, shuffle
29 from pprint import pprint
30
31 from scapy.packet import Raw
32 from scapy.layers.l2 import Ether
33 from scapy.layers.inet import IP, UDP, ICMP, TCP
34 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
35 from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
36 from scapy.layers.inet6 import IPv6ExtHdrFragment
37
38 from framework import VppTestCase, VppTestRunner
39 import time
40
41
42 class TestIpIrb(VppTestCase):
43     """IRB Test Case"""
44
45     @classmethod
46     def setUpClass(cls):
47         """
48         #. Create BD with MAC learning enabled and put interfaces to this BD.
49         #. Configure IPv4 addresses on loopback interface and routed interface.
50         #. Configure MAC address binding to IPv4 neighbors on loop0.
51         #. Configure MAC address on pg2.
52         #. Loopback BVI interface has remote hosts, one half of hosts are
53            behind pg0 second behind pg1.
54         """
55         super(TestIpIrb, cls).setUpClass()
56
57         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
58         cls.bd_id = 10
59         cls.remote_hosts_count = 250
60
61         # create 3 pg interfaces, 1 loopback interface
62         cls.create_pg_interfaces(range(3))
63         cls.create_loopback_interfaces(1)
64
65         cls.interfaces = list(cls.pg_interfaces)
66         cls.interfaces.extend(cls.lo_interfaces)
67
68         for i in cls.interfaces:
69             i.admin_up()
70
71         # Create BD with MAC learning enabled and put interfaces to this BD
72         cls.vapi.sw_interface_set_l2_bridge(
73             cls.loop0.sw_if_index, bd_id=cls.bd_id, bvi=1)
74         cls.vapi.sw_interface_set_l2_bridge(
75             cls.pg0.sw_if_index, bd_id=cls.bd_id)
76         cls.vapi.sw_interface_set_l2_bridge(
77             cls.pg1.sw_if_index, bd_id=cls.bd_id)
78
79         # Configure IPv4 addresses on loopback interface and routed interface
80         cls.loop0.config_ip4()
81         cls.loop0.config_ip6()
82         cls.pg2.config_ip4()
83         cls.pg2.config_ip6()
84
85         # Configure MAC address binding to IPv4 neighbors on loop0
86         cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
87         cls.loop0.configure_ipv4_neighbors()
88         cls.loop0.configure_ipv6_neighbors()
89         # configure MAC address on pg2
90         cls.pg2.resolve_arp()
91         cls.pg2.resolve_ndp()
92
93         cls.WITHOUT_EH = False
94         cls.WITH_EH = True
95
96         # Loopback BVI interface has remote hosts, one half of hosts are behind
97         # pg0 second behind pg1
98         half = cls.remote_hosts_count // 2
99         cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
100         cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
101
102     def tearDown(self):
103         """Run standard test teardown and log ``show l2patch``,
104         ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
105         ``show ip arp``.
106         """
107         super(TestIpIrb, self).tearDown()
108         if not self.vpp_dead:
109             self.logger.info(self.vapi.cli("show l2patch"))
110             self.logger.info(self.vapi.cli("show classify tables"))
111             self.logger.info(self.vapi.cli("show vlib graph"))
112             self.logger.info(self.vapi.cli("show l2fib verbose"))
113             self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
114                                            self.bd_id))
115             self.logger.info(self.vapi.cli("show ip arp"))
116             self.logger.info(self.vapi.cli("show ip6 neighbors"))
117             self.logger.info(self.vapi.cli("show acl-plugin sessions"))
118             self.logger.info(self.vapi.cli("show acl-plugin acl"))
119             self.logger.info(self.vapi.cli("show acl-plugin interface"))
120             self.logger.info(self.vapi.cli("show acl-plugin tables"))
121
    def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
                      is_ip6, expect_blocked, expect_established,
                      add_extension_header):
        """Build a burst of 8 test packets and the ACL rules matching them.

        For packet index ``i``:

        - odd ``i`` gets a rule with ``is_permit`` 1, even ``i`` gets 0;
        - ``(i // 2) % 2 == 0`` marks the packet as "can reflect" and it is
          built as UDP; the other packets are ICMP (IPv4) or
          ICMPv6Unknown (IPv6);
        - the payload is either produced by ``create_packet_info`` /
          ``info_to_payload`` or is the literal ``"to be blocked"``.

        :param src_ip_if: interface whose ``remote_ip4``/``remote_ip6``
            (and, when not ``reverse``, ``local_mac``/``remote_mac``)
            are used for one end of the flow.
        :param dst_ip_if: interface whose ``remote_hosts`` supply the
            other end of the flow.
        :param reverse: when true the remote host is the source and
            ``src_ip_if``'s remote address the destination; otherwise the
            other way round.
        :param packet_sizes: list of sizes; packet ``i`` is extended to
            ``packet_sizes[(i // 2) % len(packet_sizes)]``.
        :param is_ip6: build IPv6 packets instead of IPv4.
        :param expect_blocked: together with ``expect_established``
            decides the payload of even-indexed packets (see below).
        :param expect_established: see ``expect_blocked``.
        :param add_extension_header: for "can reflect" packets only:
            IPv6 gets two routing headers and a fragment header in front
            of UDP, IPv4 gets ``flags=1`` with ``frag=0``.
        :returns: dict with keys ``'stream'`` (list of packets),
            ``'rules'``, ``'permit_rules'`` and
            ``'permit_and_reflect_rules'`` (one rule dict per packet
            in each list).
        """
        pkts = []
        rules = []
        permit_rules = []
        permit_and_reflect_rules = []
        total_packet_count = 8
        for i in range(0, total_packet_count):
            # modulo runs 0, 0, 1, 1, 0, 0, 1, 1 over the eight packets
            modulo = (i//2) % 2
            can_reflect_this_packet = (modulo == 0)
            is_permit = i % 2
            remote_dst_index = i % len(dst_ip_if.remote_hosts)
            remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
            if is_permit == 1:
                info = self.create_packet_info(src_ip_if, dst_ip_if)
                payload = self.info_to_payload(info)
            else:
                # even-indexed packet: it carries the "to be blocked"
                # payload when (expect_blocked and not expect_established)
                # or when the packet cannot be reflected
                to_be_blocked = False
                if (expect_blocked and not expect_established):
                    to_be_blocked = True
                if (not can_reflect_this_packet):
                    to_be_blocked = True
                if to_be_blocked:
                    payload = "to be blocked"
                else:
                    info = self.create_packet_info(src_ip_if, dst_ip_if)
                    payload = self.info_to_payload(info)
            if reverse:
                # NOTE(review): hard-coded destination MAC; presumably the
                # MAC of the loopback/BVI interface - confirm
                dst_mac = 'de:ad:00:00:00:00'
                src_mac = remote_dst_host._mac
                dst_ip6 = src_ip_if.remote_ip6
                src_ip6 = remote_dst_host.ip6
                dst_ip4 = src_ip_if.remote_ip4
                src_ip4 = remote_dst_host.ip4
                dst_l4 = 1234 + i
                src_l4 = 4321 + i
            else:
                dst_mac = src_ip_if.local_mac
                src_mac = src_ip_if.remote_mac
                src_ip6 = src_ip_if.remote_ip6
                dst_ip6 = remote_dst_host.ip6
                src_ip4 = src_ip_if.remote_ip4
                dst_ip4 = remote_dst_host.ip4
                src_l4 = 1234 + i
                dst_l4 = 4321 + i

            # default ULP should be something we do not use in tests
            ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
            # potentially a chain of protocols leading to ULP
            ulp = ulp_l4

            # modulo is always 0 or 1, so one of the two branches below
            # always runs and assigns p
            if can_reflect_this_packet:
                if is_ip6:
                    ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
                    if add_extension_header:
                        # prepend some extension headers
                        ulp = (IPv6ExtHdrRouting() / IPv6ExtHdrRouting() /
                               IPv6ExtHdrFragment(offset=0, m=1) / ulp_l4)
                        # uncomment below to test invalid ones
                        # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4
                    else:
                        ulp = ulp_l4
                    p = (Ether(dst=dst_mac, src=src_mac) /
                         IPv6(src=src_ip6, dst=dst_ip6) /
                         ulp /
                         Raw(payload))
                else:
                    ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
                    # IPv4 does not allow extension headers,
                    # but we rather make it a first fragment
                    flags = 1 if add_extension_header else 0
                    ulp = ulp_l4
                    p = (Ether(dst=dst_mac, src=src_mac) /
                         IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) /
                         ulp /
                         Raw(payload))
            elif modulo == 1:
                if is_ip6:
                    ulp_l4 = ICMPv6Unknown(type=128 + (i % 2), code=i % 2)
                    ulp = ulp_l4
                    p = (Ether(dst=dst_mac, src=src_mac) /
                         IPv6(src=src_ip6, dst=dst_ip6) /
                         ulp /
                         Raw(payload))
                else:
                    ulp_l4 = ICMP(type=8 + (i % 2), code=i % 2)
                    ulp = ulp_l4
                    p = (Ether(dst=dst_mac, src=src_mac) /
                         IP(src=src_ip4, dst=dst_ip4) /
                         ulp /
                         Raw(payload))

            # only odd-indexed packets keep a copy of themselves in
            # info.data (info was created in the is_permit == 1 branch)
            if i % 2 == 1:
                info.data = p.copy()
            # every size in packet_sizes is used for two consecutive packets
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)
            pkts.append(p)

            # derive the rule fields from the packet that was just built
            rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
            rule_prefix_len = 128 if p.haslayer(IPv6) else 32
            rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP

            if p.haslayer(UDP):
                rule_l4_sport = p[UDP].sport
                rule_l4_dport = p[UDP].dport
            else:
                # for ICMP the "ports" of the rule are ICMP type and code
                if p.haslayer(ICMP):
                    rule_l4_sport = p[ICMP].type
                    rule_l4_dport = p[ICMP].code
                else:
                    rule_l4_sport = p[ICMPv6Unknown].type
                    rule_l4_dport = p[ICMPv6Unknown].code
            if p.haslayer(IPv6):
                # protocol number is taken from the L4 layer class, not
                # from the packet's IPv6 header
                rule_l4_proto = ulp_l4.overload_fields[IPv6]['nh']
            else:
                rule_l4_proto = p[IP].proto

            # exact-match rule: full-length prefixes, single-value ranges
            new_rule = {
                        'is_permit': is_permit,
                        'is_ipv6': p.haslayer(IPv6),
                        'src_ip_addr': inet_pton(rule_family,
                                                 p[rule_l3_layer].src),
                        'src_ip_prefix_len': rule_prefix_len,
                        'dst_ip_addr': inet_pton(rule_family,
                                                 p[rule_l3_layer].dst),
                        'dst_ip_prefix_len': rule_prefix_len,
                        'srcport_or_icmptype_first': rule_l4_sport,
                        'srcport_or_icmptype_last': rule_l4_sport,
                        'dstport_or_icmpcode_first': rule_l4_dport,
                        'dstport_or_icmpcode_last': rule_l4_dport,
                        'proto': rule_l4_proto,
                       }
            rules.append(new_rule)
            # same match, but is_permit forced to 1
            new_rule_permit = new_rule.copy()
            new_rule_permit['is_permit'] = 1
            permit_rules.append(new_rule_permit)

            # same match; is_permit 2 for packets that can be reflected
            # (presumably the plugin's permit+reflect action - confirm),
            # otherwise the original is_permit value
            new_rule_permit_and_reflect = new_rule.copy()
            if can_reflect_this_packet:
                new_rule_permit_and_reflect['is_permit'] = 2
            else:
                new_rule_permit_and_reflect['is_permit'] = is_permit
            permit_and_reflect_rules.append(new_rule_permit_and_reflect)

        return {'stream': pkts,
                'rules': rules,
                'permit_rules': permit_rules,
                'permit_and_reflect_rules': permit_and_reflect_rules}
271
    def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse):
        """Check captured packets against the recorded packet infos.

        NOTE(review): the unconditional ``return`` right after the
        initialisation below makes the whole per-packet loop unreachable,
        so this method currently verifies nothing.  Whether that is a
        deliberate switch-off or a leftover should be confirmed before
        the dead code is either removed or re-enabled.

        :param dst_ip_if: interface whose ``sw_if_index`` the payload
            info destination is compared with (dead code).
        :param src_ip_if: not used by the code below.
        :param capture: iterable of captured packets.
        :param reverse: when false, the packet source MAC is compared
            with ``dst_ip_if.local_mac`` (dead code).
        """
        last_info = dict()
        for i in self.interfaces:
            last_info[i.sw_if_index] = None

        dst_ip_sw_if_index = dst_ip_if.sw_if_index
        # everything below this return is never executed
        return

        for packet in capture:
            l3 = IP if packet.haslayer(IP) else IPv6
            ip = packet[l3]
            if packet.haslayer(UDP):
                l4 = UDP
            else:
                if packet.haslayer(ICMP):
                    l4 = ICMP
                else:
                    l4 = ICMPv6Unknown

            # Scapy IPv6 stuff is too smart for its own good.
            # So we do this and coerce the ICMP into unknown type
            if packet.haslayer(UDP):
                data = str(packet[UDP][Raw])
            else:
                if l3 == IP:
                    data = str(ICMP(str(packet[l3].payload))[Raw])
                else:
                    data = str(ICMPv6Unknown(str(packet[l3].payload)).msgbody)
            udp_or_icmp = packet[l3].payload
            payload_info = self.payload_to_info(data)
            packet_index = payload_info.index

            self.assertEqual(payload_info.dst, dst_ip_sw_if_index)

            next_info = self.get_next_packet_info_for_interface2(
                payload_info.src, dst_ip_sw_if_index,
                last_info[payload_info.src])
            last_info[payload_info.src] = next_info
            self.assertTrue(next_info is not None)
            self.assertEqual(packet_index, next_info.index)
            saved_packet = next_info.data
            # NOTE(review): repeats the assertion made three lines above
            self.assertTrue(next_info is not None)

            # MAC: src, dst
            if not reverse:
                self.assertEqual(packet.src, dst_ip_if.local_mac)
                # NOTE(review): host is only referenced by the
                # commented-out assertion further down
                host = dst_ip_if.host_by_mac(packet.dst)

            # IP: src, dst
            # self.assertEqual(ip.src, src_ip_if.remote_ip4)
            if saved_packet is not None:
                self.assertEqual(ip.src, saved_packet[l3].src)
                self.assertEqual(ip.dst, saved_packet[l3].dst)
                if l4 == UDP:
                    self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
                    self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
            else:
                print("Saved packet is none")
            # self.assertEqual(ip.dst, host.ip4)

            # UDP:
333
334     def applied_acl_shuffle(self, sw_if_index):
335         # first collect what ACLs are applied and what they look like
336         r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index)
337         orig_applied_acls = r[0]
338
339         # we will collect these just to save and generate additional rulesets
340         orig_acls = []
341         for acl_num in orig_applied_acls.acls:
342             rr = self.vapi.acl_dump(acl_num)
343             orig_acls.append(rr[0])
344
345         # now create a list of all the rules in all ACLs
346         all_rules = []
347         for old_acl in orig_acls:
348             for rule in old_acl.r:
349                 all_rules.append(dict(rule._asdict()))
350
351         # Add a few ACLs made from shuffled rules
352         shuffle(all_rules)
353         reply = self.vapi.acl_add_replace(acl_index=4294967295,
354                                           r=all_rules[::2],
355                                           tag="shuffle 1. acl")
356         shuffle_acl_1 = reply.acl_index
357         shuffle(all_rules)
358         reply = self.vapi.acl_add_replace(acl_index=4294967295,
359                                           r=all_rules[::3],
360                                           tag="shuffle 2. acl")
361         shuffle_acl_2 = reply.acl_index
362         shuffle(all_rules)
363         reply = self.vapi.acl_add_replace(acl_index=4294967295,
364                                           r=all_rules[::2],
365                                           tag="shuffle 3. acl")
366         shuffle_acl_3 = reply.acl_index
367
368         # apply the shuffle ACLs in front
369         input_acls = [shuffle_acl_1, shuffle_acl_2]
370         output_acls = [shuffle_acl_1, shuffle_acl_2]
371
372         # add the currently applied ACLs
373         n_input = orig_applied_acls.n_input
374         input_acls.extend(orig_applied_acls.acls[:n_input])
375         output_acls.extend(orig_applied_acls.acls[n_input:])
376
377         # and the trailing shuffle ACL(s)
378         input_acls.extend([shuffle_acl_3])
379         output_acls.extend([shuffle_acl_3])
380
381         # set the interface ACL list to the result
382         self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
383                                              n_input=len(input_acls),
384                                              acls=input_acls + output_acls)
385         # change the ACLs a few times
386         for i in range(1, 10):
387             shuffle(all_rules)
388             reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
389                                               r=all_rules[::1+(i % 2)],
390                                               tag="shuffle 1. acl")
391             shuffle(all_rules)
392             reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
393                                               r=all_rules[::1+(i % 3)],
394                                               tag="shuffle 2. acl")
395             shuffle(all_rules)
396             reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
397                                               r=all_rules[::1+(i % 5)],
398                                               tag="shuffle 3. acl")
399
400         # restore to how it was before and clean up
401         self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
402                                              n_input=orig_applied_acls.n_input,
403                                              acls=orig_applied_acls.acls)
404         reply = self.vapi.acl_del(acl_index=shuffle_acl_1)
405         reply = self.vapi.acl_del(acl_index=shuffle_acl_2)
406         reply = self.vapi.acl_del(acl_index=shuffle_acl_3)
407
408     def create_acls_for_a_stream(self, stream_dict,
409                                  test_l2_action, is_reflect):
410         r = stream_dict['rules']
411         r_permit = stream_dict['permit_rules']
412         r_permit_reflect = stream_dict['permit_and_reflect_rules']
413         r_action = r_permit_reflect if is_reflect else r
414         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
415                                           tag="act. acl")
416         action_acl_index = reply.acl_index
417         reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
418                                           tag="perm. acl")
419         permit_acl_index = reply.acl_index
420         return {'L2': action_acl_index if test_l2_action else permit_acl_index,
421                 'L3': permit_acl_index if test_l2_action else action_acl_index,
422                 'permit': permit_acl_index, 'action': action_acl_index}
423
424     def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny,
425                               is_ip6, is_reflect, add_eh):
426         """ Apply the ACLs
427         """
428         self.reset_packet_infos()
429         stream_dict = self.create_stream(
430                                          self.pg2, self.loop0,
431                                          bridged_to_routed,
432                                          self.pg_if_packet_sizes, is_ip6,
433                                          not is_reflect, False, add_eh)
434         stream = stream_dict['stream']
435         acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny,
436                                                 is_reflect)
437         n_input_l3 = 0 if bridged_to_routed else 1
438         n_input_l2 = 1 if bridged_to_routed else 0
439         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
440                                              n_input=n_input_l3,
441                                              acls=[acl_idx['L3']])
442         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
443                                              n_input=n_input_l2,
444                                              acls=[acl_idx['L2']])
445         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
446                                              n_input=n_input_l2,
447                                              acls=[acl_idx['L2']])
448         self.applied_acl_shuffle(self.pg0.sw_if_index)
449         self.applied_acl_shuffle(self.pg2.sw_if_index)
450
451     def apply_acl_ip46_both_directions_reflect(self,
452                                                primary_is_bridged_to_routed,
453                                                reflect_on_l2, is_ip6, add_eh):
454         primary_is_routed_to_bridged = not primary_is_bridged_to_routed
455         self.reset_packet_infos()
456         stream_dict_fwd = self.create_stream(self.pg2, self.loop0,
457                                              primary_is_bridged_to_routed,
458                                              self.pg_if_packet_sizes, is_ip6,
459                                              False, False, add_eh)
460         acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd,
461                                                     reflect_on_l2, True)
462
463         stream_dict_rev = self.create_stream(self.pg2, self.loop0,
464                                              not primary_is_bridged_to_routed,
465                                              self.pg_if_packet_sizes, is_ip6,
466                                              True, True, add_eh)
467         # We want the primary action to be "deny" rather than reflect
468         acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev,
469                                                     reflect_on_l2, False)
470
471         if primary_is_bridged_to_routed:
472             inbound_l2_acl = acl_idx_fwd['L2']
473         else:
474             inbound_l2_acl = acl_idx_rev['L2']
475
476         if primary_is_routed_to_bridged:
477             outbound_l2_acl = acl_idx_fwd['L2']
478         else:
479             outbound_l2_acl = acl_idx_rev['L2']
480
481         if primary_is_routed_to_bridged:
482             inbound_l3_acl = acl_idx_fwd['L3']
483         else:
484             inbound_l3_acl = acl_idx_rev['L3']
485
486         if primary_is_bridged_to_routed:
487             outbound_l3_acl = acl_idx_fwd['L3']
488         else:
489             outbound_l3_acl = acl_idx_rev['L3']
490
491         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
492                                              n_input=1,
493                                              acls=[inbound_l3_acl,
494                                                    outbound_l3_acl])
495         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
496                                              n_input=1,
497                                              acls=[inbound_l2_acl,
498                                                    outbound_l2_acl])
499         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
500                                              n_input=1,
501                                              acls=[inbound_l2_acl,
502                                                    outbound_l2_acl])
503         self.applied_acl_shuffle(self.pg0.sw_if_index)
504         self.applied_acl_shuffle(self.pg2.sw_if_index)
505
506     def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
507                                          is_reflect, add_eh):
508         self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6,
509                                    is_reflect, add_eh)
510
511     def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
512                                          is_reflect, add_eh):
513         self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6,
514                                    is_reflect, add_eh)
515
516     def run_traffic_ip46_x_to_y(self, bridged_to_routed,
517                                 test_l2_deny, is_ip6,
518                                 is_reflect, is_established, add_eh):
519         self.reset_packet_infos()
520         stream_dict = self.create_stream(self.pg2, self.loop0,
521                                          bridged_to_routed,
522                                          self.pg_if_packet_sizes, is_ip6,
523                                          not is_reflect, is_established,
524                                          add_eh)
525         stream = stream_dict['stream']
526
527         tx_if = self.pg0 if bridged_to_routed else self.pg2
528         rx_if = self.pg2 if bridged_to_routed else self.pg0
529
530         tx_if.add_stream(stream)
531         self.pg_enable_capture(self.pg_interfaces)
532         self.pg_start()
533         packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
534         rcvd1 = rx_if.get_capture(packet_count)
535         self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
536
537     def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
538                                            is_reflect, is_established, add_eh):
539         self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
540                                      is_reflect, is_established, add_eh)
541
542     def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
543                                            is_reflect, is_established, add_eh):
544         self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
545                                      is_reflect, is_established, add_eh)
546
547     def run_test_ip46_routed_to_bridged(self, test_l2_deny,
548                                         is_ip6, is_reflect, add_eh):
549         self.apply_acl_ip46_routed_to_bridged(test_l2_deny,
550                                               is_ip6, is_reflect, add_eh)
551         self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
552                                                 is_reflect, False, add_eh)
553
554     def run_test_ip46_bridged_to_routed(self, test_l2_deny,
555                                         is_ip6, is_reflect, add_eh):
556         self.apply_acl_ip46_bridged_to_routed(test_l2_deny,
557                                               is_ip6, is_reflect, add_eh)
558         self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
559                                                 is_reflect, False, add_eh)
560
561     def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
562                                                  is_ip6, add_eh):
563         self.apply_acl_ip46_both_directions_reflect(False, test_l2_action,
564                                                     is_ip6, add_eh)
565         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
566                                                 True, False, add_eh)
567         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
568                                                 False, True, add_eh)
569
570     def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action,
571                                                  is_ip6, add_eh):
572         self.apply_acl_ip46_both_directions_reflect(True, test_l2_action,
573                                                     is_ip6, add_eh)
574         self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
575                                                 True, False, add_eh)
576         self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
577                                                 False, True, add_eh)
578
579     def test_0000_ip6_irb_1(self):
580         """ ACL plugin prepare"""
581         if not self.vpp_dead:
582             cmd = "set acl-plugin session timeout udp idle 2000"
583             self.logger.info(self.vapi.ppcli(cmd))
584             # uncomment to not skip past the routing header
585             # and watch the EH tests fail
586             # self.logger.info(self.vapi.ppcli(
587             #    "set acl-plugin skip-ipv6-extension-header 43 0"))
588             # uncomment to test the session limit (stateful tests will fail)
589             # self.logger.info(self.vapi.ppcli(
590             #    "set acl-plugin session table max-entries 1"))
591             # new datapath is the default, but just in case
592             # self.logger.info(self.vapi.ppcli(
593             #    "set acl-plugin l2-datapath new"))
594             # If you want to see some tests fail, uncomment the next line
595             # self.logger.info(self.vapi.ppcli(
596             #    "set acl-plugin l2-datapath old"))
597
598     def test_0001_ip6_irb_1(self):
599         """ ACL IPv6 routed -> bridged, L2 ACL deny"""
600         self.run_test_ip46_routed_to_bridged(True, True, False,
601                                              self.WITHOUT_EH)
602
603     def test_0002_ip6_irb_1(self):
604         """ ACL IPv6 routed -> bridged, L3 ACL deny"""
605         self.run_test_ip46_routed_to_bridged(False, True, False,
606                                              self.WITHOUT_EH)
607
608     def test_0003_ip4_irb_1(self):
609         """ ACL IPv4 routed -> bridged, L2 ACL deny"""
610         self.run_test_ip46_routed_to_bridged(True, False, False,
611                                              self.WITHOUT_EH)
612
613     def test_0004_ip4_irb_1(self):
614         """ ACL IPv4 routed -> bridged, L3 ACL deny"""
615         self.run_test_ip46_routed_to_bridged(False, False, False,
616                                              self.WITHOUT_EH)
617
618     def test_0005_ip6_irb_1(self):
619         """ ACL IPv6 bridged -> routed, L2 ACL deny """
620         self.run_test_ip46_bridged_to_routed(True, True, False,
621                                              self.WITHOUT_EH)
622
623     def test_0006_ip6_irb_1(self):
624         """ ACL IPv6 bridged -> routed, L3 ACL deny """
625         self.run_test_ip46_bridged_to_routed(False, True, False,
626                                              self.WITHOUT_EH)
627
628     def test_0007_ip6_irb_1(self):
629         """ ACL IPv4 bridged -> routed, L2 ACL deny """
630         self.run_test_ip46_bridged_to_routed(True, False, False,
631                                              self.WITHOUT_EH)
632
633     def test_0008_ip6_irb_1(self):
634         """ ACL IPv4 bridged -> routed, L3 ACL deny """
635         self.run_test_ip46_bridged_to_routed(False, False, False,
636                                              self.WITHOUT_EH)
637
638     # Stateful ACL tests
639     def test_0101_ip6_irb_1(self):
640         """ ACL IPv6 routed -> bridged, L2 ACL permit+reflect"""
641         self.run_test_ip46_routed_to_bridged_and_back(True, True,
642                                                       self.WITHOUT_EH)
643
644     def test_0102_ip6_irb_1(self):
645         """ ACL IPv6 bridged -> routed, L2 ACL permit+reflect"""
646         self.run_test_ip46_bridged_to_routed_and_back(True, True,
647                                                       self.WITHOUT_EH)
648
649     def test_0103_ip6_irb_1(self):
650         """ ACL IPv4 routed -> bridged, L2 ACL permit+reflect"""
651         self.run_test_ip46_routed_to_bridged_and_back(True, False,
652                                                       self.WITHOUT_EH)
653
654     def test_0104_ip6_irb_1(self):
655         """ ACL IPv4 bridged -> routed, L2 ACL permit+reflect"""
656         self.run_test_ip46_bridged_to_routed_and_back(True, False,
657                                                       self.WITHOUT_EH)
658
659     def test_0111_ip6_irb_1(self):
660         """ ACL IPv6 routed -> bridged, L3 ACL permit+reflect"""
661         self.run_test_ip46_routed_to_bridged_and_back(False, True,
662                                                       self.WITHOUT_EH)
663
664     def test_0112_ip6_irb_1(self):
665         """ ACL IPv6 bridged -> routed, L3 ACL permit+reflect"""
666         self.run_test_ip46_bridged_to_routed_and_back(False, True,
667                                                       self.WITHOUT_EH)
668
669     def test_0113_ip6_irb_1(self):
670         """ ACL IPv4 routed -> bridged, L3 ACL permit+reflect"""
671         self.run_test_ip46_routed_to_bridged_and_back(False, False,
672                                                       self.WITHOUT_EH)
673
674     def test_0114_ip6_irb_1(self):
675         """ ACL IPv4 bridged -> routed, L3 ACL permit+reflect"""
676         self.run_test_ip46_bridged_to_routed_and_back(False, False,
677                                                       self.WITHOUT_EH)
678
679     # A block of tests with extension headers
680
681     def test_1001_ip6_irb_1(self):
682         """ ACL IPv6+EH routed -> bridged, L2 ACL deny"""
683         self.run_test_ip46_routed_to_bridged(True, True, False,
684                                              self.WITH_EH)
685
686     def test_1002_ip6_irb_1(self):
687         """ ACL IPv6+EH routed -> bridged, L3 ACL deny"""
688         self.run_test_ip46_routed_to_bridged(False, True, False,
689                                              self.WITH_EH)
690
691     def test_1005_ip6_irb_1(self):
692         """ ACL IPv6+EH bridged -> routed, L2 ACL deny """
693         self.run_test_ip46_bridged_to_routed(True, True, False,
694                                              self.WITH_EH)
695
696     def test_1006_ip6_irb_1(self):
697         """ ACL IPv6+EH bridged -> routed, L3 ACL deny """
698         self.run_test_ip46_bridged_to_routed(False, True, False,
699                                              self.WITH_EH)
700
701     def test_1101_ip6_irb_1(self):
702         """ ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect"""
703         self.run_test_ip46_routed_to_bridged_and_back(True, True,
704                                                       self.WITH_EH)
705
706     def test_1102_ip6_irb_1(self):
707         """ ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect"""
708         self.run_test_ip46_bridged_to_routed_and_back(True, True,
709                                                       self.WITH_EH)
710
711     def test_1111_ip6_irb_1(self):
712         """ ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect"""
713         self.run_test_ip46_routed_to_bridged_and_back(False, True,
714                                                       self.WITH_EH)
715
716     def test_1112_ip6_irb_1(self):
717         """ ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect"""
718         self.run_test_ip46_bridged_to_routed_and_back(False, True,
719                                                       self.WITH_EH)
720
721     # IPv4 with "MF" bit set
722
723     def test_1201_ip6_irb_1(self):
724         """ ACL IPv4+MF routed -> bridged, L2 ACL deny"""
725         self.run_test_ip46_routed_to_bridged(True, False, False,
726                                              self.WITH_EH)
727
728     def test_1202_ip6_irb_1(self):
729         """ ACL IPv4+MF routed -> bridged, L3 ACL deny"""
730         self.run_test_ip46_routed_to_bridged(False, False, False,
731                                              self.WITH_EH)
732
733     def test_1205_ip6_irb_1(self):
734         """ ACL IPv4+MF bridged -> routed, L2 ACL deny """
735         self.run_test_ip46_bridged_to_routed(True, False, False,
736                                              self.WITH_EH)
737
738     def test_1206_ip6_irb_1(self):
739         """ ACL IPv4+MF bridged -> routed, L3 ACL deny """
740         self.run_test_ip46_bridged_to_routed(False, False, False,
741                                              self.WITH_EH)
742
743     def test_1301_ip6_irb_1(self):
744         """ ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect"""
745         self.run_test_ip46_routed_to_bridged_and_back(True, False,
746                                                       self.WITH_EH)
747
748     def test_1302_ip6_irb_1(self):
749         """ ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect"""
750         self.run_test_ip46_bridged_to_routed_and_back(True, False,
751                                                       self.WITH_EH)
752
753     def test_1311_ip6_irb_1(self):
754         """ ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect"""
755         self.run_test_ip46_routed_to_bridged_and_back(False, False,
756                                                       self.WITH_EH)
757
758     def test_1312_ip6_irb_1(self):
759         """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
760         self.run_test_ip46_bridged_to_routed_and_back(False, False,
761                                                       self.WITH_EH)
762
# Entry point when the file is executed directly: hand this module's test
# cases to unittest, using the VPP-specific runner imported from framework.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)