X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_nat.py;h=5cd0ad9d8abd746d30954711d07f1928f081b6f7;hb=bee9768bd41d52d80d8ee3a03a2b1c236da080b0;hp=e368fe032ccb11d8ab381029a0297135b424a4ef;hpb=9a475373ed9b1ff9952acc06353ddaa50c2c96bd;p=vpp.git diff --git a/test/test_nat.py b/test/test_nat.py index e368fe032cc..5cd0ad9d8ab 100644 --- a/test/test_nat.py +++ b/test/test_nat.py @@ -6,6 +6,8 @@ import struct import random from framework import VppTestCase, VppTestRunner, running_extended_tests + +import scapy.compat from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \ @@ -176,8 +178,8 @@ class MethodHolder(VppTestCase): protocol=id_m.protocol, is_add=0) - adresses = self.vapi.nat44_address_dump() - for addr in adresses: + addresses = self.vapi.nat44_address_dump() + for addr in addresses: self.vapi.nat44_add_del_address_range(addr.ip_address, addr.ip_address, twice_nat=addr.twice_nat, @@ -239,7 +241,7 @@ class MethodHolder(VppTestCase): :param ip: IP address :param is_add: 1 if add, 0 if delete (Default add) - :param twice_nat: twice NAT address for extenal hosts + :param twice_nat: twice NAT address for external hosts """ nat_addr = socket.inet_pton(socket.AF_INET, ip) self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add, @@ -320,7 +322,8 @@ class MethodHolder(VppTestCase): pref_n[13] = ip4_n[1] pref_n[14] = ip4_n[2] pref_n[15] = ip4_n[3] - return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n)) + packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n]) + return socket.inet_ntop(socket.AF_INET6, packed_pref_n) def extract_ip4(self, ip6, plen): """ @@ -478,7 +481,7 @@ class MethodHolder(VppTestCase): :param capture: Captured packets :param nat_ip: Translated IP address (Default use global NAT address) - :param same_port: Sorce port number is not translated (Default False) + :param same_port: Source port number is not translated (Default False) :param dst_ip: Destination IP address (Default do not verify) :param is_ip6: If L3 protocol is IPv6 (Default False) """ @@ -531,7 +534,7 @@ class MethodHolder(VppTestCase): :param capture: Captured packets :param nat_ip: Translated IP address - :param same_port: Sorce port number is not translated (Default False) + :param same_port: Source port number is not translated (Default False) :param dst_ip: Destination IP address (Default do not verify) """ return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, @@ -683,14 +686,14 @@ class MethodHolder(VppTestCase): :param data: Payload data :param proto: protocol (TCP, UDP, ICMP) :param echo_reply: use echo_reply if protocol is ICMP - :returns: Fragmets + :returns: Fragments """ if proto == IP_PROTOS.tcp: p = (IP(src=src_if.remote_ip4, dst=dst) / TCP(sport=sport, dport=dport) / Raw(data)) - p = p.__class__(str(p)) - chksum = p['TCP'].chksum + p = p.__class__(scapy.compat.raw(p)) + chksum = p[TCP].chksum proto_header = TCP(sport=sport, dport=dport, chksum=chksum) elif proto == IP_PROTOS.udp: proto_header = UDP(sport=sport, dport=dport) @@ -745,7 +748,7 @@ class MethodHolder(VppTestCase): :param pref: NAT64 prefix :param plen: NAT64 prefix length :param fragsize: size of fragments - :returns: Fragmets + :returns: Fragments """ if pref is None: dst_ip6 = ''.join(['64:ff9b::', dst]) @@ -869,8 +872,8 @@ class MethodHolder(VppTestCase): self.assertEqual(6, len(data)) for record in data: # natEvent - self.assertIn(ord(record[230]), [4, 5]) - if ord(record[230]) == 4: + self.assertIn(scapy.compat.orb(record[230]), [4, 5]) + if scapy.compat.orb(record[230]) == 4: nat44_ses_create_num += 1 else: nat44_ses_delete_num += 1 @@ -882,16 +885,16 @@ class MethodHolder(VppTestCase): # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort - if IP_PROTOS.icmp == ord(record[4]): + if IP_PROTOS.icmp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7]) self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227]) - elif IP_PROTOS.tcp == ord(record[4]): + elif IP_PROTOS.tcp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7]) self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227]) - elif IP_PROTOS.udp == ord(record[4]): + elif IP_PROTOS.udp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.udp_port_in), record[7]) self.assertEqual(struct.pack("!H", self.udp_port_out), @@ -910,7 +913,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 3) + self.assertEqual(scapy.compat.orb(record[230]), 3) # natPoolID self.assertEqual(struct.pack("!I", 0), record[283]) @@ -924,7 +927,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 13) + self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 1), record[466]) # maxSessionEntries @@ -940,7 +943,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 13) + self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 2), record[466]) # maxBIBEntries @@ -957,7 +960,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 13) + self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 5), record[466]) # maxFragmentsPendingReassembly @@ -976,7 +979,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 13) + self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 5), record[466]) # maxFragmentsPendingReassembly @@ -996,15 +999,15 @@ class MethodHolder(VppTestCase): record = data[0] # natEvent if is_create: - self.assertEqual(ord(record[230]), 10) + self.assertEqual(scapy.compat.orb(record[230]), 10) else: - self.assertEqual(ord(record[230]), 11) + self.assertEqual(scapy.compat.orb(record[230]), 11) # sourceIPv6Address self.assertEqual(src_addr, record[27]) # postNATSourceIPv4Address self.assertEqual(self.nat_addr_n, record[225]) # protocolIdentifier - self.assertEqual(IP_PROTOS.tcp, ord(record[4])) + self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4])) # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # sourceTransportPort @@ -1027,9 +1030,9 @@ class MethodHolder(VppTestCase): record = data[0] # natEvent if is_create: - self.assertEqual(ord(record[230]), 6) + self.assertEqual(scapy.compat.orb(record[230]), 6) else: - self.assertEqual(ord(record[230]), 7) + self.assertEqual(scapy.compat.orb(record[230]), 7) # sourceIPv6Address self.assertEqual(src_addr, record[27]) # destinationIPv6Address @@ -1044,7 +1047,7 @@ class MethodHolder(VppTestCase): self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr), record[226]) # protocolIdentifier - self.assertEqual(IP_PROTOS.tcp, ord(record[4])) + self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4])) # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # sourceTransportPort @@ -1076,7 +1079,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 13) + self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 3), record[466]) # maxEntriesPerUser @@ -1167,9 +1170,9 @@ class MethodHolder(VppTestCase): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = random.randint(1025, 65535) reass = self.vapi.nat_reass_dump() @@ -1248,9 +1251,9 @@ class MethodHolder(VppTestCase): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = random.randint(1025, 65535) for i in range(2): @@ -1317,9 +1320,9 @@ class MethodHolder(VppTestCase): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 # send packet from host to server pkts = self.create_stream_frag(self.pg0, @@ -1346,9 +1349,9 @@ class MethodHolder(VppTestCase): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = random.randint(1025, 65535) for i in range(2): @@ -1422,9 +1425,9 @@ class MethodHolder(VppTestCase): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = random.randint(1025, 65535) for i in range(2): @@ -1559,6 +1562,10 @@ class TestNAT44(MethodHolder): super(TestNAT44, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestNAT44, cls).tearDownClass() + def test_dynamic(self): """ NAT44 dynamic translation test """ self.nat44_add_address(self.nat_addr) @@ -1863,7 +1870,7 @@ class TestNAT44(MethodHolder): is_inside=0) sm = self.vapi.nat44_static_mapping_dump() self.assertEqual(len(sm), 1) - self.assertEqual((sm[0].tag).split('\0', 1)[0], '') + self.assertEqual((sm[0].tag).split(b'\0', 1)[0], b'') self.assertEqual(sm[0].protocol, 0) self.assertEqual(sm[0].local_port, 0) self.assertEqual(sm[0].external_port, 0) @@ -1891,7 +1898,7 @@ class TestNAT44(MethodHolder): self.tcp_port_out = 6303 self.udp_port_out = 6304 self.icmp_id_out = 6305 - tag = "testTAG" + tag = b"testTAG" self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag) self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) @@ -1899,7 +1906,7 @@ class TestNAT44(MethodHolder): is_inside=0) sm = self.vapi.nat44_static_mapping_dump() self.assertEqual(len(sm), 1) - self.assertEqual((sm[0].tag).split('\0', 1)[0], tag) + self.assertEqual((sm[0].tag).split(b'\0', 1)[0], tag) # out2in pkts = self.create_stream_out(self.pg1, nat_ip) @@ -2581,23 +2588,23 @@ class TestNAT44(MethodHolder): self.vapi.nat44_add_del_interface_addr(self.pg7.sw_if_index) # no address in NAT pool - adresses = self.vapi.nat44_address_dump() - self.assertEqual(0, len(adresses)) + addresses = self.vapi.nat44_address_dump() + self.assertEqual(0, len(addresses)) # configure interface address and check NAT address pool self.pg7.config_ip4() - adresses = self.vapi.nat44_address_dump() - self.assertEqual(1, len(adresses)) - self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n) + addresses = self.vapi.nat44_address_dump() + self.assertEqual(1, len(addresses)) + self.assertEqual(addresses[0].ip_address[0:4], self.pg7.local_ip4n) # remove interface address and check NAT address pool self.pg7.unconfig_ip4() - adresses = self.vapi.nat44_address_dump() - self.assertEqual(0, len(adresses)) + addresses = self.vapi.nat44_address_dump() + self.assertEqual(0, len(addresses)) def test_interface_addr_static_mapping(self): """ Static mapping with addresses from interface """ - tag = "testTAG" + tag = b"testTAG" self.vapi.nat44_add_del_interface_addr(self.pg7.sw_if_index) self.nat44_add_static_mapping( @@ -2610,7 +2617,7 @@ class TestNAT44(MethodHolder): self.assertEqual(1, len(static_mappings)) self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index) - self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag) + self.assertEqual((static_mappings[0].tag).split(b'\0', 1)[0], tag) # configure interface address and check static mappings self.pg7.config_ip4() @@ -2621,7 +2628,7 @@ class TestNAT44(MethodHolder): if sm.external_sw_if_index == 0xFFFFFFFF: self.assertEqual(sm.external_ip_address[0:4], self.pg7.local_ip4n) - self.assertEqual((sm.tag).split('\0', 1)[0], tag) + self.assertEqual((sm.tag).split(b'\0', 1)[0], tag) resolved = True self.assertTrue(resolved) @@ -2631,7 +2638,7 @@ class TestNAT44(MethodHolder): self.assertEqual(1, len(static_mappings)) self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index) - self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag) + self.assertEqual((static_mappings[0].tag).split(b'\0', 1)[0], tag) # configure interface address again and check static mappings self.pg7.config_ip4() @@ -2642,7 +2649,7 @@ class TestNAT44(MethodHolder): if sm.external_sw_if_index == 0xFFFFFFFF: self.assertEqual(sm.external_ip_address[0:4], self.pg7.local_ip4n) - self.assertEqual((sm.tag).split('\0', 1)[0], tag) + self.assertEqual((sm.tag).split(b'\0', 1)[0], tag) resolved = True self.assertTrue(resolved) @@ -2694,10 +2701,10 @@ class TestNAT44(MethodHolder): identity_mappings[0].sw_if_index) def test_ipfix_nat44_sess(self): - """ IPFIX logging NAT44 session created/delted """ + """ IPFIX logging NAT44 session created/deleted """ self.ipfix_domain_id = 10 self.ipfix_src_port = 20202 - colector_port = 30303 + collector_port = 30303 bind_layers(UDP, IPFIX, dport=30303) self.nat44_add_address(self.nat_addr) self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) @@ -2707,7 +2714,7 @@ class TestNAT44(MethodHolder): src_address=self.pg3.local_ip4n, path_mtu=512, template_interval=10, - collector_port=colector_port) + collector_port=collector_port) self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port) @@ -2727,7 +2734,7 @@ class TestNAT44(MethodHolder): self.assertEqual(p[IP].src, self.pg3.local_ip4) self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) - self.assertEqual(p[UDP].dport, colector_port) + self.assertEqual(p[UDP].dport, collector_port) self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Template): @@ -3539,7 +3546,7 @@ class TestNAT44(MethodHolder): is_inside=0) self.vapi.nat44_forwarding_enable_disable(1) - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, 4789, @@ -3667,7 +3674,7 @@ class TestNAT44(MethodHolder): self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port) - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 self.tcp_port_in = random.randint(1025, 65535) pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, @@ -4233,9 +4240,9 @@ class TestNAT44EndpointDependent(MethodHolder): cls.pg5.admin_up() cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n, dst_address_length=32, - table_id=1, + next_hop_address=zero_ip4n, next_hop_sw_if_index=cls.pg5.sw_if_index, - next_hop_address=zero_ip4n) + table_id=1) cls.pg6._local_ip4 = "10.1.2.1" cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, @@ -4248,25 +4255,23 @@ class TestNAT44EndpointDependent(MethodHolder): cls.pg6.admin_up() cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n, dst_address_length=32, - table_id=1, + next_hop_address=zero_ip4n, next_hop_sw_if_index=cls.pg6.sw_if_index, - next_hop_address=zero_ip4n) + table_id=1) cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n, dst_address_length=16, - next_hop_address=zero_ip4n, - table_id=0, + next_hop_address=zero_ip4n, table_id=0, next_hop_table_id=1) cls.vapi.ip_add_del_route(dst_address=zero_ip4n, dst_address_length=0, - next_hop_address=zero_ip4n, - table_id=1, + next_hop_address=zero_ip4n, table_id=1, next_hop_table_id=0) cls.vapi.ip_add_del_route(dst_address=zero_ip4n, dst_address_length=0, - table_id=0, + next_hop_address=cls.pg1.local_ip4n, next_hop_sw_if_index=cls.pg1.sw_if_index, - next_hop_address=cls.pg1.local_ip4n) + table_id=0) cls.pg5.resolve_arp() cls.pg6.resolve_arp() @@ -4275,6 +4280,10 @@ class TestNAT44EndpointDependent(MethodHolder): super(TestNAT44EndpointDependent, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestNAT44EndpointDependent, cls).tearDownClass() + def test_frag_in_order(self): """ NAT44 translate fragments arriving in order """ self.nat44_add_address(self.nat_addr) @@ -5126,7 +5135,7 @@ class TestNAT44EndpointDependent(MethodHolder): self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index, is_inside=0) - # session initiaded from service host - translate + # session initiated from service host - translate pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -5141,7 +5150,7 @@ class TestNAT44EndpointDependent(MethodHolder): capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) - # session initiaded from remote host - do not translate + # session initiated from remote host - do not translate self.tcp_port_in = 60303 self.udp_port_in = 60304 self.icmp_id_in = 60305 @@ -6455,15 +6464,20 @@ class TestNAT44Out2InDPO(MethodHolder): cls.pg1.config_ip6() cls.pg1.resolve_ndp() - cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00' * 16, + cls.vapi.ip_add_del_route(dst_address=b'\x00' * 16, dst_address_length=0, next_hop_address=cls.pg1.remote_ip6n, - next_hop_sw_if_index=cls.pg1.sw_if_index) + next_hop_sw_if_index=cls.pg1.sw_if_index, + is_ipv6=True) except Exception: super(TestNAT44Out2InDPO, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestNAT44Out2InDPO, cls).tearDownClass() + def configure_xlat(self): self.dst_ip6_pfx = '1:2:3::' self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, @@ -6579,6 +6593,10 @@ class TestDeterministicNAT(MethodHolder): super(TestDeterministicNAT, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestDeterministicNAT, cls).tearDownClass() + def create_stream_in(self, in_if, out_if, ttl=64): """ Create packet stream for inside network @@ -6645,7 +6663,7 @@ class TestDeterministicNAT(MethodHolder): :param capture: Captured packets :param nat_ip: Translated IP address (Default use global NAT address) - :param same_port: Sorce port number is not translated (Default False) + :param same_port: Source port number is not translated (Default False) """ if nat_ip is None: nat_ip = self.nat_addr @@ -7198,6 +7216,10 @@ class TestNAT64(MethodHolder): super(TestNAT64, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestNAT64, cls).tearDownClass() + def test_nat64_inside_interface_handles_neighbor_advertisement(self): """ NAT64 inside interface handles Neighbor Advertisement """ @@ -7285,9 +7307,9 @@ class TestNAT64(MethodHolder): self.assertTrue(pg1_found) features = self.vapi.cli("show interface features pg0") - self.assertNotEqual(features.find('nat64-in2out'), -1) + self.assertIn('nat64-in2out', features) features = self.vapi.cli("show interface features pg1") - self.assertNotEqual(features.find('nat64-out2in'), -1) + self.assertIn('nat64-out2in', features) self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0) self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0) @@ -8043,7 +8065,7 @@ class TestNAT64(MethodHolder): reass_n_start = len(reass) # in2out - data = 'a' * 200 + data = b'a' * 200 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data) self.pg0.add_stream(pkts) @@ -8059,7 +8081,7 @@ class TestNAT64(MethodHolder): self.assertEqual(data, p[Raw].load) # out2in - data = "A" * 4 + "b" * 16 + "C" * 3 + data = b"A" * 4 + b"b" * 16 + b"C" * 3 pkts = self.create_stream_frag(self.pg1, self.nat_addr, 20, @@ -8127,7 +8149,7 @@ class TestNAT64(MethodHolder): self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) # in2out - data = 'a' * 200 + data = b'a' * 200 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data) pkts.reverse() @@ -8144,7 +8166,7 @@ class TestNAT64(MethodHolder): self.assertEqual(data, p[Raw].load) # out2in - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 pkts = self.create_stream_frag(self.pg1, self.nat_addr, 20, @@ -8166,8 +8188,8 @@ class TestNAT64(MethodHolder): self.vapi.nat64_add_del_interface_addr(self.pg4.sw_if_index) # no address in NAT64 pool - adresses = self.vapi.nat44_address_dump() - self.assertEqual(0, len(adresses)) + addresses = self.vapi.nat44_address_dump() + self.assertEqual(0, len(addresses)) # configure interface address and check NAT64 address pool self.pg4.config_ip4() @@ -8178,7 +8200,7 @@ class TestNAT64(MethodHolder): # remove interface address and check NAT64 address pool self.pg4.unconfig_ip4() addresses = self.vapi.nat64_pool_addr_dump() - self.assertEqual(0, len(adresses)) + self.assertEqual(0, len(addresses)) @unittest.skipUnless(running_extended_tests, "part of extended tests") def test_ipfix_max_bibs_sessions(self): @@ -8283,7 +8305,7 @@ class TestNAT64(MethodHolder): self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port) - data = 'a' * 200 + data = b'a' * 200 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data) pkts.reverse() @@ -8358,9 +8380,9 @@ class TestNAT64(MethodHolder): for p in capture: if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) - if ord(data[0][230]) == 10: + if scapy.compat.orb(data[0][230]) == 10: self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n) - elif ord(data[0][230]) == 6: + elif scapy.compat.orb(data[0][230]) == 6: self.verify_ipfix_nat64_ses(data, 1, self.pg0.remote_ip6n, @@ -8387,9 +8409,9 @@ class TestNAT64(MethodHolder): self.ipfix_domain_id) if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) - if ord(data[0][230]) == 11: + if scapy.compat.orb(data[0][230]) == 11: self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n) - elif ord(data[0][230]) == 7: + elif scapy.compat.orb(data[0][230]) == 7: self.verify_ipfix_nat64_ses(data, 0, self.pg0.remote_ip6n, @@ -8531,6 +8553,10 @@ class TestDSlite(MethodHolder): super(TestDSlite, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestDSlite, cls).tearDownClass() + def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport, sv6enc, proto): message = data.decode('utf-8') @@ -8725,6 +8751,10 @@ class TestDSliteCE(MethodHolder): super(TestDSliteCE, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestDSliteCE, cls).tearDownClass() + def test_dslite_ce(self): """ Test DS-Lite CE """ @@ -8828,6 +8858,10 @@ class TestNAT66(MethodHolder): super(TestNAT66, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestNAT66, cls).tearDownClass() + def test_static(self): """ 1:1 NAT66 test """ self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)