5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
18 from ipaddress import ip_address, ip_network, \
19 IPv4Address, IPv6Address, IPv4Network, IPv6Network
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
# NOTE(review): this excerpt carries embedded line numbers, has lost its
# indentation, and skips lines (see the gaps in the embedded numbering).
# The comments below describe only what the visible lines establish.
#
# Endpoint constructor. The class header is not visible here; call sites
# elsewhere in this file build these objects as Ep(...).
#   ip          -- address string (default None)
#   port        -- L4 port (default 0)
#   l4p         -- scapy L4 layer class, TCP by default
#   sw_if_index -- interface index, INVALID_INDEX by default
#   is_v6       -- selects the IPv6 variant of the defaults below
30 def __init__(self, ip=None, port=0, l4p=TCP,
31 sw_if_index=INVALID_INDEX, is_v6=False):
# Wildcard address for the requested family; presumably applied only when
# no ip was passed -- the guarding line is not visible, TODO confirm.
34 self.ip = "::" if is_v6 else "0.0.0.0"
37 self.sw_if_index = sw_if_index
# Address family as a VPP API enum value. The two assignments are presumably
# the is_v6 / else arms of a branch whose header lines are not visible.
39 self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
41 self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
# Start of a dict return carrying 'addr' and 'sw_if_index'. Presumably the
# encode() method that other classes call on endpoints; its def line and
# the remaining dict entries are not visible.
44 return {'addr': self.ip,
46 'sw_if_index': self.sw_if_index,
# Alternate constructor from a pg interface: one return builds an endpoint
# with default arguments (only is_v6 forwarded), the other binds it to
# pg.sw_if_index. The condition choosing between them is not visible
# (presumably a "pg is None" check -- TODO confirm).
50 def from_pg(cls, pg, is_v6=False):
52 return cls(is_v6=is_v6)
54 return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
# Renders the endpoint as "<ip>:<port>"; the enclosing def line is not
# visible here.
61 return ("%s:%d" % (self.ip, self.port))
# A (source endpoint, destination endpoint) pair; instances are passed as
# the "paths" of a VppCNatTranslation elsewhere in this file.
# NOTE(review): lines are missing from this excerpt (constructor body and
# two def lines); comments describe only what is visible.
64 class EpTuple(object):
# src / dst are endpoint objects exposing encode(), as used below.
67 def __init__(self, src, dst):
# Dict form of the pair: each endpoint is encoded under 'src_ep' / 'dst_ep'.
# The enclosing def line is not visible.
72 return {'src_ep': self.src.encode(),
73 'dst_ep': self.dst.encode()}
# Text form "<src>-><dst>"; the enclosing def line is not visible.
76 return ("%s->%s" % (self.src, self.dst))
# VppObject wrapper around one cnat translation: a VIP endpoint plus a list
# of paths, pushed to VPP through the test's vapi handle.
# NOTE(review): this excerpt has embedded line numbers, no indentation and
# missing lines; comments describe only what the visible lines establish.
79 class VppCNatTranslation(VppObject):
#   test   -- owning test case (its vapi, registry, logger and statistics
#             are used by the methods below via self._test)
#   iproto -- L4 protocol; appears in the text form and presumably keys the
#             UDP/TCP table further down
#   vip    -- endpoint whose encode() is sent as 'vip'
#   paths  -- iterable of objects exposing encode()
81 def __init__(self, test, iproto, vip, paths):
# Paths are encoded once up front and cached for the API call.
86 self.encoded_paths = []
87 for path in self.paths:
88 self.encoded_paths.append(path.encode())
# Text form "<vip> <iproto> <paths>"; the enclosing def line is not visible.
91 return ("%s %s %s" % (self.vip, self.iproto, self.paths))
# Table from scapy layer class to the VPP API ip-proto enum value;
# presumably what self.vl4_proto (used below) is derived from -- the
# surrounding lines are not visible.
95 ip_proto = VppEnum.vl_api_ip_proto_t
97 UDP: ip_proto.IP_API_PROTO_UDP,
98 TCP: ip_proto.IP_API_PROTO_TCP,
# Creates the translation in VPP and registers this object with the test
# registry. The reply is bound to `r`; its use is on a line that is not
# visible (self.id is read by the methods below, so presumably it is taken
# from the reply -- TODO confirm).
101 def add_vpp_config(self):
102 r = self._test.vapi.cnat_translation_update(
103 {'vip': self.vip.encode(),
104 'ip_proto': self.vl4_proto,
105 'n_paths': len(self.paths),
106 'paths': self.encoded_paths})
107 self._test.registry.register(self, self._test.logger)
# Re-encodes the paths and re-issues the same update request.
# NOTE(review): the visible loop iterates self.paths, not the `paths`
# argument; the assignment from the argument is presumably on a line missing
# from this excerpt -- verify. The request dict duplicates add_vpp_config,
# and the object is registered with the registry a second time here.
110 def modify_vpp_config(self, paths):
112 self.encoded_paths = []
113 for path in self.paths:
114 self.encoded_paths.append(path.encode())
116 r = self._test.vapi.cnat_translation_update(
117 {'vip': self.vip.encode(),
118 'ip_proto': self.vl4_proto,
119 'n_paths': len(self.paths),
120 'paths': self.encoded_paths})
121 self._test.registry.register(self, self._test.logger)
# Deletes the translation by its id.
123 def remove_vpp_config(self):
124 self._test.vapi.cnat_translation_del(id=self.id)
# Scans the translation dump for an entry whose id matches this object.
# What is returned on match / no match is not visible here; a caller in this
# file reads .vip.addr and .paths from the result.
126 def query_vpp_config(self):
127 for t in self._test.vapi.cnat_translation_dump():
128 if self.id == t.translation.id:
# Object identifier "cnat-translation-<vip>"; the def line is not visible.
133 return ("cnat-translation-%s" % (self.vip))
# Reads the "/net/cnat-translation" counter from the test's statistics; how
# it is indexed before being returned is not visible here.
136 c = self._test.statistics.get_counter("/net/cnat-translation")
# Data-path tests for cnat translations (traffic sent to a VIP is rewritten
# towards a backend, and return traffic is rewritten back).
# NOTE(review): this excerpt carries embedded line numbers, has lost its
# indentation, and skips lines; comments describe only the visible lines.
140 class TestCNatTranslation(VppTestCase):
141 """ CNat Translation """
# Token list spelling a "cnat { ... }" configuration section: 64 session-db
# buckets, cleanup timeout 0.1, session max age 1 (units not shown), scanner
# off. How the framework consumes extra_vpp_punt_config is not visible here.
142 extra_vpp_punt_config = ["cnat", "{",
143 "session-db-buckets", "64",
144 "session-cleanup-timeout", "0.1",
145 "session-max-age", "1",
147 "scanner", "off", "}"]
# Class-level setup/teardown chain to VppTestCase; the setUpClass def line
# and any decorators are not visible.
151 super(TestCNatTranslation, cls).setUpClass()
154 def tearDownClass(cls):
155 super(TestCNatTranslation, cls).tearDownClass()
# Per-test setup/teardown remnants: three pg interfaces are created; the
# bodies of the two interface loops and the def lines are not visible.
158 super(TestCNatTranslation, self).setUp()
160 self.create_pg_interfaces(range(3))
162 for i in self.pg_interfaces:
170 for i in self.pg_interfaces:
174 super(TestCNatTranslation, self).tearDown()
# Builds a translation for `vip` whose backend is pg1's remote host number
# `nbr` on port 4000 + nbr, with a wildcard source endpoint on port 0.
# Both paths handed to VppCNatTranslation are the same (sep, dep) pair.
# The leading constructor arguments and everything after the construction
# are on lines not visible here; callers collect the result in a list.
176 def cnat_create_translation(self, vip, nbr):
177 ip_v = "ip6" if vip.isV6 else "ip4"
178 dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
179 sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
180 t1 = VppCNatTranslation(
182 [EpTuple(sep, dep), EpTuple(sep, dep)])
# Exercises translation t1 from every pg0 remote host:
#   * packets to the VIP must arrive with the backend address and port
#     4000 + nbr while keeping the client source port;
#   * return packets from the backend must appear to come from vip.ip and
#     vip.port;
#   * VIP traffic to port 6666 (no translation) must produce no replies;
#   * backend traffic from port 6666 (no session) is sent with
#     send_and_expect, i.e. it is expected to be forwarded.
# `vip`, the packet counts, the capture loops and the loop over `sports`
# are set up on lines that are not visible here.
186 def cnat_test_translation(self, t1, nbr, sports, isV6=False):
187 ip_v = "ip6" if isV6 else "ip4"
188 ip_class = IPv6 if isV6 else IP
194 for src in self.pg0.remote_hosts:
197 p1 = (Ether(dst=self.pg0.local_mac,
199 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
200 vip.l4p(sport=sport, dport=vip.port) /
# A one-packet trace is armed before sending and logged afterwards.
203 self.vapi.cli("trace add pg-input 1")
204 rxs = self.send_and_expect(self.pg0,
207 self.logger.info(self.vapi.cli("show trace max 1"))
# Forward direction: destination rewritten to the backend, source port kept.
210 self.assert_packet_checksums_valid(rx)
213 getattr(self.pg1.remote_hosts[nbr], ip_v))
214 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
218 self.assertEqual(rx[vip.l4p].sport, sport)
# Return direction: backend -> client, sent on pg1.
221 p1 = (Ether(dst=self.pg1.local_mac,
222 src=self.pg1.remote_mac) /
223 ip_class(src=getattr(
224 self.pg1.remote_hosts[nbr],
226 dst=getattr(src, ip_v)) /
227 vip.l4p(sport=4000 + nbr, dport=sport) /
230 rxs = self.send_and_expect(self.pg1,
# The client must see the reply as coming from the VIP address and port.
235 self.assert_packet_checksums_valid(rx)
239 self.assertEqual(rx[vip.l4p].dport, sport)
240 self.assertEqual(rx[ip_class].src, vip.ip)
241 self.assertEqual(rx[vip.l4p].sport, vip.port)
244 # packets to the VIP that do not match a
245 # translation are dropped
247 p1 = (Ether(dst=self.pg0.local_mac,
249 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
250 vip.l4p(sport=sport, dport=6666) /
253 self.send_and_assert_no_replies(self.pg0,
258 # packets from the VIP that do not match a
259 # session are forwarded
261 p1 = (Ether(dst=self.pg1.local_mac,
262 src=self.pg1.remote_mac) /
263 ip_class(src=getattr(
264 self.pg1.remote_hosts[nbr],
266 dst=getattr(src, ip_v)) /
267 vip.l4p(sport=6666, dport=sport) /
270 rxs = self.send_and_expect(self.pg1,
# Final check on the translation's packet counter; only part of the expected
# value expression is visible.
274 self.assertEqual(t1.get_stats()['packets'],
277 len(self.pg0.remote_hosts))
# Re-points translation t1 at a single backend (pg2's remote address, port
# 5000) and then sends both previously used flows (same source ports) and
# new flows (source port 9999). The assertions on the received packets are
# on lines not visible here; the inline comments state the intent.
279 def cnat_test_translation_update(self, t1, sports, isV6=False):
280 ip_v = "ip6" if isV6 else "ip4"
281 ip_class = IPv6 if isV6 else IP
285 # modify the translation to use a different backend
287 dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
288 sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
289 t1.modify_vpp_config([EpTuple(sep, dep)])
292 # existing flows follow the old path
294 for src in self.pg0.remote_hosts:
297 p1 = (Ether(dst=self.pg0.local_mac,
299 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
300 vip.l4p(sport=sport, dport=vip.port) /
303 rxs = self.send_and_expect(self.pg0,
308 # new flows go to the new backend
310 for src in self.pg0.remote_hosts:
311 p1 = (Ether(dst=self.pg0.local_mac,
313 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
314 vip.l4p(sport=9999, dport=vip.port) /
317 rxs = self.send_and_expect(self.pg0,
# Scenario driver shared by the v4 and v6 tests:
#   1. scanner off, create one translation per VIP, log state;
#   2. run the translation and translation-update checks per VIP;
#   3. scanner on, poll the session dump (at most 100 tries) until empty;
#   4. scanner off, load flows again, remove the translations, then check
#      that sessions exist, purge them, and check that none remain.
321 def cnat_translation(self, vips, isV6=False):
322 """ CNat Translation """
324 ip_class = IPv6 if isV6 else IP
325 ip_v = "ip6" if isV6 else "ip4"
326 sports = [1234, 1233]
329 # turn the scanner off whilst testing otherwise sessions
332 self.vapi.cli("test cnat scanner off")
334 sessions = self.vapi.cnat_session_dump()
337 for nbr, vip in enumerate(vips):
338 trs.append(self.cnat_create_translation(vip, nbr))
340 self.logger.info(self.vapi.cli("sh cnat client"))
341 self.logger.info(self.vapi.cli("sh cnat translation"))
346 for nbr, vip in enumerate(vips):
347 self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
348 self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
# Diagnostic output only; the v6 / v4 selection around these two log calls
# is on lines not visible here.
350 self.logger.info(self.vapi.cli(
351 "sh ip6 fib %s" % self.pg0.remote_ip6))
353 self.logger.info(self.vapi.cli(
354 "sh ip fib %s" % self.pg0.remote_ip4))
355 self.logger.info(self.vapi.cli("sh cnat session verbose"))
358 # turn the scanner back on and wait until the sessions
361 self.vapi.cli("test cnat scanner on")
# Bounded poll: stops when the dump is empty or after 100 tries; the
# increment of n_tries and any delay between polls are not visible.
364 sessions = self.vapi.cnat_session_dump()
365 while (len(sessions) and n_tries < 100):
367 sessions = self.vapi.cnat_session_dump()
369 self.logger.info(self.vapi.cli("show cnat session verbose"))
# Fails the test if the loop above ran out of tries.
371 self.assertTrue(n_tries < 100)
372 self.vapi.cli("test cnat scanner off")
375 # load some flows again and purge
378 for src in self.pg0.remote_hosts:
381 p1 = (Ether(dst=self.pg0.local_mac,
383 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
384 vip.l4p(sport=sport, dport=vip.port) /
386 self.send_and_expect(self.pg0,
391 tr.remove_vpp_config()
# Sessions must still be present after the translations are removed, and
# gone after an explicit purge.
393 self.assertTrue(self.vapi.cnat_session_dump())
394 self.vapi.cnat_session_purge()
395 self.assertFalse(self.vapi.cnat_session_dump())
# ICMP-error test (its def line and the opening of the VIP list are not
# visible). The VIP list mixes IPv4 and IPv6 endpoints, TCP by default and
# UDP where given.
399 Ep("30.0.0.1", 5555),
400 Ep("30.0.0.2", 5554),
401 Ep("30.0.0.2", 5553, UDP),
403 Ep("30::2", 5553, UDP),
# One remote host per VIP on pg0 and pg1, with neighbors for both families.
407 self.pg0.generate_remote_hosts(len(vips))
408 self.pg0.configure_ipv6_neighbors()
409 self.pg0.configure_ipv4_neighbors()
411 self.pg1.generate_remote_hosts(len(vips))
412 self.pg1.configure_ipv6_neighbors()
413 self.pg1.configure_ipv4_neighbors()
415 self.vapi.cli("test cnat scanner off")
417 for nbr, vip in enumerate(vips):
418 trs.append(self.cnat_create_translation(vip, nbr))
420 self.logger.info(self.vapi.cli("sh cnat client"))
421 self.logger.info(self.vapi.cli("sh cnat translation"))
# Per VIP: pick addresses of the matching family (the if/else headers are
# not visible), then send client -> VIP and verify the translation.
423 for nbr, vip in enumerate(vips):
425 client_addr = self.pg0.remote_hosts[0].ip6
426 remote_addr = self.pg1.remote_hosts[nbr].ip6
427 remote2_addr = self.pg2.remote_hosts[0].ip6
429 client_addr = self.pg0.remote_hosts[0].ip4
430 remote_addr = self.pg1.remote_hosts[nbr].ip4
431 remote2_addr = self.pg2.remote_hosts[0].ip4
432 IP46 = IPv6 if vip.isV6 else IP
434 p1 = (Ether(dst=self.pg0.local_mac,
435 src=self.pg0.remote_hosts[0].mac) /
436 IP46(src=client_addr, dst=vip.ip) /
437 vip.l4p(sport=sport, dport=vip.port) /
440 rxs = self.send_and_expect(self.pg0,
445 self.assert_packet_checksums_valid(rx)
446 self.assertEqual(rx[IP46].dst, remote_addr)
447 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
448 self.assertEqual(rx[IP46].src, client_addr)
449 self.assertEqual(rx[vip.l4p].sport, sport)
# The IP layer of the first translated packet is kept so it can be embedded
# as the offending packet inside the ICMP errors built below.
451 InnerIP = rxs[0][IP46]
# Error message used: ICMPv6 destination-unreachable with code 1 for IPv6,
# ICMP type 11 for IPv4.
453 ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
454 ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
455 # from vip to client, ICMP error
456 p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
457 IP46(src=remote_addr, dst=client_addr) /
460 rxs = self.send_and_expect(self.pg1,
464 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
465 IP46error = IPerror6 if vip.isV6 else IPerror
# Expected at the client: outer source rewritten to the VIP, and the
# embedded headers restored to the original client -> VIP flow.
467 self.assert_packet_checksums_valid(rx)
468 self.assertEqual(rx[IP46].src, vip.ip)
469 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
470 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
471 self.assertEqual(rx[ICMP46][IP46error]
472 [TCPUDPError].sport, sport)
473 self.assertEqual(rx[ICMP46][IP46error]
474 [TCPUDPError].dport, vip.port)
476 # from other remote to client, ICMP error
477 # outside shouldn't be NAT-ed
478 p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
479 IP46(src=remote2_addr, dst=client_addr) /
# NOTE(review): the frame above is built with pg2 MAC addresses but the
# send below names self.pg1 as the input interface -- confirm whether pg2
# was intended.
482 rxs = self.send_and_expect(self.pg1,
486 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
487 IP46error = IPerror6 if vip.isV6 else IPerror
# Here the outer source must stay the third-party address, while the
# embedded headers are still restored to the client -> VIP flow.
489 self.assert_packet_checksums_valid(rx)
490 self.assertEqual(rx[IP46].src, remote2_addr)
491 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
492 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
493 self.assertEqual(rx[ICMP46][IP46error]
494 [TCPUDPError].sport, sport)
495 self.assertEqual(rx[ICMP46][IP46error]
496 [TCPUDPError].dport, vip.port)
498 self.vapi.cnat_session_purge()
# IPv6 entry point: builds the VIP list (only partly visible), prepares one
# remote host per VIP on pg0 and pg1 with IPv6 neighbors, then runs the
# shared scenario. The docstring is commented out in the original.
500 def test_cnat6(self):
501 # """ CNat Translation ipv6 """
505 Ep("30::2", 5553, UDP),
508 self.pg0.generate_remote_hosts(len(vips))
509 self.pg0.configure_ipv6_neighbors()
510 self.pg1.generate_remote_hosts(len(vips))
511 self.pg1.configure_ipv6_neighbors()
513 self.cnat_translation(vips, isV6=True)
# IPv4 entry point: same structure as the IPv6 one with IPv4 VIPs and
# neighbors. The docstring is commented out in the original.
515 def test_cnat4(self):
516 # """ CNat Translation ipv4 """
519 Ep("30.0.0.1", 5555),
520 Ep("30.0.0.2", 5554),
521 Ep("30.0.0.2", 5553, UDP),
524 self.pg0.generate_remote_hosts(len(vips))
525 self.pg0.configure_ipv4_neighbors()
526 self.pg1.generate_remote_hosts(len(vips))
527 self.pg1.configure_ipv4_neighbors()
529 self.cnat_translation(vips)
# Source-NAT tests: traffic entering on pg0 towards hosts behind pg1 is
# expected to leave with the configured snat address as its source.
# NOTE(review): this excerpt carries embedded line numbers, has lost its
# indentation, and skips lines; comments describe only the visible lines.
532 class TestCNatSourceNAT(VppTestCase):
533 """ CNat Source NAT """
# Token list spelling a "cnat { ... }" configuration section with session
# and tcp max age set to 1 (units not shown). How the framework consumes
# extra_vpp_punt_config is not visible here.
534 extra_vpp_punt_config = ["cnat", "{",
535 "session-max-age", "1",
536 "tcp-max-age", "1", "}"]
# Class-level setup/teardown chain to VppTestCase; the setUpClass def line
# and any decorators are not visible.
540 super(TestCNatSourceNAT, cls).setUpClass()
543 def tearDownClass(cls):
544 super(TestCNatSourceNAT, cls).tearDownClass()
# Per-test setup remnant (def line not visible): three pg interfaces,
# neighbors on pg0, two remote hosts plus neighbors on pg1.
547 super(TestCNatSourceNAT, self).setUp()
549 self.create_pg_interfaces(range(3))
551 for i in self.pg_interfaces:
558 self.pg0.configure_ipv6_neighbors()
559 self.pg0.configure_ipv4_neighbors()
560 self.pg1.generate_remote_hosts(2)
561 self.pg1.configure_ipv4_neighbors()
562 self.pg1.configure_ipv6_neighbors()
564 self.vapi.cli("test cnat scanner off")
# The snat addresses are those of pg2's first remote host.
565 self.vapi.cnat_set_snat_addresses(
566 snat_ip4=self.pg2.remote_hosts[0].ip4,
567 snat_ip6=self.pg2.remote_hosts[0].ip6)
# The cnat snat features are toggled on pg0 for the ip6 and ip4 unicast
# arcs; the enable/disable argument is on lines not visible here.
568 self.vapi.feature_enable_disable(
570 arc_name="ip6-unicast",
571 feature_name="ip6-cnat-snat",
572 sw_if_index=self.pg0.sw_if_index)
573 self.vapi.feature_enable_disable(
575 arc_name="ip4-unicast",
576 feature_name="ip4-cnat-snat",
577 sw_if_index=self.pg0.sw_if_index)
# Per-test teardown remnant (def line not visible): sessions are purged
# before the interfaces are handled; the loop body is not visible.
580 self.vapi.cnat_session_purge()
581 for i in self.pg_interfaces:
585 super(TestCNatSourceNAT, self).tearDown()
# IPv6 run of the four sub-tests: TCP, UDP, ICMP error, ICMP echo.
# The docstring is commented out in the original.
587 def test_snat_v6(self):
588 # """ CNat Source Nat v6 """
589 self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
590 self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
591 self.sourcenat_test_icmp_err_conf(isV6=True)
592 self.sourcenat_test_icmp_echo6_conf()
# IPv4 run of the same four sub-tests.
594 def test_snat_v4(self):
595 # """ CNat Source Nat v4 """
596 self.sourcenat_test_tcp_udp_conf(TCP)
597 self.sourcenat_test_tcp_udp_conf(UDP)
598 self.sourcenat_test_icmp_err_conf()
599 self.sourcenat_test_icmp_echo4_conf()
# ICMPv6 echo through source NAT, once per pg1 remote host: the request
# (id 0xfeed) must leave with the snat source address; the reply is sent
# with whatever id was observed on the wire and must reach the client with
# id 0xfeed again.
# NOTE(review): sports and dports are not used by any visible line of this
# method, and the loop variable remote_host is unused in favour of
# indexing by nbr.
601 def sourcenat_test_icmp_echo6_conf(self):
602 sports = [1234, 1235]
603 dports = [6661, 6662]
605 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
606 client_addr = self.pg0.remote_hosts[0].ip6
607 remote_addr = self.pg1.remote_hosts[nbr].ip6
608 src_nat_addr = self.pg2.remote_hosts[0].ip6
610 # ping from pods to outside network
612 Ether(dst=self.pg0.local_mac,
613 src=self.pg0.remote_hosts[0].mac) /
614 IPv6(src=client_addr, dst=remote_addr) /
615 ICMPv6EchoRequest(id=0xfeed) /
618 rxs = self.send_and_expect(
624 self.assertEqual(rx[IPv6].src, src_nat_addr)
625 self.assert_packet_checksums_valid(rx)
# NOTE(review): `rx` is read here after the checks above; presumably it is
# the last packet of a capture loop whose header is not visible.
627 received_id = rx[0][ICMPv6EchoRequest].id
628 # ping reply from outside to pods
630 Ether(dst=self.pg1.local_mac,
631 src=self.pg1.remote_hosts[nbr].mac) /
632 IPv6(src=remote_addr, dst=src_nat_addr) /
633 ICMPv6EchoReply(id=received_id))
634 rxs = self.send_and_expect(
640 self.assert_packet_checksums_valid(rx)
641 self.assertEqual(rx[IPv6].src, remote_addr)
642 self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
# IPv4 counterpart of the echo test: ICMP type 8 request, type 0 reply.
# IP46 is bound on a line not visible here (presumably to IP).
# NOTE(review): sports and dports are not used by any visible line of this
# method either.
644 def sourcenat_test_icmp_echo4_conf(self):
645 sports = [1234, 1235]
646 dports = [6661, 6662]
648 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
650 client_addr = self.pg0.remote_hosts[0].ip4
651 remote_addr = self.pg1.remote_hosts[nbr].ip4
652 src_nat_addr = self.pg2.remote_hosts[0].ip4
654 # ping from pods to outside network
656 Ether(dst=self.pg0.local_mac,
657 src=self.pg0.remote_hosts[0].mac) /
658 IP46(src=client_addr, dst=remote_addr) /
659 ICMP(type=8, id=0xfeed) /
662 rxs = self.send_and_expect(
668 self.assertEqual(rx[IP46].src, src_nat_addr)
669 self.assert_packet_checksums_valid(rx)
671 received_id = rx[0][ICMP].id
672 # ping reply from outside to pods
674 Ether(dst=self.pg1.local_mac,
675 src=self.pg1.remote_hosts[nbr].mac) /
676 IP46(src=remote_addr, dst=src_nat_addr) /
677 ICMP(type=0, id=received_id))
678 rxs = self.send_and_expect(
684 self.assert_packet_checksums_valid(rx)
685 self.assertEqual(rx[IP46].src, remote_addr)
686 self.assertEqual(rx[ICMP].id, 0xfeed)
# ICMP error handling through source NAT, once per pg1 remote host: a TCP
# packet is sent out (and must be source-NATed), then an ICMP error that
# embeds the translated packet is sent back to the snat address. The error
# delivered to the client must embed the original, untranslated flow.
# The family-specific names (IP46, IP46error and, for IPv4, ICMP46) are
# bound on lines not visible here.
688 def sourcenat_test_icmp_err_conf(self, isV6=False):
689 sports = [1234, 1235]
690 dports = [6661, 6662]
692 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
695 client_addr = self.pg0.remote_hosts[0].ip6
696 remote_addr = self.pg1.remote_hosts[nbr].ip6
697 src_nat_addr = self.pg2.remote_hosts[0].ip6
698 ICMP46 = ICMPv6DestUnreach
699 ICMPelem = ICMPv6DestUnreach(code=1)
703 client_addr = self.pg0.remote_hosts[0].ip4
704 remote_addr = self.pg1.remote_hosts[nbr].ip4
705 src_nat_addr = self.pg2.remote_hosts[0].ip4
708 ICMPelem = ICMP(type=11)
710 # from pods to outside network
712 Ether(dst=self.pg0.local_mac,
713 src=self.pg0.remote_hosts[0].mac) /
714 IP46(src=client_addr, dst=remote_addr) /
715 TCP(sport=sports[nbr], dport=dports[nbr]) /
718 rxs = self.send_and_expect(
723 self.assert_packet_checksums_valid(rx)
724 self.assertEqual(rx[IP46].dst, remote_addr)
725 self.assertEqual(rx[TCP].dport, dports[nbr])
726 self.assertEqual(rx[IP46].src, src_nat_addr)
727 sport = rx[TCP].sport
# The IP layer of the first translated packet becomes the payload of the
# ICMP error built below.
729 InnerIP = rxs[0][IP46]
730 # from outside to pods, ICMP error
732 Ether(dst=self.pg1.local_mac,
733 src=self.pg1.remote_hosts[nbr].mac) /
734 IP46(src=remote_addr, dst=src_nat_addr) /
737 rxs = self.send_and_expect(
# The embedded headers must show the client's own address and ports again.
743 self.assert_packet_checksums_valid(rx)
744 self.assertEqual(rx[IP46].src, remote_addr)
745 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
746 self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
747 self.assertEqual(rx[ICMP46][IP46error]
748 [TCPerror].sport, sports[nbr])
749 self.assertEqual(rx[ICMP46][IP46error]
750 [TCPerror].dport, dports[nbr])
# TCP/UDP source NAT, once per pg1 remote host, for L4 class `l4p`:
#   1. outbound packet must carry the snat source address;
#   2. the reply to the NATed port must reach the client with the original
#      addresses and ports;
#   3. with the remote's prefix added to the snat exclude list (and sessions
#      purged) the outbound source must stay the client address;
#   4. with the prefix removed again (and sessions purged) the outbound
#      source must be the snat address once more.
# The if/else headers selecting the address family and the binding of IP46
# are on lines not visible here.
752 def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
753 sports = [1234, 1235]
754 dports = [6661, 6662]
756 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
759 client_addr = self.pg0.remote_hosts[0].ip6
760 remote_addr = self.pg1.remote_hosts[nbr].ip6
761 src_nat_addr = self.pg2.remote_hosts[0].ip6
# strict=False lets ip_network accept a host address and mask it down to the
# enclosing /100 (IPv6) or /16 (IPv4) network.
762 exclude_prefix = ip_network(
763 "%s/100" % remote_addr, strict=False)
766 client_addr = self.pg0.remote_hosts[0].ip4
767 remote_addr = self.pg1.remote_hosts[nbr].ip4
768 src_nat_addr = self.pg2.remote_hosts[0].ip4
769 exclude_prefix = ip_network(
770 "%s/16" % remote_addr, strict=False)
771 # from pods to outside network
773 Ether(dst=self.pg0.local_mac,
774 src=self.pg0.remote_hosts[0].mac) /
775 IP46(src=client_addr, dst=remote_addr) /
776 l4p(sport=sports[nbr], dport=dports[nbr]) /
779 self.vapi.cli("trace add pg-input 1")
780 rxs = self.send_and_expect(
784 self.logger.info(self.vapi.cli("show trace max 1"))
787 self.assert_packet_checksums_valid(rx)
788 self.assertEqual(rx[IP46].dst, remote_addr)
789 self.assertEqual(rx[l4p].dport, dports[nbr])
790 self.assertEqual(rx[IP46].src, src_nat_addr)
# The source port seen on the wire is reused as the reply's destination
# port rather than assuming it equals sports[nbr].
791 sport = rx[l4p].sport
793 # from outside to pods
795 Ether(dst=self.pg1.local_mac,
796 src=self.pg1.remote_hosts[nbr].mac) /
797 IP46(src=remote_addr, dst=src_nat_addr) /
798 l4p(sport=dports[nbr], dport=sport) /
801 rxs = self.send_and_expect(
807 self.assert_packet_checksums_valid(rx)
808 self.assertEqual(rx[IP46].dst, client_addr)
809 self.assertEqual(rx[l4p].dport, sports[nbr])
810 self.assertEqual(rx[l4p].sport, dports[nbr])
811 self.assertEqual(rx[IP46].src, remote_addr)
813 # add remote host to exclude list
814 self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=1)
815 self.vapi.cnat_session_purge()
817 rxs = self.send_and_expect(
822 self.assert_packet_checksums_valid(rx)
823 self.assertEqual(rx[IP46].dst, remote_addr)
824 self.assertEqual(rx[l4p].dport, dports[nbr])
825 self.assertEqual(rx[IP46].src, client_addr)
827 # remove remote host from exclude list
828 self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=0)
829 self.vapi.cnat_session_purge()
831 rxs = self.send_and_expect(
837 self.assert_packet_checksums_valid(rx)
838 self.assertEqual(rx[IP46].dst, remote_addr)
839 self.assertEqual(rx[l4p].dport, dports[nbr])
840 self.assertEqual(rx[IP46].src, src_nat_addr)
842 self.vapi.cnat_session_purge()
# Tests for translations and snat addresses that are bound to an interface
# (sw_if_index) instead of a fixed address, and must follow the addresses
# configured on that interface as they are added and removed.
# NOTE(review): this excerpt carries embedded line numbers, has lost its
# indentation, and skips lines; comments describe only the visible lines.
# NOTE(review): the class docstring below repeats the one used by
# TestCNatTranslation -- confirm whether a DHCP-specific title was intended.
845 class TestCNatDHCP(VppTestCase):
846 """ CNat Translation """
# Token list spelling a "cnat { ... }" configuration section; how the
# framework consumes extra_vpp_punt_config is not visible here.
847 extra_vpp_punt_config = ["cnat", "{",
848 "session-db-buckets", "64",
849 "session-cleanup-timeout", "0.1",
850 "session-max-age", "1",
852 "scanner", "off", "}"]
# Class-level setup/teardown chain to VppTestCase; the setUpClass def line
# and any decorators are not visible.
856 super(TestCNatDHCP, cls).setUpClass()
859 def tearDownClass(cls):
860 super(TestCNatDHCP, cls).tearDownClass()
# Per-test teardown remnant; the def line and the loop body are not visible.
863 for i in self.pg_interfaces:
865 super(TestCNatDHCP, self).tearDown()
# Builds a TCP translation whose VIP is bound to vip_pg's interface index.
# Each positional argument is a (src_pg, dst_pg) pair turned into one path
# via Ep.from_pg. The initialisation of `paths` and everything after the
# VppCNatTranslation construction are on lines not visible here; callers
# use the return value as the translation object.
867 def create_translation(self, vip_pg, *args, is_v6=False):
868 vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
870 for (src_pg, dst_pg) in args:
871 paths.append(EpTuple(
872 Ep.from_pg(src_pg, is_v6=is_v6),
873 Ep.from_pg(dst_pg, is_v6=is_v6)
875 t1 = VppCNatTranslation(self, TCP, vip, paths)
# Deterministic test address for (interface index, slot i). The branch
# headers are not visible; presumably the first return is the is_v6 case.
# NOTE(review): the IPv6 form uses i + 1 while the IPv4 form uses i, so
# IPv4 slot 0 ends in ".0" -- confirm this asymmetry is intended.
879 def make_addr(self, sw_if_index, i, is_v6):
881 return "fd01:%x::%u" % (sw_if_index, i + 1)
883 return "172.16.%u.%u" % (sw_if_index, i)
# Same address as make_addr with a host prefix length (/128 or /32).
885 def make_prefix(self, sw_if_index, i, is_v6):
887 return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
889 return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
# Asserts that translation `tr`, as reported by query_vpp_config, resolved
# its VIP and its path endpoints to address slot `i` of the corresponding
# interfaces. *args are the same (src_pg, dst_pg) pairs given at creation.
# The guards in front of the src/dst assertions are not visible (presumably
# they skip endpoints whose pg is None -- TODO confirm).
891 def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
892 qt1 = tr.query_vpp_config()
893 self.assertEqual(str(qt1.vip.addr), self.make_addr(
894 vip_pg.sw_if_index, i, is_v6))
895 for (src_pg, dst_pg), path in zip(args, qt1.paths):
897 self.assertEqual(str(path.src_ep.addr), self.make_addr(
898 src_pg.sw_if_index, i, is_v6))
900 self.assertEqual(str(path.dst_ep.addr), self.make_addr(
901 dst_pg.sw_if_index, i, is_v6))
# Calls sw_interface_add_del_address for every combination of pg interface
# and slot in `rng`, using the prefix from make_prefix. The trailing
# arguments of the API call are on a line not visible here.
903 def config_ips(self, rng, is_add=1, is_v6=False):
904 for pg, i in product(self.pg_interfaces, rng):
905 self.vapi.sw_interface_add_del_address(
906 sw_if_index=pg.sw_if_index,
907 prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
# IPv4 scenario on five interfaces: VIP on pg0, paths pg1->pg2 and pg1->pg4.
# Visible steps: check resolution, remove slot 0, check that slot 1 is now
# used, remove slot 1, remove the translation. The calls that add slots 0
# and 1 are not visible here (test_dhcp_v6 below shows the equivalent
# sequence in full).
910 def test_dhcp_v4(self):
911 self.create_pg_interfaces(range(5))
912 for i in self.pg_interfaces:
914 pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
915 t1 = self.create_translation(*pglist)
917 self.check_resolved(t1, *pglist)
919 self.config_ips([0], is_add=0)
920 self.check_resolved(t1, *pglist, i=1)
921 self.config_ips([1], is_add=0)
922 t1.remove_vpp_config()
# IPv6 scenario, same topology: add slot 0 and check, add slot 1, remove
# slot 0 and check that resolution moved to slot 1, then clean up.
924 def test_dhcp_v6(self):
925 self.create_pg_interfaces(range(5))
926 for i in self.pg_interfaces:
928 pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
929 t1 = self.create_translation(*pglist, is_v6=True)
930 self.config_ips([0], is_v6=True)
931 self.check_resolved(t1, *pglist, is_v6=True)
932 self.config_ips([1], is_v6=True)
933 self.config_ips([0], is_add=0, is_v6=True)
934 self.check_resolved(t1, *pglist, i=1, is_v6=True)
935 self.config_ips([1], is_add=0, is_v6=True)
936 t1.remove_vpp_config()
# Snat addresses bound to an interface: with pg0 set as the snat interface,
# the reported snat_ip4 / snat_ip6 must equal slot 0 once it is configured,
# and move to slot 1 after slot 1 is added and slot 0 removed.
938 def test_dhcp_snat(self):
939 self.create_pg_interfaces(range(1))
940 for i in self.pg_interfaces:
942 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
943 self.config_ips([0], is_v6=False)
944 self.config_ips([0], is_v6=True)
945 r = self.vapi.cnat_get_snat_addresses()
946 self.assertEqual(str(r.snat_ip4), self.make_addr(
947 self.pg0.sw_if_index, 0, False))
948 self.assertEqual(str(r.snat_ip6), self.make_addr(
949 self.pg0.sw_if_index, 0, True))
950 self.config_ips([1], is_v6=False)
951 self.config_ips([1], is_v6=True)
952 self.config_ips([0], is_add=0, is_v6=False)
953 self.config_ips([0], is_add=0, is_v6=True)
954 r = self.vapi.cnat_get_snat_addresses()
955 self.assertEqual(str(r.snat_ip4), self.make_addr(
956 self.pg0.sw_if_index, 1, False))
957 self.assertEqual(str(r.snat_ip6), self.make_addr(
958 self.pg0.sw_if_index, 1, True))
# Remove the remaining slot so the interface is left without test addresses.
959 self.config_ips([1], is_add=0, is_v6=False)
960 self.config_ips([1], is_add=0, is_v6=True)
# Script entry point: runs the tests in this module with VppTestRunner.
# NOTE(review): no "import unittest" appears among the import lines visible
# in this excerpt -- confirm it exists in the full file.
963 if __name__ == '__main__':
964 unittest.main(testRunner=VppTestRunner)