hs-test: fixed timed out tests passing in the CI
[vpp.git] / test / test_nat44_ed_output.py
1 #!/usr/bin/env python3
2 """NAT44 ED output-feature tests"""
3
4 import random
5 import unittest
6 from scapy.layers.inet import Ether, IP, TCP
7 from scapy.packet import Raw
8 from scapy.data import IP_PROTOS
9 from framework import VppTestCase
10 from asfframework import VppTestRunner
11 from vpp_papi import VppEnum
12
13
def get_nat44_ed_in2out_worker_index(ip, vpp_worker_count):
    """Map an IPv4 address to a worker thread index.

    :param ip: IPv4 address in dotted-quad string form.
    :param vpp_worker_count: number of worker threads; 0 means no workers.
    :returns: 0 when there are no workers, otherwise a value in the range
        1..vpp_worker_count derived from a simple additive hash of the
        address.
    :raises OSError: if ``ip`` is not a valid IPv4 address string (raised
        by ``socket.inet_aton``).

    NOTE(review): presumably this mirrors the worker selection done by the
    NAT44 ED plugin for in2out traffic - confirm against the plugin source.
    Python integers do not wrap at 32 bits, so if the mirrored hash is a
    fixed-width value the sum below may need masking - confirm.
    """
    # Imported locally: the module-level import block provides neither
    # ``socket`` nor ``struct``, so without this any call with a non-zero
    # worker count fails with NameError.
    import socket
    import struct

    if vpp_worker_count == 0:
        return 0

    # Read the packed address as a big-endian 32-bit integer, then run it
    # through htonl(), which swaps the byte order on little-endian hosts
    # and is a no-op on big-endian ones.
    packed = socket.inet_aton(ip)
    numeric = struct.unpack("!L", packed)[0]
    numeric = socket.htonl(numeric)

    # Fold the shifted copies of the value together and reduce modulo the
    # worker count; the +1 keeps index 0 for the no-worker case above.
    h = numeric + (numeric >> 8) + (numeric >> 16) + (numeric >> 24)
    return 1 + h % vpp_worker_count
