5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, TCP
11 from scapy.layers.inet6 import IPv6
13 from ipaddress import ip_address, ip_network, \
14 IPv4Address, IPv6Address, IPv4Network, IPv6Network
16 from vpp_object import VppObject
17 from vpp_papi import VppEnum
22 def find_cnat_translation(test, id):
23 ts = test.vapi.cnat_translation_dump()
25 if id == t.translation.id:
33 def __init__(self, ip, port, l4p=TCP):
39 return {'addr': self.ip,
43 return ("%s:%d" % (self.ip, self.port))
46 class EpTuple(object):
49 def __init__(self, src, dst):
54 return {'src_ep': self.src.encode(),
55 'dst_ep': self.dst.encode()}
58 return ("%s->%s" % (self.src, self.dst))
61 class VppCNatTranslation(VppObject):
63 def __init__(self, test, iproto, vip, paths):
68 self.encoded_paths = []
69 for path in self.paths:
70 self.encoded_paths.append(path.encode())
74 ip_proto = VppEnum.vl_api_ip_proto_t
76 UDP: ip_proto.IP_API_PROTO_UDP,
77 TCP: ip_proto.IP_API_PROTO_TCP,
81 r = self._test.vapi.cnat_translation_del(id=self.id)
83 def add_vpp_config(self):
84 r = self._test.vapi.cnat_translation_update(
85 {'vip': self.vip.encode(),
86 'ip_proto': self.vl4_proto,
87 'n_paths': len(self.paths),
88 'paths': self.encoded_paths})
89 self._test.registry.register(self, self._test.logger)
92 def modify_vpp_config(self, paths):
94 self.encoded_paths = []
95 for path in self.paths:
96 self.encoded_paths.append(path.encode())
98 r = self._test.vapi.cnat_translation_update(
99 {'vip': self.vip.encode(),
100 'ip_proto': self.vl4_proto,
101 'n_paths': len(self.paths),
102 'paths': self.encoded_paths})
103 self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
    """Delete this translation from VPP.

    Issues ``cnat_translation_del`` for the translation identified by
    ``self.id``.
    """
    # Pass the id by keyword so this call reads the same way as the
    # other vapi calls in this file, which all name their arguments.
    self._test.vapi.cnat_translation_del(id=self.id)
def query_vpp_config(self):
    """Look this translation up in VPP by its id.

    Returns whatever ``find_cnat_translation`` returns for this
    object's test case and ``self.id``.
    """
    test_case, translation_id = self._test, self.id
    return find_cnat_translation(test_case, translation_id)
112 return ("cnat-translation-%s" % (self.vip))
115 c = self._test.statistics.get_counter("/net/cnat-translation")
119 class VppCNATSourceNat(VppObject):
121 def __init__(self, test, address, exclude_subnets=[]):
123 self.address = address
124 self.exclude_subnets = exclude_subnets
126 def add_vpp_config(self):
127 a = ip_address(self.address)
129 self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
131 self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
132 for subnet in self.exclude_subnets:
133 self.cnat_exclude_subnet(subnet, True)
def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
    """Add or remove a prefix through ``cnat_add_del_snat_prefix``.

    :param exclude_subnet: value handed to VPP as ``prefix``.
    :param isAdd: truthy to add the prefix, falsy to remove it.
    """
    # The API call is given an integer flag rather than the raw value.
    if isAdd:
        add_flag = 1
    else:
        add_flag = 0
    vapi = self._test.vapi
    vapi.cnat_add_del_snat_prefix(prefix=exclude_subnet, is_add=add_flag)
140 def query_vpp_config(self):
143 def remove_vpp_config(self):
147 class TestCNatTranslation(VppTestCase):
148 """ CNat Translation """
149 extra_vpp_punt_config = ["cnat", "{",
150 "session-max-age", "1",
151 "tcp-max-age", "1", "}"]
155 super(TestCNatTranslation, cls).setUpClass()
def tearDownClass(cls):
    """Class-level teardown; defers entirely to the parent class."""
    parent = super(TestCNatTranslation, cls)
    parent.tearDownClass()
162 super(TestCNatTranslation, self).setUp()
164 self.create_pg_interfaces(range(3))
166 for i in self.pg_interfaces:
174 for i in self.pg_interfaces:
178 super(TestCNatTranslation, self).tearDown()
180 def cnat_create_translation(self, vip, nbr, isV6=False):
181 ip_v = "ip6" if isV6 else "ip4"
182 dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
183 sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
184 t1 = VppCNatTranslation(
186 [EpTuple(sep, dep), EpTuple(sep, dep)])
190 def cnat_test_translation(self, t1, nbr, sports, isV6=False):
191 ip_v = "ip6" if isV6 else "ip4"
192 ip_class = IPv6 if isV6 else IP
198 for src in self.pg0.remote_hosts:
201 p1 = (Ether(dst=self.pg0.local_mac,
203 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
204 vip.l4p(sport=sport, dport=vip.port) /
207 self.vapi.cli("trace add pg-input 1")
208 rxs = self.send_and_expect(self.pg0,
213 self.assert_packet_checksums_valid(rx)
216 getattr(self.pg1.remote_hosts[nbr], ip_v))
217 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
221 self.assertEqual(rx[vip.l4p].sport, sport)
224 p1 = (Ether(dst=self.pg1.local_mac,
225 src=self.pg1.remote_mac) /
226 ip_class(src=getattr(
227 self.pg1.remote_hosts[nbr],
229 dst=getattr(src, ip_v)) /
230 vip.l4p(sport=4000 + nbr, dport=sport) /
233 rxs = self.send_and_expect(self.pg1,
238 self.assert_packet_checksums_valid(rx)
242 self.assertEqual(rx[vip.l4p].dport, sport)
243 self.assertEqual(rx[ip_class].src, vip.ip)
244 self.assertEqual(rx[vip.l4p].sport, vip.port)
247 # packets to the VIP that do not match a
248 # translation are dropped
250 p1 = (Ether(dst=self.pg0.local_mac,
252 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
253 vip.l4p(sport=sport, dport=6666) /
256 self.send_and_assert_no_replies(self.pg0,
261 # packets from the VIP that do not match a
262 # session are forwarded
264 p1 = (Ether(dst=self.pg1.local_mac,
265 src=self.pg1.remote_mac) /
266 ip_class(src=getattr(
267 self.pg1.remote_hosts[nbr],
269 dst=getattr(src, ip_v)) /
270 vip.l4p(sport=6666, dport=sport) /
273 rxs = self.send_and_expect(self.pg1,
277 self.assertEqual(t1.get_stats()['packets'],
280 len(self.pg0.remote_hosts))
282 def cnat_test_translation_update(self, t1, sports, isV6=False):
283 ip_v = "ip6" if isV6 else "ip4"
284 ip_class = IPv6 if isV6 else IP
288 # modify the translation to use a different backend
290 dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
291 sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
292 t1.modify_vpp_config([EpTuple(sep, dep)])
295 # existing flows follow the old path
297 for src in self.pg0.remote_hosts:
300 p1 = (Ether(dst=self.pg0.local_mac,
302 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
303 vip.l4p(sport=sport, dport=vip.port) /
306 rxs = self.send_and_expect(self.pg0,
311 # new flows go to the new backend
313 for src in self.pg0.remote_hosts:
314 p1 = (Ether(dst=self.pg0.local_mac,
316 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
317 vip.l4p(sport=9999, dport=vip.port) /
320 rxs = self.send_and_expect(self.pg0,
324 def cnat_translation(self, vips, isV6=False):
325 """ CNat Translation """
327 ip_class = IPv6 if isV6 else IP
328 ip_v = "ip6" if isV6 else "ip4"
329 sports = [1234, 1233]
332 # turn the scanner off whilst testing otherwise sessions
335 self.vapi.cli("test cnat scanner off")
337 sessions = self.vapi.cnat_session_dump()
340 for nbr, vip in enumerate(vips):
341 trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6))
343 self.logger.info(self.vapi.cli("sh cnat client"))
344 self.logger.info(self.vapi.cli("sh cnat translation"))
349 for nbr, vip in enumerate(vips):
350 self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
351 self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
353 self.logger.info(self.vapi.cli(
354 "sh ip6 fib %s" % self.pg0.remote_ip6))
356 self.logger.info(self.vapi.cli(
357 "sh ip fib %s" % self.pg0.remote_ip4))
358 self.logger.info(self.vapi.cli("sh cnat session verbose"))
361 # turn the scanner back on and wait until the sessions
364 self.vapi.cli("test cnat scanner on")
367 sessions = self.vapi.cnat_session_dump()
368 while (len(sessions) and n_tries < 100):
370 sessions = self.vapi.cnat_session_dump()
373 self.assertTrue(n_tries < 100)
376 # load some flows again and purge
379 for src in self.pg0.remote_hosts:
382 p1 = (Ether(dst=self.pg0.local_mac,
384 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
385 vip.l4p(sport=sport, dport=vip.port) /
387 self.send_and_expect(self.pg0,
394 self.assertTrue(self.vapi.cnat_session_dump())
395 self.vapi.cnat_session_purge()
396 self.assertFalse(self.vapi.cnat_session_dump())
398 def test_cnat6(self):
399 # """ CNat Translation ipv6 """
403 Ep("30::2", 5553, UDP),
406 self.pg0.generate_remote_hosts(len(vips))
407 self.pg0.configure_ipv6_neighbors()
408 self.pg1.generate_remote_hosts(len(vips))
409 self.pg1.configure_ipv6_neighbors()
411 self.cnat_translation(vips, isV6=True)
413 def test_cnat4(self):
414 # """ CNat Translation ipv4 """
417 Ep("30.0.0.1", 5555),
418 Ep("30.0.0.2", 5554),
419 Ep("30.0.0.2", 5553, UDP),
422 self.pg0.generate_remote_hosts(len(vips))
423 self.pg0.configure_ipv4_neighbors()
424 self.pg1.generate_remote_hosts(len(vips))
425 self.pg1.configure_ipv4_neighbors()
427 self.cnat_translation(vips)
430 class TestCNatSourceNAT(VppTestCase):
431 """ CNat Source NAT """
432 extra_vpp_punt_config = ["cnat", "{",
433 "session-max-age", "1",
434 "tcp-max-age", "1", "}"]
438 super(TestCNatSourceNAT, cls).setUpClass()
def tearDownClass(cls):
    """Class-level teardown; defers entirely to the parent class."""
    parent = super(TestCNatSourceNAT, cls)
    parent.tearDownClass()
445 super(TestCNatSourceNAT, self).setUp()
447 self.create_pg_interfaces(range(3))
449 for i in self.pg_interfaces:
457 for i in self.pg_interfaces:
461 super(TestCNatSourceNAT, self).tearDown()
463 def cnat_set_snat_address(self, srcNatAddr, interface, isV6=False):
464 t1 = VppCNATSourceNat(self, srcNatAddr)
466 cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast"
467 cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat"
468 self.vapi.feature_enable_disable(
470 arc_name=cnat_arc_name,
471 feature_name=cnat_feature_name,
472 sw_if_index=interface.sw_if_index)
476 def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
477 ip_v = "ip6" if isV6 else "ip4"
478 ip_class = IPv6 if isV6 else IP
479 sports = [1234, 1235, 1236]
480 dports = [6661, 6662, 6663]
482 self.pg0.generate_remote_hosts(1)
483 self.pg0.configure_ipv4_neighbors()
484 self.pg0.configure_ipv6_neighbors()
485 self.pg1.generate_remote_hosts(len(sports))
486 self.pg1.configure_ipv4_neighbors()
487 self.pg1.configure_ipv6_neighbors()
489 self.vapi.cli("test cnat scanner on")
490 t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6)
492 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
493 # from pods to outside network
496 dst=self.pg0.local_mac,
497 src=self.pg0.remote_hosts[0].mac) /
499 src=getattr(self.pg0.remote_hosts[0], ip_v),
500 dst=getattr(remote_host, ip_v)) /
501 l4p(sport=sports[nbr], dport=dports[nbr]) /
504 rxs = self.send_and_expect(
509 self.assert_packet_checksums_valid(rx)
512 getattr(remote_host, ip_v))
513 self.assertEqual(rx[l4p].dport, dports[nbr])
517 sport = rx[l4p].sport
519 # from outside to pods
522 dst=self.pg1.local_mac,
523 src=self.pg1.remote_hosts[nbr].mac) /
524 ip_class(src=getattr(remote_host, ip_v), dst=srcNatAddr) /
525 l4p(sport=dports[nbr], dport=sport) /
528 rxs = self.send_and_expect(
534 self.assert_packet_checksums_valid(rx)
537 getattr(self.pg0.remote_hosts[0], ip_v))
538 self.assertEqual(rx[l4p].dport, sports[nbr])
539 self.assertEqual(rx[l4p].sport, dports[nbr])
542 getattr(remote_host, ip_v))
544 # add remote host to exclude list
545 subnet_mask = 100 if isV6 else 16
546 subnet = getattr(remote_host, ip_v) + "/" + str(subnet_mask)
547 exclude_subnet = ip_network(subnet, strict=False)
549 t1.cnat_exclude_subnet(exclude_subnet)
550 self.vapi.cnat_session_purge()
552 rxs = self.send_and_expect(
557 self.assert_packet_checksums_valid(rx)
560 getattr(remote_host, ip_v))
561 self.assertEqual(rx[l4p].dport, dports[nbr])
564 getattr(self.pg0.remote_hosts[0], ip_v))
566 # remove remote host from exclude list
567 t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
568 self.vapi.cnat_session_purge()
570 rxs = self.send_and_expect(
576 self.assert_packet_checksums_valid(rx)
579 getattr(remote_host, ip_v))
580 self.assertEqual(rx[l4p].dport, dports[nbr])
def test_cnat6_sourcenat(self):
    # """ CNat Source Nat ipv6 """
    # Run the source-NAT scenario over IPv6 once per L4 protocol,
    # using the ip6 address of pg2's first remote host as the SNAT
    # address. The address is looked up afresh for every run.
    for l4_proto in (TCP, UDP):
        snat_addr = self.pg2.remote_hosts[0].ip6
        self.cnat_test_sourcenat(snat_addr, l4_proto, True)
def test_cnat4_sourcenat(self):
    # """ CNat Source Nat ipv4 """
    # Run the source-NAT scenario over IPv4 once per L4 protocol,
    # using the ip4 address of pg2's first remote host as the SNAT
    # address. The address is looked up afresh for every run.
    for l4_proto in (TCP, UDP):
        snat_addr = self.pg2.remote_hosts[0].ip4
        self.cnat_test_sourcenat(snat_addr, l4_proto)
595 if __name__ == '__main__':
596 unittest.main(testRunner=VppTestRunner)