cls.pg0.generate_remote_hosts(3)
cls.pg0.configure_ipv4_neighbors()
+ cls.pg1.generate_remote_hosts(1)
+ cls.pg1.configure_ipv4_neighbors()
+
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
cls.vapi.ip_table_add_del(10, is_add=1)
cls.vapi.ip_table_add_del(20, is_add=1)
vrf_id=sm.vrf_id,
protocol=sm.protocol,
twice_nat=sm.twice_nat,
+ self_twice_nat=sm.self_twice_nat,
out2in_only=sm.out2in_only,
tag=sm.tag,
+ external_sw_if_index=sm.external_sw_if_index,
is_add=0)
lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
lb_sm.protocol,
vrf_id=lb_sm.vrf_id,
twice_nat=lb_sm.twice_nat,
+ self_twice_nat=lb_sm.self_twice_nat,
out2in_only=lb_sm.out2in_only,
tag=lb_sm.tag,
is_add=0,
def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
local_port=0, external_port=0, vrf_id=0,
is_add=1, external_sw_if_index=0xFFFFFFFF,
- proto=0, twice_nat=0, out2in_only=0, tag=""):
+ proto=0, twice_nat=0, self_twice_nat=0,
+ out2in_only=0, tag=""):
"""
Add/delete NAT44 static mapping
:param external_sw_if_index: External interface instead of IP address
:param proto: IP protocol (Mandatory if port specified)
:param twice_nat: 1 if translate external host address and port
+ :param self_twice_nat: 1 if translate external host address and port
+ whenever external host address equals
+ local address of internal host
:param out2in_only: if 1 rule is matching only out2in direction
:param tag: Opaque string tag
"""
vrf_id,
proto,
twice_nat,
+ self_twice_nat,
out2in_only,
tag,
is_add)
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- # multiple clients
+ @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ def test_static_lb_multi_clients(self):
+ """ NAT44 local service load balancing - multiple clients"""
+
+ external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
+ external_port = 80
+ local_port = 8080
+ server1 = self.pg0.remote_hosts[0]
+ server2 = self.pg0.remote_hosts[1]
+
+ locals = [{'addr': server1.ip4n,
+ 'port': local_port,
+ 'probability': 90},
+ {'addr': server2.ip4n,
+ 'port': local_port,
+ 'probability': 10}]
+
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
+ external_port,
+ IP_PROTOS.tcp,
+ local_num=len(locals),
+ locals=locals)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
server1_n = 0
server2_n = 0
- clients = ip4_range(self.pg1.remote_ip4, 10, 20)
+ clients = ip4_range(self.pg1.remote_ip4, 10, 50)
pkts = []
for client in clients:
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
# configure interface address and check static mappings
self.pg7.config_ip4()
static_mappings = self.vapi.nat44_static_mapping_dump()
- self.assertEqual(1, len(static_mappings))
- self.assertEqual(static_mappings[0].external_ip_address[0:4],
- self.pg7.local_ip4n)
- self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
- self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
+ self.assertEqual(2, len(static_mappings))
+ resolved = False
+ for sm in static_mappings:
+ if sm.external_sw_if_index == 0xFFFFFFFF:
+ self.assertEqual(sm.external_ip_address[0:4],
+ self.pg7.local_ip4n)
+ self.assertEqual((sm.tag).split('\0', 1)[0], tag)
+ resolved = True
+ self.assertTrue(resolved)
# remove interface address and check static mappings
self.pg7.unconfig_ip4()
static_mappings = self.vapi.nat44_static_mapping_dump()
+ self.assertEqual(1, len(static_mappings))
+ self.assertEqual(self.pg7.sw_if_index,
+ static_mappings[0].external_sw_if_index)
+ self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
+
+ # configure interface address again and check static mappings
+ self.pg7.config_ip4()
+ static_mappings = self.vapi.nat44_static_mapping_dump()
+ self.assertEqual(2, len(static_mappings))
+ resolved = False
+ for sm in static_mappings:
+ if sm.external_sw_if_index == 0xFFFFFFFF:
+ self.assertEqual(sm.external_ip_address[0:4],
+ self.pg7.local_ip4n)
+ self.assertEqual((sm.tag).split('\0', 1)[0], tag)
+ resolved = True
+ self.assertTrue(resolved)
+
+ # remove static mapping
+ self.nat44_add_static_mapping(
+ '1.2.3.4',
+ external_sw_if_index=self.pg7.sw_if_index,
+ tag=tag,
+ is_add=0)
+ static_mappings = self.vapi.nat44_static_mapping_dump()
self.assertEqual(0, len(static_mappings))
def test_interface_addr_identity_nat(self):
# configure interface address and check identity mappings
self.pg7.config_ip4()
identity_mappings = self.vapi.nat44_identity_mapping_dump()
- self.assertEqual(1, len(identity_mappings))
- self.assertEqual(identity_mappings[0].ip_address,
- self.pg7.local_ip4n)
- self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
- self.assertEqual(port, identity_mappings[0].port)
- self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
+ resolved = False
+ self.assertEqual(2, len(identity_mappings))
+ for sm in identity_mappings:
+ if sm.sw_if_index == 0xFFFFFFFF:
+ self.assertEqual(identity_mappings[0].ip_address,
+ self.pg7.local_ip4n)
+ self.assertEqual(port, identity_mappings[0].port)
+ self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
+ resolved = True
+ self.assertTrue(resolved)
# remove interface address and check identity mappings
self.pg7.unconfig_ip4()
identity_mappings = self.vapi.nat44_identity_mapping_dump()
- self.assertEqual(0, len(identity_mappings))
+ self.assertEqual(1, len(identity_mappings))
+ self.assertEqual(self.pg7.sw_if_index,
+ identity_mappings[0].sw_if_index)
def test_ipfix_nat44_sess(self):
""" IPFIX logging NAT44 session created/delted """
raise
# from local network host to external network
- ext_port = 0
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=23456, dport=34567))
- self.pg0.add_stream(p)
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture(1)
- p = capture[0]
- try:
- ip = p[IP]
- tcp = p[TCP]
- self.assertEqual(ip.src, self.nat_addr)
- self.assertNotEqual(tcp.sport, 23456)
- ext_port = tcp.sport
- self.check_tcp_checksum(p)
- self.check_ip_checksum(p)
- except:
- self.logger.error(ppp("Unexpected or invalid packet:", p))
- raise
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
# from external network back to local network host
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=34567, dport=ext_port))
- self.pg1.add_stream(p)
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture(1)
- p = capture[0]
- server = None
- try:
- ip = p[IP]
- tcp = p[TCP]
- self.assertEqual(ip.dst, self.pg0.remote_ip4)
- self.assertEqual(tcp.dport, 23456)
- self.check_tcp_checksum(p)
- self.check_ip_checksum(p)
- except:
- self.logger.error(ppp("Unexpected or invalid packet:", p))
- raise
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
def test_output_feature_and_service2(self):
""" NAT44 interface output feature and service host direct access """
self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
same_port=True)
+ def test_output_feature_and_service3(self):
+ """ NAT44 interface output feature and DST NAT """
+ external_addr = '1.2.3.4'
+ external_port = 80
+ local_port = 8080
+
+ self.vapi.nat44_forwarding_enable_disable(1)
+ self.nat44_add_address(self.nat_addr)
+ self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
+ local_port, external_port,
+ proto=IP_PROTOS.tcp, out2in_only=1)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
+ is_inside=0)
+ self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=external_addr) /
+ TCP(sport=12345, dport=external_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg0.remote_ip4)
+ self.assertEqual(tcp.sport, 12345)
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
+ TCP(sport=local_port, dport=12345))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, external_addr)
+ self.assertEqual(tcp.sport, external_port)
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(tcp.dport, 12345)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
def test_one_armed_nat44(self):
""" One armed NAT44 """
remote_host = self.pg9.remote_hosts[0]
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- def test_twice_nat(self):
- """ Twice NAT44 """
+ def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
+ client_id=None):
twice_nat_addr = '10.0.1.3'
+
port_in = 8080
+ if lb:
+ if not same_pg:
+ port_in1 = port_in
+ port_in2 = port_in
+ else:
+ port_in1 = port_in+1
+ port_in2 = port_in+2
+
port_out = 80
eh_port_out = 4567
- eh_port_in = 0
+
+ server1 = self.pg0.remote_hosts[0]
+ server2 = self.pg0.remote_hosts[1]
+ if lb and same_pg:
+ server2 = server1
+ if not lb:
+ server = server1
+
+ pg0 = self.pg0
+ if same_pg:
+ pg1 = self.pg0
+ else:
+ pg1 = self.pg1
+
+ eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
+ client_id == 1)
+
self.nat44_add_address(self.nat_addr)
self.nat44_add_address(twice_nat_addr, twice_nat=1)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- port_in, port_out, proto=IP_PROTOS.tcp,
- twice_nat=1)
- self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
- self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ if not lb:
+ self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
+ port_in, port_out,
+ proto=IP_PROTOS.tcp,
+ twice_nat=int(not self_twice_nat),
+ self_twice_nat=int(self_twice_nat))
+ else:
+ locals = [{'addr': server1.ip4n,
+ 'port': port_in1,
+ 'probability': 50},
+ {'addr': server2.ip4n,
+ 'port': port_in2,
+ 'probability': 50}]
+ out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
+ self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
+ port_out,
+ IP_PROTOS.tcp,
+ twice_nat=int(
+ not self_twice_nat),
+ self_twice_nat=int(
+ self_twice_nat),
+ local_num=len(locals),
+ locals=locals)
+ self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
is_inside=0)
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ if same_pg:
+ if not lb:
+ client = server
+ else:
+ assert client_id is not None
+ if client_id == 1:
+ client = self.pg0.remote_hosts[0]
+ elif client_id == 2:
+ client = self.pg0.remote_hosts[1]
+ else:
+ client = pg1.remote_hosts[0]
+ p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
+ IP(src=client.ip4, dst=self.nat_addr) /
TCP(sport=eh_port_out, dport=port_out))
- self.pg1.add_stream(p)
+ pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture(1)
+ capture = pg0.get_capture(1)
p = capture[0]
try:
ip = p[IP]
tcp = p[TCP]
- self.assertEqual(ip.dst, self.pg0.remote_ip4)
- self.assertEqual(ip.src, twice_nat_addr)
- self.assertEqual(tcp.dport, port_in)
- self.assertNotEqual(tcp.sport, eh_port_out)
+ if lb:
+ if ip.dst == server1.ip4:
+ server = server1
+ port_in = port_in1
+ else:
+ server = server2
+ port_in = port_in2
+ self.assertEqual(ip.dst, server.ip4)
+ if lb and same_pg:
+ self.assertIn(tcp.dport, [port_in1, port_in2])
+ else:
+ self.assertEqual(tcp.dport, port_in)
+ if eh_translate:
+ self.assertEqual(ip.src, twice_nat_addr)
+ self.assertNotEqual(tcp.sport, eh_port_out)
+ else:
+ self.assertEqual(ip.src, client.ip4)
+ self.assertEqual(tcp.sport, eh_port_out)
+ eh_addr_in = ip.src
eh_port_in = tcp.sport
+ saved_port_in = tcp.dport
self.check_tcp_checksum(p)
self.check_ip_checksum(p)
except:
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
- TCP(sport=port_in, dport=eh_port_in))
- self.pg0.add_stream(p)
+ p = (Ether(src=server.mac, dst=pg0.local_mac) /
+ IP(src=server.ip4, dst=eh_addr_in) /
+ TCP(sport=saved_port_in, dport=eh_port_in))
+ pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture(1)
+ capture = pg1.get_capture(1)
p = capture[0]
try:
ip = p[IP]
tcp = p[TCP]
- self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(ip.dst, client.ip4)
self.assertEqual(ip.src, self.nat_addr)
self.assertEqual(tcp.dport, eh_port_out)
self.assertEqual(tcp.sport, port_out)
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- def test_twice_nat_lb(self):
- """ Twice NAT44 local service load balancing """
- external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
- twice_nat_addr = '10.0.1.3'
- local_port = 8080
- external_port = 80
- eh_port_out = 4567
- eh_port_in = 0
- server1 = self.pg0.remote_hosts[0]
- server2 = self.pg0.remote_hosts[1]
+ def test_twice_nat(self):
+ """ Twice NAT44 """
+ self.twice_nat_common()
- locals = [{'addr': server1.ip4n,
- 'port': local_port,
- 'probability': 50},
- {'addr': server2.ip4n,
- 'port': local_port,
- 'probability': 50}]
+ def test_self_twice_nat_positive(self):
+ """ Self Twice NAT44 (positive test) """
+ self.twice_nat_common(self_twice_nat=True, same_pg=True)
- self.nat44_add_address(self.nat_addr)
- self.nat44_add_address(twice_nat_addr, twice_nat=1)
+ def test_self_twice_nat_negative(self):
+ """ Self Twice NAT44 (negative test) """
+ self.twice_nat_common(self_twice_nat=True)
- self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
- external_port,
- IP_PROTOS.tcp,
- twice_nat=1,
- local_num=len(locals),
- locals=locals)
- self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
- self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
- is_inside=0)
+ def test_twice_nat_lb(self):
+ """ Twice NAT44 local service load balancing """
+ self.twice_nat_common(lb=True)
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=eh_port_out, dport=external_port))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- capture = self.pg0.get_capture(1)
- p = capture[0]
- server = None
- try:
- ip = p[IP]
- tcp = p[TCP]
- self.assertEqual(ip.src, twice_nat_addr)
- self.assertIn(ip.dst, [server1.ip4, server2.ip4])
- if ip.dst == server1.ip4:
- server = server1
- else:
- server = server2
- self.assertNotEqual(tcp.sport, eh_port_out)
- eh_port_in = tcp.sport
- self.assertEqual(tcp.dport, local_port)
- self.check_tcp_checksum(p)
- self.check_ip_checksum(p)
- except:
- self.logger.error(ppp("Unexpected or invalid packet:", p))
- raise
+ def test_self_twice_nat_lb_positive(self):
+ """ Self Twice NAT44 local service load balancing (positive test) """
+ self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
+ client_id=1)
- p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
- IP(src=server.ip4, dst=twice_nat_addr) /
- TCP(sport=local_port, dport=eh_port_in))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- capture = self.pg1.get_capture(1)
- p = capture[0]
- try:
- ip = p[IP]
- tcp = p[TCP]
- self.assertEqual(ip.src, self.nat_addr)
- self.assertEqual(ip.dst, self.pg1.remote_ip4)
- self.assertEqual(tcp.sport, external_port)
- self.assertEqual(tcp.dport, eh_port_out)
- self.check_tcp_checksum(p)
- self.check_ip_checksum(p)
- except:
- self.logger.error(ppp("Unexpected or invalid packet:", p))
- raise
+ def test_self_twice_nat_lb_negative(self):
+ """ Self Twice NAT44 local service load balancing (negative test) """
+ self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
+ client_id=2)
def test_twice_nat_interface_addr(self):
""" Acquire twice NAT44 addresses from interface """
self.assertEqual(len(sm), 1)
self.assertEqual(sm[0].total_pkts, 8)
+ def test_check_no_translate(self):
+ """ NAT66 translate only when egress interface is outside interface """
+ self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
+ self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
+ self.nat_addr_n)
+
+ # in2out
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
+ UDP())
+ self.pg0.add_stream([p])
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ packet = capture[0]
+ try:
+ self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
+ self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
def clear_nat66(self):
"""
Clear NAT66 configuration.