22
23
class TestNAT44EDOutput(VppTestCase):
    """NAT44 ED output feature Test Case"""

    # Session limit passed to the NAT44 ED plugin when it is enabled in setUp().
    max_sessions = 1024

    @classmethod
    def setUpClass(cls):
        # Two packet-generator interfaces are created; the test below uses
        # pg0 as the inside interface and pg1 as the NAT output interface.
        super().setUpClass()
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

    @classmethod
    def tearDownClass(cls):
        # Nothing class-specific to release; defer to the framework.
        super().tearDownClass()

    def setUp(self):
        # Bring every interface up with IPv4 configured and ARP resolved,
        # then enable the NAT44 ED plugin with the class session limit.
        super().setUp()
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
        self.vapi.nat44_ed_plugin_enable_disable(sessions=self.max_sessions, enable=1)

    def tearDown(self):
        # Log the session table first, while the plugin is still enabled.
        if not self.vpp_dead:
            self.logger.debug(self.vapi.cli("show nat44 sessions"))
        super().tearDown()

        # The remaining cleanup needs a live VPP to talk to.
        if self.vpp_dead:
            return
        for intf in self.pg_interfaces:
            intf.unconfig_ip4()
            intf.admin_down()
        self.vapi.nat44_ed_plugin_enable_disable(enable=0)

    def test_static_dynamic(self):
        """Create static mapping which matches existing dynamic mapping"""

        # Shorten only the TCP transitory timeout; the other timeouts are
        # written back with the values currently configured.
        running = self.vapi.nat44_show_running_config()
        current = running.timeouts
        new_transitory = 2
        self.vapi.nat_set_timeouts(
            udp=current.udp,
            tcp_established=current.tcp_established,
            icmp=current.icmp,
            tcp_transitory=new_transitory,
        )

        inside = self.pg0
        outside = self.pg1
        local_host = inside.remote_ip4
        remote_host = outside.remote_ip4
        outside_addr = outside.local_ip4

        # NAT configuration: pool address, pg0 features, pg1 as output
        # interface.
        self.vapi.nat44_add_del_address_range(
            first_ip_address=outside_addr,
            last_ip_address=outside_addr,
            vrf_id=0xFFFFFFFF,
            is_add=1,
            flags=0,
        )
        self.vapi.nat44_interface_add_del_feature(
            sw_if_index=inside.sw_if_index, is_add=1
        )
        self.vapi.nat44_interface_add_del_feature(
            sw_if_index=inside.sw_if_index,
            flags=VppEnum.vl_api_nat_config_flags_t.NAT_IS_INSIDE,
            is_add=1,
        )
        self.vapi.nat44_ed_add_del_output_interface(
            sw_if_index=outside.sw_if_index, is_add=1
        )

        # Pick a random source port inside the slice of the port space that
        # belongs to the worker computed for local_host.
        workers = self.vpp_worker_count
        thread_index = get_nat44_ed_in2out_worker_index(local_host, workers)
        ports_per_thread = (0xFFFF - 1024) // max(1, workers)
        local_sport = 1024 + random.randint(1, ports_per_thread)
        if workers > 0:
            local_sport += ports_per_thread * (thread_index - 1)

        remote_dport = 10000

        def in2out(**tcp_fields):
            # TCP segment from the local host towards the remote host,
            # framed for injection on the inside interface.
            return (
                Ether(src=inside.remote_mac, dst=inside.local_mac)
                / IP(src=local_host, dst=remote_host)
                / TCP(sport=local_sport, dport=remote_dport, **tcp_fields)
            )

        # first setup a dynamic TCP session

        # SYN packet in->out
        translated = self.send_and_expect(inside, [in2out(flags="S")], outside)[0]

        self.assertEqual(translated[IP].src, outside_addr)
        self.assertEqual(translated[TCP].sport, local_sport)
        outside_port = translated[TCP].sport

        def out2in(**tcp_fields):
            # TCP segment from the remote host towards the NAT outside
            # address/port, framed for injection on the outside interface.
            return (
                Ether(src=outside.remote_mac, dst=outside.local_mac)
                / IP(src=remote_host, dst=outside_addr)
                / TCP(sport=remote_dport, dport=outside_port, **tcp_fields)
            )

        # SYN+ACK packet out->in
        self.send_and_expect(outside, [out2in(flags="SA")], inside)

        # ACK packet in->out
        self.send_and_expect(inside, [in2out(flags="A")], outside)

        # now we have a session up, create a conflicting static mapping
        self.vapi.nat44_add_del_static_mapping(
            is_add=1,
            local_ip_address=local_host,
            external_ip_address=outside_addr,
            external_sw_if_index=0xFFFFFFFF,
            local_port=local_sport,
            external_port=outside_port,
            protocol=IP_PROTOS.tcp,
            flags=VppEnum.vl_api_nat_config_flags_t.NAT_IS_OUT2IN_ONLY,
        )

        sessions = self.vapi.nat44_user_session_dump(local_host, 0)
        self.assertEqual(1, len(sessions))

        # now send some more data over existing session - it should pass
        # (no explicit TCP flags are set on these two data segments)

        # in->out
        self.send_and_expect(inside, [in2out() / Raw("zippity zap")], outside)

        # out->in
        self.send_and_expect(outside, [out2in() / Raw("flippity flop")], inside)

        # now close the session

        # FIN packet in -> out
        self.send_and_expect(
            inside, [in2out(flags="FA", seq=100, ack=300)], outside
        )

        # FIN+ACK packet out -> in
        self.send_and_expect(
            outside, [out2in(flags="FA", seq=300, ack=101)], inside
        )

        # ACK packet in -> out
        self.send_and_expect(
            inside, [in2out(flags="A", seq=101, ack=301)], outside
        )

        # session now in transitory timeout
        # try SYN packet in->out - should be dropped
        inside.add_stream(in2out(flags="S"))
        self.pg_enable_capture()
        self.pg_start()

        self.sleep(new_transitory, "wait for transitory timeout")
        # NOTE(review): the capture checked here is the inside interface's,
        # although the packet travels in->out - confirm this is intended.
        inside.assert_nothing_captured(0)

        # session should still exist
        sessions = self.vapi.nat44_user_session_dump(inside.remote_ip4, 0)
        self.assertEqual(1, len(sessions))

        # send FIN+ACK packet in->out - will cause session to be wiped
        # but won't create a new session
        # NOTE(review): the in->out shaped packet is queued on the outside
        # interface here - confirm this is intended.
        outside.add_stream(in2out(flags="FA", seq=300, ack=101))
        self.pg_enable_capture()
        self.pg_start()
        inside.assert_nothing_captured(0)

        sessions = self.vapi.nat44_user_session_dump(inside.remote_ip4, 0)
        self.assertEqual(0, len(sessions))

        # create a new session and make sure the outside port is remapped
        # SYN packet in->out
        remapped = self.send_and_expect(inside, [in2out(flags="S")], outside)[0]

        self.assertEqual(remapped[IP].src, outside_addr)
        self.assertNotEqual(remapped[TCP].sport, local_sport)

        # make sure static mapping works and creates a new session
        # SYN packet out->in
        self.send_and_expect(outside, [out2in(flags="S")], inside)

        sessions = self.vapi.nat44_user_session_dump(inside.remote_ip4, 0)
        self.assertEqual(2, len(sessions))
255
256
# When the module is executed directly, hand its test cases to unittest
# using the VppTestRunner imported from asfframework.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)