5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
18 from ipaddress import ip_address, ip_network, \
19 IPv4Address, IPv6Address, IPv4Network, IPv6Network
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
30 def __init__(self, ip=None, port=0, l4p=TCP,
31 sw_if_index=INVALID_INDEX, is_v6=False):
34 self.ip = "::" if is_v6 else "0.0.0.0"
37 self.sw_if_index = sw_if_index
39 self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
41 self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
44 return {'addr': self.ip,
46 'sw_if_index': self.sw_if_index,
50 def from_pg(cls, pg, is_v6=False):
52 return cls(is_v6=is_v6)
54 return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
61 return ("%s:%d" % (self.ip, self.port))
64 class EpTuple(object):
67 def __init__(self, src, dst):
72 return {'src_ep': self.src.encode(),
73 'dst_ep': self.dst.encode()}
76 return ("%s->%s" % (self.src, self.dst))
79 class VppCNatTranslation(VppObject):
81 def __init__(self, test, iproto, vip, paths):
86 self.encoded_paths = []
87 for path in self.paths:
88 self.encoded_paths.append(path.encode())
91 return ("%s %s %s" % (self.vip, self.iproto, self.paths))
95 ip_proto = VppEnum.vl_api_ip_proto_t
97 UDP: ip_proto.IP_API_PROTO_UDP,
98 TCP: ip_proto.IP_API_PROTO_TCP,
101 def add_vpp_config(self):
102 r = self._test.vapi.cnat_translation_update(
103 {'vip': self.vip.encode(),
104 'ip_proto': self.vl4_proto,
105 'n_paths': len(self.paths),
106 'paths': self.encoded_paths})
107 self._test.registry.register(self, self._test.logger)
110 def modify_vpp_config(self, paths):
112 self.encoded_paths = []
113 for path in self.paths:
114 self.encoded_paths.append(path.encode())
116 r = self._test.vapi.cnat_translation_update(
117 {'vip': self.vip.encode(),
118 'ip_proto': self.vl4_proto,
119 'n_paths': len(self.paths),
120 'paths': self.encoded_paths})
121 self._test.registry.register(self, self._test.logger)
123 def remove_vpp_config(self):
124 self._test.vapi.cnat_translation_del(id=self.id)
126 def query_vpp_config(self):
127 for t in self._test.vapi.cnat_translation_dump():
128 if self.id == t.translation.id:
133 return ("cnat-translation-%s" % (self.vip))
136 c = self._test.statistics.get_counter("/net/cnat-translation")
140 class TestCNatTranslation(VppTestCase):
141 """ CNat Translation """
142 extra_vpp_punt_config = ["cnat", "{",
143 "session-db-buckets", "64",
144 "session-cleanup-timeout", "0.1",
145 "session-max-age", "1",
147 "scanner", "off", "}"]
151 super(TestCNatTranslation, cls).setUpClass()
154 def tearDownClass(cls):
155 super(TestCNatTranslation, cls).tearDownClass()
158 super(TestCNatTranslation, self).setUp()
160 self.create_pg_interfaces(range(3))
162 for i in self.pg_interfaces:
170 for i in self.pg_interfaces:
174 super(TestCNatTranslation, self).tearDown()
176 def cnat_create_translation(self, vip, nbr):
177 ip_v = "ip6" if vip.isV6 else "ip4"
178 dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
179 sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
180 t1 = VppCNatTranslation(
182 [EpTuple(sep, dep), EpTuple(sep, dep)])
186 def cnat_test_translation(self, t1, nbr, sports, isV6=False):
187 ip_v = "ip6" if isV6 else "ip4"
188 ip_class = IPv6 if isV6 else IP
194 for src in self.pg0.remote_hosts:
197 p1 = (Ether(dst=self.pg0.local_mac,
199 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
200 vip.l4p(sport=sport, dport=vip.port) /
203 self.vapi.cli("trace add pg-input 1")
204 rxs = self.send_and_expect(self.pg0,
207 self.logger.info(self.vapi.cli("show trace max 1"))
210 self.assert_packet_checksums_valid(rx)
213 getattr(self.pg1.remote_hosts[nbr], ip_v))
214 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
218 self.assertEqual(rx[vip.l4p].sport, sport)
221 p1 = (Ether(dst=self.pg1.local_mac,
222 src=self.pg1.remote_mac) /
223 ip_class(src=getattr(
224 self.pg1.remote_hosts[nbr],
226 dst=getattr(src, ip_v)) /
227 vip.l4p(sport=4000 + nbr, dport=sport) /
230 rxs = self.send_and_expect(self.pg1,
235 self.assert_packet_checksums_valid(rx)
239 self.assertEqual(rx[vip.l4p].dport, sport)
240 self.assertEqual(rx[ip_class].src, vip.ip)
241 self.assertEqual(rx[vip.l4p].sport, vip.port)
244 # packets to the VIP that do not match a
245 # translation are dropped
247 p1 = (Ether(dst=self.pg0.local_mac,
249 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
250 vip.l4p(sport=sport, dport=6666) /
253 self.send_and_assert_no_replies(self.pg0,
258 # packets from the VIP that do not match a
259 # session are forwarded
261 p1 = (Ether(dst=self.pg1.local_mac,
262 src=self.pg1.remote_mac) /
263 ip_class(src=getattr(
264 self.pg1.remote_hosts[nbr],
266 dst=getattr(src, ip_v)) /
267 vip.l4p(sport=6666, dport=sport) /
270 rxs = self.send_and_expect(self.pg1,
274 def cnat_test_translation_update(self, t1, sports, isV6=False):
275 ip_v = "ip6" if isV6 else "ip4"
276 ip_class = IPv6 if isV6 else IP
280 # modify the translation to use a different backend
282 dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
283 sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
284 t1.modify_vpp_config([EpTuple(sep, dep)])
287 # existing flows follow the old path
289 for src in self.pg0.remote_hosts:
292 p1 = (Ether(dst=self.pg0.local_mac,
294 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
295 vip.l4p(sport=sport, dport=vip.port) /
298 rxs = self.send_and_expect(self.pg0,
303 # new flows go to the new backend
305 for src in self.pg0.remote_hosts:
306 p1 = (Ether(dst=self.pg0.local_mac,
308 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
309 vip.l4p(sport=9999, dport=vip.port) /
312 rxs = self.send_and_expect(self.pg0,
316 def cnat_translation(self, vips, isV6=False):
317 """ CNat Translation """
319 ip_class = IPv6 if isV6 else IP
320 ip_v = "ip6" if isV6 else "ip4"
321 sports = [1234, 1233]
324 # turn the scanner off whilst testing otherwise sessions
327 self.vapi.cli("test cnat scanner off")
329 sessions = self.vapi.cnat_session_dump()
332 for nbr, vip in enumerate(vips):
333 trs.append(self.cnat_create_translation(vip, nbr))
335 self.logger.info(self.vapi.cli("sh cnat client"))
336 self.logger.info(self.vapi.cli("sh cnat translation"))
341 for nbr, vip in enumerate(vips):
342 self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
343 self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
345 self.logger.info(self.vapi.cli(
346 "sh ip6 fib %s" % self.pg0.remote_ip6))
348 self.logger.info(self.vapi.cli(
349 "sh ip fib %s" % self.pg0.remote_ip4))
350 self.logger.info(self.vapi.cli("sh cnat session verbose"))
353 # turn the scanner back on and wait until the sessions
356 self.vapi.cli("test cnat scanner on")
359 sessions = self.vapi.cnat_session_dump()
360 while (len(sessions) and n_tries < 100):
362 sessions = self.vapi.cnat_session_dump()
364 self.logger.info(self.vapi.cli("show cnat session verbose"))
366 self.assertTrue(n_tries < 100)
367 self.vapi.cli("test cnat scanner off")
370 # load some flows again and purge
373 for src in self.pg0.remote_hosts:
376 p1 = (Ether(dst=self.pg0.local_mac,
378 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
379 vip.l4p(sport=sport, dport=vip.port) /
381 self.send_and_expect(self.pg0,
386 tr.remove_vpp_config()
388 self.assertTrue(self.vapi.cnat_session_dump())
389 self.vapi.cnat_session_purge()
390 self.assertFalse(self.vapi.cnat_session_dump())
394 Ep("30.0.0.1", 5555),
395 Ep("30.0.0.2", 5554),
396 Ep("30.0.0.2", 5553, UDP),
398 Ep("30::2", 5553, UDP),
402 self.pg0.generate_remote_hosts(len(vips))
403 self.pg0.configure_ipv6_neighbors()
404 self.pg0.configure_ipv4_neighbors()
406 self.pg1.generate_remote_hosts(len(vips))
407 self.pg1.configure_ipv6_neighbors()
408 self.pg1.configure_ipv4_neighbors()
410 self.vapi.cli("test cnat scanner off")
412 for nbr, vip in enumerate(vips):
413 trs.append(self.cnat_create_translation(vip, nbr))
415 self.logger.info(self.vapi.cli("sh cnat client"))
416 self.logger.info(self.vapi.cli("sh cnat translation"))
418 for nbr, vip in enumerate(vips):
420 client_addr = self.pg0.remote_hosts[0].ip6
421 remote_addr = self.pg1.remote_hosts[nbr].ip6
422 remote2_addr = self.pg2.remote_hosts[0].ip6
424 client_addr = self.pg0.remote_hosts[0].ip4
425 remote_addr = self.pg1.remote_hosts[nbr].ip4
426 remote2_addr = self.pg2.remote_hosts[0].ip4
427 IP46 = IPv6 if vip.isV6 else IP
429 p1 = (Ether(dst=self.pg0.local_mac,
430 src=self.pg0.remote_hosts[0].mac) /
431 IP46(src=client_addr, dst=vip.ip) /
432 vip.l4p(sport=sport, dport=vip.port) /
435 rxs = self.send_and_expect(self.pg0,
440 self.assert_packet_checksums_valid(rx)
441 self.assertEqual(rx[IP46].dst, remote_addr)
442 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
443 self.assertEqual(rx[IP46].src, client_addr)
444 self.assertEqual(rx[vip.l4p].sport, sport)
446 InnerIP = rxs[0][IP46]
448 ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
449 ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
450 # from vip to client, ICMP error
451 p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
452 IP46(src=remote_addr, dst=client_addr) /
455 rxs = self.send_and_expect(self.pg1,
459 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
460 IP46error = IPerror6 if vip.isV6 else IPerror
462 self.assert_packet_checksums_valid(rx)
463 self.assertEqual(rx[IP46].src, vip.ip)
464 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
465 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
466 self.assertEqual(rx[ICMP46][IP46error]
467 [TCPUDPError].sport, sport)
468 self.assertEqual(rx[ICMP46][IP46error]
469 [TCPUDPError].dport, vip.port)
471 # from other remote to client, ICMP error
472 # outside shouldn't be NAT-ed
473 p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
474 IP46(src=remote2_addr, dst=client_addr) /
477 rxs = self.send_and_expect(self.pg1,
481 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
482 IP46error = IPerror6 if vip.isV6 else IPerror
484 self.assert_packet_checksums_valid(rx)
485 self.assertEqual(rx[IP46].src, remote2_addr)
486 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
487 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
488 self.assertEqual(rx[ICMP46][IP46error]
489 [TCPUDPError].sport, sport)
490 self.assertEqual(rx[ICMP46][IP46error]
491 [TCPUDPError].dport, vip.port)
493 self.vapi.cnat_session_purge()
495 def test_cnat6(self):
496 # """ CNat Translation ipv6 """
500 Ep("30::2", 5553, UDP),
503 self.pg0.generate_remote_hosts(len(vips))
504 self.pg0.configure_ipv6_neighbors()
505 self.pg1.generate_remote_hosts(len(vips))
506 self.pg1.configure_ipv6_neighbors()
508 self.cnat_translation(vips, isV6=True)
510 def test_cnat4(self):
511 # """ CNat Translation ipv4 """
514 Ep("30.0.0.1", 5555),
515 Ep("30.0.0.2", 5554),
516 Ep("30.0.0.2", 5553, UDP),
519 self.pg0.generate_remote_hosts(len(vips))
520 self.pg0.configure_ipv4_neighbors()
521 self.pg1.generate_remote_hosts(len(vips))
522 self.pg1.configure_ipv4_neighbors()
524 self.cnat_translation(vips)
527 class TestCNatSourceNAT(VppTestCase):
528 """ CNat Source NAT """
529 extra_vpp_punt_config = ["cnat", "{",
530 "session-cleanup-timeout", "0.1",
531 "session-max-age", "1",
533 "scanner", "off", "}"]
537 super(TestCNatSourceNAT, cls).setUpClass()
540 def tearDownClass(cls):
541 super(TestCNatSourceNAT, cls).tearDownClass()
544 super(TestCNatSourceNAT, self).setUp()
546 self.create_pg_interfaces(range(3))
548 for i in self.pg_interfaces:
555 self.pg0.configure_ipv6_neighbors()
556 self.pg0.configure_ipv4_neighbors()
557 self.pg1.generate_remote_hosts(2)
558 self.pg1.configure_ipv4_neighbors()
559 self.pg1.configure_ipv6_neighbors()
561 self.vapi.cnat_set_snat_addresses(
562 snat_ip4=self.pg2.remote_hosts[0].ip4,
563 snat_ip6=self.pg2.remote_hosts[0].ip6,
564 sw_if_index=INVALID_INDEX)
565 self.vapi.feature_enable_disable(
567 arc_name="ip6-unicast",
568 feature_name="cnat-snat-ip6",
569 sw_if_index=self.pg0.sw_if_index)
570 self.vapi.feature_enable_disable(
572 arc_name="ip4-unicast",
573 feature_name="cnat-snat-ip4",
574 sw_if_index=self.pg0.sw_if_index)
577 self.vapi.cnat_session_purge()
578 for i in self.pg_interfaces:
582 super(TestCNatSourceNAT, self).tearDown()
584 def test_snat_v6(self):
585 # """ CNat Source Nat v6 """
586 self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
587 self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
588 self.sourcenat_test_icmp_err_conf(isV6=True)
589 self.sourcenat_test_icmp_echo6_conf()
591 def test_snat_v4(self):
592 # """ CNat Source Nat v4 """
593 self.sourcenat_test_tcp_udp_conf(TCP)
594 self.sourcenat_test_tcp_udp_conf(UDP)
595 self.sourcenat_test_icmp_err_conf()
596 self.sourcenat_test_icmp_echo4_conf()
598 def sourcenat_test_icmp_echo6_conf(self):
599 sports = [1234, 1235]
600 dports = [6661, 6662]
602 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
603 client_addr = self.pg0.remote_hosts[0].ip6
604 remote_addr = self.pg1.remote_hosts[nbr].ip6
605 src_nat_addr = self.pg2.remote_hosts[0].ip6
607 # ping from pods to outside network
609 Ether(dst=self.pg0.local_mac,
610 src=self.pg0.remote_hosts[0].mac) /
611 IPv6(src=client_addr, dst=remote_addr) /
612 ICMPv6EchoRequest(id=0xfeed) /
615 rxs = self.send_and_expect(
621 self.assertEqual(rx[IPv6].src, src_nat_addr)
622 self.assert_packet_checksums_valid(rx)
624 received_id = rx[0][ICMPv6EchoRequest].id
625 # ping reply from outside to pods
627 Ether(dst=self.pg1.local_mac,
628 src=self.pg1.remote_hosts[nbr].mac) /
629 IPv6(src=remote_addr, dst=src_nat_addr) /
630 ICMPv6EchoReply(id=received_id))
631 rxs = self.send_and_expect(
637 self.assert_packet_checksums_valid(rx)
638 self.assertEqual(rx[IPv6].src, remote_addr)
639 self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
641 def sourcenat_test_icmp_echo4_conf(self):
642 sports = [1234, 1235]
643 dports = [6661, 6662]
645 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
647 client_addr = self.pg0.remote_hosts[0].ip4
648 remote_addr = self.pg1.remote_hosts[nbr].ip4
649 src_nat_addr = self.pg2.remote_hosts[0].ip4
651 # ping from pods to outside network
653 Ether(dst=self.pg0.local_mac,
654 src=self.pg0.remote_hosts[0].mac) /
655 IP46(src=client_addr, dst=remote_addr) /
656 ICMP(type=8, id=0xfeed) /
659 rxs = self.send_and_expect(
665 self.assertEqual(rx[IP46].src, src_nat_addr)
666 self.assert_packet_checksums_valid(rx)
668 received_id = rx[0][ICMP].id
669 # ping reply from outside to pods
671 Ether(dst=self.pg1.local_mac,
672 src=self.pg1.remote_hosts[nbr].mac) /
673 IP46(src=remote_addr, dst=src_nat_addr) /
674 ICMP(type=0, id=received_id))
675 rxs = self.send_and_expect(
681 self.assert_packet_checksums_valid(rx)
682 self.assertEqual(rx[IP46].src, remote_addr)
683 self.assertEqual(rx[ICMP].id, 0xfeed)
685 def sourcenat_test_icmp_err_conf(self, isV6=False):
686 sports = [1234, 1235]
687 dports = [6661, 6662]
689 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
692 client_addr = self.pg0.remote_hosts[0].ip6
693 remote_addr = self.pg1.remote_hosts[nbr].ip6
694 src_nat_addr = self.pg2.remote_hosts[0].ip6
695 ICMP46 = ICMPv6DestUnreach
696 ICMPelem = ICMPv6DestUnreach(code=1)
700 client_addr = self.pg0.remote_hosts[0].ip4
701 remote_addr = self.pg1.remote_hosts[nbr].ip4
702 src_nat_addr = self.pg2.remote_hosts[0].ip4
705 ICMPelem = ICMP(type=11)
707 # from pods to outside network
709 Ether(dst=self.pg0.local_mac,
710 src=self.pg0.remote_hosts[0].mac) /
711 IP46(src=client_addr, dst=remote_addr) /
712 TCP(sport=sports[nbr], dport=dports[nbr]) /
715 rxs = self.send_and_expect(
720 self.assert_packet_checksums_valid(rx)
721 self.assertEqual(rx[IP46].dst, remote_addr)
722 self.assertEqual(rx[TCP].dport, dports[nbr])
723 self.assertEqual(rx[IP46].src, src_nat_addr)
724 sport = rx[TCP].sport
726 InnerIP = rxs[0][IP46]
727 # from outside to pods, ICMP error
729 Ether(dst=self.pg1.local_mac,
730 src=self.pg1.remote_hosts[nbr].mac) /
731 IP46(src=remote_addr, dst=src_nat_addr) /
734 rxs = self.send_and_expect(
740 self.assert_packet_checksums_valid(rx)
741 self.assertEqual(rx[IP46].src, remote_addr)
742 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
743 self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
744 self.assertEqual(rx[ICMP46][IP46error]
745 [TCPerror].sport, sports[nbr])
746 self.assertEqual(rx[ICMP46][IP46error]
747 [TCPerror].dport, dports[nbr])
749 def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
750 sports = [1234, 1235]
751 dports = [6661, 6662]
753 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
756 client_addr = self.pg0.remote_hosts[0].ip6
757 remote_addr = self.pg1.remote_hosts[nbr].ip6
758 src_nat_addr = self.pg2.remote_hosts[0].ip6
759 exclude_prefix = ip_network(
760 "%s/100" % remote_addr, strict=False)
763 client_addr = self.pg0.remote_hosts[0].ip4
764 remote_addr = self.pg1.remote_hosts[nbr].ip4
765 src_nat_addr = self.pg2.remote_hosts[0].ip4
766 exclude_prefix = ip_network(
767 "%s/16" % remote_addr, strict=False)
768 # from pods to outside network
770 Ether(dst=self.pg0.local_mac,
771 src=self.pg0.remote_hosts[0].mac) /
772 IP46(src=client_addr, dst=remote_addr) /
773 l4p(sport=sports[nbr], dport=dports[nbr]) /
776 self.vapi.cli("trace add pg-input 1")
777 rxs = self.send_and_expect(
781 self.logger.info(self.vapi.cli("show trace max 1"))
784 self.assert_packet_checksums_valid(rx)
785 self.assertEqual(rx[IP46].dst, remote_addr)
786 self.assertEqual(rx[l4p].dport, dports[nbr])
787 self.assertEqual(rx[IP46].src, src_nat_addr)
788 sport = rx[l4p].sport
790 # from outside to pods
792 Ether(dst=self.pg1.local_mac,
793 src=self.pg1.remote_hosts[nbr].mac) /
794 IP46(src=remote_addr, dst=src_nat_addr) /
795 l4p(sport=dports[nbr], dport=sport) /
798 rxs = self.send_and_expect(
804 self.assert_packet_checksums_valid(rx)
805 self.assertEqual(rx[IP46].dst, client_addr)
806 self.assertEqual(rx[l4p].dport, sports[nbr])
807 self.assertEqual(rx[l4p].sport, dports[nbr])
808 self.assertEqual(rx[IP46].src, remote_addr)
810 # add remote host to exclude list
811 self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=1)
812 self.vapi.cnat_session_purge()
814 rxs = self.send_and_expect(
819 self.assert_packet_checksums_valid(rx)
820 self.assertEqual(rx[IP46].dst, remote_addr)
821 self.assertEqual(rx[l4p].dport, dports[nbr])
822 self.assertEqual(rx[IP46].src, client_addr)
824 # remove remote host from exclude list
825 self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=0)
826 self.vapi.cnat_session_purge()
828 rxs = self.send_and_expect(
834 self.assert_packet_checksums_valid(rx)
835 self.assertEqual(rx[IP46].dst, remote_addr)
836 self.assertEqual(rx[l4p].dport, dports[nbr])
837 self.assertEqual(rx[IP46].src, src_nat_addr)
839 self.vapi.cnat_session_purge()
842 class TestCNatDHCP(VppTestCase):
843 """ CNat Translation """
844 extra_vpp_punt_config = ["cnat", "{",
845 "session-db-buckets", "64",
846 "session-cleanup-timeout", "0.1",
847 "session-max-age", "1",
849 "scanner", "off", "}"]
853 super(TestCNatDHCP, cls).setUpClass()
856 def tearDownClass(cls):
857 super(TestCNatDHCP, cls).tearDownClass()
860 for i in self.pg_interfaces:
862 super(TestCNatDHCP, self).tearDown()
864 def create_translation(self, vip_pg, *args, is_v6=False):
865 vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
867 for (src_pg, dst_pg) in args:
868 paths.append(EpTuple(
869 Ep.from_pg(src_pg, is_v6=is_v6),
870 Ep.from_pg(dst_pg, is_v6=is_v6)
872 t1 = VppCNatTranslation(self, TCP, vip, paths)
876 def make_addr(self, sw_if_index, i, is_v6):
878 return "fd01:%x::%u" % (sw_if_index, i + 1)
880 return "172.16.%u.%u" % (sw_if_index, i)
882 def make_prefix(self, sw_if_index, i, is_v6):
884 return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
886 return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
888 def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
889 qt1 = tr.query_vpp_config()
890 self.assertEqual(str(qt1.vip.addr), self.make_addr(
891 vip_pg.sw_if_index, i, is_v6))
892 for (src_pg, dst_pg), path in zip(args, qt1.paths):
894 self.assertEqual(str(path.src_ep.addr), self.make_addr(
895 src_pg.sw_if_index, i, is_v6))
897 self.assertEqual(str(path.dst_ep.addr), self.make_addr(
898 dst_pg.sw_if_index, i, is_v6))
900 def config_ips(self, rng, is_add=1, is_v6=False):
901 for pg, i in product(self.pg_interfaces, rng):
902 self.vapi.sw_interface_add_del_address(
903 sw_if_index=pg.sw_if_index,
904 prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
907 def test_dhcp_v4(self):
908 self.create_pg_interfaces(range(5))
909 for i in self.pg_interfaces:
911 pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
912 t1 = self.create_translation(*pglist)
914 self.check_resolved(t1, *pglist)
916 self.config_ips([0], is_add=0)
917 self.check_resolved(t1, *pglist, i=1)
918 self.config_ips([1], is_add=0)
919 t1.remove_vpp_config()
921 def test_dhcp_v6(self):
922 self.create_pg_interfaces(range(5))
923 for i in self.pg_interfaces:
925 pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
926 t1 = self.create_translation(*pglist, is_v6=True)
927 self.config_ips([0], is_v6=True)
928 self.check_resolved(t1, *pglist, is_v6=True)
929 self.config_ips([1], is_v6=True)
930 self.config_ips([0], is_add=0, is_v6=True)
931 self.check_resolved(t1, *pglist, i=1, is_v6=True)
932 self.config_ips([1], is_add=0, is_v6=True)
933 t1.remove_vpp_config()
935 def test_dhcp_snat(self):
936 self.create_pg_interfaces(range(1))
937 for i in self.pg_interfaces:
939 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
940 self.config_ips([0], is_v6=False)
941 self.config_ips([0], is_v6=True)
942 r = self.vapi.cnat_get_snat_addresses()
943 self.assertEqual(str(r.snat_ip4), self.make_addr(
944 self.pg0.sw_if_index, 0, False))
945 self.assertEqual(str(r.snat_ip6), self.make_addr(
946 self.pg0.sw_if_index, 0, True))
947 self.config_ips([1], is_v6=False)
948 self.config_ips([1], is_v6=True)
949 self.config_ips([0], is_add=0, is_v6=False)
950 self.config_ips([0], is_add=0, is_v6=True)
951 r = self.vapi.cnat_get_snat_addresses()
952 self.assertEqual(str(r.snat_ip4), self.make_addr(
953 self.pg0.sw_if_index, 1, False))
954 self.assertEqual(str(r.snat_ip6), self.make_addr(
955 self.pg0.sw_if_index, 1, True))
956 self.config_ips([1], is_add=0, is_v6=False)
957 self.config_ips([1], is_add=0, is_v6=True)
958 self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
961 if __name__ == '__main__':
962 unittest.main(testRunner=VppTestRunner)