2 """Policy 1:1 NAT functional tests"""
5 from scapy.layers.inet import Ether, IP, UDP, ICMP
6 from framework import VppTestCase, VppTestRunner
7 from vpp_papi import VppEnum
10 class TestPNAT(VppTestCase):
17 super(TestPNAT, cls).setUpClass()
18 cls.create_pg_interfaces(range(2))
19 cls.interfaces = list(cls.pg_interfaces)
def tearDownClass(cls):
    """Class-level teardown hook; simply defers to the parent class."""
    # No PNAT-specific class state is released here; the parent's
    # tearDownClass is the only thing invoked.
    parent = super(TestPNAT, cls)
    parent.tearDownClass()
26 super(TestPNAT, self).setUp()
27 for i in self.interfaces:
33 super(TestPNAT, self).tearDown()
35 for i in self.pg_interfaces:
def validate(self, rx, expected):
    """Assert that ``rx`` equals a fresh copy of ``expected``.

    ``expected`` is passed through its own class constructor before the
    comparison, so ``rx`` is checked against a newly constructed object of
    the same type rather than against ``expected`` itself.

    :param rx: the value actually received.
    :param expected: the value to rebuild and compare against.
    :raises AssertionError: if the two values are not equal.
    """
    # NOTE(review): callers pass scapy packets; rebuilding presumably makes
    # scapy fill in computed fields (checksums, lengths) -- confirm.
    expected_type = expected.__class__
    rebuilt = expected_type(expected)
    self.assertEqual(rx, rebuilt)
def validate_bytes(self, rx, expected):
    """Assert that ``rx`` equals ``expected`` exactly as given.

    Unlike ``validate``, ``expected`` is compared as-is, without being
    passed through its class constructor first.

    :param rx: the value actually received.
    :param expected: the value it must equal.
    :raises AssertionError: if the two values are not equal.
    """
    self.assertEqual(first=rx, second=expected)
