return pkts
- def create_stream_out(self, out_if, dst_ip=None, ttl=64):
+ def create_stream_out(self, out_if, dst_ip=None, ttl=64,
+ use_inside_ports=False):
"""
Create packet stream for outside network
:param out_if: Outside interface
:param dst_ip: Destination IP address (Default use global NAT address)
:param ttl: TTL of generated packets
+ :param use_inside_ports: Use inside NAT ports as destination ports
+ instead of outside ports
"""
if dst_ip is None:
dst_ip = self.nat_addr
+ if not use_inside_ports:
+ tcp_port = self.tcp_port_out
+ udp_port = self.udp_port_out
+ icmp_id = self.icmp_id_out
+ else:
+ tcp_port = self.tcp_port_in
+ udp_port = self.udp_port_in
+ icmp_id = self.icmp_id_in
pkts = []
# TCP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- TCP(dport=self.tcp_port_out, sport=20))
+ TCP(dport=tcp_port, sport=20))
pkts.append(p)
# UDP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- UDP(dport=self.udp_port_out, sport=20))
+ UDP(dport=udp_port, sport=20))
pkts.append(p)
# ICMP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- ICMP(id=self.icmp_id_out, type='echo-reply'))
+ ICMP(id=icmp_id, type='echo-reply'))
pkts.append(p)
return pkts
cls.icmp_id_in = 6305
cls.icmp_id_out = 6305
cls.nat_addr = '10.0.0.3'
+ cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
cls.ipfix_src_port = 4739
cls.ipfix_domain_id = 1
cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
cls.pg9.resolve_arp()
- random.seed()
-
except Exception:
super(TestNAT44, cls).tearDownClass()
raise
interfaces = self.vapi.nat44_interface_addr_dump()
for intf in interfaces:
- self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
+ self.vapi.nat44_add_interface_addr(intf.sw_if_index,
+ twice_nat=intf.twice_nat,
+ is_add=0)
self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
domain_id=self.ipfix_domain_id)
addr_only=sm.addr_only,
vrf_id=sm.vrf_id,
protocol=sm.protocol,
+ twice_nat=sm.twice_nat,
is_add=0)
lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
lb_sm.external_addr,
lb_sm.external_port,
lb_sm.protocol,
- lb_sm.vrf_id,
+ vrf_id=lb_sm.vrf_id,
+ twice_nat=lb_sm.twice_nat,
is_add=0,
local_num=0,
locals=[])
+ identity_mappings = self.vapi.nat44_identity_mapping_dump()
+ for id_m in identity_mappings:
+ self.vapi.nat44_add_del_identity_mapping(
+ addr_only=id_m.addr_only,
+ ip=id_m.ip_address,
+ port=id_m.port,
+ sw_if_index=id_m.sw_if_index,
+ vrf_id=id_m.vrf_id,
+ protocol=id_m.protocol,
+ is_add=0)
+
adresses = self.vapi.nat44_address_dump()
for addr in adresses:
self.vapi.nat44_add_del_address_range(addr.ip_address,
addr.ip_address,
+ twice_nat=addr.twice_nat,
is_add=0)
self.vapi.nat_set_reass()
def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
local_port=0, external_port=0, vrf_id=0,
is_add=1, external_sw_if_index=0xFFFFFFFF,
- proto=0):
+ proto=0, twice_nat=0):
"""
Add/delete NAT44 static mapping
:param is_add: 1 if add, 0 if delete (Default add)
:param external_sw_if_index: External interface instead of IP address
:param proto: IP protocol (Mandatory if port specified)
+ :param twice_nat: 1 if translate external host address and port
"""
addr_only = 1
if local_port and external_port:
addr_only,
vrf_id,
proto,
+ twice_nat,
is_add)
- def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
+ def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
"""
Add/delete NAT44 address
:param ip: IP address
:param is_add: 1 if add, 0 if delete (Default add)
+ :param twice_nat: twice NAT address for extenal hosts
"""
nat_addr = socket.inet_pton(socket.AF_INET, ip)
self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
- vrf_id=vrf_id)
+ vrf_id=vrf_id,
+ twice_nat=twice_nat)
def test_dynamic(self):
""" NAT44 dynamic translation test """
self.verify_capture_out(capture, same_port=True, packet_num=1)
self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
+ def test_forwarding(self):
+ """ NAT44 forwarding test """
+
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.nat44_forwarding_enable_disable(1)
+
+ real_ip = self.pg0.remote_ip4n
+ alias_ip = self.nat_addr_n
+ self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
+ external_ip=alias_ip)
+
+ try:
+ # in2out - static mapping match
+
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, same_port=True)
+
+ # in2out - no static mapping match
+
+ host0 = self.pg0.remote_hosts[0]
+ self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
+ try:
+ pkts = self.create_stream_out(self.pg1,
+ dst_ip=self.pg0.remote_ip4,
+ use_inside_ports=True)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
+ same_port=True)
+ finally:
+ self.pg0.remote_hosts[0] = host0
+
+ finally:
+ self.vapi.nat44_forwarding_enable_disable(0)
+ self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
+ external_ip=alias_ip,
+ is_add=0)
+
def test_static_in(self):
""" 1:1 NAT initialized from inside network """
self.pg_start()
self.pg3.assert_nothing_captured()
+ def test_identity_nat(self):
+ """ Identity NAT """
+
+ self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
+ TCP(sport=12345, dport=56789))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(ip.src, self.pg1.remote_ip4)
+ self.assertEqual(tcp.dport, 56789)
+ self.assertEqual(tcp.sport, 12345)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
def test_static_lb(self):
""" NAT44 local service load balancing """
external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
static_mappings = self.vapi.nat44_static_mapping_dump()
self.assertEqual(0, len(static_mappings))
+ def test_interface_addr_identity_nat(self):
+ """ Identity NAT with addresses from interface """
+
+ port = 53053
+ self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
+ self.vapi.nat44_add_del_identity_mapping(
+ sw_if_index=self.pg7.sw_if_index,
+ port=port,
+ protocol=IP_PROTOS.tcp,
+ addr_only=0)
+
+ # identity mappings with external interface
+ identity_mappings = self.vapi.nat44_identity_mapping_dump()
+ self.assertEqual(1, len(identity_mappings))
+ self.assertEqual(self.pg7.sw_if_index,
+ identity_mappings[0].sw_if_index)
+
+ # configure interface address and check identity mappings
+ self.pg7.config_ip4()
+ identity_mappings = self.vapi.nat44_identity_mapping_dump()
+ self.assertEqual(1, len(identity_mappings))
+ self.assertEqual(identity_mappings[0].ip_address,
+ self.pg7.local_ip4n)
+ self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
+ self.assertEqual(port, identity_mappings[0].port)
+ self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
+
+ # remove interface address and check identity mappings
+ self.pg7.unconfig_ip4()
+ identity_mappings = self.vapi.nat44_identity_mapping_dump()
+ self.assertEqual(0, len(identity_mappings))
+
def test_ipfix_nat44_sess(self):
""" IPFIX logging NAT44 session created/delted """
self.ipfix_domain_id = 10
nat_ip2 = "10.0.0.11"
self.nat44_add_address(nat_ip1)
- self.nat44_add_address(nat_ip2)
+ self.nat44_add_address(nat_ip2, vrf_id=99)
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
is_inside=0)
- self.vapi.cli("nat44 addr-port-assignment-alg map-e psid 10 "
+ self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
"psid-offset 6 psid-len 6")
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
+ def test_twice_nat(self):
+ """ Twice NAT44 """
+ twice_nat_addr = '10.0.1.3'
+ port_in = 8080
+ port_out = 80
+ eh_port_out = 4567
+ eh_port_in = 0
+ self.nat44_add_address(self.nat_addr)
+ self.nat44_add_address(twice_nat_addr, twice_nat=1)
+ self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
+ port_in, port_out, proto=IP_PROTOS.tcp,
+ twice_nat=1)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=eh_port_out, dport=port_out))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(ip.src, twice_nat_addr)
+ self.assertEqual(tcp.dport, port_in)
+ self.assertNotEqual(tcp.sport, eh_port_out)
+ eh_port_in = tcp.sport
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
+ TCP(sport=port_in, dport=eh_port_in))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.dport, eh_port_out)
+ self.assertEqual(tcp.sport, port_out)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def test_twice_nat_lb(self):
+ """ Twice NAT44 local service load balancing """
+ external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
+ twice_nat_addr = '10.0.1.3'
+ local_port = 8080
+ external_port = 80
+ eh_port_out = 4567
+ eh_port_in = 0
+ server1 = self.pg0.remote_hosts[0]
+ server2 = self.pg0.remote_hosts[1]
+
+ locals = [{'addr': server1.ip4n,
+ 'port': local_port,
+ 'probability': 50},
+ {'addr': server2.ip4n,
+ 'port': local_port,
+ 'probability': 50}]
+
+ self.nat44_add_address(self.nat_addr)
+ self.nat44_add_address(twice_nat_addr, twice_nat=1)
+
+ self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
+ external_port,
+ IP_PROTOS.tcp,
+ twice_nat=1,
+ local_num=len(locals),
+ locals=locals)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=eh_port_out, dport=external_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ server = None
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, twice_nat_addr)
+ self.assertIn(ip.dst, [server1.ip4, server2.ip4])
+ if ip.dst == server1.ip4:
+ server = server1
+ else:
+ server = server2
+ self.assertNotEqual(tcp.sport, eh_port_out)
+ eh_port_in = tcp.sport
+ self.assertEqual(tcp.dport, local_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
+ IP(src=server.ip4, dst=twice_nat_addr) /
+ TCP(sport=local_port, dport=eh_port_in))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(tcp.sport, external_port)
+ self.assertEqual(tcp.dport, eh_port_out)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def test_twice_nat_interface_addr(self):
+ """ Acquire twice NAT44 addresses from interface """
+ self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
+
+ # no address in NAT pool
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ # configure interface address and check NAT address pool
+ self.pg7.config_ip4()
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(1, len(adresses))
+ self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
+ self.assertEqual(adresses[0].twice_nat, 1)
+
+ # remove interface address and check NAT address pool
+ self.pg7.unconfig_ip4()
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(adresses))
+
def tearDown(self):
super(TestNAT44, self).tearDown()
if not self.vpp_dead:
self.logger.info(self.vapi.cli("show nat44 verbose"))
self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
- self.vapi.cli("nat44 addr-port-assignment-alg default")
+ self.vapi.cli("nat addr-port-assignment-alg default")
self.clear_nat44()
self.check_ip_checksum(capture)
self.check_icmp_checksum(capture)
+ # ping DS-Lite AFTR tunnel endpoint address
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
+ ICMPv6EchoRequest())
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ self.assertEqual(1, len(capture))
+ capture = capture[0]
+ self.assertEqual(capture[IPv6].src, aftr_ip6)
+ self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
+ self.assertTrue(capture.haslayer(ICMPv6EchoReply))
+
def tearDown(self):
super(TestDSlite, self).tearDown()
if not self.vpp_dead: