X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_nat.py;h=363df719a75d8f4b6f854d5391d68146b2c765a8;hb=16782368979b3af7918e54b6998c0b14dbb16da2;hp=35e89e39b9cbfc102a6bedcc9a5c56f6b026960a;hpb=235a47e371c8ffea352790c001ef39876f2aaef5;p=vpp.git diff --git a/test/test_nat.py b/test/test_nat.py index 35e89e39b9c..363df719a75 100644 --- a/test/test_nat.py +++ b/test/test_nat.py @@ -9,7 +9,8 @@ import random from framework import VppTestCase, VppTestRunner, running_extended_tests from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \ + ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment from scapy.layers.l2 import Ether, ARP, GRE from scapy.data import IP_PROTOS @@ -1252,7 +1253,7 @@ class TestNAT44(MethodHolder): self.verify_capture_out(capture, same_port=True, packet_num=1) self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp) - def _test_forwarding(self): + def test_forwarding(self): """ NAT44 forwarding test """ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) @@ -1266,7 +1267,7 @@ class TestNAT44(MethodHolder): external_ip=alias_ip) try: - # in2out - static mapping match + # static mapping match pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) @@ -1282,7 +1283,7 @@ class TestNAT44(MethodHolder): capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture, same_port=True) - # in2out - no static mapping match + # no static mapping match host0 = self.pg0.remote_hosts[0] self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1] @@ -1306,19 +1307,6 @@ class TestNAT44(MethodHolder): finally: self.pg0.remote_hosts[0] = host0 - user = self.pg0.remote_hosts[1] - sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0) - self.assertEqual(len(sessions), 3) - self.assertTrue(sessions[0].ext_host_valid) - self.vapi.nat44_del_session( - sessions[0].inside_ip_address, - sessions[0].inside_port, - sessions[0].protocol, - ext_host_address=sessions[0].ext_host_address, - ext_host_port=sessions[0].ext_host_port) - sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0) - self.assertEqual(len(sessions), 2) - finally: self.vapi.nat44_forwarding_enable_disable(0) self.vapi.nat44_add_del_static_mapping(local_ip=real_ip, @@ -2382,6 +2370,8 @@ class TestNAT44(MethodHolder): self.pg1.set_table_ip4(vrf_id2) self.pg0.config_ip4() self.pg1.config_ip4() + self.pg0.resolve_arp() + self.pg1.resolve_arp() self.nat44_add_address(nat_ip1, vrf_id=vrf_id1) self.nat44_add_address(nat_ip2, vrf_id=vrf_id2) @@ -2390,28 +2380,34 @@ class TestNAT44(MethodHolder): self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index, is_inside=0) - # first VRF - pkts = self.create_stream_in(self.pg0, self.pg2) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg2.get_capture(len(pkts)) - self.verify_capture_out(capture, nat_ip1) + try: + # first VRF + pkts = self.create_stream_in(self.pg0, self.pg2) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg2.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip1) - # second VRF - pkts = self.create_stream_in(self.pg1, self.pg2) - self.pg1.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg2.get_capture(len(pkts)) - self.verify_capture_out(capture, nat_ip2) + # second VRF + pkts = self.create_stream_in(self.pg1, self.pg2) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg2.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip2) - self.pg0.unconfig_ip4() - self.pg1.unconfig_ip4() - self.pg0.set_table_ip4(0) - self.pg1.set_table_ip4(0) - self.vapi.ip_table_add_del(vrf_id1, is_add=0) - self.vapi.ip_table_add_del(vrf_id2, is_add=0) + finally: + self.pg0.unconfig_ip4() + self.pg1.unconfig_ip4() + self.pg0.set_table_ip4(0) + self.pg1.set_table_ip4(0) + self.pg0.config_ip4() + self.pg1.config_ip4() + self.pg0.resolve_arp() + self.pg1.resolve_arp() + self.vapi.ip_table_add_del(vrf_id1, is_add=0) + self.vapi.ip_table_add_del(vrf_id2, is_add=0) def test_vrf_feature_independent(self): """ NAT44 tenant VRF independent address pool mode """ @@ -3142,6 +3138,74 @@ class TestNAT44(MethodHolder): self.verify_ipfix_max_fragments_ip4(data, 0, self.pg0.remote_ip4n) + def test_multiple_outside_vrf(self): + """ Multiple outside VRF """ + vrf_id1 = 1 + vrf_id2 = 2 + + self.pg1.unconfig_ip4() + self.pg2.unconfig_ip4() + self.vapi.ip_table_add_del(vrf_id1, is_add=1) + self.vapi.ip_table_add_del(vrf_id2, is_add=1) + self.pg1.set_table_ip4(vrf_id1) + self.pg2.set_table_ip4(vrf_id2) + self.pg1.config_ip4() + self.pg2.config_ip4() + self.pg1.resolve_arp() + self.pg2.resolve_arp() + + self.nat44_add_address(self.nat_addr) + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index, + is_inside=0) + + try: + # first VRF + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, self.nat_addr) + + pkts = self.create_stream_out(self.pg1, self.nat_addr) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + + self.tcp_port_in = 60303 + self.udp_port_in = 60304 + self.icmp_id_in = 60305 + + # second VRF + pkts = self.create_stream_in(self.pg0, self.pg2) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg2.get_capture(len(pkts)) + self.verify_capture_out(capture, self.nat_addr) + + pkts = self.create_stream_out(self.pg2, self.nat_addr) + self.pg2.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + + finally: + self.pg1.unconfig_ip4() + self.pg2.unconfig_ip4() + self.pg1.set_table_ip4(0) + self.pg2.set_table_ip4(0) + self.pg1.config_ip4() + self.pg2.config_ip4() + self.pg1.resolve_arp() + self.pg2.resolve_arp() + def tearDown(self): super(TestNAT44, self).tearDown() if not self.vpp_dead: @@ -3182,7 +3246,7 @@ class TestNAT44EndpointDependent(MethodHolder): cls.ipfix_domain_id = 1 cls.tcp_external_port = 80 - cls.create_pg_interfaces(range(5)) + cls.create_pg_interfaces(range(7)) cls.interfaces = list(cls.pg_interfaces[0:3]) for i in cls.interfaces: @@ -3206,6 +3270,58 @@ class TestNAT44EndpointDependent(MethodHolder): cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4 cls.pg4.resolve_arp() + zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0") + cls.vapi.ip_table_add_del(1, is_add=1) + + cls.pg5._local_ip4 = "10.1.1.1" + cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, + cls.pg5.local_ip4) + cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2" + cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton( + socket.AF_INET, cls.pg5.remote_ip4) + cls.pg5.set_table_ip4(1) + cls.pg5.config_ip4() + cls.pg5.admin_up() + cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n, + dst_address_length=32, + table_id=1, + next_hop_sw_if_index=cls.pg5.sw_if_index, + next_hop_address=zero_ip4n) + + cls.pg6._local_ip4 = "10.1.2.1" + cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, + cls.pg6.local_ip4) + cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2" + cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton( + socket.AF_INET, cls.pg6.remote_ip4) + cls.pg6.set_table_ip4(1) + cls.pg6.config_ip4() + cls.pg6.admin_up() + cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n, + dst_address_length=32, + table_id=1, + next_hop_sw_if_index=cls.pg6.sw_if_index, + next_hop_address=zero_ip4n) + + cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n, + dst_address_length=16, + next_hop_address=zero_ip4n, + table_id=0, + next_hop_table_id=1) + cls.vapi.ip_add_del_route(dst_address=zero_ip4n, + dst_address_length=0, + next_hop_address=zero_ip4n, + table_id=1, + next_hop_table_id=0) + cls.vapi.ip_add_del_route(dst_address=zero_ip4n, + dst_address_length=0, + table_id=0, + next_hop_sw_if_index=cls.pg1.sw_if_index, + next_hop_address=cls.pg1.local_ip4n) + + cls.pg5.resolve_arp() + cls.pg6.resolve_arp() + except Exception: super(TestNAT44EndpointDependent, cls).tearDownClass() raise @@ -4421,6 +4537,299 @@ class TestNAT44EndpointDependent(MethodHolder): capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) + def test_multiple_vrf(self): + """ Multiple VRF setup """ + external_addr = '1.2.3.4' + external_port = 80 + local_port = 8080 + port = 0 + + self.vapi.nat44_forwarding_enable_disable(1) + self.nat44_add_address(self.nat_addr) + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index, + is_inside=0) + self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index, + is_inside=0) + self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index, + is_inside=0) + self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index, + is_inside=0) + self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr, + local_port, external_port, vrf_id=1, + proto=IP_PROTOS.tcp, out2in_only=1) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index, + local_port=local_port, vrf_id=0, external_port=external_port, + proto=IP_PROTOS.tcp, out2in_only=1) + + # from client to service (both VRF1) + p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) / + IP(src=self.pg6.remote_ip4, dst=external_addr) / + TCP(sport=12345, dport=external_port)) + self.pg6.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg5.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg5.remote_ip4) + self.assertEqual(tcp.dport, local_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from service back to client (both VRF1) + p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) / + IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) / + TCP(sport=local_port, dport=12345)) + self.pg5.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg6.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, external_addr) + self.assertEqual(tcp.sport, external_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # dynamic NAT from VRF1 to VRF0 (output-feature) + p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) / + IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=2345, dport=22)) + self.pg5.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.nat_addr) + self.assertNotEqual(tcp.sport, 2345) + self.assert_packet_checksums_valid(p) + port = tcp.sport + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + TCP(sport=22, dport=port)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg5.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg5.remote_ip4) + self.assertEqual(tcp.dport, 2345) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from client VRF1 to service VRF0 + p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) / + IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) / + TCP(sport=12346, dport=external_port)) + self.pg6.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg0.remote_ip4) + self.assertEqual(tcp.dport, local_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from service VRF0 back to client VRF1 + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) / + TCP(sport=local_port, dport=12346)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg6.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.pg0.local_ip4) + self.assertEqual(tcp.sport, external_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from client VRF0 to service VRF1 + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=external_addr) / + TCP(sport=12347, dport=external_port)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg5.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg5.remote_ip4) + self.assertEqual(tcp.dport, local_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from service VRF1 back to client VRF0 + p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) / + IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) / + TCP(sport=local_port, dport=12347)) + self.pg5.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, external_addr) + self.assertEqual(tcp.sport, external_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from client to server (both VRF1, no translation) + p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) / + IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) / + TCP(sport=12348, dport=local_port)) + self.pg6.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg5.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg5.remote_ip4) + self.assertEqual(tcp.dport, local_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from server back to client (both VRF1, no translation) + p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) / + IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) / + TCP(sport=local_port, dport=12348)) + self.pg5.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg6.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.pg5.remote_ip4) + self.assertEqual(tcp.sport, local_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from client VRF1 to server VRF0 (no translation) + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) / + TCP(sport=local_port, dport=12349)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg6.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.pg0.remote_ip4) + self.assertEqual(tcp.sport, local_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from server VRF0 back to client VRF1 (no translation) + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) / + TCP(sport=local_port, dport=12349)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg6.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.pg0.remote_ip4) + self.assertEqual(tcp.sport, local_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from client VRF0 to server VRF1 (no translation) + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) / + TCP(sport=12344, dport=local_port)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg5.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.dst, self.pg5.remote_ip4) + self.assertEqual(tcp.dport, local_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # from server VRF1 back to client VRF0 (no translation) + p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) / + IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) / + TCP(sport=local_port, dport=12344)) + self.pg5.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.pg5.remote_ip4) + self.assertEqual(tcp.sport, local_port) + self.assert_packet_checksums_valid(p) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + def tearDown(self): super(TestNAT44EndpointDependent, self).tearDown() if not self.vpp_dead: @@ -5189,7 +5598,7 @@ class TestNAT64(MethodHolder): cls.ipfix_src_port = 4739 cls.ipfix_domain_id = 1 - cls.create_pg_interfaces(range(5)) + cls.create_pg_interfaces(range(6)) cls.ip6_interfaces = list(cls.pg_interfaces[0:1]) cls.ip6_interfaces.append(cls.pg_interfaces[2]) cls.ip4_interfaces = list(cls.pg_interfaces[1:2]) @@ -5216,10 +5625,67 @@ class TestNAT64(MethodHolder): cls.pg3.config_ip6() cls.pg3.configure_ipv6_neighbors() + cls.pg5.admin_up() + cls.pg5.config_ip6() + except Exception: super(TestNAT64, cls).tearDownClass() raise + def test_nat64_inside_interface_handles_neighbor_advertisement(self): + """ NAT64 inside interface handles Neighbor Advertisement """ + + self.vapi.nat64_add_del_interface(self.pg5.sw_if_index) + + # Try to send ping + ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) / + IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) / + ICMPv6EchoRequest()) + pkts = [ping] + self.pg5.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + # Wait for Neighbor Solicitation + capture = self.pg5.get_capture(len(pkts)) + self.assertEqual(1, len(capture)) + packet = capture[0] + try: + self.assertEqual(packet[IPv6].src, self.pg5.local_ip6) + self.assertTrue(packet.haslayer(ICMPv6ND_NS)) + tgt = packet[ICMPv6ND_NS].tgt + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + # Send Neighbor Advertisement + p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) / + IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) / + ICMPv6ND_NA(tgt=tgt) / + ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac)) + pkts = [p] + self.pg5.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + # Try to send ping again + pkts = [ping] + self.pg5.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + # Wait for ping reply + capture = self.pg5.get_capture(len(pkts)) + self.assertEqual(1, len(capture)) + packet = capture[0] + try: + self.assertEqual(packet[IPv6].src, self.pg5.local_ip6) + self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6) + self.assertTrue(packet.haslayer(ICMPv6EchoReply)) + except: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + def test_pool(self): """ Add/delete address to NAT64 pool """ nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')