5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
18 from ipaddress import ip_address, ip_network, \
19 IPv4Address, IPv6Address, IPv4Network, IPv6Network
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
30 def __init__(self, ip=None, port=0, l4p=TCP,
31 sw_if_index=INVALID_INDEX, is_v6=False):
34 self.ip = "::" if is_v6 else "0.0.0.0"
37 self.sw_if_index = sw_if_index
39 self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
41 self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
44 return {'addr': self.ip,
46 'sw_if_index': self.sw_if_index,
50 def from_pg(cls, pg, is_v6=False):
52 return cls(is_v6=is_v6)
54 return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
61 return ("%s:%d" % (self.ip, self.port))
64 class EpTuple(object):
67 def __init__(self, src, dst):
72 return {'src_ep': self.src.encode(),
73 'dst_ep': self.dst.encode()}
76 return ("%s->%s" % (self.src, self.dst))
79 class VppCNatTranslation(VppObject):
81 def __init__(self, test, iproto, vip, paths):
86 self.encoded_paths = []
87 for path in self.paths:
88 self.encoded_paths.append(path.encode())
91 return ("%s %s %s" % (self.vip, self.iproto, self.paths))
95 ip_proto = VppEnum.vl_api_ip_proto_t
97 UDP: ip_proto.IP_API_PROTO_UDP,
98 TCP: ip_proto.IP_API_PROTO_TCP,
101 def add_vpp_config(self):
102 r = self._test.vapi.cnat_translation_update(
103 {'vip': self.vip.encode(),
104 'ip_proto': self.vl4_proto,
105 'n_paths': len(self.paths),
106 'paths': self.encoded_paths})
107 self._test.registry.register(self, self._test.logger)
110 def modify_vpp_config(self, paths):
112 self.encoded_paths = []
113 for path in self.paths:
114 self.encoded_paths.append(path.encode())
116 r = self._test.vapi.cnat_translation_update(
117 {'vip': self.vip.encode(),
118 'ip_proto': self.vl4_proto,
119 'n_paths': len(self.paths),
120 'paths': self.encoded_paths})
121 self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
    """ Delete this translation from VPP, addressing it by self.id """
    vapi = self._test.vapi
    vapi.cnat_translation_del(id=self.id)
126 def query_vpp_config(self):
127 for t in self._test.vapi.cnat_translation_dump():
128 if self.id == t.translation.id:
133 return ("cnat-translation-%s" % (self.vip))
136 c = self._test.statistics.get_counter("/net/cnat-translation")
140 class TestCNatTranslation(VppTestCase):
141 """ CNat Translation """
142 extra_vpp_punt_config = ["cnat", "{",
143 "session-db-buckets", "64",
144 "session-cleanup-timeout", "0.1",
145 "session-max-age", "1",
147 "scanner", "off", "}"]
151 super(TestCNatTranslation, cls).setUpClass()
154 def tearDownClass(cls):
155 super(TestCNatTranslation, cls).tearDownClass()
158 super(TestCNatTranslation, self).setUp()
160 self.create_pg_interfaces(range(3))
162 for i in self.pg_interfaces:
170 for i in self.pg_interfaces:
174 super(TestCNatTranslation, self).tearDown()
176 def cnat_create_translation(self, vip, nbr):
177 ip_v = "ip6" if vip.isV6 else "ip4"
178 dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
179 sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
180 t1 = VppCNatTranslation(
182 [EpTuple(sep, dep), EpTuple(sep, dep)])
186 def cnat_test_translation(self, t1, nbr, sports, isV6=False):
187 ip_v = "ip6" if isV6 else "ip4"
188 ip_class = IPv6 if isV6 else IP
194 for src in self.pg0.remote_hosts:
197 p1 = (Ether(dst=self.pg0.local_mac,
199 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
200 vip.l4p(sport=sport, dport=vip.port) /
203 self.vapi.cli("trace add pg-input 1")
204 rxs = self.send_and_expect(self.pg0,
207 self.logger.info(self.vapi.cli("show trace max 1"))
210 self.assert_packet_checksums_valid(rx)
213 getattr(self.pg1.remote_hosts[nbr], ip_v))
214 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
218 self.assertEqual(rx[vip.l4p].sport, sport)
221 p1 = (Ether(dst=self.pg1.local_mac,
222 src=self.pg1.remote_mac) /
223 ip_class(src=getattr(
224 self.pg1.remote_hosts[nbr],
226 dst=getattr(src, ip_v)) /
227 vip.l4p(sport=4000 + nbr, dport=sport) /
230 rxs = self.send_and_expect(self.pg1,
235 self.assert_packet_checksums_valid(rx)
239 self.assertEqual(rx[vip.l4p].dport, sport)
240 self.assertEqual(rx[ip_class].src, vip.ip)
241 self.assertEqual(rx[vip.l4p].sport, vip.port)
244 # packets to the VIP that do not match a
245 # translation are dropped
247 p1 = (Ether(dst=self.pg0.local_mac,
249 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
250 vip.l4p(sport=sport, dport=6666) /
253 self.send_and_assert_no_replies(self.pg0,
258 # packets from the VIP that do not match a
259 # session are forwarded
261 p1 = (Ether(dst=self.pg1.local_mac,
262 src=self.pg1.remote_mac) /
263 ip_class(src=getattr(
264 self.pg1.remote_hosts[nbr],
266 dst=getattr(src, ip_v)) /
267 vip.l4p(sport=6666, dport=sport) /
270 rxs = self.send_and_expect(self.pg1,
274 def cnat_test_translation_update(self, t1, sports, isV6=False):
275 ip_v = "ip6" if isV6 else "ip4"
276 ip_class = IPv6 if isV6 else IP
280 # modify the translation to use a different backend
282 dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
283 sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
284 t1.modify_vpp_config([EpTuple(sep, dep)])
287 # existing flows follow the old path
289 for src in self.pg0.remote_hosts:
292 p1 = (Ether(dst=self.pg0.local_mac,
294 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
295 vip.l4p(sport=sport, dport=vip.port) /
298 rxs = self.send_and_expect(self.pg0,
303 # new flows go to the new backend
305 for src in self.pg0.remote_hosts:
306 p1 = (Ether(dst=self.pg0.local_mac,
308 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
309 vip.l4p(sport=9999, dport=vip.port) /
312 rxs = self.send_and_expect(self.pg0,
316 def cnat_translation(self, vips, isV6=False):
317 """ CNat Translation """
319 ip_class = IPv6 if isV6 else IP
320 ip_v = "ip6" if isV6 else "ip4"
321 sports = [1234, 1233]
324 # turn the scanner off whilst testing otherwise sessions
327 self.vapi.cli("test cnat scanner off")
329 sessions = self.vapi.cnat_session_dump()
332 for nbr, vip in enumerate(vips):
333 trs.append(self.cnat_create_translation(vip, nbr))
335 self.logger.info(self.vapi.cli("sh cnat client"))
336 self.logger.info(self.vapi.cli("sh cnat translation"))
341 for nbr, vip in enumerate(vips):
342 self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
343 self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
345 self.logger.info(self.vapi.cli(
346 "sh ip6 fib %s" % self.pg0.remote_ip6))
348 self.logger.info(self.vapi.cli(
349 "sh ip fib %s" % self.pg0.remote_ip4))
350 self.logger.info(self.vapi.cli("sh cnat session verbose"))
353 # turn the scanner back on and wait until the sessions
356 self.vapi.cli("test cnat scanner on")
359 sessions = self.vapi.cnat_session_dump()
360 while (len(sessions) and n_tries < 100):
362 sessions = self.vapi.cnat_session_dump()
364 self.logger.info(self.vapi.cli("show cnat session verbose"))
366 self.assertTrue(n_tries < 100)
367 self.vapi.cli("test cnat scanner off")
370 # load some flows again and purge
373 for src in self.pg0.remote_hosts:
376 p1 = (Ether(dst=self.pg0.local_mac,
378 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
379 vip.l4p(sport=sport, dport=vip.port) /
381 self.send_and_expect(self.pg0,
386 tr.remove_vpp_config()
388 self.assertTrue(self.vapi.cnat_session_dump())
389 self.vapi.cnat_session_purge()
390 self.assertFalse(self.vapi.cnat_session_dump())
394 Ep("30.0.0.1", 5555),
395 Ep("30.0.0.2", 5554),
396 Ep("30.0.0.2", 5553, UDP),
398 Ep("30::2", 5553, UDP),
402 self.pg0.generate_remote_hosts(len(vips))
403 self.pg0.configure_ipv6_neighbors()
404 self.pg0.configure_ipv4_neighbors()
406 self.pg1.generate_remote_hosts(len(vips))
407 self.pg1.configure_ipv6_neighbors()
408 self.pg1.configure_ipv4_neighbors()
410 self.vapi.cli("test cnat scanner off")
412 for nbr, vip in enumerate(vips):
413 trs.append(self.cnat_create_translation(vip, nbr))
415 self.logger.info(self.vapi.cli("sh cnat client"))
416 self.logger.info(self.vapi.cli("sh cnat translation"))
418 for nbr, vip in enumerate(vips):
420 client_addr = self.pg0.remote_hosts[0].ip6
421 remote_addr = self.pg1.remote_hosts[nbr].ip6
422 remote2_addr = self.pg2.remote_hosts[0].ip6
424 client_addr = self.pg0.remote_hosts[0].ip4
425 remote_addr = self.pg1.remote_hosts[nbr].ip4
426 remote2_addr = self.pg2.remote_hosts[0].ip4
427 IP46 = IPv6 if vip.isV6 else IP
429 p1 = (Ether(dst=self.pg0.local_mac,
430 src=self.pg0.remote_hosts[0].mac) /
431 IP46(src=client_addr, dst=vip.ip) /
432 vip.l4p(sport=sport, dport=vip.port) /
435 rxs = self.send_and_expect(self.pg0,
440 self.assert_packet_checksums_valid(rx)
441 self.assertEqual(rx[IP46].dst, remote_addr)
442 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
443 self.assertEqual(rx[IP46].src, client_addr)
444 self.assertEqual(rx[vip.l4p].sport, sport)
446 InnerIP = rxs[0][IP46]
448 ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
449 ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
450 # from vip to client, ICMP error
451 p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
452 IP46(src=remote_addr, dst=client_addr) /
455 rxs = self.send_and_expect(self.pg1,
459 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
460 IP46error = IPerror6 if vip.isV6 else IPerror
462 self.assert_packet_checksums_valid(rx)
463 self.assertEqual(rx[IP46].src, vip.ip)
464 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
465 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
466 self.assertEqual(rx[ICMP46][IP46error]
467 [TCPUDPError].sport, sport)
468 self.assertEqual(rx[ICMP46][IP46error]
469 [TCPUDPError].dport, vip.port)
471 # from other remote to client, ICMP error
472 # outside shouldn't be NAT-ed
473 p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
474 IP46(src=remote2_addr, dst=client_addr) /
477 rxs = self.send_and_expect(self.pg1,
481 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
482 IP46error = IPerror6 if vip.isV6 else IPerror
484 self.assert_packet_checksums_valid(rx)
485 self.assertEqual(rx[IP46].src, remote2_addr)
486 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
487 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
488 self.assertEqual(rx[ICMP46][IP46error]
489 [TCPUDPError].sport, sport)
490 self.assertEqual(rx[ICMP46][IP46error]
491 [TCPUDPError].dport, vip.port)
493 self.vapi.cnat_session_purge()
495 def test_cnat6(self):
496 # """ CNat Translation ipv6 """
500 Ep("30::2", 5553, UDP),
503 self.pg0.generate_remote_hosts(len(vips))
504 self.pg0.configure_ipv6_neighbors()
505 self.pg1.generate_remote_hosts(len(vips))
506 self.pg1.configure_ipv6_neighbors()
508 self.cnat_translation(vips, isV6=True)
510 def test_cnat4(self):
511 # """ CNat Translation ipv4 """
514 Ep("30.0.0.1", 5555),
515 Ep("30.0.0.2", 5554),
516 Ep("30.0.0.2", 5553, UDP),
519 self.pg0.generate_remote_hosts(len(vips))
520 self.pg0.configure_ipv4_neighbors()
521 self.pg1.generate_remote_hosts(len(vips))
522 self.pg1.configure_ipv4_neighbors()
524 self.cnat_translation(vips)
527 class TestCNatSourceNAT(VppTestCase):
528 """ CNat Source NAT """
529 extra_vpp_punt_config = ["cnat", "{",
530 "session-max-age", "1",
531 "tcp-max-age", "1", "}"]
535 super(TestCNatSourceNAT, cls).setUpClass()
538 def tearDownClass(cls):
539 super(TestCNatSourceNAT, cls).tearDownClass()
542 super(TestCNatSourceNAT, self).setUp()
544 self.create_pg_interfaces(range(3))
546 for i in self.pg_interfaces:
553 self.pg0.configure_ipv6_neighbors()
554 self.pg0.configure_ipv4_neighbors()
555 self.pg1.generate_remote_hosts(2)
556 self.pg1.configure_ipv4_neighbors()
557 self.pg1.configure_ipv6_neighbors()
559 self.vapi.cli("test cnat scanner off")
560 self.vapi.cnat_set_snat_addresses(
561 snat_ip4=self.pg2.remote_hosts[0].ip4,
562 snat_ip6=self.pg2.remote_hosts[0].ip6)
563 self.vapi.feature_enable_disable(
565 arc_name="ip6-unicast",
566 feature_name="cnat-snat-ip6",
567 sw_if_index=self.pg0.sw_if_index)
568 self.vapi.feature_enable_disable(
570 arc_name="ip4-unicast",
571 feature_name="cnat-snat-ip4",
572 sw_if_index=self.pg0.sw_if_index)
575 self.vapi.cnat_session_purge()
576 for i in self.pg_interfaces:
580 super(TestCNatSourceNAT, self).tearDown()
def test_snat_v6(self):
    # CNat Source Nat v6
    #
    # Runs the IPv6 source-NAT scenarios in a fixed order: the TCP/UDP
    # helper once per L4 protocol, then the ICMP-error helper, then the
    # ICMPv6 echo helper.  Kept as comments (not a docstring), as in the
    # original, so the method's __doc__ stays unset.
    for l4_proto in (TCP, UDP):
        self.sourcenat_test_tcp_udp_conf(l4_proto, isV6=True)
    self.sourcenat_test_icmp_err_conf(isV6=True)
    self.sourcenat_test_icmp_echo6_conf()
def test_snat_v4(self):
    # CNat Source Nat v4
    #
    # Runs the IPv4 source-NAT scenarios in a fixed order: the TCP/UDP
    # helper once per L4 protocol, then the ICMP-error helper, then the
    # ICMP echo helper.  Kept as comments (not a docstring), as in the
    # original, so the method's __doc__ stays unset.
    for l4_proto in (TCP, UDP):
        self.sourcenat_test_tcp_udp_conf(l4_proto)
    self.sourcenat_test_icmp_err_conf()
    self.sourcenat_test_icmp_echo4_conf()
596 def sourcenat_test_icmp_echo6_conf(self):
597 sports = [1234, 1235]
598 dports = [6661, 6662]
600 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
601 client_addr = self.pg0.remote_hosts[0].ip6
602 remote_addr = self.pg1.remote_hosts[nbr].ip6
603 src_nat_addr = self.pg2.remote_hosts[0].ip6
605 # ping from pods to outside network
607 Ether(dst=self.pg0.local_mac,
608 src=self.pg0.remote_hosts[0].mac) /
609 IPv6(src=client_addr, dst=remote_addr) /
610 ICMPv6EchoRequest(id=0xfeed) /
613 rxs = self.send_and_expect(
619 self.assertEqual(rx[IPv6].src, src_nat_addr)
620 self.assert_packet_checksums_valid(rx)
622 received_id = rx[0][ICMPv6EchoRequest].id
623 # ping reply from outside to pods
625 Ether(dst=self.pg1.local_mac,
626 src=self.pg1.remote_hosts[nbr].mac) /
627 IPv6(src=remote_addr, dst=src_nat_addr) /
628 ICMPv6EchoReply(id=received_id))
629 rxs = self.send_and_expect(
635 self.assert_packet_checksums_valid(rx)
636 self.assertEqual(rx[IPv6].src, remote_addr)
637 self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
639 def sourcenat_test_icmp_echo4_conf(self):
640 sports = [1234, 1235]
641 dports = [6661, 6662]
643 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
645 client_addr = self.pg0.remote_hosts[0].ip4
646 remote_addr = self.pg1.remote_hosts[nbr].ip4
647 src_nat_addr = self.pg2.remote_hosts[0].ip4
649 # ping from pods to outside network
651 Ether(dst=self.pg0.local_mac,
652 src=self.pg0.remote_hosts[0].mac) /
653 IP46(src=client_addr, dst=remote_addr) /
654 ICMP(type=8, id=0xfeed) /
657 rxs = self.send_and_expect(
663 self.assertEqual(rx[IP46].src, src_nat_addr)
664 self.assert_packet_checksums_valid(rx)
666 received_id = rx[0][ICMP].id
667 # ping reply from outside to pods
669 Ether(dst=self.pg1.local_mac,
670 src=self.pg1.remote_hosts[nbr].mac) /
671 IP46(src=remote_addr, dst=src_nat_addr) /
672 ICMP(type=0, id=received_id))
673 rxs = self.send_and_expect(
679 self.assert_packet_checksums_valid(rx)
680 self.assertEqual(rx[IP46].src, remote_addr)
681 self.assertEqual(rx[ICMP].id, 0xfeed)
683 def sourcenat_test_icmp_err_conf(self, isV6=False):
684 sports = [1234, 1235]
685 dports = [6661, 6662]
687 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
690 client_addr = self.pg0.remote_hosts[0].ip6
691 remote_addr = self.pg1.remote_hosts[nbr].ip6
692 src_nat_addr = self.pg2.remote_hosts[0].ip6
693 ICMP46 = ICMPv6DestUnreach
694 ICMPelem = ICMPv6DestUnreach(code=1)
698 client_addr = self.pg0.remote_hosts[0].ip4
699 remote_addr = self.pg1.remote_hosts[nbr].ip4
700 src_nat_addr = self.pg2.remote_hosts[0].ip4
703 ICMPelem = ICMP(type=11)
705 # from pods to outside network
707 Ether(dst=self.pg0.local_mac,
708 src=self.pg0.remote_hosts[0].mac) /
709 IP46(src=client_addr, dst=remote_addr) /
710 TCP(sport=sports[nbr], dport=dports[nbr]) /
713 rxs = self.send_and_expect(
718 self.assert_packet_checksums_valid(rx)
719 self.assertEqual(rx[IP46].dst, remote_addr)
720 self.assertEqual(rx[TCP].dport, dports[nbr])
721 self.assertEqual(rx[IP46].src, src_nat_addr)
722 sport = rx[TCP].sport
724 InnerIP = rxs[0][IP46]
725 # from outside to pods, ICMP error
727 Ether(dst=self.pg1.local_mac,
728 src=self.pg1.remote_hosts[nbr].mac) /
729 IP46(src=remote_addr, dst=src_nat_addr) /
732 rxs = self.send_and_expect(
738 self.assert_packet_checksums_valid(rx)
739 self.assertEqual(rx[IP46].src, remote_addr)
740 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
741 self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
742 self.assertEqual(rx[ICMP46][IP46error]
743 [TCPerror].sport, sports[nbr])
744 self.assertEqual(rx[ICMP46][IP46error]
745 [TCPerror].dport, dports[nbr])
747 def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
748 sports = [1234, 1235]
749 dports = [6661, 6662]
751 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
754 client_addr = self.pg0.remote_hosts[0].ip6
755 remote_addr = self.pg1.remote_hosts[nbr].ip6
756 src_nat_addr = self.pg2.remote_hosts[0].ip6
757 exclude_prefix = ip_network(
758 "%s/100" % remote_addr, strict=False)
761 client_addr = self.pg0.remote_hosts[0].ip4
762 remote_addr = self.pg1.remote_hosts[nbr].ip4
763 src_nat_addr = self.pg2.remote_hosts[0].ip4
764 exclude_prefix = ip_network(
765 "%s/16" % remote_addr, strict=False)
766 # from pods to outside network
768 Ether(dst=self.pg0.local_mac,
769 src=self.pg0.remote_hosts[0].mac) /
770 IP46(src=client_addr, dst=remote_addr) /
771 l4p(sport=sports[nbr], dport=dports[nbr]) /
774 self.vapi.cli("trace add pg-input 1")
775 rxs = self.send_and_expect(
779 self.logger.info(self.vapi.cli("show trace max 1"))
782 self.assert_packet_checksums_valid(rx)
783 self.assertEqual(rx[IP46].dst, remote_addr)
784 self.assertEqual(rx[l4p].dport, dports[nbr])
785 self.assertEqual(rx[IP46].src, src_nat_addr)
786 sport = rx[l4p].sport
788 # from outside to pods
790 Ether(dst=self.pg1.local_mac,
791 src=self.pg1.remote_hosts[nbr].mac) /
792 IP46(src=remote_addr, dst=src_nat_addr) /
793 l4p(sport=dports[nbr], dport=sport) /
796 rxs = self.send_and_expect(
802 self.assert_packet_checksums_valid(rx)
803 self.assertEqual(rx[IP46].dst, client_addr)
804 self.assertEqual(rx[l4p].dport, sports[nbr])
805 self.assertEqual(rx[l4p].sport, dports[nbr])
806 self.assertEqual(rx[IP46].src, remote_addr)
808 # add remote host to exclude list
809 self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=1)
810 self.vapi.cnat_session_purge()
812 rxs = self.send_and_expect(
817 self.assert_packet_checksums_valid(rx)
818 self.assertEqual(rx[IP46].dst, remote_addr)
819 self.assertEqual(rx[l4p].dport, dports[nbr])
820 self.assertEqual(rx[IP46].src, client_addr)
822 # remove remote host from exclude list
823 self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=0)
824 self.vapi.cnat_session_purge()
826 rxs = self.send_and_expect(
832 self.assert_packet_checksums_valid(rx)
833 self.assertEqual(rx[IP46].dst, remote_addr)
834 self.assertEqual(rx[l4p].dport, dports[nbr])
835 self.assertEqual(rx[IP46].src, src_nat_addr)
837 self.vapi.cnat_session_purge()
840 class TestCNatDHCP(VppTestCase):
841 """ CNat Translation """
842 extra_vpp_punt_config = ["cnat", "{",
843 "session-db-buckets", "64",
844 "session-cleanup-timeout", "0.1",
845 "session-max-age", "1",
847 "scanner", "off", "}"]
851 super(TestCNatDHCP, cls).setUpClass()
854 def tearDownClass(cls):
855 super(TestCNatDHCP, cls).tearDownClass()
858 for i in self.pg_interfaces:
860 super(TestCNatDHCP, self).tearDown()
862 def create_translation(self, vip_pg, *args, is_v6=False):
863 vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
865 for (src_pg, dst_pg) in args:
866 paths.append(EpTuple(
867 Ep.from_pg(src_pg, is_v6=is_v6),
868 Ep.from_pg(dst_pg, is_v6=is_v6)
870 t1 = VppCNatTranslation(self, TCP, vip, paths)
874 def make_addr(self, sw_if_index, i, is_v6):
876 return "fd01:%x::%u" % (sw_if_index, i + 1)
878 return "172.16.%u.%u" % (sw_if_index, i)
880 def make_prefix(self, sw_if_index, i, is_v6):
882 return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
884 return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
886 def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
887 qt1 = tr.query_vpp_config()
888 self.assertEqual(str(qt1.vip.addr), self.make_addr(
889 vip_pg.sw_if_index, i, is_v6))
890 for (src_pg, dst_pg), path in zip(args, qt1.paths):
892 self.assertEqual(str(path.src_ep.addr), self.make_addr(
893 src_pg.sw_if_index, i, is_v6))
895 self.assertEqual(str(path.dst_ep.addr), self.make_addr(
896 dst_pg.sw_if_index, i, is_v6))
898 def config_ips(self, rng, is_add=1, is_v6=False):
899 for pg, i in product(self.pg_interfaces, rng):
900 self.vapi.sw_interface_add_del_address(
901 sw_if_index=pg.sw_if_index,
902 prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
905 def test_dhcp_v4(self):
906 self.create_pg_interfaces(range(5))
907 for i in self.pg_interfaces:
909 pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
910 t1 = self.create_translation(*pglist)
912 self.check_resolved(t1, *pglist)
914 self.config_ips([0], is_add=0)
915 self.check_resolved(t1, *pglist, i=1)
916 self.config_ips([1], is_add=0)
917 t1.remove_vpp_config()
919 def test_dhcp_v6(self):
920 self.create_pg_interfaces(range(5))
921 for i in self.pg_interfaces:
923 pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
924 t1 = self.create_translation(*pglist, is_v6=True)
925 self.config_ips([0], is_v6=True)
926 self.check_resolved(t1, *pglist, is_v6=True)
927 self.config_ips([1], is_v6=True)
928 self.config_ips([0], is_add=0, is_v6=True)
929 self.check_resolved(t1, *pglist, i=1, is_v6=True)
930 self.config_ips([1], is_add=0, is_v6=True)
931 t1.remove_vpp_config()
933 def test_dhcp_snat(self):
934 self.create_pg_interfaces(range(1))
935 for i in self.pg_interfaces:
937 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
938 self.config_ips([0], is_v6=False)
939 self.config_ips([0], is_v6=True)
940 r = self.vapi.cnat_get_snat_addresses()
941 self.assertEqual(str(r.snat_ip4), self.make_addr(
942 self.pg0.sw_if_index, 0, False))
943 self.assertEqual(str(r.snat_ip6), self.make_addr(
944 self.pg0.sw_if_index, 0, True))
945 self.config_ips([1], is_v6=False)
946 self.config_ips([1], is_v6=True)
947 self.config_ips([0], is_add=0, is_v6=False)
948 self.config_ips([0], is_add=0, is_v6=True)
949 r = self.vapi.cnat_get_snat_addresses()
950 self.assertEqual(str(r.snat_ip4), self.make_addr(
951 self.pg0.sw_if_index, 1, False))
952 self.assertEqual(str(r.snat_ip6), self.make_addr(
953 self.pg0.sw_if_index, 1, True))
954 self.config_ips([1], is_add=0, is_v6=False)
955 self.config_ips([1], is_add=0, is_v6=True)
# Script entry point: when the module is executed directly, hand the
# test cases to unittest using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)