import unittest
from framework import VppTestCase, VppTestRunner
-from vpp_ip import DpoProto
+from vpp_ip import DpoProto, INVALID_INDEX
+from itertools import product
from scapy.packet import Raw
from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet import IP, UDP, TCP, ICMP
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
+import struct
from ipaddress import ip_address, ip_network, \
IPv4Address, IPv6Address, IPv4Network, IPv6Network
N_PKTS = 15
-def find_cnat_translation(test, id):
- ts = test.vapi.cnat_translation_dump()
- for t in ts:
- if id == t.translation.id:
- return True
- return False
-
-
class Ep(object):
""" CNat endpoint """
- def __init__(self, ip, port, l4p=TCP):
+ def __init__(self, ip=None, port=0, l4p=TCP,
+ sw_if_index=INVALID_INDEX, is_v6=False):
self.ip = ip
+ if ip is None:
+ self.ip = "::" if is_v6 else "0.0.0.0"
self.port = port
self.l4p = l4p
+ self.sw_if_index = sw_if_index
+ if is_v6:
+ self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
+ else:
+ self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
def encode(self):
return {'addr': self.ip,
- 'port': self.port}
+ 'port': self.port,
+ 'sw_if_index': self.sw_if_index,
+ 'if_af': self.if_af}
+
+ @classmethod
+ def from_pg(cls, pg, is_v6=False):
+ if pg is None:
+ return cls(is_v6=is_v6)
+ else:
+ return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
+
+ @property
+ def isV6(self):
+ return ":" in self.ip
def __str__(self):
return ("%s:%d" % (self.ip, self.port))
for path in self.paths:
self.encoded_paths.append(path.encode())
+ def __str__(self):
+ return ("%s %s %s" % (self.vip, self.iproto, self.paths))
+
@property
def vl4_proto(self):
ip_proto = VppEnum.vl_api_ip_proto_t
TCP: ip_proto.IP_API_PROTO_TCP,
}[self.iproto]
- def delete(self):
- r = self._test.vapi.cnat_translation_del(id=self.id)
-
def add_vpp_config(self):
r = self._test.vapi.cnat_translation_update(
{'vip': self.vip.encode(),
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
- self._test.vapi.cnat_translation_del(self.id)
+ self._test.vapi.cnat_translation_del(id=self.id)
def query_vpp_config(self):
- return find_cnat_translation(self._test, self.id)
+ for t in self._test.vapi.cnat_translation_dump():
+ if self.id == t.translation.id:
+ return t.translation
+ return None
def object_id(self):
return ("cnat-translation-%s" % (self.vip))
return c[0][self.id]
-class VppCNATSourceNat(VppObject):
-
- def __init__(self, test, address, exclude_subnets=[]):
- self._test = test
- self.address = address
- self.exclude_subnets = exclude_subnets
-
- def add_vpp_config(self):
- a = ip_address(self.address)
- if 4 == a.version:
- self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
- else:
- self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
- for subnet in self.exclude_subnets:
- self.cnat_exclude_subnet(subnet, True)
-
- def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
- add = 1 if isAdd else 0
- self._test.vapi.cnat_add_del_snat_prefix(
- prefix=exclude_subnet, is_add=add)
-
- def query_vpp_config(self):
- return False
-
- def remove_vpp_config(self):
- return False
-
-
class TestCNatTranslation(VppTestCase):
""" CNat Translation """
extra_vpp_punt_config = ["cnat", "{",
+ "session-db-buckets", "64",
+ "session-cleanup-timeout", "0.1",
"session-max-age", "1",
- "tcp-max-age", "1", "}"]
+ "tcp-max-age", "1",
+ "scanner", "off", "}"]
@classmethod
def setUpClass(cls):
i.admin_down()
super(TestCNatTranslation, self).tearDown()
- def cnat_create_translation(self, vip, nbr, isV6=False):
- ip_v = "ip6" if isV6 else "ip4"
+ def cnat_create_translation(self, vip, nbr):
+ ip_v = "ip6" if vip.isV6 else "ip4"
dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
- sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
+ sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
t1 = VppCNatTranslation(
self, vip.l4p, vip,
[EpTuple(sep, dep), EpTuple(sep, dep)])
rxs = self.send_and_expect(self.pg0,
p1 * N_PKTS,
self.pg1)
+ self.logger.info(self.vapi.cli("show trace max 1"))
for rx in rxs:
self.assert_packet_checksums_valid(rx)
p1 * N_PKTS,
self.pg0)
- self.assertEqual(t1.get_stats()['packets'],
- N_PKTS *
- len(sports) *
- len(self.pg0.remote_hosts))
-
def cnat_test_translation_update(self, t1, sports, isV6=False):
ip_v = "ip6" if isV6 else "ip4"
ip_class = IPv6 if isV6 else IP
trs = []
for nbr, vip in enumerate(vips):
- trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6))
+ trs.append(self.cnat_create_translation(vip, nbr))
self.logger.info(self.vapi.cli("sh cnat client"))
self.logger.info(self.vapi.cli("sh cnat translation"))
self.logger.info(self.vapi.cli("sh cnat session verbose"))
#
- # turn the scanner back on and wait untill the sessions
+ # turn the scanner back on and wait until the sessions
# all disapper
#
self.vapi.cli("test cnat scanner on")
n_tries += 1
sessions = self.vapi.cnat_session_dump()
self.sleep(2)
+ self.logger.info(self.vapi.cli("show cnat session verbose"))
self.assertTrue(n_tries < 100)
+ self.vapi.cli("test cnat scanner off")
#
# load some flows again and purge
self.pg2)
for tr in trs:
- tr.delete()
+ tr.remove_vpp_config()
self.assertTrue(self.vapi.cnat_session_dump())
self.vapi.cnat_session_purge()
self.assertFalse(self.vapi.cnat_session_dump())
+ def test_icmp(self):
+ vips = [
+ Ep("30.0.0.1", 5555),
+ Ep("30.0.0.2", 5554),
+ Ep("30.0.0.2", 5553, UDP),
+ Ep("30::1", 6666),
+ Ep("30::2", 5553, UDP),
+ ]
+ sport = 1234
+
+ self.pg0.generate_remote_hosts(len(vips))
+ self.pg0.configure_ipv6_neighbors()
+ self.pg0.configure_ipv4_neighbors()
+
+ self.pg1.generate_remote_hosts(len(vips))
+ self.pg1.configure_ipv6_neighbors()
+ self.pg1.configure_ipv4_neighbors()
+
+ self.vapi.cli("test cnat scanner off")
+ trs = []
+ for nbr, vip in enumerate(vips):
+ trs.append(self.cnat_create_translation(vip, nbr))
+
+ self.logger.info(self.vapi.cli("sh cnat client"))
+ self.logger.info(self.vapi.cli("sh cnat translation"))
+
+ for nbr, vip in enumerate(vips):
+ if vip.isV6:
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ remote2_addr = self.pg2.remote_hosts[0].ip6
+ else:
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
+ remote2_addr = self.pg2.remote_hosts[0].ip4
+ IP46 = IPv6 if vip.isV6 else IP
+ # from client to vip
+ p1 = (Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=vip.ip) /
+ vip.l4p(sport=sport, dport=vip.port) /
+ Raw())
+
+ rxs = self.send_and_expect(self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].dst, remote_addr)
+ self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
+ self.assertEqual(rx[IP46].src, client_addr)
+ self.assertEqual(rx[vip.l4p].sport, sport)
+
+ InnerIP = rxs[0][IP46]
+
+ ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
+ ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
+ # from vip to client, ICMP error
+ p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP46(src=remote_addr, dst=client_addr) /
+ ICMPelem / InnerIP)
+
+ rxs = self.send_and_expect(self.pg1,
+ p1 * N_PKTS,
+ self.pg0)
+
+ TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
+ IP46error = IPerror6 if vip.isV6 else IPerror
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, vip.ip)
+ self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+ self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].sport, sport)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].dport, vip.port)
+
+ # from other remote to client, ICMP error
+ # outside shouldn't be NAT-ed
+ p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
+ IP46(src=remote2_addr, dst=client_addr) /
+ ICMPelem / InnerIP)
+
+ rxs = self.send_and_expect(self.pg1,
+ p1 * N_PKTS,
+ self.pg0)
+
+ TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
+ IP46error = IPerror6 if vip.isV6 else IPerror
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, remote2_addr)
+ self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+ self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].sport, sport)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].dport, vip.port)
+
+ self.vapi.cnat_session_purge()
+
def test_cnat6(self):
# """ CNat Translation ipv6 """
vips = [
class TestCNatSourceNAT(VppTestCase):
""" CNat Source NAT """
extra_vpp_punt_config = ["cnat", "{",
+ "session-cleanup-timeout", "0.1",
"session-max-age", "1",
- "tcp-max-age", "1", "}"]
+ "tcp-max-age", "1",
+ "scanner", "off", "}"]
@classmethod
def setUpClass(cls):
i.config_ip6()
i.resolve_ndp()
+ self.pg0.configure_ipv6_neighbors()
+ self.pg0.configure_ipv4_neighbors()
+ self.pg1.generate_remote_hosts(2)
+ self.pg1.configure_ipv4_neighbors()
+ self.pg1.configure_ipv6_neighbors()
+
+ self.vapi.cnat_set_snat_addresses(
+ snat_ip4=self.pg2.remote_hosts[0].ip4,
+ snat_ip6=self.pg2.remote_hosts[0].ip6,
+ sw_if_index=INVALID_INDEX)
+ self.vapi.feature_enable_disable(
+ enable=1,
+ arc_name="ip6-unicast",
+ feature_name="cnat-snat-ip6",
+ sw_if_index=self.pg0.sw_if_index)
+ self.vapi.feature_enable_disable(
+ enable=1,
+ arc_name="ip4-unicast",
+ feature_name="cnat-snat-ip4",
+ sw_if_index=self.pg0.sw_if_index)
+
+ policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
+ self.vapi.cnat_set_snat_policy(
+ policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
+ for i in self.pg_interfaces:
+ self.vapi.cnat_snat_policy_add_del_if(
+ sw_if_index=i.sw_if_index, is_add=1,
+ table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
+ self.vapi.cnat_snat_policy_add_del_if(
+ sw_if_index=i.sw_if_index, is_add=1,
+ table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
+
def tearDown(self):
+ self.vapi.cnat_session_purge()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.admin_down()
super(TestCNatSourceNAT, self).tearDown()
- def cnat_create_translation(self, srcNatAddr, interface, isV6=False):
- t1 = VppCNATSourceNat(self, srcNatAddr)
- t1.add_vpp_config()
- cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast"
- cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat"
- self.vapi.feature_enable_disable(
- enable=1,
- arc_name=cnat_arc_name,
- feature_name=cnat_feature_name,
- sw_if_index=interface.sw_if_index)
+ def test_snat_v6(self):
+ # """ CNat Source Nat v6 """
+ self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
+ self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
+ self.sourcenat_test_icmp_err_conf(isV6=True)
+ self.sourcenat_test_icmp_echo6_conf()
- return t1
+ def test_snat_v4(self):
+ # """ CNat Source Nat v4 """
+ self.sourcenat_test_tcp_udp_conf(TCP)
+ self.sourcenat_test_tcp_udp_conf(UDP)
+ self.sourcenat_test_icmp_err_conf()
+ self.sourcenat_test_icmp_echo4_conf()
- def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
- ip_v = "ip6" if isV6 else "ip4"
- ip_class = IPv6 if isV6 else IP
- sports = [1234, 1235, 1236]
- dports = [6661, 6662, 6663]
+ def sourcenat_test_icmp_echo6_conf(self):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
- self.pg0.generate_remote_hosts(1)
- self.pg0.configure_ipv4_neighbors()
- self.pg0.configure_ipv6_neighbors()
- self.pg1.generate_remote_hosts(len(sports))
- self.pg1.configure_ipv4_neighbors()
- self.pg1.configure_ipv6_neighbors()
+ for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ src_nat_addr = self.pg2.remote_hosts[0].ip6
- self.vapi.cli("test cnat scanner on")
- t1 = self.cnat_create_translation(srcNatAddr, self.pg0)
+ # ping from pods to outside network
+ p1 = (
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IPv6(src=client_addr, dst=remote_addr) /
+ ICMPv6EchoRequest(id=0xfeed) /
+ Raw())
+
+ rxs = self.send_and_expect(
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].src, src_nat_addr)
+ self.assert_packet_checksums_valid(rx)
+
+ received_id = rx[0][ICMPv6EchoRequest].id
+ # ping reply from outside to pods
+ p2 = (
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IPv6(src=remote_addr, dst=src_nat_addr) /
+ ICMPv6EchoReply(id=received_id))
+ rxs = self.send_and_expect(
+ self.pg1,
+ p2 * N_PKTS,
+ self.pg0)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IPv6].src, remote_addr)
+ self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
+
+ def sourcenat_test_icmp_echo4_conf(self):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
+
+ for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ IP46 = IP
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
+ src_nat_addr = self.pg2.remote_hosts[0].ip4
+
+ # ping from pods to outside network
+ p1 = (
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=remote_addr) /
+ ICMP(type=8, id=0xfeed) /
+ Raw())
+
+ rxs = self.send_and_expect(
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP46].src, src_nat_addr)
+ self.assert_packet_checksums_valid(rx)
+
+ received_id = rx[0][ICMP].id
+ # ping reply from outside to pods
+ p2 = (
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IP46(src=remote_addr, dst=src_nat_addr) /
+ ICMP(type=0, id=received_id))
+ rxs = self.send_and_expect(
+ self.pg1,
+ p2 * N_PKTS,
+ self.pg0)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, remote_addr)
+ self.assertEqual(rx[ICMP].id, 0xfeed)
+
+ def sourcenat_test_icmp_err_conf(self, isV6=False):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ if isV6:
+ IP46 = IPv6
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ src_nat_addr = self.pg2.remote_hosts[0].ip6
+ ICMP46 = ICMPv6DestUnreach
+ ICMPelem = ICMPv6DestUnreach(code=1)
+ IP46error = IPerror6
+ else:
+ IP46 = IP
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
+ src_nat_addr = self.pg2.remote_hosts[0].ip4
+ IP46error = IPerror
+ ICMP46 = ICMP
+ ICMPelem = ICMP(type=11)
+
# from pods to outside network
p1 = (
- Ether(
- dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- ip_class(
- src=getattr(self.pg0.remote_hosts[0], ip_v),
- dst=getattr(remote_host, ip_v)) /
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=remote_addr) /
+ TCP(sport=sports[nbr], dport=dports[nbr]) /
+ Raw())
+
+ rxs = self.send_and_expect(
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].dst, remote_addr)
+ self.assertEqual(rx[TCP].dport, dports[nbr])
+ self.assertEqual(rx[IP46].src, src_nat_addr)
+ sport = rx[TCP].sport
+
+ InnerIP = rxs[0][IP46]
+ # from outside to pods, ICMP error
+ p2 = (
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IP46(src=remote_addr, dst=src_nat_addr) /
+ ICMPelem / InnerIP)
+
+ rxs = self.send_and_expect(
+ self.pg1,
+ p2 * N_PKTS,
+ self.pg0)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, remote_addr)
+ self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+ self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPerror].sport, sports[nbr])
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPerror].dport, dports[nbr])
+
+ def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
+
+ for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ if isV6:
+ IP46 = IPv6
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ src_nat_addr = self.pg2.remote_hosts[0].ip6
+ exclude_prefix = ip_network(
+ "%s/100" % remote_addr, strict=False)
+ else:
+ IP46 = IP
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
+ src_nat_addr = self.pg2.remote_hosts[0].ip4
+ exclude_prefix = ip_network(
+ "%s/16" % remote_addr, strict=False)
+ # from pods to outside network
+ p1 = (
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=remote_addr) /
l4p(sport=sports[nbr], dport=dports[nbr]) /
Raw())
+ self.vapi.cli("trace add pg-input 1")
rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+ self.logger.info(self.vapi.cli("show trace max 1"))
+
for rx in rxs:
self.assert_packet_checksums_valid(rx)
- self.assertEqual(
- rx[ip_class].dst,
- getattr(remote_host, ip_v))
+ self.assertEqual(rx[IP46].dst, remote_addr)
self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(
- rx[ip_class].src,
- srcNatAddr)
+ self.assertEqual(rx[IP46].src, src_nat_addr)
sport = rx[l4p].sport
# from outside to pods
p2 = (
- Ether(
- dst=self.pg1.local_mac,
- src=self.pg1.remote_hosts[nbr].mac) /
- ip_class(src=getattr(remote_host, ip_v), dst=srcNatAddr) /
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IP46(src=remote_addr, dst=src_nat_addr) /
l4p(sport=dports[nbr], dport=sport) /
Raw())
rxs = self.send_and_expect(
- self.pg1,
- p2 * N_PKTS,
- self.pg0)
+ self.pg1,
+ p2 * N_PKTS,
+ self.pg0)
for rx in rxs:
self.assert_packet_checksums_valid(rx)
- self.assertEqual(
- rx[ip_class].dst,
- getattr(self.pg0.remote_hosts[0], ip_v))
+ self.assertEqual(rx[IP46].dst, client_addr)
self.assertEqual(rx[l4p].dport, sports[nbr])
self.assertEqual(rx[l4p].sport, dports[nbr])
- self.assertEqual(
- rx[ip_class].src,
- getattr(remote_host, ip_v))
+ self.assertEqual(rx[IP46].src, remote_addr)
# add remote host to exclude list
- subnet_mask = 100 if isV6 else 16
- subnet = getattr(remote_host, ip_v) + "/" + str(subnet_mask)
- exclude_subnet = ip_network(subnet, strict=False)
-
- t1.cnat_exclude_subnet(exclude_subnet)
+ self.vapi.cnat_snat_policy_add_del_exclude_pfx(
+ prefix=exclude_prefix, is_add=1)
self.vapi.cnat_session_purge()
rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
for rx in rxs:
self.assert_packet_checksums_valid(rx)
- self.assertEqual(
- rx[ip_class].dst,
- getattr(remote_host, ip_v))
+ self.assertEqual(rx[IP46].dst, remote_addr)
self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(
- rx[ip_class].src,
- getattr(self.pg0.remote_hosts[0], ip_v))
+ self.assertEqual(rx[IP46].src, client_addr)
# remove remote host from exclude list
- t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
+ self.vapi.cnat_snat_policy_add_del_exclude_pfx(
+ prefix=exclude_prefix, is_add=0)
self.vapi.cnat_session_purge()
rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
for rx in rxs:
self.assert_packet_checksums_valid(rx)
- self.assertEqual(
- rx[ip_class].dst,
- getattr(remote_host, ip_v))
+ self.assertEqual(rx[IP46].dst, remote_addr)
self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(
- rx[ip_class].src,
- srcNatAddr)
-
- # def test_cnat6_sourcenat(self):
- # # """ CNat Source Nat ipv6 """
- # self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True)
- # self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True)
-
- def test_cnat4_sourcenat(self):
- # """ CNat Source Nat ipv4 """
- self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP)
- self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP)
+ self.assertEqual(rx[IP46].src, src_nat_addr)
+
+ self.vapi.cnat_session_purge()
+
+
+class TestCNatDHCP(VppTestCase):
+ """ CNat Translation """
+ extra_vpp_punt_config = ["cnat", "{",
+ "session-db-buckets", "64",
+ "session-cleanup-timeout", "0.1",
+ "session-max-age", "1",
+ "tcp-max-age", "1",
+ "scanner", "off", "}"]
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestCNatDHCP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCNatDHCP, cls).tearDownClass()
+
+ def tearDown(self):
+ for i in self.pg_interfaces:
+ i.admin_down()
+ super(TestCNatDHCP, self).tearDown()
+
+ def create_translation(self, vip_pg, *args, is_v6=False):
+ vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
+ paths = []
+ for (src_pg, dst_pg) in args:
+ paths.append(EpTuple(
+ Ep.from_pg(src_pg, is_v6=is_v6),
+ Ep.from_pg(dst_pg, is_v6=is_v6)
+ ))
+ t1 = VppCNatTranslation(self, TCP, vip, paths)
+ t1.add_vpp_config()
+ return t1
+
+ def make_addr(self, sw_if_index, i, is_v6):
+ if is_v6:
+ return "fd01:%x::%u" % (sw_if_index, i + 1)
+ else:
+ return "172.16.%u.%u" % (sw_if_index, i)
+
+ def make_prefix(self, sw_if_index, i, is_v6):
+ if is_v6:
+ return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
+ else:
+ return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
+
+ def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
+ qt1 = tr.query_vpp_config()
+ self.assertEqual(str(qt1.vip.addr), self.make_addr(
+ vip_pg.sw_if_index, i, is_v6))
+ for (src_pg, dst_pg), path in zip(args, qt1.paths):
+ if src_pg:
+ self.assertEqual(str(path.src_ep.addr), self.make_addr(
+ src_pg.sw_if_index, i, is_v6))
+ if dst_pg:
+ self.assertEqual(str(path.dst_ep.addr), self.make_addr(
+ dst_pg.sw_if_index, i, is_v6))
+
+ def config_ips(self, rng, is_add=1, is_v6=False):
+ for pg, i in product(self.pg_interfaces, rng):
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=pg.sw_if_index,
+ prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
+ is_add=is_add)
+
+ def test_dhcp_v4(self):
+ self.create_pg_interfaces(range(5))
+ for i in self.pg_interfaces:
+ i.admin_up()
+ pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
+ t1 = self.create_translation(*pglist)
+ self.config_ips([0])
+ self.check_resolved(t1, *pglist)
+ self.config_ips([1])
+ self.config_ips([0], is_add=0)
+ self.check_resolved(t1, *pglist, i=1)
+ self.config_ips([1], is_add=0)
+ t1.remove_vpp_config()
+
+ def test_dhcp_v6(self):
+ self.create_pg_interfaces(range(5))
+ for i in self.pg_interfaces:
+ i.admin_up()
+ pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
+ t1 = self.create_translation(*pglist, is_v6=True)
+ self.config_ips([0], is_v6=True)
+ self.check_resolved(t1, *pglist, is_v6=True)
+ self.config_ips([1], is_v6=True)
+ self.config_ips([0], is_add=0, is_v6=True)
+ self.check_resolved(t1, *pglist, i=1, is_v6=True)
+ self.config_ips([1], is_add=0, is_v6=True)
+ t1.remove_vpp_config()
+
+ def test_dhcp_snat(self):
+ self.create_pg_interfaces(range(1))
+ for i in self.pg_interfaces:
+ i.admin_up()
+ self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
+ self.config_ips([0], is_v6=False)
+ self.config_ips([0], is_v6=True)
+ r = self.vapi.cnat_get_snat_addresses()
+ self.assertEqual(str(r.snat_ip4), self.make_addr(
+ self.pg0.sw_if_index, 0, False))
+ self.assertEqual(str(r.snat_ip6), self.make_addr(
+ self.pg0.sw_if_index, 0, True))
+ self.config_ips([1], is_v6=False)
+ self.config_ips([1], is_v6=True)
+ self.config_ips([0], is_add=0, is_v6=False)
+ self.config_ips([0], is_add=0, is_v6=True)
+ r = self.vapi.cnat_get_snat_addresses()
+ self.assertEqual(str(r.snat_ip4), self.make_addr(
+ self.pg0.sw_if_index, 1, False))
+ self.assertEqual(str(r.snat_ip6), self.make_addr(
+ self.pg0.sw_if_index, 1, True))
+ self.config_ips([1], is_add=0, is_v6=False)
+ self.config_ips([1], is_add=0, is_v6=True)
+ self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)