tests: Use errno value rather than a specific int
[vpp.git] / test / test_pnat.py
1 #!/usr/bin/env python3
2 """Policy 1:1 NAT functional tests"""
3
4 import unittest
5 from scapy.layers.inet import Ether, IP, UDP, ICMP
6 from framework import VppTestCase
7 from asfframework import VppTestRunner
8 from vpp_papi import VppEnum
9
10
11 class TestPNAT(VppTestCase):
12     """PNAT Test Case"""
13
14     maxDiff = None
15
16     @classmethod
17     def setUpClass(cls):
18         super(TestPNAT, cls).setUpClass()
19         cls.create_pg_interfaces(range(2))
20         cls.interfaces = list(cls.pg_interfaces)
21
22     @classmethod
23     def tearDownClass(cls):
24         super(TestPNAT, cls).tearDownClass()
25
26     def setUp(self):
27         super(TestPNAT, self).setUp()
28         for i in self.interfaces:
29             i.admin_up()
30             i.config_ip4()
31             i.resolve_arp()
32
33     def tearDown(self):
34         super(TestPNAT, self).tearDown()
35         if not self.vpp_dead:
36             for i in self.pg_interfaces:
37                 i.unconfig_ip4()
38                 i.admin_down()
39
40     def validate(self, rx, expected):
41         self.assertEqual(rx, expected.__class__(expected))
42
43     def validate_bytes(self, rx, expected):
44         self.assertEqual(rx, expected)
45
46     def ping_check(self):
47         """Verify non matching traffic works."""
48         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
49
50         icmpecho = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / ICMP()
51         reply = IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / ICMP(
52             type="echo-reply"
53         )
54         rx = self.send_and_expect(self.pg0, p_ether / icmpecho * 1, self.pg0)
55         for p in rx:
56             reply[IP].id = p[IP].id
57             self.validate(p[1], reply)
58
59     def test_pnat(self):
60         """PNAT test"""
61
62         PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
63         PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
64
65         tests = [
66             {
67                 "input": PNAT_IP4_INPUT,
68                 "sw_if_index": self.pg0.sw_if_index,
69                 "match": {
70                     "mask": 0xA,
71                     "dst": "10.10.10.10",
72                     "proto": 17,
73                     "dport": 6871,
74                 },
75                 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
76                 "send": (
77                     IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
78                 ),
79                 "reply": (
80                     IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
81                     / UDP(dport=6871)
82                 ),
83             },
84             {
85                 "input": PNAT_IP4_OUTPUT,
86                 "sw_if_index": self.pg1.sw_if_index,
87                 "match": {
88                     "mask": 0x9,
89                     "src": self.pg0.remote_ip4,
90                     "proto": 17,
91                     "dport": 6871,
92                 },
93                 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
94                 "send": (
95                     IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
96                     / UDP(dport=6871)
97                 ),
98                 "reply": (
99                     IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
100                 ),
101             },
102             {
103                 "input": PNAT_IP4_INPUT,
104                 "sw_if_index": self.pg0.sw_if_index,
105                 "match": {
106                     "mask": 0xA,
107                     "dst": "10.10.10.10",
108                     "proto": 17,
109                     "dport": 6871,
110                 },
111                 "rewrite": {"mask": 0xA, "dst": self.pg1.remote_ip4, "dport": 5555},
112                 "send": (
113                     IP(src=self.pg0.remote_ip4, dst="10.10.10.10")
114                     / UDP(sport=65530, dport=6871)
115                 ),
116                 "reply": (
117                     IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
118                     / UDP(sport=65530, dport=5555)
119                 ),
120             },
121             {
122                 "input": PNAT_IP4_INPUT,
123                 "sw_if_index": self.pg0.sw_if_index,
124                 "match": {
125                     "mask": 0xA,
126                     "dst": self.pg1.remote_ip4,
127                     "proto": 17,
128                     "dport": 6871,
129                 },
130                 "rewrite": {"mask": 0x8, "dport": 5555},
131                 "send": (
132                     IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
133                     / UDP(dport=6871, chksum=0)
134                 ),
135                 "reply": (
136                     IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
137                     / UDP(dport=5555, chksum=0)
138                 ),
139             },
140             {
141                 "input": PNAT_IP4_INPUT,
142                 "sw_if_index": self.pg0.sw_if_index,
143                 "match": {"mask": 0x2, "dst": self.pg1.remote_ip4, "proto": 1},
144                 "rewrite": {"mask": 0x1, "src": "8.8.8.8"},
145                 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / ICMP()),
146                 "reply": IP(src="8.8.8.8", dst=self.pg1.remote_ip4) / ICMP(),
147             },
148         ]
149
150         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
151         for t in tests:
152             rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
153             self.vapi.pnat_binding_attach(
154                 sw_if_index=t["sw_if_index"],
155                 attachment=t["input"],
156                 binding_index=rv.binding_index,
157             )
158
159             reply = t["reply"]
160             reply[IP].ttl -= 1
161             rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
162             for p in rx:
163                 # p.show2()
164                 self.validate(p[1], reply)
165
166             self.ping_check()
167
168             self.vapi.pnat_binding_detach(
169                 sw_if_index=t["sw_if_index"],
170                 attachment=t["input"],
171                 binding_index=rv.binding_index,
172             )
173             self.vapi.pnat_binding_del(binding_index=rv.binding_index)
174
175     def test_pnat_show(self):
176         """PNAT show tests"""
177
178         PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
179         PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
180
181         tests = [
182             {
183                 "input": PNAT_IP4_INPUT,
184                 "sw_if_index": self.pg0.sw_if_index,
185                 "match": {
186                     "mask": 0xA,
187                     "dst": "10.10.10.10",
188                     "proto": 17,
189                     "dport": 6871,
190                 },
191                 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
192                 "send": (
193                     IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
194                 ),
195                 "reply": (
196                     IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
197                     / UDP(dport=6871)
198                 ),
199             },
200             {
201                 "input": PNAT_IP4_OUTPUT,
202                 "sw_if_index": self.pg1.sw_if_index,
203                 "match": {
204                     "mask": 0x9,
205                     "src": self.pg0.remote_ip4,
206                     "proto": 17,
207                     "dport": 6871,
208                 },
209                 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
210                 "send": (
211                     IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
212                     / UDP(dport=6871)
213                 ),
214                 "reply": (
215                     IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
216                 ),
217             },
218         ]
219         binding_index = []
220         for t in tests:
221             rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
222             binding_index.append(rv.binding_index)
223             self.vapi.pnat_binding_attach(
224                 sw_if_index=t["sw_if_index"],
225                 attachment=t["input"],
226                 binding_index=rv.binding_index,
227             )
228
229         rv, l = self.vapi.pnat_bindings_get()
230         self.assertEqual(len(l), len(tests))
231
232         rv, l = self.vapi.pnat_interfaces_get()
233         self.assertEqual(len(l), 2)
234
235         self.logger.info(self.vapi.cli("show pnat translations"))
236         self.logger.info(self.vapi.cli("show pnat interfaces"))
237
238         for i, t in enumerate(tests):
239             self.vapi.pnat_binding_detach(
240                 sw_if_index=t["sw_if_index"],
241                 attachment=t["input"],
242                 binding_index=binding_index[i],
243             )
244             self.vapi.pnat_binding_del(binding_index=binding_index[i])
245
246     def test_pnat_wildcard_proto(self):
247         """
248         PNAT test wildcard IP protocol, PNAT_PROTO for mask should be set by
249         handler
250         """
251
252         PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
253         PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
254
255         tests = [
256             {
257                 "input": PNAT_IP4_INPUT,
258                 "sw_if_index": self.pg0.sw_if_index,
259                 "match": {"mask": 0x2, "dst": "10.10.10.10"},
260                 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
261                 "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
262                 "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
263             },
264             {
265                 "input": PNAT_IP4_OUTPUT,
266                 "sw_if_index": self.pg1.sw_if_index,
267                 "match": {"mask": 0x1, "src": self.pg0.remote_ip4},
268                 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
269                 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
270                 "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
271             },
272         ]
273
274         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
275         for t in tests:
276             rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
277             self.vapi.pnat_binding_attach(
278                 sw_if_index=t["sw_if_index"],
279                 attachment=t["input"],
280                 binding_index=rv.binding_index,
281             )
282
283             reply = t["reply"]
284             reply[IP].ttl -= 1
285             rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
286             for p in rx:
287                 self.validate(p[1], reply)
288
289             self.ping_check()
290
291             self.vapi.pnat_binding_detach(
292                 sw_if_index=t["sw_if_index"],
293                 attachment=t["input"],
294                 binding_index=rv.binding_index,
295             )
296             self.vapi.pnat_binding_del(binding_index=rv.binding_index)
297
298     def test_pnat_wildcard_proto_v2(self):
299         """PNAT test wildcard IP protocol using pnat_binding_add_v2"""
300
301         PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
302         PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
303
304         tests = [
305             {
306                 "input": PNAT_IP4_INPUT,
307                 "sw_if_index": self.pg0.sw_if_index,
308                 "match": {"mask": 0x42, "dst": "10.10.10.10"},
309                 "rewrite": {"mask": 0x42, "dst": self.pg1.remote_ip4},
310                 "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
311                 "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
312             },
313             {
314                 "input": PNAT_IP4_OUTPUT,
315                 "sw_if_index": self.pg1.sw_if_index,
316                 "match": {"mask": 0x41, "src": self.pg0.remote_ip4},
317                 "rewrite": {"mask": 0x41, "src": "11.11.11.11"},
318                 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
319                 "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
320             },
321         ]
322
323         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
324         for t in tests:
325             rv = self.vapi.pnat_binding_add_v2(match=t["match"], rewrite=t["rewrite"])
326             self.vapi.pnat_binding_attach(
327                 sw_if_index=t["sw_if_index"],
328                 attachment=t["input"],
329                 binding_index=rv.binding_index,
330             )
331
332             reply = t["reply"]
333             reply[IP].ttl -= 1
334             rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
335             for p in rx:
336                 self.validate(p[1], reply)
337
338             self.ping_check()
339
340             self.vapi.pnat_binding_detach(
341                 sw_if_index=t["sw_if_index"],
342                 attachment=t["input"],
343                 binding_index=rv.binding_index,
344             )
345             self.vapi.pnat_binding_del(binding_index=rv.binding_index)
346
347
348 if __name__ == "__main__":
349     unittest.main(testRunner=VppTestRunner)