import socket
import unittest
import struct
+import StringIO
+import random
from framework import VppTestCase, VppTestRunner, running_extended_tests
from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
-from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
+from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
from scapy.layers.l2 import Ether, ARP, GRE
from scapy.data import IP_PROTOS
-from scapy.packet import bind_layers
+from scapy.packet import bind_layers, Raw
+from scapy.all import fragment6
from util import ppp
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from time import sleep
return pkts
- def create_stream_out(self, out_if, dst_ip=None, ttl=64):
+ def create_stream_out(self, out_if, dst_ip=None, ttl=64,
+ use_inside_ports=False):
"""
Create packet stream for outside network
:param out_if: Outside interface
:param dst_ip: Destination IP address (Default use global NAT address)
:param ttl: TTL of generated packets
+ :param use_inside_ports: Use inside NAT ports as destination ports
+ instead of outside ports
"""
if dst_ip is None:
dst_ip = self.nat_addr
+ if not use_inside_ports:
+ tcp_port = self.tcp_port_out
+ udp_port = self.udp_port_out
+ icmp_id = self.icmp_id_out
+ else:
+ tcp_port = self.tcp_port_in
+ udp_port = self.udp_port_in
+ icmp_id = self.icmp_id_in
pkts = []
# TCP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- TCP(dport=self.tcp_port_out, sport=20))
+ TCP(dport=tcp_port, sport=20))
pkts.append(p)
# UDP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- UDP(dport=self.udp_port_out, sport=20))
+ UDP(dport=udp_port, sport=20))
pkts.append(p)
# ICMP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- ICMP(id=self.icmp_id_out, type='echo-reply'))
+ ICMP(id=icmp_id, type='echo-reply'))
pkts.append(p)
return pkts
"(inside network):", packet))
raise
+ def create_stream_frag(self, src_if, dst, sport, dport, data):
+ """
+ Create fragmented packet stream
+
+ :param src_if: Source interface
+ :param dst: Destination IPv4 address
+ :param sport: Source TCP port
+ :param dport: Destination TCP port
+ :param data: Payload data
+ :returns: Fragmets
+ """
+ id = random.randint(0, 65535)
+ p = (IP(src=src_if.remote_ip4, dst=dst) /
+ TCP(sport=sport, dport=dport) /
+ Raw(data))
+ p = p.__class__(str(p))
+ chksum = p['TCP'].chksum
+ pkts = []
+ p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
+ IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
+ TCP(sport=sport, dport=dport, chksum=chksum) /
+ Raw(data[0:4]))
+ pkts.append(p)
+ p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
+ IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
+ proto=IP_PROTOS.tcp) /
+ Raw(data[4:20]))
+ pkts.append(p)
+ p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
+ IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
+ id=id) /
+ Raw(data[20:]))
+ pkts.append(p)
+ return pkts
+
+ def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
+ pref=None, plen=0, frag_size=128):
+ """
+ Create fragmented packet stream
+
+ :param src_if: Source interface
+ :param dst: Destination IPv4 address
+ :param sport: Source TCP port
+ :param dport: Destination TCP port
+ :param data: Payload data
+ :param pref: NAT64 prefix
+ :param plen: NAT64 prefix length
+ :param fragsize: size of fragments
+ :returns: Fragmets
+ """
+ if pref is None:
+ dst_ip6 = ''.join(['64:ff9b::', dst])
+ else:
+ dst_ip6 = self.compose_ip6(dst, pref, plen)
+
+ p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+ IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
+ IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
+ TCP(sport=sport, dport=dport) /
+ Raw(data))
+
+ return fragment6(p, frag_size)
+
+ def reass_frags_and_verify(self, frags, src, dst):
+ """
+ Reassemble and verify fragmented packet
+
+ :param frags: Captured fragments
+ :param src: Source IPv4 address to verify
+ :param dst: Destination IPv4 address to verify
+
+ :returns: Reassembled IPv4 packet
+ """
+ buffer = StringIO.StringIO()
+ for p in frags:
+ self.assertEqual(p[IP].src, src)
+ self.assertEqual(p[IP].dst, dst)
+ self.check_ip_checksum(p)
+ buffer.seek(p[IP].frag * 8)
+ buffer.write(p[IP].payload)
+ ip = frags[0].getlayer(IP)
+ ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
+ proto=frags[0][IP].proto)
+ if ip.proto == IP_PROTOS.tcp:
+ p = (ip / TCP(buffer.getvalue()))
+ self.check_tcp_checksum(p)
+ elif ip.proto == IP_PROTOS.udp:
+ p = (ip / UDP(buffer.getvalue()))
+ return p
+
+ def reass_frags_and_verify_ip6(self, frags, src, dst):
+ """
+ Reassemble and verify fragmented packet
+
+ :param frags: Captured fragments
+ :param src: Source IPv6 address to verify
+ :param dst: Destination IPv6 address to verify
+
+ :returns: Reassembled IPv6 packet
+ """
+ buffer = StringIO.StringIO()
+ for p in frags:
+ self.assertEqual(p[IPv6].src, src)
+ self.assertEqual(p[IPv6].dst, dst)
+ buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
+ buffer.write(p[IPv6ExtHdrFragment].payload)
+ ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
+ nh=frags[0][IPv6ExtHdrFragment].nh)
+ if ip.nh == IP_PROTOS.tcp:
+ p = (ip / TCP(buffer.getvalue()))
+ self.check_tcp_checksum(p)
+ elif ip.nh == IP_PROTOS.udp:
+ p = (ip / UDP(buffer.getvalue()))
+ return p
+
def verify_ipfix_nat44_ses(self, data):
"""
Verify IPFIX NAT44 session create/delete event
cls.icmp_id_in = 6305
cls.icmp_id_out = 6305
cls.nat_addr = '10.0.0.3'
+ cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
cls.ipfix_src_port = 4739
cls.ipfix_domain_id = 1
interfaces = self.vapi.nat44_interface_addr_dump()
for intf in interfaces:
- self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
+ self.vapi.nat44_add_interface_addr(intf.sw_if_index,
+ twice_nat=intf.twice_nat,
+ is_add=0)
self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
domain_id=self.ipfix_domain_id)
addr_only=sm.addr_only,
vrf_id=sm.vrf_id,
protocol=sm.protocol,
+ twice_nat=sm.twice_nat,
is_add=0)
lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
lb_sm.external_addr,
lb_sm.external_port,
lb_sm.protocol,
- lb_sm.vrf_id,
+ vrf_id=lb_sm.vrf_id,
+ twice_nat=lb_sm.twice_nat,
is_add=0,
local_num=0,
locals=[])
+ identity_mappings = self.vapi.nat44_identity_mapping_dump()
+ for id_m in identity_mappings:
+ self.vapi.nat44_add_del_identity_mapping(
+ addr_only=id_m.addr_only,
+ ip=id_m.ip_address,
+ port=id_m.port,
+ sw_if_index=id_m.sw_if_index,
+ vrf_id=id_m.vrf_id,
+ protocol=id_m.protocol,
+ is_add=0)
+
adresses = self.vapi.nat44_address_dump()
for addr in adresses:
self.vapi.nat44_add_del_address_range(addr.ip_address,
addr.ip_address,
+ twice_nat=addr.twice_nat,
is_add=0)
+ self.vapi.nat_set_reass()
+ self.vapi.nat_set_reass(is_ip6=1)
+
def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
local_port=0, external_port=0, vrf_id=0,
is_add=1, external_sw_if_index=0xFFFFFFFF,
- proto=0):
+ proto=0, twice_nat=0):
"""
Add/delete NAT44 static mapping
:param is_add: 1 if add, 0 if delete (Default add)
:param external_sw_if_index: External interface instead of IP address
:param proto: IP protocol (Mandatory if port specified)
+ :param twice_nat: 1 if translate external host address and port
"""
addr_only = 1
if local_port and external_port:
addr_only,
vrf_id,
proto,
+ twice_nat,
is_add)
- def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
+ def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
"""
Add/delete NAT44 address
:param ip: IP address
:param is_add: 1 if add, 0 if delete (Default add)
+ :param twice_nat: twice NAT address for extenal hosts
"""
nat_addr = socket.inet_pton(socket.AF_INET, ip)
self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
- vrf_id=vrf_id)
+ vrf_id=vrf_id,
+ twice_nat=twice_nat)
def test_dynamic(self):
""" NAT44 dynamic translation test """
self.verify_capture_out(capture, same_port=True, packet_num=1)
self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
+ def test_forwarding(self):
+ """ NAT44 forwarding test """
+
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.nat44_forwarding_enable_disable(1)
+
+ real_ip = self.pg0.remote_ip4n
+ alias_ip = self.nat_addr_n
+ self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
+ external_ip=alias_ip)
+
+ try:
+ # in2out - static mapping match
+
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, same_port=True)
+
+ # in2out - no static mapping match
+
+ host0 = self.pg0.remote_hosts[0]
+ self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
+ try:
+ pkts = self.create_stream_out(self.pg1,
+ dst_ip=self.pg0.remote_ip4,
+ use_inside_ports=True)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
+ same_port=True)
+ finally:
+ self.pg0.remote_hosts[0] = host0
+
+ finally:
+ self.vapi.nat44_forwarding_enable_disable(0)
+ self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
+ external_ip=alias_ip,
+ is_add=0)
+
def test_static_in(self):
""" 1:1 NAT initialized from inside network """
self.pg_start()
self.pg3.assert_nothing_captured()
+ def test_identity_nat(self):
+ """ Identity NAT """
+
+ self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
+ TCP(sport=12345, dport=56789))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(ip.src, self.pg1.remote_ip4)
+ self.assertEqual(tcp.dport, 56789)
+ self.assertEqual(tcp.sport, 12345)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
def test_static_lb(self):
""" NAT44 local service load balancing """
external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
static_mappings = self.vapi.nat44_static_mapping_dump()
self.assertEqual(0, len(static_mappings))
+ def test_interface_addr_identity_nat(self):
+ """ Identity NAT with addresses from interface """
+
+ port = 53053
+ self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
+ self.vapi.nat44_add_del_identity_mapping(
+ sw_if_index=self.pg7.sw_if_index,
+ port=port,
+ protocol=IP_PROTOS.tcp,
+ addr_only=0)
+
+ # identity mappings with external interface
+ identity_mappings = self.vapi.nat44_identity_mapping_dump()
+ self.assertEqual(1, len(identity_mappings))
+ self.assertEqual(self.pg7.sw_if_index,
+ identity_mappings[0].sw_if_index)
+
+ # configure interface address and check identity mappings
+ self.pg7.config_ip4()
+ identity_mappings = self.vapi.nat44_identity_mapping_dump()
+ self.assertEqual(1, len(identity_mappings))
+ self.assertEqual(identity_mappings[0].ip_address,
+ self.pg7.local_ip4n)
+ self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
+ self.assertEqual(port, identity_mappings[0].port)
+ self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
+
+ # remove interface address and check identity mappings
+ self.pg7.unconfig_ip4()
+ identity_mappings = self.vapi.nat44_identity_mapping_dump()
+ self.assertEqual(0, len(identity_mappings))
+
def test_ipfix_nat44_sess(self):
""" IPFIX logging NAT44 session created/delted """
self.ipfix_domain_id = 10
nat_ip2 = "10.0.0.11"
self.nat44_add_address(nat_ip1)
- self.nat44_add_address(nat_ip2)
+ self.nat44_add_address(nat_ip2, vrf_id=99)
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
self.assertEqual(nsessions - len(sessions), 2)
+ def test_set_get_reass(self):
+ """ NAT44 set/get virtual fragmentation reassembly """
+ reas_cfg1 = self.vapi.nat_get_reass()
+
+ self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
+ max_reass=reas_cfg1.ip4_max_reass * 2,
+ max_frag=reas_cfg1.ip4_max_frag * 2)
+
+ reas_cfg2 = self.vapi.nat_get_reass()
+
+ self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
+ self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
+ self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
+
+ self.vapi.nat_set_reass(drop_frag=1)
+ self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
+
+ def test_frag_in_order(self):
+ """ NAT44 translate fragments arriving in order """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ data = "A" * 4 + "B" * 16 + "C" * 3
+ self.tcp_port_in = random.randint(1025, 65535)
+
+ reass = self.vapi.nat_reass_dump()
+ reass_n_start = len(reass)
+
+ # in2out
+ pkts = self.create_stream_frag(self.pg0,
+ self.pg1.remote_ip4,
+ self.tcp_port_in,
+ 20,
+ data)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg1.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.nat_addr,
+ self.pg1.remote_ip4)
+ self.assertEqual(p[TCP].dport, 20)
+ self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
+ self.tcp_port_out = p[TCP].sport
+ self.assertEqual(data, p[Raw].load)
+
+ # out2in
+ pkts = self.create_stream_frag(self.pg1,
+ self.nat_addr,
+ 20,
+ self.tcp_port_out,
+ data)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.pg1.remote_ip4,
+ self.pg0.remote_ip4)
+ self.assertEqual(p[TCP].sport, 20)
+ self.assertEqual(p[TCP].dport, self.tcp_port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ reass = self.vapi.nat_reass_dump()
+ reass_n_end = len(reass)
+
+ self.assertEqual(reass_n_end - reass_n_start, 2)
+
+ def test_reass_hairpinning(self):
+ """ NAT44 fragments hairpinning """
+ host = self.pg0.remote_hosts[0]
+ server = self.pg0.remote_hosts[1]
+ host_in_port = random.randint(1025, 65535)
+ host_out_port = 0
+ server_in_port = random.randint(1025, 65535)
+ server_out_port = random.randint(1025, 65535)
+ data = "A" * 4 + "B" * 16 + "C" * 3
+
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ # add static mapping for server
+ self.nat44_add_static_mapping(server.ip4, self.nat_addr,
+ server_in_port, server_out_port,
+ proto=IP_PROTOS.tcp)
+
+ # send packet from host to server
+ pkts = self.create_stream_frag(self.pg0,
+ self.nat_addr,
+ host_in_port,
+ server_out_port,
+ data)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.nat_addr,
+ server.ip4)
+ self.assertNotEqual(p[TCP].sport, host_in_port)
+ self.assertEqual(p[TCP].dport, server_in_port)
+ self.assertEqual(data, p[Raw].load)
+
+ def test_frag_out_of_order(self):
+ """ NAT44 translate fragments arriving out of order """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ data = "A" * 4 + "B" * 16 + "C" * 3
+ random.randint(1025, 65535)
+
+ # in2out
+ pkts = self.create_stream_frag(self.pg0,
+ self.pg1.remote_ip4,
+ self.tcp_port_in,
+ 20,
+ data)
+ pkts.reverse()
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg1.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.nat_addr,
+ self.pg1.remote_ip4)
+ self.assertEqual(p[TCP].dport, 20)
+ self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
+ self.tcp_port_out = p[TCP].sport
+ self.assertEqual(data, p[Raw].load)
+
+ # out2in
+ pkts = self.create_stream_frag(self.pg1,
+ self.nat_addr,
+ 20,
+ self.tcp_port_out,
+ data)
+ pkts.reverse()
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.pg1.remote_ip4,
+ self.pg0.remote_ip4)
+ self.assertEqual(p[TCP].sport, 20)
+ self.assertEqual(p[TCP].dport, self.tcp_port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ def test_port_restricted(self):
+ """ Port restricted NAT44 (MAP-E CE) """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
+ "psid-offset 6 psid-len 6")
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=4567, dport=22))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.dport, 22)
+ self.assertNotEqual(tcp.sport, 4567)
+ self.assertEqual((tcp.sport >> 6) & 63, 10)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def test_twice_nat(self):
+ """ Twice NAT44 """
+ twice_nat_addr = '10.0.1.3'
+ port_in = 8080
+ port_out = 80
+ eh_port_out = 4567
+ eh_port_in = 0
+ self.nat44_add_address(self.nat_addr)
+ self.nat44_add_address(twice_nat_addr, twice_nat=1)
+ self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
+ port_in, port_out, proto=IP_PROTOS.tcp,
+ twice_nat=1)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=eh_port_out, dport=port_out))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(ip.src, twice_nat_addr)
+ self.assertEqual(tcp.dport, port_in)
+ self.assertNotEqual(tcp.sport, eh_port_out)
+ eh_port_in = tcp.sport
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
+ TCP(sport=port_in, dport=eh_port_in))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.dport, eh_port_out)
+ self.assertEqual(tcp.sport, port_out)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def test_twice_nat_lb(self):
+ """ Twice NAT44 local service load balancing """
+ external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
+ twice_nat_addr = '10.0.1.3'
+ local_port = 8080
+ external_port = 80
+ eh_port_out = 4567
+ eh_port_in = 0
+ server1 = self.pg0.remote_hosts[0]
+ server2 = self.pg0.remote_hosts[1]
+
+ locals = [{'addr': server1.ip4n,
+ 'port': local_port,
+ 'probability': 50},
+ {'addr': server2.ip4n,
+ 'port': local_port,
+ 'probability': 50}]
+
+ self.nat44_add_address(self.nat_addr)
+ self.nat44_add_address(twice_nat_addr, twice_nat=1)
+
+ self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
+ external_port,
+ IP_PROTOS.tcp,
+ twice_nat=1,
+ local_num=len(locals),
+ locals=locals)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=eh_port_out, dport=external_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ server = None
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, twice_nat_addr)
+ self.assertIn(ip.dst, [server1.ip4, server2.ip4])
+ if ip.dst == server1.ip4:
+ server = server1
+ else:
+ server = server2
+ self.assertNotEqual(tcp.sport, eh_port_out)
+ eh_port_in = tcp.sport
+ self.assertEqual(tcp.dport, local_port)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
+ IP(src=server.ip4, dst=twice_nat_addr) /
+ TCP(sport=local_port, dport=eh_port_in))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(tcp.sport, external_port)
+ self.assertEqual(tcp.dport, eh_port_out)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def test_twice_nat_interface_addr(self):
+ """ Acquire twice NAT44 addresses from interface """
+ self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
+
+ # no address in NAT pool
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ # configure interface address and check NAT address pool
+ self.pg7.config_ip4()
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(1, len(adresses))
+ self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
+ self.assertEqual(adresses[0].twice_nat, 1)
+
+ # remove interface address and check NAT address pool
+ self.pg7.unconfig_ip4()
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(adresses))
+
def tearDown(self):
super(TestNAT44, self).tearDown()
if not self.vpp_dead:
self.logger.info(self.vapi.cli("show nat44 verbose"))
+ self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
+ self.vapi.cli("nat addr-port-assignment-alg default")
self.clear_nat44()
cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
cls.vrf1_nat_addr)
- cls.create_pg_interfaces(range(4))
+ cls.create_pg_interfaces(range(5))
cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
cls.ip6_interfaces.append(cls.pg_interfaces[2])
cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
+ def test_frag_in_order(self):
+ """ NAT64 translate fragments arriving in order """
+ self.tcp_port_in = random.randint(1025, 65535)
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ reass = self.vapi.nat_reass_dump()
+ reass_n_start = len(reass)
+
+ # in2out
+ data = 'a' * 200
+ pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
+ self.tcp_port_in, 20, data)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg1.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.nat_addr,
+ self.pg1.remote_ip4)
+ self.assertEqual(p[TCP].dport, 20)
+ self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
+ self.tcp_port_out = p[TCP].sport
+ self.assertEqual(data, p[Raw].load)
+
+ # out2in
+ data = "A" * 4 + "b" * 16 + "C" * 3
+ pkts = self.create_stream_frag(self.pg1,
+ self.nat_addr,
+ 20,
+ self.tcp_port_out,
+ data)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
+ p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
+ self.assertEqual(p[TCP].sport, 20)
+ self.assertEqual(p[TCP].dport, self.tcp_port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ reass = self.vapi.nat_reass_dump()
+ reass_n_end = len(reass)
+
+ self.assertEqual(reass_n_end - reass_n_start, 2)
+
+ def test_reass_hairpinning(self):
+ """ NAT64 fragments hairpinning """
+ data = 'a' * 200
+ client = self.pg0.remote_hosts[0]
+ server = self.pg0.remote_hosts[1]
+ server_in_port = random.randint(1025, 65535)
+ server_out_port = random.randint(1025, 65535)
+ client_in_port = random.randint(1025, 65535)
+ ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
+ nat_addr_ip6 = ip.src
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ # add static BIB entry for server
+ self.vapi.nat64_add_del_static_bib(server.ip6n,
+ self.nat_addr_n,
+ server_in_port,
+ server_out_port,
+ IP_PROTOS.tcp)
+
+ # send packet from host to server
+ pkts = self.create_stream_frag_ip6(self.pg0,
+ self.nat_addr,
+ client_in_port,
+ server_out_port,
+ data)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
+ self.assertNotEqual(p[TCP].sport, client_in_port)
+ self.assertEqual(p[TCP].dport, server_in_port)
+ self.assertEqual(data, p[Raw].load)
+
+ def test_frag_out_of_order(self):
+ """ NAT64 translate fragments arriving out of order """
+ self.tcp_port_in = random.randint(1025, 65535)
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ # in2out
+ data = 'a' * 200
+ pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
+ self.tcp_port_in, 20, data)
+ pkts.reverse()
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg1.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.nat_addr,
+ self.pg1.remote_ip4)
+ self.assertEqual(p[TCP].dport, 20)
+ self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
+ self.tcp_port_out = p[TCP].sport
+ self.assertEqual(data, p[Raw].load)
+
+ # out2in
+ data = "A" * 4 + "B" * 16 + "C" * 3
+ pkts = self.create_stream_frag(self.pg1,
+ self.nat_addr,
+ 20,
+ self.tcp_port_out,
+ data)
+ pkts.reverse()
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
+ p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
+ self.assertEqual(p[TCP].sport, 20)
+ self.assertEqual(p[TCP].dport, self.tcp_port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ def test_interface_addr(self):
+ """ Acquire NAT64 pool addresses from interface """
+ self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
+
+ # no address in NAT64 pool
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ # configure interface address and check NAT64 address pool
+ self.pg4.config_ip4()
+ addresses = self.vapi.nat64_pool_addr_dump()
+ self.assertEqual(len(addresses), 1)
+ self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
+
+ # remove interface address and check NAT64 address pool
+ self.pg4.unconfig_ip4()
+ addresses = self.vapi.nat64_pool_addr_dump()
+ self.assertEqual(0, len(adresses))
+
def nat64_get_ses_num(self):
"""
Return number of active NAT64 sessions.
self.logger.info(self.vapi.cli("show nat64 prefix"))
self.logger.info(self.vapi.cli("show nat64 bib all"))
self.logger.info(self.vapi.cli("show nat64 session table all"))
+ self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
self.clear_nat64()
self.check_ip_checksum(capture)
self.check_icmp_checksum(capture)
+ # ping DS-Lite AFTR tunnel endpoint address
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
+ ICMPv6EchoRequest())
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ self.assertEqual(1, len(capture))
+ capture = capture[0]
+ self.assertEqual(capture[IPv6].src, aftr_ip6)
+ self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
+ self.assertTrue(capture.haslayer(ICMPv6EchoReply))
+
def tearDown(self):
super(TestDSlite, self).tearDown()
if not self.vpp_dead: