import random
from framework import VppTestCase, VppTestRunner, running_extended_tests
+
+import scapy.compat
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
protocol=id_m.protocol,
is_add=0)
- adresses = self.vapi.nat44_address_dump()
- for addr in adresses:
+ addresses = self.vapi.nat44_address_dump()
+ for addr in addresses:
self.vapi.nat44_add_del_address_range(addr.ip_address,
addr.ip_address,
twice_nat=addr.twice_nat,
:param ip: IP address
:param is_add: 1 if add, 0 if delete (Default add)
- :param twice_nat: twice NAT address for extenal hosts
+ :param twice_nat: twice NAT address for external hosts
"""
nat_addr = socket.inet_pton(socket.AF_INET, ip)
self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
pref_n[13] = ip4_n[1]
pref_n[14] = ip4_n[2]
pref_n[15] = ip4_n[3]
- return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+ packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
+ return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
def extract_ip4(self, ip6, plen):
"""
:param capture: Captured packets
:param nat_ip: Translated IP address (Default use global NAT address)
- :param same_port: Sorce port number is not translated (Default False)
+ :param same_port: Source port number is not translated (Default False)
:param dst_ip: Destination IP address (Default do not verify)
:param is_ip6: If L3 protocol is IPv6 (Default False)
"""
:param capture: Captured packets
:param nat_ip: Translated IP address
- :param same_port: Sorce port number is not translated (Default False)
+ :param same_port: Source port number is not translated (Default False)
:param dst_ip: Destination IP address (Default do not verify)
"""
return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
:param data: Payload data
:param proto: protocol (TCP, UDP, ICMP)
:param echo_reply: use echo_reply if protocol is ICMP
- :returns: Fragmets
+ :returns: Fragments
"""
if proto == IP_PROTOS.tcp:
p = (IP(src=src_if.remote_ip4, dst=dst) /
TCP(sport=sport, dport=dport) /
Raw(data))
- p = p.__class__(str(p))
- chksum = p['TCP'].chksum
+ p = p.__class__(scapy.compat.raw(p))
+ chksum = p[TCP].chksum
proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
elif proto == IP_PROTOS.udp:
proto_header = UDP(sport=sport, dport=dport)
:param pref: NAT64 prefix
:param plen: NAT64 prefix length
:param fragsize: size of fragments
- :returns: Fragmets
+ :returns: Fragments
"""
if pref is None:
dst_ip6 = ''.join(['64:ff9b::', dst])
self.assertEqual(6, len(data))
for record in data:
# natEvent
- self.assertIn(ord(record[230]), [4, 5])
- if ord(record[230]) == 4:
+ self.assertIn(scapy.compat.orb(record[230]), [4, 5])
+ if scapy.compat.orb(record[230]) == 4:
nat44_ses_create_num += 1
else:
nat44_ses_delete_num += 1
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
- if IP_PROTOS.icmp == ord(record[4]):
+ if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
self.assertEqual(struct.pack("!H", self.icmp_id_out),
record[227])
- elif IP_PROTOS.tcp == ord(record[4]):
+ elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.tcp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.tcp_port_out),
record[227])
- elif IP_PROTOS.udp == ord(record[4]):
+ elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.udp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.udp_port_out),
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 3)
+ self.assertEqual(scapy.compat.orb(record[230]), 3)
# natPoolID
self.assertEqual(struct.pack("!I", 0), record[283])
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 1), record[466])
# maxSessionEntries
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 2), record[466])
# maxBIBEntries
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
record = data[0]
# natEvent
if is_create:
- self.assertEqual(ord(record[230]), 10)
+ self.assertEqual(scapy.compat.orb(record[230]), 10)
else:
- self.assertEqual(ord(record[230]), 11)
+ self.assertEqual(scapy.compat.orb(record[230]), 11)
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# postNATSourceIPv4Address
self.assertEqual(self.nat_addr_n, record[225])
# protocolIdentifier
- self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
record = data[0]
# natEvent
if is_create:
- self.assertEqual(ord(record[230]), 6)
+ self.assertEqual(scapy.compat.orb(record[230]), 6)
else:
- self.assertEqual(ord(record[230]), 7)
+ self.assertEqual(scapy.compat.orb(record[230]), 7)
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# destinationIPv6Address
self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
record[226])
# protocolIdentifier
- self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 3), record[466])
# maxEntriesPerUser
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
reass = self.vapi.nat_reass_dump()
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
# send packet from host to server
pkts = self.create_stream_frag(self.pg0,
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
super(TestNAT44, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestNAT44, cls).tearDownClass()
+
def test_dynamic(self):
""" NAT44 dynamic translation test """
self.nat44_add_address(self.nat_addr)
is_inside=0)
sm = self.vapi.nat44_static_mapping_dump()
self.assertEqual(len(sm), 1)
- self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
+ self.assertEqual((sm[0].tag).split(b'\0', 1)[0], b'')
self.assertEqual(sm[0].protocol, 0)
self.assertEqual(sm[0].local_port, 0)
self.assertEqual(sm[0].external_port, 0)
self.tcp_port_out = 6303
self.udp_port_out = 6304
self.icmp_id_out = 6305
- tag = "testTAG"
+ tag = b"testTAG"
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
is_inside=0)
sm = self.vapi.nat44_static_mapping_dump()
self.assertEqual(len(sm), 1)
- self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
+ self.assertEqual((sm[0].tag).split(b'\0', 1)[0], tag)
# out2in
pkts = self.create_stream_out(self.pg1, nat_ip)
self.vapi.nat44_add_del_interface_addr(self.pg7.sw_if_index)
# no address in NAT pool
- adresses = self.vapi.nat44_address_dump()
- self.assertEqual(0, len(adresses))
+ addresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(addresses))
# configure interface address and check NAT address pool
self.pg7.config_ip4()
- adresses = self.vapi.nat44_address_dump()
- self.assertEqual(1, len(adresses))
- self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
+ addresses = self.vapi.nat44_address_dump()
+ self.assertEqual(1, len(addresses))
+ self.assertEqual(addresses[0].ip_address[0:4], self.pg7.local_ip4n)
# remove interface address and check NAT address pool
self.pg7.unconfig_ip4()
- adresses = self.vapi.nat44_address_dump()
- self.assertEqual(0, len(adresses))
+ addresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(addresses))
def test_interface_addr_static_mapping(self):
""" Static mapping with addresses from interface """
- tag = "testTAG"
+ tag = b"testTAG"
self.vapi.nat44_add_del_interface_addr(self.pg7.sw_if_index)
self.nat44_add_static_mapping(
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
- self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
+ self.assertEqual((static_mappings[0].tag).split(b'\0', 1)[0], tag)
# configure interface address and check static mappings
self.pg7.config_ip4()
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(sm.external_ip_address[0:4],
self.pg7.local_ip4n)
- self.assertEqual((sm.tag).split('\0', 1)[0], tag)
+ self.assertEqual((sm.tag).split(b'\0', 1)[0], tag)
resolved = True
self.assertTrue(resolved)
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
- self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
+ self.assertEqual((static_mappings[0].tag).split(b'\0', 1)[0], tag)
# configure interface address again and check static mappings
self.pg7.config_ip4()
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(sm.external_ip_address[0:4],
self.pg7.local_ip4n)
- self.assertEqual((sm.tag).split('\0', 1)[0], tag)
+ self.assertEqual((sm.tag).split(b'\0', 1)[0], tag)
resolved = True
self.assertTrue(resolved)
identity_mappings[0].sw_if_index)
def test_ipfix_nat44_sess(self):
- """ IPFIX logging NAT44 session created/delted """
+ """ IPFIX logging NAT44 session created/deleted """
self.ipfix_domain_id = 10
self.ipfix_src_port = 20202
- colector_port = 30303
+ collector_port = 30303
bind_layers(UDP, IPFIX, dport=30303)
self.nat44_add_address(self.nat_addr)
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
src_address=self.pg3.local_ip4n,
path_mtu=512,
template_interval=10,
- collector_port=colector_port)
+ collector_port=collector_port)
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
self.assertEqual(p[IP].src, self.pg3.local_ip4)
self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
self.assertEqual(p[UDP].sport, self.ipfix_src_port)
- self.assertEqual(p[UDP].dport, colector_port)
+ self.assertEqual(p[UDP].dport, collector_port)
self.assertEqual(p[IPFIX].observationDomainID,
self.ipfix_domain_id)
if p.haslayer(Template):
is_inside=0)
self.vapi.nat44_forwarding_enable_disable(1)
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.pg0.remote_ip4,
4789,
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
self.tcp_port_in = random.randint(1025, 65535)
pkts = self.create_stream_frag(self.pg0,
self.pg1.remote_ip4,
def tearDown(self):
super(TestNAT44, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show nat44 addresses"))
- self.logger.info(self.vapi.cli("show nat44 interfaces"))
- self.logger.info(self.vapi.cli("show nat44 static mappings"))
- self.logger.info(self.vapi.cli("show nat44 interface address"))
- self.logger.info(self.vapi.cli("show nat44 sessions detail"))
- self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
- self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
- self.logger.info(self.vapi.cli("show nat timeouts"))
- self.logger.info(
- self.vapi.cli("show nat addr-port-assignment-alg"))
- self.logger.info(self.vapi.cli("show nat ha"))
- self.clear_nat44()
- self.vapi.cli("clear logging")
+ self.clear_nat44()
+ self.vapi.cli("clear logging")
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show nat44 addresses"))
+ self.logger.info(self.vapi.cli("show nat44 interfaces"))
+ self.logger.info(self.vapi.cli("show nat44 static mappings"))
+ self.logger.info(self.vapi.cli("show nat44 interface address"))
+ self.logger.info(self.vapi.cli("show nat44 sessions detail"))
+ self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
+ self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
+ self.logger.info(self.vapi.cli("show nat timeouts"))
+ self.logger.info(
+ self.vapi.cli("show nat addr-port-assignment-alg"))
+ self.logger.info(self.vapi.cli("show nat ha"))
class TestNAT44EndpointDependent(MethodHolder):
cls.pg5.admin_up()
cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
dst_address_length=32,
- table_id=1,
+ next_hop_address=zero_ip4n,
next_hop_sw_if_index=cls.pg5.sw_if_index,
- next_hop_address=zero_ip4n)
+ table_id=1)
cls.pg6._local_ip4 = "10.1.2.1"
cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
cls.pg6.admin_up()
cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
dst_address_length=32,
- table_id=1,
+ next_hop_address=zero_ip4n,
next_hop_sw_if_index=cls.pg6.sw_if_index,
- next_hop_address=zero_ip4n)
+ table_id=1)
cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
dst_address_length=16,
- next_hop_address=zero_ip4n,
- table_id=0,
+ next_hop_address=zero_ip4n, table_id=0,
next_hop_table_id=1)
cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
dst_address_length=0,
- next_hop_address=zero_ip4n,
- table_id=1,
+ next_hop_address=zero_ip4n, table_id=1,
next_hop_table_id=0)
cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
dst_address_length=0,
- table_id=0,
+ next_hop_address=cls.pg1.local_ip4n,
next_hop_sw_if_index=cls.pg1.sw_if_index,
- next_hop_address=cls.pg1.local_ip4n)
+ table_id=0)
cls.pg5.resolve_arp()
cls.pg6.resolve_arp()
super(TestNAT44EndpointDependent, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestNAT44EndpointDependent, cls).tearDownClass()
+
def test_frag_in_order(self):
""" NAT44 translate fragments arriving in order """
self.nat44_add_address(self.nat_addr)
self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
is_inside=0)
- # session initiaded from service host - translate
+ # session initiated from service host - translate
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
- # session initiaded from remote host - do not translate
+ # session initiated from remote host - do not translate
self.tcp_port_in = 60303
self.udp_port_in = 60304
self.icmp_id_in = 60305
def tearDown(self):
super(TestNAT44EndpointDependent, self).tearDown()
if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show nat44 addresses"))
- self.logger.info(self.vapi.cli("show nat44 interfaces"))
- self.logger.info(self.vapi.cli("show nat44 static mappings"))
- self.logger.info(self.vapi.cli("show nat44 interface address"))
- self.logger.info(self.vapi.cli("show nat44 sessions detail"))
- self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
- self.logger.info(self.vapi.cli("show nat timeouts"))
self.clear_nat44()
self.vapi.cli("clear logging")
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show nat44 addresses"))
+ self.logger.info(self.vapi.cli("show nat44 interfaces"))
+ self.logger.info(self.vapi.cli("show nat44 static mappings"))
+ self.logger.info(self.vapi.cli("show nat44 interface address"))
+ self.logger.info(self.vapi.cli("show nat44 sessions detail"))
+ self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
+ self.logger.info(self.vapi.cli("show nat timeouts"))
+
class TestNAT44Out2InDPO(MethodHolder):
""" NAT44 Test Cases using out2in DPO """
cls.pg1.config_ip6()
cls.pg1.resolve_ndp()
- cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00' * 16,
+ cls.vapi.ip_add_del_route(dst_address=b'\x00' * 16,
dst_address_length=0,
next_hop_address=cls.pg1.remote_ip6n,
- next_hop_sw_if_index=cls.pg1.sw_if_index)
+ next_hop_sw_if_index=cls.pg1.sw_if_index,
+ is_ipv6=True)
except Exception:
super(TestNAT44Out2InDPO, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestNAT44Out2InDPO, cls).tearDownClass()
+
def configure_xlat(self):
self.dst_ip6_pfx = '1:2:3::'
self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
super(TestDeterministicNAT, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestDeterministicNAT, cls).tearDownClass()
+
def create_stream_in(self, in_if, out_if, ttl=64):
"""
Create packet stream for inside network
:param capture: Captured packets
:param nat_ip: Translated IP address (Default use global NAT address)
- :param same_port: Sorce port number is not translated (Default False)
+ :param same_port: Source port number is not translated (Default False)
"""
if nat_ip is None:
nat_ip = self.nat_addr
def tearDown(self):
super(TestDeterministicNAT, self).tearDown()
if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show nat44 interfaces"))
- self.logger.info(self.vapi.cli("show nat timeouts"))
- self.logger.info(
- self.vapi.cli("show nat44 deterministic mappings"))
- self.logger.info(
- self.vapi.cli("show nat44 deterministic sessions"))
self.clear_nat_det()
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show nat44 interfaces"))
+ self.logger.info(self.vapi.cli("show nat timeouts"))
+ self.logger.info(
+ self.vapi.cli("show nat44 deterministic mappings"))
+ self.logger.info(
+ self.vapi.cli("show nat44 deterministic sessions"))
+
class TestNAT64(MethodHolder):
""" NAT64 Test Cases """
super(TestNAT64, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestNAT64, cls).tearDownClass()
+
def test_nat64_inside_interface_handles_neighbor_advertisement(self):
""" NAT64 inside interface handles Neighbor Advertisement """
reass_n_start = len(reass)
# in2out
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
self.pg0.add_stream(pkts)
self.assertEqual(data, p[Raw].load)
# out2in
- data = "A" * 4 + "b" * 16 + "C" * 3
+ data = b"A" * 4 + b"b" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# in2out
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
self.assertEqual(data, p[Raw].load)
# out2in
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
self.vapi.nat64_add_del_interface_addr(self.pg4.sw_if_index)
# no address in NAT64 pool
- adresses = self.vapi.nat44_address_dump()
- self.assertEqual(0, len(adresses))
+ addresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(addresses))
# configure interface address and check NAT64 address pool
self.pg4.config_ip4()
# remove interface address and check NAT64 address pool
self.pg4.unconfig_ip4()
addresses = self.vapi.nat64_pool_addr_dump()
- self.assertEqual(0, len(adresses))
+ self.assertEqual(0, len(addresses))
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_ipfix_max_bibs_sessions(self):
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
for p in capture:
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- if ord(data[0][230]) == 10:
+ if scapy.compat.orb(data[0][230]) == 10:
self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
- elif ord(data[0][230]) == 6:
+ elif scapy.compat.orb(data[0][230]) == 6:
self.verify_ipfix_nat64_ses(data,
1,
self.pg0.remote_ip6n,
self.ipfix_domain_id)
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- if ord(data[0][230]) == 11:
+ if scapy.compat.orb(data[0][230]) == 11:
self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
- elif ord(data[0][230]) == 7:
+ elif scapy.compat.orb(data[0][230]) == 7:
self.verify_ipfix_nat64_ses(data,
0,
self.pg0.remote_ip6n,
def tearDown(self):
super(TestNAT64, self).tearDown()
if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show nat64 pool"))
- self.logger.info(self.vapi.cli("show nat64 interfaces"))
- self.logger.info(self.vapi.cli("show nat64 prefix"))
- self.logger.info(self.vapi.cli("show nat64 bib all"))
- self.logger.info(self.vapi.cli("show nat64 session table all"))
- self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
self.clear_nat64()
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show nat64 pool"))
+ self.logger.info(self.vapi.cli("show nat64 interfaces"))
+ self.logger.info(self.vapi.cli("show nat64 prefix"))
+ self.logger.info(self.vapi.cli("show nat64 bib all"))
+ self.logger.info(self.vapi.cli("show nat64 session table all"))
+ self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
+
class TestDSlite(MethodHolder):
""" DS-Lite Test Cases """
super(TestDSlite, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestDSlite, cls).tearDownClass()
+
def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport,
sv6enc, proto):
message = data.decode('utf-8')
def tearDown(self):
super(TestDSlite, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show dslite pool"))
- self.logger.info(
- self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
- self.logger.info(self.vapi.cli("show dslite sessions"))
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show dslite pool"))
+ self.logger.info(
+ self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
+ self.logger.info(self.vapi.cli("show dslite sessions"))
class TestDSliteCE(MethodHolder):
super(TestDSliteCE, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestDSliteCE, cls).tearDownClass()
+
def test_dslite_ce(self):
""" Test DS-Lite CE """
def tearDown(self):
super(TestDSliteCE, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(
- self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
- self.logger.info(
- self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
+
+ def show_commands_at_teardown(self):
+ self.logger.info(
+ self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
+ self.logger.info(
+ self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
class TestNAT66(MethodHolder):
super(TestNAT66, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestNAT66, cls).tearDownClass()
+
def test_static(self):
""" 1:1 NAT66 test """
self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
def tearDown(self):
super(TestNAT66, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show nat66 interfaces"))
- self.logger.info(self.vapi.cli("show nat66 static mappings"))
- self.clear_nat66()
+ self.clear_nat66()
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show nat66 interfaces"))
+ self.logger.info(self.vapi.cli("show nat66 static mappings"))
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)