#!/usr/bin/env python3 import unittest from framework import VppTestCase, VppTestRunner from vpp_ip import DpoProto from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, TCP from scapy.layers.inet6 import IPv6 from ipaddress import ip_address, ip_network, \ IPv4Address, IPv6Address, IPv4Network, IPv6Network from vpp_object import VppObject from vpp_papi import VppEnum N_PKTS = 15 def find_cnat_translation(test, id): ts = test.vapi.cnat_translation_dump() for t in ts: if id == t.translation.id: return True return False class Ep(object): """ CNat endpoint """ def __init__(self, ip, port, l4p=TCP): self.ip = ip self.port = port self.l4p = l4p def encode(self): return {'addr': self.ip, 'port': self.port} def __str__(self): return ("%s:%d" % (self.ip, self.port)) class EpTuple(object): """ CNat endpoint """ def __init__(self, src, dst): self.src = src self.dst = dst def encode(self): return {'src_ep': self.src.encode(), 'dst_ep': self.dst.encode()} def __str__(self): return ("%s->%s" % (self.src, self.dst)) class VppCNatTranslation(VppObject): def __init__(self, test, iproto, vip, paths): self._test = test self.vip = vip self.iproto = iproto self.paths = paths self.encoded_paths = [] for path in self.paths: self.encoded_paths.append(path.encode()) @property def vl4_proto(self): ip_proto = VppEnum.vl_api_ip_proto_t return { UDP: ip_proto.IP_API_PROTO_UDP, TCP: ip_proto.IP_API_PROTO_TCP, }[self.iproto] def delete(self): r = self._test.vapi.cnat_translation_del(id=self.id) def add_vpp_config(self): r = self._test.vapi.cnat_translation_update( {'vip': self.vip.encode(), 'ip_proto': self.vl4_proto, 'n_paths': len(self.paths), 'paths': self.encoded_paths}) self._test.registry.register(self, self._test.logger) self.id = r.id def modify_vpp_config(self, paths): self.paths = paths self.encoded_paths = [] for path in self.paths: self.encoded_paths.append(path.encode()) r = self._test.vapi.cnat_translation_update( {'vip': self.vip.encode(), 'ip_proto': self.vl4_proto, 'n_paths': len(self.paths), 'paths': self.encoded_paths}) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.cnat_translation_del(self.id) def query_vpp_config(self): return find_cnat_translation(self._test, self.id) def object_id(self): return ("cnat-translation-%s" % (self.vip)) def get_stats(self): c = self._test.statistics.get_counter("/net/cnat-translation") return c[0][self.id] class VppCNATSourceNat(VppObject): def __init__(self, test, address, exclude_subnets=[]): self._test = test self.address = address self.exclude_subnets = exclude_subnets def add_vpp_config(self): a = ip_address(self.address) if 4 == a.version: self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address) else: self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address) for subnet in self.exclude_subnets: self.cnat_exclude_subnet(subnet, True) def cnat_exclude_subnet(self, exclude_subnet, isAdd=True): add = 1 if isAdd else 0 self._test.vapi.cnat_add_del_snat_prefix( prefix=exclude_subnet, is_add=add) def query_vpp_config(self): return False def remove_vpp_config(self): return False class TestCNatTranslation(VppTestCase): """ CNat Translation """ extra_vpp_punt_config = ["cnat", "{", "session-max-age", "1", "tcp-max-age", "1", "}"] @classmethod def setUpClass(cls): super(TestCNatTranslation, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestCNatTranslation, cls).tearDownClass() def setUp(self): super(TestCNatTranslation, self).setUp() self.create_pg_interfaces(range(3)) for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() i.config_ip6() i.resolve_ndp() def tearDown(self): for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() super(TestCNatTranslation, self).tearDown() def cnat_create_translation(self, vip, nbr, isV6=False): ip_v = "ip6" if isV6 else "ip4" dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr) sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0) t1 = VppCNatTranslation( self, vip.l4p, vip, [EpTuple(sep, dep), EpTuple(sep, dep)]) t1.add_vpp_config() return t1 def cnat_test_translation(self, t1, nbr, sports, isV6=False): ip_v = "ip6" if isV6 else "ip4" ip_class = IPv6 if isV6 else IP vip = t1.vip # # Flows # for src in self.pg0.remote_hosts: for sport in sports: # from client to vip p1 = (Ether(dst=self.pg0.local_mac, src=src.mac) / ip_class(src=getattr(src, ip_v), dst=vip.ip) / vip.l4p(sport=sport, dport=vip.port) / Raw()) self.vapi.cli("trace add pg-input 1") rxs = self.send_and_expect(self.pg0, p1 * N_PKTS, self.pg1) for rx in rxs: self.assert_packet_checksums_valid(rx) self.assertEqual( rx[ip_class].dst, getattr(self.pg1.remote_hosts[nbr], ip_v)) self.assertEqual(rx[vip.l4p].dport, 4000 + nbr) self.assertEqual( rx[ip_class].src, getattr(src, ip_v)) self.assertEqual(rx[vip.l4p].sport, sport) # from vip to client p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / ip_class(src=getattr( self.pg1.remote_hosts[nbr], ip_v), dst=getattr(src, ip_v)) / vip.l4p(sport=4000 + nbr, dport=sport) / Raw()) rxs = self.send_and_expect(self.pg1, p1 * N_PKTS, self.pg0) for rx in rxs: self.assert_packet_checksums_valid(rx) self.assertEqual( rx[ip_class].dst, getattr(src, ip_v)) self.assertEqual(rx[vip.l4p].dport, sport) self.assertEqual(rx[ip_class].src, vip.ip) self.assertEqual(rx[vip.l4p].sport, vip.port) # # packets to the VIP that do not match a # translation are dropped # p1 = (Ether(dst=self.pg0.local_mac, src=src.mac) / ip_class(src=getattr(src, ip_v), dst=vip.ip) / vip.l4p(sport=sport, dport=6666) / Raw()) self.send_and_assert_no_replies(self.pg0, p1 * N_PKTS, self.pg1) # # packets from the VIP that do not match a # session are forwarded # p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / ip_class(src=getattr( self.pg1.remote_hosts[nbr], ip_v), dst=getattr(src, ip_v)) / vip.l4p(sport=6666, dport=sport) / Raw()) rxs = self.send_and_expect(self.pg1, p1 * N_PKTS, self.pg0) self.assertEqual(t1.get_stats()['packets'], N_PKTS * len(sports) * len(self.pg0.remote_hosts)) def cnat_test_translation_update(self, t1, sports, isV6=False): ip_v = "ip6" if isV6 else "ip4" ip_class = IPv6 if isV6 else IP vip = t1.vip # # modify the translation to use a different backend # dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000) sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0) t1.modify_vpp_config([EpTuple(sep, dep)]) # # existing flows follow the old path # for src in self.pg0.remote_hosts: for sport in sports: # from client to vip p1 = (Ether(dst=self.pg0.local_mac, src=src.mac) / ip_class(src=getattr(src, ip_v), dst=vip.ip) / vip.l4p(sport=sport, dport=vip.port) / Raw()) rxs = self.send_and_expect(self.pg0, p1 * N_PKTS, self.pg1) # # new flows go to the new backend # for src in self.pg0.remote_hosts: p1 = (Ether(dst=self.pg0.local_mac, src=src.mac) / ip_class(src=getattr(src, ip_v), dst=vip.ip) / vip.l4p(sport=9999, dport=vip.port) / Raw()) rxs = self.send_and_expect(self.pg0, p1 * N_PKTS, self.pg2) def cnat_translation(self, vips, isV6=False): """ CNat Translation """ ip_class = IPv6 if isV6 else IP ip_v = "ip6" if isV6 else "ip4" sports = [1234, 1233] # # turn the scanner off whilst testing otherwise sessions # will time out # self.vapi.cli("test cnat scanner off") sessions = self.vapi.cnat_session_dump() trs = [] for nbr, vip in enumerate(vips): trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6)) self.logger.info(self.vapi.cli("sh cnat client")) self.logger.info(self.vapi.cli("sh cnat translation")) # # translations # for nbr, vip in enumerate(vips): self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6) self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6) if isV6: self.logger.info(self.vapi.cli( "sh ip6 fib %s" % self.pg0.remote_ip6)) else: self.logger.info(self.vapi.cli( "sh ip fib %s" % self.pg0.remote_ip4)) self.logger.info(self.vapi.cli("sh cnat session verbose")) # # turn the scanner back on and wait untill the sessions # all disapper # self.vapi.cli("test cnat scanner on") n_tries = 0 sessions = self.vapi.cnat_session_dump() while (len(sessions) and n_tries < 100): n_tries += 1 sessions = self.vapi.cnat_session_dump() self.sleep(2) self.assertTrue(n_tries < 100) # # load some flows again and purge # for vip in vips: for src in self.pg0.remote_hosts: for sport in sports: # from client to vip p1 = (Ether(dst=self.pg0.local_mac, src=src.mac) / ip_class(src=getattr(src, ip_v), dst=vip.ip) / vip.l4p(sport=sport, dport=vip.port) / Raw()) self.send_and_expect(self.pg0, p1 * N_PKTS, self.pg2) for tr in trs: tr.delete() self.assertTrue(self.vapi.cnat_session_dump()) self.vapi.cnat_session_purge() self.assertFalse(self.vapi.cnat_session_dump()) def test_cnat6(self): # """ CNat Translation ipv6 """ vips = [ Ep("30::1", 5555), Ep("30::2", 5554), Ep("30::2", 5553, UDP), ] self.pg0.generate_remote_hosts(len(vips)) self.pg0.configure_ipv6_neighbors() self.pg1.generate_remote_hosts(len(vips)) self.pg1.configure_ipv6_neighbors() self.cnat_translation(vips, isV6=True) def test_cnat4(self): # """ CNat Translation ipv4 """ vips = [ Ep("30.0.0.1", 5555), Ep("30.0.0.2", 5554), Ep("30.0.0.2", 5553, UDP), ] self.pg0.generate_remote_hosts(len(vips)) self.pg0.configure_ipv4_neighbors() self.pg1.generate_remote_hosts(len(vips)) self.pg1.configure_ipv4_neighbors() self.cnat_translation(vips) class TestCNatSourceNAT(VppTestCase): """ CNat Source NAT """ extra_vpp_punt_config = ["cnat", "{", "session-max-age", "1", "tcp-max-age", "1", "}"] @classmethod def setUpClass(cls): super(TestCNatSourceNAT, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestCNatSourceNAT, cls).tearDownClass() def setUp(self): super(TestCNatSourceNAT, self).setUp() self.create_pg_interfaces(range(3)) for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() i.config_ip6() i.resolve_ndp() def tearDown(self): for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() super(TestCNatSourceNAT, self).tearDown() def cnat_create_translation(self, srcNatAddr, interface, isV6=False): t1 = VppCNATSourceNat(self, srcNatAddr) t1.add_vpp_config() cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast" cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat" self.vapi.feature_enable_disable( enable=1, arc_name=cnat_arc_name, feature_name=cnat_feature_name, sw_if_index=interface.sw_if_index) return t1 def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False): ip_v = "ip6" if isV6 else "ip4" ip_class = IPv6 if isV6 else IP sports = [1234, 1235, 1236] dports = [6661, 6662, 6663] self.pg0.generate_remote_hosts(1) self.pg0.configure_ipv4_neighbors() self.pg0.configure_ipv6_neighbors() self.pg1.generate_remote_hosts(len(sports)) self.pg1.configure_ipv4_neighbors() self.pg1.configure_ipv6_neighbors() self.vapi.cli("test cnat scanner on") t1 = self.cnat_create_translation(srcNatAddr, self.pg0) for nbr, remote_host in enumerate(self.pg1.remote_hosts): # from pods to outside network p1 = ( Ether( dst=self.pg0.local_mac, src=self.pg0.remote_hosts[0].mac) / ip_class( src=getattr(self.pg0.remote_hosts[0], ip_v), dst=getattr(remote_host, ip_v)) / l4p(sport=sports[nbr], dport=dports[nbr]) / Raw()) rxs = self.send_and_expect( self.pg0, p1 * N_PKTS, self.pg1) for rx in rxs: self.assert_packet_checksums_valid(rx) self.assertEqual( rx[ip_class].dst, getattr(remote_host, ip_v)) self.assertEqual(rx[l4p].dport, dports[nbr]) self.assertEqual( rx[ip_class].src, srcNatAddr) sport = rx[l4p].sport # from outside to pods p2 = ( Ether( dst=self.pg1.local_mac, src=self.pg1.remote_hosts[nbr].mac) / ip_class(src=getattr(remote_host, ip_v), dst=srcNatAddr) / l4p(sport=dports[nbr], dport=sport) / Raw()) rxs = self.send_and_expect( self.pg1, p2 * N_PKTS, self.pg0) for rx in rxs: self.assert_packet_checksums_valid(rx) self.assertEqual( rx[ip_class].dst, getattr(self.pg0.remote_hosts[0], ip_v)) self.assertEqual(rx[l4p].dport, sports[nbr]) self.assertEqual(rx[l4p].sport, dports[nbr]) self.assertEqual( rx[ip_class].src, getattr(remote_host, ip_v)) # add remote host to exclude list subnet_mask = 100 if isV6 else 16 subnet = getattr(remote_host, ip_v) + "/" + str(subnet_mask) exclude_subnet = ip_network(subnet, strict=False) t1.cnat_exclude_subnet(exclude_subnet) self.vapi.cnat_session_purge() rxs = self.send_and_expect( self.pg0, p1 * N_PKTS, self.pg1) for rx in rxs: self.assert_packet_checksums_valid(rx) self.assertEqual( rx[ip_class].dst, getattr(remote_host, ip_v)) self.assertEqual(rx[l4p].dport, dports[nbr]) self.assertEqual( rx[ip_class].src, getattr(self.pg0.remote_hosts[0], ip_v)) # remove remote host from exclude list t1.cnat_exclude_subnet(exclude_subnet, isAdd=False) self.vapi.cnat_session_purge() rxs = self.send_and_expect( self.pg0, p1 * N_PKTS, self.pg1) for rx in rxs: self.assert_packet_checksums_valid(rx) self.assertEqual( rx[ip_class].dst, getattr(remote_host, ip_v)) self.assertEqual(rx[l4p].dport, dports[nbr]) self.assertEqual( rx[ip_class].src, srcNatAddr) # def test_cnat6_sourcenat(self): # # """ CNat Source Nat ipv6 """ # self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True) # self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True) def test_cnat4_sourcenat(self): # """ CNat Source Nat ipv4 """ self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP) self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)