X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fnat%2Ftest%2Ftest_nat.py;h=1b3c7e7a8b154bd2ea541e0658f4542b15a6fb9a;hb=88120824acc299a0aec17ce4c208dbc8be394779;hp=e267c4ef2bcafba2c8a936ca636bddd8a19741e1;hpb=1a0a89770688a37e500f634b68805b1984eccac0;p=vpp.git diff --git a/src/plugins/nat/test/test_nat.py b/src/plugins/nat/test/test_nat.py index e267c4ef2bc..1b3c7e7a8b1 100644 --- a/src/plugins/nat/test/test_nat.py +++ b/src/plugins/nat/test/test_nat.py @@ -1,39 +1,35 @@ #!/usr/bin/env python3 +import ipaddress +import random import socket -import unittest import struct -import random - -from framework import VppTestCase, VppTestRunner, running_extended_tests +import unittest +from io import BytesIO +from time import sleep import scapy.compat +from framework import VppTestCase, VppTestRunner, running_extended_tests +from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder +from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \ + IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \ + PacketListField +from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror +from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \ ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6 -from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment from scapy.layers.l2 import Ether, ARP, GRE -from scapy.data import IP_PROTOS -from scapy.packet import bind_layers, Raw -from util import ppp -from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder -from time import sleep -from util import ip4_range -from vpp_papi import mac_pton +from scapy.packet import Raw from syslog_rfc5424_parser import SyslogMessage, ParseError -from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity -from io import BytesIO -from vpp_papi import VppEnum -from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType -from vpp_neighbor import VppNeighbor -from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \ - IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \ - PacketListField -from ipaddress import IPv6Network +from syslog_rfc5424_parser.constants import SyslogSeverity +from util import ip4_range from util import ppc, ppp -from socket import inet_pton, AF_INET from vpp_acl import AclRule, VppAcl, VppAclInterface +from vpp_ip_route import VppIpRoute, VppRoutePath +from vpp_neighbor import VppNeighbor +from vpp_papi import VppEnum # NAT HA protocol event data @@ -42,7 +38,7 @@ class Event(Packet): fields_desc = [ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}), ByteEnumField("protocol", None, - {0: "udp", 1: "tcp", 2: "icmp"}), + {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}), ShortField("flags", 0), IPField("in_addr", None), IPField("out_addr", None), @@ -857,7 +853,8 @@ class MethodHolder(VppTestCase): else: nat44_ses_delete_num += 1 # sourceIPv4Address - self.assertEqual(self.pg0.remote_ip4n, record[8]) + self.assertEqual(self.pg0.remote_ip4, + str(ipaddress.IPv4Address(record[8]))) # postNATSourceIPv4Address self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]) @@ -929,44 +926,6 @@ class MethodHolder(VppTestCase): # maxBIBEntries self.assertEqual(struct.pack("I", limit), record[472]) - def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr): - """ - Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event - - :param data: Decoded IPFIX data records - :param limit: Number of maximum fragments pending reassembly - :param src_addr: IPv6 source address - """ - self.assertEqual(1, len(data)) - record = data[0] - # natEvent - self.assertEqual(scapy.compat.orb(record[230]), 13) - # natQuotaExceededEvent - self.assertEqual(struct.pack("I", 5), record[466]) - # maxFragmentsPendingReassembly - self.assertEqual(struct.pack("I", limit), record[475]) - # sourceIPv6Address - self.assertEqual(src_addr, record[27]) - - def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr): - """ - Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event - - :param data: Decoded IPFIX data records - :param limit: Number of maximum fragments pending reassembly - :param src_addr: IPv4 source address - """ - self.assertEqual(1, len(data)) - record = data[0] - # natEvent - self.assertEqual(scapy.compat.orb(record[230]), 13) - # natQuotaExceededEvent - self.assertEqual(struct.pack("I", 5), record[466]) - # maxFragmentsPendingReassembly - self.assertEqual(struct.pack("I", limit), record[475]) - # sourceIPv4Address - self.assertEqual(src_addr, record[8]) - def verify_ipfix_bib(self, data, is_create, src_addr): """ Verify IPFIX NAT64 BIB create and delete events @@ -983,7 +942,7 @@ class MethodHolder(VppTestCase): else: self.assertEqual(scapy.compat.orb(record[230]), 11) # sourceIPv6Address - self.assertEqual(src_addr, record[27]) + self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27]))) # postNATSourceIPv4Address self.assertEqual(self.nat_addr_n, record[225]) # protocolIdentifier @@ -1014,7 +973,7 @@ class MethodHolder(VppTestCase): else: self.assertEqual(scapy.compat.orb(record[230]), 7) # sourceIPv6Address - self.assertEqual(src_addr, record[27]) + self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27]))) # destinationIPv6Address self.assertEqual(socket.inet_pton(socket.AF_INET6, self.compose_ip6(dst_addr, @@ -1483,6 +1442,38 @@ class TestNAT44(MethodHolder): def tearDownClass(cls): super(TestNAT44, cls).tearDownClass() + def test_clear_sessions(self): + """ NAT44 session clearing test """ + + self.nat44_add_address(self.nat_addr) + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1) + + nat_config = self.vapi.nat_show_config() + self.assertEqual(0, nat_config.endpoint_dependent) + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture) + + sessions = self.statistics.get_counter('/nat44/total-sessions') + self.assertTrue(sessions[0][0] > 0) + self.logger.info("sessions before clearing: %s" % sessions[0][0]) + + self.vapi.cli("clear nat44 sessions") + + sessions = self.statistics.get_counter('/nat44/total-sessions') + self.assertEqual(sessions[0][0], 0) + self.logger.info("sessions after clearing: %s" % sessions[0][0]) + def test_dynamic(self): """ NAT44 dynamic translation test """ self.nat44_add_address(self.nat_addr) @@ -2703,7 +2694,7 @@ class TestNAT44(MethodHolder): self.verify_capture_out(capture) self.nat44_add_address(self.nat_addr, is_add=0) self.vapi.ipfix_flush() - capture = self.pg3.get_capture(9) + capture = self.pg3.get_capture(7) ipfix = IPFIXDecoder() # first load template for p in capture: @@ -2748,7 +2739,7 @@ class TestNAT44(MethodHolder): self.pg1.assert_nothing_captured() sleep(1) self.vapi.ipfix_flush() - capture = self.pg3.get_capture(9) + capture = self.pg3.get_capture(7) ipfix = IPFIXDecoder() # first load template for p in capture: @@ -2811,7 +2802,7 @@ class TestNAT44(MethodHolder): self.pg1.assert_nothing_captured() sleep(1) self.vapi.ipfix_flush() - capture = self.pg3.get_capture(9) + capture = self.pg3.get_capture(7) ipfix = IPFIXDecoder() # first load template for p in capture: @@ -4187,6 +4178,139 @@ class TestNAT44(MethodHolder): self.logger.info(self.vapi.cli("show nat ha")) +class TestNAT44EndpointDependent2(MethodHolder): + """ Endpoint-Dependent mapping and filtering test cases """ + + @classmethod + def setUpConstants(cls): + super(TestNAT44EndpointDependent2, cls).setUpConstants() + cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"]) + + @classmethod + def tearDownClass(cls): + super(TestNAT44EndpointDependent2, cls).tearDownClass() + + def tearDown(self): + super(TestNAT44EndpointDependent2, self).tearDown() + + @classmethod + def create_and_add_ip4_table(cls, i, table_id): + cls.vapi.ip_table_add_del(is_add=1, table={'table_id': table_id}) + i.set_table_ip4(table_id) + + @classmethod + def setUpClass(cls): + super(TestNAT44EndpointDependent2, cls).setUpClass() + + cls.create_pg_interfaces(range(3)) + cls.interfaces = list(cls.pg_interfaces) + + cls.create_and_add_ip4_table(cls.pg1, 10) + + for i in cls.interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + i.generate_remote_hosts(1) + i.configure_ipv4_neighbors() + + def setUp(self): + super(TestNAT44EndpointDependent2, self).setUp() + + nat_config = self.vapi.nat_show_config() + self.assertEqual(1, nat_config.endpoint_dependent) + + def nat_add_inside_interface(self, i): + self.vapi.nat44_interface_add_del_feature( + flags=self.config_flags.NAT_IS_INSIDE, + sw_if_index=i.sw_if_index, is_add=1) + + def nat_add_outside_interface(self, i): + self.vapi.nat44_interface_add_del_feature( + flags=self.config_flags.NAT_IS_OUTSIDE, + sw_if_index=i.sw_if_index, is_add=1) + + def nat_add_interface_address(self, i): + self.nat_addr = i.local_ip4 + self.vapi.nat44_add_del_interface_addr( + sw_if_index=i.sw_if_index, is_add=1) + + def nat_add_address(self, address, vrf_id=0xFFFFFFFF): + self.nat_addr = address + self.nat44_add_address(address, vrf_id=vrf_id) + + def cli(self, command): + result = self.vapi.cli(command) + self.logger.info(result) + # print(result) + + def show_configuration(self): + self.cli("show interface") + self.cli("show interface address") + self.cli("show nat44 addresses") + self.cli("show nat44 interfaces") + + def create_tcp_stream(self, in_if, out_if, count): + """ + Create tcp packet stream + + :param in_if: Inside interface + :param out_if: Outside interface + :param count: count of packets to generate + """ + pkts = [] + port = 6303 + + for i in range(count): + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) / + TCP(sport=port + i, dport=20)) + pkts.append(p) + + return pkts + + def test_session_limit_per_vrf(self): + + inside = self.pg0 + inside_vrf10 = self.pg1 + outside = self.pg2 + + limit = 5 + + # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session) + # non existing vrf_id makes process core dump + self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10) + + self.nat_add_inside_interface(inside) + self.nat_add_inside_interface(inside_vrf10) + self.nat_add_outside_interface(outside) + + # vrf independent + self.nat_add_interface_address(outside) + + # BUG: causing core dump - when bad vrf_id is specified + # self.nat44_add_address(outside.local_ip4, vrf_id=20) + + self.show_configuration() + + stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2) + inside_vrf10.add_stream(stream) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + capture = outside.get_capture(limit) + + stream = self.create_tcp_stream(inside, outside, limit * 2) + inside.add_stream(stream) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + capture = outside.get_capture(len(stream)) + + class TestNAT44EndpointDependent(MethodHolder): """ Endpoint-Dependent mapping and filtering test cases """ @@ -4473,6 +4597,43 @@ class TestNAT44EndpointDependent(MethodHolder): self.reass_hairpinning(proto=IP_PROTOS.udp) self.reass_hairpinning(proto=IP_PROTOS.icmp) + def test_clear_sessions(self): + """ NAT44 ED session clearing test """ + + self.nat44_add_address(self.nat_addr) + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1) + + nat_config = self.vapi.nat_show_config() + self.assertEqual(1, nat_config.endpoint_dependent) + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture) + + sessions = self.statistics.get_counter('/nat44/total-sessions') + self.assertTrue(sessions[0][0] > 0) + self.logger.info("sessions before clearing: %s" % sessions[0][0]) + + # just for testing purposes + self.logger.info(self.vapi.cli("show nat44 summary")) + + self.vapi.cli("clear nat44 sessions") + + self.logger.info(self.vapi.cli("show nat44 summary")) + + sessions = self.statistics.get_counter('/nat44/total-sessions') + self.assertEqual(sessions[0][0], 0) + self.logger.info("sessions after clearing: %s" % sessions[0][0]) + def test_dynamic(self): """ NAT44 dynamic translation test """ @@ -4548,11 +4709,56 @@ class TestNAT44EndpointDependent(MethodHolder): '/err/nat44-ed-out2in/good out2in packets processed') self.assertEqual(err - totaln, 3) - users = self.statistics.get_counter('/nat44/total-users') - self.assertEqual(users[0][0], 1) sessions = self.statistics.get_counter('/nat44/total-sessions') self.assertEqual(sessions[0][0], 3) + def test_dynamic_out_of_ports(self): + """ NAT44 dynamic translation test: out of ports """ + + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1) + + nat_config = self.vapi.nat_show_config() + self.assertEqual(1, nat_config.endpoint_dependent) + + # in2out and no NAT addresses added + err_old = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(0, timeout=1) + + err_new = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + self.assertEqual(err_new - err_old, len(pkts)) + + # in2out after NAT addresses added + self.nat44_add_address(self.nat_addr) + + err_old = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture) + + err_new = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + self.assertEqual(err_new, err_old) + def test_dynamic_output_feature_vrf(self): """ NAT44 dynamic translation test: output-feature, VRF""" @@ -4645,8 +4851,6 @@ class TestNAT44EndpointDependent(MethodHolder): '/err/nat44-ed-out2in/good out2in packets processed') self.assertEqual(err - totaln, 3) - users = self.statistics.get_counter('/nat44/total-users') - self.assertEqual(users[0][0], 1) sessions = self.statistics.get_counter('/nat44/total-sessions') self.assertEqual(sessions[0][0], 3) @@ -6855,13 +7059,6 @@ class TestNAT44EndpointDependent(MethodHolder): self.pg_start() self.pg1.get_capture(1) - nsessions = 0 - users = self.vapi.nat44_user_dump() - self.assertEqual(len(users), 1) - self.assertEqual(str(users[0].ip_address), - self.pg0.remote_ip4) - self.assertEqual(users[0].nsessions, 1) - def test_syslog_sess(self): """ Test syslog session creation and deletion """ self.vapi.syslog_set_filter( @@ -6909,6 +7106,111 @@ class TestNAT44EndpointDependent(MethodHolder): self.logger.info(self.vapi.cli("show nat timeouts")) +class TestNAT44EndpointDependent2(MethodHolder): + """ Endpoint-Dependent mapping and filtering extra test cases """ + + translation_buckets = 5 + + @classmethod + def setUpConstants(cls): + super(TestNAT44EndpointDependent2, cls).setUpConstants() + cls.vpp_cmdline.extend([ + "nat", "{", "endpoint-dependent", + "translation hash buckets %d" % cls.translation_buckets, + "}" + ]) + + @classmethod + def setUpClass(cls): + super(TestNAT44EndpointDependent2, cls).setUpClass() + cls.vapi.cli("set log class nat level debug") + + cls.nat_addr = '10.0.0.3' + + cls.create_pg_interfaces(range(2)) + + for i in cls.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + def setUp(self): + super(TestNAT44EndpointDependent2, self).setUp() + self.vapi.nat_set_timeouts( + udp=1, tcp_established=7440, tcp_transitory=30, icmp=1) + self.nat44_add_address(self.nat_addr) + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, is_add=1) + + @classmethod + def tearDownClass(cls): + super(TestNAT44EndpointDependent2, cls).tearDownClass() + + def init_tcp_session(self, in_if, out_if, sport, ext_dport): + # SYN packet in->out + p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=sport, dport=ext_dport, flags="S")) + in_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = out_if.get_capture(1) + p = capture[0] + tcp_port_out = p[TCP].sport + + # SYN + ACK packet out->in + p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) / + IP(src=out_if.remote_ip4, dst=self.nat_addr) / + TCP(sport=ext_dport, dport=tcp_port_out, flags="SA")) + out_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + in_if.get_capture(1) + + # ACK packet in->out + p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=sport, dport=ext_dport, flags="A")) + in_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + out_if.get_capture(1) + + return tcp_port_out + + def test_lru_cleanup(self): + """ LRU cleanup algorithm """ + tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80) + max_translations = 10 * self.translation_buckets + pkts = [] + for i in range(0, max_translations - 1): + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / + UDP(sport=7000+i, dport=80)) + pkts.append(p) + + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(len(pkts)) + self.sleep(1.5, "wait for timeouts") + + pkts = [] + for i in range(0, max_translations - 1): + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / + ICMP(id=8000+i, type='echo-request')) + pkts.append(p) + + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(len(pkts)) + + class TestNAT44Out2InDPO(MethodHolder): """ NAT44 Test Cases using out2in DPO """ @@ -8022,19 +8324,19 @@ class TestNAT64(MethodHolder): self.vapi.nat64_add_del_interface(is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index) - self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6n, + self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6, o_addr=self.nat_addr, i_port=self.tcp_port_in, o_port=self.tcp_port_out, proto=IP_PROTOS.tcp, vrf_id=0, is_add=1) - self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6n, + self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6, o_addr=self.nat_addr, i_port=self.udp_port_in, o_port=self.udp_port_out, proto=IP_PROTOS.udp, vrf_id=0, is_add=1) - self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6n, + self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6, o_addr=self.nat_addr, i_port=self.icmp_id_in, o_port=self.icmp_id_out, @@ -8831,7 +9133,7 @@ class TestNAT64(MethodHolder): self.pg1.assert_nothing_captured() sleep(1) self.vapi.ipfix_flush() - capture = self.pg3.get_capture(9) + capture = self.pg3.get_capture(7) ipfix = IPFIXDecoder() # first load template for p in capture: @@ -8907,7 +9209,7 @@ class TestNAT64(MethodHolder): p = self.pg1.get_capture(1) self.tcp_port_out = p[0][TCP].sport self.vapi.ipfix_flush() - capture = self.pg3.get_capture(10) + capture = self.pg3.get_capture(8) ipfix = IPFIXDecoder() # first load template for p in capture: @@ -8925,11 +9227,11 @@ class TestNAT64(MethodHolder): if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) if scapy.compat.orb(data[0][230]) == 10: - self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n) + self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6) elif scapy.compat.orb(data[0][230]) == 6: self.verify_ipfix_nat64_ses(data, 1, - self.pg0.remote_ip6n, + self.pg0.remote_ip6, self.pg1.remote_ip4, 25) else: @@ -8955,11 +9257,11 @@ class TestNAT64(MethodHolder): if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) if scapy.compat.orb(data[0][230]) == 11: - self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n) + self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6) elif scapy.compat.orb(data[0][230]) == 7: self.verify_ipfix_nat64_ses(data, 0, - self.pg0.remote_ip6n, + self.pg0.remote_ip6, self.pg1.remote_ip4, 25) else: @@ -9103,7 +9405,7 @@ class TestNAT66(MethodHolder): self.vapi.nat66_add_del_interface(is_add=1, sw_if_index=self.pg1.sw_if_index) self.vapi.nat66_add_del_static_mapping( - local_ip_address=self.pg0.remote_ip6n, + local_ip_address=self.pg0.remote_ip6, external_ip_address=self.nat_addr, is_add=1) @@ -9182,7 +9484,7 @@ class TestNAT66(MethodHolder): self.vapi.nat66_add_del_interface(is_add=1, flags=flags, sw_if_index=self.pg1.sw_if_index) self.vapi.nat66_add_del_static_mapping( - local_ip_address=self.pg0.remote_ip6n, + local_ip_address=self.pg0.remote_ip6, external_ip_address=self.nat_addr, is_add=1)