from framework import VppTestCase, VppTestRunner
from scapy.layers.inet import IP, TCP, UDP, ICMP
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.l2 import Ether, ARP
from scapy.data import IP_PROTOS
from util import ppp
super(TestSNAT, cls).tearDownClass()
raise
- def create_stream_in(self, in_if, out_if):
+ def create_stream_in(self, in_if, out_if, ttl=64):
"""
Create packet stream for inside network
:param in_if: Inside interface
:param out_if: Outside interface
+ :param ttl: TTL of generated packets
"""
pkts = []
# TCP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
TCP(sport=self.tcp_port_in))
pkts.append(p)
# UDP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
UDP(sport=self.udp_port_in))
pkts.append(p)
# ICMP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
ICMP(id=self.icmp_id_in, type='echo-request'))
pkts.append(p)
return pkts
- def create_stream_out(self, out_if, dst_ip=None):
+ def create_stream_out(self, out_if, dst_ip=None, ttl=64):
"""
Create packet stream for outside network
:param out_if: Outside interface
:param dst_ip: Destination IP address (Default use global SNAT address)
+ :param ttl: TTL of generated packets
"""
if dst_ip is None:
dst_ip = self.snat_addr
pkts = []
# TCP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip) /
+ IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(dport=self.tcp_port_out))
pkts.append(p)
# UDP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip) /
+ IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
UDP(dport=self.udp_port_out))
pkts.append(p)
# ICMP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip) /
+ IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
ICMP(id=self.icmp_id_out, type='echo-reply'))
pkts.append(p)
"(inside network):", packet))
raise
+ def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
+ packet_num=3, icmp_type=11):
+ """
+ Verify captured packets with ICMP errors on outside network
+
+ :param capture: Captured packets
+ :param src_ip: Translated IP address or IP address of VPP
+ (Default use global SNAT address)
+ :param packet_num: Expected number of packets (Default 3)
+ :param icmp_type: Type of error ICMP packet
+ we are expecting (Default 11)
+ """
+ if src_ip is None:
+ src_ip = self.snat_addr
+ self.assertEqual(packet_num, len(capture))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, src_ip)
+ self.assertTrue(packet.haslayer(ICMP))
+ icmp = packet[ICMP]
+ self.assertEqual(icmp.type, icmp_type)
+ self.assertTrue(icmp.haslayer(IPerror))
+ inner_ip = icmp[IPerror]
+ if inner_ip.haslayer(TCPerror):
+ self.assertEqual(inner_ip[TCPerror].dport,
+ self.tcp_port_out)
+ elif inner_ip.haslayer(UDPerror):
+ self.assertEqual(inner_ip[UDPerror].dport,
+ self.udp_port_out)
+ else:
+ self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(outside network):", packet))
+ raise
+
+ def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
+ icmp_type=11):
+ """
+ Verify captured packets with ICMP errors on inside network
+
+ :param capture: Captured packets
+ :param in_if: Inside interface
+ :param packet_num: Expected number of packets (Default 3)
+ :param icmp_type: Type of error ICMP packet
+ we are expecting (Default 11)
+ """
+ self.assertEqual(packet_num, len(capture))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].dst, in_if.remote_ip4)
+ self.assertTrue(packet.haslayer(ICMP))
+ icmp = packet[ICMP]
+ self.assertEqual(icmp.type, icmp_type)
+ self.assertTrue(icmp.haslayer(IPerror))
+ inner_ip = icmp[IPerror]
+ if inner_ip.haslayer(TCPerror):
+ self.assertEqual(inner_ip[TCPerror].sport,
+ self.tcp_port_in)
+ elif inner_ip.haslayer(UDPerror):
+ self.assertEqual(inner_ip[UDPerror].sport,
+ self.udp_port_in)
+ else:
+ self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(inside network):", packet))
+ raise
+
def verify_ipfix_nat44_ses(self, data):
"""
Verify IPFIX NAT44 session create/delete event
external_port=sm.external_port,
addr_only=sm.addr_only,
vrf_id=sm.vrf_id,
+ protocol=sm.protocol,
is_add=0)
adresses = self.vapi.snat_address_dump()
def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
local_port=0, external_port=0, vrf_id=0,
- is_add=1, external_sw_if_index=0xFFFFFFFF):
+ is_add=1, external_sw_if_index=0xFFFFFFFF,
+ proto=0):
"""
Add/delete S-NAT static mapping
:param vrf_id: VRF ID (Default 0)
:param is_add: 1 if add, 0 if delete (Default add)
:param external_sw_if_index: External interface instead of IP address
+ :param proto: IP protocol (Mandatory if port specified)
"""
addr_only = 1
if local_port and external_port:
external_port,
addr_only,
vrf_id,
+ proto,
is_add)
def snat_add_address(self, ip, is_add=1):
capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
+ def test_dynamic_icmp_errors_in2out_ttl_1(self):
+ """ SNAT handling of client packets with TTL=1 """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # Client side - generate traffic
+ pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Client side - verify ICMP type 11 packets
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in_with_icmp_errors(capture, self.pg0)
+
+ def test_dynamic_icmp_errors_out2in_ttl_1(self):
+ """ SNAT handling of server packets with TTL=1 """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # Client side - create sessions
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Server side - generate traffic
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+ pkts = self.create_stream_out(self.pg1, ttl=1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Server side - verify ICMP type 11 packets
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out_with_icmp_errors(capture,
+ src_ip=self.pg1.local_ip4)
+
+ def test_dynamic_icmp_errors_in2out_ttl_2(self):
+ """ SNAT handling of error responses to client packets with TTL=2 """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # Client side - generate traffic
+ pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Server side - simulate ICMP type 11 response
+ capture = self.pg1.get_capture(len(pkts))
+ pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
+ ICMP(type=11) / packet[IP] for packet in capture]
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Client side - verify ICMP type 11 packets
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in_with_icmp_errors(capture, self.pg0)
+
+ def test_dynamic_icmp_errors_out2in_ttl_2(self):
+ """ SNAT handling of error responses to server packets with TTL=2 """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # Client side - create sessions
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Server side - generate traffic
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+ pkts = self.create_stream_out(self.pg1, ttl=2)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Client side - simulate ICMP type 11 response
+ capture = self.pg0.get_capture(len(pkts))
+ pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ ICMP(type=11) / packet[IP] for packet in capture]
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Server side - verify ICMP type 11 packets
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out_with_icmp_errors(capture)
+
+ def test_ping_out_interface_from_outside(self):
+ """ Ping SNAT out interface from outside """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
+ ICMP(id=self.icmp_id_out, type='echo-request'))
+ pkts = [p]
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.assertEqual(1, len(capture))
+ packet = capture[0]
+ try:
+ self.assertEqual(packet[IP].src, self.pg1.local_ip4)
+ self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ self.assertEqual(packet[ICMP].type, 0) # echo reply
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(outside network):", packet))
+ raise
+
def test_static_in(self):
""" SNAT 1:1 NAT initialized from inside network """
self.snat_add_address(self.snat_addr)
self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
- self.tcp_port_in, self.tcp_port_out)
+ self.tcp_port_in, self.tcp_port_out,
+ proto=IP_PROTOS.tcp)
self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
- self.udp_port_in, self.udp_port_out)
+ self.udp_port_in, self.udp_port_out,
+ proto=IP_PROTOS.udp)
self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
- self.icmp_id_in, self.icmp_id_out)
+ self.icmp_id_in, self.icmp_id_out,
+ proto=IP_PROTOS.icmp)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
is_inside=0)
self.snat_add_address(self.snat_addr)
self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
- self.tcp_port_in, self.tcp_port_out)
+ self.tcp_port_in, self.tcp_port_out,
+ proto=IP_PROTOS.tcp)
self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
- self.udp_port_in, self.udp_port_out)
+ self.udp_port_in, self.udp_port_out,
+ proto=IP_PROTOS.udp)
self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
- self.icmp_id_in, self.icmp_id_out)
+ self.icmp_id_in, self.icmp_id_out,
+ proto=IP_PROTOS.icmp)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
is_inside=0)
is_inside=0)
# add static mapping for server
self.snat_add_static_mapping(server.ip4, self.snat_addr,
- server_in_port, server_out_port)
+ server_in_port, server_out_port,
+ proto=IP_PROTOS.tcp)
# send packet from host to server
p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
self.snat_add_static_mapping('1.2.3.4',
external_sw_if_index=self.pg7.sw_if_index)
- # no static mappings
+ # static mappings with external interface
static_mappings = self.vapi.snat_static_mapping_dump()
- self.assertEqual(0, len(static_mappings))
+ self.assertEqual(1, len(static_mappings))
+ self.assertEqual(self.pg7.sw_if_index,
+ static_mappings[0].external_sw_if_index)
# configure interface address and check static mappings
self.pg7.config_ip4()
self.assertEqual(1, len(static_mappings))
self.assertEqual(static_mappings[0].external_ip_address[0:4],
self.pg7.local_ip4n)
+ self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
# remove interface address and check static mappings
self.pg7.unconfig_ip4()