2 """Policy 1:1 NAT functional tests"""
5 from scapy.layers.inet import Ether, IP, UDP, ICMP
6 from framework import VppTestCase
7 from asfframework import VppTestRunner
8 from vpp_papi import VppEnum
11 class TestPNAT(VppTestCase):
18 super(TestPNAT, cls).setUpClass()
19 cls.create_pg_interfaces(range(2))
20 cls.interfaces = list(cls.pg_interfaces)
23 def tearDownClass(cls):
24 super(TestPNAT, cls).tearDownClass()
27 super(TestPNAT, self).setUp()
28 for i in self.interfaces:
34 super(TestPNAT, self).tearDown()
36 for i in self.pg_interfaces:
40 def validate(self, rx, expected):
41 self.assertEqual(rx, expected.__class__(expected))
43 def validate_bytes(self, rx, expected):
44 self.assertEqual(rx, expected)
47 """Verify non matching traffic works."""
48 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
50 icmpecho = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / ICMP()
51 reply = IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / ICMP(
54 rx = self.send_and_expect(self.pg0, p_ether / icmpecho * 1, self.pg0)
56 reply[IP].id = p[IP].id
57 self.validate(p[1], reply)
62 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
63 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
67 "input": PNAT_IP4_INPUT,
68 "sw_if_index": self.pg0.sw_if_index,
75 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
77 IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
80 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
85 "input": PNAT_IP4_OUTPUT,
86 "sw_if_index": self.pg1.sw_if_index,
89 "src": self.pg0.remote_ip4,
93 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
95 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
99 IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
103 "input": PNAT_IP4_INPUT,
104 "sw_if_index": self.pg0.sw_if_index,
107 "dst": "10.10.10.10",
111 "rewrite": {"mask": 0xA, "dst": self.pg1.remote_ip4, "dport": 5555},
113 IP(src=self.pg0.remote_ip4, dst="10.10.10.10")
114 / UDP(sport=65530, dport=6871)
117 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
118 / UDP(sport=65530, dport=5555)
122 "input": PNAT_IP4_INPUT,
123 "sw_if_index": self.pg0.sw_if_index,
126 "dst": self.pg1.remote_ip4,
130 "rewrite": {"mask": 0x8, "dport": 5555},
132 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
133 / UDP(dport=6871, chksum=0)
136 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
137 / UDP(dport=5555, chksum=0)
141 "input": PNAT_IP4_INPUT,
142 "sw_if_index": self.pg0.sw_if_index,
143 "match": {"mask": 0x2, "dst": self.pg1.remote_ip4, "proto": 1},
144 "rewrite": {"mask": 0x1, "src": "8.8.8.8"},
145 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / ICMP()),
146 "reply": IP(src="8.8.8.8", dst=self.pg1.remote_ip4) / ICMP(),
150 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
152 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
153 self.vapi.pnat_binding_attach(
154 sw_if_index=t["sw_if_index"],
155 attachment=t["input"],
156 binding_index=rv.binding_index,
161 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
164 self.validate(p[1], reply)
168 self.vapi.pnat_binding_detach(
169 sw_if_index=t["sw_if_index"],
170 attachment=t["input"],
171 binding_index=rv.binding_index,
173 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
175 def test_pnat_show(self):
176 """PNAT show tests"""
178 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
179 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
183 "input": PNAT_IP4_INPUT,
184 "sw_if_index": self.pg0.sw_if_index,
187 "dst": "10.10.10.10",
191 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
193 IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
196 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
201 "input": PNAT_IP4_OUTPUT,
202 "sw_if_index": self.pg1.sw_if_index,
205 "src": self.pg0.remote_ip4,
209 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
211 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
215 IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
221 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
222 binding_index.append(rv.binding_index)
223 self.vapi.pnat_binding_attach(
224 sw_if_index=t["sw_if_index"],
225 attachment=t["input"],
226 binding_index=rv.binding_index,
229 rv, l = self.vapi.pnat_bindings_get()
230 self.assertEqual(len(l), len(tests))
232 rv, l = self.vapi.pnat_interfaces_get()
233 self.assertEqual(len(l), 2)
235 self.logger.info(self.vapi.cli("show pnat translations"))
236 self.logger.info(self.vapi.cli("show pnat interfaces"))
238 for i, t in enumerate(tests):
239 self.vapi.pnat_binding_detach(
240 sw_if_index=t["sw_if_index"],
241 attachment=t["input"],
242 binding_index=binding_index[i],
244 self.vapi.pnat_binding_del(binding_index=binding_index[i])
246 def test_pnat_wildcard_proto(self):
248 PNAT test wildcard IP protocol, PNAT_PROTO for mask should be set by
252 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
253 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
257 "input": PNAT_IP4_INPUT,
258 "sw_if_index": self.pg0.sw_if_index,
259 "match": {"mask": 0x2, "dst": "10.10.10.10"},
260 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
261 "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
262 "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
265 "input": PNAT_IP4_OUTPUT,
266 "sw_if_index": self.pg1.sw_if_index,
267 "match": {"mask": 0x1, "src": self.pg0.remote_ip4},
268 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
269 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
270 "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
274 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
276 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
277 self.vapi.pnat_binding_attach(
278 sw_if_index=t["sw_if_index"],
279 attachment=t["input"],
280 binding_index=rv.binding_index,
285 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
287 self.validate(p[1], reply)
291 self.vapi.pnat_binding_detach(
292 sw_if_index=t["sw_if_index"],
293 attachment=t["input"],
294 binding_index=rv.binding_index,
296 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
298 def test_pnat_wildcard_proto_v2(self):
299 """PNAT test wildcard IP protocol using pnat_binding_add_v2"""
301 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
302 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
306 "input": PNAT_IP4_INPUT,
307 "sw_if_index": self.pg0.sw_if_index,
308 "match": {"mask": 0x42, "dst": "10.10.10.10"},
309 "rewrite": {"mask": 0x42, "dst": self.pg1.remote_ip4},
310 "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
311 "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
314 "input": PNAT_IP4_OUTPUT,
315 "sw_if_index": self.pg1.sw_if_index,
316 "match": {"mask": 0x41, "src": self.pg0.remote_ip4},
317 "rewrite": {"mask": 0x41, "src": "11.11.11.11"},
318 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
319 "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
323 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
325 rv = self.vapi.pnat_binding_add_v2(match=t["match"], rewrite=t["rewrite"])
326 self.vapi.pnat_binding_attach(
327 sw_if_index=t["sw_if_index"],
328 attachment=t["input"],
329 binding_index=rv.binding_index,
334 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
336 self.validate(p[1], reply)
340 self.vapi.pnat_binding_detach(
341 sw_if_index=t["sw_if_index"],
342 attachment=t["input"],
343 binding_index=rv.binding_index,
345 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
348 if __name__ == "__main__":
349 unittest.main(testRunner=VppTestRunner)