import unittest
import struct
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase, VppTestRunner, running_extended_tests
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.layers.l2 import Ether, ARP
from scapy.data import IP_PROTOS
+from scapy.packet import bind_layers
from util import ppp
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
+from time import sleep
-class TestSNAT(VppTestCase):
- """ SNAT Test Cases """
+class MethodHolder(VppTestCase):
+ """ SNAT create capture and verify method holder """
@classmethod
def setUpClass(cls):
- super(TestSNAT, cls).setUpClass()
-
- try:
- cls.tcp_port_in = 6303
- cls.tcp_port_out = 6303
- cls.udp_port_in = 6304
- cls.udp_port_out = 6304
- cls.icmp_id_in = 6305
- cls.icmp_id_out = 6305
- cls.snat_addr = '10.0.0.3'
-
- cls.create_pg_interfaces(range(8))
- cls.interfaces = list(cls.pg_interfaces[0:4])
-
- for i in cls.interfaces:
- i.admin_up()
- i.config_ip4()
- i.resolve_arp()
-
- cls.pg0.generate_remote_hosts(2)
- cls.pg0.configure_ipv4_neighbors()
-
- cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
-
- cls.pg4._local_ip4 = "172.16.255.1"
- cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
- cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
- cls.pg4.set_table_ip4(10)
- cls.pg5._local_ip4 = "172.16.255.3"
- cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
- cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
- cls.pg5.set_table_ip4(10)
- cls.pg6._local_ip4 = "172.16.255.1"
- cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
- cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
- cls.pg6.set_table_ip4(20)
- for i in cls.overlapping_interfaces:
- i.config_ip4()
- i.admin_up()
- i.resolve_arp()
+ super(MethodHolder, cls).setUpClass()
- cls.pg7.admin_up()
-
- except Exception:
- super(TestSNAT, cls).tearDownClass()
- raise
+ def tearDown(self):
+ super(MethodHolder, self).tearDown()
def create_stream_in(self, in_if, out_if, ttl=64):
"""
# TCP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
- TCP(sport=self.tcp_port_in))
+ TCP(sport=self.tcp_port_in, dport=20))
pkts.append(p)
# UDP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
- UDP(sport=self.udp_port_in))
+ UDP(sport=self.udp_port_in, dport=20))
pkts.append(p)
# ICMP
return pkts
+ def create_stream_in_ip6(self, in_if, out_if, hlim=64):
+ """
+ Create IPv6 packet stream for inside network
+
+ :param in_if: Inside interface
+ :param out_if: Outside interface
+ :param ttl: Hop Limit of generated packets
+ """
+ pkts = []
+ dst = ''.join(['64:ff9b::', out_if.remote_ip4])
+ # TCP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+ TCP(sport=self.tcp_port_in, dport=20))
+ pkts.append(p)
+
+ # UDP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+ UDP(sport=self.udp_port_in, dport=20))
+ pkts.append(p)
+
+ # ICMP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
+ ICMPv6EchoRequest(id=self.icmp_id_in))
+ pkts.append(p)
+
+ return pkts
+
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
"""
Create packet stream for outside network
# TCP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- TCP(dport=self.tcp_port_out))
+ TCP(dport=self.tcp_port_out, sport=20))
pkts.append(p)
# UDP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- UDP(dport=self.udp_port_out))
+ UDP(dport=self.udp_port_out, sport=20))
pkts.append(p)
# ICMP
return pkts
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
- packet_num=3):
+ packet_num=3, dst_ip=None):
"""
Verify captured packets on outside network
:param nat_ip: Translated IP address (Default use global SNAT address)
:param same_port: Sorce port number is not translated (Default False)
:param packet_num: Expected number of packets (Default 3)
+ :param dst_ip: Destination IP address (Default do not verify)
"""
if nat_ip is None:
nat_ip = self.snat_addr
for packet in capture:
try:
self.assertEqual(packet[IP].src, nat_ip)
+ if dst_ip is not None:
+ self.assertEqual(packet[IP].dst, dst_ip)
if packet.haslayer(TCP):
if same_port:
self.assertEqual(packet[TCP].sport, self.tcp_port_in)
"(inside network):", packet))
raise
+ def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
+ """
+ Verify captured IPv6 packets on inside network
+
+ :param capture: Captured packets
+ :param src_ip: Source IP
+ :param dst_ip: Destination IP address
+ :param packet_num: Expected number of packets (Default 3)
+ """
+ self.assertEqual(packet_num, len(capture))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IPv6].src, src_ip)
+ self.assertEqual(packet[IPv6].dst, dst_ip)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].dport, self.udp_port_in)
+ else:
+ self.assertEqual(packet[ICMPv6EchoReply].id,
+ self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(inside network):", packet))
+ raise
+
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
"""
Verify captured packet that don't have to be translated
# natPoolID
self.assertEqual(struct.pack("!I", 0), record[283])
+
+class TestSNAT(MethodHolder):
+ """ SNAT Test Cases """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSNAT, cls).setUpClass()
+
+ try:
+ cls.tcp_port_in = 6303
+ cls.tcp_port_out = 6303
+ cls.udp_port_in = 6304
+ cls.udp_port_out = 6304
+ cls.icmp_id_in = 6305
+ cls.icmp_id_out = 6305
+ cls.snat_addr = '10.0.0.3'
+ cls.ipfix_src_port = 4739
+ cls.ipfix_domain_id = 1
+
+ cls.create_pg_interfaces(range(9))
+ cls.interfaces = list(cls.pg_interfaces[0:4])
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ cls.pg0.generate_remote_hosts(3)
+ cls.pg0.configure_ipv4_neighbors()
+
+ cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
+
+ cls.pg4._local_ip4 = "172.16.255.1"
+ cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
+ cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
+ cls.pg4.set_table_ip4(10)
+ cls.pg5._local_ip4 = "172.16.255.3"
+ cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
+ cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
+ cls.pg5.set_table_ip4(10)
+ cls.pg6._local_ip4 = "172.16.255.1"
+ cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
+ cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
+ cls.pg6.set_table_ip4(20)
+ for i in cls.overlapping_interfaces:
+ i.config_ip4()
+ i.admin_up()
+ i.resolve_arp()
+
+ cls.pg7.admin_up()
+ cls.pg8.admin_up()
+
+ except Exception:
+ super(TestSNAT, cls).tearDownClass()
+ raise
+
def clear_snat(self):
"""
Clear SNAT configuration.
"""
+ # I found no elegant way to do this
+ self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg7.remote_ip4n,
+ next_hop_sw_if_index=self.pg7.sw_if_index,
+ is_add=0)
+ self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg8.remote_ip4n,
+ next_hop_sw_if_index=self.pg8.sw_if_index,
+ is_add=0)
+
+ for intf in [self.pg7, self.pg8]:
+ neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
+ for n in neighbors:
+ self.vapi.ip_neighbor_add_del(intf.sw_if_index,
+ n.mac_address,
+ n.ip_address,
+ is_add=0)
+
if self.pg7.has_ip4_config:
self.pg7.unconfig_ip4()
for intf in interfaces:
self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
- self.vapi.snat_ipfix(enable=0)
+ self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
+ domain_id=self.ipfix_domain_id)
+ self.ipfix_src_port = 4739
+ self.ipfix_domain_id = 1
interfaces = self.vapi.snat_interface_dump()
for intf in interfaces:
proto,
is_add)
- def snat_add_address(self, ip, is_add=1):
+ def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
"""
Add/delete S-NAT address
:param is_add: 1 if add, 0 if delete (Default add)
"""
snat_addr = socket.inet_pton(socket.AF_INET, ip)
- self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add)
+ self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
+ vrf_id=vrf_id)
def test_dynamic(self):
""" SNAT dynamic translation test """
self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
- """ Ping SNAT out interface from outside """
+ """ Ping SNAT out interface from outside network """
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
"(outside network):", packet))
raise
+ def test_ping_internal_host_from_outside(self):
+ """ Ping internal host from outside network """
+
+ self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # out2in
+ pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
+ ICMP(id=self.icmp_id_out, type='echo-request'))
+ self.pg1.add_stream(pkt)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_capture_in(capture, self.pg0, packet_num=1)
+ self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
+
+ # in2out
+ pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
+ ICMP(id=self.icmp_id_in, type='echo-reply'))
+ self.pg0.add_stream(pkt)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ self.verify_capture_out(capture, same_port=True, packet_num=1)
+ self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
+
def test_static_in(self):
""" SNAT 1:1 NAT initialized from inside network """
capture = self.pg5.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg5)
+ # pg5 session dump
+ addresses = self.vapi.snat_address_dump()
+ self.assertEqual(len(addresses), 1)
+ sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
+ self.assertEqual(len(sessions), 3)
+ for session in sessions:
+ self.assertFalse(session.is_static)
+ self.assertEqual(session.inside_ip_address[0:4],
+ self.pg5.remote_ip4n)
+ self.assertEqual(session.outside_ip_address,
+ addresses[0].ip_address)
+ self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
+ self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
+ self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
+ self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
+ self.assertEqual(sessions[1].inside_port, self.udp_port_in)
+ self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
+ self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
+ self.assertEqual(sessions[1].outside_port, self.udp_port_out)
+ self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
+
# in2out 3rd interface
pkts = self.create_stream_in(self.pg6, self.pg3)
self.pg6.add_stream(pkts)
capture = self.pg6.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg6)
+ # general user and session dump verifications
+ users = self.vapi.snat_user_dump()
+ self.assertTrue(len(users) >= 3)
+ addresses = self.vapi.snat_address_dump()
+ self.assertEqual(len(addresses), 1)
+ for user in users:
+ sessions = self.vapi.snat_user_session_dump(user.ip_address,
+ user.vrf_id)
+ for session in sessions:
+ self.assertEqual(user.ip_address, session.inside_ip_address)
+ self.assertTrue(session.total_bytes > session.total_pkts > 0)
+ self.assertTrue(session.protocol in
+ [IP_PROTOS.tcp, IP_PROTOS.udp,
+ IP_PROTOS.icmp])
+
+ # pg4 session dump
+ sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
+ self.assertTrue(len(sessions) >= 4)
+ for session in sessions:
+ self.assertFalse(session.is_static)
+ self.assertEqual(session.inside_ip_address[0:4],
+ self.pg4.remote_ip4n)
+ self.assertEqual(session.outside_ip_address,
+ addresses[0].ip_address)
+
+ # pg6 session dump
+ sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
+ self.assertTrue(len(sessions) >= 3)
+ for session in sessions:
+ self.assertTrue(session.is_static)
+ self.assertEqual(session.inside_ip_address[0:4],
+ self.pg6.remote_ip4n)
+ self.assertEqual(map(ord, session.outside_ip_address[0:4]),
+ map(int, static_nat_ip.split('.')))
+ self.assertTrue(session.inside_port in
+ [self.tcp_port_in, self.udp_port_in,
+ self.icmp_id_in])
+
def test_hairpinning(self):
- """ SNAT hairpinning """
+ """ SNAT hairpinning - 1:1 NAT with port"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
self.logger.error(ppp("Unexpected or invalid packet:"), p)
raise
+ def test_hairpinning2(self):
+ """ SNAT hairpinning - 1:1 NAT"""
+
+ server1_nat_ip = "10.0.0.10"
+ server2_nat_ip = "10.0.0.11"
+ host = self.pg0.remote_hosts[0]
+ server1 = self.pg0.remote_hosts[1]
+ server2 = self.pg0.remote_hosts[2]
+ server_tcp_port = 22
+ server_udp_port = 20
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # add static mapping for servers
+ self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
+ self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
+
+ # host to server1
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=host.ip4, dst=server1_nat_ip) /
+ TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=host.ip4, dst=server1_nat_ip) /
+ UDP(sport=self.udp_port_in, dport=server_udp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=host.ip4, dst=server1_nat_ip) /
+ ICMP(id=self.icmp_id_in, type='echo-request'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, self.snat_addr)
+ self.assertEqual(packet[IP].dst, server1.ip4)
+ if packet.haslayer(TCP):
+ self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].dport, server_tcp_port)
+ self.tcp_port_out = packet[TCP].sport
+ elif packet.haslayer(UDP):
+ self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
+ self.assertEqual(packet[UDP].dport, server_udp_port)
+ self.udp_port_out = packet[UDP].sport
+ else:
+ self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
+ self.icmp_id_out = packet[ICMP].id
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server1 to host
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=self.snat_addr) /
+ TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=self.snat_addr) /
+ UDP(sport=server_udp_port, dport=self.udp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=self.snat_addr) /
+ ICMP(id=self.icmp_id_out, type='echo-reply'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, server1_nat_ip)
+ self.assertEqual(packet[IP].dst, host.ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].sport, server_tcp_port)
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].dport, self.udp_port_in)
+ self.assertEqual(packet[UDP].sport, server_udp_port)
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server2 to server1
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server2.ip4, dst=server1_nat_ip) /
+ TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server2.ip4, dst=server1_nat_ip) /
+ UDP(sport=self.udp_port_in, dport=server_udp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server2.ip4, dst=server1_nat_ip) /
+ ICMP(id=self.icmp_id_in, type='echo-request'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, server2_nat_ip)
+ self.assertEqual(packet[IP].dst, server1.ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].sport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].dport, server_tcp_port)
+ self.tcp_port_out = packet[TCP].sport
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].sport, self.udp_port_in)
+ self.assertEqual(packet[UDP].dport, server_udp_port)
+ self.udp_port_out = packet[UDP].sport
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ self.icmp_id_out = packet[ICMP].id
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server1 to server2
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=server2_nat_ip) /
+ TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=server2_nat_ip) /
+ UDP(sport=server_udp_port, dport=self.udp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=server2_nat_ip) /
+ ICMP(id=self.icmp_id_out, type='echo-reply'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, server1_nat_ip)
+ self.assertEqual(packet[IP].dst, server2.ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].sport, server_tcp_port)
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].dport, self.udp_port_in)
+ self.assertEqual(packet[UDP].sport, server_udp_port)
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
def test_max_translations_per_user(self):
""" MAX translations per user - recycle the least recently used """
def test_ipfix_nat44_sess(self):
""" S-NAT IPFIX logging NAT44 session created/delted """
+ self.ipfix_domain_id = 10
+ self.ipfix_src_port = 20202
+ colector_port = 30303
+ bind_layers(UDP, IPFIX, dport=30303)
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
src_address=self.pg3.local_ip4n,
path_mtu=512,
- template_interval=10)
- self.vapi.snat_ipfix()
+ template_interval=10,
+ collector_port=colector_port)
+ self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
# first load template
for p in capture:
self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, colector_port)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
src_address=self.pg3.local_ip4n,
path_mtu=512,
template_interval=10)
- self.vapi.snat_ipfix()
+ self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
# first load template
for p in capture:
self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
self.pg_start()
capture = self.pg1.get_capture(0)
- def tearDown(self):
- super(TestSNAT, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show snat verbose"))
- self.clear_snat()
+ def test_vrf_mode(self):
+ """ S-NAT tenant VRF aware address pool mode """
+
+ vrf_id1 = 1
+ vrf_id2 = 2
+ nat_ip1 = "10.0.0.10"
+ nat_ip2 = "10.0.0.11"
+
+ self.pg0.unconfig_ip4()
+ self.pg1.unconfig_ip4()
+ self.pg0.set_table_ip4(vrf_id1)
+ self.pg1.set_table_ip4(vrf_id2)
+ self.pg0.config_ip4()
+ self.pg1.config_ip4()
+ self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
+ self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
+ is_inside=0)
+
+ # first VRF
+ pkts = self.create_stream_in(self.pg0, self.pg2)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip1)
+
+ # second VRF
+ pkts = self.create_stream_in(self.pg1, self.pg2)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip2)
+
+ def test_vrf_feature_independent(self):
+ """ S-NAT tenant VRF independent address pool mode """
+
+ nat_ip1 = "10.0.0.10"
+ nat_ip2 = "10.0.0.11"
+
+ self.snat_add_address(nat_ip1)
+ self.snat_add_address(nat_ip2)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
+ is_inside=0)
+
+ # first VRF
+ pkts = self.create_stream_in(self.pg0, self.pg2)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip1)
+
+ # second VRF
+ pkts = self.create_stream_in(self.pg1, self.pg2)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip1)
+
+ def test_dynamic_ipless_interfaces(self):
+ """ SNAT interfaces without configured ip dynamic map """
+
+ self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4n,
+ is_static=1)
+ self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4n,
+ is_static=1)
+
+ self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg7.remote_ip4n,
+ next_hop_sw_if_index=self.pg7.sw_if_index)
+ self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg8.remote_ip4n,
+ next_hop_sw_if_index=self.pg8.sw_if_index)
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+ is_inside=0)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg7, self.pg8)
+ self.pg7.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg8.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg8, self.snat_addr)
+ self.pg8.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg7.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg7)
+
+ def test_static_ipless_interfaces(self):
+ """ SNAT 1:1 NAT interfaces without configured ip """
+
+ self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4n,
+ is_static=1)
+ self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4n,
+ is_static=1)
+
+ self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg7.remote_ip4n,
+ next_hop_sw_if_index=self.pg7.sw_if_index)
+ self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg8.remote_ip4n,
+ next_hop_sw_if_index=self.pg8.sw_if_index)
+
+ self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+ is_inside=0)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg8)
+ self.pg8.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg7.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg7)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg7, self.pg8)
+ self.pg7.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg8.get_capture(len(pkts))
+ self.verify_capture_out(capture, self.snat_addr, True)
+
+ def test_static_with_port_ipless_interfaces(self):
+ """ SNAT 1:1 NAT with port interfaces without configured ip """
+
+ self.tcp_port_out = 30606
+ self.udp_port_out = 30607
+ self.icmp_id_out = 30608
+
+ self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4n,
+ is_static=1)
+ self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4n,
+ is_static=1)
+
+ self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg7.remote_ip4n,
+ next_hop_sw_if_index=self.pg7.sw_if_index)
+ self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg8.remote_ip4n,
+ next_hop_sw_if_index=self.pg8.sw_if_index)
+
+ self.snat_add_address(self.snat_addr)
+ self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+ self.tcp_port_in, self.tcp_port_out,
+ proto=IP_PROTOS.tcp)
+ self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+ self.udp_port_in, self.udp_port_out,
+ proto=IP_PROTOS.udp)
+ self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+ self.icmp_id_in, self.icmp_id_out,
+ proto=IP_PROTOS.icmp)
+ self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+ is_inside=0)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg8)
+ self.pg8.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg7.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg7)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg7, self.pg8)
+ self.pg7.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg8.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+
+ def tearDown(self):
+ super(TestSNAT, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show snat verbose"))
+ self.clear_snat()
+
+
+class TestDeterministicNAT(MethodHolder):
+ """ Deterministic NAT Test Cases """
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestDeterministicNAT, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestDeterministicNAT, cls).setUpClass()
+
+ try:
+ cls.tcp_port_in = 6303
+ cls.tcp_external_port = 6303
+ cls.udp_port_in = 6304
+ cls.udp_external_port = 6304
+ cls.icmp_id_in = 6305
+ cls.snat_addr = '10.0.0.3'
+
+ cls.create_pg_interfaces(range(3))
+ cls.interfaces = list(cls.pg_interfaces)
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ cls.pg0.generate_remote_hosts(2)
+ cls.pg0.configure_ipv4_neighbors()
+
+ except Exception:
+ super(TestDeterministicNAT, cls).tearDownClass()
+ raise
+
+ def create_stream_in(self, in_if, out_if, ttl=64):
+ """
+ Create packet stream for inside network
+
+ :param in_if: Inside interface
+ :param out_if: Outside interface
+ :param ttl: TTL of generated packets
+ """
+ pkts = []
+ # TCP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
+ pkts.append(p)
+
+ # UDP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
+ UDP(sport=self.udp_port_in, dport=self.udp_external_port))
+ pkts.append(p)
+
+ # ICMP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
+ ICMP(id=self.icmp_id_in, type='echo-request'))
+ pkts.append(p)
+
+ return pkts
+
+ def create_stream_out(self, out_if, dst_ip=None, ttl=64):
+ """
+ Create packet stream for outside network
+
+ :param out_if: Outside interface
+ :param dst_ip: Destination IP address (Default use global SNAT address)
+ :param ttl: TTL of generated packets
+ """
+ if dst_ip is None:
+ dst_ip = self.snat_addr
+ pkts = []
+ # TCP
+ p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+ IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
+ TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
+ pkts.append(p)
+
+ # UDP
+ p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+ IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
+ UDP(dport=self.udp_port_out, sport=self.udp_external_port))
+ pkts.append(p)
+
+ # ICMP
+ p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+ IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
+ ICMP(id=self.icmp_external_id, type='echo-reply'))
+ pkts.append(p)
+
+ return pkts
+
+ def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
+ """
+ Verify captured packets on outside network
+
+ :param capture: Captured packets
+ :param nat_ip: Translated IP address (Default use global SNAT address)
+ :param same_port: Sorce port number is not translated (Default False)
+ :param packet_num: Expected number of packets (Default 3)
+ """
+ if nat_ip is None:
+ nat_ip = self.snat_addr
+ self.assertEqual(packet_num, len(capture))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, nat_ip)
+ if packet.haslayer(TCP):
+ self.tcp_port_out = packet[TCP].sport
+ elif packet.haslayer(UDP):
+ self.udp_port_out = packet[UDP].sport
+ else:
+ self.icmp_external_id = packet[ICMP].id
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(outside network):", packet))
+ raise
+
+ def initiate_tcp_session(self, in_if, out_if):
+ """
+ Initiates TCP session
+
+ :param in_if: Inside interface
+ :param out_if: Outside interface
+ """
+ try:
+ # SYN packet in->out
+ p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="S"))
+ in_if.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = out_if.get_capture(1)
+ p = capture[0]
+ self.tcp_port_out = p[TCP].sport
+
+ # SYN + ACK packet out->in
+ p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
+ IP(src=out_if.remote_ip4, dst=self.snat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="SA"))
+ out_if.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ in_if.get_capture(1)
+
+ # ACK packet in->out
+ p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A"))
+ in_if.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ out_if.get_capture(1)
+
+ except:
+ self.logger.error("TCP 3 way handshake failed")
+ raise
+
+ def verify_ipfix_max_entries_per_user(self, data):
+ """
+ Verify IPFIX maximum entries per user exceeded event
+
+ :param data: Decoded IPFIX data records
+ """
+ self.assertEqual(1, len(data))
+ record = data[0]
+ # natEvent
+ self.assertEqual(ord(record[230]), 13)
+ # natQuotaExceededEvent
+ self.assertEqual('\x03\x00\x00\x00', record[466])
+ # sourceIPv4Address
+ self.assertEqual(self.pg0.remote_ip4n, record[8])
+
+ def test_deterministic_mode(self):
+ """ S-NAT run deterministic mode """
+ in_addr = '172.16.255.0'
+ out_addr = '172.17.255.50'
+ in_addr_t = '172.16.255.20'
+ in_addr_n = socket.inet_aton(in_addr)
+ out_addr_n = socket.inet_aton(out_addr)
+ in_addr_t_n = socket.inet_aton(in_addr_t)
+ in_plen = 24
+ out_plen = 32
+
+ snat_config = self.vapi.snat_show_config()
+ self.assertEqual(1, snat_config.deterministic)
+
+ self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
+
+ rep1 = self.vapi.snat_det_forward(in_addr_t_n)
+ self.assertEqual(rep1.out_addr[:4], out_addr_n)
+ rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
+ self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
+
+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ self.assertEqual(len(deterministic_mappings), 1)
+ dsm = deterministic_mappings[0]
+ self.assertEqual(in_addr_n, dsm.in_addr[:4])
+ self.assertEqual(in_plen, dsm.in_plen)
+ self.assertEqual(out_addr_n, dsm.out_addr[:4])
+ self.assertEqual(out_plen, dsm.out_plen)
+
+ self.clear_snat()
+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ self.assertEqual(len(deterministic_mappings), 0)
+
+ def test_set_timeouts(self):
+ """ Set deterministic NAT timeouts """
+ timeouts_before = self.vapi.snat_det_get_timeouts()
+
+ self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
+ timeouts_before.tcp_established + 10,
+ timeouts_before.tcp_transitory + 10,
+ timeouts_before.icmp + 10)
+
+ timeouts_after = self.vapi.snat_det_get_timeouts()
+
+ self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
+ self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
+ self.assertNotEqual(timeouts_before.tcp_established,
+ timeouts_after.tcp_established)
+ self.assertNotEqual(timeouts_before.tcp_transitory,
+ timeouts_after.tcp_transitory)
+
+ def test_det_in(self):
+ """ CGNAT translation test (TCP, UDP, ICMP) """
+
+ nat_ip = "10.0.0.10"
+
+ self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
+ 32,
+ socket.inet_aton(nat_ip),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, nat_ip)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ # session dump test
+ sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
+ self.assertEqual(len(sessions), 3)
+
+ # TCP session
+ s = sessions[0]
+ self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+ self.assertEqual(s.in_port, self.tcp_port_in)
+ self.assertEqual(s.out_port, self.tcp_port_out)
+ self.assertEqual(s.ext_port, self.tcp_external_port)
+
+ # UDP session
+ s = sessions[1]
+ self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+ self.assertEqual(s.in_port, self.udp_port_in)
+ self.assertEqual(s.out_port, self.udp_port_out)
+ self.assertEqual(s.ext_port, self.udp_external_port)
+
+ # ICMP session
+ s = sessions[2]
+ self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+ self.assertEqual(s.in_port, self.icmp_id_in)
+ self.assertEqual(s.out_port, self.icmp_external_id)
+
+ def test_multiple_users(self):
+ """ CGNAT multiple users """
+
+ nat_ip = "10.0.0.10"
+ port_in = 80
+ external_port = 6303
+
+ host0 = self.pg0.remote_hosts[0]
+ host1 = self.pg0.remote_hosts[1]
+
+ self.vapi.snat_add_det_map(host0.ip4n,
+ 24,
+ socket.inet_aton(nat_ip),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # host0 to out
+ p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
+ IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=port_in, dport=external_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, nat_ip)
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(tcp.dport, external_port)
+ port_out0 = tcp.sport
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # host1 to out
+ p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
+ IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=port_in, dport=external_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, nat_ip)
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(tcp.dport, external_port)
+ port_out1 = tcp.sport
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(1, len(dms))
+ self.assertEqual(2, dms[0].ses_num)
+
+ # out to host0
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=nat_ip) /
+ TCP(sport=external_port, dport=port_out0))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg1.remote_ip4)
+ self.assertEqual(ip.dst, host0.ip4)
+ self.assertEqual(tcp.dport, port_in)
+ self.assertEqual(tcp.sport, external_port)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # out to host1
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=nat_ip) /
+ TCP(sport=external_port, dport=port_out1))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg1.remote_ip4)
+ self.assertEqual(ip.dst, host1.ip4)
+ self.assertEqual(tcp.dport, port_in)
+ self.assertEqual(tcp.sport, external_port)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet", p))
+ raise
+
+ # session close api test
+ self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
+ port_out1,
+ self.pg1.remote_ip4n,
+ external_port)
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(dms[0].ses_num, 1)
+
+ self.vapi.snat_det_close_session_in(host0.ip4n,
+ port_in,
+ self.pg1.remote_ip4n,
+ external_port)
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(dms[0].ses_num, 0)
+
+ def test_tcp_session_close_detection_in(self):
+ """ CGNAT TCP session close initiated from inside network """
+ self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
+ 32,
+ socket.inet_aton(self.snat_addr),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+
+ # close the session from inside
+ try:
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="F"))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ pkts = []
+
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A"))
+ pkts.append(p)
+
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="F"))
+ pkts.append(p)
+
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(2)
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A"))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ # Check if snat closed the session
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(0, dms[0].ses_num)
+ except:
+ self.logger.error("TCP session termination failed")
+ raise
+
+ def test_tcp_session_close_detection_out(self):
+ """ CGNAT TCP session close initiated from outside network """
+ self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
+ 32,
+ socket.inet_aton(self.snat_addr),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+
+ # close the session from outside
+ try:
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="F"))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ pkts = []
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A"))
+ pkts.append(p)
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="F"))
+ pkts.append(p)
+
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(2)
+
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A"))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ # Check if snat closed the session
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(0, dms[0].ses_num)
+ except:
+ self.logger.error("TCP session termination failed")
+ raise
+
+ @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ def test_session_timeout(self):
+ """ CGNAT session timeouts """
+ self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
+ 32,
+ socket.inet_aton(self.snat_addr),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+ self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ sleep(15)
+
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(0, dms[0].ses_num)
+
+ def test_session_limit_per_user(self):
+ """ CGNAT maximum 1000 sessions per user should be created """
+ self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
+ 32,
+ socket.inet_aton(self.snat_addr),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
+ src_address=self.pg2.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.snat_ipfix()
+
+ pkts = []
+ for port in range(1025, 2025):
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(sport=port, dport=port))
+ pkts.append(p)
+
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(sport=3001, dport=3002))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.assert_nothing_captured()
+
+ # verify ICMP error packet
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ self.assertTrue(p.haslayer(ICMP))
+ icmp = p[ICMP]
+ self.assertEqual(icmp.type, 3)
+ self.assertEqual(icmp.code, 1)
+ self.assertTrue(icmp.haslayer(IPerror))
+ inner_ip = icmp[IPerror]
+ self.assertEqual(inner_ip[UDPerror].sport, 3001)
+ self.assertEqual(inner_ip[UDPerror].dport, 3002)
+
+ dms = self.vapi.snat_det_map_dump()
+
+ self.assertEqual(1000, dms[0].ses_num)
+
+ # verify IPFIX logging
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg2.get_capture(2)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_entries_per_user(data)
+
+ def clear_snat(self):
+ """
+ Clear SNAT configuration.
+ """
+ self.vapi.snat_ipfix(enable=0)
+ self.vapi.snat_det_set_timeouts()
+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ for dsm in deterministic_mappings:
+ self.vapi.snat_add_det_map(dsm.in_addr,
+ dsm.in_plen,
+ dsm.out_addr,
+ dsm.out_plen,
+ is_add=0)
+
+ interfaces = self.vapi.snat_interface_dump()
+ for intf in interfaces:
+ self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
+ intf.is_inside,
+ is_add=0)
+
+ def tearDown(self):
+ super(TestDeterministicNAT, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show snat detail"))
+ self.clear_snat()
+
+
+class TestNAT64(MethodHolder):
+ """ NAT64 Test Cases """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT64, cls).setUpClass()
+
+ try:
+ cls.tcp_port_in = 6303
+ cls.tcp_port_out = 6303
+ cls.udp_port_in = 6304
+ cls.udp_port_out = 6304
+ cls.icmp_id_in = 6305
+ cls.icmp_id_out = 6305
+ cls.nat_addr = '10.0.0.3'
+ cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
+
+ cls.create_pg_interfaces(range(2))
+ cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
+ cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
+
+ for i in cls.ip6_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ for i in cls.ip4_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ except Exception:
+ super(TestNAT64, cls).tearDownClass()
+ raise
+
+ def test_pool(self):
+ """ Add/delete address to NAT64 pool """
+ nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
+
+ self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
+
+ addresses = self.vapi.nat64_pool_addr_dump()
+ self.assertEqual(len(addresses), 1)
+ self.assertEqual(addresses[0].address, nat_addr)
+
+ self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
+
+ addresses = self.vapi.nat64_pool_addr_dump()
+ self.assertEqual(len(addresses), 0)
+
+ def test_interface(self):
+ """ Enable/disable NAT64 feature on the interface """
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ interfaces = self.vapi.nat64_interface_dump()
+ self.assertEqual(len(interfaces), 2)
+ pg0_found = False
+ pg1_found = False
+ for intf in interfaces:
+ if intf.sw_if_index == self.pg0.sw_if_index:
+ self.assertEqual(intf.is_inside, 1)
+ pg0_found = True
+ elif intf.sw_if_index == self.pg1.sw_if_index:
+ self.assertEqual(intf.is_inside, 0)
+ pg1_found = True
+ self.assertTrue(pg0_found)
+ self.assertTrue(pg1_found)
+
+ features = self.vapi.cli("show interface features pg0")
+ self.assertNotEqual(features.find('nat64-in2out'), -1)
+ features = self.vapi.cli("show interface features pg1")
+ self.assertNotEqual(features.find('nat64-out2in'), -1)
+
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
+
+ interfaces = self.vapi.nat64_interface_dump()
+ self.assertEqual(len(interfaces), 0)
+
+ def test_static_bib(self):
+ """ Add/delete static BIB entry """
+ in_addr = socket.inet_pton(socket.AF_INET6,
+ '2001:db8:85a3::8a2e:370:7334')
+ out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
+ in_port = 1234
+ out_port = 5678
+ proto = IP_PROTOS.tcp
+
+ self.vapi.nat64_add_del_static_bib(in_addr,
+ out_addr,
+ in_port,
+ out_port,
+ proto)
+ bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+ static_bib_num = 0
+ for bibe in bib:
+ if bibe.is_static:
+ static_bib_num += 1
+ self.assertEqual(bibe.i_addr, in_addr)
+ self.assertEqual(bibe.o_addr, out_addr)
+ self.assertEqual(bibe.i_port, in_port)
+ self.assertEqual(bibe.o_port, out_port)
+ self.assertEqual(static_bib_num, 1)
+
+ self.vapi.nat64_add_del_static_bib(in_addr,
+ out_addr,
+ in_port,
+ out_port,
+ proto,
+ is_add=0)
+ bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+ static_bib_num = 0
+ for bibe in bib:
+ if bibe.is_static:
+ static_bib_num += 1
+ self.assertEqual(static_bib_num, 0)
+
+ def test_set_timeouts(self):
+ """ Set NAT64 timeouts """
+ # verify default values
+ timeouts = self.vapi.nat64_get_timeouts()
+ self.assertEqual(timeouts.udp, 300)
+ self.assertEqual(timeouts.icmp, 60)
+ self.assertEqual(timeouts.tcp_trans, 240)
+ self.assertEqual(timeouts.tcp_est, 7440)
+ self.assertEqual(timeouts.tcp_incoming_syn, 6)
+
+ # set and verify custom values
+ self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
+ tcp_est=7450, tcp_incoming_syn=10)
+ timeouts = self.vapi.nat64_get_timeouts()
+ self.assertEqual(timeouts.udp, 200)
+ self.assertEqual(timeouts.icmp, 30)
+ self.assertEqual(timeouts.tcp_trans, 250)
+ self.assertEqual(timeouts.tcp_est, 7450)
+ self.assertEqual(timeouts.tcp_incoming_syn, 10)
+
+ def test_dynamic(self):
+ """ NAT64 dynamic translation test """
+ self.tcp_port_in = 6303
+ self.udp_port_in = 6304
+ self.icmp_id_in = 6305
+
+ ses_num_start = self.nat64_get_ses_num()
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ # in2out
+ pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(3)
+ self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(3)
+ ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+ self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+ # in2out
+ pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(3)
+ self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(3)
+ ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+ self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+ ses_num_end = self.nat64_get_ses_num()
+
+ self.assertEqual(ses_num_end - ses_num_start, 3)
+
+ def test_static(self):
+ """ NAT64 static translation test """
+ self.tcp_port_in = 60303
+ self.udp_port_in = 60304
+ self.icmp_id_in = 60305
+ self.tcp_port_out = 60303
+ self.udp_port_out = 60304
+ self.icmp_id_out = 60305
+
+ ses_num_start = self.nat64_get_ses_num()
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+ self.nat_addr_n,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ IP_PROTOS.tcp)
+ self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+ self.nat_addr_n,
+ self.udp_port_in,
+ self.udp_port_out,
+ IP_PROTOS.udp)
+ self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
+ self.nat_addr_n,
+ self.icmp_id_in,
+ self.icmp_id_out,
+ IP_PROTOS.icmp)
+
+ # in2out
+ pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(3)
+ self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
+ dst_ip=self.pg1.remote_ip4, same_port=True)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(3)
+ ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+ self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
+
+ ses_num_end = self.nat64_get_ses_num()
+
+ self.assertEqual(ses_num_end - ses_num_start, 3)
+
+ @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ def test_session_timeout(self):
+ """ NAT64 session timeout """
+ self.icmp_id_in = 1234
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+ self.vapi.nat64_set_timeouts(icmp=5)
+
+ pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(3)
+
+ ses_num_before_timeout = self.nat64_get_ses_num()
+
+ sleep(15)
+
+ # ICMP session after timeout
+ ses_num_after_timeout = self.nat64_get_ses_num()
+ self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
+
+ def nat64_get_ses_num(self):
+ """
+ Return number of active NAT64 sessions.
+ """
+ ses_num = 0
+ st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
+ ses_num += len(st)
+ st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
+ ses_num += len(st)
+ st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
+ ses_num += len(st)
+ return ses_num
+
+ def clear_nat64(self):
+ """
+ Clear NAT64 configuration.
+ """
+ self.vapi.nat64_set_timeouts()
+
+ interfaces = self.vapi.nat64_interface_dump()
+ for intf in interfaces:
+ self.vapi.nat64_add_del_interface(intf.sw_if_index,
+ intf.is_inside,
+ is_add=0)
+
+ bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
+ for bibe in bib:
+ if bibe.is_static:
+ self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+ bibe.o_addr,
+ bibe.i_port,
+ bibe.o_port,
+ bibe.proto,
+ bibe.vrf_id,
+ is_add=0)
+
+ bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
+ for bibe in bib:
+ if bibe.is_static:
+ self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+ bibe.o_addr,
+ bibe.i_port,
+ bibe.o_port,
+ bibe.proto,
+ bibe.vrf_id,
+ is_add=0)
+
+ bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
+ for bibe in bib:
+ if bibe.is_static:
+ self.vapi.nat64_add_del_static_bib(bibe.i_addr,
+ bibe.o_addr,
+ bibe.i_port,
+ bibe.o_port,
+ bibe.proto,
+ bibe.vrf_id,
+ is_add=0)
+
+ adresses = self.vapi.nat64_pool_addr_dump()
+ for addr in adresses:
+ self.vapi.nat64_add_del_pool_addr_range(addr.address,
+ addr.address,
+ is_add=0)
+
+ def tearDown(self):
+ super(TestNAT64, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show nat64 pool"))
+ self.logger.info(self.vapi.cli("show nat64 interfaces"))
+ self.logger.info(self.vapi.cli("show nat64 bib tcp"))
+ self.logger.info(self.vapi.cli("show nat64 bib udp"))
+ self.logger.info(self.vapi.cli("show nat64 bib icmp"))
+ self.logger.info(self.vapi.cli("show nat64 session table tcp"))
+ self.logger.info(self.vapi.cli("show nat64 session table udp"))
+ self.logger.info(self.vapi.cli("show nat64 session table icmp"))
+ self.clear_nat64()
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)