2 """ ACL plugin extended stateful tests """
5 from framework import VppTestCase, VppTestRunner, running_extended_tests
6 from scapy.layers.l2 import Ether
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.packet import Packet
10 from socket import inet_pton, AF_INET, AF_INET6
11 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
12 from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
13 from scapy.layers.inet6 import IPv6ExtHdrFragment
14 from pprint import pprint
15 from random import randint
18 def to_acl_rule(self, is_permit, wildcard_sport=False):
20 rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
21 rule_prefix_len = 128 if p.haslayer(IPv6) else 32
22 rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP
23 rule_l4_sport = p.sport
24 rule_l4_dport = p.dport
26 rule_l4_proto = p[IPv6].nh
28 rule_l4_proto = p[IP].proto
31 rule_l4_sport_first = 0
32 rule_l4_sport_last = 65535
34 rule_l4_sport_first = rule_l4_sport
35 rule_l4_sport_last = rule_l4_sport
38 'is_permit': is_permit,
39 'is_ipv6': p.haslayer(IPv6),
40 'src_ip_addr': inet_pton(rule_family,
41 p[rule_l3_layer].src),
42 'src_ip_prefix_len': rule_prefix_len,
43 'dst_ip_addr': inet_pton(rule_family,
44 p[rule_l3_layer].dst),
45 'dst_ip_prefix_len': rule_prefix_len,
46 'srcport_or_icmptype_first': rule_l4_sport_first,
47 'srcport_or_icmptype_last': rule_l4_sport_last,
48 'dstport_or_icmpcode_first': rule_l4_dport,
49 'dstport_or_icmpcode_last': rule_l4_dport,
50 'proto': rule_l4_proto,
54 Packet.to_acl_rule = to_acl_rule
57 class IterateWithSleep():
58 def __init__(self, testcase, n_iters, description, sleep_sec):
60 self.testcase = testcase
61 self.n_iters = n_iters
62 self.sleep_sec = sleep_sec
63 self.description = description
66 for x in range(0, self.n_iters):
68 self.testcase.sleep(self.sleep_sec)
72 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
73 self.testcase = testcase
74 self.ifs = [None, None]
77 self.address_family = af
78 self.l4proto = l4proto
79 self.ports = [None, None]
85 is_ip6 = 1 if self.address_family == AF_INET6 else 0
90 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
91 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
93 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
95 self.l4proto(sport=self.ports[s0], dport=self.ports[s1]) /
99 def apply_acls(self, reflect_side, acl_side):
101 pkts.append(self.pkt(0))
102 pkts.append(self.pkt(1))
103 pkt = pkts[reflect_side]
106 r.append(pkt.to_acl_rule(2, wildcard_sport=True))
107 r.append(self.wildcard_rule(0))
108 res = self.testcase.api_acl_add_replace(0xffffffff, r)
109 self.testcase.assert_equal(res.retval, 0, "error adding ACL")
110 reflect_acl_index = res.acl_index
113 r.append(self.wildcard_rule(0))
114 res = self.testcase.api_acl_add_replace(0xffffffff, r)
115 self.testcase.assert_equal(res.retval, 0, "error adding deny ACL")
116 deny_acl_index = res.acl_index
118 if reflect_side == acl_side:
119 self.testcase.api_acl_interface_set_acl_list(
120 self.ifs[acl_side].sw_if_index, 2, 1,
123 self.testcase.api_acl_interface_set_acl_list(
124 self.ifs[1-acl_side].sw_if_index, 0, 0, [])
126 self.testcase.api_acl_interface_set_acl_list(
127 self.ifs[acl_side].sw_if_index, 2, 1,
130 self.testcase.api_acl_interface_set_acl_list(
131 self.ifs[1-acl_side].sw_if_index, 0, 0, [])
133 def wildcard_rule(self, is_permit):
134 any_addr = ["0.0.0.0", "::"]
135 rule_family = self.address_family
136 is_ip6 = 1 if rule_family == AF_INET6 else 0
138 'is_permit': is_permit,
140 'src_ip_addr': inet_pton(rule_family, any_addr[is_ip6]),
141 'src_ip_prefix_len': 0,
142 'dst_ip_addr': inet_pton(rule_family, any_addr[is_ip6]),
143 'dst_ip_prefix_len': 0,
144 'srcport_or_icmptype_first': 0,
145 'srcport_or_icmptype_last': 65535,
146 'dstport_or_icmpcode_first': 0,
147 'dstport_or_icmpcode_last': 65535,
152 def send(self, side):
153 self.ifs[side].add_stream(self.pkt(side))
154 self.ifs[1-side].enable_capture()
155 self.testcase.pg_start()
157 def recv(self, side):
158 p = self.ifs[side].wait_for_packet(1)
161 def send_through(self, side):
163 p = self.recv(1-side)
166 def send_pingpong(self, side):
167 p1 = self.send_through(side)
168 p2 = self.send_through(1-side)
172 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
173 class ACLPluginConnTestCase(VppTestCase):
174 """ ACL plugin connection-oriented extended testcases """
177 def setUpClass(self):
178 super(ACLPluginConnTestCase, self).setUpClass()
180 self.create_pg_interfaces(range(2))
181 for i in self.pg_interfaces:
189 """Run standard test teardown and log various show commands
191 super(ACLPluginConnTestCase, self).tearDown()
192 if not self.vpp_dead:
193 self.logger.info(self.vapi.cli("show ip arp"))
194 self.logger.info(self.vapi.cli("show ip6 neighbors"))
195 self.logger.info(self.vapi.cli("show acl-plugin sessions"))
196 self.logger.info(self.vapi.cli("show acl-plugin acl"))
197 self.logger.info(self.vapi.cli("show acl-plugin interface"))
198 self.logger.info(self.vapi.cli("show acl-plugin tables"))
200 def api_acl_add_replace(self, acl_index, r, count=-1, tag="",
202 """Add/replace an ACL
204 :param int acl_index: ACL index to replace, 4294967295 to create new.
205 :param acl_rule r: ACL rules array.
206 :param str tag: symbolic tag (description) for this ACL.
207 :param int count: number of rules.
211 return self.vapi.api(self.vapi.papi.acl_add_replace,
212 {'acl_index': acl_index,
216 }, expected_retval=expected_retval)
218 def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
220 return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
221 {'sw_if_index': sw_if_index,
225 }, expected_retval=expected_retval)
227 def api_acl_dump(self, acl_index, expected_retval=0):
228 return self.vapi.api(self.vapi.papi.acl_dump,
229 {'acl_index': acl_index},
230 expected_retval=expected_retval)
232 def run_basic_conn_test(self, af, acl_side):
233 """ Basic conn timeout test """
234 conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242)
235 conn1.apply_acls(0, acl_side)
236 conn1.send_through(0)
237 # the return packets should pass
238 conn1.send_through(1)
239 # send some packets on conn1, ensure it doesn't go away
240 for i in IterateWithSleep(self, 20, "Keep conn active", 0.3):
241 conn1.send_through(1)
242 # allow the conn to time out
243 for i in IterateWithSleep(self, 30, "Wait for timeout", 0.1):
245 # now try to send a packet on the reflected side
247 p2 = conn1.send_through(1).command()
249 # If we asserted while waiting, it's good.
250 # the conn should have timed out.
252 self.assert_equal(p2, None, "packet on long-idle conn")
254 def run_active_conn_test(self, af, acl_side):
255 """ Idle connection behind active connection test """
256 base = 10000 + 1000*acl_side
257 conn1 = Conn(self, self.pg0, self.pg1, af, UDP, base + 1, 2323)
258 conn2 = Conn(self, self.pg0, self.pg1, af, UDP, base + 2, 2323)
259 conn3 = Conn(self, self.pg0, self.pg1, af, UDP, base + 3, 2323)
260 conn1.apply_acls(0, acl_side)
263 # create and check that the conn2/3 work
265 conn2.send_pingpong(0)
267 conn3.send_pingpong(0)
268 # send some packets on conn1, keep conn2/3 idle
269 for i in IterateWithSleep(self, 20, "Keep conn active", 0.2):
270 conn1.send_through(1)
272 p2 = conn2.send_through(1).command()
274 # If we asserted while waiting, it's good.
275 # the conn should have timed out.
277 # We should have not received the packet on a long-idle
278 # connection, because it should have timed out
279 # If it didn't - it is a problem
280 self.assert_equal(p2, None, "packet on long-idle conn")
282 def test_0000_conn_prepare_test(self):
283 """ Prepare the settings """
284 self.vapi.ppcli("set acl-plugin session timeout udp idle 1")
286 def test_0001_basic_conn_test(self):
287 """ IPv4: Basic conn timeout test reflect on ingress """
288 self.run_basic_conn_test(AF_INET, 0)
290 def test_0002_basic_conn_test(self):
291 """ IPv4: Basic conn timeout test reflect on egress """
292 self.run_basic_conn_test(AF_INET, 1)
294 def test_0011_active_conn_test(self):
295 """ IPv4: Idle conn behind active conn, reflect on ingress """
296 self.run_active_conn_test(AF_INET, 0)
298 def test_0012_active_conn_test(self):
299 """ IPv4: Idle conn behind active conn, reflect on egress """
300 self.run_active_conn_test(AF_INET, 1)
302 def test_1001_basic_conn_test(self):
303 """ IPv6: Basic conn timeout test reflect on ingress """
304 self.run_basic_conn_test(AF_INET6, 0)
306 def test_1002_basic_conn_test(self):
307 """ IPv6: Basic conn timeout test reflect on egress """
308 self.run_basic_conn_test(AF_INET6, 1)
310 def test_1011_active_conn_test(self):
311 """ IPv6: Idle conn behind active conn, reflect on ingress """
312 self.run_active_conn_test(AF_INET6, 0)
314 def test_1012_active_conn_test(self):
315 """ IPv6: Idle conn behind active conn, reflect on egress """
316 self.run_active_conn_test(AF_INET6, 1)