X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fcnat%2Ftest%2Ftest_cnat.py;h=ff4c44033cb4c55b84487f6d97d2c1a988a5f58d;hb=3fd77f7de;hp=18e3baadbed1d57fadf4adcffb850d4f00ba08f5;hpb=29f3c7d2ecac2f9d80bb33e91bd5d1f9d434768a;p=vpp.git diff --git a/src/plugins/cnat/test/test_cnat.py b/src/plugins/cnat/test/test_cnat.py index 18e3baadbed..ff4c44033cb 100644 --- a/src/plugins/cnat/test/test_cnat.py +++ b/src/plugins/cnat/test/test_cnat.py @@ -3,12 +3,17 @@ import unittest from framework import VppTestCase, VppTestRunner -from vpp_ip import DpoProto +from vpp_ip import DpoProto, INVALID_INDEX +from itertools import product from scapy.packet import Raw from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet import IP, UDP, TCP, ICMP +from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror +from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + +import struct from ipaddress import ip_address, ip_network, \ IPv4Address, IPv6Address, IPv4Network, IPv6Network @@ -19,25 +24,38 @@ from vpp_papi import VppEnum N_PKTS = 15 -def find_cnat_translation(test, id): - ts = test.vapi.cnat_translation_dump() - for t in ts: - if id == t.translation.id: - return True - return False - - class Ep(object): """ CNat endpoint """ - def __init__(self, ip, port, l4p=TCP): + def __init__(self, ip=None, port=0, l4p=TCP, + sw_if_index=INVALID_INDEX, is_v6=False): self.ip = ip + if ip is None: + self.ip = "::" if is_v6 else "0.0.0.0" self.port = port self.l4p = l4p + self.sw_if_index = sw_if_index + if is_v6: + self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6 + else: + self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4 def encode(self): return {'addr': self.ip, - 'port': self.port} + 'port': self.port, + 'sw_if_index': self.sw_if_index, + 'if_af': self.if_af} + + @classmethod + def from_pg(cls, pg, is_v6=False): + if pg is None: + return cls(is_v6=is_v6) + else: + return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6) + + @property + def isV6(self): + return ":" in self.ip def __str__(self): return ("%s:%d" % (self.ip, self.port)) @@ -69,6 +87,9 @@ class VppCNatTranslation(VppObject): for path in self.paths: self.encoded_paths.append(path.encode()) + def __str__(self): + return ("%s %s %s" % (self.vip, self.iproto, self.paths)) + @property def vl4_proto(self): ip_proto = VppEnum.vl_api_ip_proto_t @@ -77,9 +98,6 @@ class VppCNatTranslation(VppObject): TCP: ip_proto.IP_API_PROTO_TCP, }[self.iproto] - def delete(self): - r = self._test.vapi.cnat_translation_del(id=self.id) - def add_vpp_config(self): r = self._test.vapi.cnat_translation_update( {'vip': self.vip.encode(), @@ -103,10 +121,13 @@ class VppCNatTranslation(VppObject): self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): - self._test.vapi.cnat_translation_del(self.id) + self._test.vapi.cnat_translation_del(id=self.id) def query_vpp_config(self): - return find_cnat_translation(self._test, self.id) + for t in self._test.vapi.cnat_translation_dump(): + if self.id == t.translation.id: + return t.translation + return None def object_id(self): return ("cnat-translation-%s" % (self.vip)) @@ -116,39 +137,14 @@ class VppCNatTranslation(VppObject): return c[0][self.id] -class VppCNATSourceNat(VppObject): - - def __init__(self, test, address, exclude_subnets=[]): - self._test = test - self.address = address - self.exclude_subnets = exclude_subnets - - def add_vpp_config(self): - a = ip_address(self.address) - if 4 == a.version: - self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address) - else: - self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address) - for subnet in self.exclude_subnets: - self.cnat_exclude_subnet(subnet, True) - - def cnat_exclude_subnet(self, exclude_subnet, isAdd=True): - add = 1 if isAdd else 0 - self._test.vapi.cnat_add_del_snat_prefix( - prefix=exclude_subnet, is_add=add) - - def query_vpp_config(self): - return False - - def remove_vpp_config(self): - return False - - class TestCNatTranslation(VppTestCase): """ CNat Translation """ extra_vpp_punt_config = ["cnat", "{", + "session-db-buckets", "64", + "session-cleanup-timeout", "0.1", "session-max-age", "1", - "tcp-max-age", "1", "}"] + "tcp-max-age", "1", + "scanner", "off", "}"] @classmethod def setUpClass(cls): @@ -177,10 +173,10 @@ class TestCNatTranslation(VppTestCase): i.admin_down() super(TestCNatTranslation, self).tearDown() - def cnat_create_translation(self, vip, nbr, isV6=False): - ip_v = "ip6" if isV6 else "ip4" + def cnat_create_translation(self, vip, nbr): + ip_v = "ip6" if vip.isV6 else "ip4" dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr) - sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0) + sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0) t1 = VppCNatTranslation( self, vip.l4p, vip, [EpTuple(sep, dep), EpTuple(sep, dep)]) @@ -208,6 +204,7 @@ class TestCNatTranslation(VppTestCase): rxs = self.send_and_expect(self.pg0, p1 * N_PKTS, self.pg1) + self.logger.info(self.vapi.cli("show trace max 1")) for rx in rxs: self.assert_packet_checksums_valid(rx) @@ -274,11 +271,6 @@ class TestCNatTranslation(VppTestCase): p1 * N_PKTS, self.pg0) - self.assertEqual(t1.get_stats()['packets'], - N_PKTS * - len(sports) * - len(self.pg0.remote_hosts)) - def cnat_test_translation_update(self, t1, sports, isV6=False): ip_v = "ip6" if isV6 else "ip4" ip_class = IPv6 if isV6 else IP @@ -338,7 +330,7 @@ class TestCNatTranslation(VppTestCase): trs = [] for nbr, vip in enumerate(vips): - trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6)) + trs.append(self.cnat_create_translation(vip, nbr)) self.logger.info(self.vapi.cli("sh cnat client")) self.logger.info(self.vapi.cli("sh cnat translation")) @@ -358,7 +350,7 @@ class TestCNatTranslation(VppTestCase): self.logger.info(self.vapi.cli("sh cnat session verbose")) # - # turn the scanner back on and wait untill the sessions + # turn the scanner back on and wait until the sessions # all disapper # self.vapi.cli("test cnat scanner on") @@ -369,8 +361,10 @@ class TestCNatTranslation(VppTestCase): n_tries += 1 sessions = self.vapi.cnat_session_dump() self.sleep(2) + self.logger.info(self.vapi.cli("show cnat session verbose")) self.assertTrue(n_tries < 100) + self.vapi.cli("test cnat scanner off") # # load some flows again and purge @@ -389,12 +383,115 @@ class TestCNatTranslation(VppTestCase): self.pg2) for tr in trs: - tr.delete() + tr.remove_vpp_config() self.assertTrue(self.vapi.cnat_session_dump()) self.vapi.cnat_session_purge() self.assertFalse(self.vapi.cnat_session_dump()) + def test_icmp(self): + vips = [ + Ep("30.0.0.1", 5555), + Ep("30.0.0.2", 5554), + Ep("30.0.0.2", 5553, UDP), + Ep("30::1", 6666), + Ep("30::2", 5553, UDP), + ] + sport = 1234 + + self.pg0.generate_remote_hosts(len(vips)) + self.pg0.configure_ipv6_neighbors() + self.pg0.configure_ipv4_neighbors() + + self.pg1.generate_remote_hosts(len(vips)) + self.pg1.configure_ipv6_neighbors() + self.pg1.configure_ipv4_neighbors() + + self.vapi.cli("test cnat scanner off") + trs = [] + for nbr, vip in enumerate(vips): + trs.append(self.cnat_create_translation(vip, nbr)) + + self.logger.info(self.vapi.cli("sh cnat client")) + self.logger.info(self.vapi.cli("sh cnat translation")) + + for nbr, vip in enumerate(vips): + if vip.isV6: + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + remote2_addr = self.pg2.remote_hosts[0].ip6 + else: + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + remote2_addr = self.pg2.remote_hosts[0].ip4 + IP46 = IPv6 if vip.isV6 else IP + # from client to vip + p1 = (Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=vip.ip) / + vip.l4p(sport=sport, dport=vip.port) / + Raw()) + + rxs = self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].dst, remote_addr) + self.assertEqual(rx[vip.l4p].dport, 4000 + nbr) + self.assertEqual(rx[IP46].src, client_addr) + self.assertEqual(rx[vip.l4p].sport, sport) + + InnerIP = rxs[0][IP46] + + ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP + ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11) + # from vip to client, ICMP error + p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP46(src=remote_addr, dst=client_addr) / + ICMPelem / InnerIP) + + rxs = self.send_and_expect(self.pg1, + p1 * N_PKTS, + self.pg0) + + TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror + IP46error = IPerror6 if vip.isV6 else IPerror + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, vip.ip) + self.assertEqual(rx[ICMP46][IP46error].src, client_addr) + self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].sport, sport) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].dport, vip.port) + + # from other remote to client, ICMP error + # outside shouldn't be NAT-ed + p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) / + IP46(src=remote2_addr, dst=client_addr) / + ICMPelem / InnerIP) + + rxs = self.send_and_expect(self.pg1, + p1 * N_PKTS, + self.pg0) + + TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror + IP46error = IPerror6 if vip.isV6 else IPerror + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, remote2_addr) + self.assertEqual(rx[ICMP46][IP46error].src, client_addr) + self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].sport, sport) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].dport, vip.port) + + self.vapi.cnat_session_purge() + def test_cnat6(self): # """ CNat Translation ipv6 """ vips = [ @@ -430,8 +527,10 @@ class TestCNatTranslation(VppTestCase): class TestCNatSourceNAT(VppTestCase): """ CNat Source NAT """ extra_vpp_punt_config = ["cnat", "{", + "session-cleanup-timeout", "0.1", "session-max-age", "1", - "tcp-max-age", "1", "}"] + "tcp-max-age", "1", + "scanner", "off", "}"] @classmethod def setUpClass(cls): @@ -453,144 +552,424 @@ class TestCNatSourceNAT(VppTestCase): i.config_ip6() i.resolve_ndp() + self.pg0.configure_ipv6_neighbors() + self.pg0.configure_ipv4_neighbors() + self.pg1.generate_remote_hosts(2) + self.pg1.configure_ipv4_neighbors() + self.pg1.configure_ipv6_neighbors() + + self.vapi.cnat_set_snat_addresses( + snat_ip4=self.pg2.remote_hosts[0].ip4, + snat_ip6=self.pg2.remote_hosts[0].ip6, + sw_if_index=INVALID_INDEX) + self.vapi.feature_enable_disable( + enable=1, + arc_name="ip6-unicast", + feature_name="cnat-snat-ip6", + sw_if_index=self.pg0.sw_if_index) + self.vapi.feature_enable_disable( + enable=1, + arc_name="ip4-unicast", + feature_name="cnat-snat-ip4", + sw_if_index=self.pg0.sw_if_index) + + policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t + self.vapi.cnat_set_snat_policy( + policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX) + for i in self.pg_interfaces: + self.vapi.cnat_snat_policy_add_del_if( + sw_if_index=i.sw_if_index, is_add=1, + table=policie_tbls.CNAT_POLICY_INCLUDE_V6) + self.vapi.cnat_snat_policy_add_del_if( + sw_if_index=i.sw_if_index, is_add=1, + table=policie_tbls.CNAT_POLICY_INCLUDE_V4) + def tearDown(self): + self.vapi.cnat_session_purge() for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() super(TestCNatSourceNAT, self).tearDown() - def cnat_create_translation(self, srcNatAddr, interface, isV6=False): - t1 = VppCNATSourceNat(self, srcNatAddr) - t1.add_vpp_config() - cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast" - cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat" - self.vapi.feature_enable_disable( - enable=1, - arc_name=cnat_arc_name, - feature_name=cnat_feature_name, - sw_if_index=interface.sw_if_index) + def test_snat_v6(self): + # """ CNat Source Nat v6 """ + self.sourcenat_test_tcp_udp_conf(TCP, isV6=True) + self.sourcenat_test_tcp_udp_conf(UDP, isV6=True) + self.sourcenat_test_icmp_err_conf(isV6=True) + self.sourcenat_test_icmp_echo6_conf() - return t1 + def test_snat_v4(self): + # """ CNat Source Nat v4 """ + self.sourcenat_test_tcp_udp_conf(TCP) + self.sourcenat_test_tcp_udp_conf(UDP) + self.sourcenat_test_icmp_err_conf() + self.sourcenat_test_icmp_echo4_conf() - def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False): - ip_v = "ip6" if isV6 else "ip4" - ip_class = IPv6 if isV6 else IP - sports = [1234, 1235, 1236] - dports = [6661, 6662, 6663] + def sourcenat_test_icmp_echo6_conf(self): + sports = [1234, 1235] + dports = [6661, 6662] - self.pg0.generate_remote_hosts(1) - self.pg0.configure_ipv4_neighbors() - self.pg0.configure_ipv6_neighbors() - self.pg1.generate_remote_hosts(len(sports)) - self.pg1.configure_ipv4_neighbors() - self.pg1.configure_ipv6_neighbors() + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + src_nat_addr = self.pg2.remote_hosts[0].ip6 - self.vapi.cli("test cnat scanner on") - t1 = self.cnat_create_translation(srcNatAddr, self.pg0) + # ping from pods to outside network + p1 = ( + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IPv6(src=client_addr, dst=remote_addr) / + ICMPv6EchoRequest(id=0xfeed) / + Raw()) + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assertEqual(rx[IPv6].src, src_nat_addr) + self.assert_packet_checksums_valid(rx) + + received_id = rx[0][ICMPv6EchoRequest].id + # ping reply from outside to pods + p2 = ( + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IPv6(src=remote_addr, dst=src_nat_addr) / + ICMPv6EchoReply(id=received_id)) + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IPv6].src, remote_addr) + self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed) + + def sourcenat_test_icmp_echo4_conf(self): + sports = [1234, 1235] + dports = [6661, 6662] + + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + IP46 = IP + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + src_nat_addr = self.pg2.remote_hosts[0].ip4 + + # ping from pods to outside network + p1 = ( + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=remote_addr) / + ICMP(type=8, id=0xfeed) / + Raw()) + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assertEqual(rx[IP46].src, src_nat_addr) + self.assert_packet_checksums_valid(rx) + + received_id = rx[0][ICMP].id + # ping reply from outside to pods + p2 = ( + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IP46(src=remote_addr, dst=src_nat_addr) / + ICMP(type=0, id=received_id)) + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, remote_addr) + self.assertEqual(rx[ICMP].id, 0xfeed) + + def sourcenat_test_icmp_err_conf(self, isV6=False): + sports = [1234, 1235] + dports = [6661, 6662] for nbr, remote_host in enumerate(self.pg1.remote_hosts): + if isV6: + IP46 = IPv6 + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + src_nat_addr = self.pg2.remote_hosts[0].ip6 + ICMP46 = ICMPv6DestUnreach + ICMPelem = ICMPv6DestUnreach(code=1) + IP46error = IPerror6 + else: + IP46 = IP + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + src_nat_addr = self.pg2.remote_hosts[0].ip4 + IP46error = IPerror + ICMP46 = ICMP + ICMPelem = ICMP(type=11) + # from pods to outside network p1 = ( - Ether( - dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - ip_class( - src=getattr(self.pg0.remote_hosts[0], ip_v), - dst=getattr(remote_host, ip_v)) / + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=remote_addr) / + TCP(sport=sports[nbr], dport=dports[nbr]) / + Raw()) + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].dst, remote_addr) + self.assertEqual(rx[TCP].dport, dports[nbr]) + self.assertEqual(rx[IP46].src, src_nat_addr) + sport = rx[TCP].sport + + InnerIP = rxs[0][IP46] + # from outside to pods, ICMP error + p2 = ( + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IP46(src=remote_addr, dst=src_nat_addr) / + ICMPelem / InnerIP) + + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, remote_addr) + self.assertEqual(rx[ICMP46][IP46error].src, client_addr) + self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr) + self.assertEqual(rx[ICMP46][IP46error] + [TCPerror].sport, sports[nbr]) + self.assertEqual(rx[ICMP46][IP46error] + [TCPerror].dport, dports[nbr]) + + def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False): + sports = [1234, 1235] + dports = [6661, 6662] + + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + if isV6: + IP46 = IPv6 + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + src_nat_addr = self.pg2.remote_hosts[0].ip6 + exclude_prefix = ip_network( + "%s/100" % remote_addr, strict=False) + else: + IP46 = IP + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + src_nat_addr = self.pg2.remote_hosts[0].ip4 + exclude_prefix = ip_network( + "%s/16" % remote_addr, strict=False) + # from pods to outside network + p1 = ( + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=remote_addr) / l4p(sport=sports[nbr], dport=dports[nbr]) / Raw()) + self.vapi.cli("trace add pg-input 1") rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) + self.pg0, + p1 * N_PKTS, + self.pg1) + self.logger.info(self.vapi.cli("show trace max 1")) + for rx in rxs: self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(remote_host, ip_v)) + self.assertEqual(rx[IP46].dst, remote_addr) self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual( - rx[ip_class].src, - srcNatAddr) + self.assertEqual(rx[IP46].src, src_nat_addr) sport = rx[l4p].sport # from outside to pods p2 = ( - Ether( - dst=self.pg1.local_mac, - src=self.pg1.remote_hosts[nbr].mac) / - ip_class(src=getattr(remote_host, ip_v), dst=srcNatAddr) / + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IP46(src=remote_addr, dst=src_nat_addr) / l4p(sport=dports[nbr], dport=sport) / Raw()) rxs = self.send_and_expect( - self.pg1, - p2 * N_PKTS, - self.pg0) + self.pg1, + p2 * N_PKTS, + self.pg0) for rx in rxs: self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(self.pg0.remote_hosts[0], ip_v)) + self.assertEqual(rx[IP46].dst, client_addr) self.assertEqual(rx[l4p].dport, sports[nbr]) self.assertEqual(rx[l4p].sport, dports[nbr]) - self.assertEqual( - rx[ip_class].src, - getattr(remote_host, ip_v)) + self.assertEqual(rx[IP46].src, remote_addr) # add remote host to exclude list - subnet_mask = 100 if isV6 else 16 - subnet = getattr(remote_host, ip_v) + "/" + str(subnet_mask) - exclude_subnet = ip_network(subnet, strict=False) - - t1.cnat_exclude_subnet(exclude_subnet) + self.vapi.cnat_snat_policy_add_del_exclude_pfx( + prefix=exclude_prefix, is_add=1) self.vapi.cnat_session_purge() rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) + self.pg0, + p1 * N_PKTS, + self.pg1) for rx in rxs: self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(remote_host, ip_v)) + self.assertEqual(rx[IP46].dst, remote_addr) self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual( - rx[ip_class].src, - getattr(self.pg0.remote_hosts[0], ip_v)) + self.assertEqual(rx[IP46].src, client_addr) # remove remote host from exclude list - t1.cnat_exclude_subnet(exclude_subnet, isAdd=False) + self.vapi.cnat_snat_policy_add_del_exclude_pfx( + prefix=exclude_prefix, is_add=0) self.vapi.cnat_session_purge() rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) + self.pg0, + p1 * N_PKTS, + self.pg1) for rx in rxs: self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(remote_host, ip_v)) + self.assertEqual(rx[IP46].dst, remote_addr) self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual( - rx[ip_class].src, - srcNatAddr) - - # def test_cnat6_sourcenat(self): - # # """ CNat Source Nat ipv6 """ - # self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True) - # self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True) - - def test_cnat4_sourcenat(self): - # """ CNat Source Nat ipv4 """ - self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP) - self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP) + self.assertEqual(rx[IP46].src, src_nat_addr) + + self.vapi.cnat_session_purge() + + +class TestCNatDHCP(VppTestCase): + """ CNat Translation """ + extra_vpp_punt_config = ["cnat", "{", + "session-db-buckets", "64", + "session-cleanup-timeout", "0.1", + "session-max-age", "1", + "tcp-max-age", "1", + "scanner", "off", "}"] + + @classmethod + def setUpClass(cls): + super(TestCNatDHCP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestCNatDHCP, cls).tearDownClass() + + def tearDown(self): + for i in self.pg_interfaces: + i.admin_down() + super(TestCNatDHCP, self).tearDown() + + def create_translation(self, vip_pg, *args, is_v6=False): + vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6) + paths = [] + for (src_pg, dst_pg) in args: + paths.append(EpTuple( + Ep.from_pg(src_pg, is_v6=is_v6), + Ep.from_pg(dst_pg, is_v6=is_v6) + )) + t1 = VppCNatTranslation(self, TCP, vip, paths) + t1.add_vpp_config() + return t1 + + def make_addr(self, sw_if_index, i, is_v6): + if is_v6: + return "fd01:%x::%u" % (sw_if_index, i + 1) + else: + return "172.16.%u.%u" % (sw_if_index, i) + + def make_prefix(self, sw_if_index, i, is_v6): + if is_v6: + return "%s/128" % self.make_addr(sw_if_index, i, is_v6) + else: + return "%s/32" % self.make_addr(sw_if_index, i, is_v6) + + def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False): + qt1 = tr.query_vpp_config() + self.assertEqual(str(qt1.vip.addr), self.make_addr( + vip_pg.sw_if_index, i, is_v6)) + for (src_pg, dst_pg), path in zip(args, qt1.paths): + if src_pg: + self.assertEqual(str(path.src_ep.addr), self.make_addr( + src_pg.sw_if_index, i, is_v6)) + if dst_pg: + self.assertEqual(str(path.dst_ep.addr), self.make_addr( + dst_pg.sw_if_index, i, is_v6)) + + def config_ips(self, rng, is_add=1, is_v6=False): + for pg, i in product(self.pg_interfaces, rng): + self.vapi.sw_interface_add_del_address( + sw_if_index=pg.sw_if_index, + prefix=self.make_prefix(pg.sw_if_index, i, is_v6), + is_add=is_add) + + def test_dhcp_v4(self): + self.create_pg_interfaces(range(5)) + for i in self.pg_interfaces: + i.admin_up() + pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4)) + t1 = self.create_translation(*pglist) + self.config_ips([0]) + self.check_resolved(t1, *pglist) + self.config_ips([1]) + self.config_ips([0], is_add=0) + self.check_resolved(t1, *pglist, i=1) + self.config_ips([1], is_add=0) + t1.remove_vpp_config() + + def test_dhcp_v6(self): + self.create_pg_interfaces(range(5)) + for i in self.pg_interfaces: + i.admin_up() + pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4)) + t1 = self.create_translation(*pglist, is_v6=True) + self.config_ips([0], is_v6=True) + self.check_resolved(t1, *pglist, is_v6=True) + self.config_ips([1], is_v6=True) + self.config_ips([0], is_add=0, is_v6=True) + self.check_resolved(t1, *pglist, i=1, is_v6=True) + self.config_ips([1], is_add=0, is_v6=True) + t1.remove_vpp_config() + + def test_dhcp_snat(self): + self.create_pg_interfaces(range(1)) + for i in self.pg_interfaces: + i.admin_up() + self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index) + self.config_ips([0], is_v6=False) + self.config_ips([0], is_v6=True) + r = self.vapi.cnat_get_snat_addresses() + self.assertEqual(str(r.snat_ip4), self.make_addr( + self.pg0.sw_if_index, 0, False)) + self.assertEqual(str(r.snat_ip6), self.make_addr( + self.pg0.sw_if_index, 0, True)) + self.config_ips([1], is_v6=False) + self.config_ips([1], is_v6=True) + self.config_ips([0], is_add=0, is_v6=False) + self.config_ips([0], is_add=0, is_v6=True) + r = self.vapi.cnat_get_snat_addresses() + self.assertEqual(str(r.snat_ip4), self.make_addr( + self.pg0.sw_if_index, 1, False)) + self.assertEqual(str(r.snat_ip6), self.make_addr( + self.pg0.sw_if_index, 1, True)) + self.config_ips([1], is_add=0, is_v6=False) + self.config_ips([1], is_add=0, is_v6=True) + self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX) + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)