X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fnat%2Ftest%2Ftest_nat.py;h=09bf8a2d41e13dcf6ccfdd4c78522be06e4ca9cc;hb=603e75465899385a95350f3c96499050f7f960a5;hp=1cddc405f076bc675f009dc9960823ef5df89166;hpb=c611f36bbc75a7157bbec26a78178872ddc5441f;p=vpp.git diff --git a/src/plugins/nat/test/test_nat.py b/src/plugins/nat/test/test_nat.py index 1cddc405f07..09bf8a2d41e 100644 --- a/src/plugins/nat/test/test_nat.py +++ b/src/plugins/nat/test/test_nat.py @@ -38,7 +38,7 @@ class Event(Packet): fields_desc = [ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}), ByteEnumField("protocol", None, - {0: "udp", 1: "tcp", 2: "icmp"}), + {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}), ShortField("flags", 0), IPField("in_addr", None), IPField("out_addr", None), @@ -448,7 +448,7 @@ class MethodHolder(VppTestCase): return pkts def verify_capture_out(self, capture, nat_ip=None, same_port=False, - dst_ip=None, is_ip6=False): + dst_ip=None, is_ip6=False, ignore_port=False): """ Verify captured packets on outside network @@ -474,25 +474,32 @@ class MethodHolder(VppTestCase): if dst_ip is not None: self.assertEqual(packet[IP46].dst, dst_ip) if packet.haslayer(TCP): - if same_port: - self.assertEqual(packet[TCP].sport, self.tcp_port_in) - else: - self.assertNotEqual( - packet[TCP].sport, self.tcp_port_in) + if not ignore_port: + if same_port: + self.assertEqual( + packet[TCP].sport, self.tcp_port_in) + else: + self.assertNotEqual( + packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport self.assert_packet_checksums_valid(packet) elif packet.haslayer(UDP): - if same_port: - self.assertEqual(packet[UDP].sport, self.udp_port_in) - else: - self.assertNotEqual( - packet[UDP].sport, self.udp_port_in) + if not ignore_port: + if same_port: + self.assertEqual( + packet[UDP].sport, self.udp_port_in) + else: + self.assertNotEqual( + packet[UDP].sport, self.udp_port_in) self.udp_port_out = packet[UDP].sport else: - if same_port: - self.assertEqual(packet[ICMP46].id, self.icmp_id_in) - else: - self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in) + if not ignore_port: + if same_port: + self.assertEqual( + packet[ICMP46].id, self.icmp_id_in) + else: + self.assertNotEqual( + packet[ICMP46].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP46].id self.assert_packet_checksums_valid(packet) except: @@ -1105,7 +1112,8 @@ class MethodHolder(VppTestCase): else: raise Exception("Unsupported protocol") - def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False): + def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False, + ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -1132,14 +1140,16 @@ class MethodHolder(VppTestCase): if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) - self.assertNotEqual(p[layer].sport, self.port_in) + if not ignore_port: + self.assertNotEqual(p[layer].sport, self.port_in) else: self.assertEqual(p[layer].sport, self.port_in) else: - if not dont_translate: - self.assertNotEqual(p[layer].id, self.port_in) - else: - self.assertEqual(p[layer].id, self.port_in) + if not ignore_port: + if not dont_translate: + self.assertNotEqual(p[layer].id, self.port_in) + else: + self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) # out2in @@ -1220,7 +1230,7 @@ class MethodHolder(VppTestCase): self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) - def reass_hairpinning(self, proto=IP_PROTOS.tcp): + def reass_hairpinning(self, proto=IP_PROTOS.tcp, ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -1243,13 +1253,16 @@ class MethodHolder(VppTestCase): self.nat_addr, self.server.ip4) if proto != IP_PROTOS.icmp: - self.assertNotEqual(p[layer].sport, self.host_in_port) + if not ignore_port: + self.assertNotEqual(p[layer].sport, self.host_in_port) self.assertEqual(p[layer].dport, self.server_in_port) else: - self.assertNotEqual(p[layer].id, self.host_in_port) + if not ignore_port: + self.assertNotEqual(p[layer].id, self.host_in_port) self.assertEqual(data, p[Raw].load) - def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False): + def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False, + ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -1278,14 +1291,16 @@ class MethodHolder(VppTestCase): if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) - self.assertNotEqual(p[layer].sport, self.port_in) + if not ignore_port: + self.assertNotEqual(p[layer].sport, self.port_in) else: self.assertEqual(p[layer].sport, self.port_in) else: - if not dont_translate: - self.assertNotEqual(p[layer].id, self.port_in) - else: - self.assertEqual(p[layer].id, self.port_in) + if not ignore_port: + if not dont_translate: + self.assertNotEqual(p[layer].id, self.port_in) + else: + self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) # out2in @@ -1374,6 +1389,19 @@ class MethodHolder(VppTestCase): class TestNAT44(MethodHolder): """ NAT44 Test Cases """ + max_translations = 10240 + max_users = 10240 + + @classmethod + def setUpConstants(cls): + super(TestNAT44, cls).setUpConstants() + cls.vpp_cmdline.extend([ + "nat", "{", + "max translations per thread %d" % cls.max_translations, + "max users per thread %d" % cls.max_users, + "}" + ]) + @classmethod def setUpClass(cls): super(TestNAT44, cls).setUpClass() @@ -1486,14 +1514,10 @@ class TestNAT44(MethodHolder): is_add=1) # in2out - tcpn = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/TCP packets') - udpn = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/good in2out packets processed') + tcpn = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0] + udpn = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0] + icmpn = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0] + drops = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0] pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -1502,26 +1526,21 @@ class TestNAT44(MethodHolder): capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture) - err = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/good in2out packets processed') - self.assertEqual(err - totaln, 4) + if_idx = self.pg0.sw_if_index + cnt = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) # out2in - tcpn = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets') - udpn = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-out2in/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-out2in/good out2in packets processed') + tcpn = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0] + udpn = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0] + icmpn = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0] + drops = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0] pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) @@ -1530,15 +1549,15 @@ class TestNAT44(MethodHolder): capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) - err = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter('/err/nat44-out2in/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-out2in/good out2in packets processed') - self.assertEqual(err - totaln, 4) + if_idx = self.pg1.sw_if_index + cnt = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) users = self.statistics.get_counter('/nat44/total-users') self.assertEqual(users[0][0], 1) @@ -2326,6 +2345,7 @@ class TestNAT44(MethodHolder): server_in_port, server_out_port, proto=IP_PROTOS.tcp) + cnt = self.statistics.get_counter('/nat44/hairpinning')[0] # send packet from host to server p = (Ether(src=host.mac, dst=self.pg0.local_mac) / IP(src=host.ip4, dst=self.nat_addr) / @@ -2348,6 +2368,10 @@ class TestNAT44(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise + after = self.statistics.get_counter('/nat44/hairpinning')[0] + if_idx = self.pg0.sw_if_index + self.assertEqual(after[if_idx] - cnt[if_idx], 1) + # send reply from server to host p = (Ether(src=server.mac, dst=self.pg0.local_mac) / IP(src=server.ip4, dst=self.nat_addr) / @@ -2369,6 +2393,10 @@ class TestNAT44(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise + after = self.statistics.get_counter('/nat44/hairpinning')[0] + if_idx = self.pg0.sw_if_index + self.assertEqual(after[if_idx] - cnt[if_idx], 2) + def test_hairpinning2(self): """ NAT44 hairpinning - 1:1 NAT""" @@ -2770,8 +2798,7 @@ class TestNAT44(MethodHolder): sw_if_index=self.pg1.sw_if_index, is_add=1) - nat44_config = self.vapi.nat_show_config() - max_sessions = 10 * nat44_config.translation_buckets + max_sessions = self.max_translations pkts = [] for i in range(0, max_sessions): @@ -4437,9 +4464,9 @@ class TestNAT44EndpointDependent(MethodHolder): self.vapi.nat44_interface_add_del_feature( sw_if_index=self.pg1.sw_if_index, is_add=1) - self.frag_in_order(proto=IP_PROTOS.tcp) - self.frag_in_order(proto=IP_PROTOS.udp) - self.frag_in_order(proto=IP_PROTOS.icmp) + self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True) + self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True) + self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True) def test_frag_in_order_dont_translate(self): """ NAT44 don't translate fragments arriving in order """ @@ -4463,9 +4490,9 @@ class TestNAT44EndpointDependent(MethodHolder): self.vapi.nat44_interface_add_del_feature( sw_if_index=self.pg1.sw_if_index, is_add=1) - self.frag_out_of_order(proto=IP_PROTOS.tcp) - self.frag_out_of_order(proto=IP_PROTOS.udp) - self.frag_out_of_order(proto=IP_PROTOS.icmp) + self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True) + self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True) + self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True) def test_frag_out_of_order_dont_translate(self): """ NAT44 don't translate fragments arriving out of order """ @@ -4593,9 +4620,9 @@ class TestNAT44EndpointDependent(MethodHolder): proto=IP_PROTOS.udp) self.nat44_add_static_mapping(self.server.ip4, self.nat_addr) - self.reass_hairpinning(proto=IP_PROTOS.tcp) - self.reass_hairpinning(proto=IP_PROTOS.udp) - self.reass_hairpinning(proto=IP_PROTOS.icmp) + self.reass_hairpinning(proto=IP_PROTOS.tcp, ignore_port=True) + self.reass_hairpinning(proto=IP_PROTOS.udp, ignore_port=True) + self.reass_hairpinning(proto=IP_PROTOS.icmp, ignore_port=True) def test_clear_sessions(self): """ NAT44 ED session clearing test """ @@ -4617,7 +4644,7 @@ class TestNAT44EndpointDependent(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) sessions = self.statistics.get_counter('/nat44/total-sessions') self.assertTrue(sessions[0][0] > 0) @@ -4650,44 +4677,37 @@ class TestNAT44EndpointDependent(MethodHolder): self.assertEqual(1, nat_config.endpoint_dependent) # in2out - tcpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/TCP packets') - udpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/good in2out packets processed') + tcpn = self.statistics.get_counter('/nat44/ed/in2out/slowpath/tcp')[0] + udpn = self.statistics.get_counter('/nat44/ed/in2out/slowpath/udp')[0] + icmpn = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/icmp')[0] + drops = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/drops')[0] pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) - - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/good in2out packets processed') - self.assertEqual(err - totaln, 4) + self.verify_capture_out(capture, ignore_port=True) + + if_idx = self.pg0.sw_if_index + cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) # out2in - tcpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/TCP packets') - udpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in-slowpath/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/good out2in packets processed') + tcpn = self.statistics.get_counter('/nat44/ed/out2in/fastpath/tcp')[0] + udpn = self.statistics.get_counter('/nat44/ed/out2in/fastpath/udp')[0] + icmpn = self.statistics.get_counter( + '/nat44/ed/out2in/slowpath/icmp')[0] + drops = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/drops')[0] pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) @@ -4696,22 +4716,66 @@ class TestNAT44EndpointDependent(MethodHolder): capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in-slowpath/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/good out2in packets processed') - self.assertEqual(err - totaln, 3) + if_idx = self.pg1.sw_if_index + cnt = self.statistics.get_counter('/nat44/ed/out2in/fastpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter('/nat44/ed/out2in/fastpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/ed/out2in/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/ed/out2in/fastpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) sessions = self.statistics.get_counter('/nat44/total-sessions') self.assertEqual(sessions[0][0], 3) + def test_dynamic_out_of_ports(self): + """ NAT44 dynamic translation test: out of ports """ + + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1) + + nat_config = self.vapi.nat_show_config() + self.assertEqual(1, nat_config.endpoint_dependent) + + # in2out and no NAT addresses added + err_old = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(0, timeout=1) + + err_new = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + self.assertEqual(err_new - err_old, len(pkts)) + + # in2out after NAT addresses added + self.nat44_add_address(self.nat_addr) + + err_old = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, ignore_port=True) + + err_new = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + self.assertEqual(err_new, err_old) + def test_dynamic_output_feature_vrf(self): """ NAT44 dynamic translation test: output-feature, VRF""" @@ -4745,44 +4809,45 @@ class TestNAT44EndpointDependent(MethodHolder): self.assertEqual(1, nat_config.endpoint_dependent) # in2out - tcpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/TCP packets') - udpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/good in2out packets processed') + tcpn = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/tcp')[0] + udpn = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/udp')[0] + icmpn = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/icmp')[0] + drops = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/drops')[0] pkts = self.create_stream_in(self.pg7, self.pg8) self.pg7.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg8.get_capture(len(pkts)) - self.verify_capture_out(capture) - - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/good in2out packets processed') - self.assertEqual(err - totaln, 4) + self.verify_capture_out(capture, ignore_port=True) + + if_idx = self.pg7.sw_if_index + cnt = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) # out2in - tcpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/TCP packets') - udpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in-slowpath/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/good out2in packets processed') + tcpn = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/tcp')[0] + udpn = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/udp')[0] + icmpn = self.statistics.get_counter( + '/nat44/ed/out2in/slowpath/icmp')[0] + drops = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/drops')[0] pkts = self.create_stream_out(self.pg8) self.pg8.add_stream(pkts) @@ -4791,18 +4856,19 @@ class TestNAT44EndpointDependent(MethodHolder): capture = self.pg7.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg7) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in-slowpath/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/good out2in packets processed') - self.assertEqual(err - totaln, 3) + if_idx = self.pg8.sw_if_index + cnt = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter( + '/nat44/ed/out2in/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) sessions = self.statistics.get_counter('/nat44/total-sessions') self.assertEqual(sessions[0][0], 3) @@ -5508,13 +5574,13 @@ class TestNAT44EndpointDependent(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) # from external network back to local network host pkts = self.create_stream_out(self.pg1) @@ -5538,7 +5604,7 @@ class TestNAT44EndpointDependent(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) @@ -6548,7 +6614,7 @@ class TestNAT44EndpointDependent(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) # out2in pkts = self.create_stream_out(self.pg1) @@ -6580,7 +6646,7 @@ class TestNAT44EndpointDependent(MethodHolder): pkts_in2out = self.create_stream_in(self.pg0, self.pg1) capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) # send out2in again, with sessions created it should work now pkts_out2in = self.create_stream_out(self.pg1) @@ -6610,7 +6676,7 @@ class TestNAT44EndpointDependent(MethodHolder): # send in2out to generate ACL state (NAT state was created earlier) capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) # send out2in again. ACL state exists so it should work now. # TCP packets with the syn flag set also need the ack flag @@ -6715,7 +6781,6 @@ class TestNAT44EndpointDependent(MethodHolder): ip = p[IP] tcp = p[TCP] self.assertEqual(ip.src, self.nat_addr) - self.assertNotEqual(tcp.sport, 2345) self.assert_packet_checksums_valid(p) port = tcp.sport except: @@ -7043,6 +7108,111 @@ class TestNAT44EndpointDependent(MethodHolder): capture = self.pg2.get_capture(1) self.verify_syslog_sess(capture[0][Raw].load, False) + def test_ed_users_dump(self): + """ API test - nat44_user_dump """ + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1) + self.vapi.nat44_forwarding_enable_disable(enable=1) + + real_ip = self.pg0.remote_ip4 + alias_ip = self.nat_addr + flags = self.config_flags.NAT_IS_ADDR_ONLY + self.vapi.nat44_add_del_static_mapping(is_add=1, + local_ip_address=real_ip, + external_ip_address=alias_ip, + external_sw_if_index=0xFFFFFFFF, + flags=flags) + + users = self.vapi.nat44_user_dump() + self.assertEqual(len(users), 0) + try: + # in2out - static mapping match + + pkts = self.create_stream_out(self.pg1) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, same_port=True) + + users = self.vapi.nat44_user_dump() + self.assertEqual(len(users), 1) + static_user = users[0] + self.assertEqual(static_user.nstaticsessions, 3) + self.assertEqual(static_user.nsessions, 0) + + # in2out - no static mapping match + + host0 = self.pg0.remote_hosts[0] + self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1] + try: + pkts = self.create_stream_out(self.pg1, + dst_ip=self.pg0.remote_ip4, + use_inside_ports=True) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, + same_port=True) + finally: + self.pg0.remote_hosts[0] = host0 + + users = self.vapi.nat44_user_dump() + self.assertEqual(len(users), 2) + if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4: + non_static_user = users[1] + static_user = users[0] + else: + non_static_user = users[0] + static_user = users[1] + self.assertEqual(static_user.nstaticsessions, 3) + self.assertEqual(static_user.nsessions, 0) + self.assertEqual(non_static_user.nstaticsessions, 0) + self.assertEqual(non_static_user.nsessions, 3) + + users = self.vapi.nat44_user_dump() + self.assertEqual(len(users), 2) + if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4: + non_static_user = users[1] + static_user = users[0] + else: + non_static_user = users[0] + static_user = users[1] + self.assertEqual(static_user.nstaticsessions, 3) + self.assertEqual(static_user.nsessions, 0) + self.assertEqual(non_static_user.nstaticsessions, 0) + self.assertEqual(non_static_user.nsessions, 3) + + finally: + self.vapi.nat44_forwarding_enable_disable(enable=0) + flags = self.config_flags.NAT_IS_ADDR_ONLY + self.vapi.nat44_add_del_static_mapping( + is_add=0, + local_ip_address=real_ip, + external_ip_address=alias_ip, + external_sw_if_index=0xFFFFFFFF, + flags=flags) + def tearDown(self): super(TestNAT44EndpointDependent, self).tearDown() if not self.vpp_dead: @@ -7059,6 +7229,110 @@ class TestNAT44EndpointDependent(MethodHolder): self.logger.info(self.vapi.cli("show nat timeouts")) +class TestNAT44EndpointDependent3(MethodHolder): + """ Endpoint-Dependent mapping and filtering extra test cases """ + + max_translations = 50 + + @classmethod + def setUpConstants(cls): + super(TestNAT44EndpointDependent3, cls).setUpConstants() + cls.vpp_cmdline.extend([ + "nat", "{", "endpoint-dependent", + "max translations per thread %d" % cls.max_translations, + "}" + ]) + + @classmethod + def setUpClass(cls): + super(TestNAT44EndpointDependent3, cls).setUpClass() + cls.vapi.cli("set log class nat level debug") + + cls.nat_addr = '10.0.0.3' + + cls.create_pg_interfaces(range(2)) + + for i in cls.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + def setUp(self): + super(TestNAT44EndpointDependent3, self).setUp() + self.vapi.nat_set_timeouts( + udp=1, tcp_established=7440, tcp_transitory=30, icmp=1) + self.nat44_add_address(self.nat_addr) + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, is_add=1) + + @classmethod + def tearDownClass(cls): + super(TestNAT44EndpointDependent3, cls).tearDownClass() + + def init_tcp_session(self, in_if, out_if, sport, ext_dport): + # SYN packet in->out + p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=sport, dport=ext_dport, flags="S")) + in_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = out_if.get_capture(1) + p = capture[0] + tcp_port_out = p[TCP].sport + + # SYN + ACK packet out->in + p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) / + IP(src=out_if.remote_ip4, dst=self.nat_addr) / + TCP(sport=ext_dport, dport=tcp_port_out, flags="SA")) + out_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + in_if.get_capture(1) + + # ACK packet in->out + p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=sport, dport=ext_dport, flags="A")) + in_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + out_if.get_capture(1) + + return tcp_port_out + + def test_lru_cleanup(self): + """ LRU cleanup algorithm """ + tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80) + pkts = [] + for i in range(0, self.max_translations - 1): + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / + UDP(sport=7000+i, dport=80)) + pkts.append(p) + + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(len(pkts)) + self.sleep(1.5, "wait for timeouts") + + pkts = [] + for i in range(0, self.max_translations - 1): + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / + ICMP(id=8000+i, type='echo-request')) + pkts.append(p) + + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(len(pkts)) + + class TestNAT44Out2InDPO(MethodHolder): """ NAT44 Test Cases using out2in DPO """ @@ -7188,669 +7462,50 @@ class TestNAT44Out2InDPO(MethodHolder): self.verify_capture_in(capture, self.pg0) -class TestDeterministicNAT(MethodHolder): - """ Deterministic NAT Test Cases """ +class TestNAT64(MethodHolder): + """ NAT64 Test Cases """ @classmethod def setUpConstants(cls): - super(TestDeterministicNAT, cls).setUpConstants() - cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"]) + super(TestNAT64, cls).setUpConstants() + cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128", + "nat64 st hash buckets 256", "}"]) @classmethod def setUpClass(cls): - super(TestDeterministicNAT, cls).setUpClass() - cls.vapi.cli("set log class nat level debug") + super(TestNAT64, cls).setUpClass() cls.tcp_port_in = 6303 - cls.tcp_external_port = 6303 + cls.tcp_port_out = 6303 cls.udp_port_in = 6304 - cls.udp_external_port = 6304 + cls.udp_port_out = 6304 cls.icmp_id_in = 6305 + cls.icmp_id_out = 6305 + cls.tcp_external_port = 80 cls.nat_addr = '10.0.0.3' + cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr) + cls.vrf1_id = 10 + cls.vrf1_nat_addr = '10.0.10.3' + cls.ipfix_src_port = 4739 + cls.ipfix_domain_id = 1 - cls.create_pg_interfaces(range(3)) - cls.interfaces = list(cls.pg_interfaces) + cls.create_pg_interfaces(range(6)) + cls.ip6_interfaces = list(cls.pg_interfaces[0:1]) + cls.ip6_interfaces.append(cls.pg_interfaces[2]) + cls.ip4_interfaces = list(cls.pg_interfaces[1:2]) - for i in cls.interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() + cls.vapi.ip_table_add_del(is_add=1, + table={'table_id': cls.vrf1_id, + 'is_ip6': 1}) + + cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id) cls.pg0.generate_remote_hosts(2) - cls.pg0.configure_ipv4_neighbors() - @classmethod - def tearDownClass(cls): - super(TestDeterministicNAT, cls).tearDownClass() - - def create_stream_in(self, in_if, out_if, ttl=64): - """ - Create packet stream for inside network - - :param in_if: Inside interface - :param out_if: Outside interface - :param ttl: TTL of generated packets - """ - pkts = [] - # TCP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) - pkts.append(p) - - # UDP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - UDP(sport=self.udp_port_in, dport=self.udp_external_port)) - pkts.append(p) - - # ICMP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - ICMP(id=self.icmp_id_in, type='echo-request')) - pkts.append(p) - - return pkts - - def create_stream_out(self, out_if, dst_ip=None, ttl=64): - """ - Create packet stream for outside network - - :param out_if: Outside interface - :param dst_ip: Destination IP address (Default use global NAT address) - :param ttl: TTL of generated packets - """ - if dst_ip is None: - dst_ip = self.nat_addr - pkts = [] - # TCP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)) - pkts.append(p) - - # UDP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(dport=self.udp_port_out, sport=self.udp_external_port)) - pkts.append(p) - - # ICMP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=self.icmp_external_id, type='echo-reply')) - pkts.append(p) - - return pkts - - def verify_capture_out(self, capture, nat_ip=None): - """ - Verify captured packets on outside network - - :param capture: Captured packets - :param nat_ip: Translated IP address (Default use global NAT address) - :param same_port: Source port number is not translated (Default False) - """ - if nat_ip is None: - nat_ip = self.nat_addr - for packet in capture: - try: - self.assertEqual(packet[IP].src, nat_ip) - if packet.haslayer(TCP): - self.tcp_port_out = packet[TCP].sport - elif packet.haslayer(UDP): - self.udp_port_out = packet[UDP].sport - else: - self.icmp_external_id = packet[ICMP].id - except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) - raise - - def test_deterministic_mode(self): - """ NAT plugin run deterministic mode """ - in_addr = '172.16.255.0' - out_addr = '172.17.255.50' - in_addr_t = '172.16.255.20' - in_plen = 24 - out_plen = 32 - - nat_config = self.vapi.nat_show_config() - self.assertEqual(1, nat_config.deterministic) - - self.vapi.nat_det_add_del_map(is_add=1, in_addr=in_addr, - in_plen=in_plen, out_addr=out_addr, - out_plen=out_plen) - - rep1 = self.vapi.nat_det_forward(in_addr_t) - self.assertEqual(str(rep1.out_addr), out_addr) - rep2 = self.vapi.nat_det_reverse(rep1.out_port_hi, out_addr) - - self.assertEqual(str(rep2.in_addr), in_addr_t) - - deterministic_mappings = self.vapi.nat_det_map_dump() - self.assertEqual(len(deterministic_mappings), 1) - dsm = deterministic_mappings[0] - self.assertEqual(in_addr, str(dsm.in_addr)) - self.assertEqual(in_plen, dsm.in_plen) - self.assertEqual(out_addr, str(dsm.out_addr)) - self.assertEqual(out_plen, dsm.out_plen) - - self.clear_nat_det() - deterministic_mappings = self.vapi.nat_det_map_dump() - self.assertEqual(len(deterministic_mappings), 0) - - def test_set_timeouts(self): - """ Set deterministic NAT timeouts """ - timeouts_before = self.vapi.nat_get_timeouts() - - self.vapi.nat_set_timeouts( - udp=timeouts_before.udp + 10, - tcp_established=timeouts_before.tcp_established + 10, - tcp_transitory=timeouts_before.tcp_transitory + 10, - icmp=timeouts_before.icmp + 10) - - timeouts_after = self.vapi.nat_get_timeouts() - - self.assertNotEqual(timeouts_before.udp, timeouts_after.udp) - self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp) - self.assertNotEqual(timeouts_before.tcp_established, - timeouts_after.tcp_established) - self.assertNotEqual(timeouts_before.tcp_transitory, - timeouts_after.tcp_transitory) - - def test_det_in(self): - """ Deterministic NAT translation test (TCP, UDP, ICMP) """ - - nat_ip = "10.0.0.10" - - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(nat_ip), - out_plen=32) - - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - # in2out - pkts = self.create_stream_in(self.pg0, self.pg1) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture, nat_ip) - - # out2in - pkts = self.create_stream_out(self.pg1, nat_ip) - self.pg1.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(len(pkts)) - self.verify_capture_in(capture, self.pg0) - - # session dump test - sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4) - self.assertEqual(len(sessions), 3) - - # TCP session - s = sessions[0] - self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) - self.assertEqual(s.in_port, self.tcp_port_in) - self.assertEqual(s.out_port, self.tcp_port_out) - self.assertEqual(s.ext_port, self.tcp_external_port) - - # UDP session - s = sessions[1] - self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) - self.assertEqual(s.in_port, self.udp_port_in) - self.assertEqual(s.out_port, self.udp_port_out) - self.assertEqual(s.ext_port, self.udp_external_port) - - # ICMP session - s = sessions[2] - self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) - self.assertEqual(s.in_port, self.icmp_id_in) - self.assertEqual(s.out_port, self.icmp_external_id) - - def test_multiple_users(self): - """ Deterministic NAT multiple users """ - - nat_ip = "10.0.0.10" - port_in = 80 - external_port = 6303 - - host0 = self.pg0.remote_hosts[0] - host1 = self.pg0.remote_hosts[1] - - self.vapi.nat_det_add_del_map(is_add=1, in_addr=host0.ip4, in_plen=24, - out_addr=socket.inet_aton(nat_ip), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - # host0 to out - p = (Ether(src=host0.mac, dst=self.pg0.local_mac) / - IP(src=host0.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=port_in, dport=external_port)) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, nat_ip) - self.assertEqual(ip.dst, self.pg1.remote_ip4) - self.assertEqual(tcp.dport, external_port) - port_out0 = tcp.sport - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise - - # host1 to out - p = (Ether(src=host1.mac, dst=self.pg0.local_mac) / - IP(src=host1.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=port_in, dport=external_port)) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, nat_ip) - self.assertEqual(ip.dst, self.pg1.remote_ip4) - self.assertEqual(tcp.dport, external_port) - port_out1 = tcp.sport - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise - - dms = self.vapi.nat_det_map_dump() - self.assertEqual(1, len(dms)) - self.assertEqual(2, dms[0].ses_num) - - # out to host0 - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=nat_ip) / - TCP(sport=external_port, dport=port_out0)) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, self.pg1.remote_ip4) - self.assertEqual(ip.dst, host0.ip4) - self.assertEqual(tcp.dport, port_in) - self.assertEqual(tcp.sport, external_port) - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise - - # out to host1 - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=nat_ip) / - TCP(sport=external_port, dport=port_out1)) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, self.pg1.remote_ip4) - self.assertEqual(ip.dst, host1.ip4) - self.assertEqual(tcp.dport, port_in) - self.assertEqual(tcp.sport, external_port) - except: - self.logger.error(ppp("Unexpected or invalid packet", p)) - raise - - # session close api test - self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip), - port_out1, - self.pg1.remote_ip4, - external_port) - dms = self.vapi.nat_det_map_dump() - self.assertEqual(dms[0].ses_num, 1) - - self.vapi.nat_det_close_session_in(host0.ip4, - port_in, - self.pg1.remote_ip4, - external_port) - dms = self.vapi.nat_det_map_dump() - self.assertEqual(dms[0].ses_num, 0) - - def test_tcp_session_close_detection_in(self): - """ Deterministic NAT TCP session close from inside network """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - self.initiate_tcp_session(self.pg0, self.pg1) - - # close the session from inside - try: - # FIN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="F")) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg1.get_capture(1) - - pkts = [] - - # ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="A")) - pkts.append(p) - - # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="F")) - pkts.append(p) - - self.pg1.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg0.get_capture(2) - - # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="A")) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg1.get_capture(1) - - # Check if deterministic NAT44 closed the session - dms = self.vapi.nat_det_map_dump() - self.assertEqual(0, dms[0].ses_num) - except: - self.logger.error("TCP session termination failed") - raise - - def test_tcp_session_close_detection_out(self): - """ Deterministic NAT TCP session close from outside network """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - self.initiate_tcp_session(self.pg0, self.pg1) - - # close the session from outside - try: - # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="F")) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg0.get_capture(1) - - pkts = [] - - # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="A")) - pkts.append(p) - - # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="F")) - pkts.append(p) - - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg1.get_capture(2) - - # ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="A")) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg0.get_capture(1) - - # Check if deterministic NAT44 closed the session - dms = self.vapi.nat_det_map_dump() - self.assertEqual(0, dms[0].ses_num) - except: - self.logger.error("TCP session termination failed") - raise - - @unittest.skipUnless(running_extended_tests, "part of extended tests") - def test_session_timeout(self): - """ Deterministic NAT session timeouts """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - self.initiate_tcp_session(self.pg0, self.pg1) - self.vapi.nat_set_timeouts(udp=5, tcp_established=5, tcp_transitory=5, - icmp=5) - pkts = self.create_stream_in(self.pg0, self.pg1) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - sleep(15) - - dms = self.vapi.nat_det_map_dump() - self.assertEqual(0, dms[0].ses_num) - - @unittest.skipUnless(running_extended_tests, "part of extended tests") - def test_session_limit_per_user(self): - """ Deterministic NAT maximum sessions per user limit """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4, - src_address=self.pg2.local_ip4, - path_mtu=512, - template_interval=10) - self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, - enable=1) - - pkts = [] - for port in range(1025, 2025): - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - UDP(sport=port, dport=port)) - pkts.append(p) - - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - UDP(sport=3001, dport=3002)) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.assert_nothing_captured() - - # verify ICMP error packet - capture = self.pg0.get_capture(1) - p = capture[0] - self.assertTrue(p.haslayer(ICMP)) - icmp = p[ICMP] - self.assertEqual(icmp.type, 3) - self.assertEqual(icmp.code, 1) - self.assertTrue(icmp.haslayer(IPerror)) - inner_ip = icmp[IPerror] - self.assertEqual(inner_ip[UDPerror].sport, 3001) - self.assertEqual(inner_ip[UDPerror].dport, 3002) - - dms = self.vapi.nat_det_map_dump() - - self.assertEqual(1000, dms[0].ses_num) - - # verify IPFIX logging - self.vapi.ipfix_flush() - sleep(1) - capture = self.pg2.get_capture(2) - ipfix = IPFIXDecoder() - # first load template - for p in capture: - self.assertTrue(p.haslayer(IPFIX)) - if p.haslayer(Template): - ipfix.add_template(p.getlayer(Template)) - # verify events in data set - for p in capture: - if p.haslayer(Data): - data = ipfix.decode_data_set(p.getlayer(Set)) - self.verify_ipfix_max_entries_per_user(data, - 1000, - self.pg0.remote_ip4) - - def clear_nat_det(self): - """ - Clear deterministic NAT configuration. - """ - self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, - enable=0) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=240, icmp=60) - deterministic_mappings = self.vapi.nat_det_map_dump() - for dsm in deterministic_mappings: - self.vapi.nat_det_add_del_map(is_add=0, in_addr=dsm.in_addr, - in_plen=dsm.in_plen, - out_addr=dsm.out_addr, - out_plen=dsm.out_plen) - - interfaces = self.vapi.nat44_interface_dump() - for intf in interfaces: - self.vapi.nat44_interface_add_del_feature( - sw_if_index=intf.sw_if_index, - flags=intf.flags) - - def tearDown(self): - super(TestDeterministicNAT, self).tearDown() - if not self.vpp_dead: - self.clear_nat_det() - - def show_commands_at_teardown(self): - self.logger.info(self.vapi.cli("show nat44 interfaces")) - self.logger.info(self.vapi.cli("show nat timeouts")) - self.logger.info( - self.vapi.cli("show nat44 deterministic mappings")) - self.logger.info( - self.vapi.cli("show nat44 deterministic sessions")) - - -class TestNAT64(MethodHolder): - """ NAT64 Test Cases """ - - @classmethod - def setUpConstants(cls): - super(TestNAT64, cls).setUpConstants() - cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128", - "nat64 st hash buckets 256", "}"]) - - @classmethod - def setUpClass(cls): - super(TestNAT64, cls).setUpClass() - - cls.tcp_port_in = 6303 - cls.tcp_port_out = 6303 - cls.udp_port_in = 6304 - cls.udp_port_out = 6304 - cls.icmp_id_in = 6305 - cls.icmp_id_out = 6305 - cls.tcp_external_port = 80 - cls.nat_addr = '10.0.0.3' - cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr) - cls.vrf1_id = 10 - cls.vrf1_nat_addr = '10.0.10.3' - cls.ipfix_src_port = 4739 - cls.ipfix_domain_id = 1 - - cls.create_pg_interfaces(range(6)) - cls.ip6_interfaces = list(cls.pg_interfaces[0:1]) - cls.ip6_interfaces.append(cls.pg_interfaces[2]) - cls.ip4_interfaces = list(cls.pg_interfaces[1:2]) - - cls.vapi.ip_table_add_del(is_add=1, - table={'table_id': cls.vrf1_id, - 'is_ip6': 1}) - - cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id) - - cls.pg0.generate_remote_hosts(2) - - for i in cls.ip6_interfaces: - i.admin_up() - i.config_ip6() - i.configure_ipv6_neighbors() + for i in cls.ip6_interfaces: + i.admin_up() + i.config_ip6() + i.configure_ipv6_neighbors() for i in cls.ip4_interfaces: i.admin_up() @@ -8051,12 +7706,10 @@ class TestNAT64(MethodHolder): sw_if_index=self.pg1.sw_if_index) # in2out - tcpn = self.statistics.get_err_counter('/err/nat64-in2out/TCP packets') - udpn = self.statistics.get_err_counter('/err/nat64-in2out/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat64-in2out/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat64-in2out/good in2out packets processed') + tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0] + udpn = self.statistics.get_counter('/nat64/in2out/udp')[0] + icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0] + drops = self.statistics.get_counter('/nat64/in2out/drops')[0] pkts = self.create_stream_in_ip6(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -8066,23 +7719,21 @@ class TestNAT64(MethodHolder): self.verify_capture_out(capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4) - err = self.statistics.get_err_counter('/err/nat64-in2out/TCP packets') - self.assertEqual(err - tcpn, 1) - err = self.statistics.get_err_counter('/err/nat64-in2out/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter('/err/nat64-in2out/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat64-in2out/good in2out packets processed') - self.assertEqual(err - totaln, 3) + if_idx = self.pg0.sw_if_index + cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat64/in2out/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat64/in2out/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) # out2in - tcpn = self.statistics.get_err_counter('/err/nat64-out2in/TCP packets') - udpn = self.statistics.get_err_counter('/err/nat64-out2in/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat64-out2in/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat64-out2in/good out2in packets processed') + tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0] + udpn = self.statistics.get_counter('/nat64/out2in/udp')[0] + icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0] + drops = self.statistics.get_counter('/nat64/out2in/drops')[0] pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) self.pg1.add_stream(pkts) @@ -8092,15 +7743,15 @@ class TestNAT64(MethodHolder): ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4])) self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6) - err = self.statistics.get_err_counter('/err/nat64-out2in/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter('/err/nat64-out2in/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter('/err/nat64-out2in/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat64-out2in/good out2in packets processed') - self.assertEqual(err - totaln, 4) + if_idx = self.pg1.sw_if_index + cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter('/nat64/out2in/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat64/out2in/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) bibs = self.statistics.get_counter('/nat64/total-bibs') self.assertEqual(bibs[0][0], 3) @@ -9224,158 +8875,5 @@ class TestNAT64(MethodHolder): self.logger.info(self.vapi.cli("show nat64 session table all")) -class TestNAT66(MethodHolder): - """ NAT66 Test Cases """ - - @classmethod - def setUpClass(cls): - super(TestNAT66, cls).setUpClass() - - cls.nat_addr = 'fd01:ff::2' - - cls.create_pg_interfaces(range(2)) - cls.interfaces = list(cls.pg_interfaces) - - for i in cls.interfaces: - i.admin_up() - i.config_ip6() - i.configure_ipv6_neighbors() - - @classmethod - def tearDownClass(cls): - super(TestNAT66, cls).tearDownClass() - - def test_static(self): - """ 1:1 NAT66 test """ - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat66_add_del_interface(is_add=1, flags=flags, - sw_if_index=self.pg0.sw_if_index) - self.vapi.nat66_add_del_interface(is_add=1, - sw_if_index=self.pg1.sw_if_index) - self.vapi.nat66_add_del_static_mapping( - local_ip_address=self.pg0.remote_ip6, - external_ip_address=self.nat_addr, - is_add=1) - - # in2out - pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) / - TCP()) - pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) / - UDP()) - pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) / - ICMPv6EchoRequest()) - pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) / - GRE() / IP() / TCP()) - pkts.append(p) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - - for packet in capture: - try: - self.assertEqual(packet[IPv6].src, self.nat_addr) - self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6) - self.assert_packet_checksums_valid(packet) - except: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # out2in - pkts = [] - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) / - TCP()) - pkts.append(p) - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) / - UDP()) - pkts.append(p) - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) / - ICMPv6EchoReply()) - pkts.append(p) - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) / - GRE() / IP() / TCP()) - pkts.append(p) - self.pg1.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(len(pkts)) - for packet in capture: - try: - self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6) - self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6) - self.assert_packet_checksums_valid(packet) - except: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - sm = self.vapi.nat66_static_mapping_dump() - self.assertEqual(len(sm), 1) - self.assertEqual(sm[0].total_pkts, 8) - - def test_check_no_translate(self): - """ NAT66 translate only when egress interface is outside interface """ - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat66_add_del_interface(is_add=1, flags=flags, - sw_if_index=self.pg0.sw_if_index) - self.vapi.nat66_add_del_interface(is_add=1, flags=flags, - sw_if_index=self.pg1.sw_if_index) - self.vapi.nat66_add_del_static_mapping( - local_ip_address=self.pg0.remote_ip6, - external_ip_address=self.nat_addr, - is_add=1) - - # in2out - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) / - UDP()) - self.pg0.add_stream([p]) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(1) - packet = capture[0] - try: - self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6) - self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6) - except: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - def clear_nat66(self): - """ - Clear NAT66 configuration. - """ - interfaces = self.vapi.nat66_interface_dump() - for intf in interfaces: - self.vapi.nat66_add_del_interface(is_add=0, flags=intf.flags, - sw_if_index=intf.sw_if_index) - - static_mappings = self.vapi.nat66_static_mapping_dump() - for sm in static_mappings: - self.vapi.nat66_add_del_static_mapping( - local_ip_address=sm.local_ip_address, - external_ip_address=sm.external_ip_address, vrf_id=sm.vrf_id, - is_add=0) - - def tearDown(self): - super(TestNAT66, self).tearDown() - self.clear_nat66() - - def show_commands_at_teardown(self): - self.logger.info(self.vapi.cli("show nat66 interfaces")) - self.logger.info(self.vapi.cli("show nat66 static mappings")) - - if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)