tests: replace pycodestyle with black
[vpp.git] / test / test_acl_plugin_conns.py
1 #!/usr/bin/env python3
2 """ ACL plugin extended stateful tests """
3
4 import unittest
5 from config import config
6 from framework import VppTestCase, VppTestRunner
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9 from scapy.layers.inet import IP, UDP, TCP
10 from scapy.packet import Packet
11 from socket import inet_pton, AF_INET, AF_INET6
12 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
13 from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
14 from scapy.layers.inet6 import IPv6ExtHdrFragment
15 from pprint import pprint
16 from random import randint
17 from util import L4_Conn
18 from ipaddress import ip_network
19
20 from vpp_acl import AclRule, VppAcl, VppAclInterface
21
22
23 def to_acl_rule(self, is_permit, wildcard_sport=False):
24     p = self
25     rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
26     rule_prefix_len = 128 if p.haslayer(IPv6) else 32
27     rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP
28     rule_l4_sport = p.sport
29     rule_l4_dport = p.dport
30     if p.haslayer(IPv6):
31         rule_l4_proto = p[IPv6].nh
32     else:
33         rule_l4_proto = p[IP].proto
34
35     if wildcard_sport:
36         rule_l4_sport_first = 0
37         rule_l4_sport_last = 65535
38     else:
39         rule_l4_sport_first = rule_l4_sport
40         rule_l4_sport_last = rule_l4_sport
41
42     new_rule = AclRule(
43         is_permit=is_permit,
44         proto=rule_l4_proto,
45         src_prefix=ip_network((p[rule_l3_layer].src, rule_prefix_len)),
46         dst_prefix=ip_network((p[rule_l3_layer].dst, rule_prefix_len)),
47         sport_from=rule_l4_sport_first,
48         sport_to=rule_l4_sport_last,
49         dport_from=rule_l4_dport,
50         dport_to=rule_l4_dport,
51     )
52
53     return new_rule
54
55
56 Packet.to_acl_rule = to_acl_rule
57
58
59 class IterateWithSleep:
60     def __init__(self, testcase, n_iters, description, sleep_sec):
61         self.curr = 0
62         self.testcase = testcase
63         self.n_iters = n_iters
64         self.sleep_sec = sleep_sec
65         self.description = description
66
67     def __iter__(self):
68         for x in range(0, self.n_iters):
69             yield x
70             self.testcase.sleep(self.sleep_sec)
71
72
73 class Conn(L4_Conn):
74     def apply_acls(self, reflect_side, acl_side):
75         pkts = []
76         pkts.append(self.pkt(0))
77         pkts.append(self.pkt(1))
78         pkt = pkts[reflect_side]
79
80         r = []
81         r.append(pkt.to_acl_rule(2, wildcard_sport=True))
82         r.append(self.wildcard_rule(0))
83         reflect_acl = VppAcl(self.testcase, r)
84         reflect_acl.add_vpp_config()
85
86         r = []
87         r.append(self.wildcard_rule(0))
88         deny_acl = VppAcl(self.testcase, r)
89         deny_acl.add_vpp_config()
90
91         if reflect_side == acl_side:
92             acl_if0 = VppAclInterface(
93                 self.testcase,
94                 self.ifs[acl_side].sw_if_index,
95                 [reflect_acl, deny_acl],
96                 n_input=1,
97             )
98             acl_if1 = VppAclInterface(
99                 self.testcase, self.ifs[1 - acl_side].sw_if_index, [], n_input=0
100             )
101             acl_if0.add_vpp_config()
102             acl_if1.add_vpp_config()
103         else:
104             acl_if0 = VppAclInterface(
105                 self.testcase,
106                 self.ifs[acl_side].sw_if_index,
107                 [deny_acl, reflect_acl],
108                 n_input=1,
109             )
110             acl_if1 = VppAclInterface(
111                 self.testcase, self.ifs[1 - acl_side].sw_if_index, [], n_input=0
112             )
113             acl_if0.add_vpp_config()
114             acl_if1.add_vpp_config()
115
116     def wildcard_rule(self, is_permit):
117         any_addr = ["0.0.0.0", "::"]
118         rule_family = self.address_family
119         is_ip6 = 1 if rule_family == AF_INET6 else 0
120         new_rule = AclRule(
121             is_permit=is_permit,
122             proto=0,
123             src_prefix=ip_network((any_addr[is_ip6], 0)),
124             dst_prefix=ip_network((any_addr[is_ip6], 0)),
125             sport_from=0,
126             sport_to=65535,
127             dport_from=0,
128             dport_to=65535,
129         )
130         return new_rule
131
132
133 @unittest.skipUnless(config.extended, "part of extended tests")
134 class ACLPluginConnTestCase(VppTestCase):
135     """ACL plugin connection-oriented extended testcases"""
136
137     @classmethod
138     def setUpClass(cls):
139         super(ACLPluginConnTestCase, cls).setUpClass()
140         # create pg0 and pg1
141         cls.create_pg_interfaces(range(2))
142         cmd = "set acl-plugin session table event-trace 1"
143         cls.logger.info(cls.vapi.cli(cmd))
144         for i in cls.pg_interfaces:
145             i.admin_up()
146             i.config_ip4()
147             i.config_ip6()
148             i.resolve_arp()
149             i.resolve_ndp()
150
151     @classmethod
152     def tearDownClass(cls):
153         super(ACLPluginConnTestCase, cls).tearDownClass()
154
155     def tearDown(self):
156         """Run standard test teardown and log various show commands"""
157         super(ACLPluginConnTestCase, self).tearDown()
158
159     def show_commands_at_teardown(self):
160         self.logger.info(self.vapi.cli("show ip neighbors"))
161         self.logger.info(self.vapi.cli("show ip6 neighbors"))
162         self.logger.info(self.vapi.cli("show acl-plugin sessions"))
163         self.logger.info(self.vapi.cli("show acl-plugin acl"))
164         self.logger.info(self.vapi.cli("show acl-plugin interface"))
165         self.logger.info(self.vapi.cli("show acl-plugin tables"))
166         self.logger.info(self.vapi.cli("show event-logger all"))
167
168     def run_basic_conn_test(self, af, acl_side):
169         """Basic conn timeout test"""
170         conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242)
171         conn1.apply_acls(0, acl_side)
172         conn1.send_through(0)
173         # the return packets should pass
174         conn1.send_through(1)
175         # send some packets on conn1, ensure it doesn't go away
176         for i in IterateWithSleep(self, 20, "Keep conn active", 0.3):
177             conn1.send_through(1)
178         # allow the conn to time out
179         for i in IterateWithSleep(self, 30, "Wait for timeout", 0.1):
180             pass
181         # now try to send a packet on the reflected side
182         try:
183             p2 = conn1.send_through(1).command()
184         except:
185             # If we asserted while waiting, it's good.
186             # the conn should have timed out.
187             p2 = None
188         self.assert_equal(p2, None, "packet on long-idle conn")
189
190     def run_active_conn_test(self, af, acl_side):
191         """Idle connection behind active connection test"""
192         base = 10000 + 1000 * acl_side
193         conn1 = Conn(self, self.pg0, self.pg1, af, UDP, base + 1, 2323)
194         conn2 = Conn(self, self.pg0, self.pg1, af, UDP, base + 2, 2323)
195         conn3 = Conn(self, self.pg0, self.pg1, af, UDP, base + 3, 2323)
196         conn1.apply_acls(0, acl_side)
197         conn1.send(0)
198         conn1.recv(1)
199         # create and check that the conn2/3 work
200         self.sleep(0.1)
201         conn2.send_pingpong(0)
202         self.sleep(0.1)
203         conn3.send_pingpong(0)
204         # send some packets on conn1, keep conn2/3 idle
205         for i in IterateWithSleep(self, 20, "Keep conn active", 0.2):
206             conn1.send_through(1)
207         try:
208             p2 = conn2.send_through(1).command()
209         except:
210             # If we asserted while waiting, it's good.
211             # the conn should have timed out.
212             p2 = None
213         # We should have not received the packet on a long-idle
214         # connection, because it should have timed out
215         # If it didn't - it is a problem
216         self.assert_equal(p2, None, "packet on long-idle conn")
217
218     def run_clear_conn_test(self, af, acl_side):
219         """Clear the connections via CLI"""
220         conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242)
221         conn1.apply_acls(0, acl_side)
222         conn1.send_through(0)
223         # the return packets should pass
224         conn1.send_through(1)
225         # send some packets on conn1, ensure it doesn't go away
226         for i in IterateWithSleep(self, 20, "Keep conn active", 0.3):
227             conn1.send_through(1)
228         # clear all connections
229         self.vapi.ppcli("clear acl-plugin sessions")
230         # now try to send a packet on the reflected side
231         try:
232             p2 = conn1.send_through(1).command()
233         except:
234             # If we asserted while waiting, it's good.
235             # the conn should have timed out.
236             p2 = None
237         self.assert_equal(p2, None, "packet on supposedly deleted conn")
238
239     def run_tcp_transient_setup_conn_test(self, af, acl_side):
240         conn1 = Conn(self, self.pg0, self.pg1, af, TCP, 53001, 5151)
241         conn1.apply_acls(0, acl_side)
242         conn1.send_through(0, "S")
243         # the return packets should pass
244         conn1.send_through(1, "SA")
245         # allow the conn to time out
246         for i in IterateWithSleep(self, 30, "Wait for timeout", 0.1):
247             pass
248         # ensure conn times out
249         try:
250             p2 = conn1.send_through(1).command()
251         except:
252             # If we asserted while waiting, it's good.
253             # the conn should have timed out.
254             p2 = None
255         self.assert_equal(p2, None, "packet on supposedly deleted conn")
256
257     def run_tcp_established_conn_test(self, af, acl_side):
258         conn1 = Conn(self, self.pg0, self.pg1, af, TCP, 53002, 5052)
259         conn1.apply_acls(0, acl_side)
260         conn1.send_through(0, "S")
261         # the return packets should pass
262         conn1.send_through(1, "SA")
263         # complete the threeway handshake
264         # (NB: sequence numbers not tracked, so not set!)
265         conn1.send_through(0, "A")
266         # allow the conn to time out if it's in embryonic timer
267         for i in IterateWithSleep(self, 30, "Wait for transient timeout", 0.1):
268             pass
269         # Try to send the packet from the "forbidden" side - it must pass
270         conn1.send_through(1, "A")
271         # ensure conn times out for real
272         for i in IterateWithSleep(self, 130, "Wait for timeout", 0.1):
273             pass
274         try:
275             p2 = conn1.send_through(1).command()
276         except:
277             # If we asserted while waiting, it's good.
278             # the conn should have timed out.
279             p2 = None
280         self.assert_equal(p2, None, "packet on supposedly deleted conn")
281
282     def run_tcp_transient_teardown_conn_test(self, af, acl_side):
283         conn1 = Conn(self, self.pg0, self.pg1, af, TCP, 53002, 5052)
284         conn1.apply_acls(0, acl_side)
285         conn1.send_through(0, "S")
286         # the return packets should pass
287         conn1.send_through(1, "SA")
288         # complete the threeway handshake
289         # (NB: sequence numbers not tracked, so not set!)
290         conn1.send_through(0, "A")
291         # allow the conn to time out if it's in embryonic timer
292         for i in IterateWithSleep(self, 30, "Wait for transient timeout", 0.1):
293             pass
294         # Try to send the packet from the "forbidden" side - it must pass
295         conn1.send_through(1, "A")
296         # Send the FIN to bounce the session out of established
297         conn1.send_through(1, "FA")
298         # If conn landed on transient timer it will time out here
299         for i in IterateWithSleep(self, 30, "Wait for transient timeout", 0.1):
300             pass
301         # Now it should have timed out already
302         try:
303             p2 = conn1.send_through(1).command()
304         except:
305             # If we asserted while waiting, it's good.
306             # the conn should have timed out.
307             p2 = None
308         self.assert_equal(p2, None, "packet on supposedly deleted conn")
309
310     def test_0000_conn_prepare_test(self):
311         """Prepare the settings"""
312         self.vapi.ppcli("set acl-plugin session timeout udp idle 1")
313
314     def test_0001_basic_conn_test(self):
315         """IPv4: Basic conn timeout test reflect on ingress"""
316         self.run_basic_conn_test(AF_INET, 0)
317
318     def test_0002_basic_conn_test(self):
319         """IPv4: Basic conn timeout test reflect on egress"""
320         self.run_basic_conn_test(AF_INET, 1)
321
322     def test_0005_clear_conn_test(self):
323         """IPv4: reflect egress, clear conn"""
324         self.run_clear_conn_test(AF_INET, 1)
325
326     def test_0006_clear_conn_test(self):
327         """IPv4: reflect ingress, clear conn"""
328         self.run_clear_conn_test(AF_INET, 0)
329
330     def test_0011_active_conn_test(self):
331         """IPv4: Idle conn behind active conn, reflect on ingress"""
332         self.run_active_conn_test(AF_INET, 0)
333
334     def test_0012_active_conn_test(self):
335         """IPv4: Idle conn behind active conn, reflect on egress"""
336         self.run_active_conn_test(AF_INET, 1)
337
338     def test_1001_basic_conn_test(self):
339         """IPv6: Basic conn timeout test reflect on ingress"""
340         self.run_basic_conn_test(AF_INET6, 0)
341
342     def test_1002_basic_conn_test(self):
343         """IPv6: Basic conn timeout test reflect on egress"""
344         self.run_basic_conn_test(AF_INET6, 1)
345
346     def test_1005_clear_conn_test(self):
347         """IPv6: reflect egress, clear conn"""
348         self.run_clear_conn_test(AF_INET6, 1)
349
350     def test_1006_clear_conn_test(self):
351         """IPv6: reflect ingress, clear conn"""
352         self.run_clear_conn_test(AF_INET6, 0)
353
354     def test_1011_active_conn_test(self):
355         """IPv6: Idle conn behind active conn, reflect on ingress"""
356         self.run_active_conn_test(AF_INET6, 0)
357
358     def test_1012_active_conn_test(self):
359         """IPv6: Idle conn behind active conn, reflect on egress"""
360         self.run_active_conn_test(AF_INET6, 1)
361
362     def test_2000_prepare_for_tcp_test(self):
363         """Prepare for TCP session tests"""
364         # ensure the session hangs on if it gets treated as UDP
365         self.vapi.ppcli("set acl-plugin session timeout udp idle 200")
366         # let the TCP connection time out at 5 seconds
367         self.vapi.ppcli("set acl-plugin session timeout tcp idle 10")
368         self.vapi.ppcli("set acl-plugin session timeout tcp transient 1")
369
370     def test_2001_tcp_transient_conn_test(self):
371         """IPv4: transient TCP session (incomplete 3WHS), ref. on ingress"""
372         self.run_tcp_transient_setup_conn_test(AF_INET, 0)
373
374     def test_2002_tcp_transient_conn_test(self):
375         """IPv4: transient TCP session (incomplete 3WHS), ref. on egress"""
376         self.run_tcp_transient_setup_conn_test(AF_INET, 1)
377
378     def test_2003_tcp_transient_conn_test(self):
379         """IPv4: established TCP session (complete 3WHS), ref. on ingress"""
380         self.run_tcp_established_conn_test(AF_INET, 0)
381
382     def test_2004_tcp_transient_conn_test(self):
383         """IPv4: established TCP session (complete 3WHS), ref. on egress"""
384         self.run_tcp_established_conn_test(AF_INET, 1)
385
386     def test_2005_tcp_transient_teardown_conn_test(self):
387         """IPv4: transient TCP session (3WHS,ACK,FINACK), ref. on ingress"""
388         self.run_tcp_transient_teardown_conn_test(AF_INET, 0)
389
390     def test_2006_tcp_transient_teardown_conn_test(self):
391         """IPv4: transient TCP session (3WHS,ACK,FINACK), ref. on egress"""
392         self.run_tcp_transient_teardown_conn_test(AF_INET, 1)
393
394     def test_3001_tcp_transient_conn_test(self):
395         """IPv6: transient TCP session (incomplete 3WHS), ref. on ingress"""
396         self.run_tcp_transient_setup_conn_test(AF_INET6, 0)
397
398     def test_3002_tcp_transient_conn_test(self):
399         """IPv6: transient TCP session (incomplete 3WHS), ref. on egress"""
400         self.run_tcp_transient_setup_conn_test(AF_INET6, 1)
401
402     def test_3003_tcp_transient_conn_test(self):
403         """IPv6: established TCP session (complete 3WHS), ref. on ingress"""
404         self.run_tcp_established_conn_test(AF_INET6, 0)
405
406     def test_3004_tcp_transient_conn_test(self):
407         """IPv6: established TCP session (complete 3WHS), ref. on egress"""
408         self.run_tcp_established_conn_test(AF_INET6, 1)
409
410     def test_3005_tcp_transient_teardown_conn_test(self):
411         """IPv6: transient TCP session (3WHS,ACK,FINACK), ref. on ingress"""
412         self.run_tcp_transient_teardown_conn_test(AF_INET6, 0)
413
414     def test_3006_tcp_transient_teardown_conn_test(self):
415         """IPv6: transient TCP session (3WHS,ACK,FINACK), ref. on egress"""
416         self.run_tcp_transient_teardown_conn_test(AF_INET6, 1)