2 """ ACL plugin extended stateful tests """
5 from framework import VppTestCase, VppTestRunner, running_extended_tests
6 from scapy.layers.l2 import Ether
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.packet import Packet
10 from socket import inet_pton, AF_INET, AF_INET6
11 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
12 from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
13 from scapy.layers.inet6 import IPv6ExtHdrFragment
14 from pprint import pprint
15 from random import randint
18 def to_acl_rule(self, is_permit, wildcard_sport=False):
20 rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
21 rule_prefix_len = 128 if p.haslayer(IPv6) else 32
22 rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP
23 rule_l4_sport = p.sport
24 rule_l4_dport = p.dport
26 rule_l4_proto = p[IPv6].nh
28 rule_l4_proto = p[IP].proto
31 rule_l4_sport_first = 0
32 rule_l4_sport_last = 65535
34 rule_l4_sport_first = rule_l4_sport
35 rule_l4_sport_last = rule_l4_sport
38 'is_permit': is_permit,
39 'is_ipv6': p.haslayer(IPv6),
40 'src_ip_addr': inet_pton(rule_family,
41 p[rule_l3_layer].src),
42 'src_ip_prefix_len': rule_prefix_len,
43 'dst_ip_addr': inet_pton(rule_family,
44 p[rule_l3_layer].dst),
45 'dst_ip_prefix_len': rule_prefix_len,
46 'srcport_or_icmptype_first': rule_l4_sport_first,
47 'srcport_or_icmptype_last': rule_l4_sport_last,
48 'dstport_or_icmpcode_first': rule_l4_dport,
49 'dstport_or_icmpcode_last': rule_l4_dport,
50 'proto': rule_l4_proto,
# Attach to_acl_rule to scapy's Packet class so it can be invoked as a
# method on packet objects (apply_acls calls pkt.to_acl_rule(...)).
Packet.to_acl_rule = to_acl_rule
57 class IterateWithSleep():
58 def __init__(self, testcase, n_iters, description, sleep_sec):
60 self.testcase = testcase
61 self.n_iters = n_iters
62 self.sleep_sec = sleep_sec
63 self.description = description
66 for x in range(0, self.n_iters):
68 self.testcase.sleep(self.sleep_sec)
72 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
73 self.testcase = testcase
74 self.ifs = [None, None]
77 self.address_family = af
78 self.l4proto = l4proto
79 self.ports = [None, None]
85 is_ip6 = 1 if self.address_family == AF_INET6 else 0
90 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
91 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
93 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
95 self.l4proto(sport=self.ports[s0], dport=self.ports[s1]) /
99 def apply_acls(self, reflect_side, acl_side):
101 pkts.append(self.pkt(0))
102 pkts.append(self.pkt(1))
103 pkt = pkts[reflect_side]
106 r.append(pkt.to_acl_rule(2, wildcard_sport=True))
107 r.append(self.wildcard_rule(0))
108 res = self.testcase.api_acl_add_replace(0xffffffff, r)
109 self.testcase.assert_equal(res.retval, 0, "error adding ACL")
110 reflect_acl_index = res.acl_index
113 r.append(self.wildcard_rule(0))
114 res = self.testcase.api_acl_add_replace(0xffffffff, r)
115 self.testcase.assert_equal(res.retval, 0, "error adding deny ACL")
116 deny_acl_index = res.acl_index
118 if reflect_side == acl_side:
119 self.testcase.api_acl_interface_set_acl_list(
120 self.ifs[acl_side].sw_if_index, 2, 1,
123 self.testcase.api_acl_interface_set_acl_list(
124 self.ifs[1-acl_side].sw_if_index, 0, 0, [])
126 self.testcase.api_acl_interface_set_acl_list(
127 self.ifs[acl_side].sw_if_index, 2, 1,
130 self.testcase.api_acl_interface_set_acl_list(
131 self.ifs[1-acl_side].sw_if_index, 0, 0, [])
133 def wildcard_rule(self, is_permit):
134 any_addr = ["0.0.0.0", "::"]
135 rule_family = self.address_family
136 is_ip6 = 1 if rule_family == AF_INET6 else 0
138 'is_permit': is_permit,
140 'src_ip_addr': inet_pton(rule_family, any_addr[is_ip6]),
141 'src_ip_prefix_len': 0,
142 'dst_ip_addr': inet_pton(rule_family, any_addr[is_ip6]),
143 'dst_ip_prefix_len': 0,
144 'srcport_or_icmptype_first': 0,
145 'srcport_or_icmptype_last': 65535,
146 'dstport_or_icmpcode_first': 0,
147 'dstport_or_icmpcode_last': 65535,
def send(self, side):
    """Send this connection's packet out of the interface at ``side``.

    :param int side: index into ``self.ifs`` of the transmitting
        interface; the opposite interface is ``self.ifs[1 - side]``.
    """
    sender = self.ifs[side]
    outgoing = self.pkt(side)
    # Queue the packet on the sending interface first ...
    sender.add_stream(outgoing)
    # ... then arm capture on the opposite interface ...
    self.ifs[1 - side].enable_capture()
    # ... and finally let the test case start the packet generator.
    self.testcase.pg_start()
157 def recv(self, side):
158 p = self.ifs[side].wait_for_packet(1)
161 def send_through(self, side):
163 p = self.recv(1-side)
166 def send_pingpong(self, side):
167 p1 = self.send_through(side)
168 p2 = self.send_through(1-side)
172 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
173 class ACLPluginConnTestCase(VppTestCase):
174 """ ACL plugin connection-oriented extended testcases """
177 def setUpClass(self):
178 super(ACLPluginConnTestCase, self).setUpClass()
180 self.create_pg_interfaces(range(2))
181 for i in self.pg_interfaces:
188 def api_acl_add_replace(self, acl_index, r, count=-1, tag="",
190 """Add/replace an ACL
192 :param int acl_index: ACL index to replace, 4294967295 to create new.
193 :param acl_rule r: ACL rules array.
194 :param str tag: symbolic tag (description) for this ACL.
195 :param int count: number of rules.
199 return self.vapi.api(self.vapi.papi.acl_add_replace,
200 {'acl_index': acl_index,
204 }, expected_retval=expected_retval)
206 def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
208 return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
209 {'sw_if_index': sw_if_index,
213 }, expected_retval=expected_retval)
def api_acl_dump(self, acl_index, expected_retval=0):
    """Issue the ``acl_dump`` API request for a single ACL.

    :param int acl_index: index of the ACL to dump.
    :param int expected_retval: forwarded to ``self.vapi.api`` as its
        ``expected_retval`` keyword argument.
    :returns: whatever ``self.vapi.api`` returns for the request.
    """
    vapi = self.vapi
    request = {'acl_index': acl_index}
    return vapi.api(vapi.papi.acl_dump, request,
                    expected_retval=expected_retval)
220 def run_basic_conn_test(self, af, acl_side):
221 """ Basic conn timeout test """
222 conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242)
223 conn1.apply_acls(0, acl_side)
224 conn1.send_through(0)
225 # the return packets should pass
226 conn1.send_through(1)
227 # send some packets on conn1, ensure it doesn't go away
228 for i in IterateWithSleep(self, 20, "Keep conn active", 0.3):
229 conn1.send_through(1)
230 # allow the conn to time out
231 for i in IterateWithSleep(self, 30, "Wait for timeout", 0.1):
233 # now try to send a packet on the reflected side
235 p2 = conn1.send_through(1).command()
237 # If we asserted while waiting, it's good.
238 # the conn should have timed out.
240 self.assert_equal(p2, None, "packet on long-idle conn")
242 def run_active_conn_test(self, af, acl_side):
243 """ Idle connection behind active connection test """
244 base = 10000 + 1000*acl_side
245 conn1 = Conn(self, self.pg0, self.pg1, af, UDP, base + 1, 2323)
246 conn2 = Conn(self, self.pg0, self.pg1, af, UDP, base + 2, 2323)
247 conn3 = Conn(self, self.pg0, self.pg1, af, UDP, base + 3, 2323)
248 conn1.apply_acls(0, acl_side)
251 # create and check that the conn2/3 work
253 conn2.send_pingpong(0)
255 conn3.send_pingpong(0)
256 # send some packets on conn1, keep conn2/3 idle
257 for i in IterateWithSleep(self, 20, "Keep conn active", 0.2):
258 conn1.send_through(1)
260 p2 = conn2.send_through(1).command()
262 # If we asserted while waiting, it's good.
263 # the conn should have timed out.
265 # We should have not received the packet on a long-idle
266 # connection, because it should have timed out
267 # If it didn't - it is a problem
268 self.assert_equal(p2, None, "packet on long-idle conn")
def test_0000_conn_prepare_test(self):
    """ Prepare the settings """
    # Docstring text is left untouched; test runners may display it.
    # Set the ACL plugin's UDP idle session timeout via the CLI
    # (value 1 -- presumably seconds; confirm against the plugin docs).
    udp_idle_timeout_cmd = "set acl-plugin session timeout udp idle 1"
    self.vapi.ppcli(udp_idle_timeout_cmd)
def test_0001_basic_conn_test(self):
    """ IPv4: Basic conn timeout test reflect on ingress """
    # Docstring text is left untouched; test runners may display it.
    # Basic timeout scenario over IPv4 with acl_side=0, the variant the
    # docstring labels "reflect on ingress".
    self.run_basic_conn_test(af=AF_INET, acl_side=0)
def test_0002_basic_conn_test(self):
    """ IPv4: Basic conn timeout test reflect on egress """
    # Docstring text is left untouched; test runners may display it.
    # Basic timeout scenario over IPv4 with acl_side=1, the variant the
    # docstring labels "reflect on egress".
    self.run_basic_conn_test(af=AF_INET, acl_side=1)
def test_0011_active_conn_test(self):
    """ IPv4: Idle conn behind active conn, reflect on ingress """
    # Docstring text is left untouched; test runners may display it.
    # Idle-behind-active scenario over IPv4 with acl_side=0, the variant
    # the docstring labels "reflect on ingress".
    self.run_active_conn_test(af=AF_INET, acl_side=0)
def test_0012_active_conn_test(self):
    """ IPv4: Idle conn behind active conn, reflect on egress """
    # Docstring text is left untouched; test runners may display it.
    # Idle-behind-active scenario over IPv4 with acl_side=1, the variant
    # the docstring labels "reflect on egress".
    self.run_active_conn_test(af=AF_INET, acl_side=1)
def test_1001_basic_conn_test(self):
    """ IPv6: Basic conn timeout test reflect on ingress """
    # Docstring text is left untouched; test runners may display it.
    # Basic timeout scenario over IPv6 with acl_side=0, the variant the
    # docstring labels "reflect on ingress".
    self.run_basic_conn_test(af=AF_INET6, acl_side=0)
def test_1002_basic_conn_test(self):
    """ IPv6: Basic conn timeout test reflect on egress """
    # Docstring text is left untouched; test runners may display it.
    # Basic timeout scenario over IPv6 with acl_side=1, the variant the
    # docstring labels "reflect on egress".
    self.run_basic_conn_test(af=AF_INET6, acl_side=1)
def test_1011_active_conn_test(self):
    """ IPv6: Idle conn behind active conn, reflect on ingress """
    # Docstring text is left untouched; test runners may display it.
    # Idle-behind-active scenario over IPv6 with acl_side=0, the variant
    # the docstring labels "reflect on ingress".
    self.run_active_conn_test(af=AF_INET6, acl_side=0)
def test_1012_active_conn_test(self):
    """ IPv6: Idle conn behind active conn, reflect on egress """
    # Docstring text is left untouched; test runners may display it.
    # Idle-behind-active scenario over IPv6 with acl_side=1, the variant
    # the docstring labels "reflect on egress".
    self.run_active_conn_test(af=AF_INET6, acl_side=1)