import unittest
from framework import VppTestCase, VppTestRunner
-from vpp_ip import DpoProto
+from vpp_ip import DpoProto, INVALID_INDEX
+from itertools import product
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
import struct
N_PKTS = 15
-def find_cnat_translation(test, id):
- ts = test.vapi.cnat_translation_dump()
- for t in ts:
- if id == t.translation.id:
- return True
- return False
-
-
class Ep(object):
""" CNat endpoint """
- def __init__(self, ip, port, l4p=TCP):
+ def __init__(self, ip=None, port=0, l4p=TCP,
+ sw_if_index=INVALID_INDEX, is_v6=False):
self.ip = ip
+ if ip is None:
+ self.ip = "::" if is_v6 else "0.0.0.0"
self.port = port
self.l4p = l4p
+ self.sw_if_index = sw_if_index
+ if is_v6:
+ self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
+ else:
+ self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
def encode(self):
return {'addr': self.ip,
- 'port': self.port}
+ 'port': self.port,
+ 'sw_if_index': self.sw_if_index,
+ 'if_af': self.if_af}
+
+ @classmethod
+ def from_pg(cls, pg, is_v6=False):
+ if pg is None:
+ return cls(is_v6=is_v6)
+ else:
+ return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
@property
def isV6(self):
for path in self.paths:
self.encoded_paths.append(path.encode())
+ def __str__(self):
+ return ("%s %s %s" % (self.vip, self.iproto, self.paths))
+
@property
def vl4_proto(self):
ip_proto = VppEnum.vl_api_ip_proto_t
TCP: ip_proto.IP_API_PROTO_TCP,
}[self.iproto]
- def delete(self):
- r = self._test.vapi.cnat_translation_del(id=self.id)
-
def add_vpp_config(self):
r = self._test.vapi.cnat_translation_update(
{'vip': self.vip.encode(),
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
- self._test.vapi.cnat_translation_del(self.id)
+ self._test.vapi.cnat_translation_del(id=self.id)
def query_vpp_config(self):
- return find_cnat_translation(self._test, self.id)
+ for t in self._test.vapi.cnat_translation_dump():
+ if self.id == t.translation.id:
+ return t.translation
+ return None
def object_id(self):
return ("cnat-translation-%s" % (self.vip))
return c[0][self.id]
-class VppCNATSourceNat(VppObject):
-
- def __init__(self, test, address, exclude_subnets=[]):
- self._test = test
- self.address = address
- self.exclude_subnets = exclude_subnets
-
- def add_vpp_config(self):
- a = ip_address(self.address)
- if 4 == a.version:
- self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
- else:
- self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
- for subnet in self.exclude_subnets:
- self.cnat_exclude_subnet(subnet, True)
-
- def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
- add = 1 if isAdd else 0
- self._test.vapi.cnat_add_del_snat_prefix(
- prefix=exclude_subnet, is_add=add)
-
- def query_vpp_config(self):
- return False
-
- def remove_vpp_config(self):
- return False
-
-
class TestCNatTranslation(VppTestCase):
""" CNat Translation """
extra_vpp_punt_config = ["cnat", "{",
rxs = self.send_and_expect(self.pg0,
p1 * N_PKTS,
self.pg1)
+ self.logger.info(self.vapi.cli("show trace max 1"))
for rx in rxs:
self.assert_packet_checksums_valid(rx)
p1 * N_PKTS,
self.pg0)
- self.assertEqual(t1.get_stats()['packets'],
- N_PKTS *
- len(sports) *
- len(self.pg0.remote_hosts))
-
def cnat_test_translation_update(self, t1, sports, isV6=False):
ip_v = "ip6" if isV6 else "ip4"
ip_class = IPv6 if isV6 else IP
n_tries += 1
sessions = self.vapi.cnat_session_dump()
self.sleep(2)
- print(self.vapi.cli("show cnat session verbose"))
+ self.logger.info(self.vapi.cli("show cnat session verbose"))
self.assertTrue(n_tries < 100)
self.vapi.cli("test cnat scanner off")
self.pg2)
for tr in trs:
- tr.delete()
+ tr.remove_vpp_config()
self.assertTrue(self.vapi.cnat_session_dump())
self.vapi.cnat_session_purge()
class TestCNatSourceNAT(VppTestCase):
""" CNat Source NAT """
extra_vpp_punt_config = ["cnat", "{",
+ "session-cleanup-timeout", "0.1",
"session-max-age", "1",
- "tcp-max-age", "1", "}"]
+ "tcp-max-age", "1",
+ "scanner", "off", "}"]
@classmethod
def setUpClass(cls):
i.config_ip6()
i.resolve_ndp()
+ self.pg0.configure_ipv6_neighbors()
+ self.pg0.configure_ipv4_neighbors()
+ self.pg1.generate_remote_hosts(2)
+ self.pg1.configure_ipv4_neighbors()
+ self.pg1.configure_ipv6_neighbors()
+
+ self.vapi.cnat_set_snat_addresses(
+ snat_ip4=self.pg2.remote_hosts[0].ip4,
+ snat_ip6=self.pg2.remote_hosts[0].ip6,
+ sw_if_index=INVALID_INDEX)
+ self.vapi.feature_enable_disable(
+ enable=1,
+ arc_name="ip6-unicast",
+ feature_name="cnat-snat-ip6",
+ sw_if_index=self.pg0.sw_if_index)
+ self.vapi.feature_enable_disable(
+ enable=1,
+ arc_name="ip4-unicast",
+ feature_name="cnat-snat-ip4",
+ sw_if_index=self.pg0.sw_if_index)
+
+ policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
+ self.vapi.cnat_set_snat_policy(
+ policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
+ for i in self.pg_interfaces:
+ self.vapi.cnat_snat_policy_add_del_if(
+ sw_if_index=i.sw_if_index, is_add=1,
+ table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
+ self.vapi.cnat_snat_policy_add_del_if(
+ sw_if_index=i.sw_if_index, is_add=1,
+ table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
+
def tearDown(self):
+ self.vapi.cnat_session_purge()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.admin_down()
super(TestCNatSourceNAT, self).tearDown()
- def cnat_set_snat_address(self, srcNatAddr, interface, isV6=False):
- t1 = VppCNATSourceNat(self, srcNatAddr)
- t1.add_vpp_config()
- cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast"
- cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat"
- self.vapi.feature_enable_disable(
- enable=1,
- arc_name=cnat_arc_name,
- feature_name=cnat_feature_name,
- sw_if_index=interface.sw_if_index)
+ def test_snat_v6(self):
+ # """ CNat Source Nat v6 """
+ self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
+ self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
+ self.sourcenat_test_icmp_err_conf(isV6=True)
+ self.sourcenat_test_icmp_echo6_conf()
- return t1
+ def test_snat_v4(self):
+ # """ CNat Source Nat v4 """
+ self.sourcenat_test_tcp_udp_conf(TCP)
+ self.sourcenat_test_tcp_udp_conf(UDP)
+ self.sourcenat_test_icmp_err_conf()
+ self.sourcenat_test_icmp_echo4_conf()
- def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
- ip_v = "ip6" if isV6 else "ip4"
- IP46 = IPv6 if isV6 else IP
- sports = [1234, 1235, 1236]
- dports = [6661, 6662, 6663]
+ def sourcenat_test_icmp_echo6_conf(self):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
- self.pg0.generate_remote_hosts(1)
- self.pg0.configure_ipv4_neighbors()
- self.pg0.configure_ipv6_neighbors()
- self.pg1.generate_remote_hosts(len(sports))
- self.pg1.configure_ipv4_neighbors()
- self.pg1.configure_ipv6_neighbors()
+ for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ src_nat_addr = self.pg2.remote_hosts[0].ip6
- self.vapi.cli("test cnat scanner on")
- t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6)
+ # ping from pods to outside network
+ p1 = (
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IPv6(src=client_addr, dst=remote_addr) /
+ ICMPv6EchoRequest(id=0xfeed) /
+ Raw())
+
+ rxs = self.send_and_expect(
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].src, src_nat_addr)
+ self.assert_packet_checksums_valid(rx)
+
+ received_id = rx[0][ICMPv6EchoRequest].id
+ # ping reply from outside to pods
+ p2 = (
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IPv6(src=remote_addr, dst=src_nat_addr) /
+ ICMPv6EchoReply(id=received_id))
+ rxs = self.send_and_expect(
+ self.pg1,
+ p2 * N_PKTS,
+ self.pg0)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IPv6].src, remote_addr)
+ self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
+
+ def sourcenat_test_icmp_echo4_conf(self):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
+
+ for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ IP46 = IP
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
+ src_nat_addr = self.pg2.remote_hosts[0].ip4
+
+ # ping from pods to outside network
+ p1 = (
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=remote_addr) /
+ ICMP(type=8, id=0xfeed) /
+ Raw())
+
+ rxs = self.send_and_expect(
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP46].src, src_nat_addr)
+ self.assert_packet_checksums_valid(rx)
+
+ received_id = rx[0][ICMP].id
+ # ping reply from outside to pods
+ p2 = (
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IP46(src=remote_addr, dst=src_nat_addr) /
+ ICMP(type=0, id=received_id))
+ rxs = self.send_and_expect(
+ self.pg1,
+ p2 * N_PKTS,
+ self.pg0)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, remote_addr)
+ self.assertEqual(rx[ICMP].id, 0xfeed)
+
+ def sourcenat_test_icmp_err_conf(self, isV6=False):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
+
+ for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ if isV6:
+ IP46 = IPv6
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ src_nat_addr = self.pg2.remote_hosts[0].ip6
+ ICMP46 = ICMPv6DestUnreach
+ ICMPelem = ICMPv6DestUnreach(code=1)
+ IP46error = IPerror6
+ else:
+ IP46 = IP
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
+ src_nat_addr = self.pg2.remote_hosts[0].ip4
+ IP46error = IPerror
+ ICMP46 = ICMP
+ ICMPelem = ICMP(type=11)
+
+ # from pods to outside network
+ p1 = (
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=remote_addr) /
+ TCP(sport=sports[nbr], dport=dports[nbr]) /
+ Raw())
+
+ rxs = self.send_and_expect(
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].dst, remote_addr)
+ self.assertEqual(rx[TCP].dport, dports[nbr])
+ self.assertEqual(rx[IP46].src, src_nat_addr)
+ sport = rx[TCP].sport
+
+ InnerIP = rxs[0][IP46]
+ # from outside to pods, ICMP error
+ p2 = (
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IP46(src=remote_addr, dst=src_nat_addr) /
+ ICMPelem / InnerIP)
+
+ rxs = self.send_and_expect(
+ self.pg1,
+ p2 * N_PKTS,
+ self.pg0)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, remote_addr)
+ self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+ self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPerror].sport, sports[nbr])
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPerror].dport, dports[nbr])
+
+ def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
for nbr, remote_host in enumerate(self.pg1.remote_hosts):
if isV6:
+ IP46 = IPv6
client_addr = self.pg0.remote_hosts[0].ip6
remote_addr = self.pg1.remote_hosts[nbr].ip6
+ src_nat_addr = self.pg2.remote_hosts[0].ip6
+ exclude_prefix = ip_network(
+ "%s/100" % remote_addr, strict=False)
else:
+ IP46 = IP
client_addr = self.pg0.remote_hosts[0].ip4
remote_addr = self.pg1.remote_hosts[nbr].ip4
+ src_nat_addr = self.pg2.remote_hosts[0].ip4
+ exclude_prefix = ip_network(
+ "%s/16" % remote_addr, strict=False)
# from pods to outside network
p1 = (
Ether(dst=self.pg0.local_mac,
l4p(sport=sports[nbr], dport=dports[nbr]) /
Raw())
+ self.vapi.cli("trace add pg-input 1")
rxs = self.send_and_expect(
self.pg0,
p1 * N_PKTS,
self.pg1)
+ self.logger.info(self.vapi.cli("show trace max 1"))
+
for rx in rxs:
self.assert_packet_checksums_valid(rx)
self.assertEqual(rx[IP46].dst, remote_addr)
self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, srcNatAddr)
+ self.assertEqual(rx[IP46].src, src_nat_addr)
sport = rx[l4p].sport
# from outside to pods
p2 = (
Ether(dst=self.pg1.local_mac,
src=self.pg1.remote_hosts[nbr].mac) /
- IP46(src=remote_addr, dst=srcNatAddr) /
+ IP46(src=remote_addr, dst=src_nat_addr) /
l4p(sport=dports[nbr], dport=sport) /
Raw())
self.assertEqual(rx[IP46].src, remote_addr)
# add remote host to exclude list
- subnet_mask = 100 if isV6 else 16
- subnet = "%s/%d" % (remote_addr, subnet_mask)
- exclude_subnet = ip_network(subnet, strict=False)
-
- t1.cnat_exclude_subnet(exclude_subnet)
+ self.vapi.cnat_snat_policy_add_del_exclude_pfx(
+ prefix=exclude_prefix, is_add=1)
self.vapi.cnat_session_purge()
rxs = self.send_and_expect(
self.assertEqual(rx[IP46].src, client_addr)
# remove remote host from exclude list
- t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
+ self.vapi.cnat_snat_policy_add_del_exclude_pfx(
+ prefix=exclude_prefix, is_add=0)
self.vapi.cnat_session_purge()
rxs = self.send_and_expect(
self.assert_packet_checksums_valid(rx)
self.assertEqual(rx[IP46].dst, remote_addr)
self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, srcNatAddr)
+ self.assertEqual(rx[IP46].src, src_nat_addr)
+
+ self.vapi.cnat_session_purge()
+
+
+class TestCNatDHCP(VppTestCase):
+ """ CNat Translation """
+ extra_vpp_punt_config = ["cnat", "{",
+ "session-db-buckets", "64",
+ "session-cleanup-timeout", "0.1",
+ "session-max-age", "1",
+ "tcp-max-age", "1",
+ "scanner", "off", "}"]
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestCNatDHCP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCNatDHCP, cls).tearDownClass()
+
+ def tearDown(self):
+ for i in self.pg_interfaces:
+ i.admin_down()
+ super(TestCNatDHCP, self).tearDown()
+
+ def create_translation(self, vip_pg, *args, is_v6=False):
+ vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
+ paths = []
+ for (src_pg, dst_pg) in args:
+ paths.append(EpTuple(
+ Ep.from_pg(src_pg, is_v6=is_v6),
+ Ep.from_pg(dst_pg, is_v6=is_v6)
+ ))
+ t1 = VppCNatTranslation(self, TCP, vip, paths)
+ t1.add_vpp_config()
+ return t1
- def test_cnat6_sourcenat(self):
- # """ CNat Source Nat ipv6 """
- self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True)
- self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True)
+ def make_addr(self, sw_if_index, i, is_v6):
+ if is_v6:
+ return "fd01:%x::%u" % (sw_if_index, i + 1)
+ else:
+ return "172.16.%u.%u" % (sw_if_index, i)
- def test_cnat4_sourcenat(self):
- # """ CNat Source Nat ipv4 """
- self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP)
- self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP)
+ def make_prefix(self, sw_if_index, i, is_v6):
+ if is_v6:
+ return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
+ else:
+ return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
+
+ def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
+ qt1 = tr.query_vpp_config()
+ self.assertEqual(str(qt1.vip.addr), self.make_addr(
+ vip_pg.sw_if_index, i, is_v6))
+ for (src_pg, dst_pg), path in zip(args, qt1.paths):
+ if src_pg:
+ self.assertEqual(str(path.src_ep.addr), self.make_addr(
+ src_pg.sw_if_index, i, is_v6))
+ if dst_pg:
+ self.assertEqual(str(path.dst_ep.addr), self.make_addr(
+ dst_pg.sw_if_index, i, is_v6))
+
+ def config_ips(self, rng, is_add=1, is_v6=False):
+ for pg, i in product(self.pg_interfaces, rng):
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=pg.sw_if_index,
+ prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
+ is_add=is_add)
+
+ def test_dhcp_v4(self):
+ self.create_pg_interfaces(range(5))
+ for i in self.pg_interfaces:
+ i.admin_up()
+ pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
+ t1 = self.create_translation(*pglist)
+ self.config_ips([0])
+ self.check_resolved(t1, *pglist)
+ self.config_ips([1])
+ self.config_ips([0], is_add=0)
+ self.check_resolved(t1, *pglist, i=1)
+ self.config_ips([1], is_add=0)
+ t1.remove_vpp_config()
+
+ def test_dhcp_v6(self):
+ self.create_pg_interfaces(range(5))
+ for i in self.pg_interfaces:
+ i.admin_up()
+ pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
+ t1 = self.create_translation(*pglist, is_v6=True)
+ self.config_ips([0], is_v6=True)
+ self.check_resolved(t1, *pglist, is_v6=True)
+ self.config_ips([1], is_v6=True)
+ self.config_ips([0], is_add=0, is_v6=True)
+ self.check_resolved(t1, *pglist, i=1, is_v6=True)
+ self.config_ips([1], is_add=0, is_v6=True)
+ t1.remove_vpp_config()
+
+ def test_dhcp_snat(self):
+ self.create_pg_interfaces(range(1))
+ for i in self.pg_interfaces:
+ i.admin_up()
+ self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
+ self.config_ips([0], is_v6=False)
+ self.config_ips([0], is_v6=True)
+ r = self.vapi.cnat_get_snat_addresses()
+ self.assertEqual(str(r.snat_ip4), self.make_addr(
+ self.pg0.sw_if_index, 0, False))
+ self.assertEqual(str(r.snat_ip6), self.make_addr(
+ self.pg0.sw_if_index, 0, True))
+ self.config_ips([1], is_v6=False)
+ self.config_ips([1], is_v6=True)
+ self.config_ips([0], is_add=0, is_v6=False)
+ self.config_ips([0], is_add=0, is_v6=True)
+ r = self.vapi.cnat_get_snat_addresses()
+ self.assertEqual(str(r.snat_ip4), self.make_addr(
+ self.pg0.sw_if_index, 1, False))
+ self.assertEqual(str(r.snat_ip6), self.make_addr(
+ self.pg0.sw_if_index, 1, True))
+ self.config_ips([1], is_add=0, is_v6=False)
+ self.config_ips([1], is_add=0, is_v6=True)
+ self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
if __name__ == '__main__':