5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, TCP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
16 from ipaddress import ip_address, ip_network, \
17 IPv4Address, IPv6Address, IPv4Network, IPv6Network
19 from vpp_object import VppObject
20 from vpp_papi import VppEnum
# Search the cnat_translation_dump() reply obtained through test.vapi for an
# entry whose translation.id equals `id`.
# NOTE(review): `id` shadows the builtin of the same name; it is the public
# parameter name, so it is left unchanged.
25 def find_cnat_translation(test, id):
26 ts = test.vapi.cnat_translation_dump()
# `t` is presumably one element of `ts` — TODO confirm against the loop.
28 if id == t.translation.id:
36 def __init__(self, ip, port, l4p=TCP):
42 return {'addr': self.ip,
50 return ("%s:%d" % (self.ip, self.port))
# A (src, dst) pair of endpoint objects; both members must provide encode().
53 class EpTuple(object):
56 def __init__(self, src, dst):
# Dict holding the encoded source under 'src_ep' and the encoded destination
# under 'dst_ep'.
61 return {'src_ep': self.src.encode(),
62 'dst_ep': self.dst.encode()}
# Text form "<src>-><dst>".
65 return ("%s->%s" % (self.src, self.dst))
68 class VppCNatTranslation(VppObject):
# Build the list of encoded paths from self.paths.
# NOTE(review): storing test/iproto/vip/paths on self is not visible in these
# lines — confirm self.paths is assigned before the loop below.
70 def __init__(self, test, iproto, vip, paths):
# Each path is encoded once via its encode() method; add_vpp_config() sends
# this list as the 'paths' field.
75 self.encoded_paths = []
76 for path in self.paths:
77 self.encoded_paths.append(path.encode())
81 ip_proto = VppEnum.vl_api_ip_proto_t
83 UDP: ip_proto.IP_API_PROTO_UDP,
84 TCP: ip_proto.IP_API_PROTO_TCP,
88 r = self._test.vapi.cnat_translation_del(id=self.id)
# Send this translation (vip, L4 protocol, path count and encoded paths) to
# VPP through cnat_translation_update(), then register the object with the
# test's registry.
90 def add_vpp_config(self):
# NOTE(review): the reply `r` is not used in the lines visible here, yet
# other methods read self.id — confirm where the id is stored.
91 r = self._test.vapi.cnat_translation_update(
92 {'vip': self.vip.encode(),
93 'ip_proto': self.vl4_proto,
94 'n_paths': len(self.paths),
95 'paths': self.encoded_paths})
96 self._test.registry.register(self, self._test.logger)
# Re-encode self.paths and push the translation to VPP again through
# cnat_translation_update(), then register the object with the test registry.
# NOTE(review): the `paths` argument is not referenced in the lines visible
# here — confirm it is assigned to self.paths before the re-encoding loop.
99 def modify_vpp_config(self, paths):
# Rebuild the encoded list from scratch so stale entries are not re-sent.
101 self.encoded_paths = []
102 for path in self.paths:
103 self.encoded_paths.append(path.encode())
# Same request shape as add_vpp_config().
105 r = self._test.vapi.cnat_translation_update(
106 {'vip': self.vip.encode(),
107 'ip_proto': self.vl4_proto,
108 'n_paths': len(self.paths),
109 'paths': self.encoded_paths})
110 self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
    """Delete this translation from VPP, identified by ``self.id``."""
    vapi = self._test.vapi
    vapi.cnat_translation_del(self.id)
def query_vpp_config(self):
    """Return the result of looking up ``self.id`` with find_cnat_translation()."""
    lookup_result = find_cnat_translation(self._test, self.id)
    return lookup_result
119 return ("cnat-translation-%s" % (self.vip))
122 c = self._test.statistics.get_counter("/net/cnat-translation")
126 class VppCNATSourceNat(VppObject):
def __init__(self, test, address, exclude_subnets=None):
    """Describe a CNat source-NAT configuration.

    :param test: test case object; the other methods reach VPP through
        its ``vapi`` attribute.
    :param address: SNAT address, in a form accepted by
        ``ipaddress.ip_address``.
    :param exclude_subnets: optional list of prefixes that
        ``add_vpp_config`` passes to ``cnat_exclude_subnet``; defaults
        to no exclusions.
    """
    # Every other method reads self._test, so the test case must be kept.
    self._test = test
    self.address = address
    # A literal ``[]`` default is created once and shared by all
    # instances; use None as the sentinel and build a fresh list instead.
    self.exclude_subnets = [] if exclude_subnets is None else exclude_subnets
# Configure self.address as the CNat source-NAT address and add every entry
# of self.exclude_subnets as an excluded prefix.
133 def add_vpp_config(self):
# Parsed form of the address; presumably used to choose between the
# snat_ip4 and snat_ip6 calls below — TODO confirm the branch condition.
134 a = ip_address(self.address)
136 self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
138 self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
# Each configured subnet is added (isAdd=True) to the exclusion list.
139 for subnet in self.exclude_subnets:
140 self.cnat_exclude_subnet(subnet, True)
def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
    """Add or remove ``exclude_subnet`` via cnat_add_del_snat_prefix().

    :param exclude_subnet: prefix handed to the API unchanged.
    :param isAdd: truthy to add the prefix, falsy to remove it.
    """
    # The API is given an integer flag: 1 to add, 0 to delete.
    is_add_flag = int(bool(isAdd))
    vapi = self._test.vapi
    vapi.cnat_add_del_snat_prefix(prefix=exclude_subnet, is_add=is_add_flag)
147 def query_vpp_config(self):
150 def remove_vpp_config(self):
154 class TestCNatTranslation(VppTestCase):
155 """ CNat Translation """
156 extra_vpp_punt_config = ["cnat", "{",
157 "session-db-buckets", "64",
158 "session-cleanup-timeout", "0.1",
159 "session-max-age", "1",
161 "scanner", "off", "}"]
165 super(TestCNatTranslation, cls).setUpClass()
def tearDownClass(cls):
    """Hand class-level teardown over to the parent test case."""
    parent = super(TestCNatTranslation, cls)
    parent.tearDownClass()
172 super(TestCNatTranslation, self).setUp()
174 self.create_pg_interfaces(range(3))
176 for i in self.pg_interfaces:
184 for i in self.pg_interfaces:
188 super(TestCNatTranslation, self).tearDown()
# Build a VppCNatTranslation for `vip` whose destination endpoint is pg1
# remote host number `nbr`.
190 def cnat_create_translation(self, vip, nbr):
# Attribute name used to read the host address in the VIP's address family.
191 ip_v = "ip6" if vip.isV6 else "ip4"
# Destination endpoint: that host's address with port 4000 + nbr.
192 dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
# Source endpoint: the all-zero address of the matching family, port 0.
193 sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
# The same (sep, dep) tuple is supplied twice as the path list.
194 t1 = VppCNatTranslation(
196 [EpTuple(sep, dep), EpTuple(sep, dep)])
# Exercise translation `t1` with traffic from every pg0 remote host:
# packets towards the VIP, return packets from pg1 remote host `nbr`, and
# packets that use a port (6666) outside the translation.
# NOTE(review): `vip` and `sport` are not defined in the lines visible here —
# confirm they come from t1 and from iterating `sports`.
200 def cnat_test_translation(self, t1, nbr, sports, isV6=False):
# Address attribute name and scapy IP layer for the chosen address family.
201 ip_v = "ip6" if isV6 else "ip4"
202 ip_class = IPv6 if isV6 else IP
208 for src in self.pg0.remote_hosts:
# Client -> VIP packet, sent in on pg0.
211 p1 = (Ether(dst=self.pg0.local_mac,
213 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
214 vip.l4p(sport=sport, dport=vip.port) /
217 self.vapi.cli("trace add pg-input 1")
218 rxs = self.send_and_expect(self.pg0,
# Received packets must target the backend port 4000 + nbr with the
# client's source port unchanged.
223 self.assert_packet_checksums_valid(rx)
226 getattr(self.pg1.remote_hosts[nbr], ip_v))
227 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
231 self.assertEqual(rx[vip.l4p].sport, sport)
# Return packet from the backend (port 4000 + nbr) to the client, sent in
# on pg1.
234 p1 = (Ether(dst=self.pg1.local_mac,
235 src=self.pg1.remote_mac) /
236 ip_class(src=getattr(
237 self.pg1.remote_hosts[nbr],
239 dst=getattr(src, ip_v)) /
240 vip.l4p(sport=4000 + nbr, dport=sport) /
243 rxs = self.send_and_expect(self.pg1,
# The return traffic must appear to come from the VIP address and port.
248 self.assert_packet_checksums_valid(rx)
252 self.assertEqual(rx[vip.l4p].dport, sport)
253 self.assertEqual(rx[ip_class].src, vip.ip)
254 self.assertEqual(rx[vip.l4p].sport, vip.port)
257 # packets to the VIP that do not match a
258 # translation are dropped
260 p1 = (Ether(dst=self.pg0.local_mac,
262 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
263 vip.l4p(sport=sport, dport=6666) /
266 self.send_and_assert_no_replies(self.pg0,
271 # packets from the VIP that do not match a
272 # session are forwarded
274 p1 = (Ether(dst=self.pg1.local_mac,
275 src=self.pg1.remote_mac) /
276 ip_class(src=getattr(
277 self.pg1.remote_hosts[nbr],
279 dst=getattr(src, ip_v)) /
280 vip.l4p(sport=6666, dport=sport) /
283 rxs = self.send_and_expect(self.pg1,
# Compare the translation's 'packets' counter with the expected total.
287 self.assertEqual(t1.get_stats()['packets'],
290 len(self.pg0.remote_hosts))
# Repoint translation `t1` at a backend on pg2 (port 5000), then send
# traffic from every pg0 remote host for both previously used source ports
# and a new source port (9999).
# NOTE(review): `vip` and `sport` are not defined in the lines visible here —
# confirm they come from t1 and from iterating `sports`.
292 def cnat_test_translation_update(self, t1, sports, isV6=False):
# Address attribute name and scapy IP layer for the chosen address family.
293 ip_v = "ip6" if isV6 else "ip4"
294 ip_class = IPv6 if isV6 else IP
298 # modify the translation to use a different backend
# New destination endpoint: pg2's remote_ip4/remote_ip6 attribute, port 5000.
300 dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
301 sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
# The translation is updated with a single path.
302 t1.modify_vpp_config([EpTuple(sep, dep)])
305 # existing flows follow the old path
307 for src in self.pg0.remote_hosts:
310 p1 = (Ether(dst=self.pg0.local_mac,
312 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
313 vip.l4p(sport=sport, dport=vip.port) /
316 rxs = self.send_and_expect(self.pg0,
321 # new flows go to the new backend
323 for src in self.pg0.remote_hosts:
324 p1 = (Ether(dst=self.pg0.local_mac,
326 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
327 vip.l4p(sport=9999, dport=vip.port) /
330 rxs = self.send_and_expect(self.pg0,
# Shared scenario for the IPv4 and IPv6 translation tests: create one
# translation per VIP, run the translation and translation-update checks on
# each, wait for the session dump to empty with the scanner on, then load
# flows again and verify cnat_session_purge() empties the session dump.
334 def cnat_translation(self, vips, isV6=False):
335 """ CNat Translation """
337 ip_class = IPv6 if isV6 else IP
338 ip_v = "ip6" if isV6 else "ip4"
# Client source ports used for the test flows.
339 sports = [1234, 1233]
342 # turn the scanner off whilst testing otherwise sessions
345 self.vapi.cli("test cnat scanner off")
347 sessions = self.vapi.cnat_session_dump()
# One translation per VIP; index nbr also selects the pg1 backend host.
350 for nbr, vip in enumerate(vips):
351 trs.append(self.cnat_create_translation(vip, nbr))
353 self.logger.info(self.vapi.cli("sh cnat client"))
354 self.logger.info(self.vapi.cli("sh cnat translation"))
359 for nbr, vip in enumerate(vips):
360 self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
361 self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
# Log FIB and session state to help debugging.
363 self.logger.info(self.vapi.cli(
364 "sh ip6 fib %s" % self.pg0.remote_ip6))
366 self.logger.info(self.vapi.cli(
367 "sh ip fib %s" % self.pg0.remote_ip4))
368 self.logger.info(self.vapi.cli("sh cnat session verbose"))
371 # turn the scanner back on and wait until the sessions
374 self.vapi.cli("test cnat scanner on")
# Poll the session dump until it is empty, giving up after 100 tries.
# NOTE(review): the n_tries update and any delay between polls are not
# visible in these lines — confirm.
377 sessions = self.vapi.cnat_session_dump()
378 while (len(sessions) and n_tries < 100):
380 sessions = self.vapi.cnat_session_dump()
382 print(self.vapi.cli("show cnat session verbose"))
# Fails if the loop stopped because the try limit was reached.
384 self.assertTrue(n_tries < 100)
385 self.vapi.cli("test cnat scanner off")
388 # load some flows again and purge
391 for src in self.pg0.remote_hosts:
394 p1 = (Ether(dst=self.pg0.local_mac,
396 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
397 vip.l4p(sport=sport, dport=vip.port) /
399 self.send_and_expect(self.pg0,
# Sessions must exist before the purge and be gone after it.
406 self.assertTrue(self.vapi.cnat_session_dump())
407 self.vapi.cnat_session_purge()
408 self.assertFalse(self.vapi.cnat_session_dump())
412 Ep("30.0.0.1", 5555),
413 Ep("30.0.0.2", 5554),
414 Ep("30.0.0.2", 5553, UDP),
416 Ep("30::2", 5553, UDP),
420 self.pg0.generate_remote_hosts(len(vips))
421 self.pg0.configure_ipv6_neighbors()
422 self.pg0.configure_ipv4_neighbors()
424 self.pg1.generate_remote_hosts(len(vips))
425 self.pg1.configure_ipv6_neighbors()
426 self.pg1.configure_ipv4_neighbors()
428 self.vapi.cli("test cnat scanner off")
430 for nbr, vip in enumerate(vips):
431 trs.append(self.cnat_create_translation(vip, nbr))
433 self.logger.info(self.vapi.cli("sh cnat client"))
434 self.logger.info(self.vapi.cli("sh cnat translation"))
436 for nbr, vip in enumerate(vips):
438 client_addr = self.pg0.remote_hosts[0].ip6
439 remote_addr = self.pg1.remote_hosts[nbr].ip6
440 remote2_addr = self.pg2.remote_hosts[0].ip6
442 client_addr = self.pg0.remote_hosts[0].ip4
443 remote_addr = self.pg1.remote_hosts[nbr].ip4
444 remote2_addr = self.pg2.remote_hosts[0].ip4
445 IP46 = IPv6 if vip.isV6 else IP
447 p1 = (Ether(dst=self.pg0.local_mac,
448 src=self.pg0.remote_hosts[0].mac) /
449 IP46(src=client_addr, dst=vip.ip) /
450 vip.l4p(sport=sport, dport=vip.port) /
453 rxs = self.send_and_expect(self.pg0,
458 self.assert_packet_checksums_valid(rx)
459 self.assertEqual(rx[IP46].dst, remote_addr)
460 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
461 self.assertEqual(rx[IP46].src, client_addr)
462 self.assertEqual(rx[vip.l4p].sport, sport)
464 InnerIP = rxs[0][IP46]
466 ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
467 ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
468 # from vip to client, ICMP error
469 p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
470 IP46(src=remote_addr, dst=client_addr) /
473 rxs = self.send_and_expect(self.pg1,
477 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
478 IP46error = IPerror6 if vip.isV6 else IPerror
480 self.assert_packet_checksums_valid(rx)
481 self.assertEqual(rx[IP46].src, vip.ip)
482 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
483 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
484 self.assertEqual(rx[ICMP46][IP46error]
485 [TCPUDPError].sport, sport)
486 self.assertEqual(rx[ICMP46][IP46error]
487 [TCPUDPError].dport, vip.port)
489 # from other remote to client, ICMP error
490 # outside shouldn't be NAT-ed
491 p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
492 IP46(src=remote2_addr, dst=client_addr) /
495 rxs = self.send_and_expect(self.pg1,
499 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
500 IP46error = IPerror6 if vip.isV6 else IPerror
502 self.assert_packet_checksums_valid(rx)
503 self.assertEqual(rx[IP46].src, remote2_addr)
504 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
505 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
506 self.assertEqual(rx[ICMP46][IP46error]
507 [TCPUDPError].sport, sport)
508 self.assertEqual(rx[ICMP46][IP46error]
509 [TCPUDPError].dport, vip.port)
511 self.vapi.cnat_session_purge()
# IPv6 variant of the translation test: sets up IPv6 neighbors on pg0 and
# pg1 and runs cnat_translation() with isV6=True.
513 def test_cnat6(self):
514 # """ CNat Translation ipv6 """
518 Ep("30::2", 5553, UDP),
# One remote host per VIP on both the client (pg0) and backend (pg1) side.
521 self.pg0.generate_remote_hosts(len(vips))
522 self.pg0.configure_ipv6_neighbors()
523 self.pg1.generate_remote_hosts(len(vips))
524 self.pg1.configure_ipv6_neighbors()
526 self.cnat_translation(vips, isV6=True)
# IPv4 variant of the translation test: sets up IPv4 neighbors on pg0 and
# pg1 and runs cnat_translation() with its default isV6=False.
528 def test_cnat4(self):
529 # """ CNat Translation ipv4 """
532 Ep("30.0.0.1", 5555),
533 Ep("30.0.0.2", 5554),
534 Ep("30.0.0.2", 5553, UDP),
# One remote host per VIP on both the client (pg0) and backend (pg1) side.
537 self.pg0.generate_remote_hosts(len(vips))
538 self.pg0.configure_ipv4_neighbors()
539 self.pg1.generate_remote_hosts(len(vips))
540 self.pg1.configure_ipv4_neighbors()
542 self.cnat_translation(vips)
545 class TestCNatSourceNAT(VppTestCase):
546 """ CNat Source NAT """
547 extra_vpp_punt_config = ["cnat", "{",
548 "session-max-age", "1",
549 "tcp-max-age", "1", "}"]
553 super(TestCNatSourceNAT, cls).setUpClass()
def tearDownClass(cls):
    """Hand class-level teardown over to the parent test case."""
    parent = super(TestCNatSourceNAT, cls)
    parent.tearDownClass()
560 super(TestCNatSourceNAT, self).setUp()
562 self.create_pg_interfaces(range(3))
564 for i in self.pg_interfaces:
572 for i in self.pg_interfaces:
576 super(TestCNatSourceNAT, self).tearDown()
# Create a VppCNATSourceNat object for `srcNatAddr` and issue a
# feature_enable_disable() call for the CNat SNAT feature on `interface`.
# NOTE(review): applying the object's config, the enable flag and the return
# value are not visible in these lines — confirm.
578 def cnat_set_snat_address(self, srcNatAddr, interface, isV6=False):
579 t1 = VppCNATSourceNat(self, srcNatAddr)
# Feature arc and feature node names for the chosen address family.
581 cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast"
582 cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat"
583 self.vapi.feature_enable_disable(
585 arc_name=cnat_arc_name,
586 feature_name=cnat_feature_name,
587 sw_if_index=interface.sw_if_index)
# Source-NAT scenario for one L4 protocol and address family. For each pg1
# remote host it checks that traffic from the pg0 client leaves with
# `srcNatAddr` as source, that return traffic addressed to `srcNatAddr`
# reaches the client, and that adding/removing an exclude prefix covering
# the remote host turns the source rewrite off and on again.
591 def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
592 ip_v = "ip6" if isV6 else "ip4"
593 IP46 = IPv6 if isV6 else IP
# Per-remote-host source and destination ports, indexed by nbr.
594 sports = [1234, 1235, 1236]
595 dports = [6661, 6662, 6663]
# One client on pg0 and one remote host per port pair on pg1; neighbors are
# configured for both address families.
597 self.pg0.generate_remote_hosts(1)
598 self.pg0.configure_ipv4_neighbors()
599 self.pg0.configure_ipv6_neighbors()
600 self.pg1.generate_remote_hosts(len(sports))
601 self.pg1.configure_ipv4_neighbors()
602 self.pg1.configure_ipv6_neighbors()
604 self.vapi.cli("test cnat scanner on")
# The SNAT address is set up with pg0 as the interface argument.
605 t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6)
607 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
609 client_addr = self.pg0.remote_hosts[0].ip6
610 remote_addr = self.pg1.remote_hosts[nbr].ip6
612 client_addr = self.pg0.remote_hosts[0].ip4
613 remote_addr = self.pg1.remote_hosts[nbr].ip4
614 # from pods to outside network
616 Ether(dst=self.pg0.local_mac,
617 src=self.pg0.remote_hosts[0].mac) /
618 IP46(src=client_addr, dst=remote_addr) /
619 l4p(sport=sports[nbr], dport=dports[nbr]) /
622 rxs = self.send_and_expect(
# Destination is untouched; the source address must be the SNAT address.
627 self.assert_packet_checksums_valid(rx)
628 self.assertEqual(rx[IP46].dst, remote_addr)
629 self.assertEqual(rx[l4p].dport, dports[nbr])
630 self.assertEqual(rx[IP46].src, srcNatAddr)
# Remember the source port seen on the wire for the return packet.
631 sport = rx[l4p].sport
633 # from outside to pods
635 Ether(dst=self.pg1.local_mac,
636 src=self.pg1.remote_hosts[nbr].mac) /
637 IP46(src=remote_addr, dst=srcNatAddr) /
638 l4p(sport=dports[nbr], dport=sport) /
641 rxs = self.send_and_expect(
# The return packet must be delivered to the client's address and port.
647 self.assert_packet_checksums_valid(rx)
648 self.assertEqual(rx[IP46].dst, client_addr)
649 self.assertEqual(rx[l4p].dport, sports[nbr])
650 self.assertEqual(rx[l4p].sport, dports[nbr])
651 self.assertEqual(rx[IP46].src, remote_addr)
653 # add remote host to exclude list
654 subnet_mask = 100 if isV6 else 16
655 subnet = "%s/%d" % (remote_addr, subnet_mask)
# strict=False lets ip_network() accept a host address by masking off the
# host bits.
656 exclude_subnet = ip_network(subnet, strict=False)
658 t1.cnat_exclude_subnet(exclude_subnet)
# Sessions are purged before re-sending.
659 self.vapi.cnat_session_purge()
661 rxs = self.send_and_expect(
# With the prefix excluded the client's own address must be kept as source.
666 self.assert_packet_checksums_valid(rx)
667 self.assertEqual(rx[IP46].dst, remote_addr)
668 self.assertEqual(rx[l4p].dport, dports[nbr])
669 self.assertEqual(rx[IP46].src, client_addr)
671 # remove remote host from exclude list
672 t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
673 self.vapi.cnat_session_purge()
675 rxs = self.send_and_expect(
# After removing the exclusion the source is rewritten to the SNAT address
# again.
681 self.assert_packet_checksums_valid(rx)
682 self.assertEqual(rx[IP46].dst, remote_addr)
683 self.assertEqual(rx[l4p].dport, dports[nbr])
684 self.assertEqual(rx[IP46].src, srcNatAddr)
def test_cnat6_sourcenat(self):
    # """ CNat Source Nat ipv6 """
    # Run the IPv6 source-NAT scenario once per L4 protocol, TCP first.
    # The SNAT address (pg2's first remote host) is re-read on every call.
    for l4_proto in (TCP, UDP):
        self.cnat_test_sourcenat(
            self.pg2.remote_hosts[0].ip6, l4_proto, True)
def test_cnat4_sourcenat(self):
    # """ CNat Source Nat ipv4 """
    # Run the IPv4 source-NAT scenario once per L4 protocol, TCP first.
    # The SNAT address (pg2's first remote host) is re-read on every call.
    for l4_proto in (TCP, UDP):
        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, l4_proto)
if __name__ == '__main__':
    # Executed directly: run this module's tests with the VPP test runner.
    unittest.main(testRunner=VppTestRunner)