5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, TCP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
13 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
17 from ipaddress import ip_address, ip_network, \
18 IPv4Address, IPv6Address, IPv4Network, IPv6Network
20 from vpp_object import VppObject
21 from vpp_papi import VppEnum
# Search the result of test.vapi.cnat_translation_dump() for the entry whose
# translation.id equals `id`.
# NOTE(review): this listing is fragmentary - the loop header that binds `t`
# and the return statements are not visible here; confirm against the full
# file before relying on the return value.
26 def find_cnat_translation(test, id):
27 ts = test.vapi.cnat_translation_dump()
29 if id == t.translation.id:
37 def __init__(self, ip, port, l4p=TCP):
43 return {'addr': self.ip,
51 return ("%s:%d" % (self.ip, self.port))
# A (src, dst) pair of endpoints.
# NOTE(review): the attribute assignments and the `def` lines of the two
# methods below are missing from this listing; presumably __init__ stores
# its arguments as self.src / self.dst - confirm against the full file.
54 class EpTuple(object):
57 def __init__(self, src, dst):
# Dict form of the pair: each endpoint contributes its own encode() result
# under the 'src_ep' and 'dst_ep' keys.
62 return {'src_ep': self.src.encode(),
63 'dst_ep': self.dst.encode()}
# Text form of the pair, rendered as "<src>-><dst>".
66 return ("%s->%s" % (self.src, self.dst))
69 class VppCNatTranslation(VppObject):
71 def __init__(self, test, iproto, vip, paths):
76 self.encoded_paths = []
77 for path in self.paths:
78 self.encoded_paths.append(path.encode())
82 ip_proto = VppEnum.vl_api_ip_proto_t
84 UDP: ip_proto.IP_API_PROTO_UDP,
85 TCP: ip_proto.IP_API_PROTO_TCP,
89 r = self._test.vapi.cnat_translation_del(id=self.id)
# Send this translation to VPP with cnat_translation_update (encoded VIP,
# IP protocol, number of paths and the pre-encoded path list), then register
# this object with the test's registry.
# NOTE(review): `r` is not used in the lines visible here; the listing skips
# lines right after the register() call, so the reply may be consumed there
# - confirm against the full file.
91 def add_vpp_config(self):
92 r = self._test.vapi.cnat_translation_update(
93 {'vip': self.vip.encode(),
94 'ip_proto': self.vl4_proto,
95 'n_paths': len(self.paths),
96 'paths': self.encoded_paths})
97 self._test.registry.register(self, self._test.logger)
# Rebuild self.encoded_paths from self.paths and resend the translation to
# VPP with cnat_translation_update, then register this object with the
# test's registry.
# NOTE(review): the `paths` argument is not referenced in the lines visible
# here; presumably it is stored in self.paths on a line missing from this
# listing - confirm against the full file.
100 def modify_vpp_config(self, paths):
# Start from an empty list so paths from the previous configuration are
# not carried over.
102 self.encoded_paths = []
103 for path in self.paths:
104 self.encoded_paths.append(path.encode())
106 r = self._test.vapi.cnat_translation_update(
107 {'vip': self.vip.encode(),
108 'ip_proto': self.vl4_proto,
109 'n_paths': len(self.paths),
110 'paths': self.encoded_paths})
111 self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
    """Delete this translation from VPP.

    Calls ``cnat_translation_del`` on the test's VPP API handle, passing
    this object's ``id`` as the single positional argument.  ``self.id``
    must therefore already be set when this is called.  Nothing is
    returned.
    """
    self._test.vapi.cnat_translation_del(self.id)
def query_vpp_config(self):
    """Look this translation up in VPP by its id.

    Returns:
        Whatever ``find_cnat_translation`` returns when given this
        object's test handle and ``id``.
    """
    return find_cnat_translation(self._test, self.id)
120 return ("cnat-translation-%s" % (self.vip))
123 c = self._test.statistics.get_counter("/net/cnat-translation")
127 class TestCNatTranslation(VppTestCase):
128 """ CNat Translation """
129 extra_vpp_punt_config = ["cnat", "{",
130 "session-db-buckets", "64",
131 "session-cleanup-timeout", "0.1",
132 "session-max-age", "1",
134 "scanner", "off", "}"]
138 super(TestCNatTranslation, cls).setUpClass()
141 def tearDownClass(cls):
142 super(TestCNatTranslation, cls).tearDownClass()
145 super(TestCNatTranslation, self).setUp()
147 self.create_pg_interfaces(range(3))
149 for i in self.pg_interfaces:
157 for i in self.pg_interfaces:
161 super(TestCNatTranslation, self).tearDown()
# Build a VppCNatTranslation for `vip` whose backend is remote host number
# `nbr` on pg1.
# NOTE(review): the first arguments of the VppCNatTranslation(...) call and
# everything after it (e.g. any return statement) are missing from this
# listing; callers append this method's result to a list, so presumably the
# translation is returned - confirm against the full file.
163 def cnat_create_translation(self, vip, nbr):
# Attribute name used to read the address family matching the VIP.
164 ip_v = "ip6" if vip.isV6 else "ip4"
# Destination endpoint: the chosen pg1 remote host, on port 4000 + nbr.
165 dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
# Source endpoint: the all-zero address of the same family, port 0.
166 sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
167 t1 = VppCNatTranslation(
# The same (sep, dep) pair is supplied twice in the path list.
169 [EpTuple(sep, dep), EpTuple(sep, dep)])
173 def cnat_test_translation(self, t1, nbr, sports, isV6=False):
174 ip_v = "ip6" if isV6 else "ip4"
175 ip_class = IPv6 if isV6 else IP
181 for src in self.pg0.remote_hosts:
184 p1 = (Ether(dst=self.pg0.local_mac,
186 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
187 vip.l4p(sport=sport, dport=vip.port) /
190 self.vapi.cli("trace add pg-input 1")
191 rxs = self.send_and_expect(self.pg0,
196 self.assert_packet_checksums_valid(rx)
199 getattr(self.pg1.remote_hosts[nbr], ip_v))
200 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
204 self.assertEqual(rx[vip.l4p].sport, sport)
207 p1 = (Ether(dst=self.pg1.local_mac,
208 src=self.pg1.remote_mac) /
209 ip_class(src=getattr(
210 self.pg1.remote_hosts[nbr],
212 dst=getattr(src, ip_v)) /
213 vip.l4p(sport=4000 + nbr, dport=sport) /
216 rxs = self.send_and_expect(self.pg1,
221 self.assert_packet_checksums_valid(rx)
225 self.assertEqual(rx[vip.l4p].dport, sport)
226 self.assertEqual(rx[ip_class].src, vip.ip)
227 self.assertEqual(rx[vip.l4p].sport, vip.port)
230 # packets to the VIP that do not match a
231 # translation are dropped
233 p1 = (Ether(dst=self.pg0.local_mac,
235 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
236 vip.l4p(sport=sport, dport=6666) /
239 self.send_and_assert_no_replies(self.pg0,
244 # packets from the VIP that do not match a
245 # session are forwarded
247 p1 = (Ether(dst=self.pg1.local_mac,
248 src=self.pg1.remote_mac) /
249 ip_class(src=getattr(
250 self.pg1.remote_hosts[nbr],
252 dst=getattr(src, ip_v)) /
253 vip.l4p(sport=6666, dport=sport) /
256 rxs = self.send_and_expect(self.pg1,
260 self.assertEqual(t1.get_stats()['packets'],
263 len(self.pg0.remote_hosts))
265 def cnat_test_translation_update(self, t1, sports, isV6=False):
266 ip_v = "ip6" if isV6 else "ip4"
267 ip_class = IPv6 if isV6 else IP
271 # modify the translation to use a different backend
273 dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
274 sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
275 t1.modify_vpp_config([EpTuple(sep, dep)])
278 # existing flows follow the old path
280 for src in self.pg0.remote_hosts:
283 p1 = (Ether(dst=self.pg0.local_mac,
285 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
286 vip.l4p(sport=sport, dport=vip.port) /
289 rxs = self.send_and_expect(self.pg0,
294 # new flows go to the new backend
296 for src in self.pg0.remote_hosts:
297 p1 = (Ether(dst=self.pg0.local_mac,
299 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
300 vip.l4p(sport=9999, dport=vip.port) /
303 rxs = self.send_and_expect(self.pg0,
307 def cnat_translation(self, vips, isV6=False):
308 """ CNat Translation """
310 ip_class = IPv6 if isV6 else IP
311 ip_v = "ip6" if isV6 else "ip4"
312 sports = [1234, 1233]
315 # turn the scanner off whilst testing otherwise sessions
318 self.vapi.cli("test cnat scanner off")
320 sessions = self.vapi.cnat_session_dump()
323 for nbr, vip in enumerate(vips):
324 trs.append(self.cnat_create_translation(vip, nbr))
326 self.logger.info(self.vapi.cli("sh cnat client"))
327 self.logger.info(self.vapi.cli("sh cnat translation"))
332 for nbr, vip in enumerate(vips):
333 self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
334 self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
336 self.logger.info(self.vapi.cli(
337 "sh ip6 fib %s" % self.pg0.remote_ip6))
339 self.logger.info(self.vapi.cli(
340 "sh ip fib %s" % self.pg0.remote_ip4))
341 self.logger.info(self.vapi.cli("sh cnat session verbose"))
344 # turn the scanner back on and wait until the sessions
347 self.vapi.cli("test cnat scanner on")
350 sessions = self.vapi.cnat_session_dump()
351 while (len(sessions) and n_tries < 100):
353 sessions = self.vapi.cnat_session_dump()
355 print(self.vapi.cli("show cnat session verbose"))
357 self.assertTrue(n_tries < 100)
358 self.vapi.cli("test cnat scanner off")
361 # load some flows again and purge
364 for src in self.pg0.remote_hosts:
367 p1 = (Ether(dst=self.pg0.local_mac,
369 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
370 vip.l4p(sport=sport, dport=vip.port) /
372 self.send_and_expect(self.pg0,
379 self.assertTrue(self.vapi.cnat_session_dump())
380 self.vapi.cnat_session_purge()
381 self.assertFalse(self.vapi.cnat_session_dump())
385 Ep("30.0.0.1", 5555),
386 Ep("30.0.0.2", 5554),
387 Ep("30.0.0.2", 5553, UDP),
389 Ep("30::2", 5553, UDP),
393 self.pg0.generate_remote_hosts(len(vips))
394 self.pg0.configure_ipv6_neighbors()
395 self.pg0.configure_ipv4_neighbors()
397 self.pg1.generate_remote_hosts(len(vips))
398 self.pg1.configure_ipv6_neighbors()
399 self.pg1.configure_ipv4_neighbors()
401 self.vapi.cli("test cnat scanner off")
403 for nbr, vip in enumerate(vips):
404 trs.append(self.cnat_create_translation(vip, nbr))
406 self.logger.info(self.vapi.cli("sh cnat client"))
407 self.logger.info(self.vapi.cli("sh cnat translation"))
409 for nbr, vip in enumerate(vips):
411 client_addr = self.pg0.remote_hosts[0].ip6
412 remote_addr = self.pg1.remote_hosts[nbr].ip6
413 remote2_addr = self.pg2.remote_hosts[0].ip6
415 client_addr = self.pg0.remote_hosts[0].ip4
416 remote_addr = self.pg1.remote_hosts[nbr].ip4
417 remote2_addr = self.pg2.remote_hosts[0].ip4
418 IP46 = IPv6 if vip.isV6 else IP
420 p1 = (Ether(dst=self.pg0.local_mac,
421 src=self.pg0.remote_hosts[0].mac) /
422 IP46(src=client_addr, dst=vip.ip) /
423 vip.l4p(sport=sport, dport=vip.port) /
426 rxs = self.send_and_expect(self.pg0,
431 self.assert_packet_checksums_valid(rx)
432 self.assertEqual(rx[IP46].dst, remote_addr)
433 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
434 self.assertEqual(rx[IP46].src, client_addr)
435 self.assertEqual(rx[vip.l4p].sport, sport)
437 InnerIP = rxs[0][IP46]
439 ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
440 ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
441 # from vip to client, ICMP error
442 p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
443 IP46(src=remote_addr, dst=client_addr) /
446 rxs = self.send_and_expect(self.pg1,
450 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
451 IP46error = IPerror6 if vip.isV6 else IPerror
453 self.assert_packet_checksums_valid(rx)
454 self.assertEqual(rx[IP46].src, vip.ip)
455 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
456 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
457 self.assertEqual(rx[ICMP46][IP46error]
458 [TCPUDPError].sport, sport)
459 self.assertEqual(rx[ICMP46][IP46error]
460 [TCPUDPError].dport, vip.port)
462 # from other remote to client, ICMP error
463 # outside shouldn't be NAT-ed
464 p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
465 IP46(src=remote2_addr, dst=client_addr) /
468 rxs = self.send_and_expect(self.pg1,
472 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
473 IP46error = IPerror6 if vip.isV6 else IPerror
475 self.assert_packet_checksums_valid(rx)
476 self.assertEqual(rx[IP46].src, remote2_addr)
477 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
478 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
479 self.assertEqual(rx[ICMP46][IP46error]
480 [TCPUDPError].sport, sport)
481 self.assertEqual(rx[ICMP46][IP46error]
482 [TCPUDPError].dport, vip.port)
484 self.vapi.cnat_session_purge()
486 def test_cnat6(self):
487 # """ CNat Translation ipv6 """
491 Ep("30::2", 5553, UDP),
494 self.pg0.generate_remote_hosts(len(vips))
495 self.pg0.configure_ipv6_neighbors()
496 self.pg1.generate_remote_hosts(len(vips))
497 self.pg1.configure_ipv6_neighbors()
499 self.cnat_translation(vips, isV6=True)
501 def test_cnat4(self):
502 # """ CNat Translation ipv4 """
505 Ep("30.0.0.1", 5555),
506 Ep("30.0.0.2", 5554),
507 Ep("30.0.0.2", 5553, UDP),
510 self.pg0.generate_remote_hosts(len(vips))
511 self.pg0.configure_ipv4_neighbors()
512 self.pg1.generate_remote_hosts(len(vips))
513 self.pg1.configure_ipv4_neighbors()
515 self.cnat_translation(vips)
518 class TestCNatSourceNAT(VppTestCase):
519 """ CNat Source NAT """
520 extra_vpp_punt_config = ["cnat", "{",
521 "session-max-age", "1",
522 "tcp-max-age", "1", "}"]
526 super(TestCNatSourceNAT, cls).setUpClass()
529 def tearDownClass(cls):
530 super(TestCNatSourceNAT, cls).tearDownClass()
533 super(TestCNatSourceNAT, self).setUp()
535 self.create_pg_interfaces(range(3))
537 for i in self.pg_interfaces:
544 self.pg0.configure_ipv6_neighbors()
545 self.pg0.configure_ipv4_neighbors()
546 self.pg1.generate_remote_hosts(2)
547 self.pg1.configure_ipv4_neighbors()
548 self.pg1.configure_ipv6_neighbors()
550 self.vapi.cli("test cnat scanner off")
551 self.vapi.cnat_set_snat_addresses(
552 snat_ip4=self.pg2.remote_hosts[0].ip4,
553 snat_ip6=self.pg2.remote_hosts[0].ip6)
554 self.vapi.feature_enable_disable(
556 arc_name="ip6-unicast",
557 feature_name="ip6-cnat-snat",
558 sw_if_index=self.pg0.sw_if_index)
559 self.vapi.feature_enable_disable(
561 arc_name="ip4-unicast",
562 feature_name="ip4-cnat-snat",
563 sw_if_index=self.pg0.sw_if_index)
566 self.vapi.cnat_session_purge()
567 for i in self.pg_interfaces:
571 super(TestCNatSourceNAT, self).tearDown()
def test_snat_v6(self):
    """ CNat Source Nat v6

    Runs the IPv6 source-NAT scenarios in a fixed order: TCP, then UDP,
    then the ICMP error case, then the ICMPv6 echo case.
    """
    self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
    self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
    self.sourcenat_test_icmp_err_conf(isV6=True)
    self.sourcenat_test_icmp_echo6_conf()
def test_snat_v4(self):
    """ CNat Source Nat v4

    Runs the IPv4 source-NAT scenarios in a fixed order: TCP, then UDP,
    then the ICMP error case, then the ICMP echo case.  The helpers are
    called without ``isV6``, so they use their IPv4 default.
    """
    self.sourcenat_test_tcp_udp_conf(TCP)
    self.sourcenat_test_tcp_udp_conf(UDP)
    self.sourcenat_test_icmp_err_conf()
    self.sourcenat_test_icmp_echo4_conf()
587 def sourcenat_test_icmp_echo6_conf(self):
588 sports = [1234, 1235]
589 dports = [6661, 6662]
591 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
592 client_addr = self.pg0.remote_hosts[0].ip6
593 remote_addr = self.pg1.remote_hosts[nbr].ip6
594 src_nat_addr = self.pg2.remote_hosts[0].ip6
596 # ping from pods to outside network
598 Ether(dst=self.pg0.local_mac,
599 src=self.pg0.remote_hosts[0].mac) /
600 IPv6(src=client_addr, dst=remote_addr) /
601 ICMPv6EchoRequest(id=0xfeed) /
604 rxs = self.send_and_expect(
610 self.assertEqual(rx[IPv6].src, src_nat_addr)
611 self.assert_packet_checksums_valid(rx)
613 received_id = rx[0][ICMPv6EchoRequest].id
614 # ping reply from outside to pods
616 Ether(dst=self.pg1.local_mac,
617 src=self.pg1.remote_hosts[nbr].mac) /
618 IPv6(src=remote_addr, dst=src_nat_addr) /
619 ICMPv6EchoReply(id=received_id))
620 rxs = self.send_and_expect(
626 self.assert_packet_checksums_valid(rx)
627 self.assertEqual(rx[IPv6].src, remote_addr)
628 self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
630 def sourcenat_test_icmp_echo4_conf(self):
631 sports = [1234, 1235]
632 dports = [6661, 6662]
634 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
636 client_addr = self.pg0.remote_hosts[0].ip4
637 remote_addr = self.pg1.remote_hosts[nbr].ip4
638 src_nat_addr = self.pg2.remote_hosts[0].ip4
640 # ping from pods to outside network
642 Ether(dst=self.pg0.local_mac,
643 src=self.pg0.remote_hosts[0].mac) /
644 IP46(src=client_addr, dst=remote_addr) /
645 ICMP(type=8, id=0xfeed) /
648 rxs = self.send_and_expect(
654 self.assertEqual(rx[IP46].src, src_nat_addr)
655 self.assert_packet_checksums_valid(rx)
657 received_id = rx[0][ICMP].id
658 # ping reply from outside to pods
660 Ether(dst=self.pg1.local_mac,
661 src=self.pg1.remote_hosts[nbr].mac) /
662 IP46(src=remote_addr, dst=src_nat_addr) /
663 ICMP(type=0, id=received_id))
664 rxs = self.send_and_expect(
670 self.assert_packet_checksums_valid(rx)
671 self.assertEqual(rx[IP46].src, remote_addr)
672 self.assertEqual(rx[ICMP].id, 0xfeed)
674 def sourcenat_test_icmp_err_conf(self, isV6=False):
675 sports = [1234, 1235]
676 dports = [6661, 6662]
678 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
681 client_addr = self.pg0.remote_hosts[0].ip6
682 remote_addr = self.pg1.remote_hosts[nbr].ip6
683 src_nat_addr = self.pg2.remote_hosts[0].ip6
684 ICMP46 = ICMPv6DestUnreach
685 ICMPelem = ICMPv6DestUnreach(code=1)
689 client_addr = self.pg0.remote_hosts[0].ip4
690 remote_addr = self.pg1.remote_hosts[nbr].ip4
691 src_nat_addr = self.pg2.remote_hosts[0].ip4
694 ICMPelem = ICMP(type=11)
696 # from pods to outside network
698 Ether(dst=self.pg0.local_mac,
699 src=self.pg0.remote_hosts[0].mac) /
700 IP46(src=client_addr, dst=remote_addr) /
701 TCP(sport=sports[nbr], dport=dports[nbr]) /
704 rxs = self.send_and_expect(
709 self.assert_packet_checksums_valid(rx)
710 self.assertEqual(rx[IP46].dst, remote_addr)
711 self.assertEqual(rx[TCP].dport, dports[nbr])
712 self.assertEqual(rx[IP46].src, src_nat_addr)
713 sport = rx[TCP].sport
715 InnerIP = rxs[0][IP46]
716 # from outside to pods, ICMP error
718 Ether(dst=self.pg1.local_mac,
719 src=self.pg1.remote_hosts[nbr].mac) /
720 IP46(src=remote_addr, dst=src_nat_addr) /
723 rxs = self.send_and_expect(
729 self.assert_packet_checksums_valid(rx)
730 self.assertEqual(rx[IP46].src, remote_addr)
731 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
732 self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
733 self.assertEqual(rx[ICMP46][IP46error]
734 [TCPerror].sport, sports[nbr])
735 self.assertEqual(rx[ICMP46][IP46error]
736 [TCPerror].dport, dports[nbr])
738 def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
739 sports = [1234, 1235]
740 dports = [6661, 6662]
742 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
745 client_addr = self.pg0.remote_hosts[0].ip6
746 remote_addr = self.pg1.remote_hosts[nbr].ip6
747 src_nat_addr = self.pg2.remote_hosts[0].ip6
748 exclude_prefix = ip_network(
749 "%s/100" % remote_addr, strict=False)
752 client_addr = self.pg0.remote_hosts[0].ip4
753 remote_addr = self.pg1.remote_hosts[nbr].ip4
754 src_nat_addr = self.pg2.remote_hosts[0].ip4
755 exclude_prefix = ip_network(
756 "%s/16" % remote_addr, strict=False)
757 # from pods to outside network
759 Ether(dst=self.pg0.local_mac,
760 src=self.pg0.remote_hosts[0].mac) /
761 IP46(src=client_addr, dst=remote_addr) /
762 l4p(sport=sports[nbr], dport=dports[nbr]) /
765 rxs = self.send_and_expect(
770 self.assert_packet_checksums_valid(rx)
771 self.assertEqual(rx[IP46].dst, remote_addr)
772 self.assertEqual(rx[l4p].dport, dports[nbr])
773 self.assertEqual(rx[IP46].src, src_nat_addr)
774 sport = rx[l4p].sport
776 # from outside to pods
778 Ether(dst=self.pg1.local_mac,
779 src=self.pg1.remote_hosts[nbr].mac) /
780 IP46(src=remote_addr, dst=src_nat_addr) /
781 l4p(sport=dports[nbr], dport=sport) /
784 rxs = self.send_and_expect(
790 self.assert_packet_checksums_valid(rx)
791 self.assertEqual(rx[IP46].dst, client_addr)
792 self.assertEqual(rx[l4p].dport, sports[nbr])
793 self.assertEqual(rx[l4p].sport, dports[nbr])
794 self.assertEqual(rx[IP46].src, remote_addr)
796 # add remote host to exclude list
797 self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=1)
798 self.vapi.cnat_session_purge()
800 rxs = self.send_and_expect(
805 self.assert_packet_checksums_valid(rx)
806 self.assertEqual(rx[IP46].dst, remote_addr)
807 self.assertEqual(rx[l4p].dport, dports[nbr])
808 self.assertEqual(rx[IP46].src, client_addr)
810 # remove remote host from exclude list
811 self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=0)
812 self.vapi.cnat_session_purge()
814 rxs = self.send_and_expect(
820 self.assert_packet_checksums_valid(rx)
821 self.assertEqual(rx[IP46].dst, remote_addr)
822 self.assertEqual(rx[l4p].dport, dports[nbr])
823 self.assertEqual(rx[IP46].src, src_nat_addr)
825 self.vapi.cnat_session_purge()
if __name__ == '__main__':
    # When the file is executed directly, run its tests through unittest
    # using the VPP test runner.
    unittest.main(testRunner=VppTestRunner)