from framework import VppTestCase, VppTestRunner, running_extended_tests
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
-from scapy.layers.l2 import Ether, ARP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
+from scapy.layers.l2 import Ether, ARP, GRE
from scapy.data import IP_PROTOS
from scapy.packet import bind_layers
from util import ppp
def tearDown(self):
super(MethodHolder, self).tearDown()
+ def check_ip_checksum(self, pkt):
+ """
+ Check IP checksum of the packet
+
+ :param pkt: Packet to check IP checksum
+ """
+ new = pkt.__class__(str(pkt))
+ del new['IP'].chksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
+
+ def check_tcp_checksum(self, pkt):
+ """
+ Check TCP checksum in IP packet
+
+ :param pkt: Packet to check TCP checksum
+ """
+ new = pkt.__class__(str(pkt))
+ del new['TCP'].chksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
+
+ def check_udp_checksum(self, pkt):
+ """
+ Check UDP checksum in IP packet
+
+ :param pkt: Packet to check UDP checksum
+ """
+ new = pkt.__class__(str(pkt))
+ del new['UDP'].chksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
+
+ def check_icmp_errror_embedded(self, pkt):
+ """
+ Check ICMP error embeded packet checksum
+
+ :param pkt: Packet to check ICMP error embeded packet checksum
+ """
+ if pkt.haslayer(IPerror):
+ new = pkt.__class__(str(pkt))
+ del new['IPerror'].chksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
+
+ if pkt.haslayer(TCPerror):
+ new = pkt.__class__(str(pkt))
+ del new['TCPerror'].chksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
+
+ if pkt.haslayer(UDPerror):
+ if pkt['UDPerror'].chksum != 0:
+ new = pkt.__class__(str(pkt))
+ del new['UDPerror'].chksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['UDPerror'].chksum,
+ pkt['UDPerror'].chksum)
+
+ if pkt.haslayer(ICMPerror):
+ del new['ICMPerror'].chksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
+
+ def check_icmp_checksum(self, pkt):
+ """
+ Check ICMP checksum in IPv4 packet
+
+ :param pkt: Packet to check ICMP checksum
+ """
+ new = pkt.__class__(str(pkt))
+ del new['ICMP'].chksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
+ if pkt.haslayer(IPerror):
+ self.check_icmp_errror_embedded(pkt)
+
+ def check_icmpv6_checksum(self, pkt):
+ """
+ Check ICMPv6 checksum in IPv4 packet
+
+ :param pkt: Packet to check ICMPv6 checksum
+ """
+ new = pkt.__class__(str(pkt))
+ if pkt.haslayer(ICMPv6DestUnreach):
+ del new['ICMPv6DestUnreach'].cksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['ICMPv6DestUnreach'].cksum,
+ pkt['ICMPv6DestUnreach'].cksum)
+ self.check_icmp_errror_embedded(pkt)
+ if pkt.haslayer(ICMPv6EchoRequest):
+ del new['ICMPv6EchoRequest'].cksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['ICMPv6EchoRequest'].cksum,
+ pkt['ICMPv6EchoRequest'].cksum)
+ if pkt.haslayer(ICMPv6EchoReply):
+ del new['ICMPv6EchoReply'].cksum
+ new = new.__class__(str(new))
+ self.assertEqual(new['ICMPv6EchoReply'].cksum,
+ pkt['ICMPv6EchoReply'].cksum)
+
def create_stream_in(self, in_if, out_if, ttl=64):
"""
Create packet stream for inside network
# TCP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
- TCP(sport=self.tcp_port_in))
+ TCP(sport=self.tcp_port_in, dport=20))
pkts.append(p)
# UDP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
- UDP(sport=self.udp_port_in))
+ UDP(sport=self.udp_port_in, dport=20))
pkts.append(p)
# ICMP
return pkts
+ def compose_ip6(self, ip4, pref, plen):
+ """
+ Compose IPv4-embedded IPv6 addresses
+
+ :param ip4: IPv4 address
+ :param pref: IPv6 prefix
+ :param plen: IPv6 prefix length
+ :returns: IPv4-embedded IPv6 addresses
+ """
+ pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
+ ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
+ if plen == 32:
+ pref_n[4] = ip4_n[0]
+ pref_n[5] = ip4_n[1]
+ pref_n[6] = ip4_n[2]
+ pref_n[7] = ip4_n[3]
+ elif plen == 40:
+ pref_n[5] = ip4_n[0]
+ pref_n[6] = ip4_n[1]
+ pref_n[7] = ip4_n[2]
+ pref_n[9] = ip4_n[3]
+ elif plen == 48:
+ pref_n[6] = ip4_n[0]
+ pref_n[7] = ip4_n[1]
+ pref_n[9] = ip4_n[2]
+ pref_n[10] = ip4_n[3]
+ elif plen == 56:
+ pref_n[7] = ip4_n[0]
+ pref_n[9] = ip4_n[1]
+ pref_n[10] = ip4_n[2]
+ pref_n[11] = ip4_n[3]
+ elif plen == 64:
+ pref_n[9] = ip4_n[0]
+ pref_n[10] = ip4_n[1]
+ pref_n[11] = ip4_n[2]
+ pref_n[12] = ip4_n[3]
+ elif plen == 96:
+ pref_n[12] = ip4_n[0]
+ pref_n[13] = ip4_n[1]
+ pref_n[14] = ip4_n[2]
+ pref_n[15] = ip4_n[3]
+ return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+
+ def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
+ """
+ Create IPv6 packet stream for inside network
+
+ :param in_if: Inside interface
+ :param out_if: Outside interface
+ :param ttl: Hop Limit of generated packets
+ :param pref: NAT64 prefix
+ :param plen: NAT64 prefix length
+ """
+ pkts = []
+ if pref is None:
+ dst = ''.join(['64:ff9b::', out_if.remote_ip4])
+ else:
+ dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
+
+ # TCP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+ TCP(sport=self.tcp_port_in, dport=20))
+ pkts.append(p)
+
+ # UDP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+ UDP(sport=self.udp_port_in, dport=20))
+ pkts.append(p)
+
+ # ICMP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+ ICMPv6EchoRequest(id=self.icmp_id_in))
+ pkts.append(p)
+
+ return pkts
+
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
"""
Create packet stream for outside network
# TCP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- TCP(dport=self.tcp_port_out))
+ TCP(dport=self.tcp_port_out, sport=20))
pkts.append(p)
# UDP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- UDP(dport=self.udp_port_out))
+ UDP(dport=self.udp_port_out, sport=20))
pkts.append(p)
# ICMP
return pkts
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
- packet_num=3):
+ packet_num=3, dst_ip=None):
"""
Verify captured packets on outside network
:param nat_ip: Translated IP address (Default use global SNAT address)
:param same_port: Sorce port number is not translated (Default False)
:param packet_num: Expected number of packets (Default 3)
+ :param dst_ip: Destination IP address (Default do not verify)
"""
if nat_ip is None:
nat_ip = self.snat_addr
self.assertEqual(packet_num, len(capture))
for packet in capture:
try:
+ self.check_ip_checksum(packet)
self.assertEqual(packet[IP].src, nat_ip)
+ if dst_ip is not None:
+ self.assertEqual(packet[IP].dst, dst_ip)
if packet.haslayer(TCP):
if same_port:
self.assertEqual(packet[TCP].sport, self.tcp_port_in)
self.assertNotEqual(
packet[TCP].sport, self.tcp_port_in)
self.tcp_port_out = packet[TCP].sport
+ self.check_tcp_checksum(packet)
elif packet.haslayer(UDP):
if same_port:
self.assertEqual(packet[UDP].sport, self.udp_port_in)
else:
self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
self.icmp_id_out = packet[ICMP].id
+ self.check_icmp_checksum(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet "
"(outside network):", packet))
self.assertEqual(packet_num, len(capture))
for packet in capture:
try:
+ self.check_ip_checksum(packet)
self.assertEqual(packet[IP].dst, in_if.remote_ip4)
if packet.haslayer(TCP):
self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+ self.check_tcp_checksum(packet)
elif packet.haslayer(UDP):
self.assertEqual(packet[UDP].dport, self.udp_port_in)
else:
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ self.check_icmp_checksum(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(inside network):", packet))
+ raise
+
+ def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
+ """
+ Verify captured IPv6 packets on inside network
+
+ :param capture: Captured packets
+ :param src_ip: Source IP
+ :param dst_ip: Destination IP address
+ :param packet_num: Expected number of packets (Default 3)
+ """
+ self.assertEqual(packet_num, len(capture))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IPv6].src, src_ip)
+ self.assertEqual(packet[IPv6].dst, dst_ip)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+ self.check_tcp_checksum(packet)
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].dport, self.udp_port_in)
+ self.check_udp_checksum(packet)
+ else:
+ self.assertEqual(packet[ICMPv6EchoReply].id,
+ self.icmp_id_in)
+ self.check_icmpv6_checksum(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet "
"(inside network):", packet))
self.assertEqual(ip.dst, server.ip4)
self.assertNotEqual(tcp.sport, host_in_port)
self.assertEqual(tcp.dport, server_in_port)
+ self.check_tcp_checksum(p)
host_out_port = tcp.sport
except:
self.logger.error(ppp("Unexpected or invalid packet:", p))
self.assertEqual(ip.dst, host.ip4)
self.assertEqual(tcp.sport, server_out_port)
self.assertEqual(tcp.dport, host_in_port)
+ self.check_tcp_checksum(p)
except:
self.logger.error(ppp("Unexpected or invalid packet:"), p)
raise
self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
self.assertEqual(packet[TCP].dport, server_tcp_port)
self.tcp_port_out = packet[TCP].sport
+ self.check_tcp_checksum(packet)
elif packet.haslayer(UDP):
self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
self.assertEqual(packet[UDP].dport, server_udp_port)
if packet.haslayer(TCP):
self.assertEqual(packet[TCP].dport, self.tcp_port_in)
self.assertEqual(packet[TCP].sport, server_tcp_port)
+ self.check_tcp_checksum(packet)
elif packet.haslayer(UDP):
self.assertEqual(packet[UDP].dport, self.udp_port_in)
self.assertEqual(packet[UDP].sport, server_udp_port)
self.assertEqual(packet[TCP].sport, self.tcp_port_in)
self.assertEqual(packet[TCP].dport, server_tcp_port)
self.tcp_port_out = packet[TCP].sport
+ self.check_tcp_checksum(packet)
elif packet.haslayer(UDP):
self.assertEqual(packet[UDP].sport, self.udp_port_in)
self.assertEqual(packet[UDP].dport, server_udp_port)
if packet.haslayer(TCP):
self.assertEqual(packet[TCP].dport, self.tcp_port_in)
self.assertEqual(packet[TCP].sport, server_tcp_port)
+ self.check_tcp_checksum(packet)
elif packet.haslayer(UDP):
self.assertEqual(packet[UDP].dport, self.udp_port_in)
self.assertEqual(packet[UDP].sport, server_udp_port)
capture = self.pg8.get_capture(len(pkts))
self.verify_capture_out(capture)
+ def test_static_unknown_proto(self):
+ """ 1:1 NAT translate packet with unknown protocol """
+ nat_ip = "10.0.0.10"
+ self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # in2out
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ GRE() /
+ IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
+ TCP(sport=1234, dport=1234))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg1.get_capture(1)
+ packet = p[0]
+ try:
+ self.assertEqual(packet[IP].src, nat_ip)
+ self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
+ self.assertTrue(packet.haslayer(GRE))
+ self.check_ip_checksum(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # out2in
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=nat_ip) /
+ GRE() /
+ IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
+ TCP(sport=1234, dport=1234))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg0.get_capture(1)
+ packet = p[0]
+ try:
+ self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
+ self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
+ self.assertTrue(packet.haslayer(GRE))
+ self.check_ip_checksum(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ def test_hairpinning_unknown_proto(self):
+ """ 1:1 NAT translate packet with unknown protocol - hairpinning """
+
+ host = self.pg0.remote_hosts[0]
+ server = self.pg0.remote_hosts[1]
+
+ host_nat_ip = "10.0.0.10"
+ server_nat_ip = "10.0.0.11"
+
+ self.snat_add_static_mapping(host.ip4, host_nat_ip)
+ self.snat_add_static_mapping(server.ip4, server_nat_ip)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # host to server
+ p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
+ IP(src=host.ip4, dst=server_nat_ip) /
+ GRE() /
+ IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
+ TCP(sport=1234, dport=1234))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg0.get_capture(1)
+ packet = p[0]
+ try:
+ self.assertEqual(packet[IP].src, host_nat_ip)
+ self.assertEqual(packet[IP].dst, server.ip4)
+ self.assertTrue(packet.haslayer(GRE))
+ self.check_ip_checksum(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server to host
+ p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
+ IP(src=server.ip4, dst=host_nat_ip) /
+ GRE() /
+ IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
+ TCP(sport=1234, dport=1234))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg0.get_capture(1)
+ packet = p[0]
+ try:
+ self.assertEqual(packet[IP].src, server_nat_ip)
+ self.assertEqual(packet[IP].dst, host.ip4)
+ self.assertTrue(packet.haslayer(GRE))
+ self.check_ip_checksum(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
def tearDown(self):
super(TestSNAT, self).tearDown()
if not self.vpp_dead:
self.logger.info(self.vapi.cli("show snat detail"))
self.clear_snat()
+
+class TestNAT64(MethodHolder):
+ """ NAT64 Test Cases """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT64, cls).setUpClass()
+
+ try:
+ cls.tcp_port_in = 6303
+ cls.tcp_port_out = 6303
+ cls.udp_port_in = 6304
+ cls.udp_port_out = 6304
+ cls.icmp_id_in = 6305
+ cls.icmp_id_out = 6305
+ cls.nat_addr = '10.0.0.3'
+ cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
+ cls.vrf1_id = 10
+ cls.vrf1_nat_addr = '10.0.10.3'
+ cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
+ cls.vrf1_nat_addr)
+
+ cls.create_pg_interfaces(range(3))
+ cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
+ cls.ip6_interfaces.append(cls.pg_interfaces[2])
+ cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
+
+ cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
+
+ cls.pg0.generate_remote_hosts(2)
+
+ for i in cls.ip6_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.configure_ipv6_neighbors()
+
+ for i in cls.ip4_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ except Exception:
+ super(TestNAT64, cls).tearDownClass()
+ raise
+
+ def test_pool(self):
+ """ Add/delete address to NAT64 pool """
+ nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
+
+ self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
+
+ addresses = self.vapi.nat64_pool_addr_dump()
+ self.assertEqual(len(addresses), 1)
+ self.assertEqual(addresses[0].address, nat_addr)
+
+ self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
+
+ addresses = self.vapi.nat64_pool_addr_dump()
+ self.assertEqual(len(addresses), 0)
+
+ def test_interface(self):
+ """ Enable/disable NAT64 feature on the interface """
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ interfaces = self.vapi.nat64_interface_dump()
+ self.assertEqual(len(interfaces), 2)
+ pg0_found = False
+ pg1_found = False
+ for intf in interfaces:
+ if intf.sw_if_index == self.pg0.sw_if_index:
+ self.assertEqual(intf.is_inside, 1)
+ pg0_found = True
+ elif intf.sw_if_index == self.pg1.sw_if_index:
+ self.assertEqual(intf.is_inside, 0)
+ pg1_found = True
+ self.assertTrue(pg0_found)
+ self.assertTrue(pg1_found)
+
+ features = self.vapi.cli("show interface features pg0")
+ self.assertNotEqual(features.find('nat64-in2out'), -1)
+ features = self.vapi.cli("show interface features pg1")
+ self.assertNotEqual(features.find('nat64-out2in'), -1)
+
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
+
+ interfaces = self.vapi.nat64_interface_dump()
+ self.assertEqual(len(interfaces), 0)
+
+ def test_static_bib(self):
+ """ Add/delete static BIB entry """
+ in_addr = socket.inet_pton(socket.AF_INET6,
+ '2001:db8:85a3::8a2e:370:7334')
+ out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
+ in_port = 1234
+ out_port = 5678
+ proto = IP_PROTOS.tcp
+
+ self.vapi.nat64_add_del_static_bib(in_addr,
+ out_addr,
+ in_port,
+ out_port,
+ proto)
+ bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+ static_bib_num = 0
+ for bibe in bib:
+ if bibe.is_static:
+ static_bib_num += 1
+ self.assertEqual(bibe.i_addr, in_addr)
+ self.assertEqual(bibe.o_addr, out_addr)
+ self.assertEqual(bibe.i_port, in_port)
+ self.assertEqual(bibe.o_port, out_port)
+ self.assertEqual(static_bib_num, 1)
+
+ self.vapi.nat64_add_del_static_bib(in_addr,
+ out_addr,
+ in_port,
+ out_port,
+ proto,
+ is_add=0)
+ bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+ static_bib_num = 0
+ for bibe in bib:
+ if bibe.is_static:
+ static_bib_num += 1
+ self.assertEqual(static_bib_num, 0)
+
+ def test_set_timeouts(self):
+ """ Set NAT64 timeouts """
+ # verify default values
+ timeouts = self.vapi.nat64_get_timeouts()
+ self.assertEqual(timeouts.udp, 300)
+ self.assertEqual(timeouts.icmp, 60)
+ self.assertEqual(timeouts.tcp_trans, 240)
+ self.assertEqual(timeouts.tcp_est, 7440)
+ self.assertEqual(timeouts.tcp_incoming_syn, 6)
+
+ # set and verify custom values
+ self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
+ tcp_est=7450, tcp_incoming_syn=10)
+ timeouts = self.vapi.nat64_get_timeouts()
+ self.assertEqual(timeouts.udp, 200)
+ self.assertEqual(timeouts.icmp, 30)
+ self.assertEqual(timeouts.tcp_trans, 250)
+ self.assertEqual(timeouts.tcp_est, 7450)
+ self.assertEqual(timeouts.tcp_incoming_syn, 10)
+
+ def test_dynamic(self):
+ """ NAT64 dynamic translation test """
+ self.tcp_port_in = 6303
+ self.udp_port_in = 6304
+ self.icmp_id_in = 6305
+
+ ses_num_start = self.nat64_get_ses_num()
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ # in2out
+ pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+ self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+ # in2out
+ pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+ ses_num_end = self.nat64_get_ses_num()
+
+ self.assertEqual(ses_num_end - ses_num_start, 3)
+
+ # tenant with specific VRF
+ self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
+ self.vrf1_nat_addr_n,
+ vrf_id=self.vrf1_id)
+ self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
+
+ pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
+ self.pg2.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
+
+ def test_static(self):
+ """ NAT64 static translation test """
+ self.tcp_port_in = 60303
+ self.udp_port_in = 60304
+ self.icmp_id_in = 60305
+ self.tcp_port_out = 60303
+ self.udp_port_out = 60304
+ self.icmp_id_out = 60305
+
+ ses_num_start = self.nat64_get_ses_num()
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+ self.nat_addr_n,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ IP_PROTOS.tcp)
+ self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+ self.nat_addr_n,
+ self.udp_port_in,
+ self.udp_port_out,
+ IP_PROTOS.udp)
+ self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+ self.nat_addr_n,
+ self.icmp_id_in,
+ self.icmp_id_out,
+ IP_PROTOS.icmp)
+
+ # in2out
+ pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.nat_addr,
+ dst_ip=self.pg1.remote_ip4, same_port=True)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+ self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+ ses_num_end = self.nat64_get_ses_num()
+
+ self.assertEqual(ses_num_end - ses_num_start, 3)
+
+ @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ def test_session_timeout(self):
+ """ NAT64 session timeout """
+ self.icmp_id_in = 1234
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+ self.vapi.nat64_set_timeouts(icmp=5)
+
+ pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+
+ ses_num_before_timeout = self.nat64_get_ses_num()
+
+ sleep(15)
+
+ # ICMP session after timeout
+ ses_num_after_timeout = self.nat64_get_ses_num()
+ self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
+
+ def test_icmp_error(self):
+ """ NAT64 ICMP Error message translation """
+ self.tcp_port_in = 6303
+ self.udp_port_in = 6304
+ self.icmp_id_in = 6305
+
+ ses_num_start = self.nat64_get_ses_num()
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ # send some packets to create sessions
+ pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture_ip4 = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture_ip4,
+ nat_ip=self.nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture_ip6 = self.pg0.get_capture(len(pkts))
+ ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+ self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
+ self.pg0.remote_ip6)
+
+ # in2out
+ pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
+ ICMPv6DestUnreach(code=1) /
+ packet[IPv6] for packet in capture_ip6]
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, self.nat_addr)
+ self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
+ self.assertEqual(packet[ICMP].type, 3)
+ self.assertEqual(packet[ICMP].code, 13)
+ inner = packet[IPerror]
+ self.assertEqual(inner.src, self.pg1.remote_ip4)
+ self.assertEqual(inner.dst, self.nat_addr)
+ self.check_icmp_checksum(packet)
+ if inner.haslayer(TCPerror):
+ self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
+ elif inner.haslayer(UDPerror):
+ self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
+ else:
+ self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # out2in
+ pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ ICMP(type=3, code=13) /
+ packet[IP] for packet in capture_ip4]
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IPv6].src, ip.src)
+ self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
+ icmp = packet[ICMPv6DestUnreach]
+ self.assertEqual(icmp.code, 1)
+ inner = icmp[IPerror6]
+ self.assertEqual(inner.src, self.pg0.remote_ip6)
+ self.assertEqual(inner.dst, ip.src)
+ self.check_icmpv6_checksum(packet)
+ if inner.haslayer(TCPerror):
+ self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
+ elif inner.haslayer(UDPerror):
+ self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
+ else:
+ self.assertEqual(inner[ICMPv6EchoRequest].id,
+ self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ def test_hairpinning(self):
+ """ NAT64 hairpinning """
+
+ client = self.pg0.remote_hosts[0]
+ server = self.pg0.remote_hosts[1]
+ server_tcp_in_port = 22
+ server_tcp_out_port = 4022
+ server_udp_in_port = 23
+ server_udp_out_port = 4023
+ client_tcp_in_port = 1234
+ client_udp_in_port = 1235
+ client_tcp_out_port = 0
+ client_udp_out_port = 0
+ ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
+ nat_addr_ip6 = ip.src
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ self.vapi.nat64_add_del_static_bib(server.ip6n,
+ self.nat_addr_n,
+ server_tcp_in_port,
+ server_tcp_out_port,
+ IP_PROTOS.tcp)
+ self.vapi.nat64_add_del_static_bib(server.ip6n,
+ self.nat_addr_n,
+ server_udp_in_port,
+ server_udp_out_port,
+ IP_PROTOS.udp)
+
+ # client to server
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=client.ip6, dst=nat_addr_ip6) /
+ TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=client.ip6, dst=nat_addr_ip6) /
+ UDP(sport=client_udp_in_port, dport=server_udp_out_port))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IPv6].src, nat_addr_ip6)
+ self.assertEqual(packet[IPv6].dst, server.ip6)
+ if packet.haslayer(TCP):
+ self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
+ self.assertEqual(packet[TCP].dport, server_tcp_in_port)
+ self.check_tcp_checksum(packet)
+ client_tcp_out_port = packet[TCP].sport
+ else:
+ self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
+ self.assertEqual(packet[UDP].dport, server_udp_in_port)
+ self.check_udp_checksum(packet)
+ client_udp_out_port = packet[UDP].sport
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server to client
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=server.ip6, dst=nat_addr_ip6) /
+ TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=server.ip6, dst=nat_addr_ip6) /
+ UDP(sport=server_udp_in_port, dport=client_udp_out_port))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IPv6].src, nat_addr_ip6)
+ self.assertEqual(packet[IPv6].dst, client.ip6)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].sport, server_tcp_out_port)
+ self.assertEqual(packet[TCP].dport, client_tcp_in_port)
+ self.check_tcp_checksum(packet)
+ else:
+ self.assertEqual(packet[UDP].sport, server_udp_out_port)
+ self.assertEqual(packet[UDP].dport, client_udp_in_port)
+ self.check_udp_checksum(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # ICMP error
+ pkts = []
+ pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=client.ip6, dst=nat_addr_ip6) /
+ ICMPv6DestUnreach(code=1) /
+ packet[IPv6] for packet in capture]
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IPv6].src, nat_addr_ip6)
+ self.assertEqual(packet[IPv6].dst, server.ip6)
+ icmp = packet[ICMPv6DestUnreach]
+ self.assertEqual(icmp.code, 1)
+ inner = icmp[IPerror6]
+ self.assertEqual(inner.src, server.ip6)
+ self.assertEqual(inner.dst, nat_addr_ip6)
+ self.check_icmpv6_checksum(packet)
+ if inner.haslayer(TCPerror):
+ self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
+ self.assertEqual(inner[TCPerror].dport,
+ client_tcp_out_port)
+ else:
+ self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
+ self.assertEqual(inner[UDPerror].dport,
+ client_udp_out_port)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ def test_prefix(self):
+ """ NAT64 Network-Specific Prefix """
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+ self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
+ self.vrf1_nat_addr_n,
+ vrf_id=self.vrf1_id)
+ self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
+
+ # Add global prefix
+ global_pref64 = "2001:db8::"
+ global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
+ global_pref64_len = 32
+ self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
+
+ prefix = self.vapi.nat64_prefix_dump()
+ self.assertEqual(len(prefix), 1)
+ self.assertEqual(prefix[0].prefix, global_pref64_n)
+ self.assertEqual(prefix[0].prefix_len, global_pref64_len)
+ self.assertEqual(prefix[0].vrf_id, 0)
+
+ # Add tenant specific prefix
+ vrf1_pref64 = "2001:db8:122:300::"
+ vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
+ vrf1_pref64_len = 56
+ self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
+ vrf1_pref64_len,
+ vrf_id=self.vrf1_id)
+ prefix = self.vapi.nat64_prefix_dump()
+ self.assertEqual(len(prefix), 2)
+
+ # Global prefix
+ pkts = self.create_stream_in_ip6(self.pg0,
+ self.pg1,
+ pref=global_pref64,
+ plen=global_pref64_len)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ dst_ip = self.compose_ip6(self.pg1.remote_ip4,
+ global_pref64,
+ global_pref64_len)
+ self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
+
+ # Tenant specific prefix
+ pkts = self.create_stream_in_ip6(self.pg2,
+ self.pg1,
+ pref=vrf1_pref64,
+ plen=vrf1_pref64_len)
+ self.pg2.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ dst_ip = self.compose_ip6(self.pg1.remote_ip4,
+ vrf1_pref64,
+ vrf1_pref64_len)
+ self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
+
+ def nat64_get_ses_num(self):
+ """
+ Return number of active NAT64 sessions.
+ """
+ ses_num = 0
+ st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
+ ses_num += len(st)
+ st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
+ ses_num += len(st)
+ st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
+ ses_num += len(st)
+ return ses_num
+
+ def clear_nat64(self):
+ """
+ Clear NAT64 configuration.
+ """
+ self.vapi.nat64_set_timeouts()
+
+ interfaces = self.vapi.nat64_interface_dump()
+ for intf in interfaces:
+ self.vapi.nat64_add_del_interface(intf.sw_if_index,
+ intf.is_inside,
+ is_add=0)
+
+ bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+ for bibe in bib:
+ if bibe.is_static:
+ self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+ bibe.o_addr,
+ bibe.i_port,
+ bibe.o_port,
+ bibe.proto,
+ bibe.vrf_id,
+ is_add=0)
+
+ bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
+ for bibe in bib:
+ if bibe.is_static:
+ self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+ bibe.o_addr,
+ bibe.i_port,
+ bibe.o_port,
+ bibe.proto,
+ bibe.vrf_id,
+ is_add=0)
+
+ bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
+ for bibe in bib:
+ if bibe.is_static:
+ self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+ bibe.o_addr,
+ bibe.i_port,
+ bibe.o_port,
+ bibe.proto,
+ bibe.vrf_id,
+ is_add=0)
+
+ adresses = self.vapi.nat64_pool_addr_dump()
+ for addr in adresses:
+ self.vapi.nat64_add_del_pool_addr_range(addr.address,
+ addr.address,
+ vrf_id=addr.vrf_id,
+ is_add=0)
+
+ prefixes = self.vapi.nat64_prefix_dump()
+ for prefix in prefixes:
+ self.vapi.nat64_add_del_prefix(prefix.prefix,
+ prefix.prefix_len,
+ vrf_id=prefix.vrf_id,
+ is_add=0)
+
+ def tearDown(self):
+ super(TestNAT64, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show nat64 pool"))
+ self.logger.info(self.vapi.cli("show nat64 interfaces"))
+ self.logger.info(self.vapi.cli("show nat64 prefix"))
+ self.logger.info(self.vapi.cli("show nat64 bib tcp"))
+ self.logger.info(self.vapi.cli("show nat64 bib udp"))
+ self.logger.info(self.vapi.cli("show nat64 bib icmp"))
+ self.logger.info(self.vapi.cli("show nat64 session table tcp"))
+ self.logger.info(self.vapi.cli("show nat64 session table udp"))
+ self.logger.info(self.vapi.cli("show nat64 session table icmp"))
+ self.clear_nat64()
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)