X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fcnat%2Ftest%2Ftest_cnat.py;h=34cd8b58240a6cdf194d87c0e0d4e63459087bb2;hb=ece39214bcb05c535ba5de9af97b5f84f6911cba;hp=518d7335edc3dcc4b391d1c93c779490ad084048;hpb=277d402bee6f4ad0ffe185515de44e6457a57a9e;p=vpp.git diff --git a/src/plugins/cnat/test/test_cnat.py b/src/plugins/cnat/test/test_cnat.py index 518d7335edc..34cd8b58240 100644 --- a/src/plugins/cnat/test/test_cnat.py +++ b/src/plugins/cnat/test/test_cnat.py @@ -7,8 +7,11 @@ from vpp_ip import DpoProto from scapy.packet import Raw from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP, TCP -from scapy.layers.inet6 import IPv6 +from scapy.layers.inet import IP, UDP, TCP, ICMP +from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror +from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach + +import struct from ipaddress import ip_address, ip_network, \ IPv4Address, IPv6Address, IPv4Network, IPv6Network @@ -39,6 +42,10 @@ class Ep(object): return {'addr': self.ip, 'port': self.port} + @property + def isV6(self): + return ":" in self.ip + def __str__(self): return ("%s:%d" % (self.ip, self.port)) @@ -180,10 +187,10 @@ class TestCNatTranslation(VppTestCase): i.admin_down() super(TestCNatTranslation, self).tearDown() - def cnat_create_translation(self, vip, nbr, isV6=False): - ip_v = "ip6" if isV6 else "ip4" + def cnat_create_translation(self, vip, nbr): + ip_v = "ip6" if vip.isV6 else "ip4" dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr) - sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0) + sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0) t1 = VppCNatTranslation( self, vip.l4p, vip, [EpTuple(sep, dep), EpTuple(sep, dep)]) @@ -341,7 +348,7 @@ class TestCNatTranslation(VppTestCase): trs = [] for nbr, vip in enumerate(vips): - trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6)) + trs.append(self.cnat_create_translation(vip, nbr)) self.logger.info(self.vapi.cli("sh cnat client")) self.logger.info(self.vapi.cli("sh cnat translation")) @@ -372,8 +379,10 @@ class TestCNatTranslation(VppTestCase): n_tries += 1 sessions = self.vapi.cnat_session_dump() self.sleep(2) + print(self.vapi.cli("show cnat session verbose")) self.assertTrue(n_tries < 100) + self.vapi.cli("test cnat scanner off") # # load some flows again and purge @@ -398,6 +407,109 @@ class TestCNatTranslation(VppTestCase): self.vapi.cnat_session_purge() self.assertFalse(self.vapi.cnat_session_dump()) + def test_icmp(self): + vips = [ + Ep("30.0.0.1", 5555), + Ep("30.0.0.2", 5554), + Ep("30.0.0.2", 5553, UDP), + Ep("30::1", 6666), + Ep("30::2", 5553, UDP), + ] + sport = 1234 + + self.pg0.generate_remote_hosts(len(vips)) + self.pg0.configure_ipv6_neighbors() + self.pg0.configure_ipv4_neighbors() + + self.pg1.generate_remote_hosts(len(vips)) + self.pg1.configure_ipv6_neighbors() + self.pg1.configure_ipv4_neighbors() + + self.vapi.cli("test cnat scanner off") + trs = [] + for nbr, vip in enumerate(vips): + trs.append(self.cnat_create_translation(vip, nbr)) + + self.logger.info(self.vapi.cli("sh cnat client")) + self.logger.info(self.vapi.cli("sh cnat translation")) + + for nbr, vip in enumerate(vips): + if vip.isV6: + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + remote2_addr = self.pg2.remote_hosts[0].ip6 + else: + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + remote2_addr = self.pg2.remote_hosts[0].ip4 + IP46 = IPv6 if vip.isV6 else IP + # from client to vip + p1 = (Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=vip.ip) / + vip.l4p(sport=sport, dport=vip.port) / + Raw()) + + rxs = self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].dst, remote_addr) + self.assertEqual(rx[vip.l4p].dport, 4000 + nbr) + self.assertEqual(rx[IP46].src, client_addr) + self.assertEqual(rx[vip.l4p].sport, sport) + + InnerIP = rxs[0][IP46] + + ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP + ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11) + # from vip to client, ICMP error + p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP46(src=remote_addr, dst=client_addr) / + ICMPelem / InnerIP) + + rxs = self.send_and_expect(self.pg1, + p1 * N_PKTS, + self.pg0) + + TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror + IP46error = IPerror6 if vip.isV6 else IPerror + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, vip.ip) + self.assertEqual(rx[ICMP46][IP46error].src, client_addr) + self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].sport, sport) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].dport, vip.port) + + # from other remote to client, ICMP error + # outside shouldn't be NAT-ed + p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) / + IP46(src=remote2_addr, dst=client_addr) / + ICMPelem / InnerIP) + + rxs = self.send_and_expect(self.pg1, + p1 * N_PKTS, + self.pg0) + + TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror + IP46error = IPerror6 if vip.isV6 else IPerror + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, remote2_addr) + self.assertEqual(rx[ICMP46][IP46error].src, client_addr) + self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].sport, sport) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].dport, vip.port) + + self.vapi.cnat_session_purge() + def test_cnat6(self): # """ CNat Translation ipv6 """ vips = [ @@ -478,7 +590,7 @@ class TestCNatSourceNAT(VppTestCase): def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False): ip_v = "ip6" if isV6 else "ip4" - ip_class = IPv6 if isV6 else IP + IP46 = IPv6 if isV6 else IP sports = [1234, 1235, 1236] dports = [6661, 6662, 6663] @@ -493,14 +605,17 @@ class TestCNatSourceNAT(VppTestCase): t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6) for nbr, remote_host in enumerate(self.pg1.remote_hosts): + if isV6: + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + else: + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 # from pods to outside network p1 = ( - Ether( - dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - ip_class( - src=getattr(self.pg0.remote_hosts[0], ip_v), - dst=getattr(remote_host, ip_v)) / + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=remote_addr) / l4p(sport=sports[nbr], dport=dports[nbr]) / Raw()) @@ -510,21 +625,16 @@ class TestCNatSourceNAT(VppTestCase): self.pg1) for rx in rxs: self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(remote_host, ip_v)) + self.assertEqual(rx[IP46].dst, remote_addr) self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual( - rx[ip_class].src, - srcNatAddr) + self.assertEqual(rx[IP46].src, srcNatAddr) sport = rx[l4p].sport # from outside to pods p2 = ( - Ether( - dst=self.pg1.local_mac, - src=self.pg1.remote_hosts[nbr].mac) / - ip_class(src=getattr(remote_host, ip_v), dst=srcNatAddr) / + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IP46(src=remote_addr, dst=srcNatAddr) / l4p(sport=dports[nbr], dport=sport) / Raw()) @@ -535,18 +645,14 @@ class TestCNatSourceNAT(VppTestCase): for rx in rxs: self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(self.pg0.remote_hosts[0], ip_v)) + self.assertEqual(rx[IP46].dst, client_addr) self.assertEqual(rx[l4p].dport, sports[nbr]) self.assertEqual(rx[l4p].sport, dports[nbr]) - self.assertEqual( - rx[ip_class].src, - getattr(remote_host, ip_v)) + self.assertEqual(rx[IP46].src, remote_addr) # add remote host to exclude list subnet_mask = 100 if isV6 else 16 - subnet = getattr(remote_host, ip_v) + "/" + str(subnet_mask) + subnet = "%s/%d" % (remote_addr, subnet_mask) exclude_subnet = ip_network(subnet, strict=False) t1.cnat_exclude_subnet(exclude_subnet) @@ -558,13 +664,9 @@ class TestCNatSourceNAT(VppTestCase): self.pg1) for rx in rxs: self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(remote_host, ip_v)) + self.assertEqual(rx[IP46].dst, remote_addr) self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual( - rx[ip_class].src, - getattr(self.pg0.remote_hosts[0], ip_v)) + self.assertEqual(rx[IP46].src, client_addr) # remove remote host from exclude list t1.cnat_exclude_subnet(exclude_subnet, isAdd=False) @@ -577,13 +679,9 @@ class TestCNatSourceNAT(VppTestCase): for rx in rxs: self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(remote_host, ip_v)) + self.assertEqual(rx[IP46].dst, remote_addr) self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual( - rx[ip_class].src, - srcNatAddr) + self.assertEqual(rx[IP46].src, srcNatAddr) def test_cnat6_sourcenat(self): # """ CNat Source Nat ipv6 """