5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
18 from ipaddress import ip_address, ip_network, \
19 IPv4Address, IPv6Address, IPv4Network, IPv6Network
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
30 def __init__(self, ip=None, port=0, l4p=TCP,
31 sw_if_index=INVALID_INDEX, is_v6=False):
34 self.ip = "::" if is_v6 else "0.0.0.0"
37 self.sw_if_index = sw_if_index
39 self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
41 self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
44 return {'addr': self.ip,
46 'sw_if_index': self.sw_if_index,
50 def from_pg(cls, pg, is_v6=False):
52 return cls(is_v6=is_v6)
54 return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
61 return ("%s:%d" % (self.ip, self.port))
64 class EpTuple(object):
67 def __init__(self, src, dst):
72 return {'src_ep': self.src.encode(),
73 'dst_ep': self.dst.encode()}
76 return ("%s->%s" % (self.src, self.dst))
79 class VppCNatTranslation(VppObject):
81 def __init__(self, test, iproto, vip, paths):
86 self.encoded_paths = []
87 for path in self.paths:
88 self.encoded_paths.append(path.encode())
91 return ("%s %s %s" % (self.vip, self.iproto, self.paths))
95 ip_proto = VppEnum.vl_api_ip_proto_t
97 UDP: ip_proto.IP_API_PROTO_UDP,
98 TCP: ip_proto.IP_API_PROTO_TCP,
101 def add_vpp_config(self):
102 r = self._test.vapi.cnat_translation_update(
103 {'vip': self.vip.encode(),
104 'ip_proto': self.vl4_proto,
105 'n_paths': len(self.paths),
106 'paths': self.encoded_paths})
107 self._test.registry.register(self, self._test.logger)
110 def modify_vpp_config(self, paths):
112 self.encoded_paths = []
113 for path in self.paths:
114 self.encoded_paths.append(path.encode())
116 r = self._test.vapi.cnat_translation_update(
117 {'vip': self.vip.encode(),
118 'ip_proto': self.vl4_proto,
119 'n_paths': len(self.paths),
120 'paths': self.encoded_paths})
121 self._test.registry.register(self, self._test.logger)
123 def remove_vpp_config(self):
124 self._test.vapi.cnat_translation_del(id=self.id)
126 def query_vpp_config(self):
127 for t in self._test.vapi.cnat_translation_dump():
128 if self.id == t.translation.id:
133 return ("cnat-translation-%s" % (self.vip))
136 c = self._test.statistics.get_counter("/net/cnat-translation")
140 class TestCNatTranslation(VppTestCase):
141 """ CNat Translation """
142 extra_vpp_punt_config = ["cnat", "{",
143 "session-db-buckets", "64",
144 "session-cleanup-timeout", "0.1",
145 "session-max-age", "1",
147 "scanner", "off", "}"]
151 super(TestCNatTranslation, cls).setUpClass()
154 def tearDownClass(cls):
155 super(TestCNatTranslation, cls).tearDownClass()
158 super(TestCNatTranslation, self).setUp()
160 self.create_pg_interfaces(range(3))
162 for i in self.pg_interfaces:
170 for i in self.pg_interfaces:
174 super(TestCNatTranslation, self).tearDown()
176 def cnat_create_translation(self, vip, nbr):
177 ip_v = "ip6" if vip.isV6 else "ip4"
178 dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
179 sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
180 t1 = VppCNatTranslation(
182 [EpTuple(sep, dep), EpTuple(sep, dep)])
186 def cnat_test_translation(self, t1, nbr, sports, isV6=False):
187 ip_v = "ip6" if isV6 else "ip4"
188 ip_class = IPv6 if isV6 else IP
194 for src in self.pg0.remote_hosts:
197 p1 = (Ether(dst=self.pg0.local_mac,
199 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
200 vip.l4p(sport=sport, dport=vip.port) /
203 self.vapi.cli("trace add pg-input 1")
204 rxs = self.send_and_expect(self.pg0,
207 self.logger.info(self.vapi.cli("show trace max 1"))
210 self.assert_packet_checksums_valid(rx)
213 getattr(self.pg1.remote_hosts[nbr], ip_v))
214 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
218 self.assertEqual(rx[vip.l4p].sport, sport)
221 p1 = (Ether(dst=self.pg1.local_mac,
222 src=self.pg1.remote_mac) /
223 ip_class(src=getattr(
224 self.pg1.remote_hosts[nbr],
226 dst=getattr(src, ip_v)) /
227 vip.l4p(sport=4000 + nbr, dport=sport) /
230 rxs = self.send_and_expect(self.pg1,
235 self.assert_packet_checksums_valid(rx)
239 self.assertEqual(rx[vip.l4p].dport, sport)
240 self.assertEqual(rx[ip_class].src, vip.ip)
241 self.assertEqual(rx[vip.l4p].sport, vip.port)
244 # packets to the VIP that do not match a
245 # translation are dropped
247 p1 = (Ether(dst=self.pg0.local_mac,
249 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
250 vip.l4p(sport=sport, dport=6666) /
253 self.send_and_assert_no_replies(self.pg0,
258 # packets from the VIP that do not match a
259 # session are forwarded
261 p1 = (Ether(dst=self.pg1.local_mac,
262 src=self.pg1.remote_mac) /
263 ip_class(src=getattr(
264 self.pg1.remote_hosts[nbr],
266 dst=getattr(src, ip_v)) /
267 vip.l4p(sport=6666, dport=sport) /
270 rxs = self.send_and_expect(self.pg1,
274 def cnat_test_translation_update(self, t1, sports, isV6=False):
275 ip_v = "ip6" if isV6 else "ip4"
276 ip_class = IPv6 if isV6 else IP
280 # modify the translation to use a different backend
282 dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
283 sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
284 t1.modify_vpp_config([EpTuple(sep, dep)])
287 # existing flows follow the old path
289 for src in self.pg0.remote_hosts:
292 p1 = (Ether(dst=self.pg0.local_mac,
294 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
295 vip.l4p(sport=sport, dport=vip.port) /
298 rxs = self.send_and_expect(self.pg0,
303 # new flows go to the new backend
305 for src in self.pg0.remote_hosts:
306 p1 = (Ether(dst=self.pg0.local_mac,
308 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
309 vip.l4p(sport=9999, dport=vip.port) /
312 rxs = self.send_and_expect(self.pg0,
316 def cnat_translation(self, vips, isV6=False):
317 """ CNat Translation """
319 ip_class = IPv6 if isV6 else IP
320 ip_v = "ip6" if isV6 else "ip4"
321 sports = [1234, 1233]
324 # turn the scanner off whilst testing otherwise sessions
327 self.vapi.cli("test cnat scanner off")
329 sessions = self.vapi.cnat_session_dump()
332 for nbr, vip in enumerate(vips):
333 trs.append(self.cnat_create_translation(vip, nbr))
335 self.logger.info(self.vapi.cli("sh cnat client"))
336 self.logger.info(self.vapi.cli("sh cnat translation"))
341 for nbr, vip in enumerate(vips):
342 self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
343 self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
345 self.logger.info(self.vapi.cli(
346 "sh ip6 fib %s" % self.pg0.remote_ip6))
348 self.logger.info(self.vapi.cli(
349 "sh ip fib %s" % self.pg0.remote_ip4))
350 self.logger.info(self.vapi.cli("sh cnat session verbose"))
353 # turn the scanner back on and wait until the sessions
356 self.vapi.cli("test cnat scanner on")
359 sessions = self.vapi.cnat_session_dump()
360 while (len(sessions) and n_tries < 100):
362 sessions = self.vapi.cnat_session_dump()
364 self.logger.info(self.vapi.cli("show cnat session verbose"))
366 self.assertTrue(n_tries < 100)
367 self.vapi.cli("test cnat scanner off")
370 # load some flows again and purge
373 for src in self.pg0.remote_hosts:
376 p1 = (Ether(dst=self.pg0.local_mac,
378 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
379 vip.l4p(sport=sport, dport=vip.port) /
381 self.send_and_expect(self.pg0,
386 tr.remove_vpp_config()
388 self.assertTrue(self.vapi.cnat_session_dump())
389 self.vapi.cnat_session_purge()
390 self.assertFalse(self.vapi.cnat_session_dump())
394 Ep("30.0.0.1", 5555),
395 Ep("30.0.0.2", 5554),
396 Ep("30.0.0.2", 5553, UDP),
398 Ep("30::2", 5553, UDP),
402 self.pg0.generate_remote_hosts(len(vips))
403 self.pg0.configure_ipv6_neighbors()
404 self.pg0.configure_ipv4_neighbors()
406 self.pg1.generate_remote_hosts(len(vips))
407 self.pg1.configure_ipv6_neighbors()
408 self.pg1.configure_ipv4_neighbors()
410 self.vapi.cli("test cnat scanner off")
412 for nbr, vip in enumerate(vips):
413 trs.append(self.cnat_create_translation(vip, nbr))
415 self.logger.info(self.vapi.cli("sh cnat client"))
416 self.logger.info(self.vapi.cli("sh cnat translation"))
418 for nbr, vip in enumerate(vips):
420 client_addr = self.pg0.remote_hosts[0].ip6
421 remote_addr = self.pg1.remote_hosts[nbr].ip6
422 remote2_addr = self.pg2.remote_hosts[0].ip6
424 client_addr = self.pg0.remote_hosts[0].ip4
425 remote_addr = self.pg1.remote_hosts[nbr].ip4
426 remote2_addr = self.pg2.remote_hosts[0].ip4
427 IP46 = IPv6 if vip.isV6 else IP
429 p1 = (Ether(dst=self.pg0.local_mac,
430 src=self.pg0.remote_hosts[0].mac) /
431 IP46(src=client_addr, dst=vip.ip) /
432 vip.l4p(sport=sport, dport=vip.port) /
435 rxs = self.send_and_expect(self.pg0,
440 self.assert_packet_checksums_valid(rx)
441 self.assertEqual(rx[IP46].dst, remote_addr)
442 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
443 self.assertEqual(rx[IP46].src, client_addr)
444 self.assertEqual(rx[vip.l4p].sport, sport)
446 InnerIP = rxs[0][IP46]
448 ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
449 ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
450 # from vip to client, ICMP error
451 p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
452 IP46(src=remote_addr, dst=client_addr) /
455 rxs = self.send_and_expect(self.pg1,
459 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
460 IP46error = IPerror6 if vip.isV6 else IPerror
462 self.assert_packet_checksums_valid(rx)
463 self.assertEqual(rx[IP46].src, vip.ip)
464 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
465 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
466 self.assertEqual(rx[ICMP46][IP46error]
467 [TCPUDPError].sport, sport)
468 self.assertEqual(rx[ICMP46][IP46error]
469 [TCPUDPError].dport, vip.port)
471 # from other remote to client, ICMP error
472 # outside shouldn't be NAT-ed
473 p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
474 IP46(src=remote2_addr, dst=client_addr) /
477 rxs = self.send_and_expect(self.pg1,
481 TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
482 IP46error = IPerror6 if vip.isV6 else IPerror
484 self.assert_packet_checksums_valid(rx)
485 self.assertEqual(rx[IP46].src, remote2_addr)
486 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
487 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
488 self.assertEqual(rx[ICMP46][IP46error]
489 [TCPUDPError].sport, sport)
490 self.assertEqual(rx[ICMP46][IP46error]
491 [TCPUDPError].dport, vip.port)
493 self.vapi.cnat_session_purge()
495 def test_cnat6(self):
496 # """ CNat Translation ipv6 """
500 Ep("30::2", 5553, UDP),
503 self.pg0.generate_remote_hosts(len(vips))
504 self.pg0.configure_ipv6_neighbors()
505 self.pg1.generate_remote_hosts(len(vips))
506 self.pg1.configure_ipv6_neighbors()
508 self.cnat_translation(vips, isV6=True)
510 def test_cnat4(self):
511 # """ CNat Translation ipv4 """
514 Ep("30.0.0.1", 5555),
515 Ep("30.0.0.2", 5554),
516 Ep("30.0.0.2", 5553, UDP),
519 self.pg0.generate_remote_hosts(len(vips))
520 self.pg0.configure_ipv4_neighbors()
521 self.pg1.generate_remote_hosts(len(vips))
522 self.pg1.configure_ipv4_neighbors()
524 self.cnat_translation(vips)
527 class TestCNatSourceNAT(VppTestCase):
528 """ CNat Source NAT """
529 extra_vpp_punt_config = ["cnat", "{",
530 "session-cleanup-timeout", "0.1",
531 "session-max-age", "1",
533 "scanner", "off", "}"]
537 super(TestCNatSourceNAT, cls).setUpClass()
540 def tearDownClass(cls):
541 super(TestCNatSourceNAT, cls).tearDownClass()
544 super(TestCNatSourceNAT, self).setUp()
546 self.create_pg_interfaces(range(3))
548 for i in self.pg_interfaces:
555 self.pg0.configure_ipv6_neighbors()
556 self.pg0.configure_ipv4_neighbors()
557 self.pg1.generate_remote_hosts(2)
558 self.pg1.configure_ipv4_neighbors()
559 self.pg1.configure_ipv6_neighbors()
561 self.vapi.cnat_set_snat_addresses(
562 snat_ip4=self.pg2.remote_hosts[0].ip4,
563 snat_ip6=self.pg2.remote_hosts[0].ip6,
564 sw_if_index=INVALID_INDEX)
565 self.vapi.feature_enable_disable(
567 arc_name="ip6-unicast",
568 feature_name="cnat-snat-ip6",
569 sw_if_index=self.pg0.sw_if_index)
570 self.vapi.feature_enable_disable(
572 arc_name="ip4-unicast",
573 feature_name="cnat-snat-ip4",
574 sw_if_index=self.pg0.sw_if_index)
576 policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
577 self.vapi.cnat_set_snat_policy(
578 policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
579 for i in self.pg_interfaces:
580 self.vapi.cnat_snat_policy_add_del_if(
581 sw_if_index=i.sw_if_index, is_add=1,
582 table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
583 self.vapi.cnat_snat_policy_add_del_if(
584 sw_if_index=i.sw_if_index, is_add=1,
585 table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
588 self.vapi.cnat_session_purge()
589 for i in self.pg_interfaces:
593 super(TestCNatSourceNAT, self).tearDown()
595 def test_snat_v6(self):
596 # """ CNat Source Nat v6 """
597 self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
598 self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
599 self.sourcenat_test_icmp_err_conf(isV6=True)
600 self.sourcenat_test_icmp_echo6_conf()
602 def test_snat_v4(self):
603 # """ CNat Source Nat v4 """
604 self.sourcenat_test_tcp_udp_conf(TCP)
605 self.sourcenat_test_tcp_udp_conf(UDP)
606 self.sourcenat_test_icmp_err_conf()
607 self.sourcenat_test_icmp_echo4_conf()
609 def sourcenat_test_icmp_echo6_conf(self):
610 sports = [1234, 1235]
611 dports = [6661, 6662]
613 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
614 client_addr = self.pg0.remote_hosts[0].ip6
615 remote_addr = self.pg1.remote_hosts[nbr].ip6
616 src_nat_addr = self.pg2.remote_hosts[0].ip6
618 # ping from pods to outside network
620 Ether(dst=self.pg0.local_mac,
621 src=self.pg0.remote_hosts[0].mac) /
622 IPv6(src=client_addr, dst=remote_addr) /
623 ICMPv6EchoRequest(id=0xfeed) /
626 rxs = self.send_and_expect(
632 self.assertEqual(rx[IPv6].src, src_nat_addr)
633 self.assert_packet_checksums_valid(rx)
635 received_id = rx[0][ICMPv6EchoRequest].id
636 # ping reply from outside to pods
638 Ether(dst=self.pg1.local_mac,
639 src=self.pg1.remote_hosts[nbr].mac) /
640 IPv6(src=remote_addr, dst=src_nat_addr) /
641 ICMPv6EchoReply(id=received_id))
642 rxs = self.send_and_expect(
648 self.assert_packet_checksums_valid(rx)
649 self.assertEqual(rx[IPv6].src, remote_addr)
650 self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
652 def sourcenat_test_icmp_echo4_conf(self):
653 sports = [1234, 1235]
654 dports = [6661, 6662]
656 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
658 client_addr = self.pg0.remote_hosts[0].ip4
659 remote_addr = self.pg1.remote_hosts[nbr].ip4
660 src_nat_addr = self.pg2.remote_hosts[0].ip4
662 # ping from pods to outside network
664 Ether(dst=self.pg0.local_mac,
665 src=self.pg0.remote_hosts[0].mac) /
666 IP46(src=client_addr, dst=remote_addr) /
667 ICMP(type=8, id=0xfeed) /
670 rxs = self.send_and_expect(
676 self.assertEqual(rx[IP46].src, src_nat_addr)
677 self.assert_packet_checksums_valid(rx)
679 received_id = rx[0][ICMP].id
680 # ping reply from outside to pods
682 Ether(dst=self.pg1.local_mac,
683 src=self.pg1.remote_hosts[nbr].mac) /
684 IP46(src=remote_addr, dst=src_nat_addr) /
685 ICMP(type=0, id=received_id))
686 rxs = self.send_and_expect(
692 self.assert_packet_checksums_valid(rx)
693 self.assertEqual(rx[IP46].src, remote_addr)
694 self.assertEqual(rx[ICMP].id, 0xfeed)
696 def sourcenat_test_icmp_err_conf(self, isV6=False):
697 sports = [1234, 1235]
698 dports = [6661, 6662]
700 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
703 client_addr = self.pg0.remote_hosts[0].ip6
704 remote_addr = self.pg1.remote_hosts[nbr].ip6
705 src_nat_addr = self.pg2.remote_hosts[0].ip6
706 ICMP46 = ICMPv6DestUnreach
707 ICMPelem = ICMPv6DestUnreach(code=1)
711 client_addr = self.pg0.remote_hosts[0].ip4
712 remote_addr = self.pg1.remote_hosts[nbr].ip4
713 src_nat_addr = self.pg2.remote_hosts[0].ip4
716 ICMPelem = ICMP(type=11)
718 # from pods to outside network
720 Ether(dst=self.pg0.local_mac,
721 src=self.pg0.remote_hosts[0].mac) /
722 IP46(src=client_addr, dst=remote_addr) /
723 TCP(sport=sports[nbr], dport=dports[nbr]) /
726 rxs = self.send_and_expect(
731 self.assert_packet_checksums_valid(rx)
732 self.assertEqual(rx[IP46].dst, remote_addr)
733 self.assertEqual(rx[TCP].dport, dports[nbr])
734 self.assertEqual(rx[IP46].src, src_nat_addr)
735 sport = rx[TCP].sport
737 InnerIP = rxs[0][IP46]
738 # from outside to pods, ICMP error
740 Ether(dst=self.pg1.local_mac,
741 src=self.pg1.remote_hosts[nbr].mac) /
742 IP46(src=remote_addr, dst=src_nat_addr) /
745 rxs = self.send_and_expect(
751 self.assert_packet_checksums_valid(rx)
752 self.assertEqual(rx[IP46].src, remote_addr)
753 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
754 self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
755 self.assertEqual(rx[ICMP46][IP46error]
756 [TCPerror].sport, sports[nbr])
757 self.assertEqual(rx[ICMP46][IP46error]
758 [TCPerror].dport, dports[nbr])
760 def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
761 sports = [1234, 1235]
762 dports = [6661, 6662]
764 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
767 client_addr = self.pg0.remote_hosts[0].ip6
768 remote_addr = self.pg1.remote_hosts[nbr].ip6
769 src_nat_addr = self.pg2.remote_hosts[0].ip6
770 exclude_prefix = ip_network(
771 "%s/100" % remote_addr, strict=False)
774 client_addr = self.pg0.remote_hosts[0].ip4
775 remote_addr = self.pg1.remote_hosts[nbr].ip4
776 src_nat_addr = self.pg2.remote_hosts[0].ip4
777 exclude_prefix = ip_network(
778 "%s/16" % remote_addr, strict=False)
779 # from pods to outside network
781 Ether(dst=self.pg0.local_mac,
782 src=self.pg0.remote_hosts[0].mac) /
783 IP46(src=client_addr, dst=remote_addr) /
784 l4p(sport=sports[nbr], dport=dports[nbr]) /
787 self.vapi.cli("trace add pg-input 1")
788 rxs = self.send_and_expect(
792 self.logger.info(self.vapi.cli("show trace max 1"))
795 self.assert_packet_checksums_valid(rx)
796 self.assertEqual(rx[IP46].dst, remote_addr)
797 self.assertEqual(rx[l4p].dport, dports[nbr])
798 self.assertEqual(rx[IP46].src, src_nat_addr)
799 sport = rx[l4p].sport
801 # from outside to pods
803 Ether(dst=self.pg1.local_mac,
804 src=self.pg1.remote_hosts[nbr].mac) /
805 IP46(src=remote_addr, dst=src_nat_addr) /
806 l4p(sport=dports[nbr], dport=sport) /
809 rxs = self.send_and_expect(
815 self.assert_packet_checksums_valid(rx)
816 self.assertEqual(rx[IP46].dst, client_addr)
817 self.assertEqual(rx[l4p].dport, sports[nbr])
818 self.assertEqual(rx[l4p].sport, dports[nbr])
819 self.assertEqual(rx[IP46].src, remote_addr)
821 # add remote host to exclude list
822 self.vapi.cnat_snat_policy_add_del_exclude_pfx(
823 prefix=exclude_prefix, is_add=1)
824 self.vapi.cnat_session_purge()
826 rxs = self.send_and_expect(
831 self.assert_packet_checksums_valid(rx)
832 self.assertEqual(rx[IP46].dst, remote_addr)
833 self.assertEqual(rx[l4p].dport, dports[nbr])
834 self.assertEqual(rx[IP46].src, client_addr)
836 # remove remote host from exclude list
837 self.vapi.cnat_snat_policy_add_del_exclude_pfx(
838 prefix=exclude_prefix, is_add=0)
839 self.vapi.cnat_session_purge()
841 rxs = self.send_and_expect(
847 self.assert_packet_checksums_valid(rx)
848 self.assertEqual(rx[IP46].dst, remote_addr)
849 self.assertEqual(rx[l4p].dport, dports[nbr])
850 self.assertEqual(rx[IP46].src, src_nat_addr)
852 self.vapi.cnat_session_purge()
855 class TestCNatDHCP(VppTestCase):
856 """ CNat Translation """
857 extra_vpp_punt_config = ["cnat", "{",
858 "session-db-buckets", "64",
859 "session-cleanup-timeout", "0.1",
860 "session-max-age", "1",
862 "scanner", "off", "}"]
866 super(TestCNatDHCP, cls).setUpClass()
869 def tearDownClass(cls):
870 super(TestCNatDHCP, cls).tearDownClass()
873 for i in self.pg_interfaces:
875 super(TestCNatDHCP, self).tearDown()
877 def create_translation(self, vip_pg, *args, is_v6=False):
878 vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
880 for (src_pg, dst_pg) in args:
881 paths.append(EpTuple(
882 Ep.from_pg(src_pg, is_v6=is_v6),
883 Ep.from_pg(dst_pg, is_v6=is_v6)
885 t1 = VppCNatTranslation(self, TCP, vip, paths)
889 def make_addr(self, sw_if_index, i, is_v6):
891 return "fd01:%x::%u" % (sw_if_index, i + 1)
893 return "172.16.%u.%u" % (sw_if_index, i)
895 def make_prefix(self, sw_if_index, i, is_v6):
897 return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
899 return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
901 def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
902 qt1 = tr.query_vpp_config()
903 self.assertEqual(str(qt1.vip.addr), self.make_addr(
904 vip_pg.sw_if_index, i, is_v6))
905 for (src_pg, dst_pg), path in zip(args, qt1.paths):
907 self.assertEqual(str(path.src_ep.addr), self.make_addr(
908 src_pg.sw_if_index, i, is_v6))
910 self.assertEqual(str(path.dst_ep.addr), self.make_addr(
911 dst_pg.sw_if_index, i, is_v6))
913 def config_ips(self, rng, is_add=1, is_v6=False):
914 for pg, i in product(self.pg_interfaces, rng):
915 self.vapi.sw_interface_add_del_address(
916 sw_if_index=pg.sw_if_index,
917 prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
920 def test_dhcp_v4(self):
921 self.create_pg_interfaces(range(5))
922 for i in self.pg_interfaces:
924 pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
925 t1 = self.create_translation(*pglist)
927 self.check_resolved(t1, *pglist)
929 self.config_ips([0], is_add=0)
930 self.check_resolved(t1, *pglist, i=1)
931 self.config_ips([1], is_add=0)
932 t1.remove_vpp_config()
934 def test_dhcp_v6(self):
935 self.create_pg_interfaces(range(5))
936 for i in self.pg_interfaces:
938 pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
939 t1 = self.create_translation(*pglist, is_v6=True)
940 self.config_ips([0], is_v6=True)
941 self.check_resolved(t1, *pglist, is_v6=True)
942 self.config_ips([1], is_v6=True)
943 self.config_ips([0], is_add=0, is_v6=True)
944 self.check_resolved(t1, *pglist, i=1, is_v6=True)
945 self.config_ips([1], is_add=0, is_v6=True)
946 t1.remove_vpp_config()
948 def test_dhcp_snat(self):
949 self.create_pg_interfaces(range(1))
950 for i in self.pg_interfaces:
952 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
953 self.config_ips([0], is_v6=False)
954 self.config_ips([0], is_v6=True)
955 r = self.vapi.cnat_get_snat_addresses()
956 self.assertEqual(str(r.snat_ip4), self.make_addr(
957 self.pg0.sw_if_index, 0, False))
958 self.assertEqual(str(r.snat_ip6), self.make_addr(
959 self.pg0.sw_if_index, 0, True))
960 self.config_ips([1], is_v6=False)
961 self.config_ips([1], is_v6=True)
962 self.config_ips([0], is_add=0, is_v6=False)
963 self.config_ips([0], is_add=0, is_v6=True)
964 r = self.vapi.cnat_get_snat_addresses()
965 self.assertEqual(str(r.snat_ip4), self.make_addr(
966 self.pg0.sw_if_index, 1, False))
967 self.assertEqual(str(r.snat_ip6), self.make_addr(
968 self.pg0.sw_if_index, 1, True))
969 self.config_ips([1], is_add=0, is_v6=False)
970 self.config_ips([1], is_add=0, is_v6=True)
971 self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
974 if __name__ == '__main__':
975 unittest.main(testRunner=VppTestRunner)