46 """Verify non matching traffic works."""
47 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
49 icmpecho = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / ICMP()
50 reply = IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / ICMP(
53 rx = self.send_and_expect(self.pg0, p_ether / icmpecho * 1, self.pg0)
55 reply[IP].id = p[IP].id
56 self.validate(p[1], reply)
61 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
62 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
66 "input": PNAT_IP4_INPUT,
67 "sw_if_index": self.pg0.sw_if_index,
74 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
76 IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
79 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
84 "input": PNAT_IP4_OUTPUT,
85 "sw_if_index": self.pg1.sw_if_index,
88 "src": self.pg0.remote_ip4,
92 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
94 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
98 IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
102 "input": PNAT_IP4_INPUT,
103 "sw_if_index": self.pg0.sw_if_index,
106 "dst": "10.10.10.10",
110 "rewrite": {"mask": 0xA, "dst": self.pg1.remote_ip4, "dport": 5555},
112 IP(src=self.pg0.remote_ip4, dst="10.10.10.10")
113 / UDP(sport=65530, dport=6871)
116 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
117 / UDP(sport=65530, dport=5555)
121 "input": PNAT_IP4_INPUT,
122 "sw_if_index": self.pg0.sw_if_index,
125 "dst": self.pg1.remote_ip4,
129 "rewrite": {"mask": 0x8, "dport": 5555},
131 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
132 / UDP(dport=6871, chksum=0)
135 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
136 / UDP(dport=5555, chksum=0)
140 "input": PNAT_IP4_INPUT,
141 "sw_if_index": self.pg0.sw_if_index,
142 "match": {"mask": 0x2, "dst": self.pg1.remote_ip4, "proto": 1},
143 "rewrite": {"mask": 0x1, "src": "8.8.8.8"},
144 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / ICMP()),
145 "reply": IP(src="8.8.8.8", dst=self.pg1.remote_ip4) / ICMP(),
149 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
151 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
152 self.vapi.pnat_binding_attach(
153 sw_if_index=t["sw_if_index"],
154 attachment=t["input"],
155 binding_index=rv.binding_index,
160 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
163 self.validate(p[1], reply)
167 self.vapi.pnat_binding_detach(
168 sw_if_index=t["sw_if_index"],
169 attachment=t["input"],
170 binding_index=rv.binding_index,
172 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
174 def test_pnat_show(self):
175 """PNAT show tests"""
177 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
178 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
182 "input": PNAT_IP4_INPUT,
183 "sw_if_index": self.pg0.sw_if_index,
186 "dst": "10.10.10.10",
190 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
192 IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
195 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
200 "input": PNAT_IP4_OUTPUT,
201 "sw_if_index": self.pg1.sw_if_index,
204 "src": self.pg0.remote_ip4,
208 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
210 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
214 IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
220 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
221 binding_index.append(rv.binding_index)
222 self.vapi.pnat_binding_attach(
223 sw_if_index=t["sw_if_index"],
224 attachment=t["input"],
225 binding_index=rv.binding_index,
228 rv, l = self.vapi.pnat_bindings_get()
229 self.assertEqual(len(l), len(tests))
231 rv, l = self.vapi.pnat_interfaces_get()
232 self.assertEqual(len(l), 2)
234 self.logger.info(self.vapi.cli("show pnat translations"))
235 self.logger.info(self.vapi.cli("show pnat interfaces"))
237 for i, t in enumerate(tests):
238 self.vapi.pnat_binding_detach(
239 sw_if_index=t["sw_if_index"],
240 attachment=t["input"],
241 binding_index=binding_index[i],
243 self.vapi.pnat_binding_del(binding_index=binding_index[i])
245 def test_pnat_wildcard_proto(self):
247 PNAT test wildcard IP protocol, PNAT_PROTO for mask should be set by
251 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
252 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
256 "input": PNAT_IP4_INPUT,
257 "sw_if_index": self.pg0.sw_if_index,
258 "match": {"mask": 0x2, "dst": "10.10.10.10"},
259 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
260 "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
261 "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
264 "input": PNAT_IP4_OUTPUT,
265 "sw_if_index": self.pg1.sw_if_index,
266 "match": {"mask": 0x1, "src": self.pg0.remote_ip4},
267 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
268 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
269 "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
273 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
275 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
276 self.vapi.pnat_binding_attach(
277 sw_if_index=t["sw_if_index"],
278 attachment=t["input"],
279 binding_index=rv.binding_index,
284 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
286 self.validate(p[1], reply)
290 self.vapi.pnat_binding_detach(
291 sw_if_index=t["sw_if_index"],
292 attachment=t["input"],
293 binding_index=rv.binding_index,
295 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
297 def test_pnat_wildcard_proto_v2(self):
298 """PNAT test wildcard IP protocol using pnat_binding_add_v2"""
300 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
301 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
305 "input": PNAT_IP4_INPUT,
306 "sw_if_index": self.pg0.sw_if_index,
307 "match": {"mask": 0x42, "dst": "10.10.10.10"},
308 "rewrite": {"mask": 0x42, "dst": self.pg1.remote_ip4},
309 "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
310 "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
313 "input": PNAT_IP4_OUTPUT,
314 "sw_if_index": self.pg1.sw_if_index,
315 "match": {"mask": 0x41, "src": self.pg0.remote_ip4},
316 "rewrite": {"mask": 0x41, "src": "11.11.11.11"},
317 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
318 "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
322 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
324 rv = self.vapi.pnat_binding_add_v2(match=t["match"], rewrite=t["rewrite"])
325 self.vapi.pnat_binding_attach(
326 sw_if_index=t["sw_if_index"],
327 attachment=t["input"],
328 binding_index=rv.binding_index,
333 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
335 self.validate(p[1], reply)
339 self.vapi.pnat_binding_detach(
340 sw_if_index=t["sw_if_index"],
341 attachment=t["input"],
342 binding_index=rv.binding_index,
344 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
# Allow running this module directly: hand the tests in this file to
# unittest, using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)