if self.pg7.has_ip4_config:
self.pg7.unconfig_ip4()
+ self.vapi.nat44_forwarding_enable_disable(0)
+
interfaces = self.vapi.nat44_interface_addr_dump()
for intf in interfaces:
self.vapi.nat44_add_interface_addr(intf.sw_if_index,
vrf_id=sm.vrf_id,
protocol=sm.protocol,
twice_nat=sm.twice_nat,
+ out2in_only=sm.out2in_only,
+ tag=sm.tag,
is_add=0)
lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
lb_sm.protocol,
vrf_id=lb_sm.vrf_id,
twice_nat=lb_sm.twice_nat,
+ out2in_only=lb_sm.out2in_only,
+ tag=lb_sm.tag,
is_add=0,
local_num=0,
locals=[])
def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
local_port=0, external_port=0, vrf_id=0,
is_add=1, external_sw_if_index=0xFFFFFFFF,
- proto=0, twice_nat=0):
+ proto=0, twice_nat=0, out2in_only=0, tag=""):
"""
Add/delete NAT44 static mapping
:param external_sw_if_index: External interface instead of IP address
:param proto: IP protocol (Mandatory if port specified)
:param twice_nat: 1 if translate external host address and port
+ :param out2in_only: if 1 rule is matching only out2in direction
+ :param tag: Opaque string tag
"""
addr_only = 1
if local_port and external_port:
vrf_id,
proto,
twice_nat,
+ out2in_only,
+ tag,
is_add)
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
is_inside=0)
+ sm = self.vapi.nat44_static_mapping_dump()
+ self.assertEqual(len(sm), 1)
+ self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
# in2out
pkts = self.create_stream_in(self.pg0, self.pg1)
self.tcp_port_out = 6303
self.udp_port_out = 6304
self.icmp_id_out = 6305
+ tag = "testTAG"
- self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
+ self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
is_inside=0)
+ sm = self.vapi.nat44_static_mapping_dump()
+ self.assertEqual(len(sm), 1)
+ self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
# out2in
pkts = self.create_stream_out(self.pg1, nat_ip)
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
+ def test_static_with_port_out2(self):
+ """ 1:1 NAPT symmetrical rule """
+
+ external_port = 80
+ local_port = 8080
+
+ self.vapi.nat44_forwarding_enable_disable(1)
+ self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
+ local_port, external_port,
+ proto=IP_PROTOS.tcp, out2in_only=1)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # from client to service
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=12345, dport=external_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ server = None
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=local_port, dport=12345))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.sport, external_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from client to server (no translation)
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
+ TCP(sport=12346, dport=local_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ server = None
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client (no translation)
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=local_port, dport=12346))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg0.remote_ip4)
+ self.assertEqual(tcp.sport, local_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
def test_static_vrf_aware(self):
""" 1:1 NAT VRF awareness """
server2_n += 1
self.assertTrue(server1_n > server2_n)
+ def test_static_lb_2(self):
+ """ NAT44 local service load balancing (asymmetrical rule) """
+ external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
+ external_port = 80
+ local_port = 8080
+ server1 = self.pg0.remote_hosts[0]
+ server2 = self.pg0.remote_hosts[1]
+
+ locals = [{'addr': server1.ip4n,
+ 'port': local_port,
+ 'probability': 70},
+ {'addr': server2.ip4n,
+ 'port': local_port,
+ 'probability': 30}]
+
+ self.vapi.nat44_forwarding_enable_disable(1)
+ self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
+ external_port,
+ IP_PROTOS.tcp,
+ out2in_only=1,
+ local_num=len(locals),
+ locals=locals)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # from client to service
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=12345, dport=external_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ server = None
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertIn(ip.dst, [server1.ip4, server2.ip4])
+ if ip.dst == server1.ip4:
+ server = server1
+ else:
+ server = server2
+ self.assertEqual(tcp.dport, local_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client
+ p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
+ IP(src=server.ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=local_port, dport=12345))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.sport, external_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from client to server (no translation)
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
+ TCP(sport=12346, dport=local_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ server = None
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, server1.ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client (no translation)
+ p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
+ IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=local_port, dport=12346))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, server1.ip4)
+ self.assertEqual(tcp.sport, local_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
def test_multiple_inside_interfaces(self):
""" NAT44 multiple non-overlapping address space inside interfaces """
self.assertEqual(tcp.dport, host_in_port)
self.check_tcp_checksum(p)
except:
- self.logger.error(ppp("Unexpected or invalid packet:"), p)
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
def test_hairpinning2(self):
def test_interface_addr_static_mapping(self):
""" Static mapping with addresses from interface """
+ tag = "testTAG"
+
self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
self.nat44_add_static_mapping(
'1.2.3.4',
- external_sw_if_index=self.pg7.sw_if_index)
+ external_sw_if_index=self.pg7.sw_if_index,
+ tag=tag)
# static mappings with external interface
static_mappings = self.vapi.nat44_static_mapping_dump()
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
+ self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
# configure interface address and check static mappings
self.pg7.config_ip4()
self.assertEqual(static_mappings[0].external_ip_address[0:4],
self.pg7.local_ip4n)
self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
+ self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
# remove interface address and check static mappings
self.pg7.unconfig_ip4()
self.assertEqual(tcp.dport, host_in_port)
self.check_tcp_checksum(p)
except:
- self.logger.error(ppp("Unexpected or invalid packet:"), p)
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
def test_one_armed_nat44(self):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
+ def test_one_armed_nat44_static(self):
+ """ One armed NAT44 and 1:1 NAPT symmetrical rule """
+ remote_host = self.pg9.remote_hosts[0]
+ local_host = self.pg9.remote_hosts[1]
+ external_port = 80
+ local_port = 8080
+ eh_port_in = 0
+
+ self.vapi.nat44_forwarding_enable_disable(1)
+ self.nat44_add_address(self.nat_addr, twice_nat=1)
+ self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
+ local_port, external_port,
+ proto=IP_PROTOS.tcp, out2in_only=1,
+ twice_nat=1)
+ self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
+ is_inside=0)
+
+ # from client to service
+ p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
+ IP(src=remote_host.ip4, dst=self.nat_addr) /
+ TCP(sport=12345, dport=external_port))
+ self.pg9.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg9.get_capture(1)
+ p = capture[0]
+ server = None
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, local_host.ip4)
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.dport, local_port)
+ self.assertNotEqual(tcp.sport, 12345)
+ eh_port_in = tcp.sport
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client
+ p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
+ IP(src=local_host.ip4, dst=self.nat_addr) /
+ TCP(sport=local_port, dport=eh_port_in))
+ self.pg9.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg9.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(ip.dst, remote_host.ip4)
+ self.assertEqual(tcp.sport, external_port)
+ self.assertEqual(tcp.dport, 12345)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
def test_del_session(self):
""" Delete NAT44 session """
self.nat44_add_address(self.nat_addr)
def tearDown(self):
super(TestNAT44, self).tearDown()
if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show nat44 verbose"))
+ self.logger.info(self.vapi.cli("show nat44 addresses"))
+ self.logger.info(self.vapi.cli("show nat44 interfaces"))
+ self.logger.info(self.vapi.cli("show nat44 static mappings"))
+ self.logger.info(self.vapi.cli("show nat44 interface address"))
+ self.logger.info(self.vapi.cli("show nat44 sessions detail"))
self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
self.vapi.cli("nat addr-port-assignment-alg default")
self.clear_nat44()
def tearDown(self):
super(TestDeterministicNAT, self).tearDown()
if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show nat44 detail"))
+ self.logger.info(self.vapi.cli("show nat44 interfaces"))
+ self.logger.info(
+ self.vapi.cli("show nat44 deterministic mappings"))
+ self.logger.info(
+ self.vapi.cli("show nat44 deterministic timeouts"))
+ self.logger.info(
+ self.vapi.cli("show nat44 deterministic sessions"))
self.clear_nat_det()