- self.sourcenat_test_icmp_err_conf()
- self.sourcenat_test_icmp_echo4_conf()
-
- def sourcenat_test_icmp_echo6_conf(self):
- sports = [1234, 1235]
- dports = [6661, 6662]
-
- for nbr, remote_host in enumerate(self.pg1.remote_hosts):
- client_addr = self.pg0.remote_hosts[0].ip6
- remote_addr = self.pg1.remote_hosts[nbr].ip6
- src_nat_addr = self.pg2.remote_hosts[0].ip6
-
- # ping from pods to outside network
- p1 = (
- Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- IPv6(src=client_addr, dst=remote_addr) /
- ICMPv6EchoRequest(id=0xfeed) /
- Raw())
-
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
-
- for rx in rxs:
- self.assertEqual(rx[IPv6].src, src_nat_addr)
- self.assert_packet_checksums_valid(rx)
-
- received_id = rx[0][ICMPv6EchoRequest].id
- # ping reply from outside to pods
- p2 = (
- Ether(dst=self.pg1.local_mac,
- src=self.pg1.remote_hosts[nbr].mac) /
- IPv6(src=remote_addr, dst=src_nat_addr) /
- ICMPv6EchoReply(id=received_id))
- rxs = self.send_and_expect(
- self.pg1,
- p2 * N_PKTS,
- self.pg0)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IPv6].src, remote_addr)
- self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
-
- def sourcenat_test_icmp_echo4_conf(self):
- sports = [1234, 1235]
- dports = [6661, 6662]
-
- for nbr, remote_host in enumerate(self.pg1.remote_hosts):
- IP46 = IP
- client_addr = self.pg0.remote_hosts[0].ip4
- remote_addr = self.pg1.remote_hosts[nbr].ip4
- src_nat_addr = self.pg2.remote_hosts[0].ip4
-
- # ping from pods to outside network
- p1 = (
- Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- IP46(src=client_addr, dst=remote_addr) /
- ICMP(type=8, id=0xfeed) /
- Raw())
-
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
-
- for rx in rxs:
- self.assertEqual(rx[IP46].src, src_nat_addr)
- self.assert_packet_checksums_valid(rx)
-
- received_id = rx[0][ICMP].id
- # ping reply from outside to pods
- p2 = (
- Ether(dst=self.pg1.local_mac,
- src=self.pg1.remote_hosts[nbr].mac) /
- IP46(src=remote_addr, dst=src_nat_addr) /
- ICMP(type=0, id=received_id))
- rxs = self.send_and_expect(
- self.pg1,
- p2 * N_PKTS,
- self.pg0)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].src, remote_addr)
- self.assertEqual(rx[ICMP].id, 0xfeed)
-
- def sourcenat_test_icmp_err_conf(self, isV6=False):
- sports = [1234, 1235]
- dports = [6661, 6662]
-
- for nbr, remote_host in enumerate(self.pg1.remote_hosts):
- if isV6:
- IP46 = IPv6
- client_addr = self.pg0.remote_hosts[0].ip6
- remote_addr = self.pg1.remote_hosts[nbr].ip6
- src_nat_addr = self.pg2.remote_hosts[0].ip6
- ICMP46 = ICMPv6DestUnreach
- ICMPelem = ICMPv6DestUnreach(code=1)
- IP46error = IPerror6
- else:
- IP46 = IP
- client_addr = self.pg0.remote_hosts[0].ip4
- remote_addr = self.pg1.remote_hosts[nbr].ip4
- src_nat_addr = self.pg2.remote_hosts[0].ip4
- IP46error = IPerror
- ICMP46 = ICMP
- ICMPelem = ICMP(type=11)
-
- # from pods to outside network
- p1 = (
- Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- IP46(src=client_addr, dst=remote_addr) /
- TCP(sport=sports[nbr], dport=dports[nbr]) /
- Raw())
-
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, remote_addr)
- self.assertEqual(rx[TCP].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, src_nat_addr)
- sport = rx[TCP].sport
-
- InnerIP = rxs[0][IP46]
- # from outside to pods, ICMP error
- p2 = (
- Ether(dst=self.pg1.local_mac,
- src=self.pg1.remote_hosts[nbr].mac) /
- IP46(src=remote_addr, dst=src_nat_addr) /
- ICMPelem / InnerIP)
-
- rxs = self.send_and_expect(
- self.pg1,
- p2 * N_PKTS,
- self.pg0)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].src, remote_addr)
- self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
- self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
- self.assertEqual(rx[ICMP46][IP46error]
- [TCPerror].sport, sports[nbr])
- self.assertEqual(rx[ICMP46][IP46error]
- [TCPerror].dport, dports[nbr])
-
- def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
- sports = [1234, 1235]
- dports = [6661, 6662]
-
- for nbr, remote_host in enumerate(self.pg1.remote_hosts):
- if isV6:
- IP46 = IPv6
- client_addr = self.pg0.remote_hosts[0].ip6
- remote_addr = self.pg1.remote_hosts[nbr].ip6
- src_nat_addr = self.pg2.remote_hosts[0].ip6
- exclude_prefix = ip_network(
- "%s/100" % remote_addr, strict=False)
- else:
- IP46 = IP
- client_addr = self.pg0.remote_hosts[0].ip4
- remote_addr = self.pg1.remote_hosts[nbr].ip4
- src_nat_addr = self.pg2.remote_hosts[0].ip4
- exclude_prefix = ip_network(
- "%s/16" % remote_addr, strict=False)
- # from pods to outside network
- p1 = (
- Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- IP46(src=client_addr, dst=remote_addr) /
- l4p(sport=sports[nbr], dport=dports[nbr]) /
- Raw())
-
- self.vapi.cli("trace add pg-input 1")
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
- self.logger.info(self.vapi.cli("show trace max 1"))
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, remote_addr)
- self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, src_nat_addr)
- sport = rx[l4p].sport
-
- # from outside to pods
- p2 = (
- Ether(dst=self.pg1.local_mac,
- src=self.pg1.remote_hosts[nbr].mac) /
- IP46(src=remote_addr, dst=src_nat_addr) /
- l4p(sport=dports[nbr], dport=sport) /
- Raw())
-
- rxs = self.send_and_expect(
- self.pg1,
- p2 * N_PKTS,
- self.pg0)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, client_addr)
- self.assertEqual(rx[l4p].dport, sports[nbr])
- self.assertEqual(rx[l4p].sport, dports[nbr])
- self.assertEqual(rx[IP46].src, remote_addr)
-
- # add remote host to exclude list
- self.vapi.cnat_snat_policy_add_del_exclude_pfx(
- prefix=exclude_prefix, is_add=1)
- self.vapi.cnat_session_purge()
-
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, remote_addr)
- self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, client_addr)
-
- # remove remote host from exclude list
- self.vapi.cnat_snat_policy_add_del_exclude_pfx(
- prefix=exclude_prefix, is_add=0)
- self.vapi.cnat_session_purge()
-
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, remote_addr)
- self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, src_nat_addr)
-
- self.vapi.cnat_session_purge()
-
-
-class TestCNatDHCP(VppTestCase):
- """ CNat Translation """
- extra_vpp_punt_config = ["cnat", "{",
- "session-db-buckets", "64",
- "session-cleanup-timeout", "0.1",
- "session-max-age", "1",
- "tcp-max-age", "1",
- "scanner", "off", "}"]
+ self.sourcenat_test_icmp_echo_conf()
+
+ def sourcenat_test_icmp_echo_conf(self, is_v6=False):
+ ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
+ # 8 is ICMP type echo (v4 only)
+ ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
+ ctx.cnat_send_return().cnat_expect_return()
+
+ def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
+ ctx = CnatTestContext(self, L4PROTO, is_v6)
+ # we should source NAT
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
+ ctx.cnat_send_return().cnat_expect_return()
+
+ # exclude dst address of pg1.1 from snat
+ if is_v6:
+ exclude_prefix = ip_network(
+ "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
+ )
+ else:
+ exclude_prefix = ip_network(
+ "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
+ )
+
+ # add remote host to exclude list
+ self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)
+
+ # We should not source NAT the id=1
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
+ ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
+ ctx.cnat_send_return().cnat_expect_return()
+
+ # But we should source NAT the id=0
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
+ ctx.cnat_send_return().cnat_expect_return()
+
+ # remove remote host from exclude list
+ self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
+ self.vapi.cnat_session_purge()
+
+ # We should source NAT again
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
+ ctx.cnat_send_return().cnat_expect_return()
+
+ # test return ICMP error nating
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
+ ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
+
+ self.vapi.cnat_session_purge()
+
+
+class TestCNatDHCP(CnatCommonTestCase):
+ """CNat Translation"""